diff --git a/bin/user/purpleair.py b/bin/user/purpleair.py index 56b9cff..1522715 100644 --- a/bin/user/purpleair.py +++ b/bin/user/purpleair.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2018 Kenneth Baker +# Copyright 2021 Kenneth Baker # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License @@ -57,6 +57,9 @@ import time import requests import configobj +import threading +import socket +import math import weewx import weeutil.weeutil @@ -145,11 +148,7 @@ def logerr(msg): logmsg(syslog.LOG_ERR, msg) -def collect_data(session, hostname, port, timeout, now_ts = None): - # used for testing - if now_ts is None: - now_ts = int(time.time() + 0.5) - +def collect_data(session, hostname, port, timeout): if isinstance(hostname, binary_type): hostname = hostname.decode('utf-8') @@ -174,7 +173,7 @@ def collect_data(session, hostname, port, timeout, now_ts = None): j = r.json() record = dict() - record['dateTime'] = now_ts + record['dateTime'] = int(time.time()) record['usUnits'] = weewx.US # put items into record @@ -237,6 +236,7 @@ def __init__(self, engine, config_dict): self.config_dict.setdefault('port', 80) # default port is HTTP self.config_dict.setdefault('timeout', 10) # url fetch timeout + self.config_dict.setdefault('interval', 300) # how often to fetch data # get the database parameters we need to function binding = self.config_dict.get('data_binding', 'purpleair_binding') @@ -253,50 +253,97 @@ def __init__(self, engine, config_dict): if dbcol != memcol: raise Exception('purpleair schema mismatch: %s != %s' % (dbcol, memcol)) - self.last_ts = None # listen for NEW_ARCHIVE_RECORDS self.bind(weewx.NEW_ARCHIVE_RECORD, self.new_archive_record) - # create a session - self.session = requests.Session() + + # init and start up data collection thread + self._thread = PurpleAirMonitorDataThread(self.config_dict) + self._thread.start() def shutDown(self): try: self.dbm.close() except: pass - try: - self.session.close() - except: - pass + + if self._thread: + self._thread.running = False + self._thread.join() + self._thread = None def new_archive_record(self, event): """save data to database""" - now = int(time.time() + 0.5) - delta = now - event.record['dateTime'] - if delta > event.record['interval'] * 60: - logdbg("Skipping record: time difference %s too big" % delta) - return - if self.last_ts is not None: - try: - data = self.get_data(now, self.last_ts) - except Exception as e: - # failure to fetch data, log and then return - logerr(e) - return - self.save_data(data) - self.last_ts = now + record = self._thread.get_record() + if not record: + logdbg("Skipping record: empty") + else: + delta = math.fabs(record['dateTime'] - event.record['dateTime']) + if delta > self.config_dict['interval'] * 1.5: + logdbg("Skipping record: time difference %f too big" % delta) + else: + self.save_data(record) def save_data(self, record): """save data to database""" self.dbm.addRecord(record) - def get_data(self, now_ts, last_ts): - record = collect_data(self.session, self.config_dict['hostname'], - weeutil.weeutil.to_int(self.config_dict['port']), - weeutil.weeutil.to_int(self.config_dict['timeout']), - now_ts) - record['interval'] = max(1, int((now_ts - last_ts) / 60)) - return record + + +class PurpleAirMonitorDataThread(threading.Thread): + def __init__(self, config_dict): + threading.Thread.__init__(self, name="PurpleAirMonitor") + self.config_dict = config_dict + self._lock = threading.Lock() + self._record = None + self.running = False + + def get_record(self): + with self._lock: + if not self._record: + return None + else: + return self._record.copy() + + def run(self): + # starting thread running + self.running = True + + # create a session + session = requests.Session() + + # keep track of the last time we aquired the data + last_ts = None + while self.running: + try: + # if we haven't fetched data before, or the last time we fetched the data was longer than an interval + if not last_ts or time.time() - last_ts >= weeutil.weeutil.to_int(self.config_dict['interval']): + record = collect_data(session, self.config_dict['hostname'], + weeutil.weeutil.to_int(self.config_dict['port']), + weeutil.weeutil.to_int(self.config_dict['timeout'])) + record['interval'] = int(weeutil.weeutil.to_int(self.config_dict['interval']) / 60) + + with self._lock: + self._record = record + + # store the last time data was fetched successfully + last_ts = time.time() + + time.sleep(1) + + except socket.error as e: + loginf("Socket error: %s" % e) + time.sleep(weeutil.weeutil.to_int(self.config_dict['interval'])) + except requests.RequestException as e: + loginf("Requests error: %s" % e) + time.sleep(weeutil.weeutil.to_int(self.config_dict['interval'])) + except Exception as e: + loginf("Exception: %s" % e) + time.sleep(weeutil.weeutil.to_int(self.config_dict['interval'])) + + try: + session.close() + except: + pass # To test this extension, do the following: @@ -311,7 +358,7 @@ def main(): import optparse # WeeWX Version 3.x uses syslog, later versions use logging. try: - syslog.openlog('wee_purpleair', syslog.LOG_PID | syslog.LOG_CONS) + syslog.openlog('weewx_purpleair', syslog.LOG_PID | syslog.LOG_CONS) except NameError: pass parser = optparse.OptionParser(usage=usage) @@ -347,9 +394,12 @@ def test_collector(hostname, port): time.sleep(5) def test_service(hostname, port): - from weewx.engine import StdEngine + from weewx.engine import StdEngine, DummyEngine from tempfile import NamedTemporaryFile + INTERVAL = 60 + NUM_INTERATIONS = 3 + with NamedTemporaryFile() as temp_file: config = configobj.ConfigObj({ 'Station': { @@ -363,7 +413,8 @@ def test_service(hostname, port): 'PurpleAirMonitor': { 'binding': 'purpleair_binding', 'hostname': hostname, - 'port': port}, + 'port': port, + 'interval': INTERVAL}, 'DataBindings': { 'purpleair_binding': { 'database': 'purpleair_sqlite', @@ -377,16 +428,54 @@ def test_service(hostname, port): 'driver': 'weedb.sqlite'}}, 'Engine': { 'Services': { - 'archive_services': 'user.purpleair.PurpleAirMonitor'}}}) - engine = StdEngine(config) - svc = PurpleAirMonitor(engine, config) - for _ in range(4): - record = { - 'dateTime': int(time.time()), - 'interval': 1 + 'archive_services': 'user.purpleair.PurpleAirMonitor' + } + }}) + + weeutil.logger.setup("weewx_purpleair", { + 'Logging': { + 'root' : { + 'handlers': ['console' ] + } } - event = weewx.Event(weewx.NEW_ARCHIVE_RECORD, record=record) - svc.new_archive_record(event) + }) + + print("NOTICE: please be patient this will take ~%d seconds to run" % (INTERVAL * (NUM_INTERATIONS - 0.5))) + + engine = DummyEngine(config) + manager = engine.db_binder.get_manager(data_binding='purpleair_binding') + + last_time = time.time() + try: + # wait a moment for the 1st download + time.sleep(INTERVAL / 2) + + for x in range(NUM_INTERATIONS): + record = { + 'dateTime': int(time.time()), + } + event = weewx.Event(weewx.NEW_ARCHIVE_RECORD, record=record) + engine.dispatchEvent(event) + + # get and print all the current records + now_time = time.time() + for record in manager.genBatchRecords(last_time - 1, now_time + 1): + print(record) + + # update the time window + last_time = now_time + + # wait for the INTERVAL if this isn't the last cycle + if x < NUM_INTERATIONS - 1: + time.sleep(INTERVAL) + + except KeyboardInterrupt: + pass + finally: + try: + svc.shutDown() + except: + pass + engine.shutDown() - time.sleep(5) main()