## Defining benchmark functions

In [None]:
import random
import string
import re
import random
import textwrap
import re
import random
import textwrap


def a_key_function(s):
    """Benchmark target: transform ``s`` according to its first character.

    * first character numeric   -> the whole string reversed
    * first character uppercase -> the whole string lower-cased
    * anything else             -> only the first character

    The first character is read unconditionally, so an empty string raises
    ``IndexError``.
    """
    head = s[0]
    if head.isnumeric():
        return s[::-1]
    if head.isupper():
        return s.lower()
    return head

# Source text of a_key_function kept as a string so it can be placed into a
# prompt; it is registered in test_functions_str further down.
a_key_function_str = """
def a_key_function(s):
    if s[0].isnumeric():
        return s[::-1]
    elif s[0].isupper():
        return s.lower()
    else:
        return s[0]
"""

# Input generator for a_key_function(s): a random alphanumeric string (never empty, since s[0] is read)
def generate_input_for_a_key_function():
    """Return a random string of 3 to 20 ASCII letters and digits."""
    size = random.randint(3, 20)
    alphabet = string.ascii_letters + string.digits
    picked = random.choices(alphabet, k=size)
    return ''.join(picked)



def sample_function(item):
    """Benchmark target: copy ``item`` with every numeric character replaced by "a".

    Non-numeric characters are kept unchanged and in order; an empty input
    gives an empty string.
    """
    return "".join("a" if ch.isnumeric() else ch for ch in item)


# Source text of sample_function kept as a string so it can be placed into a
# prompt; it is registered in test_functions_str further down.
sample_function_str = """
def sample_function(item):
    s = ""
    for c in item:
        if c.isnumeric():
            s += "a"
        else:
            s += c
    return s
"""

# Input generator for sample_function(item): a random alphanumeric string (may be empty)
def generate_input_for_sample_function():
    """Return a random string of 0 to 20 ASCII letters and digits."""
    size = random.randint(0, 20)
    alphabet = string.ascii_letters + string.digits
    picked = random.choices(alphabet, k=size)
    return ''.join(picked)

## Building the JSON schemas required for LLM structured output

In [None]:
def get_output_schema_no_cot(batch_size, dtype, add_analysis_field=False):
    """Build the flat output schema for a batch, serialised as a JSON string.

    The schema describes one object with a property per input index
    ("0", "1", ...), each of JSON type ``dtype``. When ``add_analysis_field``
    is true, a string property "function_analysis" is placed before the
    indexed properties. Every property is listed in "required" and
    additional properties are forbidden.

    The schema is assembled as a dict and serialised with ``json.dumps`` so
    the result is valid JSON for every input, including ``batch_size == 0``
    and ``dtype`` values containing characters that need escaping.

    Args:
        batch_size: Number of indexed output properties to declare.
        dtype: JSON-schema type name used for every indexed output.
        add_analysis_field: Whether to prepend the "function_analysis" field.

    Returns:
        The schema as a JSON document (``str``); parse it with ``json.loads``.
    """
    # Local import: this definition appears before the notebook's `import json`.
    import json

    properties = {}
    if add_analysis_field:
        properties["function_analysis"] = {
            "type": "string",
            "description": "Analysis of what the function does",
        }
    for i in range(batch_size):
        properties[str(i)] = {
            "type": dtype,
            "description": f"Function output for input {i}",
        }

    schema = {
        "type": "object",
        "properties": properties,
        # Dicts preserve insertion order, so this is the analysis field (if
        # any) followed by the indices in ascending order.
        "required": list(properties),
        "additionalProperties": False,
    }
    return json.dumps(schema, indent=4)


