# **Agent Graph Architecture Overview**


This notebook walks you through the development of two uniquely structured agent graphs, each optimized for different use cases:

## **Live Weather Emergency Agent Graph:**
Built to handle real-time weather feeds, this graph enables the system to generate timely, actionable insights during ongoing natural disasters or severe weather conditions.


## **Hybrid Simulation Agent Graph:**
Designed for testing and stress-analysis, this graph merges simulated high-severity weather inputs with mock social signals and real-time data. It allows safe experimentation with extreme scenarios—without relying solely on real-world occurrences.

## **Live Weather Emergency Agent Graph:(Automated Weather Emergency):**

## **Overview**
This project showcases the development of an automated Weather Emergency Response System powered by LangGraph. The system continuously monitors weather patterns, evaluates potential disaster scenarios, and formulates emergency response strategies. By integrating real-time meteorological data with generative AI, it enables intelligent disaster assessment and dynamic response generation.

##  **1. Installation of Required Packages**



In [1]:
%%capture --no-stderr
%pip install -U langgraph langsmith langchain langchain_google_genai langchain_community schedule

## **2. Call Credentials and Set API Keys:**




In [2]:
import os
from dotenv import load_dotenv

# Load environment variables from the .env file
load_dotenv()

# Read the API keys from environment variables
google_api_key = os.getenv("GOOGLE_API_KEY")
API_KEY = os.getenv("W_API_KEY")

## **3. Standard Library Imports**

In [3]:
import os
import random
import requests
import schedule
import time
from typing import Dict, TypedDict, Union, List, Literal
import json
from datetime import datetime
from langgraph.graph import StateGraph, END
from langchain_core.prompts import ChatPromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

## **4. Defining an LLM and a State Class**

In [4]:
from langchain_google_genai import ChatGoogleGenerativeAI

llm = ChatGoogleGenerativeAI(
    model="gemini-1.5-flash",
    google_api_key=google_api_key  
)

class WeatherState(TypedDict):
    city: str
    weather_data: Dict
    disaster_type: str
    severity: str
    response: str
    messages: List[Union[SystemMessage, HumanMessage, AIMessage]]
    alerts: List[str]
    human_approved: bool

## **5. Define Node Functions**



#### **(a) Fetching Weather Data**

In [5]:
def get_weather_data(state: WeatherState) -> Dict:
    """Fetch weather data from OpenWeatherMap API"""
    BASE_URL = "http://api.openweathermap.org/data/2.5/weather"
    request_url = f"{BASE_URL}?q={state['city']}&appid={API_KEY}&units=metric"
    try:
        response = requests.get(request_url)
        response.raise_for_status()

        data = response.json()
        weather_data = {
            "weather": data.get('weather', [{}])[0].get("description", "N/A"),
            "wind_speed": data.get("wind", {}).get("speed", "N/A"),
            "cloud_cover": data.get("clouds", {}).get("all", "N/A"),
            "sea_level": data.get("main", {}).get("sea_level", "N/A"),
            "temperature": data.get("main", {}).get("temp", "N/A"),
            "humidity": data.get("main", {}).get("humidity", "N/A"),
            "pressure": data.get("main", {}).get("pressure", "N/A")
        }

        message = SystemMessage(content=f"Weather data fetched successfully for {state['city']}")  
        return {
            **state,
            "weather_data": weather_data,
            "messages": state["messages"] + [message]
        }

    except Exception as e:
        message = SystemMessage(content=f"Failed to fetch weather data for {state['city']}: {str(e)}")  
        error_data = {
            "weather": "N/A",
            "wind_speed": "N/A",
            "cloud_cover": "N/A",
            "sea_level": "N/A",
            "temperature": "N/A",
            "humidity": "N/A",
            "pressure": "N/A"
        }
        return {
            **state,
            "weather_data": error_data,
            "messages": state["messages"] + [message]
        }


#### **(b) Disaster Type Analysis**

In [6]:
def analyze_disaster_type(state: WeatherState) -> WeatherState:
    """Analyze weather data to identify potential disasters"""
    weather_data = state["weather_data"]
    prompt = ChatPromptTemplate.from_template(
        "Based on the following weather conditions, identify if there's a potential weather disaster.\n"
        "Weather conditions:\n"
        "- Description: {weather}\n"
        "- Wind Speed: {wind_speed} m/s\n"
        "- Temperature: {temperature}°C\n"
        "- Humidity: {humidity}%\n"
        "- Pressure: {pressure} hPa\n"
        "Categorize into one of these types: Hurricane, Flood, Heatwave, Severe Storm, Winter Storm, or No Immediate Threat"
    )

    try:
        chain = prompt | llm
        print("Invoking LLM with weather data:")
        for k, v in weather_data.items():
            print(f"  - {k}: {v}")

        result = chain.invoke(weather_data)
        disaster_type = result.content.strip()
        print(f"Disaster type response: {disaster_type}")

        message = SystemMessage(content=f"Disaster type identified: {disaster_type}")
        # for msg in state["messages"] + [message]:
        #     print("Messages:", msg.content)

        return {
            **state,
            "disaster_type": disaster_type,
            "messages": state["messages"] + [message]
        }
    except Exception as e:
        message = SystemMessage(content=f"Failed to analyze disaster type: {str(e)}")
        print(f"Failed to analyze disaster type: {str(e)}")
        # for msg in state["messages"] + [message]:
        #     print("Messages:", msg.content)

        return {
            **state,
            "disaster_type": "Analysis Failed",
            "messages": state["messages"] + [message]
        }


#### **(c) Severity Assessment**

