Skip to content

Commit

Permalink
Cleaned up streaming notebook
Browse files Browse the repository at this point in the history
  • Loading branch information
jbednar committed Oct 25, 2017
1 parent ed92995 commit bb27388
Showing 1 changed file with 87 additions and 105 deletions.
192 changes: 87 additions & 105 deletions examples/user_guide/Working_with_Streaming_Data.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"# Working with Streaming Data"
"# Working with Streaming Data\n",
"\n",
"\"Streaming data\" is data that is continuously generated, often by some external source like a remote website, a measuring device, or a simulator. This kind of data is common for financial time series, web server logs, scientific applications, and many other situations. \n",
"\n",
"The HoloViews ``Stream`` system provides a way to push arbitrary content to a ``DynamicMap`` callback, constructing a plot that updates over time. This system was already discussed in the user guide sections [Responding to Events](11-Responding_to_Events.ipynb) and [Custom Interactivity](12-Custom_Interactivity.ipynb), but those examples showed streaming changes to plot metadata like zoom ranges, not to the underlying data. Here, we will show how the HoloViews ``DataStream`` and ``DataFrameStream`` streams can be used to work with streaming *data sources* as well. We will focus on streaming data coordinated by the separate [``streamz``](http://matthewrocklin.com/blog/work/2017/10/16/streaming-dataframes-1) library from Matt Rocklin, which makes working with streams much simpler.\n",
"\n",
"*NOTE: To follow along with how it works, this example should be run one cell at a time in a Jupyter notebook. Viewing a static copy from a website or the result of \"Run All\" will not show the dynamic updates provided by streaming.*"
]
},
{
Expand All @@ -16,10 +22,11 @@
"import numpy as np\n",
"import pandas as pd\n",
"import holoviews as hv\n",
"\n",
"from holoviews.streams import DataStream, DataFrameStream\n",
"\n",
"from streamz import Stream\n",
"from streamz.dataframe import StreamingDataFrame, Random\n",
"import streamz\n",
"import streamz.dataframe\n",
"\n",
"hv.extension('bokeh')"
]
Expand All @@ -28,27 +35,15 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Streaming Data is data that is continuously generated by different sources. This kind of data is common for financial time series, web server logs, scientific instruments, IoT telemetry, and more. There are many ways of processing streaming data and in this user guide we will discover how to work with streaming data using ``HoloViews`` and the ``streamz`` library. In particular we will cover working with the ``DataStream`` and ``DataFrameStream`` HoloViews streams, first on their own and then using the ``streamz`` library.\n",
"\n",
"HoloViews also has the concept of a ``Stream`` which provides a way to push arbitrary data to a ``DynamicMap`` callback, which then drives your plot. By connecting a HoloViews ``Stream`` to a ``streamz.Stream`` we can use the powerful flow control features provided by the streamz library with the powerful capabilities of HoloViews. For an introduction to ``streamz`` see [this blog post](http://matthewrocklin.com/blog/work/2017/10/16/streaming-dataframes-1) by Matthew Rocklin the author of the library."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## DataStream"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In the [Responding to Events](./11-Responding_to_Events.ipynb) user guide we covered how to work with HoloViews streams to push updates to a DynamicMap callback letting us drive a visualization dynamically. The ``DataStream`` Stream works just in the same way but instead of pushing some small amount of metadata it is meant to be used as a pipe to push actual data to a callback.\n",
"## DataStream\n",
"\n",
"Let's start with a fairly simple example we will declare a ``streamz.Stream`` and a ``hv.streams.DataStream`` object and then define a pipe we can push data into. In this example we will declare a ``sliding_window`` of 10, which will wait for 10 sets of stream updates to accumulate and then apply pd.concat to combine those updates. Finally we will use the ``sink`` method on the ``Stream`` to ``send`` the data to ``DataStream``.\n",
"A ``DataStream`` allows data to be pushed into a DynamicMap callback to change a visualization, just like the streams in the [Responding to Events](./11-Responding_to_Events.ipynb) user guide were used to push changes to metadata that controlled the visualization.\n",
"\n",
"Now we can declare a ``DynamicMap`` that takes the sliding window of concatenated DataFrames and displays it using a ``Scatter`` Element."
"Let's start with a fairly simple example:\n",
"1. Declare a ``streamz.Stream`` and a ``hv.streams.DataStream`` object and connect them into a pipeline into which we can push data. \n",
"2. Use a ``sliding_window`` of 10, which will first wait for 10 sets of stream updates to accumulate. At that point and for every subsequent update, it will apply ``pd.concat`` to combine the most recent 10 updates into a new dataframe. \n",
"3. Use the ``sink`` method on the ``Stream`` to ``send`` the resulting collection of 10 updates to ``DataStream``.\n",
"4. Declare a ``DynamicMap`` that takes the sliding window of concatenated DataFrames and displays it using a ``Scatter`` Element."
]
},
{
Expand All @@ -57,7 +52,7 @@
"metadata": {},
"outputs": [],
"source": [
"stream = Stream()\n",
"stream = streamz.Stream()\n",
"stream_data = hv.streams.DataStream(data=[])\n",
"stream.sliding_window(10).map(pd.concat).sink(stream_data.send)\n",
"hv.DynamicMap(hv.Scatter, streams=[stream_data]).redim.range(x=(-3, 3), y=(-3, 3))"
Expand All @@ -67,13 +62,15 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Now that we have set up our pipeline we can start pushing data into it and see our plot update. For that purpose we will use ``stream.emit`` and send small chunks of random pandas DataFrames to our plot:"
"There is now a pipeline, but initially this plot will be empty, because no data has been sent to it. To see the plot update, let's use ``stream.emit`` and send small chunks of random pandas DataFrames to our plot:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"for i in range(100):\n",
Expand All @@ -84,14 +81,11 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Asynchronous updates"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Pushing updates to the Stream manually is usually not desirable instead we want our object to push data asynchronously. Since both Jupyter and Bokeh server run on Tornado we can use the tornado ``IOLoop`` in both cases defining a non-blocking co-routine to emit a new data for our stream. In this case we will use the ``rate_limit`` method to limit how quickly events are emitted and emit simply NumPy arrays, which we will again accumulate in a ``sliding_window`` and concatenate."
"If the above cell is run, you should see the previous plot update 100 times, and then stop on the last update. Hopefully this example makes clear how data can get pushed into an existing plot.\n",
"\n",
"#### Asynchronous updates\n",
"\n",
"In most cases, instead of pushing Stream updates manually from the same Python process, you'll want the object to update asynchronously as new data arrives. Since both Jupyter and Bokeh server run on Tornado, we can use the tornado ``IOLoop`` in both cases to define a non-blocking co-routine that can push data to our stream whenever it is ready. In this case we will use the ``rate_limit`` method to limit how quickly events are emitted. We'll also emit NumPy arrays rather than dataframes, and then accumulate in a ``sliding_window`` and concatenate as before."
]
},
{
Expand All @@ -104,7 +98,7 @@
"from tornado.ioloop import IOLoop\n",
"from tornado import gen\n",
"\n",
"source = Stream(asynchronous=True) # tell the stream we're working asynchronously\n",
"source = streamz.Stream(asynchronous=True) # tell the stream we're working asynchronously\n",
"stream_data = DataStream(data=[])\n",
"source.rate_limit(0.1).sink(stream_data.send)\n",
"\n",
Expand All @@ -121,28 +115,16 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"## StreamingDataFrame"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"While ``DataStream`` provides a general solution for piping arbitrary data to ``DynamicMap`` callback, the ``streamz.dataframe.StreamingDataFrame`` and the corresponding ``hv.stream.DataFrameStream`` provide a very powerful means of working with streaming pandas dataframes."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### A simple example"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The ``streamz.dataframe`` module provides a ``Random`` utility which generates a StreamingDataFrame which emits data with a certain frequency at a specified interval. The ``example`` attribute lets us see the structure and dtypes of the data we can expect:"
"Here the plot should update 100 times as before, but now via the Tornado IOLoop that can accept events from other sources.\n",
"\n",
"\n",
"## StreamingDataFrame\n",
"\n",
"While ``DataStream`` provides a general solution for piping arbitrary data to ``DynamicMap`` callback, the ``streamz.dataframe.StreamingDataFrame`` and the corresponding ``hv.stream.DataFrameStream`` provide a very powerful means of working with streaming Pandas dataframes.\n",
"\n",
"#### A simple example\n",
"\n",
"The ``streamz.dataframe`` module provides a ``Random`` utility that generates a StreamingDataFrame emitting data with a certain frequency at a specified interval. The ``example`` attribute lets us see the structure and dtypes of the data we can expect:"
]
},
{
Expand All @@ -151,7 +133,7 @@
"metadata": {},
"outputs": [],
"source": [
"source = Random(freq='10ms', interval='100ms')\n",
"source = streamz.dataframe.Random(freq='10ms', interval='100ms')\n",
"print(source.index)\n",
"source.example.dtypes"
]
Expand All @@ -160,7 +142,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Since the ``StreamingDataFrame`` provides a pandas-like API we can specify operations on the data directly. In this example we subtract a fixed offset and then compute the cumulative sum giving us a randomly drifting timeseries. We can then pass the x-values of this dataframe to the HoloViews ``DataFrameStream`` and supply ``hv.Curve`` as the ``DynamicMap`` callback, i.e. we will stream the data into the HoloViews ``Curve``:"
"Since the ``StreamingDataFrame`` provides a pandas-like API, we can specify operations on the data directly. In this example we subtract a fixed offset and then compute the cumulative sum, giving us a randomly drifting timeseries. We can then pass the x-values of this dataframe to the HoloViews ``DataFrameStream`` and supply ``hv.Curve`` as the ``DynamicMap`` callback to stream the data into a HoloViews ``Curve``:"
]
},
{
Expand All @@ -184,7 +166,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"source.stop()"
Expand All @@ -194,14 +178,9 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Making use of the StreamingDataFrame API"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"So far we have only computed the cumulative sum, but the ``StreamingDataFrame`` actually has a broad API letting us easily apply streaming computations on our data. In this case we will apply a rolling mean to our x-values with a window of 500ms and overlay it on top of the 'raw' data:"
"#### Making use of the StreamingDataFrame API\n",
"\n",
"So far we have only computed the cumulative sum, but the ``StreamingDataFrame`` actually has an extensive API that lets us run a broad range of streaming computations on our data. For example, let's apply a rolling mean to our x-values with a window of 500ms and overlay it on top of the 'raw' data:"
]
},
{
Expand All @@ -211,7 +190,7 @@
"outputs": [],
"source": [
"%%opts Curve [width=500 show_grid=True]\n",
"source = Random(freq='5ms', interval='100ms')\n",
"source = streamz.dataframe.Random(freq='5ms', interval='100ms')\n",
"sdf = (source-0.5).cumsum()\n",
"hv.DynamicMap(hv.Curve, streams=[DataFrameStream(sdf.x)]).relabel('raw') *\\\n",
"hv.DynamicMap(hv.Curve, streams=[DataFrameStream(sdf.x.rolling('500ms').mean())]).relabel('smooth')"
Expand All @@ -220,7 +199,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"source.stop()"
Expand All @@ -230,14 +211,9 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Controlling the backlog"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"By default the ``DataFrameStream`` accumulates a ``backlog`` of 1000 samples. In many cases this is overkill and we just need a few samples, by specifying a shorter (or longer) backlog value we can control how much history we accumulate:"
"#### Controlling the backlog\n",
"\n",
"By default the ``DataFrameStream`` accumulates a ``backlog`` of 1000 samples. In many cases this is overkill, but we can specify a shorter (or longer) backlog value to control how much history we accumulate:"
]
},
{
Expand All @@ -246,7 +222,7 @@
"metadata": {},
"outputs": [],
"source": [
"source = Random(freq='5ms', interval='100ms')\n",
"source = streamz.dataframe.Random(freq='5ms', interval='100ms')\n",
"sdf = (source-0.5).cumsum()\n",
"hv.DynamicMap(hv.Table, streams=[DataFrameStream(sdf.x, backlog=10)]) +\\\n",
"hv.DynamicMap(lambda data: hv.BoxWhisker(data, [], 'x'), streams=[DataFrameStream(sdf.x, backlog=100)])"
Expand All @@ -256,14 +232,11 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Updating multiple cells"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Since a ``StreamingDataFrame`` will emit data until it is stopped we can subscribe multiple plots across different cells to the same stream:"
"Here the given stream ``sdf`` is being consumed by a table showing a short backlog (where only the items visible in the table need to be kept), along with a plot computing averages and variances over a longer backlog (100 items).\n",
"\n",
"#### Updating multiple cells\n",
"\n",
"Since a ``StreamingDataFrame`` will emit data until it is stopped, we can subscribe multiple plots across different cells to the same stream. Here, let's add a ``Scatter`` plot of the same data stream as in the preceding cell:"
]
},
{
Expand All @@ -276,26 +249,28 @@
]
},
{
"cell_type": "code",
"execution_count": null,
"cell_type": "markdown",
"metadata": {},
"outputs": [],
"source": [
"source.stop()"
"Stopping the stream will now stop updates to all three of these DynamicMaps:"
]
},
{
"cell_type": "markdown",
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#### Applying operations"
"source.stop()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As we discovered above the ``DataFrameStream`` lets us define a backlog window defining how many samples we want to accumulate. We can use this to our advantage and apply an operation over this backlog window. In this example we simply declare a ``Dataset`` and then apply the ``histogram`` operation to compute a ``Histogram`` over the specified ``backlog`` window:"
"#### Applying operations\n",
"\n",
"As we discovered above, the ``DataFrameStream`` lets us define a backlog window defining how many samples we want to accumulate. We can use this to our advantage and apply an operation over this backlog window. In this example we declare a ``Dataset`` and then apply the ``histogram`` operation to compute a ``Histogram`` over the specified ``backlog`` window:"
]
},
{
Expand All @@ -304,7 +279,7 @@
"metadata": {},
"outputs": [],
"source": [
"source = Random(freq='5ms', interval='100ms')\n",
"source = streamz.dataframe.Random(freq='5ms', interval='100ms')\n",
"sdf = (source-0.5).cumsum()\n",
"dmap = hv.DynamicMap(hv.Dataset, streams=[DataFrameStream(sdf.x, backlog=500)])\n",
"hv.operation.histogram(dmap, dimension='x')"
Expand All @@ -313,7 +288,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"source.stop()"
Expand All @@ -323,13 +300,8 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Datashading"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Datashading\n",
"\n",
"The same approach will also work for the datashader operation letting us datashade the entire ``backlog`` window even if we make it very large:"
]
},
Expand All @@ -342,21 +314,31 @@
"%%opts RGB [width=600]\n",
"import datashader as ds\n",
"from holoviews.operation.datashader import datashade\n",
"from bokeh.palettes import Blues8\n",
"\n",
"source = Random(freq='1ms', interval='500ms')\n",
"source = streamz.dataframe.Random(freq='1ms', interval='500ms')\n",
"sdf = (source-0.5).cumsum()\n",
"dmap = hv.DynamicMap(hv.Curve, streams=[DataFrameStream(sdf.x, backlog=50000)])\n",
"datashade(dmap, streams=[hv.streams.PlotSize], normalization='linear')"
"datashade(dmap, streams=[hv.streams.PlotSize], normalization='linear', cmap=Blues8)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"source.stop()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As you can see, streaming data works like streams in HoloViews in general, flexibly handling changes over time under either explicit control or governed by some external data source."
]
}
],
"metadata": {
Expand Down

0 comments on commit bb27388

Please sign in to comment.