Skip to content

Commit

Permalink
[DA] Improve usability by using clear path fragment identifier prefixes
Browse files Browse the repository at this point in the history
For the new SensorWAN "direct" addressing scheme, by getting rid of the
previous `/d` vs. `/dt` path fragments, and using `/device` vs.
`/channel` instead, it reduces confusion about the meaning of those
prefixes, and makes usage less error prone.
  • Loading branch information
amotl committed Jun 7, 2023
1 parent 55ed889 commit b11524b
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 46 deletions.
8 changes: 5 additions & 3 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ in progress
- [TTN] Add TTS (The Things Stack) / TTN (The Things Network) decoder.
Thanks, @thiasB and @u-l-m-i.
- [TTN] Decode metadata from full TTN payload. Thanks, @thiasB.
- [DA] Add per-device addressing and topic decoding strategies. Thanks, @thiasB.
``mqttkit-1/d/123e4567-e89b-12d3-a456-426614174000``
``mqttkit-1/dt/network-gateway-node``
- [DA] Add per-device addressing and topic decoding strategies. Thanks,
@thiasB and @ClemensGruber. Examples:

- ``mqttkit-1/device/123e4567-e89b-12d3-a456-426614174000``
- ``mqttkit-1/channel/network-gateway-node``


.. _kotori-0.27.0:
Expand Down
24 changes: 12 additions & 12 deletions kotori/daq/strategy/wan.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
class WanBusStrategy(StrategyBase):

# Regular expression pattern for decoding MQTT topic address segments.
channel_matcher = re.compile(r'^(?P<realm>.+?)/(?P<network>.+?)/(?P<gateway>.+?)/(?P<node>.+?)(?:/(?P<slot>.+?))?$')
device_matcher_dashed_topo = re.compile(r'^(?P<realm>.+?)/dt/(?P<device>.+?)/(?:(?P<slot>.+?))$')
device_matcher_generic = re.compile(r'^(?P<realm>.+?)/d/(?P<device>.+?)/(?:(?P<slot>.+?))$')
wide_channel_matcher = re.compile(r'^(?P<realm>.+?)/(?P<network>.+?)/(?P<gateway>.+?)/(?P<node>.+?)(?:/(?P<slot>.+?))?$')
direct_channel_matcher = re.compile(r'^(?P<realm>.+?)/channel/(?P<channel>.+?)/(?:(?P<slot>.+?))$')
direct_device_matcher = re.compile(r'^(?P<realm>.+?)/device/(?P<device>.+?)/(?:(?P<slot>.+?))$')

def topic_to_topology(self, topic):
"""
Expand Down Expand Up @@ -48,20 +48,20 @@ def topic_to_topology(self, topic):
the decoder now also knows how to handle per-device addressing schemes, like
mqttkit-1/d/123e4567-e89b-12d3-a456-426614174000
mqttkit-1/device/123e4567-e89b-12d3-a456-426614174000
Also, it has another special topic decoding scheme, by decomposing a dashed
identifier, and transforming it into the quadruple hierarchy.
mqttkit-1/dt/network-gateway-node
mqttkit-1/channel/network-gateway-node
"""

# Decode the topic.
address = None

# Try to match the per-device pattern.
m = self.device_matcher_generic.match(topic)
m = self.direct_device_matcher.match(topic)
if m:
address = SmartMunch(m.groupdict())
if "device" in address:
Expand All @@ -72,11 +72,11 @@ def topic_to_topology(self, topic):

# Try to match the per-device pattern with dashed topology encoding for topics.
if address is None:
m = self.device_matcher_dashed_topo.match(topic)
m = self.direct_channel_matcher.match(topic)
if m:
address = SmartMunch(m.groupdict())
if "device" in address:
segments = address.device.split("-")
if "channel" in address:
segments = address.channel.split("-")

# Compensate "too few segments": Fill up with "default" at the front.
missing_segments = 3 - len(segments)
Expand All @@ -94,13 +94,13 @@ def topic_to_topology(self, topic):
# Destructure three components / segments.
address.network, address.gateway, address.node = segments

# Do not propagate the `device` slot. It either has been
# Do not propagate the `channel` slot. It either has been
# dissolved, or it was propagated into the `node` slot.
del address.device
del address.channel

# Try to match the classic path-based WAN topic encoding scheme.
if address is None:
m = self.channel_matcher.match(topic)
m = self.wide_channel_matcher.match(topic)
if m:
address = SmartMunch(m.groupdict())

Expand Down
14 changes: 7 additions & 7 deletions test/settings/mqttkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ class TestSettings:
channel_path_airrohr = '/mqttkit-1/itest/foo/bar/custom/airrohr'

