# Real-time Patient Monitoring with Streams and BioPy



This notebook shows how to use Python to analyze medical data in real time using Streams and existing Python modules like BioSPPY and SciPy.

It shows how the Streams health toolkit makes it easy for clinicians to get started developing applications to monitor patients in real time. 

<img src="https://raw.githubusercontent.com/IBMStreams/streamsx.health/develop/samples/HealthcareJupyterDemo/images/notebook-viz.gif" alt="screenshot of running visualization"/>
    <p style="text-align: center; font-size: 10px;"><em>Image showing the visualization using data from the Streams application.</em></p>

## Streams Health Toolkit Overview

The toolkit includes microservices to:
- Ingest health data from popular devices and databases, such as Physionet.
- Perform basic analysis: Early Warning Score (EWS) computation, ECG, etc.

 
For example, this notebook analyzes ECG signals to compute heart rate variability using SciPy.

Instead of spending time writing a connector to the Physionet database to get the ECG data, we can use the Physionet microservice from the health toolkit, which gives us the data we need to start developing our application.
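
To make the heart rate variability idea concrete, here is a minimal, standard-library sketch of the general technique: locate R-peaks in an ECG trace, convert the gaps between them into RR intervals, and summarize those intervals with two common time-domain measures (SDNN and RMSSD). This is not the health toolkit's or SciPy's implementation. The naive threshold detector, the helper names (`find_r_peaks`, `rr_intervals_ms`, `hrv_metrics`), and the synthetic signal are all assumptions made for illustration.

```python
import math
import statistics


def find_r_peaks(samples, threshold):
    """Return indices of local maxima above `threshold` (a naive R-peak detector)."""
    peaks = []
    for i in range(1, len(samples) - 1):
        if samples[i] > threshold and samples[i - 1] < samples[i] >= samples[i + 1]:
            peaks.append(i)
    return peaks


def rr_intervals_ms(peak_indices, sample_rate):
    """Convert consecutive peak indices into RR intervals in milliseconds."""
    return [(b - a) * 1000.0 / sample_rate
            for a, b in zip(peak_indices, peak_indices[1:])]


def hrv_metrics(rr_ms):
    """Compute mean heart rate, SDNN, and RMSSD from a list of RR intervals."""
    diffs = [b - a for a, b in zip(rr_ms, rr_ms[1:])]
    return {
        "mean_hr_bpm": 60000.0 / statistics.mean(rr_ms),
        "sdnn_ms": statistics.stdev(rr_ms),
        "rmssd_ms": math.sqrt(sum(d * d for d in diffs) / len(diffs)),
    }


# Synthetic signal (assumption): flat baseline with a spike at each beat.
sample_rate = 125
beat_positions = [50, 150, 255, 350, 460]
ecg = [0.0] * 500
for pos in beat_positions:
    ecg[pos] = 1.0

peaks = find_r_peaks(ecg, threshold=0.5)
rr = rr_intervals_ms(peaks, sample_rate)
metrics = hrv_metrics(rr)
print(peaks)
print(rr)
print(metrics)
```

Real ECG data is noisy, so a production detector would filter the signal first and use a more robust peak finder. The arithmetic that turns peaks into RR intervals and HRV measures stays the same.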


The following diagram outlines the architecture of this demo. The **Ingest** part is handled by launching the Physionet ingest service, and the notebook handles the **Analyze** portion.

<img height="700" width="900" src='https://github.com/IBMStreams/streamsx.health/blob/develop/samples/HealthcareJupyterDemo/images/architecture_diagram.jpg?raw=true' alt="Demo Architecture" title="Demo Architecture"></img>




# Prerequisites

This notebook can be used as-is from within an IBM Cloud Pak for Data project. 

If you are not running this notebook from within IBM Cloud Pak for Data, [follow these steps to make sure you have installed all the prerequisites](https://ibmstreams.github.io/streamsx.documentation/docs/python/1.6/python-appapi-devguide-2/).

<a name="setup"></a>

# 1. Set up a connection to the Streams instance


To submit the application for execution, you have to connect to the Streams instance. The information required to connect to the instance depends on the target installation of Streams. 

Choose the option that matches your development environment.

- **Option 1**: [I'm running the notebook from an IBM Cloud Pak for Data project](#cpd)
- **Option 2**: [I'm using IBM Watson Studio, Jupyter Notebooks, or any other development environment](#notcpd)


<a name="cpd"></a>
### Option 1: Connect to a Streams instance from an IBM Cloud Pak for Data  project

If you are not running the notebook from a Cloud Pak for Data project, [skip to the next section](#notcpd). 

In order to submit a Streams application you need to provide the name of the Streams instance.

1. From the navigation menu, click **Services > Instances**. This will take you to a list of instances.
2. Find your Streams instance and update the value of `streams_instance_name` in the cell below to match its name.
3. Run the cell and skip to section 1.2.

The cell below defines a function called `submit_topology` that will be used later on to submit the `Topology` once it is defined.



In [None]:
from icpd_core import icpd_util
from streamsx.topology.context import JobConfig
from streamsx.topology import context

streams_instance_name = ""  ## Change this to your Streams instance name

try:
    cfg=icpd_util.get_service_instance_details(name=streams_instance_name, instance_type="streams")
except TypeError:
    cfg=icpd_util.get_service_instance_details(name=streams_instance_name)

    
def submit_topology(topo):
    global cfg

    # Disable SSL certificate verification if necessary
    cfg[context.ConfigParams.SSL_VERIFY] = False
    # Topology will be deployed as a distributed app
    contextType = context.ContextTypes.DISTRIBUTED
    return context.submit(contextType, topo, config=cfg)

if cfg:
    print("Successfully set up connection to Streams instance")

<a name="notcpd"></a>
### Option 2: Connect to a Streams instance from IBM Watson Studio and other environments

*Skip this section if you are running the notebook from a Cloud Pak for Data project.*

The code for each scenario is available in the development guide.  
Each snippet will define a function called `submit_topology` that will be used later on to submit the `Topology` once it is defined.

- Choose the tab that best matches your environment. 
- Copy the code under the heading **Copy this code snippet**.
- Paste it in the cell below.

    [Connection instructions from the development guide](https://ibmstreams.github.io/streamsx.documentation/docs/python/1.6/python-appapi-devguide-2/#connect)


In [None]:
# paste connection code here

### 1.2 Install required modules

This notebook requires version 1.14.13 or later of the `streamsx` package. Check the version below and upgrade if needed.
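
When checking the version, compare it numerically rather than as text, because a plain string comparison would rank `1.9.2` above `1.14.13`. The following is a minimal sketch of such a check. The helper names `parse_version` and `meets_minimum` are hypothetical, and the parser deliberately ignores any non-numeric parts such as pre-release suffixes.

```python
def parse_version(text):
    """Turn a dotted version string such as '1.14.13' into a tuple of ints.

    Simplification: parts that are not purely numeric are skipped.
    """
    return tuple(int(part) for part in text.split(".") if part.isdigit())


def meets_minimum(installed, required="1.14.13"):
    """Return True if `installed` is at least `required`."""
    return parse_version(installed) >= parse_version(required)


print(meets_minimum("1.14.13"))
print(meets_minimum("1.9.2"))
print(meets_minimum("2.0.0"))
```

In the notebook, the string to pass as `installed` would be the version that the cell below prints.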

In [None]:

import sys
import streamsx.topology.context
print("INFO: streamsx package version: " + streamsx.topology.context.__version__)


#For more details uncomment line below.
#!pip show streamsx

# Uncomment this line to upgrade the streamsx package
#!pip install --user --upgrade streamsx

### Import the healthdemo utility package
This is a set of utilities from the `streamsx.health` package that this application uses.

In [None]:
!pip install  "https://github.com/IBMStreams/streamsx.health/raw/develop/samples/HealthcareJupyterDemo/whl/healthdemo-1.0-py3-none-any.whl"


<a id="createfeed"></a>
## Step 2: Start the Physionet ingest service

We will analyze simulated data generated by a pre-compiled Streams application called the `PhysionetIngestService`. This is a [microservice](https://community.ibm.com/community/user/cloudpakfordata/viewdocument/analytics-microservice-architecture?CommunityKey=c0c16ff2-10ef-4b50-ae4c-57d769937235&tab=librarydocuments&LibraryFolderKey=fb0a8bfd-6210-4de7-b2d0-1dc0c8c5aace&DefaultView=folder), or small application, that retrieves patient waveform and vital data from a Physionet database (https://www.physionet.org/) and makes it available to other applications. The Python application we will create later in this notebook will connect to the `PhysionetIngestService` service.

To start the `PhysionetIngestService`:

1. Download and save the compiled application: https://github.com/IBMStreams/streamsx.health/releases/download/v0.1/com.ibm.streamsx.health.physionet.PhysionetIngestServiceMulti.sab. 
2. Open the Streams Console:
    * From IBM Cloud Pak for Data: 
        * From the navigation menu, click **Services > Instances**.
        * Click on your Streams instance. 
        * In the details page that opens, look for the list of **Streams external endpoints**.
        * Click the **Console** link to open the Streams Console.

    * If you are not using Cloud Pak for Data, see this [document for steps to open the Streams Console in your installation](https://community.ibm.com/community/user/cloudpakfordata/viewdocument/streams-console-overview?CommunityKey=c0c16ff2-10ef-4b50-ae4c-57d769937235&tab=librarydocuments#open).
3. From the Streams Console, click Submit job:
    ![submit job icon](http://ibmstreams.github.io/streamsx.documentation/images/atom/jpg/submit-play.jpg)

4. Select the `.sab` file you downloaded earlier, and click *Submit*. 

5. Click *Submit*.

6. Click *OK* in the *Submission-time parameters* dialog.



<a id="buildapp"></a>
## Step 3: Build a streaming app

Now you're ready to create and run the **HealthcareDemo** Python streaming application.

The following cell contains the source code for the Python Topology application. The application ingests patient data from the *ingest-physionet* topic and analyzes it to calculate vital data for all patients. Finally, it creates a view for displaying the results of the analysis.
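
The application groups tuples into a per-patient window with `last(size=sample_rate).trigger(sample_rate-1).partition(get_patient_id)` before aggregating them. As a rough mental model only, the pure-Python sketch below mimics a count-based sliding window that keeps the most recent `size` tuples for each key and emits the window contents after every `trigger_every` insertions. It is an analogy under stated assumptions, not how Streams implements windows: the class `CountSlidingWindow`, the `patient` field, and the exact per-key trigger counting are all hypothetical.

```python
from collections import defaultdict, deque


class CountSlidingWindow:
    """Keep the last `size` tuples per key; emit after every `trigger_every` inserts."""

    def __init__(self, size, trigger_every, key_fn):
        self.trigger_every = trigger_every
        self.key_fn = key_fn
        # One bounded buffer and one insert counter per partition key.
        self.buffers = defaultdict(lambda: deque(maxlen=size))
        self.counts = defaultdict(int)

    def insert(self, tup):
        """Add a tuple; return the window contents if the trigger fires, else None."""
        key = self.key_fn(tup)
        self.buffers[key].append(tup)
        self.counts[key] += 1
        if self.counts[key] == self.trigger_every:
            self.counts[key] = 0
            return list(self.buffers[key])
        return None


# Small numbers instead of 125/124 so the behavior is easy to follow.
window = CountSlidingWindow(size=4, trigger_every=3, key_fn=lambda t: t["patient"])

stream = [("p1", 1), ("p2", 10), ("p1", 2), ("p1", 3),
          ("p2", 20), ("p1", 4), ("p1", 5), ("p1", 6)]

emitted = []
for patient, value in stream:
    batch = window.insert({"patient": patient, "value": value})
    if batch is not None:
        emitted.append((patient, [t["value"] for t in batch]))

print(emitted)
```

Because the trigger count is smaller than the window size, consecutive windows for the same patient overlap: value `3` appears in both batches emitted for `p1`, while `p2` never reaches the trigger count.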

In [None]:
from streamsx.topology import schema
from streamsx.topology.topology import Topology
from streamsx.topology.context import submit

## The healthdemo package provides tools to analyze patient data
## See https://github.com/IBMStreams/streamsx.health/tree/develop/samples/HealthcareJupyterDemo/package
from healthdemo.patientmonitoring_functions import streaming_rpeak
from healthdemo.healthcare_functions import GenTimestamp, aggregate
from healthdemo.windows import SlidingWindow
from healthdemo.utils import get_patient_id

topo = Topology('PatientMonitoringDemo')

## The ingest-physionet topic provides data at a rate of 125 tuples/sec
sample_rate = 125

## Subscribe to the topic
patients_data_source = topo.subscribe('ingest-physionet', schema.CommonSchema.Json)
            
## Add timestamp to the data, so you can perform windowing
patients_data_source = patients_data_source.map(GenTimestamp(sample_rate))

## Generate a window based on the provided sample_rate
patients_data_window = patients_data_source.last(size=sample_rate).trigger(sample_rate-1).partition(get_patient_id)

## Aggregate the data within the window and create a tuple
patients_data = patients_data_window.aggregate(aggregate)


## Process data from 'ECG Lead II' and calculate RPeak and RR delta
patients_data = streaming_rpeak(patients_data, sample_rate, data_label='ECG Lead II')

## Create view for viewing patient vital data
patients_vital = patients_data.view(name='patients_vitals')


## include the healthdemo package so it is accessible at runtime
topo.add_pip_package(requirement="https://github.com/IBMStreams/streamsx.health/raw/develop/samples/HealthcareJupyterDemo/whl/healthdemo-1.0-py3-none-any.whl",
                     name="healthdemo")


print("Submitting topology for execution..")
result = submit_topology(topo)
if result and result.job:
    print("Submitted job successfully, job id: " + str(result.job.id))

<a id="visualization"></a>
## Step 4: Visualization

Complete the following steps to visualize the results of your app:

[4.1 Set up graphs for plotting patient vitals](#setupgraphs)<br>
[4.2 Provide data for the graphs](#providedata)<br>
[4.3 Display the graphs](#displaygraphs)<br>

<a id="setupgraphs"></a>
### 4.1 Set up graphs for plotting patient vitals

This cell initializes the nine graphs which will be used to display one patient's vital data.

Each property of the patient's vital data is identified by the signal label. Each graph is initialized by providing the signal label it plots and a title.
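
One of the graphs is a Poincaré plot for ECG Lead II. As background, a Poincaré plot of heartbeat data conventionally plots each RR interval against the one that follows it, so regular rhythms cluster tightly and irregular ones spread out. The sketch below builds those point pairs from a list of RR intervals. Whether `PoincareGraph` derives its points in exactly this way is an assumption, and `poincare_points` is a hypothetical helper.

```python
def poincare_points(rr_ms):
    """Pair each RR interval with its successor: (RR[n], RR[n+1])."""
    return list(zip(rr_ms, rr_ms[1:]))


# Illustrative RR intervals in milliseconds (made-up values).
rr_ms = [800, 840, 760, 880]
points = poincare_points(rr_ms)
print(points)
```

A list of `n` intervals yields `n - 1` points, each of which would become one dot on the plot.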

In [None]:
## load BokehJS visualization library (must be loaded in a separate cell)
from bokeh.io import output_notebook, push_notebook
from bokeh.resources import INLINE
output_notebook(resources=INLINE)
%autosave 0
%reload_ext autoreload
%autoreload 1


from healthdemo.medgraphs import ECGGraph, PoincareGraph, NumericText, ABPNumericText

## Select which patient's data to plot
patientId = 'patient-1'

graph = {
    'leadII_poincare': PoincareGraph(signal_label='Poincare - ECG Lead II', title='Poincare - ECG Lead II'),
    'ecg_leadII_graph': ECGGraph(signal_label='ECG Lead II', title='ECG Lead II', 
                                 plot_width=600, min_range=-0.5, max_range=2.0),
    'ecg_leadV_graph': ECGGraph(signal_label='ECG Lead V', title='ECG Lead V', plot_width=600),
    'resp_graph': ECGGraph(signal_label='Resp', title='Resp', min_range=-1, max_range=3, plot_width=600),
    'pleth_graph': ECGGraph(signal_label='Pleth', title='Pleth', min_range=0, max_range=5, plot_width=600),
    'hr_numeric': NumericText(signal_label='HR', title='HR', color='#7cc7ff'),
    'pulse_numeric': NumericText(signal_label='PULSE', title='PULSE', color='#e71d32'),
    'spo2_numeric': NumericText(signal_label='SpO2', title='SpO2', color='#8cd211'),
    'abp_numeric': ABPNumericText(abp_sys_label='ABP Systolic', abp_dia_label='ABP Diastolic', 
                                  title='ABP', color='#fdd600')            
}

print ("DONE")

<a id="providedata"></a>
### 4.2 Provide data for the graphs

This cell is responsible for populating the graph objects with data from the view.

The view data contains vital data for all patients, and is continuously retrieved from the Streams instance in a background job. Each graph object receives data for a specified patient. The graph objects extract and store the data that is relevant for that particular graph.
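
The background-collection pattern can be illustrated without a Streams instance. The sketch below is a variation on the idea, not the notebook's implementation: a plain `queue.Queue` stands in for the queue returned by the view, a `threading.Event` replaces the global stop flag, and `get()` uses a timeout so the worker can notice the stop request even when no data arrives. The function name `collect` and the `patientId` field are assumptions made for illustration.

```python
import queue
import threading


def collect(source, patient_id, sinks, stop_event, poll_seconds=0.1):
    """Forward tuples for one patient from `source` to every sink until stopped."""
    while not stop_event.is_set():
        try:
            tup = source.get(timeout=poll_seconds)
        except queue.Empty:
            continue  # No data yet; loop around and re-check the stop flag.
        if tup["patientId"] == patient_id:
            for sink in sinks:
                sink.append(tup)
        source.task_done()


# Stand-in data source: six tuples alternating between two patients.
source = queue.Queue()
for i in range(6):
    source.put({"patientId": "patient-%d" % (i % 2 + 1), "value": i})

received = []
stop_event = threading.Event()
worker = threading.Thread(
    target=collect, args=(source, "patient-1", [received], stop_event))
worker.start()

source.join()      # Wait until every queued tuple has been processed.
stop_event.set()   # Ask the worker to finish.
worker.join()

print([t["value"] for t in received])
```

The timeout on `get()` is the main design choice here: a blocking `get()` with no timeout would only re-check the stop condition when another tuple arrives.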

In [None]:
from healthdemo.utils import get_patient_id

patients_vital = patients_vital
continue_data_collection = True

## retrieve data from Streams view in a background job
def data_collector(view, g):
    queue = view.start_data_fetch()
    while continue_data_collection:
        tup = queue.get()
        if patientId == get_patient_id(tup):
            for graphtype in g:
                g[graphtype].add(tup)
    view.stop_data_fetch()
            
from IPython.lib import backgroundjobs as bg
jobs = bg.BackgroundJobManager()
jobs.new(data_collector, patients_vital, graph)

<a id="displaygraphs"></a>
### 4.3 Display the graphs

This cell is responsible for laying out and displaying the graphs. 

Each time `update()` is called on a graph object, the next data point is retrieved and displayed. Each graph object maintains an internal queue, so every call to `update()` retrieves and removes the next element in the queue.

A loop calls the `update()` method on each graph repeatedly until the timeout expires (30 seconds, as set by the `timeout` variable in the cell below). After every fifth pass over the graphs, a call to `push_notebook()` causes the notebook to refresh the graphics.

The graphs stop updating when the timeout expires. To extend the update period, change the `timeout` variable.


To restart graph updates after the timeout period:

1. Rerun the cell in [4.2 Provide data for the graphs](#providedata) to restart the background thread to fetch data.
2. Rerun the cell in this section to restart graph updates.
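
The relationship between adding data and updating a graph can be pictured as a simple producer/consumer queue. The following sketch is a hypothetical stand-in for a graph object, not the `healthdemo.medgraphs` code: `add()` buffers the values that match the graph's signal label, and each `update()` call moves exactly one buffered value to the display. The class name `QueuedSignal` and the tuple layout (a dictionary mapping signal labels to lists of values) are assumptions.

```python
from collections import deque


class QueuedSignal:
    """Buffers values for one signal label and reveals one value per update()."""

    def __init__(self, signal_label):
        self.signal_label = signal_label
        self.pending = deque()   # Values received but not yet displayed.
        self.displayed = []      # Values already "drawn".

    def add(self, tup):
        """Store only the values that belong to this graph's signal label."""
        self.pending.extend(tup.get(self.signal_label, []))

    def update(self):
        """Display the next pending value; return False if nothing is queued."""
        if not self.pending:
            return False
        self.displayed.append(self.pending.popleft())
        return True


ecg = QueuedSignal("ECG Lead II")
ecg.add({"ECG Lead II": [0.1, 0.9, 0.2], "Resp": [1.5]})

# More update() calls than queued values: the extra calls are harmless no-ops.
results = [ecg.update() for _ in range(5)]

print(ecg.displayed)
print(results)
```

This also shows why the display can lag behind the data: values accumulate in the queue whenever they arrive faster than `update()` is called.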


In [None]:
import time
from bokeh.io import show
from bokeh.layouts import column, row, widgetbox
import bokeh

enableHandle = True

## display graphs for a patient
t = show(
    row(
        column(
            graph['ecg_leadII_graph'].get_figure(), 
            graph['ecg_leadV_graph'].get_figure(), 
            graph['resp_graph'].get_figure(),
            graph['pleth_graph'].get_figure()
        ), 
        column(
            graph['leadII_poincare'].get_figure(),
            widgetbox(graph['hr_numeric'].get_figure()),
            widgetbox(graph['pulse_numeric'].get_figure()),
            widgetbox(graph['spo2_numeric'].get_figure()),
            widgetbox(graph['abp_numeric'].get_figure())
        )
    ),
    
    notebook_handle=enableHandle
)

## Timeout(in seconds) before stopping the graph
timeout = 30
endtime = time.time() + timeout

cnt = 0
while time.time() < endtime:
    ## update graphs
    for graphtype in graph:
        graph[graphtype].update()

    ## update notebook 
    cnt += 1
    if cnt % 5 == 0:
        #output_notebook()
        #show(..., notebook_handle=True)
        push_notebook(handle=t) ## refresh the graphs
        cnt = 0
    time.sleep(0.008)
    
# Stop data collection running in background thread
continue_data_collection = False

To plot the graph for a different patient, change `patientId` in [4.1 Set up graphs for plotting patient vitals](#setupgraphs), and rerun the cells.

## Useful Links

- [Streams Health Toolkit on GitHub](https://github.com/IBMStreams/streamsx.health)
- [Python API Development Guide](https://ibmstreams.github.io/streamsx.documentation/docs/python/1.6/python-appapi-devguide/)
- [Find more samples in the samples repository on GitHub](https://ibmstreams.github.io/samples)
- [Connect with Streams developers in the Streams community](https://ibm.biz/streams-community)