Skip to content

Commit

Permalink
Add ability to use functions with param dependencies in DynamicMap (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
philippjfr committed Aug 3, 2019
1 parent ed78f49 commit 4534d45
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 55 deletions.
71 changes: 48 additions & 23 deletions examples/user_guide/14-Data_Pipelines.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,16 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"### Controlling operations via Streams"
"### Dynamically evaluating operations and parameters with ``.apply``"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In the previous section we briefly mentioned that in addition to regular widgets ``DynamicMap`` also supports streams, which allow us to define custom events our ``DynamicMap`` should subscribe to. To learn more about streams see the [Responding to Events](./12-Responding_to_Events.ipynb). Here we will declare a stream that controls the rolling window:"
"The ``.apply`` method allows us to automatically build a pipeline given an object and some operation or function along with parameter instances passed in as keyword arguments. Internally it will then build a Stream to ensure that whenever the parameter changes the plot is updated. To learn more about streams see the [Responding to Events](./12-Responding_to_Events.ipynb).\n",
"\n",
"This mechanism allows us to build powerful pipelines by linking parameters on a user defined class or even an external widget, e.g. here we import an ``IntSlider`` widget from [``panel``](https://pyviz.panel.org):"
]
},
{
Expand All @@ -121,15 +123,16 @@
"metadata": {},
"outputs": [],
"source": [
"rolling_stream = Stream.define('rolling', rolling_window=5)\n",
"stream = rolling_stream()"
"import panel as pn\n",
"\n",
"slider = pn.widgets.IntSlider(name='rolling_window', start=1, end=100, value=50)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we can define a function that both loads the symbol and applies the ``rolling`` operation passing our ``rolling_window`` parameter to the operation:"
"Using the ``.apply`` method we can now apply the ``rolling`` operation to the DynamicMap and link the ``value`` parameter of the slider to the operation's ``rolling_window`` parameter (which also works for simple functions as will be shown below):"
]
},
{
Expand All @@ -138,12 +141,7 @@
"metadata": {},
"outputs": [],
"source": [
"def rolled_data(symbol, rolling_window, **kwargs):\n",
" curve = load_symbol(symbol)\n",
" return rolling(curve, rolling_window=rolling_window)\n",
" \n",
"rolled_dmap = hv.DynamicMap(rolled_data, kdims='Symbol',\n",
" streams=[stream]).redim.values(Symbol=stock_symbols)\n",
"rolled_dmap = dmap.apply(rolling, rolling_window=slider.param.value)\n",
"\n",
"rolled_dmap"
]
Expand All @@ -152,7 +150,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Since we have a handle on the ``Stream`` we can now send events to it and watch the plot above update, let's start by setting the ``rolling_window=50``. "
"The ``rolled_dmap`` is another DynamicMap that defines a simple two-step pipeline, which calls the original callback when the ``symbol`` changes and reapplies the ``rolling`` operation when the slider value changes. Since the widget's value is now linked to the plot via a ``Stream`` we can display the widget and watch the plot update:"
]
},
{
Expand All @@ -161,14 +159,14 @@
"metadata": {},
"outputs": [],
"source": [
"stream.event(rolling_window=50)"
"slider"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Instead of manually defining a function we can also do something much simpler, namely we can just apply the rolling operation to the original ``DynamicMap`` we defined and pass our ``rolling_stream`` to the operation. To make things a bit more interesting we will also apply the ``rolling_outlier_std`` function which computes outliers within the ``rolling_window``. We supply our stream to both:"
"The power of building pipelines is that different visual components can share the same inputs but compute very different things from that data. The part of the pipeline that is shared is only evaluated once making it easy to build efficient data processing code. To illustrate this we will also apply the ``rolling_outlier_std`` operation which computes outliers within the ``rolling_window`` and again we will supply the widget ``value``:"
]
},
{
Expand All @@ -177,19 +175,18 @@
"metadata": {},
"outputs": [],
"source": [
"stream = rolling_stream()\n",
"\n",
"smoothed = rolling(dmap, streams=[stream])\n",
"outliers = rolling_outlier_std(dmap, streams=[stream])\n",
"outliers = dmap.apply(rolling_outlier_std, rolling_window=slider.param.value)\n",
"\n",
"smoothed * outliers.opts(color='red', marker='triangle')"
"rolled_dmap * outliers.opts(color='red', marker='triangle')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Since the ``rolling_stream`` instance we created is bound to both operations, triggering an event on the stream will trigger both the ``Curve`` and the ``Scatter`` of outliers to be updated:"
"We can chain operations like this indefinitely and attach parameters or explicit streams to each stage. By chaining we can watch our visualization update whenever we change a stream value anywhere in the pipeline and HoloViews will be smart about which parts of the pipeline are recomputed, which allows us to build complex visualizations very quickly.\n",
"\n",
"The ``.apply`` method is also not limited to operations. We can just as easily apply a simple Python function to each object in the ``DynamicMap``. Here we define a function to compute the residual between the original ``dmap`` and the ``rolled_dmap``."
]
},
{
Expand All @@ -198,16 +195,44 @@
"metadata": {},
"outputs": [],
"source": [
"stream.event(rolling_window=50)"
"def residual_fn(overlay):\n",
" # Get first and second Element in overlay\n",
" el1, el2 = overlay.get(0), overlay.get(1)\n",
"\n",
" # Get x-values and y-values of curves\n",
" xvals = el1.dimension_values(0)\n",
" yvals = el1.dimension_values(1)\n",
" yvals2 = el2.dimension_values(1)\n",
"\n",
" # Return new Element with subtracted y-values\n",
" # and new label\n",
" return el1.clone((xvals, yvals-yvals2),\n",
" vdims='Residual')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can chain operations like this indefinitely and attach streams to each stage. By chaining we can watch our visualization update whenever we change a stream value anywhere in the pipeline and HoloViews will be smart about which parts of the pipeline are recomputed, which allows us to build complex visualizations very quickly.\n",
"If we overlay the two DynamicMaps we can then dynamically broadcast this function to each of the overlays, producing a new DynamicMap which responds to both the symbol selector widget and the slider:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"residual = (dmap * rolled_dmap).apply(residual_fn)\n",
"\n",
"In later guides we will discover how to tie custom streams to custom widgets letting us easily control the stream values and making it trivial to define complex dashboards. ``paramNB`` is only one widget framework we could use: we could also choose ``paramBokeh`` to make use of bokeh widgets and deploy the dashboard on bokeh server, or we could manually link ``ipywidgets`` to our streams. For more information on how to deploy bokeh apps from HoloViews and build dashboards see the [Deploying Bokeh Apps](./Deploying_Bokeh_Apps.ipynb) and [Dashboards](./17-Dashboards.ipynb) guides."
"residual"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In later guides we will see how we can combine HoloViews plots and Panel widgets into custom layouts allowing us to define complex dashboards. For more information on how to deploy bokeh apps from HoloViews and build dashboards see the [Deploying Bokeh Apps](./Deploying_Bokeh_Apps.ipynb) and [Dashboards](./17-Dashboards.ipynb) guides. To get a quick idea of what this might look like let's compose all the components we have no built:"
]
}
],
Expand Down
57 changes: 43 additions & 14 deletions examples/user_guide/17-Dashboards.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,16 @@
"source": [
"import param\n",
"import panel as pn\n",
"from holoviews.streams import Params\n",
"\n",
"variables = ['open', 'high', 'low', 'close', 'volume', 'adj_close']\n",
"\n",
"class StockExplorer(param.Parameterized):\n",
"\n",
" rolling_window = param.Integer(default=10, bounds=(1, 365))\n",
" \n",
" symbol = param.ObjectSelector(default='AAPL', objects=stock_symbols)\n",
" \n",
" variable = param.ObjectSelector(default='adj_close', objects=[\n",
" 'date', 'open', 'high', 'low', 'close', 'volume', 'adj_close'])\n",
" variable = param.ObjectSelector(default='adj_close', objects=variables)\n",
"\n",
" @param.depends('symbol', 'variable')\n",
" def load_symbol(self):\n",
Expand Down Expand Up @@ -107,7 +107,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"The ``rolling_window`` parameter is not yet connected to anything however, so just like in the [Data Processing Pipelines section](./14-Data_Pipelines.ipynb) we will see how we can get the widget to control the parameters of an operation. Both the ``rolling`` and ``rolling_outlier_std`` operations accept a ``rolling_window`` parameter, so we create a ``Params`` stream to listen to that parameter and then pass it to the operations. Finally we compose everything into a panel ``Row``:"
"The ``rolling_window`` parameter is not yet connected to anything however, so just like in the [Data Processing Pipelines section](./14-Data_Pipelines.ipynb) we will see how we can get the widget to control the parameters of an operation. Both the ``rolling`` and ``rolling_outlier_std`` operations accept a ``rolling_window`` parameter, so we simply pass that parameter into the operation. Finally we compose everything into a panel ``Row``:"
]
},
{
Expand All @@ -117,15 +117,45 @@
"outputs": [],
"source": [
"# Apply rolling mean\n",
"window = Params(explorer, ['rolling_window'])\n",
"smoothed = rolling(stock_dmap, streams=[window])\n",
"smoothed = rolling(stock_dmap, rolling_window=explorer.param.rolling_window)\n",
"\n",
"# Find outliers\n",
"outliers = rolling_outlier_std(stock_dmap, streams=[window]).opts(\n",
" hv.opts.Scatter(color='red', marker='triangle')\n",
")\n",
"outliers = rolling_outlier_std(stock_dmap, rolling_window=explorer.param.rolling_window).opts(\n",
" color='red', marker='triangle')\n",
"\n",
"pn.Row(explorer.param, (smoothed * outliers).opts(width=600, padding=0.1))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## A function based approach\n",
"\n",
"Instead of defining a whole Parameterized class we can also use the ``depends`` decorator to directly link the widgets to a DynamicMap callback function. This approach makes the link between the widgets and the computation very explicit at the cost of tying the widget and display code very closely together.\n",
"\n",
"Instead of declaring the dependencies as strings we map the parameter instance to a particular keyword argument in the ``depends`` call. In this way we can link the symbol to the ``RadioButtonGroup`` value and the ``variable`` to the ``Select`` widget value:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"symbol = pn.widgets.RadioButtonGroup(options=stock_symbols)\n",
"variable = pn.widgets.Select(options=variables)\n",
"rolling_window = pn.widgets.IntSlider(name='Rolling Window', value=10, start=1, end=365)\n",
"\n",
"@pn.depends(symbol=symbol.param.value, variable=variable.param.value)\n",
"def load_symbol_cb(symbol, variable):\n",
" return load_symbol(symbol, variable)\n",
"\n",
"dmap = hv.DynamicMap(load_symbol_cb)\n",
"\n",
"smoothed = rolling(stock_dmap, rolling_window=rolling_window.param.value)\n",
"\n",
"pn.Row(explorer.param, (smoothed * outliers).opts(width=600))"
"pn.Row(pn.WidgetBox('## Stock Explorer', symbol, variable, window), smoothed.opts(width=500))"
]
},
{
Expand Down Expand Up @@ -154,13 +184,12 @@
" stocks = hv.DynamicMap(self.load_symbol)\n",
"\n",
" # Apply rolling mean\n",
" window = Params(self, ['rolling_window'])\n",
" smoothed = rolling(stocks, streams=[window])\n",
" smoothed = rolling(stocks, rolling_window=self.param.rolling_window)\n",
" if self.datashade:\n",
" smoothed = dynspread(datashade(smoothed)).opts(framewise=True)\n",
" smoothed = dynspread(datashade(smoothed, aggregator='any')).opts(framewise=True)\n",
"\n",
" # Find outliers\n",
" outliers = rolling_outlier_std(stocks, streams=[window]).opts(\n",
" outliers = rolling_outlier_std(stocks, rolling_window=self.param.rolling_window).opts(\n",
" width=600, color='red', marker='triangle', framewise=True)\n",
" return (smoothed * outliers)"
]
Expand Down
4 changes: 3 additions & 1 deletion holoviews/core/spaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from functools import partial
from collections import defaultdict
from contextlib import contextmanager
from types import FunctionType

