## Dynamic Flow

### Phase-I

In [93]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Optional
from langchain_ollama import ChatOllama

In [94]:
from typing import TypedDict, List, Optional
from langgraph.graph import StateGraph, END
import operator

class Task(TypedDict):
    """A single unit of work queued for one of the worker agents."""
    id: str  # task identifier; the planner output shown in this notebook uses values like 'task_1'
    description: str  # text substituted into the agent prompt as the task description
    assigned_agent: str  # 'backend' | 'frontend'
    status: str  # e.g. 'pending' in the planner output; no node visible in this file reads it

class AgentState(TypedDict):
    """Shared state dictionary passed between the workflow nodes.

    NOTE(review): no field declares a reducer annotation, so a value returned
    by a node presumably replaces the stored value rather than being merged
    into it -- confirm against the LangGraph state documentation.
    """
    project_root: str  # directory the project is generated in, e.g. "./builds/app-calculator"
    requirements: str  # user request text passed to the architect prompt
    architecture: Optional[str]  # architect LLM output; None until the architect node has run

    # Queue management
    task_queue: List[Task]  # pending tasks; agents work on element 0 and return the remainder
    current_task: Optional[Task]  # set to None by the driver; no node visible in this file writes it
    completed_tasks: List[Task]  # tasks reported as finished by the backend/frontend nodes

    # Testing & Debugging
    test_logs: Optional[str]  # text recorded by the tester node and passed to the debugger prompt
    test_status: str # 'pending', 'passed', 'failed'
    iteration_count: int  # incremented by the debugger node; routing stops once it reaches 5

### Creating Tools

In [95]:
import os
import subprocess
from langchain_core.tools import tool

@tool
def write_file(file_path: str, content: str):
    """
    Writes content to a file inside ./builds/.
    Auto-creates missing directories.

    Args:
        file_path: Destination path; must resolve to a location inside ./builds/.
        content: Text to write as UTF-8. An existing file is overwritten.

    Returns:
        A success message, or a message starting with "Error" on failure.
    """
    import os

    # Normalize paths so "..", "." and relative forms are resolved before
    # the sandbox decision is made.
    root = os.path.abspath("builds")
    abs_path = os.path.abspath(file_path)

    # Enforce sandbox. A plain string-prefix test would also accept sibling
    # directories such as ./builds_evil/, so compare whole path components.
    try:
        inside_sandbox = os.path.commonpath([root, abs_path]) == root
    except ValueError:
        # Raised when the two paths cannot share a prefix (e.g. different
        # drives on Windows) -- that is definitely outside the sandbox.
        inside_sandbox = False

    if not inside_sandbox:
        return "Error: write_file is restricted to the ./builds/ directory."

    try:
        # Ensure parent directory exists
        os.makedirs(os.path.dirname(abs_path), exist_ok=True)

        # Write content
        with open(abs_path, "w", encoding="utf-8") as f:
            f.write(content)

        return f"Successfully wrote {len(content)} bytes to {abs_path}"

    except Exception as e:
        # Tool boundary: report the failure as text instead of raising so the
        # calling agent loop keeps running.
        return f"Error writing file: {e}"


