In [1]:
import re, requests, json, os, datetime

In [2]:
def exec_ipynb(filename_or_url):
    """Execute every code cell of a notebook in this module's global namespace.

    Args:
        filename_or_url: Path to a local .ipynb file, or an http(s) URL that
            serves the notebook JSON.

    Raises:
        requests.HTTPError: If a URL is given and the server answers with an
            error status.

    Side effects:
        Writes the concatenated cell source to a uniquely named .py file under
        /tmp (that name is passed to compile() as the code's filename), then
        exec()s it with globals(), so every name the notebook defines becomes
        a global here.

    NOTE(review): this runs whatever code the path/URL contains; only call it
    with trusted notebooks.
    """
    if re.match(r'https?:', filename_or_url):
        response = requests.get(filename_or_url)
        # Fail loudly on HTTP errors rather than parsing an error page as a notebook.
        response.raise_for_status()
        nb = response.json()
    else:
        # Context manager so the handle is closed deterministically.
        # NOTE(review): assumes the notebook file is UTF-8 encoded JSON.
        with open(filename_or_url, encoding='utf-8') as f:
            nb = json.load(f)

    if nb['nbformat'] >= 4:
        src = [''.join(cell['source']) for cell in nb['cells'] if cell['cell_type'] == 'code']
    else:
        # Older notebook formats keep cells under the first worksheet and name
        # the cell source 'input'.
        src = [''.join(cell['input']) for cell in nb['worksheets'][0]['cells'] if cell['cell_type'] == 'code']

    # Basename + timestamp + pid keeps concurrent or repeated runs from colliding.
    tmpname = '/tmp/%s-%s-%d.py' % (os.path.basename(filename_or_url),
                                    datetime.datetime.now().strftime('%Y%m%d%H%M%S%f'),
                                    os.getpid())
    src = '\n\n\n'.join(src)
    with open(tmpname, 'w', encoding='utf-8') as f:
        f.write(src)
    code = compile(src, tmpname, 'exec')
    exec(code, globals())

# Run the shared helper notebooks so their definitions land in this namespace.
# Esdr and Stat are used below but not defined in this file, so presumably
# they come from these notebooks — verify against python-utils.
exec_ipynb('./python-utils/esdr-library.ipynb')
exec_ipynb('./python-utils/utils.ipynb')
# ESDR client used for all product/device/feed lookups and uploads below,
# constructed from the uploader auth file.
esdr = Esdr('esdr-auth-2btech-uploader.json')

In [3]:
# Identity under which this job reports its status: the service name is
# registered with Stat below, and the host/shortname are passed with every
# Stat.up / Stat.down call at the end of the notebook.
STAT_SERVICE_NAME = '2BTech Monitoring Data Mirror'
STAT_HOSTNAME = 'hal21'
STAT_SHORTNAME = '2btech-data-mirror'

# Stat is not defined in this file; presumably provided by the helper
# notebooks executed earlier — verify.
Stat.set_service(STAT_SERVICE_NAME)

In [4]:
def get_new_token():
    """Request a fresh access token and cache it in ``token.json``.

    Reads ``client_id`` and ``client_secret`` from ``2btech_auth.json``, posts
    a client-credentials grant to the Auth0 token endpoint, and writes the
    token response plus a computed ``expiration_epoch`` (local clock, seconds
    since the epoch) to ``token.json``.

    Returns:
        The ``access_token`` string from the token response.

    Raises:
        requests.HTTPError: If the token endpoint answers with an error status.
        KeyError: If the response has no ``access_token``, or no ``expires_in``
            can be found in either the response or the credentials file.
    """
    # Read client credentials from the auth file.
    with open('2btech_auth.json') as f:
        credentials = json.load(f)
    client_id = credentials['client_id']
    client_secret = credentials['client_secret']

    audience = "https://sharpapi.airqdb.com"
    grant_type = "client_credentials"

    response = requests.post(
        "https://airdb.auth0.com/oauth/token",
        data={
            "client_id": client_id,
            "client_secret": client_secret,
            "audience": audience,
            "grant_type": grant_type
        }
    )
    # Never cache an error payload: a bad token.json would be trusted until
    # its computed expiration passed.
    response.raise_for_status()

    token_info = response.json()
    access_token = token_info['access_token']

    # The token lifetime belongs to the token response, not the credentials
    # file. The credentials file is only consulted as a fallback so a setup
    # that stored 'expires_in' there keeps working.
    if 'expires_in' in token_info:
        expires_in = token_info['expires_in']
    else:
        expires_in = credentials['expires_in']
    token_info['expiration_epoch'] = (datetime.datetime.now() + datetime.timedelta(seconds=expires_in)).timestamp()

    with open('token.json', 'w') as f:
        json.dump(token_info, f)

    return access_token

