# US EPA Air Quality System API Example
This example illustrates how to request data from the US Environmental Protection Agency (EPA) Air Quality Service (AQS) API. This is a historical API and does not provide real-time air quality data. The [documentation](https://aqs.epa.gov/aqsweb/documents/data_api.html) for the API provides definitions of the different call parameter and examples of the various calls that can be made to the API.

This notebook works systematically through example calls, requesting an API key, using 'list' to get various IDs and parameter values, and using 'daily summary' to get summary data that meets specific condistions. The notebook contains example function definitions that could be reused in other code. In general, the notebook explains each step along the way, referring back to possible output. Some of the explanations are tailored to the specific example requests of the API. Changing values to explore the results of the API is probably useful, but that will result in some explanations being out of sync with the outputs.

The US EPA was created in the early 1970's. The EPA reports that they only started broad based monitoring with standardized quality assurance procedures in the 1980's. Many counties will have data starting somewhere between 1983 and 1988. However, some counties still do not have any air quality monitoring stations. The API helps resolve this by providing calls to search for monitoring stations and data using either station ids, or a county designation or a geographic bounding box. This example code provides examples of the county based and bounding box based API calls. Some [additional information on the Air Quality System can be found in the EPA FAQ](https://www.epa.gov/outdoor-air-quality-data/frequent-questions-about-airdata) on the system.

The end goal of this example is to get to some values that we might use for the Air Quality Index or AQI. You might see this reported on the news, most often around smog, but more frequently with regard to smoke. The AQI index is meant to tell us something about how healthy or clean the air is on any day. The AQI is actually a somewhat complext measure. When I started this example I looked up [how to calculate the AQI](https://www.airnow.gov/sites/default/files/2020-05/aqi-technical-assistance-document-sept2018.pdf) so that I would know roughly what goes into that value.


## License
This code example was developed by Dr. David W. McDonald for use in DATA 512, a course in the UW MS Data Science degree program. This code is provided under the [Creative Commons](https://creativecommons.org) [CC-BY license](https://creativecommons.org/licenses/by/4.0/). Revision 1.2 - August 16, 2024



In [1]:
# 
#    These are standard python modules
#
#import json, time, urllib.parse
import json, time
#
#    The 'requests' module is a distribution module for making web requests. If you do not have it already, you'll need to install it
import requests

import numpy as np
import pandas as pd

In [None]:
#########
#
#    CONSTANTS
#

#
#    This is the root of all AQS API URLs
#
API_REQUEST_URL = 'https://aqs.epa.gov/data/api'

#
#    These are some of the 'actions' we can ask the API to take or requests that we can make of the API
#
#    Sign-up request - generally only performed once - unless you lose your key
API_ACTION_SIGNUP = '/signup?email={email}'
#
#    List actions provide information on API parameter values that are required by some other actions/requests
API_ACTION_LIST_CLASSES = '/list/classes?email={email}&key={key}'
API_ACTION_LIST_PARAMS = '/list/parametersByClass?email={email}&key={key}&pc={pclass}'
API_ACTION_LIST_SITES = '/list/sitesByCounty?email={email}&key={key}&state={state}&county={county}'
#
#    Monitor actions are requests for monitoring stations that meet specific criteria
API_ACTION_MONITORS_COUNTY = '/monitors/byCounty?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&state={state}&county={county}'
API_ACTION_MONITORS_BOX = '/monitors/byBox?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&minlat={minlat}&maxlat={maxlat}&minlon={minlon}&maxlon={maxlon}'
#
#    Summary actions are requests for summary data. These are for daily summaries
API_ACTION_DAILY_SUMMARY_COUNTY = '/dailyData/byCounty?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&state={state}&county={county}'
API_ACTION_DAILY_SUMMARY_BOX = '/dailyData/byBox?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&minlat={minlat}&maxlat={maxlat}&minlon={minlon}&maxlon={maxlon}'
#
#    It is always nice to be respectful of a free data resource.
#    We're going to observe a 100 requests per minute limit - which is fairly nice
API_LATENCY_ASSUMED = 0.002       # Assuming roughly 2ms latency on the API and network
API_THROTTLE_WAIT = (1.0/100.0)-API_LATENCY_ASSUMED
#
#
#    This is a template that covers most of the parameters for the actions we might take, from the set of actions
#    above. In the examples below, most of the time parameters can either be supplied as individual values to a
#    function - or they can be set in a copy of the template and passed in with the template.
# 
AQS_REQUEST_TEMPLATE = {
    "email":      "",     
    "key":        "",      
    "state":      "",     # the two digit state FIPS # as a string
    "county":     "",     # the three digit county FIPS # as a string
    "begin_date": "",     # the start of a time window in YYYYMMDD format
    "end_date":   "",     # the end of a time window in YYYYMMDD format, begin_date and end_date must be in the same year
    "minlat":    0.0,
    "maxlat":    0.0,
    "minlon":    0.0,
    "maxlon":    0.0,
    "param":     "",     # a list of comma separated 5 digit codes, max 5 codes requested
    "pclass":    ""      # parameter class is only used by the List calls
}


#   Given the set of sensor codes, now we can create a parameter list or 'param' value as defined by the AQS API spec.
#   It turns out that we want all of these measures for AQI, but we need to have two different param constants to get
#   all seven of the code types. We can only have a max of 5 sensors/values request per param.
#
#   Gaseous AQI pollutants CO, SO2, NO2, and O2
AQI_PARAMS_GASEOUS = "42101,42401,42602,44201"
#
#   Particulate AQI pollutants PM10, PM2.5, and Acceptable PM2.5
AQI_PARAMS_PARTICULATES = "81102,88101,88502"

## Example 3. Making a daily summary request

The function below is designed to encapsulate requests to the EPA AQS API. When calling the function one should create/copy a parameter template, then initialize that template with values that won't change with each call. Then on each call simply pass in the parameters that need to change, like date ranges.

Another function below provides an example of extracting values and restructuring the response to make it a little more usable.

In [3]:
#
#    This implements the daily summary request. Daily summary provides a daily summary value for each sensor being requested
#    from the start date to the end date. 
#
#    Like the two other functions, this can be called with a mixture of a defined parameter dictionary, or with function
#    parameters. If function parameters are provided, those take precedence over any parameters from the request template.
#
def request_daily_summary(email_address = None, key = None, param=None,
                          begin_date = None, end_date = None, fips = None,
                          endpoint_url = API_REQUEST_URL, 
                          endpoint_action = API_ACTION_DAILY_SUMMARY_COUNTY, 
                          request_template = AQS_REQUEST_TEMPLATE,
                          headers = None):
    
    #  This prioritizes the info from the call parameters - not what's already in the template
    if email_address:
        request_template['email'] = email_address
    if key:
        request_template['key'] = key
    if param:
        request_template['param'] = param
    if begin_date:
        request_template['begin_date'] = begin_date
    if end_date:
        request_template['end_date'] = end_date
    if fips and len(fips)==5:
        request_template['state'] = fips[:2]
        request_template['county'] = fips[2:]            

    # Make sure there are values that allow us to make a call - these are always required
    if not request_template['email']:
        raise Exception("Must supply an email address to call 'request_daily_summary()'")
    if not request_template['key']: 
        raise Exception("Must supply a key to call 'request_daily_summary()'")
    if not request_template['param']: 
        raise Exception("Must supply param values to call 'request_daily_summary()'")
    if not request_template['begin_date']: 
        raise Exception("Must supply a begin_date to call 'request_daily_summary()'")
    if not request_template['end_date']: 
        raise Exception("Must supply an end_date to call 'request_daily_summary()'")
    # Note we're not validating FIPS fields because not all of the daily summary actions require the FIPS numbers
        
    # compose the request
    request_url = endpoint_url+endpoint_action.format(**request_template)
        
    # make the request
    try:
        # Wait first, to make sure we don't exceed a rate limit in the situation where an exception occurs
        # during the request processing - throttling is always a good practice with a free data source
        if API_THROTTLE_WAIT > 0.0:
            time.sleep(API_THROTTLE_WAIT)
        response = requests.get(request_url, headers=headers)
        json_response = response.json()
    except Exception as e:
        print(e)
        json_response = None
    return json_response



The form of the daily summary response is a bit verbose with lots of repeated values. What we'll do is create a data structure that relies on a hierarchical context to summarize the data.

The two responses (for Bend, OR) show that not every monitoring site produces values. As well, it looks like the monitoring sites only produce values for particulates and not for gaseous pollutants.

The next function takes the response and a set of fields that should be extracted for their data values. The code assumes those fields are available. If there are missing values something could certainly go wrong. The function creates a summary for each monitoring site.

In [4]:
#
#    This is a list of field names - data - that will be extracted from each record
#
EXTRACTION_FIELDS = ['sample_duration','observation_count','arithmetic_mean','aqi']

#
#    The function creates a summary record
def extract_summary_from_response(r=None, fields=EXTRACTION_FIELDS):
    
    ## the result will be structured around monitoring site, parameter, and then date
    result = dict()
    #data = r["Data"]
    for record in r:
        # make sure the record is set up
        site = record['site_number']
        param = record['parameter_code']
        #date = record['date_local']    # this version keeps the respnse value YYYY-
        date = record['date_local'].replace('-','') # this puts it in YYYYMMDD format
        if site not in result:
            result[site] = dict()
            result[site]['local_site_name'] = record['local_site_name']
            result[site]['site_address'] = record['site_address']
            result[site]['state'] = record['state']
            result[site]['county'] = record['county']
            result[site]['city'] = record['city']
            result[site]['pollutant_type'] = dict()
        if param not in result[site]['pollutant_type']:
            result[site]['pollutant_type'][param] = dict()
            result[site]['pollutant_type'][param]['parameter_name'] = record['parameter']
            result[site]['pollutant_type'][param]['units_of_measure'] = record['units_of_measure']
            result[site]['pollutant_type'][param]['method'] = record['method']
            result[site]['pollutant_type'][param]['data'] = dict()
        if date not in result[site]['pollutant_type'][param]['data']:
            result[site]['pollutant_type'][param]['data'][date] = list()
        
        # now extract the specified fields
        extract = dict()
        for k in fields:
            if str(k) in record:
                extract[str(k)] = record[k]
            else:
                # this makes sure we always have the requested fields, even if
                # we have a missing value for a given day/month
                extract[str(k)] = None
        
        # add this extraction to the list for the day
        result[site]['pollutant_type'][param]['data'][date].append(extract)
    
    return result


The helper function below, multi_year_data_pull, is used to pull multiple years of EPA data. It has the option to save the raw files or continue to have them processed using the function above. This function was adapted heavily from the tutorial code provided. 

In [5]:
def multi_year_data_pull(first_year, last_year, city_fips, save=False):
    """
    Takes in the starting and ending years, the fips code for a given city, 
    and whether to save the raw file and outputs the EPA AQI information for both gaseous and particulate pollutants.

    Parameters:
        - first_year: The starting year to begin requesting data
        - last_year: The last year to request data
        - city_fips: The fips code for a given city
        - save: Whether to save the raw JSONs

    Returns two processed JSONs, one for gaseous and one for particulate. 
    """
    request_data = AQS_REQUEST_TEMPLATE.copy()
    request_data['email'] = USERNAME
    request_data['key'] = APIKEY
    request_data['param'] = AQI_PARAMS_GASEOUS
    request_data['state'] = city_fips[:2]
    request_data['county'] = city_fips[2:]
    
    # Define the start and end year range
    start_year = first_year
    end_year = last_year
    
    # Initialize lists to hold combined data for each pollutant type
    combined_gaseous_data = []
    combined_particulate_data = []
    
    # Loop through each year in the range
    for year in range(start_year, end_year + 1):
        begin_date = f"{year}0501"
        end_date = f"{year}1031"
        
        # Request daily summary data for gaseous pollutants
        gaseous_aqi = request_daily_summary(request_template=request_data, begin_date=begin_date, end_date=end_date)
        if gaseous_aqi["Header"][0]['status'] == "Success":
            combined_gaseous_data.extend(gaseous_aqi['Data'])
        elif gaseous_aqi["Header"][0]['status'].startswith("No data"):
            print(f"No data for gaseous pollutants in {year}.")
        else:
            print(f"Error in gaseous data request for {year}: {json.dumps(gaseous_aqi, indent=4)}")
        
        # Adjust request data for particulate pollutants
        request_data['param'] = AQI_PARAMS_PARTICULATES
        
        # Request daily summary data for particulate pollutants
        particulate_aqi = request_daily_summary(request_template=request_data, begin_date=begin_date, end_date=end_date)
        if particulate_aqi["Header"][0]['status'] == "Success":
            combined_particulate_data.extend(particulate_aqi['Data'])
        elif particulate_aqi["Header"][0]['status'].startswith("No data"):
            print(f"No data for particulate pollutants in {year}.")
        else:
            print(f"Error in particulate data request for {year}: {json.dumps(particulate_aqi, indent=4)}")

    if save: 
        # Save the gaseous data to a JSON file
        with open(f"combined_gaseous_aqi_data_{start_year}_{end_year}.json", "w") as f:
            json.dump(combined_gaseous_data, f, indent=4)
        
        # Save the particulate data to a separate JSON file
        with open(f"combined_particulate_aqi_data_{start_year}_{end_year}.json", "w") as f:
            json.dump(combined_particulate_data, f, indent=4)
    
    processed_gaseous_data = extract_summary_from_response(combined_gaseous_data)
    processed_particulate_data = extract_summary_from_response(combined_particulate_data)

    return processed_gaseous_data, processed_particulate_data
    


The helper function below, process_extract, is used to take in the processed JSONs, extract the AQI data, and create a dataframe that gives AQI data per day for a given dataset

I also make the decision to average the AQI values if there are multiple for a given day.

In [6]:
def process_extract(data):
    """
    Takes in a JSON file and returns the AQI information 

    Parameters:
        - data: A JSON containing information about the AQI

    Returns a dataframe where the rows are days, and the columns are pollutants. 
    The values are the average AQI for a given day
    """
    # Initialize a dictionary to hold data for each pollutant and day
    daily_data = {}

    # Iterate through each main entry in data
    for main_key, main_data in data.items():
        pollutants_data = main_data['pollutant_type']
        
        # Iterate over each pollutant type
        for pollutant_id, pollutant_info in pollutants_data.items():
            pollutant_name = pollutant_info['parameter_name']
            
            # Access daily observations for this pollutant
            for date, records in pollutant_info['data'].items():
                
                # Extract AQI values, excluding `None`s
                aqi_values = [entry['aqi'] for entry in records if entry['aqi'] is not None]
                
                # Calculate mean AQI or assign `np.nan` if all values are `None`
                mean_aqi = np.mean(aqi_values) if aqi_values else np.nan
                
                # Populate the dictionary: each date has a dictionary of pollutants
                if date not in daily_data:
                    daily_data[date] = {}
                
                # Store the mean AQI for the pollutant
                daily_data[date][pollutant_name] = mean_aqi
    # Convert the dictionary to a DataFrame
    df = pd.DataFrame.from_dict(daily_data, orient='index')
    return df



The function below, get_max_aqi, handles the entire extraction process of daily AQI information. It uses the two helper function above to get data from both gaseuous and particulate pollutants, get the daily AQI values and output them to a dataframe.

I also make the decision that I'm going to be the maximum AQI for a given day of all pollutants. Based on the research I found, there wasn't a good way to assess AQI values between pollutants. Therefore, I figured the max would likely represent the most sensitive measurement. 

In [7]:
# Given a beginning and ending year and a tuple entry where the key is a city and the value is the fips ID, 
# will return a processed dataframe with AQI levels for each pollutant
# Includes a column with the maximum AQI recorded for each day of fire season between the two years.

def get_max_aqi(first_year, last_year, city_info, save_raw = False, save_dataframe = False, loc = "data"):
    """
    Takes in the starting and ending years, the fips code for a given city, 
    and whether to save the raw file, whether to save the final dataframe, and what folder to save it to
    and outputs a dataframe giving the AQI for each pollutant for each day.

    Parameters:
        - first_year: The starting year to begin requesting data
        - last_year: The last year to request data
        - city_info: A list where the first 
          element is the city name and the last element is the fips code. Both are strings
        - save_raw: Whether to save the raw JSONs
        - save_dataframe: Whether to save the final dataframe
        - loc: Which folder to save the final dataframe if save_dataframe = True

    Returns a combined dataframe - where the rows are days, the columns pollutants, and the values are AQI values.
    """
    gaseous, particulate = multi_year_data_pull(first_year, last_year, city_info[1], save_raw)
    
    # Process each extract
    
    df_gaseous = process_extract(gaseous)
    df_particulate = process_extract(particulate)
    
    # Combine the two dataframes by joining them on the date index
    df_combined = pd.concat([df_gaseous, df_particulate], axis=1)
    
    # Convert the index to datetime format, sort, and display the final DataFrame
    df_combined.index = pd.to_datetime(df_combined.index, format='%Y%m%d')
    df_combined = df_combined.sort_index()

    df_combined['max_aqi'] = df_combined.max(axis=1)
    if save_dataframe:
        df_combined.to_csv(f"{loc}/AQI_{city_info[0]}_{first_year}_{last_year}.csv")
    return df_combined


The cell below creates my city information for my given city, then runs the files. I choose to just save the final dataframe and not the raw JSON files. Even though the assignment requests EPA data as far back as possible (theoretically 1970s), I start in 1980 because there isn't any data until 1986.

In [8]:
sioux_falls_info = ["Sioux_Falls", "46099"]
sioux_falls_1980_2021 = get_max_aqi(1980, 2021, sioux_falls_info, save_raw=False, save_dataframe=True)

No data for particulate pollutants in 1980.
No data for gaseous pollutants in 1981.
No data for particulate pollutants in 1981.
No data for gaseous pollutants in 1982.
No data for particulate pollutants in 1982.
No data for gaseous pollutants in 1983.
No data for particulate pollutants in 1983.
No data for gaseous pollutants in 1984.
No data for particulate pollutants in 1984.
No data for gaseous pollutants in 1985.
No data for particulate pollutants in 1985.
