In [None]:
# !pip install -U langgraph langchain-community langchain-anthropic tavily-python pandas openai
# !pip install langfuse

In [1]:
import os

import getpass
def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")

_set_env("OPENAI_API_KEY")


from langfuse.callback import CallbackHandler
langfuse_handler = CallbackHandler(
    secret_key="sk-lf-7a32fef8-8e0b-41b0-8d93-147a18cc40fd",
    public_key="pk-lf-c05ba234-0e69-44dd-bbba-f2e7b6f4b61f",
    host="https://us.cloud.langfuse.com", # 🇺🇸 US region
)

In [2]:
import yaml
import re

class Flow(object):

    def __init__(self, flow_file):
        '''
        Construct the flow based on the descriptions from flow_file

        Args:
            flow_file (str): the path pointed to the text file containing the flow instructions.
        '''

        self.plan_data = None
        with open(flow_file) as stream:
            try:
                self.plan_data = yaml.safe_load(stream)
            except yaml.YAMLError as exc:
                print(exc)
        
        self.current_step = 0

    def get_current_task(self):
        return self.plan_data["step_" + str(self.current_step)]

    def parse_step_number(self, text: str):
        if "step" in text:
            match = re.search(r'step_(\d+)', text)
            if not match:
                match = re.search(r'step (\d+)', text)
            if match:
                number = match.group(1)
                return int(number.strip())
            
        return None
    
    def go_next(self):
        self.current_step += 1
        return self.get_current_task()

    def go_to(self, step_number):
        self.current_step = step_number
        return self.get_current_task()
    
    def reset(self):
        self.current_step = 0
        return self.get_current_task()


flow = Flow("plumbing_flow_v1.yaml")


In [3]:
from typing import Annotated

from typing_extensions import TypedDict

from langgraph.graph.message import AnyMessage, add_messages


class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
    current_task: str
    current_step: int

In [4]:
from langchain_core.tools import tool
import random



user_in_service_range = False
@tool
def go_to_next_step(step_number: int) -> str:
    """Use this tool to guide user to go to next step"""

    global flow  
    print("FLOW CHANGE: Go to step_", step_number)
    flow.go_to(step_number)

    return "Have gone to step " + str(step_number)


user_in_service_range = False
@tool
def verify_customer_in_service_range() -> str:
    """Verify customer is in service range using customer Map"""

    print(f"Verifying customer in service range")
    if user_in_service_range:
        return "Customer is verified to be in service range"
    else:
        return "Customer is verified to be out of service range"

@tool
def log_customer_info(name: str, address: str, phone_number: str, email_address: str) -> str:
    """Log customer info to Talkdesk Contact"""
    
    print(f"Logging customer info: {name}, {address}, {phone_number}, {email_address}")

    return "Customer info logged"

check_knowledge_base_and_verify_service_result = False
@tool
def check_knowledge_base_and_verify_service():
    """Tool to check knowledge base list of provided services and verify if we can service the customer"""

    if check_knowledge_base_and_verify_service_result:
        return "Have checked the knowledge base and verified that we can service the customer"
    else:
        return "Have checked the knowledge base and verified that we cannot service the customer"

customer_distance_from_portland_result = "Customer is more than 60 minutes away"
# customer_distance_from_portland_result = "Customer is less than 45 minutes away"
# customer_distance_from_portland_result = "Customer is 45-60 minutes away"

customer_need_booking_type = "Needs ESTIMATE booking"
# customer_need_booking_type = "Needs SERVICE CALL booking"
@tool
def confirm_customer_distance_from_portland_me():
    """Service Zone Lookup tool. Use this tool to confirm customer's distance from Portland, ME"""

    return f"{customer_distance_from_portland_result} AND {customer_need_booking_type}"

@tool
def schedule_next_available_appointment():
    """Schedule next available estimate appointment"""

    return "Next available appointment scheduled"



demo_tools = [
    go_to_next_step,
    verify_customer_in_service_range,
    log_customer_info,
    check_knowledge_base_and_verify_service,
    confirm_customer_distance_from_portland_me,
    schedule_next_available_appointment
]

In [5]:
from langchain_core.messages import ToolMessage
from langchain_core.runnables import RunnableLambda

from langgraph.prebuilt import ToolNode


