In [None]:
import json
import numpy as np
import random
from collections import defaultdict
from sklearn.cluster import KMeans, AgglomerativeClustering
from sklearn.metrics import silhouette_score, accuracy_score
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from umap import UMAP
import spacy
import matplotlib.pyplot as plt

# Caricamento di Spacy per l'estrazione delle embedding e del testo distintivo
nlp = spacy.load("en_core_web_lg")

# Mappatura per Gold Label
label_mapping = {
    "STEREOTYPED": 0,
    "ANTISTEREOTYPICAL": 1,
    "NEUTRAL": 2
}

# Funzione per salvare i risultati dei cluster in formato JSON
def save_clusters_to_json(documents, labels, filename, label_mapping):
    clusters = defaultdict(list)
    for doc, label in zip(documents, labels):
        label_name = list(label_mapping.keys())[list(label_mapping.values()).index(label)]
        clusters[label_name].append(doc)
    
    with open(filename, 'w') as file:
        json.dump(clusters, file, indent=4)
    print(f"Risultati dei cluster salvati in {filename}")

# Funzione per visualizzare il clustering
def plot_clusters(data, labels=None, title="Plot dei Cluster"):
    plt.figure(figsize=(10, 7))
    plt.scatter(data[:, 0], data[:, 1], c=labels, cmap='viridis', s=50, alpha=0.7)
    plt.colorbar(label='Cluster Labels')
    plt.xlabel("Component 1")
    plt.ylabel("Component 2")
    plt.title(title)
    plt.show()

# Funzione per estrarre la porzione distintiva della frase
def extract_distinctive_segment(phrase):
    doc = nlp(phrase)
    segments = [chunk.text for chunk in doc.noun_chunks if chunk.root.dep_ in {'nsubj', 'attr'}]
    distinctive_part = " ".join(segments[1:]) if len(segments) > 1 else phrase
    return distinctive_part

# Funzione per ottenere embedding basato sul testo distintivo
def get_distinctive_embedding(phrase):
    distinctive_part = extract_distinctive_segment(phrase)
    distinctive_vector = nlp(distinctive_part).vector
    return distinctive_vector

# Funzione per aggregare i dati per Contesto
def aggregate_by_context(data):
    context_dict = defaultdict(lambda: {"Bias Type": None, "Frasi": []})
    
    for item in data:
        context = item["Contesto"]
        bias_type = item["Bias Type"]
        
        context_dict[context]["Bias Type"] = bias_type
        context_dict[context]["Frasi"].append({
            "Frase": item["Frase"],
            "Gold Label": item["Gold Label"],
            "Frase completa": item["Frase filtrata"]["Frase completa"]
        })
    
    aggregated_data = []
    for context, values in context_dict.items():
        labels = {frase["Gold Label"] for frase in values["Frasi"]}
        if all(label in labels for label in label_mapping.keys()):
            aggregated_data.append({
                "Contesto": context,
                "Bias Type": values["Bias Type"],
                "Frasi": values["Frasi"]
            })
    
    return aggregated_data

# Funzione per bilanciare il set di allenamento per Bias Type e Contesto
def create_balanced_training_set(aggregated_data, n_per_bias_type):
    bias_type_groups = defaultdict(list)
    for item in aggregated_data:
        bias_type = item["Bias Type"]
        bias_type_groups[bias_type].append(item)
    
    training_set = []
    test_set = []
    for bias_type, contexts in bias_type_groups.items():
        selected_contexts = random.sample(contexts, min(n_per_bias_type, len(contexts)))
        training_set.extend(selected_contexts)
        
        remaining_contexts = [context for context in contexts if context not in selected_contexts]
        test_set.extend(remaining_contexts)
    
    return training_set, test_set

# Funzione per calcolare il numero massimo bilanciato di contesti per ciascun Bias Type
def calculate_max_per_bias_type(aggregated_data):
    bias_type_counts = defaultdict(int)
    for item in aggregated_data:
        bias_type_counts[item["Bias Type"]] += 1
    return min(bias_type_counts.values())

# Caricamento del dataset
with open('stereoset_with_multivectors_for_clustering.json', 'r') as file:
    data = json.load(file)

# Filtra i dati e recupera contesto, frase e multivettore
filtered_data = [
    item for item in data 
    if item["Frase filtrata"]["Frase completa"]["Tipo"] == 2 and "BLANK" not in item["Frase filtrata"]["Frase completa"]["Testo"]
]

