In [11]:
import operator
import os
import re
import subprocess
import sys
from typing import TypedDict, Annotated

from langchain_core.messages import SystemMessage, HumanMessage
from langchain_ollama import ChatOllama
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END

In [None]:
# # Define the State
# class AgentState(TypedDict):
#     task: str
#     code: str
#     error: str
#     feedback: str
#     iterations: int

# # Utility to extract raw python code from LLM output
# def extract_code(text: str) -> str:
#     # Matches everything between ```python and ```
#     match = re.search(r"```(?:python)?(.*?)```", text, re.DOTALL | re.IGNORECASE)
#     if match:
#         return match.group(1).strip()
#     return text.strip() # Fallback in case it outputs raw code

In [None]:
# Define the State
class AgentState(TypedDict):
    """State dictionary shared by every node of the agent graph.

    Each node receives the current state and returns a dict holding only the
    keys it wants to update.
    """

    task: str  # Natural-language description of the problem to solve
    code: str  # Latest solution source returned by the coder node
    test_code: str  # NEW: Stores the pytest code
    error: str  # Pytest output on failure; the literal string "None" after a passing run
    feedback: str  # Debugger advice for the next coder attempt; reset to "" on success
    iterations: int  # Number of coder attempts made so far

# Utility to extract raw python code from LLM output
def extract_code(text: str) -> str:
    """Return the source code contained in an LLM reply.

    The first Markdown code fence in ``text`` is located and its contents are
    returned with surrounding whitespace removed. When the opening fence is
    followed by a language tag on the same line (``python``, ``py``,
    ``python3``, ...), the whole tag line is dropped so the tag never ends up
    inside the extracted code. Fences without a line break after the opening
    backticks are handled as before: only an optional ``python`` tag is
    removed.

    Args:
        text: Raw model output, with or without a fenced code block.

    Returns:
        The stripped contents of the first complete fenced block, or the
        whole stripped ``text`` when no complete fence is present.
    """
    fence = re.search(
        # Alternative 1: "```<optional tag><newline>"  -> discard the tag line.
        # Alternative 2: inline fence, optionally prefixed by "python".
        r"```(?:[ \t]*[\w.+-]*[ \t]*\r?\n|(?:python)?)(.*?)```",
        text,
        re.DOTALL | re.IGNORECASE,
    )
    if fence:
        return fence.group(1).strip()
    # Fallback: the model answered with bare code and no fence at all.
    return text.strip()

## Initializing the models

In [None]:
# # The Coder
# coder_llm = ChatOllama(
#     model="qwen2.5:7b", 
#     temperature=0.1 # Low temp for deterministic syntax
# )

# # The Debugger
# debugger_llm = ChatOllama(
#     model="phi3.5", 
#     temperature=0.3 # Slight creativity for problem-solving
# )


# Three chat models, one per role. Temperatures are kept low so generated
# code and tests stay close to deterministic and syntactically strict; the
# debugger gets a slightly higher value.

# Writes the solution code.
coder_llm = ChatOllama(model="qwen2.5:7b", temperature=0.1)

# Writes the pytest suite for the solution.
tester_llm = ChatOllama(model="phi3.5", temperature=0.1)

# Analyses failing test runs.
debugger_llm = ChatOllama(model="phi3.5", temperature=0.3)

## Defining the Nodes

In [None]:
# def coder_node(state: AgentState):
#     print(f"\n--- CODER NODE (Iteration {state['iterations'] + 1}) ---")
    
#     # Base prompt
#     system_prompt = "You are an expert Python coder. Write clean, PEP8 compliant code to solve the user's task. ONLY output Python code inside ```python ``` blocks. Do not explain the code."
    
#     user_prompt = f"Task: {state['task']}\n"
    
#     # If there is feedback from Phi-3.5, include it!
#     if state.get("feedback") and state.get("error"):
#         user_prompt += f"\nPrevious Error:\n{state['error']}\n\nDebugger Advice:\n{state['feedback']}\n\nPlease fix the code."
        
#     messages = [
#         SystemMessage(content=system_prompt),
#         HumanMessage(content=user_prompt)
#     ]
    
#     response = coder_llm.invoke(messages)
#     raw_code = extract_code(response.content)
    
#     print("Qwen generated new code.")
#     return {"code": raw_code, "iterations": state["iterations"] + 1}


