Skip to content

Commit

Permalink
psrt notifiers
Browse files Browse the repository at this point in the history
  • Loading branch information
divi255 committed Oct 30, 2021
1 parent 8f521c3 commit 8900dd6
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Common
* MQTT compression for slow connections (notifier option "bulk_compress" to
compress bulk MQTT state payloads and controller option "compress" to
compress API calls)
* PSRT notifiers
* CLI client SSL verify is now on by default. To suppress SSL certificate
verification, manually set URL as "!https://..."
* State registry keys are not auto backed up any longer
Expand Down
29 changes: 27 additions & 2 deletions cli/notifymanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class ComplNProto(object):

def __call__(self, prefix, **kwargs):
return [
'mqtt:', 'gcpiot:', 'json:',
'mqtt:', 'psrt:', 'gcpiot:', 'json:',
'db:', 'influxdb:', 'prometheus:'] if \
prefix and prefix.find(':') == -1 else True

Expand Down Expand Up @@ -90,6 +90,7 @@ def add_notifier_common_functions(self):
Notifier properties:
json:http(s)://[key]@uri[#jsonrpc|#list]
mqtt:[username:password]@host:[port]
psrt:[username:password]@host:[port]
gcpiot:project_id/region/registry
db:db_uri
timescaledb:db_uri
Expand Down Expand Up @@ -330,6 +331,29 @@ def create_notifier(self, params):
password=password,
space=space,
timeout=timeout)
elif p[0] == 'psrt':
_p = ':'.join(p[1:])
if _p.find('@') != -1:
auth = _p.split('@')[0]
host = _p.split('@')[1]
username = auth.split(':')[0]
try:
password = auth.split(':')[1]
except:
password = None
else:
username = None
password = None
host = _p
from eva.tools import parse_host_port
host, port = parse_host_port(host)
n = eva.notify.PSRTNotifier(notifier_id=notifier_id,
host=host,
port=port,
username=username,
password=password,
space=space,
timeout=timeout)
elif p[0] == 'db':
db_uri = ':'.join(p[1:])
n = eva.notify.SQLANotifier(notifier_id=notifier_id,
Expand Down Expand Up @@ -398,7 +422,8 @@ def list_notifiers(self, params):
n['params'] = '{}/{}/{}'.format(i.project, i.region, i.registry)
elif isinstance(i, eva.notify.UDPNotifier):
n['params'] = '{}:{} ({})'.format(i.host, i.port, i.fmt)
elif isinstance(i, eva.notify.MQTTNotifier):
elif isinstance(i, eva.notify.MQTTNotifier) or isinstance(
i, eva.notify.PSRTNotifier):
if i.username is not None:
n['params'] = '%s%s@' % (i.username,
':*' if i.password else '')
Expand Down
126 changes: 112 additions & 14 deletions lib/eva/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -2272,7 +2272,7 @@ def __init__(self,
certfile=None,
keyfile=None,
proto='mqtt'):
notifier_type = 'mqtt'
notifier_type = proto
self.buf = {}
super().__init__(notifier_id=notifier_id,
notifier_type=notifier_type,
Expand Down Expand Up @@ -3111,7 +3111,6 @@ def serialize(self, props=False):
d['bulk_topic'] = self.bulk_topic
d['bulk_compress'] = self.bulk_compress
d['bulk_subscribe'] = self.bulk_subscribe
d['proto'] = self.proto
d.update(super().serialize(props=props))
return d

Expand All @@ -3120,12 +3119,6 @@ def set_prop(self, prop, value):
v = eva.tools.val_to_boolean(value)
self.collect_logs = v
return True
elif prop == 'proto':
if value not in ['mqtt', 'psrt']:
return False
else:
self.proto = value
return True
elif prop == 'bulk_compress':
v = eva.tools.val_to_boolean(value)
self.bulk_compress = v
Expand Down Expand Up @@ -3281,6 +3274,74 @@ def set_prop(self, prop, value):
return super().set_prop(prop, value)


class PSRTNotifier(GenericMQTTNotifier):
disabled_props = ['keepalive', 'certfile', 'keyfile', 'qos']

def __init__(self,
notifier_id,
host,
port=None,
space=None,
interval=None,
buf_ttl=0,
bulk_topic=None,
bulk_subscribe=None,
bulk_compress=False,
username=None,
password=None,
timeout=None,
collect_logs=None,
api_enabled=None,
discovery_enabled=None,
announce_interval=None,
ping_interval=None,
subscribe_all=False,
timestamp_enabled=True,
ca_certs=None):
if port is None:
port = 2883
super().__init__(notifier_id=notifier_id,
host=host,
port=port,
space=space,
interval=interval,
buf_ttl=buf_ttl,
bulk_topic=bulk_topic,
bulk_subscribe=bulk_subscribe,
bulk_compress=bulk_compress,
username=username,
password=password,
qos=None,
keepalive=None,
timeout=timeout,
collect_logs=collect_logs,
api_enabled=api_enabled,
discovery_enabled=discovery_enabled,
announce_interval=announce_interval,
ping_interval=ping_interval,
retain_enabled=False,
subscribe_all=subscribe_all,
timestamp_enabled=timestamp_enabled,
ca_certs=ca_certs,
certfile=None,
keyfile=None,
proto='psrt')

def set_prop(self, prop, value):
if prop in self.disabled_props:
return False
return super().set_prop(prop, value)

def serialize(self, props=False):
d = super().serialize(props=props)
for p in self.disabled_props:
try:
del d[p]
except KeyError:
pass
return d


class MQTTNotifier(GenericMQTTNotifier):

def __init__(self,
Expand Down Expand Up @@ -3308,8 +3369,7 @@ def __init__(self,
timestamp_enabled=True,
ca_certs=None,
certfile=None,
keyfile=None,
proto='mqtt'):
keyfile=None):
super().__init__(notifier_id=notifier_id,
host=host,
port=port,
Expand All @@ -3335,7 +3395,7 @@ def __init__(self,
ca_certs=ca_certs,
certfile=certfile,
keyfile=keyfile,
proto=proto)
proto='mqtt')


class UDPNotifier(GenericNotifier):
Expand Down Expand Up @@ -4006,7 +4066,6 @@ def load_notifier(notifier_id, ncfg=None, test=True, connect=True):
bulk_topic = ncfg.get('bulk_topic')
bulk_subscribe = ncfg.get('bulk_subscribe')
bulk_compress = ncfg.get('bulk_compress', False)
proto = ncfg.get('proto', 'mqtt')
n = MQTTNotifier(notifier_id,
host=host,
port=port,
Expand All @@ -4031,8 +4090,47 @@ def load_notifier(notifier_id, ncfg=None, test=True, connect=True):
bulk_compress=bulk_compress,
ca_certs=ca_certs,
certfile=certfile,
keyfile=keyfile,
proto=proto)
keyfile=keyfile)
elif ncfg['type'] == 'psrt':
host = ncfg.get('host')
port = ncfg.get('port')
ca_certs = ncfg.get('ca_certs')
space = ncfg.get('space')
username = ncfg.get('username')
password = ncfg.get('password')
timeout = ncfg.get('timeout')
buf_ttl = ncfg.get('buf_ttl', 0)
interval = ncfg.get('interval')
collect_logs = ncfg.get('collect_logs', False)
api_enabled = ncfg.get('api_enabled', False)
discovery_enabled = ncfg.get('discovery_enabled', False)
announce_interval = ncfg.get('announce_interval', 0)
ping_interval = ncfg.get('ping_interval', 30)
subscribe_all = ncfg.get('subscribe_all', False)
timestamp_enabled = ncfg.get('timestamp_enabled', True)
bulk_topic = ncfg.get('bulk_topic')
bulk_subscribe = ncfg.get('bulk_subscribe')
bulk_compress = ncfg.get('bulk_compress', False)
n = PSRTNotifier(notifier_id,
host=host,
port=port,
space=space,
interval=interval,
username=username,
password=password,
timeout=timeout,
buf_ttl=buf_ttl,
collect_logs=collect_logs,
api_enabled=api_enabled,
discovery_enabled=discovery_enabled,
announce_interval=announce_interval,
ping_interval=ping_interval,
subscribe_all=subscribe_all,
timestamp_enabled=timestamp_enabled,
bulk_topic=bulk_topic,
bulk_subscribe=bulk_subscribe,
bulk_compress=bulk_compress,
ca_certs=ca_certs)
elif ncfg['type'] == 'udp':
interval = ncfg.get('interval')
buf_ttl = ncfg.get('buf_ttl', 0)
Expand Down

0 comments on commit 8900dd6

Please sign in to comment.