In [None]:
import spacy
from sentence_transformers import SentenceTransformer
import numpy as np
from datasets import load_dataset
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from torch.utils.data import Dataset, DataLoader
from numpy.linalg import norm

# Dispositivo (MPS o CPU)
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
print(f"Sto usando il dispositivo: {device}")

# Inizializzazione modelli
nlp = spacy.load('en_core_web_sm')
embedder = SentenceTransformer('all-mpnet-base-v2')

data = load_dataset("taln-ls2n/inspec")["train"].to_list()

# Funzione di preprocessing
def preprocess(text):
    doc = nlp(text.lower())
    tokens = [token.lemma_ for token in doc if token.is_alpha and not token.is_stop]
    return tokens

def preprocess_keyphrase(text):
    tokens = []
    for t in text:
        doc = t.replace("-", " ")
        doc = nlp(doc.lower())
        doc = [token.lemma_ for token in doc if token.is_alpha and not token.is_stop]
        if len(doc) < 4 and len(doc) > 1:
            tokens.append(" ".join(doc))
    return tokens


In [None]:
# Liste finali dove accumuleremo features e labels
all_features = []
all_labels = []

for doc in data:
    title = doc['title']
    abstract = doc['abstract']
    keyphrases = doc['keyphrases']  # lista di stringhe
    prmu = doc['prmu']
    keyphrases = [kp for kp, p in zip(keyphrases, prmu) if p == "P"]
    keyphrases = preprocess_keyphrase(keyphrases)

    # Preprocessing del testo (title + abstract)
    preprocessed_tokens = preprocess(title + '. ' + abstract)
    processed_text = ' '.join(preprocessed_tokens)

    # 1) Estraggo i candidati n-gram dal testo processato
    vectorizer = CountVectorizer(ngram_range=(2, 3)).fit([processed_text])
    candidates = vectorizer.get_feature_names_out()  # tutti gli n-gram

    # 2) Embedding del documento
    doc_embedding = embedder.encode(processed_text, show_progress_bar=False)
    # Normalizziamo (per calcolo coseno semplificato)
    doc_emb_norm = doc_embedding / norm(doc_embedding)

    # 3) Embedding di tutti i candidati
    candidate_embeddings = embedder.encode(candidates, show_progress_bar=False)
    # Normalizzo ogni embedding di candidato (asse 1)
    cand_emb_norm = candidate_embeddings / norm(candidate_embeddings, axis=1, keepdims=True)

    # 4) Calcolo la similarità coseno tra doc e ciascun candidato
    #    cos_sim = dot(u,v) / (||u|| * ||v||)
    #    ma avendo già normalizzato doc_emb_norm e cand_emb_norm,
    #    cos_sim = dot(doc_emb_norm, cand_emb_norm[i])

    cos_sims = np.sum(doc_emb_norm * cand_emb_norm, axis=1)  # shape = (num_candidates,)

    # 5) Etichette 1/0 in base alla corrispondenza letterale
    #    (Se c e' ESATTAMENTE in doc['keyphrases'], label=1, altrimenti 0)
    #    Attenzione: potresti dover normalizzare i Keyphrase reali per un match robusto.
    labels = np.array([1 if c in keyphrases else 0 for c in candidates])

    # 6) Creiamo le feature unendo doc_emb + candidate_emb (concatenazione)
    combined_embeddings = np.hstack([
        np.tile(doc_embedding, (len(candidates), 1)),
        candidate_embeddings
    ])

    # 7) Negative sampling:
    #    a) Prendiamo tutti i positivi
    pos_indices = np.where(labels == 1)[0]
    neg_indices = np.where(labels == 0)[0]

    # Se il documento non ha nessun positivo, decidi come gestirlo:
    # puoi saltarlo, o prendere qualche negativo a caso...
    if len(pos_indices) == 0:
        # Per esempio, skippo il documento
        # continue
        # Oppure prendo i top 10 negativi:
        # neg_sorted = np.argsort(-cos_sims[neg_indices])
        # top_neg = neg_sorted[:10]
        # chosen_neg_indices = neg_indices[top_neg]
        # final_feats = combined_embeddings[chosen_neg_indices]
        # final_labels = labels[chosen_neg_indices]
        # all_features.extend(final_feats)
        # all_labels.extend(final_labels)
        # continue  # e saltiamo i successivi passaggi
        continue  # Saltiamo per semplificare

    # b) Teniamo TUTTI i positivi
    pos_feats = combined_embeddings[pos_indices]
    pos_labels = labels[pos_indices]

    # c) Ordiniamo i negativi per coseno discendente (i piu' simili in alto)
    neg_sims = cos_sims[neg_indices]  # sim di tutti i negativi
    neg_sorted_by_sim = np.argsort(-neg_sims)  # sort discendente

    # d) Decidiamo quanti negativi tenere.
    #    Esempio: 2 * (numero di positivi), i "piu' simili" (hard negatives)
    n_pos = len(pos_indices)
    keep_neg = n_pos * 4

    chosen_neg = neg_sorted_by_sim[:keep_neg]  # prime 'keep_neg' posizioni
    chosen_neg_indices = neg_indices[chosen_neg]

    # e) Recuperiamo le feature e le label corrispondenti
    neg_feats = combined_embeddings[chosen_neg_indices]
    neg_labels = labels[chosen_neg_indices]

    # 8) Combiniamo i positivi e i negativi selezionati
    final_feats = np.concatenate([pos_feats, neg_feats], axis=0)
    final_labels = np.concatenate([pos_labels, neg_labels], axis=0)

    # 9) Aggiungiamo al dataset globale
    all_features.extend(final_feats)
    all_labels.extend(final_labels)

