# Obtaining IoT data 

## Summary 

This notebook explains how to obtain telemetry data from IoT devices that arrives through a gateway-enabled EdgeHub.

## Description

The purpose of this notebook is to guide the reader through obtaining telemetry data generated by IoT devices using the DSVM IoT extension. To achieve this, we propose the following architecture:

![Architecture](https://raw.githubusercontent.com/Azure/DataScienceVM/master/Extensions/IoT/Img/Architecture.PNG)


An IoT device sends telemetry data to a gateway-enabled EdgeHub, where the route is split: the data goes upstream to IoT Hub and also into an 'Extractor Module', whose only job is to read incoming telemetry data and dump it onto the host.


## Requirements

* An IoT Hub
* A gateway-enabled IoT Edge runtime (see the 'Setting up your Edge device' notebook)


## Documentation

* IoT Edge modules: https://docs.microsoft.com/en-us/azure/iot-edge/module-composition

## Step 1: Building the 'Extractor Module'

To read incoming telemetry data, we route it into a module that unpacks each message and stores it locally on the host. The following code is what runs inside the container:


```python
import json
import sys
import time

from iothub_client import IoTHubModuleClient, IoTHubTransportProvider
from iothub_client import IoTHubMessageDispositionResult, IoTHubError

# messageTimeout - the maximum time in milliseconds until a message times out.
# The timeout period starts at IoTHubModuleClient.send_event_async.
# By default, messages do not expire.
MESSAGE_TIMEOUT = 10000
# File inside the container's /myvol volume where incoming telemetry is appended.
DATA_FILE = "/myvol/data.json"
# global counters
RECEIVE_CALLBACKS = 0
SEND_CALLBACKS = 0
# Choose HTTP, AMQP or MQTT as transport protocol.  Currently only MQTT is supported.
PROTOCOL = IoTHubTransportProvider.MQTT

# Callback invoked when a message sent with send_event_async is confirmed.
# The extractor itself does not forward messages, so this only runs if
# forward_event_to_output is used.
def send_confirmation_callback(message, result, user_context):
    global SEND_CALLBACKS
    print ( "Confirmation[%d] received for message with result = %s" % (user_context, result) )
    map_properties = message.properties()
    key_value_pair = map_properties.get_internals()
    print ( "    Properties: %s" % key_value_pair )
    SEND_CALLBACKS += 1
    print ( "    Total calls confirmed: %d" % SEND_CALLBACKS )


# receive_message_callback is invoked when an incoming message arrives on the
# "input1" queue. Instead of forwarding it, the extractor parses the body as JSON
# and appends it to DATA_FILE as a single line (one JSON document per line).
def receive_message_callback(message, hub_manager):
    global RECEIVE_CALLBACKS
    message_buffer = message.get_bytearray()
    size = len(message_buffer)
    message_text = message_buffer[:size].decode('utf-8')
    print ( "    Data: <<<%s>>> & Size=%d" % (message_text, size) )
    map_properties = message.properties()
    key_value_pair = map_properties.get_internals()
    print ( "    Properties: %s" % key_value_pair )
    RECEIVE_CALLBACKS += 1
    print ( "    Total calls received: %d" % RECEIVE_CALLBACKS )
    try:
        data = json.loads(message_text)
    except ValueError:
        # Body is not valid JSON: reject the message instead of letting the callback raise.
        print ( "    Message is not valid JSON, rejecting it" )
        return IoTHubMessageDispositionResult.REJECTED
    with open(DATA_FILE, "a") as data_file:
        data_file.write(json.dumps(data))
        data_file.write('\n')
    return IoTHubMessageDispositionResult.ACCEPTED


class HubManager(object):

    def __init__(
            self,
            protocol=IoTHubTransportProvider.MQTT):
        self.client_protocol = protocol
        self.client = IoTHubModuleClient()
        self.client.create_from_environment(protocol)

        # set the time until a message times out
        self.client.set_option("messageTimeout", MESSAGE_TIMEOUT)

        # sets the callback when a message arrives on "input1" queue.  Messages sent to
        # other inputs or to the default will be silently discarded.
        self.client.set_message_callback("input1", receive_message_callback, self)

    # Forwards a message to the given output queue (not used by the extractor).
    def forward_event_to_output(self, outputQueueName, event, send_context):
        self.client.send_event_async(
            outputQueueName, event, send_confirmation_callback, send_context)

def main(protocol):
    try:
        print ( "\nPython %s\n" % sys.version )
        print ( "EdgeHub Extractor for Python" )
        hub_manager = HubManager(protocol)
        print ( "Starting the Extractor using protocol %s..." % hub_manager.client_protocol )
        # Keep the process alive; incoming messages are handled by the callback.
        while True:
            time.sleep(1)

    except IoTHubError as iothub_error:
        print ( "Unexpected error %s from IoTHub" % iothub_error )
        return
    except KeyboardInterrupt:
        print ( "Extractor stopped" )

if __name__ == '__main__':
    main(PROTOCOL)
```
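The callback appends each message to `/myvol/data.json` as one JSON document per line (JSON Lines). A minimal sketch for loading that file back into Python on the host, assuming you have already located it (for example, the volume's host path appears under `Mounts` in `docker inspect <container>` and may require root to read). `load_telemetry` is a hypothetical helper; it skips blank or malformed lines rather than raising:

```python
import json
from typing import Any, List


def load_telemetry(path: str) -> List[Any]:
    """Load the extractor's output file: one JSON document per line."""
    records: List[Any] = []
    with open(path, encoding="utf-8") as data_file:
        for line in data_file:
            line = line.strip()
            if not line:
                continue  # skip blank lines
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError:
                # e.g. a partially written last line while the module is still appending
                continue
    return records
```

For example, `records = load_telemetry("/path/to/data.json")`. If you use pandas, `pandas.read_json(path, lines=True)` reads the same format into a DataFrame, although it raises on a malformed line instead of skipping it.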

Next is the Dockerfile used to build the container. Note that the data arriving at the module is stored in a Docker volume (`/myvol`):

```dockerfile
FROM ubuntu:xenial

WORKDIR /app

RUN apt-get update && \
    apt-get install -y --no-install-recommends libcurl4-openssl-dev python-pip libboost-python-dev && \
    rm -rf /var/lib/apt/lists/*

COPY requirements.txt ./
RUN pip install -r requirements.txt

COPY . .
RUN mkdir /myvol
RUN touch /myvol/data.json
RUN chmod 777 /myvol/data.json
VOLUME /myvol
RUN useradd -ms /bin/bash moduleuser
USER moduleuser

CMD [ "python", "-u", "./main.py" ]
```

## Step 2: Using the module

There are currently two options for using the 'Extractor Module':

### Building it by yourself:

If you want to build the container yourself and push it to a repository of your own, the files required for the module are included under `/home/$USER/IoT/IotEdge/gateway/extractor_module`. The directory contains the files described above:

* main.py
* Dockerfile
* requirements.txt

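To build the image, run the following command, where `myregistry/extractor_module` is a placeholder for your own repository name:

```bash
docker build -t myregistry/extractor_module /home/$USER/IoT/IotEdge/gateway/extractor_module
```

Then publish it to the registry of your choice with `docker push myregistry/extractor_module`.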
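If you prefer to script the build and push from Python, a sketch assuming the Docker SDK for Python (`pip install docker`), a running Docker daemon, and a prior `docker login` to the target registry; the SDK is not part of the original setup, and `split_repository_tag` and `build_and_push` are hypothetical helpers:

```python
from typing import Tuple


def split_repository_tag(image: str) -> Tuple[str, str]:
    """Split "repo[:tag]" into (repo, tag), defaulting the tag to "latest".

    A ":" that belongs to a registry port (e.g. "localhost:5000/name") is not a tag.
    """
    repository, sep, tag = image.rpartition(":")
    if sep and "/" not in tag:
        return repository, tag
    return image, "latest"


def build_and_push(context_dir: str, image: str) -> None:
    """Build the extractor image from context_dir and push it to its registry."""
    import docker  # imported here so split_repository_tag works without the SDK

    repository, tag = split_repository_tag(image)
    client = docker.from_env()
    # images.build returns (Image, build-log generator); tag the result as repository:tag.
    client.images.build(path=context_dir, tag="%s:%s" % (repository, tag))
    # The push progress is streamed; errors arrive as items in the stream, not as exceptions.
    for line in client.images.push(repository, tag=tag, stream=True, decode=True):
        if "error" in line:
            raise RuntimeError(line["error"])
        print(line.get("status", line))
```

For example: `build_and_push(os.path.expanduser("~/IoT/IotEdge/gateway/extractor_module"), "myregistry/extractor_module:0.0.1")`.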


### Using the one on Docker Hub:

We already have a module created and hosted on Docker Hub that is ready to use:

https://hub.docker.com/r/tdavega/extractor_module/


## Step 3: Deployment

Once you have created your module and published it to your repository (or decided to use the one we provide), what remains is to deploy the module architecture to the IoT Edge runtime on the VM.


### 3.1: Go into your IoT Hub

Go to the Azure portal and open the IoT Hub your VM is paired with. There you should see the name of your VM's device; click on it.

![EdgeDevice](https://raw.githubusercontent.com/Azure/DataScienceVM/master/Extensions/IoT/Img/EdgeDevice.PNG)

### 3.2: Set modules

On the next screen, click Set modules.

![SetModules](https://raw.githubusercontent.com/Azure/DataScienceVM/master/Extensions/IoT/Img/SetModules.PNG)

### 3.3: Add custom module

On the next screen, click Add module and select IoT Edge module. Fill in the fields; afterwards you should see your custom module in the modules list.

![AddModule](https://raw.githubusercontent.com/Azure/DataScienceVM/master/Extensions/IoT/Img/AddModule.PNG)

NOTE: If you're using Azure Container Registry, you need to fill in the container registry settings fields just above the Add module button.
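Because the Dockerfile declares `VOLUME /myvol` without a host mapping, Docker stores the data in an anonymous volume managed by Docker itself. If you would rather have `data.json` land in a host directory of your choosing, one option (an assumption, not part of the original setup) is to bind-mount that directory over `/myvol` through the module's container create options, which IoT Edge passes to the container runtime using the Docker Engine create-container format. A sketch that produces that JSON; `extractor_create_options` and the host path are hypothetical:

```python
import json


def extractor_create_options(host_dir: str, container_dir: str = "/myvol") -> str:
    """Build container create options that bind-mount host_dir at container_dir."""
    # HostConfig.Binds entries have the form "<host path>:<container path>".
    options = {"HostConfig": {"Binds": [f"{host_dir}:{container_dir}"]}}
    return json.dumps(options)
```

Paste the printed string into the module's container create options. The host directory must exist and be writable by the container's `moduleuser`; the mount hides the `data.json` created at build time, but the module opens the file in append mode, which creates it again.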

Once you're done, click on next.

### 3.4: Routing

To route incoming telemetry messages from EdgeHub both to our module and upstream (to IoT Hub), add the following routes:


```json
{
  "routes": {
    "sensorToExtractormodule": "FROM /messages/* INTO BrokeredEndpoint(\"/modules/extractormodule/inputs/input1\")",
    "IncomingDataToIoTHub": "FROM /messages/* INTO $upstream"
  }
}
```


The first route sends incoming sensor data into our custom module, and the second one also sends a copy to IoT Hub.

Note: You need to replace 'extractormodule' with the name you have given to your custom module.
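Since the module name appears inside an escaped string, it is easy to break the JSON when editing it by hand. A small sketch that generates the same two routes for any module name; `build_routes` is a hypothetical helper, and `input1` matches the input the extractor listens on:

```python
import json


def build_routes(module_name: str, input_name: str = "input1") -> dict:
    """Return the routes object: device messages to the module, plus a copy upstream."""
    return {
        "routes": {
            "sensorToExtractormodule": (
                "FROM /messages/* INTO "
                f'BrokeredEndpoint("/modules/{module_name}/inputs/{input_name}")'
            ),
            "IncomingDataToIoTHub": "FROM /messages/* INTO $upstream",
        }
    }


print(json.dumps(build_routes("extractormodule"), indent=2))
```

`json.dumps` adds the `\"` escaping around the endpoint path, so the printed text can be pasted as-is.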

You should see something like this:

![Routes](https://raw.githubusercontent.com/Azure/DataScienceVM/master/Extensions/IoT/Img/Routes.PNG)


Click Next, and on the next screen click Submit.


If everything went well, you should see your modules running when you run the following command:

```bash
iotedge list
```
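To check the extractor from a script rather than by eye, a sketch that runs `iotedge list` and reads one module's status. It assumes the default table output, in which the first two whitespace-separated columns of each row are the module NAME and STATUS; `module_status` and `run_iotedge_list` are hypothetical helpers:

```python
import subprocess
from typing import Optional


def module_status(listing: str, module_name: str) -> Optional[str]:
    """Return the STATUS column for module_name, or None if it is not listed."""
    for line in listing.splitlines():
        columns = line.split()
        if len(columns) >= 2 and columns[0] == module_name:
            return columns[1]
    return None


def run_iotedge_list() -> str:
    # Depending on how the runtime was installed, this may need to run with sudo.
    result = subprocess.run(["iotedge", "list"], capture_output=True, text=True, check=True)
    return result.stdout
```

For example, `module_status(run_iotedge_list(), "extractormodule")` is expected to return `"running"` once the deployment has succeeded.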

Now that everything is set up, you can start sending telemetry data to your VM.

Note: Remember that you need to install your certificates on your downstream device.
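The extractor calls `json.loads` on every message body, so a downstream device should send UTF-8 JSON. A sketch of such a device, assuming the `azure-iot-device` (v2) Python SDK, which this notebook does not otherwise use. The connection string is the downstream device's own with `;GatewayHostName=<your VM's hostname>` appended so it connects through the gateway, and `root_ca_path` points to the root CA certificate from the note above. The field names in `make_payload` are hypothetical:

```python
import json
import time
from typing import Iterable, Tuple


def make_payload(temperature: float, humidity: float) -> str:
    """Serialize one reading as the JSON text the extractor's json.loads expects."""
    return json.dumps({"temperature": temperature, "humidity": humidity, "timestamp": time.time()})


def send_readings(connection_string: str, root_ca_path: str,
                  readings: Iterable[Tuple[float, float]]) -> None:
    # Imported lazily so make_payload works without the SDK installed.
    from azure.iot.device import IoTHubDeviceClient, Message

    with open(root_ca_path, encoding="utf-8") as cert_file:
        root_ca = cert_file.read()
    # Trust the gateway's certificate chain when connecting through EdgeHub.
    client = IoTHubDeviceClient.create_from_connection_string(
        connection_string, server_verification_cert=root_ca
    )
    client.connect()
    try:
        for temperature, humidity in readings:
            msg = Message(make_payload(temperature, humidity))
            msg.content_encoding = "utf-8"
            msg.content_type = "application/json"
            client.send_message(msg)
    finally:
        client.disconnect()
```

Each message sent this way should appear as a new line in `/myvol/data.json` and, through the second route, in IoT Hub.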