In [1]:
from confluent_kafka import Consumer

KAFKA_CONF = {
    'bootstrap.servers': "localhost:9092",
    'group.id': "my_group",
    'auto.offset.reset': 'earliest'
}
consumer = Consumer(KAFKA_CONF)

In [2]:
TOPIC_NAME = "top-events"

In [3]:
consumer.subscribe(["top-events"])

In [4]:
import ast
from datetime import datetime
import pytz

utc_timezone = pytz.timezone('UTC')
jst_timezone = pytz.timezone('Asia/Tokyo')

def get_cpu():
    while True:
        msg = consumer.poll(0.01)
        if msg is None:
            continue

        utc_time = utc_timezone.localize(datetime.utcfromtimestamp(msg.timestamp()[1]/1000))
        jst_time = utc_time.astimezone(jst_timezone)

        yield jst_time, ast.literal_eval(msg.value().decode('utf-8'))

In [None]:
cpu_gen = get_cpu()
print(next(cpu_gen))

In [None]:
from jupyter_dash import JupyterDash
from dash import dcc
from dash import html
from dash.dependencies import Input, Output
import plotly.graph_objs as go
from collections import deque

app = JupyterDash(__name__)

app.layout = html.Div(
    [
        dcc.Graph(id='live-graph', animate=True),
        dcc.Interval(
            id='graph-update',
            interval=1 * 1000,
        ),
    ]
)

cpu_gen = get_cpu()
print(next(cpu_gen) )

X = deque(maxlen=15)
Ys = [deque(maxlen=15) for _ in range(16)]


@app.callback(Output('live-graph', 'figure'),
              [Input('graph-update', 'n_intervals')])
def update_graph_scatter(input_data):
    x, y = next(cpu_gen)
    X.append(x)

    data = []
    for i in range(16):
        Ys[i].append(y[i])
        data.append(go.Scatter(
            x=list(X),
            y=list(Ys[i]),
            name=f'CPU-{i}',
            mode='lines+markers'
        ))

    return {
        'data': data,
        'layout': go.Layout(
            xaxis=dict(range=[min(X), max(X)], title='Time'),
            yaxis=dict(range=[0, 100], title="CPU"),
        )
    }


app.run_server(mode='inline')