# Per-device entrypoints.
device_influx_database = 'mqttkit_1_devices'
device_influx_measurement_sensors = 'default_123e4567_e89b_12d3_a456_426614174000_sensors'
device_mqtt_topic_generic = 'mqttkit-1/d/123e4567-e89b-12d3-a456-426614174000/data.json'
device_mqtt_topic_dashed_topo = 'mqttkit-1/dt/itest-foo-bar/data.json'
device_http_path_generic = '/mqttkit-1/d/123e4567-e89b-12d3-a456-426614174000/data'
device_http_path_dashed_topo = '/mqttkit-1/dt/itest-foo-bar/data'
direct_influx_database = 'mqttkit_1_devices'
direct_influx_measurement_sensors = 'default_123e4567_e89b_12d3_a456_426614174000_sensors'
direct_mqtt_topic_device = 'mqttkit-1/device/123e4567-e89b-12d3-a456-426614174000/data.json'
direct_mqtt_topic_channel = 'mqttkit-1/channel/itest-foo-bar/data.json'
direct_http_path_device = '/mqttkit-1/device/123e4567-e89b-12d3-a456-426614174000/data'
direct_http_path_channel = '/mqttkit-1/channel/itest-foo-bar/data'


settings = TestSettings
Expand All @@ -50,4 +50,4 @@ class TestSettings:
influx_events = InfluxWrapper(database=settings.influx_database, measurement=settings.influx_measurement_events)
grafana = GrafanaWrapper(settings=settings)

device_influx_sensors = InfluxWrapper(database=settings.device_influx_database, measurement=settings.device_influx_measurement_sensors)
device_influx_sensors = InfluxWrapper(database=settings.direct_influx_database, measurement=settings.direct_influx_measurement_sensors)
16 changes: 8 additions & 8 deletions test/test_daq_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,20 @@ def test_http_json_valid(machinery, create_influxdb, reset_influxdb):
@pytest_twisted.inlineCallbacks
@pytest.mark.http
@pytest.mark.device
def test_http_json_wan_device_generic(machinery, device_create_influxdb, device_reset_influxdb):
def test_http_json_wan_device(machinery, device_create_influxdb, device_reset_influxdb):
"""
Run HTTP data acquisition with per-device addressing.
Addressing: Per-device WAN
Example: mqttkit-1/d/123e4567-e89b-12d3-a456-426614174000
Addressing: SensorWAN direct-device
Example: mqttkit-1/device/123e4567-e89b-12d3-a456-426614174000
"""

# Submit a single measurement, without timestamp.
data = {
'temperature': 25.26,
'humidity': 51.8,
}
deferred = threads.deferToThread(http_json_sensor, settings.device_http_path_generic, data)
deferred = threads.deferToThread(http_json_sensor, settings.direct_http_path_device, data)
yield deferred

