# In [None]:
# Import necessary libraries
import json
import os
import re
import time
from datetime import datetime
from io import BytesIO

import lime
import numpy as np
import pandas as pd
import pytesseract
import requests
import shap
import torch
from datasets import load_dataset, Dataset, DatasetDict
from lime.lime_text import LimeTextExplainer
from PIL import Image
from sklearn.metrics import classification_report
from telethon import TelegramClient, events
from telethon.tl.types import InputPeerChannel
from transformers import (
    AutoModelForTokenClassification,
    AutoTokenizer,
    DataCollatorForTokenClassification,
    Trainer,
    TrainingArguments,
)

# ------------------------------
# Task 1: Data Ingestion and Preprocessing
# ------------------------------

# Telegram API credentials.
# They are read from the environment so real secrets never have to be written
# into this file; the original placeholder strings remain as fallbacks, so
# replacing them with literals still works exactly as before.
api_id = os.environ.get('TELEGRAM_API_ID', 'YOUR_API_ID')
api_hash = os.environ.get('TELEGRAM_API_HASH', 'YOUR_API_HASH')
bot_token = os.environ.get('TELEGRAM_BOT_TOKEN', 'YOUR_BOT_TOKEN')

# Initialize Telegram client ('ethiomart_session' is the session name) and
# log in with the bot token.
client = TelegramClient('ethiomart_session', api_id, api_hash).start(bot_token=bot_token)

# List of Telegram channel usernames or IDs the message handler listens to
channels = ['Shageronlinestore', 'AnotherEcommerceChannel']

# Directory to store fetched data (created if it does not exist yet)
data_dir = 'raw'
os.makedirs(data_dir, exist_ok=True)