@tool
def read_file(file_path: str):
    """Reads a file. Args: file_path

    The file is decoded as UTF-8, matching the encoding write_file uses,
    so generated files round-trip regardless of the platform default.

    Returns:
        The file's text, or a message starting with "Error reading file:"
        when the file cannot be opened or decoded.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read()
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising so the
        # calling agent loop keeps running.
        return f"Error reading file: {e}"

@tool
def list_files(root_path: str):
    """
    Returns a simple directory tree structure as a string.
    Useful for agents to quickly inspect files and folders.

    Args:
        root_path: The directory to scan (must be inside ./builds/)
    """

    import os

    # Resolve both paths before deciding: a substring test on the raw argument
    # would accept "builds/../" and reject a bare "builds".
    sandbox = os.path.abspath("builds")
    target = os.path.abspath(root_path)
    try:
        allowed = os.path.commonpath([sandbox, target]) == sandbox
    except ValueError:
        # No common prefix is possible (e.g. different drives on Windows).
        allowed = False

    if not allowed:
        return "Error: Only allowed to inspect files inside 'builds/' directory."

    if not os.path.exists(target):
        return f"Error: Path does not exist -> {root_path}"

    tree_output = []

    # Walk the normalized path so a trailing slash in root_path cannot skew
    # the depth calculation or produce an empty root name.
    for base, dirs, files in os.walk(target):
        # Sorting in place makes os.walk descend in a stable order.
        dirs.sort()

        rel = os.path.relpath(base, target)
        level = 0 if rel == os.curdir else rel.count(os.sep) + 1
        indent = "  " * level
        tree_output.append(f"{indent}{os.path.basename(base)}/")

        sub_indent = "  " * (level + 1)
        tree_output.extend(f"{sub_indent}{name}" for name in sorted(files))

    return "\n".join(tree_output)


@tool
def run_shell_command(command: str, work_dir: str):
    """
    Executes a terminal command.
    Args:
        command: e.g., 'python main.py' or 'npm install'
        work_dir: the directory to run in (e.g., ./builds/app-1)
    """
    # The docstring above is left untouched on purpose: the @tool decorator
    # exposes it to the model as the tool description.
    #
    # NOTE(review): the command string is handed to the shell as-is
    # (shell=True) and nothing here restricts work_dir.
    try:
        proc = subprocess.run(
            command,
            shell=True,
            cwd=work_dir,
            text=True,
            capture_output=True,
            timeout=20,
        )
    except Exception as exc:
        # Covers a missing work_dir and the 20-second timeout alike.
        return f"System Error: {exc}"

    if proc.returncode != 0:
        return f"FAILED:\nSTDOUT: {proc.stdout}\nSTDERR: {proc.stderr}"
    return f"SUCCESS:\n{proc.stdout}"

# Plain chat model (no tools bound). In this file it is invoked directly by the
# architect, planner and debugger nodes.
llm = ChatOllama(
    model="llama3.1:8b", 
    temperature=0.1
)

# Same model with the four file/shell tools bound, so its responses may carry
# tool calls. Invoked by the backend, frontend and tester nodes.
llm_worker = llm.bind_tools([write_file, read_file, run_shell_command, list_files])

### Creating Agent Phase-II

In [96]:
import json
from prompts import *
from state import AgentState
from langgraph.graph import StateGraph, END
from tools import llm, llm_worker, write_file, read_file, run_shell_command
import json

# --- 1. ARCHITECT ---
def architect_node(state: AgentState):
    """Ask the plain LLM for an architecture and store its reply.

    Reads ``project_root`` and ``requirements`` from the state, fills the
    architect prompt with them, echoes the model's answer and returns it
    under the ``architecture`` key.
    """
    print(f"DEBUG STATE KEYS: {list(state)}")

    root = state["project_root"]
    print(f"\nüèóÔ∏è  [Architect] Designing {root}...")

    prompt = architect_prompt.format(
        project_root=root,
        user_query=state["requirements"],
    )
    design = llm.invoke(prompt).content
    print(design)
    return {"architecture": design}

# --- 2. PLANNER ---
def planner_node(state: AgentState):
    """Turn the architecture into a validated task queue.

    Skips planning when the state already holds queued or completed tasks.
    Otherwise asks the plain LLM for a JSON plan, strips Markdown code
    fences, and accepts either a bare list of tasks or an object with a
    "tasks" list.

    Returns:
        ``{}`` when planning is skipped, else ``{"task_queue": [...]}``.
        The queue is empty when the reply cannot be parsed or is not a list.
        Entries that are not dicts with a "description" are dropped, and a
        missing "assigned_agent" is filled with "backend" (the same default
        the router uses), so downstream nodes can index tasks safely.
    """
    print("\nüìÖ [Planner] Creating task list...")

    if state.get("task_queue") or state.get("completed_tasks"):
        print("   -> Skipping planning (tasks already exist or completed).")
        return {}

    msg = planner_prompt.format(
        project_root=state["project_root"],
        architecture=state["architecture"]
    )
    response = llm.invoke(msg)

    try:
        content = response.content.replace("```json", "").replace("```", "").strip()
        tasks = json.loads(content)
    except (AttributeError, TypeError, ValueError) as e:
        # ValueError covers json.JSONDecodeError; the other two cover a
        # response whose content is not a plain string.
        print(f"Error parsing plan: {e}")
        return {"task_queue": []}

    print(tasks)

    # Ensure it's a list
    if isinstance(tasks, dict) and "tasks" in tasks:
        tasks = tasks["tasks"]
    if not isinstance(tasks, list):
        print(f"Error parsing plan: expected a list of tasks, got {type(tasks).__name__}")
        return {"task_queue": []}

    # Keep only well-formed entries; agents read task["description"] directly.
    valid_tasks = [
        {**entry, "assigned_agent": entry.get("assigned_agent", "backend")}
        for entry in tasks
        if isinstance(entry, dict) and "description" in entry
    ]
    dropped = len(tasks) - len(valid_tasks)
    if dropped:
        print(f"   -> Dropped {dropped} malformed task(s) from the plan.")

    return {"task_queue": valid_tasks}

# --- 3. ORCHESTRATOR (The Decision Maker) ---
def orchestrator_node(state: AgentState):
    """
    The Orchestrator node itself acts as a router.
    It doesn't modify the state, but ensures we pause here
    before the Conditional Edge makes a decision.

    It only prints a progress line describing what the conditional edge is
    about to do, then returns an empty update.

    Returns:
        An empty dict (no state changes).
    """
    queue = state.get("task_queue", [])
    status = state.get("test_status", "pending")

    if queue:
        # Use the same "backend" default as the router, so a task that lacks
        # "assigned_agent" (e.g. an LLM-generated fix task) cannot crash a
        # node whose only job is logging.
        next_agent = queue[0].get("assigned_agent", "backend")
        print(f"\nüëÆ [Orchestrator] {len(queue)} tasks pending. Next: {next_agent}...")
    elif status == "pending":
        print("\nüëÆ [Orchestrator] All tasks done. Moving to Testing...")
    elif status == "failed":
        print("\nüëÆ [Orchestrator] Tests failed. Activating Debugger...")

    return {}

# Helper function to execute tool calls
def execute_tools(ai_msg):
    """Manually executes tool calls generated by the LLM.

    Args:
        ai_msg: A model response. Its ``tool_calls`` attribute, when present
            and non-empty, is read as a sequence of dicts with "name" and
            "args" keys.

    Returns:
        A list with one string per requested tool call, in request order:
        the tool's output, or an "Error: ..." message when the tool is
        unknown or raised. Empty when the message carries no tool calls.
    """
    tool_calls = getattr(ai_msg, "tool_calls", None)
    if not tool_calls:
        return []

    # Map tool names to actual functions. Built once per message, and it
    # includes list_files because the worker model is bound with that tool.
    tools_map = {
        "write_file": write_file,
        "read_file": read_file,
        "run_shell_command": run_shell_command,
        "list_files": list_files,
    }

    results = []
    for tool_call in tool_calls:
        tool_name = tool_call["name"]
        tool_args = tool_call["args"]

        selected_tool = tools_map.get(tool_name)
        if selected_tool is None:
            # Report instead of silently skipping, so a hallucinated tool
            # name is visible in the run log.
            print(f"   -> Unknown tool requested: {tool_name}")
            results.append(f"Error: unknown tool '{tool_name}'")
            continue

        print(f"   üõ†Ô∏è  Executing Tool: {tool_name} with args: {tool_args}")
        try:
            output = selected_tool.invoke(tool_args)
        except Exception as e:
            # A malformed argument set from the model should not abort the
            # whole workflow; record the failure and continue.
            print(f"   -> Tool {tool_name} raised: {e}")
            output = f"Error: tool '{tool_name}' failed: {e}"
        results.append(str(output))

    return results

# --- 4. BACKEND AGENT ---
def backend_agent_node(state: AgentState):
    """Work on the first queued task with the tool-enabled model.

    Formats the backend prompt for the task at the head of ``task_queue``,
    invokes the worker model once and executes any tool calls it requested.

    Returns:
        A state update that appends the task to ``completed_tasks`` and
        removes it from the front of ``task_queue``.
    """
    task = state["task_queue"][0]
    print(f"\n‚öôÔ∏è  [Backend] Working on: {task['description']}")

    msg = backend_prompt_template.format(
        task_description=task["description"],
        project_root=state["project_root"]
    )
    result = llm_worker.invoke(msg)

    execute_tools(result)

    # The state schema in this file declares no reducer for completed_tasks,
    # so returning only [task] would replace the history; extend it here.
    finished = state.get("completed_tasks") or []

    return {
        "completed_tasks": [*finished, task],
        "task_queue": state["task_queue"][1:]
    }

# --- 5. FRONTEND AGENT ---
def frontend_agent_node(state: AgentState):
    """Work on the first queued task with the tool-enabled model.

    Formats the frontend prompt for the task at the head of ``task_queue``,
    invokes the worker model once and executes any tool calls it requested.

    Returns:
        A state update that appends the task to ``completed_tasks`` and
        removes it from the front of ``task_queue``.
    """
    task = state["task_queue"][0]
    print(f"\nüé® [Frontend] Working on: {task['description']}")

    msg = frontend_prompt_template.format(
        task_description=task["description"],
        project_root=state["project_root"]
    )
    result = llm_worker.invoke(msg)

    execute_tools(result)

    # The state schema in this file declares no reducer for completed_tasks,
    # so returning only [task] would replace the history; extend it here.
    finished = state.get("completed_tasks") or []

    return {
        "completed_tasks": [*finished, task],
        "task_queue": state["task_queue"][1:]
    }

# --- 6. TESTER ---
def tester_node(state: AgentState):
    """Run the tester prompt and derive a pass/fail verdict.

    Invokes the tool-enabled model with the tester prompt, executes the
    tool calls it requested, and scans the resulting text for failure
    markers.

    Returns:
        ``{"test_logs": ..., "test_status": "passed" | "failed"}``.
    """
    print("\nüß™ [Tester] Verifying application...")
    msg = tester_prompt_template.format(project_root=state["project_root"])

    response = llm_worker.invoke(msg)

    # execute_tools may return None or a list of tool outputs; `or []` keeps
    # this node working in both cases.
    tool_outputs = execute_tools(response) or []

    logs = response.content
    if tool_outputs:
        # The real evidence is in what the tools printed, so include it in
        # the text that is scanned and later handed to the debugger.
        parts = [str(logs), *(str(output) for output in tool_outputs)]
        logs = "\n".join(part for part in parts if part)

    # "FAILED" is the prefix run_shell_command uses for a non-zero exit code.
    failure_markers = ("Error", "Traceback", "Failed", "FAILED")

    status = "passed"
    if any(marker in logs for marker in failure_markers):
        status = "failed"
        print("   -> ‚ùå Tests Failed")
    else:
        print("   -> ‚úÖ Tests Passed")

    return {"test_logs": logs, "test_status": status}

def debugger_node(state: AgentState):
    """Ask the plain LLM for a fix task based on the recorded test logs.

    The reply is expected to be a JSON object with at least a
    "description" key; Markdown code fences around it are stripped.

    Returns:
        On success: the fix task placed at the front of ``task_queue``,
        ``iteration_count`` incremented and ``test_status`` reset to
        "pending" so the tester runs again afterwards.
        On a reply that cannot be parsed: only the incremented
        ``iteration_count``.
    """
    print("\nüêû [Debugger] Analyzing errors and creating fix...")
    msg = debugger_prompt.format(test_logs=state["test_logs"])
    response = llm.invoke(msg)

    next_iteration = state["iteration_count"] + 1

    try:
        content = response.content.replace("```json", "").replace("```", "").strip()
        fix_task = json.loads(content)
        print(f"   -> Created Fix Task: {fix_task['description']}")
    except (AttributeError, KeyError, TypeError, ValueError) as e:
        # ValueError covers json.JSONDecodeError; KeyError/TypeError cover a
        # reply that is valid JSON but not an object with a "description".
        # Anything else (e.g. KeyboardInterrupt) is allowed to propagate.
        print(f"   -> Could not build a fix task from the reply: {e}")
        return {"iteration_count": next_iteration}

    return {
        "task_queue": [fix_task] + (state.get("task_queue") or []),
        "iteration_count": next_iteration,
        "test_status": "pending"
    }

def select_next_step(state: AgentState):
    """
    Routing function for the orchestrator's conditional edge.

    Order of decisions:
      1. A non-empty task queue sends the head task to "frontend" when it is
         assigned there, otherwise to "backend" (also the default).
      2. With no tasks left, a "pending" test status goes to "tester".
      3. A "failed" status goes to "debugger" until 5 iterations were used,
         after which the workflow ends.
      4. "passed" (or any other status) ends the workflow.
    """
    pending = state.get("task_queue", [])
    if pending:
        owner = pending[0].get("assigned_agent", "backend")
        return "frontend" if owner == "frontend" else "backend"

    outcome = state.get("test_status", "pending")

    if outcome == "pending":
        return "tester"

    if outcome == "failed":
        if state.get("iteration_count", 0) < 5:
            return "debugger"
        print("\n‚ùå [Orchestrator] Max retries (5) reached. Stopping workflow.")
        return END

    if outcome == "passed":
        print("\n‚úÖ [Orchestrator] Workflow completed successfully!")

    return END

workflow = StateGraph(AgentState)

# Register each node under the name the edges and the router refer to.
for _node_name, _node_fn in (
    ("architect", architect_node),
    ("planner", planner_node),
    ("orchestrator", orchestrator_node),
    ("backend", backend_agent_node),
    ("frontend", frontend_agent_node),
    ("tester", tester_node),
    ("debugger", debugger_node),
):
    workflow.add_node(_node_name, _node_fn)

# Linear start: architect -> planner -> orchestrator.
workflow.set_entry_point("architect")
workflow.add_edge("architect", "planner")
workflow.add_edge("planner", "orchestrator")

# The orchestrator fans out according to select_next_step's return value.
workflow.add_conditional_edges(
    "orchestrator",
    select_next_step,
    {
        "backend": "backend",
        "frontend": "frontend",
        "tester": "tester",
        "debugger": "debugger",
        END: END,
    },
)

# Every worker hands control back to the orchestrator.
for _node_name in ("backend", "frontend", "debugger", "tester"):
    workflow.add_edge(_node_name, "orchestrator")

# Drop the loop helpers so the module namespace matches the hand-written form.
del _node_name, _node_fn

# Compile
app = workflow.compile()

## Testing 

In [97]:
import os

# Make sure the sandbox directory used by the file tools exists.
os.makedirs("./builds", exist_ok=True)

# Initial state: one entry for every key declared on AgentState.
inputs = dict(
    requirements="Create basic HTML  and CSS based Calculator web app. ",
    project_root="./builds/app-calculator",
    architecture=None,
    task_queue=[],
    completed_tasks=[],
    current_task=None,
    test_logs=None,
    test_status="pending",
    iteration_count=0,
)


try:
    final_state = app.invoke(inputs, config={"recursion_limit": 50})

    # Report final results
    banner = "=" * 50
    print(f"\n{banner}")
    print("üèÅ WORKFLOW FINISHED")
    print(banner)
    print(f"Final Test Status: {final_state['test_status'].upper()}")
    print(f"Total Iterations:  {final_state['iteration_count']}")
    print(f"Tasks Completed:   {len(final_state['completed_tasks'])}")

except Exception as exc:
    # Top-level boundary of the demo: show the failure instead of a traceback.
    print(f"\n‚ùå Execution Error: {exc}")

DEBUG STATE KEYS: ['project_root', 'requirements', 'architecture', 'task_queue', 'current_task', 'completed_tasks', 'test_logs', 'test_status', 'iteration_count']

üèóÔ∏è  [Architect] Designing ./builds/app-calculator...
{ 
  "structure": [
    "./builds/app-calculator/public/index.html",
    "./builds/app-calculator/public/styles.css",
    "./builds/app-calculator/src/js/calculator.js"
  ],
  "backend": "None",
  "frontend": ["HTML", "CSS"],
  "database": "None",
  "design_notes": "The calculator app will use client-side JavaScript to perform calculations. The HTML file will contain the UI elements, and the CSS file will handle styling. The JavaScript file will contain the logic for handling user input and performing calculations."
}

üìÖ [Planner] Creating task list...
{'tasks': [{'id': 'task_1', 'description': 'Set up project structure and create main.py for backend logic', 'assigned_agent': 'backend', 'status': 'pending'}, {'id': 'task_2', 'description': 'Create index.html with U