In [9]:
# ---------------------------------------------------------------------- #
#  Global constants & Configuration
# ---------------------------------------------------------------------- #

from pathlib import Path

def find_project_root():
    """Locate the project root by walking upward from the current directory.

    The root is the nearest directory (starting at the current working
    directory and including the filesystem root itself) that contains a
    ``.git`` entry. The entry may be a directory (regular clone) or a file
    (git worktrees and submodules store a ``gitdir:`` pointer file instead).

    Returns:
        pathlib.Path: The directory that contains the ``.git`` entry.

    Raises:
        FileNotFoundError: If no directory on the way up contains ``.git``.
    """
    start_dir = Path.cwd()
    # ``parents`` ends with the filesystem root, so the root is checked too.
    for candidate in (start_dir, *start_dir.parents):
        if (candidate / ".git").exists():
            return candidate
    raise FileNotFoundError("Could not find project root. Is this a git repository?")


# Resolve every data location relative to the repository root so the notebook
# does not depend on the directory it was launched from.
PROJECT_ROOT = find_project_root()
BASE_INPUT_DIR = PROJECT_ROOT.joinpath('data', 'code_gen_outputs_formatted')
BASE_OUTPUT_DIR = PROJECT_ROOT.joinpath('data', 'code_with_error')

# Create the output directory on first use and say so; stay silent otherwise.
if not BASE_OUTPUT_DIR.exists():
    BASE_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    print(f"Created output directory: {BASE_OUTPUT_DIR}")

# Echo the resolved locations for a quick visual sanity check.
print(f"Project root found: {PROJECT_ROOT}")
print(f"Base input directory set to: {BASE_INPUT_DIR}")
print(f"Base output directory set to: {BASE_OUTPUT_DIR}")

# Provider name -> model identifiers whose generated code is processed.
MODEL_DICT = {
    "anthropic": [
        "claude-3-5-haiku-20241022",
    ],
    "openai": [
        "gpt-4.1-mini",
    ],
    "google": [
        "gemini-2.0-flash-thinking-exp",
        "gemini-2.5-flash-lite-preview-06-17",
        "gemini-2.5-flash",
    ],
}

# Flatten to "<provider>_<model>" names; these are also the per-problem file stems.
MODELS = [
    f"{provider}_{model}"
    for provider in MODEL_DICT
    for model in MODEL_DICT[provider]
]
print(f"Available models: {MODELS}")

# Problem indices 0..99.
INDICES = [*range(100)]

Project root found: /Users/arvindsuresh/Documents/Github/Erdos-DL-June25-Math
Base input directory set to: /Users/arvindsuresh/Documents/Github/Erdos-DL-June25-Math/data/code_gen_outputs_formatted
Base output directory set to: /Users/arvindsuresh/Documents/Github/Erdos-DL-June25-Math/data/code_with_error
Available models: ['anthropic_claude-3-5-haiku-20241022', 'openai_gpt-4.1-mini', 'google_gemini-2.0-flash-thinking-exp', 'google_gemini-2.5-flash-lite-preview-06-17', 'google_gemini-2.5-flash']


In [None]:
import ast
import random
import inspect
import textwrap
from typing import Dict, Any, Tuple, Optional, List

