Skip to content

Commit

Permalink
Merge c30e917 into c815e99
Browse files Browse the repository at this point in the history
  • Loading branch information
philippjfr committed Apr 10, 2020
2 parents c815e99 + c30e917 commit b66f997
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 19 deletions.
82 changes: 64 additions & 18 deletions holoviews/plotting/bokeh/callbacks.py
@@ -1,18 +1,20 @@
from __future__ import absolute_import, division, unicode_literals

import datetime as dt

from collections import defaultdict
from functools import partial

import param
import numpy as np
import panel as pn
import param

from bokeh.models import (
CustomJS, FactorRange, DatetimeAxis, ToolbarBox, Range1d,
DataRange1d, PolyDrawTool, BoxEditTool, PolyEditTool,
FreehandDrawTool, PointDrawTool
)
from panel.io.state import state
from panel.callbacks import PeriodicCallback
from pyviz_comms import JS_CALLBACK
from tornado import gen

Expand Down Expand Up @@ -103,7 +105,6 @@ def cleanup(self):
Callback._callbacks = {k: cb for k, cb in Callback._callbacks.items()
if cb is not self}


def reset(self):
if self.handle_ids:
handles = self._init_plot_handles()
Expand Down Expand Up @@ -308,18 +309,31 @@ class ServerCallback(MessageCallback):
resolves the requested attributes on the Python end and then hands
the msg off to the general on_msg handler, which will update the
Stream(s) attached to the callback.
The ServerCallback supports three different throttling modes:
- adaptive (default): The callback adapts the throttling timeout
depending on the rolling mean of the time taken to process each
message. The rolling window is controlled by the `adaptive_window`
value.
- throttle: Uses the fixed `throttle_timeout` as the minimum amount
of time between events.
- debounce: Processes the message only when no new event has been
received within the `throttle_timeout` duration.
"""

# Timeout before the first event is processed
debounce = 50
adaptive_window = 3

throttle_timeout = 50

_batched = []
throttling_scheme = 'adaptive'

def __init__(self, plot, streams, source, **params):
super(ServerCallback, self).__init__(plot, streams, source, **params)
self._active = False
self._prev_msg = None

self._last_event = dt.datetime.now().timestamp()
self._history = []

@classmethod
def resolve_attr_spec(cls, spec, cb_obj, model=None):
Expand All @@ -344,15 +358,21 @@ def resolve_attr_spec(cls, spec, cb_obj, model=None):
resolved = getattr(resolved, p, None)
return {'id': model.ref['id'], 'value': resolved}

def _schedule_callback(self, cb):
PeriodicCallback(callback=cb, period=self.debounce, count=1).start()
def _schedule_callback(self, cb, timeout=None):
if timeout is not None:
pass
elif self._history and self.throttling_scheme == 'adaptive':
timeout = np.array(self._history).mean()*1000
else:
timeout = self.throttle_timeout
pn.state.curdoc.add_timeout_callback(cb, timeout)

def on_change(self, attr, old, new):
"""
Process change events adding timeout to process multiple concerted
value change at once rather than firing off multiple plot updates.
"""
self._queue.append((attr, old, new))
self._queue.append((attr, old, new, dt.datetime.now().timestamp()))
if not self._active and self.plot.document:
self._active = True
self._schedule_callback(self.process_on_change)
Expand All @@ -362,11 +382,28 @@ def on_event(self, event):
Process bokeh UIEvents adding timeout to process multiple concerted
value change at once rather than firing off multiple plot updates.
"""
self._queue.append(event)
self._queue.append((event, dt.datetime.now().timestamp()))
if not self._active and self.plot.document:
self._active = True
self._schedule_callback(self.process_on_event)

def throttled(self):
now = dt.datetime.now().timestamp()
timeout = self.throttle_timeout/1000.
if self.throttling_scheme in ('throttle', 'adaptive'):
diff = (now-self._last_event)
if self._history and self.throttling_scheme == 'adaptive':
timeout = np.array(self._history).mean()
if diff < timeout:
return int((timeout-diff)*1000)
else:
prev_event = self._queue[-1][-1]
diff = (now-prev_event)
if diff < timeout:
return self.throttle_timeout
self._last_event = dt.datetime.now().timestamp()
return False

@gen.coroutine
def process_on_event(self):
"""
Expand All @@ -375,9 +412,13 @@ def process_on_event(self):
if not self._queue:
self._active = False
return
throttled = self.throttled()
if throttled:
self._schedule_callback(self._process_on_event, throttled)
return
# Get unique event types in the queue
events = list(OrderedDict([(event.event_name, event)
for event in self._queue]).values())
for event, dt in self._queue]).values())
self._queue = []

# Process event types
Expand All @@ -387,16 +428,20 @@ def process_on_event(self):
model_obj = self.plot_handles.get(self.models[0])
msg[attr] = self.resolve_attr_spec(path, event, model_obj)
self.on_msg(msg)
w = self.adaptive_window-1
diff = dt.datetime.now().timestamp()-self._last_event
self._history = self._history[-w:] + [diff]
self._schedule_callback(self.process_on_event)

@gen.coroutine
def process_on_change(self):
if not self._queue:
self._active = False
return
elif self._batched and not all(b in [q[0] for q in self._queue] for b in self._batched):
self._schedule_callback(self.process_on_change)
return # Skip until all batched events have arrived
throttled = self.throttled()
if throttled:
self._schedule_callback(self.process_on_change, throttled)
return
self._queue = []

msg = {}
Expand All @@ -414,13 +459,16 @@ def process_on_change(self):
equal = msg == self._prev_msg
except Exception:
equal = False

if not equal or any(s.transient for s in self.streams):
self.on_msg(msg)
w = self.adaptive_window-1
diff = dt.datetime.now().timestamp()-self._last_event
self._history = self._history[-w:] + [diff]
self._prev_msg = msg

self._schedule_callback(self.process_on_change)


def set_server_callback(self, handle):
"""
Set up on_change events for bokeh server interactions.
Expand Down Expand Up @@ -805,8 +853,6 @@ class RangeXYCallback(Callback):
models = ['x_range', 'y_range']
on_changes = ['start', 'end']

_batched = on_changes

def _process_msg(self, msg):
data = {}
if 'x0' in msg and 'x1' in msg:
Expand Down
5 changes: 4 additions & 1 deletion holoviews/plotting/plot.py
Expand Up @@ -19,6 +19,7 @@
from panel.config import config
from panel.io.notebook import push
from panel.io.state import state
from panel.io.server import unlocked
from pyviz_comms import JupyterComm

from ..selection import NoOpSelectionDisplay
Expand Down Expand Up @@ -232,6 +233,7 @@ def refresh(self, **kwargs):
for d, k in zip(self.dimensions, key))
stream_key = util.wrap_tuple_streams(key, self.dimensions, self.streams)


self._trigger_refresh(stream_key)
if self.top_level:
self.push()
Expand All @@ -248,7 +250,8 @@ def _trigger_refresh(self, key):
"Triggers update to a plot on a refresh event"
# Update if not top-level, batched or an ElementPlot
if not self.top_level or isinstance(self, GenericElementPlot):
self.update(key)
with unlocked():
self.update(key)


def push(self):
Expand Down

0 comments on commit b66f997

Please sign in to comment.