In [1]:
# Mount Google Drive into the Colab filesystem at /content/drive; the master
# CSV and shard output paths configured below live under this mount point.
# NOTE(review): force_remount=True presumably re-mounts even if a mount is
# already active (useful after a stale session) — confirm against Colab docs.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [2]:
# ============================================================
# FINAL TEAM TRANSLATION SCRIPT (DRIVE-SAFE + NO UNNEEDED API)
# - Copy-only for TEACHER_LANGS (NO API calls)
# - Translate only selected languages for THIS run
# - Team sharding: 5 people + optional 2 sessions/person (2 notebooks/keys)
# - High-quality prompts (similar to your old logic)
# - Marker protection + validation (no new @/#/URLs)
# - Batched translation (faster)
# - DURABLE checkpointing to Google Drive (atomic write + fsync)
# - Resume-safe (skips already translated row_ids in shard file)
# ============================================================

import os
import re
import json
import time
import hashlib
from typing import List, Dict, Any, Tuple

import pandas as pd
from tqdm import tqdm

from openai import OpenAI

# =========================
# 1) CONFIG
# =========================

# Input master (original). Read once by main(); this script never writes it back.
MASTER_IN = "/content/drive/MyDrive/master_dataset.csv"

# Output directory for shards (each run writes its own file).
# Created eagerly when this cell runs so later shard writes find it in place.
OUT_DIR = "/content/drive/MyDrive/translation_shards"
os.makedirs(OUT_DIR, exist_ok=True)

# Output column name (holds the English text in both the master frame and the shard)
OUT_COL = "text_en"

# Master columns
TEXT_COL = "text"    # source-language text
LANG_COL = "lang"    # language code; values are matched against LANG_MAP / TEACHER_LANGS
SPLIT_COL = "split"  # dataset split label, compared as a string against TRANSLATE_SPLITS

# Translate only train by default (recommended)
TRANSLATE_SPLITS = {"train"}  # change to {"train","dev"} if you really want dev too

# Teacher languages: copy-only (no API calls).
# Rows in these languages get TEXT_COL copied into OUT_COL verbatim.
TEACHER_LANGS = {"eng", "spa", "deu", "rus", "tur", "pol", "arb"}

# Language names (for prompt). Codes missing from this map are sent as "Unknown".
LANG_MAP = {
    "amh": "Amharic", "arb": "Arabic", "ben": "Bengali", "deu": "German", "eng": "English",
    "fas": "Persian", "hau": "Hausa", "hin": "Hindi", "ita": "Italian", "khm": "Khmer",
    "mya": "Burmese", "nep": "Nepali", "ori": "Odia", "pan": "Punjabi", "pol": "Polish",
    "rus": "Russian", "spa": "Spanish", "swa": "Swahili", "tel": "Telugu", "tur": "Turkish",
    "urd": "Urdu", "zho": "Chinese"
}

# Team sharding
N_PEOPLE = 5
PERSON_ID = 2          # <-- each person sets 0..4 (None = take every language in PERSON_LANGS)

# Two sessions per person:
# Run the SAME script in 2 separate notebooks, each with a different API key:
# - Notebook A: SESSION_ID=0
# - Notebook B: SESSION_ID=1
SESSION_ID = 1         # <-- set to 0 or 1 (or None if you don't want 2 sessions)
N_SESSIONS_PER_PERSON = 2

# Which languages THIS run should translate (non-teacher langs recommended)
PERSON_LANGS = {
  0: ["urd", "hin", "hau"],
  1: ["khm", "nep", "tel"],
  2: ["ben", "amh", "ita"],
  3: ["swa", "ori", "pan"],
  4: ["zho", "mya", "fas"],
}


def langs_for_person(person_id):
    """Return the language codes assigned to ``person_id``.

    ``None`` means "no per-person split": every language listed in
    PERSON_LANGS is returned (in table order). This matches main(), which
    already tags such a run as "personALL"; a direct dict lookup would raise
    KeyError for None before main() ever ran.

    Raises:
        KeyError: if ``person_id`` is neither None nor a key of PERSON_LANGS.
    """
    if person_id is None:
        return [lg for langs in PERSON_LANGS.values() for lg in langs]
    return PERSON_LANGS[person_id]


