In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Print the full path of every file found beneath the Kaggle input directory.
input_file_paths = (
    os.path.join(folder, name)
    for folder, _, names in os.walk('/kaggle/input')
    for name in names
)
for file_path in input_file_paths:
    print(file_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
pip install -q "transformers>=4.42" "accelerate>=0.33" bitsandbytes sentencepiece


In [None]:
import os, re, json, textwrap, tempfile, subprocess, sys, time, traceback
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline

# BitsAndBytesConfig carries the quantization settings; imported next to its only use.
from transformers import BitsAndBytesConfig

# Pick an open-source code model (fits on Kaggle T4 in 4-bit)
MODEL_ID = os.environ.get("MODEL_ID", "Qwen/Qwen2.5-Coder-7B-Instruct")

print("CUDA available:", torch.cuda.is_available())
print("Loading model… (this can take a couple of minutes the first time)")
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, use_fast=True)

# 4-bit quantization -> fits T4. The settings go through `quantization_config`
# because passing `load_in_4bit=True` straight to from_pretrained is deprecated.
quantization_config = BitsAndBytesConfig(load_in_4bit=True)

model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    device_map="auto",        # accelerate places shards/devices for us
    torch_dtype=torch.float16,
    quantization_config=quantization_config,
)

# 🚫 Do NOT pass `device=` here since we used device_map="auto"
gen = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
)

print("Model loaded:", MODEL_ID)


In [None]:
# Simple chat helper for instruction-tuned models
def chat(messages, max_new_tokens=1024, temperature=0.2, top_p=0.95):
    """
    Send a chat conversation to the loaded model and return only its reply text.

    messages: list of {"role": ..., "content": ...} dicts, rendered with the
              tokenizer's chat template.
    max_new_tokens: cap on the number of generated tokens.
    temperature: sampling temperature; a value <= 0 switches to greedy decoding.
    top_p: nucleus-sampling threshold, used only when sampling.
    Returns: the generated continuation (prompt excluded), stripped of surrounding
             whitespace.
    """
    prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)

    # Stop on the tokenizer's own EOS token and, when the vocabulary really contains
    # it, on ChatML's <|im_end|>. Tokenizers without that token answer the lookup with
    # None or their unk id, neither of which is a valid stop token.
    stop_ids = []
    if tokenizer.eos_token_id is not None:
        stop_ids.append(tokenizer.eos_token_id)
    im_end_id = tokenizer.convert_tokens_to_ids("<|im_end|>")
    if (
        im_end_id is not None
        and im_end_id != tokenizer.unk_token_id
        and im_end_id not in stop_ids
    ):
        stop_ids.append(im_end_id)

    sampling = temperature > 0
    gen_kwargs = {
        "do_sample": sampling,
        "max_new_tokens": max_new_tokens,
        "return_full_text": False,   # have the pipeline drop the prompt for us
    }
    if sampling:
        # Sampling knobs are only meaningful (and only accepted quietly) when sampling.
        gen_kwargs["temperature"] = temperature
        gen_kwargs["top_p"] = top_p
    if stop_ids:
        gen_kwargs["eos_token_id"] = stop_ids

    reply = gen(prompt, **gen_kwargs)[0]["generated_text"]
    return reply.strip()

# Extract ```python ... ``` blocks (or plain ``` ... ```)
import re

# The optional language tag (python / python3 / py, any casing) is consumed only when
# it is a whole word, i.e. followed by whitespace or the closing fence. Otherwise the
# text after the fence is treated as code, so identifiers that merely start with
# "py" are left intact.
CODE_FENCE = re.compile(
    r"```(?:(?:python3?|py)(?=\s|`))?\s*(.*?)```",
    re.DOTALL | re.IGNORECASE,
)

