In [None]:
import gc
import os
import re
import warnings
from itertools import product
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import torch
from datasets import Dataset
from peft import LoraConfig, PeftModel, get_peft_model
from sklearn.metrics import f1_score, hamming_loss, jaccard_score
from tqdm import tqdm
from transformers import (
    AutoConfig,
    AutoModelForCausalLM,
    AutoTokenizer,
    DataCollatorForLanguageModeling,
    Trainer,
    TrainingArguments,
)

# Silence every warning for the rest of the process.
warnings.filterwarnings(action="ignore")

# --- Data and checkpoint ----------------------------------------------------
BASE_PATH = "XED/processed"              # directory holding train_/test_ CSVs
LANGUAGE = "en"                          # suffix of the CSV file names
MODEL_NAME = "EleutherAI/gpt-neo-1.3B"   # checkpoint passed to from_pretrained

# --- Label space -------------------------------------------------------------
# Label ids are the strings "1" .. "8"; kept as a list because the position of
# each id is used as its column index in the metric matrices.
ALL_LABELS = list(map(str, range(1, 9)))

# --- LoRA adapter settings (forwarded to LoraConfig) -------------------------
LORA_TARGET_MODULES = ["q_proj", "v_proj", "k_proj", "out_proj"]
R_VALUE = 8
ALPHA_VALUE = 32
DROPOUT_VALUE = 0.1

# --- Training settings (forwarded to TrainingArguments) ----------------------
FINAL_EPOCHS = 5
BATCH_SIZE = 4
LEARNING_RATE = 2e-4

# --- Tokenization / generation / evaluation limits ---------------------------
MAX_INPUT_LENGTH = 256        # max_length used when tokenizing training text
MAX_NEW_TOKENS = 10           # generation budget per prediction
EVAL_SAMPLES_COUNT_CV = 300   # number of test rows scored by generation

# --- Output locations --------------------------------------------------------
OUTPUT_DIR_FINAL = "./en_gpt_results/final_lora_gpt_neo_best"
LOG_DIR_FINAL = "./logs_gpt_neo_en_final"
SAVE_DIR_FINAL = "./weights/gpt_neo/lora_gpt_neo_en_best_final"


def predict_emotions_gpt2(
    model: torch.nn.Module,
    tokenizer: AutoTokenizer,
    df: pd.DataFrame,
    max_samples: Optional[int] = None
) -> Tuple[List[str], List[str]]:
    """Generate a free-text emotion prediction for each row of ``df``.

    Every row is turned into the prompt
    ``"Classify the emotion in this sentence: <text>\\nEmotion:"`` and the
    model is asked to continue it for at most ``MAX_NEW_TOKENS`` tokens.
    Only the newly generated tokens are decoded.

    Args:
        model: Model exposing ``eval()``, ``device`` and ``generate()``.
            It is switched to eval mode and left that way.
        tokenizer: Tokenizer used both to encode prompts and decode outputs.
        df: Frame with a ``text`` column (prompt input) and a ``labels``
            column (gold answer).
        max_samples: When given, only the first ``max_samples`` rows are used.

    Returns:
        ``(preds, golds)``: the stripped, lower-cased continuations and the
        ``labels`` values converted with ``str``, in row order.
    """
    subset = df if max_samples is None else df.head(max_samples)

    model.eval()
    target_device = model.device

    generated: List[str] = []
    references: List[str] = []

    progress = tqdm(
        subset.iterrows(),
        total=len(subset),
        desc="Generating Predictions"
    )
    for _, record in progress:
        prompt = f"Classify the emotion in this sentence: {record['text']}\nEmotion:"
        encoded = tokenizer(prompt, return_tensors="pt").to(target_device)
        # Remember where the prompt ends so it can be sliced off the output.
        prompt_len = encoded['input_ids'].shape[1]

        with torch.no_grad():
            sequences = model.generate(
                **encoded,
                max_new_tokens=MAX_NEW_TOKENS,
                pad_token_id=tokenizer.eos_token_id
            )

        continuation = sequences[0][prompt_len:]
        decoded = tokenizer.decode(continuation, skip_special_tokens=True)

        generated.append(decoded.strip().lower())
        references.append(str(record["labels"]))

    return generated, references


