In [None]:
import sys
# Show which Python interpreter this kernel is running on.
print(sys.executable)

In [None]:
# Import

import fasttext
from pathlib import Path
from collections import Counter

In [None]:
# --> Get model params

def get_supervised_params_from_model(m):
    """Collect the hyperparameters recorded on a model so they can be reused.

    Args:
        m: Object exposing ``m.f.getArgs()`` (presumably a loaded fastText
            model -- confirm against the caller).

    Returns:
        dict: one entry per hyperparameter name listed below, holding the
        value of the same-named attribute on the args object, unchanged.
    """
    args = m.f.getArgs()
    param_names = (
        "lr", "dim", "ws", "epoch", "minCount", "minn", "maxn", "neg",
        "wordNgrams", "bucket", "lrUpdateRate", "t", "loss",
    )
    return {name: getattr(args, name) for name in param_names}

In [None]:
### Helpers
import re
from collections import Counter

def to_ft(lbl: str) -> str:
    """Return ``lbl`` with the ``__label__`` prefix, adding it only if missing."""
    prefix = "__label__"
    if lbl.startswith(prefix):
        return lbl
    return prefix + lbl

# Labels that gate_pred may return as an accepted (confident) prediction.
ACTIONABLE = {to_ft(x) for x in ["ar_ma","ar_msa","ar_ma_latin","en","fr","es","it"]}
# Abstention labels returned by fallback_label: FALLBACK_AR when the text
# contains a character matched by _ARABIC_RE, FALLBACK_LG otherwise.
FALLBACK_AR = to_ft("other_ar")
FALLBACK_LG = to_ft("other_lg")

# Matches a single character in U+0600-06FF, U+0750-077F or U+08A0-08FF
# (the Unicode Arabic, Arabic Supplement and Arabic Extended-A blocks).
_ARABIC_RE = re.compile(r"[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF]")

def fallback_label(text: str) -> str:
    """Pick the abstention label for ``text``.

    Returns ``FALLBACK_AR`` when ``_ARABIC_RE`` matches anywhere in the text,
    and ``FALLBACK_LG`` otherwise (including for an empty string).
    """
    if _ARABIC_RE.search(text):
        return FALLBACK_AR
    return FALLBACK_LG

def gate_pred(text: str, top1_lbl: str, top1_p: float, top2_p: float | None,
              tau: float = 0.90, delta: float = 0.00) -> str:
    # If model already predicts fallback, keep it
    if top1_lbl in {FALLBACK_AR, FALLBACK_LG}:
        return top1_lbl

    # If model predicts actionable but not confident enough (or too close to #2), abstain
    if top1_lbl in ACTIONABLE:
        if top1_p < tau:
            return fallback_label(text)
        if top2_p is not None and (top1_p - top2_p) < delta:
            return fallback_label(text)
        return top1_lbl

    # Any unexpected label -> fallback
    return fallback_label(text)