import numpy as np
import param
Expand Down Expand Up @@ -915,7 +916,8 @@ def __init__(self, callback, initial_items=None, streams=None, **params):
streams = (streams or [])

# If callback is a parameterized method and watch is disabled add as stream
if util.is_param_method(callback, has_deps=True) and params.get('watch', True):
if (util.is_param_method(callback, has_deps=True) and params.get('watch', True)
or isinstance(callback, FunctionType) and hasattr(callback, '_dinfo')):
streams.append(callback)

if isinstance(callback, types.GeneratorType):
Expand Down
42 changes: 30 additions & 12 deletions holoviews/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from collections import defaultdict
from contextlib import contextmanager
from itertools import groupby
from types import FunctionType

import param
import numpy as np
Expand Down Expand Up @@ -186,6 +187,11 @@ def _process_streams(cls, streams):
if not hasattr(s, "_dinfo"):
continue
s = ParamMethod(s)
elif isinstance(s, FunctionType) and hasattr(s, "_dinfo"):
deps = s._dinfo
dep_params = list(deps['dependencies']) + list(deps.get('kw', {}).values())
rename = {(p.owner, p.name): k for k, p in deps.get('kw', {}).items()}
s = Params(parameters=dep_params, rename=rename)
else:
invalid.append(s)
continue
Expand Down Expand Up @@ -623,13 +629,13 @@ class Params(Stream):