def handle_tool_error(state) -> dict:
    error = state.get("error")
    tool_calls = state["messages"][-1].tool_calls
    return {
        "messages": [
            ToolMessage(
                content=f"Error: {repr(error)}\n please fix your mistakes.",
                tool_call_id=tc["id"],
            )
            for tc in tool_calls
        ]
    }


def create_tool_node_with_fallback(tools: list) -> dict:
    return ToolNode(tools).with_fallbacks(
        [RunnableLambda(handle_tool_error)], exception_key="error"
    )


def _print_event(event: dict, _printed: set, max_length=1500):
    current_state = event.get("dialog_state")
    if current_state:
        print("Currently in: ", current_state[-1])
    message = event.get("messages")
    if message:
        if isinstance(message, list):
            message = message[-1]
        if message.id not in _printed:
            msg_repr = message.pretty_repr(html=True)
            if len(msg_repr) > max_length:
                msg_repr = msg_repr[:max_length] + " ... (truncated)"
            print(msg_repr)
            _printed.add(message.id)

In [6]:
from langchain_anthropic import ChatAnthropic
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnableConfig
from datetime import datetime
from textwrap import dedent
from langchain_core.messages.tool import ToolMessage
from langfuse.decorators import observe


def last_tool_message(state, messages_key: str = "messages") -> str:
    if isinstance(state, list):
        last_message = state[-1]
    elif isinstance(state, dict) and (messages := state.get(messages_key, [])):
        last_message = messages[-1]
    elif messages := getattr(state, messages_key, []):
        last_message = messages[-1]
    else:
        raise ValueError(f"No messages found in input state to tool_edge: {state}")

    if isinstance(last_message, ToolMessage):
        return last_message.content
    else:
        return ""
    

class Assistant:
    def __init__(self, runnable: Runnable, flow: Flow):
        self.runnable = runnable
        self.flow = flow

    @observe()
    def __call__(self, state: State, config: RunnableConfig):

        # verify
        
        while True:
            # configuration = config.get("configurable", {})
            # passenger_id = configuration.get("passenger_id", None)
            # state = {**state, "user_info": passenger_id}
            current_task = self.flow.get_current_task()
            state = {
                        **state, 
                        "current_task": current_task, 
                        "current_step": self.flow.current_step,
                        "last_message": last_tool_message(state)
                    }
            
            print("\n\nCURRENT TASK:", current_task)
            result = self.runnable.invoke(state, config={**config, "callbacks": [langfuse_handler]})
            # If the LLM happens to return an empty response, we will re-prompt it
            # for an actual response.
            if not result.tool_calls and (
                not result.content
                or isinstance(result.content, list)
                and not result.content[0].get("text")
            ):
                print("LLM returns empty response")
                messages = state["messages"] + [("user", "Respond with a real output.")]
                state = {**state, "messages": messages}
            # elif "to step_" in result.content or "to step " in result.content:
            #     print(f"RESULT:```{result.content}```", )
            #     step_number = self.flow.parse_step_number(result.content)
            #     if not step_number:
            #         raise ValueError("Invalid step number:", result)    
            #     print("FLOW CHANGE: Go to step_", step_number)
            #     self.flow.go_to(step_number)
            else:
                break

        # Tool call or end of the flow
        return {"messages": result}


from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini")

primary_assistant_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            dedent("""

Your are task orchestrator for Plumbing Service workflow process.
                   
**Context:**
Currently, you are in step number: {current_step} of a workflow process.
Your task is to read task instructions, and guide user what to do next based on user intent or last tool message.
Current Task:\n
{current_task}

Last tool message:
{last_message}

Notes:
- Focus on the current task and execute the task step by step.
                   
Tools usage:
- ALWAYS call 1 tool at a time.

            """)
        ),
        ("placeholder", "{messages}"),
    ]
)


assistant_runnable = primary_assistant_prompt | llm.bind_tools(demo_tools)

### Define graph

In [7]:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph, START
from langgraph.prebuilt import tools_condition

builder = StateGraph(State)

flow = Flow("plumbing_flow_v1.yaml")

# Define nodes: these do the work
builder.add_node("assistant", Assistant(assistant_runnable, flow))
builder.add_node("tools", create_tool_node_with_fallback(demo_tools))
# Define edges: these determine how the control flow moves
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
    "assistant",
    tools_condition,
)
builder.add_edge("tools", "assistant")

# The checkpointer lets the graph persist its state
# this is a complete memory for the entire graph.
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

