In [None]:
from dotenv import load_dotenv
import os

# Pull the variables declared in the local .env file into the process environment.
load_dotenv()

# Read the credentials / tracing flag; each name is None when the variable is unset.
open_api_key = os.getenv('OPENAI_API_KEY')
lang_api_key = os.getenv('LANGCHAIN_API_KEY')
lang_tracing = os.getenv('LANGCHAIN_TRACING_V2')


In [4]:
# Import relevant functionality
from pydantic import BaseModel, Field, StringConstraints
from typing import Literal
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.graph import MessagesState
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langchain_core.messages import HumanMessage, BaseMessage, SystemMessage
from langgraph.graph.message import add_messages

In [6]:
from pyvo import registry
from typing import Optional, Sequence
from typing_extensions import Annotated, TypedDict
from langchain.load.dump import dumps

# Service-type identifiers accepted for the `service` argument of the registry tool.
srvs = Literal[
    'sia', 'sia1', 'sia2',
    'ssa', 'ssap',
    'scs', 'conesearch',
    'line',
    'tap', 'table',
]
# Waveband names accepted for the `waveband` argument of the registry tool.
wavebands = Literal[
    'EUV', 'Gamma-ray', 'Infrared', 'Millimeter', 'Neutrino',
    'Optical', 'Photon', 'Radio', 'UV', 'X-ray',
]

# One registry resource as a plain mapping. get_registry builds one of these per
# result row from the seven table columns of the same names, and it is the element
# type of VoTable.Data and RegistryResponse.data_table.
# Documented with `#` comments on purpose: a class docstring could end up in the
# JSON schema generated for the models that embed this type.
class VoResource(TypedDict):
   res_title: str        # resource title
   res_description: str  # resource description
   content_level: str    # knowledge level of the intended audience (per get_registry's docstring)
   waveband: str         # waveband of the resource
   created: str          # creation date of the resource record
   updated: str          # last-updated date of the resource record
   access_urls: str      # URL(s) to access the resource; NOTE(review): exact format/separator not visible here

# Pydantic wrapper around a list of VoResource rows.
# NOTE(review): not referenced by any other visible cell — confirm it is still needed.
class VoTable(BaseModel):
   Data: list[VoResource]  # the table rows

# Structured final answer. The class is listed in `tools` and bound to the model, so
# the model "calls" it to finish; the `respond` node then instantiates it from the
# arguments of that tool call. The docstring and the Field descriptions are text
# handed to the model, so they are kept verbatim.
class RegistryResponse(BaseModel):
   """Respond to the user with this"""

   # Short natural-language summary for the user.
   text_answer: str = Field(
      description=(
         "A concise answer to the user query. "
         "This message exist to present the user with the answer to their query, "
         "which will be a set of resources in a table. "
         "An example woudl be 'I have found 20 sources of information for your quey on asteroids'."
      ),
   )
   # The result rows, one VoResource mapping per registry resource.
   data_table: list[VoResource] = Field(
      description="A dictionary like object containing al data present in tha data table",
   )

# Argument schema for the `get_registry` tool (passed as `args_schema`). Only `words`
# is required; every other constraint defaults to None and is then left out of the
# registry query. The Field descriptions are the text the model sees for each argument.
class RegistryConstraints(BaseModel):
   words: str = Field(description="Keywords that should be found in the resources of the registry")
   service: Optional[srvs] = Field(default=None, description="Service type that the resources found should serve. Could be any of: 'sia', 'sia1', 'sia2', 'ssa', 'ssap', 'scs', 'conesearch', 'line', 'tap', 'table'")
   waveband: Optional[wavebands] = Field(default=None, description="Waveband in which the resources data lies within")
   author: Optional[str] = Field(default=None, description="Author of (some of) the data found in the resources")
   # Description fixed: the field is the resource identifier ("id", previously misspelled "if").
   ivoid: Optional[str] = Field(default=None, description="Exact id of the resource to be found")
   # The pattern is anchored at both ends so that only a bare YYYY-MM-DD string validates;
   # with the end anchor alone, any text ending in a date (e.g. "junk2024-01-01") was accepted.
   # NOTE(review): get_registry forwards this value unchanged as `temporal=` to
   # registry.search — confirm pyvo accepts a plain date string there and that the
   # description below matches pyvo's meaning of the constraint.
   temporal: Optional[Annotated[str, StringConstraints(pattern=r'^[0-9]{4}-[0-9]{2}-[0-9]{2}$')]] = Field(default=None, description="Time in which the resource was publiches or last updated")