def extract_code_blocks(text: str):
    """
    Return the contents of every triple-backtick fenced block in `text`.

    text: model response that may contain zero or more fenced code blocks.
    Returns: list of block bodies in order of appearance, each stripped of
             surrounding whitespace and of a leading Python language tag;
             an empty list when no closed fence is present.
    """
    return [block.strip() for block in CODE_FENCE.findall(text)]


In [None]:
# Triple-backtick fence marker, built by repetition so that no literal fence has to
# appear inside the prompt f-strings that use it.
FENCE = "`" * 3

def prompt_planning(problem, language="Python"):
    """
    Build the planning prompt.

    problem: problem statement inserted under the "## Problem" heading.
    language: target language named in the opening sentence.
    Returns: the prompt text asking for a code-free, step-by-step plan.
    """
    lines = [
        f"You are a programmer tasked with generating a **plan** to solve the problem using {language}.",
        "Do NOT write code.",
        "",
        "## Problem",
        f"{problem}",
        "",
        "### Expected Output format",
        "### Problem Understanding",
        "- Briefly restate the task and constraints.",
        "",
        "### Algorithm to solve the original problem",
        "- Name the approach and why it fits.",
        "- Important edge cases.",
        "",
        "### Plan",
        "- A numbered, step-by-step plan that is detailed but code-free.",
    ]
    return "\n".join(lines).strip()


def prompt_simulate_plan(problem, plan, language="Python"):
    """
    Build the prompt that asks the model to dry-run a plan on a sample input.

    problem: problem statement inserted under "## Problem".
    plan: plan text inserted under "### Plan".
    language: accepted for signature parity with the other prompt builders; not used
              in the text.
    Returns: the prompt text, ending with the instruction to print one verdict line.
    """
    lines = [
        "You are verifying whether the plan will produce the correct output.",
        "",
        "## Problem",
        f"{problem}",
        "",
        "### Plan",
        f"{plan}",
        "",
        "### Expected Output format",
        "### Simulation",
        "- Choose a simple sample input and apply the plan step-by-step to produce output.",
        "- Compare with the expected result implied by the problem.",
        "",
        "### Plan Evaluation",
        "- Print exactly one line: either **No Need to Modify Plan** OR **Plan Modification Needed** (and say why).",
    ]
    return "\n".join(lines).strip()


def prompt_refine_plan(problem, plan, sim_report, language="Python"):
    """
    Build the prompt that asks for a corrected plan given a simulation report.

    problem: problem statement inserted under "## Problem".
    plan: current plan inserted under "### Current Plan".
    sim_report: simulation output inserted under "### Simulation Report".
    language: accepted for signature parity with the other prompt builders; not used
              in the text.
    Returns: the prompt text requesting only a new step-by-step plan.
    """
    template = (
        "You have a plan that needs improvements based on the simulation report.\n\n"
        "## Problem\n"
        "{problem}\n\n"
        "### Current Plan\n"
        "{plan}\n\n"
        "### Simulation Report\n"
        "{sim_report}\n\n"
        "### Expected Output format\n"
        "### New Plan\n"
        "- Provide ONLY a corrected step-by-step plan. No explanations, no code.\n"
    )
    # Only the template's own placeholders are substituted; braces inside the inserted
    # values are left untouched by str.format.
    filled = template.format(problem=problem, plan=plan, sim_report=sim_report)
    return filled.strip()


def prompt_code(problem, plan, language="Python"):
    """
    Build the code-generation prompt.

    problem: problem statement inserted under "## Problem".
    plan: plan to implement, inserted under "### Plan".
    language: language named in the opening sentence.
    Returns: the prompt text asking for exactly one fenced code block.
    """
    # The fence marker comes from FENCE so no literal ``` sits inside an f-string.
    instructions = [
        f"- Output must be a single code block fenced with {FENCE}.",
        "- No extra commentary.",
    ]
    lines = [
        f"You are now writing {language} code to implement the plan. Output ONLY a single code block.",
        "",
        "## Problem",
        f"{problem}",
        "",
        "### Plan",
        f"{plan}",
        "",
        "### Important instructions",
        *instructions,
    ]
    return "\n".join(lines).strip()


