In [0]:
# Boilerplate to load utils.ipynb
# See https://github.com/CMU-CREATE-Lab/python-utils/blob/master/utils.ipynb

import concurrent.futures
import json
import math
import os
import re
import subprocess
import sys
import time
import traceback

import dateutil.tz
import requests

# Fetch the shared helper notebooks on first run; an existing 'python-utils'
# directory is reused as-is (it is not updated).
if not os.path.exists('python-utils'):
    # Argument list instead of a shell string: no shell is spawned and nothing needs quoting.
    subprocess.check_output(['git', 'clone', 'https://github.com/CMU-CREATE-Lab/python-utils.git'])

def exec_ipynb(filename_or_url):
    """Execute every code cell of a notebook in this module's global namespace.

    filename_or_url: path to a local .ipynb file, or an http(s) URL that is
        fetched with requests.

    The code cells are concatenated in notebook order and run with a single
    exec(), so every name they define becomes a global here. Returns None.

    NOTE: the notebook source is executed verbatim -- only point this at
    notebooks you trust.
    """
    if re.match(r'https?:', filename_or_url):
        nb = requests.get(filename_or_url).json()
    else:
        # Close the handle deterministically rather than leaving it to the
        # garbage collector, and read as UTF-8 instead of the platform default.
        with open(filename_or_url, encoding='utf-8') as nb_file:
            nb = json.load(nb_file)

    if nb['nbformat'] >= 4:
        src = [''.join(cell['source']) for cell in nb['cells'] if cell['cell_type'] == 'code']
    else:
        # Older layout: cells live under worksheets[0] and keep their code in 'input'.
        src = [''.join(cell['input']) for cell in nb['worksheets'][0]['cells'] if cell['cell_type'] == 'code']
    exec('\n'.join(src), globals())

# Run the helper notebooks' code cells in this notebook's globals. Later cells use
# names that are not defined in this file (Esdr, Stat, subprocess_check,
# sleep_until_next_period); presumably they come from these two notebooks.
exec_ipynb('python-utils/utils.ipynb')
exec_ipynb('python-utils/esdr-library.ipynb')

# Wide display
# IPython.display is the public import location; importing display from
# IPython.core.display is deprecated.
from IPython.display import display, HTML
display(HTML("<style>#notebook-container { margin-left:-14px; width:calc(100% + 27px) !important; }</style>"))

In [0]:
# mysql.connector is required to read the Argos database; install it on demand.
try:
    import mysql.connector
except ImportError:
    # Only a missing package should trigger an install. A bare except would also
    # swallow KeyboardInterrupt and unrelated failures raised during import.
    subprocess_check('conda install -y mysql-connector-python', verbose=True)
    import mysql.connector


In [0]:
# First time uploading, create a new client like so:

# Esdr.save_client('esdr-auth-argos-uploader.json', 'Argos uploader')

# and then follow the directions it prints, which include visiting esdr.cmucreatelab.org,
# creating a client with the given parameters, and editing the JSON file to include your
# username and password

# Do not add esdr-auth-*.json to the git repo
# !echo 'esdr-auth-*.json' >>.gitignore

In [0]:
# ESDR client built from the local credentials file (that file must stay out of git).
esdr = Esdr('esdr-auth-argos-uploader.json')

# Product record for the Argos spectrometers, together with its default channel metadata.
product = esdr.get_or_create_product(
    'ArgosSpectrometer',
    vendor='Argos',
    description='Argos Open Path and Hound Air Spectrometers',
    default_channel_specs={
        'version': 1,
        'channels': {
            'signal_strength': {'prettyName': 'Signal Strength'},
            'qa_complete': {'prettyName': 'QA complete'},
            'status': {'prettyName': 'Status'},
            'SO2': {'prettyName': 'SO2 PPB', 'units': 'PPB'},
            'benzene': {'prettyName': 'Benzene PPB', 'units': 'PPB'},
        },
    },
)

# Device named 'Glassport' under that product.
device = esdr.get_or_create_device(product, 'Glassport')

# Feed that download_and_upload() writes to. The two numbers are presumably
# latitude and longitude -- confirm against Esdr.get_or_create_feed.
feed = esdr.get_or_create_feed(device, 40.326009, -79.881742)

