In [7]:
import tarfile
import os

# Location of the APPS archive and the directory it is unpacked into.
file_path = "/mnt/batch/tasks/shared/LS_root/mounts/clusters/noora1/code/Users/noora/APPS.tar.gz"
extract_path = "./APPS_extracted"

os.makedirs(extract_path, exist_ok=True)

# Mode "r:*" lets tarfile detect the compression itself, so the archive opens
# whether it is gzip-compressed (as the .tar.gz name suggests) or a plain tar.
# The previous mode "r:" only accepts uncompressed archives.
# NOTE(review): extractall() trusts member paths; only use it on archives from
# a trusted source.
try:
    with tarfile.open(file_path, "r:*") as tar:
        tar.extractall(path=extract_path)
    print("Extraction succeeded!")
except Exception as e:
    # Best effort: report the problem and still list whatever is already there.
    print("Failed to extract:", e)

# Verify contents
print(os.listdir(extract_path))


Failed to extract: [Errno 2] No such file or directory: '/mnt/batch/tasks/shared/LS_root/mounts/clusters/noora1/code/Users/noora/APPS.tar.gz'
['APPS']


In [1]:
# ===============================================================
# FULL PIPELINE (WITH RANDOM-CoT, AUTO-CoT + CACHING)
# ===============================================================

# Pinned Hugging Face stack (datasets, transformers, huggingface_hub, pyarrow);
# tqdm and pandas are installed unpinned.
!pip install -q -U datasets==2.20.0 transformers==4.44.2 huggingface_hub==0.25.2 pyarrow==17.0.0 tqdm pandas
# NOTE(review): in the recorded output of this cell apt-get fails with a dpkg
# lock "Permission denied" error and `git lfs` is reported as "not a git
# command", so git-lfs was not installed by these two lines in that run.
!apt-get -qq install git-lfs
!git lfs install
!pip install -q -U google-generativeai

# Printed unconditionally: the recorded run shows this message even though the
# apt-get / git-lfs commands above failed.
print("All dependencies installed.")


E: Could not open lock file /var/lib/dpkg/lock-frontend - open (13: Permission denied)
E: Unable to acquire the dpkg frontend lock (/var/lib/dpkg/lock-frontend), are you root?
git: 'lfs' is not a git command. See 'git --help'.

The most similar command is
	refs
All dependencies installed.


In [1]:
# ===============================================================
# IMPORTS
# ===============================================================
import os, random, tarfile, subprocess, json, re, time, ast, pandas as pd


In [3]:
# NOTE(review): this unpinned upgrade replaces the huggingface_hub==0.25.2 pin
# from the dependency-install cell (the recorded output shows 0.25.2 being
# uninstalled while 1.1.5 is downloaded) — confirm the pinned
# datasets/transformers versions still work with the newer release.
!pip install --upgrade huggingface_hub

