-
-
Notifications
You must be signed in to change notification settings - Fork 473
/
location.py
288 lines (257 loc) · 10.5 KB
/
location.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
"""
Defines the Location widget which allows changing the href of the window.
"""
from __future__ import annotations
import json
import urllib.parse as urlparse
from typing import (
TYPE_CHECKING, Any, Callable, ClassVar, Dict, List, Mapping, Optional,
)
import param
from ..models.location import Location as _BkLocation
from ..reactive import Syncable
from ..util import edit_readonly, parse_query
from .document import create_doc_if_none_exists
from .state import state
if TYPE_CHECKING:
from bokeh.document import Document
from bokeh.model import Model
from bokeh.server.contexts import BokehSessionContext
from pyviz_comms import Comm
class Location(Syncable):
    """
    The Location component can be made available in a server context
    to provide read and write access to the URL components in the
    browser.
    """

    href = param.String(readonly=True, doc="""
        The full url, e.g. 'https://localhost:80?color=blue#interact'""")

    hostname = param.String(readonly=True, doc="""
        hostname in window.location e.g. 'panel.holoviz.org'""")

    pathname = param.String(regex=r"^$|[\/]|srcdoc$", doc="""
        pathname in window.location e.g. '/user_guide/Interact.html'""")

    protocol = param.String(readonly=True, doc="""
        protocol in window.location e.g. 'http:' or 'https:'""")

    port = param.String(readonly=True, doc="""
        port in window.location e.g. '80'""")

    search = param.String(regex=r"^$|\?", doc="""
        search in window.location e.g. '?color=blue'""")

    hash = param.String(regex=r"^$|#", doc="""
        hash in window.location e.g. '#interact'""")

    reload = param.Boolean(default=False, doc="""
        Reload the page when the location is updated. For multipage
        apps this should be set to True, For single page apps this
        should be set to False""")

    # Mapping from parameter name to bokeh model property name
    _rename: ClassVar[Mapping[str, str | None]] = {"name": None}

    @staticmethod
    def _split_host(host: str) -> tuple[str, str]:
        """
        Splits a ``host[:port]`` string into ``(hostname, port)``.

        Bracketed IPv6 literals (e.g. ``'[::1]:5006'``) keep their
        brackets in the hostname. The port is an empty string when
        the host does not carry one.
        """
        if host.startswith('['):
            end = host.find(']')
            if end != -1:
                remainder = host[end + 1:]
                port = remainder[1:] if remainder.startswith(':') else ''
                return host[:end + 1], port
        if host.count(':') == 1:
            hostname, _, port = host.partition(':')
            return hostname, port
        # No port, or an ambiguous number of colons: do not guess.
        return host, ''

    @classmethod
    def from_request(cls, request):
        """
        Creates a Location populated from the components of a request.

        Arguments
        ---------
        request:
            The request object; only bokeh ``_RequestProxy`` instances
            wrapping an actual request are inspected.

        Returns
        -------
        A Location instance. A default instance is returned if bokeh's
        ``_RequestProxy`` cannot be imported, if ``request`` is not
        such a proxy or if the proxy does not wrap a request.
        """
        try:
            from bokeh.server.contexts import _RequestProxy
        except ImportError:
            return cls()
        if not isinstance(request, _RequestProxy) or request._request is None:
            return cls()

        params = {}
        href = ''
        if request.protocol:
            params['protocol'] = href = f'{request.protocol}:'
        if request.host:
            href += f'//{request.host}'
            hostname, port = cls._split_host(request.host)
            params['hostname'] = hostname
            if port:
                params['port'] = port
        if request.uri:
            href += request.uri
            # partition never raises, unlike unpacking the result of
            # split, when the URI contains repeated '?' or '#' characters.
            remainder, _, fragment = request.uri.partition('#')
            pathname, _, search = remainder.partition('?')
            params['pathname'] = pathname
            if search:
                params['search'] = f'?{search}'
            if fragment:
                params['hash'] = f'#{fragment}'
        params['href'] = href

        loc = cls()
        # href, hostname, protocol and port are declared readonly
        with edit_readonly(loc):
            loc.param.update(params)
        return loc

    def __init__(self, **params):
        super().__init__(**params)
        # Entries are (parameterized, {param name: query name}, watcher, on_error)
        self._synced = []
        # Set while _update_query writes the search string so that the
        # resulting 'search' event is not fed back into the synced objects
        self._syncing = False
        self.param.watch(self._update_synced, ['search'])

    def _get_model(
        self, doc: 'Document', root: Optional['Model'] = None,
        parent: Optional['Model'] = None, comm: Optional['Comm'] = None
    ) -> 'Model':
        """
        Creates the bokeh Location model, registers it under the id of
        the root model and links its properties.
        """
        model = _BkLocation(**self._process_param_change(self._init_params()))
        root = root or model
        self._models[root.ref['id']] = (model, parent)
        self._link_props(model, self._linked_properties, doc, root, comm)
        return model

    def get_root(
        self, doc: Optional[Document] = None, comm: Optional[Comm] = None,
        preprocess: bool = True
    ) -> 'Model':
        """
        Returns the bokeh model for this component, creating a
        document if none is supplied, and registers the view on the
        global state. The ``preprocess`` argument is not used here.
        """
        doc = create_doc_if_none_exists(doc)
        root = self._get_model(doc, comm=comm)
        ref = root.ref['id']
        state._views[ref] = (self, root, doc, comm)
        self._documents[doc] = root
        return root

    def _server_destroy(self, session_context: BokehSessionContext) -> None:
        """
        Unsyncs all synced objects before delegating to the parent
        class.
        """
        # Iterate over a snapshot since unsync rebinds self._synced
        for p, ps, _, _ in list(self._synced):
            try:
                self.unsync(p, ps)
            except Exception:
                # Best-effort cleanup: a failure to unsync one object
                # (e.g. because an earlier call already removed it)
                # must not prevent the session from being destroyed.
                pass
        super()._server_destroy(session_context)

    def _cleanup(self, root: Model | None = None) -> None:
        """
        Removes the bookkeeping for the supplied root model from this
        object and from the global state.
        """
        if root:
            if root.document in self._documents:
                del self._documents[root.document]
            ref = root.ref['id']
        else:
            ref = None
        super()._cleanup(root)
        if ref in state._views:
            del state._views[ref]

    def _update_synced(self, event: Optional[param.parameterized.Event] = None) -> None:
        """
        Applies the current query parameters to all synced objects.

        Only values which differ from the current parameter value are
        set. If updating an object fails its ``on_error`` callback (if
        any) is called with the values that could not be applied.
        """
        if self._syncing:
            return
        query_params = self.query_params
        for p, parameters, _, on_error in self._synced:
            # Invert the mapping: query parameter name -> parameter name
            mapping = {v: k for k, v in parameters.items()}
            mapped = {}
            for k, v in query_params.items():
                if k not in mapping:
                    continue
                pname = mapping[k]
                try:
                    v = p.param[pname].deserialize(v)
                except Exception:
                    # Fall back to the value as parsed from the query
                    pass
                try:
                    equal = v == getattr(p, pname)
                except Exception:
                    # Comparison failed, treat the value as changed
                    equal = False
                if not equal:
                    mapped[pname] = v
            try:
                p.param.update(**mapped)
            except Exception:
                if on_error:
                    on_error(mapped)

    def _update_query(
        self, *events: param.parameterized.Event, query: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Writes changed parameter values of synced objects to the query
        string.

        Arguments
        ---------
        events (param.parameterized.Event):
            Parameter change events emitted by synced objects.
        query (dict):
            Already serialized query parameters to include in the
            update.
        """
        if self._syncing:
            return
        serialized = query or {}
        for e in events:
            matches = [ps for o, ps, _, _ in self._synced if o in (e.cls, e.obj)]
            if not matches:
                continue
            owner = e.cls if e.obj is None else e.obj
            try:
                val = owner.param[e.name].serialize(e.new)
            except Exception:
                val = e.new
            if not isinstance(val, str):
                val = json.dumps(val)
            serialized[matches[0][e.name]] = val
        self._syncing = True
        try:
            self.update_query(**{k: v for k, v in serialized.items() if v is not None})
        finally:
            self._syncing = False

    @property
    def query_params(self) -> Dict[str, Any]:
        """
        The query parameters parsed from the search string.
        """
        return parse_query(self.search)

    def update_query(self, **kwargs: Any) -> None:
        """
        Merges the supplied keyword arguments into the current query
        parameters and updates the search string accordingly.
        """
        query = self.query_params
        query.update(kwargs)
        self.search = '?' + urlparse.urlencode(query)

    def sync(
        self, parameterized: param.Parameterized, parameters: Optional[List[str] | Dict[str, str]] = None,
        on_error: Optional[Callable[[Dict[str, Any]], None]] = None
    ) -> None:
        """
        Syncs the parameters of a Parameterized object with the query
        parameters in the URL. If no parameters are supplied all
        parameters except the name are synced.

        Arguments
        ---------
        parameterized (param.Parameterized):
          The Parameterized object to sync query parameters with
        parameters (list or dict):
          A list or dictionary specifying parameters to sync.
          If a dictionary is supplied it should define a mapping from
          the Parameterized's parameters to the names of the query
          parameters.
        on_error: (callable):
          Callback when syncing Parameterized with URL parameters
          fails. The callback is passed a dictionary of parameter
          values, which could not be applied.
        """
        parameters = parameters or [p for p in parameterized.param if p != 'name']
        if not isinstance(parameters, dict):
            parameters = dict(zip(parameters, parameters))
        watcher = parameterized.param.watch(self._update_query, list(parameters))
        self._synced.append((parameterized, parameters, watcher, on_error))
        # First apply the values in the URL to the object ...
        self._update_synced()
        # ... then write the (non-None) parameter values to the URL
        query = {}
        for p, name in parameters.items():
            v = getattr(parameterized, p)
            if v is None:
                continue
            try:
                v = parameterized.param[p].serialize(v)
            except Exception:
                # Fall back to JSON encoding of the raw value below
                pass
            if not isinstance(v, str):
                v = json.dumps(v)
            query[name] = v
        self._update_query(query=query)

    def unsync(self, parameterized: param.Parameterized, parameters: Optional[List[str]] = None) -> None:
        """
        Unsyncs the parameters of the Parameterized with the query
        params in the URL. If no parameters are supplied all
        parameters except the name are unsynced.

        Arguments
        ---------
        parameterized (param.Parameterized):
          The Parameterized object to unsync query parameters with
        parameters (list):
          A list of parameters to unsync.

        Raises
        ------
        ValueError
          If the Parameterized object is not currently synced.
        """
        if not any(entry[0] is parameterized for entry in self._synced):
            ptype = type(parameterized)
            raise ValueError(f"Cannot unsync {ptype} object since it "
                             "was never synced in the first place.")
        synced, unsynced = [], []
        for p, params, watcher, on_error in self._synced:
            if parameterized is not p:
                synced.append((p, params, watcher, on_error))
                continue
            parameterized.param.unwatch(watcher)
            if parameters is None:
                unsynced.extend(params.values())
                continue
            unsynced.extend(q for pname, q in params.items() if pname in parameters)
            new_params = {pname: q for pname, q in params.items()
                          if pname not in parameters}
            if not new_params:
                # Nothing left to sync: drop the entry instead of
                # keeping a reference to the object and a watcher
                # on an empty list of parameters.
                continue
            new_watcher = parameterized.param.watch(watcher.fn, list(new_params))
            synced.append((p, new_params, new_watcher, on_error))
        self._synced = synced
        # Remove the query parameters which are no longer synced
        query = {k: v for k, v in self.query_params.items() if k not in unsynced}
        self.search = '?' + urlparse.urlencode(query) if query else ''