Skip to content

Commit

Permalink
Merge pull request #56 from mmagr/status_update
Browse files Browse the repository at this point in the history
Implements status monitoring
  • Loading branch information
mmagr committed Mar 29, 2018
2 parents a53c31e + c651151 commit 2ab91d6
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 4 deletions.
1 change: 1 addition & 0 deletions DeviceManager/DeviceHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from DeviceManager.SerializationModels import *
from DeviceManager.TenancyManager import init_tenant_context
from DeviceManager.app import app
from .StatusMonitor import StatusMonitor

device = Blueprint('device', __name__)

Expand Down
189 changes: 189 additions & 0 deletions DeviceManager/StatusMonitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
import time
import json
from kafka import KafkaConsumer, KafkaProducer
from kafka import ConsumerRebalanceListener
from kafka.errors import KafkaTimeoutError

import redis

from .conf import CONFIG
from .KafkaNotifier import get_topic
from .TenancyManager import list_tenants
from .DatabaseModels import db

import threading

class Listener(ConsumerRebalanceListener):
def __init__(self, monitor):
self.__monitor = monitor
self.__collectors = {}

def on_partitions_assigned(self, assigned):
# print('got listener event {} {}'.format(len(assigned), assigned))
for partition in assigned:
self.__monitor.collect(partition.partition)
if partition not in self.__collectors.keys():
# print('will start iterator')
self.__monitor.begin(partition.partition)

class StatusMonitor:
def __init__(self, tenant):
self.producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'),
bootstrap_servers=CONFIG.get_kafka_url())

self.tenant = tenant
self.topic=get_topic(self.tenant, CONFIG.device_subject)

self.redis = redis.StrictRedis(host=CONFIG.redis_host, port=CONFIG.redis_port)

self.__consumer_thread = threading.Thread(target=self.run)
self.__consumer_thread.daemon = True
self.__consumer_thread.start()

@staticmethod
def wait_init(consumer):
done = False
while not done:
try:
# make sure we process initial partition assignment messages
consumer.poll()
consumer.seek_to_end()
done = True
except AssertionError as error:
# give kafka some time to assign us a partition
time.sleep(1)

def run(self):
group_id="device-manager.monitor"
start = time.time()
# print('will create consumer {} {} {}'.format(CONFIG.get_kafka_url(), group_id, self.topic))
consumer = KafkaConsumer(bootstrap_servers=CONFIG.get_kafka_url(), group_id=group_id)
consumer.subscribe(topics=[self.topic], listener=Listener(self))
StatusMonitor.wait_init(consumer)
# print('kafka consumer created {} - {}'.format(self.topic, time.time() - start))
# print(consumer.assignment())
for message in consumer:
# print("Got kafka event [{}] {}".format(self.topic, message))
data = None
try:
data = json.loads(message.value)
except Exception as error:
print("Received message is not valid json {}".format(error))
continue

metadata = data.get('metadata', None)
if metadata is None:
print('Invalid kafka event detected - no metadata included')
continue

reason = metadata.get('reason', None)
if reason == 'statusUpdate':
continue

deviceid = metadata.get('deviceid', None)
tenant = metadata.get('tenant', None)
if (deviceid is None) or (tenant is None):
print('Missing device identification from event')
continue

self.set_online(tenant, deviceid, message.partition, metadata.get('exp', None))

@staticmethod
def get_key_for(tenant, device, partition):
return 'st:{}:{}:{}:exp'.format(tenant, device, partition)

def notify(self, tenant, device, status):
message = {
'metadata': {
'deviceid': device,
'tenant': tenant,
'status': status,
'reason': 'statusUpdate'
},
'attrs': {
'status': status
}
}
self.producer.send(self.topic, message)

def set_online(self, tenant, device, partition, exp):
"""
Brands given device as online for the stipulated period of time 'exp'
"""
if exp is None:
exp = 5
print('will set {}:{} online for {}s'.format(tenant, device, exp))

key = StatusMonitor.get_key_for(tenant, device, partition)
old_ts = self.redis.get(key)
if old_ts is None:
# publish online event
self.notify(tenant, device, 'online')

pass
self.redis.set(key, time.time() + exp)

def begin(self, partition):
gc_thread = threading.Thread(target=self.iterate, args=(partition, -1))
gc_thread.daemon = True
gc_thread.start()

