Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions hololinked/client/mqtt/consumed_interactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __init__(
def listen(self, form: Form, callbacks: list[Callable], concurrent: bool, deserialize: bool) -> None:
# This method is called from a different thread but also finishes quickly, we wont redo this way
# for the time being.
topic = f"{self.resource.thing_id}/{self.resource.name}"
topic = form.mqv_topic or f"{self.resource.thing_id}/{self.resource.name}"

def on_topic_message(client: PahoMQTTClient, userdata, message: MQTTMessage):
try:
Expand All @@ -57,7 +57,7 @@ def on_topic_message(client: PahoMQTTClient, userdata, message: MQTTMessage):
self.sync_client.message_callback_add(topic, on_topic_message)

async def async_listen(self, form: Form, callbacks: list[Callable], concurrent: bool, deserialize: bool) -> None:
topic = f"{self.resource.thing_id}/{self.resource.name}"
topic = form.mqv_topic or f"{self.resource.thing_id}/{self.resource.name}"
try:
await self.async_client.__aenter__()
except aiomqtt.MqttReentrantError:
Expand Down
26 changes: 14 additions & 12 deletions hololinked/server/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
import copy
import ssl


from ..utils import get_current_async_loop
from .utils import consume_broker_queue, consume_broker_pubsub_per_event
from ..config import global_config
from ..constants import Operations
from ..serializers import Serializers
from ..td.interaction_affordance import EventAffordance
from ..td.interaction_affordance import EventAffordance, PropertyAffordance
from ..param.parameters import Selector, String, Integer, ClassSelector, List


Expand Down Expand Up @@ -149,23 +151,23 @@ async def publish_thing_description(self, ZMQ_TD: dict[str, Any]) -> dict[str, A
TD.pop("actions", None)
# remove properties that are not observable
for name in ZMQ_TD.get("properties", {}).keys():
if not TD["properties"][name].get("observable", False):
affordance = PropertyAffordance.from_TD(name, ZMQ_TD)
if not affordance.observable:
TD["properties"].pop(name)
continue
forms = TD["properties"][name].pop("forms", [])
TD["properties"][name]["forms"] = []
for form in forms:
if form["op"] == "observeproperty":
form["href"] = f"mqtt://{self.hostname}:{self.port}"
TD["properties"][name]["forms"].append(form)
form = affordance.retrieve_form(Operations.observeproperty)
form.href = f"mqtt{'s' if self.ssl_context else ''}://{self.hostname}:{self.port}"
form.mqv_topic = f"{TD['id']}/{name}"
TD["properties"][name]["forms"].append(form.json())
# repurpose event
for name in ZMQ_TD.get("events", {}).keys():
forms = TD["events"][name].pop("forms", [])
affordance = EventAffordance.from_TD(name, ZMQ_TD)
TD["events"][name]["forms"] = []
for form in forms:
if form["op"] == "subscribeevent":
form["href"] = f"mqtt://{self.hostname}:{self.port}"
TD["events"][name]["forms"].append(form)
form = affordance.retrieve_form(Operations.subscribeevent)
form.href = f"mqtt{'s' if self.ssl_context else ''}://{self.hostname}:{self.port}"
form.mqv_topic = f"{TD['id']}/{name}"
TD["events"][name]["forms"].append(form.json())
await self.client.publish(
topic=f"{TD['id']}/thing-description",
payload=Serializers.json.dumps(TD),
Expand Down
3 changes: 3 additions & 0 deletions hololinked/td/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class Form(Schema):
href: str = None
op: str = None
htv_methodName: str = Field(default=None, alias="htv:methodName")
mqv_topic: str = Field(default=None, alias="mqv:topic")
contentType: typing.Optional[str] = "application/json"
additionalResponses: typing.Optional[typing.List[AdditionalExpectedResponse]] = None
contentEncoding: typing.Optional[str] = None
Expand All @@ -61,6 +62,8 @@ def from_TD(cls, form_json: typing.Dict[str, typing.Any]) -> "Form":
for field in cls.model_fields:
if field == "htv_methodName" and "htv:methodName" in form_json:
setattr(form, field, form_json["htv:methodName"])
elif field == "mqv_topic" and "mqv:topic" in form_json:
setattr(form, field, form_json["mqv:topic"])
elif field in form_json:
setattr(form, field, form_json[field])
return form
Expand Down
13 changes: 11 additions & 2 deletions tests/helper-scripts/client.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -2048,6 +2048,7 @@
" 'observable': True,\n",
" 'forms': [{'href': 'mqtt://localhost:8883',\n",
" 'op': 'observeproperty',\n",
" 'mqv:topic': 'example-test/observable_list_prop',\n",
" 'contentType': 'application/json'}]},\n",
" 'observable_readonly_prop': {'description': 'An observable readonly property to check observable events on read operations',\n",
" 'default': 0,\n",
Expand All @@ -2056,6 +2057,7 @@
" 'observable': True,\n",
" 'forms': [{'href': 'mqtt://localhost:8883',\n",
" 'op': 'observeproperty',\n",
" 'mqv:topic': 'example-test/observable_readonly_prop',\n",
" 'contentType': 'application/json'}]},\n",
" 'sleeping_prop': {'description': 'A property that sleeps for 10 seconds on read operations',\n",
" 'default': 0,\n",
Expand All @@ -2064,6 +2066,7 @@
" 'observable': True,\n",
" 'forms': [{'href': 'mqtt://localhost:8883',\n",
" 'op': 'observeproperty',\n",
" 'mqv:topic': 'example-test/sleeping_prop',\n",
" 'contentType': 'application/json'}]}},\n",
" 'events': {'data_point_event': {'description': 'Event raised when a new data point is available',\n",
" 'data': {'type': 'object',\n",
Expand All @@ -2072,26 +2075,32 @@
" 'required': ['timestamp', 'energy']},\n",
" 'forms': [{'href': 'mqtt://localhost:8883',\n",
" 'op': 'subscribeevent',\n",
" 'mqv:topic': 'example-test/data_point_event',\n",
" 'contentType': 'application/json'}]},\n",
" 'test_binary_payload_event': {'description': 'test event with binary payload',\n",
" 'forms': [{'href': 'mqtt://localhost:8883',\n",
" 'op': 'subscribeevent',\n",
" 'mqv:topic': 'example-test/test_binary_payload_event',\n",
" 'contentType': 'application/json'}]},\n",
" 'test_event': {'description': 'test event with arbitrary payload',\n",
" 'forms': [{'href': 'mqtt://localhost:8883',\n",
" 'op': 'subscribeevent',\n",
" 'mqv:topic': 'example-test/test_event',\n",
" 'contentType': 'application/json'}]},\n",
" 'test_event_with_json_schema': {'description': 'test event with schema validation',\n",
" 'forms': [{'href': 'mqtt://localhost:8883',\n",
" 'op': 'subscribeevent',\n",
" 'mqv:topic': 'example-test/test_event_with_json_schema',\n",
" 'contentType': 'application/json'}]},\n",
" 'test_event_with_pydantic_schema': {'description': 'test event with pydantic schema validation',\n",
" 'forms': [{'href': 'mqtt://localhost:8883',\n",
" 'op': 'subscribeevent',\n",
" 'mqv:topic': 'example-test/test_event_with_pydantic_schema',\n",
" 'contentType': 'application/json'}]},\n",
" 'test_mixed_content_payload_event': {'description': 'test event with mixed content payload',\n",
" 'forms': [{'href': 'mqtt://localhost:8883',\n",
" 'op': 'subscribeevent',\n",
" 'mqv:topic': 'example-test/test_mixed_content_payload_event',\n",
" 'contentType': 'application/json'}]}}}"
]
},
Expand Down Expand Up @@ -2119,7 +2128,7 @@
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": null,
"id": "335296e0",
"metadata": {},
"outputs": [],
Expand All @@ -2141,7 +2150,7 @@
"\n",
"# object_proxy.subscribe_event(\"test_event\", cb)\n",
"# object_proxy.subscribe_event(\"test_event\", [cb1, cb2], concurrent=True)\n",
"# object_proxy.subscribe_event(\"test_event\", [async_cb1, async_cb2], asynch=True)\n",
"object_proxy_mqtt.subscribe_event(\"test_event\", [async_cb1, async_cb2], asynch=True)\n",
"# object_proxy.subscribe_event(\"test_event\", [async_cb1, async_cb2], asynch=True, concurrent=True)"
]
},
Expand Down
22 changes: 11 additions & 11 deletions tests/helper-scripts/run_test_thing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