### Helpers
def evaluate_selective(model, path, tau=0.90, delta=0.00):
    """Evaluate gated (selective) predictions on a labelled file.

    Each example read by ``read_ft_file(path)`` is scored with
    ``model.predict(text, k=2)``; the top-1 label is passed through
    ``gate_pred`` and all statistics use that final label.

    Args:
        model: Object with a ``predict(text, k=2)`` method returning
            ``(labels, probabilities)``.
        path: File handed to ``read_ft_file``.
        tau: Confidence threshold forwarded to ``gate_pred``.
        delta: Margin threshold forwarded to ``gate_pred``.

    Returns:
        dict with:
        ``coverage_A`` (share of examples whose final label is in ACTIONABLE),
        ``trust_precision_A`` (share of those accepted labels that are correct),
        ``macro_precision_A`` (mean precision over ACTIONABLE labels present in
        the gold labels), ``a2a_error_rate`` (share of examples where gold and
        final labels are both ACTIONABLE but differ), ``per_label``
        (precision/recall/f1/support per gold label), ``top_conf`` (the 15 most
        frequent ``(gold, predicted)`` confusions) and ``diag`` (average top-1
        probability and margin, split by accepted vs. abstained).
    """
    y_true, texts = read_ft_file(path)
    gold_labels = sorted(set(y_true))

    hits, false_pos, false_neg, confusions = Counter(), Counter(), Counter(), Counter()

    n_accepted = 0          # final label is in ACTIONABLE
    n_accepted_correct = 0  # ... and equals the gold label
    n_a2a_wrong = 0         # gold and final both in ACTIONABLE, but different

    # Confidence diagnostics, keyed by "was the prediction accepted?"
    top1_by_outcome = {True: [], False: []}
    margin_by_outcome = {True: [], False: []}

    for gold, text in zip(y_true, texts):
        # k=2 so the runner-up probability (and therefore the margin) is known
        pred_labels, pred_probs = model.predict(text, k=2)
        top1_lbl = pred_labels[0]
        top1_p = float(pred_probs[0])
        if len(pred_probs) > 1:
            top2_p = float(pred_probs[1])
            margin = top1_p - top2_p
        else:
            top2_p = None
            margin = None

        final = gate_pred(text, top1_lbl, top1_p, top2_p, tau=tau, delta=delta)
        accepted = final in ACTIONABLE

        if accepted:
            n_accepted += 1
        top1_by_outcome[accepted].append(top1_p)
        if margin is not None:
            margin_by_outcome[accepted].append(margin)

        # Per-label and confusion statistics are based on the FINAL label
        if final == gold:
            hits[gold] += 1
            if accepted:
                n_accepted_correct += 1
            continue

        false_pos[final] += 1
        false_neg[gold] += 1
        confusions[(gold, final)] += 1
        # Actionable-to-actionable mistakes are tracked separately
        if accepted and gold in ACTIONABLE:
            n_a2a_wrong += 1

    total = len(y_true)
    coverage_A = n_accepted / total if total else 0.0
    trust_precision_A = n_accepted_correct / n_accepted if n_accepted else 0.0
    a2a_error_rate = n_a2a_wrong / total if total else 0.0

    def _ratio(num, den):
        return num / den if den else 0.0

    def _mean(values):
        return sum(values) / len(values) if values else None

    per_label = {}
    for lbl in gold_labels:
        tp_i, fp_i, fn_i = hits[lbl], false_pos[lbl], false_neg[lbl]
        prec = _ratio(tp_i, tp_i + fp_i)
        rec = _ratio(tp_i, tp_i + fn_i)
        f1 = (2 * prec * rec / (prec + rec)) if (prec + rec) else 0.0
        per_label[lbl] = {"precision": prec, "recall": rec, "f1": f1, "support": tp_i + fn_i}

    # Macro precision restricted to ACTIONABLE labels that occur in the gold set
    actionable_precisions = [
        per_label[lbl]["precision"] for lbl in gold_labels if lbl in ACTIONABLE
    ]
    if actionable_precisions:
        macro_precision_A = sum(actionable_precisions) / len(actionable_precisions)
    else:
        macro_precision_A = 0.0

    return {
        "coverage_A": coverage_A,
        "trust_precision_A": trust_precision_A,
        "macro_precision_A": macro_precision_A,
        "a2a_error_rate": a2a_error_rate,
        "per_label": per_label,
        "top_conf": confusions.most_common(15),
        "diag": {
            "avg_top1_p_accept": _mean(top1_by_outcome[True]),
            "avg_top1_p_abstain": _mean(top1_by_outcome[False]),
            "avg_margin_accept": _mean(margin_by_outcome[True]),
            "avg_margin_abstain": _mean(margin_by_outcome[False]),
        },
    }