def prompt_debug(problem, plan, buggy_code, test_log, language="Python"):
    """
    Build the debugging prompt for code that failed its tests.

    problem: problem statement inserted under "## Problem".
    plan: plan the code was meant to follow, inserted under "### Plan".
    buggy_code: failing source, shown inside a fenced block tagged with `language`.
    test_log: test report text inserted under "### Test Report".
    language: language named in the instructions and used as the fence tag.
    Returns: the prompt text asking for a simulation, notes, and one corrected block.
    """
    # Wrap the failing code in a fence assembled from FENCE rather than a literal.
    fenced_code = "\n".join([FENCE + language, buggy_code, FENCE])
    lines = [
        f"You received {language} code that fails some tests. Fix it.",
        "",
        "## Problem",
        f"{problem}",
        "",
        "### Plan",
        f"{plan}",
        "",
        "### Buggy Code",
        fenced_code,
        "",
        "### Test Report",
        f"{test_log}",
        "",
        "### Expected Output format",
        "### Simulation with failed test case",
        "- Choose one failing case; simulate step-by-step to locate the issue.",
        "",
        "### Debugging Notes",
        "- Is the plan wrong, or is the code deviating from the plan? Explain briefly.",
        "- Describe the fix.",
        "",
        "### Modified Code",
        f"- Output ONLY a single corrected {language} code block fenced with {FENCE}.",
        "- Do NOT include test code.",
    ]
    return "\n".join(lines).strip()


In [None]:
import tempfile, subprocess, sys, json, os

def run_user_code_and_check(py_code: str, tests_code: str, time_limit=3.0):
    """
    Run a candidate solution against test functions in a separate Python process.

    py_code: candidate solution (string).
    tests_code: Python code that defines test functions t1(mod), t2(mod), ... each
                returning (ok, message); must end with TEST_FUNCS = [t1, t2, ...]
    time_limit: seconds allowed for the whole subprocess run.
    Returns: (passed: bool, log: str). When the harness completes, `log` is the
             string form of the list of [ok, message] results; otherwise it is
             "timeout", "harness error: ..." or "bad harness output: ...".
    """
    # The harness prints its results on a dedicated marker-prefixed line so that
    # anything the candidate itself prints (at import time or inside the functions
    # under test) cannot corrupt the JSON payload.
    marker = "__CODESIM_RESULTS__"

    with tempfile.TemporaryDirectory() as td:
        sol_path = os.path.join(td, "solution.py")
        with open(sol_path, "w", encoding="utf-8") as f:
            f.write(py_code)

        harness_path = os.path.join(td, "harness.py")
        with open(harness_path, "w", encoding="utf-8") as f:
            # Load the candidate file as module `mod`.
            f.write(
                "import importlib.util, json, sys\n"
                "solution_path = sys.argv[1]\n"
                "spec = importlib.util.spec_from_file_location('solution', solution_path)\n"
                "mod = importlib.util.module_from_spec(spec)\n"
                "spec.loader.exec_module(mod)\n"
            )
            f.write("\n# --- BEGIN TESTS ---\n")
            f.write(tests_code)
            f.write("\n# --- END TESTS ---\n")
            # Run every test, turning exceptions into failures, then emit the results.
            f.write(
                "results = []\n"
                "for fn in TEST_FUNCS:\n"
                "    try:\n"
                "        ok, msg = fn(mod)\n"
                "    except Exception as e:\n"
                "        ok, msg = False, f'exception: {e}'\n"
                "    results.append((bool(ok), str(msg)))\n"
                # Leading newline: the marker starts a fresh line even if the candidate
                # left an unterminated line on stdout.
                f"print('\\n{marker}' + json.dumps(results), flush=True)\n"
            )

        try:
            proc = subprocess.run(
                [sys.executable, harness_path, sol_path],
                capture_output=True, text=True, timeout=time_limit,
                # No inherited stdin: a candidate calling input() gets EOF immediately
                # instead of blocking until the timeout.
                stdin=subprocess.DEVNULL,
            )
        except subprocess.TimeoutExpired:
            return False, "timeout"

        if proc.returncode != 0:
            return False, f"harness error: {proc.stderr.strip()}"

        out = proc.stdout.strip()

        # The results line is the last one carrying the marker.
        payload = None
        for line in reversed(out.splitlines()):
            if line.startswith(marker):
                payload = line[len(marker):]
                break
        if payload is None:
            return False, f"bad harness output: {out[:500]}"

        try:
            results = json.loads(payload)
        except Exception:
            return False, f"bad harness output: {out[:500]}"

        passed = all(ok for ok, _ in results)
        return passed, str(results)


