Imports & Setup

In [2]:
import os
import uuid
import pandas as pd
import plotly.express as px
from datetime import datetime, timedelta
from dotenv import load_dotenv
from langchain_groq import ChatGroq
from IPython.display import display, Markdown

# Load variables from a local .env file into the process environment.
# GROQ_API_KEY has to be available either in that file or in the environment.
load_dotenv()

# Chat model shared by the node functions in this notebook.
llm = ChatGroq(
    temperature=0.1,
    model="llama-3.3-70b-versatile",  # Recommended model for logic
)

Unified Data Models

In [4]:
from typing import List, Optional, Any, Annotated
from pydantic import BaseModel, Field, AliasChoices, BeforeValidator, model_validator

# --- Helpers ---
def force_string(v):
    """Return ``str(v)`` for any value except ``None``, which is passed through."""
    if v is None:
        return None
    return str(v)
# Optional string ID type: the raw value is run through force_string before
# pydantic validates it, so non-string IDs (e.g. ints) are turned into strings.
StringId = Annotated[Optional[str], BeforeValidator(force_string)]

# --- Core Entities ---
class Task(BaseModel):
    """A single unit of project work.

    Most fields accept several alternative input keys (listed in their
    ``AliasChoices``), tried in the order given.
    """

    # Optional identifier; coerced to a string when provided.
    id: StringId = None

    # Required short name of the task.
    task_name: str = Field(
        ...,
        validation_alias=AliasChoices("title", "name", "task_name", "task"),
    )

    # Free-text description; defaults to an empty string when absent.
    task_description: str = Field(
        default="",
        validation_alias=AliasChoices("description", "task_description", "desc"),
    )

    # Required effort estimate as an integer.
    estimated_day: int = Field(
        ...,
        validation_alias=AliasChoices(
            "duration_days",
            "estimated_days",
            "estimated_day",
            "duration",
            "estimated_time",
            "time",
        ),
    )

    # Skill needed for the task; defaults to "General" when absent.
    required_skill: str = Field(
        default="General",
        validation_alias=AliasChoices("required_skill", "skill", "skills"),
    )

class TeamMember(BaseModel):
    """A person who can be assigned tasks."""
    name: str  # used as the lookup key when matching allocations to members
    role: str  # free-form role label (e.g. "Lead Dev")
    skills: List[str]  # skill names; shown to the scoper prompt
    seniority: str  # free-form level (the example team uses "Senior"/"Mid"/"Junior")

class Team(BaseModel):
    """Wrapper holding the list of team members available to the project."""
    team_members: List[TeamMember]

# --- Workflow Containers (With Smart Auto-Fixers) ---

class TaskList(BaseModel):
    """Container for a list of tasks, accepted under the key "tasks" or "task"."""

    task: List[Task] = Field(..., validation_alias=AliasChoices("tasks", "task"))

    @model_validator(mode='before')
    @classmethod
    def wrap_list(cls, data: Any) -> Any:
        """Normalize raw input before field validation.

        A bare list becomes ``{"task": [...]}``; a dict containing "tasks" is
        reduced to ``{"task": data["tasks"]}``; anything else is returned as is.
        """
        if isinstance(data, list):
            return {"task": data}
        has_tasks_key = isinstance(data, dict) and "tasks" in data
        return {"task": data["tasks"]} if has_tasks_key else data

class Dependency(BaseModel):
    """Declares which tasks a given task depends on.

    IDs are coerced to strings with ``force_string`` (the same helper that
    backs ``Task.id``), so numeric IDs such as ``1`` validate as ``"1"``
    instead of being rejected by the plain ``str`` type.
    """

    # ID of the task that has prerequisites.
    task_id: Annotated[str, BeforeValidator(force_string)] = Field(
        ...,
        validation_alias=AliasChoices("task_id", "id", "task"),
    )

    # IDs of the tasks it depends on; each element is coerced individually.
    dependent_on: List[Annotated[str, BeforeValidator(force_string)]] = Field(
        ...,
        validation_alias=AliasChoices("dependent_on", "dependencies", "deps", "depends_on"),
    )