In [5]:
def read_or_get_token():
    """Return a usable access token, reusing the cached one when still valid.

    The cache is ``token.json``. A new token is requested through
    ``get_new_token()`` when the cache file is missing, is not valid JSON,
    lacks ``access_token`` / ``expiration_epoch``, or has an
    ``expiration_epoch`` earlier than the current local time.

    Returns:
        The access token string.
    """
    try:
        with open('token.json') as f:
            cached = json.load(f)
    except (FileNotFoundError, ValueError):
        # First run (no cache yet) or unreadable cache: treat as "no token".
        # json.JSONDecodeError is a ValueError subclass.
        cached = None

    now_epoch = datetime.datetime.now().timestamp()
    cache_is_usable = (
        isinstance(cached, dict)
        and 'access_token' in cached
        and 'expiration_epoch' in cached
        and not (cached['expiration_epoch'] < now_epoch)
    )

    if cache_is_usable:
        print("Using current access token...")
        return cached['access_token']

    print("Getting new access token...")
    return get_new_token()

In [6]:
def get_2btech_data(access_token, start_iso, end_iso):
    """Fetch un-averaged time-series data for station AQSync-1034.

    Args:
        access_token: Bearer token sent in the Authorization header.
        start_iso: Start of the requested window, passed through as the
            ``start`` query parameter.
        end_iso: End of the requested window, passed through as the ``end``
            query parameter.

    Returns:
        The decoded JSON response on HTTP 200, otherwise ``None`` (the status
        code and response text are printed).
    """
    # Make a request to get time series data (no averaging applied)
    print(f"Getting data from {start_iso} to {end_iso}...")
    response = requests.get(
        "https://air.api.airqdb.com/v2/uploads/primary/time-series/AQSync-1034",
        headers={
            "Authorization": f"Bearer {access_token}"
        },
        params={
            "start": start_iso,
            "end": end_iso,
            "average": "0"
        }
    )
    if response.status_code != 200:
        # requests exposes the HTTP status as `status_code`; there is no
        # `code` attribute, so reading it would raise AttributeError here.
        print("Request to air.api.airqdb.com failed with code %s and message: %s" % (response.status_code, response.text))
        return None

    print("Request to air.api.airqdb.com succeeded")
    # Decode the body once and reuse it for both the log line and the result.
    rsp = response.json()
    if 'GPS:LAT' in rsp:
        print("Parsing %s data points..." % len(rsp['GPS:LAT']))
    else:
        print("No data retrieved")
    return rsp

In [7]:
def get_aqsync_esdr_feed(data):
    """Return the ESDR feed for the station that produced ``data``.

    The product 'AQSync' (vendor '2BTech') is looked up or created first. The
    device is then identified from the ``sid`` of the first 'GPS:LAT' data
    point: the full sid is used as the device name and its second
    hyphen-separated segment as the device code. Latitude/longitude are not
    forwarded to ESDR.

    Args:
        data: Decoded time-series response containing a non-empty 'GPS:LAT'
            list of data points.

    Returns:
        Whatever ``esdr.get_or_create_feed`` returns for the device.
    """
    aqsync_product = esdr.get_or_create_product(
        'AQSync',
        '2BTech',
        'Air quality monitoring station by 2BTech',
    )

    first_point = data['GPS:LAT'][0]
    station_id = first_point['dataPoint']['sid']
    device_code = first_point['dataPoint']['sid'].split('-')[1]

    device = esdr.get_or_create_device(aqsync_product, device_code, station_id)
    return esdr.get_or_create_feed(device)

