This example uses LangGraph to create a simple agent with some tools.  It demonstrates Mindlytics user identification during a conversation.  It also shows how the Mindlytics session can be passed around in LangGraph.

We are going to implement a simple customer service assistant.  The conversation will start out without the assistant knowing the user, so a Mindlytics session will be started with a "device id".  During the conversation the assistant will need to determine who the user is and will call a tool to search a company database of users to obtain the user record.  This tool will then call the Mindlytics SDK to identify the user in this conversation.

In [None]:
import asyncio
import os
import uuid
from typing import Any, List
from pydantic import BaseModel
from langchain_openai import ChatOpenAI                # we are using openai
from langchain_core.messages import AIMessageChunk     # to detect assistant responses
from langgraph.prebuilt import create_react_agent      # pre-built agent
from langchain_core.tools import tool                  # we are going to define a tool
from langchain_core.runnables import RunnableConfig    # for accessing ML session in tools
from langgraph.checkpoint.memory import InMemorySaver  # for chat history
# Mindlytics imports needed for this demo
from mlsdk import Client, Session, MLEvent, TokenBasedCost
import json
import rich
# Fail fast when the OpenAI key is missing: the model cannot be used without it.
if not os.getenv("OPENAI_API_KEY"):
    print("Error: OPENAI_API_KEY environment variable not set.")
    print("Please set your OpenAI API key:")
    print("export OPENAI_API_KEY='your-api-key-here'")
    # Raise SystemExit directly (this is what sys.exit(1) does) so that this
    # check does not depend on `sys` having been imported.
    raise SystemExit(1)

# Missing Mindlytics settings are only reported here; execution continues.
if not (os.getenv("MLSDK_API_KEY") and os.getenv("MLSDK_PROJECT_ID")):
    print(
        "Error: MLSDK_API_KEY and/or MLSDK_PROJECT_ID environment variable not set.  "
        "The latter part of this demo will not work without them"
    )


Let's create the LLM model, with parameters to capture tokens.

In [None]:
# Name of the OpenAI chat model used by the agent in this demo.
MODEL = "gpt-4o-mini"

# The `stream_options` entry asks for usage (token) information to be
# included when responses are streamed.
model = ChatOpenAI(
    model=MODEL,
    temperature=0.7,
    model_kwargs=dict(stream_options=dict(include_usage=True)),
)

We need a tool to search the company database for a user record.  This will be called by the agent when a user needs to be identified.  In this tool we will identify this user to Mindlytics.

In [None]:
# A structured class to represent a user record, as returned by the
# `find_user_information` tool.  All three fields are required strings.
class User(BaseModel):
    # Email address; the lookup tool also passes it as the `id` when
    # identifying the user to Mindlytics.
    email: str
    # Display name of the user.
    name: str
    # Postal address as free text (the demo record uses embedded newlines).
    address: str

@tool("find_user_information", parse_docstring=True)
async def find_user_information(
    name_or_email: str,
    config: RunnableConfig,
) -> User:
    """Given a user name or an email address, return a User record for that user.

    Args:
        name_or_email: A user name or an email address

    Returns:
        User
    """
    # NOTE: `parse_docstring=True` is passed to @tool, so the docstring above
    # is kept exactly as written; implementation notes live in comments.

    # The Mindlytics session is expected under config["configurable"]["session"].
    session: Session = config["configurable"].get("session")
    if session is None:
        raise Exception("Could not find ML session in tool context!")

    # A real implementation would query a database with `name_or_email`;
    # this demo always answers with the same canned record.
    user = User(
        name="Princess Leia",
        email="princess.leia@alderaan.gal",
        address="Princess Leia Organa\nRoyal Palace\nCity of Aldera\nPlanet Alderaan\nCore Worlds, Galactic Republic",
    )

    # Tell Mindlytics who the user of this conversation is, keyed by email.
    traits = user.model_dump(exclude_none=True)
    await session.user_identify(id=user.email, traits=traits)

    return user
    

Create the agent:

In [None]:
# In-memory checkpointer used to manage the chat history.
checkpointer = InMemorySaver()

# Pre-built agent wired to the model, the user lookup tool, and the checkpointer.
agent = create_react_agent(
    model=model,
    tools=[find_user_information],
    prompt=(
        "You are the customer service agent at the Galactic Toys and Trinkets store.  "
        "To process returns, you can search for user information using supplied tools."
    ),
    checkpointer=checkpointer,
)

Create a device id for this platform and initialize the Mindlytics sdk.

In [None]:
# Build the device id from the node value reported by uuid.getnode(),
# rendered as lowercase hex, zero-padded to 12 digits.
mac = uuid.getnode()
device_id = format(mac, "012x")

# The client constructor will read MLSDK_API_KEY and MLSDK_PROJECT_ID,
# which is why no arguments are passed here.
ml_client = Client()


This function will be called to process chunks as they are streamed from the agent:

In [None]:
# Tool activity observed while streaming; printed after the conversation ends.
tc: List[Any] = []