LANGS_THIS_RUN = langs_for_person(PERSON_ID)


# Test mode: when True, main() keeps only the first TEST_SAMPLES_PER_LANG
# pending rows of each language and prints a preview at the end.
TEST_MODE = False
TEST_SAMPLES_PER_LANG = 5

# OpenAI
# Key is read from the environment. The "PRIVATE" fallback is a placeholder
# string, not a usable key.
# NOTE(review): with the env var unset, the failure presumably only shows up
# as an authentication error on the first API call — confirm.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "PRIVATE")
MODEL = "gpt-4o-mini"

# Batching + saving
BATCH_SIZE = 30      # 20–50 good. Use 10 for debugging saving
SAVE_EVERY = 10      # save every N batches. Use 1 for debugging saving

# Retry: call_openai_translate_batch makes up to MAX_RETRIES attempts and
# sleeps BASE_BACKOFF * 2**(attempt-1) + 0.25*attempt seconds between them.
MAX_RETRIES = 4
BASE_BACKOFF = 2.0

In [3]:
# =========================
# 2) MARKER PROTECTION (same spirit as your old code)
# =========================

# Regexes for the "markers" a translation must not invent. Each pattern is run
# independently with re.findall, so one span can be reported by more than one
# pattern (e.g. a '#fragment' inside a URL also matches the hashtag pattern).
MARKER_PATTERNS = [
    r'@\w+',              # mentions
    r'#\w+',              # hashtags
    r'https?://\S+',      # URLs
    r'www\.\S+',          # URLs
]

def extract_markers(text: str) -> set:
    """Collect every mention, hashtag and URL found in ``text``.

    ``text`` is coerced with ``str()`` first, so non-string values (e.g. NaN
    from pandas) are searched through their string form rather than raising.

    Returns:
        A set of the raw matched substrings across all MARKER_PATTERNS.
    """
    haystack = str(text)
    found = set()
    # One findall per pattern; update() folds each result list into the set in turn.
    found.update(*[re.findall(pattern, haystack) for pattern in MARKER_PATTERNS])
    return found

def stable_row_id(lang: str, text: str) -> str:
    """Derive a deterministic 16-hex-character id from a (lang, text) pair.

    The id is the first 16 hex digits of the SHA-256 of ``"<lang>||<text>"``
    encoded as UTF-8, so the same language/text pair always yields the same id.
    Both arguments are passed through ``str()`` before hashing.
    """
    key = "||".join((str(lang), str(text)))
    digest = hashlib.sha256(key.encode("utf-8"))
    return digest.hexdigest()[:16]

def validate_translation(seed_text: str, translated: str) -> Tuple[bool, str]:
    """Check that a translation is non-trivial and introduces no new markers.

    Args:
        seed_text: the original (source-language) text.
        translated: the candidate English translation.

    Returns:
        ``(True, "ok")`` when the candidate passes, otherwise ``(False, reason)``.
        A candidate fails when it is not a string, has fewer than 2
        non-whitespace-trimmed characters, or contains a mention/hashtag/URL
        (compared case-insensitively) that is absent from ``seed_text``.
        ``@user`` is ignored on both sides so @USER normalization is allowed.
        Markers that were dropped by the translation are NOT flagged.
    """
    has_content = isinstance(translated, str) and len(translated.strip()) >= 2
    if not has_content:
        return False, "empty translation"

    def _normalized_markers(text):
        # Lower-case for case-insensitive comparison; "@user" is always tolerated.
        markers = {marker.lower() for marker in extract_markers(text)}
        markers.discard("@user")
        return markers

    introduced = _normalized_markers(translated).difference(_normalized_markers(seed_text))
    if introduced:
        return False, f"introduced new marker(s): {list(introduced)[:3]}"

    return True, "ok"

In [5]:
# =========================
# 3) PROMPTS (high-quality, context-aware)
# =========================

# System message sent with every request (batch and single-item alike).
# It demands JSON-only output, which pairs with response_format={"type": "json_object"}
# used at the call sites.
SYSTEM_PROMPT = (
    "You are a professional human translator and localization expert.\n"
    "Translate social media text into natural, fluent English while preserving meaning, tone, and pragmatic intent.\n"
    "Use culturally appropriate equivalents; do NOT translate word-by-word if it harms meaning.\n"
    "Do NOT add explanations. Return ONLY valid JSON."
)

