-
-
Notifications
You must be signed in to change notification settings - Fork 477
/
vega.py
298 lines (255 loc) · 10.3 KB
/
vega.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
from __future__ import annotations
import re
import sys
from typing import (
TYPE_CHECKING, Any, ClassVar, Mapping, Optional,
)
import numpy as np
import param
from bokeh.models import ColumnDataSource
from pyviz_comms import JupyterComm
from ..util import lazy_load
from .base import ModelPane
if TYPE_CHECKING:
from bokeh.document import Document
from bokeh.model import Model
from pyviz_comms import Comm
def ds_as_cds(dataset):
"""
Converts Vega dataset into Bokeh ColumnDataSource data
"""
if len(dataset) == 0:
return {}
# create a list of unique keys from all items as some items may not include optional fields
keys = sorted(set(k for d in dataset for k in d.keys()))
data = {k: [] for k in keys}
for item in dataset:
for k in keys:
data[k].append(item.get(k))
data = {k: np.asarray(v) for k, v in data.items()}
return data
_containers = ['hconcat', 'vconcat', 'layer']
SCHEMA_REGEX = re.compile(r'^v(\d+)\.\d+\.\d+.json')
def _isin(obj, attr):
if isinstance(obj, dict):
return attr in obj
else:
return hasattr(obj, attr)
def _get_type(spec, version):
if version >= 5:
if isinstance(spec, dict):
return spec.get('select', {}).get('type', 'interval')
elif isinstance(spec.select, dict):
return spec.select.get('type', 'interval')
else:
return getattr(spec.select, 'type', 'interval')
else:
if isinstance(spec, dict):
return spec.get('type', 'interval')
else:
return getattr(spec, 'type', 'interval')
def _get_dimensions(spec, props):
dimensions = {}
responsive_height = spec.get('height') == 'container' and props.get('height') is None
responsive_width = spec.get('width') == 'container' and props.get('width') is None
if responsive_height and responsive_width:
dimensions['sizing_mode'] = 'stretch_both'
elif responsive_width:
dimensions['sizing_mode'] = 'stretch_width'
elif responsive_height:
dimensions['sizing_mode'] = 'stretch_height'
return dimensions
def _get_schema_version(obj, default_version: int = 5) -> int:
if Vega.is_altair(obj):
schema = obj.to_dict().get('$schema', '')
else:
schema = obj.get('$schema', '')
version = schema.split('/')[-1]
match = SCHEMA_REGEX.fullmatch(version)
if match is None or not match.groups():
return default_version
return int(match.groups()[0])
def _get_selections(obj, version=None):
if obj is None:
return {}
elif version is None:
version = _get_schema_version(obj)
key = 'params' if version >= 5 else 'selection'
selections = {}
if _isin(obj, key):
params = obj[key]
if version >= 5 and isinstance(params, list):
params = {
p.name if hasattr(p, 'name') else p['name']: p for p in params
if getattr(p, 'param_type', None) == 'selection' or _isin(p, 'select')
}
try:
selections.update({
name: _get_type(spec, version) for name, spec in params.items()
})
except (AttributeError, TypeError):
pass
for c in _containers:
if _isin(obj, c):
for subobj in obj[c]:
selections.update(_get_selections(subobj, version=version))
return selections
def _to_json(obj):
if isinstance(obj, dict):
json = dict(obj)
if 'data' in json:
data = json['data']
if isinstance(data, dict):
json['data'] = dict(data)
elif isinstance(data, list):
json['data'] = [dict(d) for d in data]
return json
return obj.to_dict()
class Vega(ModelPane):
"""
The Vega pane renders Vega-lite based plots (including those from Altair)
inside a panel.
Note
- to use the `Vega` pane, the Panel `extension` has to be
loaded with 'vega' as an argument to ensure that vega.js is initialized.
- it supports selection events
- it optimizes the plot rendering by using binary serialization for any
array data found on the Vega/Altair object, providing huge speedups over
the standard JSON serialization employed by Vega natively.
Reference: https://panel.holoviz.org/reference/panes/Vega.html
:Example:
>>> pn.extension('vega')
>>> Vega(some_vegalite_dict_or_altair_object, height=240)
"""
debounce = param.ClassSelector(default=20, class_=(int, dict), doc="""
Declares the debounce time in milliseconds either for all
events or if a dictionary is provided for individual events.""")
selection = param.ClassSelector(class_=param.Parameterized, doc="""
The Selection object reflects any selections available on the
supplied vega plot into Python.""")
show_actions = param.Boolean(default=False, doc="""
Whether to show Vega actions.""")
theme = param.ObjectSelector(default=None, allow_None=True, objects=[
'excel', 'ggplot2', 'quartz', 'vox', 'fivethirtyeight', 'dark',
'latimes', 'urbaninstitute', 'googlecharts'])
priority: ClassVar[float | bool | None] = 0.8
_rename: ClassVar[Mapping[str, str | None]] = {
'selection': None, 'debounce': None, 'object': 'data'}
_updates: ClassVar[bool] = True
def __init__(self, object=None, **params):
super().__init__(object, **params)
self.param.watch(self._update_selections, ['object'])
self._update_selections()
@property
def _selections(self):
return _get_selections(self.object)
@property
def _throttle(self):
default = self.param.debounce.default
if isinstance(self.debounce, dict):
throttle = {
sel: self.debounce.get(sel, default)
for sel in self._selections
}
else:
throttle = {sel: self.debounce or default for sel in self._selections}
return throttle
def _update_selections(self, *args):
params = {
e: param.Dict(allow_refs=False) if stype == 'interval' else param.List(allow_refs=False)
for e, stype in self._selections.items()
}
if self.selection and (set(self.selection.param) - {'name'}) == set(params):
self.selection.param.update({p: None for p in params})
return
self.selection = type('Selection', (param.Parameterized,), params)()
@classmethod
def is_altair(cls, obj):
if 'altair' in sys.modules:
import altair as alt
return isinstance(obj, alt.api.TopLevelMixin)
return False
@classmethod
def applies(cls, obj: Any) -> float | bool | None:
if isinstance(obj, dict) and 'vega' in obj.get('$schema', '').lower():
return True
return cls.is_altair(obj)
def _get_sources(self, json, sources=None):
sources = {} if sources is None else dict(sources)
datasets = json.get('datasets', {})
for name in list(datasets):
if name in sources or isinstance(datasets[name], dict):
continue
data = datasets.pop(name)
if isinstance(data, list) and any(isinstance(d, dict) and 'geometry' in d for d in data):
# Handle geometry records types
datasets[name] = data
continue
columns = set(data[0]) if data else []
if self.is_altair(self.object):
import altair as alt
if (not isinstance(self.object.data, (alt.Data, alt.UrlData, type(alt.Undefined))) and
columns == set(self.object.data)):
data = ColumnDataSource.from_df(self.object.data)
else:
data = ds_as_cds(data)
sources[name] = ColumnDataSource(data=data)
else:
sources[name] = ColumnDataSource(data=ds_as_cds(data))
data = json.get('data', {})
if isinstance(data, dict):
data = data.pop('values', {})
if data:
sources['data'] = ColumnDataSource(data=ds_as_cds(data))
elif isinstance(data, list):
for d in data:
if 'values' in d:
sources[d['name']] = ColumnDataSource(data=ds_as_cds(d.pop('values')))
return sources
def _process_event(self, event):
name = event.data['type']
stype = self._selections.get(name)
value = event.data['value']
if stype != 'interval':
value = list(value)
self.selection.param.update(**{name: value})
def _process_param_change(self, params):
props = super()._process_param_change(params)
if 'data' in props and props['data'] is not None:
props['data'] = _to_json(props['data'])
return props
def _get_properties(self, doc, sources={}):
props = super()._get_properties(doc)
data = props['data']
if data is not None:
sources = self._get_sources(data, sources)
if self.sizing_mode:
if 'both' in self.sizing_mode:
if 'width' in data:
data['width'] = 'container'
if 'height' in data:
data['height'] = 'container'
elif 'width' in self.sizing_mode and 'width' in data:
data['width'] = 'container'
elif 'height' in self.sizing_mode and 'height' in data:
data['height'] = 'container'
dimensions = _get_dimensions(data, props) if data else {}
props['data'] = data
props['data_sources'] = sources
props['events'] = list(self._selections)
props['throttle'] = self._throttle
props.update(dimensions)
return props
def _get_model(
self, doc: Document, root: Optional[Model] = None,
parent: Optional[Model] = None, comm: Optional[Comm] = None
) -> Model:
self._bokeh_model = lazy_load(
'panel.models.vega', 'VegaPlot', isinstance(comm, JupyterComm), root
)
model = super()._get_model(doc, root, parent, comm)
self._register_events('vega_event', model=model, doc=doc, comm=comm)
return model
def _update(self, ref: str, model: Model) -> None:
props = self._get_properties(model.document, sources=dict(model.data_sources))
model.update(**props)