/
bokeh_plot.py
286 lines (227 loc) · 8.69 KB
/
bokeh_plot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
from dataclasses import asdict, dataclass, field, fields
from itertools import cycle
from pprint import pformat
from threading import Event
from typing import Callable, Dict, List, Optional

from bokeh.io import curdoc
from bokeh.layouts import column, gridplot
from bokeh.models import ColumnDataSource, HoverTool, Legend
from bokeh.models.widgets import CheckboxGroup, Div, Slider
from bokeh.palettes import Dark2_5 as palette
from bokeh.plotting import figure
from tornado import gen

from sensor import RollingStack, SensorConsumer, SensorDetails
@dataclass
class GenericDataclass:
    """Base class that gives dataclasses a pretty-printed string form.

    Subclasses decorated with ``@dataclass`` inherit ``dict()`` and
    ``__str__`` so they can be dumped readably while debugging.
    """

    def __str__(self) -> str:
        """Return the stringified field mapping, pretty-printed (indent 4)."""
        return pformat(self.dict(), indent=4)

    def dict(self) -> Dict[str, str]:
        """Map every dataclass field name to ``str()`` of its current value.

        Values are read straight off the instance rather than through
        ``dataclasses.asdict``: ``asdict`` deep-copies each value, which
        raises ``TypeError`` for fields holding uncopyable objects (locks,
        events, ...) and is wasted work when only the string form is kept.
        A nested dataclass value is therefore rendered with its own
        ``str()`` instead of being expanded into a nested dict first.

        Returns:
            dict: field name -> stringified value, in declaration order
        """
        return {f.name: str(getattr(self, f.name)) for f in fields(self)}
@dataclass
class PlotDefaults(GenericDataclass):
    """Some generic defaults for most plots.

    When ``sensor_details`` is supplied, its ``legend`` and ``title``
    replace ``ys_legend_text`` and ``plot_title`` after initialisation.

    Args:
        GenericDataclass (Class): adds pretty printouts for debugging
    """

    # optional sensor description; None keeps the generic values below
    sensor_details: Optional[SensorDetails] = None
    # tool names passed to the figure alongside the hover tool
    plot_tools: str = "box_zoom,pan,wheel_zoom,reset"
    # hover tooltip rows as (label, value template) pairs
    tooltips: List = field(
        default_factory=lambda: [
            ("index", "$index"),
            (
                "(x,y)",
                "(@x, $y)",
            ),
        ]
    )
    # plot data
    plot_title: str = "Sensor Data"
    xaxis_label: str = "TS"
    yaxis_label: str = "Value"
    plot_width: int = 1000
    plot_height: int = 500
    # data column name -> legend text shown for that series
    ys_legend_text: Dict = field(default_factory=lambda: {"y": "Fn(x)"})

    def __post_init__(self) -> None:
        """Take legend and title from ``sensor_details`` when it is given."""
        # Explicit None check: "no details supplied" is the only case that
        # should keep the generic defaults.
        if self.sensor_details is not None:
            self.ys_legend_text = self.sensor_details.legend
            self.plot_title = self.sensor_details.title
@dataclass
class LayoutDefaults(GenericDataclass):
    """Some generic defaults for the parent canvas that contains the plots.

    Args:
        GenericDataclass (Class): adds pretty printouts for debugging
    """

    # receives each new delay value picked on the sensor speed slider
    delay_queue: RollingStack
    page_title: str = "Real Time Sensor Data"
    page_title_colour: str = "white"
    page_title_width: int = 1000
    page_title_height: int = 75
    # how much data to scroll
    window_slider_start: int = 1
    window_slider_end: int = 1000
    window_slider_value: int = 125
    window_slider_step: int = 1
    # how fast to simulate sensor new datapoints
    # (these bounds are fractional, so they are floats rather than ints)
    sensor_speed_slider_start: float = 0.005
    sensor_speed_slider_end: float = 0.5
    sensor_speed_slider_value: float = 0.01
    sensor_speed_slider_step: float = 0.01
    # number of plots placed on each row of the grid
    n_columns: int = 3
