## FloodGPT: An Advanced AI Assistant for Flood Risk Management

This notebook is an extension of our research and development work detailed in the paper, "Towards Democratized Flood Risk Management: An Advanced AI Assistant Enabled by GPT-4 for Enhanced Interpretability and Public Engagement." Here, we delve into the practical application of GPT-4's AI to create an AI Assistant that simplifies complex flood risk data into actionable insights for the public.

Our goal is to demonstrate how cutting-edge AI, specifically GPT-4, can be utilized to provide real-time, user-friendly flood alerts, and responses to flood-related inquiries. By combining the AI's ability to interpret vast amounts of data with interactive mapping and social vulnerability assessments, we aim to empower individuals and communities to make informed decisions in the face of flood risks.

This notebook serves as a hands-on guide to the prototype we developed. Whether you're a researcher, a tech enthusiast, or someone interested in disaster management, we invite you to explore this notebook and witness the potential of AI in transforming flood risk management.

### Author
Rafaela Martelo & Dr. Ruo-Qian (Roger) Wang

### Acknowledgments

This project's initial structure and function calling implementation were inspired by the following resources:

- "Function Calling: Integrate Your GPT Chatbot With Anything" by Tomas Fernandez
  [Link to Article](https://dzone.com/articles/function-calling-integrate-your-gpt-chatbot-with-a)

- "gpt-function-calling-tutorial" by JayZeeDesign
  [Link to GitHub Repository](https://github.com/JayZeeDesign/gpt-function-calling-tutorial)

While the project started with these inspirations, it has evolved with additional features and modifications to suit specific requirements.


### Setup and Dependencies

In [None]:
# Use older version of openai due to recent library update. 
# A detailed Migration guide is available here 'https://github.com/openai/openai-python/discussions/742'
#!pip install openai==0.28

In [1]:
from dotenv import load_dotenv
import openai
import tiktoken
import json
import os
import io
import requests
import fiona
import folium
from PIL import Image
from IPython.display import IFrame

### Model Configuration

In [2]:
load_dotenv(".env")
openai.api_key = os.getenv('OPENAI_API_KEY')

llm_model = "gpt-4-1106-preview" # try and compare with "gpt-4-0613", "gpt-3.5-turbo-16k"
llm_max_tokens = 15500

llm_system_prompt = "You are an assistant that provides flood information relevant to the user queries. You may receive \
                    queries about flood alerts, Social Vulnerability Indexes (SVI) for a location, flood alerts, or display \
                    maps to the request of the user. Interactive maps are always displayed to the user. \
                    Always try your best to answer in natural language easy to understand using the available function calls."

encoding_model_messages = "gpt-3.5-turbo-0613" # can try with "gpt-4-1106-preview" 
encoding_model_strings = "cl100k_base"
function_call_limit = 4

In [4]:
def num_tokens_from_messages(messages):
    """Returns the number of tokens used by a list of messages."""
    try:
        encoding = tiktoken.encoding_for_model(encoding_model_messages)
    except KeyError:
        encoding = tiktoken.get_encoding(encoding_model_strings)

    num_tokens = 0
    for message in messages:
        num_tokens += 4
        for key, value in message.items():
            num_tokens += len(encoding.encode(str(value)))
            if key == "name":
                num_tokens += -1
    num_tokens += 2
    return num_tokens

### Available Functions

This AI Assistant includes the following functions to interact with flood-related data sources and provide information:

- `get_flood_map(latitude, longitude, zoom)`: Retrieves a flood map for the specified location.
- `get_flood_data(address)`: Fetches flood-related data for the given location.
- `get_svi_stats_and_tracts(state_abbr, county, theme, op, threshold)`: Gets Social Vulnerability Index (SVI) statistics and tract information for the location.
- `get_flash_flood_warnings(location)`: Retrieves current flash flood warnings for the specified area.

In [11]:
def get_flood_map(latitude, longitude, zoom):
    '''
    Retrieves and saves a static map image indicating flood data for a specified location.
    
    Parameters:
    - latitude (float): The latitude of the location in standard geographic coordinate system.
    - longitude (float): The longitude of the location in standard geographic coordinate system.
    - zoom (int): The zoom level for the map.
    
    Returns:
    - A JSON string containing the latitude, longitude, and the name of the saved image file.
      If an error occurs while retrieving the static map, it returns a JSON string with the error message
      and ensures the interactive map will still be displayed.
    '''

    headers = {'x-api-key': os.getenv('FEMA_API_KEY')}
    payload = {
      'lat': latitude,
      'lng': longitude,
      'height': 500,
      'width': 800,
      'showMarker': True,
      'showLegend': True,
      'zoom': zoom
    }
    
    try:
        response = requests.get('https://api.nationalflooddata.com/v3/staticmap', headers=headers, params=payload)
        response.raise_for_status()

        image = Image.open(io.BytesIO(response.content))

        # Save the image to a file
        image.save('output_static_map.png')

        # Return a JSON string with the relevant information
        result_json = json.dumps({
            'latitude': latitude,
            'longitude': longitude,
            'image_name': 'output_static_map.png'
        })

        return result_json
    except requests.exceptions.RequestException as e:
        
        # Return a JSON string with the error message
        error_message = {
            'latitude': latitude,
            'longitude': longitude,
            'image_name': f"Interactive map will display but an error occurred while retrieving static map: {str(e)}"
        }
        return json.dumps(error_message)

        
def get_flood_data(address):
    '''
    Retrieves flood risk data for a specified address using the National Flood Data API.
    
    Parameters:
    - address: The address for which flood data is requested.
    
    Returns:
    - A JSON string containing flood data for the specified address. This includes flood zone designation, risk level, and other related information.
    '''
    
    headers = {'x-api-key': os.getenv('FEMA_API_KEY')}
    payload = {
      'address': address,
      'searchtype': 'addressparcel',
      'loma': False,
    }
    
    try:
        response = requests.get('https://api.nationalflooddata.com/v3/data', headers=headers, params=payload)
        response.raise_for_status()
        
        result_dict = response.json()
        result_json = json.dumps(result_dict)  
        return result_json
    except requests.exceptions.RequestException as e:
        # Handle any request errors and return a JSON string with the error message
        error_message = {"Error": str(e)}
        return json.dumps(error_message)


def get_svi_stats_and_tracts(state_abbr, county, theme, op, threshold):
    """
    Retrieve SVI statistics and census tracts based on user-specified conditions,
    and return a JSON string containing the information.

    Parameters:
        state_abbr (str): State abbreviation (e.g., 'NJ').
        county (str): County name (e.g., 'Middlesex').
        theme (str): SVI theme (e.g., 'RPL_THEME4').
        op (str): Comparison operator ('<', '<=', '>', '>=').
        threshold (float): Threshold value for the comparison.

    Returns:
        str: JSON string containing SVI statistics and census tract information.
    """
    # Set the file path and name of the shapefile
    shapefile_path = r'SVI_2020_US\SVI2020_US_tract.shp'

    # Open the shapefile for reading
    with fiona.open(shapefile_path) as input:
        # Select relevant features based on user-specified conditions
        relevant_features = [feature for feature in input if
                             feature["properties"]["ST_ABBR"] == state_abbr and
                             feature["properties"]["COUNTY"] == county and
                             eval(f"feature['properties']['{theme}'] {op} {threshold}")]

    # Check if relevant features are empty
    if not relevant_features:
        return json.dumps({"Statistics": "No data available for the specified conditions."})

    # Extract SVI scores, FIPS, and coordinates from the relevant features
    data = []
    for feature in relevant_features:
        coordinates = feature['geometry']['coordinates']
        fips = feature['properties']['FIPS']
        svi_score = feature['properties'][theme]

        # Append data for each relevant feature
        data.append({
            "FIPS": fips,
            "Coordinates": coordinates,
            "SVI_Score": svi_score
        })

    # Calculate SVI statistics
    total_vulnerable_areas = len(data)
    max_svi = max(entry["SVI_Score"] for entry in data)
    min_svi = min(entry["SVI_Score"] for entry in data)
    avg_svi = round(sum(entry["SVI_Score"] for entry in data) / total_vulnerable_areas, 4)

    # Create a dictionary with SVI statistics and census tract information
    result_dict = {
        "Statistics": {
            "Total_Areas": total_vulnerable_areas,
            "Max_SVI": max_svi,
            "Min_SVI": min_svi,
            "Average_SVI": avg_svi
        },
        "Census_Tracts": data
    }

    # Convert the result dictionary to a JSON-formatted string
    result_json = json.dumps(result_dict, separators=(',', ':'))

    # Return the result as a JSON string
    return result_json



def get_flash_flood_warnings(location):
    '''
    Retrieves flash flood warnings for a specified location from the National Weather Service API.
    
    Parameters:
    - location: Can be "None" for the entire United States or a specific state code (e.g., "NJ").
    
    Returns:
    - A JSON string of flood alerts. Each alert contains the event, description, time sent, expiration time, and the area description.
    '''

    base_url = "https://api.weather.gov/alerts/active"
    
    # Provide the location as a parameter, "None" for the entire United States or a state code (e.g., "NJ")
    if location is None:
        params = {"event": "Flood Warning"}
    else:
        params = {"area": location, "event": "Flood Warning"}
    
    try:
        response = requests.get(base_url, params=params)
        response.raise_for_status()
        data = response.json()
        
        # Extract the alert details
        alerts = []
        for feature in data['features']:
            properties = feature['properties']
            alert = {
                'event': properties['event'],
                'description': properties['description'],
                'sent': properties['sent'],
                'expires': properties['expires'],
                'areaDesc': properties['areaDesc'],
            }
            alerts.append(alert)
        
        # Convert the alerts list to a JSON string
        alerts_json = json.dumps(alerts)
        
        return alerts_json
    except requests.exceptions.RequestException as e:
        # Handle any request errors
        error_message = {"Error": str(e)}
        return json.dumps(error_message)

### Function Signatures

Provide a brief description of the parameters and expected output for each function so that GPT understands how to use them effectively when answering user queries.

In [6]:
signature_get_flood_map = {
    "name": "get_flood_map",
    "description": "Directly display interactive flood map for a given location using FEMA Flood Map. User may also see the downloaded static flood map as 'output_static_map.png'. The map displayed has the following flood zone legends:A: An area with a 1% annual chance of flood; does not have base flood elevations (BFEs) available. B, X500:An area with at least a 0.2% chance of annual flood or with a 1% annual chance of flood with average depths less that one foot or with drainage area less than one square mile. (C is the older designation and X500 is the current designation. C, X: An area outside the 0.2% and 1% annual chance of flood regions. (C is the older designation and X500 is the current designation.) D: An area where flooding is possible but has not been studied. V:An area with a 1% annual chance flooding with velocity hazard due to waves; BFEs have are not available.",
    "parameters": {
        "type": "object",
        "properties": {
            "latitude": {
                "type": "number",
                "description": "The latitude coordinate of interest in standard geographic coordinate system. For example, 34.071783. Please include negative numbers to represent directions."
            },
            "longitude": {
                "type": "number",
                "description": "The longitude coordinate of interest in standard geographic coordinate system. For example, -113.2596. Please include negative numbers to represent directions."
            },
            "zoom": {
                "type": ["integer", "null"],
                "description": "Integer parameter between 6 (low zoom) and 17 (high zoom). Use 'null' when zoom is not specified. "
            }
        }
    }
}


signature_get_flood_data = {
    "name": "get_flood_data",
    "description": "Get Fema flood data by address. The query returns flood data for the provided address, including information about the flood zone, Special Flood Hazard Area (SFHA), and related details. Additionally, it provides geocoding information, such as the latitude, longitude, and match level for the given address.",
    "parameters": {
        "type": "object",
        "properties": {
            "address": {
                "type": "string",
                "description": "The address of interest. Example: address=12 Maple Pl Mapleville KS 12345 address with street number, street type, city, state, zip and without punctuation. Do not include 4 digit zipcode extension."
            }
        }
    }
}


signature_get_svi_stats_and_tracts = {
    "name": "get_svi_stats_and_tracts",
    "description": "Retrieve Social Vulnerability Index (SVI) statistics based on user-specified criteria \
                    and display census tract on map with SVI index. State abbreviation, county, theme, \
                    operation and threshold must always be specified.",
    "parameters": {
        "type": "object",
        "properties": {
            "state_abbr": {
                "type": "string",
                "description": "The state abbreviation (e.g., 'NJ')."
            },
            "county": {
                "type": "string",
                "description": "The county name (e.g., 'Middlesex')."
            },
            "theme": {
                "type": "string",
                "description": "The SVI theme to analyze. Choose from the following options:  \
                                - RPL_THEME1 (SVI for Socioeconomic Status theme);  \
                                - RPL_THEME2 (SVI for Household Characteristics theme); \
                                - RPL_THEME3 (SVI for Racial and Ethnic Minority Status theme); \
                                - RPL_THEME4 (SVI for Housing Type/Transportation theme); \
                                - RPL_THEMES (Overall SVI)",
                "enum": ["RPL_THEME1", "RPL_THEME2", "RPL_THEME3", "RPL_THEME4", "RPL_THEMES"]
            },
            "op": {
                "type": "string",
                "description": "Operation of interest, e.g. less than, use '<'.",
                "enum": ["<", "<=", "=>", ">"]
            },
            "threshold": {
                "type": "number",
                "description": "Threshold value for comparison from 0 to 1, e.g. 0.6. Vulnerability thresholds are: 'Low': 0.249, 'Low-Medium': 0.49, 'Medium-High': 0.50, 'High': 0.75. 90th percentile flag vulnerability is >=0.90 index score."
            }
        }
    }
}

signature_get_flash_flood_warnings = {
    "name": "get_flash_flood_warnings",
    "description": "Get flash flood warnings for a given location using the National Weather Service (NWS) API.",
    "parameters": {
        "type": "object",
        "properties": {
            "location": {
                "type": ["string", "null"],
                "description": "The state abbreviation in uppercase for which state you want to retrieve flash flood warnings (e.g., 'NJ'). Use 'null' or omit this parameter to search for flash flood warnings in the entire US."
            }
        }
    } 
}



### Integrating Dynamic Function Calls with ChatGPT API

The `complete` function outlined here is the core of our interaction with the Chat Completions API. It dynamically manages the conversation flow, ensuring that the assistant can handle a variety of user inputs, including special commands like 'sleep'. More importantly, it integrates function calling, where the AI decides to invoke specific functions based on the context of the conversation.

Below is a breakdown of how the `complete` function operates:
- It first checks for the special commands 'sleep' and handles it accordingly.
- It prepares the system prompt and trims the conversation to stay within the token limit for efficient processing.
- It then calls the ChatCompletion API, passing in the current conversation and available function signatures.
- Upon receiving a function call request from the API, it executes the corresponding function and appends the result back into the conversation.


In [7]:
def complete(messages, function_call: str = "auto"):
    """Fetch completion from OpenAI's GPT"""
    
    # Check if the user entered 'sleep'
    last_user_message = messages[-1]["content"].strip().lower()
    if last_user_message == 'sleep':
        # If the user wants to sleep, provide a different response
        print("\nTime for a power nap! I'll be here, waiting for your next question!")
        
        return messages

    messages.append({"role": "system", "content": llm_system_prompt})

    # Delete older completions to keep conversation under token limit
    while num_tokens_from_messages(messages) >= llm_max_tokens:
        messages.pop(0)

    print('Working...')
    res = openai.ChatCompletion.create(
        model=llm_model,
        messages=messages,
        functions=[signature_get_flood_map, signature_get_flood_data, 
                   signature_get_svi_stats_and_tracts, signature_get_flash_flood_warnings],
        function_call=function_call
    )
    
    # Remove system message and append response from the LLM
    messages.pop(-1)
    response = res["choices"][0]["message"]
    messages.append(response)

    
    # Call functions requested by the model
    if response.get("function_call"):
        function_name = response["function_call"]["name"]
        if function_name == "get_flood_map":
            args = json.loads(response["function_call"]["arguments"])
            flood_map = get_flood_map(
                latitude= args.get("latitude"),
                longitude= args.get("longitude"),
                zoom= args.get("zoom")
            )
            messages.append({"role": "function", "name": "get_flood_map", "content": flood_map})
        elif function_name == "get_flood_data":
            args = json.loads(response["function_call"]["arguments"])
            flood_data = get_flood_data(
                address= args.get("address")
            )
            messages.append({"role": "function", "name": "get_flood_data", "content": flood_data})
        elif function_name == "get_svi_stats_and_tracts":
            args = json.loads(response["function_call"]["arguments"])
            svi_stats = get_svi_stats_and_tracts(
                state_abbr= args.get("state_abbr"),
                county = args.get("county"),
                theme = args.get("theme"),
                op = args.get("op"),
                threshold = args.get("threshold")
            )
            messages.append({"role": "function", "name": "get_svi_stats_and_tracts", "content": svi_stats})
        elif function_name == "get_flash_flood_warnings":
            args = json.loads(response["function_call"]["arguments"])
            flash_flood_warnings = get_flash_flood_warnings(
                location=args.get("location")
            )
            messages.append({"role": "function", "name": "get_flash_flood_warnings", "content": flash_flood_warnings})
    return messages


## Explore FloodGPT in Action

In [8]:
print("\nHi, I'm FloodGPT, your friendly Flood Risk & Management AI assistant! 🌊")
print("Here to help with flood alerts in the US.")
print("Ask me anything about flood risks, like 'Are there flood alerts in Texas?'\n or 'What are the most vulnerable communities in New York?'")
print("Ready to give me a rest? Just say 'sleep' and I'll take a break! 😴")

messages = []
while True:
    prompt = input("\nI just woke up, but fire away: ")
    messages.append({"role": "user", "content": prompt})
    complete(messages)
    
    # Check if the user wants to exit
    if prompt.lower() == 'sleep':
        break

    # The LLM can chain function calls, this implements a limit
    call_count = 0
    while messages[-1]['role'] == "function":
        call_count = call_count + 1
        # Create a map centered around New Jersey (you can adjust the coordinates)
        if messages[-1]['name'] == "get_flood_map":

            # Display the interactive map with a marker at a specific location
            content_str = messages[-1]['content']
            content_dict = json.loads(content_str)
            if "Error" not in content_dict:
                latitude = content_dict.get('latitude', None)
                longitude = content_dict.get('longitude', None)  
                map_url = f"https://www.nationalflooddata.com/v3/dynamic.html?lat={latitude}&lng={longitude}"
                map_frame = IFrame(src=map_url, width=900, height=600)
                display(map_frame)
            
        elif messages[-1]['name'] == "get_svi_stats_and_tracts":
            
            content_str = messages[-1]['content']
            # Parse the JSON string
            data = json.loads(content_str)
            # Check if "Total_Areas" key exists
            if "Total_Areas" in data["Statistics"]:
                # Check if "Total_Areas" is greater than 0
                if data["Statistics"]["Total_Areas"] > 0:
                    # Parse the JSON string
                    result_dict = json.loads(content_str)

                    # Extract census tracts data
                    census_tracts = result_dict.get("Census_Tracts", [])

                    # Extract the first coordinate from the first tract
                    first_coordinate = census_tracts[0]["Coordinates"][0][0]

                    # Create a folium map centered around the first coordinate
                    m = folium.Map(location=[first_coordinate[1], first_coordinate[0]], zoom_start=13)

                    # Create a GeoJSON object with features for each tract
                    geojson_object = {
                        "type": "FeatureCollection",
                        "features": []
                    }

                    for tract in census_tracts:
                        feature = {
                            "type": "Feature",
                            "geometry": {
                                "type": "Polygon",
                                "coordinates": tract["Coordinates"]
                            },
                            "properties": {
                                "FIPS": tract["FIPS"],
                                "SVI_Score": tract["SVI_Score"]
                            }
                        }
                        geojson_object["features"].append(feature)

                    # Add GeoJSON data for each tract to the map
                    for feature in geojson_object['features']:
                        coordinates = feature['geometry']['coordinates']
                        folium.GeoJson(feature, name=f"Census Tract {feature['properties']['FIPS']} - SVI: {feature['properties']['SVI_Score']}", style_function=lambda x: {'color': 'red'}).add_to(m)

                    # Add Layer Control for turning on/off GeoJSON features (legend)
                    folium.LayerControl().add_to(m)

                    m.save("census_tract_map_with_boundaries_and_labels.html")

                    map_frame = IFrame("census_tract_map_with_boundaries_and_labels.html", width=900, height=600)
                    display(map_frame)



        if call_count < function_call_limit:
            complete(messages)
        else:
            complete(messages, function_call="none")
            

    # Print last message
    print("\n\n**Response**\n")
    print(messages[-1]["content"].strip())
    print("\n**End of response**")


Hi, I'm FloodGPT, your friendly Flood Risk & Management AI assistant! 🌊
Here to help with flood alerts in the US.
Ask me anything about flood risks, like 'Are there flood alerts in Texas?'
 or 'What are the most vulnerable communities in New York?'
Ready to give me a rest? Just say 'sleep' and I'll take a break! 😴

I just woke up, but fire away: sleep

Time for a power nap! I'll be here, waiting for your next question!
