In [None]:
from dotenv import load_dotenv

# Load settings from a .env file into the process environment. The tool
# functions in this notebook read SERPAPI_API_KEY from os.environ; the model
# client presumably needs its own key as well (verify against your setup).
# The return value is assigned to `_` so the cell produces no output.
_ = load_dotenv()

In [None]:

import os
from typing import Optional

import serpapi
from langchain.pydantic_v1 import BaseModel, Field
from langchain_core.tools import tool


class FlightsInput(BaseModel):
    # Search criteria for one Google Flights lookup.
    #
    # NOTE: documented with comments rather than a class docstring, and every
    # Field description is kept verbatim, because both presumably end up in the
    # tool schema that is shown to the model.

    # Route and dates: no explicit default is given for these four fields.
    departure_airport: Optional[str] = Field(
        description='Departure airport code (IATA)',
    )
    arrival_airport: Optional[str] = Field(
        description='Arrival airport code (IATA)',
    )
    outbound_date: Optional[str] = Field(
        description='Parameter defines the outbound date. The format is YYYY-MM-DD. e.g. 2024-06-22',
    )
    return_date: Optional[str] = Field(
        description='Parameter defines the return date. The format is YYYY-MM-DD. e.g. 2024-06-28',
    )

    # Passenger counts: one adult and nobody else unless stated otherwise.
    adults: Optional[int] = Field(
        default=1,
        description='Parameter defines the number of adults. Default to 1.',
    )
    children: Optional[int] = Field(
        default=0,
        description='Parameter defines the number of children. Default to 0.',
    )
    infants_in_seat: Optional[int] = Field(
        default=0,
        description='Parameter defines the number of infants in seat. Default to 0.',
    )
    infants_on_lap: Optional[int] = Field(
        default=0,
        description='Parameter defines the number of infants on lap. Default to 0.',
    )


class FlightsInputSchema(BaseModel):
    # Argument schema passed to @tool(args_schema=...) for flights_finder:
    # the tool takes a single argument named `params` holding a FlightsInput.
    params: FlightsInput


@tool(args_schema=FlightsInputSchema)
def flights_finder(params: FlightsInput):
    '''
    Find flights using the Google Flights engine.

    Returns:
        dict: Flight search results.
    '''
    # NOTE: the docstring above is intentionally left unchanged -- the @tool
    # decorator presumably uses it as the tool description shown to the model.
    #
    # On success this returns the flight entries from the SerpApi response;
    # on any failure it returns the error text as a string, so the model gets
    # something to react to instead of the graph run aborting.

    # Build the query under its own name rather than rebinding `params`, so the
    # validated input object stays available throughout the function.
    query = {
        'api_key': os.environ.get('SERPAPI_API_KEY'),
        'engine': 'google_flights',
        'hl': 'en',
        'gl': 'us',
        'departure_id': params.departure_airport,
        'arrival_id': params.arrival_airport,
        'outbound_date': params.outbound_date,
        'return_date': params.return_date,
        'currency': 'INR',
        'adults': params.adults,
        'infants_in_seat': params.infants_in_seat,
        # NOTE(review): SerpApi appears to document stops=1 as "nonstop only";
        # confirm that restricting results this way is intended.
        'stops': '1',
        'infants_on_lap': params.infants_on_lap,
        'children': params.children
    }

    try:
        search = serpapi.search(query)
        data = search.data
        # A response may lack 'best_flights' (indexing it used to surface as the
        # cryptic error text "'best_flights'"), so fall back to 'other_flights'.
        results = data.get('best_flights') or data.get('other_flights')
        if not results:
            # Nothing usable came back: pass on the API's own error text if it
            # supplied one, otherwise say so explicitly.
            results = data.get('error', 'No flights found for the given search parameters.')
    except Exception as e:
        # Deliberate best-effort: report the failure text to the caller.
        results = str(e)
    return results


In [None]:
import os
from typing import Optional

import serpapi
from langchain.pydantic_v1 import BaseModel, Field
from langchain_core.tools import tool


# from pydantic import BaseModel, Field


