# Evaluation of Trained Translation Models

The evaluation for this paper must cover two models: NLLB and LLAMA3.

LLAMA3

In [None]:
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    pipeline,
)
import pandas as pd
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
)
import torch

import pandas as pds
from tqdm import tqdm
import sacrebleu
from datasets import Dataset
from datasets import load_from_disk
from sacrebleu.metrics import CHRF
from datasets import load_dataset
from datetime import datetime

############################################################################################################


# Run configuration: every name below is read by the loading, generation and
# reporting code further down in this file.

# Token budget; generation is allowed up to twice this many new tokens.
MAX_LEN = 512
model_path = "/home/snt/llm_models/Llama-3.2-3B-Instruct"
val_dataset_path = "data/training_dataset/dataset_nllb_split.jsonl"
flore_dataset_path = "data/fake_targets/flores_devtest_arrow"

# Local-time stamp (month_day_hour_minute) so repeated runs get distinct
# output files.
current_time = datetime.now()
formatted_time = current_time.strftime('%m_%d_%H_%M')

# Output file name = dataset base name + timestamp + model tag, relative to
# the working directory. The ".jsonl" suffix is stripped only when present, so
# a dataset given as a directory path still gets a timestamped ".jsonl" name
# instead of reusing the bare directory name.
_dataset_basename = val_dataset_path.rstrip("/").split("/")[-1]
if _dataset_basename.endswith(".jsonl"):
    _dataset_basename = _dataset_basename[: -len(".jsonl")]
eval_output_path = f"{_dataset_basename}_{formatted_time}_eval_from_Llama3-3B.jsonl"

sample_num = None  # Number of samples to evaluate otherwise set to None

# Translation direction. Swap the active and commented-out pairs below to
# evaluate English -> Luxembourgish instead.
# src_lng = "English"
# src_lng_abr = "sentence_eng_Latn"

src_lng = "Luxembourgish"
src_lng_abr = "sentence_ltz_Latn"

# tgt_lng = "Luxembourgish"
# tgt_lng_abr = "sentence_ltz_Latn"

tgt_lng = "English"
tgt_lng_abr = "sentence_eng_Latn"

# Device string used both for loading the model and for moving the inputs.
device = "cuda:0"

############################################################################################################

# Load the validation data: a ".jsonl" path is read as JSON lines, anything
# else is handed to `load_from_disk`.
if val_dataset_path.endswith(".jsonl"):
    dataset = Dataset.from_json(val_dataset_path)  # Ensure correct format
else:
    dataset = load_from_disk(val_dataset_path)

# Keep only the rows marked as validation, optionally capped at `sample_num`.
val_dataset = dataset.filter(lambda x: x["split"] == "val")
if sample_num:
    # Clamp the cap so a split smaller than `sample_num` does not make
    # `select` fail on out-of-range indices.
    val_dataset = val_dataset.select(range(min(sample_num, len(val_dataset))))

val_dataset = val_dataset.rename_columns({
    "input": "Luxembourgish",
    "translated_text": "English",
})  # This pair cannot be changed

# FLORES data: rename the configured source/target sentence columns to the
# language names that the prompt and scoring code look up.
val_flores_dataset = load_from_disk(flore_dataset_path).rename_columns(
    {
        tgt_lng_abr: tgt_lng,
        src_lng_abr: src_lng,
    }
)
if sample_num:
    # Apply the same cap as for the validation split (previously a fixed 10
    # rows regardless of `sample_num`).
    val_flores_dataset = val_flores_dataset.select(
        range(min(sample_num, len(val_flores_dataset)))
    )


# Tokenizer for the checkpoint at `model_path`; its EOS token is reused as the
# padding token.
tokenizer = AutoTokenizer.from_pretrained(model_path)
tokenizer.pad_token = tokenizer.eos_token

# Load the causal LM from `model_path` directly onto `device`. No dtype or
# quantisation arguments are passed and no adapter weights are merged here.
model = AutoModelForCausalLM.from_pretrained(model_path, device_map=device)

