# Upload PurpleAir Backlog to ESDR

This script uploads PurpleAir data files that are *newer than* START_FILE.  It is meant to run side by side with the realtime uploader, filling gaps in the mirrored data left behind by the realtime uploader's frequent failures.

IMPORTANT: The script has *no* concept of an ending data file, so it will keep running until it catches up with the realtime mirror, after which it only duplicates the realtime uploader's work.  It is therefore 100% UP TO YOU to stop the script (and the cron job that restarts it) once the gap in the mirror has been filled.

In [None]:
# Path (relative to the 'mirror2' directory scanned by the upload loop) of the
# file to start *after*: only files whose relative path sorts strictly after
# this one are uploaded.  It is used only until a 'lastUploaded' value has been
# saved in the progress database, which then takes precedence.
START_FILE = '20230804/000000utc.json.bz2'

In [None]:
import bz2, html, json, os, re, requests, subprocess, sys, dateutil, datetime, glob, stat, math

from dateutil import rrule, tz, parser
from sqlitedict import SqliteDict
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
import pandas as pd


In [None]:
# Boilerplate to load utils.ipynb
# See https://github.com/CMU-CREATE-Lab/python-utils/blob/master/utils.ipynb


def exec_ipynb(filename_or_url):
    """Execute every code cell of a Jupyter notebook in this module's globals.

    The notebook is fetched over HTTP(S) when ``filename_or_url`` starts with
    ``http:``/``https:``; otherwise it is read from the local filesystem.  Both
    nbformat >= 4 notebooks (top-level ``cells``) and older ones (cells nested
    under ``worksheets[0]``, source under ``input``) are supported.

    The concatenated code is also written to a uniquely named file under /tmp
    and compiled with that filename, so tracebacks from the executed code point
    at a readable file.

    Args:
        filename_or_url: local path or http(s) URL of the ``.ipynb`` file.

    Raises:
        requests.HTTPError: if fetching a URL returns an error status.
        OSError, ValueError: if the notebook cannot be read or parsed as JSON.
    """
    if re.match(r'https?:', filename_or_url):
        response = requests.get(filename_or_url)
        # Fail loudly on 4xx/5xx instead of trying to parse an error page as JSON.
        response.raise_for_status()
        nb = response.json()
    else:
        # Notebooks are UTF-8 JSON; use a context manager so the handle is closed.
        with open(filename_or_url, encoding='utf-8') as f:
            nb = json.load(f)

    if nb['nbformat'] >= 4:
        cells = nb['cells']
        source_key = 'source'
    else:
        cells = nb['worksheets'][0]['cells']
        source_key = 'input'
    src = [''.join(cell[source_key]) for cell in cells if cell['cell_type'] == 'code']

    tmpname = '/tmp/%s-%s-%d.py' % (os.path.basename(filename_or_url),
                                    datetime.datetime.now().strftime('%Y%m%d%H%M%S%f'),
                                    os.getpid())
    src = '\n\n\n'.join(src)
    with open(tmpname, 'w', encoding='utf-8') as f:
        f.write(src)
    code = compile(src, tmpname, 'exec')
    exec(code, globals())


# Execute the shared helper notebooks directly into this namespace.  Names used
# below but never imported here (e.g. Stat, Esdr) presumably come from these
# notebooks -- confirm.  Keep this order in case esdr-library.ipynb relies on
# names defined by utils.ipynb.
exec_ipynb('./python-utils/utils.ipynb')
exec_ipynb('./python-utils/esdr-library.ipynb')

In [None]:
# Identity used for every Stat status/info/warning report from this script.
# NOTE(review): the date range in STAT_SERVICE_NAME is hard-coded and is not
# derived from START_FILE (the script itself has no end date); keep it in sync
# by hand whenever START_FILE changes.
STAT_SERVICE_NAME = 'Purpleair backlog uploader (20230804 - 20230811)'
STAT_HOSTNAME = 'hal21'
STAT_SHORTNAME = 'purpleair-upload-to-esdr-backlog'

In [None]:
Stat.set_service(STAT_SERVICE_NAME)
# SQLite-backed dict that persists the most recently processed data file under
# the key 'lastUploaded', so each run resumes where the previous one stopped.
# autocommit=True so each assignment is committed without an explicit commit().
progress = SqliteDict('upload-purpleair-backlog.sqlite', autocommit=True)

