In [None]:
!pip install transformers datasets nltk rouge-score sacrebleu sentence-transformers sentencepiece fsspec==2025.3.2 bert-score --quiet
!pip install indic-nlp-library camel-tools

  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m51.8/51.8 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m104.1/104.1 kB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.1/61.1 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m113.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m91.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m883.7/883.7 kB[0m [31m53.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m664.8/664.8 MB[0m [31m1.3 MB/s[0m eta [36m0

In [None]:
!pip install unbabel-comet
!pip install evaluate

In [None]:
# ─── IMPORTS ─────────────────────────────────────────────
import os
import json
import hashlib
import numpy as np
import torch
import pandas as pd
import os
from rouge_score import rouge_scorer
from nltk.translate.meteor_score import meteor_score
import sacrebleu
from sentence_transformers import util
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AutoModel
from bert_score import score as bert_score

from nltk.tokenize import word_tokenize
from indicnlp.tokenize.indic_tokenize import trivial_tokenize
from camel_tools.tokenizers.word import simple_word_tokenize
from nltk.translate.bleu_score import corpus_bleu, SmoothingFunction
from evaluate import load as evaluate_load

In [None]:
# Fetch NLTK data resources (WordNet plus the two Punkt packages); this cell downloads
# data files, it does not install pip packages.
# NOTE(review): presumably `wordnet` backs meteor_score and `punkt`/`punkt_tab` back
# word_tokenize, both used by the metric functions in this notebook — confirm.
import nltk
nltk.download('wordnet')
nltk.download('punkt')
nltk.download('punkt_tab')


In [None]:
!pip install -U gdown
!gdown --folder 1QdxrYnelt9poi45eLT5xgObihDRb_OtV -O /content/103080

In [None]:
# Clone updated repo
!git clone https://github.com/DrishtiShrrrma/nueva.git

# Adjust base_dir to new path for prompt-based summaries.
# Note: run_evaluation() assigns its own local `base_dir` with this same value, so this
# notebook-level variable is not what that function reads.
base_dir = "/content/nueva/prompt_analysis"



In [None]:
# Interactive Hugging Face Hub login prompt for the notebook session.
# NOTE(review): none of the model identifiers visible in this notebook obviously needs
# authentication — confirm whether this cell is still required.
from huggingface_hub import notebook_login
notebook_login()

In [None]:
# ─── CONFIGURATION ──────────────────────────────────────────────
# Directory holding one cached back-translation text file per (language, model tag, text hash);
# run_evaluation also writes its aggregated score files here.
backtranslation_dir = "backtranslations_cache"
os.makedirs(backtranslation_dir, exist_ok=True)


# Mapping for summary field name → Display name
# (the field name is the suffix of the "summary_<field>" keys in the input JSON records)
json_field_to_lang = {
    "chinese":     "Chinese",
    "french":      "French",
    "spanish":     "Spanish",
    "portuguese":  "Portuguese",
    "arabic":      "Arabic",
    "hindi":       "Hindi"
}

# Mapping for Display name → M2M-100 language code (used for backtranslation)
bt_lang_code_map = {
    "Chinese":     "zh",
    "French":      "fr",
    "Spanish":     "es",
    "Portuguese":  "pt",
    "Arabic":      "ar",
    "Hindi":       "hi"
}


# Load the back-translation model (currently facebook/m2m100_418M, see bt_model_name below).
from transformers import AutoModelForCausalLM, AutoTokenizer


# Hub identifier of the back-translation model, and the short tag embedded in cache keys
# and output file names.
bt_model_name = "facebook/m2m100_418M"
bt_model_tag = "m2m100_418M"


# Heuristic architecture switch: a model whose name contains "m2m" or "opus" is loaded
# as a seq2seq model, anything else as a causal LM.
is_encoder_decoder = "m2m" in bt_model_name.lower() or "opus" in bt_model_name.lower()

# (repeats the import above and additionally brings in AutoModelForSeq2SeqLM and pipeline)
from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModelForSeq2SeqLM, pipeline

bt_tokenizer = AutoTokenizer.from_pretrained(bt_model_name)
device = "cuda" if torch.cuda.is_available() else "cpu"

if is_encoder_decoder:
    bt_model = AutoModelForSeq2SeqLM.from_pretrained(bt_model_name).to(device)
else:
    # bfloat16 is requested only when the model name contains "tower"; otherwise the
    # dtype argument is None.
    bt_model = AutoModelForCausalLM.from_pretrained(bt_model_name, torch_dtype=torch.bfloat16 if "tower" in bt_model_name.lower() else None).to(device)
# Inference only: switch the model to eval mode.
bt_model.eval()



# Caches
# Lazily-populated globals. side_tokenizer/side_model are filled in by compute_side_score
# on first use; the other three are not referenced elsewhere in the visible notebook.
embedding_model     = None
bertscore_model     = None
bertscore_tokenizer = None
side_tokenizer      = None
side_model          = None

In [None]:
def sanitize_text(text: str) -> str:
    """Delete known special-token / chat-template markers from ``text``.

    Each marker is removed wherever it occurs, one marker after another in the
    order listed below, and the final string is stripped of surrounding
    whitespace.
    """
    markers = (
        # end/begin-of-sequence style tokens
        "<|end_of_text|>", "</s>", "<s>", "<|eot_id|>",
        # ChatML-style role headers and terminator
        "<|im_start|>user", "<|im_start|>assistant", "<|im_start|>system", "<|im_end|>",
        # bare role tags
        "<|user|>", "<|assistant|>", "<|system|>",
        # turn / chatbot markers
        "<|CHATBOT_TOKEN|>", "<|START_OF_TURN_TOKEN|>", "<|END_OF_TURN_TOKEN|>", "<BOS_TOKEN>",
    )
    cleaned = text
    for marker in markers:
        cleaned = cleaned.replace(marker, "")
    return cleaned.strip()



def clean_translation_output(decoded: str):
    """Reduce a raw decoded generation to just the translated text.

    Steps, in order:
      1. run ``sanitize_text`` on the input;
      2. for each prompt-style marker that is present, keep only what follows
         its first occurrence;
      3. if a ChatML opener is present, keep only what follows it;
      4. if a ChatML terminator is present, keep only what precedes it.

    Returns the whitespace-stripped result.
    """
    result = sanitize_text(decoded)

    # Prompt-style artifacts: everything up to and including the marker is dropped.
    # sanitize_text has already deleted "<|CHATBOT_TOKEN|>", so that entry cannot
    # match at this point; it is kept to mirror the marker list.
    for marker in ("### Response:", "<|CHATBOT_TOKEN|>", "English:", "Translation:"):
        _, hit, tail = result.partition(marker)
        if hit:
            result = tail.strip()

    # Chat-style openers: the spaced assistant header takes priority over a bare opener.
    for opener in ("<|im_start|> assistant", "<|im_start|>"):
        _, hit, tail = result.partition(opener)
        if hit:
            result = tail.strip()
            break

    # Chat-style terminator: keep the text before it.
    head, hit, _ = result.partition("<|im_end|>")
    if hit:
        result = head.strip()

    return result.strip()


In [None]:
def prompt_template(text, src_lang, tgt_lang):
    """Build the three-line translation prompt: instruction, stripped text, ``Translation:`` cue."""
    instruction = f"Translate the following text from {src_lang} to {tgt_lang}:"
    body = text.strip()
    return "\n".join((instruction, body, "Translation:"))

def bt_function(text, src_lang_name):
    """Back-translate ``text`` from ``src_lang_name`` into English, with an on-disk cache.

    Args:
        text: The text to translate (a summary in the source language).
        src_lang_name: Source language, either as a display name ("Chinese") or
            as a lower-case field name ("chinese").

    Returns:
        The sanitized English translation. An empty or whitespace-only ``text``
        yields ``""`` without touching the model. On the encoder-decoder path,
        a language with no entry in ``bt_lang_code_map`` returns ``text``
        unchanged (best-effort; nothing is cached in that case).

    Side effects:
        Reads/writes ``<backtranslation_dir>/<lang>_<model tag>_<md5(text)>.txt``
        and, on the encoder-decoder path, sets ``bt_tokenizer.src_lang``.
    """
    src_lang_name = src_lang_name.strip()

    # Nothing to translate: do not ask the model to generate from empty input.
    if not text or not text.strip():
        return ""

    key = f"{src_lang_name}_{bt_model_tag}_{hashlib.md5(text.encode()).hexdigest()}"
    cache_file = os.path.join(backtranslation_dir, key + ".txt")
    if os.path.exists(cache_file):
        # Context manager so the handle is closed; sanitize so cache hits and
        # cache misses return text in the same form.
        with open(cache_file, 'r', encoding='utf-8') as f:
            return sanitize_text(f.read())

    # Normalize names: accept either the JSON field name or the display name.
    src_lang = json_field_to_lang.get(src_lang_name.lower(), src_lang_name)
    tgt_lang = "English"

    # Ask the loaded model's config for its architecture. An isinstance() check
    # against AutoModelForSeq2SeqLM never matches, because from_pretrained returns
    # a concrete model class rather than an instance of the Auto* factory.
    # NOTE(review): cache files written while the seq2seq model was driven through
    # the prompt-based path may hold poor outputs — consider clearing
    # backtranslation_dir before re-running.
    uses_encoder_decoder = bool(getattr(bt_model.config, "is_encoder_decoder", False))

    if uses_encoder_decoder:
        tgt_lang_code = "en"
        src_lang_code = (
            bt_lang_code_map.get(src_lang)
            or bt_lang_code_map.get(src_lang_name.capitalize())
        )
        if not src_lang_code:
            return text
        bt_tokenizer.src_lang = src_lang_code
        inputs = bt_tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
        inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():  # inference only; avoid building autograd state
            output_ids = bt_model.generate(
                **inputs,
                forced_bos_token_id=bt_tokenizer.get_lang_id(tgt_lang_code),
            )
        output = bt_tokenizer.decode(output_ids[0], skip_special_tokens=True)
    else:
        prompt = prompt_template(text, src_lang, tgt_lang)
        # Only apply a chat template when the tokenizer actually has one configured;
        # the method itself exists on tokenizers that have no template.
        if hasattr(bt_tokenizer, "apply_chat_template") and getattr(bt_tokenizer, "chat_template", None):
            messages = [{"role": "user", "content": prompt}]
            prompt = bt_tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        inputs = bt_tokenizer(prompt, return_tensors="pt").to(device)
        with torch.no_grad():
            output_ids = bt_model.generate(
                inputs.input_ids,
                attention_mask=inputs.get("attention_mask"),
                max_new_tokens=512,
                pad_token_id=bt_tokenizer.eos_token_id,
                do_sample=False
            )
        # The decoded sequence still contains the prompt; clean_translation_output
        # cuts everything up to the "Translation:" cue.
        decoded = bt_tokenizer.decode(output_ids[0], skip_special_tokens=False, clean_up_tokenization_spaces=False)
        output = clean_translation_output(decoded)

    # sanitize_text also removes "<|end_of_text|>" and strips whitespace, so the
    # cached text and the returned text are identical.
    output = sanitize_text(output)
    with open(cache_file, 'w', encoding='utf-8') as f:
        f.write(output)

    return output





# ─── METRIC FUNCTIONS ──────────────────────────────────────────────────────
def compute_bertscore(refs, hyps):
    """Corpus-mean BERTScore of ``hyps`` against ``refs``.

    Calls ``bert_score`` with ``model_type="xlm-roberta-large"``, ``lang="en"``
    and no baseline rescaling, then averages each returned tensor.

    Returns:
        dict with keys ``"precision"``, ``"recall"`` and ``"f1"``, each rounded
        to 4 decimals.
    """
    precision, recall, f1 = bert_score(
        hyps,
        refs,
        model_type="xlm-roberta-large",
        lang="en",
        rescale_with_baseline=False
    )

    def _rounded_mean(values):
        # Collapse the per-pair tensor to one Python float.
        return round(values.mean().item(), 4)

    return {
        "precision": _rounded_mean(precision),
        "recall":    _rounded_mean(recall),
        "f1":        _rounded_mean(f1),
    }

In [None]:
# ─── MEAN POOLING (for SIDE) ───────────────────────────────────────────────
def mean_pooling(model_output, attention_mask):
    """Mask-weighted average of ``model_output[0]`` over dimension 1.

    The attention mask is broadcast to the embedding shape, positions with a
    zero mask contribute nothing, and the divisor is clamped to ``1e-9`` so an
    all-zero mask does not divide by zero.
    """
    hidden_states = model_output[0]
    weights = attention_mask.unsqueeze(-1).expand(hidden_states.size()).float()
    weighted_sum = torch.sum(hidden_states * weights, 1)
    weight_total = torch.clamp(weights.sum(1), min=1e-9)
    return weighted_sum / weight_total


def compute_side_score(codes, hyps):
    """Mean cosine similarity between each code snippet and its paired summary.

    On first call the tokenizer and model are loaded from the local checkpoint
    directory ``/content/103080`` into the module-level ``side_tokenizer`` /
    ``side_model`` globals and reused afterwards. Each (code, summary) pair is
    encoded together, mean-pooled, L2-normalized and compared.

    Returns:
        The mean similarity over all pairs, rounded to 4 decimals.
    """
    global side_tokenizer, side_model

    on_gpu = torch.cuda.is_available()

    # Lazy one-time load of the SIDE checkpoint.
    if side_model is None:
        checkpoint = "/content/103080"
        side_tokenizer = AutoTokenizer.from_pretrained(checkpoint)
        side_model = AutoModel.from_pretrained(checkpoint)
        if on_gpu:
            side_model = side_model.cuda()
        side_model.eval()

    similarities = []
    for snippet, summary in zip(codes, hyps):
        batch = side_tokenizer([snippet, summary], padding=True, truncation=True, return_tensors="pt")
        if on_gpu:
            batch = {name: tensor.cuda() for name, tensor in batch.items()}
        with torch.no_grad():
            outputs = side_model(**batch)
        sentence_vecs = mean_pooling(outputs, batch['attention_mask'])
        unit_vecs = torch.nn.functional.normalize(sentence_vecs, p=2, dim=1)
        # Row 0 is the code, row 1 the summary.
        code_vec, summary_vec = unit_vecs[0], unit_vecs[1]
        similarities.append(util.pytorch_cos_sim(code_vec, summary_vec).item())

    return round(float(np.mean(similarities)), 4)

def compute_meteor_score(refs, hyps):
    """Mean METEOR over paired references and hypotheses.

    Both sides are lower-cased and tokenized with ``word_tokenize``; each
    hypothesis is scored against its single reference. Returns the mean
    rounded to 4 decimals.
    """
    per_pair = [
        meteor_score([word_tokenize(ref.lower())], word_tokenize(hyp.lower()))
        for ref, hyp in zip(refs, hyps)
    ]
    return round(float(np.mean(per_pair)), 4)

def compute_chrf_score(refs, hyps):
    """Corpus-level chrF with ``word_order=2`` on lower-cased text, scaled to 0-1.

    Returns the sacreBLEU score divided by 100 and rounded to 4 decimals.
    """
    lowered_refs = [ref.lower() for ref in refs]
    lowered_hyps = [hyp.lower() for hyp in hyps]

    chrf = sacrebleu.corpus_chrf(lowered_hyps, [lowered_refs], word_order=2)
    return round(chrf.score / 100, 4)

## ----BLEU METRIC-----------
def compute_bleu_sacre(refs, hyps, lang_name):
    """Corpus BLEU via sacreBLEU with a tokenizer chosen by language, scaled to 0-1.

    Args:
        refs: Reference strings (passed as a single reference stream).
        hyps: Hypothesis strings.
        lang_name: Language name, matched case-insensitively; any language not
            in the table below uses the ``"13a"`` tokenizer.

    Returns:
        The BLEU score divided by 100 and rounded to 4 decimals.
    """
    # sacreBLEU tokenizer name per language.
    sacre_tokenizers = {
        **dict.fromkeys(("french", "portuguese", "spanish"), "13a"),
        **dict.fromkeys(("arabic", "hindi"), "intl"),
        "chinese": "zh",
    }
    chosen = sacre_tokenizers.get(lang_name.lower(), "13a")

    # Compute BLEU-4
    bleu = sacrebleu.corpus_bleu(hyps, [refs], tokenize=chosen)
    return round(bleu.score / 100, 4)  # Normalize to 0–1 like nltk



def tokenize(text, lang):
    """Split ``text`` into tokens using a per-language strategy.

    ``lang`` is matched case-insensitively:
      * ``chinese``             -> one token per character of the stripped text
      * ``arabic``              -> ``simple_word_tokenize``
      * ``hindi``               -> ``trivial_tokenize`` with ``lang='hi'``
      * ``french``/``portuguese`` -> ``word_tokenize`` for that language
      * anything else           -> whitespace split of the stripped text
    """
    language = lang.lower()

    if language == "chinese":
        return list(text.strip())
    if language == "arabic":
        return simple_word_tokenize(text)
    if language == "hindi":
        return trivial_tokenize(text, lang='hi')
    if language in ("french", "portuguese"):
        return word_tokenize(text, language=language)

    # Fallback for every other language name.
    return text.strip().split()


def compute_bleu_nltk(refs_tokenized, hyps_tokenized):
    """Corpus BLEU-4 via NLTK with uniform n-gram weights and ``method1`` smoothing.

    Args:
        refs_tokenized: For each sample, a list of tokenized references.
        hyps_tokenized: For each sample, the tokenized hypothesis.

    Returns:
        The score rounded to 4 decimals.
    """
    uniform_weights = (0.25,) * 4
    bleu = corpus_bleu(
        refs_tokenized,
        hyps_tokenized,
        weights=uniform_weights,
        smoothing_function=SmoothingFunction().method1,
    )
    return round(bleu, 4)

## ----COMET METRIC-----------

# Load the COMET metric (Unbabel/wmt22-comet-da config) once at notebook level;
# compute_comet_score reuses this object on every call.
comet = evaluate_load("comet", config_name="Unbabel/wmt22-comet-da")

def compute_comet_score(sources, references, hypotheses, batch_size=8, gpus=0):
    """Score hypotheses with the module-level ``comet`` metric.

    Args:
        sources: Source-side texts.
        references: Reference texts.
        hypotheses: Texts to score (passed to COMET as ``predictions``).
        batch_size: Accepted for interface compatibility; not forwarded to
            ``comet.compute``.
        gpus: Accepted for interface compatibility; not forwarded to
            ``comet.compute``.

    Returns:
        ``(mean, per_example)`` where ``per_example`` is the ``"scores"`` entry
        of the result (``[]`` if absent) and ``mean`` is its average rounded to
        4 decimals, or ``0.0`` when there are no scores.
    """
    outcome = comet.compute(
        sources=sources,
        predictions=hypotheses,
        references=references,
    )
    segment_scores = outcome.get("scores", [])
    if segment_scores:
        average = float(np.mean(segment_scores))
    else:
        average = 0.0
    return round(average, 4), segment_scores


# COMPUTE ALL METRICS
def compute_all_metrics(codes, refs, hyps, lang_name, code_lang):
    """Back-translate ``hyps`` to English and score them against ``refs``.

    Args:
        codes: Source code snippets, aligned with ``refs``/``hyps`` (used for SIDE).
        refs: English reference summaries.
        hyps: Generated summaries written in ``lang_name``.
        lang_name: Language of ``hyps``; used only to drive back-translation.
        code_lang: Programming language of the snippets. Currently unused; kept
            so existing callers do not break.

    Returns:
        dict with corpus-level ``bleu4_nltk``, ``bleu4_sacrebleu``, ``bleu4_diff``,
        ``rougeL``, ``meteor``, ``chrf++``, ``side_bt``, ``comet_mean`` and the
        list ``comet_per_example``.
    """
    print(f"  Computing backtranslation-based metrics for {lang_name}...")
    bt = [bt_function(h, lang_name) for h in hyps]

    # Both sides of every comparison below are English: the references are English
    # summaries and bt_function translates into English. They must therefore be
    # tokenized as English, not with the tokenizer of the summary's language
    # (e.g. the Chinese rule would split English text into single characters and
    # turn BLEU into a character-level score).
    # tokenize() falls back to whitespace splitting for "english".
    eval_lang = "english"
    refs_tokenized = [[tokenize(r, eval_lang)] for r in refs]
    hyps_tokenized = [tokenize(b, eval_lang) for b in bt]

    # Compute BLEU using tokenized inputs
    bleu_nltk = compute_bleu_nltk(refs_tokenized, hyps_tokenized)

    # Same reasoning for sacreBLEU: pick its tokenizer for English text.
    bleu_sacre = compute_bleu_sacre(refs, bt, eval_lang)
    bleu_diff = round(abs(bleu_nltk - bleu_sacre), 4)

    scorer = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)
    rl = [scorer.score(r, b)['rougeL'].fmeasure for r, b in zip(refs, bt)]

    # COMET sees the non-English summary as source, the English reference, and
    # the English back-translation as the hypothesis.
    comet_mean, comet_per_example = compute_comet_score(
        sources=hyps,
        references=refs,
        hypotheses=bt
    )

    return {
        "bleu4_nltk": round(bleu_nltk, 4),
        "bleu4_sacrebleu": bleu_sacre,
        "bleu4_diff": bleu_diff,
        "rougeL": round(float(np.mean(rl)), 4),
        "meteor": compute_meteor_score(refs, bt),
        "chrf++": compute_chrf_score(refs, bt),
        "side_bt": compute_side_score(codes, bt),
        "comet_mean": comet_mean,
        "comet_per_example": comet_per_example
    }



In [None]:
# ─── BACKTRANSLATION SANITY TEST ─────────────────────────────────────────────
# Smoke test: push one short sentence per language through bt_function and print the
# result for manual inspection. Nothing is asserted.
print("\n🔍 Running backtranslation test...")

# Display name → sample sentence. Only three of the six configured languages are exercised.
sample_inputs = {
    "Chinese": "我喜欢自然语言处理。",
    "French": "J'aime le traitement automatique des langues.",
    "Arabic": "أنا أحب معالجة اللغة الطبيعية.",
}

# Note: bt_function caches by text hash, so re-running this cell reads the cached output.
for lang_name, input_text in sample_inputs.items():
    print(f"\n🌐 {lang_name} Input: {input_text}")
    output = bt_function(input_text, lang_name)
    print(f"📝 Backtranslated Output: {output}")


In [None]:
from collections import OrderedDict

def insert_backtranslations(data):
    """Add a ``bt_<lang>`` field right after each ``summary_<lang>`` field, in place.

    For every record in ``data``, keys are copied in their existing order. When
    a key is ``summary_<lang>`` for a language listed in ``json_field_to_lang``
    and its stripped value is non-empty, the sanitized back-translation is
    inserted immediately after it under ``bt_<lang>``. Each record dict is then
    emptied and refilled, so callers holding the same dict objects see the
    change.
    """
    for record in data:
        rebuilt = OrderedDict()
        for field, content in record.items():
            rebuilt[field] = content

            if not field.startswith("summary_"):
                continue
            lang_code = field.replace("summary_", "").lower()
            if lang_code not in json_field_to_lang:
                continue

            summary_text = content.strip()
            if not summary_text:
                continue

            display_name = json_field_to_lang[lang_code]
            rebuilt[f"bt_{lang_code}"] = sanitize_text(bt_function(summary_text, display_name))

        # Mutate the original dict object rather than replacing it in the list.
        record.clear()
        record.update(rebuilt)



In [None]:
# ─── MAIN EVALUATION ─────────────────────────────────────
def run_evaluation():
    """Evaluate every summary file under the prompt-analysis tree and save the scores.

    For each model folder / prompt sub-directory, every ``all_languages_prompt*.json``
    file is loaded and, per language present in the file, BERTScore, SIDE and the
    back-translation metrics are computed once for the whole file. One result row
    per (sample, language) is then appended, carrying those corpus-level values
    plus the sample's own texts and COMET score.

    Outputs:
        * ``backtranslated_jsons/<input name>_with_bt_<tag>.json`` — the input
          records with ``bt_<lang>`` fields inserted;
        * ``<backtranslation_dir>/all_scores_bt_<tag>.json`` and ``.csv`` — all
          result rows.

    Takes no arguments and returns nothing.
    """
    all_results = []
    # Local value; the notebook-level `base_dir` is not read here.
    base_dir = "/content/nueva/prompt_analysis"
    model_folders = ["codegemma", "gemma-2-9b-it", "qwen2.5coder", "deepseekcoder"]
    prompt_subdirs = ["prompt0"]

    bt_json_dir = "backtranslated_jsons"
    os.makedirs(bt_json_dir, exist_ok=True)

    for model_folder in model_folders:
        for prompt in prompt_subdirs:
            prompt_path = os.path.join(base_dir, model_folder, prompt)
            # Missing model/prompt directories are skipped silently.
            if not os.path.isdir(prompt_path):
                continue

            # The listing is not sorted, so files are processed in whatever order
            # os.listdir returns them.
            for fname in os.listdir(prompt_path):
                if not fname.endswith(".json") or not fname.startswith("all_languages_prompt"):
                    continue

                summary_path = os.path.join(prompt_path, fname)
                print(f"\nProcessing file: {summary_path}")

                with open(summary_path, encoding='utf-8') as f:
                    data = json.load(f)

                if not data:
                    print("  Skipped: empty file")
                    continue

                # File-level inputs. The reference is "summary_english" when present,
                # otherwise "docstring".
                # assumes `data` is a list of dicts — TODO confirm against the dataset
                codes = [d.get("code", "") for d in data]
                refs = [sanitize_text(d.get("summary_english", d.get("docstring", ""))) for d in data]
                # File-level metadata is taken from the first record only.
                code_lang = data[0].get("language", "unknown")
                model_name = data[0].get("model_name", model_folder)
                prompt_used = data[0].get("prompt_used", prompt)

                for field, lang_name in json_field_to_lang.items():
                    hyp_key = f"summary_{field}"
                    # Presence of the language is decided from the first record only.
                    if hyp_key not in data[0]:
                        print(f"  Skipping {lang_name} — {hyp_key} not found.")
                        continue

                    hyps = [sanitize_text(d.get(hyp_key, "")) for d in data]
                    if not any(hyps):
                        print(f"  Skipping {lang_name} — all summaries empty.")
                        continue

                    print(f"  → Evaluating summaries in {lang_name}...")
                    # Corpus-level scores for this (file, language) pair.
                    bert = compute_bertscore(refs, hyps)
                    side_original = compute_side_score(codes, hyps)
                    metrics = compute_all_metrics(codes, refs, hyps, lang_name, code_lang)
                    # SIDE on the original summaries minus SIDE on their back-translations.
                    side_drop = round(side_original - metrics["side_bt"], 4)

                    for i, entry in enumerate(data):
                        code = entry.get("code", "")
                        sample_id = entry.get("id", f"{code_lang}_{i}")
                        full_func = entry.get("whole_func_string", code)
                        # Whitespace-delimited token count of the function text.
                        word_len = len(full_func.strip().split())

                        generated_summary = sanitize_text(entry.get(hyp_key, ""))
                        # Same text was already back-translated inside compute_all_metrics,
                        # so this call is normally served from bt_function's cache.
                        backtranslated_summary = sanitize_text(bt_function(generated_summary, lang_name))
                        reference_summary = sanitize_text(entry.get("summary_english", entry.get("docstring", "")))

                        # Every metric below except comet_example_score is the
                        # corpus-level value, repeated on each row of this file/language.
                        result = {
                            "sample_id": sample_id,
                            "model_folder_name": model_folder,
                            "model_name": model_name,
                            "programming_language": code_lang,
                            "language": lang_name,
                            "prompt_used": prompt_used,
                            "bt_model": bt_model_tag,
                            "word_len": word_len,
                            "length_bucket": entry.get("length_bucket", "unknown"),
                            "reference_summary": reference_summary,
                            "generated_summary": generated_summary,
                            "backtranslated_summary": backtranslated_summary,
                            "bertscore_f1": bert["f1"],
                            "bertscore_precision": bert["precision"],
                            "bertscore_recall": bert["recall"],
                            "side_original": side_original,
                            "side_bt": metrics["side_bt"],
                            "side_drop": side_drop,
                            "bleu4_nltk": metrics["bleu4_nltk"],
                            "bleu4_sacrebleu": metrics["bleu4_sacrebleu"],
                            "bleu4_diff": metrics["bleu4_diff"],
                            "rougeL": metrics["rougeL"],
                            "meteor": metrics["meteor"],
                            "chrf++": metrics["chrf++"],
                            "comet_mean": metrics["comet_mean"],
                            "comet_example_score": metrics["comet_per_example"][i] if i < len(metrics["comet_per_example"]) else None
                        }

                        all_results.append(result)

                # Insert backtranslations into data and save
                # NOTE(review): the output name is built from the input file's basename
                # only. If two model folders contain a file with the same name, the later
                # one overwrites the earlier one in bt_json_dir — confirm the names are unique.
                insert_backtranslations(data)
                enhanced_fname = os.path.basename(summary_path).replace(".json", f"_with_bt_{bt_model_tag}.json")
                enhanced_fpath = os.path.join(bt_json_dir, enhanced_fname)
                with open(enhanced_fpath, "w", encoding='utf-8') as f:
                    json.dump(data, f, indent=2, ensure_ascii=False)

    # Save metric results
    # (written into the same directory that holds the per-text back-translation cache)
    os.makedirs(backtranslation_dir, exist_ok=True)
    json_out = os.path.join(backtranslation_dir, f"all_scores_bt_{bt_model_tag}.json")
    csv_out = os.path.join(backtranslation_dir, f"all_scores_bt_{bt_model_tag}.csv")

    with open(json_out, "w", encoding='utf-8') as f:
        json.dump(all_results, f, indent=2, ensure_ascii=False)

    pd.DataFrame(all_results).to_csv(csv_out, index=False)

    print(f"\nSaved results to:\n  JSON: {json_out}\n  CSV:  {csv_out}")



# Kick off the full evaluation; this runs back-translation and every metric for all
# configured model folders and writes the JSON/CSV outputs.
run_evaluation()


In [None]:
# Colab-only helper module for browser downloads.
from google.colab import files

# Paths to the generated files
# (rebuilt with the same pattern run_evaluation uses when it saves its outputs)
json_out = os.path.join(backtranslation_dir, f"all_scores_bt_{bt_model_tag}.json")
csv_out = os.path.join(backtranslation_dir, f"all_scores_bt_{bt_model_tag}.csv")

# Download the files
files.download(json_out)
files.download(csv_out)
