# Adrián Gayo Andrés

## Enunciado
Se propone la creación de un prototipo con varios
agentes que permita extraer publicaciones científicas de ArXiv y las publique en X.
En el ITCL se busca desarrollar una herramienta basada en un sistema de agentes cuyo
objetivo principal es permitir que los usuarios accedan a ArXiv a través de una API,
utilizando una interfaz de chat que funcione mediante lenguaje natural. Además, el sistema
debe ofrecer la posibilidad de que los usuarios publiquen, de forma sencilla y en cualquier
momento, los contenidos de una o varias investigaciones en la plataforma X. Este sistema
estará diseñado para ser semiautónomo, pero siempre bajo la supervisión directa del
usuario.


### Imports

In [None]:
# %pip install arxiv
# %pip install langchain
# %pip install langgraph
# %pip install chromadb 
# %pip install cohere
# %pip install tweepy 
# %pip install gradio
# %pip install openai

In [53]:
import arxiv
import langchain
import langgraph
import tweepy
import openai 
import chromadb
import cohere
import gradio
import os
from dotenv import load_dotenv
import pandas as pd

print(lc.__version__)


0.3.14


### ArXiV API prueba

In [2]:
# Iinicializamos el cliente de arXiv
arxiv_client = arxiv.Client()

# Query de ejemplo con la palabra "quantum."
search = arxiv.Search(
  query = "quantum",
  max_results = 10,
  sort_by = arxiv.SortCriterion.SubmittedDate
)

# Obtenemos los resultados de la búsqueda
results = arxiv_client.results(search)

# Mostremos los resultados.
for r in arxiv_client.results(search):
  print(r.title)
  

Parton distributions confront LHC Run II data: a quantitative appraisal
Quantum locally recoverable code with intersecting recovery sets
Purcell-Enhanced, Directional Light-Matter Interaction in a Waveguide-Coupled Nanocavity
Photonic chiral state transfer near the Liouvillian exceptional point
Sign Switching in Dark Sector Coupling Interactions as a Candidate for Resolving Cosmological Tensions
Fault-tolerant quantum simulation of generalized Hubbard models
The $q$-Racah polynomials from scalar products of Bethe states II
Asymptotic safety meets tensor field theory: towards a new class of gravity-matter systems
Static Born charges and quantum capacitance in metals and doped semiconductors
Near-Boundary Asymptotics and Unique Continuation for the AdS--Einstein--Maxwell System


In [11]:
# También podemos descargar el PDF de un artículo.
paper = next(arxiv_client.results(search))
# Creamos un directorio para almacenar los PDFs en caso de que no exista.
os.makedirs("pdfs", exist_ok=True)
# Download the PDF to the PWD with a custom filename.
paper.download_pdf(filename="pdfs/" + paper.title + ".pdf")

'./pdfs/Coming full circle -- A unified framework for Kochen-Specker contextuality.pdf'

### X API prueba (tweepy)

In [29]:
# Nos autenticamos en la API de Twitter
load_dotenv()
twitter_api_key = os.getenv("TWITTER_API_KEY")
twitter_api_secret = os.getenv("TWITTER_API_SECRET")
twitter_access_token = os.getenv("TWITTER_ACCESS_TOKEN")
twitter_access_token_secret = os.getenv("TWITTER_ACCESS_TOKEN_SECRET")
bearer_token = os.getenv("BEARER_TOKEN")
client_id = os.getenv("CLIENT_ID")
client_secret = os.getenv("CLIENT_SECRET")
redirect_uri = "http://127.0.0.1:5000/callback" 
print(tw.__version__)

# v1.0
# auth = tw.OAuth1UserHandler(twitter_api_key, twitter_api_secret, twitter_access_token, twitter_access_token_secret)
# api = tw.API(auth, wait_on_rate_limit=True)
# api.verify_credentials()
# print("Autenticación exitosa")
# api.update_status(status="Hello, world!")
# print("Tweet publicado exitosamente")

# v2.0
auth = tweepy.OAuth2UserHandler(client_id=client_id, client_secret=client_secret, redirect_uri=redirect_uri, scope="tweet.write")
print(auth)

client_X = tweepy.Client(
        consumer_key=twitter_api_key,
        consumer_secret=twitter_api_secret,
        access_token=twitter_access_token,
        access_token_secret=twitter_access_token_secret,
    )



