Skip to content

Commit

Permalink
file notifiers
Browse files Browse the repository at this point in the history
  • Loading branch information
divi255 committed Nov 11, 2021
1 parent dd14028 commit 2524a2c
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 0 deletions.
12 changes: 12 additions & 0 deletions cli/notifymanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def add_notifier_common_functions(self):
db:db_uri
timescaledb:db_uri
udp:host:port
file:file_path#<json|csv>
influxdb:http(s)://uri#[org/]database
prometheus:'''),
metavar='PROPS').completer = self.ComplNProto()
Expand Down Expand Up @@ -360,6 +361,15 @@ def create_notifier(self, params):
db_uri=db_uri,
keep=None,
space=space)
elif p[0] == 'file':
path = ':'.join(p[1:])
try:
path, file_format = path.split('#', 2)
except:
return self.local_func_result_failed
n = eva.notify.FileNotifier(notifier_id=notifier_id,
path=path,
file_format=file_format)
elif p[0] == 'timescaledb':
db_uri = ':'.join(p[1:])
n = eva.notify.TimescaleNotifier(notifier_id=notifier_id,
Expand Down Expand Up @@ -411,6 +421,8 @@ def list_notifiers(self, params):
method = getattr(i, 'method', None)
n['params'] = 'uri: {}{} '.format(
i.uri, ('#{}'.format(method) if method else ''))
elif isinstance(i, eva.notify.FileNotifier):
n['params'] = f'path: {i.path} ({i.file_format})'
elif isinstance(i, eva.notify.SQLANotifier) or isinstance(
i, eva.notify.TimescaleNotifier):
n['params'] = 'db: %s' % i.db_uri
Expand Down
22 changes: 22 additions & 0 deletions doc/notifiers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,28 @@ E.g., let's toggle *unit:equipment/cctv*:
{"jsonrpc": "2.0", "method": "action_toggle" }
File notifiers
==============

EVA ICS controllers can write state events directly into external files.
Currently, the following file formats are supported:

* **json** JSON (`NDJSON <http://ndjson.org/>`_)
* **csv** comma-separated values

Files are written as endless, use any external file rotator or pick up the
file manually when required. As soon as the output file is removed, a new file
is automatically created.

Special options:

* **auto_flush** by default, files are flushed only when rotated. By setting
this option, data is flushed immediately, after each data string (increases
disk usage)

* **dos_cr** use DOS-style CR/LF line ending

* **eu_numbers** use European number format for certain file formats (CSV)

HTTP Notifiers
==============
Expand Down
183 changes: 183 additions & 0 deletions lib/eva/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import threading
import socket
import sqlalchemy as sa
from stat import ST_DEV, ST_INO

import eva.registry

Expand Down Expand Up @@ -1294,6 +1295,174 @@ def get_state(self,
return list(reversed(data[1:] if sfr else data))


class FileNotifier(GenericNotifier):

file_formats = ['csv', 'json']

def __init__(self,
notifier_id,
interval=None,
file_format=None,
path=None,
dos_cr=False,
eu_numbers=False,
auto_flush=False):
if file_format not in self.file_formats:
raise RuntimeError(f'Unsupported file format: {file_format}')
notifier_type = 'file'
super().__init__(notifier_id=notifier_id,
notifier_type=notifier_type,
interval=interval)
self.path = path
self.connected = self.path is not None
self.file_stream = None
self.dos_cr = dos_cr
self.eu_numbers = eu_numbers
self.auto_flush = auto_flush
self.file_format = file_format
if file_format == 'csv':
import pytz
self.tz = pytz.timezone(time.tzname[0])
if self.dos_cr:
self.new_line_cr = '\r\n'
else:
self.new_line_cr = '\n'
self.dev, self.ino = -1, -1

def test(self):
try:
open(self.path, 'a').close()
return True
except Exception as e:
self.log_error(message=e)
return False

def start(self):
super().start()
self.file_stream = open(self.path, 'a')
self.store_fstat()
self.write_file_header()

def stop(self):
if self.file_stream:
self.file_stream.close()
self.dev, self.ino = -1, -1
super().stop()

def serialize(self, props=False):
d = super().serialize(props=props)
for p in ['space', 'timeout']:
try:
del d[p]
except KeyError:
pass
d['path'] = self.path
d['dos_cr'] = self.dos_cr
d['eu_numbers'] = self.eu_numbers
d['auto_flush'] = self.auto_flush
d['format'] = self.file_format
return d

def send_notification(self, subject, data, retain=None, unpicklable=False):
if subject == 'state':
for d in data if isinstance(data, list) else [data]:
self.write_file_string(self.format_file_string(d))

def store_fstat(self):
sres = os.fstat(self.file_stream.fileno())
self.dev, self.ino = sres[ST_DEV], sres[ST_INO]

def write_file_string(self, s):
if self.file_stream:
try:
sres = os.stat(self.path)
except FileNotFoundError:
sres = None
if not sres or sres[ST_DEV] != self.dev or sres[ST_INO] != self.ino:
if self.file_stream is not None:
self.file_stream.flush()
self.file_stream.close()
self.file_stream = open(self.path, 'a')
self.store_fstat()
self.write_file_header()
self.file_stream.write(s)
self.file_stream.write(self.new_line_cr)
if self.auto_flush:
self.file_stream.flush()

def write_file_header(self):
if self.file_stream.tell() == 0:
if self.file_format == 'csv':
self.file_stream.write((';' if self.eu_numbers else ',').join([
'OID', 'Type', 'Group', 'Id', 'Timestamp', 'Time', 'Status',
'Value'
]))
self.file_stream.write(self.new_line_cr)

def format_file_string(self, obj):

def format_csv_string(s):
return '"' + s.replace('"', '""') + '"'

def format_csv_number(n):
if self.eu_numbers:
return str(n).replace('.', ',')
else:
return str(n)

if self.file_format == 'json':
return format_json(obj, minimal=True)
elif self.file_format == 'csv':
from datetime import datetime
s = []
for f in ['oid', 'type', 'group', 'id']:
s.append(format_csv_string(obj.get(f, '')))
ts = obj.get('set_time', 0)
s.append(format_csv_number(ts))
if ts:
s.append(
datetime.fromtimestamp(
ts, self.tz).strftime('%Y-%m-%d %H:%M:%S'))
else:
s.append('')
s.append(str(obj.get('status')))
value = obj.get('value')
try:
s.append(format_csv_number(float(value)))
except:
s.append(format_csv_string(value))
return (';' if self.eu_numbers else ',').join(s)

def set_prop(self, prop, value):
if prop == 'path':
self.path = value
return True
elif prop == 'dos_cr':
v = val_to_boolean(value)
if v is None:
return False
self.dos_cr = v
return True
elif prop == 'eu_numbers':
v = val_to_boolean(value)
if v is None:
return False
self.eu_numbers = v
return True
elif prop == 'auto_flush':
v = val_to_boolean(value)
if v is None:
return False
self.auto_flush = v
return True
elif prop == 'format':
if value not in self.file_formats:
return False
self.file_format = value
return True
return super().set_prop(prop, value)


class GenericHTTPNotifier(GenericNotifier):

def __init__(self,
Expand Down Expand Up @@ -4235,6 +4404,20 @@ def load_notifier(notifier_id, ncfg=None, test=True, connect=True):
space=space,
buf_ttl=buf_ttl,
interval=interval)
elif ncfg['type'] == 'file':
path = ncfg.get('path')
file_format = ncfg.get('format')
interval = ncfg.get('interval')
dos_cr = ncfg.get('dos_cr', False)
eu_numbers = ncfg.get('eu_numbers', False)
auto_flush = ncfg.get('auto_flush', False)
n = FileNotifier(notifier_id,
path=path,
file_format=file_format,
dos_cr=dos_cr,
eu_numbers=eu_numbers,
auto_flush=auto_flush,
interval=interval)
elif ncfg['type'] == 'http-json':
space = ncfg.get('space')
ssl_verify = ncfg.get('ssl_verify')
Expand Down

0 comments on commit 2524a2c

Please sign in to comment.