## Libraries

In [None]:
%%capture
! pip install pandas scikit-learn keybert transformers torch huggingface_hub keyphrase-vectorizers

In [None]:
# Standard library imports
import os
import re
import time
import gc
import itertools
import warnings
import configparser

# Third-party imports
import pandas as pd
from tqdm.notebook import tqdm
from sklearn.metrics import precision_score, recall_score, f1_score
import torch
from typing import List, Dict, Tuple, Union, Any
from sklearn.metrics.pairwise import cosine_similarity

# Hugging Face imports
from huggingface_hub import login
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM

# KeyBERT and Sentence Transformers imports
from keybert import KeyBERT, KeyLLM
from keybert.llm import TextGeneration
from keyphrase_vectorizers import KeyphraseCountVectorizer
from sentence_transformers import SentenceTransformer

## Set-Up

In [None]:
# Read the Hugging Face access token from the local INI file and authenticate.
config = configparser.ConfigParser()
config.read('./config.ini')
_hf_section = config['hf_token']
HF_TOKEN = _hf_section['access_token']  # Token used for the Hugging Face Hub
login(token=HF_TOKEN)

In [None]:
# Install a global 'ignore' filter so warnings are not displayed in the notebook output.
warnings.simplefilter('ignore')

## Load SemEval data

In [None]:
!ls data/SemEval2017

In [None]:
# Folders holding the SemEval-2017 documents and their gold keyphrase files.
root_doc = 'data/SemEval2017/docsutf8/'
root_key = 'data/SemEval2017/keys/'

In [None]:
# Accumulate one record per document file.
data = []

# Scan the documents folder and read every .txt file.
for filename in os.listdir(root_doc):
    if filename.endswith(".txt"):  # Only plain-text documents
        file_path = os.path.join(root_doc, filename)
        with open(file_path, "r", encoding="utf-8") as file:
            content = file.read()
        # Drop only the trailing extension. The previous re.sub('.txt', '', ...)
        # treated '.' as a wildcard and removed every match anywhere in the name.
        doc_id = filename[:-len(".txt")]
        data.append({"ID": doc_id, "text": content})

# Build the documents DataFrame (columns: ID, text).
doc_df = pd.DataFrame(data)
doc_df.head()

In [None]:
# Accumulate one record per gold-keyphrase file.
data = []

# Scan the keys folder and read every .key file.
for filename in os.listdir(root_key):
    if filename.endswith(".key"):  # Only keyphrase files
        file_path = os.path.join(root_key, filename)
        with open(file_path, "r", encoding="utf-8") as file:
            content = file.read()  # Raw file content; keyphrases are newline-separated
        # Drop only the trailing extension. The previous re.sub('.key', '', ...)
        # treated '.' as a wildcard and removed every match anywhere in the name.
        key_id = filename[:-len(".key")]
        data.append({"ID": key_id, "keys": content})

# Build the keyphrases DataFrame (columns: ID, keys).
key_df = pd.DataFrame(data)
key_df.head()

In [None]:
# Join each document with its gold keyphrases on the shared ID column.
sem_eval_df = doc_df.merge(key_df, on="ID")
sem_eval_df.head()

## Test different keyword-extraction approaches

In [None]:
def compute_metrics(pred_kw, true_kw):
    """
    Compute macro-averaged precision, recall and F1 for keyword extraction.

    Each document is scored by exact-match set overlap between its predicted
    and reference keywords; the per-document scores are then averaged.

    Args:
        pred_kw (list of list): One list per document. Each inner list holds
                                tuples whose first element is the keyword.
        true_kw (list of list): One list of reference keywords per document.

    Returns:
        dict: Averages under the keys "precision", "recall" and "f1_score"
              (0 for every metric when there are no documents).
    """
    def _average(scores):
        # Mean of the per-document scores, 0 when nothing was scored.
        return sum(scores) / len(scores) if scores else 0

    per_document = []

    for predicted, reference in zip(pred_kw, true_kw):
        # Duplicates are irrelevant for overlap counting, so go straight to sets.
        predicted_set = {entry[0] for entry in predicted} if predicted else set()
        reference_set = set(reference) if reference else set()
        overlap = len(predicted_set & reference_set)

        doc_precision = overlap / len(predicted_set) if predicted_set else 0.0
        doc_recall = overlap / len(reference_set) if reference_set else 0.0

        denominator = doc_precision + doc_recall
        doc_f1 = 0.0 if denominator == 0 else 2 * doc_precision * doc_recall / denominator

        per_document.append((doc_precision, doc_recall, doc_f1))

    return {
        "precision": _average([scores[0] for scores in per_document]),
        "recall": _average([scores[1] for scores in per_document]),
        "f1_score": _average([scores[2] for scores in per_document]),
    }