response = client_X.create_tweet(text="Hola mundo!", user_auth=True)
print(f"Tweet publicado exitosamente: {response.data}")



4.15.0
<tweepy.auth.OAuth2UserHandler object at 0x000001916E5D3F20>


Unauthorized: 401 Unauthorized
Unauthorized

### Inicialización del modelo

In [3]:
from langchain.llms import OpenAI
from langchain.embeddings import OpenAIEmbeddings

# Parámetros
MAX_TOKENS = 1000
TEMPERATURE = 0.3
MODEL= "gpt-3.5-turbo"
EMBEDDING_MODEL = "gpt-3.5-turbo"

# OpenAI API key
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")

# Inicializamos el modelo de openai que vamos a utilizar con langchain.
llm = OpenAI(
    model=MODEL,  
    temperature=TEMPERATURE,        
    max_tokens=MAX_TOKENS,        
    openai_api_key=openai_api_key  
)

# Inicializamos también el modelos de embeddings seleccionado.
embeddings = OpenAIEmbeddings(    # (openai embeddings)
    model=EMBEDDING_MODEL,  
    openai_api_key=openai_api_key    
)

#  alternativa para obtener embeddings
#

'sk-1234567890abcdef1234567890abcdef'

In [None]:
# Se define una función que permita monitorizar el coste de la llamada a la API de OpenAI.
from langchain.callbacks import get_openai_callback

def llamada_monitorizada(llm, texto):
    """ Función que realiza una llamada al modelo de lenguaje especificado y monitoriza el coste de la llamada.
    Args:
        llm (OpenAI): Modelo de lenguaje
        texto (str): Texto de entrada

    Returns:
        response (str): Texto de salida generado por el modelo de lenguaje
    """
    
    with get_openai_callback() as callback:
        response = llm(texto)
        
        # Detalles de uso y coste de la llamada.
        print(f"Tokens usados: {callback.total_tokens}")
        print(f"Coste de la llamada: ${callback.total_cost:.5f}")
        
        return response


### Declaración de herramientas

#### Funciones

In [20]:
def buscar_publicaciones_arxiv(consulta: str, max_resultados: int = 3) -> object:
    """
    Busca publicaciones científicas en arXiv según una consulta del usuario.
    
    Args:
        consulta (str): El término de búsqueda para arXiv.
        max_resultados (int): Número máximo de publicaciones a devolver.

    Returns:
        PDFs de las publicaciones encontradas.
    """
    try:
        # Iinicializamos el cliente de arXiv
        arxiv_client = arxiv.Client()

        # Realizar la consulta a arXiv
        search = arxiv.Search(
            query=consulta,
            max_results=max_resultados,
            sort_by=arxiv.SortCriterion.Relevance
        )
        
        results = arxiv_client.results(search)
        
        if not results:
            return "No se encontraron publicaciones."
        
        for paper in results:
            print(paper.title)
            
        return results
          
    except Exception as e:
        return f"Error al buscar en arXiv: {str(e)}"
    
    
def publicar_en_x(texto: str) -> str:
    """
    Publica un texto en X (Twitter) utilizando la API de Tweepy.
    
    Args:
        texto (str): El texto que se desea publicar en X.
        api_key (str): La API Key de la cuenta de desarrollador de Twitter.
        api_key_secret (str): El API Key Secret asociado.
        access_token (str): El Access Token del usuario.
        access_token_secret (str): El Access Token Secret asociado.
    
    Returns:
        str: Mensaje de éxito o error.
    """
    try:
        load_dotenv()
        twitter_api_key = os.getenv("TWITTER_API_KEY")
        twitter_api_secret = os.getenv("TWITTER_API_SECRET")
        twitter_access_token = os.getenv("TWITTER_ACCESS_TOKEN")
        twitter_access_token_secret = os.getenv("TWITTER_ACCESS_TOKEN_SECRET")

        #auth = tw.OAuth1UserHandler(twitter_api_key, twitter_api_secret, twitter_access_token, twitter_access_token_secret)
        #api = tw.API(auth, wait_on_rate_limit=True)
        x_client = tweepy.Client(consumer_key=twitter_api_key, consumer_secret=twitter_api_secret, access_token=twitter_access_token, access_token_secret=twitter_access_token_secret)
        
        response = x_client.create_tweet(
            text=texto
        )
        
        return response.status_code
    
    except Exception as e:
        return f"Error al publicar el tweet: {e.response.text}"