def evaluate_per_label(model, path):
    """Score forced-choice (top-1) predictions on a labelled file.

    Args:
        model: Object with a ``predict(text, k=1)`` method whose first
            returned element is the sequence of predicted labels.
        path: File handed to ``read_ft_file``.

    Returns:
        tuple ``(accuracy, macro_f1, per_label, top_conf)`` where
        ``per_label`` maps each gold label to its precision/recall/f1/support
        and ``top_conf`` lists the 15 most frequent ``(gold, predicted)``
        confusions. ``macro_f1`` averages F1 over the gold labels only.
    """
    y_true, texts = read_ft_file(path)
    gold_labels = sorted(set(y_true))

    hits, false_pos, false_neg, confusions = Counter(), Counter(), Counter(), Counter()

    for gold, text in zip(y_true, texts):
        predicted_labels = model.predict(text, k=1)[0]
        guess = predicted_labels[0]
        if guess == gold:
            hits[gold] += 1
            continue
        false_pos[guess] += 1
        false_neg[gold] += 1
        confusions[(gold, guess)] += 1

    per_label = {}
    f1_total = 0.0
    for lbl in gold_labels:
        tp_i, fp_i, fn_i = hits[lbl], false_pos[lbl], false_neg[lbl]
        predicted_as_lbl = tp_i + fp_i
        support = tp_i + fn_i
        prec = tp_i / predicted_as_lbl if predicted_as_lbl else 0.0
        rec = tp_i / support if support else 0.0
        f1 = (2 * prec * rec / (prec + rec)) if (prec + rec) else 0.0
        per_label[lbl] = {"precision": prec, "recall": rec, "f1": f1, "support": support}
        f1_total += f1

    if gold_labels:
        macro_f1 = f1_total / len(gold_labels)
    else:
        macro_f1 = 0.0

    if y_true:
        accuracy = sum(hits.values()) / len(y_true)
    else:
        accuracy = 0.0

    return accuracy, macro_f1, per_label, confusions.most_common(15)

In [None]:
################# Choosing between the full vocab and the partial vocab

In [None]:
########## Full

In [None]:
# Variables

# Model to read vectors from, and where the full-vocabulary .vec file goes.
model_origin_path = "models/with_initialization/lid.176.bin"
out_path = Path("models/with_initialization/vectors/lid176_full.vec")
# Create the destination folder up front so the export cell can open the file.
out_path.parent.mkdir(parents=True, exist_ok=True)

In [None]:
# Dump every vocabulary word of the source model to a text .vec file:
# a "<n_words> <dim>" header, then one "<word> <v1> <v2> ..." line per word.
model_origin = fasttext.load_model(model_origin_path)

dim = model_origin.get_dimension()
words = model_origin.get_words()

with out_path.open("w", encoding="utf-8") as f:
    f.write(f"{len(words)} {dim}\n")
    for w in words:
        v = model_origin.get_word_vector(w)
        f.write(f"{w} {' '.join(map(str, v))}\n")

print("Saved:", out_path)
print("dim:", dim, "n_words_exported:", len(words))

In [None]:
####### Partial

In [None]:
# Variables

# Same source model as the full export; the partial .vec file is written to a
# different folder.
model_origin_path = "models/with_initialization/lid.176.bin"
out_path = Path("models/with_initialization_with_augData/vectors/lid176_partial.vec")
# Create the destination folder up front so the export cell can open the file.
out_path.parent.mkdir(parents=True, exist_ok=True)

# Files whose tokens decide which words are kept in the partial export.
# NOTE(review): this includes the test split -- confirm that is intended.
splits = ["data/train.txt", "data/validation.txt", "data/test.txt"]

In [None]:
def iter_tokens_from_fasttext_file(path):
    """Yield the whitespace-separated tokens of every example's text.

    Each non-blank line is split once into ``<first field> <rest>``; the
    first field (the label) is dropped and the rest is tokenised on
    whitespace. Blank lines and lines with only one field are skipped.

    Args:
        path: UTF-8 text file to read.

    Yields:
        str: tokens in file order (duplicates included).
    """
    with open(path, "r", encoding="utf-8") as handle:
        for raw in handle:
            fields = raw.strip().split(maxsplit=1)
            if len(fields) == 2:
                yield from fields[1].split()

# Export only the source-model words that also occur in the dataset splits.
m = fasttext.load_model(model_origin_path)
dim = m.get_dimension()

meta_vocab = set(m.get_words())

# Distinct tokens seen in any of the split files
dataset_tokens = {
    tok
    for sp in splits
    for tok in iter_tokens_from_fasttext_file(sp)
}

# Sorted so the output file is deterministic
selected = sorted(dataset_tokens & meta_vocab)

with out_path.open("w", encoding="utf-8") as f:
    f.write(f"{len(selected)} {dim}\n")
    for w in selected:
        v = m.get_word_vector(w)
        f.write(f"{w} {' '.join(map(str, v))}\n")

