In [1]:
from langgraph.graph import StateGraph, END, START
from langchain.agents import initialize_agent, load_tools
from langchain.chat_models import ChatOpenAI
from typing import TypedDict
import google.generativeai as genai
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_groq import ChatGroq
import numpy as np
import pandas as pd
from sqlalchemy import create_engine
from langchain.tools import tool
import urllib.request
import json
import psycopg2
from langchain_experimental.sql import SQLDatabaseChain
from langchain_community.utilities import SQLDatabase
from langchain.chains import create_sql_query_chain
import re, json, time, os
from datetime import date

ModuleNotFoundError: No module named 'langgraph'

# Notebook overview — imports and purpose

This cell documents the imports and the high-level role of this notebook.

- langgraph.graph.StateGraph, START, END: used to build a simple state-machine graph where nodes represent processing steps (used later to define the collector flow).
- langchain.agents.initialize_agent, load_tools: load and wire external tools (APIs, wrappers) into a LangChain-style agent.
- ChatOpenAI / ChatGroq / ChatGoogleGenerativeAI: LLM client wrappers used to instantiate a language model backend. The notebook uses `ChatGroq` for the main LLM.
- TypedDict: used to define a typed state dictionary for LangGraph nodes.
- numpy / pandas: data handling and preprocessing.
- sqlalchemy.create_engine, psycopg2: database connection and low-level DB access.
- langchain.tools.tool: decorator used to expose Python functions as agent tools.
- langchain_experimental.sql.SQLDatabaseChain, langchain_community.utilities.SQLDatabase, create_sql_query_chain: utilities for converting NL -> SQL and executing queries safely.
- urllib.request, json, requests (used later): HTTP calls / JSON parsing.
- re, time, os, datetime: small utilities used across tools.

Security note: several API keys and a DB connection string appear later in the notebook. Move secrets to environment variables (`os.environ`) or a secrets manager before production.


In [None]:
llm = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct",
               groq_api_key="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"
               )

# Define tools (APIs, DB connectors)
tools = load_tools(["serpapi", "requests_all"], 
                   serpapi_api_key = "49a312e94db629a1d7d4efa33647dc82322dd921680f5cbe1441de0aee587bbd",
                   allow_dangerous_tools=True
                   )

# 2️⃣ Create SQLAlchemy engine
engine = create_engine('postgresql+psycopg2://postgres:ElDiabloX32@localhost:5432/GISDb')
db = SQLDatabase(engine)
sql_chain = create_sql_query_chain(llm, db)


## LLM, tools and database setup

This block configures the language model, external tools, and the database connection used by the agent:

- `llm = ChatGroq(...)`: creates the LLM client used for prompting and SQL generation. Replace hard-coded API keys with environment variables before sharing or deploying.
- `tools = load_tools([...])`: registers external helper tools (e.g., web search via SerpAPI, HTTP via `requests_all`) so the agent can call them.
- `engine = create_engine(...)` and `db = SQLDatabase(engine)`: create an SQLAlchemy engine and wrap it with LangChain's `SQLDatabase` utility so higher-level chains can query the database.
- `sql_chain = create_sql_query_chain(llm, db)`: wires the LLM and DB together so later code can convert natural language questions into SQL safely.

Operational notes:
- Secrets (API keys, DB passwords) must live in `os.environ` or a secrets manager, not inside notebook cells.
- `SQLDatabase` is a wrapper — some code later expects `.run()` while other libraries expect a raw SQLAlchemy engine. Keep an eye for mismatches between `engine` vs `db` usage (see notes in the upload cell explanation).

Preprocessing

In [None]:
#Preprocess the data

def extract_date_or_placeholder(sunrise_val):
    if pd.isna(sunrise_val):
        return "###"  
    return sunrise_val.date()