In [None]:
# Problem statement handed to the model for the single-problem demo run.
example_problem = """
Write a function `generate_integers(a, b)` that returns the even integers between a and b inclusive,
in ascending order, regardless of whether a <= b or a > b.
"""

# Tests for the example problem, in the format run_user_code_and_check expects:
# each tN(mod) returns (ok, label) and TEST_FUNCS lists them.
# Correct tests: (10,14) should yield [10,12,14]
tests_code = """
def t1(mod):
    try:
        return (mod.generate_integers(2,8) == [2,4,6,8], 't1')
    except Exception as e:
        return (False, f't1 exception {e}')

def t2(mod):
    try:
        return (mod.generate_integers(8,2) == [2,4,6,8], 't2')
    except Exception as e:
        return (False, f't2 exception {e}')

def t3(mod):
    try:
        return (mod.generate_integers(10,14) == [10,12,14], 't3')
    except Exception as e:
        return (False, f't3 exception {e}')

TEST_FUNCS = [t1, t2, t3]
""".strip()


In [None]:
def codesim(problem, tests_code, language="Python", p=3, d=3,
            model_temperature=0.2, max_new_tokens=900):
    """
    Plan -> simulate -> code -> debug loop for one problem.

    problem: problem statement shown to the model.
    tests_code: test source passed to run_user_code_and_check.
    language: language named in the prompts.
    p: number of planning attempts. Each attempt builds a fresh plan, generates code
       from it and debugs that code; a new attempt starts only after the previous
       one has failed. Values below 1 (or non-integers) are treated as 1.
    d: number of debugging rounds per planning attempt.
    model_temperature: sampling temperature for every model call.
    max_new_tokens: token budget of the code-generation call (the planning,
                    simulation, refinement and debugging calls use fixed budgets).
    Returns: dict with keys "stage" ("codegen_passed", "debug_passed" or "failed"),
             "plan", "log" (latest test log) and "code" (latest candidate).
    """
    def ask(user_prompt, token_budget):
        # Every call uses the same system message and temperature.
        return chat(
            [
                {"role":"system","content":"You are a careful software engineer who follows instructions exactly."},
                {"role":"user","content": user_prompt}
            ],
            max_new_tokens=token_budget, temperature=model_temperature
        )

    attempts = p if isinstance(p, int) and p > 0 else 1
    plan, log, last_code = "", "", ""

    for _attempt in range(attempts):
        # 1) Planning: draft a plan, dry-run it, and refine once if the dry run objects.
        plan = ask(prompt_planning(problem, language), 700)
        sim_report = ask(prompt_simulate_plan(problem, plan, language), 600)
        if "Plan Modification Needed" in sim_report:
            plan = ask(prompt_refine_plan(problem, plan, sim_report, language), 600)

        # 2) Coding: the code prompt asks for a single block, so take the first one;
        #    fall back to the raw response when the model emitted no fence at all.
        codegen_resp = ask(prompt_code(problem, plan, language), max_new_tokens)
        blocks = extract_code_blocks(codegen_resp)
        code = blocks[0] if blocks else codegen_resp

        passed, log = run_user_code_and_check(code, tests_code)
        if passed:
            return {"stage":"codegen_passed", "plan":plan, "log":log, "code":code}

        # 3) Debugging attempts
        last_code = code
        for _round in range(d):
            dbg_resp = ask(prompt_debug(problem, plan, last_code, log, language), 1000)
            blocks = extract_code_blocks(dbg_resp)
            if blocks:
                # The debug prompt puts "### Modified Code" last, after sections that
                # may themselves quote code, so the corrected program is the final block.
                last_code = blocks[-1]
            passed, log = run_user_code_and_check(last_code, tests_code)
            if passed:
                return {"stage":"debug_passed", "plan":plan, "log":log, "code":last_code}

    return {"stage":"failed", "plan":plan, "log":log, "code":last_code}