print("Saved:", out_path)
print("dim:", dim)
print("dataset_tokens_total:", len(dataset_tokens))
print("in_meta_vocab:", len(selected))
# max(1, ...) avoids a division by zero when no tokens were found
print("coverage:", (len(selected) / max(1, len(dataset_tokens))))

In [None]:
################## Baseline full and partial 

In [None]:
# Variables

# Training and validation files used by the training/evaluation cells.
train_path = "data/train.txt"
val_path   = "data/validation.txt"

# Pretrained-vector files; these are the paths the "Full" and "Partial"
# export cells write to.
vec_full = "models/with_initialization/vectors/lid176_full.vec"
vec_partial = "models/with_initialization_with_augData/vectors/lid176_partial.vec"

In [None]:
# Process 

def read_vec_dim(vec_path: str) -> int:
    """Return the vector dimension declared in a .vec file header.

    Only the first line is read; it is expected to look like
    ``<n_words> <dim>`` and the second field is returned as an int.
    """
    with open(vec_path, "r", encoding="utf-8") as handle:
        header_fields = handle.readline().split()
    return int(header_fields[1])

def read_ft_file(path):
    """Read a ``<label> <text>`` file into parallel label and text lists.

    Every line is stripped and split once on whitespace. Blank lines and
    lines that contain a single field are skipped.

    Returns:
        tuple ``(y_true, texts)``: two lists of equal length, in file order.
    """
    labels, texts = [], []
    with open(path, "r", encoding="utf-8") as handle:
        for raw in handle:
            fields = raw.strip().split(maxsplit=1)
            if len(fields) != 2:
                continue
            label, text = fields
            labels.append(label)
            texts.append(text.replace("\n", " "))
    return labels, texts

In [None]:
# training
def train_pretrained(vec_path: str, out_name: str, input_path: str | None = None,
                     epoch: int = 100, lr: float = 0.007):
    """Train a supervised fastText model initialised from pretrained vectors.

    Args:
        vec_path: .vec file passed as ``pretrainedVectors``; its header also
            supplies ``dim`` so the two always match.
        out_name: File stem of the saved model (``<out_name>.bin``).
        input_path: Training file. Defaults to the notebook-level
            ``train_path`` so existing two-argument calls behave as before.
        epoch: Number of training epochs.
        lr: Learning rate.

    Returns:
        The trained model, after it has been saved under
        ``models/with_initialization_with_augData``.
    """
    dim = read_vec_dim(vec_path)

    model = fasttext.train_supervised(
        # Fall back to the global only when the caller gave no explicit file
        input=train_path if input_path is None else input_path,
        epoch=epoch,
        lr=lr,
        dim=dim,                 # must match vec dim
        wordNgrams=2,
        minn=2,
        maxn=5,
        loss="softmax",
        pretrainedVectors=vec_path,
        seed=42,
        verbose=2
    )

    out_dir = Path("models/with_initialization_with_augData")
    out_dir.mkdir(parents=True, exist_ok=True)

    bin_path = out_dir / f"{out_name}.bin"
    model.save_model(str(bin_path))

    return model

# Alternative run on the full vectors, currently disabled.
# NOTE(review): its file name says "0.005lr" while train_pretrained uses lr=0.007.
# model = train_pretrained(vec_full, "langid_baseline_full_100ep_0.005lr")
# Active run: baseline initialised from the partial vectors.
model = train_pretrained(vec_partial, "langid_baseline_partial_100ep_0.007lr")

In [None]:
# baseline forced-choice metrics 
# fastText's built-in top-1 evaluation
n, precision, recall = model.test(val_path)
print("N:", n, "P@1:", precision, "R@1:", recall)

acc, macro_f1, per_label, top_conf = evaluate_per_label(model, val_path)
print(f"Validation Accuracy (micro): {acc:.4f}")
print(f"Validation Macro-F1       : {macro_f1:.4f}")

