# Task4_Model_Comparison.ipynb

## Import dependencies

In [None]:
# Objective: Compare different NER models and select the best-performing one.

# --- Mount Google Drive ---
# Every project path used later in this notebook lives under
# /content/drive/MyDrive, so the Drive must be mounted first.
from google.colab import drive
drive.mount('/content/drive')

# --- Step 1: Install Necessary Libraries ---
# Notebook shell command (leading "!"): installs the Hugging Face and seqeval
# packages this notebook relies on into the current runtime.
!pip install transformers datasets seqeval accelerate evaluate

# IMPORTANT: After running this cell, if prompted, click "Restart runtime"
# and then "Run all cells" to ensure all libraries are correctly loaded.

# --- Step 2: Import Libraries ---
import os
import json
import numpy as np
import pandas as pd
import torch
from datasets import Dataset, Features, Value, ClassLabel, Sequence
from transformers import AutoTokenizer, AutoModelForTokenClassification, TrainingArguments, Trainer, DataCollatorForTokenClassification
from seqeval.metrics import classification_report
import evaluate


## --- Configuration ---

In [None]:

# Root folder of the project inside the mounted Google Drive.
DRIVE_PROJECT_BASE_PATH = "/content/drive/MyDrive/colab_projects/EthioMart_NER"

# Labeled CoNLL file, resolved relative to the project root above.
CONLL_FILE_PATH = os.path.join(
    DRIVE_PROJECT_BASE_PATH,
    "data/labeled_data/labeled_data.conll",
)

# Entity tag inventory (must match what was used for training).
# The position of a tag in this list is used as its integer class id.
LABEL_NAMES = [
    "O",
    "B-PRODUCT", "I-PRODUCT",
    "B-LOC", "I-LOC",
    "B-PRICE", "I-PRICE",
]

# --- Data Loading and Preparation (Repeated from Task 3 for independence) ---
def parse_conll_file(file_path):
    """Parse a tab-separated CoNLL file into a list of sentence dictionaries.

    Each non-blank line is expected to hold exactly two tab-separated fields,
    ``token<TAB>tag``. Sentences are separated by one or more blank lines; a
    line that contains only whitespace also counts as a separator.

    Args:
        file_path: Path of the UTF-8 encoded CoNLL file.

    Returns:
        A list of ``{"tokens": [...], "ner_tags": [...]}`` dictionaries, one
        per non-empty sentence. An empty list is returned (after printing an
        error message) when the file does not exist.
    """
    try:
        # The context manager guarantees the handle is closed after reading.
        with open(file_path, "r", encoding="utf-8") as conll_file:
            raw_text = conll_file.read()
    except FileNotFoundError:
        print(f"Error: CoNLL file not found at {file_path}. Please upload it or check the path.")
        return []

    data = []
    tokens = []
    ner_tags = []

    def flush_sentence():
        """Store the sentence collected so far (if any) and start a new one."""
        nonlocal tokens, ner_tags
        if tokens and ner_tags:
            data.append({"tokens": tokens, "ner_tags": ner_tags})
        tokens = []
        ner_tags = []

    for line in raw_text.split("\n"):
        # Any blank or whitespace-only line ends the current sentence, so a
        # stray space on a separator line no longer merges two sentences.
        if not line.strip():
            flush_sentence()
            continue

        parts = line.split("\t")
        if len(parts) != 2:
            # Malformed line (not exactly token<TAB>tag): skipped, as before.
            continue

        # Drop padding around the fields so a tag such as "O " still matches
        # the configured label names.
        token = parts[0].strip()
        tag = parts[1].strip()
        if token and tag:
            tokens.append(token)
            ner_tags.append(tag)

    # The last sentence is not necessarily followed by a blank line.
    flush_sentence()
    return data

# Load the labeled sentences; without them there is nothing to fine-tune on.
print(f"Loading labeled data from {CONLL_FILE_PATH} for fine-tuning...")
conll_data = parse_conll_file(CONLL_FILE_PATH)
if not conll_data:
    print("No labeled data found. Model comparison will be skipped.")
    # `exit()` is a helper the `site` module adds for interactive sessions and
    # is not meant for programs; raising SystemExit stops execution with the
    # same (success) exit status.
    raise SystemExit

print(f"Loaded {len(conll_data)} sentences for fine-tuning.")

# Declare the schema explicitly so the string tags are encoded as ClassLabel
# ids in the order given by LABEL_NAMES.
features = Features({
    "tokens": Sequence(Value("string")),
    "ner_tags": Sequence(ClassLabel(names=LABEL_NAMES))
})
dataset = Dataset.from_list(conll_data, features=features)