# Function to preprocess Amharic text
def preprocess_amharic(text):
    """Strip punctuation and collapse whitespace in a piece of text.

    Every character that is neither a Unicode word character nor whitespace
    is removed, each run of whitespace becomes a single space, and leading
    and trailing whitespace is trimmed.

    Note that separators inside numbers are punctuation too, so a price such
    as ``1,000.50`` comes out as ``100050``.

    Args:
        text: Raw text. ``None`` and the empty string are accepted.

    Returns:
        The cleaned string, or ``''`` when there is nothing to clean.
    """
    if not text:
        # Callers may pass None/'' (e.g. a post without a caption).
        return ''
    text = re.sub(r'[^\w\s]', '', text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()

# Function to extract and clean the text of an image using OCR
def preprocess_image(image_bytes):
    """Run Amharic OCR over an encoded image and return the cleaned text.

    Args:
        image_bytes: The encoded image as a bytes-like object.

    Returns:
        The OCR output after it has been passed through
        ``preprocess_amharic``.
    """
    buffer = BytesIO(image_bytes)
    # lang='amh' selects Tesseract's Amharic model, which must be installed.
    raw_text = pytesseract.image_to_string(Image.open(buffer), lang='amh')
    return preprocess_amharic(raw_text)

# Event handler for new messages
@client.on(events.NewMessage(chats=channels))
async def handler(event):
    """Persist each new channel message, plus OCR text from photos, as JSON.

    For every message posted in one of ``channels`` a file named
    ``<sender_id>_<unix_time>.json`` is written into ``data_dir``. When the
    message carries a photo, the image is also stored next to it as
    ``<sender_id>_<unix_time>.jpg`` and the text recognised in the image is
    appended to the message text before cleaning.

    Args:
        event: The Telethon ``NewMessage`` event being handled.
    """
    # The text can be empty/None (e.g. a photo without caption).
    message = event.message.message or ''
    sender = await event.get_sender()
    # Fall back to the chat id when the sender cannot be resolved.
    sender_id = sender.id if sender is not None else event.chat_id
    timestamp = event.message.date.isoformat()
    media = event.message.media

    # One stem for both files, so the image and the JSON of a message always
    # pair up. NOTE: the stem has one-second resolution, so two messages from
    # the same sender within a second still share a file name.
    file_stem = os.path.join(data_dir, f"{sender_id}_{int(time.time())}")

    # Initialize data dictionary
    data = {
        'sender_id': sender_id,
        'timestamp': timestamp,
        'message': message,
        'entities': {}
    }

    # Handle media (images)
    if getattr(media, 'photo', None):
        # file=bytes makes Telethon return the image content in memory
        # instead of writing it to disk and returning a path.
        image = await event.download_media(file=bytes)
        if image:
            with open(f"{file_stem}.jpg", 'wb') as f:
                f.write(image)
            # Extract text from image and merge it BEFORE cleaning, so it is
            # part of what gets stored.
            ocr_text = preprocess_image(image)
            if ocr_text:
                message = f"{message} {ocr_text}"

    # Preprocess text
    data['message'] = preprocess_amharic(message)

    # Save data to JSON
    with open(f"{file_stem}.json", 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=4)

    print(f"Saved message from {sender_id} at {timestamp}")

# Start the client
print("Starting Telegram client...")
# This call keeps running until the client disconnects, so the statements
# below it only execute once the listener has stopped.
client.run_until_disconnected()

# ------------------------------
# Task 2: Labeling Dataset in CoNLL Format
# ------------------------------

# Load the collected messages from CSV.
# NOTE(review): nothing visible in this file reads messages_df afterwards or
# writes CoNLL output -- the labeling step itself may live elsewhere; confirm.
messages_df = pd.read_csv('messages.csv')  # Replace with your actual file path


# ------------------------------
# Task 3: Fine-Tuning the NER Model
# ------------------------------

# Load the labeled dataset
def load_conll_data(file_path):
    """Read a CoNLL-style file into parallel token and tag sequences.

    Each non-blank line describes one token as whitespace-separated columns:
    the first column is the token and the last column is its tag (columns in
    between are ignored). Blank lines separate sentences; consecutive blank
    lines do not produce empty sentences, and the final sentence is kept even
    when the file does not end with a blank line.

    Args:
        file_path: Path of the UTF-8 encoded CoNLL file.

    Returns:
        A tuple ``(sentences, labels)`` of two equally long lists, where
        ``sentences[i]`` is the token list of sentence ``i`` and
        ``labels[i]`` the matching tag list.

    Raises:
        ValueError: If a non-blank line has fewer than two columns.
    """
    sentences = []
    labels = []
    tokens = []
    tags = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line_no, line in enumerate(f, start=1):
            fields = line.split()
            if not fields:
                # Sentence boundary: flush whatever has been collected.
                if tokens:
                    sentences.append(tokens)
                    labels.append(tags)
                    tokens = []
                    tags = []
                continue
            if len(fields) < 2:
                raise ValueError(
                    f"{file_path}:{line_no}: expected 'token tag', got {line.rstrip()!r}"
                )
            tokens.append(fields[0])
            tags.append(fields[-1])
    # Flush the last sentence when the file has no trailing blank line.
    if tokens:
        sentences.append(tokens)
        labels.append(tags)
    return sentences, labels

# Read the train/validation splits as parallel token and tag lists
train_sentences, train_labels = load_conll_data('data/labeled/train.conll')
val_sentences, val_labels = load_conll_data('data/labeled/val.conll')

# Create Hugging Face Dataset
train_dataset = Dataset.from_dict({'tokens': train_sentences, 'ner_tags': train_labels})
val_dataset = Dataset.from_dict({'tokens': val_sentences, 'ner_tags': val_labels})

dataset = DatasetDict({
    'train': train_dataset,
    'validation': val_dataset
})

# Define label list; a label's position in the list is its class id
label_list = ['O', 'B-Product', 'I-Product', 'B-LOC', 'I-LOC', 'B-PRICE', 'I-PRICE']
label_to_id = {label: idx for idx, label in enumerate(label_list)}
id_to_label = dict(enumerate(label_list))

# Tokenizer and Model
model_name = "xlm-roberta-base"  # You can choose other models like bert-tiny-amharic if available
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Passing the label maps stores the tag names in the model config, so a saved
# checkpoint reports 'B-LOC' etc. instead of generic LABEL_<n> names.
model = AutoModelForTokenClassification.from_pretrained(
    model_name,
    num_labels=len(label_list),
    id2label=id_to_label,
    label2id=label_to_id,
)

# Tokenize the data
def tokenize_and_align_labels(examples):
    """Tokenize pre-split sentences and project word tags onto sub-tokens.

    Uses the module-level ``tokenizer`` and ``label_to_id``. For every
    sub-token the label id is chosen as follows:

    * positions without a word (``word_ids`` yields ``None``) get -100;
    * the first sub-token of a word gets the id of the word's tag;
    * a later sub-token of the same word gets the tag id only when the tag
      starts with ``I-``, otherwise -100.

    Args:
        examples: A batch with ``'tokens'`` (lists of words) and
            ``'ner_tags'`` (lists of tag strings, one per word).

    Returns:
        The tokenizer output with an added ``"labels"`` entry holding one
        list of label ids per sentence.
    """
    encoded = tokenizer(examples['tokens'], truncation=True, is_split_into_words=True)

    def _align(batch_index, word_tags):
        # Label ids for one sentence, following the rules in the docstring.
        aligned = []
        last_word = None
        for word in encoded.word_ids(batch_index=batch_index):
            if word is None:
                target = -100
            elif word != last_word or word_tags[word].startswith('I-'):
                target = label_to_id[word_tags[word]]
            else:
                target = -100
            aligned.append(target)
            last_word = word
        return aligned

    encoded["labels"] = [
        _align(index, word_tags)
        for index, word_tags in enumerate(examples['ner_tags'])
    ]
    return encoded

# Run the alignment over every split of `dataset`; batched=True hands the
# function a batch of examples per call rather than a single example.
tokenized_dataset = dataset.map(tokenize_and_align_labels, batched=True)

# Define Metrics
def compute_metrics(p):
    """Compute weighted token-level precision, recall and F1.

    Positions whose gold label id is -100 are excluded. The remaining gold
    and predicted ids are mapped to tag names through ``id_to_label`` and
    scored with scikit-learn's ``classification_report``. The scores are per
    token, not per entity span.

    Args:
        p: Pair ``(predictions, labels)``. ``predictions`` holds class scores
            indexed as ``[sentence, token, class]`` and ``labels`` the gold
            class ids indexed as ``[sentence, token]``.

    Returns:
        Dict with the keys ``'precision'``, ``'recall'`` and ``'f1'`` taken
        from the report's weighted average (all 0.0 when no position is
        labelled).
    """
    scores, gold_ids = p
    predicted_ids = np.argmax(scores, axis=2)
    gold_ids = np.asarray(gold_ids)

    # scikit-learn expects flat 1-D label sequences, so the per-sentence
    # structure is flattened while the masked positions are dropped.
    keep = gold_ids != -100
    y_true = [id_to_label[i] for i in gold_ids[keep].tolist()]
    y_pred = [id_to_label[i] for i in predicted_ids[keep].tolist()]

    if not y_true:
        return {'precision': 0.0, 'recall': 0.0, 'f1': 0.0}

    report = classification_report(y_true, y_pred, output_dict=True, zero_division=0)
    weighted = report['weighted avg']
    return {
        'precision': weighted['precision'],
        'recall': weighted['recall'],
        'f1': weighted['f1-score']
    }

# Training Arguments
# NOTE(review): newer transformers releases rename `evaluation_strategy` to
# `eval_strategy` -- confirm against the version this project pins.
training_args = TrainingArguments(
    output_dir='./results',
    evaluation_strategy="epoch",
    learning_rate=5e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    weight_decay=0.01,
    save_total_limit=2,
    logging_dir='./logs',
)

# Sentences differ in length, so each batch must pad the label ids together
# with the input ids. The collator Trainer falls back to when it is only given
# a tokenizer does not pad "labels", which breaks batching for this task.
data_collator = DataCollatorForTokenClassification(tokenizer)

# Initialize Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset['train'],
    eval_dataset=tokenized_dataset['validation'],
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

# Fine-Tune the Model
trainer.train()

# Evaluate the Model on the validation split
eval_results = trainer.evaluate()
print(f"Validation F1 Score: {eval_results['eval_f1']}")

# Save the Fine-Tuned Model and its tokenizer side by side
trainer.save_model("fine-tuned-ner-amharic")
tokenizer.save_pretrained("fine-tuned-ner-amharic")

# ------------------------------
# Task 4: Model Comparison & Selection
# ------------------------------

# Define multiple models to compare (display key -> checkpoint name)
models_to_compare = {
    'xlm-roberta-base': "xlm-roberta-base",
    'distilbert-base-multilingual-cased': "distilbert-base-multilingual-cased",
    'bert-base-multilingual-cased': "bert-base-multilingual-cased",
    # Add more models as needed
}

model_performance = {}
best_f1 = float('-inf')

for model_key, model_name in models_to_compare.items():
    print(f"Fine-tuning model: {model_name}")
    # `tokenizer` is rebound at module level on purpose:
    # tokenize_and_align_labels reads it as a global, so the dataset below is
    # tokenized with the candidate's own tokenizer.
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForTokenClassification.from_pretrained(
        model_name,
        num_labels=len(label_list),
        id2label=id_to_label,
        label2id=label_to_id,
    )

    # Tokenize
    tokenized_dataset = dataset.map(tokenize_and_align_labels, batched=True)

    # Initialize Trainer; the token-classification collator pads the label
    # ids together with the inputs (the fallback collator does not).
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset['train'],
        eval_dataset=tokenized_dataset['validation'],
        tokenizer=tokenizer,
        data_collator=DataCollatorForTokenClassification(tokenizer),
        compute_metrics=compute_metrics,
    )

    # Fine-Tune
    trainer.train()

    # Evaluate
    eval_results = trainer.evaluate()
    f1 = eval_results['eval_f1']
    model_performance[model_key] = f1
    print(f"Model: {model_key}, F1 Score: {f1}")

    # Persist the best candidate seen so far under the directory the
    # interpretability section loads from, so the model that is loaded there
    # really is the selected one. The strict '>' keeps the first of any tied
    # models, matching what max() picks below.
    if f1 > best_f1:
        best_f1 = f1
        trainer.save_model("fine-tuned-ner-amharic")
        tokenizer.save_pretrained("fine-tuned-ner-amharic")

