diff --git a/DeviceManager/DeviceHandler.py b/DeviceManager/DeviceHandler.py
index 5e01e18..a6229f7 100644
--- a/DeviceManager/DeviceHandler.py
+++ b/DeviceManager/DeviceHandler.py
@@ -21,6 +21,7 @@
 from DeviceManager.SerializationModels import *
 from DeviceManager.TenancyManager import init_tenant_context
 from DeviceManager.app import app
+from .StatusMonitor import StatusMonitor
 
 device = Blueprint('device', __name__)
 
diff --git a/DeviceManager/StatusMonitor.py b/DeviceManager/StatusMonitor.py
new file mode 100644
index 0000000..6e7d3c0
--- /dev/null
+++ b/DeviceManager/StatusMonitor.py
@@ -0,0 +1,231 @@
+"""
+Tracks device online/offline status based on the events each device publishes
+on kafka, using redis to hold the instant at which each device expires.
+"""
+import time
+import json
+from kafka import KafkaConsumer, KafkaProducer
+from kafka import ConsumerRebalanceListener
+from kafka.errors import KafkaTimeoutError
+
+import redis
+
+from .conf import CONFIG
+from .KafkaNotifier import get_topic
+from .TenancyManager import list_tenants
+from .DatabaseModels import db
+
+import threading
+
+# Seconds a device is kept online when its event carries no usable 'exp'
+DEFAULT_EXPIRATION = 5
+
+
+class Listener(ConsumerRebalanceListener):
+    """
+    Rebalance listener that starts expiration collection for every partition
+    assigned to the monitor's consumer.
+    """
+
+    def __init__(self, monitor):
+        self.__monitor = monitor
+        # partition numbers that already have a periodic collector running
+        self.__collectors = set()
+
+    def on_partitions_revoked(self, revoked):
+        """
+        Nothing to release: collector threads are daemons and keep running.
+        """
+
+    def on_partitions_assigned(self, assigned):
+        """
+        Expires stale devices of each assigned partition right away and makes
+        sure a single periodic collector is started per partition.
+        """
+        for partition in assigned:
+            self.__monitor.collect(partition.partition)
+            # a rebalance may hand over a partition we already own: do not
+            # spawn a second collector thread for it
+            if partition.partition not in self.__collectors:
+                self.__collectors.add(partition.partition)
+                self.__monitor.begin(partition.partition)
+
+
+class StatusMonitor:
+    """
+    Consumes the device event topic of a tenant, marking devices as online
+    when they publish and as offline once their expiration elapses.
+    """
+
+    def __init__(self, tenant):
+        self.producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'),
+                                      bootstrap_servers=CONFIG.get_kafka_url())
+
+        self.tenant = tenant
+        self.topic = get_topic(self.tenant, CONFIG.device_subject)
+
+        self.redis = redis.StrictRedis(host=CONFIG.redis_host, port=CONFIG.redis_port)
+
+        self.__consumer_thread = threading.Thread(target=self.run)
+        self.__consumer_thread.daemon = True
+        self.__consumer_thread.start()
+
+    @staticmethod
+    def wait_init(consumer):
+        """
+        Blocks until kafka assigns partitions to the consumer, then skips to
+        the end of them so that only new events are processed.
+        """
+        done = False
+        while not done:
+            try:
+                # make sure we process initial partition assignment messages
+                consumer.poll()
+                consumer.seek_to_end()
+                done = True
+            except AssertionError:
+                # give kafka some time to assign us a partition
+                time.sleep(1)
+
+    def run(self):
+        """
+        Consumer loop: every valid device event brands its device as online.
+        Status updates published by the monitor itself are skipped.
+        """
+        group_id = "device-manager.monitor"
+        consumer = KafkaConsumer(bootstrap_servers=CONFIG.get_kafka_url(), group_id=group_id)
+        consumer.subscribe(topics=[self.topic], listener=Listener(self))
+        StatusMonitor.wait_init(consumer)
+        for message in consumer:
+            try:
+                data = json.loads(message.value)
+            except (TypeError, ValueError) as error:
+                print("Received message is not valid json {}".format(error))
+                continue
+
+            # valid json that is not an object (list, number, ...) carries no
+            # metadata either, and must not take the consumer thread down
+            metadata = data.get('metadata', None) if isinstance(data, dict) else None
+            if not isinstance(metadata, dict):
+                print('Invalid kafka event detected - no metadata included')
+                continue
+
+            reason = metadata.get('reason', None)
+            if reason == 'statusUpdate':
+                continue
+
+            deviceid = metadata.get('deviceid', None)
+            tenant = metadata.get('tenant', None)
+            if (deviceid is None) or (tenant is None):
+                print('Missing device identification from event')
+                continue
+
+            self.set_online(tenant, deviceid, message.partition, metadata.get('exp', None))
+
+    @staticmethod
+    def get_key_for(tenant, device, partition):
+        """
+        Returns the redis key that holds the expiration of given device.
+        """
+        return 'st:{}:{}:{}:exp'.format(tenant, device, partition)
+
+    def notify(self, tenant, device, status):
+        """
+        Publishes a status update event for given device.
+        """
+        message = {
+            'metadata': {
+                'deviceid': device,
+                'tenant': tenant,
+                'status': status,
+                'reason': 'statusUpdate'
+            },
+            'attrs': {
+                'status': status
+            }
+        }
+        self.producer.send(self.topic, message)
+
+    def set_online(self, tenant, device, partition, exp):
+        """
+        Brands given device as online for the stipulated period of time 'exp'
+        (seconds). Falls back to DEFAULT_EXPIRATION if 'exp' is not a number.
+        """
+        if not isinstance(exp, (int, float)):
+            # 'exp' comes straight from the event: tolerate it being absent
+            # or sent as a string
+            try:
+                exp = float(exp)
+            except (TypeError, ValueError):
+                exp = DEFAULT_EXPIRATION
+        print('will set {}:{} online for {}s'.format(tenant, device, exp))
+
+        key = StatusMonitor.get_key_for(tenant, device, partition)
+        if self.redis.get(key) is None:
+            # no expiration pending for the device: publish online event
+            self.notify(tenant, device, 'online')
+
+        self.redis.set(key, time.time() + exp)
+
+    def begin(self, partition):
+        """
+        Starts a daemon thread that collects given partition periodically.
+        """
+        gc_thread = threading.Thread(target=self.iterate, args=(partition, -1))
+        gc_thread.daemon = True
+        gc_thread.start()
+
+    def iterate(self, partition=None, times=-1):
+        """
+        Periodically invoke collect() for given partition, 'times' times
+        (forever if negative).
+        """
+        while times != 0:
+            self.collect(partition)
+            time.sleep(2)
+            if times > 0:
+                times -= 1
+
+    def collect(self, partition):
+        """
+        Removes expired devices of given partition, publishing an offline
+        event for each of them.
+        """
+        match = StatusMonitor.get_key_for(self.tenant, '*', partition)
+        # scan_iter follows the scan cursor until all keys were visited
+        devices = [key.decode('utf-8') for key in self.redis.scan_iter(match, count=1000)]
+
+        now = time.time()
+        for device in devices:
+            raw = self.redis.get(device)
+            if raw is None:
+                # key vanished between scan and get (e.g. removed by another
+                # collector): nothing left to expire
+                continue
+
+            if now > float(raw.decode('utf-8')):
+                self.redis.delete(device)
+                print('device {} offline'.format(device))
+                parsed = device.split(':')
+                self.notify(parsed[1], parsed[2], 'offline')
+
+    @staticmethod
+    def get_status(tenant, device):
+        """
+        Returns 'online' or 'offline' according to the expiration stored for
+        given device, or None if there is no status information about it.
+        """
+        client = redis.StrictRedis(host=CONFIG.redis_host, port=CONFIG.redis_port)
+        match = StatusMonitor.get_key_for(tenant, device, '*')
+        status = None
+        now = time.time()
+        for key in client.scan_iter(match, count=1000):
+            raw = client.get(key)
+            if raw is None:
+                continue
+            # stored value is the instant at which the device expires
+            if now < float(raw.decode('utf-8')):
+                return 'online'
+            status = 'offline'
+
+        return status
diff --git a/DeviceManager/TenancyManager.py b/DeviceManager/TenancyManager.py
index a6089a4..0d38c0d 100644
--- a/DeviceManager/TenancyManager.py
+++ b/DeviceManager/TenancyManager.py
@@ -1,5 +1,5 @@
 import json
