## SetFit Few-Shot Classification

This notebook walks through the code and the steps used to train the classification model and to run inference with it.

The goal is to classify the closing notes that technicians write after an intervention according to the machine's possible fault areas: CASSETTE, CT, NE, NF, NV and SHUTTER.
The system must categorize these texts with reasonable reliability using AI, and it must be able to assign more than one area to the same note.

The task therefore amounts to multi-label text classification.

The approach used is Few-Shot Learning, where an embedding model is fine-tuned on N real examples for each label. It takes more time and hardware resources, but it gives better results when little training data is available and, above all, when the categories are complex, as in our case.

We use several HuggingFace libraries: Sentence-Transformers and SetFit to define and train the model, and Datasets to prepare the data for the training, validation and test steps.



In the first cell below we set the environment variable that selects the GPU used during training.


In [142]:
import os

from sklearn.metrics import accuracy_score
os.environ["CUDA_VISIBLE_DEVICES"] = "GPU-7f4915f4-0bb6-f23e-021c-200070c6cf70"

As a first step we load the three datasets needed for training.
The training dataset contains a variety of labelled notes on which the model is trained. The validation dataset contains further labelled, known examples used to check the model's accuracy in controlled cases.
Finally, the test dataset is a large collection of closing notes whose labels have not been reviewed, where the model is compared against the technicians' own selections.

The datasets are read from Excel files into pandas DataFrames and then converted to Dataset objects, keeping only the columns relevant for training.

The labels are taken from the training dataset columns named in the format "{label} ground-truth".

In [143]:
import pandas as pd
from datasets import Dataset, Sequence

df_training = pd.read_excel("docs/new_tests/AllCombinedNotes-20250702-training.xlsx")
df_validation = pd.read_excel("docs/training_docs/ClosingNotes-sample-20250625-validation.xlsx")
df_test = pd.read_excel("docs/new_tests/ClosingNotes-sample-20250702-test.xlsx")

#df_test = df_test.replace(to_replace='None', value="'Something'")

dataset = Dataset.from_pandas(df_training.iloc[:, 9:16])
validation_dataset = Dataset.from_pandas(df_validation.iloc[:, 9:16])
#test_dataset = Dataset.from_pandas(df_test.iloc[:, 9:16])
test_dataset = Dataset.from_pandas(df_test.iloc[:, [0,1,2,3,4,5, 32, 49]])

features = dataset.column_names
features.remove("Closing Note")
features

['CASSETTE ground-truth',
 'CT ground-truth',
 'NE ground-truth',
 'NF ground-truth',
 'NV ground-truth',
 'SHUTTER ground-truth']
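
The column names above carry a " ground-truth" suffix, and the same strings are reused further down as label names, for example in the prompt template and in the printed predictions. As a minimal sketch, assuming bare area names were wanted for display, the suffix could be stripped as shown here. `SUFFIX` and `display_names` are illustrative names that do not appear in the notebook.

```python
# Column names exactly as printed by the cell above.
features = [
    "CASSETTE ground-truth",
    "CT ground-truth",
    "NE ground-truth",
    "NF ground-truth",
    "NV ground-truth",
    "SHUTTER ground-truth",
]

SUFFIX = " ground-truth"  # hypothetical constant, introduced for this sketch

# str.removesuffix requires Python 3.9 or newer.
display_names = [name.removesuffix(SUFFIX) for name in features]
print(display_names)  # ['CASSETTE', 'CT', 'NE', 'NF', 'NV', 'SHUTTER']
```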

Next we reshape the datasets: the individual label columns are merged into a single "labels" column holding a binary vector with one entry per label.

After that, rows with missing values are dropped and closing notes that contain only whitespace are replaced with the placeholder "Empty".

In [144]:
from datasets import Dataset

dataset = dataset.map(lambda entry: {"labels": [entry[label] for label in features]})
validation_dataset = validation_dataset.map(lambda entry: {"labels": [entry[label] for label in features]})
validation_dataset = validation_dataset.map(lambda entry: {"text": entry["Closing Note"]})
test_dataset = test_dataset.map(lambda entry: {"labels": [entry[label] for label in features]})
test_dataset = test_dataset.map(lambda entry: {"text": entry["Closing Note"]})

Map: 100%|██████████| 153/153 [00:00<00:00, 6173.26 examples/s]
Map: 100%|██████████| 50/50 [00:00<00:00, 4066.14 examples/s]
Map: 100%|██████████| 50/50 [00:00<00:00, 6482.70 examples/s]
Map: 100%|██████████| 4453/4453 [00:00<00:00, 9375.20 examples/s]
Map: 100%|██████████| 4453/4453 [00:00<00:00, 10794.65 examples/s]
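
To make the remapping concrete, the sketch below applies the same list comprehension to a single hand-written row. The row values are invented for illustration. Only the column names come from the notebook.

```python
features = [
    "CASSETTE ground-truth",
    "CT ground-truth",
    "NE ground-truth",
    "NF ground-truth",
    "NV ground-truth",
    "SHUTTER ground-truth",
]

# Hypothetical row: a note tagged with both NE and NF.
entry = {
    "Closing Note": "example note",
    "CASSETTE ground-truth": 0,
    "CT ground-truth": 0,
    "NE ground-truth": 1,
    "NF ground-truth": 1,
    "NV ground-truth": 0,
    "SHUTTER ground-truth": 0,
}

# Same expression as in the map() calls: one binary entry per label, in column order.
labels = [entry[label] for label in features]
print(labels)  # [0, 0, 1, 1, 0, 0]
```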


In [145]:
dictionary = {"text": dataset["Closing Note"], "labels": dataset['labels']}
train_dataset = Dataset.from_dict(dictionary)

train_dataset = Dataset.from_pandas(train_dataset.to_pandas().dropna())
train_dataset = Dataset.from_pandas(train_dataset.to_pandas().replace(r'^\s*$', "Empty", regex=True))

dictionary = {"text": validation_dataset["Closing Note"], "labels": validation_dataset['labels']}
validation_dataset = Dataset.from_dict(dictionary)

validation_dataset = Dataset.from_pandas(validation_dataset.to_pandas().dropna())
validation_dataset = Dataset.from_pandas(validation_dataset.to_pandas().replace(r'^\s*$', "Empty", regex=True))

#dictionary = {"text": test_dataset["Closing Note"], "labels": test_dataset['labels']}
#test_dataset = Dataset.from_dict(dictionary)

test_dataset = Dataset.from_pandas(test_dataset.to_pandas().dropna())
test_dataset = Dataset.from_pandas(test_dataset.to_pandas().replace(r'^\s*$', "Empty", regex=True))
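
The cleaning step can be illustrated without pandas. The sketch below mirrors the two operations on a small, made-up list of notes: missing values are dropped, and notes matching the pattern `^\s*$` become "Empty". The helper name `clean_notes` is hypothetical.

```python
import re

# Same pattern as in the replace() calls: a string made only of whitespace, or empty.
BLANK = re.compile(r"^\s*$")


def clean_notes(notes):
    """Drop missing notes and replace blank ones with the 'Empty' placeholder."""
    cleaned = []
    for note in notes:
        if note is None:  # stands in for the NaN rows removed by dropna()
            continue
        cleaned.append("Empty" if BLANK.match(note) else note)
    return cleaned


sample = ["Sostituito cassetto", None, "   ", ""]
print(clean_notes(sample))  # ['Sostituito cassetto', 'Empty', 'Empty']
```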

In [146]:
from setfit import get_templated_dataset, sample_dataset

train_dataset = get_templated_dataset(train_dataset, candidate_labels=features, sample_size=5, label_column="labels", multi_label=True, template="Il problema è del {}")
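
As a rough picture of what the templated examples look like, the sketch below builds them by hand: for every candidate label it emits `sample_size` copies of the filled-in template together with a one-hot label vector. This illustrates the idea and is not SetFit's implementation. `make_templated_examples` is a hypothetical helper. Because the candidate labels are the raw column names, the generated sentences include the " ground-truth" suffix. With 6 labels and `sample_size=5` this yields 30 rows, which is consistent with the training set growing from 153 to 183 examples in the progress bars.

```python
def make_templated_examples(candidate_labels, template, sample_size):
    """Build sample_size synthetic sentences per label, each with a one-hot label vector."""
    texts, labels = [], []
    for index, name in enumerate(candidate_labels):
        one_hot = [int(i == index) for i in range(len(candidate_labels))]
        for _ in range(sample_size):
            texts.append(template.format(name))
            labels.append(one_hot)
    return texts, labels


features = [
    "CASSETTE ground-truth",
    "CT ground-truth",
    "NE ground-truth",
    "NF ground-truth",
    "NV ground-truth",
    "SHUTTER ground-truth",
]

texts, labels = make_templated_examples(features, "Il problema è del {}", sample_size=5)
print(len(texts))  # 30
print(texts[0])    # Il problema è del CASSETTE ground-truth
print(labels[0])   # [1, 0, 0, 0, 0, 0]
```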

In [147]:
state = ["Guasto", "Funzionante"]

# This method adds synthetic examples on which to train the model. They might help
#train_state_dataset = sample_dataset(train_state_dataset)
#train_state_dataset = get_templated_dataset(train_state_dataset, candidate_labels=state, sample_size=10, template="L'ATM è {}")

#print(train_state_dataset['text'])

Below we define the function that fetches and initialises the base SetFit model used for training.
It also sets the parameters of the model itself, such as the use of the GPU, the multi-label selection strategy, the temperature and the number of labels.

In [148]:
import torch
from setfit import SetFitModel
import os
os.environ["WANDB_DISABLED"] = "true"

# model instantiation for each trial run of the hyperparameter search
def model_init(params):
    #-- 39% accuracy
    #params = {"device": torch.device("cuda"), 'out_features': 6, 'temperature': 0.4}
    #return SetFitModel.from_pretrained("BAAI/bge-small-en-v1.5",
    #                                   multi_target_strategy="multi-output", params=params, labels=features)
    #params = {"device": torch.device("cuda"), 'out_features': 6, 'temperature': 0.2}
    #return SetFitModel.from_pretrained("SMARTICT/bge-small-en-v1.5-tr-rag-v1",
    #                                   multi_target_strategy="one-vs-rest", params=params, labels=features)
    #-- 40.48%
    params = {"device": torch.device("cuda"), 'out_features': 6, 'temperature': 0}
    return SetFitModel.from_pretrained("BAAI/bge-small-en-v1.5",
                                       multi_target_strategy="multi-output", params=params, labels=features)
    #params = {"device": torch.device("cuda"), 'out_features': 6, 'temperature': 0.2}
    #return SetFitModel.from_pretrained("sentence-transformers/all-MiniLM-L12-v2",
    #                                   multi_target_strategy="multi-output", params=params, labels=features, num_samples=len(features))

params = {"device": torch.device("cuda"), 'out_features': 2, 'temperature': 0.2}
model = SetFitModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2", params=params)

params = {"device": torch.device("cuda"), 'out_features': 6, 'temperature': 0}
#model2 = SetFitModel.from_pretrained("hello/model_no_5_new_data", params=params, labels=features)
#model.model_head
#SetFitModel.from_pretrained("SMARTICT/bge-small-en-v1.5-tr-rag-v1" -- 68%, sentence-transformers/all-MiniLM-L12-v2 -- 68%
    #"BAAI/bge-small-en-v1.5", sentence-transformers/all-roberta-large-v1
                                    #multi_target_strategy="one-vs-rest",
                                    #)
                                    #multi_target_strategy="classifier-chain",
                                    #num_samples=len(features),
                                    #use_differentiable_head=True,
                                    #head_params={"out_features": len(features)},
                                    #)

model_head.pkl not found on HuggingFace Hub, initialising classification head with random weights. You should TRAIN this model on a downstream task to use it for predictions and inference.


In [149]:
from setfit import TrainingArguments

args2 = TrainingArguments(
    # Optional training parameters:
    body_learning_rate=1.9859376752033417e-05,
    num_epochs=2,
    batch_size=6,
    warmup_proportion=0.2,
    sampling_strategy="unique",
    # Optional tracking/debugging parameters:
    logging_strategy="steps",
    logging_steps=1000,
    eval_strategy="steps",
    logging_first_step=True,
    eval_steps=1000,
    save_strategy="steps",
    save_steps=1000,
    run_name="finetune-setfit",
    load_best_model_at_end=True
)


args = TrainingArguments(
    batch_size=6,
    num_epochs=1,
    eval_strategy="epoch",
)

Using the `WANDB_DISABLED` environment variable is deprecated and will be removed in v5. Use the --report_to flag to control the integrations used for logging result (for instance --report_to none).


In [150]:
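def compute_metrics(preds, labels):
    # Convert tensors or arrays to nested lists so that scikit-learn receives
    # one binary vector per note.
    if hasattr(preds, "tolist"):
        preds = preds.tolist()
    if hasattr(labels, "tolist"):
        labels = labels.tolist()

    # With one label vector per note, accuracy_score returns the subset accuracy:
    # a note counts as correct only if all of its labels match.
    return {"accuracy": accuracy_score(labels, preds)}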
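
For multi-label predictions there are at least two natural notions of accuracy, and they can differ considerably. The sketch below, on invented data, contrasts subset accuracy (a note is correct only if every label matches) with per-label accuracy (each label of each note is scored separately). Both helper functions are hypothetical and use only the standard library.

```python
def subset_accuracy(truth, preds):
    """Share of notes whose whole label vector is predicted exactly."""
    matches = [row_t == row_p for row_t, row_p in zip(truth, preds)]
    return sum(matches) / len(matches)


def per_label_accuracy(truth, preds):
    """Share of individual (note, label) decisions that are correct."""
    pairs = [
        (t, p)
        for row_t, row_p in zip(truth, preds)
        for t, p in zip(row_t, row_p)
    ]
    return sum(t == p for t, p in pairs) / len(pairs)


# Two invented notes: the second has one spurious extra label.
truth = [[1, 0, 0, 0, 0, 0], [0, 0, 1, 0, 0, 0]]
preds = [[1, 0, 0, 0, 0, 0], [0, 0, 1, 1, 0, 0]]

print(subset_accuracy(truth, preds))     # 0.5
print(per_label_accuracy(truth, preds))  # 11 of 12 decisions are correct
```

A single wrong label halves the subset accuracy here while barely moving the per-label figure, so it helps to know which of the two a reported number refers to.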

In [151]:
from setfit import Trainer

trainer = Trainer(
    #model=model2,
    model_init=model_init,
    args=args2,
    train_dataset=train_dataset,
    eval_dataset=validation_dataset,
    column_mapping={"text": "text", "labels": "label"},
    #metric=compute_metrics
)

#best_run = trainer.hyperparameter_search(direction="maximize", hp_space=hp_space, n_trials=3)


Applying column mapping to the training dataset
Applying column mapping to the evaluation dataset
model_head.pkl not found on HuggingFace Hub, initialising classification head with random weights. You should TRAIN this model on a downstream task to use it for predictions and inference.
Using the `WANDB_DISABLED` environment variable is deprecated and will be removed in v5. Use the --report_to flag to control the integrations used for logging result (for instance --report_to none).
Map: 100%|██████████| 183/183 [00:00<00:00, 11587.70 examples/s]


In [152]:
#trainer2.train()

In [153]:
#trainer.apply_hyperparameters(best_run.hyperparameters, final_model=True)
trainer.train()

***** Running training *****
  Num unique pairs = 16836
  Batch size = 6
  Num epochs = 2


| Step | Training Loss | Validation Loss |
|------|---------------|-----------------|
| 1000 | 0.1372 | 0.058745 |
| 2000 | 0.0454 | 0.049934 |
| 3000 | 0.0286 | 0.042624 |
| 4000 | 0.0254 | 0.042483 |
| 5000 | 0.0228 | 0.037453 |
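
The numbers in the training log can be cross-checked with a little arithmetic. The figure of 16836 unique pairs equals n(n+1)/2 for n = 183, the size of the training set reported by the last `Map` progress bar. That the trainer counts pairs this way is an inference from the numbers, not something the log states. Under the same assumption, the sketch below estimates the number of optimisation steps.

```python
import math

n_examples = 183  # training rows reported by the last Map progress bar
batch_size = 6    # from the training log
num_epochs = 2    # from the training log

# Unordered pairs of examples, counting each pair once and allowing self-pairs.
unique_pairs = n_examples * (n_examples + 1) // 2
print(unique_pairs)  # 16836

# Assumption: every unique pair is visited once per epoch.
steps_per_epoch = math.ceil(unique_pairs / batch_size)
total_steps = steps_per_epoch * num_epochs
print(steps_per_epoch, total_steps)  # 2806 5612
```

An estimate of 5612 steps is consistent with the last logged row being step 5000 when logging happens every 1000 steps.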




In [154]:
def print_and_w(f, text):
    print(text)
    f.write(str(text))

In [155]:
metrics = trainer.evaluate()
with open("textual_output/metrics.txt", "a") as f:
    print_and_w(f, metrics)

***** Running evaluation *****


{'accuracy': 0.86}


In [156]:
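def export_as_excel(filename, preds, use_test_set):
    # Convert each prediction tensor to a plain list of ints (lazily, via map).
    preds = map(lambda i: i.numpy().astype(numpy.int64).tolist(), preds)
    # Start from the test set when use_test_set is True, otherwise from the validation set.
    if use_test_set:
        df_out = Dataset.to_pandas(test_dataset)
    else:
        df_out = Dataset.to_pandas(validation_dataset)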

    print(preds)
    print(k.numpy().astype(numpy.int64).tolist() for k in preds)

    cassette, ct, ne, nf, nv, shutter = [], [], [], [], [], []


    for k in preds:
        cassette.append(int(k[0]))
        ct.append(int(k[1]))
        ne.append(int(k[2]))
        nf.append(int(k[3]))
        nv.append(int(k[4]))
        shutter.append(int(k[5]))

    #df_out.dropna(subset = ['Closing Note'], axis=0, inplace=True)
    df_out = pd.concat([df_out, pd.DataFrame({
                "CASSETTE Model": cassette,
                "CT Model": ct,
                "NE Model": ne,
                "NF Model": nf,
                "NV Model": nv,
                "SHUTTER Model": shutter
    })], axis=1)

    df_out = pd.concat([df_out, pd.DataFrame({
        "model_body": model2.model_body
    })], axis=0)

    df_out.to_excel(filename, index=False)


In [157]:
TP = 0  # True Positives
TN = 0  # True Negatives
FP = 0  # False Positives
FN = 0  # False Negatives
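
These four counters are filled further down by comparing truth and prediction label by label. As a small illustration of that bookkeeping on invented vectors, the hypothetical helper below returns the counts in the order TN, FP, FN, TP, which is the order used when the per-label confusion matrices are printed.

```python
def count_outcomes(truth, preds):
    """Count outcomes over every (note, label) pair; returns (TN, FP, FN, TP)."""
    tn = fp = fn = tp = 0
    for row_t, row_p in zip(truth, preds):
        for t, p in zip(row_t, row_p):
            if t == 1 and p == 1:
                tp += 1
            elif t == 0 and p == 0:
                tn += 1
            elif t == 0 and p == 1:
                fp += 1
            else:
                fn += 1
    return tn, fp, fn, tp


# Two invented notes: the second has one spurious extra label.
truth = [[1, 0, 0, 0, 0, 0], [0, 0, 1, 0, 0, 0]]
preds = [[1, 0, 0, 0, 0, 0], [0, 0, 1, 1, 0, 0]]

print(count_outcomes(truth, preds))  # (9, 1, 0, 2)
```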

In [160]:
import numpy
from sklearn.metrics import classification_report
from sklearn.metrics import balanced_accuracy_score

"""Riscontrato apparato con fs 9301. Aperto dispensatore e rinvenuto biglietto di carta, scritto a penna, incastrato all'interno. Rimosso corpo estrano e resettato dspositivo. Carta stampante mancante, verrà la vigilanza in un secondo momento a sostituirla."""
"""Rimosse due banconote accartocciate. Controllo percorso banconote gruppo ricircolo con esito positivo."""

def elaborate_pred(i):
    result = []
    count = 0
    for k in i:
        if k == 1:
            result.append(str(features[count]))
        count += 1
    return result

def confusion_matrix(w_dataset):
    from sklearn.metrics import multilabel_confusion_matrix
    res = multilabel_confusion_matrix(w_dataset['labels'], preds).ravel().tolist()
    n = 4
    res = [res[i:i + n] for i in range(0, len(res), n)]
    res = {"CASSETTE": res[0], "CT": res[1], "NE": res[2], "NF": res[3], "NV": res[4], "SHUTTER": res[5]}
    return res

def percentage(array):
    if not len(array) == 0:
        return round((sum(array) / len(array)) * 100, 2)
    else:
        return 0

def repeating_w(f, w_dataset, TP, TN, FP, FN):
    CASSETTE, CT, NF, NE, NV, SHUTTER, CRM = [], [], [], [], [], [], []
    ac_cassette, ac_ct, ac_nf, ac_ne, ac_nv, ac_shutter = [], [], [], [], [], []
    ac_cassette_pred, ac_ct_pred, ac_nf_pred, ac_ne_pred, ac_nv_pred, ac_shutter_pred = [], [], [], [], [], []

    array = []
    count = 0
    for i in preds:
        i = i.numpy().astype(numpy.int64).tolist()
        matching = w_dataset['labels'][count]
        print_and_w(f, str(w_dataset['text'][count]) + '\n' + str(elaborate_pred(matching)) + '\n' + str(elaborate_pred(i)) + '\n' + str(pred_proba[count]) + '\n')
        array.append(w_dataset['labels'][count] == i)

        ac_cassette.append(matching[0])
        ac_cassette_pred.append(i[0])
        ac_ct.append(matching[1])
        ac_ct_pred.append(i[1])
        ac_ne.append(matching[2])
        ac_ne_pred.append(i[2])
        ac_nf.append(matching[3])
        ac_nf_pred.append(i[3])
        ac_nv.append(matching[4])
        ac_nv_pred.append(i[4])
        ac_shutter.append(matching[5])
        ac_shutter_pred.append(i[5])

        count2 = 0
        for k in matching:
            truth_selection = k == 1
            pred_selection = i[count2] == 1
            if truth_selection or pred_selection:
                match count2:
                    case 0:
                        CASSETTE.append(k == i[count2])
                    case 1:
                        CT.append(k == i[count2])
                    case 2:
                        NE.append(k == i[count2])
                    case 3:
                        NF.append(k == i[count2])
                    case 4:
                        NV.append(k == i[count2])
                    case 5:
                        SHUTTER.append(k == i[count2])
                    case 6:
                        CRM.append(k == i[count2])

            if truth_selection and pred_selection:
                TP += 1
            elif not truth_selection and not pred_selection:
                TN += 1
            elif not truth_selection and pred_selection:
                FP += 1
            elif truth_selection and not pred_selection:
                FN += 1

            count2 += 1
        count += 1
    # Summary: subset accuracy over all notes, then for each label its balanced
    # accuracy and the number of notes where the label was expected or predicted.
    per_label = [
        ("CASSETTE", ac_cassette, ac_cassette_pred, CASSETTE),
        ("CT", ac_ct, ac_ct_pred, CT),
        ("NE", ac_ne, ac_ne_pred, NE),
        ("NF", ac_nf, ac_nf_pred, NF),
        ("NV", ac_nv, ac_nv_pred, NV),
        ("SHUTTER", ac_shutter, ac_shutter_pred, SHUTTER),
    ]
    result = "Total: " + str(round(accuracy_score(w_dataset["labels"], preds), 2)) + " - " + str(len(array))
    for name, truth, predicted, selected in per_label:
        result += "\n" + name + ": " + str(round(balanced_accuracy_score(truth, predicted), 2)) + " - " + str(len(selected))
    result += "\n\n"
    print_and_w(f, str(result))
    print_and_w(f, "TN, FP, FN, TP \n")
    print_and_w(f, str(confusion_matrix(w_dataset)) + '\n')

    return TP, TN, FP, FN

model2 = trainer.model
model2.save_pretrained("hello/model_delta_0610_17_23_validations")
#model2 = SetFitModel.from_pretrained("hello/MINImymodelUNIQUE") # 58.4
#model2 = SetFitModel.from_pretrained("hello/mymodel") # 58.75
#model2 = SetFitModel.from_pretrained("hello/model_no_1") #58.99
#model2 = SetFitModel.from_pretrained("hello/model_no_2") #58.13
#model2 = SetFitModel.from_pretrained("hello/model_gamma_0610_17_23")
#model2 = SetFitModel.from_pretrained("hello/model_no_6_24_06")
#model2 = SetFitModel.from_pretrained("hello/model_no_5_new_data")
#model2 = SetFitModel.from_pretrained("hello/model_no_6_24_06_epoch_2")

preds = model2.predict(validation_dataset['text'])
pred_proba = model2.predict_proba(validation_dataset['text'])

export_as_excel('closing_notes_output/ClosingNotesValidationModelDelta.xlsx', preds, False)
with open("textual_output/model_delta_validation.txt", 'w') as f:
    TP, TN, FP, FN = repeating_w(f, validation_dataset, TP, TN, FP, FN)
    print_and_w(f, "body_learning_rate: " + str(args2.body_learning_rate))
    print_and_w(f, "num_epochs: " + str(args2.num_epochs))
    print_and_w(f, "batch_size: " + str(args2.batch_size))
    print_and_w(f, "warmup_proportion: " + str(args2.warmup_proportion))
    print_and_w(f, "sampling_strategy: " + str(args2.sampling_strategy))
    print_and_w(f, classification_report(validation_dataset['labels'], preds, target_names=features, zero_division=0))

<map object at 0x000001C150BA0940>
<generator object export_as_excel.<locals>.<genexpr> at 0x000001C1255EE670>
Sostituzione cassetto rc 50 e regolazioni varie. 
['CASSETTE ground-truth']
['CASSETTE ground-truth']
tensor([[0.1190, 0.8810],
        [0.9335, 0.0665],
        [0.9668, 0.0332],
        [0.9598, 0.0402],
        [0.9647, 0.0353],
        [0.9757, 0.0243]], dtype=torch.float64)

Guasto riscontrato: banconote incastrate su cassetto rc2
Sostituito cassetto rc da 50€ e cassetto unfit
Controllato periferiche 
['CASSETTE ground-truth']
['CASSETTE ground-truth']
tensor([[0.1186, 0.8814],
        [0.9332, 0.0668],
        [0.9632, 0.0368],
        [0.9622, 0.0378],
        [0.9655, 0.0345],
        [0.9762, 0.0238]], dtype=torch.float64)

Eseguito sostituzione cassetto per molteplici errori. 
Riallineamento della parte lower,prove di movimentazione banconote con esito positivo 
['CASSETTE ground-truth']
['CASSETTE ground-truth']
tensor([[0.1183, 0.8817],
        [0.9283, 0.0717],
  

In [159]:
preds = model2.predict(test_dataset['text'])
pred_proba = model2.predict_proba(test_dataset['text'])

TP, TN, FP, FN = 0,0,0,0

export_as_excel('cosing_notes_output/ClosingNotesOutputModelDeltaTest.xlsx', preds, True)
with open("textual_output/model_delta_new_test_0207.txt", 'w') as f:
    TP, TN, FP, FN = repeating_w(f, test_dataset, TP, TN, FP, FN)
    print_and_w(f, "body_learning_rate: " + str(args2.body_learning_rate))
    print_and_w(f, "num_epochs: " + str(args2.num_epochs))
    print_and_w(f, "batch_size: " + str(args2.batch_size))
    print_and_w(f, "warmup_proportion: " + str(args2.warmup_proportion))
    print_and_w(f, "sampling_strategy: " + str(args2.sampling_strategy))
    print_and_w(f, classification_report(test_dataset['labels'], preds, target_names=features, zero_division=0))

<map object at 0x000001C136895C60>
<generator object export_as_excel.<locals>.<genexpr> at 0x000001C198D91630>
Guasto 
Piatto del ct AC-B con dentino rotto causa malfunzionamenti.
Soluzione 
Sostituito parte danneggiata.
Test di dispensazione e deposito con esito positivo.
Test di prelievo e versamento con esito positivo. 
['CT ground-truth']
['CT ground-truth']
tensor([[0.9120, 0.0880],
        [0.0809, 0.9191],
        [0.8679, 0.1321],
        [0.8932, 0.1068],
        [0.9686, 0.0314],
        [0.9735, 0.0265]], dtype=torch.float64)

Rimosso banconote incastrate in area escrow.
Eseguito test di prelievo e deposito con esito positivo 
Atm rimesso in servizio e funzionante 
['NE ground-truth']
['NE ground-truth', 'NF ground-truth']
tensor([[0.9654, 0.0346],
        [0.8841, 0.1159],
        [0.2551, 0.7449],
        [0.2861, 0.7139],
        [0.9578, 0.0422],
        [0.9704, 0.0296]], dtype=torch.float64)
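
Each printed tensor has one row per label, in the order of `features`, and each row appears to hold the probabilities of the label being absent and present. Under that reading, thresholding the second column at 0.5 reproduces the predicted labels. The sketch below checks this on the tensor printed just above. The 0.5 threshold, the bare label names and the helper name `decode` are assumptions made for this illustration.

```python
# Bare label names, in the same order as the notebook's features list.
names = ["CASSETTE", "CT", "NE", "NF", "NV", "SHUTTER"]

# Probabilities copied from the tensor above: [P(absent), P(present)] per label.
proba = [
    [0.9654, 0.0346],
    [0.8841, 0.1159],
    [0.2551, 0.7449],
    [0.2861, 0.7139],
    [0.9578, 0.0422],
    [0.9704, 0.0296],
]


def decode(proba, names, threshold=0.5):
    """Return the labels whose 'present' probability reaches the threshold."""
    return [name for name, (_, p_present) in zip(names, proba) if p_present >= threshold]


print(decode(proba, names))  # ['NE', 'NF']
```

The result matches the second printed list for this note (NE and NF), while the first list contains only NE, so in this example NF is the extra label.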

ATM in servizio regolare. Attesa disponibilità apparato ed ispezionata parte 