Skip to content

Commit

Permalink
Add proof of concept for timeout-based reassembly machinery
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Jul 23, 2018
1 parent ee19220 commit 8270ec4
Show file tree
Hide file tree
Showing 4 changed files with 335 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ CHANGES
in progress
===========
- Upgrade software test framework to nose2
- Add proof of concept for timeout-based reassembly machinery


2018-07-11 0.12.3
Expand Down
172 changes: 167 additions & 5 deletions beradio/message.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
# -*- coding: utf-8 -*-
# (c) 2015 Richard Pobering <einsiedlerkrebs@netfrag.org>
# (c) 2015 Andreas Motl, Elmyra UG <andreas.motl@elmyra.de>
from pprint import pformat
# (c) 2015 Richard Pobering <richard@hiveeyes.org>
# (c) 2015-2018 Andreas Motl <andreas@hiveeyes.org>
import json
import time
import logging
import threading
from collections import OrderedDict
from pprint import pformat, pprint

from beradio.protocol import BERadioProtocol2

logger = logging.getLogger(__name__)


class BERadioMessage(object):

protocol_factory = BERadioProtocol2
Expand Down Expand Up @@ -62,12 +71,165 @@ def __repr__(self):
data = {
'#': self.nodeid,
'_': self.profile,
}
}
data.update(self.payload)
return pformat(data)

@classmethod
def decode(cls, payload):
message = cls.protocol_factory().decode(payload)
message['data'] = dict(message['data'])
message['data'] = OrderedDict(message['data'])
return message

@classmethod
def json(cls, payload):
message = cls.protocol_factory().decode(payload)
return json.dumps(message, sort_keys=True, indent=4)


class ResettableTimer:

def __init__(self, interval, function):
self.interval = interval
self.function = function
self.make_timer()

def make_timer(self):
self.timer = threading.Timer(self.interval, self.function)
self.timer.start()

def reset(self):

if self.timer is None:
self.make_timer()
else:
if not self.timer.finished.is_set():
self.timer.cancel()

self.make_timer()


class BERadioMessageDecoder(object):

REASSEMBLY_TIMEOUT = 2.5

# Collector mode: Wait until hitting REASSEMBLY_TIMEOUT, then return complete reassembled messages
MODE_COLLECT = 1

# Emitter mode: Emit reassembled fragments to all subscribers after hitting REASSEMBLY_TIMEOUT
MODE_EMIT = 2

def __init__(self, reassembly_timeout=None):
self.reassembly_timeout = reassembly_timeout or self.REASSEMBLY_TIMEOUT
self.mode = self.MODE_COLLECT
self.timer = ResettableTimer(self.reassembly_timeout, self.release)
self.ready = threading.Event()
self.subscribers = []
self.messages = []

def subscribe(self, subscriber):
self.mode = self.MODE_EMIT
self.subscribers.append(subscriber)

def release(self):
logger.info('BERadioMessageDecoder.release')

# Signal readiness
self.ready.set()
time.sleep(0.1)

if self.mode == self.MODE_EMIT:
self.emit()

def emit(self):
logger.info('BERadioMessageDecoder.emit')

# Inform all subscribers
payload = self.assemble()
for subscriber in self.subscribers:
subscriber(payload)

# Clear buffer for next iteration
self.messages = []

def read(self, payload):
logger.info('BERadioMessageDecoder.read')

# Clear readiness indicator
self.ready.clear()

# Reset the timer
self.timer.reset()

# Collect the payload fragment
self.messages.append(payload)

def wait(self):

if self.mode == self.MODE_EMIT:
raise NotImplementedError(
'The message decoder does not support waiting for complete packets when running in emitter mode')

logger.info('BERadioMessageDecoder.wait')

# Wait for reassembly timeout
self.ready.wait()

return self.assemble()

def to_json(self, strip_time=False):
data = self.wait()
payload = json.dumps(data, sort_keys=True, indent=4)
return payload

def assemble(self):

if not self.messages:
return

data = OrderedDict()
for payload in self.messages:

message = BERadioMessage.decode(payload)
#message['payload'] = payload

node_id = message['meta']['node']

if node_id not in data:
data[node_id] = message
data[node_id]['messages'] = [payload]
else:
data[node_id]['meta'].update(message['meta'])
data[node_id]['data'].update(message['data'])
data[node_id]['messages'].append(payload)

return data


if __name__ == '__main__':

logging.basicConfig(level=logging.INFO)

parts = []
parts.append('d1:#i2e1:_2:h11:tli2168ei1393ei3356ei1468ei1700ee1:hlee')
parts.append('d1:#i2e1:_2:h12:h0li8370ee1:wli53503600ei2590ee1:lli15100eee')
parts.append('d1:#i3e1:_2:h11:tli2168ee2:h0li930ee1:wli4242eee')
parts.append('d1:#i2e1:_2:h11:rli-6600eee')