# Single-item prompt, used by main() as the per-row fallback when a batch fails.
# str.format placeholders: {source_lang_name}, {lang_code}, {text}.
# Doubled braces ({{ }}) are literal braces in the rendered prompt.
# The expected reply is a JSON object with a "translation" key.
USER_TEMPLATE_SINGLE = """Translate the following social media post into English.

Source language: {source_lang_name} ({lang_code})

TEXT:
\"\"\"{text}\"\"\"

CRITICAL REQUIREMENTS:
- Preserve the original meaning, stance, and tone (including sarcasm, slang, dialect, irony).
- Do NOT add new information.
- Do NOT censor or soften content; translate faithfully.
- Preserve ALL @mentions, #hashtags, and URLs EXACTLY as in the original (do not introduce new ones).
- Keep formatting minimal and readable.

Output JSON only:
{{
  "translation": "..."
}}
"""

# Batched version: returns list of translations aligned with inputs.
# str.format placeholders: {source_lang_name}, {lang_code}, {inputs_json}
# (the caller passes a JSON-encoded array of the source strings).
# The expected reply is a JSON object with a "translations" list of equal length.
USER_TEMPLATE_BATCH = """Translate the following social media posts into English.

Source language: {source_lang_name} ({lang_code})

CRITICAL REQUIREMENTS (apply to every item):
- Preserve meaning, stance, and tone (sarcasm/slang/dialect/irony).
- Do NOT add new information.
- Do NOT censor/soften content; translate faithfully.
- Preserve ALL @mentions, #hashtags, and URLs EXACTLY as in the original (do not introduce new ones).
- Return EXACTLY one translation per input, same order.

INPUTS (JSON array of strings):
{inputs_json}

Output JSON only:
{{
  "translations": ["...", "..."]
}}
"""

# =========================
# 4) SHARDING (deterministic)
# =========================

def assign_person(row_id_hex: str) -> int:
    """Map a hexadecimal row id onto a person slot in ``range(N_PEOPLE)``.

    The id is parsed as a base-16 integer and reduced modulo N_PEOPLE, so the
    assignment is deterministic for a given id.
    """
    id_value = int(row_id_hex, 16)
    return id_value % N_PEOPLE

def assign_session(row_id_hex: str) -> int:
    """Map a hexadecimal row id onto a session slot in ``range(N_SESSIONS_PER_PERSON)``.

    The id is parsed as a base-16 integer and reduced modulo
    N_SESSIONS_PER_PERSON, so each row always lands in the same session.
    """
    id_value = int(row_id_hex, 16)
    return id_value % N_SESSIONS_PER_PERSON

def chunk_list(xs: List[Any], n: int) -> List[List[Any]]:
    """Split ``xs`` into consecutive slices of at most ``n`` items.

    The last slice may be shorter. An empty input yields an empty list.
    ``n`` is used as the ``range`` step, so ``n == 0`` raises ValueError.
    """
    pieces = []
    for start in range(0, len(xs), n):
        pieces.append(xs[start:start + n])
    return pieces

# =========================
# 5) DURABLE SAVE (atomic + fsync)
# =========================

def flush_shard(rows: List[Dict[str, Any]], path: str) -> None:
    """Atomically write ``rows`` as a CSV at ``path``.

    The data is written to ``path + ".tmp"``, flushed and fsync'ed through the
    same handle that wrote it, and then moved over ``path`` with os.replace,
    so readers see either the old file or the complete new one.

    Args:
        rows: list of record dicts; converted with ``pd.DataFrame(rows)``.
        path: destination CSV path. A bare filename (no directory part) is
            written to the current working directory.

    Raises:
        OSError: if the directory cannot be created or the file cannot be
            written/replaced. The temporary file is removed before re-raising.
    """
    out_dir = os.path.dirname(path)
    if out_dir:
        # os.makedirs("") raises, so only create a directory when there is one.
        os.makedirs(out_dir, exist_ok=True)

    tmp = path + ".tmp"
    try:
        # newline="" lets pandas control line terminators, as it does when it
        # opens the path itself.
        with open(tmp, "w", encoding="utf-8", newline="") as f:
            pd.DataFrame(rows).to_csv(f, index=False)
            # Push Python's buffer to the OS, then ask the OS to commit to disk,
            # on the very descriptor that holds the written data.
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, path)
    except BaseException:
        # Do not leave a half-written temp file behind (also on Ctrl-C).
        try:
            os.remove(tmp)
        except OSError:
            pass
        raise