In [8]:
# from IPython.display import Image, display

# try:
#     display(Image(graph.get_graph(xray=True).draw_mermaid_png()))
# except Exception:
#     # This requires some extra dependencies and is optional
#     pass

In [9]:
import shutil
import uuid


def test_agent(user_questions: list, thread_id: str):
    # Update with the backup file so we can restart from the original place in each section

    config = {
        "configurable": {
            # Checkpoints are accessed by thread_id
            "thread_id": thread_id,
        }
    }

    flow.reset()

    _printed = set()
    for question in user_questions:
        events = graph.stream(
            {"messages": ("user", question)}, config, stream_mode="values"
        )
        for event in events:
            _print_event(event, _printed)

In [10]:
# Case 10
user_in_service_range = True
check_knowledge_base_and_verify_service_result = True

customer_distance_from_portland_result = "Customer is 45-60 minutes away"
customer_need_booking_type = "Needs ESTIMATE booking"

case3_user_queries = [
    "Hi there, I need a plumber to fix my sink?",
    "My name is Thuan, my phone number is 0902727231, my address is 123 Main St, my email is thuan@gmail.com"
]
test_agent(case3_user_queries, thread_id=str(uuid.uuid4()))

Langfuse client is disabled since no public_key was provided as a parameter or environment variable 'LANGFUSE_PUBLIC_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client



Hi there, I need a plumber to fix my sink?


CURRENT TASK: - Determine user request intent
  - If user request is about personal (not related to a plumbing request), go to step_1
  - If user request is about selling something, go to step_2
  - If user request is about a new lead (needs plumbing services), go to step_3

Tool Calls:
  go_to_next_step (call_bAusnRh1zXXYiNtw0jgl5gjB)
 Call ID: call_bAusnRh1zXXYiNtw0jgl5gjB
  Args:
    step_number: 3
FLOW CHANGE: Go to step_ 3
Name: go_to_next_step

Have gone to step 3


CURRENT TASK: - Collect and log Basic Information in Talkdesk Contact: Name, Address, Phone Number, Email Address
- Then go to step_4


To proceed, I need to collect some basic information from you. Please provide the following details:

1. Name
2. Address
3. Phone Number
4. Email Address

Once I have this information, I'll log it and move to the next step.

My name is Thuan, my phone number is 0902727231, my address is 123 Main St, my email is thuan@gmail.com


CURRENT TA

In [11]:
# Case 9
user_in_service_range = True
check_knowledge_base_and_verify_service_result = True

customer_distance_from_portland_result = "Customer is less than 45 minutes away"
customer_need_booking_type = "Needs ESTIMATE booking"

case3_user_queries = [
    "Hi there, I need a plumber to fix my sink?",
    "My name is Thuan, my phone number is 0902727231, my address is 123 Main St, my email is thuan@gmail.com"
]
test_agent(case3_user_queries, thread_id=str(uuid.uuid4()))


Hi there, I need a plumber to fix my sink?


CURRENT TASK: - Determine user request intent
  - If user request is about personal (not related to a plumbing request), go to step_1
  - If user request is about selling something, go to step_2
  - If user request is about a new lead (needs plumbing services), go to step_3

Tool Calls:
  go_to_next_step (call_97qInTu0B1xJe3ni1c2BiHLV)
 Call ID: call_97qInTu0B1xJe3ni1c2BiHLV
  Args:
    step_number: 3
FLOW CHANGE: Go to step_ 3
Name: go_to_next_step

Have gone to step 3


CURRENT TASK: - Collect and log Basic Information in Talkdesk Contact: Name, Address, Phone Number, Email Address
- Then go to step_4


Let's start by collecting some basic information. Please provide the following details:

1. Your Name:
2. Your Address:
3. Your Phone Number:
4. Your Email Address:

Once I have this information, I will log it and proceed to the next step.

My name is Thuan, my phone number is 0902727231, my address is 123 Main St, my email is thuan@gmail.

In [12]:
# Case 5
user_in_service_range = True
check_knowledge_base_and_verify_service_result = True
customer_distance_from_portland_result = "Customer is more than 60 minutes away"

case3_user_queries = [
    "Hi there, I need a plumber to fix my sink?",
    "My name is Thuan, my phone number is 0902727231, my address is 123 Main St, my email is thuan@gmail.com"
]
test_agent(case3_user_queries, thread_id=str(uuid.uuid4()))


