fix: show progress even in job optional queries #2119
* feat: Render more BigQuery events in progress bar

This change updates bigframes/formatting_helpers.py to render more event types from bigframes/core/events.py. Specifically, it adds rendering support for:

- BigQueryRetryEvent
- BigQueryReceivedEvent
- BigQueryFinishedEvent
- BigQueryUnknownEvent

This provides users with more detailed feedback during query execution in both notebook (HTML) and terminal (plaintext) environments. Unit tests have been added to verify the rendering of each new event type.

---------

Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>
class ExecutionStarted(Event):
    pass
Should we have an execution_id or similar so we can correlate all the events tied to a single request?
That could help if we start doing async / background query execution. I don't think it's needed right now, though.
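For illustration only, correlating events through a hypothetical `execution_id` field could look like the minimal sketch below. The event classes are simplified stand-ins, not the definitions in `bigframes/core/events.py`; `execution_id`, `ExecutionFinished`, and `group_by_execution` are assumptions taken from the review question, not part of this PR.

```python
import collections
import dataclasses
import uuid


@dataclasses.dataclass(frozen=True)
class Event:
    # Hypothetical: every event carries the ID of the execution that emitted it.
    execution_id: uuid.UUID


@dataclasses.dataclass(frozen=True)
class ExecutionStarted(Event):
    pass


@dataclasses.dataclass(frozen=True)
class ExecutionFinished(Event):  # hypothetical counterpart event
    pass


def group_by_execution(events):
    """Bucket events by execution_id, preserving arrival order in each bucket."""
    groups = collections.defaultdict(list)
    for event in events:
        groups[event.execution_id].append(event)
    return dict(groups)


first, second = uuid.uuid4(), uuid.uuid4()
# Interleaved events, as might happen with background query execution.
stream = [
    ExecutionStarted(first),
    ExecutionStarted(second),
    ExecutionFinished(second),
    ExecutionFinished(first),
]
grouped = group_by_execution(stream)
```

Without such an ID, a subscriber that sees interleaved events has no reliable way to tell which execution each one belongs to. That is why the field matters mainly once queries can run concurrently.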
bigframes/core/events.py
@dataclasses.dataclass(frozen=True)
class Subscriber:
    callback_ref: weakref.ref
I think it's more intuitive to just keep subscribers alive? What is the scenario we are imagining? I could imagine this ref being deleted even when the subscriber is still alive, because they created an ephemeral function that went out of scope even though the target of that function is still around.
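The concern can be reproduced with the standard library alone. `TableWidget` below is a hypothetical stand-in, not the bigframes class. A plain `weakref.ref` to a bound method or lambda dies immediately, while `weakref.WeakMethod` follows the lifetime of the underlying object.

```python
import gc
import weakref


class TableWidget:
    """Hypothetical subscriber object; not the real bigframes widget."""

    def __init__(self):
        self.events = []

    def on_event(self, event):
        self.events.append(event)


widget = TableWidget()

# Each attribute access creates a new, temporary bound-method object.
# Nothing else references it, so a plain weakref to it dies right away
# even though `widget` itself is still alive.
bound_ref = weakref.ref(widget.on_event)
gc.collect()
bound_method_dead = bound_ref() is None

# A lambda wrapping the widget has the same problem: only the weakref
# points at the function object, so it is collected immediately.
lambda_ref = weakref.ref(lambda event: widget.on_event(event))
gc.collect()
lambda_dead = lambda_ref() is None

# weakref.WeakMethod tracks the bound method's instance and function
# separately, so it stays valid exactly as long as `widget` does.
method_ref = weakref.WeakMethod(widget.on_event)
alive_while_widget_exists = method_ref() is not None
del widget
gc.collect()
dead_after_widget_deleted = method_ref() is None
```

Even `WeakMethod` only helps for bound methods. Closures and `functools.partial` objects still need a strong reference somewhere, which is the unintuitive behavior raised above.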
> What is the scenario we are imagining?
The anywidget table widgets. When the user re-runs the cell, the TableWidget we created is no longer needed and should go out of scope, but as far as I know, we don't get an opportunity to unsubscribe at that point.
I really do think the weakref here will have quite unintuitive results. There are other options; the main requirement is that we unsubscribe before the callback becomes invalid (most crucially, because it points at resources that no longer exist). The subscriber itself is best placed to time this, and that timing may not correlate with Python object cleanup.
Hmm, maybe the anywidget TableWidget could create short-lived subscribers around each call to to_pandas_batches()? I can give it a try.
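A minimal sketch of the short-lived subscriber idea, assuming a simplified `Publisher` (a stand-in, not the bigframes class) and a hypothetical `subscribed` context manager. The subscription lasts exactly as long as the `with` block, so the subscriber controls the timing rather than the garbage collector.

```python
import contextlib
import threading
from typing import Callable, List


class Publisher:
    """Simplified stand-in for the internal Publisher discussed in this PR."""

    def __init__(self):
        self._subscribers: List[Callable[[object], None]] = []
        self._lock = threading.Lock()

    def subscribe(self, callback):
        with self._lock:
            self._subscribers.append(callback)

    def unsubscribe(self, callback):
        with self._lock:
            self._subscribers.remove(callback)

    def publish(self, event):
        with self._lock:
            snapshot = list(self._subscribers)
        for callback in snapshot:
            callback(event)


@contextlib.contextmanager
def subscribed(publisher, callback):
    """Keep `callback` subscribed only for the duration of the `with` block."""
    publisher.subscribe(callback)
    try:
        yield
    finally:
        # Runs even if the wrapped call raises, so no stale callback remains.
        publisher.unsubscribe(callback)


publisher = Publisher()
received = []
with subscribed(publisher, received.append):
    # In the widget, this is where a call such as to_pandas_batches() would go.
    publisher.publish("query started")
publisher.publish("published after the block")  # nobody is listening now
```

One caveat: `to_pandas_batches()` returns an iterator, so in practice the `with` block would need to span the iteration that actually triggers the queries, not just the call.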
bigframes/core/events.py
callback(event)


publisher = Publisher()
Do we absolutely need global state? And if we do need it, can it at least be passed into sessions as a field?
I suspect we don't have a need for it right now, as we've moved the cell-level visualization to bigframes anywidget mode. I can try to refactor.
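A sketch of the "pass it into sessions as a field" option. It assumes a simplified `Publisher` and a hypothetical `Session.run_query`. The later diff hunks do use `self._publisher` on the session, but the constructor shown here is illustrative. Because each session owns its own publisher and lock, this layout also avoids cross-session lock contention.

```python
import threading
from typing import Callable, List, Optional


class Publisher:
    """Simplified stand-in; each instance has its own subscriber list and lock."""

    def __init__(self):
        self._subscribers: List[Callable[[object], None]] = []
        self._lock = threading.Lock()

    def subscribe(self, callback):
        with self._lock:
            self._subscribers.append(callback)

    def publish(self, event):
        with self._lock:
            snapshot = list(self._subscribers)
        for callback in snapshot:
            callback(event)


class Session:
    """Hypothetical session that owns its publisher instead of using a global."""

    def __init__(self, publisher: Optional[Publisher] = None):
        # Callers (or tests) may inject a publisher; otherwise each session
        # gets a private one, so sessions never share subscribers or locks.
        self._publisher = publisher if publisher is not None else Publisher()

    def run_query(self, sql: str):  # hypothetical method for illustration
        self._publisher.publish(f"started: {sql}")


session_a, session_b = Session(), Session()
seen_a, seen_b = [], []
session_a._publisher.subscribe(seen_a.append)
session_b._publisher.subscribe(seen_b.append)
session_a.run_query("SELECT 1")
```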
bigframes/core/events.py
        self._subscribers: List[Subscriber] = []
        self._subscribers_lock = threading.Lock()

    def subscribe(self, callback):
Should subscribers specify what they are subscribing to in terms of event type (enum or class)? Otherwise, new event types will change the behavior of existing subscribers.
I don't see a need for this right now. The only purpose so far is progress bars, which should receive everything.
Yeah, I guess as long as this is internal-use only, we can tack on features like this later, and refactor callers as needed.
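If per-type subscriptions are added later, one possible shape is an optional `event_types` filter that defaults to "everything", so existing progress-bar subscribers keep their current behavior. This is a sketch only: `event_types`, `Subscription`, and `wants` are hypothetical names, and the event classes are bare stand-ins for the ones in `bigframes/core/events.py`.

```python
import dataclasses
from typing import Callable, List, Tuple, Type


class Event:
    pass


class ExecutionStarted(Event):
    pass


class BigQueryRetryEvent(Event):
    pass


@dataclasses.dataclass(frozen=True)
class Subscription:
    callback: Callable[[Event], None]
    # Empty tuple means "receive everything", matching today's behavior.
    event_types: Tuple[Type[Event], ...] = ()

    def wants(self, event: Event) -> bool:
        return not self.event_types or isinstance(event, self.event_types)


class Publisher:
    def __init__(self):
        self._subscriptions: List[Subscription] = []

    def subscribe(self, callback, event_types=()):
        subscription = Subscription(callback, tuple(event_types))
        self._subscriptions.append(subscription)
        return subscription

    def publish(self, event: Event):
        for subscription in list(self._subscriptions):
            if subscription.wants(event):
                subscription.callback(event)


publisher = Publisher()
everything, retries_only = [], []
publisher.subscribe(everything.append)  # e.g. a progress bar
publisher.subscribe(retries_only.append, event_types=[BigQueryRetryEvent])
publisher.publish(ExecutionStarted())
publisher.publish(BigQueryRetryEvent())
```

Filtering by class with `isinstance` also matches subclasses, which may or may not be what a subscriber wants. An enum-based filter would avoid that ambiguity at the cost of keeping the enum in sync with the classes.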
class Publisher:
    def __init__(self):
        self._subscribers: List[Subscriber] = []
        self._subscribers_lock = threading.Lock()
Single lock on a global could cause unexpected cross-session contention.
e2e failure:
self._datset_ref = bf_io_bigquery.create_bq_dataset_reference(
    self.bqclient,
    location=self._location,
    publisher=self._publisher,
Why publish here but not in, e.g., create_temp_table? How do we determine which types of jobs we publish events for?
The real answer is that create_temp_table isn't calling start_query_with_client, so it didn't show up in my refactor. As discussed in chat, it also breaks some of the assumptions about a query being associated with an execution.
In the interest of transparency, we probably should be notifying the user when we run such queries, though. I'll do a search for client.query / client.query_and_wait and see what I find.
        array_value: bigframes.core.ArrayValue,
        execution_spec: ex_spec.ExecutionSpec,
    ) -> executor.ExecuteResult:
        self._publisher.publish(bigframes.core.events.ExecutionStarted())
No action now, but maybe we just end up doing a decorator/wrapper for the execute method if we want to generalize across executor types?
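A minimal sketch of the decorator idea, assuming each executor exposes a `_publisher` attribute as in the hunk above. `publishes_execution_events`, `ExecutionFinished`, `RecordingPublisher`, and `ToyExecutor` are hypothetical names for illustration; only `ExecutionStarted` appears in the PR.

```python
import functools
from typing import List


class ExecutionStarted:
    pass


class ExecutionFinished:  # hypothetical counterpart event
    pass


class RecordingPublisher:
    """Test double that just records published events."""

    def __init__(self):
        self.events: List[object] = []

    def publish(self, event):
        self.events.append(event)


def publishes_execution_events(execute):
    """Wrap an executor's execute method with start/finish events."""

    @functools.wraps(execute)
    def wrapper(self, *args, **kwargs):
        self._publisher.publish(ExecutionStarted())
        try:
            return execute(self, *args, **kwargs)
        finally:
            # Published on success and on error alike.
            self._publisher.publish(ExecutionFinished())

    return wrapper


class ToyExecutor:
    def __init__(self, publisher):
        self._publisher = publisher

    @publishes_execution_events
    def execute(self, array_value, execution_spec=None):
        return f"result for {array_value}"


publisher = RecordingPublisher()
result = ToyExecutor(publisher).execute("t1")
```

The wrapper keeps event publishing out of each executor's body, so a new executor type only needs the decorator and a `_publisher` attribute.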
bigframes/core/events.py
        return subscriber

    def unsubscribe(self, subscriber: Subscriber):
        self._subscribers.remove(subscriber)
Might cause errors if done concurrently with iteration?
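Even without threads, removing from a list while a `for` loop walks it does not raise. Instead it silently skips the next element. A concurrent `unsubscribe` from another thread can cause the same effect if the publisher iterates `self._subscribers` directly. The strings below are illustrative stand-ins for `Subscriber` objects. A common remedy, sketched here, is to take the lock in `unsubscribe` and have `publish` iterate over a copy.

```python
# Unsubscribing while the publisher is looping over the same list.
subscribers = ["progress_bar", "table_widget", "logger"]
notified = []
for subscriber in subscribers:
    notified.append(subscriber)
    if subscriber == "progress_bar":
        subscribers.remove("progress_bar")  # e.g. close() called from a callback
# No exception is raised, but "table_widget" was silently skipped.
skipped_result = notified

# Iterating over a snapshot keeps the loop stable while the list changes.
subscribers = ["progress_bar", "table_widget", "logger"]
notified = []
for subscriber in list(subscribers):
    notified.append(subscriber)
    if subscriber == "progress_bar":
        subscribers.remove("progress_bar")
snapshot_result = notified
```

Copying under the lock and invoking callbacks outside it also means a slow or re-entrant callback cannot block, or deadlock, other subscribe/unsubscribe calls.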
bigframes/core/events.py
    def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher):
        self._publisher = publisher
        self._callback = callback
        self._subscriber_id = str(uuid.uuid4())
Do we even need to convert to a string?
Looks like we don't. A uuid.UUID is already hashable without converting it to a string.
In [1]: import uuid
In [2]: type(uuid.uuid4())
Out[2]: uuid.UUID
In [3]: hash(uuid.uuid4())
Out[3]: 695624583744666396
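A sketch of a subscriber keyed directly on a `uuid.UUID`. The class is a hypothetical simplification of the one in the diff. One detail worth keeping in mind: a class that defines `__eq__` without `__hash__` gets `__hash__ = None` and becomes unhashable, so if subscribers are ever stored in a set or dict, `__hash__` should be defined alongside `__eq__`.

```python
import uuid


class Subscriber:
    """Hypothetical sketch: identity based on a uuid.UUID, no str() needed."""

    def __init__(self, callback):
        self._callback = callback
        self._subscriber_id = uuid.uuid4()  # UUID objects are hashable and comparable

    def __eq__(self, value):
        if not isinstance(value, Subscriber):
            return NotImplemented
        return value._subscriber_id == self._subscriber_id

    def __hash__(self):
        # Must be consistent with __eq__: equal IDs give equal hashes.
        return hash(self._subscriber_id)


a = Subscriber(print)
b = Subscriber(print)
registry = {a, b}
# Round-tripping through a string gives an equal UUID, so str() buys nothing here.
same_id = uuid.UUID(str(a._subscriber_id)) == a._subscriber_id
```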
        return value._subscriber_id == self._subscriber_id

    def close(self):
        self._publisher.unsubscribe(self)
Another alternative to explicitly removing it from the subscriber list in a blocking way is to just flag oneself as closed, and the publisher can then remove it at its convenience. I think this approach works fine, though.
I'm wary of that because it would mean a circular reference would hang around until the next time an event is published, but I suppose that'd be OK.
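For comparison, the flag-and-prune alternative might look like the sketch below, with simplified, hypothetical `Subscriber` and `Publisher` classes rather than the ones in the diff. It shows the trade-off mentioned above. After `close()`, the subscriber (and whatever its callback references) stays in the publisher's list until the next `publish` call prunes it.

```python
import threading
from typing import Callable, List


class Subscriber:
    def __init__(self, callback: Callable[[object], None]):
        self.callback = callback
        self.closed = False

    def close(self):
        # Non-blocking: just set a flag; the publisher prunes later.
        self.closed = True


class Publisher:
    def __init__(self):
        self._subscribers: List[Subscriber] = []
        self._lock = threading.Lock()

    def subscribe(self, callback) -> Subscriber:
        subscriber = Subscriber(callback)
        with self._lock:
            self._subscribers.append(subscriber)
        return subscriber

    def publish(self, event):
        with self._lock:
            # Closed subscribers (and the references they hold) are only
            # dropped here, i.e. on the next publish after close().
            self._subscribers = [s for s in self._subscribers if not s.closed]
            snapshot = list(self._subscribers)
        for subscriber in snapshot:
            if not subscriber.closed:
                subscriber.callback(event)


publisher = Publisher()
seen = []
sub = publisher.subscribe(seen.append)
publisher.publish("first")
sub.close()
still_listed_after_close = sub in publisher._subscribers
publisher.publish("second")
pruned_after_publish = sub not in publisher._subscribers
```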
Fixes b/409390651
Towards b/409104302
🦕