In [None]:
import asyncio
import datetime
import os

import ipydatetime
import ipywidgets as widgets
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import pytz
from dateutil import tz
from IPython.display import display

import stream.metricspandas as metricpandas
import stream.stream as stream

# Connection settings for the NATS export stream.
# Each value can be overridden through an environment variable so the notebook
# does not depend on one developer's machine layout; the defaults are the
# values that were previously hard-coded here.
SERVER = os.environ.get("NATS_SERVER", "nats://dbs001.edgefarm.dev:4222")
EXPORT_SUBJECT = os.environ.get("NATS_EXPORT_SUBJECT", "agilis-mc-1.EXPORT.metrics")
STREAM_NAME = os.environ.get("NATS_STREAM_NAME", "bogie-network_export-stream-aggregate")
# Path to the NATS credentials file (the default is an absolute, user-specific path).
NATS_CREDS = os.environ.get(
    "NATS_CREDS",
    "/home/klaus/work/ops.edgefarm-deployments/overlays/edgefarm.dev/dbs001/source/nsc/nkeys/creds/edgefarm_operator/acc3/user.creds",
)

In [None]:

async def ns_from_time(t):
    """Create a NatsStream for the configured export subject, starting from ``t``.

    Awaits ``stream.NatsStream.from_start_time`` with the module-level
    SERVER, NATS_CREDS, STREAM_NAME and EXPORT_SUBJECT settings and returns
    its result unchanged.

    NOTE(review): ``t`` is presumably the timestamp of the first message to
    deliver; confirm against ``NatsStream.from_start_time``.
    """
    return await stream.NatsStream.from_start_time(
        SERVER,
        NATS_CREDS,
        STREAM_NAME,
        EXPORT_SUBJECT,
        t,
    )

async def nats_stream_fetch(ns, end_time):
    """Read messages from ``ns`` into one DataFrame until ``end_time`` is passed.

    Each message is acknowledged, converted with
    ``metricpandas.metrics_nats_to_pandas`` and kept as long as the
    ``nats_rx_time`` of its first row is not later than ``end_time``.

    Args:
        ns: stream object providing awaitable ``next_msg(timeout=...)`` and
            ``ack(msg)``.
        end_time: datetime marking the end of the window. Its tzinfo is
            replaced (not converted) with the local time zone.

    Returns:
        A DataFrame with the rows of all accepted messages in arrival order,
        or an empty DataFrame when no message was accepted.
    """
    end_time = end_time.replace(tzinfo=tz.tzlocal())
    # Wait longer for the very first message; once data is flowing a shorter
    # timeout is used. NOTE(review): next_msg presumably returns None on
    # timeout - confirm against the stream module.
    timeout = 2.0
    # Collect per-message frames and concatenate once at the end instead of
    # re-copying the accumulated DataFrame on every message.
    frames = []
    while True:
        msg = await ns.next_msg(timeout=timeout)
        if msg is None:
            break
        # The message is acknowledged before the time check, so the first
        # message beyond end_time is acked but not returned.
        await ns.ack(msg)
        frame = metricpandas.metrics_nats_to_pandas(msg)

        # The receive time is stamped (not converted) with the local zone,
        # mirroring what is done to end_time above so the two are comparable.
        ts = frame["nats_rx_time"].iloc[0].replace(tzinfo=tz.tzlocal())
        if ts > end_time:
            break
        frames.append(frame)
        timeout = 0.5
    if not frames:
        # pd.concat raises on an empty list; keep returning an empty frame.
        return pd.DataFrame()
    return pd.concat(frames, axis=0)



