# Process and Upload Airnow Daily AQI

Processes the Airnow daily AQI files located in `airnow-data/daily-aqi/dat` and uploads them to ESDR.  During processing, this script also creates daily JSON files containing a subset of the information contained in the Airnow daily `.dat` files, but in a format more readily usable by visualizations.

Reports to stat.createlab.org as `Airnow Daily AQI - Upload`.

Docs for the daily data files are here: https://docs.airnowapi.org/docs/DailyDataFactSheet.pdf

In [0]:
import json, os, dateutil, re, requests, subprocess, datetime, glob, stat

from dateutil import rrule, tz, parser
from sqlitedict import SqliteDict

In [0]:
# Boilerplate to load utils.ipynb
# See https://github.com/CMU-CREATE-Lab/python-utils/blob/master/utils.ipynb


def exec_ipynb(filename_or_url):
    """Execute all code cells of a Jupyter notebook in this module's global namespace.

    Args:
        filename_or_url: Path to a local ``.ipynb`` file, or an ``http:``/``https:``
            URL whose response body is the notebook JSON.

    Both notebook layouts are handled: ``nbformat >= 4`` (cells under ``nb['cells']``
    with a ``source`` field) and older formats (cells under
    ``nb['worksheets'][0]['cells']`` with an ``input`` field).

    The concatenated source is also written to a uniquely named file under ``/tmp``
    and compiled against that file name (presumably so tracebacks from notebook code
    refer to a file that can be inspected).
    """
    if re.match(r'https?:', filename_or_url):
        nb = requests.get(filename_or_url).json()
    else:
        # Context manager so the notebook file handle is closed deterministically
        with open(filename_or_url) as nb_file:
            nb = json.load(nb_file)

    if nb['nbformat'] >= 4:
        src = [''.join(cell['source']) for cell in nb['cells'] if cell['cell_type'] == 'code']
    else:
        src = [''.join(cell['input']) for cell in nb['worksheets'][0]['cells'] if cell['cell_type'] == 'code']

    # Timestamp + pid keep the temp file name unique across runs and processes
    tmpname = '/tmp/%s-%s-%d.py' % (os.path.basename(filename_or_url),
                                    datetime.datetime.now().strftime('%Y%m%d%H%M%S%f'),
                                    os.getpid())
    src = '\n\n\n'.join(src)
    # Context manager so the source is flushed and closed before it is compiled/executed
    with open(tmpname, 'w') as tmp_file:
        tmp_file.write(src)
    code = compile(src, tmpname, 'exec')
    exec(code, globals())


# Execute the shared helper notebooks in this module's globals.  Names used later in this
# file but not defined in it (e.g. Stat, Esdr, AirnowCommon, sleep_until_next_period) are
# presumably provided by these notebooks -- confirm against their sources.
# NOTE(review): the load order may matter if a later notebook depends on an earlier one.
exec_ipynb('./python-utils/utils.ipynb')
exec_ipynb('./python-utils/esdr-library.ipynb')
exec_ipynb('./airnow-common.ipynb')

In [0]:
# process_all() uploads the accumulated data to ESDR once this many files have been processed
NUM_FILES_PER_UPLOAD_BATCH = 500

# Identification used for status reporting: the service name is registered via
# Stat.set_service(), and the hostname/shortname are passed to every Stat call in this file
STAT_SERVICE_NAME = 'Airnow Daily AQI - Upload'
STAT_HOSTNAME = 'hal21'
STAT_SHORTNAME = 'airnow-daily-aqi-upload'

# Period between processing passes; also used (times 1.5) as valid_for_secs in Stat.up() reports
MIRROR_TIME_PERIOD_SECS = 60 * 60 * 1   # every hour

# Name of the JSON file (under AirnowCommon.DATA_DIRECTORY) read by
# get_esdr_monitoring_site_device(); its top-level keys are looked up by serial number
ESDR_MONITORING_SITE_LOCATION_DEVICES_JSON_FILENAME = 'esdr_monitoring_site_location_devices.json'