# Function to generate from the model
def generate_response(prompt, model):
    """Generate the model's continuation for ``prompt``.

    Uses the module-level ``tokenizer``, ``device`` and ``MAX_LEN``.

    Args:
        prompt (str): Fully formatted prompt text.
        model: Causal language model exposing ``generate``.

    Returns:
        str: The decoded newly generated tokens only (the prompt is not
        included). Special tokens are kept in the text, so callers remove
        the ones they do not want.
    """
    # Prompts built by create_prompt() already start with the BOS token;
    # letting the tokenizer prepend another one would feed a doubled BOS.
    bos_token = getattr(tokenizer, "bos_token", None)
    add_bos = not (bos_token and prompt.startswith(bos_token))

    encoded_input = tokenizer(prompt, return_tensors="pt", add_special_tokens=add_bos)
    model_inputs = encoded_input.to(device)
    generated_ids = model.generate(
        **model_inputs,
        max_new_tokens=MAX_LEN * 2,
        do_sample=True,
        pad_token_id=tokenizer.pad_token_id,
        eos_token_id=tokenizer.eos_token_id,
        temperature=0.1,
    )

    # The returned sequence starts with the prompt tokens. Drop them by count
    # rather than by string replacement, which silently fails whenever the
    # decoded text does not reproduce the prompt character for character.
    prompt_length = model_inputs["input_ids"].shape[-1]
    new_token_ids = generated_ids[0][prompt_length:]
    return tokenizer.decode(new_token_ids)

def compute_jaccard(prediction: str, reference: str) -> float:
    """Return the Jaccard similarity of two texts' whitespace-token sets.

    Both texts are split on whitespace and compared as sets, so repeated
    tokens and token order do not matter. Two texts without any tokens are
    treated as identical and score 1.0.
    """
    predicted_tokens = set(prediction.split())
    reference_tokens = set(reference.split())

    all_tokens = predicted_tokens.union(reference_tokens)
    if not all_tokens:
        # Neither side has a token: define the similarity as perfect.
        return 1.0

    shared_tokens = predicted_tokens.intersection(reference_tokens)
    return len(shared_tokens) / len(all_tokens)


def create_prompt(
    sample, src_lng, tgt_lng, mode="train", tokenizer=None
):
    """
    Build a Llama-3 style chat prompt for one translation sample.

    The prompt consists of a system turn carrying the upper-cased translation
    instruction, a user turn carrying the stripped source text, and an opened
    assistant turn. In 'train' mode the stripped target text (empty when the
    sample has no target field) and the tokenizer's EOS token are appended.

    Args:
        sample (dict): Maps capitalized language names to text; the source
            entry is required, the target entry is optional.
        src_lng (str): Source language name.
        tgt_lng (str): Target language name.
        mode (str): 'train' appends target text + EOS; any other value leaves
            the assistant turn open for generation.
        tokenizer: Object exposing a non-None ``eos_token``.

    Returns:
        dict: ``{"prompt_response": <prompt string>}``.

    Raises:
        ValueError: If ``tokenizer`` is None or has no EOS token.
    """
    if tokenizer is None or tokenizer.eos_token is None:
        raise ValueError("A tokenizer with a defined EOS token is required.")

    instruction = f"Translate the {src_lng} input text into {tgt_lng}.".upper()

    # Sample keys are looked up by the capitalized language name.
    source_text = sample[src_lng.capitalize()].strip()
    target_key = tgt_lng.capitalize()
    target_text = sample[target_key].strip() if target_key in sample else ""

    segments = [
        "<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n\n",
        instruction,
        "<|eot_id|><|start_header_id|>user<|end_header_id|>\n\n",
        source_text,
        "<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n",
    ]
    if mode == "train":
        # Training prompts also carry the expected answer and the EOS marker.
        segments.extend((target_text, tokenizer.eos_token))

    return {"prompt_response": "".join(segments)}


