 # Import

In [None]:
from huggingface_hub import login
import torch
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM, AutoModelForSeq2SeqLM
import os
import json
import re
from tqdm import tqdm
import requests
from bs4 import BeautifulSoup

In [None]:
token = ""
login(token)

# Carica il modello e il tokenizer
model_name = "google/gemma-2-9b-it"

model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype="auto",
    device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained(model_name)

In [None]:
ner_name = "Jean-Baptiste/camembert-ner"
ner_pipeline = pipeline(model=ner_name, aggregation_strategy='simple', device="cuda")

In [None]:
# load data
!kaggle datasets download -d andreazenotto/semeval-data
!unzip -q "semeval-data.zip" -d "semeval-data"

In [None]:
def load_data(file_name):
    data_dir = "semeval-data/references/validation"
    
    # Lista per salvare le informazioni
    data_list = []
    
    file_path = os.path.join(data_dir, file_name)
    
    with open(file_path, "r", encoding="utf-8") as file:
        for line in file:
            obj = json.loads(line.strip())  # Carica l'oggetto JSON
            id = obj.get("id", "")
            wikidata_id = obj.get("wikidata_id", "")
            input_text = obj.get("source", "")
            target_lang = obj.get("target_locale", "")
            data_list.append({"id": id, "wikidata_id": wikidata_id, "input_text": input_text, "target_lang": target_lang})
    
    return data_list[:50]

In [None]:
def get_entity_translation(entity_id, target_language):
    # URL per ottenere i dettagli dell'entità
    entity_url = f"https://www.wikidata.org/wiki/Special:EntityData/{entity_id}.json"

    response = requests.get(entity_url)
    if response.status_code != 200:
        raise Exception(f"Errore durante l'accesso ai dettagli dell'entità: {response.status_code}")

    entity_data = response.json()

    # Estrarre i dettagli dell'entità
    entity_labels = entity_data["entities"][next(iter(entity_data["entities"]))]["labels"]

    # Ottenere il nome dell'entità in inglese
    entity_name_en = entity_labels.get("en", {}).get("value")

    # Estrarre la traduzione nella lingua target
    translation = entity_labels.get(target_language, {}).get("value")

    return entity_name_en, translation

In [None]:
def get_entity_id(entity_name):
    # URL di ricerca sul sito Wikidata
    search_url = f"https://www.wikidata.org/w/index.php"
    params = {
        "search": entity_name,
        "title": "Special:Search",
        "profile": "advanced",  # Usa il profilo avanzato per i risultati più accurati
        "fulltext": 1,
        "ns0": 1,  # Cerca solo nello spazio principale (entità)
    }

    # Richiesta al sito Wikidata
    response = requests.get(search_url, params=params)
    if response.status_code != 200:
        raise Exception(f"Errore durante l'accesso alla pagina di ricerca: {response.status_code}")

    # Analisi del contenuto HTML
    soup = BeautifulSoup(response.text, "html.parser")

    # Trova il primo risultato
    first_result = soup.find("div", class_="mw-search-result-heading")
    if first_result:
        # Estrai il link associato al risultato
        link = first_result.find("a")["href"]
        # Estrai il codice dell'entità dal link
        entity_id = link.split("/")[-1]

        # URL per ottenere i dettagli dell'entità
        entity_url = f"https://www.wikidata.org/wiki/Special:EntityData/{entity_id}.json"

        response = requests.get(entity_url)
        if response.status_code != 200:
            raise Exception(f"Errore durante l'accesso ai dettagli dell'entità: {response.status_code}")

        return list(response.json()['entities'].keys())[0]

    else:
        # print("Nessun risultato trovato.")
        return None

In [None]:
def find_entity(sentence):
  entities = ner_pipeline(sentence)

  results = []
  for entity in entities:
      # Clean up extra spaces introduced during tokenization
      cleaned_word = entity['word'].replace(" ' ", "' ")
      results.append(cleaned_word)

  found_words = [word for word in results if word in sentence]
  return combine_words(found_words)

In [None]:
def combine_words(words):
    combined = []
    temp = ""

    for word in words:
        # Se il token inizia con '##', è una parte di una parola più lunga
        if word.startswith('##'):
            temp += word[2:]  # Aggiungi solo la parte dopo '##'
        else:
            if temp:  # Se c'era una parte precedentemente accumulata, aggiungila
                combined.append(temp)
            temp = word  # Inizia una nuova parola
    if temp:
        combined.append(temp)  # Aggiungi l'ultima parte accumulata

    return " ".join(combined)

