# Working with Streaming Data

"Streaming data" is data that is continuously generated, often by some external source like a remote website, a measuring device, or a simulator. This kind of data is common for financial time series, web server logs, scientific applications, and many other situations.

The HoloViews ``Stream`` system provides a way to push arbitrary content to a ``DynamicMap`` callback, constructing a plot that updates over time.  This system was already discussed in the user guide sections [Responding to Events](11-Responding_to_Events.ipynb) and [Custom Interactivity](12-Custom_Interactivity.ipynb), but those examples showed streaming changes to plot metadata like zoom ranges, not to the underlying data.  Here, we will show how the HoloViews ``DataStream`` and ``DataFrameStream`` streams can be used to work with streaming *data sources* as well. Apart from streaming directly in HoloViews, we will also explore working with streaming data coordinated by the separate [``streamz``](http://matthewrocklin.com/blog/work/2017/10/16/streaming-dataframes-1) library from Matt Rocklin, which makes working with complex streaming pipelines much simpler.

*NOTE: To follow along with how it works, this example should be run one cell at a time in a Jupyter notebook.  Viewing a static copy from a website or the result of "Run All" will not show the dynamic updates provided by streaming.*

In [None]:
import numpy as np
import pandas as pd
import holoviews as hv

from holoviews.streams import DataStream, DataFrameStream

import streamz
import streamz.dataframe

# Hide the gifs
from IPython.display import display, Javascript
display(Javascript("$('img.gif').hide()"))

hv.extension('bokeh')

## DataStream

A ``DataStream`` allows data to be pushed into a DynamicMap callback to change a visualization, just like the streams in the [Responding to Events](./11-Responding_to_Events.ipynb) user guide were used to push changes to metadata that controlled the visualization.

Let's start with a fairly simple example:
1. Declare a ``streamz.Stream`` and a ``hv.streams.DataStream`` object, and connect them into a pipeline into which we can push data.
2. Use a ``sliding_window`` of 20, which will first wait for 20 sets of stream updates to accumulate. At that point and for every subsequent update, it will apply ``pd.concat`` to combine the most recent 20 updates into a new dataframe.
3. Use the ``sink`` method on the ``Stream`` to ``send`` the resulting collection of 20 updates to ``DataStream``.
4. Declare a ``DynamicMap`` that takes the sliding window of concatenated DataFrames and displays it using a ``Scatter`` Element.
5. Color the ``Scatter`` points by their 'count', use a black background, and set a range for the y-axis.

In [None]:
point_source = streamz.Stream()
stream_data = hv.streams.DataStream(data=[])
point_source.sliding_window(20).map(pd.concat).sink(stream_data.send)
opts = dict(color_index='count', bgcolor='black')
hv.DynamicMap(hv.Scatter, streams=[stream_data]).opts(plot=opts).redim.range(y=(-4, 4))

<img class="gif" src="http://assets.holoviews.org/gifs/guides/user_guide/Streaming_Data/streamz1.gif"></img>

There is now a pipeline, but initially this plot will be empty, because no data has been sent to it. To see the plot update, let's use ``point_source.emit`` to send small chunks of random pandas DataFrames to our plot:

In [None]:
for i in range(200):
    point_source.emit(pd.DataFrame({'x': np.random.rand(100), 'y': np.random.randn(100), 'count': i},
                                   columns=['x', 'y', 'count']))

If the above cell is run, you should see the previous plot update repeatedly as the 200 chunks arrive, and then stop on the last update. Hopefully this example makes clear how data can get pushed into an existing plot.
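
To make the data flow of this pipeline concrete, here is a toy, plain-Python sketch of an emit → sliding_window → map → sink chain. ``MiniStream`` and ``concat`` are hypothetical stand-ins (not the streamz API), lists of tuples replace DataFrames, and the window behaviour simply follows the description above: nothing is emitted until the window is full.

```python
from collections import deque


class MiniStream:
    """A toy push-based stream; a hypothetical stand-in for streamz.Stream, not its API."""

    def __init__(self):
        self._downstream = []

    def emit(self, item):
        # Push an item to every downstream callback.
        for fn in self._downstream:
            fn(item)

    def sink(self, fn):
        # Terminal step: call fn on every item that reaches this node.
        self._downstream.append(fn)
        return self

    def map(self, func):
        out = MiniStream()
        self._downstream.append(lambda item: out.emit(func(item)))
        return out

    def sliding_window(self, n):
        # Emit a tuple of the last n items, but only once n items have accumulated.
        out = MiniStream()
        buffer = deque(maxlen=n)

        def push(item):
            buffer.append(item)
            if len(buffer) == n:
                out.emit(tuple(buffer))

        self._downstream.append(push)
        return out


def concat(chunks):
    # Stand-in for pd.concat: flatten a window of row lists into one list.
    return [row for chunk in chunks for row in chunk]


received = []
source = MiniStream()
source.sliding_window(3).map(concat).sink(received.append)
for i in range(5):
    source.emit([(i, 'a'), (i, 'b')])
```

With a window of 3 and 5 emitted chunks, the sink fires three times, each time receiving the rows of the three most recent chunks.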

#### Asynchronous updates

In most cases, instead of pushing Stream updates manually from the same Python process, you'll want the object to update asynchronously as new data arrives. Since both Jupyter and Bokeh server run on Tornado, we can use the Tornado ``IOLoop`` in both cases to define a non-blocking coroutine that can push data to our stream whenever it is ready. In this case we will use the ``rate_limit`` method to limit how quickly events are emitted. We'll also emit NumPy arrays rather than DataFrames; since this pipeline has no ``sliding_window``, each new array simply replaces the previous one as the data for the ``Curve``.

In [None]:
%%opts Curve [width=600] {+framewise}
from tornado.ioloop import IOLoop
from tornado import gen

curve_source = streamz.Stream(asynchronous=True)  # tell the stream we're working asynchronously
stream_data = DataStream(data=[])
curve_source.rate_limit(0.1).sink(stream_data.send)

@gen.coroutine
def f():
    for x in range(100):
        yield curve_source.emit(np.random.rand(100))
        
IOLoop.current().add_callback(f)
hv.DynamicMap(hv.Curve, streams=[stream_data]).redim.range(y=(0, 1))

<img class="gif" src="http://assets.holoviews.org/gifs/guides/user_guide/Streaming_Data/streamz2.gif" width=600px></img>

Here the plot should update 100 times, but now via the Tornado IOLoop, so the updates arrive without blocking other interactions in the notebook.
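
The idea behind ``rate_limit`` can be sketched with plain ``asyncio`` (which Tornado 5+ runs on): a fast producer pushes items, and a consumer forwards them no more often than a fixed interval. This is a rough analogue written for illustration, not streamz's implementation; ``rate_limited_forward``, ``produce`` and the ``None`` end-of-stream marker are all assumptions of the sketch.

```python
import asyncio
import time


async def rate_limited_forward(queue, interval, sink):
    """Forward items from `queue` to `sink`, at most one per `interval` seconds.

    A `None` item marks the end of the stream (a convention of this sketch).
    """
    last_sent = None
    while True:
        item = await queue.get()
        if item is None:
            break
        if last_sent is not None:
            # Wait out the remainder of the interval since the previous send.
            delay = interval - (time.monotonic() - last_sent)
            if delay > 0:
                await asyncio.sleep(delay)
        sink(item)
        last_sent = time.monotonic()


async def produce(queue, n):
    # The producer pushes data as fast as it can; the consumer throttles.
    for i in range(n):
        await queue.put(i)
    await queue.put(None)


async def main(n=5, interval=0.02):
    queue = asyncio.Queue()
    received = []
    start = time.monotonic()
    await asyncio.gather(produce(queue, n),
                         rate_limited_forward(queue, interval, received.append))
    return received, time.monotonic() - start


received, elapsed = asyncio.run(main())
```

Inside a running notebook event loop, ``asyncio.run`` is not allowed; there you would ``await main()`` in a cell or schedule it on the existing loop instead.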

## DataFrameStream

While ``DataStream`` provides a general solution for piping arbitrary data to a ``DynamicMap`` callback, ``hv.streams.DataFrameStream`` provides a powerful means of working with streaming pandas DataFrames, with or without the ``streamz`` library.

#### A simple example

The ``DataFrameStream`` accepts regular pandas DataFrames or streamz DataFrames and Series. We will start with a simple example using just a pandas DataFrame. To initialize a ``DataFrameStream`` we have to provide an example DataFrame, which defines the columns and dtypes of the data we will be streaming. We can also specify whether we want to use the ``DataFrame`` ``index``, and how many rows to keep as a ``backlog``. In this case we define an empty ``DataFrame`` with x and y positions and a 'count' column, keep a backlog of 100 rows, and plot the data both as a ``Curve`` and as a set of ``Points`` colored by 'count'.

In [None]:
example = pd.DataFrame({'x': [], 'y': [], 'count': []}, columns=['x', 'y', 'count'])
dfstream = DataFrameStream(example, index=False, backlog=100)
hv.DynamicMap(hv.Curve, streams=[dfstream]).opts(style=dict(line_width=1, color='black')) *\
hv.DynamicMap(hv.Points, streams=[dfstream]).opts(plot=dict(color_index='count'),
                                                  style=dict(line_color='black', size=5))

<img class="gif" src="http://assets.holoviews.org/gifs/guides/user_guide/Streaming_Data/brownian.gif"></img>
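
The ``backlog`` argument bounds how many rows the stream keeps. As a rough mental model (a hypothetical sketch, not how HoloViews implements it), it behaves like a fixed-length buffer from which the oldest rows fall off as new ones arrive. ``BacklogBuffer`` is an illustrative name, and rows are plain tuples rather than DataFrames:

```python
from collections import deque


class BacklogBuffer:
    """Keep only the most recent `backlog` rows (illustrative sketch only)."""

    def __init__(self, columns, backlog=100):
        self.columns = tuple(columns)
        self.rows = deque(maxlen=backlog)  # oldest rows drop off automatically

    def send(self, chunk):
        # Each chunk is a list of row tuples matching the example's columns.
        for row in chunk:
            if len(row) != len(self.columns):
                raise ValueError(f"expected {len(self.columns)} values, got {len(row)}")
            self.rows.append(row)

    def data(self):
        # Return the current window as a column -> list mapping.
        return {name: [row[i] for row in self.rows]
                for i, name in enumerate(self.columns)}


buf = BacklogBuffer(['x', 'y', 'count'], backlog=3)
for i in range(5):
    buf.send([(float(i), float(-i), i)])
```

After five single-row sends with a backlog of 3, only the rows with counts 2, 3 and 4 remain.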

Now that we have set up the ``DataFrameStream`` and defined a ``DynamicMap`` to plot the data, we can start pushing data to it. We will define a simple function that simulates Brownian motion by accumulating random Gaussian steps in x and y, and ``send`` each new position directly through the ``hv.streams.DataFrameStream``.

In [None]:
x, y = 0, 0
def gen_brownian(i):
    global x, y
    x += np.random.randn()
    y += np.random.randn()
    return pd.DataFrame([(x, y, i)], columns=['x', 'y', 'count'])

for i in range(200):
    dfstream.send(gen_brownian(i))
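
The ``gen_brownian`` function above keeps the walker's position in module-level globals. A self-contained alternative is a generator that holds that state locally; ``brownian_rows`` is a hypothetical helper, and it uses the standard library's ``random.gauss`` in place of ``np.random.randn`` so the sketch has no dependencies:

```python
import random


def brownian_rows(n, seed=None):
    """Yield (x, y, count) tuples for a 2D random walk with unit-variance Gaussian steps."""
    rng = random.Random(seed)  # a private generator, so the walk is reproducible
    x = y = 0.0
    for i in range(n):
        x += rng.gauss(0.0, 1.0)
        y += rng.gauss(0.0, 1.0)
        yield (x, y, i)


rows = list(brownian_rows(200, seed=42))
```

Each tuple could then be wrapped as ``pd.DataFrame([row], columns=['x', 'y', 'count'])`` before calling ``dfstream.send``.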

To discard the data accumulated in the stream so far, call ``clear``:

In [None]:
dfstream.clear()


#### A simple streamz example

The ``streamz.dataframe`` module provides a ``Random`` utility that generates a ``StreamingDataFrame`` of random values: rows are spaced ``freq`` apart in time, and a new batch of rows is emitted every ``interval``. The ``example`` attribute lets us see the structure and dtypes of the data we can expect:

In [None]:
simple_sdf = streamz.dataframe.Random(freq='10ms', interval='100ms')
print(simple_sdf.index)
simple_sdf.example.dtypes
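
Assuming ``freq`` is the spacing between rows and ``interval`` is the time between emitted batches, each batch should hold roughly ``interval / freq`` rows. The helper below (``rows_per_batch`` is a hypothetical name, and real batch sizes can vary with timing jitter) just does that arithmetic with ``datetime.timedelta``:

```python
from datetime import timedelta


def rows_per_batch(freq, interval):
    """Approximate rows per emitted batch, assuming rows are `freq` apart
    and a batch is emitted every `interval`."""
    return int(interval / freq)  # timedelta / timedelta gives a float ratio


print(rows_per_batch(timedelta(milliseconds=10), timedelta(milliseconds=100)))  # prints 10
print(rows_per_batch(timedelta(microseconds=100), timedelta(milliseconds=200)))  # prints 2000
```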

Since the ``StreamingDataFrame`` provides a pandas-like API, we can specify operations on the data directly. In this example we subtract a fixed offset and then compute the cumulative sum, giving us a randomly drifting timeseries. We can then pass the x-values of this dataframe to the HoloViews ``DataFrameStream`` and supply ``hv.Curve`` as the ``DynamicMap`` callback to stream the data into a HoloViews ``Curve``:

In [None]:
%%opts Curve [width=500 show_grid=True]
sdf = (simple_sdf-0.5).cumsum()
hv.DynamicMap(hv.Curve, streams=[DataFrameStream(sdf.x)])

<img class="gif" src="http://assets.holoviews.org/gifs/guides/user_guide/Streaming_Data/streamz3.gif"></img>

The ``Random`` StreamingDataFrame will asynchronously emit events until it is stopped, which we can do by calling the ``stop`` method.

In [None]:
simple_sdf.stop()
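
One detail worth noting about ``(simple_sdf-0.5).cumsum()`` is that a streaming cumulative sum has to carry its running total from one batch to the next. A plain-Python sketch of that stateful step (``streaming_cumsum`` is a hypothetical helper, not part of streamz) looks like this:

```python
from itertools import accumulate


def streaming_cumsum(offset=0.5):
    """Return a function mapping each batch to its running sum of (value - offset),
    carrying the total across batches."""
    total = 0.0

    def process(batch):
        nonlocal total
        out = []
        for v in batch:
            total += v - offset
            out.append(total)
        return out

    return process


step = streaming_cumsum()
batches = [[0.1, 0.9, 0.5], [0.7, 0.2]]
result = [v for b in batches for v in step(b)]

# The batch-by-batch result should match a one-shot cumulative sum over all values.
expected = list(accumulate(v - 0.5 for b in batches for v in b))
```

Because the total persists between calls, splitting the data into batches does not change the resulting drifting series.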

#### Making use of the StreamingDataFrame API

So far we have only computed the cumulative sum, but the ``StreamingDataFrame`` actually has an extensive API that lets us run a broad range of streaming computations on our data. For example, let's apply a rolling mean to our x-values with a window of 500ms and overlay it on top of the 'raw' data:

In [None]:
%%opts Curve [width=500 show_grid=True]
source_df = streamz.dataframe.Random(freq='5ms', interval='100ms')
sdf = (source_df-0.5).cumsum()
hv.DynamicMap(hv.Curve, streams=[DataFrameStream(sdf.x)]).relabel('raw') *\
hv.DynamicMap(hv.Curve, streams=[DataFrameStream(sdf.x.rolling('500ms').mean())]).relabel('smooth')

<img class="gif" src="http://assets.holoviews.org/gifs/guides/user_guide/Streaming_Data/streamz4.gif"></img>

In [None]:
source_df.stop()
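
A time-based rolling mean such as ``rolling('500ms')`` averages all samples whose timestamps fall within the trailing window. The sketch below assumes a right-closed window ``(t - window, t]``, which is our understanding of pandas' default for offset-based windows; ``rolling_time_mean`` is a hypothetical helper using integer millisecond timestamps:

```python
from collections import deque


def rolling_time_mean(samples, window):
    """Mean over the trailing window (t - window, t] for each (t, value) sample.

    Timestamps must be non-decreasing.
    """
    buf = deque()
    total = 0.0
    means = []
    for t, v in samples:
        buf.append((t, v))
        total += v
        # Drop samples that have fallen out of the window.
        while buf[0][0] <= t - window:
            total -= buf.popleft()[1]
        means.append(total / len(buf))
    return means


samples = [(t, float(t // 100)) for t in range(0, 1000, 100)]  # t in ms, values 0..9
means = rolling_time_mean(samples, window=500)
```

At t = 900 ms the window covers the samples from 500 to 900 ms, so the mean is that of 5 through 9.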

#### Controlling the backlog

By default the ``DataFrameStream`` accumulates a ``backlog`` of 1000 samples. In many cases this is overkill, but we can specify a shorter (or longer) backlog value to control how much history we accumulate:

In [None]:
multi_source = streamz.dataframe.Random(freq='5ms', interval='100ms')
sdf = (multi_source-0.5).cumsum()
hv.DynamicMap(hv.Table, streams=[DataFrameStream(sdf.x, backlog=10)]) +\
hv.DynamicMap(lambda data: hv.BoxWhisker(data, [], 'x'), streams=[DataFrameStream(sdf.x, backlog=100)])

<img class="gif" src="http://assets.holoviews.org/gifs/guides/user_guide/Streaming_Data/streamz5.gif"></img>

Here the given stream ``sdf`` is consumed by a table showing a short backlog (10 items, since only the rows visible in the table need to be kept), along with a box-whisker plot summarizing the distribution (median, quartiles, and outliers) over a longer backlog (100 items).
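
The summary a box-whisker plot draws from its backlog can be sketched with the standard library. ``box_stats`` is a hypothetical helper; it uses ``statistics.quantiles`` (Python 3.8+), whose default 'exclusive' method may interpolate slightly differently from the plotting library, and the common 1.5×IQR convention for flagging outliers:

```python
import statistics
from collections import deque


def box_stats(values):
    """Quartiles plus outliers beyond 1.5 * IQR from the box edges."""
    q1, median, q3 = statistics.quantiles(values, n=4)
    iqr = q3 - q1
    lo_fence, hi_fence = q1 - 1.5 * iqr, q3 + 1.5 * iqr
    outliers = [v for v in values if v < lo_fence or v > hi_fence]
    return {'q1': q1, 'median': median, 'q3': q3, 'outliers': outliers}


backlog = deque(maxlen=100)  # only the latest 100 samples are summarized
for v in range(1, 10):
    backlog.append(float(v))
backlog.append(100.0)  # an outlier
stats = box_stats(list(backlog))
```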

#### Updating multiple cells

Since a ``StreamingDataFrame`` will emit data until it is stopped, we can subscribe multiple plots across different cells to the same stream.  Here, let's add a ``Scatter`` plot of the same data stream as in the preceding cell:

In [None]:
hv.DynamicMap(hv.Scatter, streams=[DataFrameStream(sdf.x)])

<img class="gif" src="http://assets.holoviews.org/gifs/guides/user_guide/Streaming_Data/streamz6.gif"></img>

Stopping the stream will now stop updates to all three of these DynamicMaps:

In [None]:
multi_source.stop()
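
The pattern here is one source fanning each batch out to several subscribers, all of which go quiet once the source is stopped. A toy sketch of that pattern (``Source`` is a hypothetical class, not streamz's internals):

```python
class Source:
    """A toy source that fans each batch out to every subscriber until stopped."""

    def __init__(self):
        self.subscribers = []
        self.stopped = False

    def subscribe(self, callback):
        self.subscribers.append(callback)

    def emit(self, batch):
        if self.stopped:
            return  # a stopped source delivers nothing further
        for callback in self.subscribers:
            callback(batch)

    def stop(self):
        self.stopped = True


table, box, scatter = [], [], []
src = Source()
for consumer in (table, box, scatter):
    src.subscribe(consumer.append)
src.emit([1, 2])
src.stop()
src.emit([3, 4])  # ignored: every subscriber stops updating at once
```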

#### Applying operations

As we saw above, the ``DataFrameStream`` lets us define a backlog window specifying how many samples we want to accumulate. We can use this to our advantage and apply an operation over this backlog window. In this example we declare a ``Dataset`` and then apply the ``histogram`` operation to compute a ``Histogram`` over the specified ``backlog`` window:

In [None]:
hist_source = streamz.dataframe.Random(freq='5ms', interval='100ms')
sdf = (hist_source-0.5).cumsum()
dmap = hv.DynamicMap(hv.Dataset, streams=[DataFrameStream(sdf.x, backlog=500)])
hv.operation.histogram(dmap, dimension='x')

<img class="gif" src="http://assets.holoviews.org/gifs/guides/user_guide/Streaming_Data/streamz7.gif"></img>

In [None]:
hist_source.stop()
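
Conceptually, the histogram is recomputed over the current backlog window each time new data arrives. A plain-Python sketch of equal-width binning over a bounded window (``histogram`` here is a hypothetical helper, not ``hv.operation.histogram``):

```python
from collections import deque


def histogram(values, num_bins=10):
    """Equal-width histogram over `values`; returns (edges, counts).

    The last bin includes its right edge so the maximum is counted.
    """
    lo, hi = min(values), max(values)
    width = (hi - lo) / num_bins or 1.0  # avoid zero width when all values are equal
    edges = [lo + i * width for i in range(num_bins + 1)]
    counts = [0] * num_bins
    for v in values:
        idx = min(int((v - lo) / width), num_bins - 1)
        counts[idx] += 1
    return edges, counts


window = deque(maxlen=500)  # the backlog: only the last 500 samples are binned
for v in range(1000):
    window.append(float(v % 4))
edges, counts = histogram(list(window), num_bins=2)
```

Even though 1000 samples were pushed, only the 500 in the window contribute to the counts.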

#### Datashading

The same approach also works for the ``datashade`` operation, letting us datashade the entire ``backlog`` window even when it is very large, such as 1 million samples:

In [None]:
%%opts RGB [width=600]
from holoviews.operation.datashader import datashade
from bokeh.palettes import Blues8
large_source = streamz.dataframe.Random(freq='100us', interval='200ms')
sdf = (large_source-0.5).cumsum()
dmap = hv.DynamicMap(hv.Curve, streams=[DataFrameStream(sdf.x, backlog=1000000)])
datashade(dmap, streams=[hv.streams.PlotSize], normalization='linear', cmap=Blues8)

<img class="gif" src="http://assets.holoviews.org/gifs/guides/user_guide/Streaming_Data/streamz8.gif"></img>

In [None]:
large_source.stop()
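
What makes a very large backlog practical is that datashading aggregates the data onto a fixed pixel grid before anything is sent to the browser. The sketch below is a crude stand-in for that aggregation step: ``aggregate_counts`` is a hypothetical helper that only bins individual points, whereas datashader draws connected line segments for a ``Curve``:

```python
def aggregate_counts(points, x_range, y_range, width, height):
    """Count how many (x, y) points fall into each cell of a width x height grid.

    The output size depends only on the grid, not on how many points arrived.
    """
    (x0, x1), (y0, y1) = x_range, y_range
    grid = [[0] * width for _ in range(height)]
    for x, y in points:
        if not (x0 <= x <= x1 and y0 <= y <= y1):
            continue  # points outside the viewport are dropped
        col = min(int((x - x0) / (x1 - x0) * width), width - 1)
        row = min(int((y - y0) / (y1 - y0) * height), height - 1)
        grid[row][col] += 1
    return grid


points = [(i / 1000, (i % 10) / 10) for i in range(1000)]
grid = aggregate_counts(points, (0, 1), (0, 1), width=4, height=2)
```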

## A real example

Finally, we will round off this guide with a real example. We will set up two ``DataFrameStream`` objects emitting data about CPU and memory usage obtained using the ``psutil`` library. You can install ``psutil`` with ``pip install psutil`` or ``conda install psutil``.

In [None]:
import psutil
from tornado.ioloop import IOLoop
from tornado import gen

################################################
# Define functions to get memory and CPU usage #
################################################

def get_mem_data():
    vmem = psutil.virtual_memory()
    df = pd.DataFrame(dict(free=vmem.free/vmem.total,
                           used=vmem.used/vmem.total),
                      index=[pd.Timestamp.now()])
    return df*100

def get_cpu_data():
    cpu_percent = psutil.cpu_percent(percpu=True)
    df = pd.DataFrame(list(enumerate(cpu_percent)), columns=['CPU', 'Utilization'])
    df['time'] = pd.Timestamp.now()
    return df

##################################################
# Define DynamicMap callbacks returning Elements #
##################################################

def mem_stack(data):
    data = pd.melt(data, 'index', var_name='Type', value_name='Usage')
    areas = hv.Dataset(data).to(hv.Area, 'index', 'Usage')
    return hv.Area.stack(areas.overlay()).relabel('Memory')

def cpu_box(data):
    return hv.BoxWhisker(data, 'CPU', 'Utilization').relabel('CPU Usage')


####################################################
# Set up DataFrameStreams and add async callback   #
####################################################

cpu_stream = DataFrameStream(get_cpu_data(), backlog=800)
mem_stream = DataFrameStream(get_mem_data())

@gen.coroutine
def f():
    for i in range(500):
        yield gen.sleep(0.01)
        yield cpu_stream.send(get_cpu_data())
        yield mem_stream.send(get_mem_data())

IOLoop.current().add_callback(f)

#######################################
# Define DynamicMaps and display plot #
#######################################

cpu_dmap = hv.DynamicMap(cpu_box, streams=[cpu_stream])
mem_dmap = hv.DynamicMap(mem_stack, streams=[mem_stream])

opts = {'plot': dict(width=500, height=400, color_index='CPU'),
        'style': dict(box_fill_color=hv.Cycle('Category20'))}

(cpu_dmap.redim.range(Utilization=(0, 100)).opts(**opts) +
 mem_dmap.redim.range(Usage=(0, 100)).opts(plot=dict(height=400, width=400)))

<img class="gif" src="http://assets.holoviews.org/gifs/guides/user_guide/Streaming_Data/streamz9.gif"></img>
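
The ``mem_stack`` callback first reshapes the memory data from wide form (one column per memory type) into long form with ``pd.melt``, so that each ``Type`` can become its own ``Area`` before stacking. A dependency-free sketch of that reshaping (``melt`` here is a hypothetical helper that mimics the column-by-column output order of ``pd.melt``) is:

```python
def melt(rows, id_col, var_name, value_name):
    """Unpivot wide dict rows into long records, one per (row, value column)."""
    value_cols = [c for c in rows[0] if c != id_col] if rows else []
    long_rows = []
    for column in value_cols:  # all rows for the first value column, then the next
        for row in rows:
            long_rows.append({id_col: row[id_col], var_name: column, value_name: row[column]})
    return long_rows


wide = [{'index': 't0', 'free': 40.0, 'used': 60.0},
        {'index': 't1', 'free': 35.0, 'used': 65.0}]
long_records = melt(wide, 'index', var_name='Type', value_name='Usage')
```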

As you can see, streaming data works like streams in HoloViews in general, flexibly handling changes over time under either explicit control or governed by some external data source.