Collecting huggingface_hub
  Downloading huggingface_hub-1.1.5-py3-none-any.whl (516 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m516.0/516.0 kB[0m [31m14.2 MB/s[0m eta [36m0:00:00[0m00:01[0m
Collecting shellingham
  Downloading shellingham-1.5.4-py2.py3-none-any.whl (9.8 kB)
Collecting typer-slim
  Downloading typer_slim-0.20.0-py3-none-any.whl (47 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m47.1/47.1 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
Collecting hf-xet<2.0.0,>=1.2.0
  Downloading hf_xet-1.2.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m31.2 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Installing collected packages: typer-slim, shellingham, hf-xet, huggingface_hub
  Attempting uninstall: huggingface_hub
    Found existing installation: huggingface-hub 0.25.2
    Uninstalling huggingface-hub-0.25.2:
      Successfully

In [None]:
from huggingface_hub import InferenceClient
def generate(question):
    """
    Send a single-turn prompt through the Hugging Face Inference API
    (chat completions) and return the model's text reply.

    The access token is read from the ``HF_TOKEN`` environment variable rather
    than being written into the notebook.  When the variable is unset or empty
    the client is created with ``api_key=None``, leaving token resolution to
    huggingface_hub (see the InferenceClient documentation).

    Args:
        question (str): Prompt text; surrounding whitespace is stripped before
            it is sent as the only user message.

    Returns:
        str: The reply text of the first choice, or the string
        ``"Error: <message>"`` when the request raises.  Callers must check for
        that prefix, because no exception is propagated from the request.
    """
    import os  # local import so this cell does not depend on the imports cell

    client = InferenceClient(api_key=os.environ.get("HF_TOKEN") or None)

    try:
        completion = client.chat.completions.create(
            # NOTE(review): the ":novita" suffix presumably selects the
            # inference provider — confirm against the provider docs.
            model="meta-llama/Llama-3.1-8B-Instruct:novita",
            messages=[
                {"role": "user", "content": question.strip()}
            ],
            max_tokens=512,
            temperature=0.7,
            top_p=0.95,
        )

        return completion.choices[0].message["content"]

    except Exception as e:
        # Deliberate best effort: failures are reported in-band as text.
        return f"Error: {e}"


  from .autonotebook import tqdm as notebook_tqdm


In [15]:
# Smoke test: send one sample prompt through generate() and print what comes
# back (the reply text, or the "Error: ..." string generate() returns on failure).
r = generate("Write a Python function to check if a number is prime.")
print(r)


**Prime Number Checker Function**

Here's a simple Python function that checks if a given number is prime:

```python
def is_prime(n):
    """
    Checks if a number is prime.

    Args:
    n (int): The number to check.

    Returns:
    bool: True if the number is prime, False otherwise.
    """
    if n <= 1:
        return False
    for i in range(2, int(n ** 0.5) + 1):
        if n % i == 0:
            return False
    return True
```

**Example Use Cases**
--------------------

```python
print(is_prime(25))  # False
print(is_prime(23))  # True
print(is_prime(37))  # True
print(is_prime(48))  # False
```

**Explanation**
--------------

This function takes an integer `n` as input and checks if it's prime. Here's how it works:

1.  If `n` is less than or equal to 1, it's not prime, so we return `False`.
2.  We only need to check divisibility up to the square root of `n`, as any factor larger than that would have a corresponding factor smaller than the square root.
3.  We iterate f

In [8]:
# ===============================================================
# DATA PARSERS
# ===============================================================
def parse_problem_from_files(problem_id, data_split="test", base_path=None):
    """
    Load a problem statement and its test cases from the extracted dataset.

    Reads ``<base_path>/<data_split>/<problem_id>/question.txt`` and the
    sibling ``input_output.json``.

    Args:
        problem_id: Problem directory name; converted with ``str()``.
        data_split (str): Sub-directory of the dataset root (default ``"test"``).
        base_path (str | None): Dataset root.  ``None`` keeps the original
            hard-coded cluster path, so existing callers are unaffected.

    Returns:
        tuple: ``(question_text, parsed_json)`` on success, or ``(None, None)``
        when a file is missing or cannot be read/parsed (a message is printed
        in that case).
    """
    try:
        if base_path is None:
            base_path = "/mnt/batch/tasks/shared/LS_root/mounts/clusters/noora1/code/Users/noora/APPS_extracted/APPS"
        problem_path = os.path.join(base_path, data_split, str(problem_id))

        question_file = os.path.join(problem_path, "question.txt")
        test_file     = os.path.join(problem_path, "input_output.json")

        # Validate files
        if not os.path.exists(question_file):
            print(f"Missing question file: {question_file}")
            return None, None

        if not os.path.exists(test_file):
            print(f"Missing test cases file: {test_file}")
            return None, None

        # Load content
        with open(question_file, "r", encoding="utf-8") as f:
            question = f.read()

        with open(test_file, "r", encoding="utf-8") as f:
            test_cases = json.load(f)

        return question, test_cases

    except Exception as e:
        # Best effort: an unreadable problem is reported and skipped.
        print(f"Error reading {problem_id}: {e}")
        return None, None


def load_example_for_prompting(problem_id, data_split="train", base_path=None):
    """
    Load a problem statement together with its first reference solution.

    Reads ``question.txt`` and ``solutions.json`` from
    ``<base_path>/<data_split>/<problem_id>``.

    Args:
        problem_id: Problem directory name; converted with ``str()``.
        data_split (str): Sub-directory of the dataset root (default ``"train"``).
        base_path (str | None): Dataset root.  ``None`` keeps the original
            hard-coded cluster path, so existing callers are unaffected.

    Returns:
        tuple: ``(question_text, first_solution)``; ``(None, None)`` when the
        files cannot be read/parsed (a message is printed) or when the
        solutions file holds an empty value.
    """
    if base_path is None:
        base_path = "/mnt/batch/tasks/shared/LS_root/mounts/clusters/noora1/code/Users/noora/APPS_extracted/APPS"
    path = os.path.join(base_path, data_split, str(problem_id))
    qfile = os.path.join(path, "question.txt")
    sfile = os.path.join(path, "solutions.json")
    try:
        with open(qfile, "r", encoding="utf-8") as f:
            question = f.read()
        with open(sfile, "r", encoding="utf-8") as f:
            sols = json.load(f)
        if sols:
            # Only the first listed solution is used as the worked example.
            return question, sols[0]
    except Exception as e:
        print(f"Error loading {problem_id}: {e}")
    return None, None


In [120]:
# ===============================================================
# MANUAL FEW-SHOT CoT EXAMPLES
# ===============================================================
# Training problems used as the fixed, hand-picked few-shot examples.
EXAMPLE_IDS = ["0002", "0003", "0004"]

# Hand-written reasoning per example id.  Every entry currently holds the same
# placeholder text.
MANUAL_COTS = {
    example_id: "Let's think step by step to find the solution... (your reasoning here)"
    for example_id in EXAMPLE_IDS
}

# Build the example records, skipping any id whose question or solution could
# not be loaded.
FEW_SHOT_EXAMPLES = []
for pid in EXAMPLE_IDS:
    q, s = load_example_for_prompting(pid)
    if not (q and s):
        continue
    FEW_SHOT_EXAMPLES.append(
        dict(problem_id=pid, question=q, reasoning=MANUAL_COTS[pid], solution_code=s)
    )
print(f"Loaded {len(FEW_SHOT_EXAMPLES)} manual few-shot examples.")


Loaded 3 manual few-shot examples.


In [121]:
# ===============================================================
# RANDOM-CoT IMPLEMENTATION
# ===============================================================
def get_random_few_shot_examples(k=3, data_split="train"):
    """
    Pick up to ``k`` random problems from a dataset split as few-shot examples.

    Args:
        k (int): Number of problems to sample.  Values larger than the number
            of available problems are clamped instead of raising
            ``ValueError``; values below zero are treated as zero.
        data_split (str): Split directory to sample from (default ``"train"``).

    Returns:
        list[dict]: One dict per successfully loaded problem with the keys
        ``problem_id``, ``question``, ``reasoning`` and ``solution_code``.
        Problems whose files fail to load are skipped, so fewer than ``k``
        items may be returned.
    """
    train_dir = os.path.join("/mnt/batch/tasks/shared/LS_root/mounts/clusters/noora1/code/Users/noora/APPS_extracted/APPS", data_split)
    # Sort so that a seeded `random` gives the same sample on every run
    # (os.listdir order is arbitrary) and drop hidden entries such as
    # notebook checkpoint folders.
    all_ids = sorted(name for name in os.listdir(train_dir) if not name.startswith("."))
    sample_size = max(0, min(k, len(all_ids)))
    random_ids = random.sample(all_ids, sample_size)
    examples = []
    for pid in random_ids:
        q, s = load_example_for_prompting(pid, data_split)
        if q and s:
            # Use hand-written reasoning when available, else a generic trigger.
            reasoning = MANUAL_COTS.get(pid, "Let's think step by step to find the solution.")
            examples.append({"problem_id": pid, "question": q, "reasoning": reasoning, "solution_code": s})
    return examples


def create_random_few_shot_cot_prompt(problem_text, k=3):
    """
    Build a few-shot chain-of-thought prompt from ``k`` randomly sampled examples.

    Each example contributes its question, reasoning and fenced solution; the
    new problem is appended last, followed by the step-by-step trigger sentence.
    """
    pieces = ["You will be given several example problems with reasoning and final solutions.\n\n"]
    for example in get_random_few_shot_examples(k):
        pieces.append(
            "--- EXAMPLE PROBLEM ---\n{q}\n{r}\nSolution:\n```python\n{code}\n```\n\n".format(
                q=example["question"],
                r=example["reasoning"],
                code=example["solution_code"],
            )
        )
    header_and_examples = "".join(pieces)
    return header_and_examples + "--- NEW PROBLEM TO SOLVE ---\n" + problem_text + "\nLet's think step by step to find the solution."

In [122]:
# ===============================================================
# AUTO-CoT IMPLEMENTATION WITH CACHING
# ===============================================================
# JSON file that persists generated reasoning between notebook sessions.
auto_cot_cache_path = os.path.join("/mnt/batch/tasks/shared/LS_root/mounts/clusters/noora1/code/Users/noora", "auto_cot_cache.json")

# Start from the on-disk cache when the file exists; otherwise begin empty.
if not os.path.exists(auto_cot_cache_path):
    AUTO_COT_CACHE = {}
    print("Initialized empty Auto-CoT cache.")
else:
    with open(auto_cot_cache_path, "r", encoding="utf-8") as cache_file:
        AUTO_COT_CACHE = json.load(cache_file)
    print(f"Loaded Auto-CoT cache with {len(AUTO_COT_CACHE)} entries.")

def generate_auto_cot_for_problem(question_text, problem_id):
    """
    Return model-written reasoning for a problem, using the on-disk cache.

    ``generate()`` returns plain text (and reports failures as a string that
    starts with ``"Error: "``), so the reply is used directly instead of being
    read through a ``.text`` attribute.  Objects exposing ``.text`` are still
    accepted.

    Args:
        question_text (str): Problem statement inserted into the prompt.
        problem_id (str): Cache key for the generated reasoning.

    Returns:
        str: The cached or freshly generated reasoning.  When generation fails
        a generic step-by-step sentence is returned and nothing is cached, so
        a later call can retry.
    """
    fallback = "Let's think step by step to find the solution."

    cached = AUTO_COT_CACHE.get(problem_id)
    # Entries equal to the fallback were stored by failed generations; treat
    # them as cache misses so they get regenerated.
    if cached is not None and cached != fallback:
        return cached

    cot_prompt = f"Problem:\n{question_text}\n\nLet's think step by step and describe reasoning before writing any code."
    try:
        resp = generate(cot_prompt)
        reply = resp if isinstance(resp, str) else resp.text
        reasoning = reply.strip()
        if not reasoning or reasoning.startswith("Error: "):
            raise RuntimeError(reasoning or "empty response")
    except Exception as e:
        print(f"Error generating auto-CoT for {problem_id}: {e}")
        return fallback

    # Cache it
    AUTO_COT_CACHE[problem_id] = reasoning
    with open(auto_cot_cache_path, "w", encoding="utf-8") as f:
        json.dump(AUTO_COT_CACHE, f, indent=2)
    return reasoning

def get_auto_few_shot_examples(k=3, data_split="train"):
    """
    Pick up to ``k`` random problems and attach auto-generated reasoning.

    Args:
        k (int): Number of problems to sample.  Values larger than the number
            of available problems are clamped instead of raising
            ``ValueError``; values below zero are treated as zero.
        data_split (str): Split directory to sample from (default ``"train"``).

    Returns:
        list[dict]: One dict per successfully loaded problem with the keys
        ``problem_id``, ``question``, ``reasoning`` and ``solution_code``.
        Problems whose files fail to load are skipped.
    """
    train_dir = os.path.join("/mnt/batch/tasks/shared/LS_root/mounts/clusters/noora1/code/Users/noora/APPS_extracted/APPS", data_split)
    # Sort so that a seeded `random` gives the same sample on every run
    # (os.listdir order is arbitrary) and drop hidden entries.
    all_ids = sorted(name for name in os.listdir(train_dir) if not name.startswith("."))
    sample_size = max(0, min(k, len(all_ids)))
    random_ids = random.sample(all_ids, sample_size)
    examples = []
    for pid in random_ids:
        q, s = load_example_for_prompting(pid, data_split)
        if q and s:
            reasoning = generate_auto_cot_for_problem(q, pid)
            examples.append({"problem_id": pid, "question": q, "reasoning": reasoning, "solution_code": s})
    return examples

def create_auto_few_shot_cot_prompt(problem_text, k=3):
    """
    Build a few-shot chain-of-thought prompt whose examples carry
    automatically generated reasoning.

    Each example contributes its question, reasoning and fenced solution; the
    new problem is appended last, followed by the step-by-step trigger sentence.
    """
    pieces = ["Here are some automatically reasoned examples with their final solutions.\n\n"]
    for example in get_auto_few_shot_examples(k):
        pieces.append(
            "--- EXAMPLE PROBLEM ---\n{q}\n{r}\nSolution:\n```python\n{code}\n```\n\n".format(
                q=example["question"],
                r=example["reasoning"],
                code=example["solution_code"],
            )
        )
    header_and_examples = "".join(pieces)
    return header_and_examples + "--- NEW PROBLEM TO SOLVE ---\n" + problem_text + "\nLet's think step by step to find the solution."


Loaded Auto-CoT cache with 255 entries.


In [144]:
# ===============================================================
# PROMPT TEMPLATES
# ===============================================================
def format_problem_for_prompt(text):
    """
    Wrap a problem statement with the expected answer layout and rules.

    The result starts with the problem, shows an empty fenced ``python`` block
    as the answer template, and ends with the rules list.  A stray `` + `` that
    had leaked from string concatenation into the prompt text (before
    ``Code:``) has been removed.

    Args:
        text: Problem statement to embed.

    Returns:
        str: The formatted prompt fragment.
    """
    return (
        f"Problem:\n{text}\n\nCode:\n```python\n# ONLY runnable Python 3 code here\n```\n"
        "\nRules:\n- No comments or text inside the code block.\n"
        "- Follow input/output format exactly.\n"
        "- Use an efficient algorithm; avoid brute force if inputs are large."
    )

def create_zero_shot_prompt(t):
    """
    Build the zero-shot prompt: the formatted problem followed by a bracketed
    reasoning checklist (restate, algorithm, edge cases).
    """
    checklist = (
        "\nYou are solving a competitive programming problem.",
        "\nFollow these instructions exactly:\n",
        "\n[BEGIN REASONING]",
        "\n- Restate the problem briefly in your own words.",
        "\n- Describe a correct and efficient algorithm that respects input constraints.",
        "\n- List critical edge cases you will handle.",
        "\n[END REASONING]\n",
    )
    problem_part = format_problem_for_prompt(t)
    return problem_part + "".join(checklist)




def create_zero_shot_cot_prompt(t):
    """
    Build the zero-shot chain-of-thought prompt.

    Appends the same step-by-step trigger sentence that the few-shot CoT
    prompts in this notebook end with.  Previously an empty string was
    appended, so this strategy sent only the formatted problem with no
    reasoning cue.
    """
    return format_problem_for_prompt(t) + "\nLet's think step by step to find the solution."

def create_analytic_cot_prompt(t):
    """
    Build the analytic prompt: the problem, a three-point analysis request,
    and a fenced placeholder for the final Python 3 code.
    """
    prompt_lines = [
        "Problem:",
        f"{t}",
        "",
        "Please provide:",
        "1. Problem analysis",
        "2. Algorithm design",
        "3. Edge cases",
        "",
        "Then give the final Python 3 code:",
        "```python",
        "(your code here)",
        "```",
    ]
    return "\n".join(prompt_lines)

def create_few_shot_prompt(t):
    """
    Build a few-shot prompt from FEW_SHOT_EXAMPLES (question plus fenced
    solution per example), followed by the new problem.
    """
    example_blocks = [
        f"--- EXAMPLE ---\n{example['question']}\nSolution:\n```python\n{example['solution_code']}\n```\n\n"
        for example in FEW_SHOT_EXAMPLES
    ]
    intro = "You will be given example problems and solutions.\n\n"
    return intro + "".join(example_blocks) + "--- NEW PROBLEM ---\n" + t

def create_few_shot_cot_prompt(t):
    """
    Build a few-shot chain-of-thought prompt from FEW_SHOT_EXAMPLES (question,
    reasoning and fenced solution per example), followed by the new problem
    and the step-by-step trigger sentence.
    """
    example_blocks = [
        f"--- EXAMPLE ---\n{example['question']}\n{example['reasoning']}\nSolution:\n```python\n{example['solution_code']}\n```\n\n"
        for example in FEW_SHOT_EXAMPLES
    ]
    intro = "You will be given examples with reasoning and solutions.\n\n"
    return intro + "".join(example_blocks) + "--- NEW PROBLEM ---\n" + t + "\nLet's think step by step to find the solution."


In [19]:
def create_zero_shot_prompt(t):
    return (
        format_problem_for_prompt(t)
        
        + "\nFollow these instructions exactly:\n"
        + "Code:\n```python\n# ONLY runnable Python 3 code here\n```\n"
        + "\nRules:\n- No comments or text inside the code block."
        + "\n- Follow input/output format exactly."
        + "\n- The Python code block must be fully closed with the ``` tag and contain the *entire* runnable solution, including all necessary imports and input reading."
         + "\n\n"
        + "\nYou are solving a competitive programming problem."
        + "\n[BEGIN REASONING]"
        + "\n- Restate the problem briefly in your own words."
        + "\n- Describe a correct and efficient algorithm that respects input constraints."
        + "\n- State the time complexity (e.g., O(N), O(log N)) of your final algorithm."
         + "\n- Solutions that are O(N^2) or brute force on large inputs will fail the time limit."
        + "\n- List critical edge cases you will handle."
        + "\n- Verify that your proposed algorithm correctly handles the most complex edge case listed above."
        + "\n[END REASONING]\n"
        
    )

def format_problem_for_prompt(text):
    """
    Return the problem statement under a ``Problem:`` heading, newline-terminated.

    This definition replaces the earlier ``format_problem_for_prompt`` when the
    notebook is run top to bottom.
    """
    return "Problem:\n{}\n".format(text)

In [20]:
import re
import subprocess

# ===============================================================
# MODEL EXECUTION HELPERS
# ===============================================================

def get_model_response(prompt):
    """
    Call ``generate`` and normalise every failure to ``None``.

    ``generate`` reports request failures in-band, as a string beginning with
    ``"Error: "``, rather than raising.  Such strings are treated as failures
    here so they are not mistaken for model output and run as code.

    Args:
        prompt (str): Prompt forwarded to ``generate``.

    Returns:
        The model reply, or ``None`` if the call raised or returned an
        in-band error string (a message is printed in both cases).
    """
    try:
        response = generate(prompt)
    except Exception as e:
        print(f"API Error: {e}")
        return None

    error_prefix = "Error: "
    if isinstance(response, str) and response.startswith(error_prefix):
        print(f"API Error: {response[len(error_prefix):]}")
        return None
    return response

def extract_python_code(text: str) -> str:
    """
    Pull the most plausible Python solution out of a model reply.

    Selection rules:
      1. Fenced blocks tagged ``python``/``python3``/``py`` (any case) are
         preferred.  A final fence that never closes (a reply cut off by the
         token limit) is read up to the end of the text.
      2. If no tagged block exists, untagged fenced blocks are used.
      3. Among the candidates, blocks that compile as Python rank first, then
         blocks containing a code keyword, then longer blocks.
      4. With no usable fenced block, the text with any fenced spans removed
         is returned (the original fallback).

    The compile check replaces an earlier "looks like prose" regex that
    rejected any block with three consecutive words, which also matched
    ordinary code such as ``for i in range(n)``.

    Args:
        text (str): Raw model reply; ``None`` or empty yields ``""``.

    Returns:
        str: The selected code, stripped of surrounding whitespace.
    """
    import re

    if not text:
        return ""

    def _compiles(source: str) -> bool:
        # compile() only parses the source; nothing is executed here.
        try:
            compile(source, "<generated>", "exec")
            return True
        except (SyntaxError, ValueError, OverflowError, RecursionError, MemoryError):
            return False

    keyword_re = re.compile(r"\bdef\b|\bclass\b|\bfor\b|\bwhile\b|\bif\b|import|return")

    # Pass 1: python-tagged fences, closed or running to the end of the text.
    candidates = re.findall(
        r"```[ \t]*(?:python3?|py)[ \t]*\r?\n(.*?)(?:```|\Z)",
        text,
        re.DOTALL | re.IGNORECASE,
    )

    # Pass 2: pair every fence in order and keep only the untagged blocks, so
    # blocks tagged with another language are not mistaken for code.
    if not candidates:
        paired = re.findall(
            r"```[ \t]*([A-Za-z0-9_+.#-]*)[ \t]*\r?\n(.*?)(?:```|\Z)",
            text,
            re.DOTALL,
        )
        candidates = [body for tag, body in paired if tag == ""]

    if candidates:
        stripped = [body.strip() for body in candidates]
        return max(
            stripped,
            key=lambda body: (_compiles(body), bool(keyword_re.search(body)), len(body)),
        )

    # Fallback: strip any fenced text
    return re.sub(r"```.*?```", "", text, flags=re.DOTALL).strip()
def run_code_against_test_cases(code: str, tests: dict, timeout: float = 5):
    """
    Run generated code against stdin/stdout test cases.

    Each test feeds one input to a fresh interpreter process and compares its
    standard output with the expected output.  Evaluation stops at the first
    failing test.

    Args:
        code (str): Python source to execute with ``-c``.
        tests (dict): Mapping with parallel ``"inputs"`` and ``"outputs"``
            lists.  List-valued items are joined with newlines; non-string
            items are converted with ``str()`` instead of raising.
        timeout (float): Per-test time limit in seconds (default 5, the
            previously hard-coded value).

    Returns:
        tuple[str, str]: ``("PASS" | "FAIL" | "ERROR", message)``.  ``ERROR``
        covers invalid test data, a non-zero exit status, timeouts and
        failures to launch the process.

    Notes:
        * The interpreter running this notebook (``sys.executable``) is used,
          falling back to ``python3`` from PATH if it is unavailable.
        * Outputs are compared line by line, ignoring trailing whitespace.
        * A differing number of inputs and outputs is reported as ``ERROR``
          rather than silently checking only the shorter list.
    """
    import sys

    if not isinstance(tests, dict):
        return "ERROR", "Invalid test cases"

    ins, outs = tests.get("inputs") or [], tests.get("outputs") or []
    if not ins:
        return "ERROR", "No inputs provided"
    if len(ins) != len(outs):
        return "ERROR", f"Mismatched test data: {len(ins)} inputs vs {len(outs)} outputs"

    def _as_text(value):
        # Test files store each case either as one string or as a list of lines.
        if isinstance(value, list):
            return "\n".join(str(item) for item in value)
        return value if isinstance(value, str) else str(value)

    def _comparable(output):
        return [line.rstrip() for line in output.strip().splitlines()]

    interpreter = sys.executable or "python3"

    for i, (inp, outp) in enumerate(zip(ins, outs)):
        inp, outp = _as_text(inp), _as_text(outp)

        try:
            p = subprocess.run(
                [interpreter, "-c", code],
                input=inp,
                text=True,
                capture_output=True,
                timeout=timeout
            )
        except subprocess.TimeoutExpired:
            return "ERROR", f"Timeout on test {i+1}"
        except Exception as e:
            return "ERROR", str(e)

        stdout, stderr = p.stdout.strip(), p.stderr.strip()

        if p.returncode != 0:
            return "ERROR", f"Runtime error on test {i+1}: {stderr}"

        if _comparable(stdout) != _comparable(outp):
            return "FAIL", f"Mismatch on test {i+1}: expected {outp}, got {stdout}"

    return "PASS", f"All {len(ins)} tests passed"


# ===============================================================
# REASONING EXTRACTION HELPER
# ===============================================================

def extract_reasoning(text: str) -> str:
    """
    Return the text that precedes the first code fence in a model reply.

    The cut is made at the first ``` marker whether or not the fence is ever
    closed, so a reply truncated in the middle of its code block no longer
    returns the code as part of the reasoning.

    Args:
        text (str): Raw model reply; ``None`` or empty yields ``""``.

    Returns:
        str: The stripped text before the first fence, or the whole stripped
        text when the reply contains no fence.
    """
    if not text:
        return ""
    fence_at = text.find("```")
    if fence_at == -1:
        return text.strip()
    return text[:fence_at].strip()

In [21]:
# ===============================================================
# EXPERIMENT FUNCTION
# ===============================================================
def run_full_experiment(problem_id, data_split="test", prompt_strategy="zero_shot"):
    """
    Run one problem through one prompting strategy and log the outcome.

    Steps: load the problem, build the prompt for ``prompt_strategy``, query
    the model, split the reply into reasoning and code, run the code against
    the problem's tests, and append a JSON line to the results log.

    Returns:
        dict: Always contains ``problem_id``, ``strategy`` and ``result``.
        ``result`` is ``PARSE_ERROR``, ``INVALID`` or ``API_ERROR`` for early
        exits; otherwise it is the test verdict and the dict also carries
        ``details``, ``reasoning`` and ``generated_code``.
    """
    print(f"\n Running Problem {problem_id} with strategy {prompt_strategy}")

    def _early_exit(status):
        return {"problem_id": problem_id, "strategy": prompt_strategy, "result": status}

    q, tests = parse_problem_from_files(problem_id, data_split)
    if not q:
        return _early_exit("PARSE_ERROR")

    # --- BUILD PROMPT ---
    # Lambdas defer the name lookup, so only the chosen builder must exist.
    prompt_builders = (
        ("zero_shot", lambda: create_zero_shot_prompt(q)),
        ("zero_shot_cot", lambda: create_zero_shot_cot_prompt(q)),
        ("analytic_cot", lambda: create_analytic_cot_prompt(q)),
        ("few_shot", lambda: create_few_shot_prompt(q)),
        ("few_shot_cot", lambda: create_few_shot_cot_prompt(q)),
        ("random_few_shot_cot", lambda: create_random_few_shot_cot_prompt(q)),
        ("auto_few_shot_cot", lambda: create_auto_few_shot_cot_prompt(q)),
    )
    for strategy_name, build_prompt in prompt_builders:
        if prompt_strategy == strategy_name:
            prompt = build_prompt()
            break
    else:
        return _early_exit("INVALID")

    # --- MODEL CALL ---
    raw_response = get_model_response(prompt)
    if not raw_response:
        return _early_exit("API_ERROR")

    # --- EXTRACT REASONING + CODE ---
    generated_code = extract_python_code(raw_response)
    reasoning_text = extract_reasoning(raw_response)

    # --- RUN TESTS ---
    result, details = run_code_against_test_cases(generated_code, tests)

    # --- LOG SUMMARY (reasoning + code only) ---
    log_entry = {
        "problem_id": problem_id,
        "strategy": prompt_strategy,
        "result": result,
        "details": details,
        "reasoning": reasoning_text,
        "generated_code": generated_code
    }
    reasoning_log_path = os.path.join("/mnt/batch/tasks/shared/LS_root/mounts/clusters/noora1/code/Users/noora", "experiment_results_with_reasoning.jsonl")
    try:
        with open(reasoning_log_path, "a", encoding="utf-8") as log_file:
            log_file.write(json.dumps(log_entry) + "\n")
    except Exception as e:
        # A failed log write must not lose the result.
        print(f"Failed to write reasoning log: {e}")

    return log_entry


In [22]:
# ===============================================================
# RUN BATCH EXPERIMENT WITH TIMESTAMPS
# ===============================================================
import datetime, json, time, os, random
import pandas as pd

# Strategies to evaluate in this run; only "zero_shot" is enabled, the others
# are commented out.
strategies_to_test = [
    "zero_shot",
    # "zero_shot_cot",
    # "analytic_cot",
    # "few_shot",
    # "few_shot_cot",
    # "random_few_shot_cot",
    # "auto_few_shot_cot"
]

problem_ids_to_test = [f"{i:04d}" for i in range(10)]  # zero-padded IDs "0000".."0009" (10 problems)
results = []

# One model call per (strategy, problem) pair; each result dict is printed as
# it arrives and collected in `results`.
for strat in strategies_to_test:
    for pid in problem_ids_to_test:
        res = run_full_experiment(pid, "test", strat)
        # add a UTC timestamp (naive datetime, no timezone suffix) to each result row
        res["timestamp"] = datetime.datetime.utcnow().isoformat()
        results.append(res)
        print(json.dumps(res, indent=2))
        # sleep 2-3 seconds with random jitter so requests are not uniformly spaced
        time.sleep(2 + random.random() * 1)

# convert to DataFrame
df = pd.DataFrame(results)

# create unique filename with timestamp
# NOTE(review): the file name prefix says "auto_cot_cached" regardless of which
# strategies were actually run — confirm this is intended.
ts = datetime.datetime.utcnow().strftime("%Y%m%d_%H%M%S")
out_path = os.path.join(
    "/mnt/batch/tasks/shared/LS_root/mounts/clusters/noora1/code/Users/noora",
    f"experiment_results_auto_cot_cached_{ts}.csv"
)

df.to_csv(out_path, index=False)
print(f"\nResults saved to {out_path}")


 Running Problem 0000 with strategy zero_shot
{
  "problem_id": "0000",
  "strategy": "zero_shot",
  "result": "FAIL",
  "details": "Mismatch on test 1: expected 4\n, got 15",
  "reasoning": "",
  "generated_code": "from sys import stdin, stdout\n\ndef solve():\n    s = input()\n    n = len(s)\n    f = [0] * 5\n    f[0] = 1\n    f[1] = 1\n    f[3] = 1\n    res = -1\n    left = 0\n    for right in range(n):\n        if s[right] == '[':\n            f[0] = 1\n        elif s[right] == ':':\n            f[1] = 1\n        elif s[right] == '|':\n            f[2] = 1\n        elif s[right] == ']':\n            f[3] = 1\n        if f[0] and f[1] and f[2] and f[3]:\n            res = max(res, left + right + 1)\n        left += 1\n    stdout.write(str(res) + '\\n')\n\nif __name__ == '__main__':\n    solve()",
  "timestamp": "2025-11-29T00:07:04.413954"
}

 Running Problem 0001 with strategy zero_shot
{
  "problem_id": "0001",
  "strategy": "zero_shot",
  "result": "FAIL",
  "details": "Mismatch

In [17]:
# ===============================================================
# ACCURACY SUMMARY
# ===============================================================
# Per-strategy pass rate over the rows collected in `df`.
print("\n--- Accuracy Summary ---")
for s in strategies_to_test:
    sub = df[df.strategy == s]
    n_total = len(sub)
    n_pass = (sub.result == "PASS").sum()
    # A strategy with no rows is reported as 0% rather than dividing by zero.
    if n_total > 0:
        acc = n_pass / n_total * 100
    else:
        acc = 0
    print(f"{s:25s}: {acc:.2f}% ({n_pass}/{n_total})")


--- Accuracy Summary ---
zero_shot                : 0.00% (0/10)


In [18]:
# ===============================================================
# ACCURACY SUMMARY WITH ERROR DETAILS
# ===============================================================
# Substring looked for in the `details` text -> label used in the breakdown.
# Order matters: the first matching entry wins.
DETAIL_LABELS = (
    ("Timeout", "Timeout"),
    ("SyntaxError", "SyntaxError"),
    ("Runtime error", "RuntimeError"),
    ("Mismatch", "Mismatch"),
)

print("\n--- Accuracy Summary ---")
for s in strategies_to_test:
    sub = df[df.strategy == s]
    total = len(sub)
    if total == 0:
        print(f"{s:25s}: No runs")
        continue

    outcomes = sub.result
    passes = (outcomes == "PASS").sum()
    fails = (outcomes == "FAIL").sum()
    errors = (outcomes == "ERROR").sum()
    acc = passes / total * 100

    print(f"{s:25s}: {acc:.2f}% ({passes}/{total})  | FAIL={fails}, ERROR={errors}")

    # Nothing to break down when every run passed.
    if not (errors > 0 or fails > 0):
        continue

    # Count failing rows per label, in order of first appearance.
    error_counts = {}
    for _, row in sub.iterrows():
        if row.result not in ["FAIL", "ERROR"]:
            continue
        detail = str(row.details)
        for needle, key in DETAIL_LABELS:
            if needle in detail:
                break
        else:
            key = "Other"
        error_counts[key] = error_counts.get(key, 0) + 1

    for k, v in error_counts.items():
        print(f"   - {k}: {v}")


--- Accuracy Summary ---
zero_shot                : 0.00% (0/10)  | FAIL=4, ERROR=6
   - Mismatch: 4
   - Timeout: 1
   - SyntaxError: 5
