Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*.ipynb export-ignore
*.ipynb linguist-vendored
9 changes: 0 additions & 9 deletions .gitlab-ci.yml

This file was deleted.

9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

✓ means ready to try

## [v0.3.4] - 2025-10-02

- fixes a bug in content type in the forms of TD for HTTP protocol binding, when multiple serializers are used
- one can specify numpy array or arbitrary python objects in pydantic models for properties, actions and events

## [v0.3.3] - 2025-09-25

- updates API reference largely to latest version

## [v0.3.2] - 2025-09-21

- adds TD security definition for BCryptBasicSecurity and ArgsBasicSecurity
Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,10 @@ if __name__ == '__main__':
id='spectrometer',
serial_number='S14155',
).run(
access_points=['HTTP', 'ZMQ-IPC']
access_points=[
("ZMQ", "IPC"),
("HTTP", 8080),
]
)
# HTTP & ZMQ Interprocess Communication
```
Expand Down
2 changes: 1 addition & 1 deletion doc
Submodule doc updated from 633d01 to afa0e6
3 changes: 2 additions & 1 deletion hololinked/client/http/consumed_interactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ def get_body_from_response(
body = response.content
if not body:
return
serializer = Serializers.content_types.get(form.contentType or "application/json")
givenContentType = response.headers.get("Content-Type", None)
serializer = Serializers.content_types.get(givenContentType or form.contentType or "application/json")
if serializer is None:
raise ValueError(f"Unsupported content type: {form.contentType}")
body = serializer.loads(body)
Expand Down
9 changes: 7 additions & 2 deletions hololinked/core/property.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ def to_affordance(self, owner_inst=None):


try:
from pydantic import BaseModel, RootModel, create_model
from pydantic import BaseModel, RootModel, create_model, ConfigDict

def wrap_plain_types_in_rootmodel(model: type) -> type[BaseModel] | type[RootModel]:
"""
Expand All @@ -344,7 +344,12 @@ def wrap_plain_types_in_rootmodel(model: type) -> type[BaseModel] | type[RootMod
return
if issubklass(model, BaseModel):
return model
return create_model(f"{model!r}", root=(model, ...), __base__=RootModel)
return create_model(
f"{model!r}",
root=(model, ...),
__base__=RootModel,
__config__=ConfigDict(arbitrary_types_allowed=True),
) # type: ignore[call-overload]
except ImportError:

def wrap_plain_types_in_rootmodel(model: type) -> type:
Expand Down
4 changes: 2 additions & 2 deletions hololinked/core/thing.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,9 @@ def run(
access_points = kwargs.get("access_points", None) # type: dict[str, dict | int | str | list[str]]
servers = kwargs.get("servers", []) # type: typing.Optional[typing.List[BaseProtocolServer]]

if access_points is None and servers is None:
if access_points is None and len(servers) == 0:
raise ValueError("At least one of access_points or servers must be provided.")
if access_points is not None and servers is not None:
if access_points is not None and len(servers) > 0:
raise ValueError("Only one of access_points or servers can be provided.")

if access_points is not None:
Expand Down
2 changes: 1 addition & 1 deletion hololinked/core/zmq/brokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2228,7 +2228,7 @@ def interrupt_message(self) -> EventMessage:
return EventMessage.craft_from_arguments(
event_id=f"{self.id}/interrupting-server",
sender_id=self.id,
payload=SerializableData("INTERRUPT"),
payload=SerializableData("INTERRUPT", content_type="application/json"),
)

def exit(self):
Expand Down
140 changes: 71 additions & 69 deletions hololinked/core/zmq/rpc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
set_global_event_loop_policy,
)
from ...config import global_config
from ...serializers import Serializers
from ...serializers import Serializers, BaseSerializer
from .message import (
EMPTY_BYTE,
ERROR,
Expand All @@ -33,7 +33,7 @@
from ..thing import Thing
from ..property import Property
from ..properties import TypedDict
from ..actions import BoundAction, action as remote_method
from ..actions import BoundAction
from ..logger import LogHistoryHandler


Expand Down Expand Up @@ -377,88 +377,64 @@ async def run_thing_instance(self, instance: Thing, scheduler: typing.Optional["
return_value = await self.execute_operation(instance, objekt, operation, payload, preserialized_payload)

# handle return value
if (
isinstance(return_value, tuple)
and len(return_value) == 2
and (isinstance(return_value[1], bytes) or isinstance(return_value[1], PreserializedData))
):
if fetch_execution_logs:
return_value[0] = {
"return_value": return_value[0],
"execution_logs": list_handler.log_list,
}
payload = SerializableData(
return_value[0],
Serializers.for_object(thing_id, instance.__class__.__name__, objekt),
)
if isinstance(return_value[1], bytes):
preserialized_payload = PreserializedData(return_value[1])
# elif isinstance(return_value, PreserializedData):
# if fetch_execution_logs:
# return_value = {
# "return_value" : return_value.value,
# "execution_logs" : list_handler.log_list
# }
# payload = SerializableData(return_value.value, content_type='application/json')
# preserialized_payload = return_value

elif isinstance(return_value, bytes):
payload = SerializableData(None, content_type="application/json")
preserialized_payload = PreserializedData(return_value)
else:
# complete thing execution context
if fetch_execution_logs:
return_value = {
"return_value": return_value,
"execution_logs": list_handler.log_list,
}
payload = SerializableData(
return_value,
Serializers.for_object(thing_id, instance.__class__.__name__, objekt),
)
preserialized_payload = PreserializedData(EMPTY_BYTE, content_type="text/plain")
serializer = Serializers.for_object(thing_id, instance.__class__.__name__, objekt)
rpayload, rpreserialized_payload = self.format_return_value(return_value, serializer=serializer)

# complete thing execution context
if fetch_execution_logs:
rpayload.value = dict(return_value=rpayload.value, execution_logs=list_handler.log_list)

# raise any payload errors now
rpayload.require_serialized()

# set reply
scheduler.last_operation_reply = (payload, preserialized_payload, REPLY)
scheduler.last_operation_reply = (rpayload, rpreserialized_payload, REPLY)

except BreakInnerLoop:
# exit the loop and stop the thing
instance.logger.info(
"Thing {} with instance name {} exiting event loop.".format(
instance.__class__.__name__, instance.id
)
"Thing {} with id {} exiting event loop.".format(instance.__class__.__name__, instance.id)
)
return_value = None

# send a reply with None return value
rpayload, rpreserialized_payload = self.format_return_value(None, Serializers.json)

# complete thing execution context
if fetch_execution_logs:
return_value = {
"return_value": None,
"execution_logs": list_handler.log_list,
}
scheduler.last_operation_reply = (
SerializableData(return_value, content_type="application/json"),
PreserializedData(EMPTY_BYTE, content_type="text/plain"),
None,
)
return
rpayload.value = dict(return_value=rpayload.value, execution_logs=list_handler.log_list)

# set reply, let the message broker decide
scheduler.last_operation_reply = (rpayload, rpreserialized_payload, None)

# quit the loop
break

except Exception as ex:
# error occurred while executing the operation
instance.logger.error(
"Thing {} with ID {} produced error : {} - {}.".format(
instance.__class__.__name__, instance.id, type(ex), ex
)
)
return_value = dict(exception=format_exception_as_json(ex))
if fetch_execution_logs:
return_value["execution_logs"] = list_handler.log_list
scheduler.last_operation_reply = (
SerializableData(return_value, content_type="application/json"),
PreserializedData(EMPTY_BYTE, content_type="text/plain"),
ERROR,

# send a reply with error
rpayload, rpreserialized_payload = self.format_return_value(
dict(exception=format_exception_as_json(ex)), Serializers.json
)

# complete thing execution context
if fetch_execution_logs:
rpayload.value["execution_logs"] = list_handler.log_list

# set error reply
scheduler.last_operation_reply = (rpayload, rpreserialized_payload, ERROR)

finally:
# cleanup
if fetch_execution_logs:
instance.logger.removeHandler(list_handler)
instance.logger.debug(
"thing {} with instance name {} completed execution of operation {} on {}".format(
"thing {} with id {} completed execution of operation {} on {}".format(
instance.__class__.__name__, instance.id, operation, objekt
)
)
Expand Down Expand Up @@ -501,17 +477,19 @@ async def execute_operation(
elif operation == Operations.deleteproperty:
prop = instance.properties[objekt] # type: Property
del prop # raises NotImplementedError when deletion is not implemented which is mostly the case
elif operation == Operations.invokeaction and objekt == "get_thing_description":
# special case
if payload is None:
payload = dict()
args = payload.pop("__args__", tuple())
return self.get_thing_description(instance, *args, **payload)
elif operation == Operations.invokeaction:
if payload is None:
payload = dict()
args = payload.pop("__args__", tuple())
# payload then become kwargs
if preserialized_payload != EMPTY_BYTE:
args = (preserialized_payload,) + args
# special case
if objekt == "get_thing_description":
return self.get_thing_description(instance, *args, **payload)
# normal Thing action
action = instance.actions[objekt] # type: BoundAction
if action.execution_info.iscoroutine:
# the actual scheduling as a purely async task is done by the scheduler, not here,
Expand All @@ -528,6 +506,30 @@ async def execute_operation(
"Unimplemented execution path for Thing {} for operation {}".format(instance.id, operation)
)

def format_return_value(
self,
return_value: typing.Any,
serializer: BaseSerializer,
) -> tuple[SerializableData, PreserializedData]:
if (
isinstance(return_value, tuple)
and len(return_value) == 2
and (isinstance(return_value[1], bytes) or isinstance(return_value[1], PreserializedData))
):
payload = SerializableData(return_value[0], serializer=serializer, content_type=serializer.content_type)
if isinstance(return_value[1], bytes):
preserialized_payload = PreserializedData(return_value[1])
elif isinstance(return_value, bytes):
payload = SerializableData(None, content_type="application/json")
preserialized_payload = PreserializedData(return_value)
elif isinstance(return_value, PreserializedData):
payload = SerializableData(None, content_type="application/json")
preserialized_payload = return_value
else:
payload = SerializableData(return_value, serializer=serializer, content_type=serializer.content_type)
preserialized_payload = PreserializedData(EMPTY_BYTE, content_type="text/plain")
return payload, preserialized_payload

async def _process_timeouts(
self,
request_message: RequestMessage,
Expand Down
4 changes: 2 additions & 2 deletions hololinked/schema_validators/json_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def register_type_replacement(
self, type: typing.Any, json_schema_base_type: str, schema: typing.Optional[JSON] = None
) -> None:
"""
Specify a python type to map to a specific JSON type. Schema only supported for array and objects.
Specify a python type to map to a specific JSON type.

For example:
- `JSONSchema.register_type_replacement(MyCustomObject, 'object', schema=MyCustomObject.schema())`
Expand Down Expand Up @@ -87,7 +87,7 @@ def register_type_replacement(
JSONSchema._schemas[type] = schema
else:
raise TypeError(
f"json schema replacement type must be one of allowed type - 'string', 'object', 'array', 'string', "
"json schema replacement type must be one of allowed type - 'string', 'object', 'array', 'string', "
+ f"'number', 'integer', 'boolean', 'null'. Given value {json_schema_base_type}"
)

Expand Down
9 changes: 8 additions & 1 deletion hololinked/serializers/payloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ class SerializableData:
value: typing.Any
serializer: BaseSerializer | None = None
content_type: str = "application/json"
_serialized: bytes | None = None

def serialize(self):
"""serialize the value"""
if self._serialized is not None:
return self._serialized
if isinstance(self.value, byte_types):
return self.value
if self.serializer is not None:
Expand All @@ -38,6 +41,10 @@ def deserialize(self):
return serializer.loads(self.value)
raise ValueError(f"content type {self.content_type} not supported for deserialization")

def require_serialized(self) -> None:
"""ensure the value is serialized"""
self._serialized = self.serialize()


@dataclass
class PreserializedData:
Expand All @@ -47,4 +54,4 @@ class PreserializedData:
"""

value: bytes
content_type: str = "unknown"
content_type: str = "application/octet-stream"
Loading
Loading