# Skill 5: API Search - Make our bot talk to any API

We have observed the remarkable synergy created by combining **OpenAI LLMs with intelligent agents and detailed prompts**. This combination has consistently delivered impressive results. To capitalize on it further, we can connect it to other systems through their APIs. Essentially, we can build within this notebook what OpenAI's ChatGPT calls 'GPTs'.

Envision a bot that seamlessly integrates with:

- **CRM Systems:** Including Dynamics, Salesforce, and HubSpot.
- **ERP Systems:** Such as SAP, Dynamics, and Oracle.
- **CMS Systems:** Including Adobe, Oracle, and other content management platforms.

The objective is to connect our bot with data repositories, minimizing data duplication as much as possible. These systems typically offer APIs, facilitating programmatic data access.

In this notebook, we develop an agent that queries an API to retrieve information and answer questions.
This time we use an open API for currency and digital coin pricing: https://docs.kraken.com/rest/#tag/Market-Data

In [1]:
import os
import json
import requests
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any

from langchain_openai import AzureChatOpenAI
from langchain_core.tools import BaseTool, StructuredTool
from langchain_community.agent_toolkits.openapi.toolkit import RequestsToolkit
from langchain_community.utilities.requests import RequestsWrapper, TextRequestsWrapper

from langgraph.prebuilt import create_react_agent

from common.utils import num_tokens_from_string, reduce_openapi_spec
from common.prompts import APISEARCH_PROMPT_TEXT

from IPython.display import Markdown, HTML, display  

from dotenv import load_dotenv
load_dotenv("credentials.env")

def printmd(string):
    display(Markdown(string.replace("$","USD ")))


In [2]:
# Set the ENV variables that Langchain needs to connect to Azure OpenAI
os.environ["OPENAI_API_VERSION"] = os.environ["AZURE_OPENAI_API_VERSION"]
# VirusTotal API key, read from credentials.env (avoid printing secrets in a notebook)
api_key = os.environ["VT_API_KEY"]


In [3]:
COMPLETION_TOKENS = 2000

# This notebook needs a large-context model such as GPT-4o (context size of 128k tokens)
llm = AzureChatOpenAI(deployment_name=os.environ["GPT4o_DEPLOYMENT_NAME"], 
                      temperature=0.5, max_tokens=COMPLETION_TOKENS, 
                      streaming=True)

## The Logic

By now you may have guessed the approach for an API agent: give the API specification to the LLM as part of the system prompt, then let a graph agent plan the steps needed to formulate the API calls.

Let's do that. But first we need to understand the industry standard for describing APIs: Swagger/OpenAPI.


## Introduction to OpenAPI (formerly Swagger)

The OpenAPI Specification, previously known as the Swagger Specification, is a specification for a machine-readable interface definition language for describing, producing, consuming and visualizing web services. Previously part of the Swagger framework, it became a separate project in 2016, overseen by the OpenAPI Initiative, an open-source collaboration project of the Linux Foundation.

OpenAPI Specification is an API description format for REST APIs. An OpenAPI file allows you to describe your entire API, including the available endpoints (`/users`, for example) and the operations on each endpoint (`GET /users`, `POST /users`), as well as the description, contact information, license, terms of use and other information.
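
To make this concrete, here is a minimal sketch of what such a description looks like once loaded into Python, and how its operations can be enumerated. The spec is a made-up example (the `/users` paths echo the ones mentioned above) and is not part of the Kraken specification; `list_operations` is a hypothetical helper introduced only for illustration.

```python
from typing import Any, Dict, List

# A tiny, hypothetical OpenAPI 3.0 document expressed as a Python dict.
mini_spec: Dict[str, Any] = {
    "openapi": "3.0.0",
    "info": {"title": "Example Users API", "version": "1.0.0"},
    "servers": [{"url": "https://api.example.com"}],
    "paths": {
        "/users": {
            "get": {"summary": "List users"},
            "post": {"summary": "Create a user"},
        },
        "/users/{id}": {
            "get": {"summary": "Get one user"},
        },
    },
}

HTTP_METHODS = {"get", "post", "put", "patch", "delete", "head", "options"}


def list_operations(spec: Dict[str, Any]) -> List[str]:
    """Return 'METHOD /path - summary' for every operation in the spec."""
    operations = []
    for path, item in spec.get("paths", {}).items():
        for method, details in item.items():
            # Skip keys that are not operations, such as a shared 'parameters' list
            if method in HTTP_METHODS:
                operations.append(f"{method.upper()} {path} - {details.get('summary', '')}")
    return operations


for line in list_operations(mini_spec):
    print(line)
```

An agent that receives this structure in its prompt has everything it needs to decide which URL and HTTP method answer a given question.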

### Let's get the OpenAPI (Swagger) spec from our desired API that we want to talk to
You can also download it from the Kraken website: https://docs.kraken.com/rest/

Let's see how big this API specification is:

In [83]:
LOCAL_FILE_PATH = "./data/openapi_kraken.json"

In [84]:
# Open and read the JSON file
with open(LOCAL_FILE_PATH, 'r') as file:
    spec = json.load(file)

In [85]:
# You can check the function "reduce_openapi_spec()" in utils.py
reduced_api_spec = reduce_openapi_spec(spec)

In [86]:
api_tokens = num_tokens_from_string(str(spec))
print("API spec size in tokens:",api_tokens)
api_tokens = num_tokens_from_string(str(reduced_api_spec))
print("Reduced API spec size in tokens:",api_tokens)

API spec size in tokens: 66629
Reduced API spec size in tokens: 57398


Sometimes it makes sense to reduce the size of the API spec with the `reduce_openapi_spec` function. This step is optional.
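
The actual `reduce_openapi_spec` lives in `common/utils.py` and is not reproduced here. As a rough illustration of the idea only, the sketch below keeps just the fields an agent needs to choose an endpoint and drops the rest. `shrink_spec`, `KEEP_FIELDS`, and the example spec are hypothetical and are not the repository's implementation.

```python
import json
from typing import Any, Dict

# Assumption: these are the only per-operation fields worth keeping in the prompt.
KEEP_FIELDS = ("summary", "description", "parameters")
HTTP_METHODS = {"get", "post", "put", "patch", "delete"}


def shrink_spec(spec: Dict[str, Any]) -> Dict[str, Any]:
    """Keep the servers plus a trimmed view of each operation."""
    reduced: Dict[str, Any] = {"servers": spec.get("servers", []), "endpoints": {}}
    for path, item in spec.get("paths", {}).items():
        for method, details in item.items():
            if method not in HTTP_METHODS:
                continue
            key = f"{method.upper()} {path}"
            reduced["endpoints"][key] = {f: details[f] for f in KEEP_FIELDS if f in details}
    return reduced


# Made-up spec with bulky parts (info text, response examples) that the agent rarely needs.
example = {
    "openapi": "3.0.0",
    "info": {"title": "Example Market API", "version": "1.0.0",
             "description": "Long introductory text. " * 10},
    "servers": [{"url": "https://api.example.com"}],
    "paths": {
        "/ticker": {
            "get": {
                "summary": "Get ticker",
                "parameters": [{"name": "pair", "in": "query"}],
                "responses": {"200": {"description": "OK", "content": {
                    "application/json": {"example": {"price": "1.0"}}}}},
            }
        }
    },
}

small = shrink_spec(example)
print(len(json.dumps(example)), "->", len(json.dumps(small)), "characters")
```

How much this saves depends entirely on the spec: as the token counts above show, the reduction for the Kraken spec is modest.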

#### NOTE: As you can see, a large-context LLM is needed. `GPT-4` or newer models are necessary for this notebook to run successfully.

### Define Tools

In [87]:
# Most APIs require authorization tokens, so we build the headers with a lightweight Python request wrapper called RequestsWrapper.
# The token below is only a placeholder.
access_token = "ABCDEFG123456" 
headers = {"Authorization": f"Bearer {access_token}"}
requests_wrapper = RequestsWrapper(headers=headers)

# Reuse the wrapper created above instead of building a second, identical one
toolkit = RequestsToolkit(
    requests_wrapper=requests_wrapper,
    allow_dangerous_requests=True,
)

In [88]:
toolkit.get_tools()

[RequestsGetTool(requests_wrapper=TextRequestsWrapper(headers={'Authorization': 'Bearer ABCDEFG123456'}, aiosession=None, auth=None, response_content_type='text', verify=True), allow_dangerous_requests=True),
 RequestsPostTool(requests_wrapper=TextRequestsWrapper(headers={'Authorization': 'Bearer ABCDEFG123456'}, aiosession=None, auth=None, response_content_type='text', verify=True), allow_dangerous_requests=True),
 RequestsPatchTool(requests_wrapper=TextRequestsWrapper(headers={'Authorization': 'Bearer ABCDEFG123456'}, aiosession=None, auth=None, response_content_type='text', verify=True), allow_dangerous_requests=True),
 RequestsPutTool(requests_wrapper=TextRequestsWrapper(headers={'Authorization': 'Bearer ABCDEFG123456'}, aiosession=None, auth=None, response_content_type='text', verify=True), allow_dangerous_requests=True),
 RequestsDeleteTool(requests_wrapper=TextRequestsWrapper(headers={'Authorization': 'Bearer ABCDEFG123456'}, aiosession=None, auth=None, response_content_type='te

In [89]:
tools = toolkit.get_tools()

### Define Prompt

In [90]:
printmd(APISEARCH_PROMPT_TEXT)



## Source of Information
- You have access to an API to help answer user queries.
- Here is documentation on the API: {api_spec}

## On how to use the Tools
- You are an agent designed to connect to RestFul APIs.
- Given API documentation above, use the right tools to connect to the API.
- **ALWAYS** before giving the Final Answer, try another method if available. Then reflect on the answers of the two methods you did and ask yourself if it answers correctly the original question. If you are not sure, try another method.
- If you are sure of the correct answer, create a beautiful and thorough response using Markdown.
- **DO NOT MAKE UP AN ANSWER OR USE Pre-Existing KNOWLEDGE, ONLY USE THE RESULTS OF THE CALCULATIONS YOU HAVE DONE**. 
- Only use the output of your code to answer the question. 


### Create Graph

In [91]:
graph = create_react_agent(llm, tools, state_modifier=APISEARCH_PROMPT_TEXT.format(api_spec=reduced_api_spec))

### Run the Graph

In [92]:
async def stream_graph_updates_async(graph, user_input: str):
    inputs = {"messages": [("human", user_input)]}

    async for event in graph.astream_events(inputs, version="v2"):
        if (event["event"] == "on_chat_model_stream"):
            # Print the content of the chunk progressively
            print(event["data"]["chunk"].content, end="", flush=True)
        elif (event["event"] == "on_tool_start"  ):
            print("\n--")
            print(f"Calling tool: {event['name']} with inputs: {event['data'].get('input')}")
            print("--")

In [93]:
QUESTION = """
Tell me the price of bitcoin against USD , also the latest OHLC values for Ethereum,
also tell me the bid and ask for Euro.
"""

In [None]:
await stream_graph_updates_async(graph, QUESTION)


--
Calling tool: requests_get with inputs: {'url': 'https://api.kraken.com/0/public/Ticker?pair=XBTUSD'}
--

--
Calling tool: requests_get with inputs: {'url': 'https://api.kraken.com/0/public/OHLC?pair=ETHUSD'}
--

--
Calling tool: requests_get with inputs: {'url': 'https://api.kraken.com/0/public/Ticker?pair=EURUSD'}
--


**Great!!** We now have an API agent that can reason its way to an answer given only the API documentation.

## Simple APIs

What happens if the API is quite basic, meaning it's just a simple endpoint and we have no Swagger/OpenAPI definition to hand to the agent? Let's consider the following example:

The VirusTotal API v3 (`https://www.virustotal.com/api/v3/`) exposes lookup endpoints that return a report for an IP address, file hash, URL, or domain, authenticated with an API key sent in the `x-apikey` header. Here we query it directly, without any Swagger or OpenAPI specification. In this scenario, our main task is to create a tool that retrieves the results. We then pass these results to an agent for analysis, providing answers to user queries, similar to our approach with the Bing Search agent.

In the example below, there is no API specification, but the response from the API is rather lengthy.

In [None]:
# set up the request headers; VirusTotal expects the API key in the x-apikey header
headers = {"accept": "application/json",
           "x-apikey": api_key}  # api_key was loaded from credentials.env above

# make the HTTP GET request to the VirusTotal IP address endpoint
api_result = requests.get('https://www.virustotal.com/api/v3/ip_addresses/116.203.4.0', headers=headers)

num_tokens = num_tokens_from_string(str(api_result.json())) # this is a custom function we created in common/utils.py
print("Token count:",num_tokens,"\n")  

# print the first 2000 characters of the JSON response
print(json.dumps(api_result.json())[:2000], "...")

Token count: 5492 

{"data": {"id": "116.203.4.0", "type": "ip_address", "links": {"self": "https://www.virustotal.com/api/v3/ip_addresses/116.203.4.0"}, "attributes": {"last_https_certificate": {"size": 1059, "public_key": {"algorithm": "EC", "ec": {"oid": "secp256r1", "pub": "3059301306072a8648ce3d020106082a8648ce3d03010703420004f0a67a8c8d13c3069be7c01392c20cebf38553f8b42ce2c0417990882af9558c66eb8d430161d5da5d3538f1ccb3354ebfbfcf4ab4d6ef948f8696eb97135bb0"}}, "thumbprint_sha256": "3556bfff78204b8b3053fabfe2afa22fbba9e95b86e7553d22eb1487d9f632be", "cert_signature": {"signature_algorithm": "sha256RSA", "signature": "66fb79fb4d348f931aa2e14ca87c14be456b085488827ea472a058941cd05612cc72dd61709c41fef8472b7357771c063eaa3626dfa367afd07639c384a46d90a1e8b67078df3fd02dfb7eebf95f76516162341d6dc3c05af73af9d85e60cea0452f79677192b9aa2bd7f77a2ccac57aa750109427891fb746cc857303a6dc37d87dcae41107d297cce2866d15c01e17aceae37f3d0c6d8dd7715302480b37dd24e46c3fdb3d1fd4967d0afbebb2e40573efe0587ef3d3179dad58c6

So the response to this single IP lookup is about 5.5k tokens. Combined with the prompt and the agent's intermediate steps, the context fills up quickly, so a large-context GPT-4-class model remains the practical choice.
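
One way to keep such responses within budget, sketched here as an assumption rather than something this notebook does, is to pass the agent only the fields it is likely to need. The `pick_fields` helper and the sample payload are hypothetical; the field names imitate the shape of the response above and should be checked against the real API output.

```python
import json
from typing import Any, Dict, Iterable


def pick_fields(payload: Dict[str, Any], paths: Iterable[str]) -> Dict[str, Any]:
    """Copy only the dotted paths that exist in payload into a flat dict."""
    picked: Dict[str, Any] = {}
    for dotted in paths:
        node: Any = payload
        for key in dotted.split("."):
            if isinstance(node, dict) and key in node:
                node = node[key]
            else:
                node = None  # path is missing; skip it
                break
        if node is not None:
            picked[dotted] = node
    return picked


# Made-up payload that mimics the nesting of the response shown above.
sample = {
    "data": {
        "id": "203.0.113.7",
        "type": "ip_address",
        "attributes": {
            "last_analysis_stats": {"malicious": 2, "harmless": 60},
            "last_https_certificate": {"size": 1059, "public_key": {"algorithm": "EC"}},
        },
    }
}

wanted = [
    "data.id",
    "data.type",
    "data.attributes.last_analysis_stats",
    "data.attributes.country",  # absent from the sample, so it is skipped
]
trimmed = pick_fields(sample, wanted)
print(json.dumps(trimmed, indent=2))
```

The trade-off is that the agent can no longer answer questions about fields that were dropped, so the list of paths has to match the questions you expect.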

### Define a custom tool to call the API endpoint

In [None]:
class MySimpleAPISearch(BaseTool):
    """Tool for simple API calls that don't require an OpenAPI 3.0 spec"""
    
    name:str = "apisearch"
    description:str = "useful when the question includes the term: apisearch.\n"

    api_key: str
    
    def _run(self, query: str) -> str:
        # VirusTotal expects the API key in the x-apikey header
        headers = {"accept": "application/json",
                   "x-apikey": self.api_key}

        try:
            # make the HTTP GET request to the VirusTotal IP address endpoint,
            # using the query (an IP address) supplied by the agent
            api_result = requests.get(f'https://www.virustotal.com/api/v3/ip_addresses/{query}', headers=headers)
            response = json.dumps(api_result.json())
        except Exception as e:
            # return the error as text so the agent can read it
            response = str(e)
        
        return response
            
    async def _arun(self, query: str) -> str:
        """Use the tool asynchronously."""
        raise NotImplementedError("This Tool does not support async")

In [None]:
tools = [MySimpleAPISearch(api_key='demo')]
print(MySimpleAPISearch(api_key='demo'))
graph = create_react_agent(llm, tools, state_modifier=APISEARCH_PROMPT_TEXT.format(api_spec="API provided by the tool"))
print(APISEARCH_PROMPT_TEXT.format(api_spec="API provided by the tool"))

api_key='demo'


## Source of Information
- You have access to an API to help answer user queries.
- Here is documentation on the API: API provided by the tool

## On how to use the Tools
- You are an agent designed to connect to RestFul APIs.
- Given API documentation above, use the right tools to connect to the API.
- **ALWAYS** before giving the Final Answer, try another method if available. Then reflect on the answers of the two methods you did and ask yourself if it answers correctly the original question. If you are not sure, try another method.
- If you are sure of the correct answer, create a beautiful and thorough response using Markdown.
- **DO NOT MAKE UP AN ANSWER OR USE Pre-Existing KNOWLEDGE, ONLY USE THE RESULTS OF THE CALCULATIONS YOU HAVE DONE**. 
- Only use the output of your code to answer the question. 



In [None]:
from typing import Optional, Dict, Any
from pydantic import Field
from langchain.tools import BaseTool
import requests
import json

class MySimpleAPISearch(BaseTool):
    name: str = "virustotal_ip_lookup"
    description: str = "Looks up information about an IP address in VirusTotal"
    
    base_url_template: str = Field(..., description="URL with a {query} placeholder for the IP")
    api_key: Optional[str] = Field(default=api_key, description="VirusTotal API key")
    headers: Dict[str, str] = Field(default_factory=dict, description="Custom headers")
    auth_type: str = Field(default='bearer', description="Authentication type")

    def _run(self, query: str) -> str:
        url = self.base_url_template.format(query=query)

        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            return json.dumps(response.json(), indent=2)
        except requests.exceptions.RequestException as e:
            return json.dumps({
                "error": "Error in the API search request",
                "details": str(e),
                "status_code": getattr(e.response, 'status_code', None),
                "response_text": getattr(e.response, 'text', None),
                "headers": self.headers,
                "url": url
            }, indent=2)

    async def _arun(self, query: str) -> str:
        raise NotImplementedError("Asynchronous search is not implemented.")


In [None]:
tools = [
    MySimpleAPISearch(
        base_url_template='https://www.virustotal.com/api/v3/ip_addresses/{query}',
        headers={'Accept': 'application/json',
                 'x-apikey': api_key},
        auth_type='bearer'
    )
]


graph = create_react_agent(llm, tools, state_modifier=APISEARCH_PROMPT_TEXT.format(api_spec="API provided by the tool"))

In [4]:
# All VirusTotal lookup types

from typing import Optional, Dict, Any
from pydantic import Field
from langchain.tools import BaseTool
import requests
import json
import base64

class MySimpleAPISearch(BaseTool):
    name: str = "virustotal_lookup"
    description: str = "Looks up information in VirusTotal (IP, hash, URL, domain)"
    
    base_url_template: str = Field(..., description="Base URL with a {query} placeholder for the search")
    api_key: Optional[str] = Field(default=None, description="VirusTotal API key")
    headers: Dict[str, str] = Field(default_factory=dict, description="Custom headers")
    auth_type: str = Field(default='bearer', description="Authentication type")
    search_type: str = Field(..., description="Search type: 'ip', 'hash', 'url', 'domain'")
    
    def _get_url(self, query: str) -> str:
        """
        Build the correct URL depending on the search type.
        """
        if self.search_type == 'url':
            # Encode the URL as URL-safe Base64 without padding
            query = base64.urlsafe_b64encode(query.encode('utf-8')).decode('utf-8').strip("=")
            return self.base_url_template.format(query=query)  # URL for encoded URLs
        elif self.search_type in ['ip', 'hash', 'domain']:
            return self.base_url_template.format(query=query)  # URL for IP, hash, domain
        else:
            raise ValueError(f"Unsupported search type: {self.search_type}")

    def _run(self, query: str) -> str:
        url = self._get_url(query)

        try:
            # Send the request to the VirusTotal API
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            return json.dumps(response.json(), indent=2)
        except requests.exceptions.RequestException as e:
            return json.dumps({
                "error": "Error in the API search request",
                "details": str(e),
                "status_code": getattr(e.response, 'status_code', None),
                "response_text": getattr(e.response, 'text', None),
                "headers": self.headers,
                "url": url
            }, indent=2)

    async def _arun(self, query: str) -> str:
        raise NotImplementedError("Asynchronous search is not implemented.")


In [17]:
# tools = [
#     MySimpleAPISearch(
#         base_url_template='https://www.virustotal.com/api/v3/ip_addresses/{query}',
#         headers={'Accept': 'application/json',
#                  'x-apikey': api_key},
#         auth_type='bearer'
#     )
# ]

tools = [
    # MySimpleAPISearch(
    #     base_url_template='https://www.virustotal.com/api/v3/ip_addresses/{query}',
    #     headers={'Accept': 'application/json',
    #              'x-apikey': api_key},
    #     auth_type='bearer',
    #     search_type='ip'  # Search by IP
    # ),
    # MySimpleAPISearch(
    #     base_url_template='https://www.virustotal.com/api/v3/files/{query}',
    #     headers={'Accept': 'application/json',
    #              'x-apikey': api_key},
    #     auth_type='bearer',
    #     search_type='hash'  # Search by hash
    # ),
    MySimpleAPISearch(
        base_url_template='https://www.virustotal.com/api/v3/urls/{query}',
        headers={'Accept': 'application/json',
                 'x-apikey': api_key},
        auth_type='bearer',
        search_type='url'  # Search by URL (the tool Base64-encodes the URL before calling the API)
    ),
    # MySimpleAPISearch(
    #     base_url_template='https://www.virustotal.com/api/v3/domains/{query}',
    #     headers={'Accept': 'application/json',
    #              'x-apikey': api_key},
    #     auth_type='bearer',
    #     search_type='domain'  # Search by domain
    # )
]



graph = create_react_agent(llm, tools, state_modifier=APISEARCH_PROMPT_TEXT.format(api_spec="API provided by the tool"))
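
The URL branch of `_get_url` relies on an identifier of the form "URL-safe Base64 without `=` padding". Below is a minimal sketch of that encoding in isolation, with a decoder added purely to check the round trip. `url_to_id` and `id_to_url` are hypothetical names and the example URL is a placeholder.

```python
import base64


def url_to_id(url: str) -> str:
    """Encode a URL as URL-safe Base64 and drop the trailing '=' padding."""
    return base64.urlsafe_b64encode(url.encode("utf-8")).decode("ascii").rstrip("=")


def id_to_url(url_id: str) -> str:
    """Reverse url_to_id by restoring the padding before decoding."""
    padding = "=" * (-len(url_id) % 4)
    return base64.urlsafe_b64decode(url_id + padding).decode("utf-8")


example_url = "http://www.example.com/path?x=1"
url_id = url_to_id(example_url)

print(url_id)
print(id_to_url(url_id) == example_url)
```

Because the identifier contains no `/`, `+`, or `=`, it can be placed directly in the path of the request URL.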

In [62]:
import re
import base64
from typing import Optional, Dict, Any
from pydantic import Field
from langchain.tools import BaseTool
import requests
import json

class MySimpleAPISearch(BaseTool):
    name: str = "virustotal_lookup"
    description: str = "Dynamic lookup of information in VirusTotal (IP, hash, URL, domain)"
    
    base_url_template: str = Field(default='https://www.virustotal.com/api/v3/', description="Base URL of the API; the search type and query are appended to it")
    api_key: Optional[str] = Field(default=None, description="VirusTotal API key")
    headers: Dict[str, str] = Field(default_factory=dict, description="Custom headers")
    auth_type: str = Field(default='bearer', description="Authentication type")
    
    def _get_url(self, query: str) -> str:
        """
        Build the correct URL depending on the detected search type.
        """
        search_type = identify_search_type(query)

        if search_type == 'urls':
            # Encode the URL as URL-safe Base64 without padding
            query = base64.urlsafe_b64encode(query.encode('utf-8')).decode('utf-8').strip("=")
            return f'{self.base_url_template}urls/{query}'  # URL for encoded URLs
        elif search_type in ['ip_addresses', 'files', 'domains']:
            return f'{self.base_url_template}{search_type}/{query}'  # URL for IP, hash, domain
        else:
            raise ValueError(f"Unsupported search type: {search_type}")

    def _run(self, query: str) -> str:
        url = self._get_url(query)

        try:
            # Send the request to the VirusTotal API
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            return json.dumps(response.json(), indent=2)
        except requests.exceptions.RequestException as e:
            return json.dumps({
                "error": "Error in the API search request",
                "details": str(e),
                "status_code": getattr(e.response, 'status_code', None),
                "response_text": getattr(e.response, 'text', None),
                "headers": self.headers,
                "url": url,
                "query": query
            }, indent=2)

    async def _arun(self, query: str) -> str:
        raise NotImplementedError("Asynchronous search is not implemented.")

# Function that identifies the type of the input
def identify_search_type(query: str) -> str:
    # Regular expression for IPv4 addresses
    ip_pattern = r'^(\d{1,3}\.){3}\d{1,3}$'
    # Regular expression for URLs: an explicit scheme is required, so that
    # bare domains fall through to the domain branch below
    url_pattern = r'^https?://\S+$'
    # Regular expression for hashes (MD5, SHA1, SHA256), upper or lower case
    hash_pattern = r'^[a-fA-F0-9]{32}$|^[a-fA-F0-9]{40}$|^[a-fA-F0-9]{64}$'
    # Regular expression for domains (no protocol, no path)
    domain_pattern = r'^([a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}$'

    # Identify the input type
    if re.match(ip_pattern, query):
        return 'ip_addresses'
    elif re.match(url_pattern, query):
        return 'urls'
    elif re.match(hash_pattern, query):
        return 'files'
    elif re.match(domain_pattern, query):
        return 'domains'
    else:
        return 'unknown'
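
Note that an IP pattern such as `(\d{1,3}\.){3}\d{1,3}` also accepts out-of-range values like `999.1.1.1`. If stricter validation is wanted, one option is the standard-library `ipaddress` module. This is a sketch of an alternative, not what the notebook does, and `is_ip_address` is a hypothetical helper.

```python
import ipaddress


def is_ip_address(value: str) -> bool:
    """Return True only for syntactically valid IPv4 or IPv6 addresses."""
    try:
        ipaddress.ip_address(value)
        return True
    except ValueError:
        return False


for candidate in ["113.221.24.74", "999.1.1.1", "113.221.24.74:51241/Mozi.m"]:
    print(candidate, is_ip_address(candidate))
```

Unlike the regular expression, this check also recognizes IPv6 addresses, which may or may not be desirable depending on what the downstream endpoint accepts.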


In [63]:



tools = [
    MySimpleAPISearch(
        headers={'Accept': 'application/json', 'x-apikey': api_key},
        auth_type='bearer'
    )
]

graph = create_react_agent(llm, tools, state_modifier=APISEARCH_PROMPT_TEXT.format(api_spec="API provided by the tool"))

This time let's use the `.stream()` method:

In [72]:
QUESTION = 'Give me information relevant about the 113.221.24.74:51241/Mozi.m, what is the score? ISP? is it on black list? wich? At the end make and evaluation and provide a veredict if it is malicious or not'

In [68]:
def print_stream(stream):
    for s in stream:
        message = s["messages"][-1]
        if isinstance(message, tuple):
            print(message)
        else:
            message.pretty_print()

In [73]:
inputs = {"messages": [("user", QUESTION)]}

print_stream(graph.stream(inputs, stream_mode="values"))

Give me information relevant about the 113.221.24.74:51241/Mozi.m, what is the score? ISP? is it on black list? wich? At the end make and evaluation and provide a veredict if it is malicious or not
Tool Calls:
  virustotal_lookup (call_uyNLvtveQV67QrsYhozd7UeR)
 Call ID: call_uyNLvtveQV67QrsYhozd7UeR
  Args:
    query: 113.221.24.74:51241/Mozi.m
Name: virustotal_lookup

Error: ValueError('Unsupported search type: unknown')
 Please fix your mistakes.
Tool Calls:
  virustotal_lookup (call_mRKawkQd9tLPrcBWihkTuUNN)
 Call ID: call_mRKawkQd9tLPrcBWihkTuUNN
  Args:
    query: 113.221.24.74
Name: virustotal_lookup

{
  "data": {
    "id": "113.221.24.74",
    "type": "ip_address",
    "links": {
      "self": "https://www.virustotal.com/api/v3/ip_addresses/113.221.24.74"
    },
    "attributes": {
      "last_analysis_results": {
        "Acronis":

# Summary

In this notebook, we learned how to create capable API agents, both for complex APIs described by Swagger/OpenAPI specifications and for simple endpoints without one.
Once again, the key to success is the combination of agents with expert tools, a GPT-4-class model, and good prompts.

As homework, try to create a shopping assistant for the Etsy e-commerce site using the following API spec (you will need to register for free and create an API key):

- https://developers.etsy.com/documentation/
- https://www.etsy.com/openapi/generated/oas/3.0.0.json

# NEXT

The next notebook will guide you on how to add vision and audio to our engine.