From 2524a2cdc29a52340a360b3e32b6f10d13c4a51b Mon Sep 17 00:00:00 2001 From: Sergei S Date: Fri, 12 Nov 2021 00:05:28 +0100 Subject: [PATCH] file notifiers --- cli/notifymanager.py | 12 +++ doc/notifiers.rst | 22 ++++++ lib/eva/notify.py | 183 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 217 insertions(+) diff --git a/cli/notifymanager.py b/cli/notifymanager.py index 1f63b3d3..dff2fc43 100644 --- a/cli/notifymanager.py +++ b/cli/notifymanager.py @@ -95,6 +95,7 @@ def add_notifier_common_functions(self): db:db_uri timescaledb:db_uri udp:host:port + file:file_path# influxdb:http(s)://uri#[org/]database prometheus:'''), metavar='PROPS').completer = self.ComplNProto() @@ -360,6 +361,15 @@ def create_notifier(self, params): db_uri=db_uri, keep=None, space=space) + elif p[0] == 'file': + path = ':'.join(p[1:]) + try: + path, file_format = path.split('#', 2) + except: + return self.local_func_result_failed + n = eva.notify.FileNotifier(notifier_id=notifier_id, + path=path, + file_format=file_format) elif p[0] == 'timescaledb': db_uri = ':'.join(p[1:]) n = eva.notify.TimescaleNotifier(notifier_id=notifier_id, @@ -411,6 +421,8 @@ def list_notifiers(self, params): method = getattr(i, 'method', None) n['params'] = 'uri: {}{} '.format( i.uri, ('#{}'.format(method) if method else '')) + elif isinstance(i, eva.notify.FileNotifier): + n['params'] = f'path: {i.path} ({i.file_format})' elif isinstance(i, eva.notify.SQLANotifier) or isinstance( i, eva.notify.TimescaleNotifier): n['params'] = 'db: %s' % i.db_uri diff --git a/doc/notifiers.rst b/doc/notifiers.rst index a209768e..7b8e1339 100644 --- a/doc/notifiers.rst +++ b/doc/notifiers.rst @@ -873,6 +873,28 @@ E.g., let's toggle *unit:equipment/cctv*: {"jsonrpc": "2.0", "method": "action_toggle" } +File notifiers +============== + +EVA ICS controllers can write state events directly into external files. +Currently, the following file formats are supported: + +* **json** JSON (`NDJSON `_) +* **csv** comma-separated values + +Files are written as endless, use any external file rotator or pick up the +file manually when required. As soon as the output file is removed, a new file +is automatically created. + +Special options: + +* **auto_flush** by default, files are flushed only when rotated. By setting + this option, data is flushed immediately, after each data string (increases + disk usage) + +* **dos_cr** use DOS-style CR/LF line ending + +* **eu_numbers** use European number format for certain file formats (CSV) HTTP Notifiers ============== diff --git a/lib/eva/notify.py b/lib/eva/notify.py index 3a66956f..8bb466b0 100644 --- a/lib/eva/notify.py +++ b/lib/eva/notify.py @@ -20,6 +20,7 @@ import threading import socket import sqlalchemy as sa +from stat import ST_DEV, ST_INO import eva.registry @@ -1294,6 +1295,174 @@ def get_state(self, return list(reversed(data[1:] if sfr else data)) +class FileNotifier(GenericNotifier): + + file_formats = ['csv', 'json'] + + def __init__(self, + notifier_id, + interval=None, + file_format=None, + path=None, + dos_cr=False, + eu_numbers=False, + auto_flush=False): + if file_format not in self.file_formats: + raise RuntimeError(f'Unsupported file format: {file_format}') + notifier_type = 'file' + super().__init__(notifier_id=notifier_id, + notifier_type=notifier_type, + interval=interval) + self.path = path + self.connected = self.path is not None + self.file_stream = None + self.dos_cr = dos_cr + self.eu_numbers = eu_numbers + self.auto_flush = auto_flush + self.file_format = file_format + if file_format == 'csv': + import pytz + self.tz = pytz.timezone(time.tzname[0]) + if self.dos_cr: + self.new_line_cr = '\r\n' + else: + self.new_line_cr = '\n' + self.dev, self.ino = -1, -1 + + def test(self): + try: + open(self.path, 'a').close() + return True + except Exception as e: + self.log_error(message=e) + return False + + def start(self): + super().start() + self.file_stream = open(self.path, 'a') + self.store_fstat() + self.write_file_header() + + def stop(self): + if self.file_stream: + self.file_stream.close() + self.dev, self.ino = -1, -1 + super().stop() + + def serialize(self, props=False): + d = super().serialize(props=props) + for p in ['space', 'timeout']: + try: + del d[p] + except KeyError: + pass + d['path'] = self.path + d['dos_cr'] = self.dos_cr + d['eu_numbers'] = self.eu_numbers + d['auto_flush'] = self.auto_flush + d['format'] = self.file_format + return d + + def send_notification(self, subject, data, retain=None, unpicklable=False): + if subject == 'state': + for d in data if isinstance(data, list) else [data]: + self.write_file_string(self.format_file_string(d)) + + def store_fstat(self): + sres = os.fstat(self.file_stream.fileno()) + self.dev, self.ino = sres[ST_DEV], sres[ST_INO] + + def write_file_string(self, s): + if self.file_stream: + try: + sres = os.stat(self.path) + except FileNotFoundError: + sres = None + if not sres or sres[ST_DEV] != self.dev or sres[ST_INO] != self.ino: + if self.file_stream is not None: + self.file_stream.flush() + self.file_stream.close() + self.file_stream = open(self.path, 'a') + self.store_fstat() + self.write_file_header() + self.file_stream.write(s) + self.file_stream.write(self.new_line_cr) + if self.auto_flush: + self.file_stream.flush() + + def write_file_header(self): + if self.file_stream.tell() == 0: + if self.file_format == 'csv': + self.file_stream.write((';' if self.eu_numbers else ',').join([ + 'OID', 'Type', 'Group', 'Id', 'Timestamp', 'Time', 'Status', + 'Value' + ])) + self.file_stream.write(self.new_line_cr) + + def format_file_string(self, obj): + + def format_csv_string(s): + return '"' + s.replace('"', '""') + '"' + + def format_csv_number(n): + if self.eu_numbers: + return str(n).replace('.', ',') + else: + return str(n) + + if self.file_format == 'json': + return format_json(obj, minimal=True) + elif self.file_format == 'csv': + from datetime import datetime + s = [] + for f in ['oid', 'type', 'group', 'id']: + s.append(format_csv_string(obj.get(f, ''))) + ts = obj.get('set_time', 0) + s.append(format_csv_number(ts)) + if ts: + s.append( + datetime.fromtimestamp( + ts, self.tz).strftime('%Y-%m-%d %H:%M:%S')) + else: + s.append('') + s.append(str(obj.get('status'))) + value = obj.get('value') + try: + s.append(format_csv_number(float(value))) + except: + s.append(format_csv_string(value)) + return (';' if self.eu_numbers else ',').join(s) + + def set_prop(self, prop, value): + if prop == 'path': + self.path = value + return True + elif prop == 'dos_cr': + v = val_to_boolean(value) + if v is None: + return False + self.dos_cr = v + return True + elif prop == 'eu_numbers': + v = val_to_boolean(value) + if v is None: + return False + self.eu_numbers = v + return True + elif prop == 'auto_flush': + v = val_to_boolean(value) + if v is None: + return False + self.auto_flush = v + return True + elif prop == 'format': + if value not in self.file_formats: + return False + self.file_format = value + return True + return super().set_prop(prop, value) + + class GenericHTTPNotifier(GenericNotifier): def __init__(self, @@ -4235,6 +4404,20 @@ def load_notifier(notifier_id, ncfg=None, test=True, connect=True): space=space, buf_ttl=buf_ttl, interval=interval) + elif ncfg['type'] == 'file': + path = ncfg.get('path') + file_format = ncfg.get('format') + interval = ncfg.get('interval') + dos_cr = ncfg.get('dos_cr', False) + eu_numbers = ncfg.get('eu_numbers', False) + auto_flush = ncfg.get('auto_flush', False) + n = FileNotifier(notifier_id, + path=path, + file_format=file_format, + dos_cr=dos_cr, + eu_numbers=eu_numbers, + auto_flush=auto_flush, + interval=interval) elif ncfg['type'] == 'http-json': space = ncfg.get('space') ssl_verify = ncfg.get('ssl_verify')