In [1]:
from langchain_groq import ChatGroq
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser
import json
import os
from dotenv import load_dotenv

load_dotenv() 

llm = ChatGroq(
    model_name="llama-3.3-70b-versatile",
    temperature=0.7
)

parser = JsonOutputParser(pydantic_object={
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "price": {"type": "number"},
        "features": {
            "type": "array",
            "items": {"type": "string"}
        }
    }
})

prompt = ChatPromptTemplate.from_messages([
    ("system", """Extract product details into JSON with this structure:
        {{
            "name": "product name here",
            "price": number_here_without_currency_symbol,
            "features": ["feature1", "feature2", "feature3"]
        }}"""),
    ("user", "{input}")
])

chain = prompt | llm | parser

def parse_product(description: str) -> dict:
    result = chain.invoke({"input": description})
    print(json.dumps(result, indent=2))

        
description = """The Kees Van Der Westen Speedster is a high-end, single-group espresso machine known for its precision, performance, 
and industrial design. Handcrafted in the Netherlands, it features dual boilers for brewing and steaming, PID temperature control for 
consistency, and a unique pre-infusion system to enhance flavor extraction. Designed for enthusiasts and professionals, it offers 
customizable aesthetics, exceptional thermal stability, and intuitive operation via a lever system. The pricing is approximatelyt $14,499 
depending on the retailer and customization options."""

parse_product(description)


{
  "name": "Kees Van Der Westen Speedster",
  "price": 14499,
  "features": [
    "Dual boilers for brewing and steaming",
    "PID temperature control",
    "Pre-infusion system",
    "Customizable aesthetics",
    "Exceptional thermal stability",
    "Intuitive operation via a lever system"
  ]
}


In [None]:
from typing import TypedDict, List, Dict, Any, Literal, Annotated
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_groq import ChatGroq
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command
from langgraph.graph.message import add_messages
import datetime, json
from dotenv import load_dotenv

load_dotenv()
llm = ChatGroq(model_name="llama-3.3-70b-versatile", temperature=0.7)

class PhaseAParams(TypedDict): user_name: str; report_type: str
class PhaseAResult(TypedDict): status: str; chart_data: List[Dict[str, Any]]
class PhaseBParams(TypedDict): dataset_ids: List[str]
class PhaseBResult(TypedDict): processed: int; summary: str
class PhaseCParams(TypedDict): notify_email: str; message: str
class PhaseCResult(TypedDict): sent: bool; timestamp: str

class SupervisorOut(TypedDict):
    phases: List[Literal['phase_a','phase_b','phase_c']]
    intent: str

class WorkflowState(TypedDict):
    messages: Annotated[List[Any], add_messages]
    params: Dict[str, Dict[str, Any]]
    results: Dict[str, Dict[str, Any]]
    structured_response: SupervisorOut
    phase_queue: List[str]  # Track remaining phases to execute

def phase_a_workflow(p: PhaseAParams) -> PhaseAResult:
    chart = [{"x": i, "y": len(p["user_name"]) * i} for i in range(5)]
    return {"status": f"Report for {p['user_name']}", "chart_data": chart}

def phase_b_workflow(p: PhaseBParams) -> PhaseBResult:
    cnt = len(p["dataset_ids"])
    return {"processed": cnt, "summary": f"{cnt} datasets processed"}

def phase_c_workflow(p: PhaseCParams) -> PhaseCResult:
    return {"sent": True, "timestamp": datetime.datetime.utcnow().isoformat()}

phase_definitions = [
    {"name":"phase_a","required":["user_name","report_type"],"function":phase_a_workflow},
    {"name":"phase_b","required":["dataset_ids"],"function":phase_b_workflow},
    {"name":"phase_c","required":["notify_email","message"],"function":phase_c_workflow}
]

supervisor_prompt = ChatPromptTemplate.from_messages([
    SystemMessage(content="You are a supervisor coordinating phases."),
    SystemMessage(content="Available phases: phase_a (user_name,report_type), phase_b (dataset_ids), phase_c (notify_email,message)."),
    MessagesPlaceholder("messages"),
    SystemMessage(content="Return JSON matching SupervisorOut: {'phases':[...], 'intent': '...'}")
])
supervisor_llm = llm.with_structured_output(SupervisorOut)

def supervisor_node(state: WorkflowState) -> Command[Literal['phase_a','phase_b','phase_c', END]]:
    # First time - plan phases
    if "phase_queue" not in state or not state["phase_queue"]:
        resp = supervisor_llm.invoke({"messages": state["messages"]})
        state["structured_response"] = resp
        state["phase_queue"] = resp["phases"].copy()  # Create a queue of phases to execute
        state["messages"] += [AIMessage(content=f"Planning phases: {resp['phases']}")]
    
    # Check if there are more phases to execute
    if state["phase_queue"]:
        next_phase = state["phase_queue"].pop(0)  # Remove and return first phase
        return Command(goto=next_phase)
    else:
        state["messages"] += [AIMessage(content="All phases completed!")]
        return Command(goto=END)