In [None]:
# ESDR product under which every PurpleAir device and feed is created.
PURPLE_AIR_V2_ESDR_PRODUCT_NAME = 'purpleair_v2' # https://esdr.cmucreatelab.org/api/v1/products/purpleair_v2

In [None]:
# First time uploading, create a new client like so:

# Esdr.save_client('esdr-auth-purpleair-uploader.json', 'PurpleAir uploader for timemachine1')

# and then follow the directions it prints, which include visiting esdr.cmucreatelab.org and creating
# a client with the given parameters, and also editing esdr-auth-purpleair-uploader.json to include your
# username and password

# Do not add esdr-auth-*.json to the git repo
# !echo 'esdr-auth-*.json' >>.gitignore

# ESDR API client; credentials are read from this file (see the setup notes above).
esdr = Esdr('esdr-auth-purpleair-uploader.json')

# load the PurpleAir v2 product (its 'prettyName' is appended to every feed name)
purpleair_product = esdr.get_product_by_name(PURPLE_AIR_V2_ESDR_PRODUCT_NAME)

# (device serial number, lat, lon) -> ESDR feed dict.  Filled lazily by
# get_feed_from_cache_or_create_it(); starts empty on every script run.
feed_cache = {}

In [None]:
############################################
#
# Parse and convert device records to ESDR
#

def computeLatLon(deviceRecord):
    """Return the record's ('latitude', 'longitude') fields converted to floats."""
    latitude = float(deviceRecord['latitude'])
    longitude = float(deviceRecord['longitude'])
    return latitude, longitude

def computeEsdrId(deviceRecord):
    """Build a location-qualified ID string for a device record.

    Format: '<sensor_index>_<round(1000*|lat|), 6 digits><N|S><round(1000*|lon|), 6 digits><E|W>'.
    Any '.' in the result (which can only come from sensor_index's text) is
    replaced by '_'.
    """
    latitude, longitude = computeLatLon(deviceRecord)
    northSouth = 'S' if latitude < 0 else 'N'
    eastWest = 'W' if longitude < 0 else 'E'
    rawId = "%s_%06d%s%06d%s" % (deviceRecord['sensor_index'],
                                 round(1000 * abs(latitude)), northSouth,
                                 round(1000 * abs(longitude)), eastWest)
    return rawId.replace('.', '_')

def computeEpochTimestamp(deviceRecord):
    """Return the record's 'last_seen' value, or None when it is unavailable.

    The value is returned unchanged (presumably Unix epoch seconds -- TODO
    confirm against the PurpleAir API).  None is returned when the key is
    missing, when deviceRecord is not subscriptable, or when the value is
    None or NaN.  Records built with pandas ``to_dict('records')`` can carry
    NaN for a missing cell, and NaN is truthy, so without this check callers
    that test ``if not timestamp`` would accept it as a real time.
    """
    try:
        timestamp = deviceRecord['last_seen']
    except (KeyError, TypeError):
        return None
    if isinstance(timestamp, float) and math.isnan(timestamp):
        return None
    return timestamp

def computeEsdrRecord(deviceRecord):
    """Convert one PurpleAir device record into an ESDR data-point dict.

    The result always has a 'time' key (from computeEpochTimestamp, possibly
    None), plus one entry per mirrored channel that is present in the record
    and parses as a non-NaN float.  Channels are renamed to their ESDR names
    (e.g. 'pm2.5' -> 'PM2_5', 'temperature' -> 'temp_f').  Values that cannot
    be parsed as floats are reported with print() and skipped.
    """
    # As of 2022-06-02, we're only mirroring the following channels.
    # PurpleAir field name -> ESDR channel name (insertion order is preserved).
    channelNames = {
        'voc': 'voc',
        'ozone1': 'ozone',
        'humidity': 'humidity',
        'pressure': 'pressure',
        'temperature': 'temp_f',
        'pm2.5': 'PM2_5',
        'pm2.5_a': 'PM2_5_a',
        'pm2.5_b': 'PM2_5_b',
    }

    data = {'time': computeEpochTimestamp(deviceRecord)}

    for key, esdrKey in channelNames.items():
        if key not in deviceRecord:
            continue
        try:
            val = float(deviceRecord[key])
        except (TypeError, ValueError, OverflowError) as e:
            print(f"Failed to parse '{key}' value as float for ID {deviceRecord['sensor_index']}: {repr(e)}")
            continue
        # NaN marks a missing value; don't upload it.
        if not math.isnan(val):
            data[esdrKey] = val

    return data