# Aggrega i dati per Contesto
aggregated_data = aggregate_by_context(filtered_data)

# Calcola il numero massimo di contesti bilanciati per ciascun Bias Type
n_per_bias_type = calculate_max_per_bias_type(aggregated_data)

# Crea il dataset bilanciato per il training e il set di test
training_data, test_data = create_balanced_training_set(aggregated_data, n_per_bias_type)

# Estrai embedding distintive e label dal set di allenamento
train_embeddings = np.array([
    get_distinctive_embedding(frase["Frase completa"]["Testo"])
    for item in training_data for frase in item["Frasi"]
])
train_labels = np.array([
    label_mapping[frase["Gold Label"]] for item in training_data for frase in item["Frasi"]
])

# Estrai embedding distintive dal set di test
test_documents = [frase["Frase completa"]["Testo"] for item in test_data for frase in item["Frasi"]]
test_embeddings = np.array([
    get_distinctive_embedding(frase["Frase completa"]["Testo"])
    for item in test_data for frase in item["Frasi"]
])

# Esegui una classificazione supervisionata sui dati di training
print("Eseguo classificazione supervisionata...")
classifier = SVC(kernel="linear", probability=True, random_state=42)
classifier.fit(train_embeddings, train_labels)

# Classifica i dati di test
pseudo_labels = classifier.predict(test_embeddings)
print("Pseudo-labels generate dal classificatore:", pseudo_labels)

# Riduzione dimensionale per visualizzazione e clustering
umap_reducer = UMAP(n_components=2, random_state=42, metric="cosine")
test_reduced = umap_reducer.fit_transform(test_embeddings)

# Clustering con KMeans
print("Eseguo clustering semi-supervisionato con KMeans...")
kmeans = KMeans(n_clusters=3, random_state=42)
test_cluster_labels_kmeans = kmeans.fit_predict(test_reduced)
print("Etichette dei cluster KMeans:", test_cluster_labels_kmeans)

# Clustering con Agglomerative Clustering
print("Eseguo clustering semi-supervisionato con Agglomerative Clustering...")
agg_clustering = AgglomerativeClustering(n_clusters=3, affinity='euclidean', linkage='ward')
test_cluster_labels_agg = agg_clustering.fit_predict(test_reduced)
print("Etichette dei cluster Agglomerative:", test_cluster_labels_agg)

# Visualizza e salva i risultati KMeans
print("Visualizzo risultati KMeans")
plot_clusters(test_reduced, labels=test_cluster_labels_kmeans, title="Clustering KMeans su dati di test con pseudo-labels")
save_clusters_to_json(test_documents, test_cluster_labels_kmeans, "cluster/kmeans_semi_supervised_test_results.json", label_mapping)

# Visualizza e salva i risultati Agglomerative
print("Visualizzo risultati Agglomerative Clustering")
plot_clusters(test_reduced, labels=test_cluster_labels_agg, title="Clustering Agglomerative su dati di test con pseudo-labels")
save_clusters_to_json(test_documents, test_cluster_labels_agg, "cluster/agglomerative_semi_supervised_test_results.json", label_mapping)

# Calcola silhouette score per valutare la separazione
silhouette_avg_kmeans = silhouette_score(test_reduced, test_cluster_labels_kmeans)
silhouette_avg_agg = silhouette_score(test_reduced, test_cluster_labels_agg)
print(f"Silhouette Score per KMeans: {silhouette_avg_kmeans:.4f}")
print(f"Silhouette Score per Agglomerative Clustering: {silhouette_avg_agg:.4f}")

# Calcola l'accuratezza delle pseudo-etichetta per entrambi i metodi
accuracy_kmeans = accuracy_score(pseudo_labels, test_cluster_labels_kmeans)
accuracy_agg = accuracy_score(pseudo_labels, test_cluster_labels_agg)
print(f"Accuratezza delle pseudo-etichetta per KMeans: {accuracy_kmeans:.4f}")
print(f"Accuratezza delle pseudo-etichetta per Agglomerative Clustering: {accuracy_agg:.4f}")


Eseguo classificazione supervisionata...
Pseudo-labels generate dal classificatore: [1 1 2 ... 0 0 2]


  warn(