-from sqlalchemy.sql import exists, select, text
+from sqlalchemy.sql import exists, select, text, column
 
 from DeviceManager.utils import HTTPRequestError, decode_base64
 
@@ -100,6 +100,19 @@ def init_tenant(tenant, db):
     else:
         switch_tenant(tenant, db)
 
+def list_tenants(db):
+    query = 'select schema_name from information_schema.schemata;'
+    tenants = db.session.execute(query)
+    result = []
+    for i in tenants:
+        if i.schema_name.startswith('pg_'):
+            continue
+        if i.schema_name in ['public', 'information_schema']:
+            continue
+
+        result.append(i.schema_name)
+    return result
+
 def init_tenant_context(request, db):
     try:
         token = request.headers['authorization']
diff --git a/DeviceManager/conf.py b/DeviceManager/conf.py
index 906201e..92efcf3 100644
--- a/DeviceManager/conf.py
+++ b/DeviceManager/conf.py
@@ -15,6 +15,9 @@ def __init__(self,
                  kafka_port="9092",
                  broker="http://data-broker",
                  subject="dojot.device-manager.device",
+                 device_subject="device-data",
+                 redis_host="device-manager-redis",
+                 redis_port="6379",
                  create_db=True):
         # Postgres configuration data
         self.dbname = os.environ.get('DBNAME', db)
