In [2]:
import re
import os
import subprocess
import tempfile
import json
import random
import torch
import sys

from transformers import AutoTokenizer, AutoModelForCausalLM
from datasets import load_dataset

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
########################################
# 1) Extraction Code: "Input" section + "Example -> Input" section
########################################
def extract_input_and_example(text: str):
    # Extract portion after "-----Input-----" until the next '-----' or end
    input_pattern = r"-----Input-----\s*(.*?)(?=-----|$)"
    input_match = re.search(input_pattern, text, re.DOTALL)
    input_section = input_match.group(1).strip() if input_match else ""

    # Extract portion after "-----Example-----\nInput\n" until "Output" or next '-----' or end
    example_pattern = r"-----Example-----\s*Input\s*(.*?)(?=Output|-----|$)"
    example_match = re.search(example_pattern, text, re.DOTALL)
    example_input_section = example_match.group(1).strip() if example_match else ""

    return input_section, example_input_section

In [5]:
########################################
# 2) Load the APPS dataset
########################################
apps_dataset = load_dataset("codeparrot/apps")
train_dataset = apps_dataset["train"]
intro_train_dataset = train_dataset.filter(lambda example: example["difficulty"] == "introductory")

# Pick a specific problem, e.g., problem index 5
problem = intro_train_dataset[5]
problem_desc_raw = problem["question"]

# We'll remove lines after "-----Example-----" for code generation.
def remove_example_parts(text):
    pattern = r"-----Example-----.*"
    cleaned = re.sub(pattern, "", text, flags=re.DOTALL)
    return cleaned.strip()

problem_desc = remove_example_parts(problem_desc_raw)

# Extract the input and example input sections
extracted_input, extracted_example_input = extract_input_and_example(problem_desc_raw)

In [6]:
########################################
# 3) Load Models
########################################

# Model A: Code generation
code_model_name = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"
code_tokenizer = AutoTokenizer.from_pretrained(code_model_name)
code_model = AutoModelForCausalLM.from_pretrained(code_model_name, device_map="auto")

# Model B: Test input generation
test_input_model_name = "meta-llama/Llama-3.1-8B-Instruct"  # Hypothetical or local path
test_input_tokenizer = AutoTokenizer.from_pretrained(
    test_input_model_name,
    use_auth_token=True
)
test_input_model = AutoModelForCausalLM.from_pretrained(
    test_input_model_name,
    device_map="auto",
    torch_dtype=torch.float16,
    use_auth_token=True
)


Loading checkpoint shards: 100%|██████████| 2/2 [00:09<00:00,  4.86s/it]
Loading checkpoint shards: 100%|██████████| 4/4 [00:06<00:00,  1.59s/it]


In [17]:
########################################
# 4) Prompt Builders
########################################
def build_code_prompt(question):
    return f"""
Please generate a complete Python script that solves the following problem.
Enclose your final code within ```python and ``` markers.

Problem Description:
{question}

Generated Code:
"""

def build_test_input_prompt(input_section, example_input_section):
    return f"""
Please generate another input strictly following the format described.

Here is the input format:
{input_section}

Here is the example input:
{example_input_section}

Do not contain the example input and make the data seem random. Output only the generated input, line by line, with no explanations.
"""

In [21]:
########################################
# 5) Generic text generation function
########################################
def generate_text(prompt, tokenizer, model,
                  max_length=512, temperature=0.7, top_p=0.95, do_sample=True):
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    outputs = model.generate(
        inputs.input_ids,
        max_length=max_length,
        temperature=temperature,
        top_p=top_p,
        do_sample=do_sample
    )
    return tokenizer.decode(outputs[0], skip_special_tokens=True)

########################################
# 6) Extract code from LLM output
########################################
def extract_code(generated_text: str) -> str:
    pattern = r"```python\s*(.*?)\s*```"
    matches = re.findall(pattern, generated_text, re.DOTALL)
    if matches:
        return matches[-1].strip()
    return None

########################################
# 6a) Extract input from triple backticks
########################################
def extract_input_from_backticks(generated_text: str) -> str:
    pattern = r"```(.*?)```"
    matches = re.findall(pattern, generated_text, re.DOTALL)
    if matches:
        return matches[-1].strip()
    return None

In [25]:
########################################
# 7) Generate a single code sample (Model A)
########################################
def generate_code(question):
    prompt = build_code_prompt(question)
    raw_generation = generate_text(
        prompt,
        tokenizer=code_tokenizer,
        model=code_model,
        max_length=1024,
        temperature=0.7,
        top_p=0.95,
        do_sample=True
    )
    # Attempt to slice off the prompt portion
    code_str = extract_code(raw_generation[len(prompt):])
    return code_str

########################################
# 8) Generate a single test input (Model B)
########################################
def generate_single_test_input(input_section, example_input_section):
    prompt = build_test_input_prompt(input_section, example_input_section)
    raw_generation = generate_text(
        prompt,
        tokenizer=test_input_tokenizer,
        model=test_input_model,
        max_length=512,
        temperature=0.4,
        top_p=0.9,
        do_sample=True
    )
    # Now parse the generation for ``` blocks
    relevant_part = raw_generation[len(prompt):]
    extracted = extract_input_from_backticks(relevant_part)
    return extracted

