# Architectural Diagram with LangGraph

This notebook sets up a conversation flow using LangGraph with multiple specialized agents.

In [None]:
from typing import Annotated, Sequence, TypedDict, Optional, List, Literal
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
from langgraph.graph.message import add_messages
import json

class GraphState(TypedDict):
   """State dictionary shared by the nodes of the LangGraph workflow.

   The graph is created with ``StateGraph(GraphState)``; every node receives
   this mapping and returns a dict with the keys it wants to update.
   """
   # Conversation history. Declared as a plain list (no reducer annotation);
   # the nodes in this notebook return the complete list (``messages + [...]``).
   messages: List[BaseMessage]
   # Name of the next node to route to.
   # NOTE(review): no node visible in this notebook writes this key — confirm it is still needed.
   next_step: str

## Environment Setup

Load API keys and other configuration values from the environment (a local `.env` file is read with `load_dotenv`).

In [None]:
from dotenv import load_dotenv
import os
import requests

# Read a .env file (if present) into the process environment
load_dotenv()

# API keys looked up from the environment; each is None when the variable is unset
WEATHER_API_KEY = os.environ.get("WEATHER_API_KEY")
TAVILY_API_KEY = os.environ.get("TAVILY_API_KEY")
TUBE_API_KEY = os.environ.get("TUBE_API_KEY")

# Default country code used when querying news sources
SOURCE_COUNTRY_CODE = "cl"

## Agent Definitions

Define specialized agents for different tasks

In [None]:
from langchain_community.chat_models import ChatOllama 
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

# Prompt for the weather agent: a fixed system instruction followed by the
# conversation supplied under the "messages" variable.
weather_system_prompt = ChatPromptTemplate.from_messages(
   [
      (
         "system",
         """You are a helpful weather assistant. When users ask about the weather,
         get information for the specified city or location. Provide accurate and concise
         responses about weather conditions. If coordinates are needed, you can work with
         specific latitude and longitude values for cities."""
      ),
      MessagesPlaceholder(variable_name="messages"),
   ]
)

# Chat model reached through the Ollama server at base_url.
weather_llm = ChatOllama(
    model="llama3",
    base_url="http://localhost:11434",
    temperature=0,
    streaming=True,
    callbacks=[],
    verbose=True,
)

