In [None]:
## Common utils
import numpy as np
import json
import time
import pandas as pd
import datetime

# bokeh plotting utils
from bokeh.models.sources import ColumnDataSource
from bokeh.models import CDSView, BooleanFilter
from bokeh.plotting import figure
from bokeh.io import output_notebook, show, push_notebook

# kafka utils
from confluent_kafka import Consumer, KafkaError

# show bokeh output in notebook
output_notebook()

In [4]:
## connect to kafka topic
c = Consumer({'bootstrap.servers': '192.168.0.44:9092', 'group.id': 'mygroup',
              'default.topic.config': {'auto.offset.reset': 'latest'}
             })
c.subscribe(['predict'])

In [None]:
def now():
    return pd.to_datetime(datetime.datetime.now())

# number of points to show
n_show = 100

## define figure aesthetics
my_figure = figure(plot_width=900,
                   plot_height=400,
                   x_axis_type='datetime',
                   tools="xpan,xwheel_zoom,xbox_zoom,reset")

# set streaming data source for plot
data = ColumnDataSource(data=dict(ts=[],
                                  value=[],
                                  lower_limit=[],
                                  upper_limit=[],
                                  is_anomaly=[]))

# Define lines to be plotted
line_value = my_figure.line("ts", "value", source=data, color='navy', alpha=0.5)
line_lower = my_figure.line("ts", "lower_limit", source=data, color='gray', alpha=0.5)
line_upper = my_figure.line("ts", "upper_limit", source=data, color='gray', alpha=0.5)
point_anomaly = my_figure.circle("ts", "is_anomaly", source=data, color='firebrick')

handle = show(my_figure, notebook_handle=True)

# Declare dictionary to be updated with the same structure than data
new_data=dict(ts=[],
              value=[],
              lower_limit=[],
              upper_limit=[],
              is_anomaly=[])

# Initialize list to containt new data as it arrives
ts = []
value = []
lower_limit = []
upper_limit = []
is_anomaly = []

# loop for kafka data collection and injection to plot
running = True
while running:
    msg = c.poll()
    if not msg.error():
        response = json.loads(msg.value().decode('utf-8'))
        ## update values ts, value
        
        ts.append(now())
        value.append(response.get('value'))
        lower_limit.append(response.get('results_anomaly').get('value_lower_limit'))
        upper_limit.append(response.get('results_anomaly').get('value_upper_limit'))
        tmp = response.get('results_anomaly').get('is_anomaly') * response.get('value') #hack to adapt value scale
        is_anomaly.append(tmp)
        
        # prevent filling ram
        new_data['ts'] = ts = ts[-n_show:]
        new_data['value'] = value = value[-n_show:]
        new_data['lower_limit'] = lower_limit = lower_limit[-n_show:]
        new_data['upper_limit'] = upper_limit = upper_limit[-n_show:]
        new_data['is_anomaly'] = is_anomaly = is_anomaly[-n_show:]
        
        ### plot
        data.stream(new_data, n_show)
        push_notebook(handle=handle)
    
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        print(msg.error())
        running = False
c.close()