class HotelsInput(BaseModel):
    # Search criteria for one Google Hotels lookup.
    #
    # NOTE: documented with comments rather than a class docstring, and the
    # Field descriptions are kept verbatim (except children_ages, see below),
    # because both presumably end up in the tool schema shown to the model.
    q: str = Field(description='Location of the hotel')
    check_in_date: str = Field(description='Check-in date. The format is YYYY-MM-DD. e.g. 2024-06-22')
    check_out_date: str = Field(description='Check-out date. The format is YYYY-MM-DD. e.g. 2024-06-28')
    sort_by: Optional[str] = Field('8',
                                   description='Parameter is used for sorting the results. Default is sort by highest rating')
    adults: Optional[int] = Field(1, description='Number of adults. Default to 1.')
    children: Optional[int] = Field(0, description='Number of children. Default to 0.')
    # The description asks for comma separated ages such as "3,4", which cannot
    # be validated as an int, so the field is a string. The default is None
    # rather than 0 so that no age is sent when there are no children; a single
    # integer age is presumably still accepted through pydantic's int-to-str
    # coercion (verify with the installed pydantic version).
    children_ages: Optional[str] = Field(
        None,
        description='Ages of children, if multiple - pass comma separated numbers like "3,4". '
                    'Omit when there are no children.')
    rooms: Optional[int] = Field(1, description='Number of rooms. Default to 1.')
    hotel_class: Optional[str] = Field(
        None, description='Parameter defines to include only certain hotel class in the results. for example- 2,3,4')


class HotelsInputSchema(BaseModel):
    # Argument schema passed to @tool(args_schema=...) for hotels_finder:
    # the tool takes a single argument named `params` holding a HotelsInput.
    params: HotelsInput


@tool(args_schema=HotelsInputSchema)
def hotels_finder(params: HotelsInput):
    '''
    Find hotels using the Google Hotels engine.

    Returns:
        dict: Hotel search results.
    '''
    # NOTE: the docstring above is intentionally left unchanged -- the @tool
    # decorator presumably uses it as the tool description shown to the model.
    #
    # On success this returns at most the first five entries of the response's
    # 'properties' list; on any failure it returns the error text as a string,
    # the same convention flights_finder uses.

    # Build the query under its own name rather than rebinding `params`.
    query = {
        'api_key': os.environ.get('SERPAPI_API_KEY'),
        'engine': 'google_hotels',
        'hl': 'en',
        'gl': 'us',
        'q': params.q,
        'check_in_date': params.check_in_date,
        'check_out_date': params.check_out_date,
        'currency': 'USD',
        'adults': params.adults,
        'children': params.children,
        'rooms': params.rooms,
        'sort_by': params.sort_by,
        'hotel_class': params.hotel_class,
        'children_ages': params.children_ages
    }

    try:
        search = serpapi.search(query)
        # Cap the payload handed back to the model at five properties.
        results = search.data['properties'][:5]
    except Exception as e:
        # A failed request or a response without 'properties' used to propagate
        # and abort the whole graph run; report the failure text instead.
        results = str(e)
    return results


In [None]:
import operator
from langchain_core.messages import AnyMessage, AIMessage
from typing import TypedDict, Annotated


class AgentState(TypedDict):
    """State carried through the agent graph: the conversation so far."""

    # Each node returns {'messages': [...]}. The operator.add annotation is
    # presumably what makes the graph append those lists to the existing
    # history instead of replacing it (confirm against the LangGraph docs).
    messages: Annotated[list[AnyMessage], operator.add]

In [None]:
from langchain_openai import ChatOpenAI
from datetime import datetime
from langchain_core.messages import SystemMessage

# Tools the model is allowed to call.
TOOLS = [flights_finder, hotels_finder]

# Captured once when this cell runs and interpolated into the system prompt
# below, so the model knows which year relative dates refer to.
CURRENT_YEAR = datetime.now().year
# Name -> tool lookup used by invoke_tools to dispatch the model's tool calls.
_tools = {t.name: t for t in TOOLS}
# Chat model with the tools bound, so its replies can contain tool calls.
_tools_llm = ChatOpenAI(model='gpt-4o').bind_tools(TOOLS)

# System prompt prepended to the conversation on every model call (see
# call_tools_llm). The text is sent to the model as-is, so editing it changes
# the agent's behavior.
TOOLS_SYSTEM_PROMPT = f"""You are a smart travel agency. Use the tools to look up information.
    You are allowed to make multiple calls (either together or in sequence).
    Only look up information when you are sure of what you want.
    The current year is {CURRENT_YEAR}.
    If you need to look up some information before asking a follow up question, you are allowed to do that!
    I want to have in your output links to hotels websites and flights websites (if possible).
    I want to have as well the logo of the hotel and the logo of the airline company (if possible).
    In your output always include the price of the flight and the price of the hotel and the currency as well (if possible).
    for example for hotels-
    Rate: $581 per night
    Total: $3,488
    """


def call_tools_llm(state: AgentState):
    """Ask the tool-enabled model for the next message in the conversation.

    The system prompt is put in front of the stored history for this call
    only; it is not part of the returned state update.

    Args:
        state: Current graph state; only ``state['messages']`` is read.

    Returns:
        dict: ``{'messages': [reply]}`` holding the model's single reply.
    """
    history = state['messages']
    prompt_and_history = [SystemMessage(content=TOOLS_SYSTEM_PROMPT)] + history
    reply = _tools_llm.invoke(prompt_and_history)
    return {'messages': [reply]}

