In [6]:
# !pip install -q --no-deps bitsandbytes accelerate xformers==0.0.29.post3 peft trl triton cut_cross_entropy unsloth_zoo
# !pip install -q sentencepiece protobuf "datasets>=3.4.1" huggingface_hub hf_transfer
# !pip install -q --no-deps unsloth

In [21]:
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"

In [22]:
#Robust CSV/Excel loader
import glob, pandas as pd

def load_any_table(path_globs):
    paths=[]
    for g in path_globs:
        paths += glob.glob(g, recursive=True) if "**" in g else ([g] if os.path.exists(g) else [])
    assert paths, f"Could not find any of: {path_globs}"
    p = paths[0]
    df = pd.read_csv(p, dtype=str, keep_default_na=False, engine="python")
    df.columns = [c.strip() for c in df.columns]
    return df

TRIAL = load_any_table([
    #"/content/trial.csv"
    "trial.csv",      # Change Here -----------------
])
DEV   = load_any_table([
    #"/content/test_v1.csv"
    "test_v1.csv",    # Change Here -----------
])

for df in (TRIAL, DEV):
    if "id" in df.columns:
        df["id"] = pd.to_numeric(df["id"], errors="coerce").fillna(-1).astype(int)
    for col in ("instruction","response","test_list"):
        if col in df.columns:
            df[col] = df[col].astype(str)


In [23]:
#parse tests + function name extraction
import re, json, ast

ASSERT_NAME_RE = re.compile(r"""assert\s+([A-Za-z_]\w*)\s*\(""", re.VERBOSE)

def _try_json_or_eval(s):
    try: return json.loads(s)
    except Exception:
        try: return ast.literal_eval(s)
        except Exception: return s

def parse_tests_cell(val):
    s = (val or "").strip()
    if not s or s.lower() in {"none","null","nan"}:
        return {"kind":"none","tests":[],"fn":None}

    # normalize quotes
    s = (s.replace("’","'").replace("‘","'")
           .replace("“","\"").replace("”","\""))

    obj = _try_json_or_eval(s)
    if isinstance(obj, str) and obj.strip().startswith("[") and obj.strip().endswith("]"):
        obj = _try_json_or_eval(obj)

    tests = []
    if isinstance(obj, list):
        for x in obj:
            xs = str(x).strip().strip('"').strip("'")
            if not xs.startswith("assert") and "==" in xs:
                xs = "assert " + xs
            if xs.startswith("assert"):
                tests.append(xs)
    else:
        # fallback: scrape raw string
        tests = [m.group(0).strip() for m in re.finditer(r"assert\s+.+", s)]

    # function name (first assert wins)
    fn = None
    for t in tests:
        m = ASSERT_NAME_RE.search(t)
        if m:
            fn = m.group(1)
            break

    return {"kind": "asserts" if tests else "none", "tests": tests, "fn": fn}


In [24]:
#Load base model and attach chat template
import torch

from unsloth import FastModel
from unsloth.chat_templates import get_chat_template

BASE_MODEL = "md-nishat-008/TigerLLM-1B-it"  # your pick

model, tokenizer = FastModel.from_pretrained(
    model_name      = BASE_MODEL,
    max_seq_length  = 1024,
    load_in_4bit    = False,
    load_in_8bit    = True,
    full_finetuning = False,
)

# Gemma-3 style chat template
tokenizer = get_chat_template(tokenizer, chat_template="gemma-3")

# Quick smoke generation
def quick_gen(prompt):
    chat = tokenizer.apply_chat_template(
        [{"role":"user","content":prompt}],
        add_generation_prompt=True, tokenize=False
    )
    toks = tokenizer(chat, return_tensors="pt").to(getattr(model,"device","cuda"))
    out = model.generate(**toks, max_new_tokens=128, do_sample=False, use_cache=False)
    print(tokenizer.decode(out[0], skip_special_tokens=True))

quick_gen("প্রদত্ত পূর্ণসংখ্যাটি একটি মৌলিক সংখ্যা কিনা তা পরীক্ষা করার জন্য একটি ফাংশন লিখুন। Example: prime_num(n)")


==((====))==  Unsloth 2025.9.1: Fast Gemma3 patching. Transformers: 4.56.1.
   \\   /|    NVIDIA GeForce RTX 3050. Num GPUs = 1. Max memory: 7.648 GB. Platform: Linux.
O^O/ \_/ \    Torch: 2.6.0+cu124. CUDA: 8.6. CUDA Toolkit: 12.4. Triton: 3.2.0
\        /    Bfloat16 = TRUE. FA [Xformers = 0.0.29.post3. FA2 = False]
 "-____-"     Free license: http://github.com/unslothai/unsloth
Unsloth: Fast downloading is enabled - ignore downloading bars which are red colored!
user
প্রদত্ত পূর্ণসংখ্যাটি একটি মৌলিক সংখ্যা কিনা তা পরীক্ষা করার জন্য একটি ফাংশন লিখুন। Example: prime_num(n)
model
```python
def is_prime(n):
    if n <= 1:
        return False
    for i in range(2, int(n**0.5) + 1):
        if n % i == 0:
            return False
    return True
```


In [25]:
#attach LoRA adapters
from unsloth import FastModel

TARGETS = ["q_proj","k_proj","v_proj","o_proj","gate_proj","up_proj","down_proj"]

model = FastModel.get_peft_model(
    model,
    finetune_vision_layers     = False,
    finetune_language_layers   = True,
    finetune_attention_modules = True,
    finetune_mlp_modules       = True,

    r            = 32,
    lora_alpha   = 64,
    lora_dropout = 0.05,
    bias         = "none",
    target_modules = TARGETS,
    random_state = 3407,
)
print("PEFT (LoRA) attached.")


Unsloth: Making `model.base_model.model.model` require gradients
PEFT (LoRA) attached.


In [26]:
#robust code extractor + generators (place right after S1)
import re, types, builtins, torch

def _norm(s: str) -> str:
    return str(s or "").replace("\r\n","\n")

_FENCE_ANY = re.compile(r"```([\w+\-]*)\s*\n([\s\S]*?)\n```", re.I)

def extract_first_code_block(text: str) -> str | None:
    s = _norm(text)

    # 1) fenced blocks (prefer python)
    blocks = _FENCE_ANY.findall(s)
    if blocks:
        for lang, body in blocks:
            if lang.strip().lower() in ("python","py"):
                b = body.strip()
                if b: return b
        for _, body in blocks:
            b = body.strip()
            if b: return b

    # 2) Gemma-style section fallback
    if "\nmodel\n" in s:
        tail = s.split("\nmodel\n", 1)[1].strip()
        if ("def " in tail) or ("import " in tail) or ("return " in tail):
            return tail

    # 3) first top-level def
    lines = s.splitlines()
    start = None
    for i, L in enumerate(lines):
        if re.match(r"^\s*def\s+[A-Za-z_]\w*\s*\(", L):
            start = i; break
    if start is not None:
        buf=[lines[start]]
        for j in range(start+1,len(lines)):
            if re.match(r"^\s*(def|class)\s+[A-Za-z_]\w*\s*\(", lines[j]): break
            buf.append(lines[j])
        code="\n".join(buf).strip()
        if code: return code

    return None

def generate_code(model, tokenizer, prompt, *, do_sample=False, temperature=0.2, top_p=0.95, max_new_tokens=512):
    """Single-prompt with a strict system rule to emit ONE fenced python block."""
    messages = [
        {"role":"system",
         "content":"Reply with ONLY one ```python``` fenced block implementing the required function. No extra text."},
        {"role":"user","content":prompt},
    ]
    chat = tokenizer.apply_chat_template(messages, add_generation_prompt=True, tokenize=False)
    toks = tokenizer(chat, return_tensors="pt").to(getattr(model, "device", "cuda"))
    pad_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else tokenizer.eos_token_id
    was_training = model.training
    model.eval()
    with torch.no_grad():
        out = model.generate(
            **toks,
            max_new_tokens=max_new_tokens,
            do_sample=do_sample,
            temperature=temperature,
            top_p=top_p,
            use_cache=False,
            eos_token_id=tokenizer.eos_token_id,
            pad_token_id=pad_id,
        )
    if was_training: model.train()
    return tokenizer.decode(out[0], skip_special_tokens=True)

def generate_code_with_sys(model, tokenizer, system_text, user_text, *,
                           do_sample=False, temperature=0.2, top_p=0.95, max_new_tokens=512):
    """Two-message (system+user) generator matching SFT/RSFT serialization."""
    messages = [
        {"role":"system","content":system_text},
        {"role":"user","content":user_text},
    ]
    chat = tokenizer.apply_chat_template(messages, add_generation_prompt=True, tokenize=False)
    toks = tokenizer(chat, return_tensors="pt").to(getattr(model, "device", "cuda"))
    pad_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else tokenizer.eos_token_id
    was_training = model.training
    model.eval()
    with torch.no_grad():
        out = model.generate(
            **toks,
            max_new_tokens=max_new_tokens,
            do_sample=do_sample,
            temperature=temperature,
            top_p=top_p,
            use_cache=False,
            eos_token_id=tokenizer.eos_token_id,
            pad_token_id=pad_id,
        )
    if was_training: model.train()
    return tokenizer.decode(out[0], skip_special_tokens=True)


In [27]:
#permissive assert runner (place after G0 and before any eval/callback)
import re, types, builtins

def _expected_fn_from_asserts(asserts):
    for a in asserts:
        m = re.search(r"assert\s+([A-Za-z_]\w*)\s*\(", str(a))
        if m: return m.group(1)
    return None

