From 40cdeabd71199f7f7ba898a8a45db2ecf1d7b890 Mon Sep 17 00:00:00 2001 From: lucadruda Date: Thu, 24 Jun 2021 16:08:22 +0200 Subject: [PATCH 1/4] fix twin --- samples/async_device_key.py | 23 +++++----- src/iotc/__init__.py | 44 +++++++------------ src/iotc/aio/__init__.py | 62 ++++++++++----------------- src/iotc/test/async/test_listeners.py | 16 +++---- 4 files changed, 57 insertions(+), 88 deletions(-) diff --git a/samples/async_device_key.py b/samples/async_device_key.py index 374eff1..f94742e 100644 --- a/samples/async_device_key.py +++ b/samples/async_device_key.py @@ -24,16 +24,17 @@ device_id = config["DEVICE_M3"]["DeviceId"] scope_id = config["DEVICE_M3"]["ScopeId"] key = config["DEVICE_M3"]["DeviceKey"] -hub_name = config["DEVICE_M3"]["HubName"] +# hub_name = config["SMARTPHONE"]["HubName"] class MemStorage(Storage): def retrieve(self): - return CredentialsCache( - hub_name, - device_id, - key, - ) + # return CredentialsCache( + # hub_name, + # device_id, + # key, + # ) + return None def persist(self, credentials): # a further option would be updating config file with latest hub name @@ -42,7 +43,7 @@ def persist(self, credentials): # optional model Id for auto-provisioning try: - model_id = config["DEVICE_M3"]["ModelId"] + model_id = config["SMARTPHONE"]["ModelId"] except: model_id = None @@ -93,11 +94,9 @@ async def main(): print("client connected {}".format(client._device_client.connected)) await client.send_telemetry( { - "acceleration": { - "x": str(randint(20, 45)), - "y": str(randint(20, 45)), - "z": str(randint(20, 45)), - } + "temperature": randint(20, 45) + },{ + "$.sub": "firstcomponent" } ) await asyncio.sleep(3) diff --git a/src/iotc/__init__.py b/src/iotc/__init__.py index f324f09..4e4b592 100644 --- a/src/iotc/__init__.py +++ b/src/iotc/__init__.py @@ -180,7 +180,7 @@ def on(self, eventname, callback): """ self._events[eventname] = callback return 0 - + def _sync_twin(self): try: desired = self._twin['desired'] @@ -193,12 +193,14 @@ def _sync_twin(self): return patch = {} for desired_prop in desired: + print("Syncing property '{}'".format(desired_prop)) if desired_prop == '$version': continue - if '__t' in desired[desired_prop]: # is a component + # is a component + if str(type(desired[desired_prop])) == "" and '__t' in desired[desired_prop]: desired_prop_component = desired_prop for desired_prop_name in desired[desired_prop_component]: - if desired_prop_name == '__t': + if desired_prop_name == "__t": continue has_reported = False try: @@ -214,6 +216,8 @@ def _sync_twin(self): has_reported = reported[desired_prop] except KeyError: pass + if not has_reported: # no reported yet. send desired + patch[desired_prop] = desired[desired_prop] # desired is more recent if has_reported and 'av' in has_reported and has_reported['av'] < desired_version: patch[desired_prop] = desired[desired_prop] @@ -268,7 +272,7 @@ def _handle_property_ack( "{}".format(component_name): { "{}".format(property_name): { "ac": 200, - "ad": "Property received", + "ad": "Completed", "av": property_version, "value": property_value, } @@ -281,7 +285,7 @@ def _handle_property_ack( { "{}".format(property_name): { "ac": 200, - "ad": "Property received", + "ad": "Completed", "av": property_version, "value": property_value, } @@ -300,7 +304,7 @@ def _update_properties(self, patch, prop_cb): # check if component try: - is_component = patch[prop]["__t"] + is_component = str(type(patch[prop])) == "" except KeyError: pass if is_component: @@ -340,20 +344,6 @@ def _on_properties(self): self._logger.debug("Stopping properties listener...") - def _cmd_ack(self, name, value, request_id, component_name=None): - if component_name is not None: - self.send_property( - { - "{}".format(component_name): { - "{}".format(name): {"value": value, "requestId": request_id} - } - } - ) - else: - self.send_property( - {"{}".format(name): {"value": value, "requestId": request_id}} - ) - def _on_commands(self): self._logger.debug("Setup commands listener") while not self._terminate: @@ -385,12 +375,6 @@ def reply_fn(): {"result": True, "data": "Command received"}, ) ) - self._cmd_ack( - command.name, - command.value, - method_request.request_id, - command.component_name, - ) command.reply = reply_fn self._logger.debug( @@ -555,9 +539,11 @@ def connect(self, force_dps=False): ) else: if 'cert_phrase' in _credentials.certificate: - x509 = X509(_credentials.certificate['cert_file'], _credentials.certificate['key_file'], _credentials.certificate['cert_phrase']) + x509 = X509( + _credentials.certificate['cert_file'], _credentials.certificate['key_file'], _credentials.certificate['cert_phrase']) else: - x509 = X509(_credentials.certificate['cert_file'], _credentials.certificate['key_file']) + x509 = X509( + _credentials.certificate['cert_file'], _credentials.certificate['key_file']) self._device_client = IoTHubDeviceClient.create_from_x509_certificate( x509=x509, hostname=_credentials.hub_name, @@ -568,6 +554,7 @@ def connect(self, force_dps=False): self._twin = self._device_client.get_twin() self._logger.debug("Current twin: {}".format(self._twin)) prop_patch = self._sync_twin() + self._logger.debug("Properties to patch: {}".format(prop_patch)) if prop_patch is not None: self._update_properties(prop_patch, None) except: @@ -604,7 +591,6 @@ def disconnect(self, *args): self._logger.info("Client disconnected.") self._logger.info("See you!") - def _compute_derived_symmetric_key(self, secret, reg_id): # pylint: disable=no-member try: diff --git a/src/iotc/aio/__init__.py b/src/iotc/aio/__init__.py index bb16e54..38c89b9 100644 --- a/src/iotc/aio/__init__.py +++ b/src/iotc/aio/__init__.py @@ -125,12 +125,12 @@ async def _handle_property_ack( await self.send_property( { "{}".format(component_name): { - "{}".format(property_name): { - "ac": 200, - "ad": "Property received", - "av": property_version, - "value": property_value, - } + "value": { + "{}".format(property_name): property_value + }, + "ac": 200, + "ad": "Completed", + "av": property_version, } } ) @@ -140,7 +140,7 @@ async def _handle_property_ack( { "{}".format(property_name): { "ac": 200, - "ad": "Property received", + "ad": "Completed", "av": property_version, "value": property_value, } @@ -156,10 +156,9 @@ async def _update_properties(self, patch, prop_cb): is_component = False if prop == "$version": continue - # check if component try: - is_component = patch[prop]["__t"] + is_component = str(type(patch[prop])) == "" and patch[prop]["__t"] except KeyError: pass if is_component: @@ -174,13 +173,13 @@ async def _update_properties(self, patch, prop_cb): await self._handle_property_ack( prop_cb, component_prop, - patch[prop][component_prop]["value"], + patch[prop][component_prop], patch["$version"], prop, ) else: await self._handle_property_ack( - prop_cb, prop, patch[prop]["value"], patch["$version"] + prop_cb, prop, patch[prop], patch["$version"] ) async def _on_properties(self): @@ -205,20 +204,6 @@ async def _on_properties(self): await self._logger.debug("Stopping properties listener...") - async def _cmd_ack(self, name, value, request_id, component_name): - if component_name is not None: - await self.send_property( - { - "{}".format(component_name): { - "{}".format(name): {"value": value, "requestId": request_id} - } - } - ) - else: - await self.send_property( - {"{}".format(name): {"value": value, "requestId": request_id}} - ) - async def _on_commands(self): await self._logger.debug("Setup commands listener") while not self._terminate: @@ -256,12 +241,6 @@ async def reply_fn(): {"result": True, "data": "Command received"}, ) ) - await self._cmd_ack( - command.name, - command.value, - method_request.request_id, - command.component_name, - ) command.reply = reply_fn await self._logger.debug("Received command {}".format(method_request.name)) @@ -364,7 +343,8 @@ async def connect(self, force_dps=False): self._cert_file = self._key_or_cert["cert_file"] try: self._cert_phrase = self._key_or_cert["cert_phrase"] - x509 = X509(self._cert_file, self._key_file, self._cert_phrase) + x509 = X509(self._cert_file, self._key_file, + self._cert_phrase) except: await self._logger.debug( "No passphrase available for certificate. Trying without it" @@ -403,9 +383,10 @@ async def connect(self, force_dps=False): else None, ) - except: + except Exception as e: await self._logger.info( - "ERROR: Failed to get device provisioning information" + "ERROR: Failed to get device provisioning information. {}".format( + e) ) sys.exit(1) # Connect to iothub @@ -420,9 +401,11 @@ async def connect(self, force_dps=False): ) else: if 'cert_phrase' in _credentials.certificate: - x509 = X509(_credentials.certificate['cert_file'], _credentials.certificate['key_file'], _credentials.certificate['cert_phrase']) + x509 = X509( + _credentials.certificate['cert_file'], _credentials.certificate['key_file'], _credentials.certificate['cert_phrase']) else: - x509 = X509(_credentials.certificate['cert_file'], _credentials.certificate['key_file']) + x509 = X509( + _credentials.certificate['cert_file'], _credentials.certificate['key_file']) self._device_client = IoTHubDeviceClient.create_from_x509_certificate( x509=x509, hostname=_credentials.hub_name, @@ -435,8 +418,8 @@ async def connect(self, force_dps=False): twin_patch = self._sync_twin() if twin_patch is not None: await self._update_properties(twin_patch, None) - except: - await self._logger.info("ERROR: Failed to connect to Hub") + except Exception as e: + await self._logger.info("ERROR: Failed to connect to Hub. {}".format(e)) if force_dps is True: sys.exit(1) await self.connect(True) @@ -444,7 +427,8 @@ async def connect(self, force_dps=False): # setup listeners self._prop_thread = asyncio.create_task(self._on_properties()) self._cmd_thread = asyncio.create_task(self._on_commands()) - self._enqueued_cmd_thread = asyncio.create_task(self._on_enqueued_commands()) + self._enqueued_cmd_thread = asyncio.create_task( + self._on_enqueued_commands()) signal.signal(signal.SIGINT, self.raise_graceful_exit) signal.signal(signal.SIGTERM, self.raise_graceful_exit) diff --git a/src/iotc/test/async/test_listeners.py b/src/iotc/test/async/test_listeners.py index ff12822..0e2b2ff 100644 --- a/src/iotc/test/async/test_listeners.py +++ b/src/iotc/test/async/test_listeners.py @@ -16,19 +16,19 @@ from iotc.test import dummy_storage -DEFAULT_COMPONENT_PROP = {"prop1": {"value": "value1"}, "$version": 1} +DEFAULT_COMPONENT_PROP = {"prop1": "value1", "$version": 1} COMPONENT_PROP = { - "component1": {"__t": "c", "prop1": {"value": "value1"}}, + "component1": {"__t": "c", "prop1": "value1"}, "$version": 1, } COMPLEX_COMPONENT_PROP = { - "component1": {"__t": "c", "prop1": {"value": "value1"}}, + "component1": {"__t": "c", "prop1": {"item1": "value1"}}, "component2": { "__t": "c", - "prop1": {"value": "value1"}, - "prop2": {"value": "value2"}, + "prop1": "value1", + "prop2": 2, }, - "prop2": {"value": "value2"}, + "prop2": {"item2": "value2"}, "$version": 1, } @@ -113,9 +113,9 @@ async def test_on_properties_triggered_with_complex_component(mocker, iotc_clien await asyncio.sleep(0.1) prop_stub.assert_has_calls( [ - mocker.call("prop1", "value1", "component1"), + mocker.call("prop1", {"item1":"value1"}, "component1"), mocker.call("prop1", "value1", "component2"), - mocker.call("prop2", "value2", "component2"), + mocker.call("prop2", 2, "component2"), mocker.call("prop2", "value2", None), ] ) From 99e675dbc0bce00dcb683e8125197474d248e9fd Mon Sep 17 00:00:00 2001 From: lucadruda Date: Fri, 25 Jun 2021 17:56:43 +0200 Subject: [PATCH 2/4] fix value wrapping and update listeners --- samples/async_device_key.py | 18 +- samples/async_eventhub_logger.py | 52 +++--- samples/async_file_logger.py | 21 ++- samples/async_x509.py | 18 +- samples/sync_device_key.py | 18 +- samples/sync_x509.py | 19 +-- src/iotc/__init__.py | 193 +++++++++++----------- src/iotc/aio/__init__.py | 226 +++++++++++++------------- src/iotc/test/async/test_listeners.py | 46 ++---- src/iotc/test/sync/test_listeners.py | 69 +++----- 10 files changed, 321 insertions(+), 359 deletions(-) diff --git a/samples/async_device_key.py b/samples/async_device_key.py index f94742e..898d171 100644 --- a/samples/async_device_key.py +++ b/samples/async_device_key.py @@ -90,15 +90,15 @@ async def main(): await client.connect() await client.send_property({"writeableProp": 50}) - while client.is_connected(): - print("client connected {}".format(client._device_client.connected)) - await client.send_telemetry( - { - "temperature": randint(20, 45) - },{ - "$.sub": "firstcomponent" - } - ) + while not client.terminated(): + if client.is_connected(): + await client.send_telemetry( + { + "temperature": randint(20, 45) + },{ + "$.sub": "firstcomponent" + } + ) await asyncio.sleep(3) asyncio.run(main()) diff --git a/samples/async_eventhub_logger.py b/samples/async_eventhub_logger.py index 18f7d3e..2d7e0e7 100644 --- a/samples/async_eventhub_logger.py +++ b/samples/async_eventhub_logger.py @@ -1,3 +1,13 @@ +from azure.eventhub import EventHubProducerClient, EventData +from iotc.aio import IoTCClient +from iotc import ( + IOTCConnectType, + IOTCLogLevel, + IOTCEvents, + Command, + CredentialsCache, + Storage, +) import os import asyncio import configparser @@ -11,19 +21,11 @@ if config["DEFAULT"].getboolean("Local"): sys.path.insert(0, "src") -from iotc import ( - IOTCConnectType, - IOTCLogLevel, - IOTCEvents, - Command, - CredentialsCache, - Storage, -) -from iotc.aio import IoTCClient class EventHubLogger: def __init__(self, conn_str, eventhub_name): - self._producer = EventHubProducerClient.from_connection_string(conn_str, eventhub_name=eventhub_name) + self._producer = EventHubProducerClient.from_connection_string( + conn_str, eventhub_name=eventhub_name) async def _create_batch(self): self._event_data_batch = await self._producer.create_batch() @@ -45,7 +47,6 @@ def set_log_level(self, log_level): self._log_level = log_level - device_id = config["DEVICE_M3"]["DeviceId"] scope_id = config["DEVICE_M3"]["ScopeId"] key = config["DEVICE_M3"]["DeviceKey"] @@ -55,7 +56,6 @@ def set_log_level(self, log_level): event_hub_name = config['EventHub']['EventHubName'] - class MemStorage(Storage): def retrieve(self): return CredentialsCache( @@ -86,8 +86,9 @@ async def on_commands(command: Command): await command.reply() -async def on_enqueued_commands(command:Command): - print("Received offline command {} with value {}".format(command.name, command.value)) +async def on_enqueued_commands(command: Command): + print("Received offline command {} with value {}".format( + command.name, command.value)) # change connect type to reflect the used key (device or group) @@ -96,7 +97,7 @@ async def on_enqueued_commands(command:Command): scope_id, IOTCConnectType.IOTC_CONNECT_DEVICE_KEY, key, - logger=EventHubLogger(event_hub_conn_str,event_hub_name) + logger=EventHubLogger(event_hub_conn_str, event_hub_name), storage=MemStorage(), ) if model_id != None: @@ -107,21 +108,20 @@ async def on_enqueued_commands(command:Command): client.on(IOTCEvents.IOTC_COMMAND, on_commands) client.on(IOTCEvents.IOTC_ENQUEUED_COMMAND, on_enqueued_commands) + async def main(): await client.connect() await client.send_property({"writeableProp": 50}) - - while client.is_connected(): - print("client connected {}".format(client._device_client.connected)) - await client.send_telemetry( - { - "acceleration": { - "x": str(randint(20, 45)), - "y": str(randint(20, 45)), - "z": str(randint(20, 45)), + + while not client.terminated(): + if client.is_connected(): + await client.send_telemetry( + { + "temperature": randint(20, 45) + }, { + "$.sub": "firstcomponent" } - } - ) + ) await asyncio.sleep(3) asyncio.run(main()) diff --git a/samples/async_file_logger.py b/samples/async_file_logger.py index 743b272..dd15d38 100644 --- a/samples/async_file_logger.py +++ b/samples/async_file_logger.py @@ -2,6 +2,7 @@ import asyncio import configparser import sys +import logging from random import randint @@ -94,7 +95,7 @@ async def on_enqueued_commands(command:Command): scope_id, IOTCConnectType.IOTC_CONNECT_DEVICE_KEY, key, - logger=FileLogger(log_path) + logger=FileLogger(log_path), storage=MemStorage(), ) if model_id != None: @@ -109,17 +110,15 @@ async def main(): await client.connect() await client.send_property({"writeableProp": 50}) - while client.is_connected(): - print("client connected {}".format(client._device_client.connected)) - await client.send_telemetry( - { - "acceleration": { - "x": str(randint(20, 45)), - "y": str(randint(20, 45)), - "z": str(randint(20, 45)), + while not client.terminated(): + if client.is_connected(): + await client.send_telemetry( + { + "temperature": randint(20, 45) + },{ + "$.sub": "firstcomponent" } - } - ) + ) await asyncio.sleep(3) asyncio.run(main()) diff --git a/samples/async_x509.py b/samples/async_x509.py index f31a233..dcee274 100644 --- a/samples/async_x509.py +++ b/samples/async_x509.py @@ -74,17 +74,15 @@ async def main(): await client.connect() await client.send_property({"writeableProp": 50}) - while client.is_connected(): - print("client connected {}".format(client._device_client.connected)) - await client.send_telemetry( - { - "acceleration": { - "x": str(randint(20, 45)), - "y": str(randint(20, 45)), - "z": str(randint(20, 45)), + while not client.terminated(): + if client.is_connected(): + await client.send_telemetry( + { + "temperature": randint(20, 45) + },{ + "$.sub": "firstcomponent" } - } - ) + ) await asyncio.sleep(3) asyncio.run(main()) diff --git a/samples/sync_device_key.py b/samples/sync_device_key.py index 85328e0..ca979a1 100644 --- a/samples/sync_device_key.py +++ b/samples/sync_device_key.py @@ -82,17 +82,15 @@ def main(): client.connect() client.send_property({"writeableProp": 50}) - while client.is_connected(): - print("client connected {}".format(client._device_client.connected)) - client.send_telemetry( - { - "acceleration": { - "x": str(randint(20, 45)), - "y": str(randint(20, 45)), - "z": str(randint(20, 45)), + while not client.terminated(): + if client.is_connected(): + client.send_telemetry( + { + "temperature": randint(20, 45) + }, { + "$.sub": "firstcomponent" } - } - ) + ) time.sleep(3) main() diff --git a/samples/sync_x509.py b/samples/sync_x509.py index 85c9afb..f2e92f4 100644 --- a/samples/sync_x509.py +++ b/samples/sync_x509.py @@ -2,6 +2,7 @@ import configparser import sys from random import randint +import time config = configparser.ConfigParser() config.read(os.path.join(os.path.dirname(__file__),'samples.ini')) @@ -73,17 +74,15 @@ def main(): client.connect() client.send_property({"writeableProp": 50}) - while client.is_connected(): - print("client connected {}".format(client._device_client.connected)) - client.send_telemetry( - { - "acceleration": { - "x": str(randint(20, 45)), - "y": str(randint(20, 45)), - "z": str(randint(20, 45)), + while not client.terminated(): + if client.is_connected(): + client.send_telemetry( + { + "temperature": randint(20, 45) + }, { + "$.sub": "firstcomponent" } - } - ) + ) time.sleep(3) main() diff --git a/src/iotc/__init__.py b/src/iotc/__init__.py index 4e4b592..87a64e1 100644 --- a/src/iotc/__init__.py +++ b/src/iotc/__init__.py @@ -108,11 +108,8 @@ def set_log_level(self, log_level): self._log_level = log_level -terminate = False - - class AbstractClient: - def __init__(self, device_id, scope_id, cred_type, key_or_cert, storage=None): + def __init__(self, device_id, scope_id, cred_type, key_or_cert, storage=None, max_connection_attempts=5): self._device_id = device_id self._scope_id = scope_id self._cred_type = cred_type @@ -127,6 +124,12 @@ def __init__(self, device_id, scope_id, cred_type, key_or_cert, storage=None): self._global_endpoint = "global.azure-devices-provisioning.net" self._storage = storage self._terminate = False + self._connecting = False + self._max_connection_attempts = max_connection_attempts + self._connection_attempts_count = 0 + + def terminated(self): + return self._terminate def is_connected(self): """ @@ -207,6 +210,8 @@ def _sync_twin(self): has_reported = reported[desired_prop_component][desired_prop_name] except KeyError: pass + if not has_reported: # no reported yet. send desired + patch[desired_prop_component] = desired[desired_prop_component] # desired is more recent if has_reported and 'av' in has_reported and has_reported['av'] < desired_version: patch[desired_prop_component] = desired[desired_prop_component] @@ -216,7 +221,7 @@ def _sync_twin(self): has_reported = reported[desired_prop] except KeyError: pass - if not has_reported: # no reported yet. send desired + if not has_reported: # no reported yet. send desired patch[desired_prop] = desired[desired_prop] # desired is more recent if has_reported and 'av' in has_reported and has_reported['av'] < desired_version: @@ -231,10 +236,10 @@ def _sync_twin(self): class IoTCClient(AbstractClient): def __init__( - self, device_id, scope_id, cred_type, key_or_cert, logger=None, storage=None + self, device_id, scope_id, cred_type, key_or_cert, logger=None, storage=None, max_connection_attempts=5 ): AbstractClient.__init__( - self, device_id, scope_id, cred_type, key_or_cert, storage + self, device_id, scope_id, cred_type, key_or_cert, storage, max_connection_attempts ) if logger is None: self._logger = ConsoleLogger(IOTCLogLevel.IOTC_LOGGING_API_ONLY) @@ -270,11 +275,12 @@ def _handle_property_ack( self.send_property( { "{}".format(component_name): { + "__t": "c", "{}".format(property_name): { + "value": property_value, "ac": 200, "ad": "Completed", - "av": property_version, - "value": property_value, + "av": property_version } } } @@ -304,7 +310,8 @@ def _update_properties(self, patch, prop_cb): # check if component try: - is_component = str(type(patch[prop])) == "" + is_component = str( + type(patch[prop])) == "" and patch[prop]["__t"] except KeyError: pass if is_component: @@ -319,43 +326,34 @@ def _update_properties(self, patch, prop_cb): self._handle_property_ack( prop_cb, component_prop, - patch[prop][component_prop]["value"], + patch[prop][component_prop], patch["$version"], prop, ) else: self._handle_property_ack( - prop_cb, prop, patch[prop]["value"], patch["$version"] + prop_cb, prop, patch[prop], patch["$version"] ) - def _on_properties(self): - self._logger.debug("Setup properties listener.") - while not self._terminate: - try: - prop_cb = self._events[IOTCEvents.IOTC_PROPERTIES] - except KeyError: - time.sleep(0.1) - continue - patch = self._device_client.receive_twin_desired_properties_patch() - self._logger.debug("Received desired properties. {}".format(patch)) - - self._update_properties(patch, prop_cb) - time.sleep(0.1) + def _on_properties(self, patch): + self._logger.debug("Setup properties listener") + try: + prop_cb = self._events[IOTCEvents.IOTC_PROPERTIES] + except KeyError: + self._logger.debug("Properties callback not found") + return - self._logger.debug("Stopping properties listener...") + self._update_properties(patch, prop_cb) - def _on_commands(self): + def _on_commands(self, method_request): self._logger.debug("Setup commands listener") - while not self._terminate: - try: - cmd_cb = self._events[IOTCEvents.IOTC_COMMAND] - except KeyError: - time.sleep(0.1) - continue - # Wait for unknown method calls - method_request = self._device_client.receive_method_request() - - command = Command(method_request.name, method_request.payload) + try: + cmd_cb = self._events[IOTCEvents.IOTC_COMMAND] + except KeyError: + self._logger.debug("Command callback not found") + return + command = Command(method_request.name, method_request.payload) + try: command_name_with_components = method_request.name.split("*") if len(command_name_with_components) > 1: @@ -366,56 +364,49 @@ def _on_commands(self): method_request.payload, command_name_with_components[0], ) - - def reply_fn(): - self._device_client.send_method_response( - MethodResponse.create_from_method_request( - method_request, - 200, - {"result": True, "data": "Command received"}, - ) + except: + pass + + def reply_fn(): + self._device_client.send_method_response( + MethodResponse.create_from_method_request( + method_request, + 200, + {"result": True, "data": "Command received"}, ) + ) - command.reply = reply_fn - self._logger.debug( - "Received command {}".format(method_request.name)) + command.reply = reply_fn + self._logger.debug("Received command {}".format(method_request.name)) + cmd_cb(command) - cmd_cb(command) - time.sleep(0.1) - self._logger.debug("Stopping commands listener...") + def _on_enqueued_commands(self, c2d): + self._logger.debug("Setup offline commands listener") + try: + c2d_cb = self._events[IOTCEvents.IOTC_ENQUEUED_COMMAND] + except KeyError: + self._logger.debug("Command callback not found") + return - def _on_enqueued_commands(self): - self._logger.debug("Setup enqueued commands listener") - while not self._terminate: - try: - enqueued_cmd_cb = self._events[IOTCEvents.IOTC_ENQUEUED_COMMAND] - except KeyError: - time.sleep(0.1) - continue - # Wait for unknown method calls - c2d = self._device_client.receive_message() - c2d_name = c2d.custom_properties["method-name"] - command = Command(c2d_name, c2d.data) - try: - command_name_with_components = c2d_name.split("*") - - if len(command_name_with_components) > 1: - # In a component - self._logger.debug("Command in a component") - command = Command( - command_name_with_components[1], - c2d.data, - command_name_with_components[0], - ) - except: - pass + # Wait for unknown method calls + c2d_name = c2d.custom_properties["method-name"] + command = Command(c2d_name, c2d.data) + try: + command_name_with_components = c2d_name.split("*") - self._logger.debug( - "Received offline command {}".format(command.name)) + if len(command_name_with_components) > 1: + # In a component + self._logger.debug("Command in a component") + command = Command( + command_name_with_components[1], + c2d.data, + command_name_with_components[0], + ) + except: + pass - enqueued_cmd_cb(command) - time.sleep(0.1) - self._logger.debug("Stopping enqueued commands listener...") + self._logger.debug("Received offline command {}".format(command.name)) + c2d_cb(command) def _send_message(self, payload, properties): msg = self._prepare_message(payload, properties) @@ -443,13 +434,21 @@ def connect(self, force_dps=False): Connects the device. :raises exception: If connection fails """ + if self._connection_attempts_count > self._max_connection_attempts: # max number of retries. exit + self._terminate = True + self._connecting = False + return self._terminate = False + self._connecting = True + _credentials = None + # search for existing credentials in store if self._storage is not None and force_dps is False: _credentials = self._storage.retrieve() self._logger.debug("Found cached credentials") + # no stored credentials. use dps if _credentials is None: if self._cred_type in ( IOTCConnectType.IOTC_CONNECT_DEVICE_KEY, @@ -551,37 +550,43 @@ def connect(self, force_dps=False): ) self._device_client.connect() self._logger.debug("Device connected") + self._connecting = False self._twin = self._device_client.get_twin() self._logger.debug("Current twin: {}".format(self._twin)) prop_patch = self._sync_twin() self._logger.debug("Properties to patch: {}".format(prop_patch)) if prop_patch is not None: self._update_properties(prop_patch, None) - except: + except: # connection to hub failed. hub can be down or connection string expired. fallback to dps t, v, tb = sys.exc_info() self._logger.info("ERROR: Failed to connect to Hub") - if force_dps is True: + if force_dps is True: # don't fallback to dps as we already using it for connecting sys.exit(1) + self._connection_attempts_count += 1 self.connect(True) # setup listeners - self._prop_thread = threading.Thread(target=self._on_properties) - self._prop_thread.daemon = True - self._prop_thread.start() + self._device_client.on_twin_desired_properties_patch_received = self._on_properties + self._device_client.on_method_request_received = self._on_commands + self._device_client.on_message_received = self._on_enqueued_commands - self._cmd_thread = threading.Thread(target=self._on_commands) - self._cmd_thread.daemon = True - self._cmd_thread.start() - - self._enqueued_cmd_thread = threading.Thread( - target=self._on_enqueued_commands) - self._enqueued_cmd_thread.daemon = True - self._enqueued_cmd_thread.start() + self._conn_thread=threading.Thread(target=self._on_connection_state) + self._conn_thread.daemon=True + self._conn_thread.start() signal.signal(signal.SIGINT, self.disconnect) signal.signal(signal.SIGTERM, self.disconnect) + def _on_connection_state(self): + while not self._terminate: + if not self._connecting and not self.is_connected(): + self._device_client.shutdown() + self._device_client = None + self._connection_attempts_count = 0 + self.connect(True) + time.sleep(1.0) + def disconnect(self, *args): self._logger.info("Received shutdown signal") self._terminate = True diff --git a/src/iotc/aio/__init__.py b/src/iotc/aio/__init__.py index 38c89b9..a225192 100644 --- a/src/iotc/aio/__init__.py +++ b/src/iotc/aio/__init__.py @@ -75,10 +75,10 @@ def set_log_level(self, log_level): class IoTCClient(AbstractClient): def __init__( - self, device_id, scope_id, cred_type, key_or_cert, logger=None, storage=None + self, device_id, scope_id, cred_type, key_or_cert, logger=None, storage=None, max_connection_attempts=5 ): AbstractClient.__init__( - self, device_id, scope_id, cred_type, key_or_cert, storage + self, device_id, scope_id, cred_type, key_or_cert, storage, max_connection_attempts ) if logger is None: self._logger = ConsoleLogger(IOTCLogLevel.IOTC_LOGGING_API_ONLY) @@ -125,12 +125,13 @@ async def _handle_property_ack( await self.send_property( { "{}".format(component_name): { - "value": { - "{}".format(property_name): property_value - }, - "ac": 200, - "ad": "Completed", - "av": property_version, + "__t": "c", + "{}".format(property_name): { + "value": property_value, + "ac": 200, + "ad": "Completed", + "av": property_version + } } } ) @@ -158,7 +159,8 @@ async def _update_properties(self, patch, prop_cb): continue # check if component try: - is_component = str(type(patch[prop])) == "" and patch[prop]["__t"] + is_component = str( + type(patch[prop])) == "" and patch[prop]["__t"] except KeyError: pass if is_component: @@ -182,108 +184,78 @@ async def _update_properties(self, patch, prop_cb): prop_cb, prop, patch[prop], patch["$version"] ) - async def _on_properties(self): + async def _on_properties(self, patch): await self._logger.debug("Setup properties listener") - while not self._terminate: - try: - prop_cb = self._events[IOTCEvents.IOTC_PROPERTIES] - except KeyError: - # await self._logger.debug("Properties callback not found") - await asyncio.sleep(0.1) - continue - try: - patch = ( - await self._device_client.receive_twin_desired_properties_patch() - ) - except asyncio.CancelledError: - return - await self._logger.debug("Received desired properties. {}".format(patch)) - - await self._update_properties(patch, prop_cb) - await asyncio.sleep(0.1) + try: + prop_cb = self._events[IOTCEvents.IOTC_PROPERTIES] + except KeyError: + await self._logger.debug("Properties callback not found") + return - await self._logger.debug("Stopping properties listener...") + await self._update_properties(patch, prop_cb) - async def _on_commands(self): + async def _on_commands(self, method_request): await self._logger.debug("Setup commands listener") - while not self._terminate: - try: - cmd_cb = self._events[IOTCEvents.IOTC_COMMAND] - except KeyError: - # await self._logger.debug("Commands callback not found") - await asyncio.sleep(0.1) - continue - # Wait for unknown method calls - try: - method_request = await self._device_client.receive_method_request() - except asyncio.CancelledError: - return - command = Command(method_request.name, method_request.payload) - try: - command_name_with_components = method_request.name.split("*") - - if len(command_name_with_components) > 1: - # In a component - await self._logger.debug("Command in a component") - command = Command( - command_name_with_components[1], - method_request.payload, - command_name_with_components[0], - ) - except: - pass - - async def reply_fn(): - await self._device_client.send_method_response( - MethodResponse.create_from_method_request( - method_request, - 200, - {"result": True, "data": "Command received"}, - ) + try: + cmd_cb = self._events[IOTCEvents.IOTC_COMMAND] + except KeyError: + await self._logger.debug("Command callback not found") + return + command = Command(method_request.name, method_request.payload) + try: + command_name_with_components = method_request.name.split("*") + + if len(command_name_with_components) > 1: + # In a component + await self._logger.debug("Command in a component") + command = Command( + command_name_with_components[1], + method_request.payload, + command_name_with_components[0], ) + except: + pass - command.reply = reply_fn - await self._logger.debug("Received command {}".format(method_request.name)) - - await cmd_cb(command) - await asyncio.sleep(0.1) - await self._logger.debug("Stopping commands listener...") + async def reply_fn(): + await self._device_client.send_method_response( + MethodResponse.create_from_method_request( + method_request, + 200, + {"result": True, "data": "Command received"}, + ) + ) - async def _on_enqueued_commands(self): - await self._logger.debug("Setup enqueued commands listener") - while not self._terminate: - try: - enqueued_cmd_cb = self._events[IOTCEvents.IOTC_ENQUEUED_COMMAND] - except KeyError: - await self._logger.debug("Enqueued commands callback not found") - await asyncio.sleep(0.1) - continue - # Wait for unknown method calls - try: - c2d = await self._device_client.receive_message() - except asyncio.CancelledError: - return - c2d_name = c2d.custom_properties["method-name"] - command = Command(c2d_name, c2d.data) - try: - command_name_with_components = c2d_name.split("*") - - if len(command_name_with_components) > 1: - # In a component - await self._logger.debug("Command in a component") - command = Command( - command_name_with_components[1], - c2d.data, - command_name_with_components[0], - ) - except: - pass + command.reply = reply_fn + await self._logger.debug("Received command {}".format(method_request.name)) + await cmd_cb(command) - await self._logger.debug("Received offline command {}".format(command.name)) + async def _on_enqueued_commands(self, c2d): + await self._logger.debug("Setup offline commands listener") + try: + c2d_cb = self._events[IOTCEvents.IOTC_ENQUEUED_COMMAND] + except KeyError: + await self._logger.debug("Command callback not found") + return + + # Wait for unknown method calls + c2d_name = c2d.custom_properties["method-name"] + command = Command(c2d_name, c2d.data) + try: + command_name_with_components = c2d_name.split("*") + + if len(command_name_with_components) > 1: + # In a component + await self._logger.debug("Command in a component") + command = Command( + command_name_with_components[1], + c2d.data, + command_name_with_components[0], + ) + except: + pass - await enqueued_cmd_cb(command) - await asyncio.sleep(0.1) - await self._logger.debug("Stopping enqueued commands listener...") + await self._logger.debug("Received offline command {}".format(command.name)) + await c2d_cb(command) async def _send_message(self, payload, properties): msg = self._prepare_message(payload, properties) @@ -311,7 +283,14 @@ async def connect(self, force_dps=False): Connects the device. :raises exception: If connection fails """ + + if self._connection_attempts_count > self._max_connection_attempts: # max number of retries. exit + self._terminate = True + self._connecting = False + return + self._terminate = False + self._connecting = True _credentials = None if self._storage is not None and force_dps is False: @@ -391,7 +370,6 @@ async def connect(self, force_dps=False): sys.exit(1) # Connect to iothub try: - print(_credentials.connection_string) if self._cred_type in ( IOTCConnectType.IOTC_CONNECT_DEVICE_KEY, IOTCConnectType.IOTC_CONNECT_SYMM_KEY, @@ -412,39 +390,53 @@ async def connect(self, force_dps=False): device_id=_credentials.device_id, ) await self._device_client.connect() - await self._logger.debug("Device connected") + await self._logger.debug("Device connected to '{}'".format(_credentials.hub_name)) + self._connecting = False self._twin = await self._device_client.get_twin() await self._logger.debug("Current twin: {}".format(self._twin)) twin_patch = self._sync_twin() if twin_patch is not None: await self._update_properties(twin_patch, None) - except Exception as e: + except Exception as e: # connection to hub failed. hub can be down or connection string expired. fallback to dps await self._logger.info("ERROR: Failed to connect to Hub. {}".format(e)) if force_dps is True: sys.exit(1) + self._connection_attempts_count += 1 await self.connect(True) # setup listeners - self._prop_thread = asyncio.create_task(self._on_properties()) - self._cmd_thread = asyncio.create_task(self._on_commands()) - self._enqueued_cmd_thread = asyncio.create_task( - self._on_enqueued_commands()) + self._device_client.on_twin_desired_properties_patch_received = self._on_properties + self._device_client.on_method_request_received = self._on_commands + self._device_client.on_message_received = self._on_enqueued_commands + + if hasattr(self,'_conn_thread') and self._conn_thread is not None: + try: + self._conn_thread.cancel() + await self._conn_thread + except asyncio.CancelledError: + print("Resetting conn_status thread") + self._conn_thread = asyncio.create_task(self._on_connection_state()) + signal.signal(signal.SIGINT, self.raise_graceful_exit) signal.signal(signal.SIGTERM, self.raise_graceful_exit) + async def _on_connection_state(self): + while not self._terminate: + if not self._connecting and not self.is_connected(): + await self._device_client.shutdown() + self._device_client = None + self._connection_attempts_count = 0 + await self.connect(True) + await asyncio.sleep(1.0) + async def disconnect(self): await self._logger.info("Received shutdown signal") - if ( - self._prop_thread is not None - and self._cmd_thread is not None - and self._enqueued_cmd_thread is not None - ): + self._terminate = True + if hasattr(self,'_conn_thread') and self._conn_thread is not None: tasks = asyncio.gather( - self._prop_thread, self._cmd_thread, self._enqueued_cmd_thread + self._conn_thread ) - self._terminate = True try: - tasks.cancel() await tasks except: pass diff --git a/src/iotc/test/async/test_listeners.py b/src/iotc/test/async/test_listeners.py index 0e2b2ff..6c43641 100644 --- a/src/iotc/test/async/test_listeners.py +++ b/src/iotc/test/async/test_listeners.py @@ -1,3 +1,7 @@ +from iotc.test import dummy_storage +from iotc.aio import IoTCClient +from iotc import IOTCConnectType, IOTCLogLevel, IOTCEvents, Command +from azure.iot.device import MethodRequest, Message import pytest import asyncio import configparser @@ -10,11 +14,6 @@ if config["TESTS"].getboolean("Local"): sys.path.insert(0, "src") -from azure.iot.device import MethodRequest, Message -from iotc import IOTCConnectType, IOTCLogLevel, IOTCEvents, Command -from iotc.aio import IoTCClient -from iotc.test import dummy_storage - DEFAULT_COMPONENT_PROP = {"prop1": "value1", "$version": 1} COMPONENT_PROP = { @@ -35,7 +34,8 @@ DEFAULT_COMMAND = MethodRequest(1, "cmd1", "sample") COMPONENT_COMMAND = MethodRequest(1, "commandComponent*cmd1", "sample") COMPONENT_ENQUEUED = Message("sample_data") -COMPONENT_ENQUEUED.custom_properties = {"method-name": "component*command_name"} +COMPONENT_ENQUEUED.custom_properties = { + "method-name": "component*command_name"} DEFAULT_COMPONENT_ENQUEUED = Message("sample_data") DEFAULT_COMPONENT_ENQUEUED.custom_properties = {"method-name": "command_name"} @@ -78,11 +78,8 @@ async def iotc_client(mocker): async def test_on_properties_triggered(mocker, iotc_client): prop_stub = mocker.AsyncMock() iotc_client.on(IOTCEvents.IOTC_PROPERTIES, prop_stub) - iotc_client._device_client.receive_twin_desired_properties_patch.return_value = ( - DEFAULT_COMPONENT_PROP - ) await iotc_client.connect() - await asyncio.sleep(0.1) + await iotc_client._device_client.on_twin_desired_properties_patch_received(DEFAULT_COMPONENT_PROP) prop_stub.assert_called_with("prop1", "value1", None) @@ -92,11 +89,8 @@ async def test_on_properties_triggered_with_component(mocker, iotc_client): # set return value, otherwise a check for the function result will execute a mock again prop_stub.return_value = True iotc_client.on(IOTCEvents.IOTC_PROPERTIES, prop_stub) - iotc_client._device_client.receive_twin_desired_properties_patch.return_value = ( - COMPONENT_PROP - ) await iotc_client.connect() - await asyncio.sleep(0.1) + await iotc_client._device_client.on_twin_desired_properties_patch_received(COMPONENT_PROP) prop_stub.assert_called_with("prop1", "value1", "component1") @@ -106,17 +100,14 @@ async def test_on_properties_triggered_with_complex_component(mocker, iotc_clien # set return value, otherwise a check for the function result will execute a mock again prop_stub.return_value = True iotc_client.on(IOTCEvents.IOTC_PROPERTIES, prop_stub) - iotc_client._device_client.receive_twin_desired_properties_patch.return_value = ( - COMPLEX_COMPONENT_PROP - ) await iotc_client.connect() - await asyncio.sleep(0.1) + await iotc_client._device_client.on_twin_desired_properties_patch_received(COMPLEX_COMPONENT_PROP) prop_stub.assert_has_calls( [ - mocker.call("prop1", {"item1":"value1"}, "component1"), + mocker.call("prop1", {"item1": "value1"}, "component1"), mocker.call("prop1", "value1", "component2"), mocker.call("prop2", 2, "component2"), - mocker.call("prop2", "value2", None), + mocker.call("prop2", {"item2": "value2"}, None), ] ) @@ -125,9 +116,8 @@ async def test_on_properties_triggered_with_complex_component(mocker, iotc_clien async def test_on_command_triggered(mocker, iotc_client): cmd_stub = mocker.AsyncMock() iotc_client.on(IOTCEvents.IOTC_COMMAND, cmd_stub) - iotc_client._device_client.receive_method_request.return_value = DEFAULT_COMMAND await iotc_client.connect() - await asyncio.sleep(0.1) + await iotc_client._device_client.on_method_request_received(DEFAULT_COMMAND) cmd_stub.assert_called_with(Command("cmd1", "sample", None)) @@ -135,9 +125,8 @@ async def test_on_command_triggered(mocker, iotc_client): async def test_on_command_triggered_with_component(mocker, iotc_client): cmd_stub = mocker.AsyncMock() iotc_client.on(IOTCEvents.IOTC_COMMAND, cmd_stub) - iotc_client._device_client.receive_method_request.return_value = COMPONENT_COMMAND await iotc_client.connect() - await asyncio.sleep(0.1) + await iotc_client._device_client.on_method_request_received(COMPONENT_COMMAND) cmd_stub.assert_called_with(Command("cmd1", "sample", "commandComponent")) @@ -145,9 +134,8 @@ async def test_on_command_triggered_with_component(mocker, iotc_client): async def test_on_enqueued_command_triggered(mocker, iotc_client): cmd_stub = mocker.AsyncMock() iotc_client.on(IOTCEvents.IOTC_ENQUEUED_COMMAND, cmd_stub) - iotc_client._device_client.receive_message.return_value = DEFAULT_COMPONENT_ENQUEUED await iotc_client.connect() - await asyncio.sleep(0.1) + await iotc_client._device_client.on_message_received(DEFAULT_COMPONENT_ENQUEUED) cmd_stub.assert_called_with(Command("command_name", "sample_data", None)) @@ -155,7 +143,7 @@ async def test_on_enqueued_command_triggered(mocker, iotc_client): async def test_on_enqueued_command_triggered_with_component(mocker, iotc_client): cmd_stub = mocker.AsyncMock() iotc_client.on(IOTCEvents.IOTC_ENQUEUED_COMMAND, cmd_stub) - iotc_client._device_client.receive_message.return_value = COMPONENT_ENQUEUED await iotc_client.connect() - await asyncio.sleep(0.1) - cmd_stub.assert_called_with(Command("command_name", "sample_data", "component")) + await iotc_client._device_client.on_message_received(COMPONENT_ENQUEUED) + cmd_stub.assert_called_with( + Command("command_name", "sample_data", "component")) diff --git a/src/iotc/test/sync/test_listeners.py b/src/iotc/test/sync/test_listeners.py index dd96233..4f41258 100644 --- a/src/iotc/test/sync/test_listeners.py +++ b/src/iotc/test/sync/test_listeners.py @@ -1,3 +1,6 @@ +from iotc import IOTCConnectType, IOTCLogLevel, IOTCEvents, IoTCClient, Command +from iotc.test import dummy_storage +from azure.iot.device import MethodRequest, Message import pytest import configparser import os @@ -10,10 +13,6 @@ if config["TESTS"].getboolean("Local"): sys.path.insert(0, "src") -from azure.iot.device import MethodRequest, Message -from iotc import IOTCConnectType, IOTCLogLevel, IOTCEvents, IoTCClient,Command -from iotc.test import dummy_storage - DEFAULT_COMPONENT_PROP = {"prop1": {"value": "value1"}, "$version": 1} COMPONENT_PROP = { @@ -21,20 +20,21 @@ "$version": 1, } COMPLEX_COMPONENT_PROP = { - "component1": {"__t": "c", "prop1": {"value": "value1"}}, + "component1": {"__t": "c", "prop1": {"item1": "value1"}}, "component2": { "__t": "c", - "prop1": {"value": "value1"}, - "prop2": {"value": "value2"}, + "prop1": "value1", + "prop2": 2, }, - "prop2": {"value": "value2"}, + "prop2": {"item2": "value2"}, "$version": 1, } DEFAULT_COMMAND = MethodRequest(1, "cmd1", "sample") COMPONENT_COMMAND = MethodRequest(1, "commandComponent*cmd1", "sample") COMPONENT_ENQUEUED = Message("sample_data") -COMPONENT_ENQUEUED.custom_properties = {"method-name": "component*command_name"} +COMPONENT_ENQUEUED.custom_properties = { + "method-name": "component*command_name"} DEFAULT_COMPONENT_ENQUEUED = Message("sample_data") DEFAULT_COMPONENT_ENQUEUED.custom_properties = {"method-name": "command_name"} @@ -49,6 +49,7 @@ def command_equals(self, other): Command.__eq__ = command_equals + @pytest.fixture() def iotc_client(mocker): ProvisioningClient = mocker.patch("iotc.ProvisioningDeviceClient") @@ -67,20 +68,16 @@ def iotc_client(mocker): yield mocked_client try: mocked_client.disconnect() - except asyncio.CancelledError: + except: pass def test_on_properties_triggered(mocker, iotc_client): prop_stub = mocker.MagicMock() iotc_client.on(IOTCEvents.IOTC_PROPERTIES, prop_stub) - iotc_client._device_client.receive_twin_desired_properties_patch.return_value = ( - DEFAULT_COMPONENT_PROP - ) iotc_client.connect() - time.sleep(0.1) - prop_stub.assert_called_with("prop1", "value1", None) - + iotc_client._device_client.on_twin_desired_properties_patch_received(DEFAULT_COMPONENT_PROP) + prop_stub.assert_called_with("prop1", {"value":"value1"}, None) def test_on_properties_triggered_with_component(mocker, iotc_client): @@ -88,13 +85,9 @@ def test_on_properties_triggered_with_component(mocker, iotc_client): # set return value, otherwise a check for the function result will execute a mock again prop_stub.return_value = True iotc_client.on(IOTCEvents.IOTC_PROPERTIES, prop_stub) - iotc_client._device_client.receive_twin_desired_properties_patch.return_value = ( - COMPONENT_PROP - ) iotc_client.connect() - time.sleep(0.1) - prop_stub.assert_called_with("prop1", "value1", "component1") - + iotc_client._device_client.on_twin_desired_properties_patch_received(COMPONENT_PROP) + prop_stub.assert_called_with("prop1", {"value": "value1"}, "component1") def test_on_properties_triggered_with_complex_component(mocker, iotc_client): @@ -102,56 +95,46 @@ def test_on_properties_triggered_with_complex_component(mocker, iotc_client): # set return value, otherwise a check for the function result will execute a mock again prop_stub.return_value = True iotc_client.on(IOTCEvents.IOTC_PROPERTIES, prop_stub) - iotc_client._device_client.receive_twin_desired_properties_patch.return_value = ( - COMPLEX_COMPONENT_PROP - ) iotc_client.connect() - time.sleep(0.1) + iotc_client._device_client.on_twin_desired_properties_patch_received(COMPLEX_COMPONENT_PROP) prop_stub.assert_has_calls( [ - mocker.call("prop1", "value1", "component1"), + mocker.call("prop1", {"item1": "value1"}, "component1"), mocker.call("prop1", "value1", "component2"), - mocker.call("prop2", "value2", "component2"), - mocker.call("prop2", "value2", None), - ],any_order=True + mocker.call("prop2", 2, "component2"), + mocker.call("prop2", {"item2": "value2"}, None), + ], any_order=True ) - def test_on_command_triggered(mocker, iotc_client): cmd_stub = mocker.MagicMock() iotc_client.on(IOTCEvents.IOTC_COMMAND, cmd_stub) - iotc_client._device_client.receive_method_request.return_value = DEFAULT_COMMAND iotc_client.connect() - time.sleep(0.1) + iotc_client._device_client.on_method_request_received(DEFAULT_COMMAND) cmd_stub.assert_called_with(Command("cmd1", "sample", None)) - def test_on_command_triggered_with_component(mocker, iotc_client): cmd_stub = mocker.MagicMock() iotc_client.on(IOTCEvents.IOTC_COMMAND, cmd_stub) - iotc_client._device_client.receive_method_request.return_value = COMPONENT_COMMAND iotc_client.connect() - time.sleep(0.1) + iotc_client._device_client.on_method_request_received(COMPONENT_COMMAND) cmd_stub.assert_called_with(Command("cmd1", "sample", "commandComponent")) - def test_on_enqueued_command_triggered(mocker, iotc_client): cmd_stub = mocker.MagicMock() iotc_client.on(IOTCEvents.IOTC_ENQUEUED_COMMAND, cmd_stub) - iotc_client._device_client.receive_message.return_value = DEFAULT_COMPONENT_ENQUEUED iotc_client.connect() - time.sleep(0.1) + iotc_client._device_client.on_message_received(DEFAULT_COMPONENT_ENQUEUED) cmd_stub.assert_called_with(Command("command_name", "sample_data", None)) - def test_on_enqueued_command_triggered_with_component(mocker, iotc_client): cmd_stub = mocker.MagicMock() iotc_client.on(IOTCEvents.IOTC_ENQUEUED_COMMAND, cmd_stub) - iotc_client._device_client.receive_message.return_value = COMPONENT_ENQUEUED iotc_client.connect() - time.sleep(0.1) - cmd_stub.assert_called_with(Command("command_name", "sample_data", "component")) + iotc_client._device_client.on_message_received(COMPONENT_ENQUEUED) + cmd_stub.assert_called_with( + Command("command_name", "sample_data", "component")) From 6bda818f5a30f79b73fc41a7f0c86ab1d242d2d5 Mon Sep 17 00:00:00 2001 From: lucadruda Date: Fri, 25 Jun 2021 17:58:29 +0200 Subject: [PATCH 3/4] increment version --- setup.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/setup.py b/setup.py index c098d63..4f86f20 100755 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ with open("README.md", "r") as fh: long_description = fh.read() -version = "1.1.0" +version = "1.1.1" setuptools.setup( name='iotc', @@ -24,10 +24,8 @@ classifiers=[ 'License :: OSI Approved :: MIT License', 'Programming Language :: Python', - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: 3.7+' ], include_package_data=True, install_requires=["azure-iot-device"] From 8ad8d958070edc0232a87b47a48f79d82b9f4578 Mon Sep 17 00:00:00 2001 From: lucadruda Date: Fri, 25 Jun 2021 18:03:32 +0200 Subject: [PATCH 4/4] update readme --- README.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 780355b..b321c9c 100755 --- a/README.md +++ b/README.md @@ -11,8 +11,10 @@ It hides some of the complexities of the official Azure IoT SDK and uses IoT Cen ### Disclaimer -> **This library is experimental and has the purpose of providing an easy to use solution for prototyping and small projects. Although stable and actively maintained, its use in production is discouraged. -> Please refer to official [Azure IoT Python SDK](https://github.com/Azure/azure-iot-sdk-python) when building production products.** +> **This library is experimental and has the purpose of providing an easy to use solution for prototyping and small projects. Its use in production is discouraged. +The library is going to be archived soon so we suggest new developments to start using official Azure IoT SDK.** + +> Please refer to [Azure IoT Python SDK](https://github.com/Azure/azure-iot-sdk-python) when building production products.** _If you're looking for the v0.x.x client library, it is now preserved [here](https://github.com/obastemur/iot_client/tree/master/python). Latest version on pypi is 0.3.9_