<a href="https://colab.research.google.com/github/HyperspaceDan/Kaggle_Auto_Auditor/blob/main/Kaggle_Auto_Auditor_(CapstoneProject).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<h1>1. Configuration:</h1>

1.1 Installing and upgrading the necessary Python libraries.

This step ensures that all the required libraries for the project are installed and up-to-date. We will be using:
*   **`google-genai`**: The official Google Python SDK for interacting with the Gemini family of models.
*   **`langgraph`**: A library for building stateful, multi-agent applications with LLMs, which we will use to define our workflow.
*   **`langchain` and `langchain-google-genai`**: The core framework and its integration for using Google's generative AI models within a structured workflow.
*   **`kaggle`**: The official Kaggle API client, which allows us to programmatically download notebooks.

In [None]:
!pip install -U google-genai langgraph langchain langchain-google-genai
!pip install -q --upgrade kaggle

1.2 Importing the libraries required for building the application.

Here, we import all the necessary modules and classes that will be used throughout the notebook. This includes:
*   Standard libraries like `os`, `re`, `json`, `subprocess`, `tempfile`, `shutil`, and `datetime` for system interactions, regular expressions, data handling, file operations, and timestamps.
*   `requests` and `nbformat` for fetching and parsing notebook files.
*   `google.genai` to configure the connection to the Google AI services.
*   `langgraph` components (`StateGraph`, `END`) to construct the agentic workflow.
*   `langchain_core` and `langchain_google_genai` for creating prompt templates, defining runnables (`RunnableWithFallbacks`, `RunnableConfig`), and initializing the LLM (`ChatGoogleGenerativeAI`).
*   `pydantic` and `typing` for robust data structuring (`BaseModel`, `Field`) and type hinting (`TypedDict`, `Optional`, `List`, etc.), which helps in defining the shared state of our application.

In [1]:
import os
import google.genai as genai
import requests, nbformat, pydantic
from langgraph.graph import StateGraph, END
from langchain_core.prompts import ChatPromptTemplate
import re
import tempfile
import shutil
import json
import subprocess
from typing import TypedDict, Tuple, Optional, Dict, List, Any
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.runnables import RunnableWithFallbacks, RunnableConfig
from pydantic import BaseModel, Field
import datetime, time

1.3. Accessing the Gemini API key.

To use Google's Gemini models, we need to authenticate our requests with an API key. This cell retrieves the `GEMINI_API_KEY` from the secure secret manager. It then sets this key as an environment variable (`os.environ['GOOGLE_API_KEY']`), which allows the Google client libraries to automatically find and use it for authenticating API calls.


In [2]:
from google.colab import userdata
GOOGLE_API_KEY = userdata.get('GEMINI_API_KEY')
os.environ['GOOGLE_API_KEY'] = GOOGLE_API_KEY

1.4. Authenticating the Kaggle API.

To download notebooks directly from Kaggle, we need to authenticate with the Kaggle API. This cell initializes the `KaggleApi` client and calls the `authenticate()` method. This method automatically looks for your `kaggle.json` credentials file to establish a secure connection. A success message will be printed upon successful authentication.


In [3]:
from google.colab import files
uploaded = files.upload()

# Ensure the .kaggle directory exists
os.makedirs(os.path.expanduser('~/.config/kaggle'), exist_ok=True)

if uploaded:
    uploaded_filename = list(uploaded.keys())[0]
    # Move the uploaded kaggle.json to the correct directory
    !mv "{uploaded_filename}" ~/.config/kaggle/kaggle.json
    # Set appropriate permissions
    !chmod 600 ~/.config/kaggle/kaggle.json
else:
    print("‚ùå No kaggle.json file was uploaded. Please upload your kaggle.json file.")
    # Optionally exit or raise an error if no file was uploaded

# Now import kaggle and authenticate
import kaggle
from kaggle.api.kaggle_api_extended import KaggleApi

api = KaggleApi()
api.authenticate()
print("‚úÖ Kaggle API Authenticated Successfully")

Saving kaggle.json to kaggle.json
‚úÖ Kaggle API Authenticated Successfully


<h1>2. Defining the Workflow State and Tools</h1>

<h3>2.1. Defining the Central 'Memory' Object (GraphState)</h3>

We define the `GraphState` class, which serves as the shared memory for the entire multi-agent system. The state is passed between each node (agent) in the graph.

Each agent can read data from the state (e.g., the notebook content) and write its output back to the state (e.g., a `code_report`). This ensures a seamless flow of information as the notebook progresses through the audit pipeline. The fields are marked as `Optional` because they are populated sequentially.

This cell also defines a helper function, `append_log`, which adds a timestamped entry to the `logs` field in the state. This creates an audit trail of the workflow's execution, which is useful for debugging and tracking the process.

In [4]:
class GraphState(TypedDict):
    """
    Represents the state of our multi-agent workflow.
    This is the central "memory" object that gets passed between nodes.

    Attributes:
        target_url: The initial Kaggle notebook URL to be audited.
        notebook_content: The raw text content (code + markdown) extracted from the notebook.
        code_report: The analysis report from the CodeAnalysisAgent.
        doc_report: The analysis report from the DocumentationAgent.
        capability_report: The audit report from the GenCapabilityAuditAgent.
        final_report: The final JSON synthesis of all reports.
        error_message: A field to capture any errors that occur during the process.
        initial_audit_score: The initial audit score from the first capability audit.
        iteration: The current iteration number in a corrective loop.
        logs: A list of dictionaries representing an audit trail of actions.
    """
    target_url: str

    # All other fields are 'Optional' because they are
    # filled in sequentially by the agents.
    notebook_content: Optional[str]
    code_report: Optional[str]
    doc_report: Optional[str]
    capability_report: Optional[str]
    final_report: Optional[str]
    error_message: Optional[str]

    # Fields for corrective loop logic and final report synthesis
    initial_audit_score: Optional[int]
    iteration: int
    logs: Optional[List[Dict]] # Field for audit trail


def append_log(state: GraphState, node_name: str, status: str, details: Dict) -> List[Dict]:
    """
    Helper function to append an entry to the logs in the GraphState.

    Args:
        state: The current GraphState.
        node_name: The name of the node generating the log.
        status: The status of the operation (e.g., "SUCCESS", "ERROR", "INFO").
        details: A dictionary containing additional details about the log entry.

    Returns:
        A new list containing the updated logs.
    """
    current_logs = state.get('logs', [])

    # Convert non-JSON serializable details to strings
    serializable_details = {}
    for k, v in details.items():
        if isinstance(v, Exception):
            serializable_details[k] = str(v)
        else:
            serializable_details[k] = v

    new_log_entry = {
        "timestamp": datetime.datetime.now().isoformat(),
        "iteration": state.get('iteration', 0),
        "node": node_name,
        "status": status,
        "details": serializable_details
    }
    return current_logs + [new_log_entry]

