In [None]:
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.optim.lr_scheduler import ReduceLROnPlateau, CosineAnnealingLR
from torch.optim import AdamW
from datasets import load_dataset
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    get_linear_schedule_with_warmup,
    set_seed
)
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
import argparse
from tqdm import tqdm
import random


# --- Run configuration ------------------------------------------------------
# Hugging Face checkpoint that gets fine-tuned (alternative kept for reference).
MODEL_NAME = "lordtt13/emo-mobilebert"
#MODEL_NAME = "JuliusAlphonso/distilbert-plutchik"

# Settings of the local-CSV data source (currently disabled, see load_data).
#DATASET_PATH = "/kaggle/input/dataset-5/dataset.csv"
#TEXT_COLUMN = "TESTO"
#LABEL_COLUMN = "EMOZIONI"

# Directory created by main() and used for the exported artifacts.
OUTPUT_DIR = "/kaggle/working/best_model"

# Training hyper-parameters.
BATCH_SIZE = 32
EPOCHS = 50
LEARNING_RATE = 2e-5
MAX_LENGTH = 128  # token length every sample is padded / truncated to
SEED = 12

# Fix the random seed before anything stochastic happens.
set_seed(SEED)

# Prefer the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Dispositivo in uso: {device}")

# Custom Dataset
class SentimentDataset(Dataset):
    """Map-style dataset that tokenizes a single text on every access.

    Args:
        texts: Indexable, sized collection of raw texts; each item is passed
            through ``str`` before tokenization.
        labels: Indexable collection of class ids aligned with ``texts``.
        tokenizer: Callable invoked as ``tokenizer(text, max_length=...,
            padding="max_length", truncation=True, return_tensors="pt")``.
            Its result must expose ``'input_ids'`` and ``'attention_mask'``
            tensors with a leading batch axis of size 1.
        max_length: Value forwarded to the tokenizer as ``max_length``.
    """

    def __init__(self, texts, labels, tokenizer, max_length=128):
        self.texts, self.labels = texts, labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Number of samples, i.e. the number of texts."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the tokenized sample at ``idx``.

        Returns:
            dict with ``'input_ids'`` and ``'attention_mask'`` (batch axis
            squeezed away) and ``'label'`` as a ``torch.long`` scalar tensor.
        """
        raw_text = str(self.texts[idx])
        target = self.labels[idx]

        tokenized = self.tokenizer(
            raw_text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )

        # The tokenizer returns (1, seq_len) tensors; drop the batch axis so the
        # DataLoader can stack samples itself.
        sample = {
            field: tokenized[field].squeeze(0)
            for field in ('input_ids', 'attention_mask')
        }
        sample['label'] = torch.tensor(target, dtype=torch.long)
        return sample

def load_data(file_path, text_col, label_col):
    """Load texts and labels from a CSV file.

    Args:
        file_path: Path (or anything ``pd.read_csv`` accepts) of the CSV file.
        text_col: Name of the column holding the texts.
        label_col: Name of the column holding the labels.

    Returns:
        A ``(texts, labels, label_map)`` tuple of two numpy arrays and a dict.
        When the label column is numeric it is returned unchanged and
        ``label_map`` is ``None``; otherwise every distinct label is numbered
        in order of first appearance and ``label_map`` maps label -> id.

    Raises:
        ValueError: If either requested column is missing from the file.
    """
    frame = pd.read_csv(file_path)

    # Both requested columns must be present.
    has_both = text_col in frame.columns and label_col in frame.columns
    if not has_both:
        available_cols = ", ".join(frame.columns)
        raise ValueError(f"Colonne richieste non trovate. Colonne disponibili: {available_cols}")

    # Numeric labels need no conversion.
    if pd.api.types.is_numeric_dtype(frame[label_col]):
        return frame[text_col].values, frame[label_col].values, None

    # Textual labels: assign consecutive ids in order of first appearance.
    label_map = {}
    for position, name in enumerate(frame[label_col].unique()):
        label_map[name] = position
    frame['label_id'] = frame[label_col].map(label_map)
    print(f"Mappatura etichette: {label_map}")
    return frame[text_col].values, frame['label_id'].values, label_map

def train_epoch(model, data_loader, optimizer, scheduler, device):
    """Run one training pass over ``data_loader``.

    For every batch the model is called with ``input_ids``, ``attention_mask``
    and ``labels``; the loss it returns is back-propagated, the gradient norm
    is clipped to 1.0, and both the optimizer and the scheduler are stepped
    (so the scheduler advances once per batch, not once per epoch).

    Args:
        model: Model whose output exposes ``.loss`` and ``.logits``.
        data_loader: Iterable of dict batches with the keys ``'input_ids'``,
            ``'attention_mask'`` and ``'label'``.
        optimizer: Optimizer updating the model parameters.
        scheduler: Learning-rate scheduler stepped after every optimizer step.
        device: Device the batch tensors are moved to.

    Returns:
        ``(avg_loss, accuracy, f1)``: mean batch loss, accuracy and weighted
        F1 over every sample seen during the epoch.
    """
    model.train()
    running_loss = 0
    predicted, expected = [], []

    batches = tqdm(data_loader, desc="Training")
    for batch in batches:
        optimizer.zero_grad()

        # Move the three tensors of the batch to the target device.
        input_ids, attention_mask, labels = (
            batch[field].to(device)
            for field in ('input_ids', 'attention_mask', 'label')
        )

        outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)

        loss = outputs.loss
        batch_loss = loss.item()
        running_loss += batch_loss

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()

        # Collect predictions and targets for the epoch-level metrics.
        predicted.extend(torch.argmax(outputs.logits, dim=1).cpu().numpy())
        expected.extend(labels.cpu().numpy())

        batches.set_postfix({"loss": batch_loss})

    return (
        running_loss / len(data_loader),
        accuracy_score(expected, predicted),
        f1_score(expected, predicted, average='weighted'),
    )

def evaluate(model, data_loader, device):
    """Score ``model`` on ``data_loader`` without updating any weights.

    The model is put in eval mode and every batch is run under
    ``torch.no_grad()``.

    Args:
        model: Model whose output exposes ``.loss`` and ``.logits``.
        data_loader: Iterable of dict batches with the keys ``'input_ids'``,
            ``'attention_mask'`` and ``'label'``.
        device: Device the batch tensors are moved to.

    Returns:
        ``(avg_loss, accuracy, f1)``: mean batch loss, accuracy and weighted
        F1 over every sample of the loader.
    """
    model.eval()
    loss_sum = 0
    predicted, expected = [], []

    with torch.no_grad():
        for batch in tqdm(data_loader, desc="Validation"):
            # Move the three tensors of the batch to the target device.
            input_ids, attention_mask, labels = (
                batch[field].to(device)
                for field in ('input_ids', 'attention_mask', 'label')
            )

            outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            loss_sum += outputs.loss.item()

            predicted.extend(torch.argmax(outputs.logits, dim=1).cpu().numpy())
            expected.extend(labels.cpu().numpy())

    metrics = (
        loss_sum / len(data_loader),
        accuracy_score(expected, predicted),
        f1_score(expected, predicted, average='weighted'),
    )
    return metrics

def optimize_for_raspberry_pi(model, tokenizer, output_dir, max_length=128):
    """Write the deployment artifacts for ``model`` into ``output_dir``.

    Three things are produced:

    * ``mobilebert_sentiment_quantized.pt`` - ``state_dict`` of an int8
      dynamically-quantized copy of the model (``torch.nn.Linear`` layers only);
    * the tokenizer files (``tokenizer.save_pretrained``);
    * ``mobilebert_sentiment.onnx`` - ONNX export of the *non-quantized* model
      with a single ``input`` tensor of token ids and a dynamic batch axis.

    Args:
        model: Trained model; it may live on CPU or GPU and is left untouched.
        tokenizer: Tokenizer exposing ``save_pretrained``.
        output_dir: Destination directory; created if it does not exist.
        max_length: Sequence length of the dummy input used to trace the ONNX
            graph. Defaults to 128, the value that used to be hard-coded.
    """
    import copy  # local import: only this function needs it

    os.makedirs(output_dir, exist_ok=True)

    # Dynamic quantization runs on CPU backends, so quantize a CPU copy instead
    # of the (possibly CUDA-resident) training model. The copy is quantized in
    # place to avoid a second deep copy; the caller's model is not modified.
    cpu_copy = copy.deepcopy(model).to("cpu")
    quantized_model = torch.quantization.quantize_dynamic(
        cpu_copy, {torch.nn.Linear}, dtype=torch.qint8, inplace=True
    )

    # Save the quantized weights
    torch.save(
        quantized_model.state_dict(),
        os.path.join(output_dir, "mobilebert_sentiment_quantized.pt"),
    )

    # Save the tokenizer
    tokenizer.save_pretrained(output_dir)

    # Build the dummy input on whatever device the model actually lives on
    # (previously hard-coded to 'cuda', which failed on CPU-only machines).
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    # Keep the random token ids inside the vocabulary when its size is known.
    vocab_size = getattr(getattr(model, "config", None), "vocab_size", None)
    if isinstance(vocab_size, int) and vocab_size > 1:
        id_upper_bound = min(10000, vocab_size)
    else:
        id_upper_bound = 10000
    dummy_input = torch.randint(1, id_upper_bound, (1, max_length), device=model_device)

    # Export to ONNX for better performance (optional)
    torch.onnx.export(
        model,
        dummy_input,
        os.path.join(output_dir, "mobilebert_sentiment.onnx"),
        export_params=True,
        opset_version=11,
        input_names=['input'],
        output_names=['output'],
        dynamic_axes={'input': {0: 'batch_size'}, 'output': {0: 'batch_size'}}
    )

def main():
    """Fine-tune ``MODEL_NAME`` on the Italian sentiment dataset and export it.

    Steps:
        1. Download the ``MelmaGrigia/italian-text-sentiment-analysis`` dataset
           and use its ``train`` / ``test`` splits for training / validation.
        2. Fine-tune every parameter of the model for ``EPOCHS`` epochs with
           AdamW and a per-batch cosine-annealed learning rate.
        3. After each epoch, save model and tokenizer to ``OUTPUT_DIR`` whenever
           the validation F1 improves.
        4. Reload that best checkpoint and hand it to
           ``optimize_for_raspberry_pi`` so the exported artifacts match the
           reported best score.
    """
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # Load the dataset
    print("Caricamento del dataset...")
    dataset = load_dataset("MelmaGrigia/italian-text-sentiment-analysis")

    print(f"Struttura del dataset: {dataset}")
    print(f"Colonne: {dataset['train'].column_names}")

    # Extract texts and labels as plain Python lists so that len(), set(),
    # np.bincount() and integer indexing behave the same whatever container
    # type the installed `datasets` version returns for a column.
    train_texts = list(dataset['train']['text'])
    train_labels = list(dataset['train']['label'])

    # NOTE(review): the 'test' split doubles as the validation set used for
    # checkpoint selection, so the reported F1 is not an unbiased test score.
    val_texts = list(dataset['test']['text'])
    val_labels = list(dataset['test']['label'])

    num_labels = len(set(train_labels))
    print(f"Numero di etichette: {num_labels}")

    # Class distribution of the training split. Class weights are not applied:
    # the loss is the one returned by the model itself (see train_epoch).
    class_counts = np.bincount(train_labels)
    print(f"Distribuzione delle classi: {class_counts}")

    print(f"Testi di training: {len(train_texts)}")
    print(f"Testi di validazione: {len(val_texts)}")

    # Load tokenizer and model; the classification head is re-sized to
    # num_labels (ignore_mismatched_sizes allows a differently sized head).
    print(f"Caricamento del modello {MODEL_NAME}...")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    model = AutoModelForSequenceClassification.from_pretrained(
        MODEL_NAME,
        num_labels=num_labels,
        ignore_mismatched_sizes=True
    )

    # Move the model to its device BEFORE building the optimizer, as the
    # torch.optim documentation recommends.
    model.to(device)

    # Build datasets and data loaders
    train_dataset = SentimentDataset(train_texts, train_labels, tokenizer, MAX_LENGTH)
    val_dataset = SentimentDataset(val_texts, val_labels, tokenizer, MAX_LENGTH)

    train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

    # Optimizer over all parameters (no layer is frozen) and a cosine schedule
    # spanning every optimizer step, since train_epoch steps it once per batch.
    optimizer = AdamW(model.parameters(), lr=LEARNING_RATE)
    total_steps = len(train_dataloader) * EPOCHS
    scheduler = CosineAnnealingLR(optimizer, T_max=total_steps)

    # Training. Early stopping is disabled: every epoch runs and only the
    # checkpoint with the best validation F1 is kept on disk.
    print("Inizio dell'addestramento...")
    best_val_f1 = 0.0
    checkpoint_saved = False

    for epoch in range(EPOCHS):
        print(f"\nEpoca {epoch+1}/{EPOCHS}")

        train_loss, train_acc, train_f1 = train_epoch(
            model, train_dataloader, optimizer, scheduler, device
        )
        print(f"Train Loss: {train_loss:.4f}, Accuracy: {train_acc:.4f}, F1: {train_f1:.4f}")

        val_loss, val_acc, val_f1 = evaluate(model, val_dataloader, device)
        print(f"Val Loss: {val_loss:.4f}, Accuracy: {val_acc:.4f}, F1: {val_f1:.4f}")

        # Save on improvement. The first epoch is always saved so a checkpoint
        # exists even if the validation F1 never rises above 0.
        if not checkpoint_saved or val_f1 > best_val_f1:
            best_val_f1 = val_f1
            checkpoint_saved = True

            # Save into OUTPUT_DIR (not a path relative to the working
            # directory) so later steps find the checkpoint.
            model.save_pretrained(OUTPUT_DIR)
            tokenizer.save_pretrained(OUTPUT_DIR)
            print(f"Modello salvato in {OUTPUT_DIR}")

    print("\nAddestramento completato!")
    print(f"Miglior F1 score di validazione: {best_val_f1:.4f}")

    # Export the best checkpoint rather than the last-epoch weights, which may
    # be worse than the checkpoint whose score was just reported.
    if checkpoint_saved:
        model = AutoModelForSequenceClassification.from_pretrained(OUTPUT_DIR)
        model.to(device)
        model.eval()
    optimize_for_raspberry_pi(model, tokenizer, OUTPUT_DIR)

    print("\nOptimization completed!")


# Run the whole pipeline when this cell / script is executed directly.
if __name__ == "__main__":
    main()

In [16]:
import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer

# Standalone ONNX export cell: reloads the checkpoint stored on disk and
# exports it, independently of any state left by the training cell.

# Model and tokenizer are read from the same directory.
model_path = "/kaggle/working/best_model"
tokenizer_path = "/kaggle/working/best_model"

# Load the fine-tuned model and its tokenizer.
# NOTE(review): from_pretrained reads the checkpoint written by
# save_pretrained; this is presumably NOT the dynamically-quantized
# state_dict (mobilebert_sentiment_quantized.pt) - confirm that exporting the
# full-precision model is intended.
model = AutoModelForSequenceClassification.from_pretrained(model_path)
tokenizer = AutoTokenizer.from_pretrained(tokenizer_path)

# Set the model to evaluation mode
model.eval()

# Use the tokenizer to create a valid dummy input, padded to 128 tokens
sample_text = "This is a sample input for the model."
inputs = tokenizer(sample_text, return_tensors="pt", padding="max_length", truncation=True, max_length=128)

# Only the input_ids tensor is given to the exporter.
# NOTE(review): the attention mask is not passed, so the exported graph has no
# mask input although train_epoch/evaluate always supply one - verify that
# padded inputs give the expected predictions at inference time.
dummy_input = inputs["input_ids"]

# Export the model to ONNX format
output_onnx_path = "/kaggle/working/best_model/mobilebert_sentiment.onnx"
torch.onnx.export(
    model,
    dummy_input,
    output_onnx_path,
    export_params=True,  # Store the trained parameter weights inside the model file
    opset_version=11,    # ONNX operator-set (opset) version targeted by the export
    input_names=["input_ids"],  # Name given to the graph's single input
    output_names=["output"],    # Name given to the graph's output
    dynamic_axes={
        "input_ids": {0: "batch_size"},  # Only axis 0 (batch) is dynamic; the sequence axis keeps the dummy input's length
        "output": {0: "batch_size"},    # Dynamic axes for output (batch size)
    },
    do_constant_folding=True,  # Optimize the model by folding constants
)

print(f"Model has been exported to {output_onnx_path}")

Model has been exported to /kaggle/working/best_model/mobilebert_sentiment.onnx