def computeEsdrName(deviceRecord):
    """Return the device's 'name' with surrounding whitespace stripped.

    Returns None when the record has no 'name' field or its value is missing
    (None, or NaN as pandas uses for empty cells); such values have no
    ``.strip()`` and would otherwise raise AttributeError.  Non-string names
    are converted with str().  An all-whitespace name yields ''.
    """
    if 'name' not in deviceRecord:
        return None
    name = deviceRecord['name']
    if name is None or (isinstance(name, float) and math.isnan(name)):
        return None
    return str(name).strip()

###########################################################
#
# Accumulate deviceRecords from multiple JSON input files
#

def accumulateReset():
    """Replace the global ``accumulator`` with a fresh, empty structure.

    accumulator[sensorId] is created on first access as {'records': ...},
    where 'records' auto-creates an empty dict per timestamp key.
    """
    global accumulator

    def newDeviceEntry():
        return {'records': defaultdict(dict)}

    accumulator = defaultdict(newDeviceEntry)

def _sensorIndexAsInt(value):
    """Return value as an int sensor index, or None if it is not integral.

    pandas stores an integer column as float64 when any cell in it is missing,
    so a valid sensor_index such as 123 can arrive here as 123.0.
    """
    if isinstance(value, int):
        return value
    if isinstance(value, float) and value.is_integer():
        return int(value)
    return None


def accumulateJson(path):
    """Merge the device rows of one bz2-compressed PurpleAir JSON file into ``accumulator``.

    The file is expected to contain {"fields": [column names], "data": [rows]}.
    Each usable row is converted with computeEsdrRecord and merged into
    accumulator[sensor_index]['records'][last_seen]; the device's latest name,
    lat/lon and exposure ('outdoor' when location_type == 0, else 'indoor')
    are stored alongside.  Rows without an ID, lat/lon, timestamp or name are
    skipped and counted.  A file that cannot be read or parsed is reported via
    Stat.warning and otherwise ignored.  accumulateReset() must have been
    called first so that the global ``accumulator`` exists.
    """
    nUploads = 0
    nonameCount = 0
    locationCounts = defaultdict(int)

    try:
        with bz2.open(path, 'rb') as f:
            js = json.load(f)
        fields = js['fields']
        data = js['data']
    except (OSError, EOFError, ValueError, KeyError, TypeError) as e:
        Stat.warning(f"Error parsing file '{path}': {e!r}", host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)
        return
    records = pd.DataFrame(data, columns=fields).to_dict('records')

    for deviceRecord in records:
        # pd.isna treats both None and NaN (pandas' missing-cell marker) as missing,
        # and unlike math.isnan it does not raise on None.
        if pd.isna(deviceRecord.get('sensor_index')):
            locationCounts['noID'] += 1
            continue

        if pd.isna(deviceRecord.get('latitude')) or pd.isna(deviceRecord.get('longitude')):
            locationCounts['noLatLon'] += 1
            continue

        epochTimestamp = computeEpochTimestamp(deviceRecord)
        if not epochTimestamp:
            locationCounts['noTimestamp'] += 1
            continue

        location = "outdoor" if deviceRecord['location_type'] == 0 else "indoor"
        locationCounts[location] += 1

        name = computeEsdrName(deviceRecord)
        if not name:
            nonameCount += 1
            continue

        purpleAirId = _sensorIndexAsInt(deviceRecord['sensor_index'])
        if purpleAirId is None:
            continue

        record = computeEsdrRecord(deviceRecord)
        a = accumulator[purpleAirId]

        # Merge this record's keys into the timestamp's bucket rather than
        # replacing it.  Need to do this to handle device A and B for the
        # same timestamp.
        a['records'].setdefault(epochTimestamp, {}).update(record)

        a['name'] = name
        a['lat'], a['lon'] = computeLatLon(deviceRecord)
        a['exposure'] = location

        nUploads += 1

    Stat.info('%s: using %d of %d records %s' % (path, nUploads, len(records), json.dumps(locationCounts)), host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)
    Stat.info('After merge, total of %d ESDR IDs' % len(accumulator), host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)
    if nonameCount:
        Stat.warning('%d purpleair records had no name field and could not be merged' % nonameCount, host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)