# selective metrics
# The thresholds live in variables so the printed header always shows the
# value that was actually evaluated (it used to say 0.98 while using 0.90).
tau, delta = 0.90, 0.00
res = evaluate_selective(model, val_path, tau=tau, delta=delta)
print(f"\nSelective metrics @ tau={tau:.2f}:")
print("TrustPrecision_A  :", round(res["trust_precision_A"], 4))
print("A2A error rate    :", round(res["a2a_error_rate"], 6))
print("Coverage_A        :", round(res["coverage_A"], 4))
print("MacroPrecision_A  :", round(res["macro_precision_A"], 4))
print("Diagnostics       :", res["diag"])

# Show worst 10 labels by F1 (forced-choice per-label stats)
worst = sorted(per_label.items(), key=lambda x: x[1]["f1"])[:10]
print("\nWorst labels (by F1):")
# Loop variable is `stats`, not `m`, so the global `m` (the loaded lid.176
# model) is not overwritten by a metrics dict.
for lbl, stats in worst:
    print(lbl, "F1=", round(stats["f1"],4), "P=", round(stats["precision"],4), "R=", round(stats["recall"],4), "support=", stats["support"])

print("\nTop confusions (true -> pred):")
for (yt, yp), c in top_conf:
    print(yt, "->", yp, ":", c)

In [None]:
############ Autotune on F1 (full vectors)

In [None]:
# Variables

# Same values as the earlier baseline "Variables" cell, repeated here.
train_path = "data/train.txt"
val_path   = "data/validation.txt"

# Pretrained-vector files written by the "Full" and "Partial" export cells.
vec_full = "models/with_initialization/vectors/lid176_full.vec"
vec_partial = "models/with_initialization_with_augData/vectors/lid176_partial.vec"

In [None]:
# training
model = fasttext.train_supervised(
    # data and initialisation
    input=train_path,
    pretrainedVectors=vec_full,
    # explicitly pinned settings
    dim=16,
    loss="softmax",
    minn=2,
    maxn=5,
    # autotune search; original author note on durations:
    # "600 < 1200 Vs 1800 in term of performance"
    autotuneValidationFile=val_path,
    autotuneMetric="f1",
    autotuneDuration=1800,
    # run control
    seed=42,
    verbose=2,
)

# Save next to the other models trained in this notebook
out_dir = Path("models/with_initialization_with_augData")
out_dir.mkdir(parents=True, exist_ok=True)
model.save_model(str(out_dir.joinpath("langid_autotune_f1_full_1800.bin")))

In [None]:
# baseline forced-choice metrics 
# fastText's built-in top-1 evaluation
n, precision, recall = model.test(val_path)
print("N:", n, "P@1:", precision, "R@1:", recall)

acc, macro_f1, per_label, top_conf = evaluate_per_label(model, val_path)
print(f"Validation Accuracy (micro): {acc:.4f}")
print(f"Validation Macro-F1       : {macro_f1:.4f}")

# selective metrics
# The thresholds live in variables so the printed header always shows the
# value that was actually evaluated (it used to say 0.98 while using 0.90).
tau, delta = 0.90, 0.00
res = evaluate_selective(model, val_path, tau=tau, delta=delta)
print(f"\nSelective metrics @ tau={tau:.2f}:")
print("TrustPrecision_A  :", round(res["trust_precision_A"], 4))
print("A2A error rate    :", round(res["a2a_error_rate"], 6))
print("Coverage_A        :", round(res["coverage_A"], 4))
print("MacroPrecision_A  :", round(res["macro_precision_A"], 4))
print("Diagnostics       :", res["diag"])

# Show worst 10 labels by F1 (forced-choice per-label stats)
worst = sorted(per_label.items(), key=lambda x: x[1]["f1"])[:10]
print("\nWorst labels (by F1):")
# Loop variable is `stats`, not `m`, so the global `m` (the loaded lid.176
# model) is not overwritten by a metrics dict.
for lbl, stats in worst:
    print(lbl, "F1=", round(stats["f1"],4), "P=", round(stats["precision"],4), "R=", round(stats["recall"],4), "support=", stats["support"])

print("\nTop confusions (true -> pred):")
for (yt, yp), c in top_conf:
    print(yt, "->", yp, ":", c)

In [None]:
########### Training using autotune on precision