async def process_chunk(
    session: Session,
    user_message: str,
    mode: str,
    chunk: Any,
) -> None:
    """Handle one streamed ``(mode, chunk)`` item from the agent.

    In ``"messages"`` mode, non-empty assistant message chunks are printed to
    the screen as they arrive.  In ``"updates"`` mode, every message in a
    ``"tools"`` update is recorded in the module-level ``tc`` list, and an
    ``"agent"`` message whose finish reason is ``"stop"`` is reported to
    Mindlytics as a conversation turn.  Any other mode is ignored.

    Args:
        session: Mindlytics session used to track the conversation turn.
        user_message: The user text that started the current turn.
        mode: Stream mode of this item (``"messages"`` or ``"updates"``).
        chunk: Payload for the mode: an indexable whose first element is the
            message chunk for ``"messages"``, or a mapping that may contain
            ``"tools"`` and/or ``"agent"`` entries for ``"updates"``.
    """
    if mode == "messages":
        data = chunk[0]
        if isinstance(data, AIMessageChunk) and len(data.content) > 0:
            # Stream a token to the screen
            print(data.content, end="", flush=True)
        return

    if mode != "updates":
        return

    tools_update = chunk.get("tools")
    if tools_update is not None:
        # Record every tool message in the update, not just the first one,
        # so that several tool results delivered together are all kept.
        # NOTE: a direct `session.track_tool_call(tool_name=..., tool_call_id=...)`
        # call was left disabled here; tool calls are only collected locally.
        for tool_message in tools_update["messages"]:
            tc.append({
                "name": getattr(tool_message, "name", None),
                "tool_call_id": getattr(tool_message, "tool_call_id", None),
            })

    agent_update = chunk.get("agent")
    if agent_update is None:
        return

    message = agent_update["messages"][0]
    if message.response_metadata.get("finish_reason") != "stop":
        # Only a finished assistant answer counts as a conversation turn.
        return

    # Token counts default to zero when the message carries no usage data.
    usage_metadata = getattr(message, "usage_metadata", None) or {}
    usage = {
        "prompt_tokens": usage_metadata.get("input_tokens", 0),
        "completion_tokens": usage_metadata.get("output_tokens", 0),
        "model": MODEL,
    }

    tool_calls = getattr(message, "tool_calls", None)
    if tool_calls:
        tc.extend(tool_calls)

    # Track a conversation turn to Mindlytics
    await session.track_conversation_turn(
        user=user_message,
        assistant=message.content,
        usage=TokenBasedCost(**usage),
    )


Now we will create the Mindlytics session context and invoke the agent.  The `user_messages` variable is a list of user statements to the conversation.  The LLM could behave in a variety of ways, so the user messages might not make a lot of sense after the first couple, but it is amusing to see what the LLM comes up with.  The important part is that the LLM will call our tool to identify the princess, which will identify Leia as the user of this conversation on this device.

In [None]:
# Capture any Mindlytics communication errors
ml_errors: List[Exception]  = []
async def on_error(error: Exception) -> None:
    """Record an error reported through the session's `on_error` callback.

    The error is appended to the module-level `ml_errors` list so it can be
    printed after the conversation has finished.
    """
    # Handle Mindlytics errors here
    ml_errors.append(error)

# Create the context.  We don't know who the user is at first, so pass in our device id.
# Errors are routed to `on_error`, which collects them in `ml_errors`.

session_context = ml_client.create_session(
    device_id=device_id,
    on_error=on_error,
)

# Not used in this demo.  To drive the conversation by hand instead of with
# the canned messages, modify the loop and read each message like
#
#  user_message = await prompt("Princess Leia?:")
#
async def prompt(prompt_text: str) -> str:
    """Ask for a line of input and return what was typed.

    The built-in `input` is run through the running loop's default executor
    (`run_in_executor(None, ...)`) and awaited, rather than being called
    directly inside the coroutine.

    Args:
        prompt_text: Text passed to `input` as the prompt.

    Returns:
        The string returned by `input`.
    """
    return await asyncio.get_running_loop().run_in_executor(None, input, prompt_text)

# Start the session
async with session_context as session:
    # This state will be available to function calls, and other parts of the agent framework
    config = {
        "configurable": {
            "session": session,
            "thread_id": session.session_id,
        }
    }

    # Our canned user input
    user_messages = [
        "I would like to return a hair bun scrunchy.  It is too small for me!",
        "My name sir, is Princess Leia of Alderaan!",
        "Of course I want to proceed!",
        "My order number is 19-BBY.",
        "Never mind.  We're done here!",
        "Goodbye sir.",
    ]

    for user_message in user_messages:
        print(f"User: {user_message}")
        print("Assistant: ", end="", flush=True)
        async for mode, chunk in agent.astream(
            {
                "messages": user_message
            },
            stream_mode=["updates", "messages"],
            config=config
        ):
            # process the chunk
            await process_chunk(
                session=session,
                user_message=user_message,
                mode=mode,
                chunk=chunk,
            )
            
        print("\n")

print("The conversation is finished.")

# Report any errors collected by the Mindlytics error callback, one per line.
if ml_errors:
    print("\n")
    print("There were problems communicating with Mindlytics:")
    print(*ml_errors, sep="\n")

# Show the tool activity recorded while the agent was streaming.
if tc:
    print("\n")
    print("Tool Calls:")
    rich.print(tc)
    