# def tester_node(state: AgentState):
#     print("\n--- TESTER NODE ---")
    
#     sandbox_dir = "sandbox"
#     os.makedirs(sandbox_dir, exist_ok=True)
#     file_path = os.path.join(sandbox_dir, "temp_script.py")
    
#     # Write the code to the sandbox
#     with open(file_path, "w") as f:
#         f.write(state["code"])
        
#     try:
#         # Run the script with a timeout to prevent infinite loops
#         result = subprocess.run(
#             ["python", file_path], 
#             capture_output=True, 
#             text=True, 
#             timeout=10
#         )
        
#         if result.returncode == 0:
#             print("Execution Successful!")
#             return {"error": "None", "feedback": ""} # Clears errors if successful
#         else:
#             print("Execution Failed.")
#             return {"error": result.stderr}
            
#     except subprocess.TimeoutExpired:
#         print("⏳ Execution timed out!")
#         return {"error": "TimeoutExpired: The code took too long to run. Check for infinite loops."}


# def debugger_node(state: AgentState):
#     print("\n--- DEBUGGER NODE ---")
    
#     system_prompt = "You are an expert Python debugger. Analyze the following code and the traceback error. Explain exactly WHY it failed and provide a brief, actionable plan to fix it. Do NOT write the final code, just give instructions."
    
#     user_prompt = f"Code:\n```python\n{state['code']}\n```\n\nError Traceback:\n{state['error']}"
    
#     messages = [
#         SystemMessage(content=system_prompt),
#         HumanMessage(content=user_prompt)
#     ]
    
#     response = debugger_llm.invoke(messages)
#     print("Phi-3.5 generated debugging plan.")
    
#     return {"feedback": response.content}

In [None]:
def coder_node(state: AgentState):
    """Ask the coder model for a solution, or a revision of the previous one.

    On the first pass the prompt contains only the task. When both an error
    and debugger feedback are present in the state, the prompt also carries
    the previously generated code, the error and the advice, so the model
    can see what it is being asked to repair.

    Args:
        state: Current graph state. Reads ``task`` and, when present,
            ``code``, ``error``, ``feedback`` and ``iterations``.

    Returns:
        Partial state update with the extracted ``code`` and the incremented
        ``iterations`` counter.
    """
    attempt = state.get("iterations", 0) + 1
    print(f"\n--- CODER NODE (Iteration {attempt}) ---")

    system_prompt = "You are an expert Python coder. Write clean, PEP8 compliant code to solve the user's task. ONLY output Python code inside ```python ``` blocks. Do not explain the code. Do not write test cases."
    user_prompt = f"Task: {state['task']}\n"

    if state.get("feedback") and state.get("error"):
        # Without the previous attempt the debugger's advice has nothing to
        # refer to, so include the code that produced the error.
        if state.get("code"):
            user_prompt += f"\nPrevious Code:\n```python\n{state['code']}\n```\n"
        user_prompt += f"\nPrevious Error:\n{state['error']}\n\nDebugger Advice:\n{state['feedback']}\n\nPlease fix the code."

    messages = [SystemMessage(content=system_prompt), HumanMessage(content=user_prompt)]
    response = coder_llm.invoke(messages)

    print("Qwen generated new code.")
    return {"code": extract_code(response.content), "iterations": attempt}


def test_generator_node(state: AgentState):
    """Ask the tester model to write a pytest suite for the current solution.

    Args:
        state: Current graph state. Reads ``task`` and ``code``.

    Returns:
        Partial state update holding the extracted test source under
        ``test_code``.
    """
    print("\n--- TEST GENERATOR NODE (Phi-3.5) ---")

    # The executor saves the solution as solution.py, so the prompt pins the
    # import line the generated tests must use.
    instructions = " ".join([
        "You are a strict QA engineer. Write a comprehensive pytest suite for the provided Python code.",
        "The code you are testing will be saved in a file named `solution.py`.",
        "You MUST start your script with `import pytest` and `from solution import *`.",
        "Write edge cases and standard cases. ONLY output the Python test code inside ```python ``` blocks.",
    ])

    task = state['task']
    solution = state['code']
    request = (
        f"Original Task: {task}\n\n"
        f"Code to test:\n```python\n{solution}\n```"
    )

    reply = tester_llm.invoke(
        [SystemMessage(content=instructions), HumanMessage(content=request)]
    )

    print("Phi-3.5 generated pytest script.")
    generated_tests = extract_code(reply.content)
    return {"test_code": generated_tests}