In [7]:
def assess_severity(state: WeatherState) -> WeatherState:
    """Assess the severity of the identified weather situation"""
    weather_data = state["weather_data"]
    prompt = ChatPromptTemplate.from_template(
        "Given the weather conditions and identified disaster type '{disaster_type}', "
        "assess the severity level. Consider:\n"
        "- Weather: {weather}\n"
        "- Wind Speed: {wind_speed} m/s\n"
        "- Temperature: {temperature}°C\n"
        "Respond with either 'Critical', 'High', 'Medium', or 'Low'"
    )

    try:
        chain = prompt | llm
        severity = chain.invoke({
            **weather_data,
            "disaster_type": state["disaster_type"]
        }).content

        return {
            **state,
            "severity": severity,
            "messages": state["messages"] + [SystemMessage(content=f"Severity assessed as: {severity}")]
        }
    except Exception as e:
        return {
            **state,
            "severity": "Assessment Failed",
            "messages": state["messages"] + [SystemMessage(content=f"Failed to assess severity: {str(e)}")]
        }

#### **(d) Emergency Response Plan**

In [8]:
from datetime import datetime
current_date = datetime.now().strftime("%B %d, %Y")
def emergency_response(state: WeatherState) -> WeatherState:
    """Generate emergency response plan"""
    prompt = ChatPromptTemplate.from_template(
        "Create an emergency response plan for a {disaster_type} situation "
        "with {severity} severity level in {city}. Include immediate actions needed."
    )
    try:
        chain = prompt | llm
        response = chain.invoke({
            "disaster_type": state["disaster_type"],
            "severity": state["severity"],
            "city": state["city"], 
            "date": current_date
        }).content

        return {
            **state,
            "response": response,
            "messages": state["messages"] + [SystemMessage(content="Emergency response plan generated")]
        }
    except Exception as e:
        return {
            **state,
            "response": "Failed to generate response plan",
            "messages": state["messages"] + [SystemMessage(content=f"Failed to generate emergency response: {str(e)}")]
        }

#### **(e) Civil Defense Response**

In [9]:

def civil_defense_response(state: WeatherState) -> WeatherState:
    """Generate civil defense response plan"""
    prompt = ChatPromptTemplate.from_template(
        "Create a civil defense response plan for a {disaster_type} situation "
        "with {severity} severity level in {city}. Focus on public safety measures."
    )
    try:
        chain = prompt | llm
        response = chain.invoke({
            "disaster_type": state["disaster_type"],
            "severity": state["severity"],
            "city": state["city"], 
            "date": current_date
        }).content

        return {
            **state,
            "response": response,
            "messages": state["messages"] + [SystemMessage(content="Civil defense response plan generated")]
        }
    except Exception as e:
        return {
            **state,
            "response": "Failed to generate response plan",
            "messages": state["messages"] + [SystemMessage(content=f"Failed to generate civil defense response: {str(e)}")]
        }

#### **(f) Public Works Response**

In [10]:
def public_works_response(state: WeatherState) -> WeatherState:
    """Generate public works response plan"""
    prompt = ChatPromptTemplate.from_template(
        "Create a public works response plan for a {disaster_type} situation "
        "with {severity} severity level in {city}. Focus on infrastructure protection."
    )
    try:
        chain = prompt | llm
        response = chain.invoke({
            "disaster_type": state["disaster_type"],
            "severity": state["severity"],
            "city": state["city"], 
            "date": current_date
        }).content

        return {
            **state,
            "response": response,
            "messages": state["messages"] + [SystemMessage(content="Public works response plan generated")]
        }
    except Exception as e:
        return {
            **state,
            "response": "Failed to generate response plan",
            "messages": state["messages"] + [SystemMessage(content=f"Failed to generate public works response: {str(e)}")]
        }

#### **(g) Data Logging**

In [11]:
def data_logging(state: WeatherState) -> WeatherState:
    """Log weather data, disaster analysis, and response to a file."""
    log_data = {
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "city": state["city"],
        "weather_data": state["weather_data"],
        "disaster_type": state["disaster_type"],
        "severity": state["severity"],
        "response": state["response"],
    }

    try:
        with open("disaster_log.txt", "a") as log_file:
            log_file.write(json.dumps(log_data) + "\n")

        return {
            **state,
            "messages": state["messages"] + [SystemMessage(content="Data logged successfully")]
        }
    except Exception as e:
        return {
            **state,
            "messages": state["messages"] + [SystemMessage(content=f"Failed to log data: {str(e)}")]
        }

#### **(h) Human Verification**

In [12]:
def get_human_verification(state: WeatherState) -> WeatherState:
    """Get human verification for low/medium severity alerts"""
    severity = state["severity"].strip().lower()

    if severity in ["low", "medium"]:
        print("\n" + "="*50)
        print(f"Low/Medium severity alert for {state['city']} requires human approval:")
        print(f"Disaster Type: {state['disaster_type']}")
        print(f"Current Weather: {state['weather_data']['weather']}")
        print(f"Temperature: {state['weather_data']['temperature']}°C")
        print(f"Wind Speed: {state['weather_data']['wind_speed']} m/s")
        print(f"Severity: {state['severity']}")
        print(f"Response Plan: {state['response']}")
        print("\nType 'y' to approve sending alert or 'n' to reject (waiting for input):")
        print("="*50)

        # Block and wait for input
        while True:
            try:
                user_input = input().lower().strip()
                if user_input in ['y', 'n']:
                    approved = user_input == 'y'
                    print(f"Human verification result: {'Approved' if approved else 'Rejected'}")
                    break
                else:
                    print("Please enter 'y' for yes or 'n' for no:")
            except Exception as e:
                print(f"Error reading input: {str(e)}")
                print("Please try again with 'y' or 'n':")

        return {
            **state,
            "human_approved": approved,
            "messages": state["messages"] + [
                SystemMessage(content=f"Human verification: {'Approved' if approved else 'Rejected'}")
            ]
        }
    else:
        # Auto-approve for high/critical severity
        return {
            **state,
            "human_approved": True,
            "messages": state["messages"] + [
                SystemMessage(content=f"Auto-approved {severity} severity alert")
            ]
        }

#### **(j) Email Alert**