# Select the best-performing model
best_model_name = max(model_performance, key=model_performance.get)
print(f"Best model selected: {best_model_name} with F1 Score: {model_performance[best_model_name]}")

# ------------------------------
# Task 5: Model Interpretability
# ------------------------------

# Reload the tokenizer and the fine-tuned model stored under
# "fine-tuned-ner-amharic"
best_tokenizer = AutoTokenizer.from_pretrained("fine-tuned-ner-amharic")
best_model = AutoModelForTokenClassification.from_pretrained("fine-tuned-ner-amharic")

# Function to predict entities
def predict_entities(text):
    """Tag every tokenizer token of *text* with its predicted NER label.

    Args:
        text: The input string. Inputs longer than the tokenizer's maximum
            length are truncated.

    Returns:
        A list of ``(token, label)`` tuples, one per token produced by
        ``best_tokenizer``. No tokens are filtered out, so any special tokens
        the tokenizer inserts appear in the list as well.
    """
    inputs = best_tokenizer(text, return_tensors="pt", truncation=True)
    # Inference only: no gradients are needed.
    with torch.no_grad():
        logits = best_model(**inputs).logits
    predicted_ids = torch.argmax(logits, dim=2)[0].tolist()
    tokens = best_tokenizer.convert_ids_to_tokens(inputs['input_ids'][0])
    return [
        (token, id_to_label[label_id])
        for token, label_id in zip(tokens, predicted_ids)
    ]