def executor_node(state: AgentState):
    """Write the solution and its tests to the sandbox folder and run pytest.

    NOTE(review): the generated code is executed as a normal subprocess of
    this kernel. The "sandbox" directory only separates files; it provides no
    isolation from the host system.

    Args:
        state: Current graph state. Reads ``code`` and ``test_code``.

    Returns:
        Partial state update. On success ``error`` is the literal string
        ``"None"`` and ``feedback`` is cleared; on failure or timeout
        ``error`` holds the pytest output or a timeout message.
    """
    print("\n--- EXECUTOR NODE (Sandbox) ---")

    sandbox_dir = "sandbox"
    os.makedirs(sandbox_dir, exist_ok=True)

    solution_path = os.path.join(sandbox_dir, "solution.py")
    test_path = os.path.join(sandbox_dir, "test_solution.py")

    # Model output may contain non-ASCII characters; write UTF-8 explicitly
    # instead of relying on the platform default encoding.
    for path, source in ((solution_path, state["code"]), (test_path, state["test_code"])):
        with open(path, "w", encoding="utf-8") as f:
            f.write(source)

    try:
        # Run pytest through the kernel's own interpreter so the tests use
        # the same environment as the notebook and do not depend on a
        # `pytest` executable being on PATH.
        result = subprocess.run(
            [sys.executable, "-m", "pytest", test_path, "-v"],
            capture_output=True,
            text=True,
            errors="replace",  # never fail on undecodable test output
            timeout=15,
        )
    except subprocess.TimeoutExpired:
        print("⏳ Execution timed out!")
        return {"error": "TimeoutExpired: The tests took too long to run."}

    if result.returncode == 0:
        print("Tests Passed Successfully!")
        return {"error": "None", "feedback": ""}

    print("Tests Failed.")
    # Pytest puts most info in stdout; stderr is appended for import errors.
    return {"error": result.stdout + "\n" + result.stderr}


def debugger_node(state: AgentState):
    """Ask the debugger model why the tests failed and how to fix the code.

    Args:
        state: Current graph state. Reads ``code``, ``test_code`` and
            ``error``.

    Returns:
        Partial state update holding the model's reply under ``feedback``.
    """
    print("\n--- DEBUGGER NODE ---")

    instructions = "You are an expert Python debugger. Analyze the code, the test cases, and the pytest error traceback. Explain exactly WHY it failed and provide a brief, actionable plan to fix the main code. Do NOT write the final code."

    # Solution, tests and pytest output are sent together in one message.
    report = "".join([
        f"Main Code:\n```python\n{state['code']}\n```\n\n",
        f"Test Code:\n```python\n{state['test_code']}\n```\n\n",
        f"Pytest Traceback:\n{state['error']}",
    ])

    reply = debugger_llm.invoke(
        [SystemMessage(content=instructions), HumanMessage(content=report)]
    )

    print("Phi-3.5 generated debugging plan.")
    return {"feedback": reply.content}

## Testing the nodes

In [None]:
# Seed state for exercising the coder node on its own, outside the graph.
mock_initial_state = dict(
    task="Write a Python function called `is_palindrome` that checks if a given string is a palindrome. It should ignore spaces and casing.",
    code="",
    test_code="",
    error="",
    feedback="",
    iterations=0,
)

# Call the node function directly.
print("Testing Coder Node...\n")
coder_output = coder_node(mock_initial_state)

# Show the extracted source, then the updated attempt counter.
print("\n--- EXTRACTED CODE ---")
print(coder_output.get("code", "No code returned."))

print("\n--- UPDATED ITERATION COUNT ---")
print(coder_output.get("iterations"))

In [None]:
# Create a mock state containing the code we want to test.
# `coder_node` always returns a "code" key, so a `.get()` default would never
# be used; `or` makes the hand-written fallback apply when the extracted code
# is empty.
mock_testing_state = {
    "task": "Write a Python function called `is_palindrome` that checks if a given string is a palindrome. It should ignore spaces and casing.",
    "code": coder_output.get("code") or "def is_palindrome(s):\n    s = s.replace(' ', '').lower()\n    return s == s[::-1]",
    "test_code": "",
    "error": "",
    "feedback": "",
    "iterations": 1
}

