In [None]:
import os
import numpy as np
from datasets import Dataset, DatasetDict, load_dataset
from transformers import AutoTokenizer, AutoModelForTokenClassification, TrainingArguments, Trainer, DataCollatorForTokenClassification
import evaluate

In [None]:
# Configuration
# Project root. Defaults to the Colab Google Drive mount used originally; set the
# ETHIO_MART_PROJECT_PATH environment variable to run the notebook elsewhere.
GDRIVE_PROJECT_PATH = os.environ.get('ETHIO_MART_PROJECT_PATH', '/content/drive/MyDrive/Ethio_mart')
# CoNLL-formatted labelled data read by the training pipeline.
LABELED_DATA_PATH = os.path.join(GDRIVE_PROJECT_PATH, 'labeled_data_conll.txt')
# Directory that receives training checkpoints and the final model/tokenizer.
OUTPUT_MODEL_DIR = os.path.join(GDRIVE_PROJECT_PATH, 'models', 'xlm-roberta-ner-amharic')
# Pretrained checkpoint name passed to the tokenizer and model loaders.
MODEL_CHECKPOINT = "xlm-roberta-base"

In [None]:
# Label inventory: 'O' plus B-/I- tags for PRODUCT, LOC and PRICE entities.
# The position of a tag in this list is its integer class id.
labels_list = [
    'O',
    'B-PRODUCT', 'I-PRODUCT',
    'B-LOC', 'I-LOC',
    'B-PRICE', 'I-PRICE',
]
# Forward (tag -> id) and reverse (id -> tag) lookup tables.
label2id = dict(zip(labels_list, range(len(labels_list))))
id2label = dict(enumerate(labels_list))

def create_dataset_from_conll(file_path):
    """Parse a whitespace-separated CoNLL file into a Hugging Face ``Dataset``.

    Each non-blank line must hold at least two whitespace-separated fields: the
    token first and its tag second (any further fields are ignored). Blank lines
    end the current sentence. Tags are converted to integer ids via ``label2id``.

    Args:
        file_path: Path of the UTF-8 encoded CoNLL file.

    Returns:
        A ``Dataset`` with two columns: ``tokens`` (list of token strings per
        sentence) and ``ner_tags`` (list of label ids per sentence).

    Raises:
        ValueError: If a non-blank line has no tag field, or its tag is not a
            key of ``label2id``. The message names the file and line number.
    """
    tokens_list, tags_list = [], []
    current_tokens, current_tags = [], []

    with open(file_path, 'r', encoding='utf-8') as f:
        for line_no, line in enumerate(f, start=1):
            line = line.strip()
            if line == "":
                # Sentence boundary; consecutive blank lines must not create
                # empty sentences, hence the guard.
                if current_tokens:
                    tokens_list.append(current_tokens)
                    tags_list.append(current_tags)
                    current_tokens, current_tags = [], []
                continue

            parts = line.split()
            if len(parts) < 2:
                raise ValueError(
                    f"{file_path}, line {line_no}: expected '<token> <tag>', got {line!r}"
                )

            token, tag = parts[0], parts[1]
            tag_id = label2id.get(tag)
            if tag_id is None:  # 0 is a valid id, so test for None explicitly
                raise ValueError(
                    f"{file_path}, line {line_no}: unknown tag {tag!r}; "
                    f"expected one of {sorted(label2id)}"
                )

            current_tokens.append(token)
            current_tags.append(tag_id)

    # The file may end without a trailing blank line; keep the last sentence.
    if current_tokens:
        tokens_list.append(current_tokens)
        tags_list.append(current_tags)

    return Dataset.from_dict({'tokens': tokens_list, 'ner_tags': tags_list})

In [None]:
def tokenize_and_align_labels(examples, tokenizer):
    """Tokenize pre-split sentences and project word-level tags onto sub-tokens.

    Only the first sub-token of each word receives that word's tag. Positions
    with no source word, and continuation sub-tokens of a word, are labelled
    ``-100``.

    Args:
        examples: Batch mapping with ``"tokens"`` (list of word lists) and
            ``"ner_tags"`` (list of per-word label-id lists).
        tokenizer: Callable tokenizer whose result exposes
            ``word_ids(batch_index=...)`` and supports item assignment.

    Returns:
        The tokenizer output with an added ``"labels"`` entry.
    """
    encoded = tokenizer(examples["tokens"], truncation=True, is_split_into_words=True)

    aligned_labels = []
    for batch_idx, word_tags in enumerate(examples["ner_tags"]):
        word_ids = encoded.word_ids(batch_index=batch_idx)
        # Pair each position with the word index of the position before it
        # (None before the first position) to detect where a new word starts.
        preceding_ids = [None] + list(word_ids[:-1])
        aligned_labels.append([
            word_tags[current] if current is not None and current != before else -100
            for current, before in zip(word_ids, preceding_ids)
        ])

    encoded["labels"] = aligned_labels
    return encoded

In [None]:
# seqeval metric object, loaded on first use and then reused so that every
# evaluation pass does not reload the metric.
_SEQEVAL_METRIC = None


