In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input mount and echo the full path of every file found,
# as a quick check of which datasets/models are attached to the session.
input_file_paths = (
    os.path.join(root, name)
    for root, _subdirs, names in os.walk('/kaggle/input')
    for name in names
)
for input_file_path in input_file_paths:
    print(input_file_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
!pip install -qU transformers==4.48.3 datasets==3.2.0 optimum==1.24.0

In [None]:
# Directory of the fine-tuned AraT5 QA checkpoint (step 8125) under the Kaggle input mount.
model_path = "/kaggle/input/qa-arasquad/transformers/default/2/finetuned-AraT5-QA/checkpoint-8125"

# Sanity check: list the checkpoint files (relies on `os` imported in the first cell).
print("Files in model directory:", os.listdir(model_path))

In [None]:
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

# Load the tokenizer stored in the checkpoint directory
# (presumably including the custom <context>/<question>/<answer> tokens — verify).
tokenizer = AutoTokenizer.from_pretrained(
    model_path,
    use_fast=True  # request the fast tokenizer; NOTE(review): original note said "required for SentencePiece (spiece.model)" — confirm
)

# Load the seq2seq model weights from the same checkpoint directory.
# `model` and `tokenizer` are used as globals by the generation helpers below.
model = AutoModelForSeq2SeqLM.from_pretrained(model_path)

In [None]:
import torch

def generate_answer(formatted_text, max_input_len=1024, max_output_len=64):
    """
    Generate one answer for a single pre-formatted prompt using beam search.

    Uses the notebook-level ``model`` and ``tokenizer`` globals.

    Args:
        formatted_text (str): Prompt of the form "<context> ... <question> ...".
        max_input_len (int): Prompts longer than this many tokens are truncated.
        max_output_len (int): Forwarded to ``model.generate`` as ``max_length``.

    Returns:
        str: Decoded text of the best beam with special tokens skipped,
        surrounding whitespace stripped and any "<answer>" marker removed
        when the text starts with it.
    """
    model.eval()

    # A single sequence needs no padding: padding it out to max_input_len only
    # makes the encoder process (masked) pad positions on every call.
    inputs = tokenizer(
        formatted_text,
        return_tensors="pt",
        truncation=True,
        max_length=max_input_len
    )

    # Run on GPU when available; moving an already-placed model is a no-op.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    inputs = {k: v.to(device) for k, v in inputs.items()}

    # Inference only: no gradients needed.
    with torch.no_grad():
        outputs = model.generate(
            input_ids=inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
            max_length=max_output_len,
            num_beams=4,
            early_stopping=True
        )

    # outputs[0] is the highest-scoring returned sequence.
    decoded = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()

    # The "<answer>" marker survives decoding only if it is not registered as a
    # special token; strip it in that case.
    if decoded.startswith("<answer>"):
        decoded = decoded.replace("<answer>", "").strip()

    return decoded

In [None]:
# Smoke test: a short Arabic passage about Muhammad Ali's rule of Egypt.
context = "تم تولية محمد على حكم مصر بإرادة الشعب المصرى، رغم اعتراض الباب العالى العثمانى، فسعى نحو جعل نقطة الارتكاز له ولأبنائه من بعده فى مصر لا فى الآستانة، وقد نجح فى فترة وجيزة فى جعل ولايته (مصر) أكثر حضارة وتقدمًا من الدولة العثمانية صاحبة السيادة؛ فكان من الطبيعى أن تستقل عنها. ولكن محمد على أراد أن يحمى هذا الاستقلال ويحيطه بسياج من الحدود الطبيعية فى الشام شرقًا والسودان جنوبًا، وبدأت من هنا سياسة (محمد على) الخارجية وتحركاته التوسعية لحماية الحدود المصرىة."

# Question answerable from the first sentence of the passage.
question = "من الذي تولى حكم مصر بإرادة الشعب المصري؟"
# Prompt layout used throughout the notebook: "<context>...<question>...".
formatted_input = f"<context>{context}<question>{question}"

answer = generate_answer(formatted_input)
print("Generated Answer:", answer)

In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

import os
import pandas as pd

# Load the Arabic SQuAD v2.0 test split (JSON) into a DataFrame.
# NOTE(review): the file is decoded as ISO-8859-1. If it stores raw UTF-8 Arabic
# text this would garble it — confirm the file's actual encoding.
test_data = pd.read_json("/kaggle/input/arabic-squad-v20/asquadv2-test.json", encoding='ISO-8859-1')

print("Dataset loaded successfully!")


In [None]:
def normalize_data(data):
    """
    Flatten SQuAD-style data into one row per answerable question.

    Args:
        data: Mapping or DataFrame whose ``"data"`` entry holds the article
            records, each with ``"title"`` and ``"paragraphs"``; every
            paragraph has a ``"context"`` and a list of ``"qas"``.

    Returns:
        pd.DataFrame: One row per question with the qas fields plus ``title``
        and ``context``. ``answers`` is reduced to a dict holding only the
        first reference answer (``{"text": [...], "answer_start": [...]}``,
        both lists empty when there is no answer). Rows flagged
        ``is_impossible`` are dropped, and the ``is_impossible`` /
        ``plausible_answers`` columns are removed when present. Data without
        an ``is_impossible`` column (SQuAD v1 style) is kept in full. The row
        index is left as produced by the filter (not reset).
    """
    flat = pd.json_normalize(data['data'], ['paragraphs', 'qas'], ['title', ['paragraphs', 'context']])
    flat = flat.rename(columns={'paragraphs.context': 'context'})

    def _first_answer(answers):
        # A qas record without an "answers" key surfaces as NaN after
        # flattening; treat it the same as an empty answer list.
        if not isinstance(answers, (list, tuple)) or not answers:
            return {'text': [], 'answer_start': []}
        first = answers[0]
        return {'text': [first['text']], 'answer_start': [first['answer_start']]}

    flat['answers'] = flat['answers'].apply(_first_answer)

    # Only SQuAD v2-style data carries the flag; without it every question is answerable.
    if 'is_impossible' in flat.columns:
        flat = flat[flat['is_impossible'].eq(False)]

    return flat.drop(['is_impossible', 'plausible_answers'], axis=1, errors='ignore')

# Rebind test_data from the raw JSON frame to the flattened per-question frame.
# NOTE: this overwrites the raw frame, so re-running only this cell feeds the
# already-flattened frame back in — re-run the load cell first.
test_data = normalize_data(test_data)

In [None]:
def extract_relevant_data(dataset):
    """
    Reduce a normalized frame to the three columns used for evaluation.

    Args:
        dataset (pd.DataFrame): Frame with "context", "question" and an
            "answers" column of dicts carrying a "text" list.

    Returns:
        pd.DataFrame: Columns "context", "question" and "answer", where
        "answer" is the first entry of ``answers["text"]`` or "" when the
        list is empty.
    """
    def _first_text(answer_dict):
        texts = answer_dict["text"]
        if texts:
            return texts[0]
        return ""

    answer_column = dataset["answers"].map(_first_text)
    return dataset[["context", "question"]].assign(answer=answer_column)


# Rebind test_data again, now to the context/question/answer frame.
test_data = extract_relevant_data(test_data)

# Report how many question rows remain after extraction.

print(f"Test data size: {len(test_data)}")

In [None]:
from datasets import DatasetDict, Dataset, load_dataset
import pandas as pd
def format_for_answer_generation(dataset):
    """
    Build the prompt/target strings used for answer generation.

    Args:
        dataset (pd.DataFrame): Frame with "context", "question" and "answer"
            string columns.

    Returns:
        pd.DataFrame: Column "text" holding "<context>{context}<question>{question}"
        and column "required" holding "<answer>{answer}", on the input's index.
    """
    prompts = "<context>" + dataset["context"] + "<question>" + dataset["question"]
    targets = "<answer>" + dataset["answer"]
    return pd.DataFrame({"text": prompts, "required": targets})

# Format the test frame as prompt/target text and wrap it in a Hugging Face Dataset.
test_ag = format_for_answer_generation(test_data)
test_ag_dataset = Dataset.from_pandas(test_ag)


# Dataset dictionary holding only the "test" split (no train/validation here).
datasets_ag = DatasetDict({
    "test": test_ag_dataset
})


In [None]:
# Display the DatasetDict summary (splits, columns, row counts).
datasets_ag

In [None]:
# Dataset.from_pandas stores a non-default pandas index as '__index_level_0__'.
# Drop it only when present: remove_columns raises on a missing column, which
# would otherwise break this cell on re-run or when the frame has a default index.
if '__index_level_0__' in datasets_ag['test'].column_names:
    datasets_ag['test'] = datasets_ag['test'].remove_columns(['__index_level_0__'])

In [None]:
# Seed for dataset shuffling; the same value is assigned again in the evaluation section.
seed= 42
# Disabled: optional in-place 1000-example subsample of the test split.
# datasets_ag["test"] = datasets_ag["test"].shuffle(seed=seed).select(range(min(1000, len(datasets_ag["test"]))))

In [None]:
# Display the DatasetDict again (presumably to confirm the index column is gone).
datasets_ag

In [None]:
# Print the column names of every split in the dataset dictionary.
for split_name, split_dataset in datasets_ag.items():
    print(f"{split_name} columns: {split_dataset.column_names}")


In [None]:
# Inspect the first test example ("text" prompt and "required" target).
datasets_ag["test"][0]

# Token-level F1 score

In [None]:
import torch

def generate_answers(context, questions, num_answers_per_question=1,
                     max_input_len=768, max_output_len=64):
    """
    Sample one or more answers for each question over a shared context.

    Uses the notebook-level ``model`` and ``tokenizer`` globals. Decoding uses
    sampling (temperature / top-k / top-p), so outputs vary between calls
    unless the torch RNG is seeded beforehand.

    Args:
        context (str): Shared context.
        questions (list): List of questions.
        num_answers_per_question (int): Number of sampled answers per question.
        max_input_len (int): Prompts longer than this many tokens are truncated.
        max_output_len (int): Forwarded to ``model.generate`` as ``max_length``.

    Returns:
        dict: {question: [answer1, answer2, ...]}. A question that appears
        more than once in ``questions`` keeps only its last set of answers.
    """
    model.eval()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    results = {}

    for question in questions:
        formatted_text = f"<context>{context}<question>{question}"

        # One prompt per call, so no padding is needed; padding every prompt to
        # the maximum length only adds masked pad positions to encode.
        inputs = tokenizer(
            formatted_text,
            return_tensors="pt",
            truncation=True,
            max_length=max_input_len
        ).to(device)

        outputs = model.generate(
            input_ids=inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
            max_length=max_output_len,
            do_sample=True,
            temperature=0.9,
            top_k=50,
            top_p=0.95,
            num_return_sequences=num_answers_per_question
        )

        # Drop special tokens and any literal "<answer>" marker left in the text.
        decoded = [
            tokenizer.decode(output, skip_special_tokens=True).strip().replace("<answer>", "").strip()
            for output in outputs
        ]

        results[question] = decoded

    return results


In [None]:
import re
import numpy as np
import pandas as pd

def evaluate_answer_generation_f1(dataset, model, tokenizer, num_samples=20, csv_path="f1_evaluation_results.csv"):
    """
    Evaluate generated answers with token-level precision/recall/F1 and save per-sample results to CSV.

    Tokens are obtained by removing every character that is neither a word
    character nor whitespace, then splitting on whitespace. Overlap is counted
    as a multiset intersection, so a token repeated in the prediction is only
    credited as many times as it occurs in the reference.

    Args:
        dataset: Iterable of dicts with "text" ("<context>...<question>...")
            and "required" ("<answer>...") fields.
        model: Accepted for interface compatibility; not used here, because
            generation is delegated to ``generate_answers``.
        tokenizer: Accepted for interface compatibility; not used here.
        num_samples: Only the first ``num_samples`` examples are considered.
        csv_path: Path to save the CSV file.

    Returns:
        Tuple: (list of per-sample result dicts, average F1 score). Examples
        whose "text" cannot be parsed are skipped and do not count towards
        the averages. All averages are 0.0 when nothing was scored.
    """
    from collections import Counter  # local import keeps this cell self-contained

    def arabic_tokenize(text):
        text = re.sub(r'[^\w\s]', '', text)
        text = re.sub(r'\s+', ' ', text).strip()
        return text.split()

    def mean_or_zero(values):
        # np.mean([]) yields NaN plus a RuntimeWarning; report 0.0 instead.
        return np.mean(values) if values else 0.0

    results = []
    f1_scores = []
    precision_scores = []
    recall_scores = []

    for i, example in enumerate(dataset):
        if i >= num_samples:
            break

        full_text = example["text"]
        reference_answer = example["required"].replace("<answer>", "").strip()

        # Parse "<context> ... <question> ..."
        try:
            context_split = full_text.split("<context>", 1)[1]
            context, question = context_split.split("<question>", 1)
        except (IndexError, ValueError):
            # IndexError: no "<context>" marker; ValueError: no "<question>" marker.
            print(f"❌ Skipping sample {i+1} - invalid format")
            continue
        context = context.strip()
        question = question.strip()

        generated_dict = generate_answers(context, [question], num_answers_per_question=1)
        generated_answer = generated_dict[question][0].replace("<answer>", "").strip()

        ref_tokens = arabic_tokenize(reference_answer)
        gen_tokens = arabic_tokenize(generated_answer)

        # Multiset intersection: each shared token counts min(ref count, gen count) times.
        tp = sum((Counter(ref_tokens) & Counter(gen_tokens)).values())
        precision = tp / len(gen_tokens) if gen_tokens else 0
        recall = tp / len(ref_tokens) if ref_tokens else 0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

        f1_scores.append(f1)
        precision_scores.append(precision)
        recall_scores.append(recall)

        results.append({
            "Sample": i + 1,
            "F1_Score": f1,
            "Precision": precision,
            "Recall": recall,
            "Context": context,
            "Question": question,
            "Reference_Answer": reference_answer,
            "Generated_Answer": generated_answer
        })

    avg_f1 = mean_or_zero(f1_scores)
    avg_per = mean_or_zero(precision_scores)
    avg_recall = mean_or_zero(recall_scores)

    # Save to CSV (utf-8-sig so spreadsheet tools detect the encoding of Arabic text)
    df = pd.DataFrame(results)
    df.to_csv(csv_path, index=False, encoding='utf-8-sig')

    print(f"\n📊 Evaluation Complete — Average F1: {avg_f1:.4f} from {len(f1_scores)} samples")
    print(f"\n📊 Evaluation Complete — Average per: {avg_per:.4f} from {len(f1_scores)} samples")
    print(f"\n📊 Evaluation Complete — Average recall: {avg_recall:.4f} from {len(f1_scores)} samples")
    print(f"📁 Results saved to: {csv_path}")
    return results, avg_f1


In [None]:
# Fixed seed so the shuffled evaluation subset is the same on every run.
seed = 42
# Shuffle the test split and keep at most 4000 examples for evaluation.
test_dataset = datasets_ag["test"].shuffle(seed=seed).select(range(min(4000, len(datasets_ag["test"]))))

In [None]:
# Score every example of the selected subset (num_samples is the subset's length).
results, final_score = evaluate_answer_generation_f1(
    test_dataset,
    model,
    tokenizer,
    num_samples=len(test_dataset),
)

# Exact Match (EM)

In [None]:
def normalize_text(text):
    """
    Canonicalize text for exact-match comparison.

    Removes every character that is neither a word character nor whitespace,
    collapses whitespace runs to a single space, trims both ends and
    lower-cases the result.
    """
    without_punctuation = re.sub(r'[^\w\s]', '', text)
    single_spaced = re.sub(r'\s+', ' ', without_punctuation)
    return single_spaced.strip().lower()

def exact_match_score(prediction, ground_truth):
    """Return True when prediction and ground truth are equal after normalize_text."""
    normalized_prediction, normalized_truth = (
        normalize_text(candidate) for candidate in (prediction, ground_truth)
    )
    return normalized_prediction == normalized_truth