# Check response.
Expand All @@ -82,20 +82,20 @@ def test_http_json_wan_device_generic(machinery, device_create_influxdb, device_
@pytest_twisted.inlineCallbacks
@pytest.mark.http
@pytest.mark.device
def test_http_json_wan_device_dashed(machinery, create_influxdb, reset_influxdb):
def test_http_json_wan_channel(machinery, create_influxdb, reset_influxdb):
"""
Run MQTT data acquisition with per-device dashed-topo addressing.
Addressing: Per-device WAN, with dashed topology decoding
Example: mqttkit-1/dt/network-gateway-node
Addressing: SensorWAN direct-channel, with dashed topology decoding
Example: mqttkit-1/channel/network-gateway-node
"""

# Submit a single measurement, without timestamp.
data = {
'temperature': 25.26,
'humidity': 51.8,
}
deferred = threads.deferToThread(http_json_sensor, settings.device_http_path_dashed_topo, data)
deferred = threads.deferToThread(http_json_sensor, settings.direct_http_path_channel, data)
yield deferred

# Check response.
Expand Down
14 changes: 7 additions & 7 deletions test/test_daq_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,20 @@ def test_mqtt_to_influxdb_discrete(machinery, create_influxdb, reset_influxdb):
@pytest_twisted.inlineCallbacks
@pytest.mark.mqtt
@pytest.mark.device
def test_mqtt_to_influxdb_json_wan_device_generic(machinery, device_create_influxdb, device_reset_influxdb):
def test_mqtt_to_influxdb_json_wan_device(machinery, device_create_influxdb, device_reset_influxdb):
"""
Run MQTT data acquisition with per-device addressing.
Addressing: Per-device WAN
Example: mqttkit-1/d/123e4567-e89b-12d3-a456-426614174000
Addressing: SensorWAN direct-device
Example: mqttkit-1/device/123e4567-e89b-12d3-a456-426614174000
"""

# Submit a single measurement, without timestamp.
data = {
'temperature': 42.84,
'humidity': 83.1,
}
yield threads.deferToThread(mqtt_json_sensor, settings.device_mqtt_topic_generic, data)
yield threads.deferToThread(mqtt_json_sensor, settings.direct_mqtt_topic_device, data)

# Wait for some time to process the message.
yield sleep(PROCESS_DELAY_MQTT)
Expand All @@ -123,20 +123,20 @@ def test_mqtt_to_influxdb_json_wan_device_generic(machinery, device_create_influ
@pytest_twisted.inlineCallbacks
@pytest.mark.mqtt
@pytest.mark.device
def test_mqtt_to_influxdb_json_wan_device_dashed(machinery, create_influxdb, reset_influxdb):
def test_mqtt_to_influxdb_json_wan_channel(machinery, create_influxdb, reset_influxdb):
"""
Run MQTT data acquisition with per-device dashed-topo addressing.
Addressing: Per-device WAN, with dashed topology decoding
Example: mqttkit-1/dt/network-gateway-node
Example: mqttkit-1/channel/network-gateway-node
"""

# Submit a single measurement, without timestamp.
data = {
'temperature': 42.84,
'humidity': 83.1,
}
yield threads.deferToThread(mqtt_json_sensor, settings.device_mqtt_topic_dashed_topo, data)
yield threads.deferToThread(mqtt_json_sensor, settings.direct_mqtt_topic_channel, data)

# Wait for some time to process the message.
yield sleep(PROCESS_DELAY_MQTT)
Expand Down
18 changes: 9 additions & 9 deletions test/test_wan_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@


@pytest.mark.strategy
def test_wan_strategy_channel():
def test_wan_strategy_wide_channel():
"""
Verify the classic WAN topology decoding, using a channel-based addressing scheme.
"""
Expand All @@ -30,7 +30,7 @@ def test_wan_strategy_device_generic_success():
Verify the per-device WAN topology decoding, using a generic device identifier.
"""
strategy = WanBusStrategy()
topology = strategy.topic_to_topology("myrealm/d/123e4567-e89b-12d3-a456-426614174000/data.json")
topology = strategy.topic_to_topology("myrealm/device/123e4567-e89b-12d3-a456-426614174000/data.json")
assert topology == SmartMunch(
{
"realm": "myrealm",
Expand All @@ -48,7 +48,7 @@ def test_wan_strategy_device_dashed_topo_basic():
Verify the per-device WAN topology decoding, using a dashed device identifier, which translates to the topology.
"""
strategy = WanBusStrategy()
topology = strategy.topic_to_topology("myrealm/dt/acme-area42-eui70b3d57ed005dac6/data.json")
topology = strategy.topic_to_topology("myrealm/channel/acme-area42-eui70b3d57ed005dac6/data.json")
assert topology == SmartMunch(
{
"realm": "myrealm",
Expand All @@ -69,7 +69,7 @@ def test_wan_strategy_device_dashed_topo_too_few_components():
This specific test uses a vanilla TTN device identifier.
"""
strategy = WanBusStrategy()
topology = strategy.topic_to_topology("myrealm/dt/eui-70b3d57ed005dac6/data.json")
topology = strategy.topic_to_topology("myrealm/channel/eui-70b3d57ed005dac6/data.json")
assert topology == SmartMunch(
{
"realm": "myrealm",
Expand All @@ -88,7 +88,7 @@ def test_wan_strategy_device_dashed_topo_three_components():
This topic will be decoded as-is.
"""
strategy = WanBusStrategy()
topology = strategy.topic_to_topology("myrealm/dt/acme-area42-eui70b3d57ed005dac6/data.json")
topology = strategy.topic_to_topology("myrealm/channel/acme-area42-eui70b3d57ed005dac6/data.json")
assert topology == SmartMunch(
{
"realm": "myrealm",
Expand All @@ -107,7 +107,7 @@ def test_wan_strategy_device_dashed_topo_too_many_components_merge_suffixes():
The solution is to merge all trailing segments into the `node` slot, re-joining them with `-`.
"""
strategy = WanBusStrategy()
topology = strategy.topic_to_topology("myrealm/dt/acme-area42-eui70b3d57ed005dac6-suffix/data.json")
topology = strategy.topic_to_topology("myrealm/channel/acme-area42-eui70b3d57ed005dac6-suffix/data.json")
assert topology == SmartMunch(
{
"realm": "myrealm",
Expand All @@ -132,7 +132,7 @@ def test_wan_strategy_device_dashed_topo_too_many_components_redundant_realm():
Cheers, @thiasB.
"""
strategy = WanBusStrategy()
topology = strategy.topic_to_topology("myrealm/dt/myrealm-acme-area42-eui70b3d57ed005dac6/data.json")
topology = strategy.topic_to_topology("myrealm/channel/myrealm-acme-area42-eui70b3d57ed005dac6/data.json")
assert topology == SmartMunch(
{
"realm": "myrealm",
Expand All @@ -151,7 +151,7 @@ def test_wan_strategy_device_generic_empty():
Topic-to-topology decoding should return `None`.
"""
strategy = WanBusStrategy()
topology = strategy.topic_to_topology("myrealm/d/data.json")
topology = strategy.topic_to_topology("myrealm/device/data.json")
assert topology is None


Expand All @@ -162,5 +162,5 @@ def test_wan_strategy_device_dashed_topo_empty():
Topic-to-topology decoding should return `None`.
"""
strategy = WanBusStrategy()
topology = strategy.topic_to_topology("myrealm/dt/data.json")
topology = strategy.topic_to_topology("myrealm/channel/data.json")
assert topology is None

0 comments on commit b11524b

Please sign in to comment.