In [None]:
########################################
# 9) Run code with a single input
########################################
def run_code_with_input(code_str, single_input):
    """Run code_str (Python) with single_input as stdin, return (stdout, error_msg)."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as tmp:
        tmp.write(code_str)
        tmp_filename = tmp.name
    try:
        result = subprocess.run(
            ["python", tmp_filename],
            input=single_input,
            text=True,
            capture_output=True,
            timeout=10
        )
        print(f"code_str:\n{code_str}")
        print(f"result:\n{result}")
        # sys.exit()
        return (result.stdout.strip(), result.stderr)
    except Exception as e:
        return (None, str(e))
    finally:
        os.remove(tmp_filename)

In [27]:
########################################
# 10) Main procedure
########################################

NUM_CODE_SAMPLES = 1  # We'll generate 10 code solutions
NUM_TEST_INPUTS = 1   # We'll generate 10 test inputs (that pass standard solution)

# 1) Retrieve standard solution
# print(problem.keys())
solutions = eval(problem.get("solutions", []))

if not solutions:
    print("No standard solution found.")
    sys.exit(0)

standard_solution = solutions[0]

# 2) Generate code solutions from Model A
code_variants = []
for i in range(NUM_CODE_SAMPLES):
    candidate = None
    while not candidate:
        candidate = generate_code(problem_desc)
    code_variants.append(candidate)

# 3) Generate test inputs that pass the standard solution
accepted_inputs = []
standard_outputs = []  # We'll also store the standard solution's output
attempts = 0

while len(accepted_inputs) < NUM_TEST_INPUTS:
    attempts += 1
    raw_input = generate_single_test_input(extracted_input, extracted_example_input)
    if raw_input == None:
        continue

    # parse first line as t, limit lines
    lines = raw_input.split("\n")
    if lines:
        try:
            t = int(lines[0].strip())
            needed_count = 1 + 2*t
            lines = lines[:needed_count]
        except ValueError:
            pass
    final_input = "\n".join(lines)
    print(f"Final_input looks like: {final_input}")

    # test with standard solution
    out, err = run_code_with_input(standard_solution, final_input)
    if err or out=="":
        print(f"Standard solution:\n{standard_solution}")
        print(f"output is '{out}'")
        print(f"Attempt {attempts}: error => {err} => discarding.")
        continue

    # If no error, record the standard solution output
    standard_outputs.append(out)
    accepted_inputs.append(final_input)
    print(f"[Accepted Input #\{len(accepted_inputs)}]\n\{final_input}\n")
    print(f"The standard output is: \{out}")

# 4) For each code variant, run on each accepted input.
all_results = []

# We'll bundle the standard outputs with each input.
input_records = []
for i, inp in enumerate(accepted_inputs):
    input_records.append({
        "input": inp,
        "standard_output": standard_outputs[i]
    })

for idx, code_text in enumerate(code_variants, start=1):
    code_run_results = []
    for rec in input_records:
        inp = rec["input"]
        std_out = rec["standard_output"]
        out, err = run_code_with_input(code_text, inp)
        code_run_results.append({
            "input": inp,
            "standard_solution_output": std_out,
            "model_output": out if out else "",
            "error": err if err else None
        })
    all_results.append({
        "code_index": idx,
        "code": code_text,
        "test_results": code_run_results
    })

# 5) Save to final_results.json
with open("final_results.json", "w") as f:
    json.dump(all_results, f, indent=4)

print("\n[Done] Saved all code + inputs + outputs to final_results.json")

The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.


Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.
The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


Final_input looks like: 3
13
3 9 8 7 5 2 1 9 8 6 5 4 3
12
9 8 7 6 5 4 3 2 1 8 9 10
15
2 5 9 1 7 3 8 6 4 2 1 9 5 8 7
code_str:
for _ in range(int(input())):
    n = int(input())
    A = list(map(int, input().split()))
    m = 10 ** 9
    c = 0
    for i in range(n - 1, -1, -1):
        if A[i] <= m:
            m = A[i]
        else:
            c += 1
    print(c)
result:
CompletedProcess(args=['python', '/tmp/tmpp4lh8y83.py'], returncode=0, stdout='11\n8\n11\n', stderr='')
[Accepted Input #\1]
\3
13
3 9 8 7 5 2 1 9 8 6 5 4 3
12
9 8 7 6 5 4 3 2 1 8 9 10
15
2 5 9 1 7 3 8 6 4 2 1 9 5 8 7

The standard output is: \11
8
11
code_str:
import sys

t = int(sys.stdin.readline())
for _ in range(t):
    n = int(sys林)
    a = list(map(int, sys.stdin.readline().split()))
    # Compute the number of bad days
    # To determine if a day i is bad, check if there exists a j > i such that a[j] < a[i]
    # This can be done by keeping track of the minimum value seen so far from the end
    min_so_far = a