In [None]:
import ast

# Run the full pipeline once on the example problem and report the outcome.
result = codesim(example_problem, tests_code, p=3, d=3)
print("Stage:", result["stage"])

print("\n--- PLAN (first 900 chars) ---\n", result["plan"][:900])

print("\n--- TEST RESULTS ---")
try:
    # When the harness completed, the log is the string form of a list of
    # (ok, label) pairs, which literal_eval turns back into Python objects.
    for ok, label in ast.literal_eval(result["log"]):
        print(f"  {'✓' if ok else '✗'} {label}")
except Exception:
    # Any other log text (timeout, harness error, ...) is shown verbatim.
    print(result["log"])

print("\n--- CODE ---\n", result["code"])


In [None]:
# You can try a smaller/larger instruct model: set MODEL_ID in the environment with one
# of the lines below, then re-run the model-loading cell so the new value is picked up.
# Examples:
# os.environ["MODEL_ID"] = "Qwen/Qwen2.5-Coder-3B-Instruct"
# os.environ["MODEL_ID"] = "Qwen/Qwen2.5-7B-Instruct"
# os.environ["MODEL_ID"] = "google/gemma-2-2b-it"   # tiny, fast; not code-specialized
# os.environ["MODEL_ID"] = "codellama/CodeLlama-7b-Instruct-hf"  # may need license acceptance with HF account
# Prints the environment value (or the default), which is what the loading cell will
# read on its next run; it is not necessarily the model loaded right now.
print("Current MODEL_ID =", os.environ.get("MODEL_ID", "Qwen/Qwen2.5-Coder-7B-Instruct"))


In [None]:
# Knobs that trade solution quality against runtime
CODESIM_TEMPERATURE = 0.2   # lower = more deterministic
CODESIM_P = 3               # planning tries
CODESIM_D = 3               # debugging tries
MAX_NEW_TOKENS = 900        # cap generation length

# Echo the active settings so they are recorded in the cell output.
print(dict(
    temperature=CODESIM_TEMPERATURE,
    p=CODESIM_P,
    d=CODESIM_D,
    max_new_tokens=MAX_NEW_TOKENS,
))


In [None]:
import ast
from dataclasses import dataclass
from typing import List, Dict

@dataclass
class TinyTask:
    """One benchmark problem: a name, the problem statement, and its test source."""
    # Short identifier used in progress output and result rows.
    name: str
    # Problem statement text passed to codesim.
    problem: str
    # Test source string passed to codesim (defines test functions and TEST_FUNCS).
    tests_code: str

