In [None]:
import os
import pandas as pd
import torch
import numpy as np
import random
from sklearn.model_selection import KFold
from transformers import RobertaTokenizer, RobertaForSequenceClassification, Trainer, TrainingArguments, EarlyStoppingCallback
from datasets import Dataset
from datasets.features import Features, Value, Sequence
from sklearn.metrics import f1_score, roc_auc_score, accuracy_score
from typing import List, Dict
import time

# Fix every random number generator used by this script (Python's `random`,
# NumPy and PyTorch) to one shared seed so repeated runs start from the same
# RNG state.
SEED = 1
torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)
# CUDA generators are only seeded when a GPU is actually present.
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

# Unified data loading
def load_data(csv_path: str, n_trailing_non_label: int = 3) -> tuple[List[str], List[List[float]], int, List[str]]:
    """Load texts, multi-label targets and row IDs from a ';'-delimited CSV.

    The file is read with each of utf-8, latin-1 and cp1252 in turn; the first
    encoding that decodes and yields a non-empty table wins.

    Column handling:
      * 'SANTA_ID' (optional) supplies the row IDs; when absent, IDs of the
        form ``ID_<row index>`` are generated.
      * 'Text' supplies the input texts.
      * Of the remaining columns, the last ``n_trailing_non_label`` are
        dropped and the rest are treated as label columns. Label values are
        coerced to numbers; anything non-numeric or missing becomes 0.

    Args:
        csv_path: Path of the CSV file to read.
        n_trailing_non_label: Number of trailing columns that are not labels.
            Defaults to 3, the layout this script was written for.

    Returns:
        A tuple ``(texts, labels, num_labels, ids)`` where ``labels`` holds one
        list of floats per row and ``num_labels`` is the label column count.

    Raises:
        ValueError: If ``n_trailing_non_label`` is negative, or if no tried
            encoding produced a non-empty table.
        KeyError: If the file has no 'Text' column.
    """
    if n_trailing_non_label < 0:
        raise ValueError(f"n_trailing_non_label must be >= 0, got {n_trailing_non_label}")

    encodings = ['utf-8', 'latin-1', 'cp1252']
    for encoding in encodings:
        try:
            df = pd.read_csv(csv_path, encoding=encoding, delimiter=';', quotechar='"', on_bad_lines='warn')
        except UnicodeDecodeError:
            continue
        if not df.empty:
            break
    else:
        # UnicodeDecodeError cannot be built from a single message (its
        # constructor takes five arguments), so raise its base class instead.
        raise ValueError(f"Failed to load any rows from {csv_path} with tried encodings: {encodings}")

    text_column = 'Text'
    all_columns = df.columns.tolist()
    if 'SANTA_ID' in all_columns:
        ids = df['SANTA_ID'].tolist()
        all_columns.remove('SANTA_ID')
    else:
        ids = [f"ID_{i}" for i in range(len(df))]
    if text_column in all_columns:
        all_columns.remove(text_column)

    # Explicit end index: a plain `[:-n]` slice would return nothing for n == 0.
    label_end = max(len(all_columns) - n_trailing_non_label, 0)
    label_columns = all_columns[:label_end]

    # Regularize label columns: non-numeric or missing cells count as 0.
    df[label_columns] = df[label_columns].apply(pd.to_numeric, errors='coerce').fillna(0)

    texts = df[text_column].tolist()
    labels = df[label_columns].values.astype(float).tolist()
    return texts, labels, len(label_columns), ids

def tokenize_function(examples, tokenizer):
    """Tokenize the 'text' entry of a batch with fixed-length settings.

    Args:
        examples: Mapping with a 'text' key holding the inputs to tokenize.
        tokenizer: Callable tokenizer invoked on the texts.

    Returns:
        Whatever ``tokenizer`` returns for the texts, padded to and truncated
        at 128 tokens.
    """
    tokenizer_options = {'padding': 'max_length', 'truncation': True, 'max_length': 128}
    batch_texts = examples['text']
    return tokenizer(batch_texts, **tokenizer_options)

def compute_metrics(pred):
    """Compute micro-F1, micro ROC-AUC and accuracy for multi-label predictions.

    Args:
        pred: Evaluation output exposing ``predictions`` (raw model logits, or
            a tuple whose first element is the logits) and ``label_ids``
            (the multi-hot ground-truth matrix).

    Returns:
        Dict with keys 'f1', 'roc_auc' and 'accuracy'. 'accuracy' comes from
        ``accuracy_score`` on the full label matrix, which for multi-label
        input counts a sample as correct only if every label matches.
    """
    logits = pred.predictions
    if isinstance(logits, tuple):
        # Some models return extra outputs; the logits come first.
        logits = logits[0]
    logits = np.asarray(logits)
    labels = pred.label_ids

    # The model emits logits, not probabilities. A probability cutoff of 0.5
    # is sigmoid(logit) > 0.5, which is exactly logit > 0; comparing the raw
    # logits against 0.5 would silently use a ~0.62 probability cutoff.
    preds = (logits > 0.0).astype(float)

    f1 = f1_score(labels, preds, average='micro')
    # ROC-AUC depends only on the ranking of scores, so raw logits are fine.
    roc_auc = roc_auc_score(labels, logits, average='micro')
    acc = accuracy_score(labels, preds)
    return {'f1': f1, 'roc_auc': roc_auc, 'accuracy': acc}

