In [13]:
import os 
from dotenv import load_dotenv
import json, time
import requests
import pandas as pd

In [5]:
# Read the API credentials from the environment; load_dotenv() loads the .env file first
load_dotenv()

USERNAME = os.environ.get('EMAIL')     # used as the 'email' value of every API request below
APIKEY = os.environ.get('APIKEY')      # used as the 'key' value of every API request below

In [12]:
#########
#
#    CONSTANTS
#

#
#    Every AQS API URL starts with this root
#
API_REQUEST_URL = 'https://aqs.epa.gov/data/api'

#
#    The templates below are the 'actions' (requests) we can make of the API. Each one is appended to
#    API_REQUEST_URL and filled in with str.format() from a request dictionary. Every template is split
#    into: endpoint path, credentials, pollutant/date window (when used) and the geographic selector.
#
#    Sign-up request - generally only performed once - unless you lose your key
API_ACTION_SIGNUP = (
    '/signup'
    '?email={email}'
)
#
#    List actions return the parameter values that some of the other actions/requests require
API_ACTION_LIST_CLASSES = (
    '/list/classes'
    '?email={email}&key={key}'
)
API_ACTION_LIST_PARAMS = (
    '/list/parametersByClass'
    '?email={email}&key={key}'
    '&pc={pclass}'
)
API_ACTION_LIST_SITES = (
    '/list/sitesByCounty'
    '?email={email}&key={key}'
    '&state={state}&county={county}'
)
#
#    Monitor actions request the monitoring stations that meet specific criteria
API_ACTION_MONITORS_COUNTY = (
    '/monitors/byCounty'
    '?email={email}&key={key}'
    '&param={param}&bdate={begin_date}&edate={end_date}'
    '&state={state}&county={county}'
)
API_ACTION_MONITORS_BOX = (
    '/monitors/byBox'
    '?email={email}&key={key}'
    '&param={param}&bdate={begin_date}&edate={end_date}'
    '&minlat={minlat}&maxlat={maxlat}&minlon={minlon}&maxlon={maxlon}'
)
#
#    Summary actions request summary data. These two return daily summaries
API_ACTION_DAILY_SUMMARY_COUNTY = (
    '/dailyData/byCounty'
    '?email={email}&key={key}'
    '&param={param}&bdate={begin_date}&edate={end_date}'
    '&state={state}&county={county}'
)
API_ACTION_DAILY_SUMMARY_BOX = (
    '/dailyData/byBox'
    '?email={email}&key={key}'
    '&param={param}&bdate={begin_date}&edate={end_date}'
    '&minlat={minlat}&maxlat={maxlat}&minlon={minlon}&maxlon={maxlon}'
)
#
#    It is always nice to be respectful of a free data resource.
#    We're going to observe a 100 requests per minute limit - which is fairly nice.
#    That is a budget of 60/100 = 0.6 seconds per request; the wait is that budget minus the latency
#    we assume every call already costs. (Dividing 1.0 by 100 would allow 100 requests per *second*.)
API_LATENCY_ASSUMED = 0.002       # Assuming roughly 2ms latency on the API and network
API_THROTTLE_WAIT = (60.0/100.0)-API_LATENCY_ASSUMED
#
#
#    Template holding every field that the action strings above can reference. A caller can either
#    fill in a copy of this template and pass it to a request function, or supply the values as
#    individual function arguments.
#
AQS_REQUEST_TEMPLATE = dict(
    email="",
    key="",
    state="",          # the two digit state FIPS # as a string
    county="",         # the three digit county FIPS # as a string
    begin_date="",     # the start of a time window in YYYYMMDD format
    end_date="",       # the end of a time window in YYYYMMDD format, begin_date and end_date must be in the same year
    minlat=0.0,        # the four bounding box edges, used by the byBox actions
    maxlat=0.0,
    minlon=0.0,
    maxlon=0.0,
    param="",          # a list of comma separated 5 digit codes, max 5 codes requested
    pclass="",         # parameter class is only used by the List calls
)