class DependencyList(BaseModel):
    """Container for task dependencies, accepted under "dependencies" or "deps"."""

    dependencies: List[Dependency] = Field(..., validation_alias=AliasChoices("dependencies", "deps"))

    @model_validator(mode='before')
    @classmethod
    def wrap_list(cls, data: Any) -> Any:
        """Normalize raw input into ``{"dependencies": [...]}`` where possible.

        Handles a bare list, a wrapper dict whose payload is a list, and a
        wrapper dict whose payload is a mapping like ``{"A": ["B"], "C": []}``.
        Any other input is returned untouched.
        """
        # Bare list of dependency records.
        if isinstance(data, list):
            return {"dependencies": data}

        if not isinstance(data, dict):
            return data

        # Prefer "dependencies" whenever the key exists; otherwise try "deps".
        inner = data["dependencies"] if "dependencies" in data else data.get("deps")

        # Mapping form: each key is a task, each value its prerequisites.
        if isinstance(inner, dict):
            records = [
                {"task_id": task_key, "dependent_on": prerequisites}
                for task_key, prerequisites in inner.items()
            ]
            return {"dependencies": records}

        # Already a list: just re-wrap under the canonical key.
        if isinstance(inner, list):
            return {"dependencies": inner}

        return data

class TaskSchedule(BaseModel):
    """A task together with its scheduled start and end day numbers."""
    task: Task
    start_day: int  # integer day number; the report cell adds it to a base date as a day offset
    end_day: int  # integer day number; the report cell adds it to a base date as a day offset

class Schedule(BaseModel):
    """Wrapper holding the scheduled entries for all tasks."""
    schedule: List[TaskSchedule]

class TaskAllocation(BaseModel):
    """Pairs a task with the team member assigned to it."""
    task: Task
    team_member: TeamMember

class TaskAllocationList(BaseModel):
    """Container for task-to-member allocations."""

    task_allocations: List[TaskAllocation]

    @model_validator(mode='before')
    @classmethod
    def wrap_list(cls, data: Any) -> Any:
        """Normalize raw input into ``{"task_allocations": [...]}`` where possible.

        Accepts a bare list, or a dict whose payload sits under
        "task_allocations", "allocs" or "allocations". Other input is
        returned unchanged.
        """
        if isinstance(data, list):
            return {"task_allocations": data}

        if not isinstance(data, dict):
            return data

        # Take the first truthy payload among the known keys; if none is
        # truthy, the value of the last key looked up is kept.
        payload = None
        for candidate_key in ("task_allocations", "allocs", "allocations"):
            payload = data.get(candidate_key)
            if payload:
                break

        # Mapping form such as {"Task A": "Alice"} is expanded into records.
        # NOTE(review): these records use "task_id"/"member_name" keys, while
        # TaskAllocation declares "task"/"team_member" - confirm whether this
        # branch can ever validate.
        if isinstance(payload, dict):
            records = [
                {"task_id": task_key, "member_name": member}
                for task_key, member in payload.items()
            ]
            return {"task_allocations": records}

        if isinstance(payload, list):
            return {"task_allocations": payload}

        return data

class Risk(BaseModel):
    """A risk finding attached to one task."""
    task_name: str  # name of the task the risk refers to
    score: int  # integer risk score; the auditor sums these into a project score
    reason: str  # explanation for the score

class RiskList(BaseModel):
    """Wrapper holding all risk findings from one audit pass."""
    risks: List[Risk]

Agent State Definition

In [5]:
from typing import TypedDict

class AgentState(TypedDict):
    """Shared state passed between the workflow nodes.

    Each node returns a partial dict with the keys it updates.
    """
    project_description: str  # free-text project brief given to the scoper
    team: Team  # available team members
    tasks: TaskList  # written by the scoper node
    # Written by the mapper node, which stores Dependency models (not raw dicts).
    dependencies: List[Dependency]
    schedule: Schedule  # written by the scheduler node
    task_allocations: TaskAllocationList  # written by the allocator node
    risks: RiskList  # written by the auditor node
    iteration_number: int  # incremented by the auditor on every pass
    max_iteration: int  # upper bound on audit passes, checked by the router
    insights: List[str]  # suggestions appended by the optimizer node
    project_risk_score_iterations: List[int]  # total risk score per audit pass

Node Logic

In [6]:
# --- Node Functions ---