def _sanitize_asserts(asserts):
    good=[]
    for raw in asserts:
        s=str(raw).strip()
        if not s.startswith("assert "):
            s = "assert " + s if not s.startswith("assert") else s
        try: compile(s, "<assert>", "exec"); good.append(s)
        except SyntaxError: continue
    return good

_COMMON_PREAMBLE = """
import math, itertools, functools, operator, bisect, heapq, statistics, collections, re, string, random, sys, datetime
from collections import Counter, defaultdict, deque, OrderedDict
from math import gcd, sqrt, factorial, comb, perm, ceil, floor
from bisect import bisect_left, bisect_right
"""

def _make_mod_permissive(code: str, expected_fn: str | None):
    try:
        m = types.ModuleType("student")
        g = {"__builtins__": builtins.__dict__}
        exec(_COMMON_PREAMBLE, g, g)
        exec(code, g, g)
        if expected_fn and expected_fn not in g:
            cands=[k for k,v in g.items() if callable(v) and not k.startswith("_")]
            if len(cands)==1: g[expected_fn]=g[cands[0]]
        m.__dict__.update(g); return m
    except Exception:
        return None

def run_asserts_module_permissive(generation_text: str, asserts: list[str]) -> tuple[int,int]:
    code = extract_first_code_block(generation_text)
    if not code: return (0,0)
    expected = _expected_fn_from_asserts(asserts)
    mod = _make_mod_permissive(code, expected)
    if mod is None: return (0,0)
    tests = _sanitize_asserts(asserts)
    ok=0
    for s in tests:
        try: exec(s, mod.__dict__, mod.__dict__); ok+=1
        except Exception: pass
    return ok, len(tests)


In [28]:
#Timed sandbox for running public asserts (drop this right AFTER E1)
import time, builtins, types, multiprocessing as mp, re

# Fallbacks if E1 helpers weren't defined (keeps this cell self-contained)
try:
    _COMMON_PREAMBLE
except NameError:
    _COMMON_PREAMBLE = """
import math, itertools, functools, operator, bisect, heapq, statistics, collections, re, string, random, sys, datetime
from collections import Counter, defaultdict, deque, OrderedDict
from math import gcd, sqrt, factorial, comb, perm, ceil, floor
from bisect import bisect_left, bisect_right
"""

try:
    def extract_first_code_block(generation_text: str) -> str:
        s = str(generation_text or "")
        m = re.search(r"```(?:python|py)?\s*\n([\s\S]*?)\n```", s, flags=re.IGNORECASE)
        if m: return m.group(1).strip()
        # fallback: first top-level def
        lines = s.splitlines()
        start = None
        for i,L in enumerate(lines):
            if re.match(r"^\s*def\s+[A-Za-z_]\w*\s*\(", L):
                start = i; break
        if start is not None:
            buf=[lines[start]]
            for j in range(start+1,len(lines)):
                if re.match(r"^\s*(def|class)\s+[A-Za-z_]\w*\s*\(", lines[j]): break
                buf.append(lines[j])
            code="\n".join(buf).strip()
            if code: return code
        return None
except NameError:
    pass

try:
    _expected_fn_from_asserts
except NameError:
    def _expected_fn_from_asserts(asserts):
        for a in asserts:
            m = re.search(r"assert\s+([A-Za-z_]\w*)\s*\(", str(a))
            if m: return m.group(1)
        return None

try:
    _sanitize_asserts
except NameError:
    def _sanitize_asserts(asserts):
        good=[]
        for raw in asserts:
            s=str(raw).strip()
            if not s.startswith("assert "):
                s = "assert " + s if not s.startswith("assert") else s
            try: compile(s, "<assert>", "exec"); good.append(s)
            except SyntaxError: continue
        return good

# ---- Worker that executes code + asserts in an isolated process ----
def _worker_exec_asserts(code_text: str, asserts: list[str], expected_fn: str|None, preamble: str, conn):
    """Runs inside a child process. MUST be top-level for multiprocessing."""
    try:
        # Optional resource limits (Linux only)
        try:
            import resource
            # Memory cap (MB) — keep generous or imports may fail
            mem_mb = int(os.environ.get("ASSERT_MEM_MB", "1024"))
            resource.setrlimit(resource.RLIMIT_AS, (mem_mb*1024*1024, mem_mb*1024*1024))
            # CPU seconds (hard limit) — optional; parent also has a wall-clock timeout
            cpu_s = int(float(os.environ.get("ASSERT_CPU_S", "0")))
            if cpu_s > 0:
                resource.setrlimit(resource.RLIMIT_CPU, (cpu_s, cpu_s+1))
        except Exception:
            pass

        g = {"__builtins__": builtins.__dict__}
        if preamble:
            exec(preamble, g, g)

        code = extract_first_code_block(code_text)
        if not code:
            conn.send((0,0,False)); return

        # Exec student code
        exec(code, g, g)

        # If expected function name missing but exactly one callable exists, alias it
        if expected_fn and expected_fn not in g:
            cands=[k for k,v in g.items() if callable(v) and not k.startswith("_")]
            if len(cands)==1:
                g[expected_fn]=g[cands[0]]

        tests = _sanitize_asserts(asserts)
        ok=0
        for s in tests:
            try:
                exec(s, g, g)
                ok+=1
            except Exception:
                pass

        conn.send((ok, len(tests), False))
    except Exception:
        try: conn.send((0,0,False))
        except Exception: pass
    finally:
        try: conn.close()
        except Exception: pass

def run_asserts_module_permissive_timed(generation_text: str,
                                        asserts: list[str],
                                        timeout_s: float = 2.0) -> tuple[int,int,bool]:
    """
    Run student code + asserts in a child process with a wall-clock timeout.
    Returns: (ok, total, timed_out)
    """
    expected = _expected_fn_from_asserts(asserts)
    parent_conn, child_conn = mp.Pipe(duplex=False)
    p = mp.Process(
        target=_worker_exec_asserts,
        args=(generation_text, asserts, expected, _COMMON_PREAMBLE, child_conn),
        daemon=True,
    )
    p.start()
    child_conn.close()

    timed_out = False
    res = None
    try:
        if parent_conn.poll(timeout_s):
            try:
                res = parent_conn.recv()
            except EOFError:
                timed_out = True
        else:
            timed_out = True
    finally:
        if timed_out and p.is_alive():
            try: p.terminate()
            except Exception: pass
        try: p.join(timeout=0.2)
        except Exception: pass
        try: parent_conn.close()
        except Exception: pass

    if not res:
        # If we timed out, count as 0/len(asserts)
        return (0, len(_sanitize_asserts(asserts)), True)

    ok, tot, _ = res
    return (int(ok), int(tot), bool(timed_out))

# ---- Monkey-patch the original name used across your notebook ----
try:
    _run_asserts_module_permissive_original = run_asserts_module_permissive  # keep a handle, just in case
except NameError:
    _run_asserts_module_permissive_original = None

def run_asserts_module_permissive(generation_text: str, asserts: list[str]):
    """Drop-in replacement preserving the old signature."""
    t = float(os.environ.get("ASSERT_TIMEOUT_S", "2.0"))
    ok, tot, _ = run_asserts_module_permissive_timed(generation_text, asserts, timeout_s=t)
    return ok, tot

print("✅ Timed assert runner active. Set timeout via os.environ['ASSERT_TIMEOUT_S'] (default=2.0s)")

✅ Timed assert runner active. Set timeout via os.environ['ASSERT_TIMEOUT_S'] (default=2.0s)


In [29]:
#helpers to mirror your SFT format (updated)
def build_system_prompt(fn: str) -> str:
    return (
        "তুমি একটি কোড-জেনারেশন সহকারী।\n"
        f"শুধু একটি ```python``` fenced ব্লকে **শুধুমাত্র** ফাংশন `{fn}` ইমপ্লিমেন্ট করবে।\n"
        "ইনপুট/আউটপুট বা print লিখবে না। প্রয়োজনীয় সব imports **ব্লকের ভিতরেই** দেবে। "
        "বাইরের টেক্সট, ব্যাখ্যা, বা একাধিক ব্লক দেবে না।"
    )

def build_user_prompt(instr: str, tests: list[str], fn: str) -> str:
    tests_comment = "\n".join(f"# {t}" for t in tests[:8])
    return (
        f"{instr.strip()}\n\n"
        "উদাহরণ টেস্টসমূহ (কমেন্ট আকারে):\n"
        f"{tests_comment}\n\n"
        f"ফাংশনের নাম অবশ্যই `{fn}` হবে এবং একটিমাত্র ```python``` fenced ব্লকে কোড দেবে।"
    )

def serialize_chat_for_sft(tokenizer, system_text: str, user_text: str, code_py: str) -> str:
    model_text = f"```python\n{code_py.strip()}\n```"
    messages = [
        {"role":"system","content": system_text},
        {"role":"user",  "content": user_text},
        {"role":"model", "content": model_text},
    ]
    txt = tokenizer.apply_chat_template(messages, add_generation_prompt=False, tokenize=False)
    return txt.replace("<bos>","")


In [30]:
#build DEV_HAS_TESTS, RSFT source, and a clean DEV_VAL (place after R1)
from sklearn.model_selection import train_test_split

DEV_parsed    = DEV.assign(_p = DEV.get("test_list","").apply(parse_tests_cell))
DEV_HAS_TESTS = DEV_parsed[DEV_parsed["_p"].apply(lambda d: len(d.get("tests",[]))>0)].reset_index(drop=True)