In [13]:
def send_email_alert(state: WeatherState) -> WeatherState:
    """Send weather alert email"""
    sender_email = os.getenv("SENDER_EMAIL")
    receiver_email = os.getenv("RECEIVER_EMAIL")
    password = os.getenv("EMAIL_PASSWORD")

    msg = MIMEMultipart()
    msg['From'] = sender_email
    msg['To'] = receiver_email
    msg['Subject'] = f"Weather Alert: {state['severity']} severity weather event in {state['city']}"

    body = format_weather_email(state)
    msg.attach(MIMEText(body, 'plain'))

    try:
        server = smtplib.SMTP("smtp.gmail.com", 587)
        server.starttls()
        server.login(sender_email, password)
        text = msg.as_string()
        server.sendmail(sender_email, receiver_email, text)
        server.quit()

        # Add confirmation message
        severity = state["severity"].strip().lower()
        if severity in ["low", "medium"]:
            print(f"\nVerification was approved by human, Email sent to {receiver_email} successfully")
        else:
            print("\nEmail sent successfully for high severity alert")

        return {
            **state,
            "messages": state["messages"] + [SystemMessage(content=f"Successfully sent weather alert email for {state['city']}")],
            "alerts": state["alerts"] + [f"Email alert sent: {datetime.now()}"]
        }

    except Exception as e:
        return {
            **state,
            "messages": state["messages"] + [SystemMessage(content=f"Failed to send email alert: {str(e)}")]
        }

####  **(k) Handling No Approval**

In [14]:
def handle_no_approval(state: WeatherState) -> WeatherState:
    """Handle cases where human verification was rejected"""
    print("\nVerification was not approved by human, Email not sent")

    message = (
        f"Alert not sent for {state['city']} - "
        f"Weather severity level '{state['severity']}' was deemed non-critical "
        f"by human operator and verification was rejected."
    )
    return {
        **state,
        "messages": state["messages"] + [SystemMessage(content=message)]
    }

####  **(l) Routing the Response**

In [15]:
def route_response(state: WeatherState) -> Literal["emergency_response", "send_email_alert", "civil_defense_response", "public_works_response"]:
    """Route to appropriate department based on disaster type and severity"""
    disaster = state["disaster_type"].strip().lower()
    severity = state["severity"].strip().lower()

    if severity in ["critical", "high"]:
        return "emergency_response"
        "send_email_alert"
    elif "flood" in disaster or "storm" in disaster:
        return "public_works_response"
    else:
        return "civil_defense_response"

**(m) Approval Verification Router**

In [16]:
def verify_approval_router(state: WeatherState) -> Literal["send_email_alert", "handle_no_approval"]:
    """Route based on human approval decision"""
    return "send_email_alert" if state['human_approved'] else "handle_no_approval"

#### **(n) Formatting the Weather Email**

