In [None]:
import os
import numpy as np
import sys
import panel as pn
import time
import json
import logging
import threading
import asyncio

from azure.eventhub import EventHubConsumerClient
from bokeh.io import curdoc
from bokeh.models import ColumnDataSource, DatetimeTickFormatter, Select
from bokeh.layouts import layout
from bokeh.plotting import figure
from datetime import datetime
from math import radians
from tornado import gen
from functools import partial

pn.extension()
rng = np.random.default_rng(12345)

RECEIVED_MESSAGES = 0
connection_str = os.environ.get("IOTHUB_CONNECTION_STRING")
consumer_group = os.environ.get("IOTHUB_CONSUMER_GROUP")
eventhub_name= os.environ.get("IOTHUB_COMPATIBLE_NAME")

In [None]:
logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.WARNING)

In [None]:
# This will be the container that will hold all the data
source1 = ColumnDataSource(dict(
    timestamp=[], temperature=[], humidity=[]
))

In [None]:
doc = curdoc()

In [None]:
date_pattern = "%Y-%m-%d\n%H:%M:%S"

# Create a figure object with proper time format
def get_line(col, source):
    p = figure(width=300, height=350, x_axis_type="datetime")
    p.line(x='timestamp', y=col, alpha=0.2, line_width=3, color='navy', source=source)
    
    p.xaxis.formatter = DatetimeTickFormatter(
        seconds=date_pattern,
        minsec=date_pattern,
        minutes=date_pattern,
        hourmin=date_pattern,
        hours=date_pattern,
        days=date_pattern,
        months=date_pattern,
        years=date_pattern
    )
    p.xaxis.major_label_orientation=radians(80)
    
    p.xaxis.axis_label = "Date"
    p.yaxis.axis_label = "Value"
    
    return p

# Create a graph for each sensor value (x, y, z)
p = {'temperature': get_line('temperature', source1),
      'humidity': get_line('humidity', source1),
    }

In [None]:
@gen.coroutine
def update_stream(stream_point):
    source1.stream(stream_point, rollover=10)


# We produce new data here, and use the selector to discriminate
def message_handler(partition_context, event):
    global RECEIVED_MESSAGES
    global source1
    global p 
    global client
    
    RECEIVED_MESSAGES += 1

    if not threading.main_thread().is_alive():
        client.close()
        sys.exit(0)

    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint(event)

    message = event.body_as_json()

    print("\nMessage received:")
    print( "    Data: <<{}>>".format(message) )

    #data = json.loads(message)
    
    stream_point = {
        'timestamp': [datetime.strptime(message['timestamp'], date_pattern)],
        'temperature': [message['temperature']],
        'humidity': [message['humidity']]
    }
    
    doc.add_next_tick_callback(partial(update_stream, stream_point=stream_point))
    
    time.sleep(1)

def on_error(partition_context, error):
    # Put your code here. partition_context can be None in the on_error callback.
    if partition_context:
        print("An exception: {} occurred during receiving from Partition: {}.".format(
            partition_context.partition_id,
            error
        ))
    else:
        print("An exception: {} occurred during the load balance process.".format(error))
        
        
# Callback function for when selector is changed. Restarts the streaming
def selector_update(attrname, old, new):
    source1.data = dict(timestamp=[], 
                        temperature=[], 
                        humidity=[], 
                        )

    p['temperature'].title.text = f"Streaming {select.value} data"
    p['humidity'].title.text = f"Streaming {select.value} data"
    
# Selection widget
options = [("D1", "Device 1"), ("D2", "Device 2"), ("D3", "Device 3")]
select = Select(title="Devices", value="D1", options=options)
select.on_change("value", selector_update)

In [None]:
bootstrap = pn.template.BootstrapTemplate(title="Streaming predictive maintenance")
bootstrap.sidebar.append(select)

bootstrap.main.append(
    pn.Row(
        pn.Card(p['temperature']),
        pn.Card(p['humidity'])
    )
)

bootstrap.servable()

In [None]:
client = EventHubConsumerClient.from_connection_string(connection_str, 
                                                    consumer_group, 
                                                  eventhub_name=eventhub_name)
def hub_task():
    print("Initializing message reception")
    with client:
        client.receive(
            on_event=message_handler,
            on_error=on_error,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )

In [None]:
# Watcher that will shutdown the client before closing the application
def client_watcher():
    while threading.main_thread().is_alive():
        time.sleep(0.1)
    else:
        print("Shutting down Event Hub client")
        client.close()
        
watcher_thread = threading.Thread(target=client_watcher)
data_thread = threading.Thread(target=hub_task)
data_thread.start()
watcher_thread.start()