def compute_metrics_numeric(
    preds: List[str], golds: List[str]
) -> Dict[str, float]:
    """Score generated predictions against gold labels as a multi-label task.

    Gold strings are split on commas and only purely numeric pieces are kept;
    predictions contribute every standalone digit 1-8 found in the text.
    Ids that appear in ``ALL_LABELS`` are marked in a binary indicator matrix
    with one row per gold entry and one column per label.

    Args:
        preds: Generated texts, aligned with ``golds``.
        golds: Gold label strings (comma-separated ids).

    Returns:
        Dict with ``micro_f1``, ``macro_f1``, ``jaccard`` (sample-averaged)
        and ``hamming`` (Hamming loss).
    """
    shape = (len(golds), len(ALL_LABELS))
    y_true = np.zeros(shape)
    y_pred = np.zeros(shape)

    for row, (gold, pred) in enumerate(zip(golds, preds)):
        stripped_parts = (part.strip() for part in str(gold).split(","))
        gold_ids = [piece for piece in stripped_parts if piece.isdigit()]
        pred_ids = re.findall(r'\b[1-8]\b', str(pred))

        # Same marking rule for both matrices: set the column of each known id.
        for matrix, ids in ((y_true, gold_ids), (y_pred, pred_ids)):
            for label_id in ids:
                if label_id in ALL_LABELS:
                    matrix[row, ALL_LABELS.index(label_id)] = 1

    micro = f1_score(y_true, y_pred, average="micro", zero_division=0)
    macro = f1_score(y_true, y_pred, average="macro", zero_division=0)
    jaccard = jaccard_score(y_true, y_pred, average="samples", zero_division=0)
    hamming = hamming_loss(y_true, y_pred)

    return {
        "micro_f1": micro,
        "macro_f1": macro,
        "jaccard": jaccard,
        "hamming": hamming,
    }