# If there are no rows with tests in DEV, fall back to TRIAL
if len(DEV_HAS_TESTS) == 0:
    print("⚠️ DEV has no usable asserts. Falling back to TRIAL for RSFT/Eval.")
    TRIAL_parsed    = TRIAL.assign(_p = TRIAL.get("test_list","").apply(parse_tests_cell))
    DEV_HAS_TESTS   = TRIAL_parsed[TRIAL_parsed["_p"].apply(lambda d: len(d.get("tests",[]))>0)].reset_index(drop=True)

DEV_RSFT, DEV_VAL = train_test_split(DEV_HAS_TESTS, test_size=0.2, random_state=42, shuffle=True)

print(f"DEV total={len(DEV)} | with tests={len(DEV_HAS_TESTS)} | RSFT-source={len(DEV_RSFT)} | DEV-val={len(DEV_VAL)}")

# Helper: always return a dataframe with tests
def pick_eval_df():
    for name in ["DEV_VAL","DEV_HAS_TESTS","TRIAL"]:
        if name in globals():
            df = globals()[name]
            parsed = df.assign(_p=df.get("test_list","").apply(parse_tests_cell))
            have = parsed[parsed["_p"].apply(lambda d: len(d.get("tests",[]))>0)]
            if len(have) > 0:
                print(f"[Eval] Using {name} with {len(have)}/{len(df)} rows that have tests.")
                return have.reset_index(drop=True)
    raise RuntimeError("No dataframe with usable asserts found.")


DEV total=500 | with tests=500 | RSFT-source=400 | DEV-val=100


In [None]:
#generate K samples per prompt, keep only those that pass all asserts (fixed: no nonlocal)
from datasets import Dataset
import torch

# ------- knobs -------
RSFT_MAX_ROWS        = len(DEV_RSFT)   # how many prompts to try
RSFT_K               = 10                      # samples per prompt
RSFT_TEMPERATURE     = 0.3
RSFT_TOP_P           = 0.95
RSFT_MAX_NEW_TOKENS  = 512
RSFT_PER_PROMPT_MAX  = 5                       # keep up to N winners per prompt (diversity)

# generator that uses the same chat shape as SFT/Callback (system + user)
def _gen_once_system_user(model, tokenizer, system_text, user_text,
                          *, do_sample, temperature, top_p, max_new_tokens):
    messages = [
        {"role":"system","content": system_text},
        {"role":"user",  "content": user_text},
    ]
    chat = tokenizer.apply_chat_template(messages, add_generation_prompt=True, tokenize=False)
    toks = tokenizer(chat, return_tensors="pt").to(getattr(model, "device", "cuda"))
    pad_id = tokenizer.pad_token_id or tokenizer.eos_token_id
    was_training = model.training
    model.eval()
    with torch.no_grad():
        out = model.generate(
            **toks,
            max_new_tokens=max_new_tokens,
            do_sample=do_sample,
            temperature=temperature,
            top_p=top_p,
            use_cache=False,
            eos_token_id=tokenizer.eos_token_id,
            pad_token_id=pad_id,
        )
    if was_training: model.train()
    return tokenizer.decode(out[0], skip_special_tokens=True)

# pick the RSFT source set prepared in R0
source_df = DEV_RSFT if 'DEV_RSFT' in globals() else DEV_has_tests
picked = source_df.sample(min(RSFT_MAX_ROWS, len(source_df)), random_state=123).reset_index(drop=True)

winners_texts = []
attempted = 0
found_prompts_with_at_least_one = 0

for i, row in picked.iterrows():
    info  = parse_tests_cell(row.get("test_list", ""))
    tests = info["tests"]
    if not tests:
        continue
    fn = info["fn"] or "solve"
    system_text = build_system_prompt(fn)                   # from R1
    user_text   = build_user_prompt(row["instruction"], tests, fn)  # from R1

    codes_seen = set()
    winners_for_this_prompt = 0

    for _ in range(RSFT_K):
        gen = _gen_once_system_user(
            model, tokenizer, system_text, user_text,
            do_sample=True, temperature=RSFT_TEMPERATURE, top_p=RSFT_TOP_P,
            max_new_tokens=RSFT_MAX_NEW_TOKENS,
        )
        ok, tot = run_asserts_module_permissive(gen, tests)
        if tot > 0 and ok == tot:
            code = extract_first_code_block(gen)
            if code and ("def " in code) and (code not in codes_seen):
                codes_seen.add(code)
                winners_texts.append({
                    "text": serialize_chat_for_sft(tokenizer, system_text, user_text, code)  # from R1
                })
                winners_for_this_prompt += 1
                if winners_for_this_prompt >= RSFT_PER_PROMPT_MAX:
                    break

    attempted += 1
    found_prompts_with_at_least_one += int(winners_for_this_prompt > 0)
    print(f"[RSFT] prompt {i+1}/{len(picked)}: winners={winners_for_this_prompt} | "
          f"cumulative found={found_prompts_with_at_least_one}")

print(f"[RSFT] prompts attempted={attempted}, "
      f"prompts with ≥1 winner={found_prompts_with_at_least_one}, "
      f"total winners kept={len(winners_texts)}, "
      f"win-rate={found_prompts_with_at_least_one/max(1,attempted):.3f}")

rsft_ds = Dataset.from_list(winners_texts) if winners_texts else None
print(rsft_ds)


In [None]:
rsft_ds = Dataset.from_list(winners_texts) if winners_texts else None
print(rsft_ds)

In [31]:

#Compatibility fallbacks in case R1 wasn't executed yet ---
try:
    build_system_prompt
except NameError:
    def build_system_prompt(fn: str) -> str:
        return (
            "তুমি একটি কোড-জেনারেশন সহকারী।\n"
            f"শুধু একটি ```python``` fenced ব্লকে **শুধুমাত্র** ফাংশন `{fn}` ইমপ্লিমেন্ট করবে।\n"
            "ইনপুট/আউটপুট বা print লিখবে না। প্রয়োজনীয় সব imports **ব্লকের ভিতরেই** দেবে। "
            "বাইরের টেক্সট, ব্যাখ্যা, বা একাধিক ব্লক দেবে না।"
        )
try:
    build_user_prompt
except NameError:
    def build_user_prompt(instr: str, tests: list[str], fn: str) -> str:
        tests_comment = "\n".join(f"# {t}" for t in tests[:8])
        return (
            f"{instr.strip()}\n\n"
            "উদাহরণ টেস্টসমূহ (কমেন্ট আকারে):\n"
            f"{tests_comment}\n\n"
            f"ফাংশনের নাম অবশ্যই `{fn}` হবে এবং একটিমাত্র ```python``` fenced ব্লকে কোড দেবে।"
        )
try:
    serialize_chat_for_sft
except NameError:
    def serialize_chat_for_sft(tokenizer, system_text: str, user_text: str, code_py: str) -> str:
        model_text = f"```python\n{code_py.strip()}\n```"
        messages = [
            {"role":"system","content": system_text},
            {"role":"user",  "content": user_text},
            {"role":"model", "content": model_text},
        ]
        txt = tokenizer.apply_chat_template(messages, add_generation_prompt=False, tokenize=False)
        return txt.replace("<bos>","")
# --- end fallbacks ---

#Augment RSFT dataset with TRIAL gold that pass public asserts
from datasets import Dataset

def _check_gold_row(row):
    p = parse_tests_cell(row.get("test_list"))
    tests = p.get("tests", []) if isinstance(p, dict) else []
    if not tests: return None
    code = str(row.get("response") or "").strip()
    if not code: return None
    # Build module with permissive defaults and test
    expected = _expected_fn_from_asserts(tests)
    mod = _make_mod_permissive(code, expected)
    if mod is None: return None
    sanitized = _sanitize_asserts(tests)
    ok = 0
    for s in sanitized:
        try:
            exec(s, mod.__dict__, mod.__dict__)
            ok += 1
        except Exception:
            pass
    return code if (ok == len(sanitized) and ok > 0) else None

trial_texts = []
if "TRIAL" in globals():
    for _, r in TRIAL.iterrows():
        code = _check_gold_row(r)
        if not code:
            continue
        p = parse_tests_cell(r.get("test_list"))
        tests = p.get("tests", []) if isinstance(p, dict) else []
        fn = (p.get("fn") if isinstance(p, dict) else None) or _expected_fn_from_asserts(tests)
        system_text = build_system_prompt(fn)
        user_text   = build_user_prompt(r.get("instruction",""), tests, fn)

        trial_texts.append({"text": serialize_chat_for_sft(tokenizer, system_text, user_text, code)})

trial_ok_ds = Dataset.from_list(trial_texts) if trial_texts else None

# Merge with RSFT-mined winners
if "rsft_ds" in globals() and rsft_ds and trial_ok_ds:
    from datasets import concatenate_datasets
    train_ds = concatenate_datasets([rsft_ds, trial_ok_ds]).shuffle(seed=42)
elif "rsft_ds" in globals() and rsft_ds:
    train_ds = rsft_ds
else:
    train_ds = trial_ok_ds

# Ensure downstream trainer uses the merged dataset
rsft_ds = train_ds
print("✅ R3: train dataset prepared:", rsft_ds)


✅ R3: train dataset prepared: Dataset({
    features: ['text'],
    num_rows: 580
})




In [32]:
#persist RSFT training dataset to disk
import os
SAVE_DIR = "blp-rsft-out/"
os.makedirs(SAVE_DIR, exist_ok=True)