In [None]:
def chat_prompt(input_text, target_language):

    language_map = {
        "ar": "Arabic",        # Arabo
        "de": "German",        # Tedesco
        "es": "Spanish",       # Spagnolo
        "fr": "French",        # Francese
        "it": "Italian",       # Italiano
        "ja": "Japanese",      # Giapponese
        "ko": "Korean",        # Coreano
        "th": "Thai",          # Thai
        "tr": "Turkish",       # Turco
        "zh": "Chinese"        # Cinese
    }

    entity_name = find_entity(input_text)
    entity_id = get_entity_id(entity_name)
    if entity_id:
        entity, entity_translated = get_entity_translation(entity_id, target_language)
    else:
        entity, entity_translated = None, None

    instruction = (
    f"You are a professional translator. Your task is to translate the following text from English to {language_map[target_language]}. "
    f"Please ensure that the translation is accurate, preserving the meaning, and pay special attention to the proper names, "
    f"such as the names of people, places, films, books, and other specific entities. Where applicable, please use the localized "
    f"versions of these proper names in {language_map[target_language]} if they exist. Do not add any extra explanation or notes, just provide "
    f"the translation. "
    + (f"Pay special attention to correctly translate the entity '{entity}' into the entity '{entity_translated}'. " if entity is not None and entity_translated is not None else "")
    + "After each translation write 'EOTR' word."
)

    prompt = f"{instruction}\n\nTranslate the following text from English to {language_map[target_language]}: {input_text}\n\nTranslation:"

    chat = [{ "role": "user", "content": prompt }]

    prompt = tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)

    return prompt

In [None]:
def extract_translation(model_output) -> str:
    if "Translation:" in model_output:
      translation_part = model_output.split("model", 1)[1].strip()  # Prendi tutto dopo "model"
      translated_text = translation_part.split("EOTR")[0].strip()  # Prendi solo la prima traduzione
      return translated_text
    # Se "Translation:" non è trovato
    print(f"Errore: 'Translation:' non trovato in: {model_output}")
    return ""

In [None]:
def translate_data(data_list, model, tokenizer):
    # Mappa per associare target_language ai file
    language_file_map = {
        "ar": "ar_AE.jsonl",
        "de": "de_DE.jsonl",
        "es": "es_ES.jsonl",
        "fr": "fr_FR.jsonl",
        "it": "it_IT.jsonl",
        "ja": "ja_JP.jsonl",
        "ko": "ko_KR.jsonl",
        "th": "th_TH.jsonl",
        "tr": "tr_TR.jsonl",
        "zh": "zh_TW.jsonl"
    }

    language_map = {
        "ar": "Arabic",        # Arabo
        "de": "German",        # Tedesco
        "es": "Spanish",       # Spagnolo
        "fr": "French",        # Francese
        "it": "Italian",       # Italiano
        "ja": "Japanese",      # Giapponese
        "ko": "Korean",        # Coreano
        "th": "Thai",          # Thai
        "tr": "Turkish",       # Turco
        "zh": "Chinese"        # Cinese
    }

    # Definisci il nome della cartella dove i file verranno salvati
    output_folder = "translations_by_language"  # Nome della cartella predefinita
    os.makedirs(output_folder, exist_ok=True)  # Crea la cartella se non esiste

    # Dizionario per tenere traccia dei file aperti
    file_handles = {}

    try:
        for item in tqdm(data_list, desc="Generation"):
            obj_id = item.get("id", "")  # Usa l'id dell'oggetto
            wikidata_id = item.get("wikidata_id", "")
            input_text = item.get("input_text", "")
            target_language = item.get("target_lang", "")

            # Crea il prompt usando la funzione get_translation
            prompt = chat_prompt(input_text, target_language)
            # if use gold data
            # prompt = chat_prompt(input_text, wikidata_id, target_language)

            # Prepara il prompt per il modello
            inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=512)

            device = model.device  # Ottieni la device del modello
            inputs = {key: val.to(device) for key, val in inputs.items()}

            # Genera l'output del modello
            outputs = model.generate(**inputs, max_new_tokens=50)

            # Decodifica l'output per ottenere il testo
            prediction = tokenizer.decode(outputs[0], skip_special_tokens=True)

            translated_text = extract_translation(prediction)
            print(translated_text)

            # Crea l'oggetto risultato
            result = {
                "id": obj_id,  # Usa l'id reale dell'oggetto
                "source_language": "English",
                "target_language": language_map[target_language],
                "text": input_text,
                "prediction": translated_text
            }

            # Stampa il risultato finale per il ciclo
            # print(f"Risultato per il ciclo {obj_id}:\n{result}\n")

            # Ottieni il file corrispondente alla lingua target
            if target_language in language_file_map:
                file_name = language_file_map[target_language]
                file_path = os.path.join(output_folder, file_name)

                # Apri il file una sola volta e riutilizza il handle
                if target_language not in file_handles:
                    file_handles[target_language] = open(file_path, "a", encoding="utf-8")

                # Scrivi il risultato nel file in formato JSONL
                json.dump(result, file_handles[target_language], ensure_ascii=False)
                file_handles[target_language].write("\n")

    finally:
        # Chiudi tutti i file aperti
        for handle in file_handles.values():
            handle.close()

In [None]:
languages = [
    "ar_AE.jsonl",
    "de_DE.jsonl",
    "es_ES.jsonl",
    "fr_FR.jsonl",
    "it_IT.jsonl",
    "ja_JP.jsonl",
    "ko_KR.jsonl",
    "th_TH.jsonl",
    "tr_TR.jsonl",
    "zh_TW.jsonl"
]


for lang in languages:
    data_list = load_data(lang)
    translate_data(data_list, model, tokenizer)