In [None]:
class DemoUI:
    """Widget front end for browsing accelerometer metrics loaded from the stream.

    The UI consists of a start-date picker, a time-range dropdown, a read-only
    status text and a plotly figure with three traces. Widget changes are not
    handled directly; they are put on ``queue`` as command dicts
    (``{"type": "start_date" | "time_range", "value": ...}``) for a consumer
    to process.
    """

    # Dataframe columns shown as trace 0, 1 and 2 of the accelerometer figure.
    ACCEL_COLUMNS = ("accel_x_rms", "accel_y_rms", "accel_z_rms")

    def __init__(self, queue):
        """Build the widgets, wire the change observers and display the UI.

        Args:
            queue: object with ``put_nowait``; receives the command dicts
                produced by the widget change handlers.
        """
        self.queue = queue

        self.start_date_picker = ipydatetime.DatetimePicker()
        self.time_range_picker = widgets.Dropdown(
            options=[('1 min', 60), ('5 min', 300), ('15 min', 900), ('1 hour', 3600), ('1 day', 86400)],
            value=300,
            description='Time range',
            disabled=False,
        )
        # Disabled text field used purely as a status line.
        self.busy_indicator = widgets.Text(
            disabled=True
        )
        self.control_box = widgets.Box([self.start_date_picker, self.time_range_picker, self.busy_indicator])

        self.accel_wf_fig = go.FigureWidget()
        for i in range(len(self.ACCEL_COLUMNS)):
            self.accel_wf_fig.add_trace(go.Scatter(x=[], y=[], name="sensor%d" % i))

        self.start_date_picker.observe(self.handle_start_date_change, names='value')
        self.time_range_picker.observe(self.handle_time_range_change, names='value')

        vbox = widgets.VBox([self.control_box, self.accel_wf_fig])

        # Currently displayed dataset; empty until the first load completes.
        self.df = pd.DataFrame()

        self.draw()
        display(vbox)

    def _clear_accel(self):
        """Remove all points from the accelerometer traces."""
        for trace in self.accel_wf_fig.data:
            trace.x = []
            trace.y = []

    def draw_accel(self):
        """Plot the ``accel_*_rms`` columns of ``self.df`` against its ``ts`` column."""
        df = self.df
        fig = self.accel_wf_fig

        fig.update_layout({"title": "Accelerometer"})

        for trace, column in zip(fig.data, self.ACCEL_COLUMNS):
            trace.x = df["ts"]
            trace.y = df[column]

    def draw(self):
        """Refresh the status text and the figure from ``self.df``.

        With an empty dataframe the status shows "No DATA!" and the traces are
        cleared so that a previous dataset does not stay on screen.
        """
        df = self.df
        if df.shape[0] == 0:
            self.busy_indicator.value = "No DATA!"
            self._clear_accel()
            return
        self.busy_indicator.value = f"Dataset starting {df.iloc[0]['ts']}"
        self.draw_accel()

    async def load_from_time(self, time, time_range):
        """Load the window ``[time, time + time_range]`` from the stream and redraw.

        Args:
            time: start of the window, passed to ``ns_from_time``.
            time_range: window length; added to ``time`` to form the end time.

        Raises:
            Whatever the stream helpers raise. The status text is set to
            "Load failed: ..." first, so it does not stay on "Loading...".
        """
        self.busy_indicator.value = "Loading..."
        try:
            ns = await ns_from_time(time)
            self.df = await nats_stream_fetch(ns, time + time_range)
        except Exception as exc:
            self.busy_indicator.value = f"Load failed: {exc}"
            raise
        self.draw()

    def handle_start_date_change(self, change):
        """Observer callback: queue a ``start_date`` command with the new picker value."""
        self.queue.put_nowait({"type": "start_date", "value": change.new})

    def handle_time_range_change(self, change):
        """Observer callback: queue a ``time_range`` command with the new dropdown value."""
        self.queue.put_nowait({"type": "time_range", "value": change.new})

    def time_range_seconds(self):
        """Return the selected time range as a ``datetime.timedelta``."""
        return datetime.timedelta(seconds=self.time_range_picker.value)


async def loader(queue, ui):
    """Consume UI commands from ``queue`` and reload the dataset accordingly.

    First selects the 300 s range and loads the window that started one hour
    ago (local time). Then loops forever over the queued command dicts:

    * ``start_date``: use the command's value as the new start time,
    * ``time_range``: re-read the range from the UI,

    and reloads with the current start time and range. Unknown command types
    are reported and ignored.

    Args:
        queue: queue providing awaitable ``get()`` and ``task_done()``.
        ui: the DemoUI instance to drive.
    """
    ui.time_range_picker.value = 300

    time_range = ui.time_range_seconds()
    # Naive local "now" stamped with the local zone.
    start_time = datetime.datetime.now() - datetime.timedelta(hours=1)
    start_time = start_time.replace(tzinfo=tz.tzlocal())
    await ui.load_from_time(start_time, time_range)

    while True:
        command = await queue.get()
        queue.task_done()
        kind = command["type"]
        if kind == "start_date":
            if command["value"] is None:
                # A cleared picker reports None; loading from None would raise
                # and end this task, so keep the previous start time instead.
                ui.busy_indicator.value = "No start date selected"
                continue
            start_time = command["value"]
        elif kind == "time_range":
            time_range = ui.time_range_seconds()
        else:
            print("unknown command")
            continue
        await ui.load_from_time(start_time, time_range)

# Strong references to running background tasks. The event loop only keeps
# weak references, so a task nobody else refers to may be garbage-collected
# before it finishes.
_BACKGROUND_TASKS = set()


async def main(queue, ui):
    """Start the loader as a background task and return immediately.

    Args:
        queue: command queue handed to ``loader``.
        ui: the DemoUI instance handed to ``loader``.
    """
    task = asyncio.create_task(loader(queue, ui))
    _BACKGROUND_TASKS.add(task)
    # Drop the reference once the task has finished.
    task.add_done_callback(_BACKGROUND_TASKS.discard)

# Command queue shared between the UI (its change handlers put commands on it)
# and loader() (which takes them off again).
queue = asyncio.Queue()
# Constructing the UI also displays it as this cell's output.
ui = DemoUI(queue)


# Top-level await: this only works in an environment that already runs an
# event loop for the cell (as the notebook kernel does), hence no asyncio.run().
await main(queue, ui)


In [1]:
from ipyleaflet import Map

# Initial map centre as (latitude, longitude).
lat, lon = 18, 95

# Map widget centred on that point at zoom level 3.
m = Map(center=(lat,lon), zoom=3)

# Bare last expression so the notebook shows the map as the cell output.
m

Map(center=[18, 95], controls=(ZoomControl(options=['position', 'zoom_in_text', 'zoom_in_title', 'zoom_out_tex…