def get_output_schema_structured_cot(batch_size: int, dtype: str) -> str:
    """Build the nested "analysis, then outputs" schema, serialised as JSON.

    The top-level object has exactly two required properties, declared in
    this order:

    * "function_analysis": a free-text string.
    * "outputs": an object with one required property per input index
      ("0", "1", ...), each of JSON type ``dtype``.

    "function_analysis" is declared before "outputs"; the apparent intent is
    that the model writes its analysis before the per-input results.

    The schema is assembled as a dict and serialised with ``json.dumps`` so
    that ``dtype`` is escaped correctly instead of being spliced into a
    string template.

    Args:
        batch_size: Number of indexed output properties to declare.
        dtype: JSON-schema type name used for every indexed output.

    Returns:
        The schema as a JSON document (``str``); parse it with ``json.loads``.
    """
    # Local import: this definition appears before the notebook's `import json`.
    import json

    output_properties = {
        str(i): {
            "type": dtype,
            "description": f"Function output for input {i}",
        }
        for i in range(batch_size)
    }

    schema = {
        "type": "object",
        "properties": {
            "function_analysis": {
                "type": "string",
                "description": "Analysis of what the function does",
            },
            "outputs": {
                "type": "object",
                "properties": output_properties,
                "required": list(output_properties),
                "additionalProperties": False,
            },
        },
        "required": ["function_analysis", "outputs"],
        "additionalProperties": False,
    }
    return json.dumps(schema, indent=4)


## Noisy additional context utils

In [None]:
def extract_top_level_functions(code_string):
    """Split Python source text into the source of each top-level function.

    A function starts at a line matching ``def name(...):`` in column 0 and
    extends over every following line that is blank or indented. The first
    non-blank line back in column 0 ends it. Trailing whitespace of each
    extracted function is stripped.

    This is a line-based heuristic, not a parser. Only single-line ``def``
    headers that end in ``):`` are recognised, so headers with a return
    annotation, multi-line signatures and ``async def`` are skipped, and
    decorator lines are not included in the extracted text.

    Args:
        code_string: Python source code as a single string.

    Returns:
        A list of function source strings in order of appearance.
    """
    def_header = re.compile(r'^def\s+\w+\(.*\):')
    functions = []
    current_func = []  # lines of the function being collected; empty when outside one

    for line in code_string.splitlines():
        if def_header.match(line):
            # A new top-level def closes whatever function was open.
            if current_func:
                functions.append("\n".join(current_func).rstrip())
            current_func = [line]
        elif current_func:
            is_blank = not line.strip()
            is_indented = line != line.lstrip()
            if is_blank or is_indented:
                current_func.append(line)
            else:
                # Column-0 code that is not a def header ends the function.
                functions.append("\n".join(current_func).rstrip())
                current_func = []

    if current_func:
        functions.append("\n".join(current_func).rstrip())

    return functions


def shuffle_functions_with_injection(functions, new_function=None, depth=None, keep_ratio=1.0):
    """Truncate a list of function sources, optionally inject one, and join them.

    Despite its name this function does not reorder anything; shuffle the
    list before calling if a random order is wanted. The input list is never
    modified.

    Args:
        functions: Function source strings used as surrounding context.
        new_function: Source of the function to inject, or ``None``.
        depth: Relative insert position in ``[0, 1]`` (0 = first, 1 = last);
            values outside the range are clamped. Injection happens only
            when both ``new_function`` and ``depth`` are given.
        keep_ratio: Fraction of ``functions`` to keep, taken from the front
            and clamped to ``[0, 1]``. Any ratio above 0 keeps at least one
            function; a ratio of 0 keeps none.

    Returns:
        The kept functions (plus the injected one) joined by blank lines.
        When ``functions`` is empty and nothing is injected, a placeholder
        comment is returned instead.
    """
    inject = bool(new_function) and depth is not None
    original_count = len(functions)

    if not functions:
        print("[INFO] No top-level functions found.")
        if not inject:
            return "# No top-level functions found.\n"
        # Still inject below: an empty context must not drop the requested
        # function, matching what keep_ratio=0 does for a non-empty list.
        kept = []
    else:
        # Apply keep_ratio
        keep_ratio = max(0.0, min(1.0, keep_ratio))
        keep_count = max(1, int(original_count * keep_ratio)) if keep_ratio > 0 else 0
        kept = list(functions[:keep_count])  # copy so the caller's list is untouched
        print(f"[INFO] Kept {keep_count} out of {original_count} functions (keep_ratio={keep_ratio}).")

    if inject:
        depth = max(0.0, min(1.0, depth))
        insert_index = int(round(depth * len(kept)))
        cleaned = textwrap.dedent(new_function).strip()
        kept.insert(insert_index, cleaned)
        print(f"[INFO] Injected new function at position {insert_index} (depth={depth}).")

    return '\n\n'.join(kept)