In [None]:
# training
model = fasttext.train_supervised(
    input=train_path,
    autotuneValidationFile=val_path,
    autotuneMetric="precisionAtRecall:30",
    autotuneDuration=1800,
    # Was `vec_path`, which is only a function parameter elsewhere in this
    # notebook and is undefined at cell level (NameError on a fresh kernel).
    # The output file name below says "partial", so the partial vectors are used.
    pretrainedVectors=vec_partial,
    dim=16,
    loss="softmax",
    minn=2,
    maxn=5,
    verbose=2,
    seed=42
)

out_dir = Path("models/with_initialization_with_augData")
out_dir.mkdir(parents=True, exist_ok=True)

# NOTE(review): the file name ends in "_600" but autotuneDuration above is
# 1800 seconds -- rename one of them so the artefact describes its own run.
model.save_model(str(out_dir / "langid_autotune_precision_partial_600.bin"))

In [None]:
# baseline forced-choice metrics 
# fastText's built-in top-1 evaluation
n, precision, recall = model.test(val_path)
print("N:", n, "P@1:", precision, "R@1:", recall)

acc, macro_f1, per_label, top_conf = evaluate_per_label(model, val_path)
print(f"Validation Accuracy (micro): {acc:.4f}")
print(f"Validation Macro-F1       : {macro_f1:.4f}")

# selective metrics
# The thresholds live in variables so the printed header always shows the
# value that was actually evaluated (it used to say 0.98 while using 0.90).
tau, delta = 0.90, 0.00
res = evaluate_selective(model, val_path, tau=tau, delta=delta)
print(f"\nSelective metrics @ tau={tau:.2f}:")
print("TrustPrecision_A  :", round(res["trust_precision_A"], 4))
print("A2A error rate    :", round(res["a2a_error_rate"], 6))
print("Coverage_A        :", round(res["coverage_A"], 4))
print("MacroPrecision_A  :", round(res["macro_precision_A"], 4))
print("Diagnostics       :", res["diag"])

# Show worst 10 labels by F1 (forced-choice per-label stats)
worst = sorted(per_label.items(), key=lambda x: x[1]["f1"])[:10]
print("\nWorst labels (by F1):")
# Loop variable is `stats`, not `m`, so the global `m` (the loaded lid.176
# model) is not overwritten by a metrics dict.
for lbl, stats in worst:
    print(lbl, "F1=", round(stats["f1"],4), "P=", round(stats["precision"],4), "R=", round(stats["recall"],4), "support=", stats["support"])

print("\nTop confusions (true -> pred):")
for (yt, yp), c in top_conf:
    print(yt, "->", yp, ":", c)

In [None]:
##################### training our chosen model on the train and the val data

In [None]:
# Variables

# Dataset splits, plus the combined train+validation file built in the next cell.
train_path = "data/train.txt"
val_path   = "data/validation.txt"
test_path = "data/test.txt"
train_val_path = "data/train_val.txt"

# Full-vocabulary vectors. This must be the path the "Full" export cell writes
# to; the previous value pointed into models/with_initialization_with_augData/,
# where only the partial vectors are written.
vec_full = "models/with_initialization/vectors/lid176_full.vec"

In [None]:
# combine train and val

# Concatenate the train and validation files into train_val_path.
with open(train_val_path, "w", encoding="utf-8") as out:
    for fname in (train_path, val_path):
        line = ""
        with open(fname, "r", encoding="utf-8") as f:
            for line in f:
                out.write(line)
        # If a source file does not end with a newline, its last example would
        # be glued to the first line of the next file (turning that line's
        # label into ordinary text). Terminate the line explicitly.
        if line and not line.endswith("\n"):
            out.write("\n")

In [None]:
# Process 

def read_vec_dim(vec_path: str) -> int:
    """Return the vector dimension declared in a .vec file header.

    Only the first line is read; it is expected to look like
    ``<n_words> <dim>`` and the second field is returned as an int.

    NOTE(review): a function with this name is already defined in an earlier
    cell of this notebook; consider keeping a single definition.
    """
    with open(vec_path, "r", encoding="utf-8") as handle:
        header_fields = handle.readline().split()
    return int(header_fields[1])

