diff --git a/hololinked/client/abstractions.py b/hololinked/client/abstractions.py index 818facc0..b723b453 100644 --- a/hololinked/client/abstractions.py +++ b/hololinked/client/abstractions.py @@ -27,9 +27,9 @@ import builtins import logging import threading -import typing from dataclasses import dataclass +from typing import Any, Callable from ..constants import Operations from ..td import ActionAffordance, EventAffordance, PropertyAffordance @@ -44,16 +44,15 @@ class ConsumedThingAction: def __init__( self, resource: ActionAffordance, - owner_inst: typing.Any, + owner_inst: Any, logger: logging.Logger, - # schema_validator: typing.Type[BaseSchemaValidator] | None = None ) -> None: """ Parameters ---------- resource: ActionAffordance dataclass TD fragment representing the action (must have forms). - owner_inst: typing.Optional[typing.Any] + owner_inst: Any instance of the owning consumed Thing or `ObjectProxy` logger: logging.Logger logger instance @@ -63,7 +62,7 @@ def __init__( self.logger = logger self.schema_validator = None # schema_validator - def get_last_return_value(self, raise_exception: bool = False) -> typing.Any: + def get_last_return_value(self, raise_exception: bool = False) -> Any: """retrieve return value of the last call to the action""" raise NotImplementedError("implement get_last_return_value per protocol") @@ -73,38 +72,38 @@ def get_last_return_value(self, raise_exception: bool = False) -> typing.Any: ) """cached return value of the last call to the method""" - def __call__(self, *args, **kwargs) -> typing.Any: + def __call__(self, *args, **kwargs) -> Any: """ Invoke action/method on server Parameters ---------- - *args: typing.Any + *args: Any arguments to the action - **kwargs: typing.Any + **kwargs: Any keyword arguments to the action Returns ------- - typing.Any + Any reply of the action call """ raise NotImplementedError("implement action __call__ per protocol") - async def async_call(self, *args, **kwargs) -> typing.Any: + async def async_call(self, *args, **kwargs) -> Any: """ async invoke action on server - asynchronous at the network level, may not necessarily be at the server level. Parameters ---------- - *args: typing.Any + *args: Any arguments to the action - **kwargs: typing.Any + **kwargs: Any keyword arguments to the action Returns ------- - typing.Any + Any reply of the action call """ raise NotImplementedError("implement action async_call per protocol") @@ -116,9 +115,9 @@ def oneway(self, *args, **kwargs) -> None: Parameters ---------- - *args: typing.Any + *args: Any arguments to the action - **kwargs: typing.Any + **kwargs: Any keyword arguments to the action """ raise NotImplementedError("implement action oneway call per protocol") @@ -130,9 +129,9 @@ def noblock(self, *args, **kwargs) -> str: Parameters ---------- - *args: typing.Any + *args: Any arguments to the action - **kwargs: typing.Any + **kwargs: Any keyword arguments to the action Returns @@ -142,7 +141,7 @@ def noblock(self, *args, **kwargs) -> str: """ raise NotImplementedError("implement action noblock call per protocol") - def read_reply(self, message_id: str, timeout: float | int | None = None) -> typing.Any: + def read_reply(self, message_id: str, timeout: float | int | None = None) -> Any: """ Read the reply of the action call which was scheduled with `noblock`. @@ -155,7 +154,7 @@ def read_reply(self, message_id: str, timeout: float | int | None = None) -> typ Returns ------- - typing.Any + Any reply of the action call """ raise NotImplementedError("implement action read_reply per protocol") @@ -173,15 +172,13 @@ class ConsumedThingProperty: # property get set abstraction # Dont add doc otherwise __doc__ in slots will conflict with class variable - def __init__( - self, resource: PropertyAffordance, owner_inst: typing.Optional[typing.Any], logger: logging.Logger - ) -> None: + def __init__(self, resource: PropertyAffordance, owner_inst: Any, logger: logging.Logger) -> None: """ Parameters ---------- resource: PropertyAffordance dataclass object TD fragment representing the property (must have forms). - owner_inst: typing.Optional[typing.Any] + owner_inst: Any instance of the owning consumed Thing or `ObjectProxy` logger: logging.Logger logger instance @@ -191,50 +188,50 @@ def __init__( self.logger = logger @property # i.e. cannot have setter - def last_read_value(self) -> typing.Any: + def last_read_value(self) -> Any: """cache of last read value""" raise NotImplementedError("implement last_read_value per protocol") - def set(self, value: typing.Any) -> None: + def set(self, value: Any) -> None: """ Set or write property value. Parameters ---------- - value: typing.Any + value: Any value to set """ raise NotImplementedError("implement property set per protocol") - def get(self) -> typing.Any: + def get(self) -> Any: """ Get or read property value. Returns ------- - typing.Any + Any property value """ raise NotImplementedError("implement property get per protocol") - async def async_set(self, value: typing.Any) -> None: + async def async_set(self, value: Any) -> None: """ Async set or write property value - asynchronous at the network level, may not necessarily be at the server level. Parameters ---------- - value: typing.Any + value: Any value to set """ raise NotImplementedError("implement async property set per protocol") - async def async_get(self) -> typing.Any: + async def async_get(self) -> Any: """ Async get or read property value - asynchronous at the network level, may not necessarily be at the server level. Returns ------- - typing.Any + Any property value """ raise NotImplementedError("implement async property get per protocol") @@ -251,14 +248,14 @@ def noblock_get(self) -> str: """ raise NotImplementedError("implement property noblock get per protocol") - def noblock_set(self, value: typing.Any) -> str: + def noblock_set(self, value: Any) -> str: """ Set or write property value without blocking, i.e. make a request and collect it later and the method returns immediately. Server must return a message ID to identify the request. Parameters ---------- - value: typing.Any + value: Any value to set Returns @@ -268,25 +265,25 @@ def noblock_set(self, value: typing.Any) -> str: """ raise NotImplementedError("implement property noblock set per protocol") - def oneway_set(self, value: typing.Any) -> None: + def oneway_set(self, value: Any) -> None: """ Set property value without waiting for acknowledgement. The server also does not send any reply. There is no guarantee that the property value was set. Parameters ---------- - value: typing.Any + value: Any value to set """ raise NotImplementedError("implement property oneway set per protocol") - def observe(self, *callbacks: typing.Callable) -> None: + def observe(self, *callbacks: Callable) -> None: """ Observe property value changes Parameters ---------- - *callbacks: typing.Callable + *callbacks: Callable callback to call when property value changes """ # looks like this will be unused. observe property is done via ConsumedThingEvent @@ -297,7 +294,7 @@ def unobserve(self) -> None: # looks like this will be unused, observe property is done via ConsumedThingEvent raise NotImplementedError("implement property unobserve per protocol") - def read_reply(self, message_id: str, timeout: float | int | None = None) -> typing.Any: + def read_reply(self, message_id: str, timeout: float | int | None = None) -> Any: """ Read the reply of the property get or set which was scheduled with `noblock`. @@ -310,7 +307,7 @@ def read_reply(self, message_id: str, timeout: float | int | None = None) -> typ Returns ------- - typing.Any + Any reply of the property get or set """ raise NotImplementedError("implement property read_reply per protocol") @@ -324,7 +321,7 @@ def __init__( self, resource: EventAffordance, logger: logging.Logger, - owner_inst: typing.Any, + owner_inst: Any, ) -> None: """ Parameters @@ -333,7 +330,7 @@ def __init__( dataclass object representing the event logger: logging.Logger logger instance - owner_inst: typing.Any + owner_inst: Any the parent object that owns this event """ self.resource = resource @@ -345,7 +342,7 @@ def __init__( def subscribe( self, - callbacks: list[typing.Callable] | typing.Callable, + callbacks: list[Callable] | Callable, asynch: bool = False, concurrent: bool = False, deserialize: bool = True, @@ -356,7 +353,7 @@ def subscribe( Parameters ---------- - callbacks: typing.List[typing.Callable] | typing.Callable + callbacks: list[Callable] | Callable callback or list of callbacks to add asynch: bool whether to start an async(-io task) event listener instead of a threaded listener @@ -387,27 +384,27 @@ def unsubscribe(self): # self._sync_callbacks.clear() # self._async_callbacks.clear() - def listen(self, form: Form, callbacks: list[typing.Callable], concurrent: bool = True, deserialize: bool = True): + def listen(self, form: Form, callbacks: list[Callable], concurrent: bool = True, deserialize: bool = True): """ listen to events and call the callbacks """ raise NotImplementedError("implement listen per protocol") async def async_listen( - self, form: Form, callbacks: list[typing.Callable], concurrent: bool = True, deserialize: bool = True + self, form: Form, callbacks: list[Callable], concurrent: bool = True, deserialize: bool = True ): """ listen to events and call the callbacks """ raise NotImplementedError("implement async_listen per protocol") - def schedule_callbacks(self, callbacks, event_data: typing.Any, concurrent: bool = False) -> None: + def schedule_callbacks(self, callbacks, event_data: Any, concurrent: bool = False) -> None: """ schedule the callbacks to be called with the event data Parameters ---------- - event_data: typing.Any + event_data: Any event data to pass to the callbacks concurrent: bool whether to run each callback in a separate thread @@ -422,13 +419,13 @@ def schedule_callbacks(self, callbacks, event_data: typing.Any, concurrent: bool self.logger.error(f"Error occurred in callback {cb}: {ex}") self.logger.exception(ex) - async def async_schedule_callbacks(self, callbacks, event_data: typing.Any, concurrent: bool = False) -> None: + async def async_schedule_callbacks(self, callbacks, event_data: Any, concurrent: bool = False) -> None: """ schedule the callbacks to be called with the event data Parameters ---------- - event_data: typing.Any + event_data: Any event data to pass to the callbacks concurrent: bool whether to run each callback in a separate thread @@ -449,15 +446,13 @@ async def async_schedule_callbacks(self, callbacks, event_data: typing.Any, conc self.logger.error(f"Error occurred in callback {cb}: {ex}") self.logger.exception(ex) - def add_callbacks( - self, callbacks: typing.Union[typing.List[typing.Callable], typing.Callable], asynch: bool = False - ) -> None: + def add_callbacks(self, callbacks: list[Callable] | Callable, asynch: bool = False) -> None: """ add callbacks to the event Parameters ---------- - *callbacks: typing.List[typing.Callable] | typing.Callable + *callbacks: list[Callable] | Callable callback or list of callbacks to add """ raise NotImplementedError( @@ -466,7 +461,7 @@ def add_callbacks( # for logic, see tag v0.3.2 -def raise_local_exception(error_message: typing.Dict[str, typing.Any]) -> None: +def raise_local_exception(error_message: dict[str, Any]) -> None: """ raises an exception on client side using an exception from server, using a mapping based on exception type (currently only python built-in exceptions supported). If the exception type is not found, a generic `Exception` is raised. @@ -525,11 +520,11 @@ def __init__(self): def clear(self): """reset to default/empty values""" self.event = "message" # type: str - self.data = "" # type: typing.Any - self.id = None # type: typing.Optional[str] - self.retry = None # type: typing.Optional[int] + self.data = "" # type: Any + self.id = None # type: str | None + self.retry = None # type: int | None - def flush(self) -> typing.Optional[dict]: + def flush(self) -> dict[str, Any] | None: """obtain the event payload as dictionary and reset to default values""" if not self.data and self.id is None: return None diff --git a/hololinked/client/factory.py b/hololinked/client/factory.py index 48e78c34..b2b371a1 100644 --- a/hololinked/client/factory.py +++ b/hololinked/client/factory.py @@ -122,7 +122,7 @@ def zmq( ignore_errors=ignore_TD_errors, protocol=access_point.split("://")[0].upper() if access_point else "IPC", skip_names=skip_interaction_affordances, - ) # typing.Dict[str, typing.Any] + ) # dict[str, Any] # create ObjectProxy object_proxy = ObjectProxy( diff --git a/hololinked/client/http/consumed_interactions.py b/hololinked/client/http/consumed_interactions.py index 6dc1a206..479dc7f5 100644 --- a/hololinked/client/http/consumed_interactions.py +++ b/hololinked/client/http/consumed_interactions.py @@ -6,7 +6,6 @@ import contextlib import logging import threading -import typing from copy import deepcopy from typing import Any, AsyncIterator, Callable, Iterator @@ -122,7 +121,7 @@ def __init__( async_client: httpx.AsyncClient = None, invokation_timeout: int = 5, execution_timeout: int = 5, - owner_inst: typing.Any = None, + owner_inst: Any = None, logger: logging.Logger = None, ) -> None: ConsumedThingAction.__init__(self=self, resource=resource, owner_inst=owner_inst, logger=logger) @@ -207,7 +206,7 @@ def __init__( async_client: httpx.AsyncClient = None, invokation_timeout: int = 5, execution_timeout: int = 5, - owner_inst: typing.Any = None, + owner_inst: Any = None, logger: logging.Logger = None, ) -> None: ConsumedThingProperty.__init__(self=self, resource=resource, owner_inst=owner_inst, logger=logger) @@ -330,7 +329,7 @@ def __init__( async_client: httpx.AsyncClient = None, invokation_timeout: int = 5, execution_timeout: int = 5, - owner_inst: typing.Any = None, + owner_inst: Any = None, logger: logging.Logger = None, ) -> None: ConsumedThingEvent.__init__(self, resource=resource, owner_inst=owner_inst, logger=logger) diff --git a/hololinked/client/proxy.py b/hololinked/client/proxy.py index d5db35b6..1c487c9b 100644 --- a/hololinked/client/proxy.py +++ b/hololinked/client/proxy.py @@ -1,5 +1,6 @@ import base64 -import typing + +from typing import Any, Callable import structlog @@ -56,10 +57,10 @@ def __init__(self, id: str, **kwargs) -> None: """ self.id = id self._allow_foreign_attributes = kwargs.get("allow_foreign_attributes", False) - self._noblock_messages = dict() # type: typing.Dict[str, ConsumedThingAction | ConsumedThingProperty] + self._noblock_messages = dict() # type: dict[str, ConsumedThingAction | ConsumedThingProperty] self._schema_validator = kwargs.get("schema_validator", None) self.logger = kwargs.pop("logger", structlog.get_logger()) - self.td = kwargs.get("td", dict()) # type: typing.Dict[str, typing.Any] + self.td = kwargs.get("td", dict()) # type: dict[str, Any] self._auth_header = None username = kwargs.get("username") @@ -68,13 +69,13 @@ def __init__(self, id: str, **kwargs) -> None: token = f"{username}:{password}".encode("utf-8") self._auth_header = {"Authorization": f"Basic {base64.b64encode(token).decode('utf-8')}"} - def __getattribute__(self, __name: str) -> typing.Any: + def __getattribute__(self, __name: str) -> Any: obj = super().__getattribute__(__name) if isinstance(obj, ConsumedThingProperty): return obj.get() return obj - def __setattr__(self, __name: str, __value: typing.Any) -> None: + def __setattr__(self, __name: str, __value: Any) -> None: if ( __name in ObjectProxy._own_attrs or (__name not in self.__dict__ and isinstance(__value, ObjectProxy.__allowed_attribute_types__)) @@ -120,7 +121,7 @@ def __hash__(self) -> int: # with the given name is supported in this Protocol Binding client.""" # raise NotImplementedError() - def invoke_action(self, name: str, *args, **kwargs) -> typing.Any: + def invoke_action(self, name: str, *args, **kwargs) -> Any: """ invoke an action specified by name on the server with positional/keyword arguments @@ -161,7 +162,7 @@ def invoke_action(self, name: str, *args, **kwargs) -> typing.Any: else: return action(*args, **kwargs) - async def async_invoke_action(self, name: str, *args, **kwargs) -> typing.Any: + async def async_invoke_action(self, name: str, *args, **kwargs) -> Any: """ async(io) call an action specified by name on the server with positional/keyword arguments. `noblock` and `oneway` are not supported for async calls. @@ -192,7 +193,7 @@ async def async_invoke_action(self, name: str, *args, **kwargs) -> typing.Any: raise AttributeError(f"No remote action named {name}") return await action.async_call(*args, **kwargs) - def read_property(self, name: str, noblock: bool = False) -> typing.Any: + def read_property(self, name: str, noblock: bool = False) -> Any: """ read property specified by name on server. @@ -218,7 +219,7 @@ def read_property(self, name: str, noblock: bool = False) -> typing.Any: else: return prop.get() - def write_property(self, name: str, value: typing.Any, oneway: bool = False, noblock: bool = False) -> None: + def write_property(self, name: str, value: Any, oneway: bool = False, noblock: bool = False) -> None: """ write property specified by name on server with given value. @@ -272,7 +273,7 @@ async def async_read_property(self, name: str) -> None: raise AttributeError(f"No property named {name}") return await prop.async_get() - async def async_write_property(self, name: str, value: typing.Any) -> None: + async def async_write_property(self, name: str, value: Any) -> None: """ async(io) write property specified by name on server with specified value. `noblock` and `oneway` are not supported for async calls. @@ -296,7 +297,7 @@ async def async_write_property(self, name: str, value: typing.Any) -> None: raise AttributeError(f"No property named {name}") await prop.async_set(value) - def read_multiple_properties(self, names: typing.List[str], noblock: bool = False) -> typing.Any: + def read_multiple_properties(self, names: list[str], noblock: bool = False) -> Any: """ read properties specified by list of names. @@ -331,7 +332,7 @@ def write_multiple_properties( self, oneway: bool = False, noblock: bool = False, - **properties: typing.Dict[str, typing.Any], + **properties: dict[str, Any], ) -> None: """ write properties whose name is specified by keys of a dictionary @@ -365,7 +366,7 @@ def write_multiple_properties( else: return method(**properties) - async def async_read_multiple_properties(self, names: typing.List[str]) -> None: + async def async_read_multiple_properties(self, names: list[str]) -> None: """ async(io) read properties specified by list of names. `noblock` reads are not supported for asyncio. @@ -385,7 +386,7 @@ async def async_read_multiple_properties(self, names: typing.List[str]) -> None: raise RuntimeError("Client did not load server resources correctly. Report issue at github.") return await method.async_call(names=names) - async def async_write_multiple_properties(self, **properties: dict[str, typing.Any]) -> None: + async def async_write_multiple_properties(self, **properties: dict[str, Any]) -> None: """ async(io) write properties whose name is specified by keys of a dictionary @@ -411,7 +412,7 @@ async def async_write_multiple_properties(self, **properties: dict[str, typing.A def observe_property( self, name: str, - callbacks: typing.Union[typing.List[typing.Callable], typing.Callable], + callbacks: list[Callable] | Callable, asynch: bool = False, concurrent: bool = False, deserialize: bool = True, @@ -471,7 +472,7 @@ def unobserve_property(self, name: str) -> None: def subscribe_event( self, name: str, - callbacks: typing.Union[typing.List[typing.Callable], typing.Callable], + callbacks: list[Callable] | Callable, asynch: bool = False, concurrent: bool = False, deserialize: bool = True, @@ -534,7 +535,7 @@ def unsubscribe_event(self, name: str) -> None: raise AttributeError(f"No event named {name}") event.unsubscribe() - def read_reply(self, message_id: str, timeout: typing.Optional[float] = 5.0) -> typing.Any: + def read_reply(self, message_id: str, timeout: float | None = 5.0) -> Any: """ read reply of no block calls of an action or a property read/write. @@ -551,17 +552,17 @@ def read_reply(self, message_id: str, timeout: typing.Optional[float] = 5.0) -> return obj.read_reply(message_id=message_id, timeout=timeout) @property - def properties(self) -> typing.List[ConsumedThingProperty]: + def properties(self) -> list[ConsumedThingProperty]: """list of properties that were consumed from the Thing Description""" return [prop for prop in self.__dict__.values() if isinstance(prop, ConsumedThingProperty)] @property - def actions(self) -> typing.List[ConsumedThingAction]: + def actions(self) -> list[ConsumedThingAction]: """list of actions that were consumed from the Thing Description""" return [action for action in self.__dict__.values() if isinstance(action, ConsumedThingAction)] @property - def events(self) -> typing.List[ConsumedThingEvent]: + def events(self) -> list[ConsumedThingEvent]: """list of events that were consumed from the Thing Description""" return [event for event in self.__dict__.values() if isinstance(event, ConsumedThingEvent)] @@ -571,7 +572,7 @@ def thing_id(self) -> str: return self.td.get("id", None) @property - def TD(self) -> typing.Dict[str, typing.Any]: + def TD(self) -> dict[str, Any]: """Thing Description of the consuimed thing""" return self.td diff --git a/hololinked/client/zmq/consumed_interactions.py b/hololinked/client/zmq/consumed_interactions.py index 4c69f413..9fa3dfb8 100644 --- a/hololinked/client/zmq/consumed_interactions.py +++ b/hololinked/client/zmq/consumed_interactions.py @@ -2,10 +2,11 @@ import logging import threading import traceback -import typing import uuid import warnings +from typing import Any, Callable + from ...client.abstractions import ( SSE, ConsumedThingAction, @@ -63,7 +64,6 @@ def __init__( sync_client: SyncZMQClient, async_client: AsyncZMQClient | None = None, **kwargs, - # schema_validator: typing.Type[BaseSchemaValidator] | None = None ) -> None: """ Parameters @@ -85,9 +85,9 @@ def __init__( self._invokation_timeout = kwargs.get("invokation_timeout", 5.0) self._execution_timeout = kwargs.get("execution_timeout", 5.0) self._thing_execution_context = dict(fetch_execution_logs=False) - self._last_zmq_response = None # type: typing.Optional[ResponseMessage] + self._last_zmq_response = None # type: ResponseMessage | None - def get_last_return_value(self, response: ResponseMessage, raise_exception: bool = False) -> typing.Any: + def get_last_return_value(self, response: ResponseMessage, raise_exception: bool = False) -> Any: """ cached return value of the last operation performed. @@ -115,7 +115,7 @@ def last_zmq_response(self) -> ResponseMessage: """cache of last message received for this property""" return self._last_zmq_response - def read_reply(self, message_id: str, timeout: int = None) -> typing.Any: + def read_reply(self, message_id: str, timeout: int = None) -> Any: if self.owner_inst._noblock_messages.get(message_id) != self: raise RuntimeError(f"Message ID {message_id} does not belong to this property.") response = self._sync_zmq_client.recv_response(message_id=message_id) @@ -134,10 +134,9 @@ def __init__( resource: ActionAffordance, sync_client: SyncZMQClient, async_client: AsyncZMQClient, - owner_inst: typing.Any, + owner_inst: Any, logger: logging.Logger, **kwargs, - # schema_validator: typing.Type[BaseSchemaValidator] | None = None ) -> None: """ Parameters @@ -148,7 +147,7 @@ def __init__( synchronous ZMQ client async_zmq_client: AsyncZMQClient asynchronous ZMQ client for async calls - owner_inst: typing.Any + owner_inst: Any the parent object that owns this action logger: logging.Logger logger instance @@ -162,7 +161,7 @@ def __init__( doc="cached return value of the last call to the method", ) - def __call__(self, *args, **kwargs) -> typing.Any: + def __call__(self, *args, **kwargs) -> Any: if len(args) > 0: kwargs["__args__"] = args elif self.schema_validator: @@ -183,7 +182,7 @@ def __call__(self, *args, **kwargs) -> typing.Any: self._last_zmq_response = response return ZMQConsumedAffordanceMixin.get_last_return_value(self, response, True) - async def async_call(self, *args, **kwargs) -> typing.Any: + async def async_call(self, *args, **kwargs) -> Any: if not self._async_zmq_client: raise RuntimeError("async calls not possible as async_mixin was not set True at __init__()") if len(args) > 0: @@ -263,7 +262,7 @@ def __init__( resource: PropertyAffordance, sync_client: SyncZMQClient, async_client: AsyncZMQClient, - owner_inst: typing.Any, + owner_inst: Any, logger: logging.Logger, **kwargs, ) -> None: @@ -276,7 +275,7 @@ def __init__( synchronous ZMQ client async_client: AsyncZMQClient asynchronous ZMQ client for async calls - owner_inst: typing.Any + owner_inst: Any the parent object that owns this property logger: logging.Logger logger instance for logging @@ -290,7 +289,7 @@ def __init__( doc="cached return value of the last call to the method", ) - def set(self, value: typing.Any) -> None: + def set(self, value: Any) -> None: response = self._sync_zmq_client.execute( thing_id=self.resource.thing_id, objekt=self.resource.name, @@ -309,7 +308,7 @@ def set(self, value: typing.Any) -> None: self._last_zmq_response = response ZMQConsumedAffordanceMixin.get_last_return_value(self, response, True) - def get(self) -> typing.Any: + def get(self) -> Any: response = self._sync_zmq_client.execute( thing_id=self.resource.thing_id, objekt=self.resource.name, @@ -323,7 +322,7 @@ def get(self) -> typing.Any: self._last_zmq_response = response return ZMQConsumedAffordanceMixin.get_last_return_value(self, response, True) - async def async_set(self, value: typing.Any) -> None: + async def async_set(self, value: Any) -> None: if not self._async_zmq_client: raise RuntimeError("async calls not possible as async_mixin was not set at __init__()") response = await self._async_zmq_client.async_execute( @@ -344,7 +343,7 @@ async def async_set(self, value: typing.Any) -> None: self._last_zmq_response = response ZMQConsumedAffordanceMixin.get_last_return_value(self, response, True) - async def async_get(self) -> typing.Any: + async def async_get(self) -> Any: if not self._async_zmq_client: raise RuntimeError("async calls not possible as async_mixin was not set at __init__()") response = await self._async_zmq_client.async_execute( @@ -360,7 +359,7 @@ async def async_get(self) -> typing.Any: self._last_zmq_response = response return ZMQConsumedAffordanceMixin.get_last_return_value(self, response, True) - def oneway_set(self, value: typing.Any) -> None: + def oneway_set(self, value: Any) -> None: self._sync_zmq_client.send_request( thing_id=self.resource.thing_id, objekt=self.resource.name, @@ -391,7 +390,7 @@ def noblock_get(self) -> str: self.owner_inst._noblock_messages[msg_id] = self return msg_id - def noblock_set(self, value: typing.Any) -> None: + def noblock_set(self, value: Any) -> None: msg_id = self._sync_zmq_client.send_request( thing_id=self.resource.thing_id, objekt=self.resource.name, @@ -422,7 +421,7 @@ def __init__( self, resource: EventAffordance, logger: logging.Logger, - owner_inst: typing.Any, + owner_inst: Any, **kwargs, ) -> None: ConsumedThingEvent.__init__( @@ -433,7 +432,7 @@ def __init__( ) ZMQConsumedAffordanceMixin.__init__(self, sync_client=None, async_client=None, **kwargs) - def listen(self, form: Form, callbacks: list[typing.Callable], concurrent: bool, deserialize: bool) -> None: + def listen(self, form: Form, callbacks: list[Callable], concurrent: bool, deserialize: bool) -> None: sync_event_client = EventConsumer( id=f"{self.resource.thing_id}|{self.resource.name}|sync|{uuid.uuid4().hex[:8]}", event_unique_identifier=f"{self.resource.thing_id}/{self.resource.name}", @@ -466,9 +465,7 @@ def listen(self, form: Form, callbacks: list[typing.Callable], concurrent: bool, category=RuntimeWarning, ) - async def async_listen( - self, form: Form, callbacks: list[typing.Callable], concurrent: bool, deserialize: bool - ) -> None: + async def async_listen(self, form: Form, callbacks: list[Callable], concurrent: bool, deserialize: bool) -> None: async_event_client = AsyncEventConsumer( id=f"{self.resource.thing_id}|{self.resource.name}|async|{uuid.uuid4().hex[:8]}", event_unique_identifier=f"{self.resource.thing_id}/{self.resource.name}", @@ -520,7 +517,7 @@ def __init__( self, sync_client: SyncZMQClient, async_client: AsyncZMQClient | None = None, - owner_inst: typing.Optional[typing.Any] = None, + owner_inst: Any = None, **kwargs, ) -> None: action = Thing._set_properties # type: Action @@ -544,7 +541,7 @@ def __init__( self, sync_client: SyncZMQClient, async_client: AsyncZMQClient | None = None, - owner_inst: typing.Optional[typing.Any] = None, + owner_inst: Any = None, **kwargs, ) -> None: action = Thing._get_properties # type: Action diff --git a/hololinked/config.py b/hololinked/config.py index 3b70498f..70e34716 100644 --- a/hololinked/config.py +++ b/hololinked/config.py @@ -28,9 +28,10 @@ import logging import os import tempfile -import typing import warnings +from typing import Any # noqa: F401 + import zmq.asyncio @@ -146,7 +147,7 @@ def load_variables(self, use_environment: bool = False): warnings.warn("no environment file found although asked to load from one", UserWarning) return with open(file, "r") as file: - config = json.load(file) # type: typing.Dict + config = json.load(file) # type: dict[str, Any] for item, value in config.items(): setattr(self, item, value) @@ -181,8 +182,8 @@ def zmq_context(self) -> zmq.asyncio.Context: def set_default_server_execution_context( self, - invokation_timeout: typing.Optional[int] = None, - execution_timeout: typing.Optional[int] = None, + invokation_timeout: int | None = None, + execution_timeout: int | None = None, oneway: bool = False, ) -> None: """Sets the default server execution context for the application""" diff --git a/hololinked/core/actions.py b/hololinked/core/actions.py index 9c2a91af..31df9fcb 100644 --- a/hololinked/core/actions.py +++ b/hololinked/core/actions.py @@ -1,9 +1,9 @@ -import typing import warnings from enum import Enum from inspect import getfullargspec, iscoroutinefunction from types import FunctionType, MethodType +from typing import Any import jsonschema @@ -141,7 +141,7 @@ def __post_init__(self): # stored only for reference, hardly used. self._execution_info: ActionInfoValidator - def validate_call(self, args, kwargs: typing.Dict[str, typing.Any]) -> None: + def validate_call(self, args, kwargs: dict[str, Any]) -> None: """ Validate the call to the action, like payload, state machine state etc. Errors are raised as exceptions. diff --git a/hololinked/core/dataklasses.py b/hololinked/core/dataklasses.py index 62e113a4..a5ecb768 100644 --- a/hololinked/core/dataklasses.py +++ b/hololinked/core/dataklasses.py @@ -3,10 +3,9 @@ resources on the network. These classese are generally not for consumption by the package-end-user. """ -import typing - from enum import Enum from types import FunctionType, MethodType +from typing import Any from pydantic import BaseModel, RootModel @@ -45,7 +44,7 @@ class RemoteResourceInfoValidator: accept_list=True, accept_item=True, doc="State machine state at which a callable will be executed or attribute/property can be written.", - ) # type: typing.Tuple[typing.Union[Enum, str]] + ) # type: tuple[Enum | str] obj = ClassSelector( default=None, @@ -156,7 +155,7 @@ def _set_return_value_schema(self, value): value = self.action_payload_schema_validator(value) setattr(self, "_return_value_schema", value) - def action_payload_schema_validator(self, value: typing.Any) -> typing.Any: + def action_payload_schema_validator(self, value: Any) -> Any: if value is None or isinstance(value, dict) or issubklass(value, (BaseModel, RootModel)): return value raise TypeError("Schema must be None, a dict, or a subclass of BaseModel or RootModel") diff --git a/hololinked/core/events.py b/hololinked/core/events.py index 55bc9964..907295ed 100644 --- a/hololinked/core/events.py +++ b/hololinked/core/events.py @@ -1,4 +1,4 @@ -import typing +from typing import Any, overload import jsonschema @@ -21,9 +21,9 @@ class Event: def __init__( self, - doc: typing.Optional[str] = None, - schema: typing.Optional[JSON] = None, - label: typing.Optional[str] = None, + doc: str | None = None, + schema: JSON | None = None, + label: str | None = None, ) -> None: """ Parameters @@ -49,7 +49,7 @@ def __set_name__(self, owner: ParameterizedMetaclass, name: str) -> None: self.name = name self.owner = owner - @typing.overload + @overload def __get__(self, obj, objtype) -> "EventDispatcher": ... def __get__(self, obj: Parameterized, objtype: ParameterizedMetaclass = None): @@ -98,7 +98,11 @@ class EventDispatcher: __slots__ = ["_unique_identifier", "_publisher", "_owner_inst", "_descriptor"] def __init__( - self, unique_identifier: str, publisher: "EventPublisher", owner_inst: ParameterizedMetaclass, descriptor: Event + self, + unique_identifier: str, + publisher: "EventPublisher", + owner_inst: ParameterizedMetaclass, + descriptor: Event, ) -> None: self._unique_identifier = unique_identifier self._owner_inst = owner_inst @@ -119,7 +123,7 @@ def publisher(self, value: "EventPublisher") -> None: if self._publisher is not None: self._publisher.register(self) - def push(self, data: typing.Any) -> None: + def push(self, data: Any) -> None: """ publish the event. @@ -130,7 +134,7 @@ def push(self, data: typing.Any) -> None: """ self.publisher.publish(self, data=data) - def receive_acknowledgement(self, timeout: typing.Union[float, int, None]) -> bool: + def receive_acknowledgement(self, timeout: float | int | None) -> bool: """ Unimplemented. diff --git a/hololinked/core/logger.py b/hololinked/core/logger.py index 175a020d..361ef08d 100644 --- a/hololinked/core/logger.py +++ b/hololinked/core/logger.py @@ -3,9 +3,9 @@ import logging import threading import time -import typing from collections import deque +from typing import Any # noqa: F401 import structlog @@ -209,24 +209,24 @@ async def _async_push_diff_logs(self) -> None: self.diff_logs.clear() # self.owner.logger.info("ending log events.") - debug_logs = List(default=[], readonly=True, fget=lambda self: self._debug_logs, doc="logs at logging.DEBUG level") # type: list[typing.Dict[str, typing.Any]] + debug_logs = List(default=[], readonly=True, fget=lambda self: self._debug_logs, doc="logs at logging.DEBUG level") # type: list[dict[str, Any]] - warn_logs = List(default=[], readonly=True, fget=lambda self: self._warn_logs, doc="logs at logging.WARN level") # type: list[typing.Dict[str, typing.Any]] + warn_logs = List(default=[], readonly=True, fget=lambda self: self._warn_logs, doc="logs at logging.WARN level") # type: list[dict[str, Any]] - info_logs = List(default=[], readonly=True, fget=lambda self: self._info_logs, doc="logs at logging.INFO level") # type: list[typing.Dict[str, typing.Any]] + info_logs = List(default=[], readonly=True, fget=lambda self: self._info_logs, doc="logs at logging.INFO level") # type: list[dict[str, Any]] - error_logs = List(default=[], readonly=True, fget=lambda self: self._error_logs, doc="logs at logging.ERROR level") # type: list[typing.Dict[str, typing.Any]] + error_logs = List(default=[], readonly=True, fget=lambda self: self._error_logs, doc="logs at logging.ERROR level") # type: list[dict[str, Any]] critical_logs = List( default=[], readonly=True, fget=lambda self: self._critical_logs, doc="logs at logging.CRITICAL level" - ) # type: list[typing.Dict[str, typing.Any]] + ) # type: list[dict[str, Any]] execution_logs = List( default=[], readonly=True, fget=lambda self: self._execution_logs, doc="logs at all levels accumulated in order of collection/execution", - ) # type: list[typing.Dict[str, typing.Any]] + ) # type: list[dict[str, Any]] def prepare_object_logger(instance: RemoteObject, log_level: int, log_file: str, remote_access: bool = False) -> None: @@ -283,7 +283,7 @@ class LogHistoryHandler(logging.Handler): execution logs that were collected during a specific operation. """ - def __init__(self, log_list: typing.Optional[typing.List] = None): + def __init__(self, log_list: list | None = None): """ Parameters ---------- @@ -292,7 +292,7 @@ def __init__(self, log_list: typing.Optional[typing.List] = None): Initial set of log entries to start with. Optional, defaults to empty list. """ super().__init__() - self.log_list: typing.List[typing.Dict] = [] if not log_list else log_list + self.log_list: list[dict] = [] if not log_list else log_list def emit(self, record: logging.LogRecord): # log_entry = self.format(record) diff --git a/hololinked/core/meta.py b/hololinked/core/meta.py index 8b4aebb7..3e793c94 100644 --- a/hololinked/core/meta.py +++ b/hololinked/core/meta.py @@ -1,8 +1,8 @@ import copy import inspect -import typing from types import FunctionType +from typing import Any, KeysView, Type from ..constants import JSON, JSONSerializable from ..param.parameterized import EventDispatcher as ParamEventDispatcher @@ -129,22 +129,22 @@ def _qualified_prefix(self) -> str: return prefix @property - def descriptor_object(self) -> type[Property | Action | Event]: + def descriptor_object(self) -> Type[Property | Action | Event]: """The type of descriptor object that this registry holds, i.e. `Property`, `Action` or `Event`""" raise NotImplementedError("Implement descriptor_object in subclass") @property - def descriptors(self) -> typing.Dict[str, type[Property | Action | Event]]: + def descriptors(self) -> dict[str, Type[Property | Action | Event]]: """A dictionary with all the descriptors as values and their names as keys.""" raise NotImplementedError("Implement descriptors in subclass") @property - def names(self) -> typing.KeysView[str]: + def names(self) -> KeysView[str]: """The names of the descriptors objects as a dictionary key view""" return self.descriptors.keys() @property - def values(self) -> typing.Dict[str, typing.Any]: + def values(self) -> dict[str, Any]: """ The values contained within the descriptors after reading when accessed at instance level, otherwise, the descriptor objects as dictionary when accessed at class level. @@ -174,7 +174,7 @@ def __contains__(self, obj: Property | Action | Event) -> bool: """Returns `True` if the descriptor object is in the descriptors dictionary.""" raise NotImplementedError("contains not implemented yet") - def __dir__(self) -> typing.List[str]: + def __dir__(self) -> list[str]: """Adds descriptor object to the dir""" return super().__dir__() + self.descriptors.keys() # type: ignore @@ -194,7 +194,7 @@ def __str__(self) -> int: return f"" return f"" - def get_descriptors(self, recreate: bool = False) -> typing.Dict[str, Property | Action | Event]: + def get_descriptors(self, recreate: bool = False) -> dict[str, Property | Action | Event]: """ a dictionary with all the descriptors as values and their names as keys. @@ -224,7 +224,7 @@ def get_descriptors(self, recreate: bool = False) -> typing.Dict[str, Property | # and parameters are rarely added (and cannot be deleted) return descriptors - def get_values(self) -> typing.Dict[str, typing.Any]: + def get_values(self) -> dict[str, Any]: """ the values contained within the descriptors after reading when accessed at instance level, otherwise, the descriptor objects as dictionary when accessed at class level. @@ -295,16 +295,16 @@ def __init__(self, owner_cls: ThingMeta, owner_class_members: dict, owner_inst=N self.event_dispatcher.prepare_instance_dependencies() @property - def descriptor_object(self) -> type[Parameter]: + def descriptor_object(self) -> Type[Parameter]: return Parameter @property - def descriptors(self) -> typing.Dict[str, Parameter]: + def descriptors(self) -> dict[str, Parameter]: if self.owner_inst is None: return super().get_descriptors() return dict(super().get_descriptors(), **self._instance_params) - values = property(DescriptorRegistry.get_values, doc=DescriptorRegistry.get_values.__doc__) # type: typing.Dict[str, Parameter | Property | typing.Any] + values = property(DescriptorRegistry.get_values, doc=DescriptorRegistry.get_values.__doc__) # type: dict[str, Parameter | Property | Any] def __getitem__(self, key: str) -> Property | Parameter: return self.descriptors[key] @@ -313,7 +313,7 @@ def __contains__(self, value: str | Property | Parameter) -> bool: return value in self.descriptors.values() or value in self.descriptors @property - def defaults(self) -> typing.Dict[str, typing.Any]: + def defaults(self) -> dict[str, Any]: """default values of all properties as a dictionary with property names as keys""" defaults = {} for key, val in self.descriptors.items(): @@ -321,7 +321,7 @@ def defaults(self) -> typing.Dict[str, typing.Any]: return defaults @property - def remote_objects(self) -> typing.Dict[str, Property]: + def remote_objects(self) -> dict[str, Property]: """ dictionary of properties that are remotely accessible (`remote=True`), which is also a default setting for all properties @@ -347,7 +347,7 @@ def remote_objects(self) -> typing.Dict[str, Property]: return remote_props @property - def db_objects(self) -> typing.Dict[str, Property]: + def db_objects(self) -> dict[str, Property]: """ dictionary of properties that are stored or loaded from the database (`db_init`, `db_persist` or `db_commit` set to `True`) @@ -370,7 +370,7 @@ def db_objects(self) -> typing.Dict[str, Property]: return db_props @property - def db_init_objects(self) -> typing.Dict[str, Property]: + def db_init_objects(self) -> dict[str, Property]: """dictionary of properties that are initialized from the database (`db_init` or `db_persist` set to `True`)""" try: return getattr( @@ -391,7 +391,7 @@ def db_init_objects(self) -> typing.Dict[str, Property]: return db_init_props @property - def db_commit_objects(self) -> typing.Dict[str, Property]: + def db_commit_objects(self) -> dict[str, Property]: """dictionary of properties that are committed to the database (`db_commit` or `db_persist` set to `True`)""" try: return getattr( @@ -412,7 +412,7 @@ def db_commit_objects(self) -> typing.Dict[str, Property]: return db_commit_props @property - def db_persisting_objects(self) -> typing.Dict[str, Property]: + def db_persisting_objects(self) -> dict[str, Property]: """dictionary of properties that are persisted through the database (`db_persist` set to `True`)""" try: return getattr( @@ -432,13 +432,13 @@ def db_persisting_objects(self) -> typing.Dict[str, Property]: ) return db_persisting_props - def get(self, **kwargs) -> typing.Dict[str, typing.Any]: + def get(self, **kwargs) -> dict[str, Any]: """ read properties from the object, implements WoT operations `readAllProperties` and `readMultipleProperties` Parameters ---------- - **kwargs: typing.Dict[str, typing.Any] + **kwargs: dict[str, Any] - names: `List[str]` list of property names to be fetched @@ -449,7 +449,7 @@ def get(self, **kwargs) -> typing.Dict[str, typing.Any]: Returns ------- - typing.Dict[str, typing.Any] + dict[str, Any] dictionary of property names and their values Raises @@ -492,14 +492,14 @@ def get(self, **kwargs) -> typing.Dict[str, typing.Any]: data[rename] = prop.__get__(self.owner_inst, self.owner_cls) return data - def set(self, **values: typing.Dict[str, typing.Any]) -> None: + def set(self, **values: dict[str, Any]) -> None: """ set properties whose name is specified by keys of a dictionary; implements WoT operations `writeMultipleProperties` or `writeAllProperties`. Parameters ---------- - values: typing.Dict[str, typing.Any] + values: dict[str, Any] dictionary of property names and its new values Raises @@ -560,18 +560,18 @@ def clear(self): pass @supports_only_instance_access("database operations are only supported at instance level") - def get_from_DB(self) -> typing.Dict[str, typing.Any]: + def get_from_DB(self) -> dict[str, Any]: """ get all properties (i.e. their values) currently stored in the database Returns ------- - Dict[str, typing.Any] + dict[str, Any] dictionary of property names and their values """ if not hasattr(self.owner_inst, "db_engine"): raise AttributeError("database engine not set, this object is not connected to a database") - props = self.owner_inst.db_engine.get_all_properties() # type: typing.Dict + props = self.owner_inst.db_engine.get_all_properties() # type: dict final_list = {} for name, prop in props.items(): try: @@ -607,7 +607,7 @@ def load_from_DB(self): self.owner_inst.logger.error(f"could not set attribute {db_prop} due to error {str(ex)}") @classmethod - def get_type_from_name(cls, name: str) -> typing.Type[Property]: + def get_type_from_name(cls, name: str) -> Type[Property]: return Property @supports_only_instance_access("additional property setup is required only for instances") @@ -675,12 +675,12 @@ class ActionsRegistry(DescriptorRegistry): """ @property - def descriptor_object(self) -> type[Action]: + def descriptor_object(self) -> Type[Action]: return Action - descriptors = property(DescriptorRegistry.get_descriptors) # type: typing.Dict[str, Action] + descriptors = property(DescriptorRegistry.get_descriptors) # type: dict[str, Action] - values = property(DescriptorRegistry.get_values, doc=DescriptorRegistry.get_values.__doc__) # type: typing.Dict[str, Action] + values = property(DescriptorRegistry.get_values, doc=DescriptorRegistry.get_values.__doc__) # type: dict[str, Action] def __getitem__(self, key: str) -> Action | BoundAction: if self.owner_inst is not None: @@ -702,9 +702,9 @@ class EventsRegistry(DescriptorRegistry): def descriptor_object(self): return Event - descriptors = property(DescriptorRegistry.get_descriptors) # type: typing.Dict[str, Event] + descriptors = property(DescriptorRegistry.get_descriptors) # type: dict[str, Event] - values = property(DescriptorRegistry.get_values, doc=DescriptorRegistry.get_values.__doc__) # type: typing.Dict[str, EventDispatcher] + values = property(DescriptorRegistry.get_values, doc=DescriptorRegistry.get_values.__doc__) # type: dict[str, EventDispatcher] def __getitem__(self, key: str) -> Event | EventDispatcher: if self.owner_inst is not None: @@ -726,7 +726,7 @@ def clear(self): pass @property - def plain(self) -> typing.Dict[str, Event]: + def plain(self) -> dict[str, Event]: """dictionary of events that are not change events (i.e., not observable)""" try: return getattr( @@ -746,7 +746,7 @@ def plain(self) -> typing.Dict[str, Event]: return non_change_events @property - def change_events(self) -> typing.Dict[str, Event]: + def change_events(self) -> dict[str, Event]: """dictionary of change events belonging to observable properties""" try: return getattr( @@ -767,7 +767,7 @@ def change_events(self) -> typing.Dict[str, Event]: return change_events @property - def observables(self) -> typing.Dict[str, Property]: + def observables(self) -> dict[str, Property]: """dictionary of all properties that are observable, i.e. that which push change events""" try: return getattr( @@ -818,12 +818,12 @@ def properties(self) -> PropertiesRegistry: # Affordance object associated with it i.e _get_properties.to_affordance() function needs to work. # TODO - fix this anomaly @action() - def _get_properties(self, **kwargs) -> typing.Dict[str, typing.Any]: + def _get_properties(self, **kwargs) -> dict[str, Any]: """ """ return self.properties.get(**kwargs) @action() - def _set_properties(self, **values: typing.Dict[str, typing.Any]) -> None: + def _set_properties(self, **values: dict[str, Any]) -> None: """ set properties whose name is specified by keys of a dictionary @@ -835,7 +835,7 @@ def _set_properties(self, **values: typing.Dict[str, typing.Any]) -> None: return self.properties.set(**values) # returns None @action() - def _get_properties_in_db(self) -> typing.Dict[str, JSONSerializable]: + def _get_properties_in_db(self) -> dict[str, JSONSerializable]: """ get all properties in the database diff --git a/hololinked/core/property.py b/hololinked/core/property.py index 536fba09..a70d8d84 100644 --- a/hololinked/core/property.py +++ b/hololinked/core/property.py @@ -1,6 +1,7 @@ -import typing +from __future__ import annotations from enum import Enum +from typing import Any, Callable, Type from pydantic import BaseModel, ConfigDict, RootModel, create_model @@ -35,29 +36,29 @@ class Property(Parameter): def __init__( self, - default: typing.Any = None, + default: Any = None, *, - doc: typing.Optional[str] = None, + doc: str | None = None, constant: bool = False, readonly: bool = False, allow_None: bool = False, - label: typing.Optional[str] = None, - state: typing.Optional[typing.Union[typing.List, typing.Tuple, str, Enum]] = None, + label: str | None = None, + state: list | tuple | str | Enum | None = None, db_persist: bool = False, db_init: bool = False, db_commit: bool = False, observable: bool = False, - model: typing.Optional["BaseModel"] = None, + model: "BaseModel" | None = None, class_member: bool = False, - fget: typing.Optional[typing.Callable] = None, - fset: typing.Optional[typing.Callable] = None, - fdel: typing.Optional[typing.Callable] = None, - fcomparator: typing.Optional[typing.Callable] = None, + fget: Callable | None = None, + fset: Callable | None = None, + fdel: Callable | None = None, + fcomparator: Callable | None = None, deepcopy_default: bool = False, per_instance_descriptor: bool = False, remote: bool = True, - precedence: typing.Optional[float] = None, - metadata: typing.Optional[typing.Dict] = None, + precedence: float | None = None, + metadata: dict | None = None, ) -> None: """ Parameters @@ -169,7 +170,7 @@ def __init__( if observable: self._observable_event_descriptor = Event() self._execution_info_validator = None - self.execution_info = None # typing.Optional[RemoteResource] + self.execution_info = None # RemoteResource | None if remote: # TODO, this execution info validator can be refactored & removed later, adds an additional layer of info self._execution_info_validator = RemoteResourceInfoValidator(state=state, isproperty=True, obj=self) @@ -184,7 +185,7 @@ def __init__( self.model = wrap_plain_types_in_rootmodel(model) # type: BaseModel self.validator = self.model.model_validate - def __set_name__(self, owner: typing.Any, attrib_name: str) -> None: + def __set_name__(self, owner: Any, attrib_name: str) -> None: super().__set_name__(owner, attrib_name) if self._execution_info_validator: self._execution_info_validator.obj_name = attrib_name @@ -197,12 +198,12 @@ def __set_name__(self, owner: typing.Any, attrib_name: str) -> None: # This is a descriptor object, so we need to set it on the owner class setattr(owner, _observable_event_name, self._observable_event_descriptor) - def __get__(self, obj: Parameterized, objtype: ParameterizedMetaclass) -> typing.Any: + def __get__(self, obj: Parameterized, objtype: ParameterizedMetaclass) -> Any: read_value = super().__get__(obj, objtype) self.push_change_event(obj, read_value) return read_value - def push_change_event(self, obj, value: typing.Any) -> None: + def push_change_event(self, obj, value: Any) -> None: """ Pushes change event both on read and write if an event publisher object is available on the owning `Thing`. @@ -230,7 +231,7 @@ def push_change_event(self, obj, value: typing.Any) -> None: return event_dispatcher.push(value) - def validate_and_adapt(self, value) -> typing.Any: + def validate_and_adapt(self, value) -> Any: """ Validate the given value and adapt it if a proper logical reasoning can be given, for example, cropping a number to its bounds. Returns modified value. @@ -249,7 +250,7 @@ def validate_and_adapt(self, value) -> typing.Any: value = self.model(**value) return super().validate_and_adapt(value) - def external_set(self, obj: Parameterized, value: typing.Any) -> None: + def external_set(self, obj: Parameterized, value: Any) -> None: """ method called when the value of the property is set from an external source, e.g. a remote client. Usually introduces a state machine check before allowing the set operation. @@ -265,13 +266,13 @@ def external_set(self, obj: Parameterized, value: typing.Any) -> None: ) ) - def _post_value_set(self, obj, value: typing.Any) -> None: + def _post_value_set(self, obj, value: Any) -> None: if (self.db_persist or self.db_commit) and hasattr(obj, "db_engine"): obj.db_engine.set_property(self, value) self.push_change_event(obj, value) return super()._post_value_set(obj, value) - def comparator(self, func: typing.Callable) -> typing.Callable: + def comparator(self, func: Callable) -> Callable: """ Register a comparator method using this decorator to decide when to push a change event. @@ -327,7 +328,7 @@ def to_affordance(self, owner_inst=None): return PropertyAffordance.generate(self, owner_inst or self.owner) -def wrap_plain_types_in_rootmodel(model: type) -> type[BaseModel] | type[RootModel]: +def wrap_plain_types_in_rootmodel(model: type) -> Type[BaseModel] | Type[RootModel]: """ Ensure a type is a subclass of BaseModel. diff --git a/hololinked/core/state_machine.py b/hololinked/core/state_machine.py index 7859b8ef..0ba7bba5 100644 --- a/hololinked/core/state_machine.py +++ b/hololinked/core/state_machine.py @@ -1,7 +1,6 @@ -import typing - from enum import Enum, EnumMeta, StrEnum from types import FunctionType, MethodType +from typing import Callable from ..param import edit_constant from .actions import Action @@ -23,11 +22,11 @@ class StateMachine: initial_state = ClassSelector( default=None, allow_None=True, constant=True, class_=(Enum, str), doc="initial state of the machine" - ) # type: typing.Union[Enum, str] + ) # type: Enum | str states = ClassSelector( default=None, allow_None=True, constant=True, class_=(EnumMeta, tuple, list), doc="list/enum of allowed states" - ) # type: typing.Union[EnumMeta, tuple, list] + ) # type: EnumMeta | tuple | list on_enter = TypedDict( default=None, @@ -35,7 +34,7 @@ class StateMachine: key_type=str, doc="""callbacks to execute when a certain state is entered; specfied as map with state as keys and callbacks as list""", - ) # type: typing.Dict[str, typing.List[typing.Callable]] + ) # type: dict[str, list[Callable]] on_exit = TypedDict( default=None, @@ -43,7 +42,7 @@ class StateMachine: key_type=str, doc="""callbacks to execute when certain state is exited; specfied as map with state as keys and callbacks as list""", - ) # type: typing.Dict[str, typing.List[typing.Callable]] + ) # type: dict[str, list[Callable]] machine = TypedDict( default=None, @@ -51,7 +50,7 @@ class StateMachine: item_type=(list, tuple), key_type=str, # i.e. its like JSON doc="the machine specification with state as key and objects as list", - ) # type: typing.Dict[str, typing.List[typing.Callable, Property]] + ) # type: dict[str, list[Callable | Property]] push_state_change_event = Boolean( default=True, doc="if `True`, when the state changes, an event is pushed with the new state" @@ -66,13 +65,13 @@ class StateMachine: def __init__( self, - states: EnumMeta | typing.List[str] | typing.Tuple[str], + states: EnumMeta | list[str] | tuple[str], *, initial_state: StrEnum | str, push_state_change_event: bool = True, - on_enter: typing.Dict[str, typing.List[typing.Callable] | typing.Callable] = None, - on_exit: typing.Dict[str, typing.List[typing.Callable] | typing.Callable] = None, - **machine: typing.Dict[str, typing.Callable | Property], + on_enter: dict[str, list[Callable] | Callable] = None, + on_exit: dict[str, list[Callable] | Callable] = None, + **machine: dict[str, Callable | Property], ) -> None: """ Parameters @@ -199,14 +198,14 @@ def __set__(self, instance, value) -> None: "Cannot set state machine directly. It is a class level attribute and can be defined only once." ) - def __contains__(self, state: typing.Union[str, StrEnum]): + def __contains__(self, state: str | StrEnum): if isinstance(self.states, EnumMeta) and state in self.states.__members__: return True elif isinstance(self.states, tuple) and state in self.states: return True return False - def _get_machine_compliant_state(self, state) -> typing.Union[StrEnum, str]: + def _get_machine_compliant_state(self, state) -> StrEnum | str: """ In case of not using StrEnum or iterable of str, this maps the enum of state to the state name. @@ -219,7 +218,7 @@ def _get_machine_compliant_state(self, state) -> typing.Union[StrEnum, str]: f"cannot comply state to a string: {state} which is of type {type(state)}. owner - {self.owner}." ) - def contains_object(self, object: typing.Union[Property, typing.Callable]) -> bool: + def contains_object(self, object: Property | Callable) -> bool: """ Check if specified object is found in any of the state machine states. Supply unbound method for checking methods, as state machine is specified at class level @@ -253,7 +252,7 @@ def __init__(self, owner: Thing, state_machine: StateMachine) -> None: self.owner = owner self.logger = state_machine.logger - def get_state(self) -> typing.Union[str, StrEnum, None]: + def get_state(self) -> str | StrEnum | None: """ return the current state, one can also access it using the property `current state`. @@ -267,9 +266,7 @@ def get_state(self) -> typing.Union[str, StrEnum, None]: except AttributeError: return self.initial_state - def set_state( - self, value: typing.Union[str, StrEnum, Enum], push_event: bool = True, skip_callbacks: bool = False - ) -> None: + def set_state(self, value: str | StrEnum | Enum, push_event: bool = True, skip_callbacks: bool = False) -> None: """ set state of state machine. Also triggers state change callbacks if `skip_callbacks=False` and pushes a state change event when `push_event=True`. One can also set state using the '=' operator of the `current_state` property, @@ -305,7 +302,7 @@ def set_state( current_state = property(get_state, set_state, None, doc="""read and write current state of the state machine""") - def contains_object(self, object: typing.Union[Property, typing.Callable]) -> bool: + def contains_object(self, object: Property | Callable) -> bool: """ Check if specified object is found in any of the state machine states. Supply unbound method for checking methods, as state machine is specified at class level @@ -334,7 +331,7 @@ def __eq__(self, other) -> bool: and self.owner.id == other.owner.id ) - def __contains__(self, state: typing.Union[str, StrEnum]) -> bool: + def __contains__(self, state: str | StrEnum) -> bool: return state in self.descriptor @property diff --git a/hololinked/core/thing.py b/hololinked/core/thing.py index 18bf53c3..d61b5f27 100644 --- a/hololinked/core/thing.py +++ b/hololinked/core/thing.py @@ -1,7 +1,8 @@ import inspect import logging import ssl -import typing + +from typing import Any import structlog @@ -46,7 +47,7 @@ class Thing(Propertized, RemoteInvokable, EventSource, metaclass=ThingMeta): and network accessible handler is created if none supplied.""", ) # type: logging.Logger - state_machine = None # type: typing.Optional["StateMachine"] + state_machine = None # type: "StateMachine" | None # remote properties state = String( @@ -57,7 +58,7 @@ class Thing(Propertized, RemoteInvokable, EventSource, metaclass=ThingMeta): fget=lambda self: self.state_machine.current_state if self.state_machine else None, doc="""current state machine's state if state machine present, `None` indicates absence of state machine. State machine returned state is always a string even if specified as an Enum in the state machine.""", - ) # type: typing.Optional[str] + ) # type: str | None # object_info = Property(doc="contains information about this object like the class name, script location etc.") # type: ThingInformation @@ -72,9 +73,9 @@ def __init__( self, *, id: str, - logger: typing.Optional[logging.Logger] = None, - serializer: typing.Optional[BaseSerializer | JSONSerializer] = None, - **kwargs: typing.Dict[str, typing.Any], + logger: logging.Logger | None = None, + serializer: BaseSerializer | JSONSerializer | None = None, + **kwargs: dict[str, Any], ) -> None: """ Parameters @@ -90,7 +91,7 @@ def __init__( serializer: BaseSerializer | JSONSerializer, optional Default serializer to be used for serializing and deserializing data. If not supplied, a `msgspec` based JSON Serializer is used. - **kwargs: typing.Dict[str, Any] + **kwargs: dict[str, Any] - `remote_accessible_logger`: `bool`, Default `False`. if `True`, the log records can be streamed by a remote client. `remote_accessible_logger` can also be set as a class attribute. @@ -142,11 +143,11 @@ def __post_init__(self): from .zmq.rpc_server import RPCServer # noqa: F401 # Type definitions - self.rpc_server = None # type: typing.Optional[RPCServer] - self.db_engine: typing.Optional[ThingDB] - self._owners = None if not hasattr(self, "_owners") else self._owners # type: typing.Optional[typing.List[Thing]] - self._remote_access_loghandler: typing.Optional[RemoteAccessHandler] - self._internal_fixed_attributes: typing.List[str] + self.rpc_server = None # type: RPCServer | None + self.db_engine: ThingDB | None + self._owners = None if not hasattr(self, "_owners") else self._owners # type: list[Thing] | None + self._remote_access_loghandler: RemoteAccessHandler | None + self._internal_fixed_attributes: list[str] self._qualified_id: str self._state_machine_state: str # database operations @@ -154,7 +155,7 @@ def __post_init__(self): # object is ready self.logger.info(f"initialialised Thing class {self.__class__.__name__} with id {self.id}") - def __setattr__(self, __name: str, __value: typing.Any) -> None: + def __setattr__(self, __name: str, __value: Any) -> None: if __name == "_internal_fixed_attributes" or __name in self._internal_fixed_attributes: # order of 'or' operation for above 'if' matters if not hasattr(self, __name) or getattr(self, __name, None) is None: @@ -169,7 +170,7 @@ def __setattr__(self, __name: str, __value: typing.Any) -> None: super().__setattr__(__name, __value) @property - def sub_things(self) -> typing.Dict[str, "Thing"]: + def sub_things(self) -> dict[str, "Thing"]: """other `Thing`s' that are composed within this `Thing`.""" things = dict() for name, subthing in inspect._getmembers( @@ -220,7 +221,7 @@ def run_with_zmq_server( self, access_points: list[ZMQ_TRANSPORTS] | ZMQ_TRANSPORTS | str | list[str] = ZMQ_TRANSPORTS.IPC, forked: bool = False, # used by decorator - **kwargs: typing.Dict[str, typing.Any], + **kwargs: dict[str, Any], ) -> None: """ Quick-start to serve `Thing` over ZMQ. This method is fully blocking. @@ -267,12 +268,12 @@ def run_with_http_server( port: int = 8080, address: str = "0.0.0.0", # host: str = None, - allowed_clients: str | typing.Iterable[str] | None = None, + allowed_clients: str | list[str] | None = None, ssl_context: ssl.SSLContext | None = None, # protocol_version : int = 1, # network_interface : str = 'Ethernet', forked: bool = False, # used by forkable decorator - **kwargs: typing.Dict[str, typing.Any], + **kwargs: dict[str, Any], ) -> None: """ Quick-start to serve `Thing` over HTTP. This method is fully blocking. @@ -285,13 +286,13 @@ def run_with_http_server( A convenience option to set IP address apart from 0.0.0.0 (i.e. bind to all interfaces, which is default) ssl_context: ssl.SSLContext | None provide custom certificates with an SSL context for encrypted communication - allowed_clients: typing.Iterable[str] | str | None + allowed_clients: str | list[str] | None serves request and sets CORS only from these clients, other clients are rejected with 403. Uses remote IP header value to achieve this. Unlike CORS, the server resource is not even executed if the client is not an allowed client. Note that the remote IP in a HTTP request is believable only from a trusted HTTP client, not a modified one. forked: bool, Default `False` if `True`, the server is started in a separate thread and this method returns immediately - **kwargs: typing.Dict[str, typing.Any] + **kwargs: dict[str, Any] additional keyword arguments: - `property_handler`: `BaseHandler` | `PropertyHandler`, @@ -318,14 +319,14 @@ def run_with_http_server( @forkable # noqa: F405 def run( self, - **kwargs: typing.Dict[str, typing.Any], + **kwargs: dict[str, Any], ) -> None: """ Expose the object with the given servers. This method is blocking until `exit()` is called. Parameters ---------- - kwargs: typing.Dict[str, Any] + kwargs: dict[str, Any] keyword arguments - `access_points`: dict[str, dict | int | str | list[str]], optional @@ -339,7 +340,7 @@ def run( from ..server.server import BaseProtocolServer # noqa: F401 access_points = kwargs.get("access_points", None) # type: dict[str, dict | int | str | list[str]] - servers = kwargs.get("servers", []) # type: typing.Optional[typing.List[BaseProtocolServer]] + servers = kwargs.get("servers", []) # type: list[BaseProtocolServer] | None if access_points is None and len(servers) == 0: raise ValueError("At least one of access_points or servers must be provided.") diff --git a/hololinked/core/zmq/brokers.py b/hololinked/core/zmq/brokers.py index 61d1e68a..bc1e5f60 100644 --- a/hololinked/core/zmq/brokers.py +++ b/hololinked/core/zmq/brokers.py @@ -1,11 +1,13 @@ +from __future__ import annotations + import asyncio import os import threading import time -import typing import warnings from enum import Enum +from typing import Any, Iterator import structlog import zmq @@ -86,7 +88,7 @@ def get_socket( access_point: str = ZMQ_TRANSPORTS.IPC, socket_type: zmq.SocketType = zmq.ROUTER, **kwargs, - ) -> typing.Tuple[zmq.Socket, str]: + ) -> tuple[zmq.Socket, str]: """ Create a socket with certain specifications. Supported ZeroMQ transports are TCP, IPC & INPROC. For IPC sockets, a file is created under TEMP_DIR of global configuration. @@ -409,7 +411,7 @@ def __init__( self, *, id: str, - context: typing.Union[zmq.asyncio.Context, None] = None, + context: zmq.asyncio.Context | None = None, socket_type: zmq.SocketType = zmq.ROUTER, access_point: str = ZMQ_TRANSPORTS.IPC, poll_timeout: int = 25, @@ -487,14 +489,14 @@ async def async_recv_request(self) -> RequestMessage: ) return request_message - async def async_recv_requests(self) -> typing.List[RequestMessage]: + async def async_recv_requests(self) -> list[RequestMessage]: """ Receive all currently available messages in blocking form. There is no polling, therefore this method blocks until at least one message is received. Returns ------- - messages: typing.List[RequestMessage] + messages: list[RequestMessage] list of received messages """ messages = [await self.async_recv_request()] @@ -580,7 +582,7 @@ async def async_send_response_with_message_type( message_type=response_message.type, ) - async def poll_requests(self) -> typing.List[RequestMessage]: + async def poll_requests(self) -> list[RequestMessage]: """ poll for messages with specified timeout (`poll_timeout`) and return if any messages are available. This method can be stopped from another method in a different thread or asyncio task (not in the same thread though). @@ -708,7 +710,7 @@ def exit(self) -> None: class ZMQServerPool(BaseZMQServer): """Implements pool of async ZMQ servers (& their sockets)""" - def __init__(self, *, ids: typing.List[str] | None = None, **kwargs) -> None: + def __init__(self, *, ids: list[str] | None = None, **kwargs) -> None: """ ids: List[str], optional list of server IDs to create the server pool. If None, an empty pool is created and servers can be @@ -721,7 +723,7 @@ def __init__(self, *, ids: typing.List[str] | None = None, **kwargs) -> None: """ self.context = global_config.zmq_context() self.poller = zmq.asyncio.Poller() - self.pool = dict() # type: typing.Dict[str, AsyncZMQServer] + self.pool = dict() # type: dict[str, AsyncZMQServer] if ids: for id in ids: self.pool[id] = AsyncZMQServer(id=id, context=self.context, **kwargs) @@ -734,7 +736,7 @@ def create_socket( *, id: str, bind: bool, - context: typing.Union[zmq.asyncio.Context, zmq.Context], + context: zmq.asyncio.Context | zmq.Context, access_point: str, socket_type: zmq.SocketType = zmq.ROUTER, **kwargs, @@ -783,7 +785,7 @@ async def async_recv_request(self, id: str) -> RequestMessage: """ return await self.pool[id].async_recv_request() - async def async_recv_requests(self, id: str) -> typing.List[RequestMessage]: + async def async_recv_requests(self, id: str) -> list[RequestMessage]: """ receive all available messages for server specified by id @@ -794,7 +796,7 @@ async def async_recv_requests(self, id: str) -> typing.List[RequestMessage]: Returns ------- - typing.List[RequestMessage] + list[RequestMessage] list of received messages """ return await self.pool[id].async_recv_requests() @@ -827,14 +829,14 @@ async def async_send_response( preserialized_payload=preserialized_payload, ) - async def poll(self) -> typing.List[RequestMessage]: + async def poll(self) -> list[RequestMessage]: """ Pool for messages in the entire server pool. Use the message to identify the server by using `receiver_id` of the message header. Returns ------- - typing.List[RequestMessage] + list[RequestMessage] list of received messages across all servers in the pool """ self.stop_poll = False @@ -866,7 +868,7 @@ def stop_polling(self) -> None: def __getitem__(self, key) -> AsyncZMQServer: return self.pool[key] - def __iter__(self) -> typing.Iterator[str]: + def __iter__(self) -> Iterator[str]: return self.pool.__iter__() def __contains__(self, name: str) -> bool: @@ -1204,7 +1206,7 @@ def execute( ) return self.recv_response(message_id=message_id) - def handshake(self, timeout: typing.Union[float, int] = 60000) -> None: + def handshake(self, timeout: float | int = 60000) -> None: """ handshake with server before sending first message @@ -1367,7 +1369,7 @@ async def async_send_request( payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte, server_execution_context: ServerExecutionContext = default_server_execution_context, - thing_execution_context: typing.Dict[str, typing.Any] = default_thing_execution_context, + thing_execution_context: dict[str, Any] = default_thing_execution_context, ) -> str: """ send request message to server. @@ -1416,7 +1418,7 @@ async def async_send_request( ) return request_message.id - async def async_recv_response(self, message_id: str) -> typing.List[ResponseMessage]: + async def async_recv_response(self, message_id: str) -> list[ResponseMessage]: """ Receives response from server. Messages are identified by message id, and out of order messages are sent to a cache which may be popped later. This method blocks until the expected message is received or `stop_polling()` @@ -1526,8 +1528,8 @@ class MessageMappedZMQClientPool(BaseZMQClient): def __init__( self, id: str, - client_ids: typing.List[str], - server_ids: typing.List[str], + client_ids: list[str], + server_ids: list[str], handshake: bool = True, context: zmq.asyncio.Context = None, access_point: str = ZMQ_TRANSPORTS.IPC, @@ -1567,7 +1569,7 @@ def __init__( raise ValueError("client_ids and server_ids must have same length") # this class does not call create_socket method self.context = context or global_config.zmq_context() - self.pool = dict() # type: typing.Dict[str, AsyncZMQClient] + self.pool = dict() # type: dict[str, AsyncZMQClient] self.poller = zmq.asyncio.Poller() for client_id, server_id in zip(client_ids, server_ids): client = AsyncZMQClient( @@ -1582,13 +1584,13 @@ def __init__( # Both the client pool as well as the individual client get their serializers and client_types # This is required to implement pool level sending and receiving messages like polling of pool of sockets self.event_pool = AsyncioEventPool(len(server_ids)) - self.events_map = dict() # type: typing.Dict[bytes, asyncio.Event] + self.events_map = dict() # type: dict[bytes, asyncio.Event] self.message_map = dict() self.cancelled_messages = [] self.poll_timeout = poll_timeout self.stop_poll = False - self._thing_to_client_map = dict() # type: typing.Dict[str, AsyncZMQClient] - self._client_to_thing_map = dict() # type: typing.Dict[str, str] + self._thing_to_client_map = dict() # type: dict[str, AsyncZMQClient] + self._client_to_thing_map = dict() # type: dict[str, str] def create_new(self, id: str, server_id: str, access_point: str = ZMQ_TRANSPORTS.IPC) -> None: """ @@ -1775,7 +1777,7 @@ async def poll_responses(self) -> None: else: event_loop.create_task(self._resolve_response(message_id, response_message)) - async def _resolve_response(self, message_id: str, data: typing.Any) -> None: + async def _resolve_response(self, message_id: str, data: Any) -> None: """ This method is called when there is no asyncio Event available for a message ID. This can happen only when the server replied before the client created a asyncio.Event object. check `async_execute()` for details. @@ -1989,10 +1991,10 @@ async def async_execute_in_all( operation: str, payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte, - thing_ids: typing.Optional[typing.List[str]] = None, + thing_ids: list[str] | None = None, server_execution_context: ServerExecutionContext = default_server_execution_context, thing_execution_context: ThingExecutionContext = default_thing_execution_context, - ) -> typing.Dict[str, ResponseMessage]: + ) -> dict[str, ResponseMessage]: if not thing_ids: thing_ids = self._client_to_thing_map.values() @@ -2023,7 +2025,7 @@ async def async_execute_in_all_things( preserialized_payload: PreserializedData = PreserializedEmptyByte, server_execution_context: ServerExecutionContext = default_server_execution_context, thing_execution_context: ThingExecutionContext = default_thing_execution_context, - ) -> typing.Dict[str, ResponseMessage]: + ) -> dict[str, ResponseMessage]: """execute the same operation in all `Thing`s""" return await self.async_execute_in_all( objekt=objekt, @@ -2043,7 +2045,7 @@ def __contains__(self, name: str) -> bool: def __getitem__(self, key) -> AsyncZMQClient: return self.pool[key] - def __iter__(self) -> typing.Iterator[AsyncZMQClient]: + def __iter__(self) -> Iterator[AsyncZMQClient]: return iter(self.pool.values()) def exit(self) -> None: @@ -2136,8 +2138,8 @@ def __init__( socket_type=zmq.PUB, **kwargs, ) - self.events = set() # type is typing.Set[EventDispatcher] - self.event_ids = set() # type: typing.Set[str] + self.events = set() # type is set[EventDispatcher] + self.event_ids = set() # type: set[str] self._send_lock = threading.Lock() def register(self, event: "EventDispatcher") -> None: @@ -2173,7 +2175,7 @@ def unregister(self, event: "EventDispatcher") -> None: UserWarning, ) - def publish(self, event, data: typing.Any) -> None: + def publish(self, event, data: Any) -> None: """ publish an event with given unique name. @@ -2351,9 +2353,7 @@ def exit(self): class EventConsumer(BaseEventConsumer, BaseSyncZMQ): """Sync Event Consumer to used outside of async loops""" - def receive( - self, timeout: typing.Optional[float] = 1000, raise_interrupt_as_exception: bool = False - ) -> EventMessage | None: + def receive(self, timeout: float | None = 1000, raise_interrupt_as_exception: bool = False) -> EventMessage | None: """ receive event with given timeout @@ -2369,7 +2369,7 @@ def receive( try: if not self._poller_lock.acquire(timeout=timeout / 1000 if timeout else -1): continue - sockets = self.poller.poll(timeout) # typing.List[typing.Tuple[zmq.Socket, int]] + sockets = self.poller.poll(timeout) # list[tuple[zmq.Socket, int]] if len(sockets) > 1: # if there is an interrupt message as well as an event, # give preference to interrupt message. @@ -2410,7 +2410,7 @@ class AsyncEventConsumer(BaseEventConsumer, BaseAsyncZMQ): async def receive( self, - timeout: typing.Optional[float] = 1000, + timeout: float | None = 1000, raise_interrupt_as_exception: bool = False, ) -> EventMessage | None: """ diff --git a/hololinked/core/zmq/message.py b/hololinked/core/zmq/message.py index 365021ad..51e42945 100644 --- a/hololinked/core/zmq/message.py +++ b/hololinked/core/zmq/message.py @@ -1,5 +1,4 @@ -import typing - +from typing import Any, Optional from uuid import uuid4 import msgspec @@ -84,13 +83,13 @@ class RequestHeader(msgspec.Struct): thingExecutionContext: ThingExecutionContext = msgspec.field( default_factory=lambda: default_thing_execution_context ) - thingID: typing.Optional[str] = "" - objekt: typing.Optional[str] = "" - operation: typing.Optional[str] = "" - payloadContentType: typing.Optional[str] = "application/json" - preencodedPayloadContentType: typing.Optional[str] = "text/plain" + thingID: Optional[str] = "" + objekt: Optional[str] = "" + operation: Optional[str] = "" + payloadContentType: Optional[str] = "application/json" + preencodedPayloadContentType: Optional[str] = "text/plain" - def __getitem__(self, key: str) -> typing.Any: + def __getitem__(self, key: str) -> Any: try: return getattr(self, key) except AttributeError: @@ -107,10 +106,10 @@ class ResponseHeader(msgspec.Struct): messageID: str receiverID: str senderID: str - payloadContentType: typing.Optional[str] = "application/json" - preencodedPayloadContentType: typing.Optional[str] = "" + payloadContentType: Optional[str] = "application/json" + preencodedPayloadContentType: Optional[str] = "" - def __getitem__(self, key: str) -> typing.Any: + def __getitem__(self, key: str) -> Any: try: return getattr(self, key) except AttributeError: @@ -127,10 +126,10 @@ class EventHeader(msgspec.Struct): messageID: str senderID: str eventID: str - payloadContentType: typing.Optional[str] = "application/json" - preencodedPayloadContentType: typing.Optional[str] = "" + payloadContentType: Optional[str] = "application/json" + preencodedPayloadContentType: Optional[str] = "" - def __getitem__(self, key: str) -> typing.Any: + def __getitem__(self, key: str) -> Any: try: return getattr(self, key) except AttributeError: @@ -155,14 +154,14 @@ class RequestMessage: length = Integer(default=5, readonly=True, class_member=True, doc="length of the message") # type: int - def __init__(self, msg: typing.List[bytes]) -> None: + def __init__(self, msg: list[bytes]) -> None: self._bytes = msg self._header = None # deserialized header - self._body = None # type: typing.Optional[typing.Tuple[SerializableData, PreserializedData]] + self._body = None # type: Optional[tuple[SerializableData, PreserializedData]] self._sender_id = None @property - def byte_array(self) -> typing.List[bytes]: + def byte_array(self) -> list[bytes]: """ message byte array, either after being composed or as received from the socket. @@ -182,7 +181,7 @@ def header(self) -> RequestHeader: return self._header @property - def body(self) -> typing.Tuple[SerializableData, PreserializedData]: + def body(self) -> tuple[SerializableData, PreserializedData]: """body of the message""" if self._body is None: self.parse_body() @@ -214,12 +213,12 @@ def type(self) -> str: return self.header["messageType"] @property - def server_execution_context(self) -> typing.Dict[str, typing.Any]: + def server_execution_context(self) -> dict[str, Any]: """server execution context""" return self.header["serverExecutionContext"] @property - def thing_execution_context(self) -> typing.Dict[str, typing.Any]: + def thing_execution_context(self) -> dict[str, Any]: """thing execution context""" return self.header["thingExecutionContext"] @@ -256,8 +255,8 @@ def craft_from_arguments( operation: str, payload: SerializableData = SerializableNone, preserialized_payload: PreserializedData = PreserializedEmptyByte, - server_execution_context: typing.Dict[str, typing.Any] = default_server_execution_context, - thing_execution_context: typing.Dict[str, typing.Any] = default_thing_execution_context, + server_execution_context: dict[str, Any] = default_server_execution_context, + thing_execution_context: dict[str, Any] = default_thing_execution_context, ) -> "RequestMessage": """ create a request message from the given arguments @@ -375,14 +374,14 @@ class ResponseMessage: length = Integer(default=5, readonly=True, class_member=True, doc="length of the message") # type: int - def __init__(self, msg: typing.List[bytes]): + def __init__(self, msg: list[bytes]): self._bytes = msg self._header = None self._body = None self._sender_id = None @property - def byte_array(self) -> typing.List[bytes]: + def byte_array(self) -> list[bytes]: """the message in bytes, either after being composed or as received from the socket. Message indices: @@ -421,7 +420,7 @@ def header(self) -> JSON: return self._header @property - def body(self) -> typing.Tuple[SerializableData, PreserializedData]: + def body(self) -> tuple[SerializableData, PreserializedData]: """body of the message""" if self._body is None: self.parse_body() diff --git a/hololinked/core/zmq/rpc_server.py b/hololinked/core/zmq/rpc_server.py index bb245b53..dcbf339e 100644 --- a/hololinked/core/zmq/rpc_server.py +++ b/hololinked/core/zmq/rpc_server.py @@ -1,12 +1,14 @@ +from __future__ import annotations + import asyncio import copy import logging import socket import threading import tracemalloc -import typing from collections import deque +from typing import Any import structlog import zmq @@ -84,18 +86,18 @@ class RPCServer(BaseZMQServer): default=None, doc="list of Things which are being executed", remote=False, - ) # type: typing.List[Thing] + ) # type: list[Thing] - schedulers: typing.Dict[str, "QueuedScheduler"] + schedulers: dict[str, "QueuedScheduler"] def __init__( self, *, id: str, - things: typing.Optional[typing.List[Thing]] = None, + things: list[Thing] | None = None, context: zmq.asyncio.Context | None = None, access_point: ZMQ_TRANSPORTS = ZMQ_TRANSPORTS.INPROC, - **kwargs: typing.Dict[str, typing.Any], + **kwargs: dict[str, Any], ) -> None: """ Parameters @@ -320,7 +322,7 @@ async def tunnel_message_to_things(self, scheduler: "Scheduler") -> None: scheduler.cleanup() self.logger.info("stopped schedulers") - async def run_thing_instance(self, instance: Thing, scheduler: typing.Optional["Scheduler"] = None) -> None: + async def run_thing_instance(self, instance: Thing, scheduler: "Scheduler" | None = None) -> None: """ run a single `Thing` instance in an infinite loop by allowing the scheduler to schedule operations on it. @@ -446,9 +448,9 @@ async def execute_operation( instance: Thing, objekt: str, operation: str, - payload: typing.Any, + payload: Any, preserialized_payload: bytes, - ) -> typing.Any: + ) -> Any: """ Execute a given operation on a thing instance. @@ -509,7 +511,7 @@ async def execute_operation( def format_return_value( self, - return_value: typing.Any, + return_value: Any, serializer: BaseSerializer, ) -> tuple[SerializableData, PreserializedData]: if ( @@ -567,7 +569,7 @@ def run_zmq_request_listener(self): ) eventloop.close() - def run_things(self, things: typing.List[Thing]): + def run_things(self, things: list[Thing]): """ Run loop that executes operations on `Thing` instances. This method is blocking and is called by `run()` method. @@ -594,7 +596,7 @@ def run(self): self.logger.info("starting RPC server") for thing in self.things: self.schedulers[thing.id] = QueuedScheduler(thing, self) - threads = dict() # type: typing.Dict[int, threading.Thread] + threads = dict() # type: dict[int, threading.Thread] for thing in self.things: thread = threading.Thread(target=self.run_things, args=([thing],)) thread.start() @@ -639,7 +641,7 @@ def get_thing_description( protocol: str, ignore_errors: bool = False, skip_names: list[str] = [], - ) -> dict[str, typing.Any]: + ) -> dict[str, Any]: """ Get the Thing Description (TD) for a specific Thing instance. @@ -659,7 +661,7 @@ def get_thing_description( JSON The Thing Description in JSON format. """ - TM = instance.get_thing_model(ignore_errors=ignore_errors, skip_names=skip_names).json() # type: dict[str, typing.Any] + TM = instance.get_thing_model(ignore_errors=ignore_errors, skip_names=skip_names).json() # type: dict[str, Any] TD = copy.deepcopy(TM) from ...td import ActionAffordance, EventAffordance, PropertyAffordance from ...td.forms import Form @@ -750,9 +752,9 @@ class Scheduler: [UML Diagram subclasses](http://docs.hololinked.dev/UML/PDF/Scheduler.pdf) """ - OperationRequest = typing.Tuple[str, str, str, SerializableData, PreserializedData, typing.Dict[str, typing.Any]] - OperationReply = typing.Tuple[SerializableData, PreserializedData, str] - JobInvokationType = typing.Tuple[AsyncZMQServer, RequestMessage, asyncio.Task, asyncio.Event] + OperationRequest = tuple[str, str, str, SerializableData, PreserializedData, dict[str, Any]] + OperationReply = tuple[SerializableData, PreserializedData, str] + JobInvokationType = tuple[AsyncZMQServer, RequestMessage, asyncio.Task, asyncio.Event] # [UML Diagram](http://docs.hololinked.dev/UML/PDF/RPCServer.pdf) _operation_execution_complete_event: asyncio.Event | threading.Event _operation_execution_ready_event: asyncio.Event | threading.Event @@ -842,7 +844,7 @@ def extract_operation_tuple_from_request(self, request_message: RequestMessage) ) @classmethod - def format_reply_tuple(self, return_value: typing.Any) -> OperationReply: + def format_reply_tuple(self, return_value: Any) -> OperationReply: pass diff --git a/hololinked/server/http/__init__.py b/hololinked/server/http/__init__.py index 84d2ecca..94f24e35 100644 --- a/hololinked/server/http/__init__.py +++ b/hololinked/server/http/__init__.py @@ -1,10 +1,10 @@ import logging import socket import ssl -import typing import warnings from copy import deepcopy +from typing import Any, Iterable, Type import structlog @@ -70,7 +70,7 @@ class HTTPServer(BaseProtocolServer): class_=ssl.SSLContext, default=None, allow_None=True, - ) # type: typing.Optional[ssl.SSLContext] + ) # type: ssl.SSLContext | None """SSL context to provide encrypted communication""" allowed_clients = TypedList(item_type=str) @@ -85,28 +85,28 @@ class HTTPServer(BaseProtocolServer): default=PropertyHandler, class_=(PropertyHandler, RPCHandler), isinstance=False, - ) # type: typing.Union[RPCHandler, PropertyHandler] + ) # type: RPCHandler | PropertyHandler """custom web request handler for property read-write""" action_handler = ClassSelector( default=ActionHandler, class_=(ActionHandler, RPCHandler), isinstance=False, - ) # type: typing.Union[RPCHandler, ActionHandler] + ) # type: RPCHandler | ActionHandler """custom web request handler for actions""" event_handler = ClassSelector( default=EventHandler, class_=(EventHandler, RPCHandler), isinstance=False, - ) # type: typing.Union[RPCHandler, EventHandler] + ) # type: RPCHandler | EventHandler """custom event handler for sending HTTP SSE""" security_schemes = TypedList( default=None, allow_None=True, item_type=Security, - ) # type: typing.Optional[typing.List[Security]] + ) # type: list[Security] | None """ List of security schemes to be used by the server, it is sufficient that one scheme passes for a request to be authorized. @@ -127,15 +127,15 @@ def __init__( *, port: int = 8080, address: str = "0.0.0.0", - things: typing.Optional[typing.List[Thing]] = None, - # host: typing.Optional[str] = None, - logger: typing.Optional[logging.Logger] = None, + things: list[Thing] | None = None, + # host: Optional[str] = None, + logger: logging.Logger | None = None, log_level: int = logging.INFO, - ssl_context: typing.Optional[ssl.SSLContext] = None, - security_schemes: typing.Optional[typing.List[Security]] = None, + ssl_context: ssl.SSLContext | None = None, + security_schemes: list[Security] | None = None, # protocol_version : int = 1, network_interface : str = 'Ethernet', - allowed_clients: typing.Optional[typing.Union[str, typing.Iterable[str]]] = None, - config: typing.Optional[dict[str, typing.Any]] = None, + allowed_clients: str | Iterable[str] | None = None, + config: dict[str, Any] | None = None, **kwargs, ) -> None: """ @@ -292,7 +292,7 @@ def add_property( self, URL_path: str, property: Property | PropertyAffordance, - http_methods: typing.Tuple[str, typing.Optional[str], typing.Optional[str]] | None = ("GET", "PUT", None), + http_methods: str | tuple[str | None, str | None, str | None] = ("GET", "PUT", None), handler: BaseHandler | PropertyHandler = PropertyHandler, **kwargs, ) -> None: @@ -431,15 +431,15 @@ def __init__(self, app: Application, server: HTTPServer) -> None: self.server = server self.logger = server.logger.bind(component="http-router") self._pending_rules = [] - self._rules = dict() # type: dict[str, typing.Any] + self._rules = dict() # type: dict[str, Any] # can add a single property, action or event rule def add_rule( self, affordance: PropertyAffordance | ActionAffordance | EventAffordance, URL_path: str, - handler: typing.Type[BaseHandler], - kwargs: dict, + handler: Type[BaseHandler], + kwargs: dict[str, Any], ) -> None: """ Add rules to the application router. Note that this method will replace existing rules and can duplicate @@ -511,9 +511,9 @@ def __init__( # can add multiple properties, actions and events at once def add_interaction_affordances( self, - properties: typing.Iterable[PropertyAffordance], - actions: typing.Iterable[ActionAffordance], - events: typing.Iterable[EventAffordance], + properties: Iterable[PropertyAffordance], + actions: Iterable[ActionAffordance], + events: Iterable[EventAffordance], thing_id: str = None, ) -> None: for property in properties: @@ -729,7 +729,7 @@ def adapt_route(self, interaction_affordance_name: str) -> str: return "/resources/wot-tm" return f"/{pep8_to_dashed_name(interaction_affordance_name)}" - def adapt_http_methods(self, http_methods: typing.Any): + def adapt_http_methods(self, http_methods: Any): """comply the supplied HTTP method to the router to a tuple and check if the method is supported""" if isinstance(http_methods, str): http_methods = (http_methods,) diff --git a/hololinked/server/http/handlers.py b/hololinked/server/http/handlers.py index 5132baf0..77a2a05a 100644 --- a/hololinked/server/http/handlers.py +++ b/hololinked/server/http/handlers.py @@ -1,7 +1,8 @@ import copy -import typing import uuid +from typing import Any, Optional + import msgspec from msgspec import DecodeError as MsgspecJSONDecodeError @@ -50,8 +51,8 @@ class LocalExecutionContext(msgspec.Struct): - noblock: typing.Optional[bool] = None - messageID: typing.Optional[str] = None + noblock: Optional[bool] = None + messageID: Optional[str] = None class BaseHandler(RequestHandler): @@ -63,7 +64,7 @@ def initialize( self, resource: InteractionAffordance | PropertyAffordance | ActionAffordance | EventAffordance, owner_inst=None, - metadata: typing.Optional[typing.Dict[str, typing.Any]] = None, + metadata: Optional[dict[str, Any]] = None, ) -> None: """ Parameters @@ -73,7 +74,7 @@ def initialize( ZMQ Request object owner_inst: HTTPServer owning `hololinked.server.HTTPServer` instance - metadata: typing.Optional[typing.Dict[str, typing.Any]] + metadata: dict[str, Any] | None, additional metadata about the resource, like allowed HTTP methods """ from . import HTTPServer # noqa: F401 @@ -169,7 +170,7 @@ def set_custom_default_headers(self) -> None: def get_execution_parameters( self, - ) -> typing.Tuple[ServerExecutionContext, ThingExecutionContext, LocalExecutionContext, SerializableData]: + ) -> tuple[ServerExecutionContext, ThingExecutionContext, LocalExecutionContext, SerializableData]: """ merges all arguments to a single JSON body and retrieves execution context (like oneway calls, fetching executing logs) and timeouts, payloads in URL query parameters etc. @@ -231,7 +232,7 @@ def message_id(self) -> str: self._message_id = message_id return message_id - def get_request_payload(self) -> typing.Tuple[SerializableData, PreserializedData]: + def get_request_payload(self) -> tuple[SerializableData, PreserializedData]: """retrieves the payload from the request body, does not necessarily deserialize it""" payload = SerializableData(value=None) preserialized_payload = PreserializedData(value=b"") @@ -528,7 +529,7 @@ def initialize( self, resource: InteractionAffordance | EventAffordance, owner_inst=None, - metadata: typing.Optional[typing.Dict[str, typing.Any]] = None, + metadata: dict[str, Any] | None = None, ) -> None: super().initialize(resource, owner_inst, metadata) self.data_header = b"data: %s\n\n" diff --git a/hololinked/server/zmq.py b/hololinked/server/zmq.py index 13171139..a170be99 100644 --- a/hololinked/server/zmq.py +++ b/hololinked/server/zmq.py @@ -1,5 +1,3 @@ -import typing - import structlog import zmq import zmq.asyncio @@ -22,7 +20,7 @@ def __init__( *, id: str, access_points: ZMQ_TRANSPORTS = ZMQ_TRANSPORTS.IPC, - things: typing.List["Thing"] = None, + things: list["Thing"] = None, context: zmq.asyncio.Context | None = None, **kwargs, ) -> None: diff --git a/hololinked/storage/json_storage.py b/hololinked/storage/json_storage.py index d0966514..814a5542 100644 --- a/hololinked/storage/json_storage.py +++ b/hololinked/storage/json_storage.py @@ -1,7 +1,7 @@ import os import threading -from typing import Any, Dict, List, Optional, Union +from typing import Any from ..core.property import Property from ..param import Parameterized @@ -24,7 +24,7 @@ class ThingJSONStorage: Serializer used for encoding and decoding JSON data. Defaults to an instance of ``JSONSerializer``. """ - def __init__(self, filename: str, instance: Parameterized, serializer: Optional[Any] = None): + def __init__(self, filename: str, instance: Parameterized, serializer: Any = None): self.filename = filename self.thing_instance = instance self.id = instance.id @@ -32,7 +32,7 @@ def __init__(self, filename: str, instance: Parameterized, serializer: Optional[ self._lock = threading.RLock() self._data = self._load() - def _load(self) -> Dict[str, Any]: + def _load(self) -> dict[str, Any]: """ Load and decode data from the JSON file. @@ -60,7 +60,7 @@ def _save(self): with open(self.filename, "wb") as f: f.write(raw_bytes) - def get_property(self, property: Union[str, Property]) -> Any: + def get_property(self, property: str | Property) -> Any: """ Fetch a single property. @@ -80,7 +80,7 @@ def get_property(self, property: Union[str, Property]) -> Any: with self._lock: return self._data[name] - def set_property(self, property: Union[str, Property], value: Any) -> None: + def set_property(self, property: str | Property, value: Any) -> None: """ change the value of an already existing property. @@ -96,7 +96,7 @@ def set_property(self, property: Union[str, Property], value: Any) -> None: self._data[name] = value self._save() - def get_properties(self, properties: Dict[Union[str, Property], Any]) -> Dict[str, Any]: + def get_properties(self, properties: dict[str | Property, Any]) -> dict[str, Any]: """ get multiple properties at once. @@ -114,7 +114,7 @@ def get_properties(self, properties: Dict[Union[str, Property], Any]) -> Dict[st with self._lock: return {name: self._data.get(name) for name in names} - def set_properties(self, properties: Dict[Union[str, Property], Any]) -> None: + def set_properties(self, properties: dict[str | Property, Any]) -> None: """ change the values of already existing few properties at once @@ -129,7 +129,7 @@ def set_properties(self, properties: Dict[Union[str, Property], Any]) -> None: self._data[name] = value self._save() - def get_all_properties(self) -> Dict[str, Any]: + def get_all_properties(self) -> dict[str, Any]: """ read all properties of the ``Thing`` instance. """ @@ -137,8 +137,10 @@ def get_all_properties(self) -> Dict[str, Any]: return dict(self._data) def create_missing_properties( - self, properties: Dict[str, Property], get_missing_property_names: bool = False - ) -> Optional[List[str]]: + self, + properties: dict[str, Property], + get_missing_property_names: bool = False, + ) -> list[str] | None: """ create any and all missing properties of ``Thing`` instance diff --git a/hololinked/td/base.py b/hololinked/td/base.py index 1165c1cb..d5e5ea50 100644 --- a/hololinked/td/base.py +++ b/hololinked/td/base.py @@ -1,7 +1,6 @@ import inspect -import typing -from typing import ClassVar +from typing import Any, ClassVar from pydantic import BaseModel @@ -14,7 +13,7 @@ class Schema(BaseModel): skip_keys: ClassVar = [] # override this to skip some dataclass attributes in the schema - def model_dump(self, **kwargs) -> dict[str, typing.Any]: + def model_dump(self, **kwargs) -> dict[str, Any]: """Return the JSON representation of the schema""" # we need to override this to work with our JSON serializer kwargs["mode"] = "json" @@ -31,7 +30,7 @@ def model_dump(self, **kwargs) -> dict[str, typing.Any]: ] return super().model_dump(**kwargs) - def json(self) -> dict[str, typing.Any]: + def json(self) -> dict[str, Any]: """same as model_dump""" return self.model_dump() diff --git a/hololinked/td/forms.py b/hololinked/td/forms.py index 7769bbe3..55bd8dce 100644 --- a/hololinked/td/forms.py +++ b/hololinked/td/forms.py @@ -1,4 +1,4 @@ -import typing +from typing import Any, Optional from pydantic import Field @@ -26,7 +26,7 @@ class AdditionalExpectedResponse(Schema): success: bool = Field(default=False) contentType: str = Field(default="application/json") - response_schema: typing.Optional[JSON] = Field(default="exception", alias="schema") + response_schema: Optional[JSON] = Field(default="exception", alias="schema") def __init__(self): super().__init__() @@ -42,19 +42,19 @@ class Form(Schema): op: str = None htv_methodName: str = Field(default=None, alias="htv:methodName") mqv_topic: str = Field(default=None, alias="mqv:topic") - contentType: typing.Optional[str] = "application/json" - additionalResponses: typing.Optional[typing.List[AdditionalExpectedResponse]] = None - contentEncoding: typing.Optional[str] = None - security: typing.Optional[str] = None - scopes: typing.Optional[str] = None - response: typing.Optional[ExpectedResponse] = None - subprotocol: typing.Optional[str] = None + contentType: Optional[str] = "application/json" + additionalResponses: Optional[list[AdditionalExpectedResponse]] = None + contentEncoding: Optional[str] = None + security: Optional[str] = None + scopes: Optional[str] = None + response: Optional[ExpectedResponse] = None + subprotocol: Optional[str] = None def __init__(self): super().__init__() @classmethod - def from_TD(cls, form_json: typing.Dict[str, typing.Any]) -> "Form": + def from_TD(cls, form_json: dict[str, Any]) -> "Form": """ Create a Form instance from a Thing Description JSON object. :param form_json: The JSON representation of the form. diff --git a/hololinked/td/interaction_affordance.py b/hololinked/td/interaction_affordance.py index fcea105c..c39e7b91 100644 --- a/hololinked/td/interaction_affordance.py +++ b/hololinked/td/interaction_affordance.py @@ -1,8 +1,7 @@ import copy -import typing from enum import Enum -from typing import ClassVar, Optional +from typing import Any, ClassVar, Optional from pydantic import BaseModel, ConfigDict, RootModel @@ -28,10 +27,10 @@ class InteractionAffordance(Schema): """ title: Optional[str] = None - titles: Optional[typing.Dict[str, str]] = None + titles: Optional[dict[str, str]] = None description: Optional[str] = None - descriptions: Optional[typing.Dict[str, str]] = None - forms: Optional[typing.List[Form]] = None + descriptions: Optional[dict[str, str]] = None + forms: Optional[list[Form]] = None # uri variables _custom_schema_generators: ClassVar = dict() @@ -117,7 +116,7 @@ def build(self) -> None: """populate the fields of the schema for the specific interaction affordance""" raise NotImplementedError("build must be implemented in subclass of InteractionAffordance") - def retrieve_form(self, op: str, default: typing.Any = None) -> Form: + def retrieve_form(self, op: str, default: Any = None) -> Form: """ retrieve form for a certain operation, return default if not found @@ -125,13 +124,13 @@ def retrieve_form(self, op: str, default: typing.Any = None) -> Form: ---------- op: str operation for which the form is to be retrieved - default: typing.Any, optional + default: Any, optional default value to return if form is not found, by default None. One can make use of a sensible default value for one's logic. Returns ------- - Dict[str, typing.Any] + dict[str, Any] JSON representation of the form """ if self.forms is None: @@ -141,7 +140,7 @@ def retrieve_form(self, op: str, default: typing.Any = None) -> Form: return form return default - def pop_form(self, op: str, default: typing.Any = None) -> Form: + def pop_form(self, op: str, default: Any = None) -> Form: """ retrieve and remove form for a certain operation, return default if not found @@ -149,13 +148,13 @@ def pop_form(self, op: str, default: typing.Any = None) -> Form: ---------- op: str operation for which the form is to be retrieved - default: typing.Any, optional + default: Any, optional default value to return if form is not found, by default None. One can make use of a sensible default value for one's logic. Returns ------- - Dict[str, typing.Any] + dict[str, Any] JSON representation of the form """ if self.forms is None: @@ -168,7 +167,7 @@ def pop_form(self, op: str, default: typing.Any = None) -> Form: @classmethod def generate( cls, interaction: Property | Action | Event, owner: Thing - ) -> typing.Union["PropertyAffordance", "ActionAffordance", "EventAffordance"]: + ) -> "PropertyAffordance | ActionAffordance | EventAffordance": """ build the schema for the specific interaction affordance within the container object. Use the `json()` method to get the JSON representation of the schema. @@ -185,7 +184,7 @@ def generate( Returns ------- - typing.Union[PropertyAffordance, ActionAffordance, EventAffordance] + "PropertyAffordance | ActionAffordance | EventAffordance" """ raise NotImplementedError("generate_schema must be implemented in subclass of InteractionAffordance") @@ -203,7 +202,7 @@ def from_TD(cls, name: str, TD: JSON) -> "PropertyAffordance | ActionAffordance Returns ------- - typing.Union[PropertyAffordance, ActionAffordance, EventAffordance] + "PropertyAffordance | ActionAffordance | EventAffordance" """ if cls == PropertyAffordance: affordance_name = "properties" @@ -213,7 +212,7 @@ def from_TD(cls, name: str, TD: JSON) -> "PropertyAffordance | ActionAffordance affordance_name = "events" else: raise ValueError(f"unknown affordance type - {cls}, cannot create object from TD") - affordance_json = TD[affordance_name][name] # type: typing.Dict[str, JSON] + affordance_json = TD[affordance_name][name] # type: dict[str, JSON] affordance = cls() for field in cls.model_fields: if field in affordance_json: diff --git a/hololinked/td/metadata.py b/hololinked/td/metadata.py index 3ef6e048..460da355 100644 --- a/hololinked/td/metadata.py +++ b/hololinked/td/metadata.py @@ -1,4 +1,4 @@ -import typing +from typing import Optional from pydantic import Field @@ -7,9 +7,9 @@ class Link(Schema): href: str - anchor: typing.Optional[str] - rel: typing.Optional[str] - type: typing.Optional[str] = Field(default="application/json") + anchor: Optional[str] + rel: Optional[str] + type: Optional[str] = Field(default="application/json") class VersionInfo(Schema): diff --git a/hololinked/td/security_definitions.py b/hololinked/td/security_definitions.py index 52744db0..b369ee37 100644 --- a/hololinked/td/security_definitions.py +++ b/hololinked/td/security_definitions.py @@ -1,4 +1,4 @@ -import typing +from typing import Optional from .base import Schema @@ -11,8 +11,8 @@ class SecurityScheme(Schema): scheme: str = None description: str = None - descriptions: typing.Optional[typing.Dict[str, str]] = None - proxy: typing.Optional[str] = None + descriptions: Optional[dict[str, str]] = None + proxy: Optional[str] = None def __init__(self): super().__init__() diff --git a/hololinked/td/tm.py b/hololinked/td/tm.py index 96cc862c..3fa246fd 100644 --- a/hololinked/td/tm.py +++ b/hololinked/td/tm.py @@ -1,4 +1,4 @@ -import typing +from typing import Any, Optional from pydantic import ConfigDict, Field @@ -22,28 +22,28 @@ class ThingModel(Schema): [UML Diagram](https://docs.hololinked.dev/UML/PDF/ThingModel.pdf)
""" - context: typing.List[str | typing.Dict[str, str]] = Field(["https://www.w3.org/2022/wot/td/v1.1"], alias="@context") - type: typing.Optional[typing.Union[str, typing.List[str]]] = None + context: list[str | dict[str, str]] = Field(["https://www.w3.org/2022/wot/td/v1.1"], alias="@context") + type: Optional[str | list[str]] = None id: str = None title: str = None - description: typing.Optional[str] = None - version: typing.Optional[VersionInfo] = None - created: typing.Optional[str] = None - modified: typing.Optional[str] = None - support: typing.Optional[str] = None - base: typing.Optional[str] = None - properties: typing.Dict[str, DataSchema] = Field(default_factory=dict) - actions: typing.Dict[str, ActionAffordance] = Field(default_factory=dict) - events: typing.Dict[str, EventAffordance] = Field(default_factory=dict) + description: Optional[str] = None + version: Optional[VersionInfo] = None + created: Optional[str] = None + modified: Optional[str] = None + support: Optional[str] = None + base: Optional[str] = None + properties: dict[str, DataSchema] = Field(default_factory=dict) + actions: dict[str, ActionAffordance] = Field(default_factory=dict) + events: dict[str, EventAffordance] = Field(default_factory=dict) model_config = ConfigDict(extra="allow") def __init__( self, instance: "Thing", - allow_loose_schema: typing.Optional[bool] = False, + allow_loose_schema: Optional[bool] = False, ignore_errors: bool = False, - skip_names: typing.Optional[list[str]] = [], + skip_names: Optional[list[str]] = [], ) -> None: super().__init__() self.instance = instance @@ -69,8 +69,8 @@ def produce(self) -> Thing: raise NotImplementedError("This will be implemented in a future release for an API first approach") # not the best code and logic, but works for now - skip_properties: typing.List[str] = ["expose", "thing_description", "GUI", "object_info"] - skip_actions: typing.List[str] = [ + skip_properties: list[str] = ["expose", "thing_description", "GUI", "object_info"] + skip_actions: list[str] = [ Thing._add_property.name, Thing._get_properties.name, Thing._get_properties_in_db.name, @@ -78,9 +78,9 @@ def produce(self) -> Thing: "get_postman_collection", "get_our_thing_model", ] - skip_events: typing.List[str] = [] + skip_events: list[str] = [] - def add_interaction_affordances(self): + def add_interaction_affordances(self) -> None: """add interaction affordances to thing model""" for affordance, items, affordance_cls, skip_list in [ ["properties", self.instance.properties.remote_objects.items(), PropertyAffordance, self.skip_properties], @@ -107,7 +107,7 @@ def add_interaction_affordances(self): raise ex from None self.instance.logger.error(f"Error while generating schema for {name} - {ex}") - def model_dump(self, **kwargs) -> dict[str, typing.Any]: + def model_dump(self, **kwargs) -> dict[str, Any]: """Return the JSON representation of the schema""" def dump_value(value): diff --git a/hololinked/utils.py b/hololinked/utils.py index c6d483d9..63f7c5fe 100644 --- a/hololinked/utils.py +++ b/hololinked/utils.py @@ -12,6 +12,7 @@ from dataclasses import asdict from functools import wraps from inspect import Parameter, signature +from typing import Any, Callable, Coroutine, Sequence, Type from pydantic import BaseModel, ConfigDict, Field, RootModel, create_model @@ -59,7 +60,7 @@ def uuid_hex() -> str: return uuid4().hex[:8] -def format_exception_as_json(exc: Exception) -> typing.Dict[str, typing.Any]: +def format_exception_as_json(exc: Exception) -> dict[str, Any]: """ return exception as a JSON serializable dictionary """ @@ -123,10 +124,8 @@ def get_default_logger( return logger -def get_current_async_loop(): - """ - get or automatically create an asnyc loop for the current thread. - """ +def get_current_async_loop() -> asyncio.AbstractEventLoop: + """get or automatically create an asnyc loop for the current thread""" try: loop = asyncio.get_event_loop() except RuntimeError: @@ -136,15 +135,9 @@ def get_current_async_loop(): return loop -def run_coro_sync(coro: typing.Coroutine): - """ - run coroutine synchronously - """ - try: - eventloop = asyncio.get_event_loop() - except RuntimeError: - eventloop = asyncio.new_event_loop() - asyncio.set_event_loop(eventloop) +def run_coro_sync(coro: Coroutine) -> Any: + """try to run coroutine synchronously, raises runtime error if event loop is already running""" + eventloop = get_current_async_loop() if eventloop.is_running(): raise RuntimeError( f"asyncio event loop is already running, cannot setup coroutine {coro.__name__} to run sync, please await it." @@ -154,7 +147,7 @@ def run_coro_sync(coro: typing.Coroutine): return eventloop.run_until_complete(coro) -def run_callable_somehow(method: typing.Union[typing.Callable, typing.Coroutine]) -> typing.Any: +def run_callable_somehow(method: Callable | Coroutine) -> Any: """ run method if synchronous, or when async, either schedule a coroutine or run it until its complete """ @@ -177,14 +170,14 @@ def run_callable_somehow(method: typing.Union[typing.Callable, typing.Coroutine] return eventloop.run_until_complete(coro) -def complete_pending_tasks_in_current_loop(): +def complete_pending_tasks_in_current_loop() -> None: """ Complete all pending tasks in the current asyncio event loop. """ get_current_async_loop().run_until_complete(asyncio.gather(*asyncio.all_tasks(get_current_async_loop()))) -async def complete_pending_tasks_in_current_loop_async(): +async def complete_pending_tasks_in_current_loop_async() -> None: """ Complete all pending tasks in the current asyncio event loop. """ @@ -211,7 +204,7 @@ def print_pending_tasks_in_current_loop(): print(f"Task: {task}, Status: {task._state}") -def set_global_event_loop_policy(): +def set_global_event_loop_policy() -> None: if sys.platform.lower().startswith("win"): asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) @@ -227,7 +220,7 @@ def set_global_event_loop_policy(): asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) -def get_signature(callable: typing.Callable) -> typing.Tuple[typing.List[str], typing.List[type]]: +def get_signature(callable: Callable) -> tuple[list[str], list[type]]: """ Retrieve the names and types of arguments based on annotations for the given callable. @@ -254,7 +247,7 @@ def get_signature(callable: typing.Callable) -> typing.Tuple[typing.List[str], t return arg_names, arg_types -def getattr_without_descriptor_read(instance, key): +def getattr_without_descriptor_read(instance: Any, key: str) -> Any: """ supply to inspect._get_members (not inspect.get_members) to avoid calling __get__ on hardware attributes @@ -275,7 +268,7 @@ def getattr_without_descriptor_read(instance, key): return getattr(instance, key, None) # we can deal with None where we use this getter, so dont raise AttributeError -def isclassmethod(method) -> bool: +def isclassmethod(method: Callable) -> bool: """ Returns `True` if the method is a classmethod, `False` otherwise. https://stackoverflow.com/questions/19227724/check-if-a-function-uses-classmethod @@ -295,7 +288,7 @@ def isclassmethod(method) -> bool: return False -def has_async_def(method) -> bool: +def has_async_def(method: Callable) -> bool: """ Checks if async def is found in method signature. Especially useful for class methods. https://github.com/python/cpython/issues/100224#issuecomment-2000895467 @@ -320,15 +313,15 @@ def has_async_def(method) -> bool: return False -def issubklass(obj, cls) -> bool: +def issubklass(obj: Any, cls: Any) -> bool: """ Safely check if `obj` is a subclass of `cls`. Parameters ---------- - obj: typing.Any + obj: Any The object to check if it's a subclass. - cls: typing.Any + cls: Any The class (or tuple of classes) to compare against. Returns @@ -349,7 +342,7 @@ def issubklass(obj, cls) -> bool: return False -def get_a_filename_from_instance(thing: type, extension: str = "json") -> str: +def get_a_filename_from_instance(thing: Any, extension: str = "json") -> str: class_name = thing.__class__.__name__ # Remove invalid characters from the instance name @@ -375,7 +368,7 @@ def json(self): def __getstate__(self): return self.json() - def __setstate__(self, values: typing.Dict): + def __setstate__(self, values: dict): for key, value in values.items(): setattr(self, key, value) @@ -397,7 +390,7 @@ class MappableSingleton(Singleton): def __setitem__(self, key, value) -> None: setattr(self, key, value) - def __getitem__(self, key) -> typing.Any: + def __getitem__(self, key) -> Any: return getattr(self, key) def __contains__(self, key) -> bool: @@ -405,11 +398,11 @@ def __contains__(self, key) -> bool: def get_input_model_from_signature( - func: typing.Callable, + func: Callable, remove_first_positional_arg: bool = False, - ignore: typing.Sequence[str] | None = None, + ignore: Sequence[str] | None = None, model_for_empty_annotations: bool = False, -) -> type[BaseModel] | None: +) -> Type[BaseModel] | None: """ Create a pydantic model for a function's signature. @@ -447,7 +440,7 @@ def get_input_model_from_signature( # fields is a dictionary of tuples of (type, default) that defines the input model type_hints = typing.get_type_hints(func, include_extras=True) - fields = {} # type: typing.Dict[str, typing.Tuple[type, typing.Any]] + fields = {} # type: dict[str, tuple[type, Any]] for name, p in parameters.items(): if ignore and name in ignore: continue @@ -483,7 +476,7 @@ def get_input_model_from_signature( return model -def get_return_type_from_signature(func: typing.Callable) -> RootModel | None: +def get_return_type_from_signature(func: Callable) -> RootModel | None: """Determine the return type of a function.""" sig = inspect.signature(func) if sig.return_annotation == inspect.Signature.empty: @@ -506,9 +499,9 @@ def get_return_type_from_signature(func: typing.Callable) -> RootModel | None: def pydantic_validate_args_kwargs( - model: typing.Type[BaseModel], - args: typing.Tuple = tuple(), - kwargs: typing.Dict = dict(), + model: Type[BaseModel], + args: tuple = tuple(), + kwargs: dict = dict(), ) -> None: """ Validate and separate *args and **kwargs according to the fields of the given pydantic model. @@ -573,9 +566,7 @@ def pydantic_validate_args_kwargs( model.model_validate(data) -def json_schema_merge_args_to_kwargs( - schema: dict, args: typing.Tuple = tuple(), kwargs: typing.Dict = dict() -) -> typing.Dict[str, typing.Any]: +def json_schema_merge_args_to_kwargs(schema: dict, args: tuple = tuple(), kwargs: dict = dict()) -> dict[str, Any]: """ Merge positional arguments into keyword arguments according to the schema. @@ -622,7 +613,7 @@ def json_schema_merge_args_to_kwargs( return data -def get_all_sub_things_recusively(thing) -> typing.List: +def get_all_sub_things_recusively(thing) -> list: sub_things = [thing] for sub_thing in thing.sub_things.values(): sub_things.extend(get_all_sub_things_recusively(sub_thing)) diff --git a/tests/test_00_utils.py b/tests/test_00_utils.py index 2471a931..372914bd 100644 --- a/tests/test_00_utils.py +++ b/tests/test_00_utils.py @@ -309,7 +309,7 @@ def test_06_no_model_func_with_args(): def test_07_model_func_with_annotated_args(): model = get_input_model_from_signature(func_with_annotated_args) assert issubklass(model, BaseModel) - # assert model.model_fields["args"].annotation == typing.List[int] + # assert model.model_fields["args"].annotation == list[int] assert len(model.model_fields) == 1 assert model.model_config["extra"] == "forbid" diff --git a/tests/things/spectrometer.py b/tests/things/spectrometer.py index a962b073..1e91c762 100644 --- a/tests/things/spectrometer.py +++ b/tests/things/spectrometer.py @@ -1,7 +1,6 @@ import datetime import threading import time -import typing from dataclasses import dataclass from enum import StrEnum @@ -88,7 +87,7 @@ class OceanOpticsSpectrometer(Thing): default=None, allow_None=True, class_=Intensity, doc="reference intensity to overlap in background" ) # type: Intensity - def __init__(self, id: str, serial_number: typing.Optional[str] = None, **kwargs) -> None: + def __init__(self, id: str, serial_number: str | None = None, **kwargs) -> None: super().__init__(id=id, serial_number=serial_number, **kwargs) self.set_status("disconnected") if serial_number is not None: @@ -140,7 +139,7 @@ def connect(self, serial_number: str = None, trigger_mode: int = None, integrati # this is only for testing, be careful doc="wavelength bins of measurement", fget=lambda self: self._wavelengths if self.state_machine.current_state != self.states.DISCONNECTED else None, - ) # type: typing.List[typing.Union[float, int]] + ) # type: list[float | int] pixel_count = Integer( default=None, @@ -204,9 +203,9 @@ def get_integration_time(self) -> float: default=None, allow_None=True, doc="set True for Seabreeze internal black level correction", - ) # type: typing.Optional[str] + ) # type: str | None - custom_background_intensity = TypedList(item_type=(float, int)) # type: typing.List[typing.Union[float, int]] + custom_background_intensity = TypedList(item_type=(float, int)) # type: list[float | int] nonlinearity_correction = Boolean(default=False, doc="automatic correction of non linearity in detector CCD") # type: bool diff --git a/tests/things/test_thing.py b/tests/things/test_thing.py index f642949b..4e4c4946 100644 --- a/tests/things/test_thing.py +++ b/tests/things/test_thing.py @@ -1,7 +1,8 @@ import asyncio import threading import time -import typing + +from typing import Annotated, Literal import numpy as np @@ -116,9 +117,7 @@ async def not_an_async_action(self, value): def json_schema_validated_action(self, val1: int, val2: str, val3: dict, val4: list): return {"val1": val1, "val3": val3} - def pydantic_validated_action( - self, val1: int, val2: str, val3: dict, val4: list - ) -> typing.Dict[str, typing.Union[int, dict]]: + def pydantic_validated_action(self, val1: int, val2: str, val3: dict, val4: list) -> dict[str, int | dict]: return {"val2": val2, "val4": val4} @action() @@ -351,7 +350,7 @@ def get_numpy_array_prop(self): JSONSchema.register_type_replacement(np.ndarray, "array") - NDArray = typing.Annotated[ + NDArray = Annotated[ np.ndarray, WithJsonSchema( { @@ -460,7 +459,7 @@ def _push_worker(self, event_name: str = "test_event", total_number_of_events: i input_schema=analog_offset_input_schema, output_schema=analog_offset_output_schema, ) - def get_analogue_offset(self, voltage_range: str, coupling: str) -> typing.Tuple[float, float]: + def get_analogue_offset(self, voltage_range: str, coupling: str) -> tuple[float, float]: """analogue offset for a voltage range and coupling""" print(f"get_analogue_offset called with voltage_range={voltage_range}, coupling={coupling}") return 0.0, 0.0 @@ -516,9 +515,9 @@ def set_channel( @action() def set_channel_pydantic( self, - channel: typing.Literal["A", "B", "C", "D"], + channel: Literal["A", "B", "C", "D"], enabled: bool = True, - v_range: typing.Literal[ + v_range: Literal[ "10mV", "20mV", "50mV", @@ -534,8 +533,8 @@ def set_channel_pydantic( "MAX_RANGES", ] = "2V", offset: float = 0, - coupling: typing.Literal["AC", "DC"] = "DC_1M", - bw_limiter: typing.Literal["full", "20MHz"] = "full", + coupling: Literal["AC", "DC"] = "DC_1M", + bw_limiter: Literal["full", "20MHz"] = "full", ) -> None: """ Set the parameter for a channel. @@ -557,7 +556,7 @@ def set_sensor_model(self, value: str): print(f"set_sensor_model called with value={value}") @action() - def set_sensor_model_pydantic(self, value: typing.Literal["QE25LP-S-MB", "QE12LP-S-MB-QED-D0"]): + def set_sensor_model_pydantic(self, value: Literal["QE25LP-S-MB", "QE12LP-S-MB-QED-D0"]): """ Set the attached sensor to the meter under control. Sensor should be defined as a class and added to the AllowedSensors dict. @@ -565,7 +564,7 @@ def set_sensor_model_pydantic(self, value: typing.Literal["QE25LP-S-MB", "QE12LP print(f"set_sensor_model_pydantic called with value={value}") @action() - def start_acquisition(self, max_count: typing.Annotated[int, Field(gt=0)]): + def start_acquisition(self, max_count: Annotated[int, Field(gt=0)]): """ Start acquisition of energy measurements. @@ -590,7 +589,7 @@ def start_acquisition(self, max_count: typing.Annotated[int, Field(gt=0)]): # ----- Serial Utility @action() - def execute_instruction(self, command: str, return_data_size: typing.Annotated[int, Field(ge=0)] = 0) -> str: + def execute_instruction(self, command: str, return_data_size: Annotated[int, Field(ge=0)] = 0) -> str: """ executes instruction given by the ASCII string parameter 'command'. If return data size is greater than 0, it reads the response and returns the response. @@ -600,7 +599,7 @@ def execute_instruction(self, command: str, return_data_size: typing.Annotated[i return b"" -def replace_methods_with_actions(thing_cls: typing.Type[TestThing]) -> None: +def replace_methods_with_actions(thing_cls: type[TestThing]) -> None: exposed_actions = [] if not isinstance(thing_cls.action_echo, (Action, BoundAction)): thing_cls.action_echo = action()(thing_cls.action_echo)