In [None]:
# Prompt template handed to KeyBERT's TextGeneration wrapper. The [DOCUMENT]
# and [CANDIDATES] placeholders are filled in per document at extraction time.
# The one-shot example answer now lists words that actually occur in the
# example document; the previous answer ("meat, beef, ...") was unrelated to it
# and demonstrated returning keywords that are not present in the text.
# NOTE(review): the [INST] / <<SYS>> markers follow the Llama-2 chat format;
# confirm this is appropriate for the LLMs configured in the grid search.
KEY_LLM_PROMPT = """
<s>[INST] <<SYS>>

You are a helpful assistant specialized in extracting comma-separated keywords.
You are to the point and only give the answer in isolation without any chat-based fluff.

<</SYS>>
I have the following document:
- The website mentions that it only takes a couple of days to deliver but I still have not received mine.

Please give me the keywords that are present in this document and separate them with commas.
Make sure you to only return the keywords and say nothing else. For example, don't say:
"Here are the keywords present in the document"
[/INST] website, deliver, couple of days, received [INST]

I have the following document:
- [DOCUMENT]

With the following candidate keywords:
- [CANDIDATES]

Please give me the keywords that are present in this document and separate them with commas.
Make sure you to only return the keywords and say nothing else. For example, don't say:
"Here are the keywords present in the document"
[/INST]
"""