############################################
#
# Find JSON files we haven't processed yet
#

def pathTopDir(path):
    """Return the first '/'-separated component of path (all of path if it has no '/')."""
    head, _separator, _rest = path.partition('/')
    return head

def pathSansTopDir(path):
    """Return path with its first '/'-separated component removed ('' if there is no '/')."""
    return path.partition('/')[2]

# Return the files under dir whose relative paths sort after newerThan
def getNewerFiles(dir, newerThan, nSubdirLevels):
    """Return files under ``dir`` whose relative path sorts after ``newerThan``.

    Args:
        dir: directory to scan.
        newerThan: '/'-separated relative path to compare against, or a falsy
            value (None / '') to return everything.  "Newer" means
            lexicographically greater, which matches chronological order only
            for zero-padded, date-based names such as '20230804/000000utc.json.bz2'.
        nSubdirLevels: number of directory levels between ``dir`` and the
            files; 0 means the files live directly in ``dir``.

    Returns:
        Sorted list of '/'-separated paths relative to ``dir``.  At directory
        levels, entries that are not directories are skipped rather than
        descended into.
    """
    if not nSubdirLevels:
        return [name for name in sorted(os.listdir(dir)) if not newerThan or name > newerThan]

    # Split newerThan once into its top-level directory and the remainder.
    if newerThan:
        topDir, _, rest = newerThan.partition('/')
    else:
        topDir, rest = None, None

    ret = []
    for subdir in sorted(os.listdir(dir)):
        if topDir is None or subdir > topDir:
            childNewerThan = None            # whole subdir is newer
        elif subdir == topDir:
            childNewerThan = rest            # compare the remainder inside this subdir
        else:
            continue                         # whole subdir is older
        subpath = dir + '/' + subdir
        if not os.path.isdir(subpath):
            # A stray file at a directory level would make os.listdir raise.
            continue
        ret.extend(subdir + '/' + file
                   for file in getNewerFiles(subpath, childNewerThan, nSubdirLevels - 1))
    return ret

In [None]:
def get_feed_from_cache_or_create_it(device_serial_number, name, lat, lon, exposure):
    """Return the ESDR feed for a PurpleAir device at (lat, lon), creating it if needed.

    Feeds are memoized in the module-level ``feed_cache`` keyed by
    (device_serial_number, lat, lon).  On a cache miss, the device and a feed
    named "<name> <product prettyName>" are fetched or created through
    ``esdr``.  If the feed's stored name or exposure differs from the values
    passed in, the feed is updated on ESDR and the change is logged with
    Stat.info before the feed is cached.
    """
    key = (device_serial_number, lat, lon)
    if key in feed_cache:
        return feed_cache[key]

    device = esdr.get_or_create_device(purpleair_product, str(device_serial_number))
    feed_name = name + ' ' + purpleair_product['prettyName']
    feed = esdr.get_or_create_feed_with_name(device, feed_name, lat, lon, exposure)
    feed_id = feed['id']

    # The PurpleAir's name changed since the last upload: rename the ESDR feed.
    if feed['name'] != feed_name:
        old_name = feed['name']
        esdr.update_feed_name(feed_id, feed_name)
        feed['name'] = feed_name
        Stat.info(f"Field 'name' for feed '{feed_id}' changed from '{old_name}' to '{feed_name}'", host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)

    # The PurpleAir's exposure (indoor/outdoor) changed since the last upload: update it.
    if feed['exposure'] != exposure:
        old_exposure = feed['exposure']
        esdr.update_feed_exposure(feed_id, exposure)
        feed['exposure'] = exposure
        Stat.info(f"Field 'exposure' for feed '{feed_id}' changed from '{old_exposure}' to '{exposure}'", host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)

    feed_cache[key] = feed
    return feed

In [None]:
###############################################
#
# Upload to ESDR
#

# Number of worker threads used to upload devices to ESDR concurrently.
NUM_UPLOADS_IN_PARALLEL = 6

def sortedDict(dict):
    """Return a new dict with the same items, inserted in ascending key order."""
    ordered = {}
    for key in sorted(dict):
        ordered[key] = dict[key]
    return ordered