parameterized = param.ClassSelector(class_=(param.Parameterized,
param.parameterized.ParameterizedMetaclass),
constant=True, doc="""
constant=True, allow_None=True, doc="""
Parameterized instance to watch for parameter changes.""")

parameters = param.List([], constant=True, doc="""
Parameters on the parameterized to watch.""")

def __init__(self, parameterized, parameters=None, watch=True, **params):
def __init__(self, parameterized=None, parameters=None, watch=True, **params):
if util.param_version < '1.8.0' and watch:
raise RuntimeError('Params stream requires param version >= 1.8.0, '
'to support watching parameters.')
Expand All @@ -638,6 +644,17 @@ def __init__(self, parameterized, parameters=None, watch=True, **params):
else:
parameters = [p if isinstance(p, param.Parameter) else parameterized.param[p]
for p in parameters]

if 'rename' in params:
rename = {}
owners = [p.owner for p in parameters]
for k, v in params['rename'].items():
if isinstance(k, tuple):
rename[k] = v
else:
rename.update({(o, k): v for o in owners})
params['rename'] = rename

super(Params, self).__init__(parameterized=parameterized, parameters=parameters, **params)
self._memoize_counter = 0
self._events = []
Expand Down Expand Up @@ -673,9 +690,10 @@ def from_params(cls, params):
def _validate_rename(self, mapping):
pnames = [p.name for p in self.parameters]
for k, v in mapping.items():
if k not in pnames:
raise KeyError('Cannot rename %r as it is not a stream parameter' % k)
if k != v and v in pnames:
n = k[1] if isinstance(k, tuple) else k
if n not in pnames:
raise KeyError('Cannot rename %r as it is not a stream parameter' % n)
if n != v and v in pnames:
raise KeyError('Cannot rename to %r as it clashes with a '
'stream parameter of the same name' % v)
return mapping
Expand All @@ -695,9 +713,9 @@ def _on_trigger(self):

@property
def hashkey(self):
hashkey = {p.name: getattr(p.owner, p.name) for p in self.parameters}
hashkey = {self._rename.get(k, k): v for (k, v) in hashkey.items()
if self._rename.get(k, True) is not None}
hashkey = {(p.owner, p.name): getattr(p.owner, p.name) for p in self.parameters}
hashkey = {self._rename.get((o, n), n): v for (o, n), v in hashkey.items()
if self._rename.get((o, n), True) is not None}
hashkey['_memoize_key'] = self._memoize_counter
return hashkey

Expand All @@ -710,9 +728,9 @@ def update(self, **kwargs):

@property
def contents(self):
filtered = {p.name: getattr(p.owner, p.name) for p in self.parameters}
return {self._rename.get(k, k): v for (k, v) in filtered.items()
if self._rename.get(k, True) is not None}
filtered = {(p.owner, p.name): getattr(p.owner, p.name) for p in self.parameters}
return {self._rename.get((o, n), n): v for (o, n), v in filtered.items()
if self._rename.get((o, n), True) is not None}



Expand All @@ -725,7 +743,7 @@ class ParamMethod(Params):

def __init__(self, parameterized, parameters=None, watch=True, **params):
if not util.is_param_method(parameterized):
raise ValueError('ParamMethodStream expects a method on a '
raise ValueError('ParamMethod stream expects a method on a '
'parameterized class, found %s.'
% type(parameterized).__name__)
method = parameterized
Expand Down
14 changes: 14 additions & 0 deletions holoviews/tests/core/testdynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,20 @@ def test_deep_apply_element_function(self):
curve = fn(10)
self.assertEqual(mapped[10], curve.clone(curve.data*2))

def test_deep_apply_element_param_function(self):
fn = lambda i: Curve(np.arange(i))
class Test(param.Parameterized):
a = param.Integer(default=1)
test = Test()
@param.depends(test.param.a)
def op(obj, a):
return obj.clone(obj.data*2)
dmap = DynamicMap(fn, kdims=[Dimension('Test', range=(10, 20))])
mapped = dmap.apply(op)
test.a = 2
curve = fn(10)
self.assertEqual(mapped[10], curve.clone(curve.data*2))

def test_deep_apply_element_function_with_kwarg(self):
fn = lambda i: Curve(np.arange(i))
dmap = DynamicMap(fn, kdims=[Dimension('Test', range=(10, 20))])
Expand Down
Loading

0 comments on commit 4534d45

Please sign in to comment.