Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure pending writes are dispatched in order and only from correct thread #6443

Merged
merged 4 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 35 additions & 25 deletions panel/io/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,9 @@ def init_doc(doc: Optional[Document]) -> Document:
if not curdoc.session_context:
return curdoc

thread = threading.current_thread()
if thread:
state._thread_id_[curdoc] = thread.ident
thread_id = threading.get_ident()
if thread_id:
state._thread_id_[curdoc] = thread_id

if config.global_loading_spinner:
curdoc.js_on_event(
Expand Down Expand Up @@ -289,7 +289,16 @@ def unlocked() -> Iterator:
curdoc = state.curdoc
session_context = getattr(curdoc, 'session_context', None)
session = getattr(session_context, 'session', None)
if curdoc is None or session_context is None or session is None or state._jupyter_kernel_context:
if state._current_thread != state._thread_id and state.loaded:
logger.error(
"Using the unlocked decorator when running inside a thread "
"is not safe! Ensure you check that pn.state._current_thread "
"matches the current thread id."
)
yield
return
elif (curdoc is None or session_context is None or session is None or
not state.loaded or state._jupyter_kernel_context):
yield
return
elif curdoc.callbacks.hold_value:
Expand All @@ -312,6 +321,7 @@ def unlocked() -> Iterator:
break

events = curdoc.callbacks._held_events
curdoc.callbacks._held_events = []
monkeypatch_events(events)
remaining_events, dispatch_events = [], []
for event in events:
Expand All @@ -334,27 +344,27 @@ def unlocked() -> Iterator:
_dispatch_write_task(curdoc, _run_write_futures, futures)
else:
curdoc.add_next_tick_callback(partial(_run_write_futures, futures))
except Exception:
remaining_events = events

curdoc.callbacks._held_events = remaining_events
finally:
try:
curdoc.unhold()
except RuntimeError:
if not remaining_events:
return
leftover_events = [e for e in remaining_events if not isinstance(e, Serializable)]
remaining_events = [e for e in remaining_events if isinstance(e, Serializable)]
# Create messages for remaining events
msgs = {}
for conn in connections:
if not remaining_events:
continue
# Create a protocol message for any events that cannot be immediately dispatched
msgs[conn] = conn.protocol.create('PATCH-DOC', remaining_events)
_dispatch_write_task(curdoc, _dispatch_msgs, curdoc, msgs)
curdoc.hold()
curdoc.callbacks._held_events = leftover_events
curdoc.unhold()
if not remaining_events:
curdoc.unhold()
return

# Separate serializable and non-serializable events
leftover_events = [e for e in remaining_events if not isinstance(e, Serializable)]
remaining_events = [e for e in remaining_events if isinstance(e, Serializable)]

# Create messages for remaining events
msgs = {}
for conn in connections:
if not remaining_events:
continue
# Create a protocol message for any events that cannot be immediately dispatched
msgs[conn] = conn.protocol.create('PATCH-DOC', remaining_events)
_dispatch_write_task(curdoc, _dispatch_msgs, curdoc, msgs)
curdoc.callbacks._held_events = leftover_events
curdoc.unhold()

@contextmanager
def immediate_dispatch(doc: Document | None = None):
Expand All @@ -371,7 +381,7 @@ def immediate_dispatch(doc: Document | None = None):
doc = doc or state.curdoc

# Skip if not in a server context
if not doc or not doc._session_context:
if not doc or not doc._session_context or not state._unblocked(doc):
yield
return

Expand Down
4 changes: 1 addition & 3 deletions panel/io/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,7 @@ def _extensions(self):

@property
def _current_thread(self) -> str | None:
thread = threading.current_thread()
thread_id = thread.ident if thread else None
return thread_id
return threading.get_ident()

@property
def _is_launching(self) -> bool:
Expand Down
Loading