Skip to content

Commit b5dc2fc

Browse files
dcjclaude
andcommitted
Normalize MQTT URI schemes from /notifiers (Postel's Law)
normalize_broker_uri now accepts bare hosts (broker.example.com), host:port forms, and strips unknown schemes — previously these either misparsed or were rejected. extract_mqtt_broker_uris() handles both the spec notifiersResponse shape ({"MQTT": {"URIS": [...]}}) and the VTN-RI list shape ([{"transport": "MQTT", "url": ...}]) with field-name tolerance. vtn_supports_mqtt() rewritten to recognize both shapes properly instead of substring-matching the str() of the response. Closes OA3C-7q7. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 71d43f8 commit b5dc2fc

6 files changed

Lines changed: 241 additions & 15 deletions

File tree

README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,8 +208,20 @@ pid = ven.resolve_program_id("residential-pricing")
208208
```python
209209
notifiers = ven.discover_notifiers()
210210
supports_mqtt = ven.vtn_supports_mqtt()
211+
212+
# Extract broker URIs from /notifiers — handles both spec
213+
# ({"MQTT": {"URIS": [...]}}) and VTN-RI ([{"transport": "MQTT", "url": ...}])
214+
# response shapes. Returns [] if MQTT isn't advertised.
215+
uris = ven.get_mqtt_broker_uris()
216+
if uris:
217+
mqtt = ven.add_mqtt(uris[0])
211218
```
212219

220+
URIs from `/notifiers` may use any of the `mqtt://`, `mqtts://`, `tcp://`, or
221+
`ssl://` schemes (or no scheme at all — `broker.example.com:1883`).
222+
`MqttChannel` and `MQTTConnection` accept all of these via
223+
`normalize_broker_uri`.
224+
213225
### VEN-scoped topic methods
214226

215227
Default to the registered `ven_id` when called without arguments:

src/openadr3_client/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,12 @@
66
advertise_vtn,
77
discover_vtns,
88
)
9-
from openadr3_client.mqtt import MQTTConnection, MQTTMessage, normalize_broker_uri
9+
from openadr3_client.mqtt import (
10+
MQTTConnection,
11+
MQTTMessage,
12+
extract_mqtt_broker_uris,
13+
normalize_broker_uri,
14+
)
1015
from openadr3_client.notifications import (
1116
MqttChannel,
1217
NotificationChannel,
@@ -37,5 +42,6 @@
3742
# Helpers
3843
"extract_topics",
3944
"normalize_broker_uri",
45+
"extract_mqtt_broker_uris",
4046
"detect_lan_ip",
4147
]

src/openadr3_client/mqtt.py

Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,83 @@
2424
log = logging.getLogger(__name__)
2525

2626

27+
_KNOWN_MQTT_SCHEMES = frozenset({"mqtt", "mqtts", "tcp", "ssl"})
28+
_TLS_MQTT_SCHEMES = frozenset({"mqtts", "ssl"})
29+
30+
2731
def normalize_broker_uri(uri: str) -> tuple[str, int, bool]:
2832
"""Translate an MQTT URI into (host, port, use_tls).
2933
30-
Supports mqtt://, mqtts://, tcp://, ssl:// schemes.
31-
Adds default ports (1883 for plain, 8883 for TLS) when omitted.
34+
Liberal in what is accepted (Postel's Law):
35+
36+
- Recognized schemes: ``mqtt://`` and ``tcp://`` (plain, default 1883),
37+
``mqtts://`` and ``ssl://`` (TLS, default 8883). Case-insensitive.
38+
- Bare or unknown-scheme inputs (``broker.example.com``,
39+
``broker.example.com:1883``) are interpreted as plain MQTT.
3240
"""
3341
parsed = urlparse(uri)
3442
scheme = parsed.scheme.lower()
43+
if scheme not in _KNOWN_MQTT_SCHEMES:
44+
# Unknown or missing scheme — strip any "<scheme>://" prefix and
45+
# re-parse as plain MQTT. Be liberal in what we accept.
46+
rest = uri.split("://", 1)[1] if "://" in uri else uri
47+
parsed = urlparse(f"mqtt://{rest}")
48+
scheme = "mqtt"
49+
3550
host = parsed.hostname or "127.0.0.1"
51+
use_tls = scheme in _TLS_MQTT_SCHEMES
52+
port = parsed.port or (8883 if use_tls else 1883)
53+
return host, port, use_tls
3654

