<a href='http://www.holoviews.org'><img src="assets/hv+bk.png" alt="HV+BK logos" width="40%;" align="left"/></a>
<div style="float:right;"><h2>08. Operations and Pipelines</h2></div>

When interactively exploring a dataset you often end up interleaving visualization and analysis code. Since in HoloViews your visualization and your data are one and the same, analysis and data transformations can be applied directly to the visualizable data. For that purpose HoloViews provides operations, which can be used to implement any analysis or data transformation you might want to do.

Because operations understand HoloViews objects, you can apply them to large collections of data held in ``HoloMap`` and ``DynamicMap`` containers. When an operation is applied to a ``DynamicMap``, it is evaluated lazily. This lets us chain multiple operations into a data analysis, processing, and visualization pipeline and use that pipeline to drive a dashboard.

Internally, pipelines built from ``DynamicMap`` objects and HoloViews operations lazily (re)compute only the part of the pipeline that has changed. This provides an easy way to cache intermediate results and compute just what is required at any given time.
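
The idea of recomputing only what changed can be illustrated without HoloViews at all. The sketch below is a toy model in plain Python, not how HoloViews is implemented: the `Stage` class and the `load`, `smooth`, and `peak` functions are hypothetical names invented for this illustration. Each stage caches its last result, and changing a parameter invalidates only that stage and the stages downstream of it.

```python
class Stage:
    """One step of a lazy pipeline that caches its last result (hypothetical helper)."""

    def __init__(self, func, source=None, **params):
        self.func = func
        self.source = source          # upstream Stage, or None for the root
        self.params = params
        self.calls = 0                # number of real computations, for illustration
        self._cache = None
        self._dirty = True
        self._children = []
        if source is not None:
            source._children.append(self)

    def update(self, **params):
        """Change parameters and invalidate this stage and everything downstream."""
        self.params.update(params)
        self._invalidate()

    def _invalidate(self):
        self._dirty = True
        for child in self._children:
            child._invalidate()

    def value(self):
        """Compute on demand, reusing the cached result when nothing has changed."""
        if self._dirty:
            args = () if self.source is None else (self.source.value(),)
            self._cache = self.func(*args, **self.params)
            self.calls += 1
            self._dirty = False
        return self._cache


def load(n):
    return list(range(n))


def smooth(xs, window):
    """Trailing moving average over at most `window` samples."""
    out = []
    for i in range(len(xs)):
        chunk = xs[max(0, i - window + 1):i + 1]
        out.append(sum(chunk) / len(chunk))
    return out


def peak(xs):
    return max(xs)


source = Stage(load, n=10)
smoothed = Stage(smooth, source, window=3)
result = Stage(peak, smoothed)

first = result.value()        # runs all three stages
smoothed.update(window=5)     # invalidates `smoothed` and `result` only
second = result.value()       # `source` is served from its cache
```

After the `window` change, `source.calls` is still 1 while the two downstream stages have each run twice: the upstream result was reused rather than regenerated.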

In [None]:
import time
import param
import numpy as np
import pandas as pd
import holoviews as hv
import datashader as ds

from bokeh.sampledata import stocks
from holoviews.operation import decimate
from holoviews.operation.timeseries import rolling, rolling_outlier_std
from holoviews.operation.datashader import datashade, dynspread, aggregate

hv.extension('bokeh')

## Declare some data

In this example we'll work with a timeseries mocking some stock data, so we'll define a small function to generate a random, noisy timeseries and define a ``DynamicMap`` which will load (or rather generate) a timeseries for each stock symbol:

In [None]:
def time_series(T=1, N=100, mu=0.1, sigma=0.1, S0=20):
    """Parameterized noisy time series"""
    dt = float(T) / N
    t = np.linspace(0, T, N)
    W = np.random.standard_normal(size=N)
    W = np.cumsum(W) * np.sqrt(dt)  # standard Brownian motion
    X = (mu - 0.5 * sigma**2) * t + sigma * W
    S = S0 * np.exp(X)  # geometric Brownian motion
    return S

def load_symbol(symbol, **kwargs):
    # Mock loader: ignores `symbol` and generates a new random series on every call
    return hv.Curve(time_series(N=10000), kdims=[('time', 'Time')],
                    vdims=[('adj_close', 'Adjusted Close')])

dmap = hv.DynamicMap(load_symbol, kdims=['Symbol']).redim.values(Symbol=stocks.stocks)
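
Read directly from the code above, `time_series` samples a path that approximates geometric Brownian motion. With $Z_i$ independent standard normal draws and $\Delta t = T/N$, the two commented steps correspond to:

$$W_{t_k} = \sqrt{\Delta t}\,\sum_{i=1}^{k} Z_i$$

$$S_{t_k} = S_0 \exp\left(\left(\mu - \tfrac{1}{2}\sigma^2\right) t_k + \sigma W_{t_k}\right)$$

Here $t_k$ is the $k$-th of the $N$ points returned by `np.linspace(0, T, N)`, so the time grid has spacing $T/(N-1)$ rather than exactly $\Delta t$. For a mock dataset this small mismatch is harmless.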

We will start by visualizing this data very simply:

In [None]:
%opts Curve [width=600] {+framewise}
dmap

## Applying an operation

Now we will start applying some operations to this data. HoloViews ships with two ready-to-use timeseries operations: ``rolling``, which applies a function over a rolling window, and ``rolling_outlier_std``, which finds outliers in a timeseries by discarding points that lie less than ``sigma`` standard deviations from the rolling mean. These are just examples; you can write operations that do whatever you like.

In [None]:
%opts Scatter (color='indianred')
smoothed = rolling(dmap, rolling_window=30)
outliers = rolling_outlier_std(dmap, rolling_window=30)
smoothed * outliers

As you can see, the operations transform the ``Curve`` element into a smoothed version and a set of ``Scatter`` points containing the outliers, both computed with a ``rolling_window`` of 30. Since we applied the operations to a ``DynamicMap``, they are lazy and only compute a result when it is requested.
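
To make the two computations concrete, here is a rough sketch of the same idea in plain pandas rather than HoloViews. The helper names `rolling_mean` and `rolling_outliers`, the centered window, and `min_periods=1` are assumptions made for this illustration; the library's own implementation and defaults may differ.

```python
import numpy as np
import pandas as pd


def rolling_mean(values, window):
    """Centered rolling mean (assumed analogue of the smoothing step)."""
    return pd.Series(values).rolling(window, center=True, min_periods=1).mean()


def rolling_outliers(values, window, sigma=2.0):
    """Keep points whose residual from the rolling mean exceeds sigma rolling stds."""
    series = pd.Series(values)
    residual = series - rolling_mean(values, window)
    spread = residual.rolling(window, center=True, min_periods=1).std()
    # Comparisons against NaN are False, so undefined spreads never flag a point
    return series[residual.abs() > sigma * spread]


# A flat signal with a single spike at position 25
values = np.zeros(50)
values[25] = 10.0

smoothed = rolling_mean(values, window=11)
outliers = rolling_outliers(values, window=11)
```

On this toy signal only the spike is flagged, while the smoothed series spreads it evenly across the 11 samples of the window.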

In [None]:
# Exercise: Apply the rolling and rolling_outlier_std operations changing the rolling_window and sigma parameters

## Linking operations to streams

Instead of supplying parameter values explicitly as scalars, we can define a ``Stream``, which lets us update the visualization dynamically. By supplying a ``Stream`` with a ``rolling_window`` parameter to both operations, we can generate events on the stream and watch the visualization update.

In [None]:
rolling_stream = hv.streams.Stream.define('rolling', rolling_window=5)
stream = rolling_stream()

rolled_dmap = rolling(dmap, streams=[stream])
outlier_dmap = rolling_outlier_std(dmap, streams=[stream])
rolled_dmap * outlier_dmap

In [None]:
for i in range(20, 200, 20):  # start above zero so the rolling window is never empty
    time.sleep(0.1)
    stream.event(rolling_window=i)
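
Conceptually, a single stream fans each event out to every operation that subscribes to it. The sketch below models that pattern in plain Python; `ParamStream` and its methods are hypothetical stand-ins written for this illustration, not the HoloViews `Stream` API.

```python
class ParamStream:
    """Holds named parameter values and notifies subscribers on change (hypothetical)."""

    def __init__(self, **values):
        self.values = dict(values)
        self._subscribers = []

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def event(self, **values):
        """Update the stored values, then call every subscriber with them."""
        self.values.update(values)
        for callback in self._subscribers:
            callback(**self.values)


log = []
toy_stream = ParamStream(rolling_window=5)

# Two consumers share one stream, like the smoothing and outlier steps above
toy_stream.subscribe(lambda rolling_window: log.append(('rolling', rolling_window)))
toy_stream.subscribe(lambda rolling_window: log.append(('outliers', rolling_window)))

for window in range(20, 80, 20):
    toy_stream.event(rolling_window=window)
```

Each of the three events reaches both subscribers, so both consumers always see the same `rolling_window` value.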

In [None]:
# Exercise: Create a stream to control the sigma value and add it to the outlier operation,
#           then vary the sigma value and observe the effect

## Chaining operations

Finally, since operations simply transform an Element in some way, they can easily be chained. As a simple example we will take the ``rolled_dmap`` and apply the ``datashade`` and ``dynspread`` operations to it. As you'll see, this defines a complex analysis pipeline.

In [None]:
%%opts RGB [width=600 height=400] {+framewise}
overlay = dynspread(datashade(rolled_dmap)) * outlier_dmap
overlay

## Visualizing the pipeline

To understand what is going on we will write a small utility that traverses the output we just displayed above and visualizes each processing step leading up to it.

In [None]:
%%opts RGB Curve [width=250 height=200]

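def traverse(obj, key, items=None):
    """Collect the element produced at each upstream step of a DynamicMap pipeline."""
    items = [] if items is None else items
    # Follow only the first input of each callback
    for inp in obj.callback.inputs[:1]:
        # Label each step with the name of the operation that produced it, if any
        label = inp.callback.operation.name if isinstance(inp.callback, hv.core.OperationCallable) else ''
        if inp.last: items.append(inp[key].relabel(label))
        if isinstance(inp, hv.DynamicMap): traverse(inp, key, items)
    return list(hv.core.util.unique_iterator(items))[:-1]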

hv.Layout(traverse(overlay, 'AAPL')).cols(4)