# Build a ShoeBot Sales Agent using LangGraph and Graphiti

The following example demonstrates building an agent using LangGraph. Graphiti is used to personalize agent responses based on information learned from prior conversations. Additionally, a database of products is loaded into the Graphiti graph, enabling the agent to speak to these products.

The agent implements:
- persistence of new chat turns to Graphiti and recall of relevant Facts using the most recent message.
- a tool for querying Graphiti for shoe information
- an in-memory MemorySaver to maintain agent state.

## Install dependencies
```shell
pip install graphiti-core langchain-openai langgraph ipywidgets
```

Ensure that you've followed the Graphiti installation instructions. In particular, installation of `neo4j`.

In [None]:
import asyncio
import json
import logging
import os
import sys
import uuid
from contextlib import suppress
from datetime import datetime, timezone
from pathlib import Path
from typing import Annotated

import ipywidgets as widgets
from dotenv import load_dotenv, find_dotenv
from IPython.display import Image, display
from typing_extensions import TypedDict



In [4]:
load_dotenv(find_dotenv())

True

In [5]:
def setup_logging():
    logger = logging.getLogger()
    logger.setLevel(logging.ERROR)
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)
    formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    return logger


logger = setup_logging()

## LangSmith integration (Optional)

If you'd like to trace your agent using LangSmith, ensure that you have a `LANGSMITH_API_KEY` set in your environment.

Then set `os.environ['LANGCHAIN_TRACING_V2'] = 'false'` to `true`.


In [6]:
os.environ['LANGCHAIN_TRACING_V2'] = 'false'
os.environ['LANGCHAIN_PROJECT'] = 'Graphiti LangGraph Tutorial'

## Configure Graphiti

Ensure that you have `neo4j` running and a database created. Ensure that you've configured the following in your environment.

```bash
NEO4J_URI=
NEO4J_USER=
NEO4J_PASSWORD=
```

In [14]:
# Configure Graphiti

from graphiti_core import Graphiti
from graphiti_core.edges import EntityEdge
from graphiti_core.nodes import EpisodeType
from graphiti_core.utils.maintenance.graph_data_operations import clear_data

neo4j_uri = os.getenv("NEO4J_URI")
neo4j_user = os.getenv("NEO4J_USER")
neo4j_password = os.getenv("NEO4J_PASSWORD")

client = Graphiti(
    neo4j_uri,
    neo4j_user,
    neo4j_password,
)

## Generating a database schema 

The following is only required for the first run of this notebook or when you'd like to start your database over.

**IMPORTANT**: `clear_data` is destructive and will wipe your entire database.

In [8]:
# Note: This will clear the database
await clear_data(client.driver)
await client.build_indices_and_constraints()

## Load Q & A history 

Load several Q & A responses into the Graphiti. This may take a while.


**IMPORTANT**: This only needs to be done once. If you run `clear_data` you'll need to rerun this step.

In [15]:

qa_history = [
    {
        "question": "Tell me about yourself.",
        "answer": "Name’s Buck Rawlins. Been ridin’ fence, bustin’ broncs, and keepin’ folks honest from Wyoming to West Texas. I don't say much, but when I do, it matters."
    },
    {
        "question": "What are your greatest strengths?",
        "answer": "I show up early, leave late, and don’t rattle when things get rough. Storm’s brewin’? I already got the cattle moved."
    },
    {
        "question": "What would you say is your biggest weakness?",
        "answer": "I ain’t too fond of paperwork. I’d rather fix a busted axle than fill out a requisition form. But if it needs doin’, I’ll do it right."
    },
    {
        "question": "How do you handle conflict on the team?",
        "answer": "You sit folks down, look ’em in the eye, and call things what they are. No yellin’, no posturin’. Just plain truth and a path forward."
    },
    {
        "question": "Describe a time you failed.",
        "answer": "Left the south gate open once and lost half the herd overnight. Learned real quick that double-checkin’ ain’t just for greenhorns."
    },
    {
        "question": "Why should we hire you?",
        "answer": "’Cause I’ll work harder than most, complain less than all, and bring calm when things go sideways. Can’t put that on a résumé, but you’ll feel it by sundown."
    },
    {
        "question": "Where do you see yourself in five years?",
        "answer": "Still honest, still upright, still earnin’ my keep. Maybe runnin’ my own spread. Maybe teachin’ some young pup how not to get kicked in the teeth."
    },
    {
        "question": "How do you stay motivated in tough conditions?",
        "answer": "A man don’t need motivation when he’s got responsibility. That gate breaks, you fix it. That’s just the deal. Weather and mood don’t enter into it."
    },
    {
        "question": "What’s your leadership style?",
        "answer": "Ride with your crew, not ahead of ’em. Listen more than you talk. And never ask a man to do what you won’t do yourself."
    },
    {
        "question": "Any final thoughts?",
        "answer": "You want fancy, I ain’t it. You want dependable, steady, and tough as boot leather—I’m your man. Now, which way’s the tack room?"
    }
]




