-
Notifications
You must be signed in to change notification settings - Fork 1.8k
[datafusion-cli] Implement average LIST duration for object store profiling #19127
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
[datafusion-cli] Implement average LIST duration for object store profiling #19127
Conversation
…mentedObjectStore.
|
FYI @BlakeOrth -- are you available to review this PR? |
|
@alamb Yes, I will allocate time for a review. |
BlakeOrth
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for taking this on! Overall I think the strategy of wrapping the stream seems sound.
I do have one small concern about how the timing is being tracked; I've left an inline comment with more details.
| if !self.first_item_yielded && poll_result.is_ready() { | ||
| self.first_item_yielded = true; | ||
| let elapsed = self.start.elapsed(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm somewhat concerned this elapsed calculation could end up generating misleading results in some cases. The concern stems from the fact that self.start is set when the stream is created. While this will probably be pretty accurate in many scenarios, imagine a scenario where something like the following happens:
let list_stream = TimeToFirstItemStream::new(stream, Instant::now(), 0, requests);
// The stream is created here, but has never been polled because the user has yet
// to await the stream. However, the "timer" is already running.
some_long_running_method().await;
let item = list_stream.next().await.unwrap();In this case the elapsed duration would effectively be measuring the time of both some_long_running_method() as well as the time it took to yield the first element on the stream.
I'm wondering if we can set self.start once on the first call to poll_next(...) and then set elapsed on the first time an element hits Poll::Ready (as you've already done here) to get more accurate results.
| let mut requests = self.requests.lock(); | ||
| if let Some(request) = requests.get_mut(self.request_index) { | ||
| request.duration = Some(elapsed); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on the current implementation I believe this strategy is currently "safe" (e.g. we won't accidentally modify the duration of a different request, leading to errant data). However, it does rely on the assumption that self.requests never has items removed from the middle of the Vec.
It might be useful to find a place to leave a comment noting that requests should be append-only to make it less likely for this assumption to be broken in the future.
Which issue does this PR close?
Rationale for this change
The
listoperation returns a stream, so it previously recordedduration: None, missing performance insights. Time-to-first-item is a useful metric for list operations, indicating how quickly results start. This adds duration tracking by measuring time until the first item is yielded (or the stream ends).What changes are included in this PR?
TimeToFirstItemStream: A stream wrapper that measures elapsed time from creation until the first item is yielded (or the stream ends if empty).instrumented_list: Wraps the inner stream withTimeToFirstItemStreamto record duration.requestsfield: Switched fromMutex<Vec<RequestDetails>>toArc<Mutex<Vec<RequestDetails>>>to allow sharing across async boundaries (needed for the stream wrapper).instrumented_store_listto consume at least one stream item and verify thatdurationis nowSome(Duration)instead ofNone.Are these changes tested?
Yes. The existing test
instrumented_store_listwas updated to:stream.next().awaitrequest.duration.is_some()(previouslyis_none())All tests pass, including the updated list test and other instrumented operation tests.
Are there any user-facing changes?
Users with profiling enabled will see duration values for
listoperations instead of nothing.