Timeseries operations #1172

Merged
10 commits merged on Mar 6, 2017
4 changes: 4 additions & 0 deletions holoviews/core/operation.py
@@ -103,6 +103,10 @@ class ElementOperation(Operation):
first component is a Normalization.ranges list and the second
component is Normalization.keys. """)

streams = param.List(default=[], doc="""
List of streams that are applied if dynamic=True, allowing
for dynamic interaction with the plot.""")

Contributor

Seems fine, but we do need to document this before 1.7.

Member Author

Yes, we'll have to add documentation about dynamic operations to our DynamicMap and streams sections.
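For orientation, here is a rough usage sketch of what the new streams parameter enables; it is not part of this diff. decimate and its max_samples parameter are existing HoloViews API, and the exact stream wiring is assumed from the docstring above: with dynamic=True an operation returns a DynamicMap that re-runs whenever one of its declared streams emits an event.

import numpy as np
import holoviews as hv
from holoviews.operation import decimate

# Hypothetical usage: decimate declares range streams, so with dynamic=True
# the downsampling is recomputed whenever the visible x/y ranges change.
points = hv.Points(np.random.randn(100000, 2))
dynamic_points = decimate(points, dynamic=True, max_samples=1000)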

def _process(self, view, key=None):
"""
Processes a single input element and outputs a new single element
4 changes: 2 additions & 2 deletions holoviews/operation/datashader.py
@@ -32,7 +32,7 @@ def discover(dataset):
Allows datashader to correctly discover the dtypes of the data
in a holoviews Element.
"""
return dsdiscover(PandasInterface.as_dframe(element))
return dsdiscover(PandasInterface.as_dframe(dataset))


@bypixel.pipeline.register(Element)
@@ -149,7 +149,7 @@ def get_agg_data(cls, obj, category=None):
vdims = element.vdims
elif isinstance(obj, Element):
glyph = 'line' if isinstance(obj, Curve) else 'points'
paths.append(obj.data if is_df(obj) else obj.dframe())
paths.append(PandasInterface.as_dframe(obj))
if len(paths) > 1:
if glyph == 'line':
path = paths[0][:1]
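As a reference for the fix above, a minimal sketch of the kind of object PandasInterface.as_dframe is called on; the 'x'/'y' column names assume HoloViews' default dimension names.

import holoviews as hv
from holoviews.core.data import PandasInterface

curve = hv.Curve([1, 2, 3])
# as_dframe returns the element's data as a pandas DataFrame with one
# column per dimension (here the defaults 'x' and 'y').
df = PandasInterface.as_dframe(curve)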
10 changes: 7 additions & 3 deletions holoviews/operation/element.py
@@ -575,7 +575,7 @@ class decimate(ElementOperation):
The y_range as a tuple of min and max y-value. Auto-ranges
if set to None.""")

def _process(self, element, key=None):
def _process_layer(self, element, key=None):
if not isinstance(element, Dataset):
raise ValueError("Cannot downsample non-Dataset types.")
if element.interface not in column_interfaces:
@@ -604,7 +604,8 @@ def _process(self, element, key=None):
sliced = element.clone(data)
return sliced


def _process(self, element, key=None):
return element.map(self._process_layer, Element)


class interpolate_curve(ElementOperation):
@@ -646,7 +647,7 @@ def pts_to_poststep(cls, x, y):
steps[1:, 1::2] = steps[1:, 0:-2:2]
return steps

def _process(self, element, key=None):
def _process_layer(self, element, key=None):
INTERPOLATE_FUNCS = {'steps-pre': self.pts_to_prestep,
'steps-mid': self.pts_to_midstep,
'steps-post': self.pts_to_poststep}
@@ -657,6 +658,9 @@ def _process(self, element, key=None):
dvals = tuple(element.dimension_values(d) for d in element.dimensions()[2:])
return element.clone((array[0, :], array[1, :])+dvals)

def _process(self, element, key=None):
return element.map(self._process_layer, Element)


#==================#
# Other operations #
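The pattern above, moving the per-element logic into _process_layer and having _process call element.map(self._process_layer, Element), lets decimate and interpolate_curve be mapped over every Element inside composite objects such as overlays, not just single elements. A minimal sketch; the interpolation parameter name is assumed from interpolate_curve's existing options and is not shown in this diff.

import numpy as np
import holoviews as hv
from holoviews.operation import interpolate_curve

curve_a = hv.Curve(np.random.randn(20).cumsum())
curve_b = hv.Curve(np.random.randn(20).cumsum())
# element.map applies _process_layer to every Element inside the Overlay,
# so the operation no longer needs to be applied curve by curve.
stepped = interpolate_curve(curve_a * curve_b, interpolation='steps-mid')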
116 changes: 116 additions & 0 deletions holoviews/operation/timeseries.py
@@ -0,0 +1,116 @@
import param
import numpy as np
import pandas as pd

from ..core import ElementOperation, Element
from ..core.data import PandasInterface
from ..element import Scatter


class rolling(ElementOperation):
"""
Applies a function over a rolling window.
"""

center = param.Boolean(default=True, doc="""
Whether to set the x-coordinate at the center or right edge
of the window.""")

function = param.Callable(default=np.mean, doc="""
The function to apply over the rolling window.""")

min_periods = param.Integer(default=None, doc="""
Minimum number of observations in window required to have a
value (otherwise result is NA).""")

rolling_window = param.Integer(default=10, doc="""
The window size over which to apply the function.""")

window_type = param.ObjectSelector(default=None,
objects=['boxcar', 'triang', 'blackman', 'hamming', 'bartlett',
'parzen', 'bohman', 'blackmanharris', 'nuttall',
'barthann', 'kaiser', 'gaussian', 'general_gaussian',
'slepian'], doc="The type of window to apply.")

def _process_layer(self, element, key=None):
xdim = element.kdims[0].name
df = PandasInterface.as_dframe(element)
roll_kwargs = {'window': self.p.rolling_window,
'center': self.p.center,
'win_type': self.p.window_type,
'min_periods': self.p.min_periods}
df = df.set_index(xdim).rolling(**roll_kwargs)
if roll_kwargs['win_type'] is None:
rolled = df.apply(self.p.function)
else:
if self.p.function is np.mean:
rolled = df.mean()
elif self.p.function is np.sum:
rolled = df.sum()
else:
raise ValueError("Rolling window function only supports "
"mean and sum when custom window_type is supplied")
return element.clone(rolled.reset_index())

def _process(self, element, key=None):
return element.map(self._process_layer, Element)
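A minimal usage sketch for rolling; the data and window size are arbitrary, and all parameters come from the class above.

import numpy as np
import pandas as pd
import holoviews as hv
from holoviews.operation.timeseries import rolling

dates = pd.date_range('2016-01-01', periods=100, freq='D')
curve = hv.Curve((dates, np.random.randn(100).cumsum()))
# 7-day rolling mean; with window_type=None any callable can be passed as
# function, otherwise only np.mean and np.sum are accepted.
smoothed = rolling(curve, rolling_window=7)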


class resample(ElementOperation):
"""
Resamples a timeseries of dates with a given frequency and function.
"""

closed = param.ObjectSelector(default=None, objects=['left', 'right'],
doc="Which side of bin interval is closed")

function = param.Callable(default=np.mean, doc="""
The function to apply when resampling.""")

label = param.ObjectSelector(default='right', doc="""
The bin edge to label the bin with.""")

rule = param.String(default='D', doc="""
A string representing the time interval over which to apply the resampling;
accepts pandas offset aliases such as 'D' or 'W'.""")

def _process_layer(self, element, key=None):
df = PandasInterface.as_dframe(element)
xdim = element.kdims[0].name
resample_kwargs = {'rule': self.p.rule, 'label': self.p.label,
'closed': self.p.closed}
df = df.set_index(xdim).resample(**resample_kwargs)
return element.clone(df.apply(self.p.function).reset_index())

def _process(self, element, key=None):
return element.map(self._process_layer, Element)
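A minimal usage sketch for resample: hourly input downsampled to daily means, with 'D' being the standard pandas offset alias for calendar days.

import numpy as np
import pandas as pd
import holoviews as hv
from holoviews.operation.timeseries import resample

dates = pd.date_range('2016-01-01', periods=72, freq='H')
curve = hv.Curve((dates, np.random.randn(72).cumsum()))
# Aggregate the hourly samples into daily bins using the supplied function.
daily = resample(curve, rule='D', function=np.mean)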


class rolling_outlier_std(ElementOperation):
"""
Detect outliers using the standard deviation within a rolling window.

Outliers are the array elements outside `sigma` standard deviations from
the smoothed trend line, as calculated from the trend line residuals.
"""

rolling_window = param.Integer(default=10, doc="""
The window size over which the rolling std is computed.""")

sigma = param.Number(default=2.0, doc="""
Minimum sigma before a value is considered an outlier.""")

def _process_layer(self, element, key=None):
sigma, window = self.p.sigma, self.p.rolling_window
ys = element.dimension_values(1)

# Calculate the variation in the distribution of the residual
avg = pd.Series(ys).rolling(window, center=True).mean()
residual = ys - avg
std = pd.Series(residual).rolling(window, center=True).std()

# Get indices of outliers
outliers = (np.abs(residual) > std * sigma).values
return element[outliers].clone(new_type=Scatter)

def _process(self, element, key=None):
return element.map(self._process_layer, Element)
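A minimal usage sketch for rolling_outlier_std; the injected spike is arbitrary test data.

import numpy as np
import pandas as pd
import holoviews as hv
from holoviews.operation.timeseries import rolling_outlier_std

dates = pd.date_range('2016-01-01', periods=50, freq='D')
values = np.sin(np.linspace(0, 4, 50))
values[25] += 5  # inject an obvious outlier
curve = hv.Curve((dates, values))
# Returns a Scatter containing only the points lying more than sigma
# standard deviations from the rolling mean of the series.
outliers = rolling_outlier_std(curve, rolling_window=10, sigma=2)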
68 changes: 68 additions & 0 deletions tests/testtimeseriesoperations.py
@@ -0,0 +1,68 @@
import pandas as pd
import numpy as np

from holoviews import Curve, Scatter
from holoviews.element.comparison import ComparisonTestCase
from holoviews.operation.timeseries import (rolling, resample, rolling_outlier_std)


class TimeseriesOperationTests(ComparisonTestCase):
"""
Tests for the various timeseries operations including rolling,
resample and rolling_outlier_std.
"""

def setUp(self):
self.dates = pd.date_range("2016-01-01", "2016-01-07", freq='D')
self.values = [1, 2, 3, 4, 5, 6, 7]
self.outliers = [1, 2, 1, 2, 10., 2, 1]
self.date_curve = Curve((self.dates, self.values))
self.int_curve = Curve(self.values)
self.date_outliers = Curve((self.dates, self.outliers))
self.int_outliers = Curve(self.outliers)

def test_roll_dates(self):
rolled = rolling(self.date_curve, rolling_window=2)
rolled_vals = [np.NaN, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5]
self.assertEqual(rolled, Curve((self.dates, rolled_vals)))

def test_roll_ints(self):
rolled = rolling(self.int_curve, rolling_window=2)
rolled_vals = [np.NaN, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5]
self.assertEqual(rolled, Curve(rolled_vals))

def test_roll_date_with_window_type(self):
rolled = rolling(self.date_curve, rolling_window=3, window_type='triang')
rolled_vals = [np.NaN, 2, 3, 4, 5, 6, np.NaN]
self.assertEqual(rolled, Curve((self.dates, rolled_vals)))

def test_roll_ints_with_window_type(self):
rolled = rolling(self.int_curve, rolling_window=3, window_type='triang')
rolled_vals = [np.NaN, 2, 3, 4, 5, 6, np.NaN]
self.assertEqual(rolled, Curve(rolled_vals))

def test_resample_weekly(self):
resampled = resample(self.date_curve, rule='W')
dates = list(map(pd.Timestamp, ["2016-01-03", "2016-01-10"]))
vals = [2, 5.5]
self.assertEqual(resampled, Curve((dates, vals)))

def test_resample_weekly_closed_left(self):
resampled = resample(self.date_curve, rule='W', closed='left')
dates = list(map(pd.Timestamp, ["2016-01-03", "2016-01-10"]))
vals = [1.5, 5]
self.assertEqual(resampled, Curve((dates, vals)))

def test_resample_weekly_label_left(self):
resampled = resample(self.date_curve, rule='W', label='left')
dates = list(map(pd.Timestamp, ["2015-12-27", "2016-01-03"]))
vals = [2, 5.5]
self.assertEqual(resampled, Curve((dates, vals)))

def test_rolling_outliers_std_ints(self):
outliers = rolling_outlier_std(self.int_outliers, rolling_window=2, sigma=1)
self.assertEqual(outliers, Scatter([(4, 10)]))

def test_rolling_outliers_std_dates(self):
outliers = rolling_outlier_std(self.date_outliers, rolling_window=2, sigma=1)
self.assertEqual(outliers, Scatter([(pd.Timestamp("2016-01-05"), 10)]))