def compute_metrics(p):
    """Compute seqeval precision, recall, F1 and accuracy for an evaluation pass.

    Args:
        p: Pair ``(predictions, labels)``. ``predictions`` holds per-class
            scores with the class dimension on axis 2; ``labels`` holds the
            gold label ids, where ``-100`` marks positions to skip.

    Returns:
        dict with ``precision``, ``recall``, ``f1`` and ``accuracy``, taken from
        seqeval's ``overall_*`` results.
    """
    global _SEQEVAL_METRIC
    if _SEQEVAL_METRIC is None:
        _SEQEVAL_METRIC = evaluate.load("seqeval")

    scores, gold_ids = p
    predicted_ids = np.argmax(scores, axis=2)

    # Drop ignored positions (-100) and map the remaining ids to tag strings,
    # which is the input format seqeval expects.
    true_predictions, true_labels = [], []
    for predicted_row, gold_row in zip(predicted_ids, gold_ids):
        kept = [(pred, gold) for pred, gold in zip(predicted_row, gold_row) if gold != -100]
        true_predictions.append([labels_list[pred] for pred, _ in kept])
        true_labels.append([labels_list[gold] for _, gold in kept])

    results = _SEQEVAL_METRIC.compute(predictions=true_predictions, references=true_labels)
    return {
        "precision": results["overall_precision"],
        "recall": results["overall_recall"],
        "f1": results["overall_f1"],
        "accuracy": results["overall_accuracy"],
    }

In [None]:
def _pick_supported_kwarg(target, preferred, fallback):
    """Return ``preferred`` if ``target`` accepts it as a keyword, else ``fallback``."""
    import inspect  # local import: only needed for this compatibility probe

    return preferred if preferred in inspect.signature(target).parameters else fallback


def main():
    """Fine-tune the checkpoint on the labelled CoNLL data, evaluate and save it.

    Steps: load ``LABELED_DATA_PATH``, split it 80/20 (seed 42), tokenize and
    align labels, train for 5 epochs with per-epoch evaluation and checkpoints,
    print the final evaluation, then save model and tokenizer to
    ``OUTPUT_MODEL_DIR``.

    Raises:
        FileNotFoundError: If ``LABELED_DATA_PATH`` does not exist.
    """
    # Check if data exists
    if not os.path.exists(LABELED_DATA_PATH):
        raise FileNotFoundError(f"Data file not found at {LABELED_DATA_PATH}")

    # Load and split dataset
    full_dataset = create_dataset_from_conll(LABELED_DATA_PATH)
    train_test_split = full_dataset.train_test_split(test_size=0.2, seed=42)
    final_dataset = DatasetDict({
        'train': train_test_split['train'],
        'test': train_test_split['test']
    })

    # Load tokenizer and tokenize dataset
    tokenizer = AutoTokenizer.from_pretrained(MODEL_CHECKPOINT)
    tokenized_datasets = final_dataset.map(lambda x: tokenize_and_align_labels(x, tokenizer), batched=True)

    # Load model
    model = AutoModelForTokenClassification.from_pretrained(
        MODEL_CHECKPOINT,
        num_labels=len(labels_list),
        id2label=id2label,
        label2id=label2id
    )

    # Training arguments.
    # The per-epoch evaluation keyword was renamed from `evaluation_strategy`
    # to `eval_strategy` in newer transformers releases; use whichever name the
    # installed version accepts so the notebook runs on both.
    training_kwargs = dict(
        output_dir=OUTPUT_MODEL_DIR,
        learning_rate=2e-5,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=16,
        num_train_epochs=5,
        weight_decay=0.01,
        save_strategy="epoch",
        # Reload the checkpoint with the best "f1" once training finishes.
        load_best_model_at_end=True,
        metric_for_best_model="f1",
    )
    eval_kwarg = _pick_supported_kwarg(TrainingArguments, "eval_strategy", "evaluation_strategy")
    training_kwargs[eval_kwarg] = "epoch"
    args = TrainingArguments(**training_kwargs)

    # Data collator
    data_collator = DataCollatorForTokenClassification(tokenizer)

    # Initialize trainer.
    # Newer transformers releases take the tokenizer as `processing_class`;
    # older ones only know `tokenizer`.
    trainer_kwargs = dict(
        model=model,
        args=args,
        train_dataset=tokenized_datasets["train"],
        eval_dataset=tokenized_datasets["test"],
        data_collator=data_collator,
        compute_metrics=compute_metrics,
    )
    tokenizer_kwarg = _pick_supported_kwarg(Trainer, "processing_class", "tokenizer")
    trainer_kwargs[tokenizer_kwarg] = tokenizer
    trainer = Trainer(**trainer_kwargs)

    # Train and evaluate
    print("Starting model training...")
    trainer.train()
    print("Training complete.")
    final_evaluation = trainer.evaluate()
    print("Final evaluation results:", final_evaluation)

    # Save model and tokenizer
    trainer.save_model(OUTPUT_MODEL_DIR)
    tokenizer.save_pretrained(OUTPUT_MODEL_DIR)
    print(f"Model saved to {OUTPUT_MODEL_DIR}")

# Entry point: __name__ is "__main__" both when this is run as a script and
# when the cell is executed in a notebook kernel, so this starts the pipeline.
if __name__ == "__main__":
    main()