# 키워드 추출 테스트

In [None]:
#필요한 라이브러리 설치
!pip install transformers torch rouge-score konlpy keybert scikit-learn PyMuPDF nltk
!pip install sentence-transformers

# Java 설치 (Komoran 사용을 위해 필요)
!apt-get update
!apt-get install -y openjdk-8-jdk
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
!pip install -U datasets huggingface_hub fsspec

Collecting rouge-score
  Downloading rouge_score-0.1.2.tar.gz (17 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting konlpy
  Downloading konlpy-0.6.0-py2.py3-none-any.whl.metadata (1.9 kB)
Collecting keybert
  Downloading keybert-0.9.0-py3-none-any.whl.metadata (15 kB)
Collecting PyMuPDF
  Downloading pymupdf-1.26.0-cp39-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (3.4 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_

In [2]:
import numpy as np
import pandas as pd
from datasets import load_dataset
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from konlpy.tag import Komoran
from keybert import KeyBERT
import networkx as nx
from collections import Counter
import re
from tqdm import tqdm
import warnings
import random
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.tag import pos_tag
warnings.filterwarnings('ignore')

In [3]:
# NLTK 데이터 다운로드
try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    nltk.download('punkt')

try:
    nltk.data.find('corpora/stopwords')
except LookupError:
    nltk.download('stopwords')

try:
    nltk.data.find('taggers/averaged_perceptron_tagger')
except LookupError:
    nltk.download('averaged_perceptron_tagger')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.


In [4]:
class KeywordExtractorComparison:
    def __init__(self):
        """키워드 추출 모델들 초기화"""
        print("모델들 초기화 중...")

        # 언어별 형태소 분석기
        self.komoran = Komoran()  # 한국어용

        # 영어 불용어
        try:
            self.english_stopwords = set(stopwords.words('english'))
        except:
            self.english_stopwords = set(['the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'])

        # KeyBERT 모델
        self.keybert = KeyBERT('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2')

        # TF-IDF 벡터라이저 (나중에 fit)
        self.tfidf_vectorizer = None

        print("모델 초기화 완료!")

    def _detect_language(self, text):
        """간단한 언어 감지"""
        # 한글이 포함되어 있으면 한국어로 판단
        korean_chars = re.findall(r'[가-힣]', text)
        if len(korean_chars) > 10:
            return 'korean'
        else:
            return 'english'

    def _english_tokenizer(self, text):
        """영어 명사/형용사 추출"""
        try:
            # 토큰화
            tokens = word_tokenize(text.lower())

            # POS 태깅
            pos_tags = pos_tag(tokens)

            # 명사(NN, NNS, NNP, NNPS)와 형용사(JJ) 추출
            keywords = []
            for word, pos in pos_tags:
                if (pos.startswith('NN') or pos.startswith('JJ')) and len(word) >= 2:
                    if word not in self.english_stopwords and word.isalpha():
                        keywords.append(word)

            return keywords
        except:
            # 실패시 간단한 분할
            words = re.findall(r'\b[a-zA-Z]{2,}\b', text.lower())
            return [w for w in words if w not in self.english_stopwords]

    def _korean_tokenizer(self, text):
        """한국어 명사 추출"""
        try:
            nouns = self.komoran.nouns(text)
            return [noun for noun in nouns if len(noun) >= 2]
        except:
            return []

    def _multilingual_tokenizer(self, text):
        """다국어 토크나이저"""
        language = self._detect_language(text)

        if language == 'korean':
            return self._korean_tokenizer(text)
        else:
            return self._english_tokenizer(text)

    def _preprocess_text(self, text):
        """텍스트 전처리"""
        if not text:
            return ""
        # 특수문자 제거, 공백 정리
        text = re.sub(r'[^\w\s]', ' ', text)
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    def extract_keybert(self, text, top_k=10):
        """KeyBERT로 키워드 추출"""
        try:
            if not text.strip():
                return []

            # KeyBERT 버전에 따른 파라미터 조정
            try:
                keywords = self.keybert.extract_keywords(
                    text,
                    keyphrase_ngram_range=(1, 1),
                    stop_words=None,  # 영어 불용어 사용
                    top_n=top_k,
                    use_mmr=True,
                    diversity=0.5
                )
            except TypeError:
                keywords = self.keybert.extract_keywords(
                    text,
                    keyphrase_ngram_range=(1, 2),
                    stop_words=None,
                    top_k=top_k,
                    use_mmr=True,
                    diversity=0.5
                )
            except:
                keywords = self.keybert.extract_keywords(
                    text,
                    keyphrase_ngram_range=(1, 2),
                    stop_words=None,
                    top_n=top_k
                )

            return [kw[0] for kw in keywords]

        except Exception as e:
            print(f"KeyBERT 오류: {e}")
            # 대체 방법
            tokens = self._multilingual_tokenizer(text)
            token_counts = Counter(tokens)
            return [word for word, count in token_counts.most_common(top_k)]

    def fit_tfidf(self, texts):
        """TF-IDF 벡터라이저 학습"""
        print("TF-IDF 벡터라이저 학습 중...")
        try:
            self.tfidf_vectorizer = TfidfVectorizer(
                tokenizer=self._multilingual_tokenizer,  # 다국어 토크나이저 사용
                ngram_range=(1, 2),
                max_features=5000,
                min_df=2,
                max_df=0.95,
                lowercase=True
            )
            self.tfidf_vectorizer.fit(texts)
            print("TF-IDF 학습 완료!")
        except Exception as e:
            print(f"TF-IDF 학습 오류: {e}")
            # 더 간단한 벡터라이저로 대체
            self.tfidf_vectorizer = TfidfVectorizer(
                ngram_range=(1, 1),
                max_features=1000,
                min_df=1,
                stop_words='english'  # 영어 불용어 사용
            )
            self.tfidf_vectorizer.fit(texts)
            print("TF-IDF 대체 학습 완료!")

    def extract_tfidf(self, text, top_k=10):
        """TF-IDF로 키워드 추출"""
        try:
            if not text.strip() or self.tfidf_vectorizer is None:
                return []

            # 텍스트의 TF-IDF 점수
            tfidf_vector = self.tfidf_vectorizer.transform([text])
            feature_names = self.tfidf_vectorizer.get_feature_names_out()
            scores = tfidf_vector.toarray()[0]

            # 상위 키워드 추출
            top_indices = scores.argsort()[-top_k:][::-1]
            keywords = [feature_names[i] for i in top_indices if scores[i] > 0]

            return keywords
        except Exception as e:
            print(f"TF-IDF 오류: {e}")
            # 대체 방법: 빈도 기반
            tokens = self._multilingual_tokenizer(text)
            token_counts = Counter(tokens)
            return [word for word, count in token_counts.most_common(top_k)]

    def extract_textrank(self, text, top_k=10):
        """TextRank로 키워드 추출"""
        try:
            if not text.strip():
                return []

            # 토큰 추출
            tokens = self._multilingual_tokenizer(text)

            if len(tokens) < 3:
                # TextRank 실패시 빈도 기반 대체
                token_counts = Counter(tokens)
                return [word for word, count in token_counts.most_common(top_k)]

            # 동시 출현 그래프 생성
            graph = nx.Graph()

            # 윈도우 크기 5로 동시 출현 관계 생성
            window_size = 5
            for i in range(len(tokens) - window_size + 1):
                window = tokens[i:i + window_size]
                for j in range(len(window)):
                    for k in range(j + 1, len(window)):
                        if window[j] != window[k]:
                            if graph.has_edge(window[j], window[k]):
                                graph[window[j]][window[k]]['weight'] += 1
                            else:
                                graph.add_edge(window[j], window[k], weight=1)

            # PageRank 계산
            if len(graph.nodes()) == 0:
                token_counts = Counter(tokens)
                return [word for word, count in token_counts.most_common(top_k)]

            pagerank_scores = nx.pagerank(graph, weight='weight')

            # 상위 키워드 반환
            sorted_keywords = sorted(pagerank_scores.items(),
                                   key=lambda x: x[1], reverse=True)

            return [kw[0] for kw in sorted_keywords[:top_k]]
        except Exception as e:
            print(f"TextRank 오류: {e}")
            # 대체 방법: 빈도 기반
            try:
                tokens = self._multilingual_tokenizer(text)
                token_counts = Counter(tokens)
                return [word for word, count in token_counts.most_common(top_k)]
            except:
                return []

In [5]:
def load_kptimes_data(num_samples=100):
    """KPTimes 데이터셋 로드 및 전처리"""
    print("KPTimes 데이터셋 로딩 중...")

    try:
        # 데이터셋 로드
        dataset = load_dataset("taln-ls2n/kptimes")
        test_data = dataset['test']

        print(f"전체 테스트 데이터: {len(test_data)}개")

        # 랜덤 샘플링
        if num_samples < len(test_data):
            indices = random.sample(range(len(test_data)), num_samples)
            sampled_data = [test_data[i] for i in indices]
        else:
            sampled_data = list(test_data)

        # 데이터 전처리
        processed_data = []
        for item in sampled_data:
            # title + abstract를 합쳐서 전체 텍스트 생성
            title = item.get('title', '')
            abstract = item.get('abstract', '')
            full_text = f"{title} {abstract}".strip()

            # keyphrases 처리 (리스트 형태로 변환)
            keyphrases = item.get('keyphrases', [])
            if isinstance(keyphrases, str):
                # 문자열인 경우 분할
                keyphrases = [kp.strip() for kp in keyphrases.split(',') if kp.strip()]
            elif not isinstance(keyphrases, list):
                keyphrases = []

            if full_text and keyphrases:
                processed_data.append({
                    'text': full_text,
                    'keywords': keyphrases
                })

        print(f"처리된 데이터: {len(processed_data)}개")
        return processed_data

    except Exception as e:
        print(f"데이터 로드 오류: {e}")
        return []

In [6]:
def calculate_metrics(predicted_keywords, true_keywords, k=5):
    """평가 지표 계산"""
    # 상위 k개만 고려
    pred_k = set([kw.lower().strip() for kw in predicted_keywords[:k] if kw])
    true_set = set([kw.lower().strip() for kw in true_keywords if kw])

    if len(true_set) == 0 or len(pred_k) == 0:
        return {"precision": 0, "recall": 0, "f1": 0}

    # 교집합
    intersection = pred_k.intersection(true_set)

    # Precision@K
    precision = len(intersection) / len(pred_k) if len(pred_k) > 0 else 0

    # Recall@K
    recall = len(intersection) / len(true_set)

    # F1@K
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

    return {
        "precision": precision,
        "recall": recall,
        "f1": f1
    }

In [7]:
def evaluate_extractors(num_samples=100):
    """키워드 추출기들 성능 평가"""
    extractor = KeywordExtractorComparison()

    # 데이터 로드
    processed_data = load_kptimes_data(num_samples)

    if len(processed_data) == 0:
        print("처리할 데이터가 없습니다.")
        return None, None

    # 전체 텍스트 수집 (TF-IDF 학습용)
    all_texts = [item['text'] for item in processed_data]

    # TF-IDF 벡터라이저 학습
    extractor.fit_tfidf(all_texts)

    print(f"\n{len(processed_data)}개 샘플로 평가를 시작합니다...")

    results = {
        "KeyBERT": {"precision": [], "recall": [], "f1": []},
        "TF-IDF": {"precision": [], "recall": [], "f1": []},
        "TextRank": {"precision": [], "recall": [], "f1": []}
    }

    sample_results = []

    for i, item in enumerate(tqdm(processed_data, desc="키워드 추출 평가")):
        text = item['text']
        true_keywords = item['keywords']

        # 전처리
        clean_text = extractor._preprocess_text(text)

        # 각 방법으로 키워드 추출
        keybert_kw = extractor.extract_keybert(clean_text, top_k=10)
        tfidf_kw = extractor.extract_tfidf(clean_text, top_k=10)
        textrank_kw = extractor.extract_textrank(clean_text, top_k=10)

        # 평가 지표 계산 (상위 5개 기준)
        keybert_metrics = calculate_metrics(keybert_kw, true_keywords, k=5)
        tfidf_metrics = calculate_metrics(tfidf_kw, true_keywords, k=5)
        textrank_metrics = calculate_metrics(textrank_kw, true_keywords, k=5)

        # 결과 저장
        for method, metrics in [("KeyBERT", keybert_metrics),
                               ("TF-IDF", tfidf_metrics),
                               ("TextRank", textrank_metrics)]:
            results[method]["precision"].append(metrics["precision"])
            results[method]["recall"].append(metrics["recall"])
            results[method]["f1"].append(metrics["f1"])

        # 샘플 결과 저장 (처음 3개)
        if i < 3:
            sample_results.append({
                "text": text[:150] + "..." if len(text) > 150 else text,
                "true_keywords": true_keywords[:10],  # 너무 많으면 일부만
                "keybert": keybert_kw[:5],
                "tfidf": tfidf_kw[:5],
                "textrank": textrank_kw[:5]
            })

    return results, sample_results

In [8]:
def print_evaluation_results(results):
    """평가 결과 출력"""
    print("\n" + "="*80)
    print("키워드 추출 모델 성능 비교 (KPTimes 데이터셋)")
    print("="*80)

    methods = ["KeyBERT", "TF-IDF", "TextRank"]

    # 헤더
    print(f"{'Method':<12} {'Precision@5':<18} {'Recall@5':<18} {'F1@5':<18}")
    print("-" * 80)

    best_scores = {"precision": 0, "recall": 0, "f1": 0}
    best_method = {"precision": "", "recall": "", "f1": ""}

    for method in methods:
        prec_mean = np.mean(results[method]["precision"])
        prec_std = np.std(results[method]["precision"])

        rec_mean = np.mean(results[method]["recall"])
        rec_std = np.std(results[method]["recall"])

        f1_mean = np.mean(results[method]["f1"])
        f1_std = np.std(results[method]["f1"])

        print(f"{method:<12} {prec_mean:.3f} (±{prec_std:.3f}){'':<4} "
              f"{rec_mean:.3f} (±{rec_std:.3f}){'':<4} "
              f"{f1_mean:.3f} (±{f1_std:.3f})")

        # 최고 성능 추적
        if prec_mean > best_scores["precision"]:
            best_scores["precision"] = prec_mean
            best_method["precision"] = method
        if rec_mean > best_scores["recall"]:
            best_scores["recall"] = rec_mean
            best_method["recall"] = method
        if f1_mean > best_scores["f1"]:
            best_scores["f1"] = f1_mean
            best_method["f1"] = method

    print("\n" + "="*80)
    print("최고 성능:")
    print(f"  Precision@5: {best_method['precision']} ({best_scores['precision']:.3f})")
    print(f"  Recall@5: {best_method['recall']} ({best_scores['recall']:.3f})")
    print(f"  F1@5: {best_method['f1']} ({best_scores['f1']:.3f})")
    print("="*80)

In [9]:
def print_sample_results(sample_results):
    """샘플 결과 출력"""
    print("\n" + "="*80)
    print("샘플 키워드 추출 결과")
    print("="*80)

    for i, sample in enumerate(sample_results):
        print(f"\n--- 샘플 {i+1} ---")
        print(f"텍스트: {sample['text']}")
        print(f"정답 키워드: {sample['true_keywords']}")
        print(f"KeyBERT: {sample['keybert']}")
        print(f"TF-IDF: {sample['tfidf']}")
        print(f"TextRank: {sample['textrank']}")
        print("-" * 60)

In [10]:
def main():
    """메인 실행 함수"""
    print("KPTimes 데이터셋을 사용한 키워드 추출 성능 평가")
    print("="*80)

    # 성능 평가 (100개 샘플로 시작, 필요시 조정 가능)
    results, sample_results = evaluate_extractors(num_samples=100)

    if results is None:
        print("평가를 완료할 수 없습니다.")
        return

    # 결과 출력
    print_evaluation_results(results)
    print_sample_results(sample_results)

    print("\n평가 완료!")
    print("* Precision@5: 상위 5개 키워드 중 정답 비율")
    print("* Recall@5: 전체 정답 키워드 중 상위 5개에서 찾은 비율")
    print("* F1@5: Precision과 Recall의 조화평균")
    print("* 점수가 높을수록 좋은 성능을 의미합니다.")

In [11]:
main()

KPTimes 데이터셋을 사용한 키워드 추출 성능 평가
모델들 초기화 중...


modules.json:   0%|          | 0.00/229 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/122 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/3.89k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/645 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/471M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/480 [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/9.08M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

모델 초기화 완료!
KPTimes 데이터셋 로딩 중...


README.md:   0%|          | 0.00/3.37k [00:00<?, ?B/s]

kptimes.py:   0%|          | 0.00/7.79k [00:00<?, ?B/s]

The repository for taln-ls2n/kptimes contains custom code which must be executed to correctly load the dataset. You can inspect the repository content at https://hf.co/datasets/taln-ls2n/kptimes.
You can avoid this prompt in future by passing the argument `trust_remote_code=True`.

Do you wish to run the custom code? [y/N] y


test.jsonl:   0%|          | 0.00/84.7M [00:00<?, ?B/s]

train.jsonl:   0%|          | 0.00/1.32G [00:00<?, ?B/s]

dev.jsonl:   0%|          | 0.00/50.9M [00:00<?, ?B/s]

Generating train split: 0 examples [00:00, ? examples/s]

Generating test split: 0 examples [00:00, ? examples/s]

Generating validation split: 0 examples [00:00, ? examples/s]

전체 테스트 데이터: 20000개
처리된 데이터: 100개
TF-IDF 벡터라이저 학습 중...
TF-IDF 학습 완료!

100개 샘플로 평가를 시작합니다...


키워드 추출 평가: 100%|██████████| 100/100 [00:31<00:00,  3.13it/s]


키워드 추출 모델 성능 비교 (KPTimes 데이터셋)
Method       Precision@5        Recall@5           F1@5              
--------------------------------------------------------------------------------
KeyBERT      0.100 (±0.128)     0.110 (±0.154)     0.101 (±0.133)
TF-IDF       0.126 (±0.140)     0.143 (±0.169)     0.129 (±0.143)
TextRank     0.112 (±0.142)     0.120 (±0.176)     0.111 (±0.145)

최고 성능:
  Precision@5: TF-IDF (0.126)
  Recall@5: TF-IDF (0.143)
  F1@5: TF-IDF (0.129)

샘플 키워드 추출 결과

--- 샘플 1 ---
텍스트: Presidents of Mexico and El Salvador meet to discuss curbing U.S.-bound migrant surge  TAPACHULA, MEXICO - The Mexican and Salvadoran presidents were ...
정답 키워드: ['mexico', 'el salvador', 'andres manuel lopez obrador', 'immigration', 'guatemala', 'nayib bukele', 'u.s .', 'refugees', 'donald trump']
KeyBERT: ['mexico', 'presidents', 'salvadorans', 'meet', 'us']
TF-IDF: ['mexico', 'salvador', 'migrants', 'gangs', 'mexican']
TextRank: ['said', 'year', 'mexico', 'old', 'migrants']
----------------




In [12]:
import numpy as np
import pandas as pd
from datasets import load_dataset
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from konlpy.tag import Komoran
from keybert import KeyBERT
import networkx as nx
from collections import Counter
import re
from tqdm import tqdm
import warnings
import random
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.tag import pos_tag
from sentence_transformers import SentenceTransformer
warnings.filterwarnings('ignore')

# NLTK 데이터 다운로드
try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    nltk.download('punkt')

try:
    nltk.data.find('corpora/stopwords')
except LookupError:
    nltk.download('stopwords')

try:
    nltk.data.find('taggers/averaged_perceptron_tagger')
except LookupError:
    nltk.download('averaged_perceptron_tagger')

class OptimizedKeywordExtractorComparison:
    def __init__(self):
        """키워드 추출 모델들 초기화"""
        print("모델들 초기화 중...")

        # 언어별 형태소 분석기
        self.komoran = Komoran()  # 한국어용

        # 영어 불용어
        try:
            self.english_stopwords = set(stopwords.words('english'))
        except:
            self.english_stopwords = set(['the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'])

        # KeyBERT 모델 (최적화된 설정)
        self.keybert = KeyBERT('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2')

        # 의미적 유사도 계산용 모델
        self.semantic_model = SentenceTransformer('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2')

        # TF-IDF 벡터라이저 (나중에 fit)
        self.tfidf_vectorizer = None

        print("모델 초기화 완료!")

    def _detect_language(self, text):
        """간단한 언어 감지"""
        korean_chars = re.findall(r'[가-힣]', text)
        if len(korean_chars) > 10:
            return 'korean'
        else:
            return 'english'

    def _english_tokenizer(self, text):
        """영어 명사/형용사 추출"""
        try:
            tokens = word_tokenize(text.lower())
            pos_tags = pos_tag(tokens)

            keywords = []
            for word, pos in pos_tags:
                if (pos.startswith('NN') or pos.startswith('JJ')) and len(word) >= 2:
                    if word not in self.english_stopwords and word.isalpha():
                        keywords.append(word)

            return keywords
        except:
            words = re.findall(r'\b[a-zA-Z]{2,}\b', text.lower())
            return [w for w in words if w not in self.english_stopwords]

    def _korean_tokenizer(self, text):
        """한국어 명사 추출"""
        try:
            nouns = self.komoran.nouns(text)
            return [noun for noun in nouns if len(noun) >= 2]
        except:
            return []

    def _multilingual_tokenizer(self, text):
        """다국어 토크나이저"""
        language = self._detect_language(text)

        if language == 'korean':
            return self._korean_tokenizer(text)
        else:
            return self._english_tokenizer(text)

    def _preprocess_text(self, text):
        """텍스트 전처리"""
        if not text:
            return ""
        text = re.sub(r'[^\w\s]', ' ', text)
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    def _post_process_keywords(self, keywords, top_k=10):
        """키워드 후처리 (모든 방법에 공통 적용)"""
        if not keywords:
            return []

        # 1. 길이 제약 (2-15 문자)
        filtered = [kw for kw in keywords if 2 <= len(kw) <= 15]

        # 2. 숫자만 있는 키워드 제거
        filtered = [kw for kw in filtered if not kw.isdigit()]

        # 3. 특수문자만 있는 키워드 제거
        filtered = [kw for kw in filtered if re.search(r'[a-zA-Z가-힣]', kw)]

        # 4. 중복 제거 (대소문자 구분 없이)
        seen = set()
        unique_keywords = []
        for kw in filtered:
            kw_lower = kw.lower()
            if kw_lower not in seen:
                seen.add(kw_lower)
                unique_keywords.append(kw)

        return unique_keywords[:top_k]

    def extract_keybert_optimized(self, text, top_k=10):
        """최적화된 KeyBERT 키워드 추출"""
        try:
            if not text.strip():
                return []

            # 최적화된 파라미터 설정
            try:
                keywords = self.keybert.extract_keywords(
                    text,
                    keyphrase_ngram_range=(1, 3),  # 1-3 gram으로 확장
                    stop_words=None,  # 불용어 제거 안함 (공정한 비교)
                    top_n=top_k * 3,  # 더 많이 추출 후 필터링
                    use_mmr=True,
                    diversity=0.7,  # 다양성 증가
                    use_maxsum=False,
                    nr_candidates=30  # 후보 키워드 증가
                )
            except TypeError:
                # 구버전 호환성
                keywords = self.keybert.extract_keywords(
                    text,
                    keyphrase_ngram_range=(1, 3),
                    stop_words=None,
                    top_k=top_k * 3,
                    use_mmr=True,
                    diversity=0.7
                )
            except:
                # 기본 버전
                keywords = self.keybert.extract_keywords(
                    text,
                    keyphrase_ngram_range=(1, 2),
                    stop_words=None,
                    top_n=top_k * 2
                )

            # 키워드만 추출 (점수 제거)
            keyword_list = [kw[0] for kw in keywords]

            # 후처리 적용
            return self._post_process_keywords(keyword_list, top_k)

        except Exception as e:
            print(f"KeyBERT 오류: {e}")
            # 대체 방법
            tokens = self._multilingual_tokenizer(text)
            token_counts = Counter(tokens)
            fallback_keywords = [word for word, count in token_counts.most_common(top_k * 2)]
            return self._post_process_keywords(fallback_keywords, top_k)

    def fit_tfidf(self, texts):
        """TF-IDF 벡터라이저 학습"""
        print("TF-IDF 벡터라이저 학습 중...")
        try:
            self.tfidf_vectorizer = TfidfVectorizer(
                tokenizer=self._multilingual_tokenizer,
                ngram_range=(1, 2),  # KeyBERT와 동일하게 제한
                max_features=5000,
                min_df=2,
                max_df=0.95,
                lowercase=True
            )
            self.tfidf_vectorizer.fit(texts)
            print("TF-IDF 학습 완료!")
        except Exception as e:
            print(f"TF-IDF 학습 오류: {e}")
            self.tfidf_vectorizer = TfidfVectorizer(
                ngram_range=(1, 1),
                max_features=1000,
                min_df=1,
                stop_words='english'
            )
            self.tfidf_vectorizer.fit(texts)
            print("TF-IDF 대체 학습 완료!")

    def extract_tfidf_constrained(self, text, top_k=10):
        """제약이 추가된 TF-IDF 키워드 추출"""
        try:
            if not text.strip() or self.tfidf_vectorizer is None:
                return []

            tfidf_vector = self.tfidf_vectorizer.transform([text])
            feature_names = self.tfidf_vectorizer.get_feature_names_out()
            scores = tfidf_vector.toarray()[0]

            # 상위 키워드 추출 (더 많이 추출 후 필터링)
            top_indices = scores.argsort()[-(top_k * 3):][::-1]
            keywords = [feature_names[i] for i in top_indices if scores[i] > 0]

            # 후처리 적용
            return self._post_process_keywords(keywords, top_k)

        except Exception as e:
            print(f"TF-IDF 오류: {e}")
            tokens = self._multilingual_tokenizer(text)
            token_counts = Counter(tokens)
            fallback_keywords = [word for word, count in token_counts.most_common(top_k * 2)]
            return self._post_process_keywords(fallback_keywords, top_k)

    def extract_textrank_constrained(self, text, top_k=10):
        """제약이 추가된 TextRank 키워드 추출"""
        try:
            if not text.strip():
                return []

            tokens = self._multilingual_tokenizer(text)

            if len(tokens) < 3:
                token_counts = Counter(tokens)
                fallback_keywords = [word for word, count in token_counts.most_common(top_k * 2)]
                return self._post_process_keywords(fallback_keywords, top_k)

            # 동시 출현 그래프 생성
            graph = nx.Graph()

            # 윈도우 크기 제한 (KeyBERT와 공정한 비교)
            window_size = 4  # 더 작은 윈도우
            for i in range(len(tokens) - window_size + 1):
                window = tokens[i:i + window_size]
                for j in range(len(window)):
                    for k in range(j + 1, len(window)):
                        if window[j] != window[k]:
                            if graph.has_edge(window[j], window[k]):
                                graph[window[j]][window[k]]['weight'] += 1
                            else:
                                graph.add_edge(window[j], window[k], weight=1)

            if len(graph.nodes()) == 0:
                token_counts = Counter(tokens)
                fallback_keywords = [word for word, count in token_counts.most_common(top_k * 2)]
                return self._post_process_keywords(fallback_keywords, top_k)

            pagerank_scores = nx.pagerank(graph, weight='weight')
            sorted_keywords = sorted(pagerank_scores.items(),
                                   key=lambda x: x[1], reverse=True)

            keywords = [kw[0] for kw in sorted_keywords[:top_k * 2]]

            # 후처리 적용
            return self._post_process_keywords(keywords, top_k)

        except Exception as e:
            print(f"TextRank 오류: {e}")
            try:
                tokens = self._multilingual_tokenizer(text)
                token_counts = Counter(tokens)
                fallback_keywords = [word for word, count in token_counts.most_common(top_k * 2)]
                return self._post_process_keywords(fallback_keywords, top_k)
            except:
                return []

def load_kptimes_data(num_samples=100):
    """KPTimes 데이터셋 로드 및 전처리"""
    print("KPTimes 데이터셋 로딩 중...")

    try:
        dataset = load_dataset("taln-ls2n/kptimes")
        test_data = dataset['test']

        print(f"전체 테스트 데이터: {len(test_data)}개")

        if num_samples < len(test_data):
            indices = random.sample(range(len(test_data)), num_samples)
            sampled_data = [test_data[i] for i in indices]
        else:
            sampled_data = list(test_data)

        processed_data = []
        for item in sampled_data:
            title = item.get('title', '')
            abstract = item.get('abstract', '')
            full_text = f"{title} {abstract}".strip()

            keyphrases = item.get('keyphrases', [])
            if isinstance(keyphrases, str):
                keyphrases = [kp.strip() for kp in keyphrases.split(',') if kp.strip()]
            elif not isinstance(keyphrases, list):
                keyphrases = []

            if full_text and keyphrases:
                processed_data.append({
                    'text': full_text,
                    'keywords': keyphrases
                })

        print(f"처리된 데이터: {len(processed_data)}개")
        return processed_data

    except Exception as e:
        print(f"데이터 로드 오류: {e}")
        return []

def calculate_exact_metrics(predicted_keywords, true_keywords, k=5):
    """정확한 매칭 기반 평가"""
    pred_k = set([kw.lower().strip() for kw in predicted_keywords[:k] if kw])
    true_set = set([kw.lower().strip() for kw in true_keywords if kw])

    if len(true_set) == 0 or len(pred_k) == 0:
        return {"precision": 0, "recall": 0, "f1": 0}

    intersection = pred_k.intersection(true_set)

    precision = len(intersection) / len(pred_k) if len(pred_k) > 0 else 0
    recall = len(intersection) / len(true_set)
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

    return {"precision": precision, "recall": recall, "f1": f1}

def calculate_flexible_metrics(predicted_keywords, true_keywords, k=5):
    """부분 매칭을 허용하는 평가 (KeyBERT에 유리)"""
    pred_k = [kw.lower().strip() for kw in predicted_keywords[:k] if kw]
    true_set = [kw.lower().strip() for kw in true_keywords if kw]

    if len(true_set) == 0 or len(pred_k) == 0:
        return {"precision": 0, "recall": 0, "f1": 0}

    matches = 0
    for pred in pred_k:
        for true in true_set:
            # 부분 매칭 또는 포함 관계 확인
            if pred in true or true in pred or pred == true:
                matches += 1
                break

    precision = matches / len(pred_k) if len(pred_k) > 0 else 0
    recall = matches / len(true_set) if len(true_set) > 0 else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

    return {"precision": precision, "recall": recall, "f1": f1}

def calculate_semantic_metrics(predicted_keywords, true_keywords, semantic_model, k=5):
    """의미적 유사도 기반 평가 (KeyBERT의 핵심 장점)"""
    pred_k = [kw for kw in predicted_keywords[:k] if kw.strip()]
    true_list = [kw for kw in true_keywords if kw.strip()]

    if len(true_list) == 0 or len(pred_k) == 0:
        return {"precision": 0, "recall": 0, "f1": 0}

    try:
        pred_embeddings = semantic_model.encode(pred_k)
        true_embeddings = semantic_model.encode(true_list)

        # 코사인 유사도 계산
        similarities = cosine_similarity(pred_embeddings, true_embeddings)

        # 임계값 이상의 유사도를 매칭으로 간주
        threshold = 0.6  # 조정 가능한 임계값
        matches = np.sum(np.max(similarities, axis=1) > threshold)

        precision = matches / len(pred_k)
        recall = matches / len(true_list)
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

        return {"precision": precision, "recall": recall, "f1": f1}
    except:
        # 의미적 유사도 계산 실패시 부분 매칭으로 대체
        return calculate_flexible_metrics(predicted_keywords, true_keywords, k)

def evaluate_extractors_comprehensive(num_samples=100):
    """종합적인 키워드 추출기 성능 평가"""
    extractor = OptimizedKeywordExtractorComparison()

    processed_data = load_kptimes_data(num_samples)

    if len(processed_data) == 0:
        print("처리할 데이터가 없습니다.")
        return None, None

    all_texts = [item['text'] for item in processed_data]
    extractor.fit_tfidf(all_texts)

    print(f"\n{len(processed_data)}개 샘플로 종합 평가를 시작합니다...")

    # 세 가지 평가 방식
    results = {
        "exact": {
            "KeyBERT": {"precision": [], "recall": [], "f1": []},
            "TF-IDF": {"precision": [], "recall": [], "f1": []},
            "TextRank": {"precision": [], "recall": [], "f1": []}
        },
        "flexible": {
            "KeyBERT": {"precision": [], "recall": [], "f1": []},
            "TF-IDF": {"precision": [], "recall": [], "f1": []},
            "TextRank": {"precision": [], "recall": [], "f1": []}
        },
        "semantic": {
            "KeyBERT": {"precision": [], "recall": [], "f1": []},
            "TF-IDF": {"precision": [], "recall": [], "f1": []},
            "TextRank": {"precision": [], "recall": [], "f1": []}
        }
    }

    sample_results = []

    for i, item in enumerate(tqdm(processed_data, desc="종합 키워드 추출 평가")):
        text = item['text']
        true_keywords = item['keywords']

        clean_text = extractor._preprocess_text(text)

        # 각 방법으로 키워드 추출
        keybert_kw = extractor.extract_keybert_optimized(clean_text, top_k=10)
        tfidf_kw = extractor.extract_tfidf_constrained(clean_text, top_k=10)
        textrank_kw = extractor.extract_textrank_constrained(clean_text, top_k=10)

        # 세 가지 평가 방식으로 평가
        for eval_type, calc_func in [
            ("exact", calculate_exact_metrics),
            ("flexible", calculate_flexible_metrics),
            ("semantic", lambda p, t, k: calculate_semantic_metrics(p, t, extractor.semantic_model, k))
        ]:
            keybert_metrics = calc_func(keybert_kw, true_keywords, 5)
            tfidf_metrics = calc_func(tfidf_kw, true_keywords, 5)
            textrank_metrics = calc_func(textrank_kw, true_keywords, 5)

            for method, metrics in [("KeyBERT", keybert_metrics),
                                   ("TF-IDF", tfidf_metrics),
                                   ("TextRank", textrank_metrics)]:
                results[eval_type][method]["precision"].append(metrics["precision"])
                results[eval_type][method]["recall"].append(metrics["recall"])
                results[eval_type][method]["f1"].append(metrics["f1"])

        # 샘플 결과 저장 (처음 3개)
        if i < 3:
            sample_results.append({
                "text": text[:150] + "..." if len(text) > 150 else text,
                "true_keywords": true_keywords[:10],
                "keybert": keybert_kw[:5],
                "tfidf": tfidf_kw[:5],
                "textrank": textrank_kw[:5]
            })

    return results, sample_results

def print_comprehensive_results(results):
    """종합 평가 결과 출력"""
    eval_types = {
        "exact": "정확한 매칭",
        "flexible": "부분 매칭 허용",
        "semantic": "의미적 유사도"
    }

    for eval_type, eval_name in eval_types.items():
        print(f"\n{'='*80}")
        print(f"키워드 추출 성능 비교 - {eval_name}")
        print(f"{'='*80}")

        methods = ["KeyBERT", "TF-IDF", "TextRank"]

        print(f"{'Method':<12} {'Precision@5':<18} {'Recall@5':<18} {'F1@5':<18}")
        print("-" * 80)

        best_scores = {"precision": 0, "recall": 0, "f1": 0}
        best_method = {"precision": "", "recall": "", "f1": ""}

        for method in methods:
            prec_mean = np.mean(results[eval_type][method]["precision"])
            prec_std = np.std(results[eval_type][method]["precision"])

            rec_mean = np.mean(results[eval_type][method]["recall"])
            rec_std = np.std(results[eval_type][method]["recall"])

            f1_mean = np.mean(results[eval_type][method]["f1"])
            f1_std = np.std(results[eval_type][method]["f1"])

            print(f"{method:<12} {prec_mean:.3f} (±{prec_std:.3f}){'':<4} "
                  f"{rec_mean:.3f} (±{rec_std:.3f}){'':<4} "
                  f"{f1_mean:.3f} (±{f1_std:.3f})")

            if prec_mean > best_scores["precision"]:
                best_scores["precision"] = prec_mean
                best_method["precision"] = method
            if rec_mean > best_scores["recall"]:
                best_scores["recall"] = rec_mean
                best_method["recall"] = method
            if f1_mean > best_scores["f1"]:
                best_scores["f1"] = f1_mean
                best_method["f1"] = method

        print(f"\n최고 성능 ({eval_name}):")
        print(f"  Precision@5: {best_method['precision']} ({best_scores['precision']:.3f})")
        print(f"  Recall@5: {best_method['recall']} ({best_scores['recall']:.3f})")
        print(f"  F1@5: {best_method['f1']} ({best_scores['f1']:.3f})")

def print_sample_results(sample_results):
    """샘플 결과 출력"""
    print(f"\n{'='*80}")
    print("샘플 키워드 추출 결과 (최적화 적용)")
    print(f"{'='*80}")

    for i, sample in enumerate(sample_results):
        print(f"\n--- 샘플 {i+1} ---")
        print(f"텍스트: {sample['text']}")
        print(f"정답 키워드: {sample['true_keywords']}")
        print(f"KeyBERT (최적화): {sample['keybert']}")
        print(f"TF-IDF (제약): {sample['tfidf']}")
        print(f"TextRank (제약): {sample['textrank']}")
        print("-" * 60)

def main():
    """메인 실행 함수"""
    print("최적화된 키워드 추출 성능 비교 (KPTimes 데이터셋)")
    print("="*80)

    # 종합 성능 평가
    results, sample_results = evaluate_extractors_comprehensive(num_samples=100)

    if results is None:
        print("평가를 완료할 수 없습니다.")
        return

    # 결과 출력
    print_comprehensive_results(results)
    print_sample_results(sample_results)

    print(f"\n{'='*80}")
    print("평가 완료!")
    print("* 정확한 매칭: 완전히 일치하는 키워드만 인정")
    print("* 부분 매칭: 포함 관계도 인정 (예: 'AI' ⊆ 'AI technology')")
    print("* 의미적 유사도: 의미가 유사한 키워드도 인정 (KeyBERT 장점)")
    print("* KeyBERT는 의미적 유사도에서 가장 좋은 성능을 보일 것으로 예상됩니다.")

In [13]:
main()

최적화된 키워드 추출 성능 비교 (KPTimes 데이터셋)
모델들 초기화 중...
모델 초기화 완료!
KPTimes 데이터셋 로딩 중...
전체 테스트 데이터: 20000개
처리된 데이터: 100개
TF-IDF 벡터라이저 학습 중...
TF-IDF 학습 완료!

100개 샘플로 종합 평가를 시작합니다...


종합 키워드 추출 평가: 100%|██████████| 100/100 [02:27<00:00,  1.47s/it]


키워드 추출 성능 비교 - 정확한 매칭
Method       Precision@5        Recall@5           F1@5              
--------------------------------------------------------------------------------
KeyBERT      0.004 (±0.028)     0.005 (±0.032)     0.004 (±0.030)
TF-IDF       0.098 (±0.148)     0.101 (±0.153)     0.096 (±0.143)
TextRank     0.094 (±0.145)     0.096 (±0.154)     0.094 (±0.148)

최고 성능 (정확한 매칭):
  Precision@5: TF-IDF (0.098)
  Recall@5: TF-IDF (0.101)
  F1@5: TF-IDF (0.096)

키워드 추출 성능 비교 - 부분 매칭 허용
Method       Precision@5        Recall@5           F1@5              
--------------------------------------------------------------------------------
KeyBERT      0.050 (±0.095)     0.058 (±0.115)     0.052 (±0.100)
TF-IDF       0.312 (±0.239)     0.322 (±0.254)     0.307 (±0.231)
TextRank     0.326 (±0.229)     0.338 (±0.258)     0.324 (±0.230)

최고 성능 (부분 매칭 허용):
  Precision@5: TextRank (0.326)
  Recall@5: TextRank (0.338)
  F1@5: TextRank (0.324)

키워드 추출 성능 비교 - 의미적 유사도
Method       Precision@5    