# Ora all_features e all_labels contengono (doc + cand) + label,
# con un numero ridotto di negativi per ridurre lo sbilanciamento.
all_features = np.array(all_features)
all_labels = np.array(all_labels)

print("Dimensione finale delle feature:", all_features.shape)
print("Distribuzione delle etichette (0/1):", np.bincount(all_labels))


In [None]:
# Indici positivi (label = 1)
pos_indices = np.where(labels == 1)[0]

# Se ci sono keyphrase positive...
if len(pos_indices) > 0:
    # Ordinamento discendente della similarità coseno (per i soli positivi)
    # np.argsort(-array) li ordina in ordine decrescente
    sorted_pos_indices = pos_indices[np.argsort(-cos_sims[pos_indices])]

    # Scegliamo quanti ne vogliamo stampare (esempio: i top 5)
    top_k = 5
    best_pos_indices = sorted_pos_indices[:top_k]

    print("Top candidati positivi (label=1) per similarità coseno:")
    for i in best_pos_indices:
        print(f"  - '{candidates[i]}' | coseno={cos_sims[i]:.4f}")
else:
    print("Nessuna keyphrase positiva in questo documento.")


In [None]:
# Creazione di una classe Dataset per gestire features e labels
class KeyphraseDataset(Dataset):
    def __init__(self, features, labels):
        self.features = features  # numpy array con forma (n_samples, 2 * base_emb_dim)
        self.labels = labels      # numpy array con forma (n_samples,)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        feature = torch.tensor(self.features[idx], dtype=torch.float32)
        label = torch.tensor(self.labels[idx], dtype=torch.float32)
        return feature, label

# Supponiamo di avere all_features e all_labels ottenuti dal preprocessing
# all_features = np.array(all_features)   # forma (n_samples, 2 * base_emb_dim)
# all_labels = np.array(all_labels)         # forma (n_samples,)
# Per esempio:
# all_features = np.random.rand(1000, 2 * base_emb_dim)
# all_labels = np.random.randint(0, 2, size=(1000,))

