In [1]:
%%capture --no-stderr
%pip install -U langgraph langgraph-checkpoint-postgres psycopg psycopg-pool langchain_google_genai


In [3]:
from google.colab import userdata
import os

# Gemini API key
GEMINI_API_KEY = userdata.get('GEMINI_API_KEY')
os.environ["LANGCHAIN_API_KEY"] = userdata.get('LANGCHAIN_API_KEY')
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "home-services-demo"

In [4]:
DB_URI = userdata.get('DB_URI')  # Example: "postgresql://user:pass@host:5432/dbname"

from psycopg_pool import ConnectionPool
from langgraph.checkpoint.postgres import PostgresSaver

connection_kwargs = {"autocommit": True, "prepare_threshold": 0}
pool = ConnectionPool(conninfo=DB_URI, max_size=20, kwargs=connection_kwargs)
checkpointer = PostgresSaver(pool)
checkpointer.setup()  # create required tables

In [6]:
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt
from langgraph.graph.state import CompiledStateGraph
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

# State schema
class ServiceState(dict):
    messages: list

llm = ChatGoogleGenerativeAI(model="gemini-1.5-flash", api_key=GEMINI_API_KEY)

In [7]:
def intake(state: ServiceState) -> ServiceState:
    print("Intake step running...")
    return state

# Diagnosis step
def diagnosis(state: ServiceState) -> ServiceState:
    user_message = state["messages"][-1].content.lower()

    # If expensive issue -> raise interrupt
    if "replace heater" in user_message or "expensive" in user_message:
        raise interrupt("⚠️ Expensive repair detected. Needs technician approval.")

    # Otherwise let model respond safely
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

# Technician approval step
def technician_review(state: ServiceState) -> ServiceState:
    # This is a placeholder node
    # Technician will inject updates using graph.update_state()
    return state

In [8]:
builder = StateGraph(ServiceState)

builder.add_node("intake", intake)
builder.add_node("diagnosis", diagnosis)
builder.add_node("technician_review", technician_review)

builder.add_edge(START, "intake")
builder.add_edge("intake", "diagnosis")
builder.add_edge("diagnosis", END)

graph: CompiledStateGraph = builder.compile(checkpointer=checkpointer)


In [9]:
thread_config = {"configurable": {"thread_id": "1"}}

customer_msg = HumanMessage(content="No hot water, should I replace heater? (expensive)")
print("=== Customer starts ===")
for event in graph.stream({"messages": [customer_msg]}, thread_config, stream_mode="values"):
    print(event)

# Check paused state
state = graph.get_state(thread_config)
print("\n--- Graph paused state ---")
print("Next:", state.next)
print("Interrupts:", state.tasks)

=== Customer starts ===
{'messages': [HumanMessage(content='No hot water, should I replace heater? (expensive)', additional_kwargs={}, response_metadata={})]}
Intake step running...
{'messages': [HumanMessage(content='No hot water, should I replace heater? (expensive)', additional_kwargs={}, response_metadata={})]}

--- Graph paused state ---
Next: ('diagnosis',)
Interrupts: (PregelTask(id='f4c2c534-9bc6-e5f0-c96e-57ad2e5bdf60', name='diagnosis', path=('__pregel_pull', 'diagnosis'), error=None, interrupts=(Interrupt(value='⚠️ Expensive repair detected. Needs technician approval.', id='dd51336e3ed2fad96d650358888729bf'),), state=None, result=None),)


In [10]:
print("\n=== Technician logs in ===")
for m in state.values['messages']:
    print(type(m).__name__, ":", m.content)

# Technician edits the recommendation
from langchain_core.messages import HumanMessage
graph.update_state(
    thread_config,
    {"messages": [HumanMessage(content="Technician: Suggest flushing tank first before replacement.")]}
)


=== Technician logs in ===
HumanMessage : No hot water, should I replace heater? (expensive)


{'configurable': {'thread_id': '1',
  'checkpoint_ns': '',
  'checkpoint_id': '1f098727-015e-6fd3-8002-06746c39a346'}}

In [11]:
print("\n=== Resuming after Technician approval ===")
for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)

# Final state check
final_state = graph.get_state(thread_config)
print("\n--- Final State ---")
for m in final_state.values['messages']:
    print(type(m).__name__, ":", m.content)



=== Resuming after Technician approval ===
{'messages': [HumanMessage(content='Technician: Suggest flushing tank first before replacement.', additional_kwargs={}, response_metadata={})]}
{'messages': [AIMessage(content='Several options depending on the context and desired level of detail:\n\n**Option 1 (Concise):**\n\n> "Before replacing the [component], please flush the tank to remove any residual water or debris."\n\n**Option 2 (More detail):**\n\n> "Prior to replacement, it\'s recommended to flush the tank thoroughly. This will help prevent sediment or debris from entering the [component] during installation and ensure a cleaner working environment."\n\n**Option 3 (Most detail, including why):**\n\n> "To minimize the risk of contamination and improve the installation process, please flush the tank completely before replacing the [component].  This removes any loose sediment or debris that could interfere with the new part\'s function or cause damage during installation.  Ensure the

In [12]:
pool.close()