In [0]:
# Do not add argos-auth.json to the git repo
#!echo 'argos-auth.json' >>.gitignore

# Database credentials; mirror() reads the 'hostname', 'username' and 'password' keys.
# The context manager closes the file instead of leaking the handle.
with open('argos-auth.json') as auth_file:
    argos_auth = json.load(auth_file)

# Length of one sample's integration window. download_and_upload() subtracts half
# of it from each row's sampledate to timestamp the sample at its midpoint.
integration_time = 5 * 60 # seconds

# Note that database timezone is Pacific Local, even though sensor is in Eastern Time Zone
database_timezone = dateutil.tz.gettz('America/Los_Angeles') # Pacific local time (daylight savings observed in summer)

In [0]:
def format_dhm(secs):
    """Render a duration in seconds as a short 'N days, N hrs, N mins' string.

    Zero-valued day and hour parts are omitted. Minutes are shown only when the
    duration is under a day and the minute count is non-zero, or when nothing
    else would be shown (so 0 seconds gives '0 mins'). Unit names are always
    plural.
    """
    n_days = math.floor(secs / 86400)
    n_hours = math.floor((secs % 86400) / 3600)
    n_mins = math.floor((secs % 3600) / 60)

    labelled = (
        ('%d days' % n_days, n_days),
        ('%d hrs' % n_hours, n_hours),
    )
    parts = [text for text, count in labelled if count]

    # Minutes are noise once the duration spans days, but something must always be returned.
    if not parts or (n_mins and not n_days):
        parts.append('%d mins' % n_mins)
    return ', '.join(parts)


In [0]:
# Returns end record
def download_and_upload(table, start_record, mysql2esdr_colmap, nickname, max_staleness_secs):
    """Copy one batch of rows from a MySQL table to the ESDR feed and report freshness.

    Uses the module-level `cur`, `esdr`, `feed`, `database_timezone` and
    `integration_time`.

    table: name of the table to read (interpolated into the query text).
    start_record: row offset, in sampledate order, at which to start reading.
    mysql2esdr_colmap: dict mapping each SQL data column name to the ESDR
        channel name it is uploaded as.
    nickname: label used as host/shortname in the Stat reports.
    max_staleness_secs: the newest sample may be at most this old before the
        table is reported as down.

    Returns start_record plus the number of rows read. If the query returns no
    rows, nothing is uploaded, the table is reported down, and start_record is
    returned unchanged.

    Raises KeyError if the table has a data column missing from mysql2esdr_colmap.
    """
    cur.execute('SELECT * FROM %s ORDER BY sampledate LIMIT 100000 OFFSET %s;' % (table, start_record))

    # The first two columns must be the timestamps; everything after them is channel data.
    column_names = cur.column_names
    assert(column_names[0] == 'sampledate')
    assert(column_names[1] == 'actualdate')
    sql_data_cols = list(column_names[2:])

    actual_sql_data_cols = sorted(sql_data_cols)
    expected_sql_data_cols = sorted(mysql2esdr_colmap.keys())

    if actual_sql_data_cols == expected_sql_data_cols:
        print('Data columns: %s' % mysql2esdr_colmap)
    else:
        print('WARNING: Expected sql data columns %s but found %s' % (expected_sql_data_cols, actual_sql_data_cols))

    data = []
    for sample in cur:
        # sampledate is interpreted in database_timezone and treated as the end of
        # the integration window; each sample is timestamped at the window's midpoint.
        sample_end_epochtime = sample[0].replace(tzinfo=database_timezone).timestamp()
        sample_midpoint_epochtime = sample_end_epochtime - 0.5 * integration_time
        data.append([sample_midpoint_epochtime] + list(sample[2:]))

    if not data:
        # Empty table, or start_record is past the last row: there is nothing to
        # upload and no newest sample to measure staleness against.
        Stat.down('%s returned no samples starting at record %d' % (nickname, start_record),
                  host=nickname, shortname=nickname)
        return start_record

    channel_names = [mysql2esdr_colmap[c] for c in sql_data_cols]
    esdr.upload(feed, {'channel_names': channel_names, 'data': data})

    # Age of the newest sample (rows are ordered by sampledate, so it is the last one).
    staleness_secs = time.time() - data[-1][0]
    too_stale = staleness_secs > max_staleness_secs

    Stat.info('Starting at record %d, captured and uploaded %d samples of %d channels each, with last sample %d seconds ago (health limit %d secs)'
                  % (start_record, len(data), len(data[0])-1, staleness_secs, max_staleness_secs),
               host = nickname)

    if too_stale:
        msg = '%s is TOO OLD (%s old > %s)' % (nickname, format_dhm(staleness_secs), format_dhm(max_staleness_secs))
    else:
        msg = '%s is recent (%s old <= %s)' % (nickname, format_dhm(staleness_secs), format_dhm(max_staleness_secs))

    updown = Stat.down if too_stale else Stat.up
    updown(msg, host=nickname, shortname=nickname)

    return start_record + len(data)