def scope_decomposition_node(state: AgentState):
    """Decomposes project into granular tasks.

    Prompts the LLM with the project description and the team's skills,
    parses the reply into a ``TaskList`` and makes sure every task ends up
    with a unique, non-empty ID.

    Args:
        state: Workflow state; reads 'project_description' and 'team'.

    Returns:
        A state update ``{"tasks": TaskList}``.
    """
    print("--- Node: Scoper ---")
    prompt = f"""
    Project: {state['project_description']}
    Team Skills Available: {[m.skills for m in state['team'].team_members]}
    
    Break this project into granular tasks (max 4 days per task).
    
    CRITICAL INSTRUCTIONS:
    1. Return a JSON object matching the TaskList schema.
    2. REQUIRED FIELDS: 'task_name', 'task_description', 'estimated_day', 'required_skill'.
    3. Do NOT assign names (like Alice or Bob). Only identify the 'required_skill' (e.g., Python, React, Testing).
    
    Example Output:
    {{
      "tasks": [
        {{
            "task_name": "Setup Repo", 
            "task_description": "Initialize Git repo and CI/CD pipelines", 
            "estimated_day": 1, 
            "required_skill": "DevOps"
        }}
      ]
    }}
    """
    struct_llm = llm.with_structured_output(TaskList, method="json_mode")
    response = struct_llm.invoke(prompt)

    # Assign an ID where one is missing, and replace any ID already taken by
    # an earlier task. Later nodes look tasks up by ID in a dict, so two tasks
    # sharing an ID would silently shadow each other. Short 4-character IDs
    # are kept (they match the prompt examples) but retried on collision.
    used_ids = set()
    for t in response.task:
        while not t.id or t.id in used_ids:
            t.id = str(uuid.uuid4())[:4]
        used_ids.add(t.id)

    return {"tasks": response}

def dependency_mapping_node(state: AgentState):
    """Ask the LLM which tasks depend on which.

    Reads the task list from the state, lists each task as an ID/name line in
    the prompt, and returns ``{"dependencies": [...]}`` holding the parsed
    dependency records.
    """
    print("--- Node: Mapper ---")
    task_lines = [f"ID: {t.id} | Name: {t.task_name}" for t in state['tasks'].task]
    tasks_fmt = "\n".join(task_lines)

    prompt = f"""
    Map dependencies for these tasks:
    {tasks_fmt}
    
    Return a JSON object matching DependencyList. 
    Use IDs to reference tasks. 
    
    Example Output:
    {{
      "dependencies": [
         {{"task_id": "1a2b", "dependent_on": []}},
         {{"task_id": "3c4d", "dependent_on": ["1a2b"]}}
      ]
    }}
    """
    mapper_llm = llm.with_structured_output(DependencyList, method="json_mode")
    parsed = mapper_llm.invoke(prompt)
    return {"dependencies": parsed.dependencies}

def smart_scheduler_node(state: AgentState):
    """Ask the LLM for start/end days and attach them to the known tasks.

    Reply entries are matched to tasks by ID first, then by task name;
    entries that match neither are left out. Returns ``{"schedule": Schedule}``.
    """
    print("--- Node: Scheduler ---")

    class SimpleSchedItem(BaseModel):
        """One schedule entry as returned by the LLM (lenient key names)."""
        task_id: str = Field(..., validation_alias=AliasChoices("task_id", "id", "task_name", "task"))
        start: int = Field(..., validation_alias=AliasChoices("start", "start_day"))
        end: int = Field(..., validation_alias=AliasChoices("end", "end_day"))

    class SimpleSched(BaseModel):
        """Lenient wrapper around the list of schedule entries."""
        items: List[SimpleSchedItem]

        @model_validator(mode='before')
        @classmethod
        def wrap(cls, data):
            """Accept a bare list, or a dict keyed by "tasks"/"schedule"/"timeline"."""
            if isinstance(data, list):
                return {"items": data}
            if isinstance(data, dict):
                # First matching wrapper key wins, in this order.
                for wrapper_key in ("tasks", "schedule", "timeline"):
                    if wrapper_key in data:
                        return {"items": data[wrapper_key]}
            return data

    prompt = f"""
    Schedule these tasks (Tasks: {state['tasks']}) 
    considering Dependencies: {state.get('dependencies')}
    Previous Insights: {state.get('insights', [])}
    
    Return JSON with start/end days (integers).
    """
    scheduler_llm = llm.with_structured_output(SimpleSched, method="json_mode")
    resp = scheduler_llm.invoke(prompt)

    # Lookup tables so a reply may reference a task by ID or by name.
    known_tasks = state['tasks'].task
    by_id = {t.id: t for t in known_tasks}
    by_name = {t.task_name: t for t in known_tasks}

    scheduled = []
    for entry in resp.items:
        ref = str(entry.task_id)
        matched = by_id.get(ref) or by_name.get(ref)
        if not matched:
            continue
        scheduled.append(
            TaskSchedule(task=matched, start_day=entry.start, end_day=entry.end)
        )

    return {"schedule": Schedule(schedule=scheduled)}

