# Herramientas

**Sumario**
1. Clase `Tool`
<br></br>
2. Clase `StructuredTool`
<br></br>
3. Subclase de `BaseTool`
   1. Único parámetro
   2. Múltiples parámetros
   3. Ejemplo: Agente con reconocimiento de imágenes

In [None]:
import openai
import os
from langchain.llms import AzureOpenAI

# Lee la clave de API desde el archivo de configuración
with open('config.txt') as f:
    config = dict(line.strip().split('=') for line in f)

openai.api_type = "azure"
openai.api_base = "https://gpt3tests.openai.azure.com/"
openai.api_version = "2022-12-01"
openai.api_key = config.get("OPENAI_API_KEY", "")

# Nombre del despliegue en mi Azure OpenAI Studio is "Davinci003", el modelo es "text-davinci-003"
engine = "Davinci003"
model = "text-davinci-003"
openai_api_version = "2023-12-01" 

# Nombre del despliegue en mi Azure OpenAI Studio para el modelo de embeddings es "TextEmbeddingAda002"
embeddings_engine = "TextEmbeddingAda002"

max_tokens = 1000

llm = AzureOpenAI(
    azure_endpoint=openai.api_base, 
    azure_deployment=engine, 
    openai_api_key=config.get("OPENAI_API_KEY", ""), 
    openai_api_version=openai.api_version,
    # temperature=0, # Podemos poner la temperatura a 0 si queremos reducir la variabilidad de las respuestas
)
llm.openai_api_base = openai.api_base 
llm.max_tokens = max_tokens

In [None]:
from langchain.callbacks import get_openai_callback
from langchain.prompts import ChatPromptTemplate
from langchain.chains import LLMChain

def call_agent_with_translate(agent, es_eng_chain, eng_es_chain, query):

    print(f"Input: {query}")

    with get_openai_callback() as cb:
        query_translated = es_eng_chain(query)["text"]
        query_translated = query_translated.replace('\n', '') # Simplemente para mostrarlo mejor en este ejemplo
        es_eng_token_count = cb.total_tokens
    print(f"Input (traducido al inglés): {query_translated}")

    with get_openai_callback() as cb:
        result = agent(query_translated)
        agent_token_count = cb.total_tokens
    print(f"Output: {result['output']}")

    with get_openai_callback() as cb:
        output_translated = eng_es_chain(result["output"])["text"]
        output_translated = output_translated.replace('\n', '') # Simplemente para mostrarlo mejor en este ejemplo
        eng_es_token_count = cb.total_tokens
    print(f"Output (traducido al español): {output_translated}")

    total_tokens = es_eng_token_count + agent_token_count + eng_es_token_count
    print(f'He usado un total de {total_tokens} tokens')

    return result

# Chain para traducir el texto de entrada de español a inglés
translate_eng_es_prompt = ChatPromptTemplate.from_template(
    "Translate the following spanish text to english (write only the translated text): {input}"
)
translate_eng_es_chain = LLMChain(llm=llm, prompt=translate_eng_es_prompt)
# Chain para traducir el texto de salida del modelo de inglés a español
translate_es_eng_prompt = ChatPromptTemplate.from_template(
    # "Traduce el siguiente texto a español, deja las operaciones matematicas sin modificar: {input}"
    "Si el siguiente texto esta en inglés, traducelo al español, en caso contrario, simplemente devuelve el mismo texto: {input}"
)
translate_es_eng_chain = LLMChain(llm=llm, prompt=translate_es_eng_prompt)

## 1 - Clase `Tool`

El enfoque más simple para definir una herramienta en Langchain. La dataclass Tool encapsula funciones que aceptan un único string de entrada y devuelven un string de salida.

`pip install wikipedia`

In [None]:
from langchain.tools import Tool

# Función propia mediante la cual devolvemos el valor hash de un string de entrada
def return_hash(input: str):
    return str(hash(input))

hash_tool = Tool.from_function(
    func=return_hash,
    name="Hash",
    description="Useful for when you need to estimate the hash of an input"
)

In [None]:
from langchain.utilities import WikipediaAPIWrapper

wikipedia = WikipediaAPIWrapper(
    doc_content_chars_max=2000
)
wikipedia_tool = Tool.from_function(
    func=wikipedia.run,
    name="Wikipedia",
    description="Wikipedia, useful for when you need to answer questions about history, science, biology, etc.",
)

In [None]:
from pydantic import BaseModel, Field
from langchain import LLMMathChain

llm_math_chain = LLMMathChain.from_llm(llm=llm, verbose=True)

