In [24]:
import requests
import vertexai
import os

vertexai.init(project=os.environ.get('GCP_PROJECT_ID'))

from google.cloud import bigquery

from vertexai.generative_models import (
    GenerativeModel,
    GenerationConfig,
    Part,
    FunctionDeclaration
)
from vertexai.generative_models import Tool as VertexTool

In [2]:
PROJECT_ID = 'bigquery-public-data'
DATASET_ID = 'thelook_ecommerce'

## APIs

In [5]:
def get_exchange_rate_from_api(params):
    url = f"https://api.frankfurter.app/latest?from={params['currency_from']}&to={params['currency_to']}"
    api_response = requests.get(url)
    return api_response.text

In [6]:
get_exchange_rate_from_api({'currency_from': 'USD', 'currency_to': 'EUR'})

'{"amount":1.0,"base":"USD","date":"2024-11-22","rates":{"EUR":0.96043}}'

In [7]:
def list_datasets(params):
    client = bigquery.Client(project=params['project'])
    datasets = list(client.list_datasets())
    if datasets:
        return [dataset.dataset_id for dataset in datasets]
    else:
        return "{} project does not contain any datasets.".format(params['project'])

In [12]:
list_datasets({'project': PROJECT_ID})[:10]

['america_health_rankings',
 'austin_311',
 'austin_bikeshare',
 'austin_crime',
 'austin_incidents',
 'austin_waste',
 'baseball',
 'bbc_news',
 'bigqueryml_ncaa',
 'bitcoin_blockchain']

In [13]:
def list_tables(params):
    client = bigquery.Client(project=params['project'])
    try:
        response = client.list_tables(params["dataset_id"])
        return [table.table_id for table in response]
    except Exception as e:
        return f"The dataset {params['dataset_id']} is not found in the {params['project']} project, please specify the dataset and project"

In [14]:
list_tables({'project': PROJECT_ID, 'dataset_id': DATASET_ID})

['distribution_centers',
 'events',
 'inventory_items',
 'order_items',
 'orders',
 'products',
 'users']

## Gemini Function Calling

### Create the tool

In [15]:
# Function declarations
get_exchange_rate_func = FunctionDeclaration(
    name="get_exchange_rate",
    description="Get the exchange rate for currencies between countries",
    parameters={
    "type": "object",
    "properties": {
        "currency_date": {
            "type": "string",
            "description": "A date that must always be in YYYY-MM-DD format or the value 'latest' if a time period is not specified"
        },
        "currency_from": {
            "type": "string",
            "description": "The currency to convert from in ISO 4217 format"
        },
        "currency_to": {
            "type": "string",
            "description": "The currency to convert to in ISO 4217 format"
        }
    },
         "required": [
            "currency_from",
            "currency_to",
      ]
  },
)

list_datasets_func = FunctionDeclaration(
    name="list_datasets",
    description="Get a list of datasets in a project that will help answer the user's question",
    parameters={
        "type": "object",
        "properties": {
            "project": {
                "type": "string",
                "description": "Project ID to fetch tables from.",
            }
        },
    },
)

list_tables_func = FunctionDeclaration(
    name="list_tables",
    description="List tables in a dataset that will help answer the user's question",
    parameters={
        "type": "object",
        "properties": {
            "dataset_id": {
                "type": "string",
                "description": "Dataset ID to fetch tables from.",
            },
            "project": {
                "type": "string",
                "description": "Project ID to fetch tables from.",
            }
        },
        "required": [
            "dataset_id",
            "project"
        ],
    },
)

tool = VertexTool(
    function_declarations=[
        get_exchange_rate_func,
        list_datasets_func,
        list_tables_func
    ]
)

# Mapping to map function name to function
function_handler = {
    "get_exchange_rate": get_exchange_rate_from_api,
    "list_datasets": list_datasets,
    "list_tables": list_tables,
}

Tools can also be created this way but this implies having docstrings in your functions

# Function declarations

