-
-
Notifications
You must be signed in to change notification settings - Fork 473
/
callbacks.py
150 lines (132 loc) · 4.58 KB
/
callbacks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
"""
Defines callbacks to be executed on a thread or by scheduling it
on a running bokeh server.
"""
import asyncio
import inspect
import time
import param
from bokeh.io import curdoc as _curdoc
from ..util import edit_readonly
from .state import state
class PeriodicCallback(param.Parameterized):
    """
    Periodic encapsulates a periodic callback which will run both
    in tornado based notebook environments and on bokeh server. By
    default the callback will run until the stop method is called,
    but count and timeout values can be set to limit the number of
    executions or the maximum length of time for which the callback
    will run. The callback may also be started and stopped by setting
    the running parameter to True or False respectively.
    """

    callback = param.Callable(doc="""
        The callback to execute periodically.""")

    count = param.Integer(default=None, doc="""
        Number of times the callback will be executed, by default
        this is unlimited.""")

    period = param.Integer(default=500, doc="""
        Period in milliseconds at which the callback is executed.""")

    timeout = param.Integer(default=None, doc="""
        Timeout in milliseconds from the start time at which the callback
        expires.""")

    running = param.Boolean(default=False, doc="""
        Toggles whether the periodic callback is currently running.""")

    def __init__(self, **params):
        super().__init__(**params)
        # Number of completed executions since the last start.
        self._counter = 0
        # Wall-clock time (seconds) of the last start(); None while stopped.
        self._start_time = None
        # Handle of the scheduled callback (bokeh session callback or
        # tornado PeriodicCallback); None while stopped.
        self._cb = None
        # True while start()/stop() are flipping `running` themselves, so
        # the `running` watchers do not call back into start()/stop().
        self._updating = False
        # Document the callback was scheduled on, if any.
        self._doc = None

    def _set_running(self, value):
        """
        Sets the running parameter to the given value without
        triggering the _start/_stop watchers.
        """
        if self.running == value:
            return
        try:
            self._updating = True
            self.running = value
        finally:
            self._updating = False

    @param.depends('running', watch=True)
    def _start(self):
        # Watcher: start when `running` is switched on from the outside.
        if not self.running or self._updating:
            return
        self.start()

    @param.depends('running', watch=True)
    def _stop(self):
        # Watcher: stop when `running` is switched off from the outside.
        if self.running or self._updating:
            return
        self.stop()

    @param.depends('period', watch=True)
    def _update_period(self):
        # A scheduled callback keeps the period it was created with, so
        # reschedule it when the period changes while running.
        if self._cb:
            self.stop()
            self.start()

    async def _periodic_callback(self):
        """
        Executes the user callback once, marking the state as busy
        while it runs, then stops the periodic callback if the
        timeout has elapsed or the execution count has been reached.
        """
        with edit_readonly(state):
            state.busy = True
        try:
            if inspect.isasyncgenfunction(self.callback):
                # Calling an async generator function returns an async
                # generator object, which cannot be awaited; iterate it
                # to completion instead.
                async for _ in self.callback():
                    pass
            elif inspect.iscoroutinefunction(self.callback):
                await self.callback()
            else:
                self.callback()
        finally:
            with edit_readonly(state):
                state.busy = False
        self._counter += 1
        expired = False
        # _start_time is None if the callback was stopped while it was
        # executing; in that case there is no timeout left to check.
        if self.timeout is not None and self._start_time is not None:
            elapsed_ms = (time.time() - self._start_time) * 1000
            expired = elapsed_ms > self.timeout
        if expired or self._counter == self.count:
            self.stop()

    @property
    def counter(self):
        """
        Returns the execution count of the periodic callback.
        """
        return self._counter

    def _cleanup(self, session_context):
        # Session-destroyed hook registered in start(); the session
        # context argument is not used.
        self.stop()

    def start(self):
        """
        Starts running the periodic callback.

        The callback is scheduled on the current document if state has
        one, otherwise on a tornado PeriodicCallback.

        Raises a RuntimeError if the callback has already been started.
        """
        if self._cb is not None:
            raise RuntimeError('Periodic callback has already started.')
        self._set_running(True)
        self._start_time = time.time()
        if state.curdoc:
            self._doc = state.curdoc
            self._cb = self._doc.add_periodic_callback(
                self._periodic_callback, self.period
            )
        else:
            # Aliased so the import does not shadow this class's own name.
            from tornado.ioloop import (
                PeriodicCallback as _TornadoPeriodicCallback,
            )
            self._cb = _TornadoPeriodicCallback(
                lambda: asyncio.create_task(self._periodic_callback()),
                self.period
            )
            self._cb.start()
        try:
            state.on_session_destroyed(self._cleanup)
        except Exception:
            # Deliberately best-effort: failing to register the cleanup
            # hook must not prevent the callback from running.
            pass

    def stop(self):
        """
        Stops running the periodic callback.

        Resets the execution counter and start time, unschedules the
        callback and unregisters the session cleanup hook.
        """
        self._set_running(False)
        self._counter = 0
        self._start_time = None
        if self._doc:
            if self._doc._session_context:
                self._doc.callbacks.remove_session_callback(self._cb)
            else:
                self._doc.callbacks._session_callbacks.remove(self._cb)
        elif self._cb:
            self._cb.stop()
        self._cb = None
        doc = self._doc or _curdoc()
        if doc:
            # Compare by equality, not identity: every access to
            # self._cleanup creates a new bound-method object, so an
            # identity check would never match the registered hook.
            doc.callbacks.session_destroyed_callbacks = {
                cb for cb in doc.callbacks.session_destroyed_callbacks
                if cb != self._cleanup
            }
            self._doc = None