# =========================
# 6) OPENAI CALL (batch + retry + validation)
# =========================

def call_openai_translate_batch(
    client: OpenAI,
    lang_code: str,
    seed_texts: List[str],
) -> List[str]:
    """Translate a batch of same-language texts with one chat-completion call.

    The request is retried up to MAX_RETRIES times. An attempt fails when the
    API call or JSON parsing raises, when the reply lacks a "translations"
    list of the same length as ``seed_texts``, or when ANY item fails
    validate_translation — a single bad item rejects the whole batch.

    Args:
        client: OpenAI client used for ``chat.completions.create``.
        lang_code: source language code; its display name comes from LANG_MAP
            ("Unknown" if the code is not listed).
        seed_texts: source texts, sent as a JSON array in the prompt.

    Returns:
        Stripped translations, aligned one-to-one with ``seed_texts``.

    Raises:
        RuntimeError: when every attempt failed; the message carries the last error.
    """
    prompt = USER_TEMPLATE_BATCH.format(
        source_lang_name=LANG_MAP.get(lang_code, "Unknown"),
        lang_code=lang_code,
        inputs_json=json.dumps(seed_texts, ensure_ascii=False),
    )
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": prompt},
    ]

    failure = None
    attempt = 0
    while attempt < MAX_RETRIES:
        attempt += 1
        try:
            response = client.chat.completions.create(
                model=MODEL,
                messages=messages,
                response_format={"type": "json_object"},
                temperature=0.2,
            )
            payload = json.loads(response.choices[0].message.content)
            candidates = payload.get("translations", None)

            well_formed = isinstance(candidates, list) and len(candidates) == len(seed_texts)
            if not well_formed:
                raise ValueError(f"Bad JSON: expected translations list of len {len(seed_texts)}")

            # Marker protection: every item must pass before anything is returned.
            accepted = []
            for source, candidate in zip(seed_texts, candidates):
                ok, reason = validate_translation(source, candidate)
                if not ok:
                    raise ValueError(f"Validation failed: {reason}")
                accepted.append(str(candidate).strip())

            return accepted

        except Exception as exc:
            failure = exc
            # Exponential backoff between attempts; no sleep after the last one.
            if attempt < MAX_RETRIES:
                time.sleep(BASE_BACKOFF * (2 ** (attempt - 1)) + 0.25 * attempt)

    raise RuntimeError(f"Translation failed after retries: {failure}")

def safe_read_csv(path: str) -> pd.DataFrame:
    """Read a previous shard CSV, treating "nothing there" as an empty frame.

    Returns an empty DataFrame when the file does not exist, is smaller than
    5 bytes, or contains no parseable columns (e.g. only blank lines, which
    makes ``pd.read_csv`` raise ``EmptyDataError``). Any other parse error is
    propagated on purpose: a file that has content but is malformed should be
    inspected by a human rather than silently treated as "no progress" and
    later overwritten.
    """
    if (not os.path.exists(path)) or os.path.getsize(path) < 5:
        # empty or non-existent -> treat as no previous progress
        return pd.DataFrame()
    try:
        return pd.read_csv(path)
    except pd.errors.EmptyDataError:
        # >= 5 bytes but nothing to parse (whitespace / blank lines only)
        return pd.DataFrame()
# =========================
# 7) MAIN
# =========================