In [0]:
# SQL column name -> ESDR channel name for the table mirror() uploads under the
# 'Realtime' nickname.
mysql2esdr_colmap_raw = dict(
    signalstrength='signal_strength_v2',
    ben='benzene_v2',
    nh3='ammonia_v2',
    tol='toluene_v2',
    so2='so2_v2',
    xyl='p-xylene_v2',
    naph='naphthalene_v2',
    status='status_v2',
)

# Same SQL columns for the table uploaded under the 'QA' nickname; channel names
# carry a '_qa' marker before the '_v2' suffix.
mysql2esdr_colmap_qa = dict(
    signalstrength='signal_strength_qa_v2',
    ben='benzene_qa_v2',
    nh3='ammonia_qa_v2',
    tol='toluene_qa_v2',
    so2='so2_qa_v2',
    xyl='p-xylene_qa_v2',
    naph='naphthalene_qa_v2',
    status='status_qa_v2',
)

Stat.set_service('Mirror Argos to ESDR')


# Connection state shared with download_and_upload(), which reads the global `cur`.
cur = None
cnx = None

def mirror():
    """Continuously mirror the Argos MySQL tables to ESDR.

    Connects using the credentials in `argos_auth`, then loops forever: each pass
    uploads a batch from 'pittsburg' (nickname 'Realtime') and from 'pittsburg_r'
    (nickname 'QA'), then calls sleep_until_next_period(60).

    Every pass restarts up to 500 rows before the previous end record, so those
    trailing rows are re-read and re-uploaded each time.

    Never returns normally; it only exits by raising. The cursor and connection
    are closed on the way out.
    """
    global cnx, cur
    cnx = mysql.connector.connect(
        host=argos_auth['hostname'],
        user=argos_auth['username'],
        password=argos_auth['password'],
        database='argos',
        connection_timeout = 30,
        # This connection only polls. Without autocommit it would stay inside one
        # transaction, which under InnoDB's default REPEATABLE READ can keep
        # returning the first snapshot. NOTE(review): table engine not visible
        # here; harmless if the tables are non-transactional.
        autocommit=True)

    try:
        cur = cnx.cursor(buffered=True)
        try:
            end_record = 0
            end_record_qa = 0
            while True:
                end_record = download_and_upload('pittsburg',
                                                 max(0, end_record - 500),
                                                 mysql2esdr_colmap_raw,
                                                 'Realtime',
                                                 20 * 60) # 20 mins max staleness

                end_record_qa = download_and_upload('pittsburg_r',
                                                    max(0, end_record_qa - 500),
                                                    mysql2esdr_colmap_qa,
                                                    'QA',
                                                    9 * 86400) # 9 days max staleness

                sleep_until_next_period(60)
        finally:
            cur.close()
    finally:
        # Release the server connection instead of leaking it when the loop fails.
        cnx.close()


In [0]:
# Run the mirror loop. mirror() only exits by raising, so any Exception is
# reported to Stat as "down" for both nicknames. The exception is deliberately
# not re-raised, matching the original best-effort behaviour.
try:
    mirror()
except Exception as e:
    # Keep the full traceback in the notebook output; Stat only gets a one-line summary.
    traceback.print_exc()
    for nickname in ['Realtime', 'QA']:
        Stat.down('Exception while mirroring',
                  host=nickname,
                  shortname=nickname,
                  # Include the exception type: str() of e.g. a KeyError is just the key.
                  details='Exception %s: %s' % (type(e).__name__, e))