print("GraphState (our shared memory) is now defined.")

GraphState (our shared memory) is now defined.


<h3>2.2. Defining the Kaggle Notebook Fetcher Tool</h3>

This cell defines the `KaggleNotebookFetcher` class, a specialized tool responsible for handling all interactions with Kaggle notebooks. It has three main responsibilities:

1.  **`parse_url`**: Takes a standard Kaggle notebook URL and uses regular expressions to extract the `owner_slug` and `kernel_slug`, which are needed for the API call.
2.  **`fetch_kernel`**: Uses the `subprocess` module to run the Kaggle CLI command (`kaggle kernels pull`). This command downloads the notebook files into a temporary directory. The method then finds the `.ipynb` file and returns its path.
3.  **`extract_content`**: Reads the `.ipynb` file (which is in JSON format), iterates through its cells, and separates the code and markdown content into two distinct strings.

In [5]:
class KaggleNotebookFetcher:
    def __init__(self):
        """
        Initializes the Kaggle Notebook Fetcher and authenticates the Kaggle API client.
        """
        self.api = KaggleApi()
        self.api.authenticate()
        print("KaggleApi client initialized and authenticated.")

    def parse_url(self, url: str) -> Tuple[str, str]:
        """
        Parses a Kaggle notebook URL to extract the owner_slug and kernel_slug.

        Args:
            url: The Kaggle notebook URL (e.g., 'https://www.kaggle.com/code/{owner_slug}/{kernel_slug}').

        Returns:
            A tuple containing (owner_slug, kernel_slug).

        Raises:
            ValueError: If the URL format is invalid.
        """
        # Regex to match the typical Kaggle notebook URL format, including '/code/'
        # Example: https://www.kaggle.com/code/owner_slug/kernel_slug
        match = re.match(r'https://www\.kaggle\.com/code/([^/]+)/([^/]+)/?.*', url)
        if match:
            owner_slug = match.group(1)
            kernel_slug = match.group(2)
            return owner_slug, kernel_slug
        else:
            raise ValueError(
                f"Invalid Kaggle notebook URL format. Expected 'https://www.kaggle.com/code/{{owner_slug}}/{{kernel_slug}}'. Got: {url}"
            )

    def fetch_kernel(self, owner_slug: str, kernel_slug: str) -> Optional[str]:
        """
        Downloads a Kaggle notebook kernel using the Kaggle CLI, extracts it, and finds the .ipynb file.

        Args:
            owner_slug: The owner's slug of the Kaggle notebook.
            kernel_slug: The kernel's slug of the Kaggle notebook.

        Returns:
            The path to the extracted .ipynb file, or None if an error occurs.
        """
        ipynb_file_path = None
        # Use a temporary directory for the Kaggle CLI output
        with tempfile.TemporaryDirectory() as base_temp_dir:
            # Kaggle CLI will create its own subdirectory structure or place files directly
            # We'll create a specific temporary directory for the pull command's output
            kaggle_output_dir = os.path.join(base_temp_dir, f"kaggle_notebook_{owner_slug}_{kernel_slug}")
            os.makedirs(kaggle_output_dir, exist_ok=True)

            command = [
                'kaggle',
                'kernels',
                'pull',
                f'{owner_slug}/{kernel_slug}',
                '-p',
                kaggle_output_dir
            ]
            print(f"Executing Kaggle CLI command: {' '.join(command)}")

            try:
                # Run the kaggle kernels pull command
                process = subprocess.run(
                    command,
                    capture_output=True,
                    text=True,
                    check=True  # Raises CalledProcessError for non-zero exit codes
                )
                print("Kaggle CLI stdout:", process.stdout)
                if process.stderr:
                    print("Kaggle CLI stderr:", process.stderr)

                # Find the .ipynb file within the directory where Kaggle CLI pulled the kernel
                # The CLI usually places the .ipynb file directly or within a simple subdirectory.
                for root, _, files in os.walk(kaggle_output_dir):
                    for file in files:
                        if file.endswith('.ipynb'):
                            ipynb_file_path = os.path.join(root, file)
                            print(f"Found .ipynb file: {ipynb_file_path}")

                            # Copy the .ipynb file to a new persistent temporary file
                            # as base_temp_dir will be deleted after this block.
                            final_ipynb_path = tempfile.NamedTemporaryFile(suffix='.ipynb', delete=False).name
                            shutil.copy(ipynb_file_path, final_ipynb_path)
                            print(f"Copied .ipynb to persistent temp file: {final_ipynb_path}")
                            return final_ipynb_path

                print(f"No .ipynb file found in {kaggle_output_dir} after pulling {owner_slug}/{kernel_slug}.")
                return None

            except subprocess.CalledProcessError as e:
                print(f"Kaggle CLI command failed with exit code {e.returncode}.")
                print(f"Stderr: {e.stderr}")
                print(f"Stdout: {e.stdout}")
                return None
            except FileNotFoundError:
                print("Error: 'kaggle' command not found. Ensure Kaggle CLI is installed and in PATH.")
                return None
            except Exception as e:
                print(f"An unexpected error occurred during kernel pull: {e}")
                return None

    def extract_content(self, notebook_path: str) -> Dict[str, Any]:
        """
        Loads an .ipynb file and extracts its code and markdown content, and raw cells.

        Args:
            notebook_path: The file path to the .ipynb notebook.

        Returns:
            A dictionary with 'code', 'markdown' (concatenated content) and 'cells' (raw cell list).
            Returns an empty dictionary with empty lists/strings if an error occurs.
        """
        code_content = []
        markdown_content = []
        cells = []

        try:
            with open(notebook_path, 'r', encoding='utf-8') as f:
                notebook_json = json.load(f)

            cells = notebook_json.get('cells', [])

            for cell in cells:
                source = ''.join(cell.get('source', []))
                if cell.get('cell_type') == 'code':
                    code_content.append(source)
                elif cell.get('cell_type') == 'markdown':
                    markdown_content.append(source)

            return {
                'code': '\n'.join(code_content),
                'markdown': '\n'.join(markdown_content),
                'cells': cells
            }
        except FileNotFoundError:
            print(f"Error: Notebook file not found at {notebook_path}")
            return {'code': '', 'markdown': '', 'cells': []}
        except json.JSONDecodeError:
            print(f"Error: Could not decode JSON from notebook file at {notebook_path}")
            return {'code': '', 'markdown': '', 'cells': []}
        except Exception as e:
            print(f"An unexpected error occurred while extracting content from {notebook_path}: {e}")
            return {'code': '', 'markdown': '', 'cells': []}


<h1>3. Defining the Graph Nodes (Agents)</h1>

<h3>3.1. Node 1: Ingest Notebook (Entry Point)</h3>

Now we define the first node for our LangGraph workflow: `ingest_notebook`. This function serves as the entry point for our graph.