def _translate_single(client, lang_code: str, seed_text: str) -> str:
    """Translate one post with the single-item prompt; raise on any failure.

    Makes exactly one API call (no retry). Raises ValueError with the
    validation reason when the reply fails validate_translation; API and
    JSON errors propagate unchanged.
    """
    user_prompt = USER_TEMPLATE_SINGLE.format(
        source_lang_name=LANG_MAP.get(lang_code, "Unknown"),
        lang_code=lang_code,
        text=seed_text
    )
    resp = client.chat.completions.create(
        model=MODEL,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_prompt},
        ],
        response_format={"type": "json_object"},
        temperature=0.2
    )
    data = json.loads(resp.choices[0].message.content)
    tr = data.get("translation", "")
    ok, reason = validate_translation(seed_text, tr)
    if not ok:
        raise ValueError(reason)
    return str(tr).strip()


def _persist_progress(shard_rows: List[Dict[str, Any]], failures: List[Dict[str, Any]], out_path: str) -> None:
    """Final save: flush the shard (if it has rows) and append the failure log.

    The failure log is written even if flushing the shard raises.
    """
    try:
        if shard_rows:
            flush_shard(shard_rows, out_path)
            print(f"[FINAL SAVE] rows={len(shard_rows)} bytes={os.path.getsize(out_path)} mtime={time.ctime(os.path.getmtime(out_path))}")
        else:
            # Nothing to write: do not replace the shard with an empty CSV.
            print("[FINAL SAVE] skipped (no rows)")
    finally:
        if failures:
            fail_path = out_path.replace(".csv", "_failures.jsonl")
            with open(fail_path, "a", encoding="utf-8") as f:
                for r in failures:
                    f.write(json.dumps(r, ensure_ascii=False) + "\n")
            print("Failures logged to:", fail_path, "count=", len(failures))