async def ingest_products_data(client: Graphiti, qa_history: list[dict[str, str]]):
    
    for i, qa in enumerate(qa_history):
        await client.add_episode(
            name=f"Interview response {i}",
            episode_body=(
                f"Question: {qa['question']}\n"
                f"Answer: {qa['answer']}"
            ),
            source_description='Q & A responses',
            reference_time=datetime.now(timezone.utc),
        )


await ingest_products_data(client, qa_history)

## Create a user node in the Graphiti graph

In your own app, this step could be done later once the user has identified themselves and made their sales intent known. We do this here so we can configure the agent with the user's `node_uuid`.

In [19]:
from graphiti_core.search.search_config_recipes import NODE_HYBRID_SEARCH_EPISODE_MENTIONS

user_name = 'jess'

await client.add_episode(
    name='User Creation',
    episode_body=(f'{user_name} is looking to generate accurate responses to interview questions. '
                  f'They are a cowboy with a lot of experience and have been in the field for a long time.'),
    source=EpisodeType.text,
    reference_time=datetime.now(timezone.utc),
    source_description='U',
)

In [None]:
# let's get Jess's node uuid
nl = await client._search(user_name, NODE_HYBRID_SEARCH_EPISODE_MENTIONS)

user_node_uuid = nl.nodes[0].uuid


# Lets get the interview response nodes
interview_response_nodes = []
for i in range(len(qa_history)):
    nl = await client._search(f'Interview response {i}', NODE_HYBRID_SEARCH_EPISODE_MENTIONS)
    interview_response_nodes.append(nl.nodes[0].uuid)

In [17]:
def edges_to_facts_string(entities: list[EntityEdge]):
    return '-' + '\n- '.join([edge.fact for edge in entities])

In [18]:
from langchain_core.messages import AIMessage, SystemMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph, add_messages
from langgraph.prebuilt import ToolNode

## `get_shoe_data` Tool

The agent will use this to search the Graphiti graph for information about shoes. We center the search on the `manybirds_node_uuid` to ensure we rank shoe-related data over user data.


In [None]:
@tool
async def get_interview_data(query: str, node_uuid) -> str:
    """Search the graphiti graph for information about the interview responses."""
    # Look through the interview responses

    edge_results = await client.search(
        query,
        center_node_uuid=node_uuid,
        num_results=10,
    )
    return edges_to_facts_string(edge_results)


tools = [get_interview_data]
tool_node = ToolNode(tools)

In [None]:
llm = ChatOpenAI(model='gpt-4.1-mini', temperature=0).bind_tools(tools)

In [None]:
# Test the tool node
await tool_node.ainvoke({'messages': [await llm.ainvoke('wool shoes')]})

## Chatbot Function Explanation

The chatbot uses Graphiti to provide context-aware responses in a shoe sales scenario. Here's how it works:

1. **Context Retrieval**: It searches the Graphiti graph for relevant information based on the latest message, using the user's node as the center point. This ensures that user-related facts are ranked higher than other information in the graph.

2. **System Message**: It constructs a system message incorporating facts from Graphiti, setting the context for the AI's response.

3. **Knowledge Persistence**: After generating a response, it asynchronously adds the interaction to the Graphiti graph, allowing future queries to reference this conversation.

This approach enables the chatbot to maintain context across interactions and provide personalized responses based on the user's history and preferences stored in the Graphiti graph.

In [None]:
class State(TypedDict):
    messages: Annotated[list, add_messages]
    user_name: str
    user_node_uuid: str


