## 1. Mount Google Drive


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## 2. Install and Import Required Libraries


In [None]:
!pip install -q transformers sentencepiece wikidata spacy
!python -m spacy download en_core_web_trf

Collecting en-core-web-trf==3.8.0
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_trf-3.8.0/en_core_web_trf-3.8.0-py3-none-any.whl (457.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m457.4/457.4 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting spacy-curated-transformers<1.0.0,>=0.2.2 (from en-core-web-trf==3.8.0)
  Downloading spacy_curated_transformers-0.3.1-py2.py3-none-any.whl.metadata (2.7 kB)
Collecting curated-transformers<0.2.0,>=0.1.0 (from spacy-curated-transformers<1.0.0,>=0.2.2->en-core-web-trf==3.8.0)
  Downloading curated_transformers-0.1.1-py2.py3-none-any.whl.metadata (965 bytes)
Collecting curated-tokenizers<0.1.0,>=0.0.9 (from spacy-curated-transformers<1.0.0,>=0.2.2->en-core-web-trf==3.8.0)
  Downloading curated_tokenizers-0.0.9-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.9 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.12.0->spacy-curated-transformers<

In [None]:
!pip install transformers sentencepiece tqdm



In [None]:
import re
import unicodedata
import string
import torch, spacy
from transformers import MBart50TokenizerFast, AutoModelForSeq2SeqLM
import json
import glob
import tqdm
import os
import spacy
from wikidata.client import Client

## 3. Define Language Mapping Function


In [None]:
def get_language_name(short_code):
    lang_map = {
        'ar': 'Arabic',
        'zh': 'Chinese (Traditional)',
        'fr': 'French',
        'de': 'German',
        'it': 'Italian',
        'ja': 'Japanese',
        'ko': 'Korean',
        'es': 'Spanish',
        'th': 'Thai',
        'tr': 'Turkish',
        'en': 'English',
        # Add more as needed
    }
    return lang_map.get(short_code, short_code)

In [None]:
SPACY_TO_WIKIDATA = {
    "PERSON": "Q5",                # human
    "NORP": "Q41710",              # ethnic group
    "FAC": "Q811979",              # architectural structure
    "ORG": "Q43229",               # organization
    "GPE": "Q82794",               # geopolitical entity
    "LOC": "Q2221906",             # geographical object
    "PRODUCT": "Q2424752",         # product
    "EVENT": "Q1190554",           # event
    "WORK_OF_ART": "Q838948",      # work of art
    "LAW": "Q820655",              # legal text
    "LANGUAGE": "Q34770",          # human language
}


In [None]:
client = Client()  # initialize once, globally

def wikidata_translate_entity(entity_name, target_lang='es', entity_type=None):
    """
    Look up a translation for a named entity from Wikidata.

    Args:
        entity_name (str): The entity name to search (in English).
        target_lang (str): The target language code for translation.
        entity_type (str): Optional spaCy entity type (e.g., 'PERSON', 'ORG').

    Returns:
        str or None: The translated label (or alias/fallback) if found.
    """
    search_url = "https://www.wikidata.org/w/api.php"
    params = {
        'action': 'wbsearchentities',
        'format': 'json',
        'language': 'en',
        'search': entity_name
    }

    try:
        response = requests.get(search_url, params=params, timeout=10)
        time.sleep(0.5)  # Rate limit to avoid 429

        if response.status_code != 200:
            print(f"[WARN] Wikidata API failed for '{entity_name}' with status {response.status_code}")
            return None

        data = response.json()
        search_results = data.get('search', [])
        if not search_results:
            return None

        # Optional: filter by expected Wikidata class
        expected_qid = SPACY_TO_WIKIDATA.get(entity_type)
        matched_entity = None

        for result in search_results:
            qid = result['id']
            try:
                entity = client.get(qid, load=True)
                if expected_qid:
                    # Check if instance_of (P31) matches expected class
                    instance_of = entity.get('P31', [])
                    instance_ids = [i.id for i in instance_of] if isinstance(instance_of, list) else [instance_of.id]
                    if expected_qid in instance_ids:
                        matched_entity = entity
                        break
                else:
                    matched_entity = entity
                    break
            except Exception as e:
                continue  # skip invalid entity

        if not matched_entity:
            return None

        # 1. Label in target language
        label = matched_entity.label.get(target_lang)
        if label:
            return label

        # 2. Fallback: label in English
        return matched_entity.label.get('en')

    except requests.exceptions.RequestException as e:
        print(f"[ERROR] HTTP error for '{entity_name}': {e}")
    except ValueError as e:
        print(f"[ERROR] JSON decode error for '{entity_name}': {e}")
    except Exception as e:
        print(f"[ERROR] Unexpected error for '{entity_name}': {e}")

    return None

In [None]:
def normalize_entity_name(entity_name):
    # Lowercase, strip whitespace
    name = entity_name.strip().lower()

    # Remove leading articles
    name = re.sub(r'^(the|a|an)\s+', '', name)

    # Remove punctuation
    name = name.translate(str.maketrans('', '', string.punctuation))

    # Normalize internal whitespace
    name = re.sub(r'\s+', ' ', name)

    # Capitalize title-style for matching Wikidata labels
    name = name.title()

    return name


In [None]:
normalize_entity_name(" .jhsvbdvjksvq")

'Jhsvbdvjksvq'

## 5. Initialize Hugging Face Model and Pipeline

In [None]:
# os.environ['HF_TOKEN']="" #token for hugging face

In [None]:
model_name = "facebook/nllb-200-3.3B"
tokenizer = MBart50TokenizerFast.from_pretrained(model_name, use_fast=False)
model = AutoModelForSeq2SeqLM.from_pretrained(model_name, torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
model = model.to(device)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/564 [00:00<?, ?B/s]

sentencepiece.bpe.model:   0%|          | 0.00/4.85M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/17.3M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/3.55k [00:00<?, ?B/s]

The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'NllbTokenizer'. 
The class this function is called from is 'MBart50TokenizerFast'.


config.json:   0%|          | 0.00/808 [00:00<?, ?B/s]

pytorch_model.bin.index.json:   0%|          | 0.00/90.0k [00:00<?, ?B/s]

Fetching 3 files:   0%|          | 0/3 [00:00<?, ?it/s]

pytorch_model-00002-of-00003.bin:   0%|          | 0.00/8.55G [00:00<?, ?B/s]

pytorch_model-00001-of-00003.bin:   0%|          | 0.00/6.93G [00:00<?, ?B/s]

pytorch_model-00003-of-00003.bin:   0%|          | 0.00/2.10G [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/94.1k [00:00<?, ?B/s]

## 6. Prepare Input and Output Paths (Google Drive)


In [None]:
input_data_folder = "/content/drive/MyDrive/DL_project/data/references/validation/"
jsonl_files = glob.glob(f"{input_data_folder}/*.jsonl")

output_prediction_dir = os.path.join("/content/drive/MyDrive/DL_project/data/predictions", model_name.replace("/", "_"), "validation")
os.makedirs(output_prediction_dir, exist_ok=True)

In [None]:
lang_map = {
    'en': 'eng_Latn', 'fr': 'fra_Latn', 'de': 'deu_Latn', 'es': 'spa_Latn', 'ar': 'arb_Arab',
    'zh': 'zho_Hant', 'it': 'ita_Latn', 'ja': 'jpn_Jpan', 'ko': 'kor_Hang', 'tr': 'tur_Latn', 'th': 'tha_Thai'
}

In [None]:
def merge_adjacent_entities(doc):
    """Merge adjacent or stopword-separated entities into unified phrases."""
    merged_ents = []
    i = 0
    while i < len(doc.ents):
        start = doc.ents[i].start
        end = doc.ents[i].end
        j = i + 1

        # Attempt to merge adjacent or nearly-adjacent entities
        while j < len(doc.ents) and doc.ents[j].start <= end + 2:
            intervening = doc[end:doc.ents[j].start]
            if all(tok.is_stop or tok.is_punct for tok in intervening):
                end = doc.ents[j].end
                j += 1
            else:
                break

        span = doc[start:end]
        merged_ents.append(span.text.strip())
        i = j

    return list(set(merged_ents))  # avoid duplicates


In [None]:
entity_translation_cache = {}
nlp = spacy.load("en_core_web_trf")

def detect_and_translate_entities(text, target_lang):
    doc = nlp(text)

    # Step 1: Merge adjacent entities for better span detection
    # merged_entities = merge_adjacent_entities(doc)
    merged_entities = [ent.text for ent in doc.ents]

    translated_entities = {}
    for ent in merged_entities:
        norm = normalize_entity_name(ent)

        # Cache to avoid duplicate wikidata lookups
        if ent in entity_translation_cache:
            translation = entity_translation_cache[ent]
        else:
            translation = (
                wikidata_translate_entity(ent, target_lang) or
                wikidata_translate_entity(norm, target_lang)
            )
            entity_translation_cache[ent] = translation

        if translation:
            translated_entities[ent] = translation

    return translated_entities


In [None]:
trans = detect_and_translate_entities("Who played the lead role in The Mole – Undercover in North Korea?", "de")
trans

In [None]:
def translate_nllb(text, src_lang, tgt_lang):
    tokenizer.src_lang = src_lang
    encoded = tokenizer(text, return_tensors="pt").to(device)
    try:
        tgt_token_id = tokenizer.convert_tokens_to_ids(tgt_lang)
    except KeyError:
        raise ValueError(f"Unsupported target language token: {tgt_lang}")
    generated_tokens = model.generate(
        **encoded,
        forced_bos_token_id=tgt_token_id,
        # max_new_tokens=200,
        no_repeat_ngram_size=3
    )
    return tokenizer.decode(generated_tokens[0], skip_special_tokens=True)

In [None]:
# Combine entity detection + translation + inject into source + NLLB translate
no_translations = {}
def entity_aware_translate(text, src_lang, tgt_lang):
    translated_entities = detect_and_translate_entities(text, target_lang = tgt_lang)

    for ent, trans in translated_entities.items():
        if(trans == None):
            no_translations[ent] = trans
            continue
        text = text.replace(ent, trans)

    return translate_nllb(text, lang_map.get(src_lang), lang_map.get(tgt_lang))


## 7. Process and Translate JSONL Files


In [None]:
for file_path in jsonl_files:
    filename = os.path.basename(file_path)
    outfile_path = os.path.join(output_prediction_dir, filename)

    with open(file_path, 'r', encoding='utf-8') as f:
        data = [json.loads(line) for line in f]

    results = []
    pbar = tqdm.tqdm(total=len(data))

    for idx, record in enumerate(data, 1):
        id = record['id']
        source = record['source']
        source_locale = record['source_locale']
        target_locale = record['target_locale']
        source_language = get_language_name(source_locale)
        target_language = get_language_name(target_locale)

        try:
            translation = entity_aware_translate(source, source_locale, target_locale)
        except Exception as e:
            translation = ""
            print(f"Error translating {id}: {e}")

        results.append({
            "id": id,
            "source_language": source_language,
            "target_language": target_language,
            "text": source,
            "prediction": translation,
        })

        pbar.update(1)

        if idx % 10 == 0 or idx == len(data):
            with open(outfile_path, 'w', encoding='utf-8') as f:
                for res in results:
                    f.write(json.dumps(res, ensure_ascii=False) + '\n')

    pbar.close()
    print(f"Translations saved to {outfile_path}")

In [1]:
from framework import download_comet_model
comet_model = download_comet_model()

ModuleNotFoundError: No module named 'framework'

In [None]:
import os
import glob
import json
from framework import calculate_comet_scores, calculate_meta_score

model_name = "facebook_nllb_200_3.3b"
output_prediction_dir = os.path.join("data/predictions", model_name, "validation")
os.makedirs(output_prediction_dir, exist_ok=True)

input_data_folder = "data/references/validation"
jsonl_files = glob.glob(f"{input_data_folder}/*.jsonl")

def calculate_scores(template_id):
    scores_dir = os.path.join(output_prediction_dir, template_id, "scores")

    if not os.path.exists(scores_dir):
        os.makedirs(scores_dir, exist_ok=True)

    for file_path in jsonl_files:
        references_path = file_path
        filename = os.path.basename(file_path)
        predictions_path = os.path.join(output_prediction_dir, template_id, filename)

        comet_score = calculate_comet_scores(
            comet_model, 
            references_path, 
            predictions_path
        )

        correct_instances, total_instances, meta_score = calculate_meta_score(
            references_path,
            predictions_path)

        evaluation_results = {
            "correct_instances": correct_instances,
            "total_instances": total_instances,
            "comet_score": comet_score,
            "meta_score": meta_score
        }

        evaluation_output_path = os.path.join(scores_dir, f"{os.path.splitext(filename)[0]}.json")
        with open(evaluation_output_path, 'w', encoding='utf-8') as json_file:
            json.dump(evaluation_results, json_file, ensure_ascii=False, indent=4)

In [None]:
calculate_scores("zero_shot")
calculate_scores("rag-wikidata")
calculate_scores("rag-wikidata-entity-type-matching")