It performs the following actions:
1.  Initializes the `KaggleNotebookFetcher`.
2.  Retrieves the `target_url` from the input state.
3.  Calls the fetcher's methods to parse the URL, download the notebook, and extract its content.
4.  Updates the `GraphState` with the `notebook_content` (a dictionary containing the code and markdown).
5.  Handles any errors that occur during the process and records them in the state.

In [6]:
def ingest_notebook(state: GraphState) -> GraphState:
    """
    LangGraph node to ingest a Kaggle notebook from a given URL.
    Fetches, parses, and extracts content, updating the GraphState.
    """
    print("---INGESTING KAGGLE NOTEBOOK---")
    fetcher = KaggleNotebookFetcher()
    target_url = state.get('target_url')

    if not target_url:
        error_msg = "target_url not found in GraphState."
        state['error_message'] = error_msg
        state['logs'] = append_log(state, "ingest_notebook", "ERROR", {"error": error_msg})
        return state

    try:
        # 1. Parse the URL
        owner_slug, kernel_slug = fetcher.parse_url(target_url)
        print(f"Parsed URL: Owner='{owner_slug}', Kernel='{kernel_slug}'")

        # 2. Fetch the kernel
        notebook_path = fetcher.fetch_kernel(owner_slug, kernel_slug)
        if notebook_path is None:
            error_msg = f"Failed to fetch notebook for {owner_slug}/{kernel_slug}."
            state['error_message'] = error_msg
            state['logs'] = append_log(state, "ingest_notebook", "ERROR", {"target_url": target_url, "owner_slug": owner_slug, "kernel_slug": kernel_slug, "error": error_msg})
            return state

        # 3. Extract content
        content = fetcher.extract_content(notebook_path)
        state['notebook_content'] = content
        print(f"Successfully extracted content from {kernel_slug}.")

        # 4. Clean up the temporary .ipynb file
        if os.path.exists(notebook_path):
            os.remove(notebook_path)
            print(f"Cleaned up temporary file: {notebook_path}")

        state['logs'] = append_log(state, "ingest_notebook", "SUCCESS", {"target_url": target_url, "owner_slug": owner_slug, "kernel_slug": kernel_slug})

    except ValueError as e:
        print(f"ValueError during ingestion: {e}")
        error_msg = f"Invalid URL or parsing error: {e}"
        state['error_message'] = error_msg
        state['logs'] = append_log(state, "ingest_notebook", "ERROR", {"target_url": target_url, "error": str(e)})
    except Exception as e:
        print(f"An unexpected error occurred during ingestion: {e}")
        error_msg = f"Error during ingestion: {e}"
        state['error_message'] = error_msg
        state['logs'] = append_log(state, "ingest_notebook", "ERROR", {"target_url": target_url, "error": str(e)})

    return state

<h3>3.2. Configuring the LLMs with Fallbacks</h3>

Here, we configure the generative models that will serve as the "brains" for our agents. To enhance the reliability of our system, we implement a fallback mechanism.

1.  **`llm_primary`**: The main `gemini-2.5-flash` model we intend to use.
2.  **`llm_fallback`**: A second instance of the same model that will be used if the primary one fails.
3.  **`llm_with_fallbacks`**: A new, resilient LangChain runnable object is created using `.with_fallbacks()`. If a call to `llm_primary` fails (e.g., due to a temporary API error), it will automatically retry the call with `llm_fallback`. This makes our application more robust.

In [7]:
# 1. Instantiate a primary ChatGoogleGenerativeAI model
llm_primary = ChatGoogleGenerativeAI(
    model="gemini-2.5-flash",
    temperature=0.0,
    convert_system_message_to_human=True
)

# 2. Instantiate a fallback ChatGoogleGenerativeAI model
llm_fallback = ChatGoogleGenerativeAI(
    model="gemini-2.5-flash",
    temperature=0.0,
    convert_system_message_to_human=True
)

# 3. Create the llm_with_fallbacks object
llm_with_fallbacks = llm_primary.with_fallbacks([llm_fallback])

print("ChatGoogleGenerativeAI models with fallbacks initialized.")

ChatGoogleGenerativeAI models with fallbacks initialized.


<h3>3.3. Node 2: Analyze Code (Code Reviewer Agent)</h3>

This cell defines the `analyze_code` node, which acts as our **Code Reviewer Agent**.

Its role is to:
1.  Read the `notebook_content` from the shared `GraphState`.
2.  Define a system prompt that gives the LLM a specific persona: a "Senior Google Staff Software Engineer."
3.  Instruct the LLM to review the code for reproducibility, efficiency, bugs, and best practices.
4.  Invoke the `llm_with_fallbacks` chain with the code.
5.  Save the generated analysis as a string into the `code_report` field of our `GraphState`.

In [8]:
def analyze_code(state: GraphState) -> GraphState:
    """
    LangGraph node to analyze the code content of a Kaggle notebook.
    Uses an LLM to provide a report on reproducibility, efficiency, and bugs.
    """
    print("---ANALYZING CODE--- ")

    notebook_content = state.get('notebook_content')
    if not notebook_content or not notebook_content.get('code'):
        error_msg = "No code content found for analysis."
        state['error_message'] = error_msg
        print(state['error_message'])
        state['logs'] = append_log(state, "analyze_code", "ERROR", {"error": error_msg})
        return state

    code_to_analyze = notebook_content['code']

    try:
        # 1. Define the system prompt (persona and instructions)
        system_prompt = (
            "You are a Senior Google Staff Software Engineer. Your task is to provide a comprehensive "
            "code review of the given Python code from a Kaggle notebook. Focus on the following aspects: "
            "reproducibility, efficiency, potential bugs, security vulnerabilities (if any), and adherence to best practices. "
            "Provide constructive and detailed feedback in a clear, structured report format. "
            "Start your response directly with the analysis report, without pleasantries or introductory phrases. "
            "Your response should be a string of the analysis report."
        )

        # 2. Define the human message (including the code to analyze)
        human_message = "Analyze the following Python code:\n\n```python\n{code}\n```"

        # 3. Create a prompt template
        prompt_template = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            ("human", human_message)
        ])

        # 4. Invoke the LLM with fallbacks
        # The llm_with_fallbacks object was defined in the previous step.
        full_chain = prompt_template | llm_with_fallbacks
        response = full_chain.invoke({"code": code_to_analyze})

        # Store the string output from the LLM's response
        state['code_report'] = response.content
        print("‚úÖ Code analysis report generated successfully.")

        state['logs'] = append_log(state, "analyze_code", "SUCCESS", {"report_snippet": state['code_report'][:100] + "..." if state['code_report'] else ""})

    except Exception as e:
        error_msg = f"Error during code analysis: {e}"
        state['error_message'] = error_msg
        print(state['error_message'])
        state['logs'] = append_log(state, "analyze_code", "ERROR", {"error": str(e)})

    return state

<h3>3.4. Node 3: Analyze Documentation (Technical Writer Agent)</h3>