class ErrorInjector:
    """
    A class to programmatically inject specific, controlled errors into
    the source code of a Python function using Abstract Syntax Tree (AST) manipulation.

    Every ``inject_*`` method parses the source, mutates the tree and returns
    ``(modified_source, metadata)``, or ``None`` when no suitable injection
    site exists. The modified source is produced by ``ast.unparse``, so
    comments and the original formatting are not preserved; ``metadata["line"]``
    is the line number of the edited node in the ORIGINAL source.
    """

    def inject(self, source_code: str, error_type: str) -> Optional[Tuple[str, Dict[str, Any]]]:
        """
        Main method to inject an error of a specified type into the source code.

        Args:
            source_code: A string containing the source code of a single function.
            error_type: The type of error to inject. Must be one of:
                        'incorrect_operation', 'computational_error',
                        'incorrect_operand', 'skipped_step'.

        Returns:
            A tuple containing:
            - The modified source code as a string.
            - A dictionary with metadata about the injected error.
            Returns None if an error of the specified type could not be injected.

        Raises:
            ValueError: If ``error_type`` is not one of the supported names.
        """
        injection_methods = {
            "incorrect_operation": self.inject_incorrect_operation,
            "computational_error": self.inject_computational_error,
            "incorrect_operand": self.inject_incorrect_operand,
            "skipped_step": self.inject_skipped_step,
        }
        if error_type not in injection_methods:
            raise ValueError(f"Unknown error type: {error_type}")

        return injection_methods[error_type](source_code)

    # --------------------------------------------------------------------------
    # Error Injection Strategies
    # --------------------------------------------------------------------------

    def inject_incorrect_operation(self, source_code: str) -> Optional[Tuple[str, Dict[str, Any]]]:
        """
        Replaces a binary mathematical operator (+, -, *) with a different one.

        ``+`` becomes ``-``, ``-`` becomes ``+`` and ``*`` becomes ``+``; other
        operators (e.g. ``/``) are never touched. One eligible operation is
        chosen at random.

        Returns:
            ``(modified_source, metadata)`` where metadata has the keys
            ``line``, ``original_op`` and ``new_op`` (AST class names such as
            ``'Add'``), or None if the source contains no swappable operation.
        """
        tree = ast.parse(source_code)

        swap_map = {
            ast.Add: ast.Sub,
            ast.Sub: ast.Add,
            ast.Mult: ast.Add,
        }

        # Collect all swappable binary operations
        candidates = [
            node for node in ast.walk(tree)
            if isinstance(node, ast.BinOp) and type(node.op) in swap_map
        ]
        if not candidates:
            return None

        # Choose a random operation to modify
        target_node = random.choice(candidates)
        original_op_node = target_node.op
        new_op_node_type = swap_map[type(original_op_node)]

        # Perform the swap
        target_node.op = new_op_node_type()

        metadata = {
            "line": target_node.lineno,
            "original_op": original_op_node.__class__.__name__,
            "new_op": new_op_node_type.__name__
        }

        return ast.unparse(tree), metadata

    def inject_computational_error(self, source_code: str) -> Optional[Tuple[str, Dict[str, Any]]]:
        """
        Re-evaluates an assignment and replaces the result with a perturbed value.

        The function is simulated with its default argument values: the
        top-level statements preceding a randomly chosen assignment are
        executed, the assignment's right-hand side is evaluated, and the
        right-hand side is replaced by a constant that is off by one of
        -10, -1, +1 or +10. Assignments whose value cannot be computed, is
        not a real number (booleans are excluded), or cannot be changed by the
        perturbation are skipped in favour of another assignment.

        NOTE(security): this method runs ``exec``/``eval`` on ``source_code``.
        Only use it on code you would be willing to run directly.

        Returns:
            ``(modified_source, metadata)`` where metadata has the keys
            ``line``, ``original_value`` and ``new_value``, or None if no
            assignment could be perturbed.
        """
        tree = ast.parse(source_code)
        func_def = self._first_function_def(tree)
        if func_def is None:
            return None

        # Collect all assignment statements
        candidates = [node for node in ast.walk(func_def) if isinstance(node, ast.Assign)]
        if not candidates:
            return None

        # Simulate execution to get variable values
        env = self._get_default_args(func_def)

        # random.sample over the full list == try the candidates in random order
        for target_node in random.sample(candidates, len(candidates)):
            # Execute statements up to the target to build the environment
            temp_env = env.copy()
            successful_exec = True
            for stmt in func_def.body:
                if stmt.lineno >= target_node.lineno:
                    break
                try:
                    exec(ast.unparse(stmt), globals(), temp_env)
                except Exception:
                    successful_exec = False
                    break  # Cannot resolve this path
            if not successful_exec:
                continue

            # Evaluate the original right-hand side
            try:
                original_value = eval(ast.unparse(target_node.value), globals(), temp_env)
            except Exception:
                continue  # Cannot evaluate this expression

            # Only perturb numeric results. bool is a subclass of int, but
            # turning True into 2 is not a "computational" error.
            if isinstance(original_value, bool) or not isinstance(original_value, (int, float)):
                continue

            # Perturb the value
            perturbation = random.choice([-10, -1, 1, 10])
            perturbed_value = original_value + perturbation
            if perturbed_value == original_value:  # Ensure the value changes
                perturbed_value = original_value + (1 if perturbation <= 0 else -1)
            # Huge floats / inf absorb the perturbation and NaN never compares
            # equal to itself; neither yields a usable constant, so move on.
            if perturbed_value == original_value or perturbed_value != perturbed_value:
                continue

            # Replace the RHS of the assignment with a constant
            target_node.value = ast.Constant(value=perturbed_value)

            metadata = {
                "line": target_node.lineno,
                "original_value": original_value,
                "new_value": perturbed_value
            }
            return ast.unparse(tree), metadata

        return None  # No suitable assignment found to perturb

    def inject_incorrect_operand(self, source_code: str) -> Optional[Tuple[str, Dict[str, Any]]]:
        """
        Replaces a variable in an expression with another variable from the same scope.

        Only variables used directly as the left or right operand of a binary
        operation inside the function body are eligible. The replacement is
        drawn from the function's arguments and from variables assigned by
        top-level statements that END BEFORE the operand's line, so the
        modified code does not reference a name that is not bound yet.

        Returns:
            ``(modified_source, metadata)`` where metadata has the keys
            ``line``, ``original_operand`` and ``new_operand``, or None if no
            operand can be replaced.
        """
        tree = ast.parse(source_code)
        func_def = self._first_function_def(tree)
        if func_def is None:
            return None

        # Get all variables available in the function's scope
        if len(self._get_scope_vars(func_def)) < 2:
            return None  # Not enough variables to perform a swap

        # Pair every variable operand with the names that may legally replace it.
        candidates = []
        for stmt in func_def.body:
            for node in ast.walk(stmt):
                if not isinstance(node, ast.BinOp):
                    continue
                for operand in (node.left, node.right):
                    if not isinstance(operand, ast.Name):
                        continue
                    defined = self._get_scope_vars(func_def, before_line=operand.lineno)
                    # sorted() keeps the choice reproducible under random.seed
                    replacements = sorted(defined - {operand.id})
                    if replacements:
                        candidates.append((operand, replacements))

        if not candidates:
            return None

        # Choose a random variable to replace and a different name to put there
        target_node, possible_replacements = random.choice(candidates)
        original_operand = target_node.id
        new_operand = random.choice(possible_replacements)

        # Perform the replacement
        target_node.id = new_operand

        metadata = {
            "line": target_node.lineno,
            "original_operand": original_operand,
            "new_operand": new_operand
        }

        return ast.unparse(tree), metadata

    def inject_skipped_step(self, source_code: str) -> Optional[Tuple[str, Dict[str, Any]]]:
        """
        Deletes an assignment statement and replaces subsequent uses of the
        deleted variable with another variable from the scope to avoid NameErrors.

        Only top-level assignments to a single plain name are eligible. The
        replacement is preferably a function argument; if there is no other
        argument, a variable assigned before the deleted line is used. An
        assignment for which no replacement exists is skipped and another
        one is tried.

        Returns:
            ``(modified_source, metadata)`` where metadata has the keys
            ``line``, ``deleted_variable`` and ``replacement_variable``, or
            None if no step can be skipped.
        """
        tree = ast.parse(source_code)
        func_def = self._first_function_def(tree)
        if func_def is None:
            return None

        # Deleting the only statement would leave a function without a body.
        if len(func_def.body) < 2:
            return None

        # Find all single-variable assignment statements
        candidates = [
            node for node in func_def.body
            if isinstance(node, ast.Assign)
            and len(node.targets) == 1
            and isinstance(node.targets[0], ast.Name)
        ]
        if not candidates:
            return None

        arg_names = self._get_scope_vars(func_def, args_only=True)

        # Try the candidate lines in random order until one has a replacement
        for target_node in random.sample(candidates, len(candidates)):
            deleted_var_name = target_node.targets[0].id

            # Find a suitable replacement variable (prefer function arguments)
            replacement_pool = sorted(arg_names - {deleted_var_name})
            if not replacement_pool:
                # Fallback: variables already assigned before the deleted line
                earlier_vars = self._get_scope_vars(func_def, before_line=target_node.lineno)
                replacement_pool = sorted(earlier_vars - {deleted_var_name})
            if not replacement_pool:
                continue
            replacement_var_name = random.choice(replacement_pool)

            # Use a NodeTransformer to remove the node and patch subsequent uses
            transformer = self._StepSkipper(target_node, deleted_var_name, replacement_var_name)
            new_tree = transformer.visit(tree)
            ast.fix_missing_locations(new_tree)

            metadata = {
                "line": target_node.lineno,
                "deleted_variable": deleted_var_name,
                "replacement_variable": replacement_var_name
            }

            return ast.unparse(new_tree), metadata

        return None

    # --------------------------------------------------------------------------
    # Helper classes and methods
    # --------------------------------------------------------------------------

    def _first_function_def(self, tree: ast.Module) -> Optional[ast.FunctionDef]:
        """Returns the first top-level function definition in ``tree``, or None."""
        for node in tree.body:
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                return node
        return None

    def _get_default_args(self, func_def: ast.FunctionDef) -> Dict[str, Any]:
        """
        Extracts a dictionary of arguments and their default values.

        Literal defaults are read with ``ast.literal_eval``. Defaults that are
        expressions (e.g. ``1/2``, which ``literal_eval`` rejects) are evaluated
        with builtins disabled. Arguments without a default, or whose default
        cannot be evaluated, are left out of the result.
        """
        env: Dict[str, Any] = {}
        arguments = func_def.args
        positional = list(getattr(arguments, "posonlyargs", [])) + list(arguments.args)

        # Defaults align with the LAST positional parameters, hence the reversal.
        pairs = list(zip(positional[::-1], arguments.defaults[::-1]))
        # Keyword-only parameters carry None in kw_defaults when they have no default.
        pairs += [
            (arg, default)
            for arg, default in zip(arguments.kwonlyargs, arguments.kw_defaults)
            if default is not None
        ]

        for arg, default in pairs:
            try:
                env[arg.arg] = ast.literal_eval(default)
            except Exception:
                try:
                    # NOTE(security): evaluates an expression taken from the
                    # source under analysis; builtins are disabled.
                    env[arg.arg] = eval(ast.unparse(default), {"__builtins__": {}}, {})
                except Exception:
                    continue  # Unknown default; later statements may fail to simulate
        return env

    def _get_scope_vars(self, func_def: ast.FunctionDef, args_only=False,
                        before_line: Optional[int] = None) -> set:
        """
        Returns a set of variable names in the function's scope.

        Args:
            func_def: The function definition to inspect.
            args_only: If True, return only the argument names.
            before_line: If given, include only variables assigned by top-level
                statements that end before this (1-based) line. Argument names
                are always included.

        Returns:
            Names of the positional and keyword-only arguments plus, unless
            ``args_only`` is set, the plain-name targets of top-level assignments.
        """
        arguments = func_def.args
        scope_vars = {
            arg.arg
            for arg in (
                *getattr(arguments, "posonlyargs", []),
                *arguments.args,
                *arguments.kwonlyargs,
            )
        }
        if args_only:
            return scope_vars

        for node in func_def.body:
            if not isinstance(node, ast.Assign):
                continue
            if before_line is not None:
                last_line = getattr(node, "end_lineno", None) or node.lineno
                if last_line >= before_line:
                    continue  # Not bound yet at `before_line`
            for target in node.targets:
                if isinstance(target, ast.Name):
                    scope_vars.add(target.id)
        return scope_vars

    class _StepSkipper(ast.NodeTransformer):
        """A transformer to remove a specific node and replace uses of its variable."""
        def __init__(self, node_to_delete, var_to_replace, replacement_var):
            self.node_to_delete = node_to_delete
            self.var_to_replace = var_to_replace
            self.replacement_var = replacement_var
            # Flipped once the target assignment has been visited; only reads
            # encountered after that point are rewritten.
            self.node_deleted = False

        def visit_Assign(self, node):
            # If this is the node we want to delete, return None to remove it
            if node is self.node_to_delete:
                self.node_deleted = True
                return None
            return self.generic_visit(node)

        def visit_Name(self, node: ast.Name) -> Any:
            # If we've passed the deleted node and see its variable used, replace it
            if self.node_deleted and node.id == self.var_to_replace and isinstance(node.ctx, ast.Load):
                return ast.copy_location(ast.Name(id=self.replacement_var, ctx=node.ctx), node)
            return node

