Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/UMONS-GFA/ardas
Browse files Browse the repository at this point in the history
  • Loading branch information
bastinc committed Feb 13, 2018
2 parents eabbeda + 66666f4 commit 7affcb2
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 73 deletions.
16 changes: 16 additions & 0 deletions ardas/get_git_version.py
@@ -0,0 +1,16 @@
import subprocess
import os


def get_version():
""" Get the current version from git info
:return: a string foramted as branch_name | last_named_tag-number_of_commits_ahead-commit_number
"""
dir = os.path.dirname(__file__)
branch = subprocess.check_output('cd ' + dir + '; git rev-parse --abbrev-ref HEAD', shell=True)
version = subprocess.check_output('cd ' + dir + '; git describe --long --dirty --abbrev=6 --tags', shell=True)
return branch.decode('ascii')[:-1] + ' | ' + version.decode('ascii')[:-1]

if __name__ == '__main__':
print(get_version())
109 changes: 96 additions & 13 deletions ardas/influxdb_events.py
Expand Up @@ -7,28 +7,51 @@
from ardas.settings import DATABASE, DATA_LOGGING_CONFIG, LOGGING_CONFIG


def influxdb_log_event(influxdb_client, tags, text, title, msg_logger):
event_time = datetime.datetime.utcnow().isoformat()
event = [{'measurement': 'events', 'time': event_time,
'fields': {'value': 1, 'tags': tags, 'text': text, 'title': title}
def influxdb_log_event(influxdb_client, title, default_tags, event_args, msg_logger):
""" Log an event in the influxdb series
:param influxdb_client:
:param title:
:param default_tags:
:param event_args:
:param msg_logger:
:param event_time:
:return:
"""
decoded_event_args = decode_event_args(event_args, default_tags)
event = [{'measurement': 'events', 'time': decoded_event_args['event_time'],
'fields': {'value': 1, 'tags': decoded_event_args['tags'], 'text': decoded_event_args['comment'],
'title': title}
}]
msg_logger.debug('Writing event "%s" with tag(s) "%s" to %s @%s' % (text, tags, DATABASE['dbname'], event_time))
msg_logger.debug('Writing event "%s: %s" with tag(s) "%s" to %s @%s'
% (title, decoded_event_args['comment'], decoded_event_args['tags'], DATABASE['dbname'],
decoded_event_args['event_time']))
event_written = False
while not event_written:
try:
influxdb_client.write_points(event)
event_written = True
except Exception as e:
msg_logger.error(e)
msg_logger.debug('Unable to store event "%s" with tag(s) "%s" to %s @%s' % (text, tags, DATABASE['dbname'],
event_time))
msg_logger.debug('Unable to store event "%s: %s" with tag(s) "%s" to %s @%s'
% (title, decoded_event_args['comment'], decoded_event_args['tags'], DATABASE['dbname'],
decoded_event_args['event_time']))
sleep(5)
msg_logger.debug('Retry storing event "%s" with tag(s) "%s" to %s @%s' % (text, tags, DATABASE['dbname'],
event_time))
msg_logger.info('Event "%s" with tag(s) "%s" written to %s @%s' % (text, tags, DATABASE['dbname'], event_time))
msg_logger.debug('Retry storing event "%s: %s" with tag(s) "%s" to %s @%s'
% (title, decoded_event_args['comment'], decoded_event_args['tags'], DATABASE['dbname'],
decoded_event_args['event_time']))
msg_logger.info('Event "%s" with tag(s) "%s" written to %s @%s'
% (decoded_event_args['comment'], decoded_event_args['tags'], DATABASE['dbname'],
decoded_event_args['event_time']))


def influxdb_clean_events(influxdb_client, msg_logger):
""" Erase all events in influxdb series
:param influxdb_client:
:param msg_logger:
:return:
"""
msg_logger.info('Deleting all events from %s' % DATABASE['dbname'])
events_deleted = False
while not events_deleted:
Expand All @@ -41,6 +64,65 @@ def influxdb_clean_events(influxdb_client, msg_logger):
msg_logger.info('Events successfully deleted')


def decode_event_args(event_args, default_tags='message'):
""" Decodes event arguments of the type [comment text] [-tags tag1 tag2 ...] [-datetime YYYY mm dd HH MM SS]
:param event_args: string that contain the event arguments
:param default_tags: comma separated default tags to add to tags given in arguments
:return: a dict of
"""
s = event_args.strip()
comment = ''
if s.find('-') == 0: # command args starting with an option (no comment)
msg_split = 0
else:
msg_split = s.find(' -')
if msg_split == -1:
msg_split = len(s)
comment = s[:msg_split].strip()
if msg_split > 0:
opts = [i.split(' ') for i in s[msg_split+2:].split(' -')]
elif msg_split == 0:
opts = [i.split(' ') for i in s[msg_split+1:].split(' -')]
else:
opts = None
decoded_event_args = {}
if opts is not None:
for i in opts:
if i[0] != '':
option = i[0]
values = i[1:]
decoded_event_args.update({option: values})
tags = decoded_event_args.get('tags', None)
if type(tags) is not list:
decoded_event_args['tags'] = default_tags
else:
tags = []
for i in default_tags.split(','):
tags.append(i.strip())
for i in decoded_event_args['tags']:
tags.append(i.strip())
tags = ', '.join([i for i in tags])
decoded_event_args['tags'] = tags
event_time = decoded_event_args.get('datetime', None)
if event_time is not None:
decoded_event_args.pop('datetime')
event_time = [int(i) for i in event_time]
if len(event_time) < 6:
for i in range(len(event_time),6):
event_time[i] = 0
elif len(event_time) > 6:
event_time = event_time[0:6]
event_time = datetime.datetime(event_time[0], event_time[1], event_time[2], event_time[3], event_time[4], event_time[5])
else:
event_time = datetime.datetime.utcnow()
decoded_event_args['event_time'] = event_time.isoformat()
if comment == '':
comment = 'No comment...'
decoded_event_args['comment'] = comment
return decoded_event_args


if __name__ == '__main__':
stop = False
influxdb_clean = False
Expand Down Expand Up @@ -89,7 +171,8 @@ def influxdb_clean_events(influxdb_client, msg_logger):
if influxdb_clean:
influxdb_clean_events(influxdb_client=influxdb_client, msg_logger=msg_logger)
else:
tags = 'Info, Message, Test'
text = 'This is a test'
tags = 'info, message'
event_args = 'This is a test of datetime argument 4°C -tags test enhancement -datetime 2018 02 12 16 49 00'
title = 'TITLE'
influxdb_log_event(influxdb_client=influxdb_client, text=text, tags=tags, title=title, msg_logger=msg_logger)
influxdb_log_event(influxdb_client=influxdb_client, title=title, default_tags=tags,
event_args=event_args, msg_logger=msg_logger)
118 changes: 58 additions & 60 deletions ardas/raspardas.py
Expand Up @@ -13,8 +13,9 @@
from influxdb import InfluxDBClient
from ardas.compressed_sized_timed_rotating_logger import CompressedSizedTimedRotatingFileHandler
from ardas.influxdb_events import influxdb_log_event
from ardas.get_git_version import get_version

version = 'v1.1.3.2'
version = get_version()

# setup loggers
# Message logging setup
Expand Down Expand Up @@ -82,9 +83,8 @@
chunk_size = 4096
raw_data = ARDAS_CONFIG['raw_data_on_disk']

peer_download = False # TODO: find a way to set peer_download to True if another ardas is downloading at startup
downloading = False
pause = True # used to suspend data logging during some operations such as start_sequence
starting = True
stop = False

slave_queue = queue.Queue() # what comes from ArDAS
Expand Down Expand Up @@ -166,7 +166,9 @@ def init_logging():
f.truncate()
except Exception as e:
msg_logger.error('Unable to read restart_msg.txt: %s' % e)
influxdb_log_event(influxdb_client=client, tags='start', text=text, title=title, msg_logger=msg_logger)
influxdb_log_event(influxdb_client=client, title=title,
default_tags=ARDAS_CONFIG['net_id'] + ',' + 'start',
event_args=text, msg_logger=msg_logger)
except Exception as e:
msg_logger.error('*** Unable to log to database %s: %s' % (DATABASE['dbname'], e))

Expand All @@ -184,8 +186,7 @@ def listen_slave():
infinite loop, and only exit when
the main thread ends.
"""
global stop, downloading, slave_io, slave_queue, msg_logger

global stop, slave_io, slave_queue, msg_logger, master_online, starting
# slave_io.reset_input_buffer()
# slave_io.reset_output_buffer()
msg_logger.debug('Initiating listen_slave thread...')
Expand Down Expand Up @@ -217,15 +218,15 @@ def listen_slave():
msg_logger.debug('Slave says : ' + msg[0:msg_end].decode('ascii', 'replace'))
except Exception as e:
msg_logger.warning('*** listen_slave thread - Unable to decode slave message: %s' % e)
if not downloading:
if master_online or starting:
slave_queue.put(msg[0:msg_end])
msg = msg[msg_end+1:]
msg_end = msg.find(b'\r')
msg_logger.debug('listen_slave: message no rec no msg: %s' % msg.decode('ascii', errors='ignore'))
byte = slave_io.read(1)
sleep(0.95)
except queue.Full:
msg_logger.warning('*** Data or slave queue is full!')
msg_logger.warning('*** Slave queue is full!')
except serial.SerialTimeoutException:
pass
msg_logger.debug('Closing listen_slave thread...')
Expand Down Expand Up @@ -352,9 +353,10 @@ def connect_master():
master_connection, addr = master_socket.accept()
msg_logger.info('Master connected, addr: ' + str(addr))
title = 'Connection by user'
text = 'addr: ' + str(addr)
influxdb_log_event(influxdb_client=client, tags='connection', text=text, title=title,
msg_logger=msg_logger)
event_args = 'addr: ' + str(addr)
influxdb_log_event(influxdb_client=client, title=title,
default_tags=ARDAS_CONFIG['net_id'] + ',' + 'connection',
event_args=event_args, msg_logger=msg_logger)
master_online = True
except Exception as e:
msg_logger.error('*** Master connection error: %s' % e)
Expand Down Expand Up @@ -391,6 +393,7 @@ def listen_master():
msg_logger.debug('Master says: ' + msg.decode('ascii'))
except Exception as e:
msg_logger.warning('*** listen_master thread - Unable to decode master message: %s ...' % e)
event_args = ''
if msg[:-1] == b'#XB':
msg_logger.info('Full download is not available')
elif msg[:-1] == b'#XP':
Expand All @@ -403,21 +406,19 @@ def listen_master():
if pause:
title = 'Resumed by user'
if len(msg) > 4:
text = msg[4:-1].decode('utf-8')
else:
text = 'No message'
msg_logger.info(text)
influxdb_log_event(influxdb_client=client, tags='resume', text=text, title=title,
msg_logger=msg_logger)
event_args = msg[4:-1].decode('utf-8')
msg_logger.info('%s: %s' %(title, event_args))
influxdb_log_event(influxdb_client=client, title=title,
default_tags=ARDAS_CONFIG['net_id'] + ',' + 'resume',
event_args=event_args, msg_logger=msg_logger)
else:
text = 'Pause'
title = 'Paused by user'
if len(msg) > 4:
text = msg[4:-1].decode('utf-8')
else:
text = 'No message'
msg_logger.info(text)
influxdb_log_event(influxdb_client=client, tags='pause', text=text, title=title,
msg_logger=msg_logger)
event_args = msg[4:-1].decode('utf-8')
msg_logger.info('%s: %s' % (title, event_args))
influxdb_log_event(influxdb_client=client, title=title,
default_tags=ARDAS_CONFIG['net_id'] + ',' + 'pause',
event_args=event_args, msg_logger=msg_logger)
pause = not pause
elif msg[:-1] == b'#RC':
raw_data = not raw_data
Expand All @@ -428,31 +429,28 @@ def listen_master():
elif msg[:3] == b'#KL':
title = 'Stopped by user'
if len(msg) > 4:
text = msg[4:-1].decode('utf-8')
else:
text = 'No message'
msg_logger.info(text)
influxdb_log_event(influxdb_client=client, tags='stop', text=text, title=title,
msg_logger=msg_logger)
event_args = msg[4:-1].decode('utf-8')
msg_logger.info('%s: %s' % (title, event_args))
influxdb_log_event(influxdb_client=client, title=title,
default_tags=ARDAS_CONFIG['net_id'] + ',' + 'stop',
event_args=event_args, msg_logger=msg_logger)
stop = True
elif msg[:3] == b'#MS':
title = 'Message from user'
if len(msg) > 4:
text = msg[4:-1].decode('utf-8')
else:
text = 'No message'
msg_logger.info(text)
influxdb_log_event(influxdb_client=client, tags='message', text=text, title=title,
msg_logger=msg_logger)
event_args = msg[4:-1].decode('utf-8')
msg_logger.info('%s: %s' % (title, event_args))
influxdb_log_event(influxdb_client=client, title=title,
default_tags=ARDAS_CONFIG['net_id'] + ',' + 'message',
event_args=event_args, msg_logger=msg_logger)
elif msg[:3] == b'#QT':
title = 'Connection ended by user'
if len(msg) > 4:
text = msg[4:-1].decode('utf-8')
else:
text = 'No message'
msg_logger.info(text)
influxdb_log_event(influxdb_client=client, tags='end connection', text=text, title=title,
msg_logger=msg_logger)
event_args = msg[4:-1].decode('utf-8')
msg_logger.info('%s: %s' % (title, event_args))
influxdb_log_event(influxdb_client=client, title=title,
default_tags=ARDAS_CONFIG['net_id'] + ',' + 'end connection',
event_args=event_args, msg_logger=msg_logger)
master_online = False
else:
master_queue.put(msg)
Expand All @@ -477,21 +475,19 @@ def talk_master():
infinite loop, and only exit when
the main thread ends.
"""
global stop, downloading, master_connection, slave_queue, master_online, msg_logger
global stop, master_connection, slave_queue, master_online, msg_logger

msg_logger.debug('Initiating talk_master thread...')
while not stop:
if master_online:
try:
if not downloading:
msg = slave_queue.get()
try:
msg_logger.debug('Saying to master :' + msg.decode('utf-8'))
master_connection.send(msg)
except Exception as e:
msg_logger.warning('*** talk_master thread - Unable to decode slave message: %s' % e)
master_connection.send(msg)

msg = slave_queue.get()
try:
msg_logger.debug('Saying to master :' + msg.decode('utf-8'))
master_connection.send(msg)
except Exception as e:
msg_logger.warning('*** talk_master thread - Unable to decode slave message: %s' % e)
master_connection.send(msg)
except queue.Empty:
pass
except Exception as e:
Expand All @@ -502,7 +498,7 @@ def talk_master():


def start_sequence():
global master_queue, slave_queue, n_channels, msg_logger
global master_queue, slave_queue, n_channels, msg_logger, starting

msg_logger.debug('Initiating start sequence...')
msg_logger.debug('____________________________')
Expand Down Expand Up @@ -621,8 +617,9 @@ def start_sequence():
msg_logger.debug('start_sequence : No proper reply received yet...')
except Exception as e:
msg_logger.debug('start_sequence : Unable to get date and time from to NTP: %s' % e)
msg_logger.debug('Start sequence finished...')
msg_logger.debug('__________________________')
msg_logger.debug('Start sequence completed...')
msg_logger.debug('___________________________')
starting = False


if __name__ == '__main__':
Expand All @@ -634,8 +631,9 @@ def start_sequence():
net_id = ARDAS_CONFIG['net_id']
integration_period = ARDAS_CONFIG['integration_period']
pause = True
influxdb_log_event(influxdb_client=client, tags='pause', text='reconfiguration started',
title='Pause logging', msg_logger=msg_logger)
influxdb_log_event(influxdb_client=client, title='Pause logging',
default_tags=ARDAS_CONFIG['net_id'] + ',' + 'pause',
event_args='reconfiguration started', msg_logger=msg_logger)
slave_talker = Thread(target=talk_slave)
slave_talker.setDaemon(True)
slave_talker.start()
Expand All @@ -656,10 +654,10 @@ def start_sequence():
master_listener = Thread(target=listen_master)
master_listener.setDaemon(True)
master_listener.start()
influxdb_log_event(influxdb_client=client, title='Resume logging',
default_tags=ARDAS_CONFIG['net_id'] + ',' + 'resume',
event_args='reconfiguration complete', msg_logger=msg_logger)
pause = False
influxdb_log_event(influxdb_client=client, tags='resume', text='reconfiguration ended',
title='Resume logging', msg_logger=msg_logger)

msg_logger.info('*** Starting logging... ***')

while not stop:
Expand Down

0 comments on commit 7affcb2

Please sign in to comment.