# Example usage: tag a sample Amharic listing and print the resulting
# (token, label) pairs
example_text = "ዋጋ 1000 ብር ያለው ቤቲ ቦትል በቦሌ አዲስ አበባ"
entities = predict_entities(example_text)
print(entities)

# SHAP Interpretability
# Note: SHAP for transformers is complex; here is a simplified example

# Define a prediction function for SHAP
def model_predict(texts):
    """Score texts with one fixed-size row of class logits per text.

    The token-level logits of ``best_model`` are averaged over the
    non-padding tokens of each text, so the output width does not depend on
    how long the texts are.

    Args:
        texts: A single string, or a list/array of strings (SHAP passes a
            numpy array).

    Returns:
        A numpy array of shape ``(number_of_texts, number_of_labels)``.
    """
    # The tokenizer only accepts str or lists of str, not numpy arrays.
    batch = [texts] if isinstance(texts, str) else [str(t) for t in texts]
    inputs = best_tokenizer(batch, return_tensors="pt", padding=True, truncation=True)
    with torch.no_grad():
        logits = best_model(**inputs).logits
    # Zero out padding positions, then average over the real tokens.
    mask = inputs['attention_mask'].unsqueeze(-1).to(logits.dtype)
    pooled = (logits * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1)
    return pooled.numpy()