# Load the pool of context functions from all_funcs.py (resolved relative to
# the current working directory).
with open("all_funcs.py", "r") as f:
    input_code = f.read()

# Seed the global `random` module so the shuffle below is reproducible.
random.seed(43)
functions = extract_top_level_functions(input_code)
# Randomise the order once, up front; shuffle_functions_with_injection itself
# does not reorder anything.
random.shuffle(functions)

## OpenRouter utils

In [None]:
import httpx # Import the asynchronous library
import os

# Read the OpenRouter API key from the environment and stop early if it is missing.
# NOTE(review): assert statements are removed when Python runs with -O; an
# explicit check-and-raise would be more robust here.
openrouter_api_key = os.environ.get("OPENROUTER_API_KEY")
assert openrouter_api_key is not None


async def get_response(output_schema, prompt, model, temperature, max_tokens=16000, timeout=60.0):
    """Send one chat-completion request to OpenRouter and return the decoded reply.

    The prompt is sent as a single user message and the model is asked for
    structured output that conforms to ``output_schema`` (strict JSON-schema
    mode). A new ``httpx.AsyncClient`` is opened for each call.

    Args:
        output_schema: JSON schema (already parsed, e.g. a dict) for the reply.
        prompt: Text of the user message.
        model: OpenRouter model identifier.
        temperature: Sampling temperature forwarded to the API.
        max_tokens: Completion token limit forwarded to the API.
        timeout: Timeout in seconds passed to ``httpx.AsyncClient``.

    Returns:
        The response body decoded from JSON.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx or 5xx status.
    """
    headers = {
        "Authorization": f"Bearer {openrouter_api_key}",
        "Content-Type": "application/json",
    }

    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": temperature,
        "max_tokens": max_tokens,
        "response_format": {
            "type": "json_schema",
            "json_schema": {
                "name": "output_dict",
                "strict": True,
                "schema": output_schema,
            },
        },
    }

    async with httpx.AsyncClient(timeout=timeout) as client:
        response = await client.post(
            "https://openrouter.ai/api/v1/chat/completions",
            headers=headers,
            json=payload,
        )

        # Turn 4xx/5xx replies into exceptions so callers can retry.
        response.raise_for_status()

        return response.json()



## Defining the parameters and running the benchmark

In [None]:
# The four lists below are parallel: position i of each list describes the
# same benchmark function. They are zipped together in
# run_all_combinations_concurrent, so they must stay the same length and order.

# Source text shown to the model.
test_functions_str = [
    a_key_function_str, 
    sample_function_str,
]
# Callable used to compute the expected output.
test_functions = [
    a_key_function, 
    sample_function,
]

# Zero-argument generator producing one random input for the function.
args_generators = [
    generate_input_for_a_key_function,
    generate_input_for_sample_function,
]

# JSON-schema type of the function's return value.
dtypes = [
    "string",
    "string",
]

# Relative positions at which the target function is injected into the
# context functions (0 = first, 1 = last).
depths = [
    0.,
    0.05, 
    0.2, 
    0.5, 
    0.8,
    0.95,
    1.0
]

# Fractions of the context-function pool kept in the prompt.
keep_ratios = [
    0.02,
    0.2, 
    0.5, 
    0.8,
    1
]

# Number of inputs (and therefore expected outputs) per request.
input_output_lengths = [
    4,
    64, 
    256,
    512,
]


def validate(fn, input_args, llm_result):
    """Return whether ``fn`` applied to ``input_args`` equals ``llm_result``.

    ``input_args`` is unpacked as positional arguments to ``fn``.
    """
    expected = fn(*input_args)
    return expected == llm_result