def emitter(data):
logger.info('Emit:\n{}'.format(json.dumps(data, indent=4)))

decoder = BERadioMessageDecoder()
#decoder.subscribe(emitter)

decoder.read(parts[0])
time.sleep(1.5)

decoder.read(parts[1])
time.sleep(1.5)

decoder.read(parts[2])
decoder.read(parts[3])

data = decoder.wait()
logger.info('Data:\n{}'.format(json.dumps(data, indent=4)))
166 changes: 166 additions & 0 deletions beradio/test/reassembly.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
#####################
BERadioMessageDecoder
#####################


************
Introduction
************
The `BERadio C++`_ encoder library provides automatic message fragmentation
yielding self-contained messages fitting into a defined maximum payload size.
Its default setting is ``MTU_SIZE_MAX 61``, making it suitable for typical ISM
applications.

The ``BERadioMessageDecoder`` is a decoder machinery for reassembling
fragmented BERadio messages into whole packets.

It works by waiting for a defined amount of time for new messages arriving.
Its default setting is ``REASSEMBLY_TIMEOUT = 2.5``, making it suitable to
reassemble messages spread across this time window into compound data packages.
After the defined time, ``BERadioMessageDecoder`` will release these
data packages for consumption by downstream software components.

.. _BERadio C++: https://hiveeyes.org/docs/arduino/BERadio/README.html



*************
Demonstration
*************
Let's show the functionality of the proof-of-concept.

.. highlight:: python


BERadio messages
================

Let's pretend there are four messages in flight:

- Three message fragments will arrive from node 2 containing values
from a temperature array (temp1-5), two weight values (wght1-2),
a single one for each humidity (hum1) and rssi (rssi1) and one
cpu cycle counter (loop1).
- There's another message from node 3 containing values for
temp1, hum1 and wght1.

Let's define these messages::

>>> messages = [
... 'd1:#i2e1:_2:h11:tli2168ei1393ei3356ei1468ei1700ee1:hlee',
... 'd1:#i2e1:_2:h12:h0li8370ee1:wli53503600ei2590ee1:lli15100eee',
... 'd1:#i3e1:_2:h11:tli2168ee2:h0li930ee1:wli4242eee',
... 'd1:#i2e1:_2:h11:rli-6600eee',
... ]

Decoding the first message is easy and will get you an
understanding about what's actually inside::

>>> from beradio.message import BERadioMessage
>>> print BERadioMessage.json(str(messages[0])) # doctest: +ELLIPSIS
{
"data": {
"temp1": 21.68,
"temp2": 13.93,
"temp3": 33.56,
"temp4": 14.68,
"temp5": 17.0
},
"meta": {
"gateway": "None",
"network": "None",
"node": "2",
"profile": "h1",
"protocol": "beradio2",
"time": ...
}
}


Setup machinery
===============

.. testsetup::

>>> import time

Import the decoder module

>>> from beradio.message import BERadioMessageDecoder

and make an instance of it

>>> decoder = BERadioMessageDecoder()


Getting started
===============
Let's stuff the first two messages into the decoder,
simulating a transmission delay after each one:

>>> decoder.read(messages[0])
>>> time.sleep(0.5)

>>> decoder.read(messages[1])
>>> time.sleep(0.5)

Let's just stuff the remaining two messages into the decoder quickly to reduce runtime:

>>> decoder.read(messages[2])
>>> decoder.read(messages[3])


Reassembly
==========
After reassembling, all the data received during the default
time window of 2.5 seconds will be available as a whole bunch,
keyed by node id. Enjoy:

>>> print decoder.to_json()
{
"2": {
"data": {
"hum1": 83.7,
"loops1": 151.0,
"rssi1": -66.0,
"temp1": 21.68,
"temp2": 13.93,
"temp3": 33.56,
"temp4": 14.68,
"temp5": 17.0,
"wght1": 535036.0,
"wght2": 25.9
},
"messages": [
"d1:#i2e1:_2:h11:tli2168ei1393ei3356ei1468ei1700ee1:hlee",
"d1:#i2e1:_2:h12:h0li8370ee1:wli53503600ei2590ee1:lli15100eee",
"d1:#i2e1:_2:h11:rli-6600eee"
],
"meta": {
"gateway": "None",
"network": "None",
"node": "2",
"profile": "h1",
"protocol": "beradio2",
"time": ...
}
},
"3": {
"data": {
"hum1": 9.3,
"temp1": 21.68,
"wght1": 42.42
},
"messages": [
"d1:#i3e1:_2:h11:tli2168ee2:h0li930ee1:wli4242eee"
],
"meta": {
"gateway": "None",
"network": "None",
"node": "3",
"profile": "h1",
"protocol": "beradio2",
"time": ...
}
}
}
1 change: 1 addition & 0 deletions doc/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ Feel welcome to join us!
:maxdepth: 1

test/beradio
test/reassembly


.. toctree::
Expand Down

0 comments on commit 8270ec4

Please sign in to comment.