# Function to perform k-fold training and evaluation
def train_with_kfold(model_name: str, folder_path: str, output_dir: str, k: int = 5, epochs: int = 20, batch_size: int = 16) -> Dict:
    """Fine-tune roberta-base with k-fold cross-validation on a folder of CSVs.

    Every ``*.csv`` file in ``folder_path`` is loaded with ``load_data`` and
    the rows are pooled. For each fold a fresh
    'FacebookAI/roberta-base' multi-label classifier is trained on the
    training split and evaluated on the held-out split; the resulting model
    and tokenizer are saved under ``{output_dir}/{model_name}_fold_{n}``.

    NOTE(review): the held-out split of each fold is also the ``eval_dataset``
    used for early stopping and best-checkpoint selection, so the reported
    metrics are likely optimistic.

    Args:
        model_name: Label used in log messages and output directory names.
            It does not select the pretrained checkpoint.
        folder_path: Directory containing the CSV files to pool.
        output_dir: Root directory for per-fold checkpoints and saved models.
        k: Number of cross-validation folds.
        epochs: Maximum number of training epochs per fold.
        batch_size: Per-device batch size for training and evaluation.

    Returns:
        Dict mapping each key returned by ``trainer.evaluate()`` (except the
        runtime/throughput entries) to its mean over all folds.

    Raises:
        ValueError: If the CSV files disagree on the number of label columns,
            or if fewer than ``k`` samples were loaded.
    """
    all_texts, all_labels = [], []
    num_labels = None
    # os.listdir returns entries in arbitrary order; sorting keeps the pooled
    # sample order, and therefore the seeded KFold split, reproducible.
    for filename in sorted(os.listdir(folder_path)):
        if not filename.endswith('.csv'):
            continue
        csv_path = os.path.join(folder_path, filename)
        texts, labels, n_labels, _ = load_data(csv_path)
        if num_labels is not None and n_labels != num_labels:
            raise ValueError(
                f"Inconsistent number of labels in {csv_path}: "
                f"expected {num_labels}, found {n_labels}"
            )
        num_labels = n_labels
        all_texts.extend(texts)
        all_labels.extend(labels)

    if len(all_texts) < k:
        raise ValueError(f"Insufficient samples ({len(all_texts)}) for {k}-fold cross-validation")

    kf = KFold(n_splits=k, shuffle=True, random_state=SEED)
    fold_results = []

    # Fold-independent objects are built once instead of once per fold.
    features = Features({'text': Value('string'), 'labels': Sequence(Value('float32'))})
    tokenizer = RobertaTokenizer.from_pretrained('FacebookAI/roberta-base')
    tensor_columns = ['input_ids', 'attention_mask', 'labels']

    for fold, (train_idx, test_idx) in enumerate(kf.split(all_texts)):
        print(f"Training {model_name} fold {fold + 1}/{k}...")
        fold_dir = f"{output_dir}/{model_name}_fold_{fold + 1}"

        train_dataset = Dataset.from_dict(
            {
                'text': [all_texts[i] for i in train_idx],
                'labels': [all_labels[i] for i in train_idx],
            },
            features=features,
        )
        test_dataset = Dataset.from_dict(
            {
                'text': [all_texts[i] for i in test_idx],
                'labels': [all_labels[i] for i in test_idx],
            },
            features=features,
        )

        train_dataset = train_dataset.map(lambda x: tokenize_function(x, tokenizer), batched=True)
        test_dataset = test_dataset.map(lambda x: tokenize_function(x, tokenizer), batched=True)

        train_dataset.set_format(type='torch', columns=tensor_columns)
        test_dataset.set_format(type='torch', columns=tensor_columns)

        # A fresh model per fold so no weights leak between folds.
        model = RobertaForSequenceClassification.from_pretrained(
            'FacebookAI/roberta-base', num_labels=num_labels, problem_type='multi_label_classification'
        )

        training_args = TrainingArguments(
            output_dir=fold_dir,
            num_train_epochs=epochs,
            per_device_train_batch_size=batch_size,
            per_device_eval_batch_size=batch_size,
            warmup_steps=500,
            weight_decay=0.01,
            logging_dir='./logs',
            logging_steps=10,
            eval_strategy='steps',
            eval_steps=100,
            save_strategy='steps',
            save_steps=100,
            save_total_limit=3,
            load_best_model_at_end=True,
            metric_for_best_model='f1',
            seed=SEED
        )

        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=test_dataset,
            compute_metrics=compute_metrics,
            callbacks=[EarlyStoppingCallback(early_stopping_patience=5)]
        )

        trainer.train()
        fold_results.append(trainer.evaluate())
        trainer.save_model(fold_dir)
        tokenizer.save_pretrained(fold_dir)

    # Timing/throughput entries describe the run, not model quality.
    timing_keys = ('eval_runtime', 'eval_samples_per_second', 'eval_steps_per_second')
    avg_results = {
        metric: np.mean([result[metric] for result in fold_results])
        for metric in fold_results[0].keys()
        if metric not in timing_keys
    }
    return avg_results

def compare_models():
    """Run 5-fold training for each dataset folder and report averaged metrics.

    Each entry of the folder mapping is trained independently with
    ``train_with_kfold``; the per-model averages are printed as a table and
    written to 'comparison_summary.csv'.
    """
    # Display name -> folder holding that model's CSV files (adjust paths here).
    dataset_folders = {
        'Santa': 'SANTA',
        'Mystery': 'MD',
        'Combined': 'Combination'
    }
    save_root = './fine_tuned_models'

    # One independent k-fold run per dataset folder.
    results = {
        name: train_with_kfold(name, folder, save_root, k=5, epochs=20, batch_size=16)
        for name, folder in dataset_folders.items()
    }

    # One row per model, one column per metric, rounded for readability.
    metrics_by_model = pd.DataFrame(results).transpose()
    summary_df = metrics_by_model.round(4)
    print("\nOverall Metrics Summary:")
    print(summary_df)
    summary_df.to_csv('comparison_summary.csv', index=True)

# Script entry point: run the full model comparison only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    compare_models()