In [17]:
from typing import Literal, TypedDict

from langgraph.graph import END, StateGraph

class StarshipState(TypedDict):
    """State dictionary shared by the nodes of the diagnostic workflow."""

    # System being checked (e.g., "warp core").
    system: str
    # Diagnostic result for that system.
    status: Literal["NOMINAL", "WARNING", "CRITICAL"]
    # Generated alert text (if any).
    alert_message: str
    # Raw diagnostic data.
    logs: list

In [18]:
# Define the nodes

# Fetch System Data (Simulate an API call)
def fetch_system_data(state: StarshipState) -> StarshipState:
    """Simulate fetching diagnostic data from a starship system.

    No real API call is made; the readings below are hard-coded sample data.

    Args:
        state: Current workflow state. It is read but never modified.

    Returns:
        A new state dict equal to ``state`` except that ``logs`` holds the
        simulated readings and ``system`` is set to ``"warp core"``.
    """
    simulated_logs = [
        {"component": "plasma conduit", "reading": 0.8},
        {"component": "coolant", "reading": 0.2},  # Low coolant!
    ]
    # Build a fresh dict rather than assigning into `state`, so the object the
    # caller passed in is left untouched.
    return {**state, "logs": simulated_logs, "system": "warp core"}

# Process Data (Check for failures)
def process_data(state: StarshipState) -> StarshipState:
    """Classify the diagnostic logs as NOMINAL, WARNING, or CRITICAL.

    Classification rules, checked in this order:
      * any reading below 0.5  -> "CRITICAL"
      * any reading below 0.7  -> "WARNING"
      * otherwise (including an empty log list) -> "NOMINAL"

    Args:
        state: Current workflow state; ``state["logs"]`` must be an iterable
            of dicts that each contain a numeric ``"reading"``. The state is
            read but never modified.

    Returns:
        A new state dict equal to ``state`` with ``status`` set to the
        classification result.
    """
    critical_below = 0.5
    warning_below = 0.7

    # Extract the readings once so both threshold checks scan the same list.
    readings = [log["reading"] for log in state["logs"]]

    if any(reading < critical_below for reading in readings):
        status = "CRITICAL"
    elif any(reading < warning_below for reading in readings):
        status = "WARNING"
    else:
        status = "NOMINAL"

    # Return a fresh dict rather than assigning into `state`, so the object
    # the caller passed in is left untouched.
    return {**state, "status": status}

# Trigger Alert (Conditional edge: Only if critical)
def trigger_alert(state: StarshipState) -> StarshipState:
    """Produce the alert message that matches the current diagnostic status.

    Args:
        state: Current workflow state; ``status`` is always read, and
            ``system`` is read when the status is ``"CRITICAL"``. The state
            is read but never modified.

    Returns:
        A new state dict equal to ``state`` with ``alert_message`` set to a
        critical-condition alert naming the system when the status is
        ``"CRITICAL"``, or to a "no critical issues" notice otherwise.
    """
    if state["status"] == "CRITICAL":
        message = f"Alert! {state['system']} is in critical condition!"
    else:
        message = "No critical issues detected."

    # Return a fresh dict rather than assigning into `state`, so the object
    # the caller passed in is left untouched.
    return {**state, "alert_message": message}

In [23]:
# Build the Graph

def route_after_process(state: StarshipState) -> str:
    """Pick the step that follows the "process" node.

    Returns "alert" when the status is "CRITICAL"; otherwise returns END so
    the run stops without raising an alert. A router must return a real
    destination: returning None is not a valid way to terminate the graph.
    """
    return "alert" if state["status"] == "CRITICAL" else END


workflow = StateGraph(StarshipState)

# Add nodes
workflow.add_node("fetch", fetch_system_data)
workflow.add_node("process", process_data)
workflow.add_node("alert", trigger_alert)

# Configure workflow
workflow.set_entry_point("fetch")
workflow.add_edge("fetch", "process")

# Conditional edge: go to "alert" only on a critical status, otherwise finish.
# The explicit path map lists every destination the router can return.
workflow.add_conditional_edges(
    "process",
    route_after_process,
    {"alert": "alert", END: END},
)

workflow.set_finish_point("alert")  # Terminate after alert node

# Compile once; the compiled app is what the notebook invokes.
app = workflow.compile()

In [24]:
# Test: run the compiled workflow from an empty starting state.
initial_state: StarshipState = {
    "system": "",
    "status": "NOMINAL",
    "alert_message": "",
    "logs": [],
}
result = app.invoke(initial_state)
print("Final State:", result)

Final State: {'system': 'warp core', 'status': 'CRITICAL', 'alert_message': 'Alert! warp core is in critical condition!', 'logs': [{'component': 'plasma conduit', 'reading': 0.8}, {'component': 'coolant', 'reading': 0.2}]}
