In [None]:
!pip install streamsx

In [None]:
import sys
sys.path.append("../src/")

from streamsx import rest
from streamsx.topology import schema
from streamsx.topology.topology import Topology
from streamsx.topology.context import ConfigParams, submit
import json
import time
from obsdemo.windows import TumblingWindow
from obsdemo.utils import *

# Import Observation
from streamsx_health.ingest.ObservationTypeResolver import *

topic = 'ingest-vines'
window_length=10
signal_label='MDC_ECG_ELEC_POTL_AVF'

# Create Topology and read from data source
topo = Topology("observation_visualization")
patientData = topo.subscribe(topic, schema.CommonSchema.Json)

# Create new data stream containing only the specificed signal
ecg = patientData.filter(ObservationTypeFilter(signal_label))

# Aggregate the last `window_length` tuples into a list
aggStream = ecg.transform(TumblingWindow(length=window_length))

# transform the list into a format that is easy to visualize
vizStream = aggStream.transform(to_viz_obj)

# Output the data to a view
data_view = vizStream.view(name="ecg_view")

# Submit as stand-alone
sc = rest.StreamsConnection(username='streamsadmin', password='passw0rd')
sc.session.verify=False
submit("DISTRIBUTED", topo, config={ConfigParams.STREAMS_CONNECTION: sc})

In [None]:
# hides warnings
import warnings
warnings.filterwarnings('ignore')

from obsdemo.medgraphs import *

## load BokehJS visualization library (must be loaded in a separate cell)
from bokeh.io import output_notebook, push_notebook
from bokeh.resources import INLINE
output_notebook(resources=INLINE)
%autosave 0
%reload_ext autoreload
%aimport obsdemo.utils
%aimport obsdemo.medgraphs
%autoreload 1

graphs = []
ecg_graph = ECGGraph(signal_label=signal_label, title='ECG (' + signal_label + ')', plot_width=600, min_range=-0.5, max_range=2.0)
graphs.append(ecg_graph)

def data_collector(view, graphs):
    for d in iter(view.get, None):
        for g in graphs:
            g.add(d)

from IPython.lib import backgroundjobs as bg
jobs = bg.BackgroundJobManager()
jobs.new(data_collector, data_view.start_data_fetch(), graphs)

In [None]:
import time
from bokeh.io import show
from bokeh.layouts import column, row, widgetbox

## display graphs
show(ecg_graph.get_figure(), notebook_handle=True)

cnt = 0
while True:
    ## update graphs
    for g in graphs:
        g.update()

    ## update notebook 
    cnt += 1
    if cnt % 5 == 0:
        push_notebook() ## refresh the graphs
        cnt = 0
    time.sleep(0.010)