In [0]:
# Register this script's service name with Stat (presumably applies to all reports sent below -- see utils.ipynb)
Stat.set_service(STAT_SERVICE_NAME)

In [0]:
# Persistent, SQLite-backed mapping of .dat file path -> the file's mtime as recorded when it
# was last uploaded.  is_unmodified() consults it so unchanged files are skipped on later passes.
# autocommit=True is requested so that each assignment is persisted without an explicit commit().
uploaded_file_timestamps_db = SqliteDict(AirnowCommon.DAILY_AQI_DIRECTORY + '/uploaded_file_timestamps.db', autocommit=True)

In [0]:
# Working state for the current upload batch, shared by the functions below:
#   accumulated:                 site id -> channel name -> list of [epoch_time, value, aqi, aqi_category]
#   accumulated_file_timestamps: .dat file path -> mtime captured when the file was processed
accumulated, accumulated_file_timestamps = {}, {}


def clear_accumulated():
    """Drop the current batch by rebinding both module-level dicts to new, empty dicts."""
    global accumulated, accumulated_file_timestamps
    accumulated, accumulated_file_timestamps = {}, {}

In [0]:
def build_path_to_json_data_file(src):
    """Return the path of the ``.json`` data file corresponding to a ``.dat`` data file.

    Only the base name of ``src`` matters: its directory part and extension are
    discarded, and the remaining stem is placed under
    ``AirnowCommon.DAILY_AQI_JSON_DIRECTORY`` with a ``.json`` extension.  For
    example, ``".../dat/2020021700.dat"`` yields
    ``"<DAILY_AQI_JSON_DIRECTORY>/2020021700.json"``.
    """
    # e.g. ".../dat/2020021700.dat" -> "2020021700.dat" -> "2020021700"
    stem = os.path.splitext(os.path.basename(src))[0]
    return AirnowCommon.DAILY_AQI_JSON_DIRECTORY + '/' + stem + '.json'


#build_path_to_json_data_file(AirnowCommon.DAILY_AQI_DAT_DIRECTORY + '/2020021700.dat')

In [0]:
# Info about the daily AQI files is at https://docs.airnowapi.org/docs/DailyDataFactSheet.pdf
# Fields:  "Valid date|AQSID|site name|parameter name|reporting units|value|averaging period|data source|AQI|AQI Category|latitude|longitude|full AQSID with 3 digit country code prefix"