#
#   The list/classes action returns the classes (groups) of sensors; the sensor IDs in a class can then
#   be listed with a second request. The class that looks to be associated with the Air Quality Index
#   is "AQI POLLUTANTS".
#
AQI_PARAM_CLASS = "AQI POLLUTANTS"
#
#   The 'param' value defined by the AQS API spec is a comma separated list of sensor codes.
#   We want all seven AQI codes, but a single request accepts at most 5 codes, so they are
#   split into two param constants.
#
#   Gaseous AQI pollutants CO, SO2, NO2, and O3 (ozone)
AQI_PARAMS_GASEOUS = ",".join(("42101", "42401", "42602", "44201"))
#
#   Particulate AQI pollutants PM10, PM2.5, and Acceptable PM2.5
AQI_PARAMS_PARTICULATES = ",".join(("81102", "88101", "88502"))
#
#

# First and last year (both inclusive) of the period we want to pull
STARTYEAR, ENDYEAR = 1961, 2021



In [7]:
# Cities of interest, keyed by a short lowercase name.
#   'fips'   is the 5 character code: 2 digit state code followed by the 3 digit county code
#   'latlon' is [latitude, longitude] in decimal degrees
CITY_LOCATIONS = {
    'birmingham': dict(
        city='Birmingham',
        county='Jefferson',
        state='Alabama',
        fips='01073',
        latlon=[33.53, -86.80],
    ),
}

In [8]:
#
#   Rough conversion factors used to build a bounding box around a city location.
#   The miles-per-degree estimates come from the USGS website:
#   https://www.usgs.gov/faqs/how-much-distance-does-a-degree-minute-and-second-cover-your-maps
#
LAT_25MILES = 25.0 * (1.0/69.0)    # This is about 25 miles of latitude in decimal degrees
LON_25MILES = 25.0 * (1.0/54.6)    # This is about 25 miles of longitude in decimal degrees


def bounding_latlon(place=None,scale=1.0):
    """Return a rough bounding box centered on a place.

    The box extends `scale` * 25 miles from the center in each direction, so its sides are
    rough multiples of 50 miles (scale=1.0 is about 50 miles per side, 2.0 about 100, ...).

    place -- dictionary whose 'latlon' entry is [latitude, longitude] in decimal degrees
    scale -- size multiplier; converted with float()

    Returns the list [minlat, maxlat, minlon, maxlon] in decimal degrees.
    """
    center_lat = place['latlon'][0]
    half_height = float(scale) * LAT_25MILES
    center_lon = place['latlon'][1]
    half_width = float(scale) * LON_25MILES
    return [
        center_lat - half_height,
        center_lat + half_height,
        center_lon - half_width,
        center_lon + half_width,
    ]