class BokehPage:
    """Page/canvas holding the header, the control widgets and the plot grid."""

    def __init__(self, defaults: "LayoutDefaults", sensor_is_reading: Event) -> None:
        """Initialise page/canvas.

        Args:
            defaults (LayoutDefaults): default setup values
            sensor_is_reading (Event): set while plot updates are enabled,
                cleared while they are paused
        """
        self.doc = curdoc()
        self.doc.theme = "dark_minimal"
        self.defaults = defaults
        self.window_width = self.defaults.window_slider_value
        # Widgets and plots are created later by layout() / add_plots().
        self.start_stop_checkbox = None
        self.window_width_slider = None
        self.sensor_speed_slider = None
        self.sensor_speed = None  # same widget as sensor_speed_slider
        self.hertz_div = None
        self.all_plots = None
        self.plots = None
        self.sensor_is_reading = sensor_is_reading
        self.header = Div(
            text=f"<h1 style='color:{defaults.page_title_colour}'>{defaults.page_title}</h1>",
            width=defaults.page_title_width,
            height=defaults.page_title_height,
            background="black",
        )

    @staticmethod
    def _hertz_label(delay: float) -> str:
        """Format the update-rate label for a delay between sensor updates."""
        return f"<b>Each plot is updating at {1/delay:.1f}Hz</b>"

    def add_plots(self, plots: List["BokehPlot"]):
        """Add plots to window and build the page layout.

        Args:
            plots (List[BokehPlot]): list of bokeh plots showing sensor data
        """
        self.plots = plots
        figures = [p.plt for p in plots]
        # Chunk the flat list into rows of n_columns figures each.
        n = self.defaults.n_columns
        rows = [figures[i : i + n] for i in range(0, len(figures), n)]
        self.all_plots = gridplot(rows)
        self.all_plots.spacing = 10
        self.layout()

    def layout(self):
        """Add plots and sliders to layout.

        Reads ``self.all_plots``, so it is called from ``add_plots`` once
        the grid exists.
        """
        self.doc.title = self.defaults.page_title

        self.start_stop_checkbox = CheckboxGroup(labels=["Enable Plotting"], active=[0])
        self.start_stop_checkbox.on_change("active", self.start_stop_handler)

        self.window_width_slider = Slider(
            start=self.defaults.window_slider_start,
            end=self.defaults.window_slider_end,
            value=self.defaults.window_slider_value,
            step=self.defaults.window_slider_step,
            title="window_width",
        )
        self.window_width_slider.on_change("value", self.window_width_handler)

        # adjust delay from sensor data updates. Can be removed for real data
        # Bound under both names: sensor_speed is the historical attribute,
        # sensor_speed_slider is the one declared in __init__.
        self.sensor_speed = self.sensor_speed_slider = Slider(
            start=self.defaults.sensor_speed_slider_start,
            end=self.defaults.sensor_speed_slider_end,
            value=self.defaults.sensor_speed_slider_value,
            step=self.defaults.sensor_speed_slider_step,
            title="Sensor Update delay",
        )
        self.sensor_speed.on_change("value", self.sensor_speed_handler)

        self.hertz_div = Div(
            text=self._hertz_label(self.defaults.sensor_speed_slider_value)
        )

        itms = [
            self.header,
            self.start_stop_checkbox,
            self.window_width_slider,
            self.sensor_speed,
            self.hertz_div,
            self.all_plots,
        ]
        for itm in itms:
            itm.sizing_mode = "stretch_width"
        layout = column(*itms)
        layout.sizing_mode = "stretch_width"
        self.doc.add_root(layout)

    def start_stop_handler(self, attr: str, old: List[int], new: List[int]):
        """Enable or pause plot updates from the "Enable Plotting" checkbox.

        Args:
            attr (str): only used as a placeholder
            old (List[int]): only used as a placeholder
            new (List[int]): checkbox ``active`` value; empty when the box
                is unticked, non-empty when it is ticked
        """
        if new:
            self.sensor_is_reading.set()
        else:
            self.sensor_is_reading.clear()

    def window_width_handler(self, attr, old, new):
        """Store the rolling window width chosen on the slider.

        Args:
            attr (str): only used as a placeholder
            old (int): only used as a placeholder
            new (int): sets width of rolling window
        """
        self.window_width = new

    def sensor_speed_handler(self, attr, old, new):
        """Refresh the Hz label and queue the new sensor update delay.

        Args:
            attr (str): only used as a placeholder
            old (float): only used as a placeholder
            new (float): sets delay between sensor updates
        """
        self.hertz_div.text = self._hertz_label(new)
        self.defaults.delay_queue.append(new)
class BokehPlot:
    """One figure on the page, backed by its own data source."""

    def __init__(self, parent: BokehPage, sensor_details: SensorDetails) -> None:
        """Initialise a plot.

        Args:
            parent (BokehPage): parent that will contain the plot
            sensor_details (SensorDetails): used to build this plot's
                PlotDefaults
        """
        self.parent = parent
        self.doc = parent.doc
        self.colours = cycle(palette)
        self.defaults = PlotDefaults(sensor_details)
        hover = HoverTool(tooltips=self.defaults.tooltips)
        self.plot_options = {
            "width": self.defaults.plot_width,
            "height": self.defaults.plot_height,
            "tools": [hover, self.defaults.plot_tools],
        }
        self.source, self.plt = self.definePlot()

    def definePlot(self):
        """Build the figure and its data source from the plot defaults.

        One line renderer and one circle renderer are added per entry of
        ``ys_legend_text``; both share a colour and a legend item.

        Returns:
            (source, plt): (data source for the sensor, figure drawing it)
        """
        settings = self.defaults
        fig = figure(**self.plot_options, title=settings.plot_title)
        fig.sizing_mode = "scale_width"
        fig.xaxis.axis_label = settings.xaxis_label
        fig.yaxis.axis_label = settings.yaxis_label

        # One column per y series (y, y1, ... yn) plus the shared x column,
        # each seeded with a single 0.
        columns = {name: [0] for name in settings.ys_legend_text}
        columns["x"] = [0]
        source = ColumnDataSource(data=columns)

        legend_items = []
        for series, label in settings.ys_legend_text.items():
            shade = next(self.colours)
            line = fig.line(x="x", y=series, source=source, line_width=2, color=shade)
            markers = fig.circle(
                x="x", y=series, source=source, fill_color="white", size=5, color=shade
            )
            legend_items.append((label, [line, markers]))

        fig.add_layout(Legend(items=legend_items), "right")
        fig.legend.click_policy = "hide"
        return source, fig

    @gen.coroutine
    def update(self, new_data: dict):
        """Stream the newest sensor data into the plot's source.

        Does nothing while the parent's ``sensor_is_reading`` event is
        cleared. Otherwise the data is streamed with the parent's current
        window width as the rollover.

        Args:
            new_data (dict): newest data
        """
        if not self.parent.sensor_is_reading.is_set():
            return
        self.source.stream(new_data, rollover=self.parent.window_width)