# Translation Adequacy via Centered-Euclidean KL (Colab)
This notebook evaluates how close **translated items (JP)** are to their **original items (EN)** using **open-weight multilingual embeddings** and a **centered-Euclidean approximation to KL**.

**Pipeline**
1. Install & import libraries
2. Load your scale items (EN + JP-LLM + JP-HUMAN)
3. Embed with one or more open models
4. ZCA-whiten per model and align every non-reference model to the reference model via orthogonal Procrustes
5. Compute item-level **KL≈½·||Δ||²**, cosine similarities, and **quantile indices** versus a negative baseline
6. Paired tests (LLM vs HUMAN), cross-model agreement (Spearman, ICC)
7. Visualize distributions and a 2D MDS map of versions

## 1) Setup

In [None]:
!pip -q install sentence-transformers umap-learn scikit-learn scipy matplotlib pandas numpy

## 2) Imports & helpers

In [None]:
import os, math, json, random
import numpy as np
import pandas as pd
from typing import List, Dict, Tuple
from pathlib import Path
from sentence_transformers import SentenceTransformer
from sklearn.manifold import MDS
from sklearn.metrics import pairwise_distances
from scipy.stats import spearmanr, ttest_rel, wilcoxon
import matplotlib.pyplot as plt

# One seed for both Python's and NumPy's global RNGs so reruns are repeatable.
SEED = 2025
random.seed(SEED)
np.random.seed(SEED)

def l2_normalize(x):
    """Scale vectors along the last axis to (approximately) unit L2 norm.

    A tiny constant (1e-12) is added to every norm before dividing, so an
    all-zero vector comes back as zeros instead of NaN.
    """
    norms = np.linalg.norm(x, axis=-1, keepdims=True)
    return x / (norms + 1e-12)

def cosine_sim(a, b):
    """Return the cosine similarity of `a` and `b` along the last axis.

    Both inputs are L2-normalized first; the result is the element-wise
    product summed over the last axis, so leading axes follow NumPy
    broadcasting.
    """
    unit_a = l2_normalize(a)
    unit_b = l2_normalize(b)
    return np.sum(unit_a * unit_b, axis=-1)

def zca_whitener(X, eps=1e-6):
    """Fit a ZCA whitening transform on the rows of `X`.

    Args:
        X: 2-D array of shape (n_samples, n_features); rows are observations.
        eps: Ridge added to every covariance eigenvalue before taking the
            inverse square root, which keeps near-zero directions finite.

    Returns:
        (mu, W): `mu` is the (1, n_features) row mean and `W` the symmetric
        (n_features, n_features) whitening matrix. Apply as `(X - mu) @ W`.

    Raises:
        ValueError: if `X` is not 2-D or has fewer than two rows (the sample
            covariance is undefined in that case).

    Warns:
        RuntimeWarning: if n_samples <= n_features. The sample covariance is
            then rank-deficient and whitening maps the fitted rows to nearly
            equidistant points, so distances between them carry almost no
            information.
    """
    import warnings  # local import: `warnings` is not among the file-level imports

    X = np.asarray(X)
    if X.ndim != 2 or X.shape[0] < 2:
        raise ValueError(
            "zca_whitener needs a 2-D array with at least two rows; "
            f"got shape {X.shape}."
        )
    n_samples, n_features = X.shape
    if n_samples <= n_features:
        warnings.warn(
            f"zca_whitener: {n_samples} samples for {n_features} features gives a "
            "rank-deficient covariance; whitened distances between these rows "
            "will be nearly constant. Fit on a larger anchor set.",
            RuntimeWarning,
            stacklevel=2,
        )
    mu = X.mean(0, keepdims=True)
    Xc = X - mu
    # np.cov returns a 0-d array for a single feature; the SVD needs 2-D input.
    C = np.atleast_2d(np.cov(Xc, rowvar=False))
    # C is symmetric PSD, so its left singular vectors serve as eigenvectors.
    U, s, _ = np.linalg.svd(C, full_matrices=False)
    W = U @ np.diag(1.0 / np.sqrt(s + eps)) @ U.T
    return mu, W

def orthogonal_procrustes(X, Y):
    """Return the matrix R = U @ Vt from the SVD of the centered cross-product.

    Both inputs are mean-centered column-wise, then `Xc.T @ Yc` is decomposed
    with a reduced SVD. For equal feature counts R is the orthogonal matrix
    that best maps centered `X` onto centered `Y` in the least-squares sense.
    With unequal feature counts R has shape (X features, Y features) and is
    only semi-orthogonal.
    """
    X_centered = X - X.mean(0, keepdims=True)
    Y_centered = Y - Y.mean(0, keepdims=True)
    cross = X_centered.T @ Y_centered
    left, _, right_t = np.linalg.svd(cross, full_matrices=False)
    return left @ right_t

def half_sqeuclid(a, b):
    """Return half the squared Euclidean distance between `a` and `b` as a float."""
    delta = a - b
    squared_norm = float((delta * delta).sum())
    return 0.5 * squared_norm

def icc2_1(X):
    """Compute ICC(2,1) from an (n targets x k raters) table of scores.

    The row, column and residual mean squares of a two-way layout are
    combined as (MSR - MSE) / (MSR + (k-1)*MSE + k*(MSC - MSE)/n).
    """
    scores = np.asarray(X, float)
    n, k = scores.shape
    row_means = scores.mean(axis=1, keepdims=True)
    col_means = scores.mean(axis=0, keepdims=True)
    grand_mean = scores.mean()

    # Mean squares for rows (targets), columns (raters) and the residual.
    ss_rows = ((row_means - grand_mean)**2).sum()
    ss_cols = ((col_means - grand_mean)**2).sum()
    residual = scores - row_means - col_means + grand_mean
    ms_rows = k * ss_rows / (n-1)
    ms_cols = n * ss_cols / (k-1)
    ms_err = (residual**2).sum() / ((n-1)*(k-1))

    denominator = ms_rows + (k-1)*ms_err + k*(ms_cols - ms_err)/n
    return float((ms_rows - ms_err) / denominator)


## 3) Load your data
Upload or construct a DataFrame with **one scale** for now. Required columns:
- `item_id` (pairs EN↔JP by this id)
- `lang` in `{en, ja}`
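- `trans_type` in `{llm, human}` for JP rows (each JP row must indicate which); EN rows are selected by `lang` alone, so they may carry any label (the sample uses `origin`)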
- `text` (the item string)

If you don't have a CSV yet, run the sample cell below and edit inline.

In [None]:
USE_SAMPLE = True  # set to False and enable the upload code in the else branch

if USE_SAMPLE:
    # Five items, listed in item order (item_id 1..5) for each version.
    en_texts = [
        "On the whole I am satisfied with myself.",
        "At times I think I am no good at all.",
        "I feel that I have a number of good qualities.",
        "I am able to do things as well as most other people.",
        "I feel I do not have much to be proud of.",
    ]
    ja_llm_texts = [
        "概して私は自分に満足している。",
        "時々、私はまったくダメだと思うことがある。",
        "私は多くの長所があると感じる。",
        "私は他の人と同じくらい物事をうまくこなせる。",
        "誇れるものはあまりないと感じる。",
    ]
    ja_human_texts = [
        "概して、私は自分自身に満足している。",
        "時々、私は全く価値がないのではないかと思う。",
        "私は自分には多くの良い点があると感じる。",
        "私は多くの人と同程度に物事をうまく行える。",
        "誇れるものがあまりないと感じる。",
    ]
    versions = [
        ("en", "origin", en_texts),
        ("ja", "llm", ja_llm_texts),
        ("ja", "human", ja_human_texts),
    ]
    # Long format: one row per (version, item), versions stacked in the order above.
    data = [
        {"item_id": item_id, "lang": lang, "trans_type": trans_type, "text": text}
        for lang, trans_type, version_texts in versions
        for item_id, text in enumerate(version_texts, start=1)
    ]
    df = pd.DataFrame(data)
else:
    # from google.colab import files
    # up = files.upload()
    # fname = list(up.keys())[0]
    # df = pd.read_csv(fname)
    raise RuntimeError("Set USE_SAMPLE=True or use upload.")

required_columns = {'item_id', 'lang', 'trans_type', 'text'}
assert required_columns.issubset(df.columns), "Missing required columns."
# Work on a copy and store ids as strings so EN/JP rows pair on identical keys.
df = df.assign(item_id=df['item_id'].astype(str))
df.head()


## 4) Choose models & anchors
- Add/remove open-weight models as needed.
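- For whitening & alignment, we'll use **all rows** as the anchor distribution.
- Caveat: when the anchor set has fewer rows than the embedding has dimensions (as with the 15-row sample), the sample covariance is rank-deficient and whitening pushes all anchor rows toward being equidistant, so item-level distances become uninformative. Use a much larger anchor set for real analyses.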


In [None]:
# Each entry: (short name used for result labels and cache files, checkpoint id).
# The first entry is the reference model that the others are aligned to.
# NOTE(review): the E5 model card asks for "query: "/"passage: " input prefixes;
# texts are embedded here without any prefix — confirm this is intended.
MODELS = [
    ("labse", "sentence-transformers/LaBSE"),
    ("mE5", "intfloat/multilingual-e5-large"),
]

# e.g., 'cuda' on Colab if GPU is enabled
DEVICE = None

# Directory holding cached embedding matrices; created on first run.
CACHE_DIR = Path("/content/emb_cache")
CACHE_DIR.mkdir(parents=True, exist_ok=True)


## 5) Embed all texts (with caching)

In [None]:
def embed_model(model_name, model_ckpt, texts, device=None, batch_size=64, prefix=""):
    """Embed `texts` with a SentenceTransformer checkpoint, using an on-disk cache.

    The cache lives in `CACHE_DIR` as `<model_name>_cache.npy` plus
    `<model_name>_index.json`. It is reused only when the stored checkpoint id,
    prefix and text list all match the request, so changing the checkpoint
    behind a short name no longer returns stale vectors. Index files written
    in the older texts-only format are treated as a miss and overwritten.

    Args:
        model_name: Short label used to build the cache file names.
        model_ckpt: Checkpoint identifier passed to `SentenceTransformer`.
        texts: Sequence of strings to embed, in output row order.
        device: Optional device string; when falsy, no device argument is
            passed and the library picks its default.
        batch_size: Batch size forwarded to `model.encode`.
        prefix: String prepended to every text before encoding (for models
            that expect an instruction prefix). Empty means no change.

    Returns:
        float32 array with one embedding row per input text.
    """
    cache_path = CACHE_DIR / f"{model_name}_cache.npy"
    cache_index = CACHE_DIR / f"{model_name}_index.json"
    texts = list(texts)
    signature = {"ckpt": model_ckpt, "prefix": prefix, "texts": texts}

    if cache_path.exists() and cache_index.exists():
        try:
            with open(cache_index, "r", encoding="utf-8") as f:
                cached_signature = json.load(f)
        except (OSError, json.JSONDecodeError):
            # An unreadable index is a cache miss, not a fatal error.
            cached_signature = None
        if cached_signature == signature:
            E = np.load(cache_path)
            # Guard against an array file that is out of sync with its index.
            if E.shape[0] == len(texts):
                return E

    model = SentenceTransformer(model_ckpt, device=device) if device else SentenceTransformer(model_ckpt)
    inputs = [prefix + t for t in texts] if prefix else texts
    E = model.encode(inputs, convert_to_numpy=True, batch_size=batch_size, show_progress_bar=True, normalize_embeddings=False)
    # Return the same dtype on a miss as a later cache hit would load.
    E = np.asarray(E, dtype=np.float32)
    np.save(cache_path, E)
    with open(cache_index, "w", encoding="utf-8") as f:
        json.dump(signature, f)
    return E

# Embed every row of `df` once per configured model; row order follows `df`.
texts = df['text'].to_list()
model_vecs = {}
for name, ckpt in MODELS:
    print(f"Embedding with {name} ({ckpt}) ...")
    model_vecs[name] = embed_model(name, ckpt, texts, device=DEVICE)
print("Done.")


## 6) ZCA-whiten per model and align to a reference

In [None]:
# All rows serve as the anchor set for both whitening and alignment.
anchor_idx = np.arange(len(texts))

# Step 1: fit a whitener per model on the anchors and apply it to every row.
whitened, centers, whiteners = {}, {}, {}
for name, _ in MODELS:
    X = model_vecs[name]
    centers[name], whiteners[name] = zca_whitener(X[anchor_idx])
    whitened[name] = (X - centers[name]) @ whiteners[name].T

# Step 2: map every other model into the space of the first (reference) model.
ref_name = MODELS[0][0]
Xref = whitened[ref_name]
aligned = {ref_name: Xref.copy()}
for name in (model_name for model_name, _ in MODELS[1:]):
    R = orthogonal_procrustes(whitened[name][anchor_idx], Xref[anchor_idx])
    aligned[name] = whitened[name] @ R
print("Whitening + alignment complete.")


## 7) Compute item-level metrics (KL≈½·||Δ||², cosine, quantiles)

In [None]:
# Split the rows into the English originals and the two Japanese versions.
is_en = df['lang'] == "en"
is_ja = df['lang'] == "ja"
en_df = df[is_en].copy()
jp_llm = df[is_ja & (df['trans_type'] == "llm")].copy()
jp_hum = df[is_ja & (df['trans_type'] == "human")].copy()

# Item ids that have both an English row and the given Japanese version.
en_ids = set(en_df['item_id'])
common_ids_llm = sorted(en_ids.intersection(jp_llm['item_id']))
common_ids_hum = sorted(en_ids.intersection(jp_hum['item_id']))

def make_index_map(sub):
    """Map each `item_id` in `sub` to that row's index label.

    `sub` is expected to be a row subset that still carries the index of the
    frame it was sliced from. The labels are returned unchanged; they must not
    be renumbered per subset, otherwise every subset would map to 0..k-1 and
    point at the wrong rows of the full embedding matrix.

    The labels equal row positions only when the parent frame has a default
    0..n-1 index (as `pd.DataFrame(list_of_dicts)` and `pd.read_csv` produce).
    If the parent was filtered or re-indexed, reset its index before slicing.
    When an `item_id` occurs more than once, the last occurrence wins.
    """
    return dict(zip(sub["item_id"].tolist(), sub.index.tolist()))

# item_id -> row lookup for each version, built in one pass.
idx_en, idx_llm, idx_hum = map(make_index_map, (en_df, jp_llm, jp_hum))

# Accumulates one dict per (model, item, translation type).
records = []

def neg_baseline_quantiles(X, pos_pairs, n_draws=200):
    """Score true EN-JP pairs against a shuffled (mismatched) baseline.

    Args:
        X: 2-D array of embeddings, indexed by the row numbers in `pos_pairs`.
        pos_pairs: List of (en_row, jp_row) tuples for true translation pairs.
        n_draws: Number of random permutations used to build the baseline.

    Returns:
        (sims_pos, quant): cosine similarity of each true pair, and the
        fraction of baseline similarities that are <= that value. Both are
        empty arrays when `pos_pairs` is empty. `quant` is all-NaN when no
        mismatched pairing can be formed (a single pair, or `n_draws` of 0).

    Permutations come from NumPy's global RNG, so seed it for repeatability.
    """
    if not pos_pairs:
        return np.array([]), np.array([])
    en_vecs = X[[i for i, _ in pos_pairs]]
    jp_vecs = X[[j for _, j in pos_pairs]]
    sims_pos = cosine_sim(en_vecs, jp_vecs)

    n_pairs = len(pos_pairs)
    positions = np.arange(n_pairs)
    neg_chunks = []
    for _ in range(n_draws):
        perm = np.random.permutation(n_pairs)
        # A permutation's fixed points are true pairs; keep only mismatches so
        # the baseline is not contaminated by positives.
        mismatched = perm != positions
        if mismatched.any():
            neg_chunks.append(cosine_sim(en_vecs[mismatched], jp_vecs[perm[mismatched]]))

    if not neg_chunks:
        return sims_pos, np.full(len(sims_pos), np.nan)
    neg_sorted = np.sort(np.concatenate(neg_chunks))
    ranks = np.searchsorted(neg_sorted, sims_pos, side="right")
    quant = ranks / len(neg_sorted)
    return sims_pos, quant

# For every model, score each translation type against the English originals.
for name, _ in MODELS:
    X = aligned[name]
    per_type = (
        ("llm", common_ids_llm, idx_llm),
        ("human", common_ids_hum, idx_hum),
    )
    for trans_type, item_ids, idx_jp in per_type:
        pairs = [(idx_en[iid], idx_jp[iid]) for iid in item_ids]
        sims, quants = neg_baseline_quantiles(X, pairs)
        # Half squared distance per pair, computed in the aligned space.
        kl_vals = np.array([0.5 * np.sum((X[ie] - X[ij])**2) for (ie, ij) in pairs], float)
        for iid, sim, quant, kl in zip(item_ids, sims, quants, kl_vals):
            records.append({"model": name, "item_id": iid, "trans_type": trans_type,
                            "cosine": float(sim), "quantile": float(quant), "kl_half": float(kl)})

res = pd.DataFrame(records)
res


## 8) Paired tests: LLM vs Human (per model)

In [None]:
def paired_compare(df_model):
    """Run paired LLM-vs-human tests on each metric for one model's rows.

    Args:
        df_model: Frame with columns `item_id`, `trans_type`, `cosine`,
            `quantile` and `kl_half`, holding the rows of a single model.

    Returns:
        None when no `item_id` has both an "llm" and a "human" row. Otherwise
        a dict keyed by metric name; each value holds `n`, the two group
        means, the paired t statistic and p-value (`t`, `t_p`) and the
        Wilcoxon signed-rank statistic and p-value (`w`, `w_p`).

    A test that raises is reported as NaN so the summary row is still
    produced, and the reason is surfaced as a RuntimeWarning instead of being
    dropped silently.
    """
    import warnings  # local import: `warnings` is not among the file-level imports

    t_llm = df_model[df_model['trans_type'] == "llm"].set_index("item_id")
    t_hum = df_model[df_model['trans_type'] == "human"].set_index("item_id")
    inter = sorted(set(t_llm.index) & set(t_hum.index))
    if not inter:
        return None
    A = t_llm.loc[inter]
    B = t_hum.loc[inter]
    out = {}
    for metric in ("cosine", "quantile", "kl_half"):
        a = A[metric].values
        b = B[metric].values
        try:
            tval, tp = ttest_rel(a, b, nan_policy="omit")
        except Exception as err:  # best effort: keep the row, report the cause
            warnings.warn(f"paired t-test failed for {metric!r}: {err}",
                          RuntimeWarning, stacklevel=2)
            tval, tp = (np.nan, np.nan)
        try:
            # The p-value method is left at SciPy's default ("auto"); passing
            # it explicitly fails on releases where the keyword is `mode`.
            wstat, wp = wilcoxon(a, b, zero_method="wilcox", alternative="two-sided")
        except Exception as err:  # e.g. all paired differences are zero
            warnings.warn(f"Wilcoxon test failed for {metric!r}: {err}",
                          RuntimeWarning, stacklevel=2)
            wstat, wp = (np.nan, np.nan)
        out[metric] = {"n": len(inter),
                       "mean(llm)": float(np.mean(a)), "mean(human)": float(np.mean(b)),
                       "t": float(tval), "t_p": float(tp),
                       "w": float(wstat), "w_p": float(wp)}
    return out

# Flatten the per-model, per-metric test results into one row each.
summary_rows = []
for name, _ in MODELS:
    stats = paired_compare(res[res['model'] == name].copy())
    if stats is not None:
        summary_rows.extend(
            {"model": name, "metric": metric, **d} for metric, d in stats.items()
        )
pd.DataFrame(summary_rows)


## 9) Cross-model agreement (Spearman, ICC) on item-level KL

In [None]:
# Wide table: one row per (item, translation type), one KL column per model.
# Rows lacking a value for any model are dropped.
tbl = res.pivot_table(index=["item_id","trans_type"], columns="model", values="kl_half").dropna()
models = list(tbl.columns)

# Pairwise Spearman rank correlation between the models' KL columns.
S = np.array(
    [[spearmanr(tbl[m1], tbl[m2])[0] for m2 in models] for m1 in models],
    dtype=float,
).reshape(len(models), len(models))

icc = icc2_1(tbl.values)
print("Models:", models)
print("Spearman agreement matrix:\n", np.round(S,3))
print("ICC(2,1) on KL across models:", round(float(icc), 3))


## 10) Visualizations

In [None]:
# (a) Distribution of KL per translation type for the first model
first_model = MODELS[0][0]
dfm = res[res['model']==first_model]
fig = plt.figure(figsize=(6,4))
vals_llm = dfm[dfm['trans_type']=="llm"]["kl_half"].values
vals_hum = dfm[dfm['trans_type']=="human"]["kl_half"].values
plt.boxplot([vals_llm, vals_hum])
# Label the boxes through the tick API: boxplot's `labels` keyword is
# deprecated in Matplotlib 3.9+, and its replacement does not exist in older
# releases. Boxes sit at x = 1, 2 by default.
plt.xticks([1, 2], ["LLM", "Human"])
plt.ylabel("KL (≈ 0.5 * ||Δ||²)")
plt.title(f"KL by translation type — {first_model}")
plt.show()


In [None]:
# (b) MDS map of group centroids (EN, JP-LLM, JP-HUMAN) using aligned embeddings (first model)
first_model = MODELS[0][0]
X = aligned[first_model]


def centroid_mask(lang, trans_type=None):
    """Boolean mask over the rows of `df` for one language and, optionally, one translation type."""
    mask = (df['lang'] == lang).values
    if trans_type is None:
        return mask
    return mask & (df['trans_type'] == trans_type).values


# Groups are plotted in this order; a group with no rows is left out.
group_specs = [
    ("EN", centroid_mask("en")),
    ("JP-LLM", centroid_mask("ja", "llm")),
    ("JP-HUMAN", centroid_mask("ja", "human")),
]
centroids = []
labels = []
for label, mask in group_specs:
    if mask.sum() > 0:
        centroids.append(X[mask].mean(0))
        labels.append(label)

# Embed the centroid-to-centroid Euclidean distances in two dimensions.
C = np.vstack(centroids)
D = pairwise_distances(C, metric="euclidean")
mds = MDS(n_components=2, dissimilarity="precomputed", random_state=SEED)
Y = mds.fit_transform(D)

fig = plt.figure(figsize=(5,5))
plt.scatter(Y[:,0], Y[:,1])
for (x_pos, y_pos), lab in zip(Y, labels):
    plt.text(x_pos, y_pos, lab)
plt.title("MDS map of centroids (first model)")
plt.xlabel("Dim 1")
plt.ylabel("Dim 2")
plt.show()


## 11) Save outputs

In [None]:
# Write the item metrics, paired-test summary and cross-model agreement to disk.
out_dir = Path("/content/outputs")
out_dir.mkdir(parents=True, exist_ok=True)

res.to_csv(out_dir/"item_metrics.csv", index=False)
summary_df = pd.DataFrame(summary_rows)
summary_df.to_csv(out_dir/"paired_tests.csv", index=False)

# Rebuild the per-model KL table and its Spearman matrix so this cell does not
# depend on the agreement cell having been run.
tbl = res.pivot_table(index=["item_id","trans_type"], columns="model", values="kl_half").dropna()
models = list(tbl.columns)
S = np.array(
    [[spearmanr(tbl[m1], tbl[m2])[0] for m2 in models] for m1 in models],
    dtype=float,
).reshape(len(models), len(models))
np.savetxt(out_dir/"spearman_matrix.txt", S, fmt="%.4f")

# Column order of the Spearman matrix.
with open(out_dir/"models.json","w") as f:
    f.write(json.dumps(models))

print("Wrote:")
print(" - /content/outputs/item_metrics.csv")
print(" - /content/outputs/paired_tests.csv")
print(" - /content/outputs/spearman_matrix.txt")
print(" - /content/outputs/models.json")


## 12) Extending to many scales
- Add a `scale_id` column and **groupby** it to repeat the same logic per scale.
- Fit **whitening & Procrustes on a larger anchor set** spanning multiple scales for stability.
- Aggregate per-scale results and run a **random-effects meta-analysis** across scales.