Skip to content

Commit

Permalink
[DA] Add per-device addressing and topic decoding strategies
Browse files Browse the repository at this point in the history
This aims towards bringing in multi-tenant channel bundle endpoints,
for example suitable to accepting TTS/TTN payloads, in order to run
a network of multiple devices on a single TTN application conveniently.

The feature is called "device addressing" (DA).

Examples:

- mqttkit-1/d/123e4567-e89b-12d3-a456-426614174000
- mqttkit-1/dt/network-gateway-node

It works with both MQTT and HTTP protocols.
  • Loading branch information
amotl committed Jun 7, 2023
1 parent 2a0b51c commit b7ea48c
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 12 deletions.
4 changes: 4 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ in progress
- [TTN] Add TTS (The Things Stack) / TTN (The Things Network) decoder.
Thanks, @thiasB and @u-l-m-i.
- [TTN] Decode metadata from full TTN payload. Thanks, @thiasB.
- [DA] Add per-device addressing and topic decoding strategies. Thanks, @thiasB.
``mqttkit-1/d/123e4567-e89b-12d3-a456-426614174000``
``mqttkit-1/dt/network-gateway-node``


.. _kotori-0.27.0:

Expand Down
51 changes: 42 additions & 9 deletions kotori/daq/strategy/wan.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# (c) 2015-2021 Andreas Motl, <andreas@getkotori.org>
# (c) 2015-2023 Andreas Motl, <andreas@getkotori.org>
import re

from kotori.daq.strategy import StrategyBase
Expand All @@ -9,12 +9,13 @@
class WanBusStrategy(StrategyBase):

# Regular expression pattern for decoding MQTT topic address segments.
pattern = r'^(?P<realm>.+?)/(?P<network>.+?)/(?P<gateway>.+?)/(?P<node>.+?)(?:/(?P<slot>.+?))?$'
matcher = re.compile(pattern)
channel_matcher = re.compile(r'^(?P<realm>.+?)/(?P<network>.+?)/(?P<gateway>.+?)/(?P<node>.+?)(?:/(?P<slot>.+?))?$')
device_matcher_dashed_topo = re.compile(r'^(?P<realm>.+?)/dt/(?P<device>.+?)(?:/(?P<slot>.+?))?$')
device_matcher_generic = re.compile(r'^(?P<realm>.+?)/d/(?P<device>.+?)(?:/(?P<slot>.+?))?$')

def topic_to_topology(self, topic):
"""
Decode MQTT topic segments implementing the »quadruple hierarchy strategy«.
Decode MQTT topic segments implementing the »quadruple hierarchy/topology strategy«.
The topology hierarchy is directly specified by the MQTT topic and is
made up of a minimum of four identifiers describing the core structure::
Expand All @@ -40,16 +41,48 @@ def topic_to_topology(self, topic):
- "node" is the node identifier. Choose anything you like. This usually
gets transmitted from an embedded device node.
Other than decoding the classic WAN path topic style, like
mqttkit-1/network/gateway/node
the decoder now also knows how to handle per-device addressing schemes, like
mqttkit-1/d/123e4567-e89b-12d3-a456-426614174000
Also, it has another special topic decoding scheme, by decomposing a dashed
identifier, and transforming it into the quadruple hierarchy.
mqttkit-1/dt/network-gateway-node
"""

# Decode the topic.
m = self.matcher.match(topic)
address = None
m = self.device_matcher_generic.match(topic)
if m:
address = SmartMunch(m.groupdict())
else:
address = {}

return address
if "device" in address:
address.network = "devices"
address.gateway = "default"
address.node = address.device
del address.device

if address is None:
m = self.device_matcher_dashed_topo.match(topic)
if m:
address = SmartMunch(m.groupdict())
if "device" in address:
segments = address.device.split("-")
address.network, address.gateway, address.node = segments
del address.device

if address is None:
m = self.channel_matcher.match(topic)
if m:
address = SmartMunch(m.groupdict())

return address or {}

