In [0]:
%run ./azurehelper/azurehelper

In [0]:
# dbutils.library.restartPython()

In [0]:
import json
import datetime
import uuid
import logging
import sys

from azure.iot.hub import IoTHubRegistryManager
from azure.eventhub import EventData
from azure.eventhub import EventHubConsumerClient

from azure.eventhub.aio._eventprocessor.partition_context import PartitionContext
from azure.identity import ChainedTokenCredential
from azure.storage.blob import BlobSasPermissions, BlobServiceClient, generate_blob_sas


class DateTimeEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, (datetime.date, datetime.datetime)):
            return obj.isoformat() + "Z"


class DeliveryFinished(Exception):  # noqa: N818
    # the class is used as an event to stop messaging even if it is failed or successful
    def __init__(self, message):
        super().__init__(message)


class AssetManager:

    def __init__(
        self,
        device_id: str,
        blob_name: str,
        azure_helper: AzureHelper,
        package_name: str,
        package_version: str,
    ):
        """
        Constructs all the necessary attributes for the azuredelivery object.
        Parameters:
                device_id : str
                    name of the registered IoT Hub device (= AI Asset Manager Azure ws)
                blob_name : str
                    Blob name under which the package to be delivered is found
                azure_credential: azure.identity.ChainedTokenCredential
                    Azure credential required to generate Shared Access Signature link
                    pointing to the package being delivered
        """
        self.device_id = device_id
        self.blob_name = blob_name
        self.azure_helper = azure_helper

        self.registry_manager = IoTHubRegistryManager.from_connection_string(
            self.azure_helper.iot_hub_con_str
        )

        # a dedicated consumer group can be created on IoTHub and then assigned here
        # that can be useful when the eventhub is already used on other purpose
        # self.consumer_group = ("$Default", "modelmanager", "mlops_target")  
        self.consumer_group = ("$Default", "assetmanager", "mlops_target")  
        self.event_hub_client = EventHubConsumerClient.from_connection_string(
            self.azure_helper.iot_event_hub_con_str, consumer_group=self.consumer_group[0]
        )

        self.state = "NewDelivery"  # new delivery starts by sending NewDelivery message from Cloud to AssetManager (C2D)
        self.time_out = 60  # maximum time in seconds waiting between two messages from Asset Manager

        # self.package_id = package_id or str(uuid.uuid4())  # an older version of pipeline package might not have package id
        self.package_id = str(uuid.uuid4())  # an older version of pipeline package might not have package id
        self.package_name = package_name
        self.package_version = package_version

        self.delivery_id = str(uuid.uuid4())  # unique id for the current delivery communication
        self.project_id = str(uuid.uuid4())
        self.event_id = str(uuid.uuid4())

    def on_event(self, partition_context: PartitionContext, event: EventData):
        """
        Callback function triggered by an EventHub message
        - Timeout cause a None event, and the messaging workflow can be stopped
        - Only events with the initiated delivery id will be processed, others will be ignored
        - In case the delivery state is changed to 'Success' or 'Error', the messaging is stopped
        """
        if event is None:
            print("Actual delivery has been timed out")
            self.event_hub_client.close()
            raise TimeoutError("Actual delivery has been timed out")

        print(
            f"event received on partition '{partition_context.partition_id}', consumer group '{partition_context.consumer_group}'"
        )

        message_body = self._process_event_hub_event(partition_context, event)
        print(f"Actual delivery state: {self.state}")

        if self.state in ["Success", "Error"]:
            self.event_hub_client.close()
            raise DeliveryFinished(
                f"Actual delivery has been finished with state '{self.state}' and message {message_body}"
            )

    def start_event_hub_loop(self):
        """
        Initiates and manages the communication with AI Asset Manager
        by acting as a IoT Hub and IoT Event Hub client.
        The callback function "on_event" will be triggered when message event arrives.
        Events will be processed with timestamp after `start_time`
        """
        try:
            start_time = datetime.datetime.now()
            self._initiate_delivery()
            print("initiative message has been sent to the device")

            self.event_hub_client.receive(
                on_event=self.on_event,
                starting_position=start_time,
                max_wait_time=self.time_out,
            )

        except DeliveryFinished as finished:
            print(f"Actual delivery has been finished with state {finished}")

        except Exception as err:
            self.event_hub_client.close()
            print(f"Unknown error occurred during package delivery: {err}")

        finally:
            print("IoT Hub event loop thread exited.")

    def _process_event_hub_event(
        self, partition_context: PartitionContext, event: EventData
    ):
        """
        Processes events arriving from Event Hub (or an Evnet Hub compatible endpoint)
        The events are filtered by deliveryId - only messages which match the class attribute deliveryId are considered.
        """
        body = event.body_as_json(encoding="UTF-8")

        application_properties = {
            k.decode("ascii"): v.decode("ascii") for k, v in event.properties.items()
        }

        # check if message has same 'deliveryId', otherwise skip message
        if "deliveryId" in application_properties:
            print(f"Event body: {body}")
            print(
                f"Event application properties: {application_properties} from group {partition_context.consumer_group}"
            )
            if self.delivery_id != application_properties["deliveryId"]:
                print(
                    f"Event does not match deliveryId {self.delivery_id}. Received deliveryId {application_properties['deliveryId']}. Skipped."
                )
                return body
        else:
            print(
                "Event message has no 'deliveryId' in application properties. Skipped."
            )
            return body

        # check message purpose e.g. 'kind' (action required) or 'state' update message
        if "type" in application_properties:
            if application_properties["type"] == "event":
                if application_properties["kind"] == "presignedUrlRequest":
                    self.state = "PresignedUrlRequested"

                    # presigned_link = self._create_presigned_link()
                    print('creating presigned link...')
                    presigned_link = self.azure_helper.generate_sas_token(blob_name)
                    self._send_presigned_link(presigned_link=presigned_link)

                    self.state = "PresignedUrlCreated"
            elif application_properties["type"] == "state":
                self.state = body["state"]
                print(f"Delivery state has been changed: {self.state}")

        return body

    def _initiate_delivery(self):
        """
        Sends the first c2d message to AI Asset Manager that a new edge config package is ready
        to be downloaded.
        """
        print(
            f"Sending c2d message to {self.device_id} in order to initiate new delivery {self.delivery_id} ..."
        )
        data = {
            "packageId": self.package_id,
            "packageVersion": self.package_version,
            "eventId": self.event_id,
            "timestamp": datetime.datetime.now(),
        }

        self._send_message(data=data, type_value="event", kind_value="newDelivery")

    def _send_presigned_link(self, presigned_link):
        """
        Sends presigned link as c2d message to AI Asset Manager.
        Parameter:
            presigned_link (str): presigned link as url with SAS token
        """
        print(f"Presigned link created for delivery {self.delivery_id}")
        
        data = {
            "timestamp": datetime.datetime.now(),
            "attributes": {"presignedUrl": presigned_link},
        }

        self._send_message(data=data, type_value="data", kind_value="presignedUrlResponse")

    def _send_message(self, data, type_value, kind_value):

        message_body = json.dumps(data, indent=4, cls=DateTimeEncoder)
        print(f"message_body: {message_body}")

        props = {
            'contentType':  'application/json',
            'type': type_value,
            'kind': kind_value,
            'deliveryId': self.delivery_id,
            'projectId': self.project_id
        }

        try:
            self.registry_manager.send_c2d_message(
                self.device_id, message_body, properties=props
            )

            print(f"message sent. device_id: {self.device_id}, props: {props}")
                
        except Exception as exp:
            print(f"An error occurred while sending message: {exp}")
            raise exp


    
model_name = "aisdk_model_state_identifier-edge"
model_version = "1"

azure_helper = AzureHelper()
blob_name = azure_helper.get_blob_name(model_name, model_version)

# azure_helper.generate_sas_token(blob_name)

asset_manager = AssetManager(
    "device3",
    blob_name,
    azure_helper,
    model_name,
    model_version
)

try:
    asset_manager.start_event_hub_loop()

    # Check the result of the delivery
    if asset_manager.state == "Success":
        print(f"Successful delivery '{asset_manager.delivery_id}' to '{asset_manager.device_id}'.")
    else:
        raise ValueError(
            f"Delivery '{asset_manager.delivery_id}' to '{asset_manager.device_id}' failed with "
            f"state '{asset_manager.state}'."
        )
except Exception as mme:
    raise ValueError(
        f"""Delivery '{asset_manager.delivery_id}' to '{asset_manager.device_id}' \
            failed with state '{asset_manager.state}'.
            and error: {mme}
            """
    )