# A tiny suite of 3 problems (add more if you like).
# Every tests_code string defines t1..t3, each taking the loaded solution module and
# returning (ok, label), and ends by listing them in TEST_FUNCS.
TASKS: List[TinyTask] = [
    # Even integers between two bounds, given in either order.
    TinyTask(
        name="generate_integers",
        problem=(
            "Write a function `generate_integers(a, b)` that returns the even integers between a and b inclusive, "
            "in ascending order, regardless of whether a <= b or a > b."
        ),
        tests_code="""
def t1(mod):
    try:
        return (mod.generate_integers(2,8) == [2,4,6,8], 't1')
    except Exception as e:
        return (False, f't1 exception {e}')

def t2(mod):
    try:
        return (mod.generate_integers(8,2) == [2,4,6,8], 't2')
    except Exception as e:
        return (False, f't2 exception {e}')

def t3(mod):
    try:
        return (mod.generate_integers(10,14) == [10,12,14], 't3')
    except Exception as e:
        return (False, f't3 exception {e}')

TEST_FUNCS = [t1, t2, t3]
""".strip()
    ),
    # Palindrome check that ignores case and non-alphanumeric characters.
    TinyTask(
        name="is_palindrome",
        problem=(
            "Write a function `is_palindrome(s)` that returns True if string s is a palindrome "
            "(case-insensitive, ignore non-alphanumerics), else False."
        ),
        tests_code="""
import re
def t1(mod):
    try:
        return (mod.is_palindrome("racecar")==True, 't1')
    except Exception as e:
        return (False, f't1 exception {e}')
def t2(mod):
    try:
        return (mod.is_palindrome("A man, a plan, a canal: Panama")==True, 't2')
    except Exception as e:
        return (False, f't2 exception {e}')
def t3(mod):
    try:
        return (mod.is_palindrome("hello")==False, 't3')
    except Exception as e:
        return (False, f't3 exception {e}')
TEST_FUNCS = [t1, t2, t3]
""".strip()
    ),
    # n-th Fibonacci number with nth_fib(0)=0 and nth_fib(1)=1.
    TinyTask(
        name="nth_fib",
        problem=(
            "Write a function `nth_fib(n)` that returns the n-th Fibonacci number where nth_fib(0)=0 and nth_fib(1)=1. "
            "Assume 0 <= n <= 30."
        ),
        tests_code="""
def t1(mod):
    try:
        return (mod.nth_fib(0)==0, 't1')
    except Exception as e:
        return (False, f't1 exception {e}')
def t2(mod):
    try:
        return (mod.nth_fib(1)==1, 't2')
    except Exception as e:
        return (False, f't2 exception {e}')
def t3(mod):
    try:
        return (mod.nth_fib(10)==55, 't3')
    except Exception as e:
        return (False, f't3 exception {e}')
TEST_FUNCS = [t1, t2, t3]
""".strip()
    ),
]

def run_tasks(tasks: List[TinyTask]) -> List[Dict]:
    """
    Run codesim on each task with the notebook-level settings and tabulate the outcome.

    tasks: tasks to run, in order.
    Returns: one dict per task with keys "task", "stage", "tests_passed",
             "tests_total", "test_labels", "code" and "plan" (first 900 characters).
             When the test log cannot be parsed, both test counts are 0 and the
             labels are empty.
    """
    rows = []
    for task in tasks:
        print(f"\n=== Running task: {task.name} ===")
        outcome = codesim(
            task.problem,
            task.tests_code,
            language="Python",
            p=CODESIM_P,
            d=CODESIM_D,
            model_temperature=CODESIM_TEMPERATURE,
            max_new_tokens=MAX_NEW_TOKENS,
        )

        # The log is a Python-literal list of (ok, label) pairs when the harness
        # completed; any other log text leaves both lists empty.
        try:
            verdicts = ast.literal_eval(outcome["log"])
            pass_list = [bool(ok) for ok, _ in verdicts]
            labels = [label for _, label in verdicts]
        except Exception:
            pass_list, labels = [], []

        row = dict(
            task=task.name,
            stage=outcome["stage"],
            tests_passed=int(sum(pass_list)),
            tests_total=int(len(pass_list)),
            test_labels=",".join(labels),
            code=outcome["code"],
            plan=outcome["plan"][:900],
        )
        print(f" -> stage={row['stage']}   {row['tests_passed']}/{row['tests_total']} tests")
        rows.append(row)
    return rows