def process_airnow_daily_aqi_file(src):
    """Parse one Airnow daily AQI ``.dat`` file, accumulate its samples, and write its JSON twin.

    Each line of the file is a ``|``-separated record with 13 fields (valid date, AQSID,
    site name, parameter name, reporting units, value, averaging period, data source, AQI,
    AQI category, latitude, longitude, full AQSID).  For every parsable record:

    * a sample ``[epoch_time, value, aqi, aqi_category]`` is appended to the module-level
      ``accumulated[site_id][channel_name]``, where ``epoch_time`` is derived from the
      file name (``YYYYMMDDHH.dat``, interpreted relative to 1970-01-01 with no timezone
      adjustment);
    * the record is added to the JSON structure written for this file:
      ``site_id_to_channels`` maps site id -> channel name -> ``{'val', 'aqi', 'aqi_cat'}``
      and ``channel_info`` maps channel name -> ``{'param', 'units', 'avg_period', 'sites'}``.

    The channel name is ``AVG`` (averaging period of 24) or ``PEAK`` (anything else),
    followed by ``_<PARAM>_<UNITS>`` upper-cased, with every non-word character replaced
    by ``_``.

    A record that does not have exactly 13 fields, or whose numeric fields cannot be
    converted to ``float``, is reported on stderr and skipped *before* any shared state is
    touched.  The file's mtime (captured before reading) is stored in
    ``accumulated_file_timestamps[src]``.

    The JSON file is written to a temporary sibling file and then renamed into place,
    made readable by everyone, and given the same access/modification time as ``src``.

    Args:
        src: Path to a ``.dat`` file whose base name has the form ``YYYYMMDDHH.dat``.

    Raises:
        ValueError: If the base name of ``src`` does not match ``%Y%m%d%H.dat``.
        Exception: If more than 5 records of the file could not be parsed.
    """
    # sys is not imported by this file's import cell; import it here rather than rely on
    # one of the exec'd helper notebooks having pulled it into globals.
    import sys

    src_epoch_timestamp = os.path.getmtime(src)
    dt = datetime.datetime.strptime(os.path.basename(src), '%Y%m%d%H.dat')
    epoch_time = (dt - datetime.datetime(1970, 1, 1)).total_seconds()
    Stat.debug('Processing file %s' % src, host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)

    json_data = {'site_id_to_channels': {}, 'channel_info': {}}
    json_dest = build_path_to_json_data_file(src)

    num_records_read = 0
    error_count = 0
    with open(src, 'r', encoding='cp437') as records:
        for lineno, record in enumerate(records, start=1):
            try:
                # A wrong field count makes the unpacking raise ValueError, as does a
                # non-numeric value in any of the float() conversions.
                (_, site_id, _, param_name, units, value, averaging_period, _, aqi, aqi_category, _, _, _) = record.split('|')
                prefix = 'AVG' if float(averaging_period) == 24 else 'PEAK'
                value = float(value)
                aqi = float(aqi)
                aqi_category = float(aqi_category)
            except ValueError:
                sys.stderr.write('Problem parsing %s line %d, skipping\n' % (src, lineno))
                sys.stderr.write('Line "%s"\n' % record)
                error_count += 1
                continue

            channel_name = prefix + '_' + (param_name + '_' + units).upper()
            channel_name = re.sub(r'\W', '_', channel_name)  # Replace non-word chars with _;  e.g. PM2.5 becomes PM2_5

            site_channels = json_data['site_id_to_channels'].setdefault(site_id, {})

            # warn if we're overwriting an existing channel name...pretty sure this should never happen
            if channel_name in site_channels:
                Stat.warning('Found duplicate channel %s for site %s in %s' % (channel_name, site_id, src), host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)

            site_channels[channel_name] = {'val': value, 'aqi': aqi, 'aqi_cat': aqi_category}

            if channel_name not in json_data['channel_info']:
                json_data['channel_info'][channel_name] = {
                    'param': param_name,
                    'units': units,
                    'avg_period': averaging_period,
                    'sites': [],
                }
            json_data['channel_info'][channel_name]['sites'].append(site_id)

            accumulated.setdefault(site_id, {}).setdefault(channel_name, []).append([epoch_time, value, aqi, aqi_category])
            num_records_read += 1

    if error_count > 5:
        raise Exception('Too many parse errors (%d) reading %s, aborting' % (error_count, src))

    summary = 'Read %d records from %s (%d error(s))' % (num_records_read, src, error_count)
    if error_count > 0:
        Stat.warning(summary, host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)
    else:
        Stat.debug(summary, host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)
    accumulated_file_timestamps[src] = src_epoch_timestamp

    # now write the JSON file: write to a pid-suffixed temp name first, then rename into
    # place so readers never see a partially written file under the final name
    tmp = json_dest + '.tmp' + str(os.getpid())
    os.makedirs(os.path.dirname(tmp), exist_ok=True)
    with open(tmp, 'w') as json_file:
        json.dump(json_data, json_file, sort_keys=True)
    os.rename(tmp, json_dest)

    # make the JSON file readable by everyone (owner read/write, group and others read)
    os.chmod(json_dest, stat.S_IREAD | stat.S_IWRITE | stat.S_IRGRP | stat.S_IROTH)

    # make the JSON file's file stat times match those of the .dat
    source_file_stat = os.stat(src)
    os.utime(json_dest, (source_file_stat.st_mtime, source_file_stat.st_mtime))