Next, we define the `analyze_documentation` node, which acts as our **Technical Writer Agent**.

This agent's responsibilities are:
1.  Read the markdown content and the `code_report` from the `GraphState`.
2.  Assume the persona of a "Senior Technical Writer & Data Storyteller."
3.  Analyze the notebook's documentation for clarity, narrative flow, and how well it explains the accompanying code. The `code_report` is provided as context to ensure the documentation review is relevant.
4.  Save the generated documentation review into the`doc_report` field of the `GraphState`.


In [9]:
def analyze_documentation(state: GraphState) -> GraphState:
    """
    LangGraph node to analyze the markdown (documentation) content of a Kaggle notebook.
    Uses an LLM to provide a report on clarity, narrative flow, and explanation of code logic.
    """
    print("---ANALYZING DOCUMENTATION---")

    notebook_content = state.get('notebook_content')
    code_report = state.get('code_report') # Get the code report for context

    if not notebook_content or not notebook_content.get('markdown'):
        error_msg = "No markdown content found for analysis."
        state['error_message'] = error_msg
        print(state['error_message'])
        state['logs'] = append_log(state, "analyze_documentation", "ERROR", {"error": error_msg})
        return state

    if not code_report:
        error_msg = "Code report not found. Documentation analysis requires code context."
        state['error_message'] = error_msg
        print(state['error_message'])
        state['logs'] = append_log(state, "analyze_documentation", "ERROR", {"error": error_msg})
        return state

    markdown_to_analyze = notebook_content['markdown']

    try:
        # 1. Define the system prompt (persona and instructions)
        system_prompt = (
            "You are a Senior Technical Writer & Data Storyteller. Your task is to provide a comprehensive "
            "review of the provided markdown documentation from a Kaggle notebook. "
            "Analyze it for clarity, narrative flow, and most importantly, how well it explains and corresponds to the accompanying code logic. "
            "Consider the code analysis report provided as context. "
            "Provide constructive and detailed feedback in a clear, structured report format. "
            "Start your response directly with the analysis report, without pleasantries or introductory phrases. "
            "Your response should be a string of the analysis report."
        )

        # 2. Define the human message (including markdown and code report for context)
        human_message = (
            "Analyze the following markdown documentation in the context of the provided code analysis report.\n\n"
            "--- Markdown Documentation ---\n"
            "```markdown\n{markdown}\n```\n\n"
            "--- Code Analysis Report (for context) ---\n"
            "```text\n{code_report}\n```\n"
        )

        # 3. Create a prompt template
        prompt_template = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            ("human", human_message)
        ])

        # 4. Invoke the LLM with fallbacks
        full_chain = prompt_template | llm_with_fallbacks
        response = full_chain.invoke({
            "markdown": markdown_to_analyze,
            "code_report": code_report
        })

        # Store the string output from the LLM's response
        state['doc_report'] = response.content
        print("‚úÖ Documentation analysis report generated successfully.")

        state['logs'] = append_log(state, "analyze_documentation", "SUCCESS", {"report_snippet": state['doc_report'][:100] + "..." if state['doc_report'] else ""})

    except Exception as e:
        error_msg = f"Error during documentation analysis: {e}"
        state['error_message'] = error_msg
        print(state['error_message'])
        state['logs'] = append_log(state, "analyze_documentation", "ERROR", {"error": str(e)})

    return state

<h3>3.5. Node 4: Audit Capabilities (Principal Auditor Agent)</h3>

This cell defines the `audit_capabilities` node, which acts as our **Principal Auditor Agent**.

This agent performs a higher-level synthesis of the previous analyses:
1.  It takes both the `code_report` and `doc_report` from the `GraphState`.
2.  Assuming the persona of a "Principal AI Researcher / Peer Reviewer," it prompts the LLM to synthesize these reports.
3.  The LLM is instructed to generate a comprehensive audit, which must include:
    *   A **Trust Score** (an integer from 0 to 100).
    *   A list of the top 3 critical flaws.
    *   An assessment of "Correction Feasibility" for each flaw.
4.  The output is saved as the `capability_report`, and the initial trust score is recorded for later comparison.

In [10]:
def audit_capabilities(state: GraphState) -> GraphState:
    """
    LangGraph node to audit the overall capabilities of the notebook
    by synthesizing code and documentation reports.
    Assigns a Trust Score, identifies critical flaws, and assesses correction feasibility.
    """
    print("---AUDITING CAPABILITIES---")

    code_report = state.get('code_report')
    doc_report = state.get('doc_report')
    current_iteration = state.get('iteration', 0)

    if not code_report:
        error_msg = "Code report not found for capability audit."
        state['error_message'] = error_msg
        print(state['error_message'])
        state['logs'] = append_log(state, "audit_capabilities", "ERROR", {"error": error_msg, "iteration": current_iteration})
        return state

    if not doc_report:
        error_msg = "Documentation report not found for capability audit."
        state['error_message'] = error_msg
        print(state['error_message'])
        state['logs'] = append_log(state, "audit_capabilities", "ERROR", {"error": error_msg, "iteration": current_iteration})
        return state

    try:
        # 1. Define the system prompt (persona and instructions)
        system_prompt = (
            "You are a Principal AI Researcher / Peer Reviewer. Your task is to critically analyze "
            "the provided 'Code Analysis Report' and 'Documentation Analysis Report' for a Kaggle notebook. "
            "Synthesize these two reports to provide an overall audit, focusing on the notebook's "
            "reliability, correctness, and clarity. Your output must include: "
            "1. A 'Trust Score' (an integer from 0 to 100, where 100 is excellent)."
            "2. Top 3 critical flaws identified across both code and documentation, prioritizing impact."
            "3. For each of the top 3 flaws, a brief assessment of 'Correction Feasibility' (e.g., 'Easy', 'Medium', 'Hard')."
            "Provide constructive and detailed feedback in a clear, structured report format. "
            "Start your response directly with the analysis report, without pleasantries or introductory phrases. "
            "Your response should be a string of the audit report."
        )

        # 2. Define the human message
        human_message = (
            "Please synthesize the following code and documentation analysis reports:\n\n"
            "--- Code Analysis Report ---\n"
            "```text\n{code_report}\n```\n\n"
            "--- Documentation Analysis Report ---\n"
            "```text\n{doc_report}\n```\n"
        )

        # 3. Create a prompt template
        prompt_template = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            ("human", human_message)
        ])

        # 4. Invoke the LLM with fallbacks
        full_chain = prompt_template | llm_with_fallbacks
        response = full_chain.invoke({
            "code_report": code_report,
            "doc_report": doc_report
        })

        # Store the string output from the LLM's response
        state['capability_report'] = response.content
        print("‚úÖ Capability audit report generated successfully.")

        # Extract the current trust score from the newly generated capability_report
        current_trust_score = None
        score_match = re.search(r'Trust Score: (\d+)', state['capability_report'])
        if score_match:
            current_trust_score = int(score_match.group(1))

        # Set initial_audit_score if it's the first time this node is run in a sequence (i.e., it's None in the incoming state)
        if state.get('initial_audit_score') is None and current_trust_score is not None:
            state['initial_audit_score'] = current_trust_score
            print(f"Initial Audit Score set to: {current_trust_score}")

        # Ensure iteration is explicitly carried forward in the state returned by audit_capabilities
        state['iteration'] = current_iteration

        state['logs'] = append_log(state, "audit_capabilities", "SUCCESS", {"current_trust_score": current_trust_score, "iteration": current_iteration, "report_snippet": state['capability_report'][:100] + "..." if state['capability_report'] else ""})

    except Exception as e:
        error_msg = f"Error during capability audit: {e}"
        state['error_message'] = error_msg
        print(state['error_message'])
        # Even in error, try to preserve iteration if possible for debugging loops
        state['iteration'] = current_iteration
        state['logs'] = append_log(state, "audit_capabilities", "ERROR", {"error": str(e), "iteration": current_iteration})

    return state