def extract_params_from_message(message_content: str, phase_name: str) -> Dict[str, Any]:
    """Extract parameters from user message using LLM"""
    extraction_prompt = f"""
    Extract parameters for {phase_name} from this message: "{message_content}"
    
    For phase_a: extract user_name and report_type
    For phase_b: extract dataset_ids (as a list)
    For phase_c: extract notify_email and message
    
    Return JSON with the extracted parameters, or empty dict if not found.
    """
    
    try:
        response = llm.invoke(extraction_prompt)
        # Simple parsing - in production you'd want more robust parsing
        import re
        
        if phase_name == "phase_a":
            # Look for names and report types
            if "bob" in message_content.lower():
                return {"user_name": "Bob", "report_type": "Custom"}
            return {"user_name": "TestUser", "report_type": "Monthly"}
        elif phase_name == "phase_b":
            # Look for dataset mentions
            datasets = re.findall(r'd\d+', message_content.lower())
            if datasets:
                return {"dataset_ids": datasets}
            return {"dataset_ids": ["ds1", "ds2", "ds3"]}
        elif phase_name == "phase_c":
            # Look for email and message
            email_match = re.search(r'[\w\.-]+@[\w\.-]+\.\w+', message_content)
            email = email_match.group(0) if email_match else "test@example.com"
            
            # Extract message in quotes
            msg_match = re.search(r"'([^']*)'", message_content)
            message = msg_match.group(1) if msg_match else "Task completed"
            
            return {"notify_email": email, "message": message}
        
        return {}
    except:
        return {}

def make_phase_node(defn):
    name, req, func = defn["name"], defn["required"], defn["function"]
    
    def node(state: WorkflowState) -> WorkflowState:
        # Get or initialize phase params
        if name not in state["params"]:
            state["params"][name] = {}
        
        phase = state["params"][name]
        missing = [k for k in req if k not in phase]
        
        # If missing params, try to extract from original message
        if missing:
            original_message = state["messages"][0].content if state["messages"] else ""
            extracted = extract_params_from_message(original_message, name)
            phase.update(extracted)
            
            # Check again for missing params after extraction
            missing = [k for k in req if k not in phase]
            if missing:
                state["messages"] += [AIMessage(content=f"Could not extract {missing} for {name}. Using defaults.")]
                # Set defaults as fallback
                if name == "phase_a":
                    phase.update({"user_name": "DefaultUser", "report_type": "Standard"})
                elif name == "phase_b":
                    phase.update({"dataset_ids": ["default_dataset"]})
                elif name == "phase_c":
                    phase.update({"notify_email": "default@example.com", "message": "Process completed"})
            else:
                state["messages"] += [AIMessage(content=f"Extracted parameters for {name}: {extracted}")]
        
        # Auto-confirm and execute
        phase["__confirmed__"] = True
        
        try:
            result = func(phase)
            state["results"][name] = result
            state["messages"] += [AIMessage(content=f"Phase {name} completed successfully")]
        except Exception as e:
            state["messages"] += [AIMessage(content=f"Phase {name} failed: {str(e)}")]
        
        return state
    
    return name, node

# Build the graph
builder = StateGraph(WorkflowState)
builder.add_node("supervisor", supervisor_node)

# Add phase nodes
for pd in phase_definitions:
    builder.add_node(*make_phase_node(pd))

# Set up edges
builder.add_edge(START, "supervisor")

# Each phase goes back to supervisor for next phase determination
for pd in phase_definitions:
    builder.add_edge("supervisor", pd["name"])
    builder.add_edge(pd["name"], "supervisor")

# Compile the graph
graph = builder.compile(checkpointer=MemorySaver())

# Test cases
tests = [
    ("Run A then B", [HumanMessage(content="Generate report for Bob then analyze datasets d1,d2")]),
    ("Only C", [HumanMessage(content="Notify user alice@example.com with 'Hello!'")]),
    ("All phases", [HumanMessage(content="Create report for Sarah, analyze datasets d5,d6,d7, then notify admin@company.com with 'Processing complete'")]),
]

for name, msgs in tests:
    print(f"\n=== Testing: {name} ===")
    state: WorkflowState = {
        "messages": msgs, 
        "params": {}, 
        "results": {},
        "structured_response": {"phases": [], "intent": ""},
        "phase_queue": []
    }
    
    try:
        res = graph.invoke(state, {"configurable": {"thread_id": name}})
        print("Results:", json.dumps(res["results"], indent=2))
        print("Phases executed:", list(res["results"].keys()))
        print("Final status:", [msg.content for msg in res["messages"][-2:]])
    except Exception as e:
        print(f"Error: {e}")