HF_DIR = os.path.join(SAVE_DIR, "rsft_ds.hf")
JSONL_PATH = os.path.join(SAVE_DIR, "rsft_ds.jsonl")
CSV_PATH = os.path.join(SAVE_DIR, "rsft_ds.csv")

rsft_ds.save_to_disk(HF_DIR)
rsft_ds.to_json(JSONL_PATH, orient="records", lines=True, force_ascii=False)
rsft_ds.to_csv(CSV_PATH)

print(f"✅ Saved: {HF_DIR}\n✅ Saved: {JSONL_PATH}\n✅ Saved: {CSV_PATH}")
print(rsft_ds.select(range(min(2, len(rsft_ds)))))  # quick peek


Saving the dataset (1/1 shards): 100%|██████████| 580/580 [00:00<00:00, 12859.16 examples/s]
Creating json from Arrow format: 100%|██████████| 1/1 [00:00<00:00, 91.16ba/s]
Creating CSV from Arrow format: 100%|██████████| 1/1 [00:00<00:00, 49.03ba/s]

✅ Saved: blp-rsft-out/rsft_ds.hf
✅ Saved: blp-rsft-out/rsft_ds.jsonl
✅ Saved: blp-rsft-out/rsft_ds.csv
Dataset({
    features: ['text'],
    num_rows: 2
})





In [33]:
#restore rsft_ds from disk (Arrow / JSONL / CSV), dedup & shuffle
import os
from datasets import load_from_disk, load_dataset, Dataset

# <<< SET THIS to where you saved it >>>
ART_DIR    = "blp-rsft-out/"          # e.g., "artifacts" if you saved there
HF_DIR     = os.path.join(ART_DIR, "rsft_ds.hf")
JSONL_PATH = os.path.join(ART_DIR, "rsft_ds.jsonl")
CSV_PATH   = os.path.join(ART_DIR, "rsft_ds.csv")

def _load_any():
    if os.path.isdir(HF_DIR):
        print(f"Loading HF Arrow from {HF_DIR}")
        return load_from_disk(HF_DIR)
    if os.path.exists(JSONL_PATH):
        print(f"Loading JSONL from {JSONL_PATH}")
        return load_dataset("json", data_files=JSONL_PATH, split="train")
    if os.path.exists(CSV_PATH):
        print(f"Loading CSV from {CSV_PATH}")
        return load_dataset("csv", data_files=CSV_PATH, split="train")
    raise FileNotFoundError("Could not find rsft_ds.hf / rsft_ds.jsonl / rsft_ds.csv at " + ART_DIR)

rsft_ds = _load_any()

# Ensure 'text' field exists (SFTTrainer expects dataset_text_field="text")
if "text" not in rsft_ds.column_names:
    raise ValueError(
        f"'text' column not found in loaded dataset. Columns={rsft_ds.column_names}.\n"
        "If you saved instruction/response instead of serialized 'text', either re-save with 'text' "
        "or map to 'text' via your serialize_chat_for_sft()."
    )

# Drop empties, de-duplicate exact 'text', and shuffle deterministically
df_tmp = rsft_ds.to_pandas()
before = len(df_tmp)
df_tmp = df_tmp.dropna(subset=["text"])
df_tmp = df_tmp[df_tmp["text"].astype(str).str.strip() != ""]
df_tmp = df_tmp.drop_duplicates(subset=["text"]).sample(frac=1.0, random_state=42).reset_index(drop=True)
after = len(df_tmp)

rsft_ds = Dataset.from_pandas(df_tmp, preserve_index=False)
assert len(rsft_ds) > 0, "rsft_ds ended up empty after cleaning!"

print(f"✅ rsft_ds restored | rows={len(rsft_ds)} | deduped={before - after}")

# (Optional) quick peek
print(rsft_ds.select(range(min(2, len(rsft_ds)))))


Loading HF Arrow from blp-rsft-out/rsft_ds.hf
✅ rsft_ds restored | rows=448 | deduped=132
Dataset({
    features: ['text'],
    num_rows: 2
})


In [34]:
rsft_ds

Dataset({
    features: ['text'],
    num_rows: 448
})

In [None]:
import os, pandas as pd, json, torch
from transformers import TrainingArguments, TrainerCallback
from trl import SFTTrainer

class SaveBestCodeEval(TrainerCallback):
    def __init__(self, trainer, model, tokenizer, dev_df, tests_col="test_list",
                 every_n_steps=10, max_new_tokens=640, save_dir="/content/best-rsft"):
        self.trainer=trainer; self.model=model; self.tok=tokenizer
        parsed = dev_df.assign(_p = dev_df.get(tests_col,"").apply(parse_tests_cell))
        self.dev = parsed[parsed["_p"].apply(lambda d: len(d.get("tests",[]))>0)].reset_index(drop=True)
        self.tests_col=tests_col; self.every=max(1,every_n_steps); self.max_new=max_new_tokens
        self.best=-1.0; self.save_dir=save_dir
        os.makedirs(save_dir, exist_ok=True)

    def _one_eval(self):
        total=passed=0
        for _,r in self.dev.iterrows():
            info=parse_tests_cell(r.get(self.tests_col,"")); tests=info["tests"]
            if not tests: continue
            fn=info["fn"] or "solve"
            sys_t = build_system_prompt(fn)
            usr_t = build_user_prompt(str(r["instruction"]), tests, fn)
            gen = generate_code_with_sys(self.model, self.tok, sys_t, usr_t, do_sample=False, max_new_tokens=self.max_new)
            ok, tot = run_asserts_module_permissive(gen, tests)
            if tot>0:
                total+=1; passed+=int(ok==tot)
        return (passed/total if total else 0.0), passed, total

    def on_step_end(self, args, state, control, **kwargs):
        step=int(state.global_step or 0)
        if step>0 and step%self.every==0:
            rate,p,t = self._one_eval()
            print(f"[CodeEval] step={step} pass@1={rate:.3f} ({p}/{t})")
            try: self.trainer.log({"code/pass_at_1": rate, "code/total": t})
            except: pass
            if t>0 and (rate > self.best + 1e-6):
                self.best = rate
                print(f"[CodeEval] New best {rate:.3f} — saving to {self.save_dir}")
                self.trainer.save_model(self.save_dir)
        return control

# Build/verify RSFT winners first (your R2 must create rsft_ds)
# NOTE: If you haven't run R2 in this session, run it before this cell!

if (rsft_ds is None) or (len(rsft_ds) == 0):
    print("No RSFT winners found — skip RSFT.")
else:
    rsft_args = TrainingArguments(
        output_dir                  = "blp-rsft-out",
        per_device_train_batch_size = 16,
        gradient_accumulation_steps = 4,
        learning_rate               = 5e-5,
        max_steps                   = 60,
        lr_scheduler_type           = "cosine",
        warmup_ratio                = 0.10,
        logging_steps               = 1,
        save_strategy               = "no",
        eval_strategy               = "no",
        bf16                        = True,
        fp16                        = False,
        remove_unused_columns       = False,
        report_to                   = [],
    )

    rsft_trainer = SFTTrainer(
        model              = model,
        tokenizer          = tokenizer,
        args               = rsft_args,
        train_dataset      = rsft_ds,
        dataset_text_field = "text",
        max_seq_length     = 1024,
    )

    # avoid duplicates
    rsft_trainer.callback_handler.callbacks = [
        cb for cb in rsft_trainer.callback_handler.callbacks
        if cb.__class__.__name__ not in ("CodeEvalCallback","SaveBestCodeEval")
    ]

    eval_df = pick_eval_df()
    rsft_trainer.add_callback(SaveBestCodeEval(
        trainer   = rsft_trainer,
        model     = model,
        tokenizer = tokenizer,
        dev_df    = eval_df,
        tests_col = "test_list",
        every_n_steps = 20,
        max_new_tokens = 840,
        save_dir  = "blp-rsft-out/best-rsft/",
    ))

    print("Starting RSFT…")
    model.config.use_cache = False
    rsft_train_result = rsft_trainer.train()
    print(rsft_train_result)


In [None]:
#save refined model
if rsft_ds is not None and len(rsft_ds) > 0:
    SAVE_DIR_RSFT = "blp-rsft-out/blp-sft-dpo/rsft-lora"
    rsft_trainer.save_model(SAVE_DIR_RSFT)
    tokenizer.save_pretrained(SAVE_DIR_RSFT)
    print("✅ RSFT complete and saved to:", SAVE_DIR_RSFT)


In [35]:
#pick adapters (prefer RSFT) and attach to model
import os
from peft import PeftModel

ADAPTER_RSFT = "blp-rsft-out/blp-sft-dpo/rsft-lora"
#ADAPTER_SFT  = "/kaggle/working/blp-sft-dpo/sft-lora"
BEST_ADAPTER_DIR = ADAPTER_RSFT
print("Using adapters from:", BEST_ADAPTER_DIR)

try:
    # If base model (no PEFT yet), wrap with PeftModel.from_pretrained
    if not hasattr(model, "peft_config"):
        model = PeftModel.from_pretrained(model, BEST_ADAPTER_DIR)
    else:
        # If already PEFT, try loading weights into current PEFT container
        try:
            model.load_adapter(BEST_ADAPTER_DIR, adapter_name="inference")
            model.set_adapter("inference")
        except Exception:
            # Some PEFT variants don’t support load_adapter; fall back to re-wrap
            model = PeftModel.from_pretrained(model, BEST_ADAPTER_DIR)
    model.eval()
    print("✅ Adapters attached.")
