forked from holoviz/panel
-
Notifications
You must be signed in to change notification settings - Fork 0
/
callbacks.py
82 lines (67 loc) · 2.48 KB
/
callbacks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
"""
Defines callbacks to be executed on a thread or by scheduling it
on a running bokeh server.
"""
from __future__ import absolute_import, division, unicode_literals
import time
import param
from bokeh.io import curdoc as _curdoc
class PeriodicCallback(param.Parameterized):
"""
Periodic encapsulates a periodic callback which will run both
in tornado based notebook environments and on bokeh server. By
default the callback will run until the stop method is called,
but count and timeout values can be set to limit the number of
executions or the maximum length of time for which the callback
will run.
"""
callback = param.Callable(doc="""
The callback to execute periodically.""")
count = param.Integer(default=None, doc="""
Number of times the callback will be executed, by default
this is unlimited.""")
period = param.Integer(default=500, doc="""
Period in milliseconds at which the callback is executed.""")
timeout = param.Integer(default=None, doc="""
Timeout in seconds from the start time at which the callback
expires""")
def __init__(self, **params):
super(PeriodicCallback, self).__init__(**params)
self._counter = 0
self._start_time = None
self._timeout = None
self._cb = None
self._doc = None
def start(self):
if self._cb is not None:
raise RuntimeError('Periodic callback has already started.')
self._start_time = time.time()
if _curdoc().session_context:
self._doc = _curdoc()
self._cb = self._doc.add_periodic_callback(self._periodic_callback, self.period)
else:
from tornado.ioloop import PeriodicCallback
self._cb = PeriodicCallback(self._periodic_callback, self.period)
self._cb.start()
@param.depends('period', watch=True)
def _update_period(self):
if self._cb:
self.stop()
self.start()
def _periodic_callback(self):
self.callback()
self._counter += 1
if self._timeout is not None:
dt = (time.time() - self._start_time)
if dt > self._timeout:
self.stop()
if self._counter == self.count:
self.stop()
def stop(self):
self._counter = 0
self._timeout = None
if self._doc:
self._doc.remove_periodic_callback(self._cb)
else:
self._cb.stop()
self._cb = None