global_config.DEBUG = True

# thing = TestThing(id="example-test")
thing = OceanOpticsSpectrometer(id="example-test", serial_number="simulation")
thing = TestThing(id="example-test")
# thing = OceanOpticsSpectrometer(id="example-test", serial_number="simulation")


Serializers.register_for_object(TestThing.db_init_int_prop, Serializers.pickle)
Expand All @@ -35,20 +35,20 @@
http_server = HTTPServer(port=9000)
zmq_server = ZMQServer(id="example-test-server", things=[thing], access_points="IPC")

# mqtt_ssl = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
# if not os.path.exists("ca.crt"):
# raise FileNotFoundError("CA certificate 'ca.crt' not found in current directory for MQTT TLS connection")
# mqtt_ssl.load_verify_locations(cafile="ca.crt")
# mqtt_ssl.check_hostname = True
# mqtt_ssl.verify_mode = ssl.CERT_REQUIRED
# mqtt_ssl.minimum_version = ssl.TLSVersion.TLSv1_2
mqtt_ssl = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
if not os.path.exists("ca.crt"):
raise FileNotFoundError("CA certificate 'ca.crt' not found in current directory for MQTT TLS connection")
mqtt_ssl.load_verify_locations(cafile="ca.crt")
mqtt_ssl.check_hostname = True
mqtt_ssl.verify_mode = ssl.CERT_REQUIRED
mqtt_ssl.minimum_version = ssl.TLSVersion.TLSv1_2

mqtt_publisher = MQTTPublisher(
hostname="localhost",
port=8883,
username="sampleuser",
password="samplepass",
qos=1,
# ssl_context=mqtt_ssl,
ssl_context=mqtt_ssl,
)
thing.run(servers=[http_server, zmq_server])
thing.run(servers=[http_server, zmq_server, mqtt_publisher])
Loading