except Exception as e:
    print("⚠️ Could not attach adapters, proceeding with current in-memory model:", repr(e))


Using adapters from: blp-rsft-out/blp-sft-dpo/rsft-lora
✅ Adapters attached.


In [36]:
#helpers for inference messaging + payload extraction (place above S-run)
import re

def _extract_fn_from_instruction(instr: str) -> str:
    """Best-effort function-name extraction from the instruction text."""
    pats = [
        r'Exammple.*?\n\s*([A-Za-z_]\w*)\s*\(',
        r'Example.*?\n\s*([A-Za-z_]\w*)\s*\(',
        r'Examples?.*?\n\s*([A-Za-z_]\w*)\s*\(',
        r'উদাহরণ.*?\n\s*([A-Za-z_]\w*)\s*\(',
    ]
    for pat in pats:
        m = re.search(pat, instr, flags=re.IGNORECASE | re.MULTILINE)
        if m:
            return m.group(1)
    m_all = re.findall(r'([A-Za-z_]\w*)\s*\(', instr)
    return m_all[-1] if m_all else "solve"

# Fallbacks if helper cells weren't executed
try:
    build_system_prompt
except NameError:
    def build_system_prompt(fn: str) -> str:
        return (
            "তুমি একটি কোড-জেনারেশন সহকারী।\n"
            f"শুধু একটি ```python``` fenced ব্লকে **শুধুমাত্র** ফাংশন `{fn}` ইমপ্লিমেন্ট করবে।\n"
            "ইনপুট/আউটপুট বা print লিখবে না। প্রয়োজনীয় সব imports **ব্লকের ভিতরেই** দেবে। "
            "বাইরের টেক্সট, ব্যাখ্যা, বা একাধিক ব্লক দেবে না।"
        )

def build_messages_for_inference(instr: str):
    """
    Create (system,user) messages for inference, mirroring training constraints.
    """
    fn = _extract_fn_from_instruction(instr)
    user_text = (
        instr.strip() + "\n\n"
        f"ফাংশনের নাম অবশ্যই `{fn}` হবে এবং একটিমাত্র ```python``` fenced ব্লকে কোড দেবে।"
    )
    return [
        {"role": "system", "content": build_system_prompt(fn)},
        {"role": "user",   "content": user_text},
    ]

# Robust code payload extractor
try:
    extract_first_code_block
except NameError:
    extract_first_code_block = None

def extract_model_payload(decoded: str) -> str:
    """
    Return ONLY the python code block from the model's decoded text.
    Falls back gracefully if no fenced block is present.
    """
    s = str(decoded or "")
    if extract_first_code_block:
        code = extract_first_code_block(s)
        if code:
            return code.strip()
    m = re.search(r"```(?:python|py)?\s*\n([\s\S]*?)\n```", s, flags=re.IGNORECASE)
    if m:
        return m.group(1).strip()
    return s.strip()


In [37]:
#assert-mutation fuzzer (process-isolated, timed)
import os, re, ast, builtins, multiprocessing as mp
from typing import Any

# ---- knobs (can also override via env) ----
_FUZZ_TIMEOUT_S   = float(os.environ.get("FUZZ_TIMEOUT_S", "1.5"))   # wall-clock for the whole fuzz batch
_FUZZ_BUDGET      = int(os.environ.get("FUZZ_BUDGET", "8"))          # max mutated calls per problem
_FUZZ_MAX_SIZE    = int(os.environ.get("FUZZ_MAX_SIZE", "100000"))   # cap on len() of outputs to avoid huge objects

# Small preamble: common stdlib allowed in sandbox
_FUZZ_PREAMBLE = """
import math, itertools, functools, operator, bisect, heapq, statistics, collections, re, string, random, sys, datetime
from collections import Counter, defaultdict, deque, OrderedDict
from math import gcd, sqrt, factorial, comb, perm, ceil, floor
from bisect import bisect_left, bisect_right
"""

# --- helpers to parse "assert f( ... ) == <literal>" lines into (args, expected) ---

def _ast_literal_eval_safe(node: ast.AST) -> Any:
    """Safe literal_eval that also allows tuples/lists/dicts/sets of literals."""
    return ast.literal_eval(node)

def _extract_call_pairs(assert_line: str):
    """Return list of (args_list, expected_value_or_None). Only handles `assert f(...) == <literal>`."""
    # Normalize to a Python assert statement
    s = str(assert_line or "").strip()
    if not s.startswith("assert"):
        s = "assert " + s
    try:
        mod = ast.parse(s)
    except Exception:
        return []
    if not mod.body or not isinstance(mod.body[0], ast.Assert):
        return []
    test = mod.body[0].test
    # Only handle comparisons of the form Call == literal
    if isinstance(test, ast.Compare) and isinstance(test.left, ast.Call) and len(test.ops)==1 and isinstance(test.ops[0], ast.Eq):
        call = test.left
        rhs  = test.comparators[0]
        # args/keywords must be literal-evaluable
        try:
            args = [_ast_literal_eval_safe(a) for a in call.args]
            # (ignore keywords for simplicity; most public asserts don't use them)
            expected = _ast_literal_eval_safe(rhs)
            return [(args, expected)]
        except Exception:
            return []
    # If it's just "assert f(...)" without == expected, we cannot derive expected; skip
    return []

# --- value mutators (type-agnostic, conservative) ---