Let's define the graph.


In [None]:
from langchain_core.messages import ToolMessage


def invoke_tools(state: AgentState):
    """Run every tool call requested by the most recent message.

    Exactly one ToolMessage is produced per requested call, whether the call
    succeeded, named an unknown tool, or raised.

    Args:
        state: Current graph state; the last entry of ``state['messages']``
            must expose a ``tool_calls`` list of dicts with 'name', 'args'
            and 'id' keys.

    Returns:
        dict: ``{'messages': [...]}`` with the ToolMessage results, in the
        same order as the requested calls.
    """
    tool_calls = state['messages'][-1].tool_calls
    results = []
    for t in tool_calls:
        print(f'Calling: {t}')
        if t['name'] not in _tools:  # check for bad tool name from LLM
            print('\n ....bad tool name....')
            result = 'bad tool name, retry'  # instruct LLM to retry if bad
        else:
            try:
                result = _tools[t['name']].invoke(t['args'])
            except Exception as e:
                # A tool that raises (for example on arguments that fail
                # validation) used to abort the node before any reply was
                # recorded for this call. Hand the failure back to the model
                # so it can correct the call, like the bad-name case above.
                print('\n ....tool call failed....')
                result = f'tool call failed: {e}, retry'
        results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
    print('Back to the model!')
    return {'messages': results}

In [None]:
def exists_action(state: AgentState):
    """Choose the next step based on the most recent message.

    Args:
        state: Current graph state; only the last entry of
            ``state['messages']`` is inspected.

    Returns:
        str: ``'more_tools'`` when that message carries at least one tool
        call, otherwise ``'email_sender'``.
    """
    latest = state['messages'][-1]
    pending_calls = len(latest.tool_calls)
    return 'more_tools' if pending_calls != 0 else 'email_sender'

In [None]:
def email_sender(state: AgentState):
    """Placeholder final step: announce that an email was sent.

    No email is actually sent here; the function prints a notice and
    contributes a fixed confirmation message.

    Args:
        state: Current graph state (not read).

    Returns:
        dict: ``{'messages': [AIMessage]}`` with the confirmation text.
    """
    print('Sending email...')
    confirmation = AIMessage(content='Email sent!')
    return {'messages': [confirmation]}

In [None]:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.constants import END
from langgraph.graph import StateGraph

# Assemble the agent graph over AgentState: one node per step function.
builder = StateGraph(AgentState)
builder.add_node('call_tools_llm', call_tools_llm)
builder.add_node('invoke_tools', invoke_tools)
builder.add_node('email_sender', email_sender)
# Every run starts by asking the model what to do.
builder.set_entry_point('call_tools_llm')

# After each model reply, exists_action returns 'more_tools' or
# 'email_sender'; the mapping translates that label into the next node.
builder.add_conditional_edges('call_tools_llm', exists_action,
                              {'more_tools': 'invoke_tools', 'email_sender': 'email_sender'})
# Tool results always go back to the model; the email step ends the run.
builder.add_edge('invoke_tools', 'call_tools_llm')
builder.add_edge('email_sender', END)
# In-memory checkpointer; state is stored per thread_id supplied at invoke time.
memory = MemorySaver()
# interrupt_before pauses execution before 'email_sender' runs
# (presumably so a human can review the plan first -- confirm intent).
graph = builder.compile(checkpointer=memory, interrupt_before=['email_sender'])

In [None]:
from IPython.display import Image, display

# Render the compiled graph as a PNG and show it inline.
# The earlier bare draw_mermaid() call was dropped: its return value was
# discarded and, not being the cell's last expression, it displayed nothing.
# NOTE(review): PNG rendering may depend on an external rendering service in
# some LangGraph versions -- confirm it works in offline environments.
display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
import uuid
from langchain_core.messages import HumanMessage

# Fresh thread id for this run; it is passed to the graph through `config`.
thread_id = str(uuid.uuid4())

# The request is sent to the model exactly as written here.
user_input = "Plan a trip to Delhi from Pune for 4 days starting April 15  for 2 adult and a child of 3 years. "
messages = [HumanMessage(content=user_input)]
config = {'configurable': {'thread_id': thread_id}}

# The graph is compiled with interrupt_before=['email_sender'], so this call
# presumably returns once the run pauses before that node, with the model's
# final answer as the last message (confirm against the LangGraph docs).
result = graph.invoke({'messages': messages}, config=config)

In [None]:
from IPython.display import display, Markdown

# Render the content of the last message in the run's result as Markdown.
display(Markdown(result['messages'][-1].content))