calculator_tool = Tool.from_function(
    func=llm_math_chain.run,
    name="Calculator",
    description="Useful for when you need to answer questions about math",
)

In [None]:
from langchain.agents import AgentType, initialize_agent

tools = [hash_tool, wikipedia_tool, calculator_tool]

zero_shot_agent_executor = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True)

result = call_agent_with_translate(
    zero_shot_agent_executor, 
    translate_eng_es_chain,
    translate_es_eng_chain,
    "¿Cuanta gente vive en Paris?"
)

In [None]:
result = call_agent_with_translate(
    zero_shot_agent_executor, 
    translate_eng_es_chain,
    translate_es_eng_chain,
    "Cual es el resultado de multiplicar el hash de la palabra Everest por 0.005?"
)

In [None]:
everest_hash = return_hash('Everest')
print(f"Hash: {everest_hash}")
print(f"Resultado: {float(everest_hash) * 0.005}")

## 2 - Clase `StructuredTool`

Con la clase `Tool` podemos usar nuestras propias funciones, pero estamos limitados a `str -> str`. Si queremos utilizar funciones con argumentos más estructurados, podemos usar la clase `StructuredTool`.

In [None]:
from langchain.tools import StructuredTool

# Es importante que indiquemos el tipo de los parámetros ya que dan 
# información al agente
def avg_2_nums(num1: float, num2: float) -> float:
    return (num1 + num2) / 2

avg_2_nums_tool = StructuredTool.from_function(
    func=avg_2_nums,
    name="Average_two_numbers",
    description="Useful for when you need to estimate the average of two numbers",
)

# Las herramientas con multiples inputs sona dia de hoy solo compatiblse con el agente STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION
structured_agent_executor = initialize_agent(
    [avg_2_nums_tool],
    llm,
    agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True,
)

In [None]:
print(structured_agent_executor.agent.llm_chain.prompt.messages[0].prompt.template)

In [None]:
result = call_agent_with_translate(
    structured_agent_executor, 
    translate_eng_es_chain,
    translate_es_eng_chain,
    "Cual es la media de 3.7 y 7.75?"
)

Vamos a hacer un ejemplo un poco más complejo donde simulamos que enviamos un correo a una dirección y le vamos a poder al agente que por favor lo envie.

In [None]:
import time
import random

# Función donde simulamos el envio de un correo electronico cuyo éxito es del 50% 
# (para mostrar como se comporta el agente en cada caso)
def send_email(direccion: str, cabecero: str, mensaje: str) -> str:
    print(f"Enviando email a la dirección: {direccion}")

    if random.random() < 0.5:

        for i in range(3, 0, -1):
            print(f"Enviando, por favor espere... {i}")
            time.sleep(1)

        print("Enviado correctamente")
        print(f"Cabecero: {cabecero}")
        print(f"Mensaje: {mensaje}")

        return "Success"
    else:
        return "Error"

send_email("test@gmail.com", "Prueba", "Hola Mundo")

In [None]:
send_email_tool = StructuredTool.from_function(
    func=send_email,
    name="send_email",
    description="Useful for when you need to send an email to a specific adress",
)

structured_agent_executor = initialize_agent(
    [send_email_tool],
    llm,
    agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True,
)

In [None]:
result = call_agent_with_translate(
    structured_agent_executor, 
    translate_eng_es_chain,
    translate_es_eng_chain,
    "Envia un correo a test@gmail.com con el mensaje 'Hola, que tal?' y un cabecero del mail que consideres apropiado"
)

## 3 - Subclase de `BaseTool`

Otra opción es subclasificar directamente `BaseTool`. Esto es útil si queremos tener más control sobre las variables o si queremos propagar callbacks de llamada a cadenas anidadas u otras herramientas. 

Este enfoque se puede utilizar para definir herramientas que requieren uno o más parámetros.

### 3.1 - Único parámetro

Vamos a empezar con una herramienta simple donde estimamos la circunferencia de un círculo a partir de su radio.

<table>
    <tr>
        <td><img src="images_3_4/circunferencia.png" width="300"/></td>
    </tr>
</table>

La idea seria utilizar esta herramienta para responder preguntas como la siguiente:

```
Puedes decirme cual es la circunferencia de un círculo con radio 7.81mm?
```

In [None]:
from langchain.tools import BaseTool
from math import pi
from typing import Callable, Optional,  Union

class CircumferenceTool(BaseTool):
    name = "Circumference calculator"
    description = "use this tool when you need to calculate a circumference using the radius of a circle"

    def _run(self, radius: Union[int, float]):
        return float(radius)*2.0*pi

    def _arun(self, radius: int):
        raise NotImplementedError("This tool does not support async")