<h3>3.6. Node 5: Generate Corrections (Mentor & Corrector Agent)</h3>

This cell defines the `generate_corrections` node, which acts as our **Mentor & Corrector Agent**. This node is only activated if the audit score is below a certain threshold.

Its purpose is to:
1.  Take the `code_report`, `doc_report`, and the current `notebook_content` (specifically the list of cells) from the `GraphState`.
2.  Assume the persona of a "Senior Staff Engineer & Mentor."
3.  Instruct the LLM to generate corrected versions of the notebook's cells based on the identified flaws.
4.  The LLM is specifically prompted to return its output in a clean JSON format with a `cells` key, containing a list of cell objects.
5.  This new, corrected list of cells overwrites the existing cells in `notebook_content`. The flattened `code` and `markdown` keys are then re-synchronized from this new list of cells, preparing the state for a re-evaluation loop.


In [11]:
from google.api_core.exceptions import ResourceExhausted

def generate_corrections(state: GraphState) -> GraphState:
    """
    LangGraph node to generate corrected code and markdown based on audit reports.
    Uses an LLM to review reports and suggest improvements, preserving cell structure.
    """
    print("--GENERATING CORRECTIONS--")

    code_report = state.get('code_report')
    doc_report = state.get('doc_report')
    notebook_content = state.get('notebook_content')
    current_iteration = state.get('iteration', 0)

    if not code_report and not doc_report:
        error_msg = "No code or documentation reports found to generate corrections."
        state['error_message'] = error_msg
        print(state['error_message'])
        state['logs'] = append_log(state, "generate_corrections", "ERROR", {"error": error_msg, "iteration": current_iteration})
        return state

    # Critical: Ensure 'cells' are present in notebook_content
    if not notebook_content or not notebook_content.get('cells'):
        error_msg = "Current notebook cells not found in notebook_content for correction."
        state['error_message'] = error_msg
        print(state['error_message'])
        state['logs'] = append_log(state, "generate_corrections", "ERROR", {"error": error_msg, "iteration": current_iteration})
        return state

    llm_output_content = None # Initialize outside try block for wider scope

    try:
        # Get original cells to pass to the LLM for context
        original_cells = notebook_content['cells']
        original_cells_json = json.dumps(original_cells, indent=2) # Convert to JSON string for prompt

        # 1. Define the system prompt (persona and instructions)
        system_prompt = (
            "You are a Senior Staff Engineer & Mentor. Your task is to review the provided 'Code Analysis Report' and 'Documentation Analysis Report' for a Kaggle notebook. "
            "Based on these reports, you must generate improved versions of the original notebook's cells. Focus on addressing the identified flaws, "
            "improving clarity, reproducibility, efficiency, and narrative flow. "
            "Your output MUST be a JSON object with a single key: 'cells'. The 'cells' key must contain a list of cell objects. "
            "Each cell object MUST have two keys: 'cell_type' (either 'code' or 'markdown') and 'source' (the content of the cell as a string). "
            "The 'source' for code cells should be Python code, and for markdown cells should be Markdown text. "
            "Preserve the original interleaving of code and markdown cells as much as possible, only modifying the 'source' content as needed. "
            "Do not include any other text outside the JSON. Ensure the JSON is perfectly formatted and valid."
        )

        # 2. Define the human message (including all relevant context)
        human_message = (
            "Please provide corrected versions of the following Kaggle notebook's cells based on the analysis reports.\n\n"
            "--- Code Analysis Report ---\n"
            "```text\n{code_report}\n```\n\n"
            "--- Documentation Analysis Report ---\n"
            "```text\n{doc_report}\n```\n\n"
            "--- Original Notebook Cells (for structure and context) ---\n"
            "```json\n{original_cells_json}\n```\n\n"
            "Your response should be a JSON object with a single key 'cells' containing an array of cell objects, each with 'cell_type' and 'source'."
        )

        # 3. Create a prompt template
        prompt_template = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            ("human", human_message)
        ])

        # 4. Invoke the LLM with fallbacks and rate limit handling
        full_chain = prompt_template | llm_with_fallbacks
        try:
            response = full_chain.invoke({
                "code_report": code_report,
                "doc_report": doc_report,
                "original_cells_json": original_cells_json
            })
        except ResourceExhausted:
            print("‚ö†Ô∏è RATE LIMIT HIT. Sleeping for 60 seconds to reset quota...")
            time.sleep(60) # Force a cool-down
            # Retry once after sleep
            response = full_chain.invoke({
                "code_report": code_report,
                "doc_report": doc_report,
                "original_cells_json": original_cells_json
            })

        # 5. Parse the LLM's response
        llm_output_content = response.content

        # Robust Regex Extraction
        # This finds the first opening brace '{' and the last closing brace '}'
        match = re.search(r'\{.*\}', llm_output_content, re.DOTALL)

        if match:
            json_str = match.group(0)
            try:
                corrections_json = json.loads(json_str)
            except json.JSONDecodeError:
                # Fallback: Try to clean common Markdown issues if strict parse fails
                clean_str = json_str.replace("```json", "").replace("```", "")
                corrections_json = json.loads(clean_str)
        else:
            raise ValueError("No JSON object found in LLM response.")


        if 'cells' not in corrections_json or not isinstance(corrections_json['cells'], list):
            raise ValueError("LLM response missing 'cells' key or 'cells' is not a list.")

        new_cells = corrections_json['cells']

        # Validate each cell object structure
        for cell in new_cells:
            if not isinstance(cell, dict) or 'cell_type' not in cell or 'source' not in cell:
                raise ValueError("Invalid cell object structure in LLM response.")
            if cell['cell_type'] not in ['code', 'markdown']:
                raise ValueError(f"Invalid cell_type '{cell['cell_type']}' in LLM response.")

        # 6. Update state with new cells
        state['notebook_content']['cells'] = new_cells

        # 7. CRITICAL SYNC STEP: Re-generate flattened code and markdown content from the new cells
        new_code_content_list = []
        new_markdown_content_list = []
        for cell in new_cells:
            if cell.get('cell_type') == 'code':
                new_code_content_list.append(cell.get('source', ''))
            elif cell.get('cell_type') == 'markdown':
                new_markdown_content_list.append(cell.get('source', ''))

        state['notebook_content']['code'] = '\n'.join(new_code_content_list)
        state['notebook_content']['markdown'] = '\n'.join(new_markdown_content_list)

        # 8. Increment iteration
        state['iteration'] = current_iteration + 1

        print(f"‚úÖ Corrections generated successfully. Iteration: {state['iteration']}")
        state['logs'] = append_log(state, "generate_corrections", "SUCCESS", {"iteration": state['iteration']})

    except ValueError as e:
        error_msg = f"LLM output JSON format invalid: {e}. Output: '{llm_output_content[:500]}...'"
        state['error_message'] = error_msg
        print(state['error_message'])
        state['iteration'] = current_iteration + 1 # Increment iteration to prevent infinite loops on repeated errors
        state['logs'] = append_log(state, "generate_corrections", "ERROR", {"error": str(e), "llm_output_prefix": llm_output_content[:500], "iteration": state['iteration']})
        return state
    except Exception as e:
        error_msg = f"An unexpected error occurred during correction generation: {e}"
        state['error_message'] = error_msg
        print(state['error_message'])
        # Increment iteration to prevent infinite loops on repeated errors
        state['iteration'] = current_iteration + 1
        state['logs'] = append_log(state, "generate_corrections", "ERROR", {"error": str(e), "iteration": state['iteration']})

    return state

