In [1]:
import tokenizers
import transformers
from transformers import BertTokenizer, BertForMaskedLM
import sklearn
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import MinMaxScaler
from transformers.pipelines import pipeline
from sklearn.linear_model import Ridge
import numpy as np
import pandas as pd
import torch
from tqdm import tqdm


### Dataset

Si caricano il training set e il validation set usati per il probing. Ogni riga dei dataset contiene una frase (colonna `sent`) insieme ai valori delle rispettive feature linguistiche.

In [2]:
# Load the probing datasets: the training split and the held-out split
# (the latter is used as the validation set throughout this notebook).
train_df = pd.read_csv(
    "C:/Users/bergo/OneDrive - University of Pisa/Tesi Magistrale/probing_data/probing_train.csv"
)
val_df = pd.read_csv(
    "C:/Users/bergo/OneDrive - University of Pisa/Tesi Magistrale/probing_data/probing_test.csv"
)

In [None]:
# Print a concise summary of the validation DataFrame.
val_df.info()

In [3]:
# Turn each DataFrame into a list of per-row dicts (one record per CSV row),
# which is the format the probing functions below iterate over.
train_dict, val_dict = (
    frame.to_dict(orient='records') for frame in (train_df, val_df)
)

In [None]:
# Display the first training record to inspect the available keys.
train_dict[0]

### PROBING