def resource_allocation_node(state: AgentState):
    """Ask the LLM to assign each task to a team member.

    Reply entries are resolved against the known tasks (by ID, then by name)
    and the known members (by exact name); entries where either side cannot
    be resolved are left out. Returns ``{"task_allocations": TaskAllocationList}``.
    """
    print("--- Node: Allocator ---")

    class SimpleAllocItem(BaseModel):
        """One assignment as returned by the LLM (lenient key names)."""
        task_id: str = Field(..., validation_alias=AliasChoices("task_id", "id", "task_name", "task"))
        member_name: str = Field(..., validation_alias=AliasChoices("member_name", "member", "assignee", "person", "name"))

    class SimpleAlloc(BaseModel):
        """Lenient wrapper around the list of assignments."""
        allocs: List[SimpleAllocItem]

        @model_validator(mode='before')
        @classmethod
        def wrap(cls, data):
            """Accept a bare list, or a dict keyed by one of several wrapper names."""
            if isinstance(data, list):
                return {"allocs": data}
            if isinstance(data, dict):
                # First matching wrapper key wins, in this order.
                for wrapper_key in ("allocations", "task_allocations", "assignments", "team"):
                    if wrapper_key in data:
                        return {"allocs": data[wrapper_key]}
            return data

    prompt = f"""
    Allocate tasks: {state['tasks']}
    To Team: {state['team']}
    
    IMPORTANT: Return JSON.
    
    Output Format Example:
    {{
      "allocs": [
        {{"task_id": "1a2b", "member_name": "Alice"}},
        {{"task_id": "3c4d", "member_name": "Bob"}}
      ]
    }}
    """
    allocator_llm = llm.with_structured_output(SimpleAlloc, method="json_mode")
    resp = allocator_llm.invoke(prompt)

    # Lookup tables for resolving the reply's references.
    known_tasks = state['tasks'].task
    by_id = {t.id: t for t in known_tasks}
    by_name = {t.task_name: t for t in known_tasks}
    members = {m.name: m for m in state['team'].team_members}

    resolved = []
    for assignment in resp.allocs:
        ref = str(assignment.task_id)
        matched_task = by_id.get(ref) or by_name.get(ref)
        matched_member = members.get(assignment.member_name)
        if matched_task and matched_member:
            resolved.append(TaskAllocation(task=matched_task, team_member=matched_member))

    return {"task_allocations": TaskAllocationList(task_allocations=resolved)}