In [9]:
#
#    This implements the monitors request. This requests monitoring stations. Which stations are selected
#    (by county, by bounding box, ...) depends on the endpoint_action that is passed in.
#
#    This can be called with a mixture of a defined parameter dictionary, or with function parameters.
#    If function parameters are provided, those take precedence over any parameters from the request template.
#
def request_monitors(email_address = None, key = None, param=None,
                          begin_date = None, end_date = None, fips = None,
                          endpoint_url = API_REQUEST_URL, 
                          endpoint_action = API_ACTION_MONITORS_COUNTY, 
                          request_template = AQS_REQUEST_TEMPLATE,
                          headers = None):
    """Request monitoring stations from the AQS API.

    Parameters
    ----------
    email_address, key, param, begin_date, end_date
        Optional overrides for the 'email', 'key', 'param', 'begin_date' and 'end_date'
        entries of the request template. Falsy values (None, "") leave the template value as is.
    fips
        Optional 5 character FIPS string; the first two characters become 'state' and the
        last three become 'county'. Values that are not exactly 5 characters long are ignored.
    endpoint_url
        Root URL of the API.
    endpoint_action
        Action template appended to endpoint_url and filled in with str.format().
    request_template
        Dictionary supplying every field referenced by endpoint_action. It is only read:
        the overrides are applied to a private copy, so neither the caller's dictionary nor
        the shared AQS_REQUEST_TEMPLATE default is modified by a call.
    headers
        Optional HTTP headers passed to requests.get().

    Returns
    -------
    The decoded JSON response, or None if the request or the JSON decoding raised
    (the exception is printed).

    Raises
    ------
    Exception
        If email, key, param, begin_date or end_date is empty after applying the overrides.
    """
    # Work on a copy. Writing into request_template would change the module-level default
    # (or the caller's dictionary) and silently leak values from one call into the next.
    request_params = dict(request_template)

    #  This prioritizes the info from the call parameters - not what's already in the template
    overrides = {
        'email': email_address,
        'key': key,
        'param': param,
        'begin_date': begin_date,
        'end_date': end_date,
    }
    request_params.update({field: value for field, value in overrides.items() if value})
    if fips and len(fips)==5:
        request_params['state'] = fips[:2]
        request_params['county'] = fips[2:]

    # Make sure there are values that allow us to make a call - these are always required
    # Note we're not validating FIPS fields because not all of the monitors actions require the FIPS numbers
    required_fields = (
        ('email', "an email address"),
        ('key', "a key"),
        ('param', "param values"),
        ('begin_date', "a begin_date"),
        ('end_date', "an end_date"),
    )
    for field, description in required_fields:
        if not request_params[field]:
            raise Exception(f"Must supply {description} to call 'request_monitors()'")

    # compose the request
    request_url = endpoint_url+endpoint_action.format(**request_params)

    # make the request
    try:
        # Wait first, to make sure we don't exceed a rate limit in the situation where an exception occurs
        # during the request processing - throttling is always a good practice with a free data source
        if API_THROTTLE_WAIT > 0.0:
            time.sleep(API_THROTTLE_WAIT)
        response = requests.get(request_url, headers=headers)
        json_response = response.json()
    except Exception as e:
        # Best effort: report the problem and let the caller deal with a None response
        print(e)
        json_response = None
    return json_response

In [10]:
#
#    Create a copy of the AQS_REQUEST_TEMPLATE and fill in the credentials and the particulate sensor codes
#
request_data = AQS_REQUEST_TEMPLATE.copy()
request_data['email'] = USERNAME
request_data['key'] = APIKEY
request_data['param'] = AQI_PARAMS_PARTICULATES
#
#   The bounding box action has no state/county fields, so those are left empty in the template.
#   Instead we need bounding box parameters.
#
#   bounding_latlon() returns a list - [minlat,maxlat,minlon,maxlon].
#   scale=1.0 is roughly a 50 mile box; use scale=2.0, 3.0 or 4.0 for a 100, 150 or 200 mile box.
bbox = bounding_latlon(CITY_LOCATIONS['birmingham'],scale=1.0)

#   put our bounding box into the request_data
(request_data['minlat'],
 request_data['maxlat'],
 request_data['minlon'],
 request_data['maxlon']) = bbox[0], bbox[1], bbox[2], bbox[3]

#
#   we need to change the action for the API from the default to the bounding box - same recent date for now
response = request_monitors(request_template=request_data, begin_date="20210701", end_date="20210731",
                            endpoint_action = API_ACTION_MONITORS_BOX)
#
#   request_monitors() returns None when the request itself failed, so check for that before
#   looking at the response header.
#
if response is None:
    print("The monitors request failed and returned no response.")
elif response["Header"][0]['status'] == "Success":
    print(json.dumps(response['Data'],indent=4))
else:
    print(json.dumps(response,indent=4))

