In [1]:
import os

os.environ["OPENAI_API_KEY"] = "sk-proj-..."

In [2]:
from openai import OpenAI

client = OpenAI()

In [3]:
from pydantic import BaseModel, Field


class Search(BaseModel):
    query: str = Field(description="Search query for internet information")

class Memory(BaseModel):
    query: str = Field(description="Self-directed query to search information from your long term memory")

class FinalAnswer(BaseModel):
    answer: str = Field(description="Final answer to the user query, must be in markdown format")
    sources: str = Field(description="Sources used to answer the user query, must be in markdown format")

In [4]:
import json
import openai
from graphai import router, node


@node(start=True)
def node_start(input: dict):
    """Descriptive string for the node."""
    print(">>> node_start")
    return {"input": input}


@router(stream=True)
def node_router(input: dict, callback):
    print(">>> node_router")
    query = input["query"]
    messages = [
        {
            "role": "system",
            "content": """You are a helpful assistant. Select the best route to answer the user query. ONLY choose one function.""",
        },
        {"role": "user", "content": query},
    ]
    # we stream directly from the client
    stream = client.chat.completions.create(
        messages=messages,
        model="gpt-4o-mini",
        stream=True,
        tools=[
            openai.pydantic_function_tool(Search),
            openai.pydantic_function_tool(Memory),
        ],
        tool_choice="required",
    )

    first_chunk = True  # first chunk contains the tool name
    args_str = ""
    for chunk in stream:
        choice = chunk.choices[0]
        if first_chunk:
            toolname = choice.delta.tool_calls[0].function.name.lower()
            first_chunk = False
            callback(f"<graphai:toolname:{toolname}>")
        elif choice.finish_reason == "tool_calls":
            # this means we finished the tool call
            pass
        else:
            chunk = choice.delta.tool_calls[0].function.arguments
            callback(chunk)
            args_str += chunk
    args = json.loads(args_str)
    return {
        "choice": toolname,
        "input": {**input, **args},
    }


@node(stream=True)
def memory(input: dict, callback):
    print(">>> memory")
    #query = input["query"]
    # dummy function for testing to simulate memory search
    out = "The user is in Bali right now."
    callback(out)
    return {"input": {"text": out, **input}}


@node(stream=True)
def search(input: dict, callback):
    print(">>> search")
    #query = input["query"]
    # another dummy function for testing to simulate search
    out = "The most famous photo spot in Bali is the Uluwatu Temple."
    callback(out)
    return {
        "input": {
            "text": out,
            **input,
        }
    }


@node(stream=True)
def llm_node(input: dict, callback):
    print(">>> llm_node")
    chat_history = [
        {"role": message["role"], "content": message["content"]}
        for message in input["chat_history"]
    ]
    # construct all messages
    messages = [
        {"role": "system", "content": """You are a helpful assistant."""},
        *chat_history,
        {"role": "user", "content": input["query"]},
        {"role": "user", "content": (
            f"Response to the following query from the user: {input['query']}\n"
            "Here is additional context. You can use it to answer the user query. "
            f"But do not directly reference it: {input.get('text', '')}."
        )},
    ]
    # we stream directly from the client
    stream = client.chat.completions.create(
        messages=messages,
        model="gpt-4o-mini",
        stream=True,
        tools=[openai.pydantic_function_tool(FinalAnswer)],
        tool_choice="required",
    )

    first_chunk = True  # first chunk contains the tool name
    args_str = ""
    for chunk in stream:
        try:
            choice = chunk.choices[0]
            if first_chunk:
                toolname = choice.delta.tool_calls[0].function.name.lower()
                first_chunk = False
                callback(f"<graphai:toolname:{toolname}>")
            elif choice.finish_reason == "tool_calls":
                # this means we finished the tool call
                pass
            else:
                chunk = choice.delta.tool_calls[0].function.arguments
                callback(chunk)
                args_str += chunk
        except:
            pass
    args = json.loads(args_str)
    return {
        "choice": toolname,
        "input": {**input, **args},
    }


@node(end=True)
def node_end(input: dict, callback = None):
    """Descriptive string for the node."""
    print(">>> node_end")
    callback.close()
    return {"output": input["output"]}

* 'allow_population_by_field_name' has been renamed to 'populate_by_name'
* 'smart_union' has been removed
  from .autonotebook import tqdm as notebook_tqdm


In [5]:
from graphai import Graph

graph = Graph()

In [6]:
for node_fn in [node_start, memory, search, llm_node, node_end]:
    graph.add_node(node_fn)

# add the router
graph.add_router(
    source=node_start,
    router=node_router,
    destinations=[memory, search]
)

# build the certain edges
graph.add_edge(source=node_start, destination=node_router)
graph.add_edge(source=memory, destination=llm_node)
graph.add_edge(source=search, destination=llm_node)
graph.add_edge(source=llm_node, destination=node_end)

#graph.visualize()

In [7]:
graph.compile()

Graph compiled successfully.


In [8]:
chat_history = []

Now we can get a response:

In [12]:
query = "do you remember where I am?"

chat_history.append({"role": "user", "content": query})
response = await graph.async_execute(
    input={"input": {"query": query, "chat_history": chat_history}}
)
response

>>> node_start
>>> node_router
>>> memory
>>> llm_node


{'choice': 'finalanswer',
 'input': {'text': 'The user is in Bali right now.',
  'query': 'user location',
  'chat_history': [{'role': 'user', 'content': 'do you remember where I am?'},
   {'role': 'user', 'content': 'do you remember where I am?'}],
  'answer': "It seems you're currently in Bali!",
  'sources': ''}}

We can see the order that our graph nodes were called, and the final JSON output. However, none of this was streamed — for streaming we need to use the `callback` object that can be generated from our graph:

In [14]:
import asyncio

callback = graph.get_callback()

query = "tell me a long story"
response = asyncio.create_task(graph.async_execute(
    input={"input": {"query": query, "chat_history": chat_history}}
))

async for token in callback.aiter():
    print(token)

>>> node_start
>>> node_router
>>> memory
>>> llm_node
<graphai:start:node_router>
<graphai:toolname:memory>
{"
query
":"
long
 story
"}
<graphai:end:node_router>
<graphai:start:memory>
The user is in Bali right now.
<graphai:end:memory>
<graphai:start:llm_node>
<graphai:toolname:finalanswer>
{"
answer
":"
It
 sounds
 like
 you've
 got
 something
 interesting
 going
 on
!
 If
 you'd
 like
 to
 share
 more
 about
 your
 long
 story
,
 I'm
 here
 to
 listen
.","
sources
":
""
}
<graphai:end:llm_node>
<graphai:END>