def preprocess_weather_data_csv(df):

    df = df.drop(columns=['feelslike','feelslikemax','feelslikemin','dew','precipprob','precipcover','severerisk','stations','severerisk'])

    df["sunrise"] = pd.to_datetime(df["sunrise"], errors="coerce")
    df["datetime"] = df["sunrise"].apply(extract_date_or_placeholder)
    df["sunrise"] = df["sunrise"].dt.time
    
    if "sunset" in df.columns:
        df["sunset"] = pd.to_datetime(df["sunset"], errors="coerce").dt.time

    df["name"] = df["name"].astype(str).str.replace("", "")
    df["conditions"] = df["conditions"].astype(str).str.replace(",", "")
    df["country"] = "Sri Lanka"
    df.head()

    df = df.rename(columns={
    "name": "statedistrict",
    "precip": "rainsum",
    "preciptype": "rain",
    "tempmax": "tempmax",
    "tempmin": "tempmin",
    "temp": "temp",
    "humidity": "humidity",
    "snow": "snow",
    "snowdepth": "snowdepth",
    "windgust": "windgust",
    "windspeed": "windspeed",
    "winddir": "winddir",
    "sealevelpressure": "sealevelpressure",
    "cloudcover": "cloudcover",
    "visibility": "visibility",
    "solarradiation": "solarradiation",
    "solarenergy": "solarenergy",
    "uvindex": "uvindex",
    "sunrise": "sunrise",
    "sunset": "sunset",
    "moonphase": "moonphase",
    "conditions": "conditions",
    "description": "description",
    "icon": "icon",
    "country": "country"
    })

    for col in ['snow', 'rain']:
        # Convert existing values to boolean: True if any value exists, False if NaN or empty
        df[col] = df[col].apply(lambda x: True if pd.notna(x) and x != "" else False)


    output_path = "preprocessed_climate_dataset5.csv"
    df.to_csv(output_path, index=False)

    print("✅ Preprocessing completed. Saved to:", output_path)
    return df


## Preprocessing function — what it does and why

This block defines data cleaning and normalization before inserting weather records:

- `extract_date_or_placeholder`: helper that returns a date or a placeholder if sunrise is missing.
- `preprocess_weather_data_csv(df)`: main preprocessing routine. Steps:
  1. Drops a set of unused or duplicate columns to reduce noise.
  2. Parses `sunrise` (and optionally `sunset`) into datetime/time types and creates a `datetime` date column.
  3. Cleans text columns (`name` → `statedistrict`, `conditions`) and sets `country` to a default.
  4. Renames many incoming fields into your internal schema names.
  5. Converts `snow` and `rain` into booleans.
  6. Saves the cleaned dataset to `preprocessed_climate_dataset5.csv` and returns the DataFrame.

Edge cases and suggestions:
- If `sunrise` values are inconsistent, the `errors='coerce'` helps, but downstream code must handle `NaT`/placeholder values.
- If the input file already matches your schema, consider making the rename optional to avoid accidental data loss.

In [None]:
@tool("upload_to_postgresql", return_direct=True)
def upload_to_postgresql(file_path: str) -> str:
    """Upload a CSV file into the PostgreSQL database."""
    df = pd.read_csv(file_path)
    df = preprocess_weather_data_csv(df)
    
    # 1️⃣ Replace missing values
    # Text columns → "N/A"
    text_cols = ['statedistrict', 'conditions', 'description', 'icon', 'country']
    df[text_cols] = df[text_cols].fillna("N/A").replace("", "N/A")

    # Numeric columns → 0
    num_cols = [
        'tempmax', 'tempmin', 'temp', 'humidity', 'rainsum', 'snow', 'snowdepth',
        'windgust', 'windspeed', 'winddir', 'sealevelpressure', 'cloudcover',
        'visibility', 'solarradiation', 'solarenergy', 'uvindex', 'moonphase'
    ]
    df[num_cols] = df[num_cols].fillna(0)

    # Boolean columns → False
    bool_cols = ['rain', 'snow']
    df[bool_cols] = df[bool_cols].fillna(False)


    # Convert datetime/time columns
    df['datetime'] = pd.to_datetime(df['datetime']).dt.date
    df['sunrise'] = pd.to_datetime(df['sunrise'], format='%H:%M:%S').dt.time
    df['sunset']  = pd.to_datetime(df['sunset'], format='%H:%M:%S').dt.time

    # 3️⃣ Insert into PostgreSQL table
    df.to_sql('weather_data', db, if_exists='append', index=False)

## `upload_to_postgresql` tool — CSV → PostgreSQL

This `@tool`-decorated function is exposed to the agent to ingest a CSV and store weather rows:

- Flow:
  1. Reads CSV into pandas, calls the preprocessing routine.
  2. Fills missing values: text columns → "N/A", numeric → 0, booleans → False.
  3. Parses `datetime`, `sunrise`, and `sunset` into date/time objects.
  4. Calls `df.to_sql('weather_data', db, if_exists='append', index=False)` to insert rows.