In [17]:
def format_weather_email(state: WeatherState) -> str:
    """Format weather data and severity assessment into an email message"""
    weather_data = state["weather_data"]

    email_content = f"""
Weather Alert for {state['city']}

Disaster Type: {state['disaster_type']}
Severity Level: {state['severity']}

Current Weather Conditions:
- Weather Description: {weather_data['weather']}
- Temperature: {weather_data['temperature']}C
- Wind Speed: {weather_data['wind_speed']} m/s
- Humidity: {weather_data['humidity']}%
- Pressure: {weather_data['pressure']} hPa
- Cloud Cover: {weather_data['cloud_cover']}%

Response Plan:
{state['response']}

This is an automated weather alert generated at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""

    if state['severity'].lower() in ['low', 'medium']:
        email_content += "\nNote: This low/medium severity alert has been verified by a human operator."

    return email_content

## **5. Creating and Compiling the Workflow**


In [18]:
# Create the workflow
workflow = StateGraph(WeatherState)

# Add nodes
workflow.add_node("get_weather", get_weather_data)
workflow.add_node("analyze_disaster", analyze_disaster_type)
workflow.add_node("assess_severity", assess_severity)
workflow.add_node("data_logging", data_logging)
workflow.add_node("emergency_response", emergency_response)
workflow.add_node("civil_defense_response", civil_defense_response)
workflow.add_node("public_works_response", public_works_response)
workflow.add_node("get_human_verification", get_human_verification)
workflow.add_node("send_email_alert", send_email_alert)
workflow.add_node("handle_no_approval", handle_no_approval)

# Add edges
workflow.add_edge("get_weather","analyze_disaster" )
workflow.add_edge("analyze_disaster", "assess_severity")
workflow.add_edge("assess_severity", "data_logging")
workflow.add_conditional_edges("data_logging", route_response)
workflow.add_edge("civil_defense_response", "get_human_verification")
workflow.add_edge("public_works_response", "get_human_verification")
workflow.add_conditional_edges("get_human_verification", verify_approval_router)
workflow.add_edge("emergency_response", "send_email_alert")
workflow.add_edge("send_email_alert", END)
workflow.add_edge("handle_no_approval", END)

workflow.set_entry_point("get_weather")

# Compile the workflow
app = workflow.compile()

##  **6 Running the Weather Emergency System**

#### **(a) run_weather_emergency_system(city: str)**

In [19]:
def run_weather_emergency_system(city: str):
    """Initialize and run the weather emergency system for a given city"""
    initial_state = {
        "city": city,
        "weather_data": {},
        "disaster_type": "",
        "severity": "",
        "response": "",
        "messages": [],
        "alerts": [],
        "social_media_reports": [],
        "human_approved": False
    }

    try:
        result = app.invoke(initial_state)
        print(f"Completed weather check for {city}")
        return result
    except Exception as e:
        print(f"Error running weather emergency system: {str(e)}")

#### **(b) Main Function of the Weather Emergency Response System**




In [20]:
def main():
    """Main function to run the weather emergency system"""
    # Set up environment variables
    os.environ["SENDER_EMAIL"] = "ritusinghpoem@gmail.com"  
    os.environ["RECEIVER_EMAIL"] = "iamkavitasingh08@gmail.com"  

    def scheduled_check():
        """Function to perform scheduled checks for multiple cities"""
        cities = ['Delhi', 'Mumbai', 'Mirzapur']  # Add more cities as needed
        print("cities", cities)
        print(f"\nStarting scheduled check at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

        for city in cities:
            try:
                print(f"\nChecking weather conditions for {city}...")
                run_weather_emergency_system(city)
                time.sleep(2)  # Brief pause between cities
            except Exception as e:
                print(f"Error checking {city}: {str(e)}")

    # Schedule checks every hour
    schedule.every(1).minute.do(scheduled_check)
    print("Weather Emergency Response System started.")
    print("Monitoring scheduled for every minute.")

    while True:
        try:
            schedule.run_pending()
            time.sleep(1)
        except KeyboardInterrupt:
            print("\nShutting down Weather Emergency Response System...")
            break
        except Exception as e:
            print(f"Error in main loop: {str(e)}")
            time.sleep(1)

if __name__ == "__main__":
    main()

Weather Emergency Response System started.
Monitoring scheduled for every minute.
cities ['Delhi', 'Mumbai', 'Mirzapur']

Starting scheduled check at 2025-04-10 15:19:49

Checking weather conditions for Delhi...
Invoking LLM with weather data:
  - weather: overcast clouds
  - wind_speed: 0.12
  - cloud_cover: 100
  - sea_level: 999
  - temperature: 39.75
  - humidity: 12
  - pressure: 999
Disaster type response: The most likely categorization is **Heatwave**.  The temperature of 39.75°C (103.55°F) is extremely high, and combined with low humidity (12%), creates conditions ripe for a heatwave and potential heat-related illnesses.  The other factors (overcast clouds, low wind, relatively normal pressure) are not indicative of other severe weather events.

Email sent successfully for high severity alert
Completed weather check for Delhi

Checking weather conditions for Mumbai...
Invoking LLM with weather data:
  - weather: clear sky
  - wind_speed: 6.04
  - cloud_cover: 3
  - sea_level: 1

 y


Human verification result: Approved

Verification was approved by human, Email sent to iamkavitasingh08@gmail.com successfully
Completed weather check for Mumbai

Checking weather conditions for Mirzapur...
Invoking LLM with weather data:
  - weather: few clouds
  - wind_speed: 4.8
  - cloud_cover: 19
  - sea_level: 999
  - temperature: 41.19
  - humidity: 13
  - pressure: 999
Disaster type response: Heatwave.  The extremely high temperature (41.19°C) combined with low humidity (13%) presents a significant risk of a heatwave and associated health dangers.  The other conditions are unremarkable in this context.

Email sent successfully for high severity alert
Completed weather check for Mirzapur

Shutting down Weather Emergency Response System...


========================================================**Part 2** =================================================

## **Hybrid Simulation Agent Graph (Simulate High Severity Weather level check with dummy weather and Social Monitoring data):**

##  **1. Installation of Required Packages**

In [21]:
%%capture --no-stderr
%pip install -U langgraph langsmith langchain langchain_google_genai langchain_community schedule

## **2. Call Credentials and Set API Keys:**

In [22]:
import os
from dotenv import load_dotenv

# Load environment variables from the .env file
load_dotenv()

# Read the API keys from environment variables
google_api_key = os.getenv("GOOGLE_API_KEY")
API_KEY = os.getenv("W_API_KEY")

## **3. Standard Library Imports**

In [23]:
import os
import random
import requests
import schedule
import time
from typing import Dict, TypedDict, Union, List, Literal
import json
from datetime import datetime
from langgraph.graph import StateGraph, END
from langchain_core.prompts import ChatPromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

## **4. Defining an LLM and a State Class**

In [24]:
# # Initialize LLM
from langchain_google_genai import ChatGoogleGenerativeAI

llm = ChatGoogleGenerativeAI(
    model="gemini-1.5-flash",
    google_api_key=google_api_key  
)

class WeatherState(TypedDict):
    city: str
    weather_data: Dict
    disaster_type: str
    severity: str
    response: str
    messages: List[Union[SystemMessage, HumanMessage, AIMessage]]
    alerts: List[str]
    social_media_reports: List[str]
    human_approved: bool

## **5. Define Node Functions**

#### **(a) Fetching Weather Data**

In [25]:
def get_weather_data(state: WeatherState) -> Dict:
    """Fetch weather data from OpenWeatherMap API or use simulated data in test mode"""
    # Check if we're in test mode (indicated by pre-populated weather_data)
    if state['weather_data']:
        return {
            **state,
            "messages": state["messages"] + [SystemMessage(content=f"Using simulated weather data for {state['city']}")]
        }

    BASE_URL = "http://api.openweathermap.org/data/2.5/weather"
    request_url = f"{BASE_URL}?q={state['city']}&appid={API_KEY}&units=metric"
    try:
        response = requests.get(request_url)
        response.raise_for_status()

        data = response.json()
        weather_data = {
            "weather": data.get('weather', [{}])[0].get("description", "N/A"),
            "wind_speed": data.get("wind", {}).get("speed", "N/A"),
            "cloud_cover": data.get("clouds", {}).get("all", "N/A"),
            "sea_level": data.get("main", {}).get("sea_level", "N/A"),
            "temperature": round(data.get("main", {}).get("temp", 273.15) - 273.15, 1),
            "humidity": data.get("main", {}).get("humidity", "N/A"),
            "pressure": data.get("main", {}).get("pressure", "N/A")
        }

        return {
            **state,
            "weather_data": weather_data,
            "messages": state["messages"] + [SystemMessage(content=f"Weather data fetched successfully for {state['city']}")]
        }
    except Exception as e:
        error_data = {
            "weather": "N/A",
            "wind_speed": "N/A",
            "cloud_cover": "N/A",
            "sea_level": "N/A",
            "temperature": "N/A",
            "humidity": "N/A",
            "pressure": "N/A"
        }
        return {
            **state,
            "weather_data": error_data,
            "messages": state["messages"] + [SystemMessage(content=f"Failed to fetch weather data for {state['city']}: {str(e)}")]
        }

#### **(b) Social Media Monitoring**


In [26]:
def social_media_monitoring(state: WeatherState) -> WeatherState:
    """Simulate monitoring social media for additional reports of the weather event."""
    simulated_reports = [
        "Local reports of rising water levels and minor flooding.",
        "High winds causing power outages in parts of the city.",
        "Citizens reporting high temperatures and increased heat discomfort.",
        "Social media reports indicate severe storm damage in local infrastructure.",
        "Reports of traffic disruptions due to heavy rain.",
        "No unusual social media reports related to the weather at this time."
    ]

    social_media_report = random.choice(simulated_reports)
    return {
        **state,
        "social_media_reports": state["social_media_reports"] + [social_media_report],
        "messages": state["messages"] + [SystemMessage(content=f"Social media report added: {social_media_report}")]
    }

#### **(c) Analyzing Disaster Type**


In [27]:
def analyze_disaster_type(state: WeatherState) -> WeatherState:
    """Analyze weather data to identify potential disasters"""
    weather_data = state["weather_data"]
    prompt = ChatPromptTemplate.from_template(
        "Based on the following weather conditions, identify if there's a potential weather disaster.\n"
        "Weather conditions:\n"
        "- Description: {weather}\n"
        "- Wind Speed: {wind_speed} m/s\n"
        "- Temperature: {temperature}°C\n"
        "- Humidity: {humidity}%\n"
        "- Pressure: {pressure} hPa\n"
        "Categorize into one of these types: Hurricane, Flood, Heatwave, Severe Storm, Winter Storm, or No Immediate Threat"
    )

    try:
        chain = prompt | llm
        print("Invoking LLM with weather data:")
        for k, v in weather_data.items():
            print(f"  - {k}: {v}")

        result = chain.invoke(weather_data)
        disaster_type = result.content.strip()
        print(f"Disaster type response: {disaster_type}")

        message = SystemMessage(content=f"Disaster type identified: {disaster_type}")

        return {
            **state,
            "disaster_type": disaster_type,
            "messages": state["messages"] + [message]
        }
    except Exception as e:
        message = SystemMessage(content=f"Failed to analyze disaster type: {str(e)}")
        print(f"Failed to analyze disaster type: {str(e)}")

        return {
            **state,
            "disaster_type": "Analysis Failed",
            "messages": state["messages"] + [message]
        }


#### **(d) Assess Severity Level**


In [28]:
def assess_severity(state: WeatherState) -> WeatherState:
    """Assess the severity of the identified weather situation"""
    weather_data = state["weather_data"]
    prompt = ChatPromptTemplate.from_template(
        "Given the weather conditions and identified disaster type '{disaster_type}', "
        "assess the severity level. Consider:\n"
        "- Weather: {weather}\n"
        "- Wind Speed: {wind_speed} m/s\n"
        "- Temperature: {temperature}°C\n"
        "Respond with either 'Critical', 'High', 'Medium', or 'Low'"
    )

    try:
        chain = prompt | llm
        severity = chain.invoke({
            **weather_data,
            "disaster_type": state["disaster_type"]
        }).content

        return {
            **state,
            "severity": severity,
            "messages": state["messages"] + [SystemMessage(content=f"Severity assessed as: {severity}")]
        }
    except Exception as e:
        return {
            **state,
            "severity": "Assessment Failed",
            "messages": state["messages"] + [SystemMessage(content=f"Failed to assess severity: {str(e)}")]
        }

#### **(e) Emergency Response**

In [29]:
def emergency_response(state: WeatherState) -> WeatherState:
    """Generate emergency response plan"""
    prompt = ChatPromptTemplate.from_template(
        "Create an emergency response plan for a {disaster_type} situation "
        "with {severity} severity level in {city}. Include immediate actions needed."
    )
    try:
        chain = prompt | llm
        response = chain.invoke({
            "disaster_type": state["disaster_type"],
            "severity": state["severity"],
            "city": state["city"], 
            "date": current_date
            
        }).content

        return {
            **state,
            "response": response,
            "messages": state["messages"] + [SystemMessage(content="Emergency response plan generated")]
        }
    except Exception as e:
        return {
            **state,
            "response": "Failed to generate response plan",
            "messages": state["messages"] + [SystemMessage(content=f"Failed to generate emergency response: {str(e)}")]
        }

#### **(f) Civil Defense Response**

In [30]:
def civil_defense_response(state: WeatherState) -> WeatherState:
    """Generate civil defense response plan"""
    prompt = ChatPromptTemplate.from_template(
        "Create a civil defense response plan for a {disaster_type} situation "
        "with {severity} severity level in {city}. Focus on public safety measures."
    )
    try:
        chain = prompt | llm
        response = chain.invoke({
            "disaster_type": state["disaster_type"],
            "severity": state["severity"],
            "city": state["city"],
            "date": current_date
        }).content

        return {
            **state,
            "response": response,
            "messages": state["messages"] + [SystemMessage(content="Civil defense response plan generated")]
        }
    except Exception as e:
        return {
            **state,
            "response": "Failed to generate response plan",
            "messages": state["messages"] + [SystemMessage(content=f"Failed to generate civil defense response: {str(e)}")]
        }

#### **(g) Public Works Response**

In [31]:
def public_works_response(state: WeatherState) -> WeatherState:
    """Generate public works response plan"""
    prompt = ChatPromptTemplate.from_template(
        "Create a public works response plan for a {disaster_type} situation "
        "with {severity} severity level in {city}. Focus on infrastructure protection."
    )
    try:
        chain = prompt | llm
        response = chain.invoke({
            "disaster_type": state["disaster_type"],
            "severity": state["severity"],
            "city": state["city"],
            "date": current_date
        }).content

        return {
            **state,
            "response": response,
            "messages": state["messages"] + [SystemMessage(content="Public works response plan generated")]
        }
    except Exception as e:
        return {
            **state,
            "response": "Failed to generate response plan",
            "messages": state["messages"] + [SystemMessage(content=f"Failed to generate public works response: {str(e)}")]
        }

#### **(h) Data Logging**

In [32]:
def data_logging(state: WeatherState) -> WeatherState:
    """Log weather data, disaster analysis, and response to a file."""
    log_data = {
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "city": state["city"],
        "weather_data": state["weather_data"],
        "disaster_type": state["disaster_type"],
        "severity": state["severity"],
        "response": state["response"],
        "social_media_reports": state["social_media_reports"]
    }

    try:
        with open("disaster_log.txt", "a") as log_file:
            log_file.write(json.dumps(log_data) + "\n")

        return {
            **state,
            "messages": state["messages"] + [SystemMessage(content="Data logged successfully")]
        }
    except Exception as e:
        return {
            **state,
            "messages": state["messages"] + [SystemMessage(content=f"Failed to log data: {str(e)}")]
        }


#### **(j) Get Human Verification**

In [33]:
def get_human_verification(state: WeatherState) -> WeatherState:
    """Get human verification for low/medium severity alerts"""
    severity = state["severity"].strip().lower()

    if severity in ["low", "medium"]:
        print("\n" + "="*50)
        print(f"Low/Medium severity alert for {state['city']} requires human approval:")
        print(f"Disaster Type: {state['disaster_type']}")
        print(f"Current Weather: {state['weather_data']['weather']}")
        print(f"Temperature: {state['weather_data']['temperature']}°C")
        print(f"Wind Speed: {state['weather_data']['wind_speed']} m/s")
        print(f"Severity: {state['severity']}")
        print(f"Response Plan: {state['response']}")
        print("\nType 'y' to approve sending alert or 'n' to reject (waiting for input):")
        print("="*50)

        # Block and wait for input
        while True:
            try:
                user_input = input().lower().strip()
                if user_input in ['y', 'n']:
                    approved = user_input == 'y'
                    print(f"Human verification result: {'Approved' if approved else 'Rejected'}")
                    break
                else:
                    print("Please enter 'y' for yes or 'n' for no:")
            except Exception as e:
                print(f"Error reading input: {str(e)}")
                print("Please try again with 'y' or 'n':")

        return {
            **state,
            "human_approved": approved,
            "messages": state["messages"] + [
                SystemMessage(content=f"Human verification: {'Approved' if approved else 'Rejected'}")
            ]
        }
    else:
        # Auto-approve for high/critical severity
        return {
            **state,
            "human_approved": True,
            "messages": state["messages"] + [
                SystemMessage(content=f"Auto-approved {severity} severity alert")
            ]
        }

#### **(k) Send Email Alert**

In [34]:
def send_email_alert(state: WeatherState) -> WeatherState:
    """Send weather alert email"""
    sender_email = os.getenv("SENDER_EMAIL")
    receiver_email = os.getenv("RECEIVER_EMAIL")
    password = os.getenv("EMAIL_PASSWORD")

    msg = MIMEMultipart()
    msg['From'] = sender_email
    msg['To'] = receiver_email
    msg['Subject'] = f"Weather Alert: {state['severity']} severity weather event in {state['city']}"

    body = format_weather_email(state)
    msg.attach(MIMEText(body, 'plain'))

    try:
        server = smtplib.SMTP("smtp.gmail.com", 587)
        server.starttls()
        server.login(sender_email, password)
        text = msg.as_string()
        server.sendmail(sender_email, receiver_email, text)
        server.quit()

        # Add confirmation message
        severity = state["severity"].strip().lower()
        if severity in ["low", "medium"]:
            print(f"\nVerification was approved by human, Email sent to {receiver_email} successfully")
        else:
            print(f"\nEmail sent successfully for high severity alert to {receiver_email}")

        return {
            **state,
            "messages": state["messages"] + [SystemMessage(content=f"Successfully sent weather alert email for {state['city']}")],
            "alerts": state["alerts"] + [f"Email alert sent: {datetime.now()}"]
        }

    except Exception as e:
        return {
            **state,
            "messages": state["messages"] + [SystemMessage(content=f"Failed to send email alert: {str(e)}")]
        }

#### **(l) Handle No Approval From Human**

In [35]:
def handle_no_approval(state: WeatherState) -> WeatherState:
    """Handle cases where human verification was rejected"""
    print("\nVerification was not approved by human, Email not sent")

    message = (
        f"Alert not sent for {state['city']} - "
        f"Weather severity level '{state['severity']}' was deemed non-critical "
        f"by human operator and verification was rejected."
    )
    return {
        **state,
        "messages": state["messages"] + [SystemMessage(content=message)]
    }

#### **(m) Route Response for Disaster Type and Severity**

In [36]:
def route_response(state: WeatherState) -> Literal["emergency_response", "send_email_alert", "civil_defense_response", "public_works_response"]:
    """Route to appropriate department based on disaster type and severity"""
    disaster = state["disaster_type"].strip().lower()
    severity = state["severity"].strip().lower()

    if severity in ["critical", "high"]:
        return "emergency_response"
        return "send_email_alert"
    elif "flood" in disaster or "storm" in disaster:
        return "public_works_response"
    else:
        return "civil_defense_response"

#### **(n) Verify Approval Route after Human Verification**

In [37]:
def verify_approval_router(state: WeatherState) -> Literal["send_email_alert", "handle_no_approval"]:
    """Route based on human approval decision"""
    return "send_email_alert" if state['human_approved'] else "handle_no_approval"

#### **(p) Format Weather Email**

In [38]:
def format_weather_email(state: WeatherState) -> str:
    """Format weather data and severity assessment into an email message"""
    weather_data = state["weather_data"]
    social_media_reports = "\n".join(state["social_media_reports"])

    email_content = f"""