def main():
    """Run the zero-shot baseline, LoRA fine-tuning and final evaluation.

    Steps, in order:

    1. Read ``train_{LANGUAGE}.csv`` and ``test_{LANGUAGE}.csv`` from
       ``BASE_PATH``; print an error and return if a file is missing.
    2. Tokenize both splits into fixed-length (``MAX_INPUT_LENGTH``)
       prompt+label sequences.
    3. Score the unmodified ``MODEL_NAME`` checkpoint by generation on the
       first ``EVAL_SAMPLES_COUNT_CV`` test rows, then free it.
    4. Reload the checkpoint, wrap it with LoRA and train it with ``Trainer``
       for ``FINAL_EPOCHS`` epochs (evaluating and checkpointing each epoch).
    5. Score the fine-tuned model the same way as the baseline and save the
       model and tokenizer to ``SAVE_DIR_FINAL``.

    Returns:
        None. Results are printed; artifacts are written to disk.
    """
    try:
        train_df = pd.read_csv(os.path.join(BASE_PATH, f"train_{LANGUAGE}.csv"))
        test_df = pd.read_csv(os.path.join(BASE_PATH, f"test_{LANGUAGE}.csv"))
    except FileNotFoundError:
        print(f"Error: Data files not found in {BASE_PATH}. Exiting.")
        return

    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, padding_side='left')
    if tokenizer.pad_token is None:
        # No dedicated pad token: reuse EOS so padding is possible.
        tokenizer.pad_token = tokenizer.eos_token

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    base_model = AutoModelForCausalLM.from_pretrained(MODEL_NAME)
    base_model.resize_token_embeddings(len(tokenizer))
    base_model.to(device)

    def preprocess_function_local(examples):
        """Tokenize a batch of rows as full prompt + gold label sequences."""
        texts = [
            f"Classify the emotion in this sentence: {t}\nEmotion: {label}"
            for t, label in zip(examples["text"], examples["labels"])
        ]
        return tokenizer(
            texts,
            truncation=True,
            max_length=MAX_INPUT_LENGTH,
            padding="max_length"
        )

    train_dataset = Dataset.from_pandas(train_df)
    test_dataset = Dataset.from_pandas(test_df)

    # Build the training/eval tensors with RIGHT padding.
    # GPT-Neo uses absolute position embeddings, and Trainer does not pass
    # position_ids, so positions are counted from the start of the padded
    # sequence. With left padding the real tokens would be trained at high
    # position indices (after up to MAX_INPUT_LENGTH - 1 pads), whereas
    # predict_emotions_gpt2 generates from unpadded prompts that start at
    # position 0. Right padding keeps training and inference positions aligned.
    # The original padding side is restored afterwards so the tokenizer that
    # gets saved is configured exactly as it was loaded.
    original_padding_side = tokenizer.padding_side
    tokenizer.padding_side = "right"
    try:
        tokenized_train = train_dataset.map(
            preprocess_function_local,
            batched=True,
            remove_columns=train_dataset.column_names
        )
        tokenized_test = test_dataset.map(
            preprocess_function_local,
            batched=True,
            remove_columns=test_dataset.column_names
        )
    finally:
        tokenizer.padding_side = original_padding_side

    data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

    print("--- Running Zero-Shot Baseline Evaluation on GPT-Neo-1.3B ---")
    base_preds, base_golds = predict_emotions_gpt2(
        base_model, tokenizer, test_df, max_samples=EVAL_SAMPLES_COUNT_CV
    )
    base_metrics = compute_metrics_numeric(base_preds, base_golds)
    print(f"Base Model Metrics (GPT-Neo-1.3B): {base_metrics}")

    # Drop the baseline model before loading the copy used for fine-tuning.
    del base_model
    gc.collect()
    torch.cuda.empty_cache()

    print("--- Starting LoRA Fine-Tuning with GPT-Neo-1.3B ---")

    base_model_ft = AutoModelForCausalLM.from_pretrained(MODEL_NAME)
    base_model_ft.resize_token_embeddings(len(tokenizer))
    base_model_ft.to(device)
    base_model_ft.config.pad_token_id = tokenizer.eos_token_id

    lora_config = LoraConfig(
        r=R_VALUE,
        lora_alpha=ALPHA_VALUE,
        target_modules=LORA_TARGET_MODULES,
        lora_dropout=DROPOUT_VALUE,
        bias="none",
        task_type="CAUSAL_LM"
    )
    model = get_peft_model(base_model_ft, lora_config)
    model.print_trainable_parameters()

    training_args = TrainingArguments(
        output_dir=OUTPUT_DIR_FINAL,
        per_device_train_batch_size=BATCH_SIZE,
        per_device_eval_batch_size=BATCH_SIZE,
        learning_rate=LEARNING_RATE,
        num_train_epochs=FINAL_EPOCHS,
        eval_strategy="epoch",
        save_strategy="epoch",
        logging_dir=LOG_DIR_FINAL,
        report_to="none",
        # Mixed precision only when a GPU is present.
        fp16=torch.cuda.is_available(),
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_train,
        eval_dataset=tokenized_test,
        tokenizer=tokenizer,
        data_collator=data_collator,
    )

    trainer.train()

    print("--- Running Final Fine-Tuned Model Evaluation ---")

    preds, golds = predict_emotions_gpt2(
        model, tokenizer, test_df, max_samples=EVAL_SAMPLES_COUNT_CV
    )
    metrics = compute_metrics_numeric(preds, golds)
    print(f"Fine-Tuned Model Metrics (GPT-Neo-1.3B): {metrics}")

    model.save_pretrained(SAVE_DIR_FINAL)
    tokenizer.save_pretrained(SAVE_DIR_FINAL)
    print(f"Final fine-tuned model saved to {SAVE_DIR_FINAL}")

    # Release the model objects and cached GPU memory before returning.
    del model, base_model_ft, trainer
    gc.collect()
    torch.cuda.empty_cache()

if __name__ == "__main__":
    main()