def risk_audit_node(state: AgentState):
    """Calculates risk score.

    Asks the LLM to audit the current schedule and allocations, normalizes
    the loosely structured reply into ``Risk`` records and sums their scores.

    Args:
        state: Workflow state; reads 'schedule', 'task_allocations',
            'project_risk_score_iterations' and 'iteration_number'.

    Returns:
        A state update with:
          * "risks": the validated ``RiskList``,
          * "project_risk_score_iterations": previous history plus this
            pass's total score,
          * "iteration_number": previous value plus one.
    """
    print("--- Node: Auditor ---")

    # Accepted input keys per field, in priority order. The same tuples feed
    # both the repair step and the field aliases so the two cannot disagree.
    name_keys = ("task_name", "task", "name", "Task Name", "Task")
    score_keys = ("score", "risk_score", "Risk Score")
    reason_keys = ("reason", "description", "justification", "Reason", "explanation")

    def repair_risk_item(item: Any) -> Any:
        """Fill in fields that are missing under *every* accepted key.

        A default is only injected when none of a field's aliases is present.
        Injecting the canonical key while an alias (e.g. "risk_score" or
        "Reason") is present would make the placeholder win, because the
        canonical key is listed first in the alias choices. The input dict is
        copied rather than modified in place.
        """
        if not isinstance(item, dict):
            return item

        fixed = dict(item)

        # No explanation under any accepted key: use a placeholder.
        if not any(k in fixed for k in reason_keys):
            fixed["reason"] = "No reason provided"

        # No task name under any accepted key: fall back to the task ID.
        if "task_id" in fixed and not any(k in fixed for k in name_keys):
            fixed["task_name"] = str(fixed["task_id"])

        # No score under any accepted key: default to medium risk.
        if not any(k in fixed for k in score_keys):
            fixed["score"] = 5

        return fixed

    class SimpleRisk(BaseModel):
        """One risk entry as returned by the LLM (lenient key names)."""
        task_name: str = Field(..., validation_alias=AliasChoices(*name_keys))
        score: int = Field(..., validation_alias=AliasChoices(*score_keys))
        reason: str = Field(..., validation_alias=AliasChoices(*reason_keys))

        @model_validator(mode='before')
        @classmethod
        def fix_data(cls, data):
            """Repair incomplete entries before field validation runs."""
            return repair_risk_item(data)

    class SimpleRiskList(BaseModel):
        """Lenient wrapper around the list of risk entries."""
        risks: List[SimpleRisk]

        @model_validator(mode='before')
        @classmethod
        def wrap(cls, data):
            """Locate the list of risks inside a variety of reply shapes."""
            if isinstance(data, dict):
                # Known wrapper keys first, in priority order.
                for key in ["risks", "issues", "threats", "audit", "High-Risk Tasks", "tasks", "Risk Breakdown", "recommendations"]:
                    if key in data and isinstance(data[key], list):
                        return {"risks": data[key]}

                # Otherwise take the first value that is a non-empty list of dicts.
                for val in data.values():
                    if isinstance(val, list) and len(val) > 0 and isinstance(val[0], dict):
                        return {"risks": val}

            if isinstance(data, list):
                return {"risks": data}
            return data

    prompt = f"""
    Audit this Plan for Risks (0-10 score):
    Schedule: {state.get('schedule')}
    Allocations: {state.get('task_allocations')}
    
    IMPORTANT: Return a JSON object with a single key "risks" containing a list of objects.
    Each object MUST have: "task_name", "score", and "reason".
    """
    struct_llm = llm.with_structured_output(SimpleRiskList, method="json_mode")
    resp = struct_llm.invoke(prompt)

    # Convert the lenient parse models into the shared Risk model.
    final_risks = [Risk(task_name=r.task_name, score=r.score, reason=r.reason) for r in resp.risks]
    risks_obj = RiskList(risks=final_risks)

    # Project-level score for this pass is the sum of the per-task scores.
    score = sum(r.score for r in final_risks)
    return {
        "risks": risks_obj,
        "project_risk_score_iterations": state.get('project_risk_score_iterations', []) + [score],
        "iteration_number": state.get('iteration_number', 0) + 1
    }

def optimization_insight_node(state: AgentState):
    """Ask the LLM for one change that would lower the audited risk.

    Returns ``{"insights": [...]}`` with the new suggestion appended to any
    insights already in the state.
    """
    print("--- Node: Optimizer ---")
    prompt = f"Risks: {state['risks']}. Suggest 1 concrete schedule/allocation change to lower risk."
    reply = llm.invoke(prompt)
    earlier_insights = state.get('insights', [])
    return {"insights": earlier_insights + [reply.content]}

Graph Structure

In [7]:
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

def routing_logic(state: AgentState):
    """Decide whether to run another optimization pass after an audit.

    Returns "optimizer" while iterations remain and the latest total risk
    score is 15 or more; otherwise returns END.
    """
    latest_score = state['project_risk_score_iterations'][-1]
    out_of_iterations = state["iteration_number"] >= state["max_iteration"]
    if not out_of_iterations and latest_score >= 15:
        return "optimizer"
    return END

workflow = StateGraph(AgentState)

# Nodes, registered in pipeline order under the names the edges refer to.
node_registry = {
    "scoper": scope_decomposition_node,
    "mapper": dependency_mapping_node,
    "scheduler": smart_scheduler_node,
    "allocator": resource_allocation_node,
    "auditor": risk_audit_node,
    "optimizer": optimization_insight_node,
}
for node_name, node_fn in node_registry.items():
    workflow.add_node(node_name, node_fn)