async def chatbot(state: State):
    facts_string = None
    if len(state['messages']) > 0:
        last_message = state['messages'][-1]
        graphiti_query = f'{"SalesBot" if isinstance(last_message, AIMessage) else state["user_name"]}: {last_message.content}'
        # search graphiti using Jess's node uuid as the center node
        # graph edges (facts) further from the Jess node will be ranked lower
        edge_results = await client.search(
            graphiti_query, center_node_uuid=state['user_node_uuid'], num_results=5
        )
        facts_string = edges_to_facts_string(edge_results)

    system_message = SystemMessage(
        content=f"""You are a skillfull shoe salesperson working for ManyBirds. Review information about the user and their prior conversation below and respond accordingly.
        Keep responses short and concise. And remember, always be selling (and helpful!)

        Things you'll need to know about the user in order to close a sale:
        - the user's shoe size
        - any other shoe needs? maybe for wide feet?
        - the user's preferred colors and styles
        - their budget

        Ensure that you ask the user for the above if you don't already know.

        Facts about the user and their conversation:
        {facts_string or 'No facts about the user and their conversation'}"""
    )

    messages = [system_message] + state['messages']

    response = await llm.ainvoke(messages)

    # add the response to the graphiti graph.
    # this will allow us to use the graphiti search later in the conversation
    # we're doing async here to avoid blocking the graph execution
    asyncio.create_task(
        client.add_episode(
            name='Chatbot Response',
            episode_body=f'{state["user_name"]}: {state["messages"][-1]}\nSalesBot: {response.content}',
            source=EpisodeType.message,
            reference_time=datetime.now(timezone.utc),
            source_description='Chatbot',
        )
    )

    return {'messages': [response]}

## Setting up the Agent

This section sets up the Agent's LangGraph graph:

1. **Graph Structure**: It defines a graph with nodes for the agent (chatbot) and tools, connected in a loop.

2. **Conditional Logic**: The `should_continue` function determines whether to end the graph execution or continue to the tools node based on the presence of tool calls.

3. **Memory Management**: It uses a MemorySaver to maintain conversation state across turns. This is in addition to using Graphiti for facts.

In [None]:
graph_builder = StateGraph(State)

memory = MemorySaver()


# Define the function that determines whether to continue or not
async def should_continue(state, config):
    messages = state['messages']
    last_message = messages[-1]
    # If there is no function call, then we finish
    if not last_message.tool_calls:
        return 'end'
    # Otherwise if there is, we continue
    else:
        return 'continue'


graph_builder.add_node('agent', chatbot)
graph_builder.add_node('tools', tool_node)

graph_builder.add_edge(START, 'agent')
graph_builder.add_conditional_edges('agent', should_continue, {'continue': 'tools', 'end': END})
graph_builder.add_edge('tools', 'agent')

graph = graph_builder.compile(checkpointer=memory)

Our LangGraph agent graph is illustrated below.

In [None]:
with suppress(Exception):
    display(Image(graph.get_graph().draw_mermaid_png()))

## Running the Agent

Let's test the agent with a single call

In [None]:
await graph.ainvoke(
    {
        'messages': [
            {
                'role': 'user',
                'content': 'What sizes do the TinyBirds Wool Runners in Natural Black come in?',
            }
        ],
        'user_name': user_name,
        'user_node_uuid': user_node_uuid,
    },
    config={'configurable': {'thread_id': uuid.uuid4().hex}},
)

## Viewing the Graph

At this stage, the graph would look something like this. The `jess` node is `INTERESTED_IN` the `TinyBirds Wool Runner` node. The image below was generated using Neo4j Desktop.

In [None]:
display(Image(filename='tinybirds-jess.png', width=850))

## Running the Agent interactively

The following code will run the agent in an event loop. Just enter a message into the box and click submit.

In [None]:
conversation_output = widgets.Output()
config = {'configurable': {'thread_id': uuid.uuid4().hex}}
user_state = {'user_name': user_name, 'user_node_uuid': user_node_uuid}


async def process_input(user_state: State, user_input: str):
    conversation_output.append_stdout(f'\nUser: {user_input}\n')
    conversation_output.append_stdout('\nAssistant: ')

    graph_state = {
        'messages': [{'role': 'user', 'content': user_input}],
        'user_name': user_state['user_name'],
        'user_node_uuid': user_state['user_node_uuid'],
    }

    try:
        async for event in graph.astream(
            graph_state,
            config=config,
        ):
            for value in event.values():
                if 'messages' in value:
                    last_message = value['messages'][-1]
                    if isinstance(last_message, AIMessage) and isinstance(
                        last_message.content, str
                    ):
                        conversation_output.append_stdout(last_message.content)
    except Exception as e:
        conversation_output.append_stdout(f'Error: {e}')


def on_submit(b):
    user_input = input_box.value
    input_box.value = ''
    asyncio.create_task(process_input(user_state, user_input))


input_box = widgets.Text(placeholder='Type your message here...')
submit_button = widgets.Button(description='Send')
submit_button.on_click(on_submit)

conversation_output.append_stdout('Asssistant: Hello, how can I help you find shoes today?')

display(widgets.VBox([input_box, submit_button, conversation_output]))