```py
def get_exchange_rate_from_api_v2(currency_from: str, currency_to: str):
    """
    Get the exchange rate for currencies
    
    Args:
        currency_from (str): The currency to convert from in ISO 4217 format
        currency_to (str): The currency to convert to in ISO 4217 format
    """
    url = f"https://api.frankfurter.app/latest?from={currency_from}&to={currency_to}"
    params = {'currency_to': currency_to, 'currency_from': currency_from}
    api_response = requests.get(url, params=params, verify=False)
    return api_response.text


get_exchange_rate_func_v2 = FunctionDeclaration.from_func(get_exchange_rate_from_api_v2)

tool = VertexTool(
    function_declarations=[
        get_exchange_rate_func_v2
    ]
)

# Mapping to map function name to function
function_handler = {
    "get_exchange_rate": get_exchange_rate_from_api,
    "list_datasets": list_datasets,
    "list_tables": list_tables,
}
````

### Create the Chat

In [25]:
gemini_model = GenerativeModel(
    "gemini-1.5-flash",
    generation_config=GenerationConfig(temperature=0),
    # tools=[tool]
)
chat = gemini_model.start_chat()

In [27]:
response = chat.send_message("What is the current exchange rate for USD vs EUR ?")
response.candidates[0].content.parts[0].text

'I do not have access to real-time information, including live exchange rates. \n\nTo get the most up-to-date USD to EUR exchange rate, I recommend checking a reliable financial website or using a currency converter app. \n\nHere are some popular options:\n\n* **Google Finance:** Simply search "USD to EUR" on Google.\n* **XE.com:** A dedicated currency converter website.\n* **Bloomberg:** A financial news and data provider.\n* **Yahoo Finance:** Another popular financial website.\n\nRemember that exchange rates fluctuate constantly, so the rate you see at one moment may be different just a few minutes later. \n'

Conclusion : Without tool, no answer

### Add a tool

In [14]:
gemini_model = GenerativeModel(
    "gemini-1.5-flash",
    generation_config=GenerationConfig(temperature=0),
    tools=[tool]
)
chat = gemini_model.start_chat()

In [15]:
response = chat.send_message("What is the current exchange rate for USD vs EUR ?")

# Extract the function call response
function_call = response.candidates[0].content.parts[0].function_call
function_call

name: "get_exchange_rate"
args {
  fields {
    key: "currency_to"
    value {
      string_value: "EUR"
    }
  }
  fields {
    key: "currency_from"
    value {
      string_value: "USD"
    }
  }
  fields {
    key: "currency_date"
    value {
      string_value: "latest"
    }
  }
}

In [16]:
prompt = f"What is the current exchange rate for USD vs EUR ?"

response = chat.send_message(prompt)

# Extract the function call response
function_call = response.candidates[0].content.parts[0].function_call

# Check for a function call or a natural language response
if function_call.name in function_handler.keys():
    # Extract the function call name
    function_name = function_call.name
    print("#### Predicted function name")
    print(function_name, "\n")
    # msg.content = f'I think I need to use the `{function_name}` tool'
    # await msg.update()

    # Extract the function call parameters
    params = {key: value for key, value in function_call.args.items()}
    print("#### Predicted function parameters")
    print(params, "\n")

    function_api_response = function_handler[function_name](params)
    print("#### API response")
    print(function_api_response)
    response = chat.send_message(
        Part.from_function_response(
            name=function_name,
            response={"content": function_api_response},
        ),
    )   
    print("\n#### Final Answer")
    print(response.candidates[0].content.parts[0].text)

#### Predicted function name
get_exchange_rate 

#### Predicted function parameters
{'currency_from': 'USD', 'currency_date': 'latest', 'currency_to': 'EUR'} 

#### API response
{"amount":1.0,"base":"USD","date":"2024-11-20","rates":{"EUR":0.94679}}





#### Final Answer
The current exchange rate for USD vs EUR is 0.94679. This means that 1 USD is equal to 0.94679 EUR. 



## LangChain Agents

In [37]:
from langchain_google_vertexai import VertexAI
from langchain.agents import AgentType, initialize_agent
from langchain.agents import Tool as LangchainTool
from langchain_core.tools import tool
from langchain.memory import ConversationBufferMemory

### Decorate our apis with the `tool` decorator

In [38]:
@tool
def list_datasets(project: str) -> list:
    """
    Return a list of Bigquery datasets
    Args:
        project: GCP project id
    """
    client = bigquery.Client(project=project)
    datasets = list(client.list_datasets())
    if datasets:
        return [dataset.dataset_id for dataset in datasets]
    else:
        return "{} project does not contain any datasets.".format(project)

@tool
def list_tables(project: str, dataset_id: str) -> list:
    """
    Return a list of Bigquery tables
    Args:
        project: GCP project id
        dataset_id: ID of the dataset
    """
    client = bigquery.Client(project=project)
    try:
        response = client.list_tables(dataset_id)
        return [table.table_id for table in response]
    except Exception as e:
        return f"The dataset {dataset_id} is not found in the {project} project, please specify the dataset and project"

@tool
def get_exchange_rate_from_api(currency_from: str, currency_to: str) -> str:
    """
    Return the exchange rate between currencies
    Args:
        currency_from: str
        currency_to: str
    """
    url = f"https://api.frankfurter.app/latest?from={currency_from}&to={currency_to}"
    api_response = requests.get(url, verify=False)
    return api_response.text

### Build the tool

In [39]:
# langchain_tool = [
#     LangchainTool(
#         name='list_datasets',
#         func=list_datasets,
#         description=list_datasets.description,
#     ),
#     LangchainTool(
#         name='list_tables',
#         func=list_tables,
#         description=list_tables.description,
#     ),
#     LangchainTool(
#         name='get_exchange_rate',
#         func=get_exchange_rate_from_api,
#         description=get_exchange_rate_from_api.description
#     )
# ]

langchain_tool = [
    list_datasets,
    list_tables,
    get_exchange_rate_from_api
]

### Instantiate AgentExecutor - New way

In [6]:
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate
from langchain_google_vertexai import ChatVertexAI

In [7]:
gemini_llm = ChatVertexAI(model="gemini-1.5-flash")

In [36]:
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant"),
        ("human", "{input}"),
        # Placeholders fill up a **list** of messages
        ("placeholder", "{agent_scratchpad}"),
    ]
)


agent = create_tool_calling_agent(gemini_llm, langchain_tool, prompt)
agent_executor = AgentExecutor(agent=agent, tools=langchain_tool)
agent_executor.invoke({"input": "Which tables are available in the thelook_ecommerce dataset ?"})

{'input': 'Which tables are available in the thelook_ecommerce dataset ?',
 'output': 'I am sorry, I cannot find the dataset `thelook_ecommerce` in the project `gcp-project-id`. Please check if the dataset name and project id are correct. \n'}

In [37]:
agent_executor.invoke({"input": f"Project id is {PROJECT_ID}"})

{'input': 'Project id is bigquery-public-data',
 'output': 'OK. What else can I do for you? \n'}

In [38]:
agent_executor.invoke({"input": "Which tables are available in the thelook_ecommerce dataset ?"})

{'input': 'Which tables are available in the thelook_ecommerce dataset ?',
 'output': 'I am sorry, I cannot find the dataset "thelook_ecommerce" in your project. Please double check the dataset name and project ID and try again. \n'}

#### Adding memory

In [46]:
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory

In [9]:
memory = InMemoryChatMessageHistory(session_id="foo")

In [10]:
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant."),
        # First put the history
        ("placeholder", "{chat_history}"),
        # Then the new input
        ("human", "{input}"),
        # Finally the scratchpad
        ("placeholder", "{agent_scratchpad}"),
    ]
)

In [41]:
agent = create_tool_calling_agent(gemini_llm, langchain_tool, prompt)
agent_executor = AgentExecutor(agent=agent, tools=langchain_tool)

agent_with_chat_history = RunnableWithMessageHistory(
    agent_executor,
    # This is needed because in most real world scenarios, a session id is needed
    # It isn't really used here because we are using a simple in memory ChatMessageHistory
    lambda session_id: memory,
    input_messages_key="input",
    history_messages_key="chat_history",
)

config = {"configurable": {"session_id": "foo"}}

In [42]:
agent_with_chat_history.invoke({"input": "Which tables are available in the thelook_ecommerce dataset ?"}, config)

{'input': 'Which tables are available in the thelook_ecommerce dataset ?',
 'chat_history': [],
 'output': 'The dataset `thelook_ecommerce` is not found in the `gcp-project-id` project. Please specify the correct dataset and project. \n'}

In [13]:
agent_with_chat_history.invoke({"input": f"Project id is {PROJECT_ID}"}, config)

{'input': 'Project id is bigquery-public-data',
 'chat_history': [HumanMessage(content='Which tables are available in the thelook_ecommerce dataset ?'),
  AIMessage(content='I am sorry, I cannot find the dataset thelook_ecommerce in your project. Please check if the dataset name is correct and if the dataset is available in your project. \n')],
 'output': 'The tables available in the thelook_ecommerce dataset are: distribution_centers, events, inventory_items, order_items, orders, products, users. \n'}

#### Chaining method

First we need to bind the tools to our LLM

In [40]:
gemini_with_tools = gemini_llm.bind_tools(langchain_tool)

Then we create a chain, which will be wrapped in a Runnable

In [45]:
chain = prompt | gemini_with_tools
memory = InMemoryChatMessageHistory(session_id="foo")
config = {"configurable": {"session_id": "foo"}}
chain_with_history = RunnableWithMessageHistory(
    chain,
    # Uses the get_by_session_id function defined in the example
    # above.
    lambda session_id: memory,
    input_messages_key="input",
    history_messages_key="chat_history",
)

resp = chain_with_history.invoke({"input": "What is the current CHF EUR exchange rate ?"}, config)
resp

AIMessage(content='', additional_kwargs={'function_call': {'name': 'get_exchange_rate_from_api', 'arguments': '{"currency_from": "CHF", "currency_to": "EUR"}'}}, response_metadata={'is_blocked': False, 'safety_ratings': [{'category': 'HARM_CATEGORY_HATE_SPEECH', 'probability_label': 'NEGLIGIBLE', 'blocked': False, 'severity': 'HARM_SEVERITY_NEGLIGIBLE'}, {'category': 'HARM_CATEGORY_DANGEROUS_CONTENT', 'probability_label': 'NEGLIGIBLE', 'blocked': False, 'severity': 'HARM_SEVERITY_LOW'}, {'category': 'HARM_CATEGORY_HARASSMENT', 'probability_label': 'NEGLIGIBLE', 'blocked': False, 'severity': 'HARM_SEVERITY_NEGLIGIBLE'}, {'category': 'HARM_CATEGORY_SEXUALLY_EXPLICIT', 'probability_label': 'NEGLIGIBLE', 'blocked': False, 'severity': 'HARM_SEVERITY_NEGLIGIBLE'}], 'usage_metadata': {'prompt_token_count': 185, 'candidates_token_count': 17, 'total_token_count': 202}}, id='run-9ad6b7e3-1314-442b-be35-e3b6d9864c3c-0', tool_calls=[{'name': 'get_exchange_rate_from_api', 'args': {'currency_from': 

Are we done ? No. The LLM correcty guessed the function to call, but we need to execute the function now !

In [47]:
from langchain_core.messages import AIMessage

def call_tools(msg: AIMessage) -> list[dict]:
    """Simple sequential tool calling helper."""
    tool_map = {tool.name: tool for tool in langchain_tool}
    tool_calls = msg.tool_calls.copy()
    for tool_call in tool_calls:
        tool_call["output"] = tool_map[tool_call["name"]].invoke(tool_call["args"])
    return tool_calls

chain = prompt | gemini_with_tools | call_tools
memory = InMemoryChatMessageHistory(session_id="foo")
chain_with_history = RunnableWithMessageHistory(
    chain,
    # Uses the get_by_session_id function defined in the example
    # above.
    lambda session_id: memory,
    input_messages_key="input",
    history_messages_key="chat_history",
)

chain_with_history.invoke({"input": "What is the current CHF EUR exchange rate ?"}, config)



[{'name': 'get_exchange_rate_from_api',
  'args': {'currency_from': 'CHF', 'currency_to': 'EUR'},
  'id': '81bc85ea-dfd4-4c01-85e8-f3ca592fff5b',
  'type': 'tool_call',
  'output': '{"amount":1.0,"base":"USD","date":"2024-11-20","rates":{"EUR":0.94679}}'}]

#### Human in the loop validation

In [50]:
def human_approval(msg: AIMessage) -> AIMessage:
    """Responsible for passing through its input or raising an exception.

    Args:
        msg: output from the chat model

    Returns:
        msg: original output from the msg
    """
    for tool_call in msg.tool_calls:
        print(f"I want to use function [{tool_call.get('name')}] with the following parameters :")
        for k,v in tool_call.get('args').items():
            print(" {} = {}".format(k, v))
            
    print("")
    input_msg = (
        f"Do you approve (Y|y)?\n\n"
        ">>>"
    )
    resp = input(input_msg)
    if resp.lower() not in ("yes", "y"):
        raise ValueError(f"Tool invocations not approved")
    return msg

In [52]:

chain = prompt | gemini_with_tools | human_approval | call_tools
memory = InMemoryChatMessageHistory(session_id="foo")
chain_with_history = RunnableWithMessageHistory(
    chain,
    # Uses the get_by_session_id function defined in the example
    # above.
    lambda session_id: memory,
    input_messages_key="input",
    history_messages_key="chat_history",
)

chain_with_history.invoke({"input": "What is the current USD to EUR exchange rate ?"}, config)

I want to use function [get_exchange_rate_from_api] with the following parameters :
 currency_from = USD
 currency_to = EUR



Do you approve (Y|y)?

>>> y




[{'name': 'get_exchange_rate_from_api',
  'args': {'currency_from': 'USD', 'currency_to': 'EUR'},
  'id': '169200f6-9319-44ad-89ba-96dd7c2d893e',
  'type': 'tool_call',
  'output': '{"amount":1.0,"base":"USD","date":"2024-11-20","rates":{"EUR":0.94679}}'}]

### Adding a tool from langchain community

In [48]:
memory = InMemoryChatMessageHistory()
agent_with_chat_history = RunnableWithMessageHistory(
    agent_executor,
    # This is needed because in most real world scenarios, a session id is needed
    # It isn't really used here because we are using a simple in memory ChatMessageHistory
    lambda session_id: memory,
    input_messages_key="input",
    history_messages_key="chat_history",
)
agent_with_chat_history.invoke({"input": "What was the result of Rafael Nadal's latest game ?"}, config)

{'input': "What was the result of Rafael Nadal's latest game ?",
 'chat_history': [],
 'output': "I am sorry, I do not have access to real-time information, including sports results. To find out the result of Rafael Nadal's latest game, I recommend checking a reputable sports website or news source. \n"}

In [56]:
from langchain_community.utilities import GoogleSerperAPIWrapper

search = GoogleSerperAPIWrapper(serper_api_key=os.environ.get('SERPER_API_KEY'))

@tool
def google_search(query: str):
    """
    Perform a search on Google
    Args:
        query: the information to be retrieved with google search
    """
    return search.run(query)

langchain_tool = [
    list_datasets,
    list_tables,
    get_exchange_rate_from_api,
    google_search
]
agent = create_tool_calling_agent(gemini_llm, langchain_tool, prompt)
agent_executor = AgentExecutor(agent=agent, tools=langchain_tool)

memory = InMemoryChatMessageHistory()
agent_with_chat_history = RunnableWithMessageHistory(
    agent_executor,
    # This is needed because in most real world scenarios, a session id is needed
    # It isn't really used here because we are using a simple in memory ChatMessageHistory
    lambda session_id: memory,
    input_messages_key="input",
    history_messages_key="chat_history",
)

In [57]:
agent_with_chat_history.invoke({"input": "What was the result of Rafael Nadal's latest game ?"}, config)

{'input': "What was the result of Rafael Nadal's latest game ?",
 'chat_history': [],
 'output': "Rafael Nadal's last match was a loss to Botic van de Zandschulp in the Davis Cup. Spain was eliminated by the Netherlands. \n"}