Skip to content

Commit

Permalink
OemGatewayBuffer: sending period
Browse files Browse the repository at this point in the history
Send data to server once every N seconds
"period" parameter is added, config file adaptation required

This commit sort of reverts 2d96eea
  • Loading branch information
Jérôme Lafréchoux committed Oct 25, 2013
1 parent c50c14e commit a6c75ef
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 37 deletions.
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -204,5 +204,6 @@ None
* domain (e.g. emoncms.org)
* path (e.g. /emoncms)
* apikey
* period: period between two data sendings (in seconds)
* active: if False, neither record nor send data, but hold unsent data.

2 changes: 2 additions & 0 deletions oemgateway.conf.dist
Expand Up @@ -62,6 +62,7 @@ loglevel = DEBUG
apikey = xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
protocol = http://
active = True
period = 0
path = /emoncms

[[emoncms_remote]]
Expand All @@ -72,5 +73,6 @@ loglevel = DEBUG
apikey = xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
protocol = http://
active = True
period = 30
path = /emoncms

79 changes: 42 additions & 37 deletions oemgatewaybuffer.py
Expand Up @@ -22,13 +22,14 @@
class OemGatewayBuffer(object):

def __init__(self):
"""Create a server data buffer initialized with server settings."""
"""Create a server data buffer."""

# Initialize logger
self._log = logging.getLogger("OemGateway")

# Initialize variables
self._data_buffer = []
self._last_send = time.time()
self._settings = {}

def set(self, **kwargs):
Expand All @@ -39,6 +40,7 @@ def set(self, **kwargs):
domain (string): domain name (eg: 'domain.tld')
path (string): emoncms path with leading slash (eg: '/emoncms')
apikey (string): API key with write access
period (string): sending interval in seconds
active (string): whether the data buffer is active (True/False)
"""
Expand All @@ -53,27 +55,25 @@ def add(self, data):
"""

# Check buffer is active
if self._settings['active'] == 'False':
return

# Timestamp = time now
t = int(time.time())
# Timestamp = now
timestamp = time.time()

self._log.debug("Server " +
self._settings['domain'] + self._settings['path'] +
" -> buffer data: " + str(data) +
", timestamp: " + str(t))
", timestamp: " + str(timestamp))

# Append data set [timestamp, [node, val1, val2, val3,...]]
# to _data_buffer
self._data_buffer.append([t, data])
self._data_buffer.append([timestamp, data])

def _send_data(self, data, time):
def _send_data(self):
"""Send data to server.
data (list): node and values (eg: '[node,val1,val2,...]')
time (int): timestamp, time when sample was recorded
return True if data sent correctly
To be implemented in subclass.
Expand All @@ -82,22 +82,26 @@ def _send_data(self, data, time):
pass

def flush(self):
"""Send oldest data in buffer, if any."""
"""Send data in buffer, if any."""

# Check buffer is active
if self._settings['active'] == 'False':
return

# Buffer management
# If data buffer not empty, send a set of values
# Check sending period
if (time.time() - self._last_send < int(self._settings['period'])):
return

# If data buffer not empty, send data
if self._data_buffer != []:
time, data = self._data_buffer[0]
self._log.debug("Server " +
self._settings['domain'] + self._settings['path'] +
" -> send data: " + str(data) +
", timestamp: " + str(time))
if self._send_data(data, time):
# In case of success, delete sample set from buffer
del self._data_buffer[0]
self._log.debug("Server " + self._settings['domain'] +
self._settings['path'] + " -> flush buffer")

self._send_data()

# Update time of last data sending
self._last_send = time.time()

# If buffer size reaches maximum, trash oldest values
# TODO: optionnal write to file instead of losing data
size = len(self._data_buffer)
Expand All @@ -111,30 +115,29 @@ def flush(self):
"""
class OemGatewayEmoncmsBuffer(OemGatewayBuffer):

def _send_data(self, data, time):
def _send_data(self):
"""Send data to server."""

# Prepare data string with the values in data buffer
data_string = ''
# Timestamp
data_string += '&time=' + str(time)
# Node ID
data_string += '&node=' + str(data[0])
# Data
data_string += '&json={'
for i, val in enumerate(data[1:]):
data_string += str(i+1) + ':' + str(val)
data_string += ','
# Remove trailing comma and close braces
data_string = data_string[0:-1]+'}'
data_string = '['
for (timestamp, data) in self._data_buffer:
data_string += '['
data_string += str(int(round(timestamp-time.time())))
for sample in data:
data_string += ','
data_string += str(sample)
data_string += '],'
# Remove trailing comma and close bracket
data_string = data_string[0:-1]+']'

self._log.debug("Data string: " + data_string)

# Prepare URL string of the form
# 'http://domain.tld/emoncms/input/post.json?apikey=12345
# &node=10&json={1:1806, 2:1664}'
# 'http://domain.tld/emoncms/input/bulk.json?apikey=
# 12345&data=[[-10,10,1806],[-5,10,1806],[0,10,1806]]'
url_string = self._settings['protocol'] + self._settings['domain'] + \
self._settings['path'] + '/input/post.json?apikey=' + \
self._settings['apikey'] + data_string
self._settings['path'] + "/input/bulk.json?apikey=" + \
self._settings['apikey'] + "&data=" + data_string
self._log.debug("URL string: " + url_string)

# Send data to server
Expand All @@ -157,6 +160,8 @@ def _send_data(self, data, time):
else:
if (result.readline() == 'ok'):
self._log.debug("Send ok")
# Send ok -> empty buffer
self._data_buffer = []
return True
else:
self._log.warning("Send failure")
Expand Down
2 changes: 2 additions & 0 deletions oemgatewayinterface.py
Expand Up @@ -204,6 +204,7 @@ def check_settings(self):
'domain': self._local_domain,
'path': self._local_path,
'apikey': emoncms_s['apikey'],
'period': '0',
'active': 'True'}
# Remote
settings['buffers']['emoncms_remote'] = \
Expand All @@ -215,6 +216,7 @@ def check_settings(self):
'domain': emoncms_s['remotedomain'],
'path': emoncms_s['remotepath'],
'apikey': emoncms_s['remoteapikey'],
'period': '30',
'active': emoncms_s['remotesend']}

# Return True if settings modified
Expand Down

0 comments on commit a6c75ef

Please sign in to comment.