def generate_dataset_responses(dataset, model, src_lng, tgt_lng, tokenizer=None):
    """Translate every sample of ``dataset`` and score each translation.

    For each sample a "test"-mode prompt is built with ``create_prompt``, the
    answer is obtained from ``generate_response`` and the
    ``<|begin_of_text|>`` / ``<|eot_id|>`` markers are removed from it. Every
    answer is then scored against the sample's ``tgt_lng`` field with
    per-sample spBLEU, chrF++ and Jaccard, and the mean of each metric is
    printed.

    Args:
        dataset: Iterable of dict-like samples.
        model: Model forwarded to ``generate_response``.
        src_lng (str): Name of the sample field holding the source text.
        tgt_lng (str): Name of the sample field holding the reference text.
        tokenizer: Forwarded to ``create_prompt``.

    Returns:
        pandas.DataFrame: One row per sample with the columns ``LLM_Input``,
        ``LLM_Output``, ``Ground_Truth`` (a single-item list),
        ``index_unique``, ``SPBLEU_Score``, ``CharF++_Score`` and
        ``Jaccard_Score``.
    """
    source = []
    predictions = []  # LLM-generated translations
    targets = []  # Ground-truth references, one single-item list per sample
    index_uniques = []

    for sample in tqdm(dataset, desc="Generating responses"):
        input_prompt = create_prompt(
            sample, mode="test", src_lng=src_lng, tgt_lng=tgt_lng, tokenizer=tokenizer
        )["prompt_response"]

        llm_response = (
            generate_response(input_prompt, model)
            .replace("<|begin_of_text|>", "")
            .replace("<|eot_id|>", "")
        )

        # A missing or null field is scored as an empty string instead of
        # crashing the metric code on None.
        ground_truth = sample.get(tgt_lng, "") or ""

        predictions.append(llm_response)
        targets.append([ground_truth])  # The metrics expect a list of references
        source.append(sample.get(src_lng, "") or "")
        index_uniques.append(sample.get("index_unique", ""))

    df_results = pd.DataFrame(
        list(zip(source, predictions, targets, index_uniques)),
        columns=["LLM_Input", "LLM_Output", "Ground_Truth", "index_unique"],
    )

    # spBLEU: BLEU with the flores200 tokenizer, computed one sample at a
    # time, so the printed average is a mean of sentence-level scores.
    df_results["SPBLEU_Score"] = [
        sacrebleu.corpus_bleu([p], [t], tokenize="flores200").score
        for p, t in zip(predictions, targets)
    ]

    # chrF++ is chrF extended with word n-grams up to order 2; sacreBLEU only
    # calls the metric chrF++ for word_order=2.
    chrf_metric = CHRF(word_order=2)
    df_results["CharF++_Score"] = [
        chrf_metric.sentence_score(p, t).score for p, t in zip(predictions, targets)
    ]

    # Jaccard overlap between the whitespace tokens of output and reference.
    df_results["Jaccard_Score"] = [
        compute_jaccard(p, t[0]) for p, t in zip(predictions, targets)
    ]

    average_spbleu = df_results["SPBLEU_Score"].mean()
    average_charf = df_results["CharF++_Score"].mean()
    average_jaccard = df_results["Jaccard_Score"].mean()

    print(f"Average SPBLEU Score: {average_spbleu:.2f}")
    print(f"Average CharF++ Score: {average_charf:.2f}")
    print(f"Average Jaccard Score: {average_jaccard:.2f}")

    return df_results


def _run_evaluation(heading, eval_data, dataset_label):
    """Print a section heading, evaluate ``eval_data`` and tag the rows."""
    print(heading)
    print("----------------------")
    frame = generate_dataset_responses(
        dataset=eval_data,
        model=model,
        src_lng=src_lng,
        tgt_lng=tgt_lng,
        tokenizer=tokenizer,
    )
    # Record which dataset each row came from before the frames are merged.
    frame["Dataset"] = dataset_label
    return frame


# Validation split first, then FLORES.
df_RTL_results = _run_evaluation("Validation RTL Results", val_dataset, "RTL")
df_flores_results = _run_evaluation("FLORES 200 Results", val_flores_dataset, "FLORES")

# Stack both result sets and write them as JSON lines, one record per row.
df_results = pd.concat([df_RTL_results, df_flores_results], axis=0)
df_results.to_json(eval_output_path, orient="records", lines=True)
print(f"Results saved to {eval_output_path}")
