In [5]:
# prompt: write to which clean GPU memory used by ths notebook

# clear variables and free up memory
%reset -f

# Optional: Garbage collection
import gc
gc.collect()

# Optional: Clear TensorFlow/PyTorch cache if used
# import torch
# torch.cuda.empty_cache()
# import tensorflow as tf
# tf.keras.backend.clear_session()

0

In [6]:
!pip install  seqeval



In [2]:
!pip install lime

Collecting lime
  Downloading lime-0.2.0.1.tar.gz (275 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/275.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━[0m [32m204.8/275.7 kB[0m [31m9.1 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m275.7/275.7 kB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: lime
  Building wheel for lime (setup.py) ... [?25l[?25hdone
  Created wheel for lime: filename=lime-0.2.0.1-py3-none-any.whl size=283834 sha256=2407baba6b10ee4977eff11fd6c09de85ec92c632d70d55ec655afd4cadbc59a
  Stored in directory: /root/.cache/pip/wheels/85/fa/a3/9c2d44c9f3cd77cf4e533b58900b2bf4487f2a17e8ec212a3d
Successfully built lime
Installing collected packages: lime
Successfully installed lime-0.2.0.1


In [8]:
from google.colab import drive
import pandas as pd
import numpy as np
from datasets import Dataset
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline, TrainingArguments, Trainer
from seqeval.metrics import f1_score, precision_score, recall_score
import torch
from seqeval.metrics import classification_report
import shap
from lime.lime_text import LimeTextExplainer
import os
import gc
from pathlib import Path
import matplotlib.pyplot as plt
import seaborn as sns

# Make the Drive folder visible under /content/drive (remounts if already mounted)
drive.mount("/content/drive", force_remount=True)

# Input data, model directory and report destination (all on Google Drive)
CONLL_FILE = "/content/drive/My Drive/kifiya5/labeled_data.conll"
MODEL_PATH = "/content/drive/My Drive/kifiya5/ner_model_xlmroberta"
OUTPUT_REPORT = "/content/drive/My Drive/kifiya5/ner_interpretability_report.md"

# BIO tag set; a tag's position in this list is its integer class id
label_list = [
    "O",
    "B-PRODUCT", "I-PRODUCT",
    "B-LOC", "I-LOC",
    "B-PRICE", "I-PRICE",
]
id2label = dict(enumerate(label_list))
label2id = {tag: tag_id for tag_id, tag in id2label.items()}

def read_conll(file_path):
    """
    Parse a two-column CoNLL file into sentences.

    Each non-blank line must contain exactly two whitespace-separated fields
    (token, label); any other non-blank line is reported and skipped. Blank
    lines end the current sentence.

    Args:
        file_path: Path of the CoNLL file to read (UTF-8).

    Returns:
        A list of dicts, one per sentence, each with parallel "tokens" and
        "labels" lists.

    Raises:
        FileNotFoundError: If file_path does not exist.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"CoNLL file not found at {file_path}")

    parsed = []
    words, tags = [], []

    with open(file_path, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            line = raw_line.strip()

            if not line:
                # Blank line: close the sentence in progress, if any.
                if words:
                    parsed.append({"tokens": words, "labels": tags})
                    words, tags = [], []
                continue

            fields = line.split()
            if len(fields) == 2:
                words.append(fields[0])
                tags.append(fields[1])
            else:
                print(f"Skipping malformed line: {line}")

    # The file may not end with a blank line; keep the trailing sentence.
    if words:
        parsed.append({"tokens": words, "labels": tags})

    return parsed

def prepare_dataset(sentences):
    """
    Build a Hugging Face Dataset from parsed CoNLL sentences.

    Args:
        sentences: Iterable of dicts with "tokens" and "labels" lists.

    Returns:
        A Dataset with a "tokens" column and a "ner_tags" column in which each
        string label has been replaced by its id from label2id.
    """
    token_column = []
    tag_column = []
    for sentence in sentences:
        token_column.append(sentence["tokens"])
        tag_column.append(sentence["labels"])

    def encode_tags(row):
        # Swap every string tag for its integer id (KeyError on unknown tags).
        row["ner_tags"] = list(map(label2id.__getitem__, row["ner_tags"]))
        return row

    raw_dataset = Dataset.from_dict({"tokens": token_column, "ner_tags": tag_column})
    return raw_dataset.map(encode_tags)

def load_or_fine_tune_model(model_path):
    """
    Return the fine-tuned NER model and tokenizer stored at ``model_path``.

    If no saved weights are found in ``model_path``, xlm-roberta-base is
    fine-tuned on CONLL_FILE (80/20 split, seed 42), saved to both
    ./ner_model_xlmroberta and ``model_path``, and then loaded back.

    Args:
        model_path: Directory that holds (or will hold) the fine-tuned model.

    Returns:
        Tuple ``(model, tokenizer)`` loaded from ``model_path``.

    Raises:
        FileNotFoundError: Propagated from read_conll when fine-tuning is
            required and CONLL_FILE does not exist.
    """
    # save_model()/save_pretrained() write ``model.safetensors`` in recent
    # transformers releases and ``pytorch_model.bin`` in older ones (plus an
    # ``*.index.json`` when sharded). Checking only for the .bin file would
    # re-run fine-tuning on every call even though a model was already saved.
    weight_files = (
        "model.safetensors",
        "model.safetensors.index.json",
        "pytorch_model.bin",
        "pytorch_model.bin.index.json",
    )
    has_weights = any(
        os.path.exists(os.path.join(model_path, file_name))
        for file_name in weight_files
    )

    if not has_weights:
        print(f"Model not found at {model_path}. Initiating fine-tuning...")

        # Imported here so the block is self-contained; only the training
        # path needs the collator.
        from transformers import DataCollatorForTokenClassification

        sentences = read_conll(CONLL_FILE)
        dataset = prepare_dataset(sentences)
        dataset = dataset.train_test_split(test_size=0.2, seed=42)
        train_dataset = dataset["train"]
        val_dataset = dataset["test"]

        tokenizer = AutoTokenizer.from_pretrained("xlm-roberta-base")
        model = AutoModelForTokenClassification.from_pretrained(
            "xlm-roberta-base",
            num_labels=len(label_list),
            id2label=id2label,
            label2id=label2id
        )

        def tokenize_and_align_labels(examples):
            # No padding / tensor conversion here: Dataset.map stores plain
            # lists anyway, and padding per map batch would give rows of
            # different lengths across map batches. Padding is done per
            # training batch by the data collator below.
            tokenized_inputs = tokenizer(
                examples["tokens"],
                is_split_into_words=True,
                truncation=True
            )
            labels = []
            for i, label in enumerate(examples["ner_tags"]):
                word_ids = tokenized_inputs.word_ids(batch_index=i)
                previous_word_idx = None
                label_ids = []
                for word_idx in word_ids:
                    if word_idx is None:
                        # Special tokens are ignored by the loss.
                        label_ids.append(-100)
                    elif word_idx != previous_word_idx:
                        # First sub-token of a word keeps the word's label.
                        label_ids.append(label[word_idx])
                    else:
                        # Later sub-tokens of a B-xxx word continue as I-xxx
                        # (or O when that tag does not exist).
                        label_str = id2label[label[word_idx]]
                        if label_str.startswith("B-"):
                            label_str = "I-" + label_str[2:]
                            if label_str not in label2id:
                                label_str = "O"
                            label_ids.append(label2id[label_str])
                        else:
                            label_ids.append(label[word_idx])
                    previous_word_idx = word_idx
                labels.append(label_ids)
            tokenized_inputs["labels"] = labels
            return tokenized_inputs

        tokenized_train = train_dataset.map(
            tokenize_and_align_labels,
            batched=True,
            remove_columns=["tokens", "ner_tags"]
        )
        tokenized_val = val_dataset.map(
            tokenize_and_align_labels,
            batched=True,
            remove_columns=["tokens", "ner_tags"]
        )

        training_args = TrainingArguments(
            output_dir="./ner_model_xlmroberta",
            eval_strategy="epoch",
            save_strategy="epoch",
            load_best_model_at_end=True,
            metric_for_best_model="f1",
            report_to="none",
            learning_rate=2e-5,
            per_device_train_batch_size=16,
            per_device_eval_batch_size=16,
            num_train_epochs=3,
            weight_decay=0.01
        )

        def compute_metrics(eval_pred):
            predictions, labels = eval_pred
            predictions = np.argmax(predictions, axis=2)
            # Positions labelled -100 (special/padded tokens) are excluded.
            true_labels = [[id2label[l] for l in label if l != -100] for label in labels]
            predicted_labels = [[id2label[p] for p, l in zip(pred, label) if l != -100] for pred, label in zip(predictions, labels)]
            return {
                "precision": precision_score(true_labels, predicted_labels),
                "recall": recall_score(true_labels, predicted_labels),
                "f1": f1_score(true_labels, predicted_labels)
            }

        # Pads input ids AND labels (with -100) to the longest row of each
        # training batch.
        data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

        # NOTE(review): newer transformers releases warn that `tokenizer=` is
        # deprecated on Trainer; kept as in the original for compatibility.
        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=tokenized_train,
            eval_dataset=tokenized_val,
            tokenizer=tokenizer,
            data_collator=data_collator,
            compute_metrics=compute_metrics
        )

        trainer.train()
        trainer.save_model("./ner_model_xlmroberta")
        trainer.save_model(model_path)
        tokenizer.save_pretrained("./ner_model_xlmroberta")
        tokenizer.save_pretrained(model_path)
        print(f"Model fine-tuned and saved to {model_path}")
    else:
        print(f"Loading existing model from {model_path}")

    tokenizer = AutoTokenizer.from_pretrained(model_path)
    model = AutoModelForTokenClassification.from_pretrained(model_path)
    return model, tokenizer

def get_ner_pipeline(model, tokenizer):
    """
    Wrap the model and tokenizer in a Hugging Face "ner" pipeline.

    The pipeline is created with aggregation_strategy="simple".
    """
    pipeline_options = {
        "model": model,
        "tokenizer": tokenizer,
        "aggregation_strategy": "simple",
    }
    return pipeline("ner", **pipeline_options)

def tokenize_and_align_labels(examples, tokenizer):
    """
    Tokenize pre-split sentences and produce one label id per sub-token.

    Args:
        examples: Batch with "tokens" (list of word lists) and "ner_tags"
            (list of label-id lists, one id per word).
        tokenizer: Tokenizer whose output exposes word_ids(batch_index=...).

    Returns:
        The tokenizer output with an added "labels" entry: -100 for positions
        without a word, the word's label for its first sub-token, and for the
        remaining sub-tokens the same label with B-xxx turned into I-xxx
        (or "O" when I-xxx is not a known label).
    """
    def continuation_id(tag_id):
        # Label for the 2nd+ sub-token of a word tagged `tag_id`.
        tag = id2label[tag_id]
        if not tag.startswith("B-"):
            return tag_id
        inside_tag = "I-" + tag[2:]
        return label2id[inside_tag if inside_tag in label2id else "O"]

    encoded = tokenizer(
        examples["tokens"],
        is_split_into_words=True,
        truncation=True,
        padding=True,
        return_tensors="pt",
    )

    aligned = []
    for row, word_tags in enumerate(examples["ner_tags"]):
        row_labels = []
        last_word = None
        for word in encoded.word_ids(batch_index=row):
            if word is None:
                row_labels.append(-100)
            elif word != last_word:
                row_labels.append(word_tags[word])
            else:
                row_labels.append(continuation_id(word_tags[word]))
            last_word = word
        aligned.append(row_labels)

    encoded["labels"] = aligned
    return encoded

def shap_explain(model, tokenizer, sentence_tokens, nsamples=100, batch_size=16):
    """
    Use SHAP (KernelExplainer) to attribute NER predictions to input words.

    KernelExplainer only works on numeric feature matrices, so each word of
    the sentence is represented by a 0/1 "present" feature. A row of that
    matrix is turned back into a sentence by replacing absent words with the
    tokenizer's mask (or unk) token. The explained model output is, per
    label, the mean probability over the first sub-token of every word.

    Args:
        model: Token-classification model (exposes .device and returns .logits).
        tokenizer: Fast tokenizer matching the model (word_ids() is required).
        sentence_tokens: List of pre-split words of one sentence.
        nsamples: Number of coalitions KernelExplainer evaluates.
        batch_size: Number of perturbed sentences per forward pass.

    Returns:
        numpy array of shape (1, n_tokens, n_labels); ``result[0][i][k]`` is
        the contribution of word ``i`` to the sentence-level score of label ``k``.

    Raises:
        ValueError: If sentence_tokens is empty.
    """
    words = list(sentence_tokens)
    if not words:
        raise ValueError("sentence_tokens must contain at least one token")

    filler = (
        getattr(tokenizer, "mask_token", None)
        or getattr(tokenizer, "unk_token", None)
        or ""
    )

    def predict_fn(masks):
        # masks: (n_rows, n_words) array of 0/1 word-presence indicators.
        keep_matrix = np.atleast_2d(np.asarray(masks)) > 0.5
        rows = []
        for start in range(0, keep_matrix.shape[0], batch_size):
            chunk = keep_matrix[start:start + batch_size]
            variants = [
                [word if keep else filler for word, keep in zip(words, row)]
                for row in chunk
            ]
            encoded = tokenizer(
                variants,
                is_split_into_words=True,
                return_tensors="pt",
                padding=True,
                truncation=True
            )
            word_ids = [encoded.word_ids(batch_index=i) for i in range(len(variants))]
            # Inference only: no autograd graph, so memory stays bounded.
            with torch.no_grad():
                logits = model(**encoded.to(model.device)).logits
            probs = torch.softmax(logits, dim=-1).cpu().numpy()

            for i, ids in enumerate(word_ids):
                # Positions of the first sub-token of each word.
                first_positions = []
                previous = None
                for position, word_id in enumerate(ids):
                    if word_id is not None and word_id != previous:
                        first_positions.append(position)
                    previous = word_id
                if first_positions:
                    rows.append(probs[i, first_positions].mean(axis=0))
                else:
                    rows.append(np.zeros(probs.shape[-1]))
        return np.vstack(rows)

    # Baseline = every word masked; instance = the full sentence.
    background = np.zeros((1, len(words)))
    instance = np.ones((1, len(words)))

    explainer = shap.KernelExplainer(predict_fn, background)
    raw_values = explainer.shap_values(instance, nsamples=nsamples)

    # Depending on the SHAP version the result is either a list with one
    # (1, n_tokens) array per label or a single (1, n_tokens, n_labels) array.
    if isinstance(raw_values, list):
        values = np.stack([np.asarray(v) for v in raw_values], axis=-1)
    else:
        values = np.asarray(raw_values)
    if values.ndim == 2:
        values = values[..., np.newaxis]
    return values

def lime_explain(model, tokenizer, sentence_tokens, num_features=10, num_samples=5000, batch_size=32):
    """
    Use LIME to explain NER predictions for one sentence.

    LIME expects a classifier function that maps a list of texts to an array
    of shape (n_texts, n_classes). A token-classification model returns one
    distribution per sub-token, so the per-token probabilities of each text
    are averaged (special and padding positions excluded) into a single
    distribution over label_list.

    Args:
        model: Token-classification model (exposes .device and returns .logits).
        tokenizer: Tokenizer matching the model.
        sentence_tokens: List of pre-split words; joined with spaces for LIME.
        num_features: Maximum number of words in each explanation.
        num_samples: Number of perturbed texts LIME generates.
        batch_size: Number of perturbed texts per forward pass.

    Returns:
        The LIME explanation object. Explanations are computed for every label
        index, so ``explanation.as_list(label=k)`` works for any ``k``;
        ``as_list()`` without arguments uses LIME's default label index 1.
    """
    def predict_fn(texts):
        # Handle both a single string and a list of strings
        if isinstance(texts, str):
            texts = [texts]
        texts = list(texts)

        rows = []
        # LIME passes all perturbed texts at once; run them in small batches
        # without autograd so memory use stays bounded.
        for start in range(0, len(texts), batch_size):
            chunk = texts[start:start + batch_size]
            encoded = tokenizer(
                chunk,
                return_tensors="pt",
                padding=True,
                truncation=True,
                is_split_into_words=False,
                return_special_tokens_mask=True
            )
            # The mask is not a model input, so remove it before the forward pass.
            special = encoded.pop("special_tokens_mask").bool()
            attended = encoded["attention_mask"].bool()
            keep = attended & ~special
            # A perturbed text can be empty (all words removed); fall back to
            # the attended positions so the row is still a distribution.
            empty = keep.sum(dim=1) == 0
            keep[empty] = attended[empty]

            with torch.no_grad():
                logits = model(**encoded.to(model.device)).logits
            probs = torch.softmax(logits, dim=-1).cpu()

            weights = keep.unsqueeze(-1).to(probs.dtype)
            pooled = (probs * weights).sum(dim=1) / weights.sum(dim=1).clamp(min=1)
            rows.append(pooled.numpy())
        return np.vstack(rows)

    explainer = LimeTextExplainer(class_names=label_list)
    text = " ".join(sentence_tokens)  # Join for LIME, which works with raw text
    explanation = explainer.explain_instance(
        text,
        predict_fn,
        labels=tuple(range(len(label_list))),
        num_features=num_features,
        num_samples=num_samples
    )
    return explanation

def _bio_from_entities(tokens, entities):
    """Project character-span entities (predicted on " ".join(tokens)) onto word-level BIO tags."""
    # Character span of every word inside the space-joined sentence.
    spans = []
    cursor = 0
    for token in tokens:
        spans.append((cursor, cursor + len(token)))
        cursor += len(token) + 1  # +1 for the joining space

    tags = ["O"] * len(tokens)
    for entity in entities:
        start, end = entity.get("start"), entity.get("end")
        if start is None or end is None:
            # No character offsets available: the entity cannot be located.
            continue

        # Aggregated pipelines report "entity_group" without a B-/I- prefix;
        # non-aggregated ones report "entity" with the prefix.
        group = entity.get("entity_group") or entity.get("entity") or ""
        if group[:2] in ("B-", "I-"):
            group = group[2:]
        if not group or group == "O":
            continue

        covered = [
            index for index, (tok_start, tok_end) in enumerate(spans)
            if tok_start < end and tok_end > start
        ]
        for rank, index in enumerate(covered):
            tags[index] = ("B-" if rank == 0 else "I-") + group
    return tags


def analyze_difficult_cases(dataset, ner_pipeline, num_examples=3):
    """
    Identify and analyze difficult cases (ambiguous text, overlapping entities).

    An example counts as difficult when the prediction disagrees with the
    gold labels on at least one word and the sentence contains a multi-word
    entity (any I- label) or a purely numeric token.

    Args:
        dataset: Iterable of examples with "tokens" (words) and "ner_tags"
            (label ids, resolved through id2label).
        ner_pipeline: Callable taking the sentence as one string and returning
            entity dicts with "entity_group" (or "entity"), "start" and "end"
            character offsets.
        num_examples: Stop after collecting this many difficult cases.

    Returns:
        List of dicts with "tokens", "true_labels", "pred_labels" and
        "mismatches".
    """
    difficult_cases = []
    for example in dataset:
        tokens = example["tokens"]
        true_labels = [id2label[l] for l in example["ner_tags"]]

        # The pipeline works on raw text; a list would be treated as a batch
        # of separate one-word texts. Offsets in the result refer to this
        # space-joined string.
        prediction = ner_pipeline(" ".join(tokens))
        pred_labels = _bio_from_entities(tokens, prediction)

        mismatches = sum(1 for t, p in zip(true_labels, pred_labels) if t != p)
        has_multi_word = any(l.startswith("I-") for l in true_labels)
        has_numeric = any(t.isdigit() for t in tokens)

        if mismatches > 0 and (has_multi_word or has_numeric):
            difficult_cases.append({
                "tokens": tokens,
                "true_labels": true_labels,
                "pred_labels": pred_labels,
                "mismatches": mismatches
            })

        if len(difficult_cases) >= num_examples:
            break

    return difficult_cases

def generate_interpretability_report(dataset, model, tokenizer, ner_pipeline):
    """
    Write a Markdown interpretability report to OUTPUT_REPORT.

    The dataset is split 80/20 with seed 42 and the held-out part is used:
    its first sentence is explained with SHAP and LIME, and the whole split is
    scanned for difficult cases. Static overview and recommendation sections
    are added around those results.

    Args:
        dataset: Dataset offering train_test_split(); rows have "tokens".
        model: Model forwarded to shap_explain / lime_explain.
        tokenizer: Tokenizer forwarded to shap_explain / lime_explain.
        ner_pipeline: Pipeline forwarded to analyze_difficult_cases.
    """
    val_dataset = dataset.train_test_split(test_size=0.2, seed=42)["test"]
    sample_sentence = val_dataset[0]["tokens"]
    joined_sample = " ".join(sample_sentence)

    report_lines = [
        "# NER Model Interpretability Report\n",
        "## Model Overview\n",
        "- Model: xlm-roberta-base\n",
        "- Task: Named Entity Recognition for Amharic Telegram messages\n",
        "- Entities: Product, Price, Location\n",
    ]

    # --- SHAP section: strongest label attribution for every token ---
    report_lines.append("## SHAP Analysis\n")
    shap_values = shap_explain(model, tokenizer, sample_sentence)
    report_lines += [
        "### Example Sentence\n",
        f"Tokens: {joined_sample}\n",
        "### SHAP Insights\n",
    ]
    for position, token in enumerate(sample_sentence):
        token_scores = shap_values[0][position]
        top_label = np.argmax(token_scores)
        report_lines.append(
            f"- Token: {token}, Most Influential Label: {label_list[top_label]}, SHAP Value: {token_scores[top_label]:.4f}\n"
        )

    # --- LIME section: weighted features of the same sentence ---
    report_lines.append("## LIME Analysis\n")
    lime_explanation = lime_explain(model, tokenizer, sample_sentence)
    report_lines.append("### LIME Insights\n")
    report_lines.extend(
        f"- Feature: {feature}, Weight: {weight:.4f}\n"
        for feature, weight in lime_explanation.as_list()
    )

    # --- Difficult cases found in the held-out split ---
    report_lines.append("## Difficult Cases Analysis\n")
    difficult_cases = analyze_difficult_cases(val_dataset, ner_pipeline)
    for number, case in enumerate(difficult_cases, start=1):
        report_lines.extend([
            f"### Case {number}\n",
            f"- Tokens: {' '.join(case['tokens'])}\n",
            f"- True Labels: {case['true_labels']}\n",
            f"- Predicted Labels: {case['pred_labels']}\n",
            f"- Mismatches: {case['mismatches']}\n",
            "- Analysis: Potential ambiguity in multi-word entities or numeric tokens.\n",
        ])

    # --- Static recommendations ---
    report_lines.extend([
        "## Recommendations for Improvement\n",
        "- **Data Augmentation**: Increase dataset size with diverse Amharic examples.\n",
        "- **Contextual Features**: Incorporate sentence-level context for disambiguation.\n",
        "- **Multi-Modal Integration**: Add OCR for Telegram media.\n",
        "- **Regularization**: Apply dropout or weight decay to reduce overfitting.\n",
    ])

    with open(OUTPUT_REPORT, "w", encoding="utf-8") as report_file:
        report_file.write("".join(report_lines))
    print(f"Interpretability report saved to {OUTPUT_REPORT}")

def main():
    """
    Run the end-to-end interpretability analysis.

    Reads the CoNLL data, obtains the fine-tuned model (training it when it is
    not saved yet), moves it to the GPU when one is available, writes the
    interpretability report and finally releases the model objects.
    """
    # Data: CoNLL file -> Hugging Face dataset
    dataset = prepare_dataset(read_conll(CONLL_FILE))

    # Model and tokenizer, placed on the best available device
    model, tokenizer = load_or_fine_tune_model(MODEL_PATH)
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model.to(device)

    # Prediction pipeline used for the difficult-case analysis
    ner_pipeline = get_ner_pipeline(model, tokenizer)

    # SHAP / LIME / difficult-case report
    generate_interpretability_report(dataset, model, tokenizer, ner_pipeline)

    # Drop the large objects and release cached GPU memory
    del model, tokenizer, ner_pipeline
    gc.collect()
    torch.cuda.empty_cache()

# Entry point: run the full interpretability analysis when this cell/script is
# executed directly (not when the code is imported as a module).
if __name__ == "__main__":
    main()

Mounted at /content/drive


Map:   0%|          | 0/100 [00:00<?, ? examples/s]

Model not found at /content/drive/My Drive/kifiya5/ner_model_xlmroberta. Initiating fine-tuning...


Map:   0%|          | 0/100 [00:00<?, ? examples/s]

Some weights of XLMRobertaForTokenClassification were not initialized from the model checkpoint at xlm-roberta-base and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Map:   0%|          | 0/80 [00:00<?, ? examples/s]

Map:   0%|          | 0/20 [00:00<?, ? examples/s]

  trainer = Trainer(


OutOfMemoryError: CUDA out of memory. Tried to allocate 734.00 MiB. GPU 0 has a total capacity of 14.74 GiB of which 2.12 MiB is free. Process 13334 has 14.74 GiB memory in use. Of the allocated memory 14.55 GiB is allocated by PyTorch, and 57.38 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

In [None]:
# Rerun main() once the CUDA out-of-memory error from the previous run has been
# resolved (e.g. after freeing GPU memory or restarting the runtime).
if __name__ == "__main__":
    main()