# NOTE: langchain's `@tool` hands the docstring below to the model as the tool
# description, so it is written for the model and kept short; implementation notes
# live in `#` comments instead.
@tool(args_schema=RegistryConstraints)
def get_registry(words: str,
                 service: Optional[srvs] = None,
                 waveband: Optional[wavebands] = None,
                 author: Optional[str] = None,
                 ivoid: Optional[str] = None,
                 temporal: Optional[Annotated[str, StringConstraints(pattern=r'^[0-9]{4}-[0-9]{2}-[0-9]{2}$')]] = None) -> list[VoResource]:
   """This function takes in constraints that are passed to a query against the Virtual Observatory's Registry, a collection of metadata records of the resources in the VO.
   The query returns a set of VO registry resources whose files and content match the given constraints. At most 10 resources are returned.

   Returns:
      list[VoResource]: List of VoResources, dictionary-like objects that describe each of the resources found in the query. The data they provide are:
      resource title, resource description, the knowledge level of the resource's intended audience, the waveband of the resources, the created and last
      updated dates of the resources, and the URLs to access them.
   """
   from itertools import islice

   # Upper bound on the number of records requested from the registry.
   max_records = 300
   # Upper bound on the number of rows handed back to the caller
   # (presumably to keep the tool message small — confirm before raising it).
   max_returned = 10

   # Translate the tool's argument names into the keyword names passed to
   # registry.search, dropping every constraint the caller left unset.
   candidate_filters = {
      "keywords": words,
      "servicetype": service,
      "waveband": waveband,
      "author": author,
      "ivoid": ivoid,
      "temporal": temporal,
   }
   active_filters = {key: value for key, value in candidate_filters.items() if value is not None}
   resources = registry.search(**active_filters, maxrec=max_records)

   # Keep only the columns that VoResource declares.
   res_table = resources.to_table()['res_title','res_description','content_level','waveband','created','updated','access_urls']
   # Progress log (Spanish): number of resources found.
   print(f"Cantidad resources encontrados: ({len(resources)})")

   # Take the first `max_returned` rows; islice stops early without walking the rest.
   votable = [VoResource(**row) for row in islice(res_table, max_returned)]
   # Progress log (Spanish): number of resources being sent back.
   print(f"Cantidad de resources a enviar al usuario: ({len(votable)})")
   return votable

In [7]:
# Graph state shared by every node. It extends MessagesState (which supplies the
# 'messages' chat-history key) with two extra keys, and re-declares 'messages'
# with the add_messages reducer.
# NOTE(review): if MessagesState is a TypedDict, the `= None` defaults below are not
# part of the TypedDict contract — confirm they have the intended effect.
class AgentState(MessagesState):
    """The state of the agent."""
    # Name of the last tool called. In the visible cells it is only supplied in the
    # initial input to graph.invoke; no node updates it.
    last_tool_called: Optional[str] = None  # Track the last tool called
    # Filled in by the `respond` node with the structured answer.
    final_response: Optional[RegistryResponse] = None # Store structured output here if needed
    # Chat history; updates returned by nodes are combined through add_messages.
    messages: Annotated[Sequence[BaseMessage], add_messages]


# Everything the model may call: the registry search tool and the structured
# final-answer schema.
tools = [
    get_registry,
    RegistryResponse,
]

# Chat model used by the agent node.
model = ChatOpenAI(
    model="gpt-3.5-turbo",
    verbose=True,
)

# tool_choice="any" is passed so that each model turn is a call to one of `tools`.
model_with_tools = model.bind_tools(tools, tool_choice="any")

In [8]:
# Define the function that calls the model
def call_model(state: AgentState):
    """Agent node: run the tool-bound model on the system prompt plus the chat history.

    Returns a state update whose 'messages' entry holds the single model reply.
    """
    system_message = SystemMessage(
        """You are an assistant to astronomers and people interested in astronomy. Your speciality is
          aswering question related to the virtual Observatory and return data from it using the tools you will be provided with."""
    )
    # The system prompt always goes first, followed by the conversation so far.
    conversation = [system_message, *state["messages"]]
    ai_reply = model_with_tools.invoke(conversation)
    return {"messages": [ai_reply]}