Al igual que vimos más arriba, LangChain requiere dos atributos para reconocer un objeto como una herramienta válida. Estos son los parámetros `name` y `description`. 

A continuación, tenemos dos métodos: `_run()` y `_arun()`. Cuando se usa una herramienta, se llama al método `_run()` de forma predeterminada. Se llama al método `_arun()` cuando se debe usar una herramienta de forma asincrónica.

A partir de aquí, creamos un agente conversacional simple.

In [None]:
from langchain.chains.conversation.memory import ConversationBufferWindowMemory
from langchain.agents import initialize_agent

circumference_conversational_memory = ConversationBufferWindowMemory(
    memory_key='chat_history',
    k=5, # Recuerda las 5 últimas interacciones humanas
    return_messages=True
)

# initialize agent with tools
circumference_agent_executor = initialize_agent(
    agent=AgentType.CONVERSATIONAL_REACT_DESCRIPTION,
    tools=[CircumferenceTool()],
    llm=llm,
    verbose=True,
    max_iterations=10,
    # early_stopping_method: Si se para el modelo antes de tiempo (por ejemplo si se ha metico en un bucle), 
    # el output resultante considera todo lo que ha hecho hasta el momento para dar una mejor respuesta
    early_stopping_method='generate', 
    memory=circumference_conversational_memory
)

In [None]:
result = call_agent_with_translate(
    circumference_agent_executor, 
    translate_eng_es_chain,
    translate_es_eng_chain,
    "Puedes decirme cual es la circunferencia de un círculo con radio 7.81mm?"
)

Si nos encontrarnos con un `ValueError` porque el LLM decide pasar directamente "7.81mm" a la calculadora en lugar de "7.81".

Cuando una herramienta encuentra un error y la excepción no se captura, el agente dejará de ejecutarse. Si desea que el agente continúe su ejecución, puede lanzar un `ToolException` y configurar `handle_tool_error` en consecuencia.

Cuando se lanza un `ToolException`, el agente no deja de funcionar. En cambio, manejará la excepción de acuerdo con la variable `handle_tool_error` de la herramienta, y **el resultado del procesamiento se devolverá al agente como observación y se imprimirá en rojo**.

In [None]:
from langchain.tools.base import ToolException

def handle_error(error: ToolException) -> str:
    return f"The following errors occurred during tool execution:{error.args[0]}"

class SafeCircumferenceTool(BaseTool):
    name: str = "Circumference calculator with exception handling"
    description: str = "use this tool when you need to calculate a circumference using the radius of a circle."
    handle_tool_error: Optional[
        Union[bool, str, Callable[[ToolException], str]]
    ] = handle_error

    def _run(self, radius: Union[int, float]):
        try:
            return float(radius)*2.0*pi
        except ValueError as error:
            raise ToolException(error.args[0])

    def _arun(self, radius: int):
        raise NotImplementedError("This tool does not support async")

In [None]:
safe_circumference_conversational_memory = ConversationBufferWindowMemory(
    memory_key='chat_history',
    k=5, # Recuerda las 5 últimas interacciones humanas
    return_messages=True
)

# initialize agent with tools
safe_circumference_agent_executor = initialize_agent(
    agent=AgentType.CONVERSATIONAL_REACT_DESCRIPTION,
    tools=[SafeCircumferenceTool()],
    llm=llm,
    verbose=True,
    max_iterations=10,
    # early_stopping_method: Si se para el modelo antes de tiempo (por ejemplo si se ha metico en un bucle), 
    # el output resultante considera todo lo que ha hecho hasta el momento para dar una mejor respuesta
    early_stopping_method='generate', 
    memory=safe_circumference_conversational_memory
)

In [None]:
result = call_agent_with_translate(
    safe_circumference_agent_executor, 
    translate_eng_es_chain,
    translate_es_eng_chain,
    "Puedes decirme cual es la circunferencia de un círculo con radio 7.81mm?"
)

Podemos ver que el agente no se bloquea en este ejemplo, pero tampoco puede comprender correctamente qué debe hacer con el error. Podríamos abordar este problema desde varias perspectivas diferentes:

* Reformular nuestra pregunta para que el agente pueda entenderla.
<br></br>
* Mejorar la herramienta para que preprocese automáticamente la entrada para evitar un ValueError. Por ejemplo, podemos poner un paso intermedio que traduzca el input que recibe el método en un valor numérico. Esto podria hacerse con un LLM.
<br></br>
* Generar prompts para el modelo para que pueda comprender cómo debe comportarse con un error de formato de este tipo. Por ejemplo, reintenandolo mediante una modificación del input