# 80/20 train/evaluation split; the fixed seed keeps the split reproducible.
train_test_split = dataset.train_test_split(test_size=0.2, seed=42)
train_dataset = train_test_split["train"]
eval_dataset = train_test_split["test"]

print(f"\nDataset split: {len(train_dataset)} training examples, {len(eval_dataset)} evaluation examples.")

# --- Tokenization and Label Alignment Function ---
def tokenize_and_align_labels(examples, id2label_map, label2id_map):
    """Tokenize a batch of pre-split sentences and spread word tags over subwords.

    Uses the module-level ``tokenizer``. Positions with no source word (word
    id ``None``) get ``-100``. The first subword of a word keeps the word's
    tag; every following subword of the same word gets the matching ``I-`` tag
    when the word is tagged ``B-`` and that ``I-`` tag exists, otherwise the
    word's own tag.

    Args:
        examples: Batch with "tokens" (lists of words) and "ner_tags" (lists
            of integer tag ids, one per word).
        id2label_map: Mapping from tag id to tag string.
        label2id_map: Mapping from tag string to tag id.

    Returns:
        The tokenizer output with an added "labels" entry, one label list per
        sentence.
    """
    encoded = tokenizer(
        examples["tokens"],
        truncation=True,
        is_split_into_words=True
    )

    def continuation_label(tag):
        """Label id for a non-initial subword of a word tagged ``tag``."""
        if tag.startswith("B-"):
            inside_tag = "I-" + tag[2:]
            if inside_tag in label2id_map:
                return label2id_map[inside_tag]
        return label2id_map[tag]

    aligned = []
    for batch_idx, word_tag_ids in enumerate(examples["ner_tags"]):
        sentence_labels = []
        last_word = None
        for word_idx in encoded.word_ids(batch_index=batch_idx):
            if word_idx is None:
                # No source word at this position: ignored label value.
                sentence_labels.append(-100)
            else:
                tag = id2label_map[word_tag_ids[word_idx]]
                if word_idx != last_word:
                    sentence_labels.append(label2id_map[tag])
                else:
                    sentence_labels.append(continuation_label(tag))
            last_word = word_idx
        aligned.append(sentence_labels)

    encoded["labels"] = aligned
    return encoded

# --- Metrics for Evaluation ---
metric = evaluate.load("seqeval")


def compute_metrics(p):
    """Turn raw model outputs into seqeval precision/recall/F1/accuracy.

    Args:
        p: Pair ``(predictions, labels)``. The arg-max over axis 2 of
            ``predictions`` is taken as the predicted label id; positions
            whose reference label is ``-100`` are ignored.

    Returns:
        Dict with the keys "precision", "recall", "f1" and "accuracy", taken
        from the overall scores reported by the seqeval metric.
    """
    raw_scores, gold_ids = p
    predicted_ids = np.argmax(raw_scores, axis=2)

    # Reference tags, with the ignored positions removed.
    true_labels = []
    for gold_row in gold_ids:
        true_labels.append([LABEL_NAMES[gold] for gold in gold_row if gold != -100])

    # Predicted tags at exactly the positions kept above.
    true_predictions = []
    for predicted_row, gold_row in zip(predicted_ids, gold_ids):
        kept = []
        for predicted, gold in zip(predicted_row, gold_row):
            if gold != -100:
                kept.append(LABEL_NAMES[predicted])
        true_predictions.append(kept)

    scores = metric.compute(predictions=true_predictions, references=true_labels)
    return dict(
        precision=scores["overall_precision"],
        recall=scores["overall_recall"],
        f1=scores["overall_f1"],
        accuracy=scores["overall_accuracy"],
    )