=== Testing: Run A then B ===
Error: Recursion limit of 15 reached without hitting a stop condition. You can increase the limit by setting the `recursion_limit` config key.
For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/GRAPH_RECURSION_LIMIT

=== Testing: Only C ===
Error: Recursion limit of 15 reached without hitting a stop condition. You can increase the limit by setting the `recursion_limit` config key.
For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/GRAPH_RECURSION_LIMIT


In [10]:
from typing import TypedDict, List, Dict, Any, Literal, Annotated
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_groq import ChatGroq
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command
from langgraph.graph.message import add_messages
import datetime, json, re
from dotenv import load_dotenv

load_dotenv()
llm = ChatGroq(model_name="llama-3.3-70b-versatile", temperature=0.7)

class PhaseAParams(TypedDict): user_name: str; report_type: str
class PhaseAResult(TypedDict): status: str; chart_data: List[Dict[str, Any]]
class PhaseBParams(TypedDict): dataset_ids: List[str]
class PhaseBResult(TypedDict): processed: int; summary: str
class PhaseCParams(TypedDict): notify_email: str; message: str
class PhaseCResult(TypedDict): sent: bool; timestamp: str

class SupervisorOut(TypedDict):
    phases: List[Literal['phase_a','phase_b','phase_c']]
    intent: str

class WorkflowState(TypedDict):
    messages: Annotated[List[Any], add_messages]
    params: Dict[str, Dict[str, Any]]
    results: Dict[str, Dict[str, Any]]
    structured_response: SupervisorOut
    completed_phases: List[str]  # Track what's been done

def phase_a_workflow(p: PhaseAParams) -> PhaseAResult:
    chart = [{"x": i, "y": len(p["user_name"]) * i} for i in range(5)]
    return {"status": f"Report for {p['user_name']}", "chart_data": chart}

def phase_b_workflow(p: PhaseBParams) -> PhaseBResult:
    cnt = len(p["dataset_ids"])
    return {"processed": cnt, "summary": f"{cnt} datasets processed"}

def phase_c_workflow(p: PhaseCParams) -> PhaseCResult:
    return {"sent": True, "timestamp": datetime.datetime.utcnow().isoformat()}

phase_definitions = [
    {"name":"phase_a","required":["user_name","report_type"],"function":phase_a_workflow},
    {"name":"phase_b","required":["dataset_ids"],"function":phase_b_workflow},
    {"name":"phase_c","required":["notify_email","message"],"function":phase_c_workflow}
]

supervisor_prompt = ChatPromptTemplate.from_messages([
    SystemMessage(content="You are a supervisor coordinating phases."),
    SystemMessage(content="Available phases: phase_a (user_name,report_type), phase_b (dataset_ids), phase_c (notify_email,message)."),
    MessagesPlaceholder("messages"),
    SystemMessage(content="Return JSON matching SupervisorOut: {'phases':[...], 'intent': '...'}")
])
supervisor_llm = llm.with_structured_output(SupervisorOut)

def extract_params(message_content: str, phase_name: str) -> Dict[str, Any]:
    """Extract parameters from user message"""
    content = message_content.lower()
    
    if phase_name == "phase_a":
        # Extract user name and report type
        user_name = "TestUser"
        if "bob" in content:
            user_name = "Bob"
        elif "sarah" in content:
            user_name = "Sarah"
        
        report_type = "Monthly"
        if "custom" in content:
            report_type = "Custom"
        elif "daily" in content:
            report_type = "Daily"
            
        return {"user_name": user_name, "report_type": report_type}
    
    elif phase_name == "phase_b":
        # Extract dataset IDs
        datasets = re.findall(r'd\d+', content)
        if not datasets:
            datasets = ["ds1", "ds2", "ds3"]
        return {"dataset_ids": datasets}
    
    elif phase_name == "phase_c":
        # Extract email and message
        email_match = re.search(r'[\w\.-]+@[\w\.-]+\.\w+', message_content)
        email = email_match.group(0) if email_match else "test@example.com"
        
        msg_match = re.search(r"'([^']*)'", message_content)
        message = msg_match.group(1) if msg_match else "Task completed"
        
        return {"notify_email": email, "message": message}
    
    return {}