#process_airnow_daily_aqi_file(AirnowCommon.DAILY_AQI_DAT_DIRECTORY + '/2020022300.dat')

In [0]:
# Parsed contents of monitoring_site_locations.json; loaded on first use by get_site_info()
sites_cached = None


def get_site_info(site_id):
    """Return the monitoring-site record for ``site_id``, or ``None`` if there is none.

    On first use (and whenever the cached value is still empty) the file
    ``monitoring_site_locations.json`` under ``AirnowCommon.DATA_DIRECTORY`` is loaded
    into the module-level ``sites_cached``.  Records are looked up as
    ``sites_cached['sites'][site_id]``.

    Args:
        site_id: Key of the site under the file's ``sites`` object.

    Returns:
        The stored record, or ``None`` when the ``sites`` object or the site itself is
        missing or the cached data does not have the expected shape.
    """
    global sites_cached
    if not sites_cached:
        with open(AirnowCommon.DATA_DIRECTORY + '/monitoring_site_locations.json', 'r') as f:
            sites_cached = json.load(f)

    try:
        return sites_cached['sites'][site_id]
    except (LookupError, TypeError):
        # Only "not found" / "wrong shape" mean "no info"; anything else (including
        # KeyboardInterrupt) is allowed to propagate instead of being silently swallowed.
        return None

# print(json.dumps(get_site_info('420030008'), sort_keys=True, indent=3))  # Lawrenceville aka "BAPC 301 39TH STREET BLDG #7 AirNow"
# print(json.dumps(get_site_info('000050121'), sort_keys=True, indent=3))  # Meteorological Service of Canada"
# print(json.dumps(get_site_info('044201010'), sort_keys=True, indent=3))  # null

In [0]:
# Module-level caches, filled in lazily by the functions below:
#   esdr:                         the Esdr client instance
#   airnow_product:               the result of esdr.get_product_by_name('AirNow')
#   esdr_monitoring_site_devices: parsed contents of the ESDR devices JSON file
esdr = None
airnow_product = None
esdr_monitoring_site_devices = None

In [0]:
def get_airnow_product():
    """Return the ESDR product named 'AirNow', creating the shared Esdr client if needed.

    Both the client (``esdr``) and the product (``airnow_product``) are cached in
    module-level globals; each is only (re)created while its cached value is falsy.
    """
    global esdr, airnow_product

    if not esdr:
        user_agent = 'esdr-library.py[' + STAT_SERVICE_NAME + ']'
        esdr = Esdr('esdr-auth-airnow-uploader.json', user_agent=user_agent)

    if not airnow_product:
        # One-time product creation, kept for reference:
        # esdr.create_product('AirNow', 'AirNow', 'EPA and Sonoma Tech', 'Real-time feeds from EPA/STI AirNow')
        airnow_product = esdr.get_product_by_name('AirNow')

    return airnow_product

In [0]:
def get_esdr_monitoring_site_device(serialNumber):
    """Look up a cached ESDR device by serial number.

    The device table is read on first use from the JSON file named by
    ``ESDR_MONITORING_SITE_LOCATION_DEVICES_JSON_FILENAME`` under
    ``AirnowCommon.DATA_DIRECTORY`` and kept in a module-level global.

    Args:
        serialNumber: Serial number used as the key into the device table.

    Returns:
        A shallow copy of the cached device entry with ``serialNumber`` and
        ``productId`` (the AirNow product's ``id``) added, or ``None`` when the
        serial number is not in the table.
    """
    global airnow_product, esdr_monitoring_site_devices

    if not airnow_product:
        airnow_product = get_airnow_product()

    if not esdr_monitoring_site_devices:
        devices_path = AirnowCommon.DATA_DIRECTORY + '/' + ESDR_MONITORING_SITE_LOCATION_DEVICES_JSON_FILENAME
        with open(devices_path, 'r') as devices_file:
            esdr_monitoring_site_devices = json.load(devices_file)

    if serialNumber not in esdr_monitoring_site_devices:
        return None

    # Work on a copy so the cached entry is never modified
    found = esdr_monitoring_site_devices[serialNumber].copy()
    found['serialNumber'] = serialNumber
    found['productId'] = airnow_product['id']
    return found