# --- Main Model Training and Evaluation Function ---
def train_and_evaluate_model(model_name: str, model_short_name: str) -> dict:
    """Load, fine-tune, evaluate and save a single NER model.

    Side effects: rebinds the module-level ``tokenizer``, creates the output
    and logging directories under ``DRIVE_PROJECT_BASE_PATH``, and writes the
    final model and tokenizer to ``<output_dir>/final_model``.

    Args:
        model_name: Hugging Face model identifier to load.
        model_short_name: Short label used in messages, in directory names
            and as "model_name" in the returned summary.

    Returns:
        Dict with "model_name" plus the evaluation loss, precision, recall,
        F1, accuracy and runtime reported by ``trainer.evaluate()``.
    """
    print(f"\n--- Starting fine-tuning for {model_short_name} ({model_name}) ---")

    # tokenize_and_align_labels reads the module-level `tokenizer`, so it is
    # rebound here for every model before the datasets are mapped.
    global tokenizer
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    id2label = dict(enumerate(LABEL_NAMES))
    label2id = {label: idx for idx, label in id2label.items()}
    model = AutoModelForTokenClassification.from_pretrained(
        model_name,
        num_labels=len(LABEL_NAMES),
        id2label=id2label,
        label2id=label2id,
        ignore_mismatched_sizes=True
    )

    print("Tokenizing and aligning labels for current model...")

    def align_batch(batch):
        """Apply the shared alignment function with this model's label maps."""
        return tokenize_and_align_labels(batch, model.config.id2label, model.config.label2id)

    tokenized_train_dataset_model = train_dataset.map(align_batch, batched=True)
    tokenized_eval_dataset_model = eval_dataset.map(align_batch, batched=True)

    # Define output and logging directories for this specific model within Google Drive
    output_dir_model = os.path.join(DRIVE_PROJECT_BASE_PATH, f"{model_short_name}_ner_output")
    logging_dir_model = os.path.join(DRIVE_PROJECT_BASE_PATH, f"{model_short_name}_ner_logs")
    os.makedirs(output_dir_model, exist_ok=True)
    os.makedirs(logging_dir_model, exist_ok=True)

    training_args = TrainingArguments(
        output_dir=output_dir_model,
        eval_strategy="epoch",
        learning_rate=2e-5,
        per_device_train_batch_size=8,
        per_device_eval_batch_size=8,
        num_train_epochs=20, # Reduced epochs for faster comparison
        weight_decay=0.01,
        logging_dir=logging_dir_model,
        logging_steps=10,
        save_strategy="epoch",
        # A checkpoint is written every epoch; cap how many are kept so the
        # Drive folder does not accumulate one full checkpoint per epoch.
        # With load_best_model_at_end the best checkpoint is still retained.
        save_total_limit=2,
        load_best_model_at_end=True,
        metric_for_best_model="f1",
        report_to="none",
    )

    data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_train_dataset_model,
        eval_dataset=tokenized_eval_dataset_model,
        tokenizer=tokenizer,
        data_collator=data_collator,
        compute_metrics=compute_metrics,
    )

    trainer.train()
    eval_results = trainer.evaluate()
    print(f"\nEvaluation Results for {model_short_name}:")
    print(eval_results)

    # Save the fine-tuned model
    final_model_path = os.path.join(output_dir_model, "final_model")
    trainer.save_model(final_model_path)
    tokenizer.save_pretrained(final_model_path)
    print(f"Model and tokenizer for {model_short_name} saved to {final_model_path}")

    # Keep only the fields the comparison table needs.
    summary = {"model_name": model_short_name}
    for key in (
        "eval_loss",
        "eval_precision",
        "eval_recall",
        "eval_f1",
        "eval_accuracy",
        "eval_runtime",
    ):
        summary[key] = eval_results[key]
    return summary

# --- Models to Compare ---
# Candidate checkpoints: "name" is the short label used in messages and
# directory names, "id" is the Hugging Face model identifier.
model_configs = [
    {"name": "XLM-R-Amharic-NER", "id": "mbeukman/xlm-roberta-base-finetuned-ner-amharic"},
    {"name": "BERT-Medium-Amharic-NER", "id": "rasyosef/bert-medium-amharic-finetuned-ner"},
    {"name": "mBERT-Base-Cased", "id": "bert-base-multilingual-cased"},
]

tokenizer = None # Initialize global tokenizer for first use
all_comparison_results = []

# Fine-tune and evaluate each candidate in turn, collecting one summary each.
for config_item in model_configs:
    results = train_and_evaluate_model(
        model_name=config_item["id"],
        model_short_name=config_item["name"],
    )
    all_comparison_results.append(results)

# --- Compare Models and Select Best ---
print("\n--- Model Comparison Results ---")
results_df = pd.DataFrame(all_comparison_results)
print(results_df.to_markdown(index=False))

# Pick the row with the highest evaluation F1-score.
best_row_index = results_df['eval_f1'].idxmax()
best_model_row = results_df.loc[best_row_index]
best_model_name = best_model_row['model_name']
BEST_MODEL_FOR_INFERENCE_PATH = os.path.join(
    DRIVE_PROJECT_BASE_PATH,
    f"{best_model_name}_ner_output/final_model",
)

print(f"\nBest performing model based on F1-score: {best_model_name}")
print(f"F1-score: {best_model_row['eval_f1']:.4f}")
print(f"Path to best model for subsequent tasks: {BEST_MODEL_FOR_INFERENCE_PATH}")

# Persist the chosen path so other notebooks can read it (optional, for automation).
best_model_path_file = os.path.join(DRIVE_PROJECT_BASE_PATH, "best_model_path.txt")
with open(best_model_path_file, "w") as path_file:
    path_file.write(BEST_MODEL_FOR_INFERENCE_PATH)
print(f"Best model path saved to {best_model_path_file}")