# ==============================================================================
# Function to test and debug error injection
# ==============================================================================

import traceback

def test_and_inject_error(
    problem_index: int,
    error_type: str,
    base_input_dir: Path = BASE_INPUT_DIR,
    base_output_dir: Path = BASE_OUTPUT_DIR,
    save_output: bool = True
):
    """
    Loads a formatted code file, injects a specified error, prints the results,
    and saves the modified code into a subdirectory named after the error type.

    The steps are repeated for every model name in ``MODELS``. A model whose
    source file cannot be read, whose injection raises, or for which no error
    can be injected is reported and skipped; the remaining models are still
    processed.

    Args:
        problem_index: The index of the problem to process.
        error_type: The type of error to inject (e.g., 'incorrect_operation').
        base_input_dir: The directory containing the formatted source files,
            laid out as ``<base_input_dir>/<problem_index>/<model_name>.py``.
        base_output_dir: The directory where the flawed code will be saved, as
            ``<base_output_dir>/<error_type>/<problem_index>/<model_name>.py``.
        save_output: If True, writes the modified code to a file.

    Returns:
        dict: Maps each model name for which injection succeeded to a
        ``(modified_code, metadata)`` tuple. A failure to save the file does
        not remove the entry.
    """
    results = {}
    # The injector keeps no state between calls, so one instance serves all models.
    injector = ErrorInjector()

    for model_name in MODELS:
        input_file = base_input_dir / str(problem_index) / f"{model_name}.py"

        # 1. Read the source code
        try:
            source_code = input_file.read_text(encoding="utf-8")
            print(f"--- Loaded Source File: {input_file} ---")
            print(source_code)
        except FileNotFoundError:
            print(f"❌ ERROR: Source file not found at {input_file}")
            continue
        except Exception:
            print(f"❌ ERROR: Failed to read source file {input_file}")
            traceback.print_exc()
            continue

        # 2. Inject the error
        try:
            result = injector.inject(source_code, error_type)
        except Exception:
            print(f"❌ ERROR: The ErrorInjector failed during execution for error type '{error_type}'.")
            traceback.print_exc()
            continue

        # 3. Process and display the result
        if result is None:
            print("\n--- ⚠️ Injection Failed ---")
            print(f"Could not inject an '{error_type}' error into the provided code.")
            continue

        modified_code, metadata = result

        print(f"\n--- ✅ Injected '{error_type}' Error ---")
        print("METADATA:", metadata)
        print("\n--- MODIFIED CODE ---")
        print(modified_code)

        # Store the results
        results[model_name] = (modified_code, metadata)

        # 4. Save the output
        if save_output:
            # One subdirectory per error type, then per problem index.
            output_dir = base_output_dir / error_type / str(problem_index)
            output_file = output_dir / f"{model_name}.py"
            try:
                # Directory creation is guarded together with the write so a
                # filesystem problem for one model does not abort the others.
                output_dir.mkdir(parents=True, exist_ok=True)
                output_file.write_text(modified_code, encoding="utf-8")
                print(f"\n✓ Saved modified code to {output_file}")
            except Exception:
                print(f"❌ ERROR: Failed to write output file to {output_file}")
                traceback.print_exc()

    return results