# print(get_esdr_monitoring_site_device('no such site'))  # None
# print(get_esdr_monitoring_site_device('010972005'))     # {'id': 2264, 'name': 'BAYROAD', 'serialNumber': '010972005', 'productId': 11}

In [0]:
def upload_site(site_id):
    """Upload the samples accumulated for one site to its ESDR feed.

    The device is taken from the cached device table when present, otherwise it is
    obtained via ``esdr.get_or_create_device`` using the site's name.  The feed is looked
    up with the site's latitude/longitude and created when missing.  Every channel in
    ``accumulated[site_id]`` is then uploaded as three ESDR channels:
    ``<channel>_VALUE``, ``<channel>_AQI`` and ``<channel>_AQI_CATEGORY``.

    Args:
        site_id: Site identifier; used as the device serial number and as the key into
            ``accumulated``.

    Returns:
        None.  Problems (unknown site, failed upload, no accumulated data) are reported
        through ``Stat.warning`` and are not raised or returned to the caller.
    """
    # get_airnow_product() lazily initializes both the shared Esdr client (esdr) and the
    # AirNow product (airnow_product), so the setup is not duplicated here.
    get_airnow_product()

    # try to get the device from the cache
    device = get_esdr_monitoring_site_device(site_id)

    site_info = get_site_info(site_id)

    if not device:
        if not site_info:
            Stat.warning('Cannot create device for site %s because no information can be found for it.  Skipping.' % (site_id), host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)
            return
        device = esdr.get_or_create_device(airnow_product, serial_number=site_id, name=site_info['site name'])

    # find the feed, but give the lat/lon for the case where the site has moved, because esdr.get_feed()
    # will match by lat/lon if there are multiple feeds for the device
    lat = float(site_info['latitude']) if site_info else None
    lon = float(site_info['longitude']) if site_info else None
    feed = esdr.get_feed(device, lat=lat, lon=lon)

    if not feed:
        if not site_info:
            Stat.warning('Cannot create feed for site %s because no information can be found for it.  Skipping.' % (site_id), host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)
            return
        feed = esdr.get_or_create_feed(device, lat=lat, lon=lon)

    if site_id not in accumulated:
        Stat.warning('%s/%s: No accumulated data found. Skipping.' %
                     (site_id, device['name']), host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)
        return

    for channel, samples in accumulated[site_id].items():
        try:
            esdr.upload(feed, {
                'channel_names': [channel + "_VALUE", channel + "_AQI", channel + "_AQI_CATEGORY"],
                'data': samples
            })
            Stat.info('%s/%s, %s: Uploaded %d samples.' % (site_id, device['name'], channel, len(samples)), host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)
        except requests.HTTPError as e:
            # The exception may carry no response at all.  Compare with None explicitly:
            # a requests Response evaluates as False for 4xx/5xx status codes.
            status = e.response.status_code if e.response is not None else 'unknown'
            Stat.warning('%s/%s, %s: Failed to upload %d samples (HTTP %s).' %
                         (site_id, device['name'], channel, len(samples), status), host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)
        except Exception:
            # Best effort: one failing channel must not stop the remaining channels, but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            Stat.warning('%s/%s, %s: Failed to upload %d samples.' %
                         (site_id, device['name'], channel, len(samples)), host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)

#upload_site('420030008')  # Lawrenceville aka "BAPC 301 39TH STREET BLDG #7 AirNow"