# Prompt template asking for an explanation of the target function before the
# per-input outputs. Placeholders filled via str.format: {functions},
# {function_name}, {input}; literal braces are escaped as {{ }}.
# NOTE(review): the example output below is a flat dict with a
# "target_function_analysis" key, whereas get_output_schema_structured_cot
# declares "function_analysis" plus a nested "outputs" object - confirm which
# shape the prompt should describe.
prompt_with_cot = """
You are given the following Python functions:

----------------function-list----------------
{functions}
---------------------------------------------

Run the function named "{function_name}" on each of the following inputs:

----------------input-list----------------
{input}
------------------------------------------

Start by finding and thoroughly explaining the target function in very simple words. Then proceed to create the output per input. Each key in the dictionary is an index, and the value is the argument(s) to the function.

The output must be a valid JSON dictionary where the key is the input index, and the value is the function output:
{{
    "target_function_analysis": "what does the target function do",
    "0": "output0",
    "1": "output1",
    ...
}}
"""

# Prompt template asking only for the per-input outputs (no explanation).
# Placeholders filled via str.format: {functions}, {function_name}, {input};
# literal braces are escaped as {{ }}.
prompt_without_cot = """
You are given the following Python functions:

----------------function-list----------------
{functions}
---------------------------------------------

Run the function named "{function_name}" on each of the following inputs:

----------------input-list----------------
{input}
------------------------------------------

Each key in the dictionary is an index, and the value is the argument(s) to the function.

The output must be a valid JSON dictionary where the key is the input index, and the value is the function output:
{{
    "0": "output0",
    "1": "output1",
    ...
}}
"""




In [None]:
import asyncio
import json
from tqdm import tqdm
# (Assuming other necessary imports and helper functions like get_response, 
# validate, shuffle_functions_with_injection, etc. are defined elsewhere)


def _parse_json_object(text):
    """Parse the JSON object embedded in ``text``, ignoring any wrapper around it.

    Everything before the first "{" and after the last "}" (for example a
    Markdown code fence) is dropped before parsing.
    """
    start, end = text.find("{"), text.rfind("}")
    if start != -1 and end > start:
        text = text[start:end + 1]
    return json.loads(text)


def _extract_outputs(output_dict):
    """Return the index -> output mapping from either supported reply shape.

    The structured schema nests the results under "outputs"; the flat schema
    puts the indices at the top level.
    """
    nested = output_dict.get("outputs")
    return nested if isinstance(nested, dict) else output_dict


