From 9c17d9cfd319a70f2b525ddb57fd1ef6eb6260fc Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Fri, 10 Apr 2020 01:12:38 +0200 Subject: [PATCH 1/5] Switch to debouncing scheme on bokeh ServerCallback --- holoviews/plotting/bokeh/callbacks.py | 45 ++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 8 deletions(-) diff --git a/holoviews/plotting/bokeh/callbacks.py b/holoviews/plotting/bokeh/callbacks.py index e49f4bc92c..ce2649e7a4 100644 --- a/holoviews/plotting/bokeh/callbacks.py +++ b/holoviews/plotting/bokeh/callbacks.py @@ -1,5 +1,7 @@ from __future__ import absolute_import, division, unicode_literals +import datetime as dt + from collections import defaultdict from functools import partial @@ -103,7 +105,6 @@ def cleanup(self): Callback._callbacks = {k: cb for k, cb in Callback._callbacks.items() if cb is not self} - def reset(self): if self.handle_ids: handles = self._init_plot_handles() @@ -311,7 +312,9 @@ class ServerCallback(MessageCallback): """ # Timeout before the first event is processed - debounce = 50 + throttle_timeout = 50 + + throttle_scheme = 'debounce' _batched = [] @@ -319,7 +322,7 @@ def __init__(self, plot, streams, source, **params): super(ServerCallback, self).__init__(plot, streams, source, **params) self._active = False self._prev_msg = None - + self._last_event = dt.datetime.now() @classmethod def resolve_attr_spec(cls, spec, cb_obj, model=None): @@ -344,29 +347,45 @@ def resolve_attr_spec(cls, spec, cb_obj, model=None): resolved = getattr(resolved, p, None) return {'id': model.ref['id'], 'value': resolved} - def _schedule_callback(self, cb): - PeriodicCallback(callback=cb, period=self.debounce, count=1).start() + def _schedule_callback(self, cb, timeout=None): + timeout = self.throttle_timeout if timeout is None else timeout + PeriodicCallback(callback=cb, period=timeout, count=1).start() + @gen.coroutine def on_change(self, attr, old, new): """ Process change events adding timeout to process multiple concerted value change at once rather than firing off multiple plot updates. """ - self._queue.append((attr, old, new)) + self._queue.append((attr, old, new, dt.datetime.now())) if not self._active and self.plot.document: self._active = True self._schedule_callback(self.process_on_change) + @gen.coroutine def on_event(self, event): """ Process bokeh UIEvents adding timeout to process multiple concerted value change at once rather than firing off multiple plot updates. """ - self._queue.append(event) + self._queue.append((event, dt.datetime.now())) if not self._active and self.plot.document: self._active = True self._schedule_callback(self.process_on_event) + def throttled(self): + prev_event = self._queue[-1][-1] + now = dt.datetime.now() + if self.throttle_scheme == 'throttle': + diff = (now-self._last_event).total_seconds() + if diff < (self.throttle_timeout/100.): + return self.throttle_timeout-(diff*100) + else: + diff = (now-prev_event).total_seconds() + if diff < (self.throttle_timeout/100.): + return self.throttle_timeout + return False + @gen.coroutine def process_on_event(self): """ @@ -375,9 +394,13 @@ def process_on_event(self): if not self._queue: self._active = False return + throttled = self.throttled() + if throttled: + self._schedule_callback(self._process_on_event, throttled) + self._last_event = dt.datetime.now() # Get unique event types in the queue events = list(OrderedDict([(event.event_name, event) - for event in self._queue]).values()) + for event, dt in self._queue]).values()) self._queue = [] # Process event types @@ -397,6 +420,11 @@ def process_on_change(self): elif self._batched and not all(b in [q[0] for q in self._queue] for b in self._batched): self._schedule_callback(self.process_on_change) return # Skip until all batched events have arrived + throttled = self.throttled() + if throttled: + self._schedule_callback(self.process_on_change, throttled) + return + self.last_event = dt.datetime.now() self._queue = [] msg = {} @@ -414,6 +442,7 @@ def process_on_change(self): equal = msg == self._prev_msg except Exception: equal = False + if not equal or any(s.transient for s in self.streams): self.on_msg(msg) self._prev_msg = msg From 7cd933fe13f095e5807861310839c4fb2a147155 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Fri, 10 Apr 2020 04:48:31 +0200 Subject: [PATCH 2/5] Add adaptive mode --- holoviews/plotting/bokeh/callbacks.py | 62 +++++++++++++++++---------- 1 file changed, 39 insertions(+), 23 deletions(-) diff --git a/holoviews/plotting/bokeh/callbacks.py b/holoviews/plotting/bokeh/callbacks.py index ce2649e7a4..29b93a95ae 100644 --- a/holoviews/plotting/bokeh/callbacks.py +++ b/holoviews/plotting/bokeh/callbacks.py @@ -5,8 +5,9 @@ from collections import defaultdict from functools import partial -import param import numpy as np +import panel as pn +import param from bokeh.models import ( CustomJS, FactorRange, DatetimeAxis, ToolbarBox, Range1d, @@ -309,12 +310,22 @@ class ServerCallback(MessageCallback): resolves the requested attributes on the Python end and then hands the msg off to the general on_msg handler, which will update the Stream(s) attached to the callback. + + The ServerCallback supports three different throttling modes: + + - adaptive (default): The callback adapts the throttling timeout + depending on the time taken to process each message. + - throttle: Uses the fixed throttle_timeout as the minimum amount + of time between events. + - debounce: Processes the message only when no new event has been + received within the throttle_timeout duration. """ # Timeout before the first event is processed throttle_timeout = 50 - throttle_scheme = 'debounce' + # The + throttling_scheme = 'adaptive' _batched = [] @@ -322,7 +333,8 @@ def __init__(self, plot, streams, source, **params): super(ServerCallback, self).__init__(plot, streams, source, **params) self._active = False self._prev_msg = None - self._last_event = dt.datetime.now() + self._last_event = dt.datetime.now().timestamp() + self._history = [] @classmethod def resolve_attr_spec(cls, spec, cb_obj, model=None): @@ -348,42 +360,49 @@ def resolve_attr_spec(cls, spec, cb_obj, model=None): return {'id': model.ref['id'], 'value': resolved} def _schedule_callback(self, cb, timeout=None): - timeout = self.throttle_timeout if timeout is None else timeout - PeriodicCallback(callback=cb, period=timeout, count=1).start() + if timeout is not None: + pass + elif self._history and self.throttling_scheme == 'adaptive': + timeout = np.array(self._history).mean()*1000 + else: + timeout = self.throttle_timeout + pn.state.curdoc.add_timeout_callback(cb, timeout) - @gen.coroutine def on_change(self, attr, old, new): """ Process change events adding timeout to process multiple concerted value change at once rather than firing off multiple plot updates. """ - self._queue.append((attr, old, new, dt.datetime.now())) + self._queue.append((attr, old, new, dt.datetime.now().timestamp())) if not self._active and self.plot.document: self._active = True self._schedule_callback(self.process_on_change) - @gen.coroutine def on_event(self, event): """ Process bokeh UIEvents adding timeout to process multiple concerted value change at once rather than firing off multiple plot updates. """ - self._queue.append((event, dt.datetime.now())) + self._queue.append((event, dt.datetime.now().timestamp())) if not self._active and self.plot.document: self._active = True self._schedule_callback(self.process_on_event) def throttled(self): - prev_event = self._queue[-1][-1] - now = dt.datetime.now() - if self.throttle_scheme == 'throttle': - diff = (now-self._last_event).total_seconds() - if diff < (self.throttle_timeout/100.): - return self.throttle_timeout-(diff*100) + now = dt.datetime.now().timestamp() + timeout = self.throttle_timeout/1000. + if self.throttling_scheme in ('throttle', 'adaptive'): + diff = (now-self._last_event) + if self._history and self.throttling_scheme == 'adaptive': + timeout = np.array(self._history).mean() + if diff < timeout: + return int((timeout-diff)*1000) else: - diff = (now-prev_event).total_seconds() - if diff < (self.throttle_timeout/100.): + prev_event = self._queue[-1][-1] + diff = (now-prev_event) + if diff < timeout: return self.throttle_timeout + self._last_event = dt.datetime.now().timestamp() return False @gen.coroutine @@ -397,7 +416,7 @@ def process_on_event(self): throttled = self.throttled() if throttled: self._schedule_callback(self._process_on_event, throttled) - self._last_event = dt.datetime.now() + return # Get unique event types in the queue events = list(OrderedDict([(event.event_name, event) for event, dt in self._queue]).values()) @@ -410,6 +429,7 @@ def process_on_event(self): model_obj = self.plot_handles.get(self.models[0]) msg[attr] = self.resolve_attr_spec(path, event, model_obj) self.on_msg(msg) + self._history = self._history[-9:] + [dt.datetime.now().timestamp()-self._last_event] self._schedule_callback(self.process_on_event) @gen.coroutine @@ -417,14 +437,10 @@ def process_on_change(self): if not self._queue: self._active = False return - elif self._batched and not all(b in [q[0] for q in self._queue] for b in self._batched): - self._schedule_callback(self.process_on_change) - return # Skip until all batched events have arrived throttled = self.throttled() if throttled: self._schedule_callback(self.process_on_change, throttled) return - self.last_event = dt.datetime.now() self._queue = [] msg = {} @@ -445,11 +461,11 @@ def process_on_change(self): if not equal or any(s.transient for s in self.streams): self.on_msg(msg) + self._history = self._history[-9:] + [dt.datetime.now().timestamp()-self._last_event] self._prev_msg = msg self._schedule_callback(self.process_on_change) - def set_server_callback(self, handle): """ Set up on_change events for bokeh server interactions. From 8fdc561befe9c21a1cd2d029bbf0e15783871ea3 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Fri, 10 Apr 2020 04:55:34 +0200 Subject: [PATCH 3/5] Fixed docs --- holoviews/plotting/bokeh/callbacks.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/holoviews/plotting/bokeh/callbacks.py b/holoviews/plotting/bokeh/callbacks.py index 29b93a95ae..0a8092cd3c 100644 --- a/holoviews/plotting/bokeh/callbacks.py +++ b/holoviews/plotting/bokeh/callbacks.py @@ -314,21 +314,21 @@ class ServerCallback(MessageCallback): The ServerCallback supports three different throttling modes: - adaptive (default): The callback adapts the throttling timeout - depending on the time taken to process each message. - - throttle: Uses the fixed throttle_timeout as the minimum amount + depending on the rolling mean of the time taken to process each + message. The rolling window is controlled by the `adaptive_window` + value. + - throttle: Uses the fixed `throttle_timeout` as the minimum amount of time between events. - debounce: Processes the message only when no new event has been - received within the throttle_timeout duration. + received within the `throttle_timeout` duration. """ - # Timeout before the first event is processed + adaptive_window = 3 + throttle_timeout = 50 - # The throttling_scheme = 'adaptive' - _batched = [] - def __init__(self, plot, streams, source, **params): super(ServerCallback, self).__init__(plot, streams, source, **params) self._active = False @@ -429,7 +429,9 @@ def process_on_event(self): model_obj = self.plot_handles.get(self.models[0]) msg[attr] = self.resolve_attr_spec(path, event, model_obj) self.on_msg(msg) - self._history = self._history[-9:] + [dt.datetime.now().timestamp()-self._last_event] + w = self.adaptive_window-1 + diff = dt.datetime.now().timestamp()-self._last_event + self._history = self._history[-w:] + [diff] self._schedule_callback(self.process_on_event) @gen.coroutine @@ -461,7 +463,9 @@ def process_on_change(self): if not equal or any(s.transient for s in self.streams): self.on_msg(msg) - self._history = self._history[-9:] + [dt.datetime.now().timestamp()-self._last_event] + w = self.adaptive_window-1 + diff = dt.datetime.now().timestamp()-self._last_event + self._history = self._history[-w:] + [diff] self._prev_msg = msg self._schedule_callback(self.process_on_change) @@ -850,8 +854,6 @@ class RangeXYCallback(Callback): models = ['x_range', 'y_range'] on_changes = ['start', 'end'] - _batched = on_changes - def _process_msg(self, msg): data = {} if 'x0' in msg and 'x1' in msg: From 559e6c8752ba05eb3540c80f7c9e58842f02f762 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Fri, 10 Apr 2020 05:03:19 +0200 Subject: [PATCH 4/5] Use unlocked context manager to handle server updates --- holoviews/plotting/plot.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/holoviews/plotting/plot.py b/holoviews/plotting/plot.py index b134f666bb..a77bd3386e 100644 --- a/holoviews/plotting/plot.py +++ b/holoviews/plotting/plot.py @@ -19,6 +19,7 @@ from panel.config import config from panel.io.notebook import push from panel.io.state import state +from panel.io.server import unlocked from pyviz_comms import JupyterComm from ..selection import NoOpSelectionDisplay @@ -232,6 +233,7 @@ def refresh(self, **kwargs): for d, k in zip(self.dimensions, key)) stream_key = util.wrap_tuple_streams(key, self.dimensions, self.streams) + self._trigger_refresh(stream_key) if self.top_level: self.push() @@ -248,7 +250,8 @@ def _trigger_refresh(self, key): "Triggers update to a plot on a refresh event" # Update if not top-level, batched or an ElementPlot if not self.top_level or isinstance(self, GenericElementPlot): - self.update(key) + with unlocked(): + self.update(key) def push(self): From c30e91723593866c6c40e698526dcb6e309fc428 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Fri, 10 Apr 2020 05:27:46 +0200 Subject: [PATCH 5/5] Fixed flake --- holoviews/plotting/bokeh/callbacks.py | 1 - 1 file changed, 1 deletion(-) diff --git a/holoviews/plotting/bokeh/callbacks.py b/holoviews/plotting/bokeh/callbacks.py index 0a8092cd3c..e14988adea 100644 --- a/holoviews/plotting/bokeh/callbacks.py +++ b/holoviews/plotting/bokeh/callbacks.py @@ -15,7 +15,6 @@ FreehandDrawTool, PointDrawTool ) from panel.io.state import state -from panel.callbacks import PeriodicCallback from pyviz_comms import JS_CALLBACK from tornado import gen