# Define the function that responds to the user
def respond(state: AgentState):
    """Final node: turn the model's RegistryResponse tool call into a RegistryResponse object.

    Reads the arguments of the first tool call on the last message, builds the
    model from them, prints it, and returns it under 'final_response'.
    """
    # The graph routes here only when the last message is a single
    # RegistryResponse tool call (see should_continue), so index 0 is that call.
    final_call = state["messages"][-1].tool_calls[0]
    structured_answer = RegistryResponse(**final_call["args"])
    print("Final structured response:", structured_answer)
    return {"final_response": structured_answer}


# Define the function that determines whether to continue or not
def should_continue(state: AgentState):
    """Routing function: pick the edge to follow after a model turn.

    Returns "respond" when the last message carries exactly one tool call and
    that call is named "RegistryResponse"; returns "continue" in every other case.
    """
    pending_calls = state["messages"][-1].tool_calls
    is_final_answer = len(pending_calls) == 1 and pending_calls[0]["name"] == "RegistryResponse"
    return "respond" if is_final_answer else "continue"


# Define a new graph
workflow = StateGraph(AgentState)

# Define the two nodes we will cycle between
workflow.add_node("agent", call_model)
workflow.add_node("respond", respond)
workflow.add_node("tools", ToolNode(tools))

# Set the entrypoint as `agent`
# This means that this node is the first one called
workflow.set_entry_point("agent")

# We now add a conditional edge
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "continue": "tools",
        "respond": "respond",
    },
)

workflow.add_edge("tools", "agent")
workflow.add_edge("respond", END)
graph = workflow.compile()

In [9]:
# Initial state: one human message plus a placeholder for last_tool_called.
# NOTE(review): "None" here is the four-character string, not the None object —
# confirm that is intended.
initial_state = {
    "messages": [("human", "I am loking for resources related to 'eta carina'")],
    "last_tool_called": "None",
}

# Run the graph to completion and keep only the structured answer.
final_state = graph.invoke(input=initial_state)
answer = final_state["final_response"]
answer

Cantidad resources encontrados: (33)
Cantidad de resources a enviar al usuario: (10)
Final structured response: text_answer="I have found 8 sources of information related to 'eta carina'. Here is a summary of the resources:" data_table=[{'res_title': 'Survey of faint OB stars in Carina', 'res_description': 'A finding list is presented for OB stars in a 75 square degree region centered on {eta} Carinae, observed with the the 50/65/175cm Schmidt telescope of the Uppsala Southern Station at Mount Stromlo, with an objective prism giving a dispersion of 470Angstrom/mm et H{gamma}. Completeness between 8.5 and 11.5 was the goal. Accurate positions were added by Brian Skiff (Lowell Observatory) on 2003-Aug-18 for non-HD/CPD stars, and on 2009-12-29 for all stars; the result is the file "positions.dat". Coordinates were found with VizieR usually from UCAC2 or Tycho-2. V magnitudes were added from published photoelectric data, ASAS-3, or Tycho-2. In the remarks the CD names were added if availa

RegistryResponse(text_answer="I have found 8 sources of information related to 'eta carina'. Here is a summary of the resources:", data_table=[{'res_title': 'Survey of faint OB stars in Carina', 'res_description': 'A finding list is presented for OB stars in a 75 square degree region centered on {eta} Carinae, observed with the the 50/65/175cm Schmidt telescope of the Uppsala Southern Station at Mount Stromlo, with an objective prism giving a dispersion of 470Angstrom/mm et H{gamma}. Completeness between 8.5 and 11.5 was the goal. Accurate positions were added by Brian Skiff (Lowell Observatory) on 2003-Aug-18 for non-HD/CPD stars, and on 2009-12-29 for all stars; the result is the file "positions.dat". Coordinates were found with VizieR usually from UCAC2 or Tycho-2. V magnitudes were added from published photoelectric data, ASAS-3, or Tycho-2. In the remarks the CD names were added if available for CPD stars. The implicit spectral type is \'OB\', but some follow-up observations from 