# Parse And Upload ACHD

Convert CSV reports (starting 2017) to JSON-formatted data for ESDR upload

In [None]:
# Force reprocessing of earlier uploaded files
force_reprocess = False

# Dry run -- everything but actual upload: process_achd_site only prints
# what it would have uploaded instead of contacting ESDR
dry_run = False

In [None]:
import csv, datetime, fcntl, glob, json, math, os, re, subprocess, sys, time, xml.dom.minidom
from dateutil import tz

# To install dateutil on Ubuntu
# sudo pip install python-dateutil

def exec_ipynb(filename_or_url):
    """Execute every code cell of a Jupyter notebook in this module's globals.

    filename_or_url -- local path, or an http(s) URL, of an .ipynb file.

    Both notebook layouts are handled: nbformat 4 keeps its cells at the top
    level with the text under 'source'; other formats keep them under
    worksheets[0] with the text under 'input'.

    NOTE(review): this runs whatever code the notebook contains, so it must
    only be pointed at trusted files/URLs.
    """
    if re.match(r'https?:', filename_or_url):
        # urllib2 is not imported at file level; import it lazily and fall
        # back to its Python 3 location.
        try:
            from urllib2 import urlopen
        except ImportError:
            from urllib.request import urlopen
        response = urlopen(filename_or_url)
        try:
            nb = response.read()
        finally:
            response.close()
    else:
        with open(filename_or_url) as nb_file:
            nb = nb_file.read()

    json_nb = json.loads(nb)
    # Check for the modified formatting of Jupyter Notebook v4
    if json_nb['nbformat'] == 4:
        code_cells = [''.join(cell['source'])
                      for cell in json_nb['cells']
                      if cell['cell_type'] == 'code']
    else:
        code_cells = [''.join(cell['input'])
                      for cell in json_nb['worksheets'][0]['cells']
                      if cell['cell_type'] == 'code']

    # Function-call form of exec is valid on both Python 2 and Python 3
    exec('\n'.join(code_cells), globals())

# Run the ESDR helper notebook inside this module's namespace.
# Presumably this is what defines the Esdr class used further down -- TODO confirm.
exec_ipynb('python-utils/esdr-library.ipynb')

Timezone
--------

In [None]:
# Fixed UTC-5 offset with no daylight-saving rules
achd_tz = tz.tzoffset("EST", -5 * 3600)

# Allegheny health department always reports in Eastern Standard Time,
# even when Pittsburgh observes Eastern Daylight Time during the summer.

# In other words, ACHD times look correct in the winter, but appear
# to be one hour behind people's clocks during the summer.

# For example, during the summer, if
# ACHD reports 3pm Eastern Standard Time,
# that would correspond to 4pm Eastern Daylight Time.

# Test that timezone has offset 5 hours (EST) both during summer and winter

# Winter check.  Confirm epoch time of local midnight 1/1/70 was 5 hours
date = datetime.datetime.strptime('1/1/1970 00:00', '%m/%d/%Y %H:%M').replace(tzinfo=achd_tz)
epoch = (date - datetime.datetime(1970, 1, 1, tzinfo=tz.tzutc())).total_seconds()
if epoch != 5 * 3600:
    raise Exception("Error in timezone")

# Summer check.  Local midnight 7/1/70 must also fall 5 hours past a UTC day boundary
date = datetime.datetime.strptime('7/1/1970 00:00', '%m/%d/%Y %H:%M').replace(tzinfo=achd_tz)
epoch = (date - datetime.datetime(1970, 1, 1, tzinfo=tz.tzutc())).total_seconds()
if epoch % 86400 != 5 * 3600:
    raise Exception("Error in timezone")

## Parse and upload single CSV file

In [None]:
# ESDR client and ACHD product handle; both start unset and are assigned by
# process_achd_site when it performs a real (non-dry-run) upload
esdr = None
esdr_product = None

def epoch_time(dt):
    """Return the number of seconds between 1970-01-01 00:00 UTC and dt.

    dt is subtracted from a timezone-aware origin, so it must itself be
    timezone-aware.
    """
    unix_origin = datetime.datetime(1970, 1, 1, tzinfo=tz.tzutc())
    elapsed = dt - unix_origin
    return elapsed.total_seconds()

# Process a table parsed from a single day
# Should have 26 total rows
# First two rows are column headers
# Next 24 are the hourly data

def _parse_achd_value(row, col):
    """Return row[col] as a float, or False when it is missing or unparsable."""
    try:
        # A cell may carry an annotation after a space; only the text before
        # the first space is parsed as the number.
        return float(row[col].split(' ', 1)[0])
    except (IndexError, ValueError, TypeError, AttributeError):
        return False