[
    {
        "state_code": "01",
        "county_code": "073",
        "site_number": "1005",
        "parameter_code": "88101",
        "poc": 1,
        "parameter_name": "PM2.5 - Local Conditions",
        "open_date": "1999-01-03",
        "close_date": null,
        "concurred_exclusions": null,
        "dominant_source": "MOBILE",
        "measurement_scale": "NEIGHBORHOOD",
        "measurement_scale_def": "500 M TO 4KM",
        "monitoring_objective": "POPULATION EXPOSURE",
        "last_method_code": "142",
        "last_method_description": "BGI Models PQ200-VSCC or PQ200A-VSCC - Gravimetric",
        "last_method_begin_date": "2013-01-01",
        "naaqs_primary_monitor": "Y",
        "qa_primary_monitor": "Y",
        "monitor_type": "SLAMS",
        "networks": null,
        "monitoring_agency_code": "0550",
        "monitoring_agency": "Jefferson County, AL  Department Of Health",
        "si_id": 177,
        "latitude": 33.331111,
        "longitude": -87.003611,
  

In [11]:
#
#    This implements the daily summary request. Daily summary provides a daily summary value for each sensor being requested
#    from the start date to the end date. 
#
#    This can be called with a mixture of a defined parameter dictionary, or with function parameters.
#    If function parameters are provided, those take precedence over any parameters from the request template.
#
def request_daily_summary(email_address = None, key = None, param=None,
                          begin_date = None, end_date = None, fips = None,
                          endpoint_url = API_REQUEST_URL, 
                          endpoint_action = API_ACTION_DAILY_SUMMARY_COUNTY, 
                          request_template = AQS_REQUEST_TEMPLATE,
                          headers = None):
    """Request daily summary data from the AQS API.

    Parameters
    ----------
    email_address, key, param, begin_date, end_date
        Optional overrides for the 'email', 'key', 'param', 'begin_date' and 'end_date'
        entries of the request template. Falsy values (None, "") leave the template value as is.
    fips
        Optional 5 character FIPS string; the first two characters become 'state' and the
        last three become 'county'. Values that are not exactly 5 characters long are ignored.
    endpoint_url
        Root URL of the API.
    endpoint_action
        Action template appended to endpoint_url and filled in with str.format().
    request_template
        Dictionary supplying every field referenced by endpoint_action. It is only read:
        the overrides are applied to a private copy, so neither the caller's dictionary nor
        the shared AQS_REQUEST_TEMPLATE default is modified by a call.
    headers
        Optional HTTP headers passed to requests.get().

    Returns
    -------
    The decoded JSON response, or None if the request or the JSON decoding raised
    (the exception is printed).

    Raises
    ------
    Exception
        If email, key, param, begin_date or end_date is empty after applying the overrides.
    """
    # Work on a copy. Writing into request_template would change the module-level default
    # (or the caller's dictionary) and silently leak values from one call into the next.
    request_params = dict(request_template)

    #  This prioritizes the info from the call parameters - not what's already in the template
    overrides = {
        'email': email_address,
        'key': key,
        'param': param,
        'begin_date': begin_date,
        'end_date': end_date,
    }
    request_params.update({field: value for field, value in overrides.items() if value})
    if fips and len(fips)==5:
        request_params['state'] = fips[:2]
        request_params['county'] = fips[2:]

    # Make sure there are values that allow us to make a call - these are always required
    # Note we're not validating FIPS fields because not all of the daily summary actions require the FIPS numbers
    required_fields = (
        ('email', "an email address"),
        ('key', "a key"),
        ('param', "param values"),
        ('begin_date', "a begin_date"),
        ('end_date', "an end_date"),
    )
    for field, description in required_fields:
        if not request_params[field]:
            raise Exception(f"Must supply {description} to call 'request_daily_summary()'")

    # compose the request
    request_url = endpoint_url+endpoint_action.format(**request_params)

    # make the request
    try:
        # Wait first, to make sure we don't exceed a rate limit in the situation where an exception occurs
        # during the request processing - throttling is always a good practice with a free data source
        if API_THROTTLE_WAIT > 0.0:
            time.sleep(API_THROTTLE_WAIT)
        response = requests.get(request_url, headers=headers)
        json_response = response.json()
    except Exception as e:
        # Best effort: report the problem and let the caller deal with a None response
        print(e)
        json_response = None
    return json_response

In [23]:
request_data = AQS_REQUEST_TEMPLATE.copy()
request_data['email'] = USERNAME
request_data['key'] = APIKEY

bbox = bounding_latlon(CITY_LOCATIONS['birmingham'],scale=1.0)
#   put our bounding box into the request_data
request_data['minlat'] = bbox[0]
request_data['maxlat'] = bbox[1]
request_data['minlon'] = bbox[2]
request_data['maxlon'] = bbox[3]

request_data['state'] = CITY_LOCATIONS['birmingham']['fips'][:2]   # the first two digits (characters) of FIPS is the state code
request_data['county'] = CITY_LOCATIONS['birmingham']['fips'][2:]  # the last three digits (characters) of FIPS is the county code


def _report_daily_summary(summary):
    """Print a daily summary response: the data on success, a hint when nothing matched, else the raw response.

    request_daily_summary() returns None when the request itself failed, so that case is
    reported instead of subscripting None.
    """
    if summary is None:
        print("The daily summary request failed and returned no response.")
        return
    status = summary["Header"][0]['status']
    if status == "Success":
        print(json.dumps(summary['Data'],indent=4))
    elif status.startswith("No data "):
        print("Looks like the response generated no data. You might take a closer look at your request and the response data.")
    else:
        print(json.dumps(summary,indent=4))


# request daily summary data for the month of July in 2021 - gaseous pollutants first
request_data['param'] = AQI_PARAMS_GASEOUS
gaseous_aqi = request_daily_summary(request_template=request_data, begin_date="20210701", end_date="20210731")
print("Response for the gaseous pollutants ...")
_report_daily_summary(gaseous_aqi)

# same month, now for the particulate pollutants
request_data['param'] = AQI_PARAMS_PARTICULATES
particulate_aqi = request_daily_summary(request_template=request_data, begin_date="20210701", end_date="20210731")
print("Response for the particulate pollutants ...")
_report_daily_summary(particulate_aqi)


Response for the gaseous pollutants ...
[
    {
        "state_code": "01",
        "county_code": "073",
        "site_number": "0023",
        "parameter_code": "44201",
        "poc": 1,
        "latitude": 33.553056,
        "longitude": -86.815,
        "datum": "WGS84",
        "parameter": "Ozone",
        "sample_duration_code": "1",
        "sample_duration": "1 HOUR",
        "pollutant_standard": "Ozone 1-hour 1979",
        "date_local": "2021-07-04",
        "units_of_measure": "Parts per million",
        "event_type": "No Events",
        "observation_count": 24,
        "observation_percent": 100.0,
        "validity_indicator": "Y",
        "arithmetic_mean": 0.025583,
        "first_max_value": 0.056,
        "first_max_hour": 13,
        "aqi": null,
        "method_code": "087",
        "method": "INSTRUMENTAL - ULTRA VIOLET ABSORPTION",
        "local_site_name": "North Birmingham",
        "site_address": "NO. B'HAM,SOU R.R., 3009 28TH ST. NO.",
        "state": "

In [14]:
# This function extracts the data from an API response and stores it in a dataframe
def extract_aqs_table(aqs_data):
    """Convert a decoded AQS API response into a DataFrame.

    aqs_data -- the decoded JSON response: a dictionary with a 'Header' list whose first
                entry carries a 'status' string and, on success, a 'Data' list of records.
                May be None, which is what the request functions return when a request fails.

    Returns
      - a DataFrame built from 'Data' when the status is 'Success'
      - an empty DataFrame when the status starts with 'No data'
      - a string describing the problem otherwise: the API status itself, or a short message
        when there is no usable response. Callers tell the cases apart with isinstance(..., str).
    """
    # A failed request yields None, and a malformed response may lack the header entirely.
    # Report both as a status string instead of raising on the subscript.
    try:
        status = aqs_data['Header'][0]['status']
    except (TypeError, KeyError, IndexError):
        return 'Request failed: no usable response header was returned'
    if not isinstance(status, str):
        return f'Request failed: unexpected status value {status!r}'

    if status == 'Success':
        return pd.DataFrame(aqs_data['Data'])
    if status.startswith('No data'):
        return pd.DataFrame()
    return status

In [25]:
request_data = AQS_REQUEST_TEMPLATE.copy()
request_data['email'] = USERNAME
request_data['key'] = APIKEY

request_data['state'] = CITY_LOCATIONS['birmingham']['fips'][:2]   
request_data['county'] = CITY_LOCATIONS['birmingham']['fips'][2:]

# Define paths to save the data
gaseous_file_path = 'intermediate-data/gaseous_aqi_data.csv'
particulate_file_path = 'intermediate-data/particulate_aqi_data.csv'

# Make sure the output directory exists before the first write
for _output_path in (gaseous_file_path, particulate_file_path):
    os.makedirs(os.path.dirname(_output_path) or '.', exist_ok=True)


def _append_year_to_csv(year_df, csv_path):
    """Append one year's records to csv_path; the header is written only to a new or empty file.

    Years without data are skipped. Writing an empty frame would create the file without a
    header row, and every later year would then be appended with header=False.
    """
    if year_df.empty:
        return
    needs_header = not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0
    year_df.to_csv(csv_path, mode='a', header=needs_header, index=False)


# NOTE: the files are opened in append mode, so running this cell again adds the same years a
# second time. Remove the two CSV files first when a fresh pull is wanted.
for year in range(STARTYEAR, ENDYEAR + 1):
    
    print(f'Pulling year {year}')
    # begin_date and end_date must be in the same year, so we pull one calendar year per request
    start_date = str(year) + '0101'
    end_date = str(year) + '1231'

    # request daily summary data for gasses
    request_data['param'] = AQI_PARAMS_GASEOUS
    gaseous_aqi = request_daily_summary(request_template=request_data,
                                        begin_date=start_date, end_date=end_date)

    # request daily summary data for particulates
    request_data['param'] = AQI_PARAMS_PARTICULATES
    particulate_aqi = request_daily_summary(request_template=request_data,
                                            begin_date=start_date, end_date=end_date)
    
    # Extract dataframes
    gaseous_extract = extract_aqs_table(gaseous_aqi)
    particulate_extract = extract_aqs_table(particulate_aqi)
    
    # extract_aqs_table returns the status string for unusual API statuses.
    # Print the first one (gaseous is checked before particulate) and stop pulling.
    unusual_status = next(
        (extract for extract in (gaseous_extract, particulate_extract) if isinstance(extract, str)),
        None,
    )
    if unusual_status is not None:
        print(unusual_status)
        break

    # Save the data for the current year to CSV (appending to existing file)
    _append_year_to_csv(gaseous_extract, gaseous_file_path)
    _append_year_to_csv(particulate_extract, particulate_file_path)

    print(f'Data for year {year} saved successfully.')
else:
    # Only reached when the loop was not stopped early by an unusual API status
    print("All data saved successfully.")


Pulling year 1961
Data for year 1961 saved successfully.
Pulling year 1962
Data for year 1962 saved successfully.
Pulling year 1963
Data for year 1963 saved successfully.
Pulling year 1964
Data for year 1964 saved successfully.
Pulling year 1965
Data for year 1965 saved successfully.
Pulling year 1966
Data for year 1966 saved successfully.
Pulling year 1967
Data for year 1967 saved successfully.
Pulling year 1968
Data for year 1968 saved successfully.
Pulling year 1969
Data for year 1969 saved successfully.
Pulling year 1970
Data for year 1970 saved successfully.
Pulling year 1971
Data for year 1971 saved successfully.
Pulling year 1972
Data for year 1972 saved successfully.
Pulling year 1973
Data for year 1973 saved successfully.
Pulling year 1974
Data for year 1974 saved successfully.
Pulling year 1975
Data for year 1975 saved successfully.
Pulling year 1976
Data for year 1976 saved successfully.
Pulling year 1977
Data for year 1977 saved successfully.
Pulling year 1978
Data for year