def iterate(self, partition=None, times=-1):
"""
Periodically invoke iterator
"""
# print('starting iterator')
while times != 0:
# now = time.time()
# print('[times] gc is about to run {}'.format(now - self.lastgc))
# self.lastgc = now
self.collect(partition)
time.sleep(2)
if times > 0:
times -= 1

def collect(self, partition):
match = StatusMonitor.get_key_for(self.tenant, '*', partition)
devices = []

cursor, data = self.redis.scan(0, match, count=1000)
for i in data:
if i is not None:
devices.append(i.decode('utf-8'))
# print('scan {} {} {}'.format(match, cursor, data))
while cursor != 0:
cursor, data = self.redis.scan(cursor, match, count=1000)
for i in data:
if i is not None:
devices.append(i.decode('utf-8'))
# print('scan {} {}'.format(cursor, data))

# print('gc devices {}'.format(devices))
now = time.time()
for device in devices:
exp = float(self.redis.get(device).decode('utf-8'))
# print('will check {} {}'.format(device, exp))
if now > exp:
self.redis.delete(device)
print('device {} offline'.format(device))
parsed = device.split(':')
self.notify(parsed[1],parsed[2],'offline')

@staticmethod
def get_status(tenant, device):
"""
Returns boolean indicating whether device is online or not.
"""
client = redis.StrictRedis(host=CONFIG.redis_host, port=CONFIG.redis_port)
match = StatusMonitor.get_key_for(tenant, device, '*')
cursor, data = client.scan(0, match, count=1000)
if len(data):
exp = client.get(data)
if time.time() < exp:
return 'online'
else:
return 'offline'

return None


15 changes: 14 additions & 1 deletion DeviceManager/TenancyManager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import json
from sqlalchemy.sql import exists, select, text
from sqlalchemy.sql import exists, select, text, column

from DeviceManager.utils import HTTPRequestError, decode_base64

Expand Down Expand Up @@ -100,6 +100,19 @@ def init_tenant(tenant, db):
else:
switch_tenant(tenant, db)

def list_tenants(db):
query = 'select schema_name from information_schema.schemata;'
tenants = db.session.execute(query)
result = []
for i in tenants:
if i.schema_name.startswith('pg_'):
continue
if i.schema_name in ['public', 'information_schema']:
continue

result.append(i.schema_name)
return result

def init_tenant_context(request, db):
try:
token = request.headers['authorization']
Expand Down
16 changes: 13 additions & 3 deletions DeviceManager/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ def __init__(self,
kafka_port="9092",
broker="http://data-broker",
subject="dojot.device-manager.device",
device_subject="device-data",
redis_host="device-manager-redis",
redis_port="6379",
create_db=True):
# Postgres configuration data
self.dbname = os.environ.get('DBNAME', db)
Expand All @@ -34,14 +37,21 @@ def __init__(self,
self.data_broker = os.environ.get('BROKER', broker)
# Which subject to publish new device information to
self.subject = os.environ.get('SUBJECT', subject)
self.device_subject = os.environ.get('DEVICE_SUBJECT', device_subject)

self.redis_host = os.environ.get('REDIS_HOST', redis_host)
self.redis_port = int(os.environ.get('REDIS_PORT', redis_port))

def get_db_url(self):
""" From the config, return a valid postgresql url """
if self.dbpass is not None:
return "%s://%s:%s@%s/%s" % (self.dbdriver, self.dbuser, self.dbpass,
self.dbhost, self.dbname)
return "{}://{}:{}@{}/{}".format(self.dbdriver, self.dbuser, self.dbpass,
self.dbhost, self.dbname)
else:
return "%s://%s@%s/%s" % (self.dbdriver, self.dbuser, self.dbhost, self.dbname)
return "{}://{}@{}/{}".format(self.dbdriver, self.dbuser, self.dbhost, self.dbname)

def get_kafka_url(self):
return "{}:{}".format(self.kafka_host, self.kafka_port)


CONFIG = Config()
7 changes: 7 additions & 0 deletions DeviceManager/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,12 @@
import DeviceManager.TemplateHandler
import DeviceManager.ErrorManager

from .DatabaseModels import db
from .StatusMonitor import StatusMonitor
from .TenancyManager import list_tenants

for tenant in list_tenants(db):
StatusMonitor(tenant)

if __name__ == '__main__':
app.run(host='0.0.0.0', threaded=True)
1 change: 1 addition & 0 deletions requirements/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ requests==2.18.4
SQLAlchemy==1.2.4
urllib3==1.22
Werkzeug==0.14.1
redis==2.10.6

0 comments on commit 2ab91d6

Please sign in to comment.