def main():
    """Translate this run's share of the master dataset into a shard CSV.

    Steps:
      1. Load MASTER_IN and make sure OUT_COL and "row_id" columns exist.
      2. Copy TEXT_COL into OUT_COL for TEACHER_LANGS rows (no API call).
      3. Select rows in TRANSLATE_SPLITS and LANGS_THIS_RUN that still lack
         OUT_COL, optionally restricted to this SESSION_ID.
      4. Skip row_ids already present (with a translation) in the shard file.
      5. Translate per language in batches of BATCH_SIZE; when a batch fails,
         fall back to one call per row and record rows that still fail.
      6. Checkpoint the shard every SAVE_EVERY batches, and ALWAYS perform a
         final save plus failure-log write on the way out — including when the
         loop is interrupted or raises — so work since the last checkpoint is
         not lost.

    The master file itself is never written; results live only in the shard
    CSV and the sibling "*_failures.jsonl" file.
    """
    # 1) Load
    df = pd.read_csv(MASTER_IN)

    # 2) Ensure columns
    if OUT_COL not in df.columns:
        df[OUT_COL] = pd.NA

    if "row_id" not in df.columns:
        df["row_id"] = [stable_row_id(l, t) for l, t in zip(df[LANG_COL], df[TEXT_COL])]

    # 3) Copy-only teacher langs (NO API)
    teacher_mask = df[LANG_COL].isin(TEACHER_LANGS)
    copy_mask = teacher_mask & df[OUT_COL].isna()
    df.loc[copy_mask, OUT_COL] = df.loc[copy_mask, TEXT_COL]

    # 4) Work mask: chosen splits + langs + missing + not teacher
    split_mask = df[SPLIT_COL].astype(str).isin(TRANSLATE_SPLITS)
    lang_mask = df[LANG_COL].isin(LANGS_THIS_RUN)
    need_mask = split_mask & lang_mask & (~teacher_mask) & (df[OUT_COL].isna())

    # 5) Apply session sharding (ONLY if SESSION_ID is set).
    #    The per-person split is done by language via LANGS_THIS_RUN.
    idx = df.index[need_mask].tolist()

    if SESSION_ID is not None:
        idx = [i for i in idx if assign_session(str(df.at[i, "row_id"])) == SESSION_ID]

    # 6) TEST_MODE sampling: first N pending rows per language
    if TEST_MODE:
        sampled = []
        for lg in LANGS_THIS_RUN:
            candidates = [i for i in idx if df.at[i, LANG_COL] == lg]
            sampled.extend(candidates[:TEST_SAMPLES_PER_LANG])
        idx = sampled

    # 7) Output path for shard
    person_tag = f"person{PERSON_ID}" if PERSON_ID is not None else "personALL"
    sess_tag   = f"S{SESSION_ID}"     if SESSION_ID is not None else "SALL"

    out_path = os.path.join(
        OUT_DIR,
        f"shard_{person_tag}_{sess_tag}_" + "_".join(LANGS_THIS_RUN) + ".csv"
    )
    print("Shard output path:", out_path)

    # 8) Resume support (robust to empty/partial files)
    done_row_ids = set()
    shard_rows: List[Dict[str, Any]] = []

    prev = safe_read_csv(out_path)
    if not prev.empty:
        # only accept if it actually has the expected columns
        if "row_id" in prev.columns and OUT_COL in prev.columns:
            done_row_ids = set(prev.loc[prev[OUT_COL].notna(), "row_id"].astype(str).tolist())
        shard_rows = prev.to_dict("records")
        print("Resume: already translated in shard:", len(done_row_ids))
    else:
        print("Resume: no valid previous shard (file missing or empty). Starting fresh.")

    idx = [i for i in idx if str(df.at[i, "row_id"]) not in done_row_ids]
    print("Total master rows:", len(df))
    print("Teacher rows (copy-only) missing text_en:", int((teacher_mask & df[OUT_COL].isna()).sum()))
    print("Rows assigned to THIS run (after resume):", len(idx))

    # 9) Group by language (batch per language)
    groups: Dict[str, List[int]] = {}
    for i in idx:
        lg = str(df.at[i, LANG_COL])
        groups.setdefault(lg, []).append(i)

    # 10) Client — only needed when there is something to translate
    client = OpenAI(api_key=OPENAI_API_KEY) if idx else None

    # 11) Translate
    pbar = tqdm(total=len(idx), desc="Translating")
    batches_since_save = 0

    failures: List[Dict[str, Any]] = []
    try:
        for lg, rows in groups.items():
            for chunk in chunk_list(rows, BATCH_SIZE):
                seed_texts = [str(df.at[i, TEXT_COL]) for i in chunk]

                try:
                    outs = call_openai_translate_batch(client, lg, seed_texts)
                except Exception:
                    # Batch failed after retries: fall back to single-item
                    # calls so one bad row does not sink the other rows.
                    outs = []
                    for one_i, one_seed in zip(chunk, seed_texts):
                        try:
                            outs.append(_translate_single(client, lg, one_seed))
                        except Exception as e2:
                            failures.append({
                                "row_id": str(df.at[one_i, "row_id"]),
                                "lang": lg,
                                "error": str(e2),
                                "seed_text": one_seed[:500]
                            })
                            outs.append(None)

                # write successes (None marks a row that failed even in fallback)
                for i, tr in zip(chunk, outs):
                    if tr is None:
                        continue
                    df.at[i, OUT_COL] = tr
                    shard_rows.append({
                        "row_id": str(df.at[i, "row_id"]),
                        LANG_COL: str(df.at[i, LANG_COL]),
                        SPLIT_COL: str(df.at[i, SPLIT_COL]),
                        TEXT_COL: str(df.at[i, TEXT_COL]),
                        OUT_COL: tr
                    })

                pbar.update(len(chunk))
                batches_since_save += 1

                # periodic checkpoint
                if batches_since_save >= SAVE_EVERY:
                    if shard_rows:
                        flush_shard(shard_rows, out_path)
                        print(f"[SAVED] rows={len(shard_rows)} bytes={os.path.getsize(out_path)} mtime={time.ctime(os.path.getmtime(out_path))}")
                    else:
                        print("[SAVED] skipped (no rows yet)")
                    batches_since_save = 0

    finally:
        pbar.close()
        # Runs on normal completion AND on Ctrl-C / unexpected errors, so rows
        # translated since the last checkpoint and the failure log are kept.
        _persist_progress(shard_rows, failures, out_path)

    # TEST_MODE preview
    if TEST_MODE:
        print("\n=== TEST MODE PREVIEW ===")
        if shard_rows:
            preview = pd.DataFrame(shard_rows).tail(10)
            print(preview[[LANG_COL, TEXT_COL, OUT_COL]].to_string(index=False))
        else:
            print("(empty) No successful translations generated.")

    print("DONE. Shard saved:", out_path)