Weather Alert for {state['city']}

Disaster Type: {state['disaster_type']}
Severity Level: {state['severity']}

Current Weather Conditions:
- Weather Description: {weather_data['weather']}
- Temperature: {weather_data['temperature']}C
- Wind Speed: {weather_data['wind_speed']} m/s
- Humidity: {weather_data['humidity']}%
- Pressure: {weather_data['pressure']} hPa
- Cloud Cover: {weather_data['cloud_cover']}%


Response Plan:
{state['response']}

This is an automated weather alert generated at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""

    if state['severity'].lower() in ['low', 'medium']:
        email_content += "\nNote: This low/medium severity alert has been verified by a human operator."

    return email_content

## **5. Creating and Compiling the Workflow**

In [39]:
# Create the workflow
workflow = StateGraph(WeatherState)

# Add nodes
workflow.add_node("get_weather", get_weather_data)
workflow.add_node("social_media_monitoring", social_media_monitoring)
workflow.add_node("analyze_disaster", analyze_disaster_type)
workflow.add_node("assess_severity", assess_severity)
workflow.add_node("data_logging", data_logging)
workflow.add_node("emergency_response", emergency_response)
workflow.add_node("civil_defense_response", civil_defense_response)
workflow.add_node("public_works_response", public_works_response)
workflow.add_node("get_human_verification", get_human_verification)
workflow.add_node("send_email_alert", send_email_alert)
workflow.add_node("handle_no_approval", handle_no_approval)