In [11]:
# Pick the problem index and the error type to inject, then run the injection
# for every model listed in MODELS (default input/output directories, saving on).
test_error = 'incorrect_operation'
test_index = 0
results = test_and_inject_error(test_index, test_error)

--- Loaded Source File: /Users/arvindsuresh/Documents/Github/Erdos-DL-June25-Math/data/code_gen_outputs_formatted/0/anthropic_claude-3-5-haiku-20241022.py ---
def solve(
    clips_sold_in_april: int = 48,  # Natalia sold clips to 48 of her friends in April
    fraction_of_april_sales: float = 1/2  # she sold half as many clips in May
):
    """Index: 0.
    Returns: the total number of clips Natalia sold in April and May."""

    #: L1
    clips_sold_in_may = clips_sold_in_april * fraction_of_april_sales

    #: L2
    total_clips_sold = clips_sold_in_april + clips_sold_in_may

    #: FA
    answer = total_clips_sold
    return answer

--- ✅ Injected 'incorrect_operation' Error ---
METADATA: {'line': 12, 'original_op': 'Add', 'new_op': 'Sub'}

--- MODIFIED CODE ---
def solve(clips_sold_in_april: int=48, fraction_of_april_sales: float=1 / 2):
    """Index: 0.
    Returns: the total number of clips Natalia sold in April and May."""
    clips_sold_in_may = clips_sold_in_april * fraction_o

In [12]:
# Show the injection metadata recorded for the first configured model.
# Each entry of `results` is a (modified_code, metadata) tuple.
model = MODELS[0]
print(f'Metadata for {model}:')
model_result = results[model]
print(model_result[1])

Metadata for anthropic_claude-3-5-haiku-20241022:
{'line': 12, 'original_op': 'Add', 'new_op': 'Sub'}
