## License
This code example was developed by Dr. David W. McDonald for use in DATA 512, a course in the UW MS Data Science degree program. This code is provided under the [Creative Commons](https://creativecommons.org) [CC-BY license](https://creativecommons.org/licenses/by/4.0/). Revision 1.1 - September 5, 2023



### This code extracts AQI data from monitoring stations in or near the city under study, to investigate the impact of fires around the city

In [1]:
import json, time
import pandas as pd
import requests
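USERNAME = "USERNAME"   # placeholder - replace with the email address registered with the AQS API
APIKEY = "APIKEY"       # placeholder - replace with the key issued by the AQS sign-up request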

In [2]:
#########
#
#    CONSTANTS
#

#
#    This is the root of all AQS API URLs
#
API_REQUEST_URL = 'https://aqs.epa.gov/data/api'

#
#    These are 'actions' we can ask the API to take or requests that we can make of the API
#
#    Sign-up request - generally only performed once - unless you lose your key
API_ACTION_SIGNUP = '/signup?email={email}'
#
#    List actions provide information on API parameter values that are required by some other actions/requests
API_ACTION_LIST_CLASSES = '/list/classes?email={email}&key={key}'
API_ACTION_LIST_PARAMS = '/list/parametersByClass?email={email}&key={key}&pc={pclass}'
API_ACTION_LIST_SITES = '/list/sitesByCounty?email={email}&key={key}&state={state}&county={county}'
#
#    Monitor actions are requests for monitoring stations that meet specific criteria
API_ACTION_MONITORS_COUNTY = '/monitors/byCounty?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&state={state}&county={county}'
API_ACTION_MONITORS_BOX = '/monitors/byBox?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&minlat={minlat}&maxlat={maxlat}&minlon={minlon}&maxlon={maxlon}'
#
#    Summary actions are requests for summary data. These are for daily summaries
API_ACTION_DAILY_SUMMARY_COUNTY = '/dailyData/byCounty?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&state={state}&county={county}'
API_ACTION_DAILY_SUMMARY_BOX = '/dailyData/byBox?email={email}&key={key}&param={param}&bdate={begin_date}&edate={end_date}&minlat={minlat}&maxlat={maxlat}&minlon={minlon}&maxlon={maxlon}'
#
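#    It is always nice to be respectful of a free data resource.
#    We throttle by sleeping before every request. Note that (1.0/100.0) is an interval in seconds,
#    so the wait below caps the rate at roughly 100 requests per second, not 100 per minute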
API_LATENCY_ASSUMED = 0.002       # Assuming roughly 2ms latency on the API and network
API_THROTTLE_WAIT = (1.0/100.0)-API_LATENCY_ASSUMED
#
#
#    This is a template that covers most of the parameters for the actions we might take, from the set of actions
#    above. In the examples below, most of the time parameters can either be supplied as individual values to a
#    function - or they can be set in a copy of the template and passed in with the template.
# 
AQS_REQUEST_TEMPLATE = {
    "email":      "",     
    "key":        "",      
    "state":      "",     # the two digit state FIPS # as a string
    "county":     "",     # the three digit county FIPS # as a string
    "begin_date": "",     # the start of a time window in YYYYMMDD format
    "end_date":   "",     # the end of a time window in YYYYMMDD format, begin_date and end_date must be in the same year
    "minlat":    0.0,
    "maxlat":    0.0,
    "minlon":    0.0,
    "maxlon":    0.0,
    "param":     "",     # a list of comma separated 5 digit codes, max 5 codes requested
    "pclass":    ""      # parameter class is only used by the List calls
}
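
The throttle constants above derive a sleep interval from a target request rate. As a minimal sketch of that arithmetic (the helper `throttle_wait` is hypothetical and not part of the original notebook), the interval is the time window divided by the number of allowed requests, minus the assumed latency:

```python
def throttle_wait(max_requests, per_seconds, assumed_latency=0.002):
    """Seconds to sleep between requests to stay under max_requests per per_seconds."""
    interval = per_seconds / max_requests
    # never return a negative sleep time
    return max(0.0, interval - assumed_latency)

# Same arithmetic as (1.0/100.0) - API_LATENCY_ASSUMED: 100 requests per second
wait_as_written = throttle_wait(100, 1.0)
# What a limit of 100 requests per minute would need instead
wait_per_minute = throttle_wait(100, 60.0)

print(f"100 per second: sleep {wait_as_written:.3f} s between requests")
print(f"100 per minute: sleep {wait_per_minute:.3f} s between requests")
```

Which of the two rates is appropriate depends on the API's terms of use, which this sketch does not assume.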



In [3]:
#
#   Once we have a list of the classes or groups of possible sensors, we can find the sensor IDs that make up that class (group)
#   The one that looks to be associated with the Air Quality Index is "AQI POLLUTANTS"
#   We'll use that to make another list request.
#
AQI_PARAM_CLASS = "AQI POLLUTANTS"


In [4]:
#
#   Given the set of sensor codes, we can now create a parameter list or 'param' value as defined by the AQS API spec.
#   We want all of these measures for AQI, but we need two different param constants to cover
#   all seven of the codes, because a single param value can request at most 5 codes.
#
#   Gaseous AQI pollutants CO, SO2, NO2, and O3
AQI_PARAMS_GASEOUS = "42101,42401,42602,44201"
#
#   Particulate AQI pollutants PM10, PM2.5, and Acceptable PM2.5
AQI_PARAMS_PARTICULATES = "81102,88101,88502"
#   
#

In [5]:
# Alamogordo, Otero County, New Mexico
CITY_LOCATIONS = {
    'alamogordo' : {'city'   : 'Alamogordo',
                    'county' : 'Otero',
                    'state'  : 'New Mexico',
                    'fips'   : '35035',
                    'latlon' : [32.8995325, -105.96026499999999] }
}


Given our CITY_LOCATIONS constant, we can now find which monitoring locations are nearby. One option is to use the county to define the area we're interested in: you can ask the EPA to list its monitoring stations by county. You can also get a set of monitoring stations by using a bounding box of latitude and longitude values.
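
As a rough illustration of the two options, the sketch below builds both request URLs without sending anything. The URL templates are the ones defined in the constants cell. The email, key, date window, and box corners are placeholder values chosen only for this example.

```python
API_REQUEST_URL = 'https://aqs.epa.gov/data/api'
MONITORS_COUNTY = ('/monitors/byCounty?email={email}&key={key}&param={param}'
                   '&bdate={begin_date}&edate={end_date}&state={state}&county={county}')
MONITORS_BOX = ('/monitors/byBox?email={email}&key={key}&param={param}'
                '&bdate={begin_date}&edate={end_date}'
                '&minlat={minlat}&maxlat={maxlat}&minlon={minlon}&maxlon={maxlon}')

fips = '35035'   # Otero County, New Mexico, from CITY_LOCATIONS

params = {
    'email': 'user@example.com',   # placeholder
    'key': 'EXAMPLE_KEY',          # placeholder
    'param': '88101',
    'begin_date': '20210701',
    'end_date': '20210731',
    'state': fips[:2],             # first two digits: state FIPS
    'county': fips[2:],            # last three digits: county FIPS
    'minlat': 32.5, 'maxlat': 33.3,       # placeholder box corners
    'minlon': -106.4, 'maxlon': -105.5,
}

# str.format ignores keyword arguments the template does not use,
# so one dictionary can feed both URL templates
county_url = API_REQUEST_URL + MONITORS_COUNTY.format(**params)
box_url = API_REQUEST_URL + MONITORS_BOX.format(**params)

print(county_url)
print(box_url)
```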


The response to such a request is a list of monitoring stations. Each monitoring station has a unique "code", which is a numeric string, and sometimes a description. The description seems to indicate where the monitoring station is located.


The function below is designed to encapsulate requests to the EPA AQS API. When calling the function one should create/copy a parameter template, then initialize that template with values that won't change with each call. Then on each call simply pass in the parameters that need to change, like date ranges.
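
The copy-then-override pattern described above can be sketched on its own, without any network call. The helper `with_overrides` and the placeholder credentials are hypothetical. The point is only that values fixed once live in a copy, and per-call values are layered on top without touching the shared template.

```python
TEMPLATE = {"email": "", "key": "", "param": "", "begin_date": "", "end_date": ""}

def with_overrides(template, **overrides):
    """Return a new dict: the template plus any overrides that have a value."""
    merged = dict(template)   # shallow copy, the original stays untouched
    merged.update({k: v for k, v in overrides.items() if v})
    return merged

# Values that do not change between calls are set once
base = with_overrides(TEMPLATE, email="user@example.com", key="EXAMPLE_KEY", param="88101")

# Values that change on each call, like date ranges, are supplied per request
july = with_overrides(base, begin_date="20210701", end_date="20210731")
august = with_overrides(base, begin_date="20210801", end_date="20210831")

print(july)
print(august)
```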

Another function below provides an example of extracting values and restructuring the response to make it a little more usable.

In [6]:
#
#    This implements the daily summary request. Daily summary provides a daily summary value for each sensor being requested
#    from the start date to the end date. 
#
#    Like the other request function, this can be called with a mixture of a defined parameter dictionary, or with function
#    parameters. If function parameters are provided, those take precedence over any parameters from the request template.
#
def request_daily_summary(email_address = None, key = None, param=None,
                          begin_date = None, end_date = None, fips = None,
                          endpoint_url = API_REQUEST_URL, 
                          endpoint_action = API_ACTION_DAILY_SUMMARY_COUNTY, 
                          request_template = AQS_REQUEST_TEMPLATE,
                          headers = None):
    
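    #  Work on a copy so that neither the caller's dictionary nor the shared default template is modified
    request_template = dict(request_template)
    #  This prioritizes the info from the call parameters - not what's already in the template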
    if email_address:
        request_template['email'] = email_address
    if key:
        request_template['key'] = key
    if param:
        request_template['param'] = param
    if begin_date:
        request_template['begin_date'] = begin_date
    if end_date:
        request_template['end_date'] = end_date
    if fips and len(fips)==5:
        request_template['state'] = fips[:2]
        request_template['county'] = fips[2:]            

    # Make sure there are values that allow us to make a call - these are always required
    if not request_template['email']:
        raise Exception("Must supply an email address to call 'request_daily_summary()'")
    if not request_template['key']: 
        raise Exception("Must supply a key to call 'request_daily_summary()'")
    if not request_template['param']: 
        raise Exception("Must supply param values to call 'request_daily_summary()'")
    if not request_template['begin_date']: 
        raise Exception("Must supply a begin_date to call 'request_daily_summary()'")
    if not request_template['end_date']: 
        raise Exception("Must supply an end_date to call 'request_daily_summary()'")
    # Note we're not validating FIPS fields because not all of the daily summary actions require the FIPS numbers
        
    # compose the request
    request_url = endpoint_url+endpoint_action.format(**request_template)
        
    # make the request
    try:
        # Wait first, to make sure we don't exceed a rate limit in the situation where an exception occurs
        # during the request processing - throttling is always a good practice with a free data source
        if API_THROTTLE_WAIT > 0.0:
            time.sleep(API_THROTTLE_WAIT)
        response = requests.get(request_url, headers=headers)
        json_response = response.json()
    except Exception as e:
        print(e)
        json_response = None
    return json_response
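
Because the function above returns `None` when the request or the JSON decoding fails, callers may want to guard before indexing into the result. A minimal sketch, assuming the response layout used elsewhere in this notebook (a `Header` list whose first entry has a `status` field, plus a `Data` list). The helper names and the mock responses, including the exact "No data" message text, are hypothetical.

```python
def response_status(response):
    """Return the status string of an AQS-style response, or None if unavailable."""
    if not response:
        return None
    header = response.get("Header") or []
    if not header:
        return None
    return header[0].get("status")

def has_data(response):
    """True only for a successful response that carries at least one record."""
    return response_status(response) == "Success" and bool(response.get("Data"))

# Mock responses for illustration only
ok = {"Header": [{"status": "Success"}], "Data": [{"aqi": 42}]}
empty = {"Header": [{"status": "No data matched your selection"}], "Data": []}
failed = None   # what the request function returns after an exception

for name, resp in (("ok", ok), ("empty", empty), ("failed", failed)):
    print(name, response_status(resp), has_data(resp))
```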



The county-level requests return no data for this city, so we need to search with a bounding box instead.

In [7]:
#
#    This is a list of field names - data - that will be extracted from each record
#
EXTRACTION_FIELDS = ['sample_duration','observation_count','arithmetic_mean','aqi']

#
#    The function creates a summary record
def extract_summary_from_response(r=None, fields=EXTRACTION_FIELDS):
    ## the result will be structured around monitoring site, parameter, and then date
    result = dict()
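    # a failed request yields None; treat that, or a missing "Data" key, as no records
    data = r.get("Data", []) if r else []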
    for record in data:
        # make sure the record is set up
        site = record['site_number']
        param = record['parameter_code']
        #date = record['date_local']    # this version keeps the response value YYYY-
        date = record['date_local'].replace('-','') # this puts it in YYYYMMDD format
        if site not in result:
            result[site] = dict()
            result[site]['local_site_name'] = record['local_site_name']
            result[site]['site_address'] = record['site_address']
            result[site]['state'] = record['state']
            result[site]['county'] = record['county']
            result[site]['city'] = record['city']
            result[site]['pollutant_type'] = dict()
        if param not in result[site]['pollutant_type']:
            result[site]['pollutant_type'][param] = dict()
            result[site]['pollutant_type'][param]['parameter_name'] = record['parameter']
            result[site]['pollutant_type'][param]['units_of_measure'] = record['units_of_measure']
            result[site]['pollutant_type'][param]['method'] = record['method']
            result[site]['pollutant_type'][param]['data'] = dict()
        if date not in result[site]['pollutant_type'][param]['data']:
            result[site]['pollutant_type'][param]['data'][date] = list()
        
        # now extract the specified fields
        extract = dict()
        for k in fields:
            if str(k) in record:
                extract[str(k)] = record[k]
            else:
                # this makes sure we always have the requested fields, even if
                # we have a missing value for a given day/month
                extract[str(k)] = None
        
        # add this extraction to the list for the day
        result[site]['pollutant_type'][param]['data'][date].append(extract)
    
    return result
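
To make the nesting concrete, here is a stripped-down sketch of the same grouping idea (site, then parameter, then date) applied to a few made-up records. Only the field names `site_number`, `parameter_code`, `date_local`, and `aqi` come from the function above. The record values are invented for illustration.

```python
records = [
    {"site_number": "0101", "parameter_code": "88502", "date_local": "2021-07-01", "aqi": 30},
    {"site_number": "0101", "parameter_code": "88502", "date_local": "2021-07-02", "aqi": None},
    {"site_number": "0101", "parameter_code": "88101", "date_local": "2021-07-01", "aqi": 41},
]

grouped = {}
for rec in records:
    date = rec["date_local"].replace("-", "")           # YYYYMMDD, as in the function above
    by_param = grouped.setdefault(rec["site_number"], {})
    by_date = by_param.setdefault(rec["parameter_code"], {})
    # each date holds a list, because several records can exist for one day
    by_date.setdefault(date, []).append(rec.get("aqi"))

print(grouped)
```

Missing AQI values are kept as `None` here, so a consumer has to filter them out before doing arithmetic.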


The AQS API supports requesting data and monitoring stations using a geographic bounding box. The earlier examples made requests by county; the examples below use bounding boxes. The first one identifies monitoring stations within the bounding box. Once you know the box contains monitoring stations, the same bounding box can be used in daily summary requests to get AQS data.


In [8]:
#
#   These are rough estimates for creating bounding boxes based on a city location
#   You can find these rough estimates on the USGS website:
#   https://www.usgs.gov/faqs/how-much-distance-does-a-degree-minute-and-second-cover-your-maps
#
LAT_25MILES = 25.0 * (1.0/69.0)    # This is about 25 miles of latitude in decimal degrees
LON_25MILES = 25.0 * (1.0/54.6)    # This is about 25 miles of longitude in decimal degrees
#
#   Compute a rough estimate of a bounding box around a given place.
#   The bounding box is scaled in 50 mile increments. That is, the bounding box will have sides that
#   are rough multiples of 50 miles, with the box centered on the indicated place.
#   The scale parameter determines the scale (size) of the bounding box
#
def bounding_latlon(place=None,scale=1.0):
    minlat = place['latlon'][0] - float(scale) * LAT_25MILES
    maxlat = place['latlon'][0] + float(scale) * LAT_25MILES
    minlon = place['latlon'][1] - float(scale) * LON_25MILES
    maxlon = place['latlon'][1] + float(scale) * LON_25MILES
    return [minlat,maxlat,minlon,maxlon]
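
The fixed figure of 54.6 miles per degree of longitude is only a rough estimate, because the length of a degree of longitude shrinks with the cosine of the latitude. Under the simple spherical approximation below, 54.6 corresponds to a latitude of about 38 degrees. The sketch shows one way the box could be adjusted to the latitude of the place. The function names are hypothetical, and the 69 miles per degree figure is the same rough value the cell above uses.

```python
import math

MILES_PER_DEGREE_LAT = 69.0   # rough figure, same as in the cell above

def miles_per_degree_lon(lat_degrees):
    """Approximate miles covered by one degree of longitude at a given latitude."""
    return MILES_PER_DEGREE_LAT * math.cos(math.radians(lat_degrees))

def bounding_latlon_adjusted(latlon, half_side_miles=25.0):
    """Box of roughly 2 * half_side_miles on a side, centered on latlon = [lat, lon]."""
    lat, lon = latlon
    dlat = half_side_miles / MILES_PER_DEGREE_LAT
    dlon = half_side_miles / miles_per_degree_lon(lat)
    return [lat - dlat, lat + dlat, lon - dlon, lon + dlon]

alamogordo = [32.8995325, -105.96026499999999]
print("miles per degree of longitude:", round(miles_per_degree_lon(alamogordo[0]), 1))
print("adjusted box:", bounding_latlon_adjusted(alamogordo))
```

For a coarse search for nearby monitors the difference is small, so the fixed constants remain a reasonable shortcut.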



In [9]:
#
#    This implements the monitors request. This requests monitoring stations. This can be done by state, county, or bounding box. 
#
#    Like the other request function, this can be called with a mixture of a defined parameter dictionary, or with function
#    parameters. If function parameters are provided, those take precedence over any parameters from the request template.
#
def request_monitors(email_address = None, key = None, param=None,
                          begin_date = None, end_date = None, fips = None,
                          endpoint_url = API_REQUEST_URL, 
                          endpoint_action = API_ACTION_MONITORS_COUNTY, 
                          request_template = AQS_REQUEST_TEMPLATE,
                          headers = None):
    
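    #  Work on a copy so that neither the caller's dictionary nor the shared default template is modified
    request_template = dict(request_template)
    #  This prioritizes the info from the call parameters - not what's already in the template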
    if email_address:
        request_template['email'] = email_address
    if key:
        request_template['key'] = key
    if param:
        request_template['param'] = param
    if begin_date:
        request_template['begin_date'] = begin_date
    if end_date:
        request_template['end_date'] = end_date
    if fips and len(fips)==5:
        request_template['state'] = fips[:2]
        request_template['county'] = fips[2:]            

    # Make sure there are values that allow us to make a call - these are always required
    if not request_template['email']:
        raise Exception("Must supply an email address to call 'request_monitors()'")
    if not request_template['key']: 
        raise Exception("Must supply a key to call 'request_monitors()'")
    if not request_template['param']: 
        raise Exception("Must supply param values to call 'request_monitors()'")
    if not request_template['begin_date']: 
        raise Exception("Must supply a begin_date to call 'request_monitors()'")
    if not request_template['end_date']: 
        raise Exception("Must supply an end_date to call 'request_monitors()'")
    # Note we're not validating FIPS fields because not all of the monitors actions require the FIPS numbers
    
    # compose the request
    request_url = endpoint_url+endpoint_action.format(**request_template)
    
    # make the request
    try:
        # Wait first, to make sure we don't exceed a rate limit in the situation where an exception occurs
        # during the request processing - throttling is always a good practice with a free data source
        if API_THROTTLE_WAIT > 0.0:
            time.sleep(API_THROTTLE_WAIT)
        response = requests.get(request_url, headers=headers)
        json_response = response.json()
    except Exception as e:
        print(e)
        json_response = None
    return json_response


In [10]:
#
#    Create a copy of the AQS_REQUEST_TEMPLATE
#
request_data = AQS_REQUEST_TEMPLATE.copy()
request_data['email'] = USERNAME
request_data['key'] = APIKEY
request_data['param'] = AQI_PARAMS_PARTICULATES     

#   50 mile box
# bbox = bounding_latlon(CITY_LOCATIONS['alamogordo'],scale=1.0)
#   100 mile box
# bbox = bounding_latlon(CITY_LOCATIONS['alamogordo'],scale=2.0)
#   150 mile box
bbox = bounding_latlon(CITY_LOCATIONS['alamogordo'],scale=3.0)
#   200 mile box
#bbox = bounding_latlon(CITY_LOCATIONS['alamogordo'],scale=4.0)

# the bbox response comes back as a list - [minlat,maxlat,minlon,maxlon]

#   put our bounding box into the request_data
request_data['minlat'] = bbox[0]
request_data['maxlat'] = bbox[1]
request_data['minlon'] = bbox[2]
request_data['maxlon'] = bbox[3]

#
#   we need to change the action for the API from the default to the bounding box - same recent date for now

response = request_monitors(request_template=request_data, begin_date="20210701", end_date="20210731",
                            endpoint_action = API_ACTION_MONITORS_BOX)

if response["Header"][0]['status'] == "Success":
    print(json.dumps(response['Data'],indent=4))
elif response["Header"][0]['status'].startswith("No data "):
    print("Looks like the response generated no monitoring stations.")
else:
    print(json.dumps(response,indent=4))


[
    {
        "state_code": "48",
        "county_code": "109",
        "site_number": "0101",
        "parameter_code": "88502",
        "poc": 1,
        "parameter_name": "Acceptable PM2.5 AQI & Speciation Mass",
        "open_date": "1988-03-02",
        "close_date": null,
        "concurred_exclusions": null,
        "dominant_source": null,
        "measurement_scale": "REGIONAL SCALE",
        "measurement_scale_def": "50 TO HUNDREDS KM",
        "monitoring_objective": "GENERAL/BACKGROUND",
        "last_method_code": "707",
        "last_method_description": "IMPROVE Module A with Cyclone Inlet-Teflon Filter, 2.2 sq. cm. - GRAVIMETRIC",
        "last_method_begin_date": "1999-11-03",
        "naaqs_primary_monitor": null,
        "qa_primary_monitor": null,
        "monitor_type": "EPA",
        "networks": "IMPROVE",
        "monitoring_agency_code": "0745",
        "monitoring_agency": "National Park Service",
        "si_id": 15192,
        "latitude": 31.83345,
        

We found no monitors in the box around the city, so we expanded the box to twice the scale (about 100 miles on a side).

Four monitors were found: three in county 35013 and one in county 35027. Their open dates are:

35013 (1): open date is 2015-11-20

35013 (2): open date is 2015-11-13

35013 (3): open date is 2017-04-01

35027    : open date is 2001-12-03
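
A tally like the one above can be produced from the monitor records rather than by reading the JSON by eye. The sketch below uses four stand-in records that carry only the `state_code`, `county_code`, and `open_date` fields, filled with the values listed above. A real response has many more fields per monitor.

```python
from collections import defaultdict

# Stand-in records using the county codes and open dates listed above
monitors = [
    {"state_code": "35", "county_code": "013", "open_date": "2015-11-20"},
    {"state_code": "35", "county_code": "013", "open_date": "2015-11-13"},
    {"state_code": "35", "county_code": "013", "open_date": "2017-04-01"},
    {"state_code": "35", "county_code": "027", "open_date": "2001-12-03"},
]

open_dates = defaultdict(list)
for m in monitors:
    fips = m["state_code"] + m["county_code"]   # five digit county FIPS
    open_dates[fips].append(m["open_date"])

# ISO formatted dates sort correctly as plain strings, so min() finds the earliest
summary = {fips: {"monitors": len(dates), "earliest_open": min(dates)}
           for fips, dates in open_dates.items()}

for fips, info in sorted(summary.items()):
    print(fips, info)
```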

When we expanded the box further, to three times the scale, we found that county 35013 has a site that opened in 1988 and that one site in county 48109 also opened in 1988. To cover as many years as possible without combining data sets that span different ranges, we decided to use 35013 and 48109 from 1988 to 2023.
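
As a quick arithmetic check of why the 48109 monitor only appears at three times the scale, the sketch below compares its latitude (31.83345, from the monitor output above) with the latitude bounds of the box at each scale. Only latitude is checked, since the longitude is not visible in the truncated output. The helper `lat_bounds` is hypothetical.

```python
LAT_25MILES = 25.0 * (1.0 / 69.0)   # same constant as in the bounding box cell
center_lat = 32.8995325             # Alamogordo latitude from CITY_LOCATIONS
monitor_lat = 31.83345              # latitude of the 48109 monitor in the output above

def lat_bounds(center, scale):
    """Minimum and maximum latitude of the bounding box at a given scale."""
    half = float(scale) * LAT_25MILES
    return center - half, center + half

inside_by_scale = {}
for scale in (1.0, 2.0, 3.0):
    minlat, maxlat = lat_bounds(center_lat, scale)
    inside_by_scale[scale] = minlat <= monitor_lat <= maxlat
    print(f"scale {scale}: lat {minlat:.4f} to {maxlat:.4f}, "
          f"monitor inside: {inside_by_scale[scale]}")
```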

The code below extracts daily AQI data for both 48109 and 35013 from 1988 through 2023. Each yearly request covers May 1 through October 31.

In [11]:
from tqdm import tqdm

# both sites dated back to 1988
site48109_date_range = list(range(1988, 2024))
site35013_date_range = list(range(1988, 2024))

request_data = AQS_REQUEST_TEMPLATE.copy()
request_data['email'] = USERNAME
request_data['key'] = APIKEY

In [12]:

# To allow a better comparison with the predictions of the time-series model,
# which predicts from daily or monthly data,
# we extract daily AQI values here and save them as JSON files

request_data['state'] = '35'
request_data['county'] = '013'
site35013_gas = {"monitors":[]}
dates_35013_gas = {}
for year in tqdm(site35013_date_range):
    year = str(year)
    request_data['param'] = AQI_PARAMS_GASEOUS
    gaseous_aqi = request_daily_summary(request_template=request_data, 
                                        begin_date=year+"0501", 
                                        end_date=year+"1031")

    extract_gaseous = extract_summary_from_response(gaseous_aqi)
    if extract_gaseous:
        for site in extract_gaseous:
            for param in extract_gaseous[site]['pollutant_type']:
                for date in extract_gaseous[site]['pollutant_type'][param]['data']:
                    for i in extract_gaseous[site]['pollutant_type'][param]['data'][date]:
                        if i['aqi'] is not None:
                            dates_35013_gas.setdefault(date, []).append(i['aqi'])


100%|██████████| 36/36 [01:24<00:00,  2.34s/it]


In [13]:
site35013_gas['monitors'].append(dates_35013_gas)

with open("site35013_gas.json", "w") as outfile:
        outfile.write(json.dumps(site35013_gas,indent=4))
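
The nested loop that gathers AQI values per date is repeated for each county and pollutant group. One way to avoid the repetition is a small helper. The sketch below is a hypothetical refactoring, not part of the original notebook, and it runs against a made-up extract with the same nesting that `extract_summary_from_response()` produces.

```python
def collect_daily_aqi(extract, dates=None):
    """Gather non-missing AQI values per date across all sites and pollutants."""
    dates = {} if dates is None else dates
    for site in extract.values():
        for pollutant in site['pollutant_type'].values():
            for date, rows in pollutant['data'].items():
                for row in rows:
                    if row.get('aqi') is not None:
                        dates.setdefault(date, []).append(row['aqi'])
    return dates

# Made-up extract for illustration only
extract = {
    "0101": {"pollutant_type": {
        "88502": {"data": {"20210701": [{"aqi": 30}], "20210702": [{"aqi": None}]}},
        "88101": {"data": {"20210701": [{"aqi": 41}]}},
    }},
}

dates = collect_daily_aqi(extract)
print(dates)
```

Passing an existing dictionary as `dates` lets the caller accumulate results across the yearly requests, as the loops in this notebook do.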

In [15]:

request_data['state'] = '35'
request_data['county'] = '013'
site35013_par = {"monitors":[]}
dates_35013_par = {}
for year in tqdm(site35013_date_range):
    year = str(year)
    request_data['param'] = AQI_PARAMS_PARTICULATES
    particulate_aqi = request_daily_summary(request_template=request_data, 
                                        begin_date=year+"0501", 
                                        end_date=year+"1031")
    
    extract_particulate = extract_summary_from_response(particulate_aqi)

    if extract_particulate:
        for site in extract_particulate:
            for param in extract_particulate[site]['pollutant_type']:
                for date in extract_particulate[site]['pollutant_type'][param]['data']:
                    for i in extract_particulate[site]['pollutant_type'][param]['data'][date]:
                        if i['aqi'] is not None:
                            dates_35013_par.setdefault(date, []).append(i['aqi'])



100%|██████████| 36/36 [01:16<00:00,  2.14s/it]


In [16]:
site35013_par['monitors'].append(dates_35013_par)

with open("site35013_par.json", "w") as outfile:
        outfile.write(json.dumps(site35013_par,indent=4))

In [17]:

request_data['state'] = '48'
request_data['county'] = '109'
site48109_gas = {"monitors":[]}
dates_48109_gas = {}
for year in tqdm(site48109_date_range):
    year = str(year)
    request_data['param'] = AQI_PARAMS_GASEOUS
    gaseous_aqi = request_daily_summary(request_template=request_data, 
                                        begin_date=year+"0501", 
                                        end_date=year+"1031")


    extract_gaseous = extract_summary_from_response(gaseous_aqi)

    if extract_gaseous:
        for site in extract_gaseous:
            for param in extract_gaseous[site]['pollutant_type']:
                for date in extract_gaseous[site]['pollutant_type'][param]['data']:
                    for i in extract_gaseous[site]['pollutant_type'][param]['data'][date]:
                        if i['aqi'] is not None:
                            dates_48109_gas.setdefault(date, []).append(i['aqi'])



100%|██████████| 36/36 [00:15<00:00,  2.28it/s]


In [18]:
site48109_gas['monitors'].append(dates_48109_gas)

with open("site48109_gas.json", "w") as outfile:
        outfile.write(json.dumps(site48109_gas,indent=4))

In [19]:

request_data['state'] = '48'
request_data['county'] = '109'
site48109_par = {"monitors":[]}
dates_48109_par = {}
for year in tqdm(site48109_date_range):
    year = str(year)
    request_data['param'] = AQI_PARAMS_PARTICULATES
    particulate_aqi = request_daily_summary(request_template=request_data, 
                                        begin_date=year+"0501", 
                                        end_date=year+"1031")


    extract_particulate = extract_summary_from_response(particulate_aqi)

    if extract_particulate:
        for site in extract_particulate:
            for param in extract_particulate[site]['pollutant_type']:
                for date in extract_particulate[site]['pollutant_type'][param]['data']:
                    for i in extract_particulate[site]['pollutant_type'][param]['data'][date]:
                        if i['aqi'] is not None:
                            dates_48109_par.setdefault(date, []).append(i['aqi'])



100%|██████████| 36/36 [00:14<00:00,  2.56it/s]


In [20]:
site48109_par['monitors'].append(dates_48109_par)

with open("site48109_par.json", "w") as outfile:
        outfile.write(json.dumps(site48109_par,indent=4))
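
Each saved file maps a date to a list of AQI values, one per site and pollutant that reported on that day. A downstream analysis usually needs a single number per day or per month. The sketch below shows one possible reduction on made-up values: it takes the daily maximum (the overall AQI for a day is conventionally the highest of the pollutant-specific values) and then averages the daily values per month. Whether the maximum or the mean is the right choice depends on the analysis, so treat this as an assumption.

```python
# Made-up data in the same shape as the saved JSON files
saved = {"monitors": [{
    "20210701": [30, 41],
    "20210702": [55],
    "20210703": [12, 12, 48],
}]}

daily = saved["monitors"][0]

# One value per day: the highest AQI reported on that date
daily_max = {date: max(values) for date, values in daily.items()}

# Group the daily values by month (YYYYMM) and average them
by_month = {}
for date, value in daily_max.items():
    by_month.setdefault(date[:6], []).append(value)
monthly_mean = {month: sum(vals) / len(vals) for month, vals in by_month.items()}

print(daily_max)
print(monthly_mean)
```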