if __name__ == "__main__":
    main()

Shard output path: /content/drive/MyDrive/translation_shards/shard_person2_S1_ben_amh_ita.csv
Resume: no valid previous shard (file missing or empty). Starting fresh.
Total master rows: 77368
Teacher rows (copy-only) missing text_en: 0
Rows assigned to THIS run (after resume): 4946


Translating:   6%|▌         | 300/4946 [21:41<5:52:33,  4.55s/it]

[SAVED] rows=263 bytes=86329 mtime=Sun Jan  4 14:51:30 2026


Translating:  12%|█▏        | 600/4946 [44:27<5:07:33,  4.25s/it]

[SAVED] rows=525 bytes=172495 mtime=Sun Jan  4 15:14:17 2026


Translating:  18%|█▊        | 900/4946 [1:07:43<5:29:09,  4.88s/it]

[SAVED] rows=788 bytes=256921 mtime=Sun Jan  4 15:37:32 2026


Translating:  24%|██▍       | 1200/4946 [1:32:03<5:17:49,  5.09s/it]

[SAVED] rows=1048 bytes=340089 mtime=Sun Jan  4 16:01:52 2026


Translating:  30%|███       | 1500/4946 [1:55:48<4:11:58,  4.39s/it]

[SAVED] rows=1316 bytes=426807 mtime=Sun Jan  4 16:25:37 2026


Translating:  36%|███▋      | 1799/4946 [2:17:00<3:29:05,  3.99s/it]

[SAVED] rows=1577 bytes=502715 mtime=Sun Jan  4 16:46:50 2026


Translating:  42%|████▏     | 2099/4946 [2:38:29<3:31:06,  4.45s/it]

[SAVED] rows=1845 bytes=604552 mtime=Sun Jan  4 17:08:18 2026


Translating:  49%|████▊     | 2399/4946 [2:49:52<1:51:44,  2.63s/it]

[SAVED] rows=2137 bytes=707465 mtime=Sun Jan  4 17:19:42 2026


Translating:  55%|█████▍    | 2699/4946 [3:07:59<2:26:36,  3.91s/it]

[SAVED] rows=2419 bytes=808706 mtime=Sun Jan  4 17:37:48 2026


Translating:  61%|██████    | 2999/4946 [3:26:16<1:40:03,  3.08s/it]

[SAVED] rows=2702 bytes=908031 mtime=Sun Jan  4 17:56:05 2026


Translating:  66%|██████▋   | 3285/4946 [3:40:48<1:20:15,  2.90s/it]

[SAVED] rows=2978 bytes=1011514 mtime=Sun Jan  4 18:10:37 2026


Translating:  72%|███████▏  | 3585/4946 [3:44:15<16:06,  1.41it/s]

[SAVED] rows=3278 bytes=1200761 mtime=Sun Jan  4 18:14:04 2026


Translating:  79%|███████▊  | 3885/4946 [3:47:46<12:45,  1.39it/s]

[SAVED] rows=3578 bytes=1387950 mtime=Sun Jan  4 18:17:36 2026


Translating:  85%|████████▍ | 4185/4946 [3:51:46<09:38,  1.31it/s]

[SAVED] rows=3878 bytes=1583007 mtime=Sun Jan  4 18:21:35 2026


Translating:  91%|█████████ | 4485/4946 [3:55:44<06:31,  1.18it/s]

[SAVED] rows=4178 bytes=1773867 mtime=Sun Jan  4 18:25:33 2026


Translating:  97%|█████████▋| 4785/4946 [4:00:19<02:49,  1.05s/it]

[SAVED] rows=4478 bytes=1977375 mtime=Sun Jan  4 18:30:09 2026


Translating: 100%|██████████| 4946/4946 [4:03:14<00:00,  2.95s/it]

[FINAL SAVE] rows=4639 bytes=2087541 mtime=Sun Jan  4 18:33:03 2026
Failures logged to: /content/drive/MyDrive/translation_shards/shard_person2_S1_ben_amh_ita_failures.jsonl count= 307
DONE. Shard saved: /content/drive/MyDrive/translation_shards/shard_person2_S1_ben_amh_ita.csv