#### Tools

In [45]:
from langchain.tools import BaseTool


class SearchOnArxivTool(BaseTool):
    """Herramienta de búsqueda de artículos científicos en arXiv."""

    name: str = "SearchOnArxivTool"
    description: str = "Searches for scientific articles on arXiv."
    
    def _run(self, consulta: str, max_resultados: int = 3) -> str:
        """Synchronous logic for executing the tool."""
        try:
            return buscar_publicaciones_arxiv(consulta, max_resultados)
        except Exception as e:
            return f"An error occurred: {str(e)}"

    async def _arun(self, consulta: str, max_resultados: int = 3) -> str:
        """Asynchronous logic for executing the tool."""
        raise NotImplementedError("This tool does not support asynchronous operations.")


class TwitterPostTool(BaseTool):
    """Herramienta para publicar texto en X(Twitter)."""
    
    name: str = "TwitterPostTool"
    description: str = (
        "Posts a message on X(Twitter). "
        "Use this to share a message given a in your X account."
    )
    
    def _run(self, texto: str) -> str:
        """Synchronous logic for executing the tool."""
        try:
            return publicar_en_x(texto)
        except Exception as e:
            return f"An error occurred: {str(e)}"
    
    async def _arun(self, texto: str) -> str:
        """Asynchronous logic for executing the tool."""
        raise NotImplementedError("This tool does not support asynchronous operations.")

### Clases de los nodos

In [41]:
from typing_extensions import TypedDict
from typing import Literal, Sequence
from langchain.schema import BaseMessage, HumanMessage
from langchain.tools import Tool


class Router(TypeDict):
    """Clase para el nodo que enruta las solicitudes a las herramientas correspondientes."""
    
    next: Literal["SearchOnArxivTool", "RetrivalDocumentInformation" "TwitterPostTool", "FINISH"]
    

class AgentState(TypeDict):
    """Clase para el estado del agente."""
    
    messages: Sequence[BaseMessage]

NameError: name 'TypeDict' is not defined

### Grafo Multiagente

In [None]:
from langgraph.graph import StateGraph, MessagesState, START, END



builder = StateGraph(MessagesState)

# Añadimos los nodos de las herramientas al grafo.
builder.add_edge(START,"supervisor")

builder.add_node("supervisor", supervisor_node)
builder.add_edge("SearchOnArxivTool", search_on_arxiv_node)
builder.add_edge("RetrivalDocumentInformation", retrival_document_information_node)
builder.add_edge("TwitterPostTool", twitter_post_node)

graph = builder.compile()


TypeError: StateGraph.add_edge() missing 1 required positional argument: 'end_key'

### Visualización del Grafo

In [48]:
from IPython.display import Image, display

try:
    display(Image(graph.get_graph().draw_mermaid_png()))
except Exception as e:
    print(f"Error al mostrar la imagen: {str(e)}")

Error al mostrar la imagen: 'Graph' object has no attribute 'get_graph'


### Generación de base de datos vectorial

In [12]:
# Se incializa el cliente y se crea una BBDD (collection)
chroma_client = chromadb.Client()

# Crea la colección
collection = chroma_client.create_collection(name="arxiv")

# Añade documentos a la colección
collection.add(
    documents=["ejemplo número 1", "ejemplo número 2", "ejemplo número 3"],
    metadatas=[{"source": "arxiv"}, {"source": "arxiv"}, {"source": "arxiv"}],
    ids=["1", "2", "3"]
)

### Front 

In [16]:

def yes_man(message, history):
    if message.endswith("?"):
        return "Yes"
    else:
        return "Ask me anything!"
    
def user_interface(user_input, history=[]):
    state= {"messages": history + [HumanMessage(content=user_input)]}
    results = graph.invoke(state)
    response = results["messages"][-1].content
    history.append((user_input, response))
    return response, history
   

demo = gradio.ChatInterface(
    fn=yes_man,
    title="MULTIAGENTE ARXIV RAG",
    description="Say hello to someone!",
)

demo.launch()




* Running on local URL:  http://127.0.0.1:7867

To create a public link, set `share=True` in `launch()`.