<h3>3.7. Node 6: Synthesize Report (Final Auditor Agent)</h3>

This cell defines the `synthesize_report` node, which is the final step in our workflow. It acts as the **Final Auditor Agent**.

Its responsibilities are:
1.  Define a `FinalReport` Pydantic model to specify the exact JSON structure of the final output. This ensures a reliable, machine-readable result.
2.  Take the `initial_audit_score` and the final `capability_report` from the `GraphState`.
3.  Prompt the LLM to synthesize all the information into a final summary that conforms to the `FinalReport` schema.
4.  The structured output from the LLM is then saved as a JSON string in the `final_report` field of the `GraphState`, concluding the workflow.

In [12]:
class FinalReport(BaseModel):
    initial_score: int = Field(..., description="The initial audit score of the notebook.")
    final_score: int = Field(..., description="The final audit score of the notebook after corrections.")
    improvements: List[str] = Field(..., description="A list of key improvements made during the correction process.")
    overall_summary: str = Field(..., description="An overall summary of the notebook's quality and the audit process.")

def synthesize_report(state: GraphState) -> GraphState:
    """
    LangGraph node to synthesize a final JSON report based on all analysis.
    Uses an LLM to generate a structured output according to the FinalReport Pydantic model.
    """
    print("---SYNTHESIZING FINAL REPORT---")

    initial_audit_score = state.get('initial_audit_score')
    capability_report = state.get('capability_report')
    current_iteration = state.get('iteration', 0)

    # Extract final_score from capability_report if available
    final_score = None
    if capability_report:
        score_match = re.search(r'Trust Score: (\d+)', capability_report)
        if score_match:
            final_score = int(score_match.group(1))

    if initial_audit_score is None:
        error_msg = "Initial audit score not found for final report synthesis."
        state['error_message'] = error_msg
        print(state['error_message'])
        state['logs'] = append_log(state, "synthesize_report", "ERROR", {"error": error_msg, "iteration": current_iteration})
        return state

    if capability_report is None:
        error_msg = "Capability report not found for final report synthesis."
        state['error_message'] = error_msg
        print(state['error_message'])
        state['logs'] = append_log(state, "synthesize_report", "ERROR", {"error": error_msg, "iteration": current_iteration})
        return state

    if final_score is None:
        error_msg = "Could not extract final score from capability report."
        state['error_message'] = error_msg
        print(state['error_message'])
        state['logs'] = append_log(state, "synthesize_report", "ERROR", {"error": error_msg, "iteration": current_iteration})
        return state

    try:
        # 1. Define the system prompt (persona and instructions)
        system_prompt = (
            "You are a Principal AI Researcher / Final Auditor. Your task is to synthesize all available information "
            "about a Kaggle notebook's audit process into a final, comprehensive report. "
            "The report should clearly summarize the notebook's quality, the improvements made, and the overall outcome of the audit. "
            "Your output MUST be a JSON object that strictly adheres to the provided Pydantic schema for `FinalReport`. "
            "Include the initial score, the final score, a list of key improvements (if any), and an overall summary. "
            "Do not include any other text outside the JSON. Be concise and factual."
        )

        # 2. Define the human message
        human_message = (
            "Please generate a final audit report based on the following information:\n\n" +
            f"Initial Audit Score: {initial_audit_score}\n" +
            f"Current Iteration: {current_iteration}\n" +
            "--- Latest Capability Report ---\n" +
            "```text\n{capability_report}\n```\n\n" +
            "Based on this, provide the 'final_score', 'improvements' (from identified flaws in the capability report, or state 'No significant changes yet' if applicable), and an 'overall_summary'."
        )

        # 3. Create a prompt template
        prompt_template = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            ("human", human_message)
        ])

        # 4. Invoke the LLM with fallbacks, ensuring structured output
        structured_llm = llm_with_fallbacks.with_structured_output(FinalReport)

        # First apply the prompt template, then pipe to structured_llm
        full_chain = prompt_template | structured_llm
        final_report_pydantic = full_chain.invoke({
            "capability_report": capability_report,
            "initial_audit_score": initial_audit_score,
            "iteration": current_iteration
        })

        final_report_pydantic.initial_score = initial_audit_score
        final_report_pydantic.final_score = final_score

        # 5. Convert the Pydantic object to a JSON string
        state['final_report'] = final_report_pydantic.model_dump_json(indent=2)
        print("‚úÖ Final report synthesized successfully.")
        state['logs'] = append_log(state, "synthesize_report", "SUCCESS", {"final_score": final_score, "iteration": current_iteration})

    except Exception as e:
        error_msg = f"Error during final report synthesis: {e}"
        state['error_message'] = error_msg
        print(state['error_message'])
        state['logs'] = append_log(state, "synthesize_report", "ERROR", {"error": str(e), "iteration": current_iteration})

    return state

print("synthesize_report function (LangGraph node) is defined, along with FinalReport Pydantic model.")

synthesize_report function (LangGraph node) is defined, along with FinalReport Pydantic model.


<h1>4. Defining the Graph Logic</h1>

<h3>4.1. Conditional Edge: Routing for Correction</h3>