### 3.2 - Múltiples parámetros

En la *calculadora de circunferencias*, solo podíamos ingresar un único valor (es decir, el radio). Sin embargo, a menudo necesitaremos múltiples parámetros.

Para demostrar cómo hacerlo, construiremos una *calculadora de hipotenusas*. La herramienta nos ayudará a calcular la hipotenusa de un triángulo dada una combinación de longitudes de lados y/o ángulos del triángulo.

<table>
    <tr>
        <td><img src="images_3_4/hipotenusa.png" width="300"/></td>
    </tr>
</table>

Queremos múltiples entradas aquí porque calculamos la hipotenusa del triángulo con diferentes valores (los lados y el ángulo). Además, no necesitamos todos los valores. **Podemos calcular la hipotenusa con cualquier combinación de dos o más parámetros**.

Ejemplo de preguntas:

```
Si tengo un triángulo con dos lados de 10 cm y 6 cm de longitud, ¿cuál es la longitud de la hipotenusa?

Si tengo un triángulo cuyo 'lado adyacente' mide 10 cm de longitud y su ángulo es de 30.96 grados, ¿cuál es la longitud de la hipotenusa?

Si tengo un triángulo cuyo 'lado opuesto' mide 6 cm de longitud y su ángulo es de 30.96 grados, ¿cuál es la longitud de la hipotenusa?
```

In [None]:
import math

def angulos_triangulo_dos_lados(a, b):
    # Calcula la hipotenusa del triángulo rectángulo
    c = math.sqrt(a**2 + b**2)

    # Calcula los ángulos usando funciones trigonométricas
    angulo_alfa = math.degrees(math.atan2(b, a))
    angulo_rect = 90.0  # Dado que es un triángulo rectángulo
    angulo_theta = 180.0 - angulo_alfa - angulo_rect  # La suma de los ángulos en un triángulo es 180 grados
    
    angulo_adyacente = angulo_theta
    angulo_opuesto = angulo_alfa

    return angulo_adyacente, angulo_opuesto

a = 10 # lado adyacente
b = 6 # lado opuesto 

angulos = angulos_triangulo_dos_lados(a, b)
print(f"Ángulo adyacente (theta en nuestro diagrama): {angulos[0]:.2f} grados")

In [None]:
from typing import Optional
from math import sqrt, cos, sin, radians

desc = (
    "use this tool when you need to calculate the length of a hypotenuse"
    "given one or two sides of a triangle and/or an angle (in degrees). "
    "To use the tool, you must provide at least two of the following parameters "
    "['adjacent_side', 'opposite_side', 'angle']. Parameters are float numbers."
)

class HypotenuseTool(BaseTool):
    name = "hypotenuse_calculator"
    description = desc
    
    def _run(
        self,
        adjacent_side: Optional[Union[int, float]] = None,
        opposite_side: Optional[Union[int, float]] = None,
        angle: Optional[Union[int, float]] = None
    ):
        # check for the values we have been given
        if adjacent_side and opposite_side:
            return sqrt(float(adjacent_side)**2 + float(opposite_side)**2)
        elif adjacent_side and angle:
            return adjacent_side / cos(radians(float(angle)))
        elif opposite_side and angle:
            return opposite_side / sin(radians(float(angle)))
        else:
            return "Could not calculate the hypotenuse of the triangle. Need two or more of `adjacent_side`, `opposite_side`, or `angle`."
    
    def _arun(self, query: str):
        raise NotImplementedError("This tool does not support async")

----

**Nota:** Al igual que con `StructuredTool.from_function()`, las herramientas con multiples inputs sona dia de hoy solo compatiblse con el agente `STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION`

----

In [None]:
hypotenuse_conversational_memory = ConversationBufferWindowMemory(
    memory_key='chat_history',
    k=5, # Remember 5 last human interactions
    return_messages=True
)

hypotenuse_agent_executor = initialize_agent(
    agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
    tools=[HypotenuseTool()],
    llm=llm,
    verbose=True,
    max_iterations=3,
    early_stopping_method='generate',
    memory=hypotenuse_conversational_memory
)

In [None]:
result = call_agent_with_translate(
    hypotenuse_agent_executor, 
    translate_eng_es_chain,
    translate_es_eng_chain,
    "Si tengo un triángulo con dos lados de 10 cm y 6 cm de longitud, ¿cuál es la longitud de la hipotenusa?"
)