async def process_combination(
    semaphore, model, fn_info, depth, keep_ratio, num_inputs, functions, prompt, temperature, schema_generator
):
    """Run one benchmark configuration against the model and score the reply.

    Generates ``num_inputs`` random inputs, builds the prompt with the target
    function injected at ``depth`` among the kept context functions, queries
    the model and compares each returned value with the real function output.
    Up to three attempts are made; any exception during an attempt counts as
    a failure of that attempt.

    Args:
        semaphore: ``asyncio.Semaphore`` limiting concurrent combinations. It
            is held for the whole call, including waits between retries.
        model: OpenRouter model identifier.
        fn_info: Tuple ``(source_str, callable, input_generator, dtype)``.
        depth: Relative injection position of the target function.
        keep_ratio: Fraction of ``functions`` kept as context.
        num_inputs: Number of inputs in the request.
        functions: Context function sources.
        prompt: Template with ``{functions}``, ``{function_name}`` and
            ``{input}`` placeholders.
        temperature: Sampling temperature.
        schema_generator: Callable ``(num_inputs, dtype) -> JSON schema str``.

    Returns:
        A result dict with keys "function", "depth", "keep_ratio",
        "input_size" and "accuracy", plus "usage" on success. After the last
        failed attempt the dict has accuracy 0.0 and an "error" message
        instead of "usage".
    """
    fn_str, fn_obj, arg_gen, dtype = fn_info

    # The semaphore ensures we don't exceed the concurrency limit.
    async with semaphore:
        # 1. Generate input dictionary
        input_dict = {i: arg_gen() for i in range(num_inputs)}

        # 2. Prepare function list and prompt
        injected_func_str = fn_str.strip()
        shuffled_code = shuffle_functions_with_injection(
            functions, new_function=injected_func_str, depth=depth, keep_ratio=keep_ratio
        )
        filled_prompt = prompt.format(
            functions=shuffled_code, function_name=fn_obj.__name__, input=input_dict
        )

        # --- Retry Logic ---
        max_retries = 3
        for attempt in range(max_retries):
            try:
                # 3. Get response from the model
                output_schema_ = json.loads(schema_generator(num_inputs, dtype))
                raw_response = await get_response(output_schema_, filled_prompt, model, temperature)
                output_dict = _parse_json_object(raw_response["choices"][0]["message"]["content"])

                # 4. Validate each result. Both reply shapes are accepted so
                # the flat (no-CoT) schema is scored instead of failing on a
                # missing "outputs" key.
                outputs = _extract_outputs(output_dict)
                success_count = sum(
                    1
                    for idx, input_val in input_dict.items()
                    if validate(fn_obj, (input_val,), outputs[str(idx)])
                )

                # If validation completes, calculate and return the result
                accuracy = success_count / len(input_dict)
                result = {
                    "function": fn_obj.__name__,
                    "depth": depth,
                    "keep_ratio": keep_ratio,
                    "input_size": num_inputs,
                    "accuracy": accuracy,
                    "usage": raw_response["usage"],
                }

                print(
                    f"[INFO] {fn_obj.__name__} | depth={depth} | keep={keep_ratio} | size={num_inputs} | ✅ {success_count}/{len(input_dict)} correct"
                )
                return result  # Success, exit the function

            except Exception as e:
                # Deliberately broad: network errors, malformed replies and
                # missing keys are all treated as a failed attempt.
                print(f"[WARN] Attempt {attempt + 1}/{max_retries} failed for {fn_obj.__name__} (depth={depth}, keep={keep_ratio}): {e}")
                if attempt + 1 == max_retries:
                    print("[ERROR] All retries failed for this combination. Skipping.")
                    # Log the persistent failure before returning
                    return {
                        "function": fn_obj.__name__,
                        "depth": depth,
                        "keep_ratio": keep_ratio,
                        "input_size": num_inputs,
                        "accuracy": 0.0,
                        "error": str(e),
                    }
                else:
                    await asyncio.sleep(2)  # Wait 2 seconds before the next attempt
    return None  # Not reached: every loop path returns


async def run_all_combinations_concurrent(model, temperature, prompt, schema_generator, concurrency_limit=20):
    """Run every benchmark combination for one model and collect the results.

    One task is created per combination of keep ratio, test function, depth
    and input count, taken from the module-level ``keep_ratios``, the
    parallel function lists, ``depths`` and ``input_output_lengths``. A
    shared semaphore limits how many combinations are processed at once.

    Args:
        model: OpenRouter model identifier.
        temperature: Sampling temperature.
        prompt: Prompt template passed to ``process_combination``.
        schema_generator: Callable ``(num_inputs, dtype) -> JSON schema str``.
        concurrency_limit: Maximum number of combinations in flight (>= 1).

    Returns:
        The result dicts in completion order, with ``None`` entries removed.

    Raises:
        ValueError: If ``concurrency_limit`` is smaller than 1.
    """
    if concurrency_limit < 1:
        # A semaphore with no slots would block every task forever.
        raise ValueError("concurrency_limit must be at least 1")
    semaphore = asyncio.Semaphore(concurrency_limit)

    # Package function-specific details together for easier passing. A list
    # (unlike a bare zip) can be iterated once per keep_ratio.
    fn_infos = list(zip(test_functions_str, test_functions, args_generators, dtypes))

    tasks = []
    for keep_ratio in keep_ratios:
        for fn_info in fn_infos:
            for depth in depths:
                for num_inputs in input_output_lengths:
                    print(f"Queueing task for: {fn_info[1].__name__}, depth={depth}, keep_ratio={keep_ratio}, num_inputs={num_inputs}")
                    tasks.append(
                        asyncio.create_task(
                            process_combination(
                                semaphore, model, fn_info, depth, keep_ratio, num_inputs, functions, prompt, temperature, schema_generator
                            )
                        )
                    )

    # Consume results as they finish so the progress bar tracks completions.
    results = []
    for future in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Processing combinations"):
        results.append(await future)

    # Filter out any None results that might occur from unexpected errors
    return [res for res in results if res is not None]

