## Setup: CPU-only, imports, seeds, paths, model choice

In [1]:
# 01 - Setup: CPU-only, imports, seeds, paths, model choice

import os
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
os.environ["TOKENIZERS_PARALLELISM"] = "false"

import random
import re
import numpy as np
import pandas as pd
from pathlib import Path
from typing import List, Dict, Tuple, Optional

import torch
torch.backends.cudnn.enabled = False
torch.set_num_threads(max(1, os.cpu_count() // 2))
device = torch.device("cpu")
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

from sklearn.metrics import f1_score, classification_report, confusion_matrix

from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM
)

DATA_DIR = Path("SemEval_2022_Task2-idiomaticity/SubTaskA")
TRAIN_ONE_SHOT = DATA_DIR / "Data" / "train_one_shot.csv"
TRAIN_ZERO_SHOT = DATA_DIR / "Data" / "train_zero_shot.csv"
DEV = DATA_DIR / "Data" / "dev.csv"
DEV_GOLD = DATA_DIR / "Data" / "dev_gold.csv"
EVAL = DATA_DIR / "Data" / "eval.csv"
EVAL_SUB_FMT = DATA_DIR / "Data" / "eval_submission_format.csv"

OUT_DIR = Path("outputs_en_llm")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Choose one instruct model that fits your RAM; both run on CPU:
# MODEL_NAME = "Qwen/Qwen2.5-3B-Instruct"
MODEL_NAME = "meta-llama/Llama-3.2-3B-Instruct"

MAX_NEW_TOKENS = 2
TEMPERATURE = 0.0
TOP_P = 1.0
DO_SAMPLE = False


  from .autonotebook import tqdm as notebook_tqdm


## Data loading & utilities (EN only)

In [2]:
# 02 - Data loading & utilities (EN only)

def load_any_csv(path: Path) -> pd.DataFrame:
    return pd.read_csv(path, sep=None, engine="python", dtype=str)

def ensure_label_int(df: pd.DataFrame, col="Label") -> pd.DataFrame:
    if col in df.columns:
        df[col] = df[col].astype(int)
    return df

def load_train_dev(language="EN", oneshot=True) -> Tuple[pd.DataFrame, pd.DataFrame]:
    if oneshot:
        train_df = load_any_csv(TRAIN_ONE_SHOT)
    else:
        train_df = load_any_csv(TRAIN_ZERO_SHOT)
    dev_df = load_any_csv(DEV)
    gold_df = load_any_csv(DEV_GOLD)

    train_df.columns = [c.strip() for c in train_df.columns]
    dev_df.columns = [c.strip() for c in dev_df.columns]
    gold_df.columns = [c.strip() for c in gold_df.columns]

    train_df = train_df[train_df["Language"] == language].copy()
    dev_df = dev_df[dev_df["Language"] == language].copy()

    gold = gold_df[gold_df["Language"] == language][["ID","Label"]].copy()
    gold["ID"] = gold["ID"].astype(str)
    dev_df["ID"] = dev_df["ID"].astype(str)
    dev_lab = dev_df.merge(gold, on="ID", how="left")
    dev_lab = ensure_label_int(dev_lab, "Label")
    train_df = ensure_label_int(train_df, "Label")
    return train_df, dev_lab

def load_eval(language="EN") -> pd.DataFrame:
    df = load_any_csv(EVAL)
    df.columns = [c.strip() for c in df.columns]
    return df[df["Language"] == language].copy()

def mark_first_case_insensitive(text: str, needle: str, ltag="<mwe>", rtag="</mwe>") -> str:
    if not isinstance(text, str) or not isinstance(needle, str):
        return text
    lt = text.lower()
    ln = needle.lower()
    i = lt.find(ln)
    if i == -1:
        return text
    return text[:i] + ltag + text[i:i+len(needle)] + rtag + text[i+len(needle):]

def pack_context(prev: str, target: str, nxt: str, mwe: str) -> str:
    prev = "" if pd.isna(prev) else prev
    nxt = "" if pd.isna(nxt) else nxt
    tgt = mark_first_case_insensitive(target, mwe)
    return f"Previous: {prev}\nTarget: {tgt}\nNext: {nxt}"

def label_to_str(y: int) -> str:
    return "1" if int(y) == 1 else "0"


## Build one-shot exemplars per MWE from train_one_shot (EN)

In [3]:
# 03 - Build one-shot exemplars per MWE from train_one_shot (EN)

def build_oneshot_index(train_one_shot_en: pd.DataFrame) -> Dict[str, Dict[int, Dict[str, str]]]:
    idx = {}
    for _, r in train_one_shot_en.iterrows():
        mwe = r["MWE"]
        y = int(r["Label"])
        ctx = pack_context(r.get("Previous",""), r.get("Target",""), r.get("Next",""), mwe)
        idx.setdefault(mwe, {})
        # keep the first example per label if multiple exist
        if y not in idx[mwe]:
            idx[mwe][y] = {"context": ctx, "label": y}
    return idx

def pick_global_oneshot_fallback(train_one_shot_en: pd.DataFrame) -> Dict[int, Dict[str, str]]:
    pool = {0: None, 1: None}
    for _, r in train_one_shot_en.iterrows():
        y = int(r["Label"])
        if pool[y] is None:
            mwe = r["MWE"]
            ctx = pack_context(r.get("Previous",""), r.get("Target",""), r.get("Next",""), mwe)
            pool[y] = {"context": ctx, "label": y, "mwe": mwe}
        if pool[0] is not None and pool[1] is not None:
            break
    return pool


## Load LLM (CPU-only) and tokenizer

In [4]:
# 04 - Load LLM (CPU-only) and tokenizer

import getpass
from huggingface_hub import login, whoami

hf_token = getpass.getpass("Paste your Hugging Face token (read scope is enough): ")
login(token=hf_token)          # authenticates this Python process
print(whoami())                # sanity check

tokenizer = AutoTokenizer.from_pretrained(
    MODEL_NAME,
    use_fast=True,
    trust_remote_code=True
)

model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    torch_dtype=torch.float32,
    device_map={"": "cpu"},
    trust_remote_code=True,
    low_cpu_mem_usage=True
)
model.eval()