# Add edges
workflow.add_edge("get_weather", "social_media_monitoring")
workflow.add_edge("social_media_monitoring", "analyze_disaster")
workflow.add_edge("analyze_disaster", "assess_severity")
workflow.add_edge("assess_severity", "data_logging")
workflow.add_conditional_edges("data_logging", route_response)

workflow.add_edge("civil_defense_response", "get_human_verification")
workflow.add_edge("public_works_response", "get_human_verification")
workflow.add_conditional_edges("get_human_verification", verify_approval_router)
workflow.add_edge("emergency_response", "send_email_alert")
workflow.add_edge("send_email_alert", END)
workflow.add_edge("handle_no_approval", END)

workflow.set_entry_point("get_weather")

# Compile the workflow
app = workflow.compile()

##  **6 Running the Weather Emergency System**

#### **(a) run_weather_emergency_system(city: str):**


In [40]:
def run_weather_emergency_system(city: str):
    """Initialize and run the weather emergency system for a given city"""
    initial_state = {
        "city": city,
        "weather_data": {},
        "disaster_type": "",
        "severity": "",
        "response": "",
        "messages": [],
        "alerts": [],
        "social_media_reports": [],
        "human_approved": False
    }

    try:
        result = app.invoke(initial_state)
        print(f"Completed weather check for {city}")
        return result
    except Exception as e:
        print(f"Error running weather emergency system: {str(e)}")

