Skip to content

Commit

Permalink
Merge pull request #19 from bakerkj/multithreaded-service
Browse files Browse the repository at this point in the history
Multithreaded service
  • Loading branch information
bakerkj committed Apr 19, 2021
2 parents d79a93a + 097c74a commit 4c6fc1b
Showing 1 changed file with 137 additions and 48 deletions.
185 changes: 137 additions & 48 deletions bin/user/purpleair.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2018 Kenneth Baker <bakerkj@umich.edu>
# Copyright 2021 Kenneth Baker <bakerkj@umich.edu>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
Expand Down Expand Up @@ -57,6 +57,9 @@
import time
import requests
import configobj
import threading
import socket
import math

import weewx
import weeutil.weeutil
Expand Down Expand Up @@ -145,11 +148,7 @@ def logerr(msg):
logmsg(syslog.LOG_ERR, msg)


def collect_data(session, hostname, port, timeout, now_ts = None):
# used for testing
if now_ts is None:
now_ts = int(time.time() + 0.5)

def collect_data(session, hostname, port, timeout):
if isinstance(hostname, binary_type):
hostname = hostname.decode('utf-8')

Expand All @@ -174,7 +173,7 @@ def collect_data(session, hostname, port, timeout, now_ts = None):
j = r.json()

record = dict()
record['dateTime'] = now_ts
record['dateTime'] = int(time.time())
record['usUnits'] = weewx.US

# put items into record
Expand Down Expand Up @@ -237,6 +236,7 @@ def __init__(self, engine, config_dict):

self.config_dict.setdefault('port', 80) # default port is HTTP
self.config_dict.setdefault('timeout', 10) # url fetch timeout
self.config_dict.setdefault('interval', 300) # how often to fetch data

# get the database parameters we need to function
binding = self.config_dict.get('data_binding', 'purpleair_binding')
Expand All @@ -253,50 +253,97 @@ def __init__(self, engine, config_dict):
if dbcol != memcol:
raise Exception('purpleair schema mismatch: %s != %s' % (dbcol, memcol))

self.last_ts = None
# listen for NEW_ARCHIVE_RECORDS
self.bind(weewx.NEW_ARCHIVE_RECORD, self.new_archive_record)
# create a session
self.session = requests.Session()

# init and start up data collection thread
self._thread = PurpleAirMonitorDataThread(self.config_dict)
self._thread.start()

def shutDown(self):
try:
self.dbm.close()
except:
pass
try:
self.session.close()
except:
pass

if self._thread:
self._thread.running = False
self._thread.join()
self._thread = None

def new_archive_record(self, event):
"""save data to database"""
now = int(time.time() + 0.5)
delta = now - event.record['dateTime']
if delta > event.record['interval'] * 60:
logdbg("Skipping record: time difference %s too big" % delta)
return
if self.last_ts is not None:
try:
data = self.get_data(now, self.last_ts)
except Exception as e:
# failure to fetch data, log and then return
logerr(e)
return
self.save_data(data)
self.last_ts = now
record = self._thread.get_record()
if not record:
logdbg("Skipping record: empty")
else:
delta = math.fabs(record['dateTime'] - event.record['dateTime'])
if delta > self.config_dict['interval'] * 1.5:
logdbg("Skipping record: time difference %f too big" % delta)
else:
self.save_data(record)

def save_data(self, record):
"""save data to database"""
self.dbm.addRecord(record)

def get_data(self, now_ts, last_ts):
record = collect_data(self.session, self.config_dict['hostname'],
weeutil.weeutil.to_int(self.config_dict['port']),
weeutil.weeutil.to_int(self.config_dict['timeout']),
now_ts)
record['interval'] = max(1, int((now_ts - last_ts) / 60))
return record


class PurpleAirMonitorDataThread(threading.Thread):
def __init__(self, config_dict):
threading.Thread.__init__(self, name="PurpleAirMonitor")
self.config_dict = config_dict
self._lock = threading.Lock()
self._record = None
self.running = False

def get_record(self):
with self._lock:
if not self._record:
return None
else:
return self._record.copy()

def run(self):
# starting thread running
self.running = True

# create a session
session = requests.Session()