Si definiscono le funzioni che estraggono gli hidden state dei modelli (l'embedding del token [CLS] per ciascun layer) e li passano come feature a un modello di Ridge Regression, addestrato a predire il valore delle caratteristiche linguistiche delle frasi.

In [5]:
# Extract the sentence embeddings ([CLS] vector of each probed layer).
def feature_extraction(samples, model_name, first_layer=1, last_layer=8,
                       tokenizer_name="dbmdz/bert-base-italian-cased"):
    """Attach the per-layer [CLS] hidden state of every sentence to its sample.

    Args:
        samples: iterable of dicts, each holding the sentence text under the
            ``"sent"`` key. The dicts are updated in place.
        model_name: name or path passed to ``BertForMaskedLM.from_pretrained``.
        first_layer: index of the first hidden state to store (inclusive).
        last_layer: index of the last hidden state to store (inclusive).
        tokenizer_name: name or path passed to ``BertTokenizer.from_pretrained``.

    Returns:
        The same ``samples`` object. Every sample gains one ``layer_<i>`` key
        per probed layer, mapping to ``{"cls_embedding": <1-D numpy array>}``.
    """
    tokenizer = BertTokenizer.from_pretrained(tokenizer_name)
    model = BertForMaskedLM.from_pretrained(model_name)
    model.eval()  # inference only: make sure dropout is disabled
    with torch.no_grad():
        for sample in tqdm(samples, desc="Estrazione features", unit="sample"):
            encoded_sen = tokenizer(sample["sent"], padding=True, truncation=True, max_length=128, return_tensors='pt')
            hidden_states = model(**encoded_sen, output_hidden_states=True).hidden_states
            for layer in range(first_layer, last_layer + 1):
                # One sentence is encoded at a time, so batch index 0 is that
                # sentence and token index 0 is its first ([CLS]) position.
                # Explicit indexing avoids relying on squeeze(), which would
                # also drop any other size-1 dimension.
                cls_vector = hidden_states[layer][0, 0, :]
                # .copy() detaches the array from the full layer tensor, so
                # only the single vector is kept alive for each sample.
                cls_embedding = cls_vector.cpu().detach().numpy().copy()
                sample[f'layer_{layer}'] = {'cls_embedding': cls_embedding}
    return samples

# Collect the probe inputs (embeddings) and targets (feature values).
def get_features_lables(samples, feature, layer):
    """Split samples into parallel lists of embeddings and labels.

    Args:
        samples: iterable of sample dicts.
        feature: key of the linguistic feature used as the label.
        layer: key of the layer entry (e.g. ``"layer_3"``) whose
            ``"cls_embedding"`` is used as the input vector.

    Returns:
        A tuple ``(X, y)`` of two lists, in the same order as ``samples``.
    """
    # Single pass over `samples`, so one-shot iterables are handled too.
    pairs = [(item[layer]["cls_embedding"], item[feature]) for item in samples]
    X = [embedding for embedding, _ in pairs]
    y = [target for _, target in pairs]
    return X, y


# Train the probe on one layer and predict the validation set.
def train_eval(train_set, val_set, feature, layer):
    """Fit a Ridge regressor on one layer's embeddings and predict on validation.

    Args:
        train_set: samples used to fit the scaler and the regressor.
        val_set: samples to predict.
        feature: key of the linguistic feature used as regression target.
        layer: key of the layer entry (e.g. ``"layer_3"``) to read embeddings from.

    Returns:
        The predictions for ``val_set`` (raw predictions, not metrics).
    """
    train_inputs, train_targets = get_features_lables(train_set, feature, layer)
    val_inputs, _ = get_features_lables(val_set, feature, layer)

    # The scaler is fitted on the training embeddings only and then reused
    # unchanged on the validation embeddings.
    scaler = MinMaxScaler()
    train_matrix = scaler.fit_transform(np.array(train_inputs))
    val_matrix = scaler.transform(np.array(val_inputs))

    probe = Ridge(alpha=1.0)
    probe.fit(train_matrix, train_targets)
    return probe.predict(val_matrix)

In [6]:

# Training steps to probe; 0 stands for the final pretrained model.
checkpoints = [2, 32, 512, 8192, 0]

# Linguistic-profiling features selected as probing targets.
ling_features = [
    "n_tokens",
    "char_per_tok",
    "upos_dist_DET",
    "upos_dist_ADV",
    "upos_dist_PUNCT",
    "upos_dist_NUM",
    "upos_dist_PRON",
    "upos_dist_ADP",
    "upos_dist_PROPN",
    "upos_dist_ADJ",
    "upos_dist_VERB",
    "upos_dist_NOUN",
    "upos_dist_CCONJ",
    "upos_dist_AUX",
    "avg_links_len",
    "max_links_len",
    "avg_max_depth",
    "dep_dist_obj",
    "dep_dist_nsubj",
    "subj_pre",
    "subj_post",
    "n_prepositional_chains",
    "avg_prepositional_chain_len",
    "avg_subordinate_chain_len",
    "subordinate_proposition_dist",
    "avg_verb_edges",
]

# Identifier of the training run whose models are probed.
training_id = "CURRICULUM"

In [7]:
def probing_checkpoints(checkpoints, training_id, train_dict, val_dict, ling_features,
                        final_step=15449,
                        models_root="C:/Users/bergo/OneDrive - University of Pisa/Tesi Magistrale/models"):
    """Run the probing task for every checkpoint, linguistic feature and layer.

    Args:
        checkpoints: training steps to probe; ``0`` selects the final
            pretrained model instead of an intermediate checkpoint.
        training_id: identifier of the training run (sub-folder of ``models_root``).
        train_dict: training samples (list of dicts), passed to ``feature_extraction``.
        val_dict: validation samples (list of dicts), passed to ``feature_extraction``.
        ling_features: keys of the linguistic features to regress on.
        final_step: step number recorded in the results for the final model.
        models_root: directory that contains one sub-folder per training run.

    Returns:
        A DataFrame with one row per (checkpoint, feature, layer) and columns
        ``model``, ``step``, ``ling_feature``, ``layer`` and ``preds``, where
        ``preds`` holds the validation predictions returned by ``train_eval``.
        The columns are present even when no row was produced.
    """
    # Probed layers; these must match the ``layer_<i>`` keys produced by
    # feature_extraction.
    first_layer = 1
    last_layer = 8
    columns = ["model", "step", "ling_feature", "layer", "preds"]
    # Rows are accumulated in a list and the DataFrame is built once at the
    # end: appending to a DataFrame row by row copies it on every iteration.
    rows = []
    for n_step in checkpoints:
        # Pick the model matching this step and training id.
        if n_step == 0:  # 0 identifies the final model
            model_name = f"{models_root}/{training_id}/final_pretrained_model"
            checkpoint = final_step
            print("Inizio probing per il modello finale")
        else:
            checkpoint = n_step
            model_name = f"{models_root}/{training_id}/checkpoints/checkpoint-step{n_step}"
            print(f"Inizio probing per il checkpoint {n_step}")
        print("Estrazione delle feature di training...")
        train_samples = feature_extraction(train_dict, model_name)
        print("Estrazione delle features di validation...")
        val_samples = feature_extraction(val_dict, model_name)
        for ling_feature in ling_features:
            print(f'Addestramento del modello sulla feature linguistica: {ling_feature}')
            for layer in range(first_layer, last_layer + 1):
                # One Ridge probe per (feature, layer); keep its predictions.
                layer_result = train_eval(train_samples, val_samples, ling_feature, f'layer_{layer}')
                rows.append({
                    "model": training_id,
                    "step": checkpoint,
                    "ling_feature": ling_feature,
                    "layer": layer,
                    "preds": layer_result,
                })
    return pd.DataFrame(rows, columns=columns)
        

In [None]:
# Run the probing over all selected checkpoints, features and layers.
final_results = probing_checkpoints(
    checkpoints=checkpoints,
    training_id=training_id,
    train_dict=train_dict,
    val_dict=val_dict,
    ling_features=ling_features,
)

In [None]:
# Preview the first rows of the probing results.
final_results.head()

In [10]:
import json
# Save the probing results to a JSON file inside the training run's folder.
# NOTE(review): per the pandas documentation, to_json returns None when a
# path is given, so `result` is presumably always None here - confirm whether
# the binding is needed. `json` is not used in the visible cells.
result = final_results.to_json(f'C:/Users/bergo/OneDrive - University of Pisa/Tesi Magistrale/models/{training_id}/predictions.json', orient="columns")