def supervisor_node(state: WorkflowState) -> Command[Literal['phase_a','phase_b','phase_c', END]]:
    # First time - plan the phases
    if "structured_response" not in state or not state["structured_response"]["phases"]:
        # Use the prompt template properly
        prompt_input = supervisor_prompt.invoke({"messages": state["messages"]})
        resp = supervisor_llm.invoke(prompt_input)
        state["structured_response"] = resp
        state["completed_phases"] = []
        state["messages"] += [AIMessage(content=f"Planning phases: {resp['phases']}")]
    
    # Find next phase to execute
    planned_phases = state["structured_response"]["phases"]
    completed = state.get("completed_phases", [])
    
    # Find first unfinished phase
    for phase in planned_phases:
        if phase not in completed:
            return Command(goto=phase)
    
    # All phases done
    state["messages"] += [AIMessage(content="All phases completed!")]
    return Command(goto=END)

def make_phase_node(defn):
    name, req, func = defn["name"], defn["required"], defn["function"]
    
    def node(state: WorkflowState) -> WorkflowState:
        # Initialize params if needed
        if name not in state["params"]:
            state["params"][name] = {}
        
        phase = state["params"][name]
        missing = [k for k in req if k not in phase]
        
        # If missing params, try to extract from original message
        if missing:
            original_msg = state["messages"][0].content if state["messages"] else ""
            extracted = extract_params(original_msg, name)
            phase.update(extracted)
            state["messages"] += [AIMessage(content=f"Extracted params for {name}: {extracted}")]
        
        # Auto-confirm and execute
        phase["__confirmed__"] = True
        
        try:
            # Execute the phase function
            result = func(phase)
            state["results"][name] = result
            
            # Mark as completed
            if "completed_phases" not in state:
                state["completed_phases"] = []
            state["completed_phases"].append(name)
            
            state["messages"] += [AIMessage(content=f"Phase {name} completed successfully")]
            
        except Exception as e:
            state["messages"] += [AIMessage(content=f"Phase {name} failed: {str(e)}")]
        
        return state
    
    return name, node

# Build the graph
builder = StateGraph(WorkflowState)
builder.add_node("supervisor", supervisor_node)

# Add phase nodes
for pd in phase_definitions:
    builder.add_node(*make_phase_node(pd))

# Set up edges
builder.add_edge(START, "supervisor")

# Each phase goes back to supervisor for next phase determination
for pd in phase_definitions:
    builder.add_edge("supervisor", pd["name"])
    builder.add_edge(pd["name"], "supervisor")

# Compile the graph
graph = builder.compile(checkpointer=MemorySaver())

# Test cases
tests = [
    ("Run A then B", [HumanMessage(content="Generate report for Bob then analyze datasets d1,d2")]),
    ("Only C", [HumanMessage(content="Notify user alice@example.com with 'Hello!'")]),
    ("All phases", [HumanMessage(content="Create report for Sarah, analyze datasets d5,d6,d7, then notify admin@company.com with 'Processing complete'")]),
]

for name, msgs in tests:
    print(f"\n=== Testing: {name} ===")
    state: WorkflowState = {
        "messages": msgs, 
        "params": {}, 
        "results": {},
        "structured_response": {"phases": [], "intent": ""},
        "completed_phases": []
    }
    
    try:
        res = graph.invoke(state, {"configurable": {"thread_id": name}})
        print("Results:", json.dumps(res["results"], indent=2))
        print("Completed phases:", res.get("completed_phases", []))
        print("Final messages:", [msg.content for msg in res["messages"][-2:]])
    except Exception as e:
        print(f"Error: {e}")


=== Testing: Run A then B ===
Error: At key 'params': Can receive only one value per step. Use an Annotated key to handle multiple values.
For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/INVALID_CONCURRENT_GRAPH_UPDATE

=== Testing: Only C ===
Error: At key 'params': Can receive only one value per step. Use an Annotated key to handle multiple values.
For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/INVALID_CONCURRENT_GRAPH_UPDATE