# Run the whole tiny suite; each entry of batch_results is one row dict from run_tasks.
batch_results = run_tasks(TASKS)
print("\nDone. Collected", len(batch_results), "results.")


In [None]:
import pandas as pd

# Collect the per-task result rows into a table and show it in the notebook.
df = pd.DataFrame(batch_results)
display(df)

# Write the table as CSV (without the index column) under /kaggle/working/.
out_path = "/kaggle/working/codesim_batch_results.csv"
df.to_csv(out_path, index=False)
print("Saved:", out_path)


In [None]:
pip install -q datasets

In [None]:
from datasets import load_dataset
import json

def humaneval_tasks(n=5):
    """
    Load the first n HumanEval tasks and wrap them into our TinyTask format.

    Each item has fields: prompt (function header + docstring), test (python code),
    and entry_point (function name).
    We create a tests_code string that executes their 'test' code and calls check(candidate).

    n: maximum number of tasks to return, taken from the start of the test split.
    Returns: list of TinyTask, one per loaded item, each with a single test t1.
    """
    # Ask for the split directly instead of loading a DatasetDict and indexing it.
    ds = load_dataset("openai/openai_humaneval", split="test")  # small dataset
    tasks = []
    for i in range(min(n, len(ds))):
        item = ds[i]
        prompt = item["prompt"]           # includes def signature + docstring (no body)
        test_src = item["test"]           # Python code with a function `check(candidate)`
        entry = item["entry_point"]       # name of the function to implement

        # Build a problem string from the prompt (what we show to the LLM)
        problem = f"""
Complete the following function in Python. Do not modify the function name or signature.

{prompt}
""".strip()

        # Build tests_code that runs HumanEval's check(candidate).
        # The strings are embedded with repr() because the result is Python source:
        # repr always yields a literal that evaluates back to the same string, whereas
        # a JSON literal can differ (e.g. characters escaped as surrogate pairs).
        tests_code = f"""
ENTRY_POINT = {entry!r}
TEST_SRC = {test_src!r}

def t1(mod):
    # Executes HumanEval's test code, then calls check(candidate)
    try:
        ns = {{}}
        exec(TEST_SRC, ns)
        cand = getattr(mod, ENTRY_POINT)
        ns['check'](cand)
        return (True, 'human-eval')
    except AssertionError as e:
        return (False, f'assertion failed: {{e}}')
    except Exception as e:
        return (False, f'error: {{e}}')

TEST_FUNCS = [t1]
""".strip()

        tasks.append(TinyTask(
            name=f"humaneval_{item['task_id']}",
            problem=problem,
            tests_code=tests_code
        ))
    return tasks

print("HumanEval loader ready.")


In [None]:
# Number of HumanEval tasks to attempt (start small; raise it once the pipeline works)
N = 5

he_tasks = humaneval_tasks(n=N)
he_results = run_tasks(he_tasks)

# Compute pass@1: a task counts only when its single HumanEval test ran and passed.
# NOTE: codesim's debugging rounds are driven by these same tests, so the figure is
# not a held-out score.
passed = len([row for row in he_results if row["tests_passed"] == row["tests_total"] == 1])
pass_at_1 = passed / max(1, len(he_results))
print(f"\nHumanEval mini-run: {passed}/{len(he_results)} passed  → pass@1 = {pass_at_1:.3f}")


In [None]:
import pandas as pd

# Collect the HumanEval result rows into a table and show it in the notebook.
df_he = pd.DataFrame(he_results)
display(df_he)

# Write the table as CSV (without the index column) under /kaggle/working/.
out_path_he = "/kaggle/working/codesim_humaneval_results.csv"
df_he.to_csv(out_path_he, index=False)
print("Saved:", out_path_he)