def process_achd_site(achd_site, base_datetime, achd_table):
    """Convert one site's one-day table into samples and upload them to ESDR.

    achd_site     -- site name as it appears in the report (e.g. 'Avalon').
    base_datetime -- datetime for the day the table covers; each row's hour
                     replaces its hour field.
    achd_table    -- list of rows.  Rows 0 and 1 are the two header rows;
                     every later row is one sample row whose first cell is
                     the local time as 'HH:MM'.

    Channel names are built from both header rows, stopping at the first
    empty cell of row 0.  Each uploaded row is [unix_ts, value, ...] where
    unix_ts is the middle of the hour and missing or unparsable values are
    sent as False.

    When the module-level dry_run flag is set, nothing is uploaded and a
    summary is printed instead.
    """
    # The latest arguments are mirrored into module globals (presumably so
    # they can be inspected interactively after a run).
    global global_achd_site, global_base_datetime, global_achd_table
    global esdr, esdr_product
    global_achd_site = achd_site
    global_base_datetime = base_datetime
    global_achd_table = achd_table

    # Collapse each run of non-word characters in the site name to one space.
    # The serial number built below then turns whitespace into underscores.
    devname = re.sub(r'\W+', ' ', achd_site)

    # Process the header into channel names
    channel_names = []
    for col, top_header in enumerate(achd_table[0][1:], 1):
        if top_header == '':
            break
        channel_names.append(
            Esdr.make_identifier('%s_%s' % (top_header, achd_table[1][col])))

    data = []
    for row in achd_table[2:]:
        # The first column is the local time of day within base_datetime's date
        row_hour = datetime.datetime.strptime(row[0], '%H:%M').hour
        local_row_datetime = base_datetime.replace(hour=row_hour, tzinfo=achd_tz)
        # Offset time by 1800 seconds so we're at the center of the hour-long sample
        unix_ts = epoch_time(local_row_datetime) + 1800

        # Each data row starts with the epoch time, followed by one value per channel
        data_row = [unix_ts]
        for col in range(1, len(channel_names) + 1):
            data_row.append(_parse_achd_value(row, col))
        data.append(data_row)

    if dry_run:
        print ("Would have uploaded to devname=%s, channels=%s, starting %s" %
               (devname, channel_names, base_datetime))
        return

    # Create the ESDR client on first use
    if not esdr:
        esdr = Esdr('esdr-auth.json')
    esdr_product = esdr.get_product_by_name('ACHD')
    serial_number = re.sub(r'\s+', '_', devname)
    device = esdr.get_or_create_device(esdr_product, serial_number, name=devname)
    feed = esdr.get_or_create_feed(device)
    before = time.time()
    esdr.upload(feed, {
        'channel_names': channel_names,
        'data': data
    })

    print ("Uploaded to devname=%s, channels=%s, starting %s.  Upload took %.1f seconds" %
           (devname, channel_names, base_datetime, time.time() - before))
        
def process_page(page):
    """Check the layout of one report page and pass its table on.

    page -- list of CSV rows making up one page.  Row 2 carries 'Site:' in
            column 0, the site name (e.g. 'Avalon') in column 1 and the date
            (e.g. '4/18/2017') in column 5.  Rows 3-28 are handed to
            process_achd_site; rows 5 and 28 must be the '00:00' and '23:00'
            samples.

    Raises AssertionError when one of those layout checks fails.
    """
    site_row = page[2]
    assert site_row[0] == 'Site:'
    site_name = site_row[1]

    # The date cell may be padded with whitespace
    date_text = site_row[5].strip()
    report_date = datetime.datetime.strptime(date_text, '%m/%d/%Y')

    # First and last hourly rows pin down where the table sits on the page
    assert page[5][0] == '00:00'
    assert page[28][0] == '23:00'

    day_table = page[3:29]
    process_achd_site(site_name, report_date, day_table)

def process_csv_file(path):
    """Split a report CSV into pages and run process_page on each of them.

    path -- path of the CSV file to read.

    A page starts at every row whose first cell begins with 'Date Printed'
    and runs up to the next such row (or the end of the file).  Rows that
    precede the first 'Date Printed' row form a page of their own.  Blank
    lines are kept in the page they fall in; they never start a page.
    """
    page = []
    # The with-block closes the file even if a page fails to process
    with open(path) as csv_file:
        for row in csv.reader(csv_file):
            # csv.reader yields [] for a blank line, so guard before row[0]
            if row and row[0].startswith('Date Printed'):
                # Start new page
                if page:
                    process_page(page)
                page = []
            page.append(row)
    if page:
        process_page(page)
    
# process_csv_file('mirror-csv/AirQualityDataSummary-2017-05-25-historical-report.csv')

## Find new CSV files not yet parsed and uploaded, and parse and upload them

In [None]:
def compute_done_path(csv_path):
    """Return the path of the marker file recording that csv_path was uploaded.

    csv_path -- path of a report CSV; only its base name without extension
                is used.

    The marker lives in the 'upload-to-esdr' directory (relative to the
    current working directory), which is created if it does not exist yet.

    Raises OSError when that directory is missing and cannot be created.
    """
    done_dir = 'upload-to-esdr'
    try:
        os.mkdir(done_dir)
    except OSError:
        # An already existing directory is the normal case after the first
        # call; any other failure would otherwise surface later and obscurely.
        if not os.path.isdir(done_dir):
            raise
    stem = os.path.basename(os.path.splitext(csv_path)[0])
    return '%s/%s.successful-upload' % (done_dir, stem)

def process_all():
    """Parse and upload every CSV in mirror-csv/ that has not been handled yet.

    Files are visited in sorted order.  A file is skipped when its marker
    (see compute_done_path) already exists, unless the module-level
    force_reprocess flag is set.  After a file has been processed its marker
    is created, except during a dry run: nothing was uploaded then, so the
    file must stay eligible for the next real run.

    Prints one line per processed file and a final summary.
    """
    count = 0
    for csv_path in sorted(glob.glob('mirror-csv/*.csv')):
        done_file = compute_done_path(csv_path)
        if os.path.exists(done_file) and not force_reprocess:
            continue
        print('Processing %s...' % csv_path)
        process_csv_file(csv_path)
        if not dry_run:
            # Create (or truncate) the empty marker file and close it right away
            with open(done_file, 'w'):
                pass
        count += 1
    if count:
        print('Parsed and uploaded %d CSV files' % count)
    else:
        print('No new CSV files to parse and upload, exiting')
        
# Run the whole batch as soon as this cell executes
process_all()