def read_ft_file(path):
    """Read a ``<label> <text>`` file into parallel label and text lists.

    Every line is stripped and split once on whitespace. Blank lines and
    lines that contain a single field are skipped.

    NOTE(review): a function with this name is already defined in an earlier
    cell of this notebook; consider keeping a single definition.

    Returns:
        tuple ``(y_true, texts)``: two lists of equal length, in file order.
    """
    labels, texts = [], []
    with open(path, "r", encoding="utf-8") as handle:
        for raw in handle:
            fields = raw.strip().split(maxsplit=1)
            if len(fields) != 2:
                continue
            label, text = fields
            labels.append(label)
            texts.append(text.replace("\n", " "))
    return labels, texts

In [None]:
# training
def train_pretrained(vec_path: str, out_name: str, input_path: str | None = None,
                     epoch: int = 100, lr: float = 0.007):
    """Train a supervised fastText model initialised from pretrained vectors.

    Args:
        vec_path: .vec file passed as ``pretrainedVectors``; its header also
            supplies ``dim`` so the two always match.
        out_name: File stem of the saved model (``<out_name>.bin``).
        input_path: Training file. Defaults to the notebook-level
            ``train_val_path`` (train + validation combined) so existing
            two-argument calls behave as before.
        epoch: Number of training epochs.
        lr: Learning rate.

    Returns:
        The trained model, after it has been saved under
        ``models/with_initialization_with_augData``.
    """
    dim = read_vec_dim(vec_path)

    model = fasttext.train_supervised(
        # Fall back to the global only when the caller gave no explicit file
        input=train_val_path if input_path is None else input_path,
        epoch=epoch,
        lr=lr,
        dim=dim,                 # must match vec dim
        wordNgrams=2,
        minn=2,
        maxn=5,
        loss="softmax",
        pretrainedVectors=vec_path,
        seed=42,
        verbose=2
    )

    out_dir = Path("models/with_initialization_with_augData")
    out_dir.mkdir(parents=True, exist_ok=True)

    bin_path = out_dir / f"{out_name}.bin"
    model.save_model(str(bin_path))

    return model

# Final run: the train_pretrained defined just above trains on train_val_path,
# here initialised from the full vectors.
model = train_pretrained(vec_full, "langid_baseline_fullfull_100ep_0.007lr")
# Alternative run on the partial vectors, currently disabled.
# model = train_pretrained(vec_partial, "langid_baseline_partial_100ep_0.007lr")

In [None]:
# baseline forced-choice metrics 
# fastText's built-in top-1 evaluation on the held-out test split
n, precision, recall = model.test(test_path)
print("N:", n, "P@1:", precision, "R@1:", recall)

# These numbers come from test_path, so they are labelled "Test" (the
# messages previously said "Validation").
acc, macro_f1, per_label, top_conf = evaluate_per_label(model, test_path)
print(f"Test Accuracy (micro): {acc:.4f}")
print(f"Test Macro-F1       : {macro_f1:.4f}")

# selective metrics
# The thresholds live in variables so the printed header always shows the
# value that was actually evaluated (it used to say 0.98 while using 0.90).
tau, delta = 0.90, 0.00
res = evaluate_selective(model, test_path, tau=tau, delta=delta)
print(f"\nSelective metrics @ tau={tau:.2f}:")
print("TrustPrecision_A  :", round(res["trust_precision_A"], 4))
print("A2A error rate    :", round(res["a2a_error_rate"], 6))
print("Coverage_A        :", round(res["coverage_A"], 4))
print("MacroPrecision_A  :", round(res["macro_precision_A"], 4))
print("Diagnostics       :", res["diag"])

# Show worst 10 labels by F1 (forced-choice per-label stats)
worst = sorted(per_label.items(), key=lambda x: x[1]["f1"])[:10]
print("\nWorst labels (by F1):")
# Loop variable is `stats`, not `m`, so the global `m` (the loaded lid.176
# model) is not overwritten by a metrics dict.
for lbl, stats in worst:
    print(lbl, "F1=", round(stats["f1"],4), "P=", round(stats["precision"],4), "R=", round(stats["recall"],4), "support=", stats["support"])

print("\nTop confusions (true -> pred):")
for (yt, yp), c in top_conf:
    print(yt, "->", yp, ":", c)