In [8]:
def format_2btech_data(data):
    """Pivot per-channel time series into one record per timestamp.

    Args:
        data: Mapping of 'PREFIX:CHANNEL' keys to lists of data points. Each
            data point needs an ISO-8601 ``averagedStartDate`` and a
            ``dataPoint`` dict with a ``value``.

    Returns:
        A list of dicts, one per distinct timestamp in first-seen order, each
        shaped ``{'time': <epoch seconds>, <channel>: <float or None>, ...}``.
        The channel name is the second ':'-separated segment of the key. The
        channels 'Cell Strength', 'isBackfill' and 'UploadType' are skipped.
        A value that cannot be converted to float (including a null value)
        is stored as ``None``.

    NOTE(review): ``datetime.timestamp()`` interprets a date string without a
    UTC offset as local time — confirm the API always includes an offset.
    """
    skipped_channels = ('Cell Strength', 'isBackfill', 'UploadType')
    new_keyed_data = {x.split(':')[1]: y for x, y in data.items()}
    time_dict = {}

    for channel_name, data_points in new_keyed_data.items():
        if channel_name in skipped_channels:
            continue
        for data_point in data_points:
            tstamp = datetime.datetime.fromisoformat(data_point['averagedStartDate']).timestamp()
            try:
                val = float(data_point['dataPoint']['value'])
            except (TypeError, ValueError):
                # float(None) raises TypeError; non-numeric text raises
                # ValueError. Both mean "no usable reading".
                val = None
            time_dict.setdefault(tstamp, {})[channel_name] = val

    esdr_data = [{'time': t, **v} for t, v in time_dict.items()]
    return esdr_data


In [9]:
# Mirror the most recent hour of station data into ESDR and report the outcome
# through Stat.

# Sample the clock once so the window is exactly one hour wide.
now_utc = datetime.datetime.now(datetime.UTC)
# Second-resolution UTC timestamps without an offset suffix. This is the form
# the previous isoformat().split('.')[0] expression produced in practice: the
# split dropped the '+00:00' along with the microseconds, so the following
# replace('+00:00', 'Z') only took effect when microsecond happened to be 0.
# NOTE(review): if the API needs an explicit UTC designator, append 'Z' here.
end = now_utc.strftime('%Y-%m-%dT%H:%M:%S')
start = (now_utc - datetime.timedelta(hours=1)).strftime('%Y-%m-%dT%H:%M:%S')

# Everything that can fail runs inside the try, so a token, download or
# formatting problem is reported as Stat.down instead of ending the run
# without any status update.
try:
    data = get_2btech_data(read_or_get_token(), start, end)
    if data is None:
        # get_2btech_data returns None (after printing details) on a non-200 response.
        raise RuntimeError('request to air.api.airqdb.com failed')
    if not data.get('GPS:LAT'):
        # get_aqsync_esdr_feed needs at least one 'GPS:LAT' point to identify the device.
        raise RuntimeError('no data points returned for %s to %s' % (start, end))

    feed = get_aqsync_esdr_feed(data)
    esdr_data = format_2btech_data(data)
    esdr.upload_dicts(feed, esdr_data)
    Stat.up('Mirror completed successfully', host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)
except Exception as e:
    Stat.down('Exception recorded %s' % e, host=STAT_HOSTNAME, shortname=STAT_SHORTNAME)

Using current access token...
Getting data from 2024-05-17T13:57:25 to 2024-05-17T17:57:25...
Request to air.api.airqdb.com succeeded
Parsing 47 data points...
uploaded 47 records of 19 values to AQSync-1034 AQSync/153223/174124
Stat.log up 2BTech Monitoring Data Mirror hal21 Mirror completed successfully None