def _mutate_number(n):
    variants = {0, 1, -1}
    try:
        variants |= {n-1, n+1, -n}
        if isinstance(n, int):
            variants |= {n*2, n//2 if n!=0 else 0}
        else:  # float
            variants |= {n*1.1, n*0.9}
    except Exception:
        pass
    # keep within reasonable bounds
    out = []
    for v in variants:
        try:
            if isinstance(v, (int, float)) and abs(v) <= 10**6:
                out.append(v)
        except Exception:
            pass
    return out[:4]

def _mutate_string(s):
    cands = ["", " ", s + "a", s[::-1]]
    if len(s) <= 16:
        cands.append(s*2)
    # add a simple unicode edge
    cands.append(s + "৳")
    # dedup
    seen, out = set(), []
    for v in cands:
        if isinstance(v, str) and v not in seen:
            seen.add(v); out.append(v)
    return out[:4]

def _mutate_sequence(seq):
    try:
        lst = list(seq)
    except Exception:
        return []
    cands = [[], lst[:1], lst[::-1], lst + lst[:1]]
    # slight numeric tweak if ints
    if lst and all(isinstance(x, int) for x in lst):
        cands.append([x+1 for x in lst])
    # uniqueness & shape limit
    out, seen = [], set()
    for v in cands:
        try:
            key = ("list", tuple(v))
            if key not in seen and len(v) <= 64:
                seen.add(key); out.append(type(seq)(v) if not isinstance(seq, list) else v)
        except Exception:
            pass
    return out[:4]

def _mutate_mapping(d):
    if not isinstance(d, dict):
        return []
    keys = list(d.keys())
    cands = [ {}, {**d}, {**d, "__x__": d.get(keys[0], 0) if keys else 1} ]
    if keys:
        d2 = dict(d); d2.pop(keys[0], None); cands.append(d2)
    out, seen = [], set()
    for v in cands:
        try:
            key = ("dict", tuple(sorted(v.items())))
            if key not in seen and len(v) <= 64:
                seen.add(key); out.append(v)
        except Exception:
            pass
    return out[:3]

def _mutate_value(v):
    if isinstance(v, bool):
        return [not v]
    if isinstance(v, (int, float)):
        return _mutate_number(v)
    if isinstance(v, str):
        return _mutate_string(v)
    if isinstance(v, (list, tuple)):
        return _mutate_sequence(v)
    if isinstance(v, dict):
        return _mutate_mapping(v)
    if isinstance(v, set):
        return _mutate_sequence(list(v))
    # unknown types: no mutation
    return []

def _mutate_args(args):
    """Mutate one position at a time to keep changes small."""
    out = []
    for i, a in enumerate(list(args)):
        muts = _mutate_value(a)
        for m in muts:
            new = list(args)
            new[i] = m
            out.append(new)
    return out

# --- child process worker to run baseline + mutated calls safely ---

def _fuzz_worker(code_text: str, fn_name: str, test_pairs, mutated_args_list, conn):
    try:
        # optional resource limits
        try:
            import resource
            mem_mb = int(os.environ.get("FUZZ_MEM_MB", "1024"))
            resource.setrlimit(resource.RLIMIT_AS, (mem_mb*1024*1024, mem_mb*1024*1024))
            cpu_s = int(float(os.environ.get("FUZZ_CPU_S", "0")))
            if cpu_s > 0:
                resource.setrlimit(resource.RLIMIT_CPU, (cpu_s, cpu_s+1))
        except Exception:
            pass

        g = {"__builtins__": builtins.__dict__}
        exec(_FUZZ_PREAMBLE, g, g)

        # Extract code block if fenced
        m = re.search(r"```(?:python|py)?\s*\n([\s\S]*?)\n```", str(code_text or ""), flags=re.I)
        code = (m.group(1) if m else str(code_text or "")).replace("\r\n","\n")

        # Block I/O
        def _blocked(*a, **k): raise RuntimeError("I/O blocked")
        g["open"] = _blocked; g["input"] = _blocked; g["print"] = lambda *a, **k: None

        exec(code, g, g)

        # Resolve function (allow alias if only one callable exists)
        if fn_name not in g:
            cands=[k for k,v in g.items() if callable(v) and not k.startswith("_")]
            if len(cands)==1:
                g[fn_name]=g[cands[0]]
        if fn_name not in g or not callable(g[fn_name]):
            conn.send(False); return
        fn = g[fn_name]

        # Baseline type from the test pairs (use expected literal if available)
        base_types = []
        for (args, expected) in test_pairs:
            try:
                y = fn(*args)
                if expected is not None:
                    base_types.append(type(expected))
                else:
                    base_types.append(type(y))
            except Exception:
                conn.send(False); return
        base_t = base_types[0] if base_types else type(fn(*([]))) if callable(fn) else None

        # Now run mutated calls — must not error/timeout and must return same type
        for args in mutated_args_list:
            y = fn(*args)  # exception -> fail
            # Size guard
            if hasattr(y, "__len__"):
                try:
                    if len(y) > _FUZZ_MAX_SIZE:
                        conn.send(False); return
                except Exception:
                    pass
            if base_t is not None and not isinstance(y, base_t):
                conn.send(False); return

        conn.send(True)
    except Exception:
        conn.send(False)
    finally:
        try: conn.close()
        except: pass

def fuzz_accepts(code_py: str, tests_cell) -> bool:
    """
    Returns True only if:
    - We can parse at least ONE assert of the form `assert f(args) == <literal>`
    - For that function, several MUTATED inputs run without error/timeout and keep the output TYPE.
    """
    # Pull asserts + function name from your parse_tests_cell (fallback included)
    try:
        info  = parse_tests_cell(tests_cell or "")
        asserts = info.get("tests", [])
        fn_name = info.get("fn") or None
    except Exception:
        asserts, fn_name = [], None

    if not asserts:
        return True  # nothing to fuzz against; don't block

    # Collect a few (args, expected) pairs
    pairs = []
    for a in asserts:
        pairs.extend(_extract_call_pairs(a))
        if len(pairs) >= 3:
            break
    if not pairs:
        return True  # asserts not parseable; don't block

    # Infer function name if missing
    if fn_name is None:
        m = re.search(r'assert\s+([A-Za-z_]\w*)\s*\(', asserts[0])
        fn_name = m.group(1) if m else "solve"

    # Build a small set of mutated args from the FIRST pair only (keeps budget small)
    base_args, base_expected = pairs[0]
    muts = _mutate_args(base_args)[: max(1, min(_Fuzz_Budget := _FUZZ_BUDGET, 12))]
    if not muts:
        return True

    # Fire child process to execute fuzz batch with a wall-clock timeout
    parent, child = mp.Pipe(duplex=False)
    p = mp.Process(target=_fuzz_worker, args=(code_py, fn_name, pairs[:1], muts, child), daemon=True)
    p.start(); child.close()
    ok = False
    if parent.poll(_FUZZ_TIMEOUT_S):
        try: ok = bool(parent.recv())
        except EOFError: ok = False
    else:
        try: p.terminate()
        except Exception: pass
        ok = False
    try: p.join(0.2)
    except Exception: pass
    try: parent.close()
    except Exception: pass
    return ok

In [38]:
# PATCH — define strict timed runner if missing + fix _passes_public_tests
import re, builtins, multiprocessing as mp, os

# --- tiny preamble used in child process ---
_COMMON_PREAMBLE = """
import math, itertools, functools, operator, bisect, heapq, statistics, collections, re, string, random, sys, datetime
from collections import Counter, defaultdict, deque, OrderedDict
from math import gcd, sqrt, factorial, comb, perm, ceil, floor
from bisect import bisect_left, bisect_right
"""

def _extract_first_code_block(s: str) -> str | None:
    m = re.search(r"```(?:python|py)?\s*\n([\s\S]*?)\n```", str(s or ""), flags=re.I)
    if m: return m.group(1).strip()
    # fallback: first top-level def
    lines = str(s or "").splitlines()
    start = None
    for i, L in enumerate(lines):
        if re.match(r"^\s*def\s+[A-Za-z_]\w*\s*\(", L):
            start = i; break
    if start is not None:
        buf=[lines[start]]
        for j in range(start+1,len(lines)):
            if re.match(r"^\s*(def|class)\s+[A-Za-z_]\w*\s*\(", lines[j]): break
            buf.append(lines[j])
        code="\n".join(buf).strip()
        if code: return code
    return None

def _expected_fn_from_asserts(asserts):
    for a in asserts:
        m = re.search(r"assert\s+([A-Za-z_]\w*)\s*\(", str(a))
        if m: return m.group(1)
    return None

def _sanitize_asserts(asserts):
    goods=[]
    for raw in asserts:
        s=str(raw).strip()
        if not s.startswith("assert"):
            s = "assert " + s
        try: compile(s, "<assert>", "exec"); goods.append(s)
        except SyntaxError: pass
    return goods

def _runner_worker(code_text: str, asserts: list[str], expected_fn: str|None, conn):
    try:
        # optional resource caps (Linux)
        try:
            import resource
            mem_mb = int(os.environ.get("ASSERT_MEM_MB", "1024"))
            resource.setrlimit(resource.RLIMIT_AS, (mem_mb*1024*1024, mem_mb*1024*1024))
            cpu_s = int(float(os.environ.get("ASSERT_CPU_S", "0")))
            if cpu_s > 0:
                resource.setrlimit(resource.RLIMIT_CPU, (cpu_s, cpu_s+1))
        except Exception:
            pass

        g = {"__builtins__": builtins.__dict__}
        exec(_COMMON_PREAMBLE, g, g)

        code = _extract_first_code_block(code_text) or str(code_text or "")
        # block I/O
        def _blocked(*a, **k): raise RuntimeError("I/O blocked in sandbox")
        g["open"] = _blocked; g["input"]=_blocked; g["print"]=lambda *a, **k: None

        try:
            exec(code, g, g)
        except SyntaxError:
            conn.send(("compile_error", 0, 0)); return
        except Exception:
            conn.send(("runtime_error", 0, 0)); return

        if expected_fn and expected_fn not in g:
            cands=[k for k,v in g.items() if callable(v) and not k.startswith("_")]
            if len(cands)==1:
                g[expected_fn]=g[cands[0]]

        tests = _sanitize_asserts(asserts)
        ok=0
        for s in tests:
            try: exec(s, g, g); ok+=1
            except Exception: pass
        conn.send(("ok", ok, len(tests)))
    except Exception:
        conn.send(("runtime_error", 0, 0))
    finally:
        try: conn.close()
        except: pass

# define only if missing
try:
    run_asserts_module_strict_timed
except NameError:
    def run_asserts_module_strict_timed(code_text: str, asserts: list[str], timeout_s: float = 1.5):
        parent, child = mp.Pipe(duplex=False)
        p = mp.Process(
            target=_runner_worker,
            args=(code_text, asserts, _expected_fn_from_asserts(asserts), child),
            daemon=True
        )
        p.start(); child.close()
        status=("runtime_error", 0, 0)
        if parent.poll(timeout_s):
            try: status = parent.recv()
            except EOFError: status=("runtime_error", 0, 0)
        else:
            try: p.terminate()
            except: pass
            status=("timeout", 0, 0)
        try: p.join(0.2)
        except: pass
        try: parent.close()
        except: pass
        return status

# replace your _passes_public_tests with a robust version (no broken f-string)
def _passes_public_tests(code_py: str, tests_cell: str, timeout_s: float = 1.5) -> bool:
    info  = parse_tests_cell(tests_cell or "")
    tests = info.get("tests", []) if isinstance(info, dict) else []
    if not tests:
        return False
    code_wrapped = "```python\n" + code_py + "\n```"   # avoid f-string mishaps in notebooks
    st, ok, tot = run_asserts_module_strict_timed(code_wrapped, tests, timeout_s=timeout_s)
    return (st == "ok" and tot > 0 and ok == tot)

print("✅ Patched: strict timed runner available and _passes_public_tests fixed.")

✅ Patched: strict timed runner available and _passes_public_tests fixed.


In [49]:
#batched rejection-sampling to build submission.json efficiently
import os, re, json, textwrap, pandas as pd, torch
from tqdm.auto import tqdm
from unsloth.chat_templates import get_chat_template
def _set_local_seed(seed: int):
    import torch, random, numpy as np
    random.seed(seed)
    np.random.seed(seed % (2**32-1))
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

# ---------- knobs (tune here) ----------
K_MAX            = 100       # total candidates per prompt (upper bound)
BATCH_K          = 8        # candidates per generation call (tradeoff: VRAM vs speed)
TEMP_MIN, TEMP_MAX = 0.5, 0.9
TOP_P            = 0.95
REP_PENALTY      = 1.07
MAX_NEW_TOKENS   = 640
STRICT_TIMEOUT_S = 1.5      # requires your strict/timed runner cell (T1)
USE_FUZZ         = True     # requires your FZ1 fuzzer cell
SEED_BASE        = 1337     # per-row generator seed = SEED_BASE + row_id
GREEDY_FALLBACK  = True

# ---------- small helpers ----------
def _extract_fn_from_instruction(instr: str) -> str:
    pats = [
        r'Exammple.*?\n\s*([A-Za-z_]\w*)\s*\(',
        r'Example.*?\n\s*([A-Za-z_]\w*)\s*\(',
        r'Examples?.*?\n\s*([A-Za-z_]\w*)\s*\(',
        r'উদাহরণ.*?\n\s*([A-Za-z_]\w*)\s*\(',
    ]
    for pat in pats:
        m = re.search(pat, instr, flags=re.IGNORECASE | re.MULTILINE)
        if m: return m.group(1)
    m_all = re.findall(r'([A-Za-z_]\w*)\s*\(', instr)
    return m_all[-1] if m_all else "solve"

def build_messages_for_inference(instr: str):
    fn = _extract_fn_from_instruction(instr)
    user_text = (
        instr.strip() + "\n\n"
        f"ফাংশনের নাম অবশ্যই `{fn}` হবে এবং একটিমাত্র ```python``` fenced ব্লকে কোড দেবে।"
    )
    return [
        {"role":"system","content":
         "তুমি একটি কোড-জেনারেশন সহকারী।\n"
         "শুধু একটি ```python``` fenced ব্লকে **শুধুমাত্র** নির্দিষ্ট ফাংশন ইমপ্লিমেন্ট করবে।\n"
         "ইনপুট/আউটপুট লিখবে না; সব imports ব্লকের ভেতরেই দেবে।"},
        {"role":"user","content": user_text},
    ]

def extract_model_payload(decoded: str) -> str:
    m = re.search(r"```(?:python|py)?\s*\n([\s\S]*?)\n```", str(decoded or ""), flags=re.IGNORECASE)
    return (m.group(1) if m else str(decoded or "")).strip()

def post_sanitize_code(payload: str, instr: str) -> str:
    s = str(payload or "")
    s = re.sub(r"```(?:python|py)?\s*", "", s, flags=re.IGNORECASE).replace("```","")
    s = s.replace("\r\n","\n")
    fn = _extract_fn_from_instruction(instr)
    def _rename_first_def(m): return m.group(0).replace(f"def {m.group(1)}(", f"def {fn}(", 1)
    s = re.sub(r'^\s*def\s+([A-Za-z_]\w*)\s*\(', _rename_first_def, s, count=1, flags=re.M)
    s = re.sub(r'\bprint\s*\(', '# removed print(', s)
    s = re.sub(r'\binput\s*\(', '# removed input(', s)
    s = re.sub(r'if\s+__name__\s*==\s*[\'"]__main__[\'"]:\s*\n(?:.*\n?)*$', '', s, flags=re.M)
    # keep imports + first def only
    lines = s.splitlines(); out, seen_def = [], False; i=0
    while i < len(lines):
        ln = lines[i]
        if not seen_def and re.match(r'^\s*(import\s+\S+|from\s+\S+\s+import)', ln): out.append(ln)
        if re.match(r'^\s*def\s+[A-Za-z_]\w*\s*\(', ln):
            seen_def = True; out.append(ln); i += 1
            while i < len(lines) and not re.match(r'^\s*(def|class)\s+\w', lines[i]):
                out.append(lines[i]); i += 1
            break
        i += 1
    s = "\n".join(out).strip()
    try: compile(s+"\n","<san>","exec")
    except Exception: pass
    return textwrap.dedent(s).strip()

def ensure_compilable_module(code: str) -> str:
    try:
        compile(code + "\n", "<sub>", "exec"); return code
    except Exception:
        # keep only first def + imports (already done in sanitizer)
        return code

# strict/timed assert runner shim (assumes you added T1 cell)
def _passes_public_tests(code_py: str, tests_cell: str) -> bool:
    info  = parse_tests_cell(tests_cell or "")
    tests = info.get("tests", []) if isinstance(info, dict) else []
    if not tests:  # nothing to validate against
        return False
    st, ok, tot = run_asserts_module_strict_timed(f"```python\n{code_py}\n```", tests, timeout_s=STRICT_TIMEOUT_S)
    return (st == "ok" and tot > 0 and ok == tot)

# ---------- align tokenizer/template & device ----------
tokenizer = get_chat_template(tokenizer, chat_template="gemma-3")
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right"
pad_id = tokenizer.pad_token_id or tokenizer.eos_token_id
eos_id = tokenizer.eos_token_id
model.eval()
torch.backends.cuda.matmul.allow_tf32 = True  # speedup on Ampere+

# ---------- load eval CSV ----------
DEV_PATH = "test_v1.csv"   # UPDATE if needed
df = pd.read_csv(DEV_PATH, dtype=str, keep_default_na=False, engine="python")
cols_lower = {c.lower(): c for c in df.columns}
assert "instruction" in cols_lower, f"Missing 'instruction' column. Found: {list(df.columns)}"
inst_col = cols_lower["instruction"]
id_col   = cols_lower.get("id", None) or "id"
if "id" not in df.columns:
    df["id"] = range(len(df))
df[id_col] = pd.to_numeric(df[id_col], downcast="integer", errors="coerce").fillna(-1).astype(int)

# Preparse public tests once (saves work)
def _pre_tests(row):
    info = parse_tests_cell(row.get("test_list",""))
    return {"tests": info.get("tests", []), "fn": info.get("fn")}
df["_pre"] = df.apply(_pre_tests, axis=1)

# ---------- main loop (batched K with early-stop) ----------
rows = []
for _, r in tqdm(df.iterrows(), total=len(df), desc="Generating (RS-fast)"):
    rid        = int(r[id_col])
    instr      = str(r[inst_col])
    tests_cell = r.get("test_list", "")
    tests_list = r["_pre"]["tests"] if isinstance(r.get("_pre"), dict) else []

    # Build inputs ONCE per prompt
    messages = build_messages_for_inference(instr)
    chat_text = tokenizer.apply_chat_template(messages, add_generation_prompt=True, tokenize=False)
    inputs = tokenizer(chat_text, return_tensors="pt")
    inputs = {k: v.to(model.device) for k, v in inputs.items()}

    accepted = None
    codes_seen = set()
    tries_left = K_MAX
    batch_idx  = 0

    # Quick path: if no public tests, just greedy once
    if not tests_list:
        with torch.inference_mode():
            out = model.generate(
                **inputs, do_sample=False, max_new_tokens=MAX_NEW_TOKENS,
                use_cache=True, eos_token_id=eos_id, pad_token_id=pad_id,
            )
        decoded = tokenizer.decode(out[0], skip_special_tokens=True)
        code = post_sanitize_code(extract_model_payload(decoded), instr)
        rows.append({"id": rid, "response": ensure_compilable_module(code)})
        continue
    
    # RS in batches
    while tries_left > 0 and accepted is None:
        cur_k = min(BATCH_K, tries_left)
        # simple temperature schedule across batches
        cur_temp = float(TEMP_MIN + (TEMP_MAX - TEMP_MIN) * (batch_idx / max(1, (K_MAX + BATCH_K - 1)//BATCH_K - 1)))

        _set_local_seed(SEED_BASE + rid + batch_idx * 1000)

        with torch.inference_mode():
            outs = model.generate(
                **inputs,
                do_sample=True,
                num_return_sequences=cur_k,
                temperature=cur_temp,
                top_p=TOP_P,
                repetition_penalty=REP_PENALTY,
                max_new_tokens=MAX_NEW_TOKENS,
                use_cache=True,              # keep cache on for speed
                eos_token_id=eos_id,
                pad_token_id=pad_id,
            )

        # batch decode & screen
        decs = tokenizer.batch_decode(outs, skip_special_tokens=True)
        for dec in decs:
            payload = extract_model_payload(dec)
            code = post_sanitize_code(payload, instr)
            code = ensure_compilable_module(code)
            # dedup identical code to avoid re-evaluating
            sig = hash(code)
            if sig in codes_seen: 
                continue
            codes_seen.add(sig)

            ok_pub = _passes_public_tests(code, tests_cell)
            ok_fzz = True
            if USE_FUZZ and ok_pub:
                try:
                    ok_fzz = fuzz_accepts(code, tests_cell)
                except Exception:
                    ok_fzz = True  # don't block if fuzzer hiccups

            if ok_pub and ok_fzz:
                accepted = code
                break

        tries_left -= cur_k
        batch_idx  += 1

    # Fallback greedy if nothing accepted
    if accepted is None and GREEDY_FALLBACK:
        with torch.inference_mode():
            out = model.generate(
                **inputs, do_sample=False, max_new_tokens=MAX_NEW_TOKENS,
                use_cache=True, eos_token_id=eos_id, pad_token_id=pad_id,
            )
        decoded = tokenizer.decode(out[0], skip_special_tokens=True)
        accepted = ensure_compilable_module(post_sanitize_code(extract_model_payload(decoded), instr))

    rows.append({"id": rid, "response": accepted})

# ---------- save ----------
out_df = pd.DataFrame(rows)
out_df.to_json("submission.json", orient="records", force_ascii=False, indent=2)
try:
    out_df.to_json("submission.json", orient="records", force_ascii=False, indent=2)
except Exception:
    pass
print(f"✅ submission.json written: {len(out_df)} rows | K_MAX={K_MAX}, BATCH_K={BATCH_K}")

Generating (RS-fast):  56%|█████▋    | 282/500 [2:56:45<4:22:03, 72.13s/it]usage: ipykernel_launcher.py [-h] [-v] [-q] [--locals] [--durations N] [-f]
                             [-c] [-b] [-k TESTNAMEPATTERNS]
                             [tests ...]
ipykernel_launcher.py: error: argument -f/--failfast: ignored explicit argument '/home/nufayer/.local/share/jupyter/runtime/kernel-v336809accdec0bf16b9a2dc6aa04c38e41677eb80.json'
Generating (RS-fast): 100%|██████████| 500/500 [5:31:51<00:00, 39.82s/it]

✅ submission.json written: 500 rows | K_MAX=100, BATCH_K=8





In [52]:
import json, ast, re, difflib, types
import pandas as pd
from pathlib import Path
import zipfile 

# ---- paths (adjust if needed) ----
CSV_PATH = Path("test_v1.csv")
SUB_PATH = Path("submission.json")

# ---- helpers (no intermediate prints/logs) ----
def strip_code_fences(s: str) -> str:
    if not isinstance(s, str):
        s = str(s)
    s = s.strip()
    if s.startswith("```"):
        body = s[3:]
        if "\n" in body:
            body = body.split("\n", 1)[1]
        if "```" in body:
            body = body.rsplit("```", 1)[0]
        return body.strip()
    return s

def parse_tests(raw) -> list:
    raw = str(raw)
    x = ast.literal_eval(raw)
    if isinstance(x, str):
        x = ast.literal_eval(x)
    if not isinstance(x, (list, tuple)):
        raise ValueError("test_list parsed to non-list")
    return [str(t) for t in x]

FN_RE = re.compile(r"\b([A-Za-z_]\w*)\s*\(")
def expected_function_names(assert_list):
    from collections import Counter
    names = []
    for a in assert_list:
        for m in FN_RE.finditer(a):
            names.append(m.group(1))
    return [n for n, _ in Counter(names).most_common()]

def ensure_function(ns: dict, expected: str):
    if expected in ns and isinstance(ns[expected], types.FunctionType):
        return expected, False
    fns = [k for k, v in ns.items() if isinstance(v, types.FunctionType)]
    if not fns:
        return None, False
    def norm(s): return re.sub(r"_+", "", s).lower()
    target = norm(expected)
    for cand in sorted(fns, key=lambda x: (norm(x) != target, x)):
        if norm(cand) == target:
            ns[expected] = ns[cand]
            return cand, True
    close = difflib.get_close_matches(expected, fns, n=1, cutoff=0.82)
    if close:
        cand = close[0]
        ns[expected] = ns[cand]
        return cand, True
    if len(fns) == 1:
        cand = fns[0]
        ns[expected] = ns[cand]
        return cand, True
    return None, False

# ---- load data ----
df = pd.read_csv(CSV_PATH)
with open(SUB_PATH, "r", encoding="utf-8") as f:
    sub = json.load(f)

# id -> response
if isinstance(sub, list):
    id_to_resp = {}
    for row in sub:
        if isinstance(row, dict):
            id_key = next((k for k in ("id","ID","sample_id","idx") if k in row), None)
            resp_key = next((k for k in ("response","output","code","generated_code","prediction") if k in row), None)
            if id_key is not None and resp_key is not None:
                id_to_resp[str(row[id_key])] = row[resp_key]
else:
    id_to_resp = {str(k): v for k, v in sub.items()}

# ---- run tests (silent) ----
counts = {
    "PASS": 0,
    "FAIL_ASSERT": 0,
    "RUNTIME_ERROR": 0,
    "COMPILE_ERROR": 0,
    "MISSING_CODE": 0,
    "PARSE_FAIL": 0,
}
total = len(df)

for _, r in df.iterrows():
    rid = str(r.get("id", ""))
    try:
        tests = parse_tests(r["test_list"])
    except Exception:
        counts["PARSE_FAIL"] += 1
        continue

    resp = id_to_resp.get(rid)
    if resp is None:
        counts["MISSING_CODE"] += 1
        continue

    code = strip_code_fences(resp)
    ns = {}
    try:
        exec(code, ns, ns)
    except Exception:
        counts["COMPILE_ERROR"] += 1
        continue

    names = expected_function_names(tests)
    expected = names[0] if names else None
    if expected:
        ensure_function(ns, expected)  # silent alias if possible

    ok = True
    for a in tests:
        try:
            exec(a, ns, ns)
        except AssertionError:
            counts["FAIL_ASSERT"] += 1
            ok = False
            break
        except Exception:
            counts["RUNTIME_ERROR"] += 1
            ok = False
            break

    if ok:
        counts["PASS"] += 1

# ---- single fancy print with details ----
passes = counts["PASS"]
pct = (passes / total) * 100 if total else 0.0
fails = total - passes
bar_len = 28
filled = int(round(bar_len * pct / 100))
bar = "█" * filled + "░" * (bar_len - filled)

# ANSI styles
BOLD = "\033[1m"
DIM = "\033[2m"
RESET = "\033[0m"

print(
    f"\n{BOLD}Pass@1 Summary{RESET}\n"
    f"{BOLD}{pct:6.2f}%{RESET}  {bar}  {BOLD}{passes}{RESET}/{total} passed\n"
    f"{DIM}Failures: {fails} = "
    f"Assertion {counts['FAIL_ASSERT']} | "
    f"Runtime {counts['RUNTIME_ERROR']} | "
    f"Compile {counts['COMPILE_ERROR']} | "
    f"Missing {counts['MISSING_CODE']} | "
    f"Parse {counts['PARSE_FAIL']}{RESET}\n"
)




[1mPass@1 Summary[0m
[1m 49.00%[0m  ██████████████░░░░░░░░░░░░░░  [1m245[0m/500 passed
[2mFailures: 255 = Assertion 194 | Runtime 58 | Compile 3 | Missing 0 | Parse 0[0m



In [53]:
import json, os, re, zipfile

SUB_PATH = "submission.json"

def file_format_check(path: str) -> bool:
    if os.path.basename(path) != "submission.json":
        print("Error: File name must be exactly 'submission.json'")
        return False
    if not path.lower().endswith(".json"):
        print("Error: File must have .json extension")
        return False
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON format - {e}")
        print("Note: The file must be in proper JSON format (not JSONL)")
        return False
    if not isinstance(data, list):
        print("Error: The root element should be a list of objects")
        return False
    for idx, item in enumerate(data):
        if not isinstance(item, dict):
            print(f"Error: Item at index {idx} is not a dictionary"); return False
        keys = set(item.keys())
        if keys != {"id", "response"}:
            print(f"Error: Item at index {idx} must contain only keys 'id' and 'response', found: {keys}")
            return False
        if not isinstance(item["id"], int):
            print(f"Error: 'id' field at index {idx} must be an integer"); return False
        if not isinstance(item["response"], str):
            print(f"Error: 'response' field at index {idx} must be a string"); return False
    print("Format check passed successfully!")
    return True

# --- Robust fence matcher: accepts ```python / ```py / ``` (no lang), LF/CRLF, trailing ws ---
FENCE_ANY = re.compile(
    r"""\A\s*```                # opening
        [ \t]*(?:python|py)?    # optional language tag
        [ \t]*\r?\n
        [\s\S]*?                # code
        \r?\n```\s*\Z           # closing, optional trailing ws
    """,
    re.IGNORECASE | re.VERBOSE,
)

def normalize_newlines(s: str) -> str:
    return str(s or "").replace("\r\n","\n").replace("\r","\n")

def ensure_python_fence(resp: str) -> str:
    """
    Guarantee the response is a ```python fenced block.
    If already fenced with any tag (or none), normalize the opening to ```python
    and ensure a closing fence exists.
    """
    s = normalize_newlines(resp).strip("\n")

    if FENCE_ANY.fullmatch(s):
        # Already fenced: normalize opening to ```python
        nl = s.find("\n")
        if nl == -1:
            # pathological but wrap anyway
            body = ""
        else:
            body = s[nl+1:]
        # ensure closing fence is present (it is, because FENCE_ANY matched)
        return f"```python\n{body}\n```"

    # If it starts with a fence but didn't match fully (e.g., missing closing),
    # try to coerce it by stripping first line and adding closing.
    if s.lstrip().startswith("```"):
        lines = s.split("\n", 1)
        body = lines[1] if len(lines) > 1 else ""
        return f"```python\n{body}\n```"

    # Plain code: wrap it
    return f"```python\n{s}\n```"

def item_format_ok(item):
    return (
        isinstance(item, dict)
        and set(item.keys()) == {"id", "response"}
        and isinstance(item["id"], int)
        and isinstance(item["response"], str)
    )

# ---------- Load ----------
with open(SUB_PATH, "r", encoding="utf-8") as f:
    data = json.load(f)

n = len(data)

def is_fenced_python(s: str) -> bool:
    s = normalize_newlines(s).strip()
    return bool(FENCE_ANY.fullmatch(s))

def count_fenced(rows):
    return sum(1 for it in rows if isinstance(it, dict) and is_fenced_python(it.get("response","")))

print(f"[BEFORE] Fencing valid: {count_fenced(data)}/{n}")

# ---------- Guard: normalize ALL responses to ```python fenced blocks ----------
for i, item in enumerate(data):
    if not item_format_ok(item):
        # keep as-is; file_format_check will flag hard errors later if present
        continue
    data[i]["response"] = ensure_python_fence(item["response"])

print(f"[AFTER ] Fencing valid:  {count_fenced(data)}/{n}")

# ---------- Save (id+response only) ----------
with open(SUB_PATH, "w", encoding="utf-8") as f:
    json.dump(
        [{"id": item["id"], "response": item["response"]} for item in data],
        f, ensure_ascii=False, indent=2
    )
print("✅ Updated submission.json with guarded Python fences.")

# Final file-level check
_ = file_format_check(SUB_PATH)

# Zip as submission.zip
with zipfile.ZipFile("submission.zip", "w", compression=zipfile.ZIP_DEFLATED) as zf:
    zf.write(SUB_PATH)
print("📦 Created submission.zip containing submission.json.")

[BEFORE] Fencing valid: 0/500
[AFTER ] Fencing valid:  500/500
✅ Updated submission.json with guarded Python fences.
Format check passed successfully!
📦 Created submission.zip containing submission.json.