@classmethod
def topology_to_storage(self, topology, message_type=None):
Expand Down
1 change: 1 addition & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ markers =
ttn: Tests for TTS/TTN adapter.
hiveeyes: Tests for vendor hiveeyes.
legacy: Tests for legacy endpoints and such.
device: Device-based addressing.
5 changes: 4 additions & 1 deletion test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from kotori import KotoriBootloader
from test.util import boot_kotori, sleep
from test.settings.mqttkit import influx_sensors, influx_events, grafana
from test.settings.mqttkit import influx_sensors, influx_events, grafana, device_influx_sensors

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -53,4 +53,7 @@ def machinery():
reset_grafana = grafana.make_reset()
reset_influxdb_events = influx_events.make_reset_measurement()

device_create_influxdb = device_influx_sensors.make_create_db()
device_reset_influxdb = device_influx_sensors.make_reset_measurement()

machinery_basic = create_machinery('./etc/test/basic.ini')
8 changes: 8 additions & 0 deletions test/settings/mqttkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,17 @@ class TestSettings:
channel_path_event = '/mqttkit-1/itest/foo/bar/event'
channel_path_airrohr = '/mqttkit-1/itest/foo/bar/custom/airrohr'

# Per-device entrypoints.
device_influx_database = 'mqttkit_1_devices'
device_influx_measurement_sensors = 'default_123e4567_e89b_12d3_a456_426614174000_sensors'
device_mqtt_topic_generic = 'mqttkit-1/d/123e4567-e89b-12d3-a456-426614174000/data.json'
device_mqtt_topic_dashed_topo = 'mqttkit-1/dt/itest-foo-bar/data.json'


settings = TestSettings

influx_sensors = InfluxWrapper(database=settings.influx_database, measurement=settings.influx_measurement_sensors)
influx_events = InfluxWrapper(database=settings.influx_database, measurement=settings.influx_measurement_events)
grafana = GrafanaWrapper(settings=settings)

device_influx_sensors = InfluxWrapper(database=settings.device_influx_database, measurement=settings.device_influx_measurement_sensors)
70 changes: 69 additions & 1 deletion test/test_daq_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import requests
from twisted.internet import threads

from test.settings.mqttkit import settings, influx_sensors, PROCESS_DELAY_HTTP
from test.settings.mqttkit import settings, influx_sensors, PROCESS_DELAY_HTTP, device_influx_sensors
from test.util import http_json_sensor, http_form_sensor, http_csv_sensor, sleep, http_raw

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -45,6 +45,74 @@ def test_http_json_valid(machinery, create_influxdb, reset_influxdb):
yield record


@pytest_twisted.inlineCallbacks
@pytest.mark.http
@pytest.mark.device
def test_http_json_wan_device_generic(machinery, device_create_influxdb, device_reset_influxdb):
"""
Run HTTP data acquisition with per-device addressing.
Addressing: Per-device WAN
Example: mqttkit-1/d/123e4567-e89b-12d3-a456-426614174000
"""

# Submit a single measurement, without timestamp.
data = {
'temperature': 25.26,
'humidity': 51.8,
}
deferred = threads.deferToThread(http_json_sensor, settings.device_http_path_generic, data)
yield deferred

# Check response.
response = deferred.result
assert response.status_code == 200
assert response.content == json.dumps([{"type": "info", "message": "Received #1 readings"}], indent=4).encode("utf-8")

# Wait for some time to process the message.
yield sleep(PROCESS_DELAY_HTTP)

# Proof that data arrived in InfluxDB.
record = device_influx_sensors.get_first_record()
del record['time']
assert record == {u'temperature': 25.26, u'humidity': 51.8}
yield record


@pytest_twisted.inlineCallbacks
@pytest.mark.http
@pytest.mark.device
def test_http_json_wan_device_dashed(machinery, create_influxdb, reset_influxdb):
"""
Run MQTT data acquisition with per-device dashed-topo addressing.
Addressing: Per-device WAN, with dashed topology decoding
Example: mqttkit-1/dt/network-gateway-node
"""

# Submit a single measurement, without timestamp.
data = {
'temperature': 25.26,
'humidity': 51.8,
}
deferred = threads.deferToThread(http_json_sensor, settings.device_http_path_dashed_topo, data)
yield deferred

# Check response.
response = deferred.result
assert response.status_code == 200
assert response.content == json.dumps([{"type": "info", "message": "Received #1 readings"}], indent=4).encode("utf-8")