# Linear pipeline: scoper -> mapper -> scheduler -> allocator -> auditor.
workflow.set_entry_point("scoper")
linear_edges = [
    ("scoper", "mapper"),
    ("mapper", "scheduler"),
    ("scheduler", "allocator"),
    ("allocator", "auditor"),
]
for edge_src, edge_dst in linear_edges:
    workflow.add_edge(edge_src, edge_dst)

# After each audit, routing_logic picks between finishing and optimizing.
workflow.add_conditional_edges("auditor", routing_logic)
# The optimizer feeds its insight back into a fresh scheduling pass.
workflow.add_edge("optimizer", "scheduler")

graph = workflow.compile(checkpointer=MemorySaver())

Execution

In [8]:
# 1. Team available to the project
team_roster = [
    TeamMember(
        name="Alice",
        role="Lead Dev",
        skills=["Python", "LangGraph", "Architecture"],
        seniority="Senior",
    ),
    TeamMember(
        name="Bob",
        role="Frontend",
        skills=["React", "UI/UX"],
        seniority="Mid",
    ),
    TeamMember(
        name="Charlie",
        role="QA",
        skills=["Testing", "Python"],
        seniority="Junior",
    ),
]
my_team = Team(team_members=team_roster)

# 2. Starting state: only the inputs and counters; the nodes fill in the rest
init_state = {
    "project_description": "Create a secure, multi-user AI dashboard using LangGraph and React.",
    "team": my_team,
    "iteration_number": 0,
    "max_iteration": 2,
    "insights": [],
    "project_risk_score_iterations": [],
}

# 3. Run the compiled graph; the thread id keys the checkpointer's saved state
run_config = {"configurable": {"thread_id": "v3_production"}}
print("Starting Workflow...")
final_state = graph.invoke(init_state, run_config)
print("Workflow Finished!")

Starting Workflow...
--- Node: Scoper ---
--- Node: Mapper ---
--- Node: Scheduler ---
--- Node: Allocator ---
--- Node: Auditor ---
--- Node: Optimizer ---
--- Node: Scheduler ---
--- Node: Allocator ---
--- Node: Auditor ---
Workflow Finished!


Visualization & Reporting

In [9]:
# --- 1. Report Metadata ---
display(Markdown(f"# Project Report: {final_state['project_description']}"))
display(Markdown(f"**Risk Score History:** {final_state['project_risk_score_iterations']}"))

# --- 2. Org Chart ---
print("\nTeam Structure:")
for m in final_state['team'].team_members:
    print(f"└── {m.name} ({m.role})")

# --- 3. Gantt Chart Data Prep ---
sched_data = []
# Day numbers in the schedule are turned into dates relative to "now".
start_date_base = datetime.now()

# Task name -> assignee name, built once so each row is a dict lookup.
alloc_map = {a.task.task_name: a.team_member.name for a in final_state['task_allocations'].task_allocations}

for item in final_state['schedule'].schedule:
    t_name = item.task.task_name
    assignee = alloc_map.get(t_name, "Unassigned")

    # Date Math
    s_date = start_date_base + timedelta(days=item.start_day)
    e_date = start_date_base + timedelta(days=item.end_day)

    sched_data.append({
        "Task": t_name,
        "Start": s_date.strftime("%Y-%m-%d"),
        "Finish": e_date.strftime("%Y-%m-%d"),
        "Assignee": assignee,
        "Duration": item.end_day - item.start_day
    })

# Declare the columns explicitly: with an empty schedule the frame would
# otherwise have no "Start" column and sort_values would raise KeyError
# before the empty-check below could report the situation.
gantt_columns = ["Task", "Start", "Finish", "Assignee", "Duration"]
df = pd.DataFrame(sched_data, columns=gantt_columns).sort_values("Start")

# --- 4. Plot ---
if not df.empty:
    fig = px.timeline(
        df, 
        x_start="Start", 
        x_end="Finish", 
        y="Task", 
        color="Assignee",
        title="Project Schedule"
    )
    # Reverse the y axis so the earliest task appears at the top.
    fig.update_yaxes(autorange="reversed")
    fig.show()
else:
    print("No schedule data to plot.")

# Project Report: Create a secure, multi-user AI dashboard using LangGraph and React.

**Risk Score History:** [37, 42]


Team Structure:
└── Alice (Lead Dev)
└── Bob (Frontend)
└── Charlie (QA)