@@ -34,14 +37,21 @@ def __init__(self,
         self.data_broker = os.environ.get('BROKER', broker)
         # Which subject to publish new device information to
         self.subject = os.environ.get('SUBJECT', subject)
+        self.device_subject = os.environ.get('DEVICE_SUBJECT', device_subject)
+
+        self.redis_host = os.environ.get('REDIS_HOST', redis_host)
+        self.redis_port = int(os.environ.get('REDIS_PORT', redis_port))
 
     def get_db_url(self):
         """ From the config, return a valid postgresql url """
         if self.dbpass is not None:
-            return "%s://%s:%s@%s/%s" % (self.dbdriver, self.dbuser, self.dbpass,
-                                         self.dbhost, self.dbname)
+            return "{}://{}:{}@{}/{}".format(self.dbdriver, self.dbuser, self.dbpass,
+                                             self.dbhost, self.dbname)
         else:
-            return "%s://%s@%s/%s" % (self.dbdriver, self.dbuser, self.dbhost, self.dbname)
+            return "{}://{}@{}/{}".format(self.dbdriver, self.dbuser, self.dbhost, self.dbname)
+
+    def get_kafka_url(self):
+        return "{}:{}".format(self.kafka_host, self.kafka_port)
 
 
 CONFIG = Config()
diff --git a/DeviceManager/main.py b/DeviceManager/main.py
index f5150fe..83e5a79 100644
--- a/DeviceManager/main.py
+++ b/DeviceManager/main.py
@@ -5,5 +5,12 @@
 import DeviceManager.TemplateHandler
 import DeviceManager.ErrorManager
 
+from .DatabaseModels import db
+from .StatusMonitor import StatusMonitor
+from .TenancyManager import list_tenants
+
+for tenant in list_tenants(db):
+    StatusMonitor(tenant)
+
 if __name__ == '__main__':
     app.run(host='0.0.0.0', threaded=True)
diff --git a/requirements/requirements.txt b/requirements/requirements.txt
index 8c5d71c..eac6e7b 100644
--- a/requirements/requirements.txt
+++ b/requirements/requirements.txt
@@ -17,3 +17,4 @@ requests==2.18.4
 SQLAlchemy==1.2.4
 urllib3==1.22
 Werkzeug==0.14.1
+redis==2.10.6
\ No newline at end of file