def uploadId(id):
    """Upload all accumulated records for one PurpleAir sensor to its ESDR feed.

    Looks up accumulator[id].  If it has a lat/lon, it finds or creates the ESDR
    feed (get_feed_from_cache_or_create_it) and uploads the records in
    ascending timestamp order via esdr.upload_dicts.  Upload failures are
    reported with Stat.warning instead of being raised, since this runs inside
    a thread pool; stdout/stderr are flushed afterwards in all cases.
    """
    try:
        a = accumulator[id]
        if 'lat' in a and 'lon' in a:
            feed = get_feed_from_cache_or_create_it(id, a['name'], a['lat'], a['lon'], a['exposure'])
            dicts = list(sortedDict(a['records']).values())
            esdr.upload_dicts(feed, dicts)
            print('Successfully uploaded PurpleAir ID %d' % (id))
        else:
            Stat.warning('Skipping ID %s because it has no lat/lon' % (id), host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)
    except requests.HTTPError as e:
        # e.response can be None if the error was raised without an HTTP response;
        # dereferencing it unguarded would raise a second error inside this handler.
        status = e.response.status_code if e.response is not None else 'unknown'
        Stat.warning('Failed to upload to feed corresponding to ID %s (HTTP %s)' % (id, status), host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)
    except Exception as e:
        # Include the exception so the cause is visible in the Stat log.
        Stat.warning('Failed to upload to feed corresponding to ID %s: %r' % (id, e), host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)
    finally:
        sys.stdout.flush()
        sys.stderr.flush()

In [None]:
# device = esdr.get_device_by_serial_number(purpleair_product, str(79177))
# print(device)
# feed = esdr.get_feed(device=device, lat=float("37.258984"), lon=float("-121.95168"), order_by='-id')
# print(feed)
# print()
# if (feed['exposure'] != "outdoor"):
#   resp = esdr.update_feed_exposure(feed['id'], "outdoor")
#   print(resp)
# print()
# feed = esdr.get_feed(device=device, lat=float("37.258984"), lon=float("-121.95168"), order_by='-id')
# print(feed)

In [None]:
# feed_ids = []

# for id in feed_ids:
#      response = esdr.delete_feed_by_id(id)
#      print(response)


In [None]:
# Do this 20 times and then exit, relying on the cron job to start it up again. We do this as a simple way to deal with
# refreshing the ESDR OAuth2 token, rather than adding code to catch the HTTP 401/403 error upon upload, yada yada. The
# drawback of doing it this way is that the get_feed_from_cache_or_create_it() function has to start fresh
# with a new cache, but doing so once every 20 runs is fine.
#
# Each pass uploads at most 720 files newer than the last file recorded in `progress`, then records the newest file
# it processed.  If there are no newer files yet, the loop stops early.
# (The loop variable is `i` rather than `_`, because `_` holds the last output in IPython.)
for i in range(20):
    lastUploaded = progress.get('lastUploaded', START_FILE)

    Stat.info("Mirroring PurpleAir backlog of data files newer than %s.  Last uploaded = %s" % (START_FILE, lastUploaded), host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)

    files = getNewerFiles('mirror2', lastUploaded, nSubdirLevels=1)

    # Only json.bz2 files, and a maximum of the first 720 of them per pass
    files = [f for f in files if f.endswith('.json.bz2')][:720]

    if not files:
        # Caught up: nothing newer than lastUploaded exists yet.  Stop here
        # instead of crashing on files[0] / files[-1] below.
        Stat.info('No PurpleAir data files newer than %s; nothing to upload' % lastUploaded, host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)
        break

    accumulateReset()

    for file in files:
        accumulateJson('mirror2/' + file)

    # Upload devices in parallel.  Draining the map() iterator re-raises any
    # exception that escapes uploadId instead of silently discarding it.
    with ThreadPoolExecutor(NUM_UPLOADS_IN_PARALLEL) as pool:
        list(pool.map(uploadId, sorted(accumulator.keys())))

    Stat.up('Uploaded %d devices from %d files (%s ... %s)' % (len(accumulator), len(files), files[0], files[-1]), host=STAT_HOSTNAME, shortname=STAT_SHORTNAME,
            valid_for_secs=3600 * 4)
    progress['lastUploaded'] = files[-1]