# Data Loading
## Saisriram Gurajala
## 10/27/2024 

This notebook loads the GeoJSON data from the USGS Wildland Fire dataset and filters it by fire year and by proximity to my assigned city, Dearborn, Michigan. It references code from Dr. David W. McDonald's source notebook `wildfire_geo_proximity_example.ipynb`. It also calls the EPA's Air Quality System (AQS) API and references code from Dr. McDonald's source notebook `epa_air_quality_history_example.ipynb`. Both source notebooks are housed, unmodified, in the `src/notebooks` directory.

### Loading GeoJSON Data

An executable version of this script is found in `libs/load_and_filter_json.py`.

### Import Libraries and Read in Files

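Here we import the required libraries and change the working directory to the project base path, read from the `DATA512_BASE_FILE_PATH` environment variable, because later cells reference several of its subdirectories (`data/`, `logs/`).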

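```python
import sys
sys.path.append("../")
# Project helpers (expected to provide shortest_distance_from_place_to_fire_perimeter)
from libs.helpers import *
from libs.api_key_store import Api_Key_Store
import os
import time
import logging
import json
import geojson
import requests
import pandas as pd
from datetime import datetime

# Work from the project base directory so relative data/ and logs/ paths resolve
base_path = os.getenv("DATA512_BASE_FILE_PATH")
if base_path is None:
    raise EnvironmentError("Set DATA512_BASE_FILE_PATH to the project base directory.")
os.chdir(base_path)
```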

This cell sets up logging so that parsing progress is recorded in `logs/geojson_loading.log`.

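```python
# Write parsing progress to logs/geojson_loading.log (create logs/ if needed)
os.makedirs("logs", exist_ok=True)
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s',
                    filename="./logs/geojson_loading.log",
                    force=True)
```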

This cell sets the input data file and the output file for the filtered results, and defines the coordinates of Dearborn, Michigan.

```python
DATA_FILE = "data/USGS_Wildland_Fire_Combined_Dataset.json"
OUT_FILE = "data/USGS_Wildland_Fire_Combined_Dataset_filtered.json"

# Coordinates of Dearborn, MI as [latitude, longitude]
coords = [42.322262, -83.176315]
```

### Loading in and filtering JSON Features

Here we process a single feature. We keep only fires whose reported fire year falls between 1961 and 2021, and only fires whose shortest distance to Dearborn is at most 1800, as computed by the `shortest_distance_from_place_to_fire_perimeter` function sourced from `wildfire_geo_proximity_example.ipynb`. Because the more specific reported fire dates are described as unreliable, we filter only on the reported fire year (`Fire_Year`).

Inputs to this function are: 
- feature (dict): A dictionary representing a wildfire polygon
- coords (list): [latitude, longitude] of the point distances are measured from; defaults to Dearborn, Michigan

Outputs of this function are: 
- feature (dict or None): The function returns the original feature if it meets all filtering criteria, with an added Distance_to_DearbornMI attribute. Otherwise, it returns None.

This function operates through the following steps:
1. Assign ring_data from the first ring in `rings` or `curveRings` in the geometry. If neither exists, the feature is excluded (None).
2. Check that Fire_Year is between 1961 and 2021, inclusive.
3. If the year criterion is met, calculate the distance from coords to ring_data. If it is ≤ 1800, add Distance_to_DearbornMI to the attributes and keep the feature; otherwise, set feature to None.
4. Log errors and set feature to None if an exception occurs.
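The year and distance filters can be illustrated without the project helpers. The sketch below is a hypothetical stand-in, not the `shortest_distance_from_place_to_fire_perimeter` implementation: it swaps in a plain haversine distance (in miles) to the nearest ring vertex, and it assumes the ring is already a list of `[longitude, latitude]` pairs, which may not hold for the raw perimeter data. The names `keep_fire` and `min_distance_to_ring` are illustrative only.

```python
import math

# Hypothetical stand-in: haversine (great-circle) distance in miles
EARTH_RADIUS_MILES = 3958.8

def haversine_miles(lat1, lon1, lat2, lon2):
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_MILES * math.asin(math.sqrt(a))

def min_distance_to_ring(place, ring):
    """Smallest distance from place [lat, lon] to any ring vertex [lon, lat]."""
    lat, lon = place
    return min(haversine_miles(lat, lon, pt_lat, pt_lon) for pt_lon, pt_lat in ring)

def keep_fire(fire_year, place, ring, max_distance=1800, first_year=1961, last_year=2021):
    """Return the distance if the fire passes both filters, else None."""
    if not (first_year <= fire_year <= last_year):
        return None
    distance = min_distance_to_ring(place, ring)
    return distance if distance <= max_distance else None

dearborn = [42.322262, -83.176315]
nearby_ring = [[-83.0, 42.3], [-83.1, 42.4], [-83.2, 42.2]]       # made-up ring near Dearborn
far_ring = [[-157.8, 21.3], [-157.9, 21.4], [-157.7, 21.2]]       # made-up ring near Honolulu
```

A fire from 2000 with the nearby ring passes both filters, while the same ring dated 1950 or a ring near Honolulu is excluded.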

```python
def load_and_filter_year_geojson(feature, coords=coords):
    """
    Filter function that operates on features. Returns the feature with an added
    Distance_to_DearbornMI attribute if it passes the year and distance filters,
    otherwise None.
    """
    geometry = feature.get('geometry') or {}
    # Use the first ring of the fire perimeter; features without rings are excluded
    if 'rings' in geometry:
        ring_data = geometry['rings'][0]
    elif 'curveRings' in geometry:
        ring_data = geometry['curveRings'][0]
    else:
        return None
    attributes = feature['attributes']
    try:
        # Keep only fires from 1961 through 2021 (inclusive)
        if not (1961 <= attributes['Fire_Year'] <= 2021):
            return None
        # The first element returned by the helper is the shortest distance
        distance = shortest_distance_from_place_to_fire_perimeter(coords, ring_data)
        # Filter on distance
        if distance[0] > 1800:
            return None
        attributes['Distance_to_DearbornMI'] = distance[0]
        logging.info(f"Fire {attributes['Listed_Fire_Names']} is included!")
        return feature
    except Exception as e:
        logging.error(f"Error for Fire {attributes.get('Listed_Fire_Names')}: {e}")
        return None
```

The main function ties together the steps above. It reads the file as a GeoJSON object, loops through its features, filters each one with `load_and_filter_year_geojson`, and logs progress every 10,000 processed features.

The main function has no inputs or outputs, and leverages constants set above.

This function operates through the following steps:

1. Open DATA_FILE and load GeoJSON data into gj_data. Log that data has been read.
2. Extract features from gj_data, initialize filtered_features as an empty list, and set feature_number to 0.
3. Use load_and_filter_year_geojson to filter each feature. If a feature passes all filters, append its attributes to filtered_features. Log every 10,000 features processed.
4. Write filtered_features to OUT_FILE in JSON format and log the total number written.
5. Run main and log the time taken to run the script.
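The loop in `main` follows a common pattern: apply a filter to each item, keep the survivors, log progress at a fixed interval, and write the result once at the end. The sketch below shows that pattern in isolation. `filter_features` and the toy "even years" filter are hypothetical illustrations standing in for `load_and_filter_year_geojson` and the real dataset.

```python
import json
import logging
import os
import tempfile

def filter_features(features, keep, out_path, log_every=10000):
    """Apply `keep` to each feature and write surviving attributes to out_path as JSON.

    `keep` is any callable returning the (possibly modified) feature or None.
    """
    kept = []
    for count, feature in enumerate(features, start=1):
        result = keep(feature)
        if result is not None:
            kept.append(result["attributes"])
        if count % log_every == 0:
            logging.info(f"Processed and filtered {count} features so far.")
    with open(out_path, "w") as output_file:
        json.dump(kept, output_file)
    logging.info(f"A total of {len(kept)} features written.")
    return len(kept)

def keep_even_years(feature):
    """Toy filter for illustration: keep features with an even Fire_Year."""
    return feature if feature["attributes"]["Fire_Year"] % 2 == 0 else None

toy_features = [{"attributes": {"Fire_Year": y}} for y in range(2000, 2010)]
out_path = os.path.join(tempfile.mkdtemp(), "filtered.json")
n_written = filter_features(toy_features, keep_even_years, out_path, log_every=5)
with open(out_path) as fh:
    written = json.load(fh)
```

Writing only once, after the loop, keeps file I/O out of the per-feature path.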

```python
def main():
    # Read the full dataset as a GeoJSON object
    with open(DATA_FILE, "r") as geojson_file:
        gj_data = geojson.load(geojson_file)
    logging.info("Data read in!")
    features = gj_data['features']
    filtered_features = []
    for feature_number, feature in enumerate(features, start=1):
        feature = load_and_filter_year_geojson(feature)
        # Keep only the attributes of features that passed both filters
        if feature is not None:
            filtered_features.append(feature['attributes'])
        if feature_number % 10000 == 0:
            logging.info(f"Processed and filtered {feature_number} features so far.")
    with open(OUT_FILE, "w") as output_file:
        json.dump(filtered_features, output_file)
    logging.info(f"A total of {len(filtered_features)} features written.")

if __name__ == "__main__":
    start = datetime.now()
    main()
    elapsed = (datetime.now() - start).total_seconds()
    logging.info(f"Script has taken {elapsed} seconds to execute!")
```

### AQI API Call

An executable version of this script is found in `libs/api_call_AQI.py`.

The section below is largely adapted from Dr. David McDonald's `epa_air_quality_history_example.ipynb` notebook, which is available in unaltered form in the `src/notebooks` directory.


First we set up logging and the required constants for the AQS API, and specify the output file, `data/AQI_Dearborn_Michigan.csv`. We initialize an `Api_Key_Store` so the API email and key are not exposed in the notebook, and define the base API URL along with the endpoint templates used for requests. To stay within a respectful request rate, we also define a throttle delay, a template for the request parameters, and the pollutant parameter codes.
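Because this notebook calls `logging.basicConfig` a second time, it is worth knowing that `basicConfig` does nothing if the root logger already has handlers, unless `force=True` (Python 3.8+) is passed to remove and close the existing handlers first. The sketch below demonstrates this using temporary files in place of the real `logs/` paths.

```python
import logging
import os
import tempfile

log_dir = tempfile.mkdtemp()
first_log = os.path.join(log_dir, "geojson_loading.log")
second_log = os.path.join(log_dir, "api_call.log")

# First configuration (like the GeoJSON section)
logging.basicConfig(level=logging.INFO, filename=first_log, force=True)
# Second configuration: force=True replaces the first file handler
logging.basicConfig(level=logging.INFO, filename=second_log, force=True)
# A call without force=True is a no-op once the root logger has handlers
logging.basicConfig(level=logging.INFO, filename=first_log)

logging.info("API logging configured")
root_files = [h.baseFilename for h in logging.getLogger().handlers
              if isinstance(h, logging.FileHandler)]
with open(second_log) as fh:
    second_contents = fh.read()
```

After running this, messages go only to the second file, and the first file stays empty.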

```python
# Reconfigure logging for the API calls; force=True replaces the handler set up
# earlier in this notebook (otherwise this call would be silently ignored)
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s',
                    filename="./logs/api_call.log",
                    force=True)

OUT_FILE = "data/AQI_Dearborn_Michigan.csv"

#########
#
#    CONSTANTS
#

#   API key store, so the email and key are not exposed in the notebook
api_key_store = Api_Key_Store()

#
#    This is the root of all AQS API URLs
#
API_REQUEST_URL = 'https://aqs.epa.gov/data/api'

#
#    These are some of the 'actions' we can ask the API to take or requests that we can make of the API
#
#    Sign-up request - generally only performed once - unless you lose your key
API_ACTION_SIGNUP = '/signup?email={email}'
#
#    List actions provide information on API parameter values that are required by some other actions/requests
API_ACTION_LIST_CLASSES = '/list/classes?email={email}&key={key}'
API_ACTION_LIST_PARAMS = '/list/parametersByClass?email={email}&key={key}&pc={pclass}'
API_ACTION_LIST_SITES = '/list/sitesByCounty?email={email}&key={key}&state={state}&county={county}'
#
#    Monitor actions are requests for monitoring stations that meet specific criteria
API_ACTION_MONITORS_COUNTY = '/monitors/byCounty?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&state={state}&county={county}'
API_ACTION_MONITORS_BOX = '/monitors/byBox?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&minlat={minlat}&maxlat={maxlat}&minlon={minlon}&maxlon={maxlon}'
#
#    Summary actions are requests for summary data. These are for daily summaries
API_ACTION_DAILY_SUMMARY_COUNTY = '/dailyData/byCounty?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&state={state}&county={county}'
API_ACTION_DAILY_SUMMARY_BOX = '/dailyData/byBox?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&minlat={minlat}&maxlat={maxlat}&minlon={minlon}&maxlon={maxlon}'
#
#    It is always nice to be respectful of a free data resource.
#    We're going to observe a 100 requests per minute limit - which is fairly nice
API_LATENCY_ASSUMED = 0.002       # Assuming roughly 2ms latency on the API and network
API_THROTTLE_WAIT = (60.0/100.0)-API_LATENCY_ASSUMED   # seconds to sleep before each request
#
#    This is a template that covers most of the parameters for the actions we might take, from the set of actions
#    above. In the examples below, most of the time parameters can either be supplied as individual values to a
#    function - or they can be set in a copy of the template and passed in with the template.
#
AQS_REQUEST_TEMPLATE = {
    "email":      "",
    "key":        "",
    "state":      "26",     # the two digit state FIPS # as a string (26 = Michigan)
    "county":     "163",    # the three digit county FIPS # as a string (163 = Wayne County, which contains Dearborn)
    "begin_date": "",       # the start of a time window in YYYYMMDD format
    "end_date":   "",       # the end of a time window in YYYYMMDD format, begin_date and end_date must be in the same year
    "minlat":    0.0,
    "maxlat":    0.0,
    "minlon":    0.0,
    "maxlon":    0.0,
    "param":     "",        # a list of comma separated 5 digit codes, max 5 codes requested
    "pclass":    ""         # parameter class is only used by the List calls
}

#   Gaseous AQI pollutants CO, SO2, NO2, and O3
AQI_PARAMS_GASEOUS = "42101,42401,42602,44201"
#
#   Particulate AQI pollutants PM10, PM2.5, and Acceptable PM2.5
AQI_PARAMS_PARTICULATES = "81102,88101,88502"
```
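The throttle wait is passed to `time.sleep`, which takes seconds, so a per-minute budget needs a wait of 60 / rate seconds per request. A wait of `1.0/100.0` seconds would instead allow roughly 100 requests per second. The arithmetic is sketched below; `throttle_wait` is a hypothetical helper, not part of the source notebook.

```python
API_LATENCY_ASSUMED = 0.002  # seconds, the same latency assumption as above

def throttle_wait(requests_per_minute, latency=API_LATENCY_ASSUMED):
    """Seconds to sleep before each request to stay under the given per-minute rate."""
    return max(0.0, 60.0 / requests_per_minute - latency)

per_minute_wait = throttle_wait(100)                    # about 0.598 s between requests
per_second_wait = (1.0 / 100.0) - API_LATENCY_ASSUMED   # about 0.008 s: ~100 requests per second
```

With two requests per year over 61 years (122 requests), the per-minute wait adds roughly 73 seconds of sleeping in total.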

Next we set up some essential functions. 

This function is adapted from Dr. David McDonald's code. It makes a daily-summary API call, using the API key store to supply the username (email) and key. It logs exceptions, and it also checks whether the response contains any data, logging that edge case as well.

Inputs to this function are: 
- param (str): The parameter codes for the data request.
- begin_date (str): The start date for the data in YYYYMMDD format.
- end_date (str): The end date for the data in YYYYMMDD format.
- fips (str): The 5-digit FIPS code (state + county).
- endpoint_url (str): The base URL for the API request.
- api_key_store (Api_Key_Store): Object to manage API keys.
- endpoint_action (str): The specific API action endpoint.
- request_template (dict): Template containing API request parameters.
- headers (dict): Any additional headers for the request.

Outputs of this function are:
- json_response (dict or None): The JSON response from the API containing daily summary data, or None if the request fails or returns no data.

This function operates through the following steps:

1. Fill in the request_template with the provided parameters, prioritizing any given values over the defaults.
2. Check that the necessary fields (email, key, param, begin_date, end_date) are present in the request template; raise an exception if any are missing.
3. Build the full API request URL using the base URL and endpoint action, substituting in the values from request_template.
4. Implement a throttling delay before making the GET request. Log success or failure of the request and handle any exceptions.
5. If the response contains no data, log this and return None; otherwise, return the JSON response.
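Steps 1, 2, 3, and 5 can be exercised without a network call. The sketch below fills a copy of the template (so a shared default dictionary is never mutated between calls), splits a 5-digit FIPS code into state and county, validates required fields, and composes the URL. The names `build_daily_summary_url` and `has_data`, and the email and key values, are hypothetical placeholders.

```python
API_REQUEST_URL = 'https://aqs.epa.gov/data/api'
API_ACTION_DAILY_SUMMARY_COUNTY = ('/dailyData/byCounty?email={email}&key={key}&param={param}'
                                   '&bdate={begin_date}&edate={end_date}&state={state}&county={county}')
REQUIRED_FIELDS = ("email", "key", "param", "begin_date", "end_date")

def build_daily_summary_url(template, email, key, param, begin_date, end_date, fips=None):
    """Fill a copy of the template and return the request URL."""
    request = dict(template)  # copy, so the caller's template is left untouched
    request.update(email=email, key=key, param=param,
                   begin_date=begin_date, end_date=end_date)
    if fips and len(fips) == 5:
        request["state"], request["county"] = fips[:2], fips[2:]
    missing = [field for field in REQUIRED_FIELDS if not request.get(field)]
    if missing:
        raise ValueError(f"Missing required fields: {missing}")
    return API_REQUEST_URL + API_ACTION_DAILY_SUMMARY_COUNTY.format(**request)

def has_data(json_response):
    """Treat a missing response or an empty 'Data' list as 'no data'."""
    return isinstance(json_response, dict) and bool(json_response.get("Data"))

template = {"email": "", "key": "", "state": "26", "county": "163",
            "begin_date": "", "end_date": "", "param": ""}
url = build_daily_summary_url(template, "me@example.com", "dummykey",
                              "81102,88101,88502", "20210101", "20211231", fips="26163")
```

Checking `has_data` before indexing into `'Data'` avoids a `TypeError` when a failed request has already produced `None`.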

```python
def request_daily_summary(param=None,
                          begin_date=None,
                          end_date=None,
                          fips=None,
                          endpoint_url=API_REQUEST_URL,
                          api_key_store=api_key_store,
                          endpoint_action=API_ACTION_DAILY_SUMMARY_COUNTY,
                          request_template=AQS_REQUEST_TEMPLATE,
                          headers=None):
    # Work on a copy so the shared default template is not modified between calls
    request_template = dict(request_template)
    # This prioritizes the info from the call parameters - not what's already in the template
    if api_key_store:
        request_template['email'] = api_key_store.username
        request_template['key'] = api_key_store.key
    if param:
        request_template['param'] = param
    if begin_date:
        request_template['begin_date'] = begin_date
    if end_date:
        request_template['end_date'] = end_date
    if fips and len(fips) == 5:
        request_template['state'] = fips[:2]
        request_template['county'] = fips[2:]

    # Make sure there are values that allow us to make a call - these are always required
    for field in ('email', 'key', 'param', 'begin_date', 'end_date'):
        if not request_template[field]:
            raise ValueError(f"Must supply a value for '{field}' to call 'request_daily_summary()'")
    # Note we're not validating FIPS fields because not all actions require the FIPS numbers
    # compose the request
    request_url = endpoint_url + endpoint_action.format(**request_template)
    begin, end, params = (request_template['begin_date'],
                          request_template['end_date'],
                          request_template['param'])
    try:
        # Wait first, so the rate limit holds even when a request raises an exception
        if API_THROTTLE_WAIT > 0.0:
            time.sleep(API_THROTTLE_WAIT)
        response = requests.get(request_url, headers=headers)
        response.raise_for_status()
        json_response = response.json()
        logging.info(f"API Pull for {begin} to {end} succeeded for param {params}.")
    except Exception as e:
        logging.error(f"API Pull for {begin} to {end} for param {params} failed due to reason {e}.")
        return None

    # Treat a missing or empty 'Data' list as "no data"
    if not isinstance(json_response, dict) or not json_response.get('Data'):
        logging.info(f"No data available for API Call of {begin} to {end} for param {params}.")
        return None
    return json_response
```


This function takes in gaseous and particulate pollutant AQI data and consolidates it into a single record per day. A given parameter may be measured at several stations on the same day, so we first need to consolidate AQI per day for each parameter p. I chose to do this by averaging the AQI per parameter per day:

$$AQI_{day_p} = \frac{1}{n}\sum_{x=1}^{n} AQI^{(x)}_{day_p}$$

where n is the total number of measurements for a given parameter on a given day, and x indexes those measurements.

We then produce a single AQI value per day by taking the maximum of the per-parameter daily averages:

$$AQI_{day} = \max_{p} AQI_{day_p}$$

where p is a parameter in the set of gaseous and particulate pollutant parameters.
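The two formulas map directly onto two pandas `groupby` steps: a mean over (date, parameter), then a max over date. The readings below are made up for illustration, and the parameter labels are simplified placeholders rather than the exact strings the AQS API returns.

```python
import pandas as pd

# Illustrative (made-up) readings: two stations report PM2.5 on 2021-07-01
readings = pd.DataFrame({
    "date":      ["2021-07-01", "2021-07-01", "2021-07-01", "2021-07-02", "2021-07-02"],
    "parameter": ["PM2.5",      "PM2.5",      "Ozone",      "PM2.5",      "Ozone"],
    "aqi":       [60,           80,           50,           40,           45],
})

# AQI_{day_p}: mean over all measurements of parameter p on that day
per_param = readings.groupby(["date", "parameter"], as_index=False)["aqi"].mean()
# AQI_{day}: max over parameters of the per-parameter daily means
daily = per_param.groupby("date", as_index=False)["aqi"].max()
```

On 2021-07-01 the PM2.5 readings average to 70, which exceeds ozone's 50, so the daily AQI is 70; on 2021-07-02 ozone (45) dominates PM2.5 (40).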

The following function consolidates the AQI data for gaseous and particulate pollutants according to the formulas above. It also restricts the data to the fire season, defined here as May 1st through October 31st.

Inputs to this function are: 
- res_gas (dict): Response data for gaseous pollutants.
- res_particulates (dict): Response data for particulate pollutants.
- year (int): The year for which data is being consolidated.

Outputs of this function are: 
- response_df (DataFrame): A Pandas DataFrame containing consolidated AQI data per date, or None if consolidation fails or no valid data is provided.

This function operates through the following steps:
1. Verify that either res_gas or res_particulates is provided, and that a valid year is specified. Log messages if checks fail and return None.
2. Define lower and upper bounds for the given year (May 1st to October 31st).
3. Loop through res_gas['Data'], filtering entries within the date bounds and appending valid data to a list of DataFrames.
4. Repeat the filtering and appending process for res_particulates['Data'].
5. Concatenate all collected DataFrames, drop any NaN values, and calculate the mean AQI per date and parameter. Then, group by date to take the maximum AQI for each day across params.
6. Catch any exceptions during the consolidation process, log the error, and return None if an error occurs.
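The fire-season filter in steps 2 through 4 amounts to an inclusive date-range check on each record's `date_local` string. A minimal sketch follows; `in_fire_season` is a hypothetical name, and the sample records are made up but use the same three fields the function reads (`date_local`, `parameter`, `aqi`).

```python
from datetime import datetime

def in_fire_season(date_local, year):
    """True if a 'YYYY-MM-DD' date falls between May 1 and October 31 of `year`, inclusive."""
    lower_bound = datetime(year, 5, 1)
    upper_bound = datetime(year, 10, 31)
    return lower_bound <= datetime.strptime(date_local, "%Y-%m-%d") <= upper_bound

sample = [{"date_local": d, "parameter": "Ozone", "aqi": 40}
          for d in ["2021-04-30", "2021-05-01", "2021-10-31", "2021-11-01"]]
in_season = [r["date_local"] for r in sample if in_fire_season(r["date_local"], 2021)]
```

Both boundary days are kept because the parsed dates fall at midnight, exactly equal to the bounds.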

```python
def consolidate_results(res_gas=None,
                        res_particulates=None,
                        year=None):
    if not res_gas and not res_particulates:
        logging.info("Must provide valid data to consolidate!")
        return None
    if not year:
        logging.info("Must provide a valid year to consolidate data")
        return None
    # Fire season bounds for the given year: May 1st to October 31st (inclusive)
    lower_bound = datetime.strptime(f"{year}-05-01", "%Y-%m-%d")
    upper_bound = datetime.strptime(f"{year}-10-31", "%Y-%m-%d")
    try:
        # Collect in-season records from both the gaseous and particulate responses
        records = []
        for res in (res_gas, res_particulates):
            if not res:
                continue
            for response in res['Data']:
                res_date = datetime.strptime(response['date_local'], "%Y-%m-%d")
                if lower_bound <= res_date <= upper_bound:
                    records.append({"date": response['date_local'],
                                    "parameter": response['parameter'],
                                    "aqi": response['aqi']})
        if not records:
            logging.info(f"No fire season data to consolidate for year {year}")
            return None
        # Mean AQI per date and parameter (rows with a missing AQI are dropped)
        response_df = (pd.DataFrame(records)
                       .dropna()
                       .groupby(['date', 'parameter'], as_index=False)['aqi'].mean())
        # Daily AQI is the maximum of the per-parameter daily means
        response_df = response_df.groupby('date', as_index=False)['aqi'].max()
    except Exception as e:
        logging.info(f"Data consolidation for year {year} failed with error {e}")
        response_df = None
    return response_df
```

The main function uses the constants set above and makes the API calls one year at a time.
It has no inputs or outputs.

The function operates through the following steps:
1. Create an empty list to store yearly AQI DataFrames.
2. Loop through each year in reverse from 2021 to 1961. 
3. Define start and end dates for the year. 
4. Request daily summaries of gaseous and particulate AQI data. 
5. Combine and filter results to create a DataFrame per year.
6. Filter out empty DataFrames, concatenate the yearly DataFrames, add a year column, and save the result to CSV. 
7. Main is executed and execution time is logged
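Steps 3 and 6 can be sketched on their own: build a same-year date window (the template notes that `begin_date` and `end_date` must be in the same year), then combine the per-year results while skipping years that returned `None` or an empty DataFrame. `year_window` and `combine_years` are hypothetical helpers, and the per-year frames are made up.

```python
import pandas as pd

def year_window(year):
    """Start and end dates (YYYYMMDD) covering one calendar year."""
    return f"{year}0101", f"{year}1231"

def combine_years(year_dfs):
    """Drop None/empty per-year frames, concatenate, and add a year column."""
    usable = [df for df in year_dfs if df is not None and not df.empty]
    if not usable:
        return pd.DataFrame(columns=["date", "aqi", "year"])
    combined = pd.concat(usable, ignore_index=True)
    combined["year"] = pd.to_datetime(combined["date"]).dt.year
    return combined

# Made-up per-year results: two consolidated years, one failed (None), one empty
year_dfs = [pd.DataFrame({"date": ["2021-07-01"], "aqi": [70.0]}),
            None,
            pd.DataFrame({"date": ["2019-06-15"], "aqi": [42.0]}),
            pd.DataFrame(columns=["date", "aqi"])]
combined = combine_years(year_dfs)
```

Filtering before `pd.concat` makes the skipped years explicit rather than relying on `concat` silently dropping `None` entries.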

```python
def main():
    year_dfs = []
    # Loop through years, most recent first
    for year in reversed(range(1961, 2022)):
        # The AQS API requires begin_date and end_date to fall in the same year
        start_date = f"{year}0101"
        end_date = f"{year}1231"
        # Request daily summaries for gaseous and particulate pollutants
        res_gas = request_daily_summary(param=AQI_PARAMS_GASEOUS,
                                        begin_date=start_date,
                                        end_date=end_date,
                                        api_key_store=api_key_store)
        res_particulates = request_daily_summary(param=AQI_PARAMS_PARTICULATES,
                                                 begin_date=start_date,
                                                 end_date=end_date,
                                                 api_key_store=api_key_store)
        # Consolidate to one fire-season AQI value per day
        response_df = consolidate_results(res_gas, res_particulates, year)
        # Skip years with no usable data
        if response_df is not None and not response_df.empty:
            year_dfs.append(response_df)
    # Concatenate the yearly DataFrames and add a year column
    full_dataframe = pd.concat(year_dfs, ignore_index=True)
    full_dataframe['year'] = pd.to_datetime(full_dataframe['date']).dt.year
    full_dataframe.to_csv(OUT_FILE, index=False)

if __name__ == "__main__":
    start = datetime.now()
    main()
    elapsed = (datetime.now() - start).total_seconds()
    logging.info(f"Script has taken {elapsed} seconds to execute.")
```