# Tool-Calling Agentic Code Generation

This notebook implements a **clean, tool-calling approach** for agentic code generation. The LLM uses function tools to explore files and generate extraction code.

## Workflow:
1. **Explore**: Use `exploration_shell` or `exploration_python` to understand file structure
2. **Extract**: Use `extraction` tool to provide complete Python code
3. **Validate**: An automatic sufficiency check asks the model whether the extracted JSON fully answers the query
4. **Iterate**: Repeat until successful or max iterations reached
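
The four steps above can be sketched as a small control loop. This is a minimal illustration, not the notebook's implementation: the model turn and the validator are replaced by hypothetical callables (`next_action`, `is_sufficient`), and the scripted actions are made-up data.

```python
from typing import Callable, Dict, Optional


def run_workflow(
    next_action: Callable[[int], Dict[str, str]],
    is_sufficient: Callable[[str], bool],
    max_iterations: int = 5,
) -> Optional[str]:
    """Loop until an extraction passes validation or the budget runs out."""
    for iteration in range(1, max_iterations + 1):
        action = next_action(iteration)      # stand-in for one model turn
        if action["tool"] != "extraction":
            continue                         # Explore: keep gathering facts
        if is_sufficient(action["output"]):  # Validate the extraction output
            return action["output"]          # Success: stop iterating
    return None                              # Iteration budget exhausted


# Scripted stand-in for the model: explore twice, then extract.
scripted = {
    1: {"tool": "exploration_shell", "output": "120 lines"},
    2: {"tool": "exploration_python", "output": "3 <tr> rows"},
    3: {"tool": "extraction", "output": '[{"id": 1}]'},
}
result = run_workflow(lambda i: scripted[i], lambda out: out.startswith("["))
print(result)
```

The hard cap on iterations is what keeps a model that never produces a valid extraction from looping forever.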

## Key Improvements (vs legacy approach):
✅ **Enhanced Logging**: Complete visibility into model I/O  
✅ **Iteration Control**: Prevents infinite loops with tool call limits  
✅ **Fixed Code Generation**: Proper `TARGET_FILE_PATH` injection  
✅ **Automatic Validation**: Sufficiency checks ensure quality  
✅ **Better Prompts**: Clear examples and instructions  

## Architecture:
- **Tool-based**: The model calls function tools instead of emitting JSON-mode strings that must be parsed by hand
- **Stateless execution**: Each script runs independently
- **Comprehensive logging**: See every input/output with `debug=True`
- **Self-correcting**: Feedback loop allows model to improve
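
To illustrate what stateless execution means in practice, the sketch below runs two snippets in separate interpreter processes and shows that a variable defined by the first is not visible to the second. It assumes `python -c` via `sys.executable` for brevity, whereas the notebook writes each script to disk first; `run_snippet` is a hypothetical helper.

```python
import subprocess
import sys


def run_snippet(code: str) -> subprocess.CompletedProcess:
    """Run code in a fresh interpreter so no state carries over between runs."""
    return subprocess.run(
        [sys.executable, "-c", code],
        capture_output=True,
        text=True,
        timeout=30,
    )


first = run_snippet("rows = [1, 2, 3]; print(len(rows))")
second = run_snippet("print(len(rows))")  # 'rows' does not exist in this process

print(first.returncode, first.stdout.strip())
print(second.returncode != 0, "NameError" in second.stderr)
```

A consequence for the model is that every script must repeat its own imports and file loading, since nothing survives from earlier explorations except what was printed.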

---

**Note**: This is the **clean version** with only the tool-calling framework.  
The original notebook (`open_code_generation.ipynb`) contains legacy code for comparison.


In [40]:
import os
import json
import time
import textwrap
import subprocess
import re
import logging
import traceback
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import httpx

# Directory that will store generated Python code from each iteration
CODE_DIR = Path("../generated").resolve()
CODE_DIR.mkdir(parents=True, exist_ok=True)

# LM Studio configuration (OpenAI-compatible endpoint)
LMSTUDIO_BASE_URL = os.getenv("LMSTUDIO_BASE_URL", "http://localhost:1234/v1")
LMSTUDIO_MODEL = os.getenv("LMSTUDIO_MODEL", "qwen2.5-7b-instruct-1m")  # set this in your shell or override below
LMSTUDIO_API_KEY = os.getenv("LMSTUDIO_API_KEY", "lm-studio")  # many setups accept any token


## Logging Configuration

In [41]:
# Configure logging
LOGGER_NAME = "open_codegen"
logger = logging.getLogger(LOGGER_NAME)
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter("%(asctime)s | %(levelname)s | %(message)s", "%Y-%m-%d %H:%M:%S")
    handler.setFormatter(formatter)
    logger.addHandler(handler)
logger.setLevel(logging.INFO)


def set_log_level(level: str = "INFO") -> None:
    """Utility to adjust log level dynamically (e.g. set_log_level("DEBUG"))."""
    logger.setLevel(level.upper())


## LM Studio API Client

Two functions are provided:
- `lmstudio_chat_request()`: Returns full API response (for tool calling)
- `lmstudio_chat()`: Returns only text content (for simple queries)
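
The difference between the two return styles is easier to see on a concrete response. The sketch below uses a hand-written sample in the OpenAI-compatible chat-completions shape (not captured from a real server) and a hypothetical helper, `split_message`, that separates plain text from tool calls.

```python
import json
from typing import Any, Dict, List, Tuple

# Hand-written sample response; the id and command are illustrative only.
sample_response: Dict[str, Any] = {
    "choices": [
        {
            "message": {
                "role": "assistant",
                "content": None,
                "tool_calls": [
                    {
                        "id": "call_1",
                        "type": "function",
                        "function": {
                            "name": "exploration_shell",
                            # Arguments arrive as a JSON-encoded string.
                            "arguments": json.dumps(
                                {"commands": ['head -n 5 "$TARGET_FILE_PATH"']}
                            ),
                        },
                    }
                ],
            }
        }
    ]
}


def split_message(response: Dict[str, Any]) -> Tuple[str, List[Dict[str, Any]]]:
    """Return (text, tool_calls) from the first choice, tolerating missing keys."""
    message = response["choices"][0]["message"]
    text = message.get("content") or ""
    calls = message.get("tool_calls") or []
    return text, calls


text, calls = split_message(sample_response)
print(repr(text), [c["function"]["name"] for c in calls])
```

A text-only helper would see an empty string here, which is why tool calling needs the full response.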


In [42]:
def lmstudio_chat_request(
    messages: List[Dict[str, Any]],
    *,
    model: Optional[str] = None,
    temperature: float = 0.0,
    max_tokens: int = 1024,
    tools: Optional[List[Dict[str, Any]]] = None,
    tool_choice: Optional[str] = "auto",
) -> Dict[str, Any]:
    """Call LM Studio's chat endpoint and return the full JSON response."""
    assert LMSTUDIO_MODEL or model, "Set LMSTUDIO_MODEL env var or pass model= explicitly."

    base = LMSTUDIO_BASE_URL.rstrip("/")
    if not base.endswith("/v1"):
        base = base + "/v1"
    url = f"{base}/chat/completions"

    headers = {"Authorization": f"Bearer {LMSTUDIO_API_KEY}"}
    payload: Dict[str, Any] = {
        "model": model or LMSTUDIO_MODEL,
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens,
        "stream": False,
    }
    if tools:
        payload["tools"] = tools
        if tool_choice is not None:
            payload["tool_choice"] = tool_choice

    with httpx.Client(timeout=180) as client:
        resp = client.post(url, headers=headers, json=payload)
        resp.raise_for_status()
        return resp.json()


# Keep backwards compatibility for existing callers expecting plain text
def lmstudio_chat(
    messages: List[Dict[str, str]],
    *,
    model: Optional[str] = None,
    temperature: float = 0.0,
    max_tokens: int = 1024,
) -> str:
    response = lmstudio_chat_request(
        messages,
        model=model,
        temperature=temperature,
        max_tokens=max_tokens,
    )
    try:
        return response["choices"][0]["message"]["content"]
    except Exception:
        return json.dumps(response, ensure_ascii=False)


## Tool Definitions

Three tools are available to the LLM:

1. **exploration_shell**: Run shell commands (grep, head, wc, etc.)
   - Useful for quick file inspection
   - Uses `$TARGET_FILE_PATH` environment variable

2. **exploration_python**: Run Python code to analyze structure  
   - More powerful than shell for complex analysis
   - Can use BeautifulSoup, pandas, etc.
   - Prints findings to stdout

3. **extraction**: Provide final Python extraction code
   - Must output ONLY valid JSON to stdout
   - TARGET_FILE_PATH variable is pre-defined
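
The "ONLY valid JSON" requirement on the `extraction` tool matters because the script's whole stdout is parsed as one JSON document. A minimal sketch, with `parse_stdout` as a hypothetical helper and made-up sample output, shows how a single stray `print` makes the output unparseable:

```python
import json
from typing import Any, Optional


def parse_stdout(stdout: str) -> Optional[Any]:
    """Parse the entire stdout as JSON; return None if it is not pure JSON."""
    try:
        return json.loads(stdout.strip())
    except json.JSONDecodeError:
        return None


clean = '[{"title": "A"}, {"title": "B"}]'
noisy = 'Found 2 items\n[{"title": "A"}, {"title": "B"}]'  # stray progress message

print(parse_stdout(clean))
print(parse_stdout(noisy))
```

Diagnostic messages in an extraction script are therefore better sent to stderr, which is captured separately.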


In [43]:
LMSTUDIO_TOOLS: List[Dict[str, Any]] = [
    {
        "type": "function",
        "function": {
            "name": "exploration_shell",
            "description": (
                "Run one or more shell commands to inspect the target file. Use the $TARGET_FILE_PATH "
                "environment variable inside commands instead of hard-coding the path."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "commands": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Commands to execute sequentially. One command per array item.",
                    },
                    "notes": {
                        "type": "string",
                        "description": "Optional notes about what is being inspected.",
                    },
                },
                "required": ["commands"],
                "additionalProperties": False,
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "exploration_python",
            "description": (
                "Run a short Python snippet that inspects TARGET_FILE_PATH and prints concise findings."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "script": {
                        "type": "string",
                        "description": (
                            "Complete Python script body. It will be executed with TARGET_FILE_PATH defined."
                        ),
                    },
                    "notes": {
                        "type": "string",
                        "description": "Optional context for the exploration.",
                    },
                },
                "required": ["script"],
                "additionalProperties": False,
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "extraction",
            "description": (
                "Provide the full Python extraction script that reads TARGET_FILE_PATH and prints ONLY the requested JSON."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "script": {
                        "type": "string",
                        "description": "Complete Python extraction script.",
                    },
                },
                "required": ["script"],
                "additionalProperties": False,
            },
        },
    },
]


## Utility Functions

In [44]:
def _slug_from_path(file_path: str) -> str:
    """Generate a slug from file path for naming generated files."""
    stem = Path(file_path).stem
    parts = [p for p in re.split(r"[^a-zA-Z0-9]+", stem) if p]
    return "_".join(parts).lower() or "file"


def truncate_for_payload(text: Optional[str], limit: int) -> Optional[str]:
    """Truncate text to a maximum length for logging/API payloads."""
    if text is None:
        return None
    text = text.strip()
    if len(text) <= limit:
        return text
    return text[: limit - 3] + "..."


## Tool Execution Functions

These functions execute the tools requested by the model:
- `run_shell_exploration()`: Execute shell commands
- `run_python_exploration()`: Execute Python exploration scripts  
- `run_extraction_code()`: Execute final extraction code
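
The Python tools work by prepending a `TARGET_FILE_PATH = ...` line to the model's script before running it. One way to build such a header so that it stays valid Python for unusual paths (embedded quotes, trailing backslashes) is to let `repr` do the quoting. This is a sketch under that assumption; `make_header` is a hypothetical name and the paths are invented.

```python
def make_header(target_file_path: str) -> str:
    """Build a Python assignment line whose literal round-trips the path exactly."""
    return f"TARGET_FILE_PATH = {target_file_path!r}\n\n"


tricky_paths = [
    'C:\\data\\my "quoted" file.html',  # contains double quotes
    "C:\\data\\folder\\",                # ends with a backslash
    "/tmp/it's here.html",               # contains a single quote
]

for path in tricky_paths:
    namespace: dict = {}
    # Execute header + a one-line script, as a generated script would be laid out.
    exec(make_header(path) + "loaded = TARGET_FILE_PATH", namespace)
    print(namespace["loaded"] == path)
```

Each of these paths would be harder to embed safely with a hand-written quoted literal, since a raw string cannot contain its own quote character or end in a single backslash.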


In [45]:
def run_shell_exploration(
    commands: List[str],
    target_file_path: str,
    *,
    iteration: int,
    timeout: int = 60,
) -> Dict[str, Any]:
    """Execute shell commands sequentially for exploration purposes."""
    env = os.environ.copy()
    env["TARGET_FILE_PATH"] = target_file_path

    outputs: List[Dict[str, Any]] = []
    for command in commands:
        proc = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            text=True,
            timeout=timeout,
            env=env,
        )
        outputs.append(
            {
                "command": command,
                "returncode": proc.returncode,
                "stdout": (proc.stdout or "").strip(),
                "stderr": (proc.stderr or "").strip(),
            }
        )

    ok = all(item["returncode"] == 0 for item in outputs)
    return {
        "type": "shell",
        "ok": ok,
        "outputs": outputs,
        "commands": commands,
        "iteration": iteration,
    }


def run_python_exploration(
    script: str,
    target_file_path: str,
    *,
    iteration: int,
    timeout: int = 90,
) -> Dict[str, Any]:
    """Execute a short Python exploration script."""
    slug = _slug_from_path(target_file_path)
    timestamp = int(time.time())
    code_path = CODE_DIR / f"explore_{slug}_iter{iteration}_{timestamp}.py"

    # Inject TARGET_FILE_PATH at the top; repr() keeps the literal valid even if
    # the path contains quotes or ends with a backslash
    header = f"TARGET_FILE_PATH = {target_file_path!r}\n\n"
    full_code = header + script
    code_path.write_text(full_code, encoding="utf-8")

    proc = subprocess.run(
        ["python", str(code_path)],
        capture_output=True,
        text=True,
        timeout=timeout,
    )

    stdout = (proc.stdout or "").strip()
    stderr = (proc.stderr or "").strip()
    ok = proc.returncode == 0

    return {
        "type": "python_exploration",
        "ok": ok,
        "stdout": stdout,
        "stderr": stderr,
        "path": str(code_path),
        "returncode": proc.returncode,
    }


def run_extraction_code(
    script: str,
    target_file_path: str,
    *,
    iteration: int,
    timeout: int = 120,
) -> Dict[str, Any]:
    """Write extraction code to disk, execute it, and capture results."""
    slug = _slug_from_path(target_file_path)
    timestamp = int(time.time())
    code_path = CODE_DIR / f"extract_{slug}_iter{iteration}_{timestamp}.py"

    # Inject TARGET_FILE_PATH at the top; repr() keeps the literal valid even if
    # the path contains quotes or ends with a backslash
    header = f"TARGET_FILE_PATH = {target_file_path!r}\n\n"
    full_code = header + script
    code_path.write_text(full_code, encoding="utf-8")

    proc = subprocess.run(
        ["python", str(code_path)],
        capture_output=True,
        text=True,
        timeout=timeout,
    )

    ok = proc.returncode == 0
    stdout = (proc.stdout or "").strip()
    stderr = (proc.stderr or "").strip()

    parsed_json = None
    if ok and stdout:
        try:
            parsed_json = json.loads(stdout)
        except json.JSONDecodeError:
            pass  # stdout was not pure JSON; parsed_json stays None

    return {
        "type": "extraction",
        "ok": ok,
        "stdout": stdout,
        "stderr": stderr,
        "path": str(code_path),
        "json": parsed_json,
        "returncode": proc.returncode,
    }


## Tool Call Execution Dispatcher
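
A dispatcher of this kind decodes the JSON-encoded arguments, routes the call to a handler by tool name, and turns any failure into an error payload the model can read. The sketch below shows only that pattern; the `HANDLERS` table and the `echo` tool are hypothetical and much simpler than the notebook's tools.

```python
import json
from typing import Any, Callable, Dict

# Hypothetical registry: tool name -> handler taking decoded arguments.
HANDLERS: Dict[str, Callable[[Dict[str, Any]], Dict[str, Any]]] = {
    "echo": lambda args: {"ok": True, "echo": args.get("text", "")},
}


def dispatch(tool_call: Dict[str, Any]) -> str:
    """Decode arguments, run the matching handler, and return a JSON string."""
    name = tool_call["function"]["name"]
    try:
        arguments = json.loads(tool_call["function"].get("arguments") or "{}")
        handler = HANDLERS.get(name)
        if handler is None:
            raise ValueError(f"Unknown tool requested: {name}")
        payload = handler(arguments)
    except ValueError as exc:  # json.JSONDecodeError is a ValueError subclass
        payload = {"ok": False, "error": str(exc)}
    return json.dumps(payload, ensure_ascii=False)


print(dispatch({"function": {"name": "echo", "arguments": '{"text": "hi"}'}}))
print(dispatch({"function": {"name": "missing", "arguments": "{}"}}))
print(dispatch({"function": {"name": "echo", "arguments": "{not json"}}))
```

Returning errors as data rather than raising lets the model see what went wrong and retry with corrected arguments.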

In [None]:
def execute_tool_call(
    tool_call: Dict[str, Any],
    *,
    target_file_path: str,
    iteration: int,
    results_log: List[Dict[str, Any]],
) -> Tuple[str, Dict[str, Any]]:
    name = tool_call["function"]["name"]
    raw_arguments = tool_call["function"].get("arguments") or "{}"
    try:
        arguments = json.loads(raw_arguments)
    except json.JSONDecodeError as exc:
        error_payload = {
            "ok": False,
            "error": f"Invalid JSON arguments: {exc}",
            "raw_arguments": raw_arguments,
        }
        return json.dumps(error_payload, ensure_ascii=False), error_payload

    try:
        if name == "exploration_shell":
            commands = arguments.get("commands")
            if isinstance(commands, str):
                commands = [commands]
            if not isinstance(commands, list) or not commands:
                raise ValueError("'commands' must be a non-empty array of strings.")
            commands = [str(cmd) for cmd in commands]
            result = run_shell_exploration(commands, target_file_path, iteration=iteration)
            payload = {
                "ok": result["ok"],
                "outputs": [
                    {
                        "command": item["command"],
                        "returncode": item["returncode"],
                        "stdout": truncate_for_payload(item["stdout"], 5000),
                        "stderr": truncate_for_payload(item["stderr"], 5000),
                    }
                    for item in result["outputs"]
                ],
            }
            results_log.append({"iteration": iteration, "tool": name, "result": result})
            return json.dumps(payload, ensure_ascii=False), result

        if name == "exploration_python":
            script = arguments.get("script")
            if not isinstance(script, str) or not script.strip():
                raise ValueError("'script' must be a non-empty string.")
            result = run_python_exploration(script, target_file_path, iteration=iteration)
            payload = {
                "ok": result["ok"],
                "stdout": truncate_for_payload(result.get("stdout"), 5000),
                "stderr": truncate_for_payload(result.get("stderr"), 5000),
            }
            results_log.append({"iteration": iteration, "tool": name, "result": result})
            return json.dumps(payload, ensure_ascii=False), result

        if name == "extraction":
            script = arguments.get("script")
            if not isinstance(script, str) or not script.strip():
                raise ValueError("'script' must be a non-empty string.")
            result = run_extraction_code(script, target_file_path, iteration=iteration)
            payload = {
                "ok": result["ok"],
                "stdout": truncate_for_payload(result.get("stdout"), 5000),
                "stderr": truncate_for_payload(result.get("stderr"), 5000),
                "json": result.get("json"),
                "path": result.get("path"),
            }
            results_log.append({"iteration": iteration, "tool": name, "result": result})
            return json.dumps(payload, ensure_ascii=False), result

        raise ValueError(f"Unknown tool requested: {name}")

    except Exception as exc:
        trace = truncate_for_payload(traceback.format_exc(), 5000)
        error_payload = {
            "ok": False,
            "error": f"Exception while executing tool '{name}': {exc}",
            "traceback": trace,
        }
        results_log.append({"iteration": iteration, "tool": name, "result": error_payload})
        return json.dumps(error_payload, ensure_ascii=False), error_payload




## Message Building and Context Management
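
Context management here amounts to a sliding window: the system and user prompts are always sent, and only the most recent conversation messages are appended. A minimal sketch of that idea follows, with `window` as a hypothetical helper and made-up messages.

```python
from typing import Any, Dict, List


def window(
    base: List[Dict[str, Any]],
    conversation: List[Dict[str, Any]],
    max_recent: int,
) -> List[Dict[str, Any]]:
    """Keep all base messages plus at most max_recent trailing messages."""
    recent = conversation[-max_recent:] if max_recent > 0 else conversation
    return list(base) + list(recent)


base = [
    {"role": "system", "content": "rules"},
    {"role": "user", "content": "task"},
]
conversation = [{"role": "assistant", "content": f"turn {i}"} for i in range(5)]

trimmed = window(base, conversation, max_recent=2)
print([m["content"] for m in trimmed])
```

One caveat of a plain tail slice is that it can separate a tool result from the assistant message that requested it, so a window size comfortably larger than one round of tool calls is the safer choice.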

In [47]:
MAX_CONTEXT_MESSAGES = 20


def build_system_prompt(custom_prompt: Optional[str] = None) -> str:
    """Build the system prompt for the model."""
    if custom_prompt:
        return custom_prompt
    
    return textwrap.dedent(
        """
        You are an autonomous senior Python engineer specialized in data extraction.
        
        CRITICAL RULES:
        - The variable TARGET_FILE_PATH will be automatically defined at the top of your script. 
          Use it directly - do NOT redefine it or use literal strings like "$TARGET_FILE_PATH".
        - ALWAYS use BeautifulSoup or other parsing libraries to inspect the file structure first.
        - Available tools:
          1. exploration_shell: Run shell commands (grep, head, wc, etc.) to inspect the file
          2. exploration_python: Run Python code to analyze structure (counts, selectors, samples)
          3. extraction: Provide final Python code that outputs ONLY valid JSON to stdout
        
        WORKFLOW:
        1. THOROUGHLY explore the file structure first:
           - Use exploration_python to examine HTML structure, tags, classes, IDs
           - Get actual examples of the data (print 2-3 sample elements)
           - Count how many items exist to ensure complete extraction
           - Identify the exact selectors needed
        2. Analyze exploration results carefully before extracting
        3. Finally, use extraction tool with complete Python code that:
           - Uses TARGET_FILE_PATH directly (already defined)
           - Imports all needed libraries (beautifulsoup4, lxml, json, etc. are available)
           - Outputs ONLY valid JSON to stdout (use json.dumps or print(json.dumps(...)))
           - Handles all edge cases (missing data, empty fields, etc.)
        
        EXAMPLE of correct TARGET_FILE_PATH usage:
        ```python
        from bs4 import BeautifulSoup
        import json
        
        # TARGET_FILE_PATH is already defined - use it directly
        with open(TARGET_FILE_PATH, 'r', encoding='utf-8') as f:
            content = f.read()
        soup = BeautifulSoup(content, 'lxml')
        # ... extract data ...
        print(json.dumps(results))
        ```
        
        DO NOT write code like this (WRONG):
        ```python
        target_file_path = "$TARGET_FILE_PATH"  # WRONG - don't redefine
        target_file_path = TARGET_FILE_PATH     # WRONG - don't redefine
        ```
        """
    ).strip()


def build_user_prompt(
    file_path: str,
    query: str,
    preview_chars: int = 2000,
    extra_notes: Optional[str] = None,
) -> str:
    """Build the initial user prompt."""
    try:
        full_text = Path(file_path).read_text(encoding="utf-8", errors="ignore")
        preview_start = full_text[:preview_chars]
        midpoint = len(full_text) // 2
        preview_middle = full_text[midpoint : midpoint + preview_chars]
        preview_end = full_text[-preview_chars:] if preview_chars else ""
    except Exception as exc:
        preview_start = f"<Could not read preview: {exc}>"
        preview_middle = "<no middle preview available>"
        preview_end = ""

    base_prompt = textwrap.dedent(
        f"""
        TASK:
        {query}

        Target file path (will be available as TARGET_FILE_PATH in your scripts):
        {file_path}

        File preview (start, truncated to {preview_chars} chars):
        {preview_start}

        File preview (middle, truncated to {preview_chars} chars):
        {preview_middle}

        File preview (end, truncated to {preview_chars} chars):
        {preview_end}

        INSTRUCTIONS:
        1. Start by using exploration_shell or exploration_python to understand the file structure
        2. Use multiple explorations if needed to refine your understanding
        3. Once confident, use the extraction tool with complete Python code
        4. The code will have TARGET_FILE_PATH pre-defined - use it directly
        5. Output ONLY valid JSON using json.dumps or similar
        
        Use the function tools provided to accomplish this task. Do not write explanatory text - 
        call the appropriate tool for each step.
        """
    ).strip()
    
    if extra_notes:
        base_prompt += "\n\nAdditional notes:\n" + extra_notes.strip()
    
    return base_prompt


def build_context_messages(
    base_messages: List[Dict[str, str]],
    conversation: List[Dict[str, Any]],
    *,
    max_additional_messages: Optional[int] = MAX_CONTEXT_MESSAGES,
) -> List[Dict[str, Any]]:
    """Build the full message context for the next API call."""
    messages: List[Dict[str, Any]] = list(base_messages)
    if max_additional_messages is None or max_additional_messages <= 0:
        messages.extend(conversation)
    else:
        messages.extend(conversation[-max_additional_messages:])
    return messages


## Main Agentic Loop

In [48]:
def agentic_codegen_file(
    file_path: str,
    query: str,
    *,
    max_iterations: int = 15,
    preview_chars: int = 2000,
    temperature: float = 0.3,
    debug: bool = False,
    extra_notes: Optional[str] = None,
    system_prompt: Optional[str] = None,
    max_context_messages: int = MAX_CONTEXT_MESSAGES,
    max_tool_calls_per_iteration: int = 5,
) -> Dict[str, Any]:
    """
    Agentic code generation loop using tool calls.
    
    Args:
        file_path: Path to the file to extract data from
        query: Natural language description of what to extract
        max_iterations: Maximum number of iterations before giving up
        preview_chars: Number of characters to show in file preview
        temperature: Model temperature for generation
        debug: Enable detailed logging
        extra_notes: Additional instructions for the model
        system_prompt: Custom system prompt (overrides default)
        max_context_messages: Maximum conversation history to include
        max_tool_calls_per_iteration: Maximum tool call rounds per iteration
    
    Returns:
        Dictionary with keys success, iterations, result, history, and all_results,
        plus sufficiency_check (validated success) or last_message (fallback success or failure).
    """
    file_path = str(Path(file_path).resolve())
    
    # Build initial messages
    base_messages = [
        {"role": "system", "content": build_system_prompt(system_prompt)},
        {"role": "user", "content": build_user_prompt(file_path, query, preview_chars, extra_notes)},
    ]

    set_log_level("DEBUG" if debug else "INFO")
    conversation: List[Dict[str, Any]] = []
    results_log: List[Dict[str, Any]] = []
    last_extraction: Optional[Dict[str, Any]] = None
    history_snapshot: List[Dict[str, Any]] = []
    tool_call_count = 0

    iteration = 1
    while iteration <= max_iterations:
        messages = build_context_messages(
            base_messages,
            conversation,
            max_additional_messages=max_context_messages,
        )
        
        # Log the input messages
        logger.info("=" * 80)
        logger.info("ITERATION %s - INPUT TO MODEL", iteration)
        logger.info("=" * 80)
        logger.info("Messages being sent (last 2):")
        for i, msg in enumerate(messages[-2:]):
            logger.info("Message %s [%s]: %s", i, msg.get("role"), truncate_for_payload(json.dumps(msg, ensure_ascii=False), 5000))
        logger.info("-" * 80)
        
        response = lmstudio_chat_request(
            messages,
            temperature=temperature,
            tools=LMSTUDIO_TOOLS,
        )
        
        # Log the complete response from model
        logger.info("=" * 80)
        logger.info("ITERATION %s - OUTPUT FROM MODEL", iteration)
        logger.info("=" * 80)
        logger.info("Full response:\n%s", json.dumps(response, ensure_ascii=False, indent=2)[:10000])
        logger.info("-" * 80)
        
        try:
            message = response["choices"][0]["message"]
        except (KeyError, IndexError):
            logger.error("Malformed response: %s", json.dumps(response, ensure_ascii=False)[:1000])
            break

        conversation.append(message)
        history_snapshot.append(message)

        tool_calls = message.get("tool_calls") or []
        if tool_calls:
            tool_call_count += 1
            logger.info(
                "Iteration %s requested %s tool call(s). (Total tool rounds this iteration: %s)",
                iteration,
                len(tool_calls),
                tool_call_count,
            )
            
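            # Prevent infinite tool calling
            if tool_call_count > max_tool_calls_per_iteration:
                logger.warning(
                    "Exceeded max tool calls per iteration (%s). Moving to next iteration.",
                    max_tool_calls_per_iteration,
                )
                # Drop the unanswered tool-call message: some OpenAI-compatible servers
                # reject a request whose tool calls have no matching tool results.
                conversation.pop()
                iteration += 1
                tool_call_count = 0
                continue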
            
            for tool_call in tool_calls:
                logger.info("Executing tool: %s with args: %s", 
                           tool_call["function"]["name"],
                           truncate_for_payload(tool_call["function"].get("arguments", "{}"), 2000))
                
                tool_content, tool_result = execute_tool_call(
                    tool_call,
                    target_file_path=file_path,
                    iteration=iteration,
                    results_log=results_log,
                )
                
                logger.info("Tool result (truncated): %s", truncate_for_payload(tool_content, 3000))
                
                tool_message = {
                    "role": "tool",
                    "name": tool_call["function"]["name"],
                    "tool_call_id": tool_call["id"],
                    "content": tool_content,
                }
                conversation.append(tool_message)
                history_snapshot.append(tool_message)

                if tool_call["function"]["name"] == "extraction" and isinstance(tool_result, dict):
                    last_extraction = tool_result
                    # If extraction succeeded, perform sufficiency check
                    if tool_result.get("ok") and tool_result.get("json") is not None:
                        logger.info("Extraction appears successful! Performing sufficiency check...")
                        
                        # Ask the model if the extraction is sufficient
                        json_preview = json.dumps(tool_result.get("json"), ensure_ascii=False, indent=2)[:5000]
                        sufficiency_prompt = textwrap.dedent(
                            f"""
                            Query that was requested:
                            {query}
                            
                            Extracted JSON (truncated to 5000 chars):
                            {json_preview}
                            
                            Does this JSON fully and correctly satisfy the query? 
                            - Reply "YES" if the data is complete, accurate, and matches all requirements
                            - Reply "NO: <reason>" if incomplete, incorrect, or missing data
                            
                            Be specific about what's missing or wrong if you say NO.
                            """
                        ).strip()
                        
                        suff_messages = [
                            {"role": "system", "content": "You are evaluating data extraction completeness. Reply YES if sufficient, or NO: <reason> if not."},
                            {"role": "user", "content": sufficiency_prompt}
                        ]
                        
                        logger.info("Checking extraction sufficiency...")
                        suff_response = lmstudio_chat(suff_messages, temperature=0.0)
                        logger.info("Sufficiency response: %s", suff_response.strip()[:2000])
                        
                        # Parse response
                        answer_line = suff_response.strip().upper()
                        if answer_line.startswith("YES"):
                            logger.info("✓ Extraction is sufficient! Completing successfully.")
                            # Add sufficiency check to conversation
                            conversation.append({"role": "user", "content": sufficiency_prompt})
                            conversation.append({"role": "assistant", "content": suff_response})
                            return {
                                "success": True,
                                "iterations": iteration,
                                "result": last_extraction,
                                "history": base_messages + history_snapshot,
                                "all_results": results_log,
                                "sufficiency_check": suff_response,
                            }
                        else:
                            logger.warning("✗ Extraction deemed insufficient: %s", suff_response.strip()[:300])
                            # Add feedback to conversation so model can improve
                            feedback_msg = f"The extraction was incomplete or incorrect. Feedback: {suff_response.strip()[:2000]}\n\nPlease explore further or refine your extraction code."
                            conversation.append({"role": "user", "content": feedback_msg})
                            history_snapshot.append({"role": "user", "content": feedback_msg})
            
            # stay on the same iteration until the assistant responds without tool calls
            continue

        # Reset tool call count when we get a non-tool response
        tool_call_count = 0
        
        content = message.get("content", "") or ""
        logger.info(
            "Iteration %s assistant response (no tool calls, truncated to 3000 chars):\n%s",
            iteration,
            truncate_for_payload(content, 3000) or "<empty>",
        )

        # If we have a successful extraction but didn't catch it in tool call handling
        # (this is a fallback - main check is now in tool call handling)
        if (
            last_extraction
            and last_extraction.get("ok")
            and last_extraction.get("json") is not None
        ):
            logger.info("Extraction succeeded (fallback check) on iteration %s.", iteration)
            return {
                "success": True,
                "iterations": iteration,
                "result": last_extraction,
                "history": base_messages + history_snapshot,
                "all_results": results_log,
                "last_message": content,
            }

        iteration += 1

    logger.error("Max iterations (%s) reached without a confirmed extraction.", max_iterations)
    return {
        "success": False,
        "iterations": iteration - 1,
        "result": last_extraction,
        "history": base_messages + history_snapshot,
        "all_results": results_log,
        "last_message": conversation[-1].get("content") if conversation else None,
    }


## Analysis and Debugging Utilities

In [49]:
def analyze_result(result: Dict[str, Any]) -> None:
    """Print detailed analysis of an agentic_codegen_file result."""
    print("="*80)
    print("RESULT ANALYSIS")
    print("="*80)
    
    print(f"\n✓ Success: {result['success']}")
    print(f"✓ Total iterations: {result['iterations']}")
    print(f"✓ Total tool calls: {len(result.get('all_results', []))}")
    
    # Count tool types
    tool_counts = {}
    for item in result.get('all_results', []):
        tool_name = item.get('tool', 'unknown')
        tool_counts[tool_name] = tool_counts.get(tool_name, 0) + 1
    
    print(f"\n📊 Tool usage breakdown:")
    for tool, count in sorted(tool_counts.items()):
        print(f"   - {tool}: {count}x")
    
    # Show extraction result
    if result['success'] and result.get('result'):
        extracted = result['result'].get('json')
        if isinstance(extracted, list):
            print(f"\n📦 Extracted {len(extracted)} items")
            if len(extracted) > 0:
                print(f"\n📄 Sample item (first):")
                print(json.dumps(extracted[0], indent=2, ensure_ascii=False)[:500])
        elif isinstance(extracted, dict):
            print(f"\n📦 Extracted object with {len(extracted)} keys")
            print(f"\n📄 Keys: {list(extracted.keys())}")
        
        # Show sufficiency check if available
        if 'sufficiency_check' in result:
            print(f"\n✅ Sufficiency check: {result['sufficiency_check'][:200]}")
    else:
        print("\n❌ No successful extraction")
        if result.get('result'):
            # Guard against stdout/stderr being None rather than a string
            print(f"   Last attempt stdout: {(result['result'].get('stdout') or '')[:300]}")
            print(f"   Last attempt stderr: {(result['result'].get('stderr') or '')[:300]}")
    
    print("\n" + "="*80)
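
The keys that `analyze_result` reads can be summarized as a rough schema. The sketch below is inferred only from the return statements of `agentic_codegen_file` and the lookups in `analyze_result`. The class names `ExtractionResult` and `CodegenResult` are hypothetical, the field types are assumptions, and the sample values are invented for illustration.

```python
from typing import Any, Dict, List, Optional, TypedDict


class ExtractionResult(TypedDict, total=False):
    """Hypothetical shape of the last extraction attempt (inferred, not authoritative)."""
    json: Any    # parsed payload; analyze_result handles list and dict
    stdout: str  # raw stdout of the generated script
    stderr: str  # raw stderr of the generated script


class CodegenResult(TypedDict, total=False):
    """Hypothetical shape of the dict returned by agentic_codegen_file."""
    success: bool
    iterations: int
    result: Optional[ExtractionResult]
    history: List[Dict[str, Any]]
    all_results: List[Dict[str, Any]]  # one entry per tool call, with a 'tool' key
    last_message: Optional[str]
    sufficiency_check: str             # optional; only read when present


# Invented sample data, useful for exercising the helpers without a model call.
sample_result: CodegenResult = {
    "success": True,
    "iterations": 2,
    "result": {"json": [{"title": "Engineer"}], "stdout": "[]", "stderr": ""},
    "history": [{"role": "user", "content": "Extract job titles"}],
    "all_results": [{"tool": "exploration_shell"}, {"tool": "extraction"}],
    "last_message": None,
}
```

A dict shaped like `sample_result` can be passed to `analyze_result` or `show_conversation_summary` to check their output formatting offline.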

In [50]:
def show_conversation_summary(result: Dict[str, Any], max_messages: int = 10) -> None:
    """Display a summary of the conversation between user/assistant/tools."""
    print("="*80)
    print("CONVERSATION HISTORY SUMMARY")
    print("="*80)
    
    history = result.get('history', [])
    print(f"\nTotal messages in history: {len(history)}")
    print(f"Showing last {min(max_messages, len(history))} messages:\n")
    
    for i, msg in enumerate(history[-max_messages:]):
        role = msg.get('role', 'unknown')
        
        if role == 'system':
            print(f"\n[{i}] 🔧 SYSTEM")
            print(f"    {msg.get('content', '')[:200]}...")
            
        elif role == 'user':
            print(f"\n[{i}] 👤 USER")
            content = msg.get('content', '')
            print(f"    {content[:300]}...")
            
        elif role == 'assistant':
            print(f"\n[{i}] 🤖 ASSISTANT")
            # Use .get() so that a None tool_calls/content value falls through cleanly
            if msg.get('tool_calls'):
                print(f"    Called {len(msg['tool_calls'])} tool(s):")
                for tc in msg['tool_calls']:
                    func_name = tc.get('function', {}).get('name', 'unknown')
                    print(f"      - {func_name}")
            elif msg.get('content'):
                content = msg['content']
                print(f"    {content[:200]}...")
            else:
                print("    (no content or tool calls)")
                
        elif role == 'tool':
            print(f"\n[{i}] 🔨 TOOL: {msg.get('name', 'unknown')}")
            content = msg.get('content', '')
            try:
                parsed = json.loads(content)
                if parsed.get('ok'):
                    print(f"    ✓ Success")
                    if 'stdout' in parsed:
                        print(f"    Output: {parsed['stdout'][:150]}...")
                else:
                    print(f"    ✗ Failed")
                    if 'error' in parsed:
                        print(f"    Error: {parsed['error'][:150]}...")
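            except (ValueError, TypeError, AttributeError):
                # Content is not JSON (or not a JSON object); show the raw text instead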
                print(f"    {content[:200]}...")
    
    print("\n" + "="*80)

## Batch Testing Utilities

In [51]:
def test_scenario(scenario_name: str, debug: bool = False) -> Dict[str, Any]:
    """Test a single scenario and return results."""
    file_path = SCENARIOS[scenario_name]
    query = SCENARIO_QUERIES[scenario_name]
    
    print(f"\n{'='*80}")
    print(f"Testing: {scenario_name}")
    print(f"{'='*80}")
    
    result = agentic_codegen_file(
        file_path=file_path,
        query=query,
        max_iterations=15,
        preview_chars=2000,
        temperature=0.3,
        debug=debug,
    )
    
    # Quick summary
    print(f"\n{'='*80}")
    print(f"RESULT: {scenario_name}")
    print(f"{'='*80}")
    print(f"Success: {result['success']}")
    print(f"Iterations: {result['iterations']}")
    print(f"Tool calls: {len(result.get('all_results', []))}")
    
    if result['success']:
        extracted = result['result'].get('json', [])
        if isinstance(extracted, list):
            print(f"Extracted items: {len(extracted)}")
        print("✓ Extraction completed successfully")
    else:
        print("✗ Extraction failed or incomplete")
    
    return result


def test_all_scenarios(debug: bool = False) -> Dict[str, Dict[str, Any]]:
    """Test all scenarios and return a summary."""
    results = {}
    
    for scenario_name in SCENARIOS.keys():
        try:
            results[scenario_name] = test_scenario(scenario_name, debug=debug)
        except Exception as e:
            print(f"\n✗ Error testing {scenario_name}: {e}")
            results[scenario_name] = {"success": False, "error": str(e)}
    
    # Summary table
    print("\n" + "="*80)
    print("SUMMARY OF ALL SCENARIOS")
    print("="*80)
    print(f"{'Scenario':<25} {'Success':<10} {'Iterations':<12} {'Items':<10}")
    print("-"*80)
    
    for scenario_name, result in results.items():
        success = "✓" if result.get('success') else "✗"
        iterations = result.get('iterations', 'N/A')
        
        items = 'N/A'
        if result.get('success') and result.get('result'):
            extracted = result['result'].get('json', [])
            if isinstance(extracted, list):
                items = str(len(extracted))
        
        print(f"{scenario_name:<25} {success:<10} {iterations:<12} {items:<10}")
    
    print("="*80)
    return results

## Test Scenarios Setup

In [52]:
DATA_DIR = Path("../data/html").resolve()
SCENARIOS = {
    "scenario1_books": DATA_DIR / "scenario1_books.html",
    "scenario2_jobs": DATA_DIR / "scenario2_jobs.html",
    "scenario3_clubs": DATA_DIR / "scenario3_clubs.html",
    "scenario4_property": DATA_DIR / "scenario4_property.html",
}

SCENARIOS

SCENARIO_QUERIES = {
    "scenario1_books": "Can you return me the books: name and price?",
    "scenario2_jobs": "Extract job title, location, salary, and company name from the listings",
    "scenario3_clubs": "Get the club names, logo image links and their official websites",
    "scenario4_property": "Return the property name, address, latitude and longitude",
}
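
Because `test_scenario` indexes both `SCENARIOS` and `SCENARIO_QUERIES` by the same name, a mismatch between the two dicts or a missing HTML file only surfaces at run time. The following is a minimal sketch of a pre-flight check. The helper `find_scenario_problems` and the demo data are hypothetical and not part of the original notebook.

```python
from pathlib import Path
from typing import Dict, List


def find_scenario_problems(scenarios: Dict[str, Path], queries: Dict[str, str]) -> List[str]:
    """Return human-readable problems; an empty list means the setup looks consistent."""
    problems: List[str] = []
    # Walk the union of names so that entries missing on either side are reported.
    for name in sorted(set(scenarios) | set(queries)):
        if name not in queries:
            problems.append(f"{name}: no query defined")
        elif name not in scenarios:
            problems.append(f"{name}: no file path defined")
        elif not scenarios[name].is_file():
            problems.append(f"{name}: file not found at {scenarios[name]}")
    return problems


# Demo with invented data: one missing file, one missing query, one missing path.
demo_scenarios = {
    "a": Path("no_such_dir_for_demo/a.html"),
    "b": Path("no_such_dir_for_demo/b.html"),
}
demo_queries = {"a": "Extract titles", "c": "Extract prices"}
demo_problems = find_scenario_problems(demo_scenarios, demo_queries)
for problem in demo_problems:
    print(problem)
```

In the notebook, the same check could be run as `find_scenario_problems(SCENARIOS, SCENARIO_QUERIES)` before calling `test_all_scenarios`.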


## Example Usage

Run extraction on a single scenario with detailed logging:


In [None]:
# Example: Extract data from jobs scenario
result = agentic_codegen_file(
    file_path=SCENARIOS["scenario2_jobs"],
    query=SCENARIO_QUERIES["scenario2_jobs"],
    max_iterations=15,
    preview_chars=2000,
    temperature=0.3,
    debug=True,  # Enable detailed logging
    extra_notes="Return a JSON array where each object has keys: title, company, location, salary.",
)

# Analyze the result
analyze_result(result)

# View conversation history (optional)
# show_conversation_summary(result, max_messages=15)


2025-11-09 10:33:07 | INFO | ITERATION 1 - INPUT TO MODEL
2025-11-09 10:33:07 | INFO | Messages being sent (last 2):
2025-11-09 10:33:07 | INFO | Message 0 [system]: {"role": "system", "content": "You are an autonomous senior Python engineer specialized in data extraction.\n\nCRITICAL RULES:\n- The variable TARGET_FILE_PATH will be automatically defined at the top of your script. \n  Use it directly - do NOT redefine it or use literal strings like \"$TARGET_FILE_PATH\".\n- ALWAYS use BeautifulSoup or other parsing libraries to inspect the file structure first.\n- Available tools:\n  1. exploration_shell: Run shell commands (grep, head, wc, etc.) to inspect the file\n  2. exploration_python: Run Python code to analyze structure (counts, selectors, samples)\n  3. extraction: Provide final Python code that outputs ONLY valid JSON to stdout\n\nWORKFLOW:\n1. First, use exploration_shell or exploration_python to understand the file structure\n2. Based on findings, refine your approach if nee

## Batch Testing Utility