37-
if scheme in ("mqtts", "ssl"):
38-
use_tls = True
39-
port = parsed.port or 8883
40-
else:
41-
use_tls = False
42-
port = parsed.port or 1883
4355

44-
return host, port, use_tls
56+
def extract_mqtt_broker_uris(notifiers: Any) -> list[str]:
57+
"""Extract MQTT broker URIs from a ``/notifiers`` response.
58+
59+
Accepts both response shapes seen in the wild:
60+
61+
- **Spec shape** (`notifiersResponse`): ``{"WEBHOOK": true, "MQTT": {"URIS": [...], ...}}``
62+
- **VTN-RI shape**: ``[{"transport": "MQTT", "url": "..."}, ...]``
63+
64+
Returns an empty list if no MQTT URI is advertised. Schemes are returned
65+
as-is (callers should pass them through :func:`normalize_broker_uri`).
66+
"""
67+
if not notifiers:
68+
return []
69+
70+
if isinstance(notifiers, dict):
71+
mqtt = notifiers.get("MQTT") or notifiers.get("mqtt")
72+
if not isinstance(mqtt, dict):
73+
return []
74+
uris = mqtt.get("URIS") or mqtt.get("uris")
75+
if isinstance(uris, list):
76+
return [u for u in uris if isinstance(u, str)]
77+
single = mqtt.get("URI") or mqtt.get("uri") or mqtt.get("url") or mqtt.get("broker")
78+
return [single] if isinstance(single, str) else []
79+
80+
if isinstance(notifiers, list):
81+
result: list[str] = []
82+
for item in notifiers:
83+
if not isinstance(item, dict):
84+
continue
85+
transport = (item.get("transport") or item.get("Transport") or "").upper()
86+
if transport and transport != "MQTT":
87+
continue
88+
uri = (
89+
item.get("url")
90+
or item.get("uri")
91+
or item.get("URI")
92+
or item.get("URL")
93+
or item.get("broker")
94+
or item.get("endpoint")
95+
)
96+
if isinstance(uri, str):
97+
result.append(uri)
98+
uris = item.get("URIS") or item.get("uris")
99+
if isinstance(uris, list):
100+
result.extend(u for u in uris if isinstance(u, str))
101+
return result
102+
103+
return []
45104

46105

47106
def _parse_payload(raw: bytes, topic: str) -> Any:

src/openadr3_client/ven.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from openadr3.entities.models import Event, Program
1212