# keep track of the last time we aquired the data
last_ts = None
while self.running:
try:
# if we haven't fetched data before, or the last time we fetched the data was longer than an interval
if not last_ts or time.time() - last_ts >= weeutil.weeutil.to_int(self.config_dict['interval']):
record = collect_data(session, self.config_dict['hostname'],
weeutil.weeutil.to_int(self.config_dict['port']),
weeutil.weeutil.to_int(self.config_dict['timeout']))
record['interval'] = int(weeutil.weeutil.to_int(self.config_dict['interval']) / 60)

with self._lock:
self._record = record

# store the last time data was fetched successfully
last_ts = time.time()

time.sleep(1)

except socket.error as e:
loginf("Socket error: %s" % e)
time.sleep(weeutil.weeutil.to_int(self.config_dict['interval']))
except requests.RequestException as e:
loginf("Requests error: %s" % e)
time.sleep(weeutil.weeutil.to_int(self.config_dict['interval']))
except Exception as e:
loginf("Exception: %s" % e)
time.sleep(weeutil.weeutil.to_int(self.config_dict['interval']))

try:
session.close()
except:
pass


# To test this extension, do the following:
Expand All @@ -311,7 +358,7 @@ def main():
import optparse
# WeeWX Version 3.x uses syslog, later versions use logging.
try:
syslog.openlog('wee_purpleair', syslog.LOG_PID | syslog.LOG_CONS)
syslog.openlog('weewx_purpleair', syslog.LOG_PID | syslog.LOG_CONS)
except NameError:
pass
parser = optparse.OptionParser(usage=usage)
Expand Down Expand Up @@ -347,9 +394,12 @@ def test_collector(hostname, port):
time.sleep(5)

def test_service(hostname, port):
from weewx.engine import StdEngine
from weewx.engine import StdEngine, DummyEngine
from tempfile import NamedTemporaryFile

INTERVAL = 60
NUM_INTERATIONS = 3

with NamedTemporaryFile() as temp_file:
config = configobj.ConfigObj({
'Station': {
Expand All @@ -363,7 +413,8 @@ def test_service(hostname, port):
'PurpleAirMonitor': {
'binding': 'purpleair_binding',
'hostname': hostname,
'port': port},
'port': port,
'interval': INTERVAL},
'DataBindings': {
'purpleair_binding': {
'database': 'purpleair_sqlite',
Expand All @@ -377,16 +428,54 @@ def test_service(hostname, port):
'driver': 'weedb.sqlite'}},
'Engine': {
'Services': {
'archive_services': 'user.purpleair.PurpleAirMonitor'}}})
engine = StdEngine(config)
svc = PurpleAirMonitor(engine, config)
for _ in range(4):
record = {
'dateTime': int(time.time()),
'interval': 1
'archive_services': 'user.purpleair.PurpleAirMonitor'
}
}})

weeutil.logger.setup("weewx_purpleair", {
'Logging': {
'root' : {
'handlers': ['console' ]
}
}
event = weewx.Event(weewx.NEW_ARCHIVE_RECORD, record=record)
svc.new_archive_record(event)
})

print("NOTICE: please be patient this will take ~%d seconds to run" % (INTERVAL * (NUM_INTERATIONS - 0.5)))

engine = DummyEngine(config)
manager = engine.db_binder.get_manager(data_binding='purpleair_binding')

last_time = time.time()
try:
# wait a moment for the 1st download
time.sleep(INTERVAL / 2)

for x in range(NUM_INTERATIONS):
record = {
'dateTime': int(time.time()),
}
event = weewx.Event(weewx.NEW_ARCHIVE_RECORD, record=record)
engine.dispatchEvent(event)

# get and print all the current records
now_time = time.time()
for record in manager.genBatchRecords(last_time - 1, now_time + 1):
print(record)

# update the time window
last_time = now_time

# wait for the INTERVAL if this isn't the last cycle
if x < NUM_INTERATIONS - 1:
time.sleep(INTERVAL)

except KeyboardInterrupt:
pass
finally:
try:
svc.shutDown()
except:
pass
engine.shutDown()

time.sleep(5)
main()

0 comments on commit 4c6fc1b

Please sign in to comment.