In [None]:
results_deepseek_v3 = await run_all_combinations_concurrent('deepseek/deepseek-chat-v3-0324', temperature=0.0, prompt=prompt_without_cot, schema_generator=get_output_schema_no_cot)
results_deepseek_r1 = await run_all_combinations_concurrent('deepseek/deepseek-r1-0528', temperature=0.6, prompt=prompt_without_cot, schema_generator=get_output_schema_no_cot))
results_deepseek_v3_cot = await run_all_combinations_concurrent('deepseek/deepseek-chat-v3-0324', temperature=0.0, prompt=prompt_with_cot, schema_generator=get_output_schema_structured_cot)

## Analysing results

In [None]:
# Select which model's results the single-model analysis below operates on.
results = results_deepseek_v3

In [None]:
import pandas as pd

# One row per successful run. Entries carrying an "error" key are skipped.
data = [
    {
        "distance_from_the_middle": min(r["depth"], 1 - r["depth"]),
        "depth": r["depth"],
        "context_length": r["keep_ratio"],
        "function": r["function"],
        "prompt_tokens": r["usage"]["prompt_tokens"],
        "completion_tokens": r["usage"]["completion_tokens"],
        "batch_size": r["input_size"],
        "accuracy": r["accuracy"],
    }
    for r in results
    if "error" not in r
]

df = pd.DataFrame(data)

# Bucket both token counts into 10 equal-width bins for the grouped plots.
for token_column in ("prompt_tokens", "completion_tokens"):
    df[f"{token_column}_quant"] = pd.cut(df[token_column], bins=10)

In [None]:
import matplotlib.pyplot as plt
import pandas as pd


parameters = ["depth", "prompt_tokens_quant", "completion_tokens_quant", "batch_size", "context_length"]

# One figure per parameter: a dotted line per function plus a solid black
# line for the average over all functions.
for p in parameters:
    pretty_name = p.replace('_', ' ').title()

    plt.figure(figsize=(12, 7))
    ax = plt.gca()

    # Mean accuracy per (parameter value, function), with the functions
    # pivoted into columns so each one becomes its own line.
    per_function_accuracy = df.groupby([p, 'function'])['accuracy'].mean().unstack()
    per_function_accuracy.plot(ax=ax, marker='o', linestyle=':', alpha=0.8)

    # Mean accuracy per parameter value over all functions; also shown as a
    # table next to the per-function numbers.
    overall_accuracy = df.groupby(p)['accuracy'].mean()
    display(per_function_accuracy.assign(overall_accuracy=overall_accuracy))
    overall_accuracy.plot(
        ax=ax, label='Overall Average', color='black', marker='D', linestyle='-', linewidth=2.5
    )

    ax.set_title(f"Mean Accuracy vs. {pretty_name}", fontsize=16)
    ax.set_xlabel(pretty_name, fontsize=12)
    ax.set_ylabel("Mean Accuracy", fontsize=12)
    ax.grid(True, linestyle='--', alpha=0.6)
    ax.legend(title='Function')
    plt.tight_layout()
    plt.show()

In [None]:
import numpy as np
import statsmodels.api as sm
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error

# 1. Select the features and the target
X = df[['distance_from_the_middle', 'batch_size', 'context_length']]
y = df['accuracy']

# 2. Split Data into Training and Validation Sets
# A fixed random_state keeps the split, and therefore the reported
# validation loss, the same from run to run.
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.05, random_state=43)

# 3. Normalize the Data
# We fit the scaler ONLY on the training data to avoid data leakage
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_val_scaled = scaler.transform(X_val)  # Apply the same transformation to the validation set