# Wait for some time to process the message.
yield sleep(PROCESS_DELAY_HTTP)

# Proof that data arrived in InfluxDB.
record = influx_sensors.get_first_record()
del record['time']
assert record == {u'temperature': 25.26, u'humidity': 51.8}
yield record


@pytest_twisted.inlineCallbacks
@pytest.mark.http
def test_http_json_invalid(machinery):
Expand Down
67 changes: 66 additions & 1 deletion test/test_daq_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pytest_twisted
from twisted.internet import threads

from test.settings.mqttkit import settings, influx_sensors, PROCESS_DELAY_MQTT
from test.settings.mqttkit import settings, influx_sensors, PROCESS_DELAY_MQTT, device_influx_sensors
from test.util import mqtt_json_sensor, sleep, mqtt_sensor

logger = logging.getLogger(__name__)
Expand All @@ -18,6 +18,9 @@ def test_mqtt_to_influxdb_json_single(machinery, create_influxdb, reset_influxdb
"""
Publish single reading in JSON format to MQTT broker
and proof it is stored in the InfluxDB database.
Addressing: Classic WAN path
Example: mqttkit-1/network/gateway/node
"""

# Submit a single measurement, without timestamp.
Expand All @@ -44,6 +47,9 @@ def test_mqtt_to_influxdb_json_legacy_topic(machinery, create_influxdb, reset_in
"""
Publish single reading in JSON format to MQTT broker on legacy suffix
and proof it is stored in the InfluxDB database.
Addressing: Classic WAN path
Example: mqttkit-1/network/gateway/node
"""

# Submit a single measurement, without timestamp.
Expand All @@ -67,6 +73,9 @@ def test_mqtt_to_influxdb_discrete(machinery, create_influxdb, reset_influxdb):
"""
Publish discrete values to the MQTT broker
and proof they are stored in the InfluxDB database.
Addressing: Classic WAN path + discrete
Example: mqttkit-1/network/gateway/node/data/field
"""

# Submit discrete measurement values, without timestamp.
Expand All @@ -81,3 +90,59 @@ def test_mqtt_to_influxdb_discrete(machinery, create_influxdb, reset_influxdb):
# Proof that data arrived in InfluxDB.
record = influx_sensors.get_first_record()
assert 'temperature' in record or 'humidity' in record


@pytest_twisted.inlineCallbacks
@pytest.mark.mqtt
@pytest.mark.device
def test_mqtt_to_influxdb_json_wan_device_generic(machinery, device_create_influxdb, device_reset_influxdb):
"""
Run MQTT data acquisition with per-device addressing.
Addressing: Per-device WAN
Example: mqttkit-1/d/123e4567-e89b-12d3-a456-426614174000
"""

# Submit a single measurement, without timestamp.
data = {
'temperature': 42.84,
'humidity': 83.1,
}
yield threads.deferToThread(mqtt_json_sensor, settings.device_mqtt_topic_generic, data)

# Wait for some time to process the message.
yield sleep(PROCESS_DELAY_MQTT)

# Proof that data arrived in InfluxDB.
record = device_influx_sensors.get_first_record()
del record['time']
assert record == {u'humidity': 83.1, u'temperature': 42.84}
yield record


@pytest_twisted.inlineCallbacks
@pytest.mark.mqtt
@pytest.mark.device
def test_mqtt_to_influxdb_json_wan_device_dashed(machinery, create_influxdb, reset_influxdb):
"""
Run MQTT data acquisition with per-device dashed-topo addressing.
Addressing: Per-device WAN, with dashed topology decoding
Example: mqttkit-1/dt/network-gateway-node
"""

# Submit a single measurement, without timestamp.
data = {
'temperature': 42.84,
'humidity': 83.1,
}
yield threads.deferToThread(mqtt_json_sensor, settings.device_mqtt_topic_dashed_topo, data)

# Wait for some time to process the message.
yield sleep(PROCESS_DELAY_MQTT)

# Proof that data arrived in InfluxDB.
record = influx_sensors.get_first_record()
del record['time']
assert record == {u'humidity': 83.1, u'temperature': 42.84}
yield record

0 comments on commit b7ea48c

Please sign in to comment.