dataset = KeyphraseDataset(all_features, all_labels)
batch_size = 512
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [None]:
class KeyphraseClassifierLSTMComplex(nn.Module):

    def __init__(self, base_emb_dim, hidden_dim=512, num_layers=4, bidirectional=True, dropout=0.5):
        super(KeyphraseClassifierLSTMComplex, self).__init__()
        self.bidirectional = bidirectional

        # LSTM bidirezionale a più layer con dropout tra i layer
        self.lstm = nn.LSTM(
            input_size=base_emb_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=bidirectional,
            dropout=dropout if num_layers > 1 else 0
        )
        # Calcoliamo la dimensione dell'output dell'LSTM
        lstm_output_dim = hidden_dim * (2 if bidirectional else 1)

        # Meccanismo di attenzione: calcola pesi per ciascun output dell'LSTM
        self.attention = nn.Linear(lstm_output_dim, 1)

        # Layer fully connected con dropout per la classificazione finale
        self.fc1 = nn.Linear(lstm_output_dim, 128)
        self.dropout = nn.Dropout(dropout)
        self.fc2 = nn.Linear(128, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # x ha forma (batch_size, 2 * base_emb_dim)
        # Lo rimodelliamo in una sequenza di lunghezza 2: (batch_size, 2, base_emb_dim)
        batch_size = x.size(0)
        x_seq = x.view(batch_size, 2, -1)

        # Elaborazione con LSTM: lstm_out ha forma (batch_size, seq_len, lstm_output_dim)
        lstm_out, _ = self.lstm(x_seq)

        # Calcolo dei pesi di attenzione per ogni step della sequenza
        attn_scores = self.attention(lstm_out)  # (batch_size, seq_len, 1)
        attn_weights = torch.softmax(attn_scores, dim=1)  # normalizziamo su seq_len

        # Calcoliamo la rappresentazione pesata della sequenza
        attn_output = torch.sum(attn_weights * lstm_out, dim=1)  # (batch_size, lstm_output_dim)

        # Passaggio attraverso i layer fully connected con dropout
        x_fc = self.fc1(attn_output)
        x_fc = torch.relu(x_fc)
        x_fc = self.dropout(x_fc)
        x_fc = self.fc2(x_fc)
        #out = self.sigmoid(x_fc)

        return x_fc

In [None]:
# Conversione in tensori e trasferimento sul dispositivo
X = torch.tensor(np.array(all_features), dtype=torch.float32).to(device)
y = torch.tensor(np.array(all_labels), dtype=torch.float32).unsqueeze(1).to(device)

# Inizializzazione rete
model = KeyphraseClassifierLSTMComplex(embedder.get_sentence_embedding_dimension()).to(device)
pos_weight = torch.tensor([2.0]).to(device)
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
optimizer = optim.Adam(model.parameters(), lr=0.0001)

epochs = 60
for epoch in range(epochs):
    model.train()
    epoch_losses = []
    for batch_features, batch_labels in dataloader:
        batch_features = batch_features.to(device)
        batch_labels = batch_labels.to(device)
        optimizer.zero_grad()
        outputs = model(batch_features)  # outputs sono logit non sigmoidi
        loss = criterion(outputs, batch_labels.unsqueeze(1))
        loss.backward()
        optimizer.step()
        epoch_losses.append(loss.item())

    avg_loss = np.mean(epoch_losses)

    if (epoch + 1) % 10 == 0:
        model.eval()
        with torch.no_grad():
            all_preds = []
            all_true = []
            for batch_features, batch_labels in dataloader:
                batch_features = batch_features.to(device)
                batch_labels = batch_labels.to(device)
                outputs = model(batch_features)
                # Applico la sigmoid per ottenere probabilità
                preds = (torch.sigmoid(outputs) >= 0.5).float()
                all_preds.append(preds.cpu().numpy())
                all_true.append(batch_labels.cpu().numpy())
            all_preds = np.concatenate(all_preds).flatten()
            all_true = np.concatenate(all_true).flatten()

            from sklearn.metrics import precision_score, recall_score
            precision = precision_score(all_true, all_preds, zero_division=0)
            recall = recall_score(all_true, all_preds, zero_division=0)

            print(f"Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}")

In [None]:
# Caricamento dati JSON Lines
data = load_dataset("taln-ls2n/inspec")["test"].to_list()

import torch
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer

# Funzione per calcolare la similarità di Jaccard
def jaccard_similarity(list1, list2):
    set1 = set(list1)
    set2 = set(list2)
    intersection = len(set1 & set2)
    union = len(set1 | set2)
    return intersection / union if union != 0 else 0.0

def predict_keyphrases(model, data, embedder, threshold):
    model.eval()  # pass in eval mode

    # Se il modello è su GPU, assicuriamoci di spostare i dati su GPU
    device = next(model.parameters()).device
    jaccard_scores = []
    for doc in data:
        doc_id = doc.get('id', 'N/A')
        title = doc.get('title', '')
        abstract = doc.get('abstract', '')

        # Preprocess
        preprocessed_tokens = preprocess(title + ". " + abstract)
        processed_text = ' '.join(preprocessed_tokens)

        # 1) Generiamo i candidati n-gram (1,2,3) dal testo preprocessato
        vectorizer = CountVectorizer(ngram_range=(2,3)).fit([processed_text])
        candidates = vectorizer.get_feature_names_out()

        # 2) Embedding del documento
        doc_embedding = embedder.encode(processed_text, show_progress_bar=False)

        # 3) Embedding di tutti i candidati
        cand_embeddings = embedder.encode(candidates, show_progress_bar=False)

        # 4) Creiamo la concatenazione (doc + cand)
        combined_features = []
        for cand_emb in cand_embeddings:
            feat = np.hstack([doc_embedding, cand_emb])
            combined_features.append(feat)
        combined_features = np.array(combined_features, dtype=np.float32)

        # Converto in tensori PyTorch
        X_test = torch.tensor(combined_features).to(device)

        # 5) Ottengo le probabilità dal modello
        with torch.no_grad():
            outputs = model(X_test)   # (num_candidates, 1)
            probs = torch.sigmoid(outputs).float()

        # 6) Stampo i candidati con prob >= threshold
        pred = []
        #predicted_keyphrases = []
        for cand, prob in zip(candidates, probs):
            if prob >= threshold:
                pred.append(cand)
                #predicted_keyphrases.append((cand, prob))

        # Se vuoi stampare i risultati
        #print(f"\nDocumento ID: {doc_id}")
        #print(f"Title: {title}")

        # Indici posiivi (label = 1)
        keyphrases = doc['keyphrases']  # lista di stringhe
        prmu = doc["prmu"]
        keyphrases = [kp for kp, p in zip(keyphrases, prmu) if p == "P"]
        keyphrases = preprocess_keyphrase(keyphrases)

        # Filtraggio: per ogni keyphrase, se la parola iniziale è già presente, mantieni quella con prob maggiore
        """filtered_keyphrases = {}
        for cand, prob in predicted_keyphrases:
            first_word = cand.split()[0]  # prendi la prima parola della keyphrase
            # Se la parola non è ancora presente oppure la nuova ha probabilità maggiore, aggiorna il dizionario
            if (first_word not in filtered_keyphrases) or (prob.item() > filtered_keyphrases[first_word][1].item()):
                filtered_keyphrases[first_word] = (cand, prob)"""

        # Converti il dizionario in una lista di tuple per la stampa
        #filtered_list = list(filtered_keyphrases.values())

        jaccard_score_value = jaccard_similarity(pred, keyphrases)
        jaccard_scores.append(jaccard_score_value)

        #print(f"Jaccard score: {jaccard_score_value}")

        #print("Keyphrase predette filtrate:")
        #for cand, prob in filtered_list:
            #print(f" - '{cand}' (prob={prob.item():.2f})")

        #print(f"Keyphrase reali")
        #for k in keyphrases:
            #print(f"- {k}")
    average_jaccard_score = np.mean(jaccard_scores)
    print("Average Jaccard Similarity:", average_jaccard_score)

# Supponiamo di aver definito 'test_data' come lista di doc con id, title, abstract, ...
predict_keyphrases(model, data, embedder, threshold=0.5)