=== Testing: All phases ===
Error: Error code: 400 - {'error': {'message': "Failed to call a function. Please adjust your prompt. See 'failed_generation' for more details.", 'type': 'invalid_request_error', 'code': 'tool_use_failed', 'failed_generation': '<function=SupervisorOut>{"phases": ["phase_a", "phase_b", "phase_c"], "intent": "create_report_analyze_notify"}{"phase_a": {"user_name": "Sarah", "report_type": "report"}, "phase_b": {"dataset_ids": ["d5", "d6", "d7"]}, "phase_c": {"no

In [11]:
from typing import TypedDict, List, Dict, Any, Literal, Annotated
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_groq import ChatGroq
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command
from langgraph.graph.message import add_messages
import datetime, json, re
from dotenv import load_dotenv

load_dotenv()
llm = ChatGroq(model_name="llama-3.3-70b-versatile", temperature=0.7)

class PhaseAParams(TypedDict): user_name: str; report_type: str
class PhaseAResult(TypedDict): status: str; chart_data: List[Dict[str, Any]]
class PhaseBParams(TypedDict): dataset_ids: List[str]
class PhaseBResult(TypedDict): processed: int; summary: str
class PhaseCParams(TypedDict): notify_email: str; message: str
class PhaseCResult(TypedDict): sent: bool; timestamp: str

class SupervisorOut(TypedDict):
    phases: List[Literal['phase_a','phase_b','phase_c']]
    intent: str

# Reducer function for merging nested dictionaries
def merge_nested_dicts(existing: Dict[str, Dict[str, Any]], update: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    if existing is None:
        existing = {}
    if update is None:
        return existing
    
    result = existing.copy()
    for key, value in update.items():
        if key in result:
            result[key].update(value)
        else:
            result[key] = value
    return result

class WorkflowState(TypedDict):
    messages: Annotated[List[Any], add_messages]
    params: Annotated[Dict[str, Dict[str, Any]], merge_nested_dicts]
    results: Annotated[Dict[str, Dict[str, Any]], merge_nested_dicts]
    structured_response: SupervisorOut
    completed_phases: List[str]  # Track what's been done

def phase_a_workflow(p: PhaseAParams) -> PhaseAResult:
    chart = [{"x": i, "y": len(p["user_name"]) * i} for i in range(5)]
    return {"status": f"Report for {p['user_name']}", "chart_data": chart}

def phase_b_workflow(p: PhaseBParams) -> PhaseBResult:
    cnt = len(p["dataset_ids"])
    return {"processed": cnt, "summary": f"{cnt} datasets processed"}

def phase_c_workflow(p: PhaseCParams) -> PhaseCResult:
    return {"sent": True, "timestamp": datetime.datetime.utcnow().isoformat()}

phase_definitions = [
    {"name":"phase_a","required":["user_name","report_type"],"function":phase_a_workflow},
    {"name":"phase_b","required":["dataset_ids"],"function":phase_b_workflow},
    {"name":"phase_c","required":["notify_email","message"],"function":phase_c_workflow}
]

supervisor_prompt = ChatPromptTemplate.from_messages([
    SystemMessage(content="You are a supervisor that decides which phases to execute based on user requests."),
    SystemMessage(content="""Available phases:
- phase_a: Generate reports (needs user_name, report_type)  
- phase_b: Analyze datasets (needs dataset_ids)
- phase_c: Send notifications (needs notify_email, message)

Analyze the user's request and determine which phases should be executed in order."""),
    MessagesPlaceholder("messages"),
    SystemMessage(content="Respond with ONLY valid JSON in this exact format: {\"phases\": [\"phase_a\"], \"intent\": \"description\"}. No additional text.")
])
supervisor_llm = llm.with_structured_output(SupervisorOut)

def extract_params(message_content: str, phase_name: str) -> Dict[str, Any]:
    """Extract parameters from user message"""
    content = message_content.lower()
    
    if phase_name == "phase_a":
        # Extract user name and report type
        user_name = "TestUser"
        if "bob" in content:
            user_name = "Bob"
        elif "sarah" in content:
            user_name = "Sarah"
        
        report_type = "Monthly"
        if "custom" in content:
            report_type = "Custom"
        elif "daily" in content:
            report_type = "Daily"
            
        return {"user_name": user_name, "report_type": report_type}
    
    elif phase_name == "phase_b":
        # Extract dataset IDs
        datasets = re.findall(r'd\d+', content)
        if not datasets:
            datasets = ["ds1", "ds2", "ds3"]
        return {"dataset_ids": datasets}
    
    elif phase_name == "phase_c":
        # Extract email and message
        email_match = re.search(r'[\w\.-]+@[\w\.-]+\.\w+', message_content)
        email = email_match.group(0) if email_match else "test@example.com"
        
        msg_match = re.search(r"'([^']*)'", message_content)
        message = msg_match.group(1) if msg_match else "Task completed"
        
        return {"notify_email": email, "message": message}
    
    return {}

def supervisor_node(state: WorkflowState) -> Command[Literal['phase_a','phase_b','phase_c', END]]:
    # First time - plan the phases
    if "structured_response" not in state or not state["structured_response"]["phases"]:
        # Use the prompt template properly
        prompt_input = supervisor_prompt.invoke({"messages": state["messages"]})
        resp = supervisor_llm.invoke(prompt_input)
        
        return Command(
            update={
                "structured_response": resp,
                "completed_phases": [],
                "messages": [AIMessage(content=f"Planning phases: {resp['phases']}")]
            },
            goto=resp["phases"][0] if resp["phases"] else END
        )
    
    # Find next phase to execute
    planned_phases = state["structured_response"]["phases"]
    completed = state.get("completed_phases", [])
    
    # Find first unfinished phase
    for phase in planned_phases:
        if phase not in completed:
            return Command(goto=phase)
    
    # All phases done
    return Command(
        update={"messages": [AIMessage(content="All phases completed!")]},
        goto=END
    )

def make_phase_node(defn):
    name, req, func = defn["name"], defn["required"], defn["function"]
    
    def node(state: WorkflowState) -> Dict[str, Any]:
        # Get current params for this phase
        current_params = state.get("params", {}).get(name, {}).copy()
        missing = [k for k in req if k not in current_params]
        
        # If missing params, try to extract from original message
        if missing:
            original_msg = state["messages"][0].content if state["messages"] else ""
            extracted = extract_params(original_msg, name)
            current_params.update(extracted)
        
        # Auto-confirm
        current_params["__confirmed__"] = True
        
        # Prepare the update dictionary
        update_dict = {
            "params": {name: current_params},
            "messages": [AIMessage(content=f"Extracted params for {name}: {current_params}")]
        }
        
        try:
            # Execute the phase function
            result = func(current_params)
            
            # Mark as completed
            completed = state.get("completed_phases", []).copy()
            completed.append(name)
            
            update_dict.update({
                "results": {name: result},
                "completed_phases": completed,
            })
            update_dict["messages"].append(AIMessage(content=f"Phase {name} completed successfully"))
            
        except Exception as e:
            update_dict["messages"].append(AIMessage(content=f"Phase {name} failed: {str(e)}"))
        
        return update_dict
    
    return name, node

# Build the graph
builder = StateGraph(WorkflowState)
builder.add_node("supervisor", supervisor_node)

# Add phase nodes
for pd in phase_definitions:
    builder.add_node(*make_phase_node(pd))

# Set up edges
builder.add_edge(START, "supervisor")

# Each phase goes back to supervisor for next phase determination
for pd in phase_definitions:
    builder.add_edge("supervisor", pd["name"])
    builder.add_edge(pd["name"], "supervisor")

# Compile the graph
graph = builder.compile(checkpointer=MemorySaver())

# Test cases
tests = [
    ("Run A then B", [HumanMessage(content="Generate report for Bob then analyze datasets d1,d2")]),
    ("Only C", [HumanMessage(content="Notify user alice@example.com with 'Hello!'")]),
    ("All phases", [HumanMessage(content="Create report for Sarah, analyze datasets d5,d6,d7, then notify admin@company.com with 'Processing complete'")]),
]

for name, msgs in tests:
    print(f"\n=== Testing: {name} ===")
    state: WorkflowState = {
        "messages": msgs, 
        "params": {}, 
        "results": {},
        "structured_response": {"phases": [], "intent": ""},
        "completed_phases": []
    }
    
    try:
        res = graph.invoke(state, {"configurable": {"thread_id": name}})
        print("Results:", json.dumps(res["results"], indent=2))
        print("Completed phases:", res.get("completed_phases", []))
        print("Final messages:", [msg.content for msg in res["messages"][-2:]])
    except Exception as e:
        print(f"Error: {e}")


=== Testing: Run A then B ===
Error: Error code: 500 - {'error': {'message': 'Internal Server Error', 'type': 'internal_server_error'}}

=== Testing: Only C ===
Error: At key 'completed_phases': Can receive only one value per step. Use an Annotated key to handle multiple values.
For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/INVALID_CONCURRENT_GRAPH_UPDATE

=== Testing: All phases ===
Error: At key 'completed_phases': Can receive only one value per step. Use an Annotated key to handle multiple values.
For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/INVALID_CONCURRENT_GRAPH_UPDATE


In [12]:
from typing import TypedDict, List, Dict, Any, Literal, Annotated
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_groq import ChatGroq
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import JsonOutputParser
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command
from langgraph.graph.message import add_messages
import datetime, json
from dotenv import load_dotenv

load_dotenv()
llm = ChatGroq(model_name="llama-3.3-70b-versatile", temperature=0.7)

def ask_json(prompt: str) -> Dict[str, Any]:
    print(f"[DEBUG] LLM JSON prompt:\n{prompt}")
    resp = llm.invoke(ChatPromptTemplate.from_template(prompt)).content
    print(f"[DEBUG] LLM JSON raw response:\n{resp}")
    parsed = JsonOutputParser().parse(resp)
    print(f"[DEBUG] Parsed JSON:\n{parsed}")
    return parsed

class PhaseAParams(TypedDict): user_name: str; report_type: str
class PhaseAResult(TypedDict): status: str; chart_data: List[Dict[str, Any]]
class PhaseBParams(TypedDict): dataset_ids: List[str]
class PhaseBResult(TypedDict): processed: int; summary: str
class PhaseCParams(TypedDict): notify_email: str; message: str
class PhaseCResult(TypedDict): sent: bool; timestamp: str

class SupervisorOut(TypedDict):
    phases: List[Literal['phase_a','phase_b','phase_c']]
    intent: str

class WorkflowState(TypedDict):
    messages: Annotated[List[Any], add_messages]
    params: Dict[str, Dict[str, Any]]
    results: Dict[str, Dict[str, Any]]
    structured_response: SupervisorOut

def phase_a_workflow(p: PhaseAParams) -> PhaseAResult:
    print(f"[DEBUG] Executing phase_a_workflow with params: {p}")
    chart = [{"x": i, "y": len(p["user_name"]) * i} for i in range(5)]
    result = {"status": f"Report for {p['user_name']}", "chart_data": chart}
    print(f"[DEBUG] phase_a result: {result}")
    return result

def phase_b_workflow(p: PhaseBParams) -> PhaseBResult:
    print(f"[DEBUG] Executing phase_b_workflow with params: {p}")
    cnt = len(p["dataset_ids"])
    result = {"processed": cnt, "summary": f"{cnt} datasets processed"}
    print(f"[DEBUG] phase_b result: {result}")
    return result

def phase_c_workflow(p: PhaseCParams) -> PhaseCResult:
    print(f"[DEBUG] Executing phase_c_workflow with params: {p}")
    res = {"sent": True, "timestamp": datetime.datetime.utcnow().isoformat()}
    print(f"[DEBUG] phase_c result: {res}")
    return res

phase_definitions = [
    {"name":"phase_a","required":["user_name","report_type"],"function":phase_a_workflow},
    {"name":"phase_b","required":["dataset_ids"],"function":phase_b_workflow},
    {"name":"phase_c","required":["notify_email","message"],"function":phase_c_workflow}
]

supervisor_prompt = ChatPromptTemplate.from_messages([
    SystemMessage(content="You are a supervisor coordinating phases."),
    SystemMessage(content="Available phases: phase_a (user_name,report_type), phase_b (dataset_ids), phase_c (notify_email,message)."),
    MessagesPlaceholder("messages"),
    SystemMessage(content="Return JSON matching SupervisorOut: {'phases':[...], 'intent':'...'}")
])
supervisor_llm = llm.with_structured_output(SupervisorOut)

def supervisor_node(state: WorkflowState) -> Command[Literal['phase_a','phase_b','phase_c', END]]:
    print(f"[DEBUG] Supervisor received messages: {state['messages']}")
    resp = supervisor_llm.invoke({"messages": state["messages"]}, config={"configurable":{"session_id":"sup"}})
    print(f"[DEBUG] Supervisor structured output: {resp}")
    state["structured_response"] = resp
    chosen = resp["phases"][0] if resp["phases"] else END
    print(f"[DEBUG] Supervisor routing to: {chosen}")
    return Command(goto=chosen)

def make_phase_node(defn):
    name, req, func = defn["name"], defn["required"], defn["function"]
    def node(state: WorkflowState) -> WorkflowState:
        print(f"[DEBUG] Entered node {name}, current params: {state['params'].get(name)}")
        phase = state["params"].setdefault(name, {})
        missing = [k for k in req if k not in phase]
        if missing:
            print(f"[DEBUG] {name} missing fields: {missing}")
            return {"messages":[AIMessage(content=f"Please provide {missing} for {name}")]}
        if "__confirmed__" not in phase:
            print(f"[DEBUG] {name} awaiting confirmation: {phase}")
            return {"messages":[AIMessage(content=f"Confirm {name}? yes/no")]}
        print(f"[DEBUG] {name} all set, running workflow function")
        state["results"][name] = func(phase)
        return state
    return name, node

builder = StateGraph(WorkflowState)
builder.add_node("supervisor", supervisor_node)
for pd in phase_definitions:
    builder.add_node(*make_phase_node(pd))

builder.add_edge(START,"supervisor")
for pd in phase_definitions:
    builder.add_edge("supervisor", pd["name"])
    builder.add_edge(pd["name"], "supervisor")
builder.add_edge(phase_definitions[-1]["name"], END)

graph = builder.compile(checkpointer=MemorySaver())

tests = [
    ("Run A then B", [HumanMessage(content="Generate report for Bob then analyze datasets d1,d2")]),
    ("Only C", [HumanMessage(content="Notify user alice@example.com with 'Hello!'")]),
]

for name, msgs in tests:
    print(f"\n===== Running test scenario: {name} =====")
    state: WorkflowState = {"messages": msgs, "params": {}, "results": {}}
    res = graph.invoke(state, {"configurable":{"thread_id":name}})
    while "__interrupt__" in res:
        print(f"[DEBUG] Graph interrupted; messages to user: {res.get('messages')}")
        res = graph.invoke(res, {"configurable":{"thread_id":name}})
    print(f"[DEBUG] Final results for {name}:")
    print(json.dumps(res["results"], indent=2))



===== Running test scenario: Run A then B =====
[DEBUG] Supervisor received messages: [HumanMessage(content='Generate report for Bob then analyze datasets d1,d2', additional_kwargs={}, response_metadata={}, id='a3ac94f8-adcf-4291-ac32-04db5aa8428f')]


ValueError: Invalid input type <class 'dict'>. Must be a PromptValue, str, or list of BaseMessages.

In [None]:
from typing import TypedDict, List, Dict, Any, Literal, Annotated
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_groq import ChatGroq
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import JsonOutputParser
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command
from langgraph.graph.message import add_messages
import datetime, json
from dotenv import load_dotenv

load_dotenv()
llm = ChatGroq(model_name="llama-3.3-70b-versatile", temperature=0.7)

class PhaseAParams(TypedDict): user_name: str; report_type: str
class PhaseAResult(TypedDict): status: str; chart_data: List[Dict[str,Any]]
class PhaseBParams(TypedDict): dataset_ids: List[str]
class PhaseBResult(TypedDict): processed: int; summary: str
class PhaseCParams(TypedDict): notify_email: str; message: str
class PhaseCResult(TypedDict): sent: bool; timestamp: str

class SupervisorOut(TypedDict):
    next_phase: Literal['phase_a','phase_b','phase_c']
    intent: str

class WorkflowState(TypedDict):
    messages: Annotated[List[Any], add_messages]
    params: Dict[str, Dict[str,Any]]
    results: Dict[str, Dict[str,Any]]
    supervisor_out: SupervisorOut

def ask_json(prompt: str) -> Dict[str, Any]:
    return JsonOutputParser().parse(llm.invoke(ChatPromptTemplate.from_template(prompt)).content)

def phase_a_workflow(p: PhaseAParams) -> PhaseAResult:
    chart = [{"x": i, "y": len(p["user_name"])*i} for i in range(5)]
    return {"status": f"Report for {p['user_name']}", "chart_data": chart}

def phase_b_workflow(p: PhaseBParams) -> PhaseBResult:
    cnt = len(p["dataset_ids"])
    return {"processed": cnt, "summary": f"{cnt} datasets processed"}

def phase_c_workflow(p: PhaseCParams) -> PhaseCResult:
    return {"sent": True, "timestamp": datetime.datetime.utcnow().isoformat()}

phase_defs = [
    {"name":"phase_a","required":["user_name","report_type"],"workflow":phase_a_workflow},
    {"name":"phase_b","required":["dataset_ids"],"workflow":phase_b_workflow},
    {"name":"phase_c","required":["notify_email","message"],"workflow":phase_c_workflow},
]

supervisor_prompt = ChatPromptTemplate.from_messages([
    SystemMessage(content="Decide which phase to start based on user intent."),
    SystemMessage(content="Options: phase_a, phase_b, phase_c"),
    MessagesPlaceholder("messages"),
    SystemMessage(content="Return JSON: {'next_phase': <phase>, 'intent': <summary>}")
])

# Create a chain that combines the prompt with the structured output
supervisor_chain = supervisor_prompt | llm.with_structured_output(SupervisorOut)

def supervisor_node(state: WorkflowState) -> Command:
    # Use the chain to properly format the prompt and get structured output
    out = supervisor_chain.invoke({"messages": state["messages"]})
    print(out)
    state["supervisor_out"] = out
    return Command(goto=out["next_phase"])

def make_phase_node(defn):
    name = defn["name"]; req = defn["required"]; wf = defn["workflow"]
    def node(state: WorkflowState) -> WorkflowState:
        phase = state["params"].setdefault(name,{})
        missing = [k for k in req if k not in phase]
        if missing:
            return {"messages":[AIMessage(content=f"Provide {missing} for {name}")]}
        if "__confirmed__" not in phase:
            return {"messages":[AIMessage(content=f"Confirm {name}? yes/no")]}
        state["results"][name] = wf(phase)
        return state
    return name, node

builder = StateGraph(WorkflowState)
builder.add_node("supervisor", supervisor_node)
for pd in phase_defs:
    builder.add_node(*make_phase_node(pd))
builder.add_edge(START,"supervisor")
for pd in phase_defs:
    builder.add_edge(pd["name"], END) 

graph = builder.compile(checkpointer=MemorySaver())

tests = [
    ("Test A->B", [HumanMessage(content="Create report for John then analyze data")]),
    ("Test C only", [HumanMessage(content="Send notification to x@x.com")]),
]

for name, msgs in tests:
    state = {"messages": msgs, "params": {}, "results": {}}
    res = graph.invoke(state, {"configurable":{"thread_id":name}})
    while "__interrupt__" in res:
        res = graph.invoke(res, {"configurable":{"thread_id":name}})
    print(name, json.dumps(res["results"], indent=2))

Test A->B {}
Test C only {}
