-
-
Notifications
You must be signed in to change notification settings - Fork 504
/
__init__.py
487 lines (405 loc) · 14.4 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
"""
Various general utilities used in the panel codebase.
"""
from __future__ import annotations
import ast
import base64
import datetime as dt
import json
import logging
import numbers
import os
import pathlib
import re
import sys
import urllib.parse as urlparse
from collections import OrderedDict, defaultdict
from collections.abc import MutableMapping, MutableSequence
from datetime import datetime
from functools import partial
from html import escape # noqa
from importlib import import_module
from typing import Any, AnyStr
import bokeh
import numpy as np
import param
from bokeh.core.has_props import _default_resolver
from bokeh.model import Model
from packaging.version import Version
from .checks import ( # noqa
datetime_types, is_dataframe, is_holoviews, is_number, is_parameterized,
is_series, isdatetime, isfile, isIn, isurl,
)
from .parameters import ( # noqa
edit_readonly, extract_dependencies, get_method_owner, param_watchers,
recursive_parameterized,
)
log = logging.getLogger('panel.util')
bokeh_version = Version(bokeh.__version__)
# Bokeh serializes NaT as this value
# Discussion on why https://github.com/bokeh/bokeh/pull/10449/files#r479988469
BOKEH_JS_NAT = -9223372036854776.0
PARAM_NAME_PATTERN = re.compile(r'^.*\d{5}$')
class LazyHTMLSanitizer:
"""
Wraps bleach.sanitizer.Cleaner lazily importing it on the first
call to the clean method.
"""
def __init__(self, **kwargs):
self._cleaner = None
self._kwargs = kwargs
def clean(self, text):
if self._cleaner is None:
import bleach
self._cleaner = bleach.sanitizer.Cleaner(**self._kwargs)
return self._cleaner.clean(text)
HTML_SANITIZER = LazyHTMLSanitizer(strip=True)
def hashable(x):
if isinstance(x, MutableSequence):
return tuple(x)
elif isinstance(x, MutableMapping):
return tuple([(k,v) for k,v in x.items()])
else:
return x
def indexOf(obj, objs):
"""
Returns the index of an object in a list of objects. Unlike the
list.index method this function only checks for identity not
equality.
"""
for i, o in enumerate(objs):
if o is obj:
return i
try:
if o == obj:
return i
except Exception:
pass
raise ValueError('%s not in list' % obj)
def param_name(name: str) -> str:
"""
Removes the integer id from a Parameterized class name.
"""
match = re.findall(r'\D+(\d{5,})', name)
return name[:name.index(match[0])] if match else name
def abbreviated_repr(value, max_length=25, natural_breaks=(',', ' ')):
"""
Returns an abbreviated repr for the supplied object. Attempts to
find a natural break point while adhering to the maximum length.
"""
if isinstance(value, list):
vrepr = '[' + ', '.join([abbreviated_repr(v) for v in value]) + ']'
if isinstance(value, param.Parameterized):
vrepr = type(value).__name__
else:
vrepr = repr(value)
if len(vrepr) > max_length:
# Attempt to find natural cutoff point
abbrev = vrepr[max_length//2:]
natural_break = None
for brk in natural_breaks:
if brk in abbrev:
natural_break = abbrev.index(brk) + max_length//2
break
if natural_break and natural_break < max_length:
max_length = natural_break + 1
end_char = ''
if isinstance(value, list):
end_char = ']'
elif isinstance(value, OrderedDict):
end_char = '])'
elif isinstance(value, (dict, set)):
end_char = '}'
return vrepr[:max_length+1] + '...' + end_char
return vrepr
def param_reprs(parameterized, skip=None):
"""
Returns a list of reprs for parameters on the parameterized object.
Skips default and empty values.
"""
cls = type(parameterized).__name__
param_reprs = []
for p, v in sorted(parameterized.param.values().items()):
default = parameterized.param[p].default
equal = v is default
if not equal:
if isinstance(v, np.ndarray):
if isinstance(default, np.ndarray):
equal = np.array_equal(v, default, equal_nan=True)
else:
equal = False
else:
try:
equal = bool(v==default)
except Exception:
equal = False
if equal: continue
elif v is None: continue
elif isinstance(v, str) and v == '': continue
elif isinstance(v, list) and v == []: continue
elif isinstance(v, dict) and v == {}: continue
elif (skip and p in skip) or (p == 'name' and v.startswith(cls)): continue
else: v = abbreviated_repr(v)
param_reprs.append(f'{p}={v}')
return param_reprs
def full_groupby(l, key=lambda x: x):
"""
Groupby implementation which does not require a prior sort
"""
d = defaultdict(list)
for item in l:
d[key(item)].append(item)
return d.items()
def value_as_datetime(value):
"""
Retrieve the value tuple as a tuple of datetime objects.
"""
if isinstance(value, numbers.Number):
value = datetime.utcfromtimestamp(value / 1000)
return value
def value_as_date(value):
if isinstance(value, numbers.Number):
value = datetime.utcfromtimestamp(value / 1000).date()
elif isinstance(value, datetime):
value = value.date()
return value
def datetime_as_utctimestamp(value):
"""
Converts a datetime to a UTC timestamp used by Bokeh internally.
"""
return value.replace(tzinfo=dt.timezone.utc).timestamp() * 1000
def parse_query(query: str) -> dict[str, Any]:
"""
Parses a url query string, e.g. ?a=1&b=2.1&c=string, converting
numeric strings to int or float types.
"""
query_dict = dict(urlparse.parse_qsl(query[1:]))
parsed_query: dict[str, Any] = {}
for k, v in query_dict.items():
if v.isdigit():
parsed_query[k] = int(v)
elif is_number(v):
parsed_query[k] = float(v)
elif v.startswith(('[', '{')):
try:
parsed_query[k] = json.loads(v)
except Exception:
try:
parsed_query[k] = ast.literal_eval(v)
except Exception:
log.warning(
f'Could not parse value {v!r} of query parameter {k}. '
'Parameter will be ignored.'
)
elif v.lower() in ("true", "false"):
parsed_query[k] = v.lower() == "true"
else:
parsed_query[k] = v
return parsed_query
def base64url_encode(input):
if isinstance(input, str):
input = input.encode("utf-8")
encoded = base64.urlsafe_b64encode(input).decode('ascii')
# remove padding '=' chars that cause trouble
return str(encoded.rstrip('='))
def base64url_decode(input):
if isinstance(input, str):
input = input.encode("ascii")
rem = len(input) % 4
if rem > 0:
input += b"=" * (4 - rem)
return base64.urlsafe_b64decode(input)
def decode_token(token: str, signed: bool = True) -> dict[str, Any]:
"""
Decodes a signed or unsigned JWT token.
"""
if signed and "." in token:
signing_input, _ = token.encode('utf-8').rsplit(b".", 1)
_, payload_segment = signing_input.split(b".", 1)
else:
payload_segment = token
return json.loads(base64url_decode(payload_segment).decode('utf-8'))
class classproperty:
def __init__(self, f):
self.f = f
def __get__(self, obj, owner):
return self.f(owner)
def url_path(url: str) -> str:
"""
Strips the protocol and domain from a URL returning just the path.
"""
subpaths = url.split('//')[1:]
return '/'.join('/'.join(subpaths).split('/')[1:])
def lazy_load(module, model, notebook=False, root=None, ext=None):
from ..config import panel_extension as extension
from ..io.state import state
external_modules = {
module: ext for ext, module in extension._imports.items()
}
ext = ext or module.split('.')[-1]
ext_name = external_modules[module]
loaded_extensions = state._extensions
loaded = loaded_extensions is None or ext_name in loaded_extensions
if module in sys.modules and loaded:
model_cls = getattr(sys.modules[module], model)
if f'{model_cls.__module__}.{model}' not in Model.model_class_reverse_map:
_default_resolver.add(model_cls)
return model_cls
if notebook:
param.main.param.warning(
f'{model} was not imported on instantiation and may not '
'render in a notebook. Restart the notebook kernel and '
'ensure you load it as part of the extension using:'
f'\n\npn.extension(\'{ext}\')\n'
)
elif not loaded and state._is_launching:
# If we are still launching the application it is not too late
# to automatically load the extension and therefore ensure it
# is included in the resources added to the served page
param.main.param.warning(
f'pn.extension was initialized but {ext!r} extension was not '
'loaded. Since the application is still launching the extension '
'was loaded automatically but we strongly recommend you load '
'the extension explicitly with the following argument(s):'
f'\n\npn.extension({ext!r})\n'
)
if loaded_extensions is None:
state._extensions_[state.curdoc] = [ext_name]
else:
loaded_extensions.append(ext_name)
elif not loaded:
param.main.param.warning(
f'pn.extension was initialized but {ext!r} extension was not '
'loaded. In order for the required resources to be initialized '
'ensure the extension is loaded with the following argument(s):'
f'\n\npn.extension({ext!r})\n'
)
elif root is not None and root.ref['id'] in state._views:
param.main.param.warning(
f'{model} was not imported on instantiation may not '
'render in the served application. Ensure you add the '
'following to the top of your application:'
f'\n\npn.extension(\'{ext}\')\n'
)
return getattr(import_module(module), model)
def updating(fn):
def wrapped(self, *args, **kwargs):
updating = self._updating
self._updating = True
try:
fn(self, *args, **kwargs)
finally:
self._updating = updating
return wrapped
def clone_model(bokeh_model, include_defaults=False, include_undefined=False):
properties = bokeh_model.properties_with_values(
include_defaults=include_defaults, include_undefined=include_undefined
)
return type(bokeh_model)(**properties)
def function_name(func) -> str:
"""
Returns the name of a function (or its string repr)
"""
while isinstance(func, partial):
func = func.func
if hasattr(func, '__name__'):
return func.__name__
return str(func)
_period_regex = re.compile(r'((?P<weeks>\d+?)w)?((?P<days>\d+?)d)?((?P<hours>\d+?)h)?((?P<minutes>\d+?)m)?((?P<seconds>\d+?\.?\d*?)s)?')
def parse_timedelta(time_str: str) -> dt.timedelta | None:
parts = _period_regex.match(time_str)
if not parts:
return None
parts_dict = parts.groupdict()
time_params = {}
for (name, p) in parts_dict.items():
if p:
time_params[name] = float(p)
return dt.timedelta(**time_params)
def fullpath(path: AnyStr | os.PathLike) -> AnyStr | os.PathLike:
"""Expanduser and then abspath for a given path
"""
return os.path.abspath(os.path.expanduser(path))
def base_version(version: str) -> str:
"""Extract the final release and if available pre-release (alpha, beta,
release candidate) segments of a PEP440 version, defined with three
components (major.minor.micro).
Useful to avoid nbsite/sphinx to display the documentation HTML title
with a not so informative and rather ugly long version (e.g.
``0.13.0a19.post4+g0695e214``). Use it in ``conf.py``::
version = release = base_version(package.__version__)
Return the version passed as input if no match is found with the pattern.
"""
# look at the start for e.g. 0.13.0, 0.13.0rc1, 0.13.0a19, 0.13.0b10
pattern = r"([\d]+\.[\d]+\.[\d]+(?:a|rc|b)?[\d]*)"
match = re.match(pattern, version)
if match:
return match.group()
else:
return version
def relative_to(path, other_path):
try:
pathlib.Path(path).relative_to(other_path)
return True
except Exception:
return False
def flatten(line):
"""
Flatten an arbitrarily nested sequence.
Inspired by: pd.core.common.flatten
Parameters
----------
line : sequence
The sequence to flatten
Notes
-----
This only flattens list, tuple, and dict sequences.
Returns
-------
flattened : generator
"""
for element in line:
if any(isinstance(element, tp) for tp in (list, tuple, dict)):
yield from flatten(element)
else:
yield element
def styler_update(styler, new_df):
"""
Updates the todo items on a pandas Styler object to apply to a new
DataFrame.
Arguments
---------
styler: pandas.io.formats.style.Styler
Styler objects
new_df: pd.DataFrame
New DataFrame to update the styler to do items
Returns
-------
todos: list
"""
todos = []
for todo in styler._todo:
if not isinstance(todo, tuple):
todos.append(todo)
continue
ops = []
for op in todo:
if not isinstance(op, tuple):
ops.append(op)
continue
op_fn = str(op[0])
if ('_background_gradient' in op_fn or '_bar' in op_fn) and op[1] in (0, 1):
applies = np.array([
new_df[col].dtype.kind in 'uif' for col in new_df.columns
])
if len(op[2]) == len(applies):
applies = np.logical_and(applies, op[2])
op = (op[0], op[1], applies)
ops.append(op)
todo = tuple(ops)
todos.append(todo)
return todos
def try_datetime64_to_datetime(value):
if isinstance(value, np.datetime64):
value = value.astype('datetime64[ms]').astype(datetime)
return value