#### **(b) Get Simulated Weather Data:**

In [41]:
def get_simulated_weather_data(scenario: str = "high") -> Dict:
    """Generate simulated weather data for testing different scenarios"""
    scenarios = {
        "high": {
            "weather": "severe thunderstorm with heavy rainfall and strong winds",
            "wind_speed": 32.5,  # Increased for high severity
            "cloud_cover": 95,
            "sea_level": 1015,
            "temperature": 35.5,  # Higher temperature
            "humidity": 90,
            "pressure": 960  # Low pressure indicating severe weather
        },
        "medium": {
            "weather": "moderate rain with gusty winds",
            "wind_speed": 15.2,
            "cloud_cover": 75,
            "sea_level": 1012,
            "temperature": 22.3,
            "humidity": 70,
            "pressure": 1005
        },
        "low": {
            "weather": "light drizzle",
            "wind_speed": 8.5,
            "cloud_cover": 45,
            "sea_level": 1013,
            "temperature": 20.1,
            "humidity": 60,
            "pressure": 1015
        }
    }

    return scenarios.get(scenario.lower(), scenarios["medium"])


#### **(c) Run Weather Emergency System Test**

In [42]:
def run_weather_emergency_system_test(city: str, scenario: str = "high"):
    """Test the weather emergency system with simulated data"""
    # Pre-populate with simulated data
    initial_state = {
        "city": city,
        "weather_data": get_simulated_weather_data(scenario),
        "disaster_type": "",
        "severity": "",
        "response": "",
        "messages": [],
        "alerts": [],
        "social_media_reports": [],
        "human_approved": False
    }

    try:
        result = app.invoke(initial_state)
        print(f"\nCompleted test weather check for {city} with {scenario} severity scenario")
        return result
    except Exception as e:
        print(f"Error running weather emergency system test: {str(e)}")


#### **(d) Run Main Function**