Important implementation notes and potential bug:
- `pandas.DataFrame.to_sql()` usually accepts a SQLAlchemy engine/connection, not a LangChain `SQLDatabase` wrapper. If `db` is a `SQLDatabase` object, that call may fail at runtime. Use `engine` or `engine.connect()` with `to_sql`, or convert the DataFrame rows to parameterized INSERTs.
- Consider using parameterized inserts (psycopg2 or SQLAlchemy `execute_many`) for large files to improve performance and avoid SQL injection risks.
- Retain an audit log or deduplication key to avoid inserting duplicate rows on retries.

In [None]:
@tool("query_postgresql_tool", return_direct=True)
def query_postgresql_tool(question: str) -> str:
    """
    Safely convert a natural-language question into a SQL SELECT using LangChain's SQLDatabaseChain.
    """


    audit_path = os.environ.get("SQL_AUDIT_LOG", "sql_audit.log")

    # --- helper to clean SQL ---
    def extract_sql(text: str) -> str:
        # Capture SQL inside ```sql ... ```
        match = re.search(r"```sql\s+(.*?)\s+```", text, re.DOTALL | re.IGNORECASE)
        if match:
            return match.group(1).strip()
        # Otherwise, return the first SELECT/WITH onwards
        match = re.search(r"(select|with)\b.*", text, re.IGNORECASE | re.DOTALL)
        if match:
            return match.group(0).strip()
        return text.strip()

    # Step 1: generate SQL
    try:
        sql_raw = sql_chain.invoke({"question": question})
        sql_clean = extract_sql(str(sql_raw)).rstrip(";")
    except Exception as e:
        return json.dumps({"error": "sql_generation_failed", "details": str(e)})

    # Step 2: safety checks
    banned = r"\b(drop|delete|update|insert|alter|grant|truncate|create|replace|merge|shutdown)\b"
    if re.search(banned, sql_clean, flags=re.IGNORECASE):
        return json.dumps({"error": "disallowed_statement"})

    if not re.match(r"^\s*(select|with)\b", sql_clean, flags=re.IGNORECASE):
        return json.dumps({"error": "not_select", "raw": str(sql_raw)})

    # Step 3: enforce LIMIT
    if not re.search(r"\blimit\b", sql_clean, flags=re.IGNORECASE):
        sql_exec = sql_clean + " LIMIT 100"
    else:
        sql_exec = sql_clean

    # Step 4: audit
    try:
        with open(audit_path, "a", encoding="utf-8") as f:
            f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')}  QUESTION: {question}  SQL: {sql_exec}\n")
    except Exception:
        pass

    # Step 5: execute query
    try:
        raw = db.run(sql_exec)
    except Exception as e:
        return json.dumps({"error": "execution_failed", "details": str(e), "sql": sql_exec})

    # Normalize rows
    def normalize_rows(r):
        if isinstance(r, list):
            return [dict(row) if hasattr(row, "keys") else list(row) for row in r]
        return str(r)

    rows = normalize_rows(raw)
    output = {"sql": sql_exec, "row_count": len(rows), "rows": rows}

    return json.dumps(output, default=str)


## `query_postgresql_tool` — safe natural-language to SQL

This tool converts a natural-language question into a SQL `SELECT` and executes it, with multiple safety layers:

- Steps:
  1. Uses `sql_chain.invoke({"question": question})` to ask the LLM to produce SQL.
  2. `extract_sql()` pulls SQL from fenced ```sql blocks or finds the first `SELECT`/`WITH` segment.
  3. Safety checks:
     - Banned keywords (DROP, DELETE, UPDATE, INSERT, ALTER, etc.) block dangerous statements.
     - Ensures the SQL starts with `SELECT`/`WITH`.
     - Enforces a `LIMIT` of 100 if none provided.
  4. Audits the query to a log file.
  5. Executes the query via `db.run(sql_exec)` and normalizes the results into JSON.

Security & operational notes:
- This function intentionally prevents data-modifying SQL. Do not remove the banned pattern checks unless you add strict RBAC and auditing.
- If you expect large result sets, either paginate or increase the enforced limit intentionally.
- The function returns structured JSON with `sql`, `row_count`, and `rows` to make it easy for agents to consume.

In [None]:

@tool("fetch_weather_tool", return_direct=True)
def fetch_weather_tool (tool_input: str) -> str:
    """
    tool_input: expected format "city=Colombo;date=yesterday"
    """
    # Parse input
    params = dict(item.split("=") for item in tool_input.split(";"))
    city = params.get("city", "Colombo")
    date = params.get("date", "yesterday")

    # Call the original function
    weather_info = fetch_and_store_weather(city, date)
    print(weather_info)
    # Return as JSON string for the agent
    return json.dumps(weather_info)

def fetch_and_store_weather(city="Colombo", date="yesterday"):
    """Fetch weather data from API and store in PostgreSQL."""
    # API Call
    url = f"https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/{city}/{date}?unitGroup=metric&include=days&key=KGCW7SXGVXRYL7ZK7W7SEJSR8&contentType=json"
    ResultBytes = urllib.request.urlopen(url)
    jsonData = json.load(ResultBytes)
    day = jsonData["days"][0]

    # Map to schema
    weather_info = {
        "country": "Sri Lanka",
        "statedistrict": city,
        "datetime": day["datetime"],
        "tempmax": day.get("tempmax"),
        "tempmin": day.get("tempmin"),
        "temp": day.get("temp"),
        "humidity": day.get("humidity"),
        "rain": day.get("precip", 0) > 0,
        "rainsum": day.get("precip"),
        "snow": day.get("snow", 0) > 0,
        "snowdepth": day.get("snowdepth"),
        "windgust": day.get("windgust"),
        "windspeed": day.get("windspeed"),
        "winddir": day.get("winddir"),
        "sealevelpressure": day.get("pressure"),
        "cloudcover": day.get("cloudcover"),
        "visibility": day.get("visibility"),
        "solarradiation": day.get("solarradiation"),
        "solarenergy": day.get("solarenergy"),
        "uvindex": day.get("uvindex"),
        "sunrise": day.get("sunrise"),
        "sunset": day.get("sunset"),
        "moonphase": day.get("moonphase"),
        "conditions": day.get("conditions"),
        "description": day.get("description"),
        "icon": day.get("icon")
    }

    # Insert into DB
    conn = psycopg2.connect(
        host="localhost",
        database="GISDb",
        user="postgres",
        password="ElDiabloX32"
    )
    cursor = conn.cursor()
    cursor.execute("""
    INSERT INTO weather_data (
        country, statedistrict, datetime, tempmax, tempmin, temp, humidity,
        rain, rainsum, snow, snowdepth, windgust, windspeed, winddir,
        sealevelpressure, cloudcover, visibility, solarradiation, solarenergy,
        uvindex, sunrise, sunset, moonphase, conditions, description, icon
    ) VALUES (
        %(country)s, %(statedistrict)s, %(datetime)s, %(tempmax)s, %(tempmin)s, %(temp)s, %(humidity)s,
        %(rain)s, %(rainsum)s, %(snow)s, %(snowdepth)s, %(windgust)s, %(windspeed)s, %(winddir)s,
        %(sealevelpressure)s, %(cloudcover)s, %(visibility)s, %(solarradiation)s, %(solarenergy)s,
        %(uvindex)s, %(sunrise)s, %(sunset)s, %(moonphase)s, %(conditions)s, %(description)s, %(icon)s
    )
    """, weather_info)
    conn.commit()
    cursor.close()
    conn.close()

    return weather_info


## Weather fetcher — `fetch_weather_tool` and `fetch_and_store_weather`

This area implements fetching weather from VisualCrossing and storing it directly in Postgres:

- `fetch_weather_tool`: a thin wrapper that parses `tool_input` (format `city=Colombo;date=yesterday`), calls `fetch_and_store_weather`, prints, and returns JSON.
- `fetch_and_store_weather`:
  - Calls VisualCrossing's timeline API with a provided API key.
  - Extracts the day's data (`jsonData['days'][0]`) and maps fields into the internal schema (`weather_info`).
  - Opens a `psycopg2` connection and executes a parameterized `INSERT` into `weather_data` using a mapping dict (`%(country)s`, etc.), then commits.

Notes and improvements:
- The insertion uses parameterized placeholders (good). Keep credentials out of the notebook.
- The VisualCrossing response shape can vary; consider wrapping extraction in safety checks to avoid KeyError on missing `days`.
- Add retry/backoff for HTTP errors, and handle API rate limits.
- Consider using a connection pool (e.g., `psycopg2.pool`) instead of opening/closing connections for each call if this runs frequently.

In [None]:
'''

@tool("fetch_climate_news", return_direct=True)
def fetch_climate_news(query: str = "climate change"):



    import requests

    api_key = "f875db6eac964594bbcd54e77f9d9b22"
    url = "https://newsapi.org/v2/everything"
    params = {
        "q": query,
        "language": "en",
        "sortBy": "publishedAt",
        "pageSize": 5,
        "apiKey": api_key,
    }

    try:
        resp = requests.get(url, params=params, timeout=10)
        data = resp.json()
    except Exception:
        return {
            "title": [None]*5,
            "url": [None]*5,
            "publishedAt": [None]*5
        }

    articles = data.get("articles", [])
    titles, urls, dates = [], [], []
    for a in articles[:5]:
        titles.append(a.get("title"))
        urls.append(a.get("url"))
        dates.append(a.get("publishedAt"))

    # Pad lists if fewer than 5 articles
    while len(titles) < 5:
        titles.append(None)
        urls.append(None)
        dates.append(None)

    return {
        "title": titles,
        "url": urls,
        "publishedAt": dates
    }
    
'''

'\n\n@tool("fetch_climate_news", return_direct=True)\ndef fetch_climate_news(query: str = "climate change"):\n\n\n\n    import requests\n\n    api_key = "f875db6eac964594bbcd54e77f9d9b22"\n    url = "https://newsapi.org/v2/everything"\n    params = {\n        "q": query,\n        "language": "en",\n        "sortBy": "publishedAt",\n        "pageSize": 5,\n        "apiKey": api_key,\n    }\n\n    try:\n        resp = requests.get(url, params=params, timeout=10)\n        data = resp.json()\n    except Exception:\n        return {\n            "title": [None]*5,\n            "url": [None]*5,\n            "publishedAt": [None]*5\n        }\n\n    articles = data.get("articles", [])\n    titles, urls, dates = [], [], []\n    for a in articles[:5]:\n        titles.append(a.get("title"))\n        urls.append(a.get("url"))\n        dates.append(a.get("publishedAt"))\n\n    # Pad lists if fewer than 5 articles\n    while len(titles) < 5:\n        titles.append(None)\n        urls.append(None)\n

In [None]:
@tool("fetch_extra_earth_data", return_direct=True)
def fetch_extra_earth_data(location: str = "colombo") -> str:
    """Fetch air quality metrics (pm10, pm2_5, carbon_monoxide, ozone) using Open-Meteo.

    This uses Open-Meteo's Air Quality API (no API key). `location` can be a city name or "lat,lon".
    Returns a JSON string with the latest available values or an error dict.
    """
    import requests
    import json
    import re

    def geocode_city(city_name: str):
        # Simple geocode via Nominatim (OpenStreetMap) - no key but rate-limited
        try:
            r = requests.get("https://nominatim.openstreetmap.org/search", params={"q": city_name, "format": "json", "limit": 1}, headers={"User-Agent": "collector-agent/1.0"}, timeout=10)
            if r.status_code == 200:
                j = r.json()
                if j:
                    return float(j[0]["lat"]), float(j[0]["lon"])
        except Exception:
            return None
        return None

    # Parse location: allow "lat,lon" or city name
    lat_lon_match = re.match(r"^\s*([-+]?\d+\.?\d*)\s*,\s*([-+]?\d+\.?\d*)\s*$", location)
    if lat_lon_match:
        lat, lon = float(lat_lon_match.group(1)), float(lat_lon_match.group(2))
    else:
        gc = geocode_city(location)
        if gc is None:
            # fallback to Colombo coordinates
            lat, lon = 6.9271, 79.8612
        else:
            lat, lon = gc

    # Open-Meteo Air Quality API
    url = "https://air-quality-api.open-meteo.com/v1/air-quality"
    params = {
    "latitude": lat,
    "longitude": lon,
    "hourly": "pm10,pm2_5,carbon_monoxide,ozone"
}
    try:
        resp = requests.get(url, params=params, timeout=10)
    except Exception as e:
        return json.dumps({"error": "request_failed", "details": str(e)})

    if resp.status_code != 200:
        return json.dumps({"error": "open_meteo_error", "status_code": resp.status_code, "details": resp.text})

    try:
        data = resp.json()
    except Exception as e:
        return json.dumps({"error": "invalid_json", "details": str(e)})

    # Get latest index (last hourly point)
    hourly = data.get("hourly", {})
    times = hourly.get("time", [])
    if not times:
        return json.dumps({"error": "no_hourly_data"})

    idx = -1
    try:
        latest = {
            "time": times[idx],
            "pm10": hourly.get("pm10", [None])[idx],
            "pm2_5": hourly.get("pm2_5", [None])[idx],
            "carbon_monoxide": hourly.get("carbon_monoxide", [None])[idx],
            "ozone": hourly.get("ozone", [None])[idx],
            "lat": lat,
            "lon": lon,
        }
    except Exception as e:
        return json.dumps({"error": "parse_error", "details": str(e), "raw": data})

    return json.dumps(latest)


In [None]:
@tool("upload_air_quality_to_postgres", return_direct=True)
def upload_air_quality_to_postgres(location: str = "Colombo") -> str:
    """
    Fetch latest air quality data via fetch_extra_earth_data and upload to PostgreSQL.
    """
    import json
    from datetime import datetime

    try:
        # Use the existing tool instead of refetching
        data_json = fetch_extra_earth_data.invoke(location)
        data = json.loads(data_json)
    except Exception as e:
        return json.dumps({"error": "fetch_failed", "details": str(e)})

    # Check for errors in fetch
    if data.get("error"):
        return json.dumps({"error": "fetch_error", "details": data})

    # Extract fields, safely using None if missing
    datetime_val = data.get("time") or str(datetime.today().date())
    pm10 = data.get("pm10")
    pm2_5 = data.get("pm2_5")
    carbon_monoxide = data.get("carbon_monoxide")
    ozone = data.get("ozone")
    lat = data.get("lat")
    lon = data.get("lon")
    country = data.get("country") or "Sri Lanka"  # optional default
    statedistrict = location
    source = "Open-Meteo Air Quality API"

    # Insert into PostgreSQL
    try:
        sql = f"""
        INSERT INTO air_quality_data (
            country, statedistrict, datetime, pm10, pm2_5, carbon_monoxide, ozone, lat, lon, source
        ) VALUES (
            '{country}', '{statedistrict}', '{datetime_val}', {pm10}, {pm2_5}, {carbon_monoxide}, {ozone}, {lat}, {lon}, '{source}'
        );
        """
        db.run(sql)
    except Exception as e:
        return json.dumps({"error": "db_insert_failed", "details": str(e), "sql": sql})

    return json.dumps({"success": True, "uploaded_data": data}, default=str)


In [None]:
tools.append(upload_to_postgresql)
tools.append(fetch_weather_tool)
# add the new tools we inserted above
# fetch_climate_news and fetch_extra_earth_data are defined in earlier cells
try:
    #tools.append(fetch_climate_news)
    tools.append(fetch_extra_earth_data)
    tools.append(query_postgresql_tool)
    tools.append(upload_air_quality_to_postgres)
except NameError:
    # In case the notebook is executed top-to-bottom and the cells haven't been run yet,
    # we proceed silently; the agent will fail to initialize until those cells are run.
    pass

# Collector agent
collector = initialize_agent(
    tools, llm, agent="zero-shot-react-description", verbose=True,
)

class collectorState(TypedDict):
    input: str  # input query
    title: list | None
    url: list | None
    publishedAt: list | None
    output: str  # collected data
    
# Define LangGraph nodes
graph = StateGraph(collectorState)

def collector_node(state: collectorState) -> collectorState:
    result = collector.run(state["input"])
    
    if isinstance(result, dict):
        state["title"] = result.get("title")
        state["url"] = result.get("url")
        state["publishedAt"] = result.get("publishedAt")
    else:
        state["output"] = result
    
    return state

graph.add_node("collector", collector_node)
graph.add_edge(START, "collector")  # input query
graph.add_edge("collector", END)  # outputs collected data

# Run graph
app = graph.compile()
result = app.invoke({"input": "upload_air_quality_to_postgres for Colombo"})
print(result["output"])


def run_collector_agent(query: str) -> str:
    """Run the collector agent with the given query and return the output."""
    result = app.invoke({"input": query})
    return result["output"]

  collector = initialize_agent(
  result = collector.run(state["input"])




[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mTo answer this question, I need to understand what the `upload_air_quality_to_postgres` function does and how it interacts with the `fetch_extra_earth_data` function and the PostgreSQL database.

Thought: The `upload_air_quality_to_postgres` function seems to fetch the latest air quality data for a given location and then upload it to a PostgreSQL database. I need to call this function with the location 'Colombo' as the argument.

Action: upload_air_quality_to_postgres
Action Input: location='Colombo'[0m
Observation: [33;1m[1;3m{"error": "db_insert_failed", "details": "(psycopg2.errors.SyntaxError) syntax error at or near \"Colombo\"\nLINE 5:             'Sri Lanka', 'location='Colombo'', '2025-09-27T2...\n                                            ^\n\n[SQL: \n        INSERT INTO air_quality_data (\n            country, statedistrict, datetime, pm10, pm2_5, carbon_monoxide, ozone, lat, lon, source\n        ) VALUES (\n  

## Agent-to-Agent (A2A) connection — where and how to wire it

Placement (exact spot)
- Insert the A2A wiring immediately after the agent is created — i.e., right after the line that reads `collector = initialize_agent(...)` and before the `class collectorState(TypedDict):` / graph creation lines.
- In this notebook the correct logical place is the same code cell that creates `collector` (or as the very next cell). This guarantees the collector agent and its `tools` list are available in the same scope when you expose or register another agent.

Why here
- The agent object and the `tools` list are already defined and loaded with helper functions. Registering the other agent (or a wrapper that calls it) at this point lets the collector call the other agent as a tool during execution.

Two common A2A patterns
1) In-process wrapper (recommended if both agents run in the same notebook/process)
- Create a small `@tool` wrapper that calls the other agent's `.run()` method or function interface.
- Append that wrapper to the `tools` list before the collector is used. Example contract:
  - Input: JSON string or short instruction
  - Output: JSON-serializable dict or plain text
  - Error mode: raise or return an error object
- Pros: low latency, simple. Cons: both agents share memory and resources (watch for blocking).

2) Out-of-process RPC (HTTP/message queue)
- If the other agent runs on another host or process, expose it via a small HTTP endpoint or a message queue (Redis, RabbitMQ). Register a `requests`-based tool that calls that endpoint.
- Pros: isolation, better scaling and fault isolation. Cons: more infra and auth.

Suggested minimal in-process wiring (where to insert)
- Right after `collector = initialize_agent(...)` add:
  - A `@tool("other_agent_tool", return_direct=True)` wrapper that calls `other_agent.run(input)` (or the function entrypoint for Trend/Prediction agents) and returns JSON/text.
  - `tools.append(other_agent_tool)` so the collector can call it by name.

Contract to document and enforce
- Input shape: string or JSON (example: `{ "task": "analyze", "payload": {...} }`).
- Output shape: JSON with at least `{ "status": "ok"|"error", "data": ... }`.
- Timeouts: enforce a call timeout (e.g., 10s) and return a clear error object on timeout.
- Retries: 0-2 retries for transient network errors.
- Authentication: if using RPC, add an HMAC or bearer token header; do not put tokens in notebook cells.

Operational considerations
- Concurrency: if the collector runs concurrently, ensure the called agent is thread-safe or use a queue to serialize work.
- Rate limiting: enforce per-agent rate limits to avoid cascading overloads.
- Observability: log every A2A call (caller, callee, latency, status) and surface errors.
- Security: never allow anonymous arbitrary code execution via A2A. Limit the callee's abilities and validate inputs.
- Fail-open vs fail-closed: prefer fail-closed for destructive operations and fail-open for best-effort augmentations.

Example integration checklist (to perform at the placement point)
- [ ] Implement `other_agent` or ensure its entrypoint is importable in this notebook.
- [ ] Add `@tool` wrapper that calls `other_agent.run()` or performs HTTP/RPC with auth.
- [ ] Append wrapper to `tools` before agent initialization is finalized (or right after, then re-initialize agent if necessary).
- [ ] Add timeouts, retries, and error normalization in the wrapper.
- [ ] Add logging/audit lines for A2A calls.

Notes about re-initialization
- Some agent frameworks snapshot tools at initialization. If you append a tool after initializing `collector`, either (A) append before initializing, or (B) recreate the agent (call `initialize_agent(...)` again) so the new tool is visible. The safer pattern is to append all tools (including other-agent wrappers) before calling `initialize_agent(...)`.

If you want, I can insert a small example `@tool` wrapper (in-process) and show exactly which notebook cell to edit and the exact code to add. Which A2A pattern do you prefer: in-process wrapper or HTTP/RPC?