# Install Dependencies

In [14]:
%pip install langchain-text-splitters==0.3.5 langchain==0.3.16 langchain_openai==0.3.2 langgraph==0.2.68 langchain-community==0.3.16 langchain-experimental==0.3.4 langchain-core==0.3.32 langchain-postgres==0.0.12 ipython==8.31.0

Note: you may need to restart the kernel to use updated packages.


# Setup Environment

In [3]:
import os

os.environ['OPENAI_API_KEY'] = os.getenv('OPENAI_API_KEY')
os.environ['LANGSMITH_API_KEY'] = os.getenv('LANGSMITH_API_KEY')

CONNECTION = os.getenv('CONNECTION')
COLLECTION_NAME = os.getenv('COLLECTION_NAME')

# Setup RAG

In [4]:
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_postgres import PGVector

llm = ChatOpenAI(model='gpt-4o-mini')
embeddings = OpenAIEmbeddings(model='text-embedding-3-small')

pg_vector_store = PGVector(
    embeddings=embeddings,
    collection_name=COLLECTION_NAME,
    connection=CONNECTION,
    use_jsonb=True,
)

# Ingest Test Data

In [9]:
from langchain_core.documents import Document

docs = [
    Document(
        page_content='there are cats in the pond',
        metadata={'id': 1, 'location': 'pond', 'topic': 'animals'},
    ),
    Document(
        page_content='ducks are also found in the pond',
        metadata={'id': 2, 'location': 'pond', 'topic': 'animals'},
    ),
    Document(
        page_content='fresh apples are available at the market',
        metadata={'id': 3, 'location': 'market', 'topic': 'food'},
    ),
    Document(
        page_content='the market also sells fresh oranges',
        metadata={'id': 4, 'location': 'market', 'topic': 'food'},
    ),
    Document(
        page_content='the new art exhibit is fascinating',
        metadata={'id': 5, 'location': 'museum', 'topic': 'art'},
    ),
    Document(
        page_content='a sculpture exhibit is also at the museum',
        metadata={'id': 6, 'location': 'museum', 'topic': 'art'},
    ),
    Document(
        page_content='a new coffee shop opened on Main Street',
        metadata={'id': 7, 'location': 'Main Street', 'topic': 'food'},
    ),
    Document(
        page_content='the book club meets at the library',
        metadata={'id': 8, 'location': 'library', 'topic': 'reading'},
    ),
    Document(
        page_content='the library hosts a weekly story time for kids',
        metadata={'id': 9, 'location': 'library', 'topic': 'reading'},
    ),
    Document(
        page_content='a cooking class for beginners is offered at the community center',
        metadata={'id': 10, 'location': 'community center', 'topic': 'classes'},
    ),
]

pg_vector_store.add_documents(docs, ids=[doc.metadata['id'] for doc in docs])

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# Test Queries

In [5]:
query_filter = {
    '$and': [
        {'id': {'$in': [1, 5, 2, 9]}},
        {'location': {'$in': ['pond', 'market']}},
    ]
}

pg_vector_store.similarity_search(
    'ducks',
    k=10,
    filter=query_filter,
)

[Document(id='2', metadata={'id': 2, 'topic': 'animals', 'location': 'pond'}, page_content='ducks are also found in the pond'),
 Document(id='1', metadata={'id': 1, 'topic': 'animals', 'location': 'pond'}, page_content='there are cats in the pond')]

# Agents Setup

In [6]:
from langgraph.graph import MessagesState


class State(MessagesState):
    next: str

# Tool setup

In [11]:
from langchain import hub
from langchain_core.tools import tool
from typing import Annotated

prompt = hub.pull('rlm/rag-prompt')


def retrieve(question: str):
    retrieved_docs = pg_vector_store.similarity_search(question)
    return {'context': retrieved_docs}


@tool
def rag_tool(question: Annotated[str, 'User prompt'], ):
    """Use this to get chunks"""
    docs_content = '\n\n'.join(doc.page_content for doc in question)
    messages = prompt.invoke({'question': question, 'context': docs_content})
    response = llm.invoke(messages)
    return {'answer': response.content}


@tool
def api_tool(question: Annotated[str, 'User prompt'], ):
    """Use this to get api result"""

    # API integration here

    return {'answer': 'Barista is the new coffee shop opened on Main Street'}

# Create Agents

In [13]:
from langgraph.prebuilt import create_react_agent
from langgraph.types import Command
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from typing import Literal

rag_agent = create_react_agent(llm, tools=[rag_tool])
api_agent = create_react_agent(llm, tools=[api_tool])

members = ['rag', 'api']

options = members + ['FINISH']

system_prompt = (
    'You are a supervisor tasked with managing a conversation between the'
    f' following workers: {members}. Given the following user request,'
    ' respond with the worker to act next. Each worker will perform a'
    ' task and respond with their results and status. When finished,'
    ' respond with FINISH.'
)

class Router(TypedDict):
    """Worker to route to next. If no workers needed, route to FINISH."""

    next: Literal[*options]

def supervisor_node(state: State) -> Command[Literal[*members, '__end__']]:
    messages = [
                   {'role': 'system', 'content': system_prompt},
               ] + state['messages']
    response = llm.with_structured_output(Router).invoke(messages)
    goto = response['next']
    if goto == 'FINISH':
        goto = END

    return Command(goto=goto, update={'next': goto})

def rag_node(state: State) -> Command[Literal['supervisor']]:
    result = rag_agent.invoke(state)
    return Command(
        update={
            'messages': [
                HumanMessage(content=result['messages'][-1].content, name='rag')
            ]
        },
        goto='supervisor',
    )


def api_node(state: State) -> Command[Literal['supervisor']]:
    result = api_agent.invoke(state)
    return Command(
        update={
            'messages': [
                HumanMessage(content=result['messages'][-1].content, name='api')
            ]
        },
        goto='supervisor',
    )

builder = StateGraph(State)
builder.add_edge(START, 'supervisor')
builder.add_node('supervisor', supervisor_node)
builder.add_node('rag', rag_node)
builder.add_node('api', api_node)
graph = builder.compile()

# Graph

In [None]:
from IPython.display import display, Image

display(Image(graph.get_graph().draw_mermaid_png()))