{'type': 'user', 'id': '68d86c57f1bd7f5086be81bb', 'name': 'mhossai6', 'fullname': 'Mohammad Shahriar Hossain', 'email': 'mhossai6@ualberta.ca', 'emailVerified': True, 'canPay': False, 'periodEnd': None, 'isPro': False, 'avatarUrl': '/avatars/fad622d4d934fb9bf3204eaa20999042.svg', 'orgs': [], 'auth': {'type': 'access_token', 'accessToken': {'displayName': 'CMPUT497-A1', 'role': 'read', 'createdAt': '2025-11-12T07:19:03.479Z'}}}


`torch_dtype` is deprecated! Use `dtype` instead!
Loading checkpoint shards: 100%|██████████| 2/2 [00:14<00:00,  7.31s/it]


LlamaForCausalLM(
  (model): LlamaModel(
    (embed_tokens): Embedding(128256, 3072)
    (layers): ModuleList(
      (0-27): 28 x LlamaDecoderLayer(
        (self_attn): LlamaAttention(
          (q_proj): Linear(in_features=3072, out_features=3072, bias=False)
          (k_proj): Linear(in_features=3072, out_features=1024, bias=False)
          (v_proj): Linear(in_features=3072, out_features=1024, bias=False)
          (o_proj): Linear(in_features=3072, out_features=3072, bias=False)
        )
        (mlp): LlamaMLP(
          (gate_proj): Linear(in_features=3072, out_features=8192, bias=False)
          (up_proj): Linear(in_features=3072, out_features=8192, bias=False)
          (down_proj): Linear(in_features=8192, out_features=3072, bias=False)
          (act_fn): SiLUActivation()
        )
        (input_layernorm): LlamaRMSNorm((3072,), eps=1e-05)
        (post_attention_layernorm): LlamaRMSNorm((3072,), eps=1e-05)
      )
    )
    (norm): LlamaRMSNorm((3072,), eps=1e-05)
    (

## Prompt builders (zero-shot and one-shot)

In [5]:
# 05 - Prompt builders (zero-shot and one-shot)

INSTR_ZS = (
    "You are a precise classifier for idiomaticity of multi-word expressions (MWEs).\n"
    "Task: Given a context (Previous/Target/Next) and the target MWE, decide if the usage in the Target sentence is idiomatic (figurative) or literal.\n"
    "Output strictly one digit: 1 for idiomatic, 0 for literal.\n"
    "Do not output any other text."
)

def make_zero_shot_prompt(mwe: str, ctx_block: str) -> str:
    return (
        f"{INSTR_ZS}\n\n"
        f"MWE: {mwe}\n"
        f"{ctx_block}\n"
        f"Answer (0 or 1):"
    )

INSTR_1S = (
    "You are a precise classifier for idiomaticity of multi-word expressions (MWEs).\n"
    "See two labeled examples for the SAME MWE, then classify the new instance.\n"
    "Output strictly one digit: 1 for idiomatic, 0 for literal."
)

def make_one_shot_prompt(mwe: str, pos_example: str, neg_example: str, ctx_block: str) -> str:
    return (
        f"{INSTR_1S}\n\n"
        f"Example A (Label=1):\nMWE: {mwe}\n{pos_example}\n\n"
        f"Example B (Label=0):\nMWE: {mwe}\n{neg_example}\n\n"
        f"Now classify the new instance.\n"
        f"MWE: {mwe}\n{ctx_block}\n"
        f"Answer (0 or 1):"
    )

def apply_chat_template_if_available(text: str) -> Dict[str, torch.Tensor]:
    if hasattr(tokenizer, "apply_chat_template"):
        messages = [
            {"role": "system", "content": "You are a helpful, concise assistant."},
            {"role": "user", "content": text}
        ]
        prompt_ids = tokenizer.apply_chat_template(messages, add_generation_prompt=True, return_tensors="pt")
        return {"input_ids": prompt_ids.to(device)}
    ids = tokenizer(text, return_tensors="pt").input_ids.to(device)
    return {"input_ids": ids}

## Generation + parsing

In [6]:
# 06 - Generation + parsing

def generate_label_from_prompt(prompt: str) -> int:
    enc = apply_chat_template_if_available(prompt)
    with torch.no_grad():
        out = model.generate(
            **enc,
            max_new_tokens=MAX_NEW_TOKENS,
            do_sample=DO_SAMPLE,
            temperature=TEMPERATURE,
            top_p=TOP_P,
            eos_token_id=tokenizer.eos_token_id
        )
    gen = out[0][enc["input_ids"].shape[-1]:]
    text = tokenizer.decode(gen, skip_special_tokens=True)
    m = re.search(r"\b([01])\b", text)
    if m:
        return int(m.group(1))
    if "idiomatic" in text.lower():
        return 1
    if "literal" in text.lower():
        return 0
    return 1

## Predictors for zero-shot and one-shot (EN)

In [7]:
# 07 - Predictors for zero-shot and one-shot (EN)

def predict_zero_shot(df: pd.DataFrame) -> List[int]:
    preds = []
    for _, r in df.iterrows():
        mwe = r["MWE"]
        ctx = pack_context(r.get("Previous",""), r.get("Target",""), r.get("Next",""), mwe)
        prompt = make_zero_shot_prompt(mwe, ctx)
        yhat = generate_label_from_prompt(prompt)
        preds.append(yhat)
    return preds

def predict_one_shot(df: pd.DataFrame, oneshot_index: Dict[str, Dict[int, Dict[str, str]]], global_pool: Dict[int, Dict[str, str]]) -> List[int]:
    preds = []
    for _, r in df.iterrows():
        mwe = r["MWE"]
        ctx = pack_context(r.get("Previous",""), r.get("Target",""), r.get("Next",""), mwe)

        pos_ctx = None
        neg_ctx = None
        if mwe in oneshot_index:
            pos_ctx = oneshot_index[mwe].get(1, {}).get("context")
            neg_ctx = oneshot_index[mwe].get(0, {}).get("context")

        if pos_ctx is None:
            pos_ctx = global_pool[1]["context"]
        if neg_ctx is None:
            neg_ctx = global_pool[0]["context"]

        prompt = make_one_shot_prompt(mwe, pos_ctx, neg_ctx, ctx)
        yhat = generate_label_from_prompt(prompt)
        preds.append(yhat)
    return preds


## Zero-shot (EN): dev eval + eval submission

In [None]:
# 08 - Zero-shot (EN): dev eval + eval submission (with progress bar + timing + RESUME, immediate cache)

from time import perf_counter
from tqdm.auto import tqdm

def _load_cache(cache_path: Path) -> dict:
    if cache_path.exists():
        df = pd.read_csv(cache_path, dtype={"ID": str, "Label": int})
        return dict(zip(df["ID"].astype(str), df["Label"].astype(int)))
    return {}

def _append_one(cache_path: Path, rec: tuple):
    _id, _lab = rec
    header_needed = not cache_path.exists()
    with open(cache_path, "a") as f:
        if header_needed:
            f.write("ID,Label\n")
        f.write(f"{_id},{int(_lab)}\n")

def progressive_predict_zero_shot(df: pd.DataFrame, cache_path: Path, desc: str = "Zero-shot EN") -> list:
    df = df.copy()
    df["ID"] = df["ID"].astype(str)

    preds_map = _load_cache(cache_path)
    done = set(preds_map.keys())

    print(f"{desc} | Resuming with {len(done)} cached / {len(df)} total")
    t0 = perf_counter()
    processed = 0

    for _, r in tqdm(df.iterrows(), total=len(df), desc=desc, leave=True):
        _id = str(r["ID"])
        if _id in done:
            continue
        mwe = r["MWE"]
        ctx = pack_context(r.get("Previous",""), r.get("Target",""), r.get("Next",""), mwe)
        prompt = make_zero_shot_prompt(mwe, ctx)
        yhat = generate_label_from_prompt(prompt)
        preds_map[_id] = int(yhat)
        _append_one(cache_path, (_id, int(yhat)))
        processed += 1

    elapsed = perf_counter() - t0
    print(f"{desc} | New computed: {processed} | Cached at start: {len(done)} | Total: {len(df)} | "
          f"Elapsed: {elapsed:.1f}s | {elapsed/max(1,processed):.3f}s/example (new only)")

    yhat = [preds_map[str(i)] for i in df["ID"]]
    return yhat

# ---- DEV (zero-shot EN) ----
train_0s_en, dev_0s_en = load_train_dev(language="EN", oneshot=False)

cache_dev_0s = OUT_DIR / "cache_llm_zeroshot_dev_en.csv"
yhat_dev_0s = progressive_predict_zero_shot(dev_0s_en, cache_dev_0s, desc="Zero-shot EN (dev)")
ytrue_dev_0s = dev_0s_en["Label"].tolist()
f1_0s = f1_score(ytrue_dev_0s, yhat_dev_0s, average="macro")
print(f"[LLM Zero-shot EN] Dev macro-F1: {f1_0s:.4f}")
print(classification_report(ytrue_dev_0s, yhat_dev_0s, digits=4))
print(confusion_matrix(ytrue_dev_0s, yhat_dev_0s))

# ---- EVAL (zero-shot EN) ----
eval_en = load_eval(language="EN")
cache_eval_0s = OUT_DIR / "cache_llm_zeroshot_eval_en.csv"
yhat_eval_0s = progressive_predict_zero_shot(eval_en, cache_eval_0s, desc="Zero-shot EN (eval)")

sub_0s = pd.DataFrame({
    "ID": eval_en["ID"].astype(str),
    "Language": eval_en["Language"],
    "Setting": ["zero_shot"] * len(eval_en),
    "Label": yhat_eval_0s
})
sub_0s_path = OUT_DIR / "eval_submission_en_llm_zeroshot.csv"
sub_0s.to_csv(sub_0s_path, index=False)
print(f"Wrote {sub_0s_path}")

Zero-shot EN (dev):   0%|          | 0/466 [00:00<?, ?it/s]The following generation flags are not valid and may be ignored: ['temperature']. Set `TRANSFORMERS_VERBOSITY=info` for more details.
The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:128009 for open-end generation.
The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Zero-shot EN (dev):   2%|▏         | 11/466 [01:16<52:43,  6.95s/it]The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:128009 for open-end generation.
Zero-s

KeyboardInterrupt: 

## One-shot (EN): build exemplars, dev eval + eval submission

In [None]:
# 09 - One-shot (EN): build exemplars, dev eval + eval submission (progress + RESUME, immediate cache)

from time import perf_counter
from tqdm.auto import tqdm

def progressive_predict_one_shot(df: pd.DataFrame,
                                 oneshot_index: dict,
                                 global_pool: dict,
                                 cache_path: Path,
                                 desc: str = "One-shot EN") -> list:
    df = df.copy()
    df["ID"] = df["ID"].astype(str)

    preds_map = _load_cache(cache_path)
    done = set(preds_map.keys())

    print(f"{desc} | Resuming with {len(done)} cached / {len(df)} total")
    t0 = perf_counter()
    processed = 0

    for _, r in tqdm(df.iterrows(), total=len(df), desc=desc, leave=True):
        _id = str(r["ID"])
        if _id in done:
            continue

        mwe = r["MWE"]
        ctx = pack_context(r.get("Previous",""), r.get("Target",""), r.get("Next",""), mwe)

        pos_ctx = oneshot_index.get(mwe, {}).get(1, {}).get("context")
        neg_ctx = oneshot_index.get(mwe, {}).get(0, {}).get("context")
        if pos_ctx is None:
            pos_ctx = global_pool[1]["context"]
        if neg_ctx is None:
            neg_ctx = global_pool[0]["context"]

        prompt = make_one_shot_prompt(mwe, pos_ctx, neg_ctx, ctx)
        yhat = generate_label_from_prompt(prompt)
        preds_map[_id] = int(yhat)
        _append_one(cache_path, (_id, int(yhat)))
        processed += 1

    elapsed = perf_counter() - t0
    print(f"{desc} | New computed: {processed} | Cached at start: {len(done)} | Total: {len(df)} | "
          f"Elapsed: {elapsed:.1f}s | {elapsed/max(1,processed):.3f}s/example (new only)")

    yhat = [preds_map[str(i)] for i in df["ID"]]
    return yhat

train_1s_en, dev_1s_en = load_train_dev(language="EN", oneshot=True)
oneshot_index = build_oneshot_index(train_1s_en)
global_pool = pick_global_oneshot_fallback(train_1s_en)

cache_dev_1s = OUT_DIR / "cache_llm_oneshot_dev_en.csv"
yhat_dev_1s = progressive_predict_one_shot(dev_1s_en, oneshot_index, global_pool, cache_dev_1s, desc="One-shot EN (dev)")
ytrue_dev_1s = dev_1s_en["Label"].tolist()
f1_1s = f1_score(ytrue_dev_1s, yhat_dev_1s, average="macro")
print(f"[LLM One-shot EN] Dev macro-F1: {f1_1s:.4f}")
print(classification_report(ytrue_dev_1s, yhat_dev_1s, digits=4))
print(confusion_matrix(ytrue_dev_1s, yhat_dev_1s))

eval_en = load_eval(language="EN")
cache_eval_1s = OUT_DIR / "cache_llm_oneshot_eval_en.csv"
yhat_eval_1s = progressive_predict_one_shot(eval_en, oneshot_index, global_pool, cache_eval_1s, desc="One-shot EN (eval)")

sub_1s = pd.DataFrame({
    "ID": eval_en["ID"].astype(str),
    "Language": eval_en["Language"],
    "Setting": ["zero_shot"] * len(eval_en),
    "Label": yhat_eval_1s
})
sub_1s_path = OUT_DIR / "eval_submission_en_llm_oneshot.csv"
sub_1s.to_csv(sub_1s_path, index=False)
print(f"Wrote {sub_1s_path}")

## Save run metadata

In [None]:
# 10 - Save run metadata

with open(OUT_DIR / "run_en_llm.txt", "w") as f:
    f.write(f"MODEL_NAME={MODEL_NAME}\n")
    f.write(f"ZERO_SHOT_DEV_F1={f1_0s:.4f}\n")
    f.write(f"ONE_SHOT_DEV_F1={f1_1s:.4f}\n")
    f.write("CPU_ONLY=True\n")
print("Saved run metadata.")

print("Resume cache files (delete to start fresh):")
for p in [
    OUT_DIR / "cache_llm_zeroshot_dev_en.csv",
    OUT_DIR / "cache_llm_zeroshot_eval_en.csv",
    OUT_DIR / "cache_llm_oneshot_dev_en.csv",
    OUT_DIR / "cache_llm_oneshot_eval_en.csv",
]:
    print(f"- {p}")