1313
from openadr3_client.base import BaseClient
14+
from openadr3_client.mqtt import extract_mqtt_broker_uris
1415
from openadr3_client.notifications import (
1516
MqttChannel,
1617
NotificationChannel,
@@ -130,15 +131,32 @@ def discover_notifiers(self) -> dict[str, Any] | None:
130131
return None
131132

132133
def vtn_supports_mqtt(self) -> bool:
133-
"""Check if the VTN advertises MQTT notification support."""
134+
"""Check if the VTN advertises MQTT notification support.
135+
136+
Handles both the spec ``notifiersResponse`` dict (presence of an
137+
``MQTT`` key with a non-null binding object) and the VTN-RI list
138+
shape (``[{"transport": "MQTT", ...}, ...]``).
139+
"""
134140
notifiers = self.discover_notifiers()
135141
if not notifiers:
136142
return False
137-
# VTN-RI returns a list of notifier dicts with "transport" field
143+
if isinstance(notifiers, dict):
144+
mqtt = notifiers.get("MQTT") or notifiers.get("mqtt")
145+
return bool(mqtt)
138146
if isinstance(notifiers, list):
139-
return any(n.get("transport", "").upper() == "MQTT" for n in notifiers)
140-
# Or it might be a dict with transport info
141-
return "mqtt" in str(notifiers).lower()
147+
return any(
148+
isinstance(n, dict) and (n.get("transport") or "").upper() == "MQTT"
149+
for n in notifiers
150+
)
151+
return False
152+
153+
def get_mqtt_broker_uris(self) -> list[str]:
154+
"""Return MQTT broker URIs the VTN advertises via ``/notifiers``.
155+
156+
Empty if MQTT is not advertised. Use :func:`normalize_broker_uri`
157+
before passing to a connection if you need (host, port, use_tls).
158+
"""
159+
return extract_mqtt_broker_uris(self.discover_notifiers())
142160

143161
# -- Channel management --
144162

tests/test_mqtt.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from openadr3_client.mqtt import (
99
MQTTConnection,
1010
_parse_payload,
11+
extract_mqtt_broker_uris,
1112
normalize_broker_uri,
1213
)
1314

@@ -31,6 +32,83 @@ def test_tcp_scheme(self):
3132
def test_ssl_scheme(self):
3233
assert normalize_broker_uri("ssl://broker.local") == ("broker.local", 8883, True)
3334

35+
def test_uppercase_scheme(self):
36+
assert normalize_broker_uri("MQTTS://broker.local") == ("broker.local", 8883, True)
37+
38+
def test_bare_host(self):
39+
assert normalize_broker_uri("broker.example.com") == ("broker.example.com", 1883, False)
40+
41+
def test_bare_host_with_port(self):
42+
assert normalize_broker_uri("broker.example.com:9883") == (
43+
"broker.example.com",
44+
9883,
45+
False,
46+
)
47+
48+
def test_bare_ip_with_port(self):
49+
assert normalize_broker_uri("127.0.0.1:1883") == ("127.0.0.1", 1883, False)
50+
51+
def test_unknown_scheme_treated_as_plain(self):
52+
# An unrecognized scheme falls back to mqtt:// — be liberal in what we accept.
53+
assert normalize_broker_uri("foo://broker.local:1234") == ("broker.local", 1234, False)
54+
55+
56+
class TestExtractMqttBrokerUris:
57+
def test_spec_dict_shape(self):
58+
notifiers = {
59+
"WEBHOOK": True,
60+
"MQTT": {
61+
"URIS": ["mqtts://broker.vtn.example.com", "mqtt://broker.vtn.example.com:1883"],
62+
"serialization": "JSON",
63+
"authentication": {},
64+
},
65+
}
66+
assert extract_mqtt_broker_uris(notifiers) == [
67+
"mqtts://broker.vtn.example.com",
68+
"mqtt://broker.vtn.example.com:1883",
69+
]
70+
71+
def test_spec_dict_shape_lowercase_keys(self):
72+
notifiers = {"mqtt": {"uris": ["mqtt://broker:1883"]}}
73+
assert extract_mqtt_broker_uris(notifiers) == ["mqtt://broker:1883"]
74+
75+
def test_spec_dict_shape_no_mqtt(self):
76+
assert extract_mqtt_broker_uris({"WEBHOOK": True}) == []
77+
78+
def test_spec_dict_shape_single_uri_field(self):
79+
notifiers = {"MQTT": {"uri": "mqtt://broker:1883"}}
80+
assert extract_mqtt_broker_uris(notifiers) == ["mqtt://broker:1883"]
81+
82+
def test_vtn_ri_list_shape(self):
83+
notifiers = [
84+
{"transport": "MQTT", "url": "mqtt://broker:1883"},
85+
{"transport": "WEBHOOK", "url": "https://example.com"},
86+
]
87+
assert extract_mqtt_broker_uris(notifiers) == ["mqtt://broker:1883"]
88+
89+
def test_vtn_ri_list_uri_field_variants(self):
90+
# Different VTNs may use different field names for the broker URI.
91+
for field in ("url", "uri", "URI", "URL", "broker", "endpoint"):
92+
notifiers = [{"transport": "MQTT", field: "mqtt://broker:1883"}]
93+
assert extract_mqtt_broker_uris(notifiers) == ["mqtt://broker:1883"], field
94+
95+
def test_vtn_ri_list_with_uris_array_in_item(self):
96+
notifiers = [
97+
{"transport": "MQTT", "URIS": ["mqtts://b1:8883", "tcp://b2:1883"]},
98+
]
99+
assert extract_mqtt_broker_uris(notifiers) == ["mqtts://b1:8883", "tcp://b2:1883"]
100+
101+
def test_empty_inputs(self):
102+
assert extract_mqtt_broker_uris(None) == []
103+
assert extract_mqtt_broker_uris({}) == []
104+
assert extract_mqtt_broker_uris([]) == []
105+
106+
def test_normalizable_tcp_scheme_round_trip(self):
107+
# The whole point: VTN advertises tcp://, we accept it through to normalization.
108+
uris = extract_mqtt_broker_uris([{"transport": "MQTT", "url": "tcp://broker:1883"}])
109+
assert uris == ["tcp://broker:1883"]
110+
assert normalize_broker_uri(uris[0]) == ("broker", 1883, False)
111+
34112

35113
class TestParsePayload:
36114
def test_json_object(self):

tests/test_ven.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,59 @@ def test_vtn_supports_mqtt_error(self, mock_create):
196196
with VenClient(url="http://test", token="tok") as ven:
197197
assert ven.vtn_supports_mqtt() is False
198198

199+
@patch("openadr3_client.base.create_ven_client")
200+
def test_vtn_supports_mqtt_spec_dict_shape(self, mock_create):
201+
# Spec notifiersResponse: dict with MQTT binding object
202+
mock_api = MagicMock()
203+
mock_api.get_notifiers.return_value = _make_response(
204+
200,
205+
{
206+
"WEBHOOK": True,
207+
"MQTT": {
208+
"URIS": ["mqtts://broker.vtn.example.com"],
209+
"serialization": "JSON",
210+
"authentication": {},
211+
},
212+
},
213+
)
214+
mock_create.return_value = mock_api
215+
216+
with VenClient(url="http://test", token="tok") as ven:
217+
assert ven.vtn_supports_mqtt() is True
218+
219+
@patch("openadr3_client.base.create_ven_client")
220+
def test_vtn_supports_mqtt_spec_dict_no_mqtt(self, mock_create):
221+
mock_api = MagicMock()
222+
mock_api.get_notifiers.return_value = _make_response(200, {"WEBHOOK": True})
223+
mock_create.return_value = mock_api
224+
225+
with VenClient(url="http://test", token="tok") as ven:
226+
assert ven.vtn_supports_mqtt() is False
227+
228+
@patch("openadr3_client.base.create_ven_client")
229+
def test_get_mqtt_broker_uris_vtn_ri_shape(self, mock_create):
230+
mock_api = MagicMock()
231+
mock_api.get_notifiers.return_value = _make_response(
232+
200,
233+
[{"transport": "MQTT", "url": "tcp://broker:1883"}],
234+
)
235+
mock_create.return_value = mock_api
236+
237+
with VenClient(url="http://test", token="tok") as ven:
238+
assert ven.get_mqtt_broker_uris() == ["tcp://broker:1883"]
239+
240+
@patch("openadr3_client.base.create_ven_client")
241+
def test_get_mqtt_broker_uris_spec_shape(self, mock_create):
242+
mock_api = MagicMock()
243+
mock_api.get_notifiers.return_value = _make_response(
244+
200,
245+
{"MQTT": {"URIS": ["mqtts://b:8883"]}},
246+
)
247+
mock_create.return_value = mock_api
248+
249+
with VenClient(url="http://test", token="tok") as ven:
250+
assert ven.get_mqtt_broker_uris() == ["mqtts://b:8883"]
251+
199252

200253
class TestVenClientMqttTopics:
201254
@patch("openadr3_client.base.create_ven_client")

0 commit comments

Comments
 (0)