In [0]:
def upload_accumulated():
    """Upload the current batch, record the processed files as uploaded, and reset the batch.

    Sites are uploaded in sorted order of site id.  Afterwards the mtime captured for
    every file in the batch is stored in ``uploaded_file_timestamps_db`` (in sorted order
    of path), and the accumulated state is cleared.

    NOTE(review): the timestamps are recorded unconditionally; upload_site() reports
    upload failures only through Stat.warning, so a file is marked as uploaded even when
    some of its samples failed to upload -- confirm this is intended.
    """
    for site_id in sorted(accumulated):
        upload_site(site_id)

    for src, file_timestamp in sorted(accumulated_file_timestamps.items()):
        uploaded_file_timestamps_db[src] = file_timestamp
        Stat.debug('Uploaded %s to ESDR' % (src), host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)

    clear_accumulated()

In [0]:
def is_unmodified(src):
    """Return True if ``src`` still has the mtime recorded when it was last uploaded.

    Args:
        src: Path to a data file; also the key into ``uploaded_file_timestamps_db``.

    Returns:
        ``True`` when the file's current mtime equals the recorded one; ``False`` when
        it differs or when no timestamp has been recorded for ``src`` yet.

    Raises:
        OSError: If the mtime of ``src`` cannot be read (e.g. the file does not exist).
    """
    current_mtime = os.path.getmtime(src)
    try:
        recorded_mtime = uploaded_file_timestamps_db[src]
    except KeyError:
        # Never uploaded before: report "modified" instead of leaking a KeyError
        return False
    return current_mtime == recorded_mtime

In [0]:
def process_all():
    """Process every new or modified daily AQI ``.dat`` file and upload the results to ESDR.

    All files matching ``[0-9]*.dat`` in ``AirnowCommon.DAILY_AQI_DAT_DIRECTORY`` are
    visited in sorted order.  Files whose mtime equals the one recorded at their last
    upload are skipped; the others are processed, and the accumulated data is uploaded
    whenever ``NUM_FILES_PER_UPLOAD_BATCH`` files have accumulated and once more at the
    end.  Start and completion are reported through ``Stat.up``.
    """
    starting_timestamp = datetime.datetime.now().timestamp()
    clear_accumulated()
    data_files = sorted(glob.glob(AirnowCommon.DAILY_AQI_DAT_DIRECTORY + '/[0-9]*.dat'))
    Stat.up('Processing %d data files...' % (len(data_files)), host=STAT_HOSTNAME, shortname=STAT_SHORTNAME, valid_for_secs=MIRROR_TIME_PERIOD_SECS*1.5)

    for src in data_files:
        # ">=" rather than "==" so the batch is still flushed if the count ever gets past the limit
        if len(accumulated_file_timestamps) >= NUM_FILES_PER_UPLOAD_BATCH:
            upload_accumulated()
        try:
            if is_unmodified(src):
                continue
        except KeyError:
            # No timestamp recorded for this file: it has never been uploaded, so process it
            pass
        except Exception as e:
            # Stay best-effort (process the file anyway), but report the problem instead of
            # hiding it; KeyboardInterrupt/SystemExit are no longer swallowed.
            Stat.warning('Could not determine whether %s is unmodified (%s), processing it anyway' % (src, e), host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)

        process_airnow_daily_aqi_file(src)
    upload_accumulated()
    elapsed_seconds = datetime.datetime.now().timestamp() - starting_timestamp

    Stat.up('Done processing %d data files (elapsed time: %d seconds)' % (len(data_files), elapsed_seconds), host=STAT_HOSTNAME, shortname=STAT_SHORTNAME, valid_for_secs=MIRROR_TIME_PERIOD_SECS*1.5)

def process_all_forever():
    """Run process_all() repeatedly, pausing between passes; never returns normally.

    After each pass, ``sleep_until_next_period`` is called with the mirror period and a
    15-minute offset (per the original note: start at 15 minutes after the hour; the
    exact semantics are defined by that helper).
    """
    offset_secs = 15*60  # start at 15 minutes after the hour
    while True:
        process_all()
        sleep_until_next_period(MIRROR_TIME_PERIOD_SECS, offset_secs)


# Entry point: start the endless process-and-upload loop.  process_all_forever() loops
# without an exit condition, so anything placed after this call would not normally run.
process_all_forever()