In [43]:
def main():
    """Main function to run the weather emergency system"""
    # Set up environment variables

    os.environ["SENDER_EMAIL"] = "ritusinghpoem@gmail.com"  
    os.environ["RECEIVER_EMAIL"] = "iamkavitasingh08@gmail.com"  

    # Add test mode option
    print("\nWeather Emergency Response System")
    print("1. Run normal monitoring")
    print("2. Run test with simulated data")
    choice = input("Select mode (1 or 2): ")

    if choice == "2":
        print("\nSelect test scenario:")
        print("1. High severity (no human verification needed)")
        print("2. Medium severity (requires human verification)")
        print("3. Low severity (requires human verification)")

        scenario_choice = input("Select scenario (1, 2, or 3): ")
        scenario_map = {"1": "high", "2": "medium", "3": "low"}
        scenario = scenario_map.get(scenario_choice, "medium")

        city = input("\nEnter city name for test (default: Lahore): ").strip() or "Lahore"
        print(f"\nRunning test scenario: {scenario.upper()} severity for {city}")
        run_weather_emergency_system_test(city, scenario)
    else:
        def scheduled_check():
            """Function to perform scheduled checks for multiple cities"""
            cities = ['Lucknow']  # Add more cities as needed
            print(f"\nStarting scheduled check at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

            for city in cities:
                try:
                    print(f"\nChecking weather conditions for {city}...")
                    run_weather_emergency_system(city)
                    time.sleep(2)  # Brief pause between cities
                except Exception as e:
                    print(f"Error checking {city}: {str(e)}")

        # Schedule checks every minut
        schedule.every(1).minute.do(scheduled_check)
        print("Weather Emergency Response System started.")
        print("Monitoring scheduled for every minute.")

        while True:
            try:
                schedule.run_pending()
                time.sleep(1)
            except KeyboardInterrupt:
                print("\nShutting down Weather Emergency Response System...")
                break
            except Exception as e:
                print(f"Error in main loop: {str(e)}")
                time.sleep(1)

if __name__ == "__main__":
    main()


Weather Emergency Response System
1. Run normal monitoring
2. Run test with simulated data


Select mode (1 or 2):  1


Weather Emergency Response System started.
Monitoring scheduled for every minute.
cities ['Delhi', 'Mumbai', 'Mirzapur']

Starting scheduled check at 2025-04-10 15:21:47

Checking weather conditions for Delhi...
Invoking LLM with weather data:
  - weather: overcast clouds
  - wind_speed: 0.12
  - cloud_cover: 100
  - sea_level: 999
  - temperature: -233.4
  - humidity: 12
  - pressure: 999
Disaster type response: The temperature of -233.4°C is far below anything possible on Earth.  This indicates an error in the data.  Therefore, the correct categorization is **No Immediate Threat** (because the given data is unrealistic).

Email sent successfully for high severity alert to iamkavitasingh08@gmail.com
Completed weather check for Delhi

Checking weather conditions for Mumbai...
Invoking LLM with weather data:
  - weather: clear sky
  - wind_speed: 6.04
  - cloud_cover: 3
  - sea_level: 1006
  - temperature: -245.0
  - humidity: 75
  - pressure: 1006
Disaster type response: No Immediate T

In [44]:
def main():
    """Main function to run the weather emergency system"""
    # Set up environment variables

    os.environ["SENDER_EMAIL"] = "ritusinghpoem@gmail.com"  
    os.environ["RECEIVER_EMAIL"] = "iamkavitasingh08@gmail.com"  

    # Add test mode option
    print("\nWeather Emergency Response System")
    print("1. Run normal monitoring")
    print("2. Run test with simulated data")
    choice = input("Select mode (1 or 2): ")

    if choice == "2":
        print("\nSelect test scenario:")
        print("1. High severity (no human verification needed)")
        print("2. Medium severity (requires human verification)")
        print("3. Low severity (requires human verification)")

        scenario_choice = input("Select scenario (1, 2, or 3): ")
        scenario_map = {"1": "high", "2": "medium", "3": "low"}
        scenario = scenario_map.get(scenario_choice, "medium")

        city = input("\nEnter city name for test (default: Lahore): ").strip() or "Lahore"
        print(f"\nRunning test scenario: {scenario.upper()} severity for {city}")
        run_weather_emergency_system_test(city, scenario)
    else:
        def scheduled_check():
            """Function to perform scheduled checks for multiple cities"""
            cities = ['Lahore']  # Add more cities as needed
            print(f"\nStarting scheduled check at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

            for city in cities:
                try:
                    print(f"\nChecking weather conditions for {city}...")
                    run_weather_emergency_system(city)
                    time.sleep(2)  # Brief pause between cities
                except Exception as e:
                    print(f"Error checking {city}: {str(e)}")

        # Schedule checks every minut
        schedule.every(1).minute.do(scheduled_check)
        print("Weather Emergency Response System started.")
        print("Monitoring scheduled for every minute.")

        while True:
            try:
                schedule.run_pending()
                time.sleep(1)
            except KeyboardInterrupt:
                print("\nShutting down Weather Emergency Response System...")
                break
            except Exception as e:
                print(f"Error in main loop: {str(e)}")
                time.sleep(1)

if __name__ == "__main__":
    main()


Weather Emergency Response System
1. Run normal monitoring
2. Run test with simulated data


Select mode (1 or 2):  2



Select test scenario:
1. High severity (no human verification needed)
2. Medium severity (requires human verification)
3. Low severity (requires human verification)


Select scenario (1, 2, or 3):  3

Enter city name for test (default: Lahore):  Lucknow



Running test scenario: LOW severity for Lucknow
Invoking LLM with weather data:
  - weather: light drizzle
  - wind_speed: 8.5
  - cloud_cover: 45
  - sea_level: 1013
  - temperature: 20.1
  - humidity: 60
  - pressure: 1015
Disaster type response: No Immediate Threat.  The conditions described are relatively mild.  Light drizzle, moderate wind, pleasant temperature, and normal pressure don't indicate any imminent severe weather event.

Low/Medium severity alert for Lucknow requires human approval:
Disaster Type: No Immediate Threat.  The conditions described are relatively mild.  Light drizzle, moderate wind, pleasant temperature, and normal pressure don't indicate any imminent severe weather event.
Current Weather: light drizzle
Temperature: 20.1°C
Wind Speed: 8.5 m/s
Severity: Low
Response Plan: ## Civil Defense Response Plan: Lucknow - No Immediate Threat (NIT) Situation

**Date:** October 26, 2023
**Time:** 14:00 hrs
**Situation:** Light drizzle, moderate wind, pleasant temperatu

 y


Human verification result: Approved

Verification was approved by human, Email sent to iamkavitasingh08@gmail.com successfully

Completed test weather check for Lucknow with low severity scenario