# Convert scaled arrays back to pandas DataFrames for clarity
X_train_scaled = pd.DataFrame(X_train_scaled, columns=X.columns, index=X_train.index)
X_val_scaled = pd.DataFrame(X_val_scaled, columns=X.columns, index=X_val.index)


# 4. Fit the Linear Regression Model
# statsmodels requires manually adding a constant (the intercept).
# has_constant='add' is used for both sets so they always end up with the
# same columns.
X_train_scaled_const = sm.add_constant(X_train_scaled, has_constant='add')

# Fit the Ordinary Least Squares (OLS) model
model = sm.OLS(y_train, X_train_scaled_const).fit()


# 5. Analyze the Factors and Confidence Intervals
print("## Model Summary")
print(model.summary())


# 6. Evaluate Loss on the Validation Set
# Add a constant to the validation features as well
X_val_scaled_const = sm.add_constant(X_val_scaled, has_constant='add')

# Make predictions
y_pred = model.predict(X_val_scaled_const)

# Calculate the loss (Mean Squared Error)
loss = mean_squared_error(y_val, y_pred)

print("\n" + "="*50)
print(f"## Validation Loss (Mean Squared Error): {loss:.4f}")



## Compare all models

In [None]:
import matplotlib.pyplot as plt
import pandas as pd


def prepare_data(results):
    """Turn a list of benchmark result dicts into a DataFrame of successful runs.

    Entries containing an "error" key are skipped. Each remaining entry
    becomes one row with the columns distance_from_the_middle, depth,
    context_length (the keep ratio), function, prompt_tokens,
    completion_tokens, batch_size (the input count) and accuracy.

    "distance_from_the_middle" is computed as ``min(depth, 1 - depth)``,
    i.e. the distance from ``depth`` to the nearer end of the [0, 1] range.
    """
    rows = [
        {
            "distance_from_the_middle": min(entry["depth"], 1 - entry["depth"]),
            "depth": entry["depth"],
            "context_length": entry["keep_ratio"],
            "function": entry["function"],
            "prompt_tokens": entry["usage"]["prompt_tokens"],
            "completion_tokens": entry["usage"]["completion_tokens"],
            "batch_size": entry["input_size"],
            "accuracy": entry["accuracy"],
        }
        for entry in results
        if "error" not in entry
    ]
    return pd.DataFrame(rows)


# Label -> DataFrame of successful runs for each benchmarked configuration.
# The raw result lists are converted with prepare_data because the comparison
# loop below calls DataFrame.groupby on every value.
dfs = {
    "Deepseek R1": prepare_data(results_deepseek_r1),
    "Deepseek V3": prepare_data(results_deepseek_v3),
    "Deepseek V3 with specified output": prepare_data(results_deepseek_v3_cot),
}

parameters = ["depth", "batch_size", "context_length"]

# One table and one figure per parameter, comparing the overall mean accuracy
# of every entry in `dfs`. Each value in `dfs` is used as a DataFrame here
# (groupby is called on it).
for p in parameters:
    axis_title = p.replace('_', ' ').title()

    # Step 1: table of mean accuracy per parameter value, one column per entry.
    per_model_means = [
        frame.groupby(p)["accuracy"].mean().rename(model_label)
        for model_label, frame in dfs.items()
    ]
    param_df = pd.concat(per_model_means, axis=1)
    param_df.index.name = p  # nice row label
    print(param_df.to_markdown())  # show the table right above the plot

    # Step 2: one line per entry on a shared axis.
    plt.figure(figsize=(10, 6))
    ax = plt.gca()
    param_df.plot(ax=ax, marker="o", linewidth=2)

    plt.title(f"Overall Mean Accuracy vs. {axis_title}", fontsize=14)
    plt.xlabel(axis_title, fontsize=12)
    plt.ylabel("Mean Accuracy", fontsize=12)
    plt.grid(True, linestyle="--", alpha=0.6)
    plt.legend(title="Dataset")
    plt.tight_layout()
    plt.show()