In [None]:
result = call_agent_with_translate(
    hypotenuse_agent_executor, 
    translate_eng_es_chain,
    translate_es_eng_chain,
    "Si tengo un triángulo cuyo 'lado adyacente' mide 10 cm de longitud y su ángulo es de 30.96 grados, ¿cuál es la longitud de la hipotenusa?"
)

In [None]:
result = call_agent_with_translate(
    hypotenuse_agent_executor, 
    translate_eng_es_chain,
    translate_es_eng_chain,
    "Si tengo un triángulo cuyo 'lado opuesto' mide 6 cm de longitud y su ángulo es de 30.96 grados, ¿cuál es la longitud de la hipotenusa?"
)

### 3.3 - Ejemplo: Agente de reconocimiento de imágenes

Si bien ya hemos visto dos ejemplos de cómo crear herramientas personalizadas con `BaseTool`, lo cierto es que su utilidad era algo limitada. Por ello, vamos mostrar un caso de uso "más potente".

Inspirándonos en el artículo ["HuggingGPT: Solving AI Tasks with ChatGPT and its Friends in Hugging Face"](https://arxiv.org/abs/2303.17580), tomaremos un modelo "experto" que ha sido entrenado en una tarea específica que nuestro LLM no puede realizar. En este caso vamos a usar [BLIP](https://arxiv.org/abs/2201.12086), un modelo desarrollado por Salesfore, el cual toma una imagen y la describe. [Este modelo se encuentra alojado de forma libre en HuggingFace](https://huggingface.co/Salesforce/blip-image-captioning-large).

`pip install transformers`

In [None]:
import torch
from transformers import BlipProcessor, BlipForConditionalGeneration


# specify model to be used
hf_model = "Salesforce/blip-image-captioning-base"
# use GPU if it's available
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# preprocessor will prepare images for the model
processor = BlipProcessor.from_pretrained(hf_model)
# then we initialize the model itself
model = BlipForConditionalGeneration.from_pretrained(hf_model).to(device)

El proceso que seguiremos aquí es el siguiente:

* Descargamos una imagen
* La abrimos como un objeto PIL en Python
* Cambia el tamaño y normaliza la imagen utilizando el "procesador" asociado a BLIP
* Pasamos la imagen procesada por el modelo BLIP

In [None]:
import requests
import urllib.request
from PIL import Image

# Descargamos la imagen
img_url = "https://upload.wikimedia.org/wikipedia/commons/thumb/4/4c/Brillenkaiman_Caiman_yacare.jpg/1200px-Brillenkaiman_Caiman_yacare.jpg"
urllib.request.urlretrieve(img_url, "img.jpg")

# Cargamos la imagen
image = Image.open(requests.get(img_url, stream=True).raw).convert("RGB")
image

Veamos que es lo que dice nuestro modelo al "ver" la imagen:

In [None]:
# Procesamos la imagen
inputs = processor(image, return_tensors="pt").to(device)

# La pasamos por el modelo BLIP
out = model.generate(**inputs, max_new_tokens=20)
print(processor.decode(out[0], skip_special_tokens=True))

Con esto confirmamos que el proceso funciona correctamente. El siguiente paso es destilar los pasos que hemos seguido en una herramienta que el agente pueda utilizar:

In [None]:
class ImageCaptionTool(BaseTool):
    name = "image_captioner"
    description = (
        "use this tool when given the URL of an image that you'd like to be "
        "described. It will return a simple caption describing the image."
    )
    
    def _run(self, url: str):
        # Descargamla imagen y la convierte en un objeto PIL
        urllib.request.urlretrieve(url, "img.jpg")
        image = Image.open(requests.get(img_url, stream=True).raw).convert("RGB")
        # Preprocesa la imagen
        inputs = processor(image, return_tensors="pt").to(device)
        # Pasa el input por el modelo y general el caption
        out = model.generate(**inputs, max_new_tokens=20)
        caption = processor.decode(out[0], skip_special_tokens=True)
        return caption
    
    def _arun(self, query: str):
        raise NotImplementedError("This tool does not support async")

In [None]:
blip_conversational_memory = ConversationBufferWindowMemory(
    memory_key='chat_history',
    k=5, # Remember 5 last human interactions
    return_messages=True
)

blip_agent_executor = initialize_agent(
    agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
    tools=[ImageCaptionTool()],
    llm=llm,
    verbose=True,
    max_iterations=10,
    early_stopping_method='generate',
    memory=blip_conversational_memory
)

In [None]:
blip_agent_executor(f"What does this image show?\n{img_url}")