Hi there, I need a plumber to fix my sink?


CURRENT TASK: - Determine user request intent
  - If user request is about personal (not related to a plumbing request), go to step_1
  - If user request is about selling something, go to step_2
  - If user request is about a new lead (needs plumbing services), go to step_3

Tool Calls:
  go_to_next_step (call_X901Uodz4cx568QEZL1ttnMg)
 Call ID: call_X901Uodz4cx568QEZL1ttnMg
  Args:
    step_number: 3
FLOW CHANGE: Go to step_ 3
Name: go_to_next_step

Have gone to step 3


CURRENT TASK: - Collect and log Basic Information in Talkdesk Contact: Name, Address, Phone Number, Email Address
- Then go to step_4


To proceed with your request for plumbing services, I need to collect some basic information from you. Please provide the following details:

1. Name
2. Address
3. Phone Number
4. Email Address

Once I have this information, I'll log it and move to the next step.

My name is Thuan, my phone number is 0902727231, my address is 123 Main St, 

In [13]:
# user not in provided service
user_in_service_range = True
check_knowledge_base_and_verify_service_result = False
case3_user_queries = [
    "Hi there, I need a plumber to fix my sink?",
    "My name is Thuan, my phone number is 0902727231, my address is 123 Main St, my email is thuan@gmail.com"
]
test_agent(case3_user_queries, thread_id=str(uuid.uuid4()))


Hi there, I need a plumber to fix my sink?


CURRENT TASK: - Determine user request intent
  - If user request is about personal (not related to a plumbing request), go to step_1
  - If user request is about selling something, go to step_2
  - If user request is about a new lead (needs plumbing services), go to step_3

Tool Calls:
  go_to_next_step (call_ehVow2EGbIZthfGN5FRozeYh)
 Call ID: call_ehVow2EGbIZthfGN5FRozeYh
  Args:
    step_number: 3
FLOW CHANGE: Go to step_ 3
Name: go_to_next_step

Have gone to step 3


CURRENT TASK: - Collect and log Basic Information in Talkdesk Contact: Name, Address, Phone Number, Email Address
- Then go to step_4


To proceed, I need to collect some basic information from you. Please provide the following details:

1. Your Name
2. Your Address
3. Your Phone Number
4. Your Email Address

Once I have this information, I can log it and move on to the next step.

My name is Thuan, my phone number is 0902727231, my address is 123 Main St, my email is thua

In [14]:
# user not in service range
user_in_service_range = False

case3_user_queries = [
    "Hi there, I need a plumber to fix my sink?",
    "My name is Thuan, my phone number is 0902727231, my address is 123 Main St, my email is thuan@gmail.com"
]
test_agent(case3_user_queries, thread_id=str(uuid.uuid4()))


Hi there, I need a plumber to fix my sink?


CURRENT TASK: - Determine user request intent
  - If user request is about personal (not related to a plumbing request), go to step_1
  - If user request is about selling something, go to step_2
  - If user request is about a new lead (needs plumbing services), go to step_3

Tool Calls:
  go_to_next_step (call_wX4wIDBCiAKaNTOq413KZFOg)
 Call ID: call_wX4wIDBCiAKaNTOq413KZFOg
  Args:
    step_number: 3
FLOW CHANGE: Go to step_ 3
Name: go_to_next_step

Have gone to step 3


CURRENT TASK: - Collect and log Basic Information in Talkdesk Contact: Name, Address, Phone Number, Email Address
- Then go to step_4


To proceed, I need to collect some basic information from you. Please provide the following details:

1. Name
2. Address
3. Phone Number
4. Email Address

Once I have this information, I will log it and move on to the next step.

My name is Thuan, my phone number is 0902727231, my address is 123 Main St, my email is thuan@gmail.com


CURRE

In [None]:
# Saleperson Test
case2_user_queries = [
    "Hi there, I'm from a ABC that sells gift cards. Are you interested in buying some?",
]
test_agent(case2_user_queries, thread_id=str(uuid.uuid4()))

In [None]:
# Personal Call Test
case1_user_queries = [
    "Hi there, I want to meet your boss?",
    "Sure, I'm Aron, and this is my phone number, 0902727231?",
    "Yeah, leave this message for him, I want to know if he is available to meet me?",
]

test_agent(case1_user_queries, thread_id=str(uuid.uuid4()))