In [28]:
from typing import TypedDict, List, Dict, Any
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage
from langgraph.graph import StateGraph, END
import json
import re


In [6]:
class RobotState(TypedDict):
    user_input: str
    plan: List[Dict[str, Any]]
    current_step: int
    last_result: str
    finished: bool

In [10]:
llm = ChatOpenAI(
    model="openai/gpt-4o-mini",
    temperature=0,
    base_url="https://openrouter.ai/api/v1",
    api_key="",
)

In [None]:
def planner_node(state: RobotState):

    prompt = f"""
You are a robotics task planner.

Convert the user request into a JSON list of actions using the provided skills .

Available skills:
- moveLandmark(location): Move the robot to a specific location.
- pick_item(object): Pick up an item and place into the box.
- pick_box(object): Pick up the box with picked items.
- inspect(object): Inspect if the requested items are present.
- requestHumanHelp(): Request human help if the task fails.

User request:
{state["user_input"]}

Return ONLY valid JSON list like:
[
  {{"action": "moveLandmark", "args": {{"location": "warehouse"}}}},
  {{"action": "pick_item", "args": {{"obj": "fruitCan"}}}}
]
"""

    response = llm.invoke(prompt)

    try:
        plan = json.loads(response.content)
    except Exception:
        print("⚠ Planner produced invalid JSON. Requesting human help.")
        plan = [
            {"action": "requestHumanHelp", "args": {}}
        ]

    return {

        **state,
        "plan": plan,
        "current_step": 0,
        "finished": False
    }

In [11]:
def executor_node(state: RobotState):

    if state["current_step"] >= len(state["plan"]):
        return {**state, "finished": True}

    step = state["plan"][state["current_step"]]
    action = step["action"]
    args = step["args"]

    print(f"\n➡ Executing step {state['current_step']+1}: {action} {args}")

    try:
        result = getattr(skills, action)(**args)
    except Exception as e:
        print("Execution error:", e)
        result = "failure"

    return {
        **state,
        "last_result": result,
        "current_step": state["current_step"] + 1
    }

In [9]:
def monitor_node(state: RobotState):

    failure = skills.detectFailure()

    if failure or state["last_result"] == "failure":
        print("⚠ Failure detected. Requesting human help.")
        skills.requestHumanHelp()
        return {**state, "finished": True}

    return state

In [None]:
builder = StateGraph(RobotState)

builder.add_node("planner", planner_node)
builder.add_node("executor", executor_node)
builder.add_node("monitor", monitor_node)

builder.set_entry_point("planner")

builder.add_edge("planner", "executor")
builder.add_edge("executor", "monitor")


def should_continue(state: RobotState):
    if state["finished"]:
        return END
    return "executor"


builder.add_conditional_edges("monitor", should_continue)

graph = builder.compile()

In [None]:
if __name__ == "__main__":

    initial_state: RobotState = {
        "user_input": "I want to buy two fruit can",
        "plan": [],
        "current_step": 0,
        "last_result": "",
        "finished": False
    }

    graph.invoke(initial_state)

In [33]:
def planner_test(user_input):

    prompt = f"""
You are a robotics task planner.

Convert the user request into a JSON list of actions using the provided skills,the robot should carry the box with items back to user, also check
if robot acquiredthe all requested items before coming back.

Available skills:
- moveLandmark(location): Move the robot to a specific location.
- pick_item(object): Pick up an item one time and place into the box.
- pick_box(object): Pick up the box with picked items.
- inspect(object): Check if the requested items are picked and placed in the box.
- requestHumanHelp(): Request human help if the task fails.

User request:
{user_input}

Return ONLY valid JSON list strictly from the following format with same variable names:
[
  {{"action": "moveLandmark", "args": {{"location": "warehouse"}}}},
  {{"action": "pick_item", "args": {{"obj": "fruitCan"}}}}
]
"""


    response = llm.invoke(prompt)
    raw = response.content.strip()

    # Extract JSON array even if wrapped in ```json
    match = re.search(r"\[.*\]", raw, re.DOTALL)
    if not match:
        raise ValueError("No JSON array found in model output")

    plan = json.loads(match.group())

    # try:
    #     plan = json.loads(response.content)
    # except Exception:
    #     print("⚠ Planner produced invalid JSON. Requesting human help.")
    #     plan = [
    #         {"action": "requestHumanHelp", "args": {}}
    #     ]

    return plan

In [35]:
user_input = "I want to buy two fruit can"
planner_test(user_input)

[{'action': 'moveLandmark', 'args': {'location': 'warehouse'}},
 {'action': 'pick_item', 'args': {'obj': 'fruitCan'}},
 {'action': 'pick_item', 'args': {'obj': 'fruitCan'}},
 {'action': 'pick_box', 'args': {'object': 'box'}},
 {'action': 'inspect', 'args': {'object': 'box'}},
 {'action': 'moveLandmark', 'args': {'location': 'user'}}]