diff --git a/hololinked/client/mqtt/consumed_interactions.py b/hololinked/client/mqtt/consumed_interactions.py index b1b67d26..7ca9c7a7 100644 --- a/hololinked/client/mqtt/consumed_interactions.py +++ b/hololinked/client/mqtt/consumed_interactions.py @@ -30,7 +30,7 @@ def __init__( def listen(self, form: Form, callbacks: list[Callable], concurrent: bool, deserialize: bool) -> None: # This method is called from a different thread but also finishes quickly, we wont redo this way # for the time being. - topic = f"{self.resource.thing_id}/{self.resource.name}" + topic = form.mqv_topic or f"{self.resource.thing_id}/{self.resource.name}" def on_topic_message(client: PahoMQTTClient, userdata, message: MQTTMessage): try: @@ -57,7 +57,7 @@ def on_topic_message(client: PahoMQTTClient, userdata, message: MQTTMessage): self.sync_client.message_callback_add(topic, on_topic_message) async def async_listen(self, form: Form, callbacks: list[Callable], concurrent: bool, deserialize: bool) -> None: - topic = f"{self.resource.thing_id}/{self.resource.name}" + topic = form.mqv_topic or f"{self.resource.thing_id}/{self.resource.name}" try: await self.async_client.__aenter__() except aiomqtt.MqttReentrantError: diff --git a/hololinked/server/mqtt.py b/hololinked/server/mqtt.py index 488aa35e..1e3ecabc 100644 --- a/hololinked/server/mqtt.py +++ b/hololinked/server/mqtt.py @@ -5,11 +5,13 @@ import copy import ssl + from ..utils import get_current_async_loop from .utils import consume_broker_queue, consume_broker_pubsub_per_event from ..config import global_config +from ..constants import Operations from ..serializers import Serializers -from ..td.interaction_affordance import EventAffordance +from ..td.interaction_affordance import EventAffordance, PropertyAffordance from ..param.parameters import Selector, String, Integer, ClassSelector, List @@ -149,23 +151,23 @@ async def publish_thing_description(self, ZMQ_TD: dict[str, Any]) -> dict[str, A TD.pop("actions", None) # remove properties that are not observable for name in ZMQ_TD.get("properties", {}).keys(): - if not TD["properties"][name].get("observable", False): + affordance = PropertyAffordance.from_TD(name, ZMQ_TD) + if not affordance.observable: TD["properties"].pop(name) continue - forms = TD["properties"][name].pop("forms", []) TD["properties"][name]["forms"] = [] - for form in forms: - if form["op"] == "observeproperty": - form["href"] = f"mqtt://{self.hostname}:{self.port}" - TD["properties"][name]["forms"].append(form) + form = affordance.retrieve_form(Operations.observeproperty) + form.href = f"mqtt{'s' if self.ssl_context else ''}://{self.hostname}:{self.port}" + form.mqv_topic = f"{TD['id']}/{name}" + TD["properties"][name]["forms"].append(form.json()) # repurpose event for name in ZMQ_TD.get("events", {}).keys(): - forms = TD["events"][name].pop("forms", []) + affordance = EventAffordance.from_TD(name, ZMQ_TD) TD["events"][name]["forms"] = [] - for form in forms: - if form["op"] == "subscribeevent": - form["href"] = f"mqtt://{self.hostname}:{self.port}" - TD["events"][name]["forms"].append(form) + form = affordance.retrieve_form(Operations.subscribeevent) + form.href = f"mqtt{'s' if self.ssl_context else ''}://{self.hostname}:{self.port}" + form.mqv_topic = f"{TD['id']}/{name}" + TD["events"][name]["forms"].append(form.json()) await self.client.publish( topic=f"{TD['id']}/thing-description", payload=Serializers.json.dumps(TD), diff --git a/hololinked/td/forms.py b/hololinked/td/forms.py index 1c21a59c..0d2ec91c 100644 --- a/hololinked/td/forms.py +++ b/hololinked/td/forms.py @@ -39,6 +39,7 @@ class Form(Schema): href: str = None op: str = None htv_methodName: str = Field(default=None, alias="htv:methodName") + mqv_topic: str = Field(default=None, alias="mqv:topic") contentType: typing.Optional[str] = "application/json" additionalResponses: typing.Optional[typing.List[AdditionalExpectedResponse]] = None contentEncoding: typing.Optional[str] = None @@ -61,6 +62,8 @@ def from_TD(cls, form_json: typing.Dict[str, typing.Any]) -> "Form": for field in cls.model_fields: if field == "htv_methodName" and "htv:methodName" in form_json: setattr(form, field, form_json["htv:methodName"]) + elif field == "mqv_topic" and "mqv:topic" in form_json: + setattr(form, field, form_json["mqv:topic"]) elif field in form_json: setattr(form, field, form_json[field]) return form diff --git a/tests/helper-scripts/client.ipynb b/tests/helper-scripts/client.ipynb index b17048f0..bc5cc927 100644 --- a/tests/helper-scripts/client.ipynb +++ b/tests/helper-scripts/client.ipynb @@ -2048,6 +2048,7 @@ " 'observable': True,\n", " 'forms': [{'href': 'mqtt://localhost:8883',\n", " 'op': 'observeproperty',\n", + " 'mqv:topic': 'example-test/observable_list_prop',\n", " 'contentType': 'application/json'}]},\n", " 'observable_readonly_prop': {'description': 'An observable readonly property to check observable events on read operations',\n", " 'default': 0,\n", @@ -2056,6 +2057,7 @@ " 'observable': True,\n", " 'forms': [{'href': 'mqtt://localhost:8883',\n", " 'op': 'observeproperty',\n", + " 'mqv:topic': 'example-test/observable_readonly_prop',\n", " 'contentType': 'application/json'}]},\n", " 'sleeping_prop': {'description': 'A property that sleeps for 10 seconds on read operations',\n", " 'default': 0,\n", @@ -2064,6 +2066,7 @@ " 'observable': True,\n", " 'forms': [{'href': 'mqtt://localhost:8883',\n", " 'op': 'observeproperty',\n", + " 'mqv:topic': 'example-test/sleeping_prop',\n", " 'contentType': 'application/json'}]}},\n", " 'events': {'data_point_event': {'description': 'Event raised when a new data point is available',\n", " 'data': {'type': 'object',\n", @@ -2072,26 +2075,32 @@ " 'required': ['timestamp', 'energy']},\n", " 'forms': [{'href': 'mqtt://localhost:8883',\n", " 'op': 'subscribeevent',\n", + " 'mqv:topic': 'example-test/data_point_event',\n", " 'contentType': 'application/json'}]},\n", " 'test_binary_payload_event': {'description': 'test event with binary payload',\n", " 'forms': [{'href': 'mqtt://localhost:8883',\n", " 'op': 'subscribeevent',\n", + " 'mqv:topic': 'example-test/test_binary_payload_event',\n", " 'contentType': 'application/json'}]},\n", " 'test_event': {'description': 'test event with arbitrary payload',\n", " 'forms': [{'href': 'mqtt://localhost:8883',\n", " 'op': 'subscribeevent',\n", + " 'mqv:topic': 'example-test/test_event',\n", " 'contentType': 'application/json'}]},\n", " 'test_event_with_json_schema': {'description': 'test event with schema validation',\n", " 'forms': [{'href': 'mqtt://localhost:8883',\n", " 'op': 'subscribeevent',\n", + " 'mqv:topic': 'example-test/test_event_with_json_schema',\n", " 'contentType': 'application/json'}]},\n", " 'test_event_with_pydantic_schema': {'description': 'test event with pydantic schema validation',\n", " 'forms': [{'href': 'mqtt://localhost:8883',\n", " 'op': 'subscribeevent',\n", + " 'mqv:topic': 'example-test/test_event_with_pydantic_schema',\n", " 'contentType': 'application/json'}]},\n", " 'test_mixed_content_payload_event': {'description': 'test event with mixed content payload',\n", " 'forms': [{'href': 'mqtt://localhost:8883',\n", " 'op': 'subscribeevent',\n", + " 'mqv:topic': 'example-test/test_mixed_content_payload_event',\n", " 'contentType': 'application/json'}]}}}" ] }, @@ -2119,7 +2128,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": null, "id": "335296e0", "metadata": {}, "outputs": [], @@ -2141,7 +2150,7 @@ "\n", "# object_proxy.subscribe_event(\"test_event\", cb)\n", "# object_proxy.subscribe_event(\"test_event\", [cb1, cb2], concurrent=True)\n", - "# object_proxy.subscribe_event(\"test_event\", [async_cb1, async_cb2], asynch=True)\n", + "object_proxy_mqtt.subscribe_event(\"test_event\", [async_cb1, async_cb2], asynch=True)\n", "# object_proxy.subscribe_event(\"test_event\", [async_cb1, async_cb2], asynch=True, concurrent=True)" ] }, diff --git a/tests/helper-scripts/run_test_thing.py b/tests/helper-scripts/run_test_thing.py index 78dcc898..9e674222 100644 --- a/tests/helper-scripts/run_test_thing.py +++ b/tests/helper-scripts/run_test_thing.py @@ -15,8 +15,8 @@ global_config.DEBUG = True -# thing = TestThing(id="example-test") -thing = OceanOpticsSpectrometer(id="example-test", serial_number="simulation") +thing = TestThing(id="example-test") +# thing = OceanOpticsSpectrometer(id="example-test", serial_number="simulation") Serializers.register_for_object(TestThing.db_init_int_prop, Serializers.pickle) @@ -35,13 +35,13 @@ http_server = HTTPServer(port=9000) zmq_server = ZMQServer(id="example-test-server", things=[thing], access_points="IPC") -# mqtt_ssl = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) -# if not os.path.exists("ca.crt"): -# raise FileNotFoundError("CA certificate 'ca.crt' not found in current directory for MQTT TLS connection") -# mqtt_ssl.load_verify_locations(cafile="ca.crt") -# mqtt_ssl.check_hostname = True -# mqtt_ssl.verify_mode = ssl.CERT_REQUIRED -# mqtt_ssl.minimum_version = ssl.TLSVersion.TLSv1_2 +mqtt_ssl = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) +if not os.path.exists("ca.crt"): + raise FileNotFoundError("CA certificate 'ca.crt' not found in current directory for MQTT TLS connection") +mqtt_ssl.load_verify_locations(cafile="ca.crt") +mqtt_ssl.check_hostname = True +mqtt_ssl.verify_mode = ssl.CERT_REQUIRED +mqtt_ssl.minimum_version = ssl.TLSVersion.TLSv1_2 mqtt_publisher = MQTTPublisher( hostname="localhost", @@ -49,6 +49,6 @@ username="sampleuser", password="samplepass", qos=1, - # ssl_context=mqtt_ssl, + ssl_context=mqtt_ssl, ) -thing.run(servers=[http_server, zmq_server]) +thing.run(servers=[http_server, zmq_server, mqtt_publisher])