# Build a SHAP explainer around model_predict (the tokenizer is passed as the
# masker argument) and explain the single example text.
explainer = shap.Explainer(model_predict, best_tokenizer)
shap_values = explainer([example_text])
# NOTE(review): shap.plots.text is the plot SHAP documents for text
# explanations; confirm that summary_plot is really the one wanted here.
shap.summary_plot(shap_values, feature_names=best_tokenizer.tokenize(example_text))

# LIME Interpretability
# NOTE: this rebinds `explainer`; the SHAP explainer above is no longer
# reachable under that name from here on.
explainer = LimeTextExplainer(class_names=label_list)

def predict_proba(texts):
    """Return one row of class probabilities per text, as LIME requires.

    LIME's ``classifier_fn`` must map ``d`` strings to a ``(d, k)`` array of
    probabilities. The per-token softmax probabilities of ``best_model`` are
    therefore averaged over the non-padding tokens of each text, which keeps
    every row summing to 1. Texts are processed in small batches because LIME
    submits all of its perturbed samples in a single call.

    Args:
        texts: A single string, or a list/array of strings.

    Returns:
        A numpy array of shape ``(number_of_texts, number_of_labels)``.
    """
    batch_size = 32
    samples = [texts] if isinstance(texts, str) else [str(t) for t in texts]
    rows = []
    for start in range(0, len(samples), batch_size):
        chunk = samples[start:start + batch_size]
        inputs = best_tokenizer(chunk, return_tensors="pt", padding=True, truncation=True)
        with torch.no_grad():
            logits = best_model(**inputs).logits
        probs = torch.softmax(logits, dim=2)
        # Zero out padding positions, then average over the real tokens.
        mask = inputs['attention_mask'].unsqueeze(-1).to(probs.dtype)
        pooled = (probs * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1)
        rows.append(pooled.numpy())
    if not rows:
        return np.empty((0, best_model.config.num_labels))
    return np.concatenate(rows, axis=0)

# Explain the example text with LIME, limited to 10 features, and render the
# explanation inline in the notebook.
# NOTE(review): explain_instance is called without `labels`, so LIME falls
# back to its default label selection -- confirm which class should be explained.
exp = explainer.explain_instance(example_text, predict_proba, num_features=10)
exp.show_in_notebook(text=True)

# Save interpretability plots if needed
# (the savefig line needs `plt`, i.e. matplotlib.pyplot, which is not among
# the imports visible in this file)
# shap.summary_plot(shap_values, feature_names=best_tokenizer.tokenize(example_text), show=False)
# plt.savefig('shap_summary.png')
# exp.save_to_file('lime_explanation.html')