# Run the test generator node directly
print("Testing Test Generator Node...\n")
tester_output = test_generator_node(mock_testing_state)

# Print the exact string that Phi generated
print("\n--- EXTRACTED PYTEST CODE ---")
print(tester_output.get("test_code", "No test code returned."))

## Graph Orchestration & Routing Logic

In [None]:
def route_after_test(state: AgentState):
    """Choose the branch to follow once the executor has finished.

    Args:
        state: Current graph state. Reads ``error`` and, when the tests did
            not pass, ``iterations``.

    Returns:
        ``"end"`` when the tests passed or three attempts have been used,
        otherwise ``"debug"``.
    """
    # The executor stores the literal string "None" after a passing run.
    tests_passed = state["error"] == "None"
    if tests_passed:
        return "end"

    if state["iterations"] < 3:
        return "debug"

    print("\nMax iterations reached. Aborting.")
    return "end"

# Assemble the graph: Coder -> TestGenerator -> Executor, then either stop or
# loop back to the Coder through the Debugger.
workflow = StateGraph(AgentState)

# Register the four nodes.
for node_name, node_fn in (
    ("Coder", coder_node),
    ("TestGenerator", test_generator_node),
    ("Executor", executor_node),
    ("Debugger", debugger_node),
):
    workflow.add_node(node_name, node_fn)

# Fixed path from the entry point up to the executor.
workflow.set_entry_point("Coder")
for source, target in (("Coder", "TestGenerator"), ("TestGenerator", "Executor")):
    workflow.add_edge(source, target)

# After execution the router decides between finishing and another repair round.
branch_targets = {"end": END, "debug": "Debugger"}
workflow.add_conditional_edges("Executor", route_after_test, branch_targets)
workflow.add_edge("Debugger", "Coder")

# Compile with an in-memory checkpointer.
memory = MemorySaver()
agent = workflow.compile(checkpointer=memory)

In [None]:
# def route_after_test(state: AgentState):
#     if state["error"] == "None":
#         return "end"
#     elif state["iterations"] >= 3: # Max 3 attempts
#         print("\nMax iterations reached. Aborting.")
#         return "end"
#     else:
#         return "debug"

# # Build the Graph
# workflow = StateGraph(AgentState)

# # Add Nodes
# workflow.add_node("Coder", coder_node)
# workflow.add_node("Tester", tester_node)
# workflow.add_node("Debugger", debugger_node)

# # Add Edges
# workflow.set_entry_point("Coder")
# workflow.add_edge("Coder", "Tester")
# workflow.add_conditional_edges(
#     "Tester",
#     route_after_test,
#     {
#         "end": END,
#         "debug": "Debugger"
#     }
# )
# workflow.add_edge("Debugger", "Coder")

# # Initialize memory and compile
# memory = MemorySaver()
# agent = workflow.compile(checkpointer=memory)

## Running the Agent

In [None]:
# The task. Every key declared in AgentState is given an explicit starting
# value, including `test_code`.
initial_state = {
    # "task": "Write a python script that creates a list of numbers from 1 to 10, calculates the square root of each, and prints the result. Ensure you import the math module.",
    "task": "Solve using python: You are climbing a staircase. It takes`n`steps to reach the top.Each time you can either climb`1`or`2`steps. In how many distinct ways can you climb to the top?. Input: n = 3",
    "code": "",
    "test_code": "",
    "error": "",
    "feedback": "",
    "iterations": 0
}


# The graph is compiled with a checkpointer, so each run needs a thread id.
config = {"configurable": {"thread_id": "loyums_first_agent"}}

# Stream the execution, passing the config
for output in agent.stream(initial_state, config=config):
    print(list(output.keys())) # Shows which node just ran

print("\n\n--- FINAL DELIVERED CODE ---")
# Read the final state back from the checkpointer using the same config.
final_state = agent.get_state(config).values
print(final_state.get('code', 'No code generated.'))


--- CODER NODE (Iteration 1) ---
Qwen generated new code.
['Coder']

--- TESTER NODE ---
Execution Successful!
['Tester']


--- FINAL DELIVERED CODE ---
def climb_stairs(n):
    if n == 1:
        return 1
    a, b = 1, 2
    for i in range(3, n + 1):
        temp = b
        b = a + b
        a = temp
    return b

n = 3
print(climb_stairs(n))