def initialize_models(embedding_model, llm_model, use_keyllm=True):
    """
    Load the sentence-embedding model, KeyBERT and, optionally, KeyLLM.

    The embedding model is placed on the GPU when CUDA is available,
    otherwise on the CPU.

    Args:
        embedding_model (str): Name or path given to SentenceTransformer.
        llm_model (str): Name or path given to AutoTokenizer and
            AutoModelForCausalLM. Only used when use_keyllm is True.
        use_keyllm (bool): Whether to load the causal LM and wrap it in
            KeyLLM. Defaults to True.

    Returns:
        tuple: (model, kw_bert_model, kw_llm_model) where
            - model is the SentenceTransformer used for embeddings,
            - kw_bert_model is the KeyBERT instance built on that model,
            - kw_llm_model is the KeyLLM instance, or None when
              use_keyllm is False.

    Raises:
        Exception: Any error raised while loading is printed and then
            re-raised unchanged. Previously the error was swallowed and the
            final return failed with an unrelated UnboundLocalError.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    kw_llm_model = None
    try:
        # Initialize the sentence transformer model
        print("Loading Sentence Transformer model...")
        model = SentenceTransformer(embedding_model, device=device)

        # Initialize the KeyBERT model
        print("Loading KeyBERT model...")
        kw_bert_model = KeyBERT(model)

        # Initialize the KeyLLM model
        if use_keyllm:
            print("Loading KeyLLM model...")
            tokenizer = AutoTokenizer.from_pretrained(llm_model)
            # Use a separate name so the llm_model argument keeps its value.
            causal_lm = AutoModelForCausalLM.from_pretrained(
                llm_model,
                trust_remote_code=True,
                device_map='auto'
            )
            # NOTE(review): model_kwargs is documented as being forwarded to
            # from_pretrained; because an already-instantiated model is passed
            # here, "load_in_4bit" presumably has no effect. Confirm, and move
            # quantization to the from_pretrained call above if it is intended.
            generator = pipeline(
                model=causal_lm, tokenizer=tokenizer,
                task='text-generation',
                max_new_tokens=50,
                repetition_penalty=1.1,
                model_kwargs={"load_in_4bit": True}
            )
            llm = TextGeneration(generator, prompt=KEY_LLM_PROMPT)
            kw_llm_model = KeyLLM(llm)

        print("Models loaded successfully!")
    except Exception as e:
        print(f"An error occurred while loading the models: {e}")
        # Propagate the real failure instead of masking it.
        raise

    return model, kw_bert_model, kw_llm_model

def get_top_kw(doc, candidates, model, top_n):
    """
    Rank candidate keywords by cosine similarity to the document.

    Args:
        doc: Input document text.
        candidates: List of candidate keywords.
        model: Object exposing encode(), used for both document and candidates.
        top_n: Maximum number of keywords to return.

    Returns:
        List of (keyword, similarity_score) tuples ordered from most to least
        similar. Empty when there are no candidates or top_n is not positive.
    """
    # Nothing to rank. This also avoids encoding an empty list, and avoids the
    # slice [-0:] below, which would return every candidate when top_n is 0.
    if len(candidates) == 0 or top_n <= 0:
        return []

    # Encode document and candidates
    doc_embedding = model.encode(doc).reshape(1, -1)  # 2-D row vector for sklearn
    candidate_embeddings = model.encode(candidates)

    # One similarity value per candidate
    similarities = cosine_similarity(candidate_embeddings, doc_embedding).flatten()

    # argsort is ascending: take the last top_n indices and reverse them
    top_indices = similarities.argsort()[-top_n:][::-1]
    return [(candidates[idx], float(similarities[idx])) for idx in top_indices]
    
def extract_keywords_from_text(doc, kw_bert_model, kw_llm_model, model, use_keyllm, diversity, top_n):
    """
    Extract keywords from text using KeyBERT and optionally KeyLLM.

    With use_keyllm=False the KeyBERT result (MMR enabled) is returned as is.
    With use_keyllm=True, up to 20 KeyBERT keywords are given to KeyLLM as
    candidates; the KeyBERT and KeyLLM keywords are then merged and re-ranked
    by similarity to the document.

    Args:
        doc: Input text
        kw_bert_model: KeyBERT model instance
        kw_llm_model: KeyLLM model instance (required when use_keyllm is True)
        model: SentenceTransformer model used for the final re-ranking
        use_keyllm: Whether to use KeyLLM for refinement
        diversity: Diversity parameter for MMR
        top_n: Number of keywords to extract

    Returns:
        List of tuples containing (keyword, score)

    Raises:
        ValueError: If use_keyllm is True but kw_llm_model is None.
    """
    if use_keyllm and kw_llm_model is None:
        raise ValueError("use_keyllm=True requires a KeyLLM instance, but kw_llm_model is None")

    def extract_with_keybert(n: int) -> List[Tuple[str, float]]:
        """Run KeyBERT with the keyphrase vectorizer, falling back to the default one."""
        try:
            return kw_bert_model.extract_keywords(
                docs=doc,
                vectorizer=KeyphraseCountVectorizer(),
                use_mmr=True,
                diversity=diversity,
                top_n=n
            )
        except ValueError as e:
            print(f"KeyphraseCountVectorizer failed, falling back to default: {e}")
            return kw_bert_model.extract_keywords(
                docs=doc,
                use_mmr=True,
                diversity=diversity,
                top_n=n
            )

    if not use_keyllm:
        # Use KeyBERT only
        return extract_with_keybert(top_n)

    # Get initial keywords from KeyBERT (limited to 20 for LLM processing)
    initial_keywords = extract_with_keybert(20)
    initial_keyword_texts = [kw[0] for kw in initial_keywords]

    # Refine using KeyLLM. candidate_keywords takes one list of keyword
    # strings per document, so wrap the texts for this single document; the
    # (keyword, score) tuples must not be passed directly.
    refined_keywords = kw_llm_model.extract_keywords(
        docs=doc,
        candidate_keywords=[initial_keyword_texts]
    )[0]

    print(initial_keywords)
    print(refined_keywords)

    # Merge and deduplicate while keeping a stable order (a set union would
    # make the order, and therefore tie-breaking, vary between runs).
    merged = dict.fromkeys(initial_keyword_texts + list(refined_keywords))
    all_candidates = [c for c in merged if c]  # Remove empty strings
    if not all_candidates:
        return []

    # Get final keywords based on similarity
    return get_top_kw(doc, all_candidates, model, top_n)


def test_keywords_extraction(kw_bert_model, kw_llm_model, model, texts, true_kw, use_keyllm, diversity, top_n):
    """
    Run keyword extraction over a corpus and report quality and timing.

    Args:
        kw_bert_model: KeyBERT model for keyword extraction.
        kw_llm_model: KeyLLM model for refining keywords (used only if use_keyllm=True).
        model: Embedding model forwarded to extract_keywords_from_text.
        texts (list of str): Texts to process.
        true_kw (list of list): Reference keywords, one list per text.
        use_keyllm (bool): If True, use KeyBERT + LLM. If False, use only KeyBERT.
        diversity (float): Diversity parameter for MMR.
        top_n (int): Maximum number of keywords to extract per text.

    Returns:
        dict: The metrics from compute_metrics plus "execution_time", the
        wall-clock seconds spent extracting keywords.
    """
    predictions = []
    started_at = time.time()

    # Extract keywords document by document, echoing each text as it is processed
    for document in tqdm(texts, desc="Processing texts"):
        print(document)
        predictions.append(
            extract_keywords_from_text(
                document, kw_bert_model, kw_llm_model, model, use_keyllm, diversity, top_n
            )
        )

    # Only extraction is timed; scoring happens after the clock stops
    elapsed = time.time() - started_at

    report = compute_metrics(predictions, true_kw)
    report["execution_time"] = elapsed
    return report



In [None]:

def grid_search_extraction(param_combinations, texts, true_kw):
    """
    Run test_keywords_extraction for every parameter combination.

    Models are loaded fresh for each combination and released afterwards so
    that memory can be reclaimed between iterations.

    Args:
        param_combinations (list of tuples): Each tuple is
            (embedding_model, llm_model, use_keyllm, diversity, top_n).
        texts (list of str): List of texts to process.
        true_kw (list of list): List of reference keywords.

    Returns:
        pd.DataFrame: One row per combination with its parameters and the
        precision, recall, f1_score and execution_time metrics.
    """
    results = []

    # Test each parameter combination
    for embedding_model, llm_model, use_keyllm, diversity, top_n in tqdm(param_combinations, desc="Processing parameter combinations"):

        print(f"Testing: emb_model={embedding_model}, llm_model={llm_model}, use_keyllm={use_keyllm}, diversity={diversity}, top_n={top_n}")

        # Keep the loaded model under its own name. Rebinding embedding_model
        # would store the model object in the results instead of its name and
        # keep every loaded model alive through the results list.
        st_model, kw_bert_model, kw_llm_model = initialize_models(embedding_model, llm_model, use_keyllm)

        # Start the test with the current parameters
        metrics = test_keywords_extraction(
            kw_bert_model, kw_llm_model, st_model, texts, true_kw,
            use_keyllm=use_keyllm, diversity=diversity, top_n=top_n
        )

        # Save the results in a list
        results.append({
            "embedding_model": embedding_model,
            "llm_model": llm_model,
            "use_keyllm": use_keyllm,
            "diversity": diversity,
            "top_n": top_n,
            "precision": metrics["precision"],
            "recall": metrics["recall"],
            "f1_score": metrics["f1_score"],
            "execution_time": metrics["execution_time"]
        })

        # Drop every reference to the loaded models before collecting
        del st_model
        del kw_bert_model
        del kw_llm_model
        gc.collect()
        torch.cuda.empty_cache()

    # Convert the results to a DataFrame for easier analysis
    results_df = pd.DataFrame(results)

    return results_df


In [None]:

# Define parameters for Grid Search
param_grid = {
    "embedding_model": ["all-MiniLM-L6-v2", "paraphrase-MiniLM-L12-v2"],  # KeyBERT embedding models
    "llm_model": ["meta-llama/Llama-3.2-3B", "Qwen/Qwen2.5-3B"],  # LLM models for KeyLLM
    "use_keyllm": [True, False],  # Test both KeyBERT and KeyBERT + LLM
    "diversity": [0.3, 0.5, 0.7],  # Variation of the MMR parameter
    "top_n": [3, 5, 10]  # Maximum number of extracted keywords
}

# Generate the parameter combinations as
# (embedding_model, llm_model, use_keyllm, diversity, top_n) tuples.
# The LLM is only loaded when use_keyllm is True, so KeyBERT-only runs get
# llm_model=None and are deduplicated; otherwise each of them would be
# repeated once per LLM. dict.fromkeys keeps the first-seen order.
param_combinations = list(dict.fromkeys(
    (embedding_model, llm_model if use_keyllm else None, use_keyllm, diversity, top_n)
    for embedding_model, llm_model, use_keyllm, diversity, top_n in itertools.product(
        param_grid["embedding_model"],
        param_grid["llm_model"],
        param_grid["use_keyllm"],
        param_grid["diversity"],
        param_grid["top_n"]
    )
))

In [None]:
texts = sem_eval_df["text"].tolist()
# Gold keyphrases are stored one per line. Strip surrounding whitespace and
# drop blank lines so that a trailing newline does not add an empty-string
# "keyword", which would inflate the recall denominator.
true_kw = [
    [line.strip() for line in keys.splitlines() if line.strip()]
    for keys in sem_eval_df["keys"]
]

# Run the grid search over all parameter combinations
results_df = grid_search_extraction(param_combinations, texts, true_kw)
print("Grid Search Results:")
print(results_df)

In [None]:
# Pick the best-scoring parameter combination for each metric.
# idxmax() returns an index label, so it is paired with label-based .loc;
# positional .iloc only gives the same row for a default RangeIndex.
best_f1_params = results_df.loc[results_df["f1_score"].idxmax()]
best_precision_params = results_df.loc[results_df["precision"].idxmax()]
best_recall_params = results_df.loc[results_df["recall"].idxmax()]

# Print the best model parameters
print("Best F1 Score Parameters:")
print(best_f1_params)
print("\nBest Precision Parameters:")
print(best_precision_params)
print("\nBest Recall Parameters:")
print(best_recall_params)