# Runnable chain (prompt piped into the model), tagged "weather".
weather = weather_system_prompt | weather_llm.with_config(tags=["weather"])  

  weather_llm = ChatOllama(


In [None]:
# Prompt for the financial agent: a fixed system instruction followed by the
# conversation supplied under the "messages" variable.
financial_system_prompt = ChatPromptTemplate.from_messages(
   [
      (
         "system",
         """You are a helpful financial assistant that provides accurate financial information. 
         Respond to queries about economic indicators, exchange rates, and financial markets.
         Focus on providing current values, trends, and relevant context for financial data."""
      ),
      MessagesPlaceholder(variable_name="messages"),
   ]
)

# Chat model reached through the Ollama server at base_url.
finance_llm = ChatOllama(
    model="llama3",
    temperature=0,
    base_url="http://localhost:11434",
    streaming=True,
    callbacks=[],
    verbose=True,
)

# Runnable chain (prompt piped into the model), tagged "financial".
financial = financial_system_prompt | finance_llm.with_config(tags=["financial"])

In [None]:
# Prompt for the news ("notice") agent: a fixed system instruction followed by
# the conversation supplied under the "messages" variable.
notice_system_prompt = ChatPromptTemplate.from_messages(
   [
      (
         "system",
         """You are a helpful news assistant that provides current news and information. 
         When users ask about news, provide relevant, accurate, and recent news stories.
         Focus on the most important details while being concise."""
      ),
      MessagesPlaceholder(variable_name="messages"),
   ]
)

# Chat model reached through the Ollama server at base_url.
notice_llm = ChatOllama(
    model="llama3",
    temperature=0,
    base_url="http://localhost:11434",
    streaming=True,
    callbacks=[],
    verbose=True,
)

# Runnable chain (prompt piped into the model), tagged "notice".
notice = notice_system_prompt | notice_llm.with_config(tags=["notice"])

In [None]:
# Prompt for the general-purpose agent: a fixed system instruction followed by
# the conversation supplied under the "messages" variable.
general_system_prompt = ChatPromptTemplate.from_messages(
   [
      (
         "system",
         """You are a helpful general assistant who can answer a wide variety of questions.
         For questions about weather, financial information, or news, you'll indicate that
         specialized assistants can provide better information."""
      ),
      MessagesPlaceholder(variable_name="messages"),
   ]
)

# Chat model reached through the Ollama server at base_url.
general_llm = ChatOllama(
    model="llama3",
    temperature=0,
    base_url="http://localhost:11434",
    streaming=True,
    verbose=True,
)

# Runnable chain (prompt piped into the model), tagged "general".
general = general_system_prompt | general_llm.with_config(tags=["general"])

In [None]:
# Prompt for the routing supervisor. The system text asks the model to answer
# with exactly one of the labels 'weather', 'financial', 'notice' or 'general';
# the conversation is supplied under the "messages" variable.
supervisor_system_prompt = ChatPromptTemplate.from_messages(
   [
      (
         "system",
         """You are a supervisor that routes user queries to the appropriate specialized assistant.
         
         - 'weather': For questions about weather, temperature, climate conditions in any location
         - 'financial': For questions about financial indicators, dollar value, UF, economic data
         - 'notice': For requests about news, current events, or recent happenings
         - 'general': For greetings, general knowledge questions, or anything that doesn't fit the above
         
         Based on the user's message, output ONLY ONE of these exact values: 'weather', 'financial', 'notice', or 'general'.
         """
      ),
      MessagesPlaceholder(variable_name="messages"),
   ]
)

# Chat model reached through the Ollama server at base_url.
supervisor_llm = ChatOllama(
   model="llama3",
   temperature=0,
   verbose=True,
   base_url="http://localhost:11434",
)

# Runnable chain (prompt piped into the model), tagged "supervisor".
supervisor = supervisor_system_prompt | supervisor_llm.with_config(tags=["supervisor"])



## Tool Functions

Define the tool functions for retrieving data

In [None]:
# Upper bound (seconds) for the HTTP call so a stalled request cannot hang the cell.
_WEATHER_TIMEOUT_SECONDS = 10


def _coerce_weather_value(value: str):
    """Return ``value`` as float or int when it looks numeric, otherwise unchanged."""
    try:
        if '.' in value and any(c.isdigit() for c in value):
            return float(value)
        if value.lstrip('+-').isdigit():
            return int(value)
    except ValueError:
        # Looked numeric but is not parseable (e.g. "1.2.3"): keep the raw text.
        pass
    return value


def _parse_weather_input(input_str):
    """Turn the tool input into a parameter dict.

    Accepts a dict (returned as is), a JSON object string, or comma-separated
    ``key=value`` pairs. Returns ``None`` when the text is valid JSON but not a
    JSON object, and an empty dict for any other unsupported input type.
    """
    if isinstance(input_str, dict):
        return input_str
    if not isinstance(input_str, str):
        return {}

    # Drop everything after a '#' on each line (treated as a comment).
    clean_input = '\n'.join(line.split('#', 1)[0] for line in input_str.split('\n'))

    try:
        parsed = json.loads(clean_input)
    except json.JSONDecodeError:
        pass  # not JSON: fall through to key=value parsing
    else:
        return parsed if isinstance(parsed, dict) else None

    # key=value fallback, applied to the comment-stripped text.
    params = {}
    for part in clean_input.split(','):
        if '=' not in part:
            continue
        key, value = part.split('=', 1)
        params[key.strip()] = _coerce_weather_value(value.strip().strip('\'"'))
    return params


def get_weather(input_str: str) -> dict:
    """
    Search for the current weather in a given city.

    Args:
        input_str (str): A string containing the city name and optional coordinates.
            Example: '{"city": "Madrid"}' or '{"city": "Madrid", "lat": 40.4168, "lon": -3.7038}'.
            A dict with the same keys, or comma-separated ``key=value`` pairs, is
            also accepted. Optional keys: ``units`` (default "metric") and
            ``lang`` (default "es").

    Returns:
        dict: The decoded JSON response of the weather API, or ``{"error": ...}``
        when the input is invalid or the request fails. No exception is raised.
    """
    try:
        params = _parse_weather_input(input_str)
        if params is None:
            return {"error": "Invalid input: expected a JSON object or key=value pairs"}

        # Get parameters with defaults
        city = params.get('city', '')
        lat = params.get('lat')
        lon = params.get('lon')
        units = params.get('units', 'metric')
        lang = params.get('lang', 'es')

        # Validate parameters
        if not city and (lat is None or lon is None):
            return {"error": "Missing required parameters: city or both lat and lon must be provided"}

        # Coordinates take precedence over the city name when both are given.
        if lat is not None and lon is not None:
            try:
                location = {"lat": float(lat), "lon": float(lon)}
            except (TypeError, ValueError):
                return {"error": "Invalid coordinates: lat and lon must be numeric"}
            print(f"Using coordinates: Lat={lat}, Lon={lon}")
        else:
            location = {"q": city}
            print(f"Using city name: {city}")

        # Prepare and send the API request
        url = "https://api.openweathermap.org/data/2.5/weather"
        api_params = {
            "appid": WEATHER_API_KEY,
            "units": units,
            "lang": lang,
            **location,
        }
        response = requests.get(url, params=api_params, timeout=_WEATHER_TIMEOUT_SECONDS)
        response.raise_for_status()
        return response.json()

    except requests.exceptions.RequestException as e:
        # The error text can contain the request URL, which carries the API key
        # as a query parameter; mask it before it reaches the notebook output.
        message = str(e)
        if WEATHER_API_KEY:
            message = message.replace(WEATHER_API_KEY, "***")
        print(f"API request error: {message}")
        return {"error": f"API request failed: {message}"}
    except Exception as e:
        print(f"Unexpected error: {e}")
        return {"error": f"Error processing request: {e}"}

In [None]:
def get_indicator(indicator: str) -> dict:
    """Get financial information from Mindicador API.

    Args:
        indicator (str): The indicator to fetch (e.g., "dolar", "uf", "euro").
            Surrounding whitespace is ignored.

    Returns:
        dict: The decoded JSON payload for the indicator, or ``{"error": ...}``
        when the request fails or the payload reports an error. No exception
        is raised.
    """
    url = f"https://mindicador.cl/api/{str(indicator).strip()}"
    try:
        # A timeout keeps a stalled connection from blocking the caller indefinitely.
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        data = response.json()

        if not isinstance(data, dict):
            return {"error": "API error: unexpected response format"}
        if data.get("error") is not None:
            return {"error": f"API error: {data.get('mensaje', 'Unknown error')}"}
        return data

    except requests.exceptions.RequestException as e:
        print(f"API request error: {e}")
        return {"error": f"API request failed: {e}"}
    except Exception as e:
        print(f"Unexpected error: {e}")
        return {"error": f"Error processing request: {e}"}

In [None]:
def get_notices():
    """Get recent news from the API.

    Queries the APITube "everything" endpoint for sources in
    ``SOURCE_COUNTRY_CODE`` and keeps at most five stories.

    Returns:
        dict: ``{"articles": [...]}`` where each entry has ``title``,
        ``description``, ``url`` and ``source``; or ``{"error": ...}`` when the
        request fails or the payload contains an "error" key. No exception is
        raised.
    """
    url = "https://api.apitube.io/v1/news/everything"
    # Passing the query as params lets requests do the URL encoding.
    query = {
        "api_key": TUBE_API_KEY,
        "source.country.code": SOURCE_COUNTRY_CODE,
    }

    try:
        # A timeout keeps a stalled connection from blocking the caller indefinitely.
        response = requests.get(url, params=query, timeout=10)
        response.raise_for_status()
        data = response.json()

        if "error" in data:
            return {"error": "Error fetching news data"}

        # NOTE(review): assumes stories are listed under "articles" — confirm
        # against the APITube response schema.
        top_articles = (data.get("articles") or [])[:5]  # Limit to 5 articles
        return {
            "articles": [
                {
                    "title": article.get("title"),
                    "description": article.get("description"),
                    "url": article.get("url"),
                    # "source" may be missing or null; treat both as "no name".
                    "source": (article.get("source") or {}).get("name"),
                }
                for article in top_articles
            ]
        }

    except requests.exceptions.RequestException as e:
        # The error text can contain the request URL, which carries the API key
        # as a query parameter; mask it before it reaches the notebook output.
        message = str(e)
        if TUBE_API_KEY:
            message = message.replace(TUBE_API_KEY, "***")
        print(f"API request error: {message}")
        return {"error": f"API request failed: {message}"}
    except Exception as e:
        print(f"Unexpected error: {e}")
        return {"error": f"Error processing request: {e}"}

## Node Functions

Define node functions for the LangGraph

In [None]:
def supervisor_router(state) -> Literal["weather", "financial", "notice", "general"]:
    """Route the conversation to the appropriate node based on supervisor's decision.

    Reads the ``content`` of the last entry in ``state["messages"]`` (expected
    to be the supervisor's reply) and returns the first whole word in it that
    is a valid node name. Matching is case-insensitive and ignores quotes and
    punctuation around the label.

    Args:
        state: Mapping with a "messages" list.

    Returns:
        One of "weather", "financial", "notice" or "general". "general" is the
        fallback when the list is empty or no label is found.
    """
    valid_nodes = ("weather", "financial", "notice", "general")

    messages = state.get("messages") or []
    if not messages:
        return "general"

    content = getattr(messages[-1], "content", "")
    if not isinstance(content, str):
        # Message content is not always plain text; fall back to its string form.
        content = str(content)

    # Split on every non-letter so "'weather'." yields the word "weather" and
    # "unnoticed" is not mistaken for "notice".
    words = "".join(ch if ch.isalpha() else " " for ch in content.lower()).split()

    # The first label mentioned wins, independent of the order of valid_nodes.
    for word in words:
        if word in valid_nodes:
            return word

    # Default to general if no match is found
    return "general"

In [None]:
async def supervisor_node(state):
    """Supervisor node that decides which agent should handle the request.

    Sends the most recent human message to the ``supervisor`` chain and appends
    its reply to the history as a ``SystemMessage``. When the history holds no
    human message, the label "general" is appended instead.

    Args:
        state: Graph state with a "messages" list.

    Returns:
        dict: ``{"messages": <history plus the routing message>}``.
    """
    messages = state["messages"]

    # Most recent user message, searching from the end of the history
    user_message = next(
        (msg for msg in reversed(messages) if isinstance(msg, HumanMessage)),
        None,
    )

    if user_message is None:
        # No user message found, default to general
        return {"messages": messages + [SystemMessage(content="general")]}

    # The supervisor only sees the user's question, not the whole history.
    supervisor_messages = [HumanMessage(content=user_message.content)]

    # ainvoke is the awaitable variant; invoke returns a plain message that
    # cannot be awaited. The prompt reads its input from the "messages" key.
    response = await supervisor.ainvoke({"messages": supervisor_messages})

    # Return the decision in the messages
    return {"messages": messages + [SystemMessage(content=response.content)]}

async def weather_node(state):
    """Weather agent node that handles weather-related queries.

    Args:
        state: Graph state with a "messages" list.

    Returns:
        dict: ``{"messages": <history plus the agent's reply>}``.
    """
    messages = state["messages"]
    # ainvoke is the awaitable variant; invoke returns a plain message that
    # cannot be awaited. The prompt reads its input from the "messages" key.
    response = await weather.ainvoke({"messages": messages})
    return {"messages": messages + [response]}

async def financial_node(state):
    """Financial agent node that handles financial queries.

    Args:
        state: Graph state with a "messages" list.

    Returns:
        dict: ``{"messages": <history plus the agent's reply>}``.
    """
    messages = state["messages"]
    # ainvoke is the awaitable variant; invoke returns a plain message that
    # cannot be awaited. The prompt reads its input from the "messages" key.
    response = await financial.ainvoke({"messages": messages})
    return {"messages": messages + [response]}

async def notice_node(state):
    """News agent node that handles news-related queries.

    Args:
        state: Graph state with a "messages" list.

    Returns:
        dict: ``{"messages": <history plus the agent's reply>}``.
    """
    messages = state["messages"]
    # ainvoke is the awaitable variant; invoke returns a plain message that
    # cannot be awaited. The prompt reads its input from the "messages" key.
    response = await notice.ainvoke({"messages": messages})
    return {"messages": messages + [response]}
    
async def general_node(state):
    """General agent node that handles all other queries.

    Args:
        state: Graph state with a "messages" list.

    Returns:
        dict: ``{"messages": <history plus the agent's reply>}``.
    """
    messages = state["messages"]
    # ainvoke is the awaitable variant; invoke returns a plain message that
    # cannot be awaited. The prompt reads its input from the "messages" key.
    response = await general.ainvoke({"messages": messages})
    return {"messages": messages + [response]}

## Graph Definition

Define the graph structure for conversation flow

In [None]:
from langgraph.graph import StateGraph, END

# Define the graph
graph = StateGraph(GraphState)

# Add nodes
graph.add_node("supervisor", supervisor_node)
graph.add_node("weather", weather_node)
graph.add_node("financial", financial_node)
graph.add_node("notice", notice_node)
graph.add_node("general", general_node)

# Every run starts at the supervisor
graph.set_entry_point("supervisor")

# Routing is dynamic: supervisor_router inspects the state and returns a label,
# and the mapping translates that label into the node to run next.
# (add_edge only accepts fixed node names, not a routing function.)
graph.add_conditional_edges(
    "supervisor",
    supervisor_router,
    {
        "weather": "weather",
        "financial": "financial",
        "notice": "notice",
        "general": "general",
    },
)

# Each agent node connects to the end
graph.add_edge("weather", END)
graph.add_edge("financial", END)
graph.add_edge("notice", END)
graph.add_edge("general", END)

<langgraph.graph.state.StateGraph at 0x7ad2a0153bf0>

In [None]:
from langgraph.checkpoint.memory import MemorySaver
# Checkpointer instance handed to the compiled graph below
memory = MemorySaver()

# Compile the graph into `app`, the object the later cells call with ainvoke.
# NOTE(review): with a checkpointer attached, LangGraph expects each call to
# carry a thread_id in its config — confirm the callers provide one.
app = graph.compile(checkpointer=memory)

## Graph Visualization

Visualize the conversation flow graph

In [None]:
from IPython.display import Image, display
import tempfile
import os

# The drawing helpers live on the compiled graph (`app`), not on the
# StateGraph builder (`graph`), which has no get_graph() method.
try:
    graph_img = app.get_graph().draw_mermaid_png()
    display(Image(graph_img))
except Exception as e:
    print(f"Error generating image with draw_mermaid_png: {e}")

    try:
        # Alternative: print the Mermaid source, which needs no image renderer
        # and can be pasted into any Mermaid viewer.
        print(app.get_graph().draw_mermaid())
    except Exception as e2:
        print(f"Error with alternative visualization: {e2}")

    # Show basic graph info as fallback (fixed edges only; conditional routes
    # are not part of graph.edges)
    print("\nGraph structure:")
    print(f"Nodes: {list(graph.nodes)}")
    print(f"Edges: {list(graph.edges)}")

Error al generar imagen con draw_mermaid_png: 'StateGraph' object has no attribute 'get_graph'
Error con la alternativa de visualización: 'StateGraph' object has no attribute 'get_graph'

Estructura del grafo:
Nodos: ['weather', 'financial', 'notice']
Aristas: [('__start__', 'weather'), ('weather', 'financial'), ('notice', '__end__'), ('financial', 'notice')]


## Sample Usage

Test the conversation flow with sample inputs

In [None]:
from langchain_core.messages import HumanMessage

# Test function to interact with the graph
async def test_query(user_input: str, thread_id: Optional[str] = None):
    """Test a query through the conversation graph.

    Runs ``user_input`` through ``app`` and prints every non-human message of
    the resulting history: system messages in yellow, all others in green.

    Args:
        user_input: Text of the user's question.
        thread_id: Conversation id for the graph's checkpointer. A fresh random
            id is generated when omitted, so each call is an independent run.

    Returns:
        The final graph state returned by ``app.ainvoke``.
    """
    import uuid

    messages = [HumanMessage(content=user_input)]
    # The graph is compiled with a checkpointer, which needs a thread_id in the
    # call config to know under which conversation to store the state.
    config = {"configurable": {"thread_id": thread_id or str(uuid.uuid4())}}
    result = await app.ainvoke({"messages": messages}, config=config)
    for msg in result["messages"]:
        if isinstance(msg, SystemMessage):
            print(f"\033[33m[System] {msg.content}\033[0m")
        elif not isinstance(msg, HumanMessage):
            print(f"\033[32m[Assistant] {msg.content}\033[0m")
    return result

Instala langchain-visualizer: pip install langchain-visualizer
{'edges': [('__start__', 'weather'),
           ('weather', 'financial'),
           ('notice', '__end__'),
           ('financial', 'notice')],
 'nodes': ['weather', 'financial', 'notice']}


In [None]:
# Sample questions (in Spanish): weather, exchange rate, news and a greeting.
test_queries = [
    "¿Qué tiempo hace hoy en Madrid?",
    "¿Cuál es el valor del dólar hoy?",
    "¿Qué noticias hay sobre política?",
    "Hola, ¿cómo estás?"
]

# Print each question as a header, then run it through the graph;
# test_query prints the resulting system/assistant messages.
for query in test_queries:
    print(f"\n\033[1;34m[User Query] {query}\033[0m\n")
    await test_query(query)

## API Integration

Define functions to integrate with an API endpoint

In [None]:
from fastapi import FastAPI, WebSocket, Request
import asyncio
import json
from langchain_core.messages import HumanMessage

# This code would be used in a separate API file
async def process_message(message_content, thread_id=None):
    """Process a message through the graph and return the response.

    Args:
        message_content: Text of the user's message.
        thread_id: Conversation id for the graph's checkpointer. A fresh random
            id is generated when omitted, so each call is an independent run.

    Returns:
        str: Content of the last message in the result that is neither a human
        nor a system message, or a fixed apology text when there is none.
    """
    import uuid

    messages = [HumanMessage(content=message_content)]
    # The graph is compiled with a checkpointer, which needs a thread_id in the
    # call config to know under which conversation to store the state.
    config = {"configurable": {"thread_id": thread_id or str(uuid.uuid4())}}
    result = await app.ainvoke({"messages": messages}, config=config)

    # Extract the assistant's response (latest first)
    for msg in reversed(result["messages"]):
        if not isinstance(msg, (HumanMessage, SystemMessage)):
            return msg.content

    return "Lo siento, no pude procesar tu solicitud."