In [None]:
import pandas as pd
import torch
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from transformers import DistilBertTokenizerFast, DistilBertForSequenceClassification, AdamW, AutoModelForSequenceClassification
from datasets import load_dataset
import random
import numpy as np

In [None]:
# Define the training function
def train(model, train_loader, optimizer, num_epochs=1, device=None, save_path="model.pth"):
    """Fine-tune ``model`` on ``train_loader`` and checkpoint its weights.

    Args:
        model: Module whose forward accepts ``input_ids``, ``attention_mask``
            and ``labels`` keyword arguments and returns an object exposing a
            ``loss`` attribute.
        train_loader: Iterable of batches indexed as
            ``(input_ids, attention_mask, labels)``.
        optimizer: Optimizer already bound to ``model``'s parameters.
        num_epochs: Number of passes over ``train_loader`` (defaults to 1,
            the previously hard-coded value).
        device: Device every batch tensor is moved to. When ``None`` it is
            taken from the model's first parameter (CPU if the model has no
            parameters), so the function does not depend on a global.
        save_path: File the final ``state_dict`` is written to. Pass ``None``
            to skip writing a checkpoint.

    Returns:
        None. The model is updated in place.
    """
    if device is None:
        # Follow the model instead of relying on a module-level `device`.
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.train()
    for epoch in tqdm(range(num_epochs)):
        for batch in tqdm(train_loader):
            inputs = {"input_ids": batch[0].to(device),
                      "attention_mask": batch[1].to(device),
                      "labels": batch[2].to(device)}
            optimizer.zero_grad()
            outputs = model(**inputs)  # forward pass; loss is computed from `labels`
            loss = outputs.loss
            loss.backward()
            optimizer.step()  # update weights

    if save_path is not None:
        torch.save(model.state_dict(), save_path)

# Define the testing function
def test(model, test_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for batch in test_loader:
            inputs = {"input_ids": batch[0].to(device),
                      "attention_mask": batch[1].to(device),
                      "labels": batch[2].to(device)}
            outputs = model(**inputs)
            _, predicted = torch.max(outputs.logits, dim=1)
            total += inputs["labels"].size(0)
            correct += (predicted == inputs["labels"]).sum().item()
    print("Accuracy: {:.3f}".format(correct / total))

# Debugging aid (disabled): CUDA_LAUNCH_BLOCKING=1
# Seed every random number generator in use so runs are repeatable
seed_value = 42
for seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    seed_fn(seed_value)

# Base directory of the project data (placeholder value)
path = 'your_path'

# Read columns 1 and 2 of the CSV as 'sentence' and 'label', skipping the
# first row and keeping at most 771 rows
df = pd.read_csv(
    f"{path}/OUTPUT/fiqa_enriched_allagree.csv",
    header=None,
    skiprows=1,
    usecols=[1, 2],
    names=['sentence', 'label'],
    nrows=771,
)

# Collapse label 2 into label 1
df['label'] = df['label'].mask(df['label'] == 2, 1)

#dataset_origin = load_dataset("ChanceFocus/fiqa-sentiment-classification")
#dataset_sentences = []
#dataset_labels = []
#for i in dataset_origin['train']:
#    if i['score']>0.3:
#        dataset_sentences.append(i['sentence'])
#        dataset_labels.append(1)
#    elif i['score']<-0.3:
#        dataset_sentences.append(i['sentence'])
#        dataset_labels.append(0)
#        
#for i in dataset_origin['valid']:
#    if i['score']>0.3:
#        dataset_sentences.append(i['sentence'])
#        dataset_labels.append(1)
#    elif i['score']<-0.3:
#        dataset_sentences.append(i['sentence'])
#        dataset_labels.append(0)
#        
#for i in dataset_origin['test']:
#    if i['score']>0.3:
#        dataset_sentences.append(i['sentence'])
#        dataset_labels.append(1)
#    elif i['score']<-0.3:
#        dataset_sentences.append(i['sentence'])
#        dataset_labels.append(0)
#
# The column must be named 'sentence' (not 'text'): the split below reads df['sentence'].
#df = pd.DataFrame({'sentence': dataset_sentences, 'label': dataset_labels})

# From here on the pipeline is the same for every dataset variant

# Shuffle the rows. The explicit random_state makes the order depend only on
# seed_value rather than on whatever state NumPy's global RNG is in.
df = df.sample(frac=1, random_state=seed_value).reset_index(drop=True)

# Split into train and test sets (80% / 20%)
train_text, test_text, train_labels, test_labels = train_test_split(
    df['sentence'].to_numpy(),
    df['label'].to_numpy(),
    test_size=0.2,
    random_state=seed_value,
)

# Load the tokenizer and encode the data.
# The keyword is `unk_token`; `unknown_token` is not a tokenizer parameter.
tokenizer = DistilBertTokenizerFast.from_pretrained('distilbert-base-uncased', unk_token="[UNK]")
train_encodings = tokenizer(train_text.tolist(), truncation=True, padding=True)
test_encodings = tokenizer(test_text.tolist(), truncation=True, padding=True)


def _to_tensor_dataset(encodings, labels):
    """Pack input ids, attention mask and labels (in that order) into a TensorDataset."""
    return torch.utils.data.TensorDataset(
        torch.tensor(encodings['input_ids']),
        torch.tensor(encodings['attention_mask']),
        torch.tensor(labels),
    )


# Convert the data into PyTorch tensors
train_dataset = _to_tensor_dataset(train_encodings, train_labels)
test_dataset = _to_tensor_dataset(test_encodings, test_labels)

# Define the device
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Define the model.
# Assigning a `seed` attribute to the model does not seed anything, so it is
# not done here; reproducibility relies on the torch.manual_seed(seed_value)
# call made earlier in this cell.
model = DistilBertForSequenceClassification.from_pretrained('distilbert-base-uncased', num_labels=2)

# Move the model to its device before the optimizer is built from its parameters
model.to(device)

# Define the optimizer
# NOTE(review): transformers' AdamW is deprecated in favour of torch.optim.AdamW;
# the two have different defaults, so migrate deliberately -- TODO confirm.
optimizer = AdamW(model.parameters(), lr=5e-5)

# Train the model
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=16, shuffle=True)
train(model, train_loader, optimizer)

# Test the model
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=16, shuffle=False)
test(model, test_loader)

# Save fine-tuned model to file
model.save_pretrained('fine_tuned_distilbert')

In [1]:
# loaded_model = DistilBertForSequenceClassification.from_pretrained('fine_tuned_distilbert')
loaded_model = AutoModelForSequenceClassification.from_pretrained('fine_tuned_distilbert')
loaded_model.eval()  # make sure dropout is disabled for inference

# Evaluate fine-tuned model on evaluation data.
# This cell reuses `tokenizer`, `test_text` and `test_labels` from the training cell.
predicted = []
with torch.no_grad():  # inference only: do not build autograd graphs
    for text in tqdm(test_text):
        encoded_inputs = tokenizer(text, return_tensors='pt', truncation=True, padding=True)
        output = loaded_model(**encoded_inputs)
        predicted_label = torch.argmax(output.logits, dim=1).item()
        predicted.append(predicted_label)

# classification_report takes (y_true, y_pred); passing them the other way
# round swaps precision and recall in the report.
print(classification_report(test_labels, predicted))