This function, `route_for_correction`, defines the conditional logic that enables our workflow to loop. It acts as a router after the `audit_capabilities` node has run.

1.  It reads the `capability_report` and the current `iteration` count from the state.
2.  It parses the `Trust Score` from the report.
3.  **If** the score is below 95 and the number of correction loops is less than `MAX_ITERATIONS`, it returns the string `"generate_corrections"`. This tells LangGraph to proceed to the correction node.
4.  **Else** (if the score is high enough or we've iterated too many times), it returns `"synthesize_report"`, directing the graph to the final reporting step.

In [13]:
# Define a constant for maximum iterations
MAX_ITERATIONS = 3 # Or 5, depending on desired robustness

def route_for_correction(state: GraphState) -> str:
    """
    Conditional edge function to determine the next step in the graph.
    Routes to 'generate_corrections' if score is low and max iterations not met,
    otherwise routes to 'synthesize_report'.
    Also sets the initial_audit_score if it's the first run.
    """
    print("---ROUTING FOR CORRECTION---")

    capability_report = state.get('capability_report')
    initial_audit_score = state.get('initial_audit_score')
    iteration = state.get('iteration', 0) # Default to 0 if not set

    if not capability_report:
        print("No capability report found. Routing to synthesize_report as a fallback.")
        return "synthesize_report"

    # Parse the Trust Score from the capability_report
    trust_score_match = re.search(r'Trust Score: (\d+)', capability_report)
    if not trust_score_match:
        print("Trust Score not found in capability report. Routing to synthesize_report as a fallback.")
        return "synthesize_report"

    current_trust_score = int(trust_score_match.group(1))
    print(f"Current Trust Score: {current_trust_score}, Iteration: {iteration}/{MAX_ITERATIONS}")

    # Set initial_audit_score if it's the first run (iteration 0)
    if initial_audit_score is None:
        state['initial_audit_score'] = current_trust_score
        print(f"Initial Audit Score set to: {current_trust_score}")

    # Conditional logic
    if current_trust_score < 95 and iteration < MAX_ITERATIONS:
        print("Score below 95 and max iterations not reached. Routing to generate_corrections.")
        return "generate_corrections"
    else:
        print("Score >= 95 OR max iterations reached. Routing to synthesize_report.")
        return "synthesize_report"

print("route_for_correction function (LangGraph conditional edge) is defined.")

route_for_correction function (LangGraph conditional edge) is defined.


<h3>4.2. Assembling and Compiling the Workflow</h3>

Here, we assemble all the components into a complete, runnable workflow using `StateGraph`.

1.  **Initialize Graph**: A `StateGraph` is created with our `GraphState` as its schema.
2.  **Add Nodes**: Each of our functions (`ingest_notebook`, `analyze_code`, etc.) is added as a node in the graph.
3.  **Set Entry Point**: We specify that the workflow must always start at the `ingest_notebook` node.
4.  **Add Edges**: We define the connections between the nodes. This creates the primary path: `ingest` -> `analyze_code` -> `analyze_documentation` -> `audit_capabilities`.
5.  **Add Conditional Edge**: We use the `route_for_correction` function to create a branching point after the audit. The graph will either loop back via `generate_corrections` or proceed to `synthesize_report`.
6.  **Compile**: The graph definition is compiled into a runnable application, `app`.

In [14]:
# 1. Initialize StateGraph with GraphState as its schema
workflow = StateGraph(GraphState)

# 2. Add each of the six functions as nodes
workflow.add_node("ingest_notebook", ingest_notebook)
workflow.add_node("analyze_code", analyze_code)
workflow.add_node("analyze_documentation", analyze_documentation)
workflow.add_node("audit_capabilities", audit_capabilities)
workflow.add_node("generate_corrections", generate_corrections)
workflow.add_node("synthesize_report", synthesize_report)

# 3. Set the entry point
workflow.set_entry_point("ingest_notebook")

# 4. Define linear edges
workflow.add_edge("ingest_notebook", "analyze_code")
workflow.add_edge("analyze_code", "analyze_documentation")
workflow.add_edge("analyze_documentation", "audit_capabilities")

# 5. Define the conditional edge after audit_capabilities
workflow.add_conditional_edges(
    "audit_capabilities", # Source node
    route_for_correction, # Conditional function
    {
        "generate_corrections": "generate_corrections", # If route_for_correction returns "generate_corrections"
        "synthesize_report": "synthesize_report"      # If route_for_correction returns "synthesize_report"
    }
)

# 6. Define the loop edge from generate_corrections back to analyze_code
workflow.add_edge("generate_corrections", "analyze_code")

# 7. Define the end edge from synthesize_report to END
workflow.add_edge("synthesize_report", END)

# Compile the workflow
app = workflow.compile()

print("StateGraph workflow defined, nodes added, and edges connected.")

StateGraph workflow defined, nodes added, and edges connected.


<h1>5. Execution and Output</h1>

<h3>5.1. Helper Functions: Saving Outputs</h3>

This cell defines two helper functions designed to save the final outputs of our workflow.

1.  **`save_corrected_notebook`**: Takes the final `GraphState`, which contains the corrected code and markdown, and reconstructs a valid `.ipynb` notebook file from that content. This provides a tangible, usable artifact from the audit and correction process.
2.  **`save_execution_logs`**: Takes the final `GraphState` and saves the `logs` list to a JSON file. This creates a detailed audit trail of the entire process, which is useful for debugging and review.

In [15]:
def save_corrected_notebook(state, filename="corrected_notebook.ipynb"):
    """
    Reconstructs a valid .ipynb file from the agent's 'notebook_content' state,
    preserving the original cell structure.
    """
    content = state.get("notebook_content")
    if not content or not content.get('cells'):
        print("‚ùå No structured notebook content ('cells' key) found to save.")
        return

    cells_to_save = []

    for original_cell in content['cells']:
        cell_type = original_cell.get('cell_type')
        source = original_cell.get('source', '')

        # Ensure source is a list of strings, each ending with a newline, for .ipynb format
        if isinstance(source, str):
            source_lines = [line + "\n" for line in source.split('\n')]
        elif isinstance(source, list):
            # Ensure each line in the list ends with a newline, if not already
            source_lines = [line if line.endswith('\n') else line + '\n' for line in source]
        else:
            source_lines = [str(source) + '\n'] # Convert non-string source to string and add newline

        if cell_type == 'markdown':
            new_cell = {
                "cell_type": "markdown",
                "metadata": {},
                "source": source_lines
            }
        elif cell_type == 'code':
            new_cell = {
                "cell_type": "code",
                "execution_count": None, # Set to None for a newly saved notebook
                "metadata": {},
                "outputs": [],
                "source": source_lines
            }
        else:
            # Handle unknown cell types or skip them
            print(f"‚ö†Ô∏è Warning: Unknown cell type '{cell_type}' encountered. Skipping cell.")
            continue
        cells_to_save.append(new_cell)

    # 3. Assemble the Notebook Structure
    notebook_json = {
        "cells": cells_to_save,
        "metadata": {
            "kernelspec": {
                "display_name": "Python 3",
                "language": "python",
                "name": "python3"
            },
            "language_info": {
                "codemirror_mode": {"name": "ipython", "version": 3},
                "file_extension": ".py",
                "mimetype": "text/x-python",
                "name": "python",
                "nbconvert_exporter": "python",
                "pygments_lexer": "ipython3",
                "version": "3.10.12" # Use a default or try to extract from original if available
            }
        },
        "nbformat": 4,
        "nbformat_minor": 5
    }

    # 4. Write to Disk
    try:
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(notebook_json, f, indent=2)
        print(f"‚úÖ SUCCESS: Corrected notebook saved to '{filename}'")
        print(f"‚¨áÔ∏è  Check the 'Files' sidebar in Colab to download it!")
    except Exception as e:
        print(f"‚ùå ERROR: Failed to save corrected notebook to '{filename}': {e}")

def save_execution_logs(state, filename="execution_logs.json"):
    """
    Saves the accumulated logs from the GraphState to a JSON file.
    """
    logs = state.get('logs', [])

    if not logs:
        print("‚ÑπÔ∏è No logs found to save.")
        return

    try:
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(logs, f, indent=2)
        print(f"‚úÖ SUCCESS: Execution logs saved to '{filename}'")
        print(f"‚¨áÔ∏è  Check the 'Files' sidebar in Colab to download it!")
    except Exception as e:
        print(f"‚ùå ERROR: Failed to save execution logs to '{filename}': {e}")

<h3>5.2. Running the Auto-Auditor Workflow</h3>

This is the main execution block where we run the entire Auto-Auditor workflow.

1.  **Configuration**: We set a `recursion_limit` to allow for the looping behavior of the graph.
2.  **Initial State**: We define the starting state for the graph, providing the `target_url` of the Kaggle notebook we want to audit. The user is prompted to enter a URL, with a default provided as a fallback. The `iteration` count and `logs` list are also initialized.
3.  **Execution**: We call `app.stream()` to run the workflow. The `stream` method allows us to see the output from each node as it completes, providing a real-time log of the agent's progress. The final state is captured as the stream progresses.
4.  **Final Output**: Once the stream is complete, the final state is passed to the `save_corrected_notebook` and `save_execution_logs` functions, which write the final, improved notebook and the process logs to files. The final JSON report is also printed.

In [16]:
config = RunnableConfig(recursion_limit=30)

# 1. Implement User Input with Fallback Logic
default_url = "https://www.kaggle.com/code/jhoward/jupyter-notebook-101"
user_input_url = input(f"Enter Kaggle URL (or press Enter for default: {default_url}): ")

target_url = default_url
if user_input_url:
    if user_input_url.startswith("https://www.kaggle.com/code/"):
        target_url = user_input_url
        print(f"Using user-provided URL: {target_url}")
    else:
        print("‚ö†Ô∏è Invalid Kaggle URL format. Falling back to default URL.")
        print(f"Using default URL: {target_url}")
else:
    print(f"Using default URL: {target_url}")

initial_state = {
    "target_url": target_url,
    "notebook_content": None, "code_report": None, "doc_report": None,
    "capability_report": None, "final_report": None, "error_message": None,
    "iteration": 0, # Initialize iteration
    "logs": [] # Initialize logs list
}

print("üöÄ LAUNCHING KAGGLE AUTO-AUDITOR (Full Loop Mode)...")

final_state = None # Initialize final_state to be captured from stream

try:
    print("\n--- Workflow Events ---")
    for event in app.stream(initial_state, config=config):
        for key, value in event.items():
            print(f"üü¢ Finished Step: {key}")
            # Update final_state with the latest full state from the event.
            # The `value` in an event item is the state after that node executed.
            # We update `initial_state` which acts as our accumulating state.
            initial_state.update(value)

            # Check for specific milestones
            if key == "generate_corrections":
                print("   ‚ú® CORRECTIONS GENERATED! Looping back to analysis...")
            elif key == "audit_capabilities":
                # Safely access the score from the state update
                report = value.get('capability_report', '')
                score_match = re.search(r'Trust Score: (\d+)', report)
                current_score = int(score_match.group(1)) if score_match else 'N/A'
                print(f"   üîç Audit Complete. Current Trust Score: {current_score}")

            # If the workflow reached END, the final state is in initial_state
            if key == "synthesize_report": # This is the last node before END
                final_state = initial_state.copy()
                break # Break from inner loop once synthesize_report is hit

        if final_state: # If synthesize_report was processed, we're done
            break

    print("\n‚úÖ WORKFLOW FINISHED SUCCESSFULLY")

    # --- EXPORT STEP ---
    # 3. Error Handling: Only call save_corrected_notebook if a final_state was captured
    if final_state:
        save_corrected_notebook(final_state, filename="corrected_notebook_v2.ipynb")
        save_execution_logs(final_state, filename="execution_logs.json") # Save execution logs
        if final_state.get('final_report'):
            print("\n--- Final Report ---")
            print(final_state['final_report'])
    else:
        print("‚ùå Workflow completed, but no final state was captured for saving the notebook and logs.")

except Exception as e:
    print(f"‚ùå WORKFLOW FAILED: {e}")
    if initial_state.get('error_message'):
        print(f"  Error details from state: {initial_state['error_message']}")
    # Attempt to save logs even on failure
    save_execution_logs(initial_state, filename="execution_logs_error.json")
    import traceback
    traceback.print_exc()

Enter Kaggle URL (or press Enter for default: https://www.kaggle.com/code/jhoward/jupyter-notebook-101): https://www.kaggle.com/code/colinmorris/strings-and-dictionaries
Using user-provided URL: https://www.kaggle.com/code/colinmorris/strings-and-dictionaries
üöÄ LAUNCHING KAGGLE AUTO-AUDITOR (Full Loop Mode)...

--- Workflow Events ---
---INGESTING KAGGLE NOTEBOOK---
KaggleApi client initialized and authenticated.
Parsed URL: Owner='colinmorris', Kernel='strings-and-dictionaries'
Executing Kaggle CLI command: kaggle kernels pull colinmorris/strings-and-dictionaries -p /tmp/tmpgfiwobm9/kaggle_notebook_colinmorris_strings-and-dictionaries
Kaggle CLI stdout: Source code downloaded to /tmp/tmpgfiwobm9/kaggle_notebook_colinmorris_strings-and-dictionaries/strings-and-dictionaries.ipynb

Found .ipynb file: /tmp/tmpgfiwobm9/kaggle_notebook_colinmorris_strings-and-dictionaries/strings-and-dictionaries.ipynb
Copied .ipynb to persistent temp file: /tmp/tmpx23d1w3n.ipynb
Successfully extracted c