Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 64 additions & 6 deletions datafusion-cli/src/object_storage/instrumented.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ impl InstrumentedObjectStore {
req.drain(..).collect()
}

/// Reports whether request instrumentation is currently switched on, i.e. the
/// stored mode is anything other than `InstrumentedObjectStoreMode::Disabled`.
fn enabled(&self) -> bool {
    let disabled = InstrumentedObjectStoreMode::Disabled as u8;
    let current = self.instrument_mode.load(Ordering::Relaxed);
    current != disabled
}

async fn instrumented_get_opts(
&self,
location: &Path,
Expand All @@ -138,6 +143,26 @@ impl InstrumentedObjectStore {

Ok(ret)
}

/// Lists objects under `prefix` via the wrapped store and records a `RequestDetails`
/// entry (operation, path and timestamp) for the call.
///
/// A `None` prefix is recorded as an empty `Path`. The stream produced by the inner
/// store is returned unchanged; the entry is pushed when this method is called, not
/// when the stream is polled.
fn instrumented_list(
&self,
prefix: Option<&Path>,
) -> BoxStream<'static, Result<ObjectMeta>> {
// Captured before the inner `list` call is issued
let timestamp = Utc::now();
let ret = self.inner.list(prefix);

self.requests.lock().push(RequestDetails {
op: Operation::List,
path: prefix.cloned().unwrap_or_else(|| Path::from("")),
timestamp,
duration: None, // list returns a stream, so the duration isn't meaningful
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Time to first response is meaningful 🤔 Something we can improve in the future if needed

Copy link
Contributor Author

@BlakeOrth BlakeOrth Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, not being able to easily evaluate a meaningful duration from this is a pretty big bummer honestly. I think time to first response is probably the ideal measurement to take here. I briefly looked into what it would take to make that happen within this instrumented store and I think it ends up being quite complex. I'm pretty sure we'd have to write a custom future to wrap the elements within the stream since the duration is only meaningful once elements in the stream start reporting Poll::Ready. Hopefully there's an easier way, because that sounds pretty painful.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

size: None,
range: None,
extra_display: None,
});

ret
}
}

impl fmt::Display for InstrumentedObjectStore {
Expand Down Expand Up @@ -172,9 +197,7 @@ impl ObjectStore for InstrumentedObjectStore {
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
if self.instrument_mode.load(Ordering::Relaxed)
!= InstrumentedObjectStoreMode::Disabled as u8
{
if self.enabled() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

return self.instrumented_get_opts(location, options).await;
}

Expand All @@ -186,6 +209,10 @@ impl ObjectStore for InstrumentedObjectStore {
}

/// Lists objects under `prefix`. When instrumentation is enabled the call is routed
/// through `instrumented_list`; otherwise it is delegated directly to the inner store.
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
if self.enabled() {
return self.instrumented_list(prefix);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you may need to instrument the other versions of list below (list_with_delimiter, etc) as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, list_with_delimiter is "coming soon to a PR near you" just to help keep the size of the PRs easily reviewable.

self.inner.list(prefix)
}

Expand Down Expand Up @@ -213,7 +240,7 @@ pub enum Operation {
_Delete,
Get,
_Head,
_List,
List,
_Put,
}

Expand Down Expand Up @@ -477,8 +504,9 @@ mod tests {
assert_eq!(reg.stores().len(), 1);
}

#[tokio::test]
async fn instrumented_store() {
// Returns an `InstrumentedObjectStore` with some data loaded for testing and the path to
// access the data
async fn setup_test_store() -> (InstrumentedObjectStore, Path) {
let store = Arc::new(object_store::memory::InMemory::new());
let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
let instrumented = InstrumentedObjectStore::new(store, mode);
Expand All @@ -488,6 +516,13 @@ mod tests {
let payload = PutPayload::from_static(b"test_data");
instrumented.put(&path, payload).await.unwrap();

(instrumented, path)
}

#[tokio::test]
async fn instrumented_store_get() {
let (instrumented, path) = setup_test_store().await;

// By default no requests should be instrumented/stored
assert!(instrumented.requests.lock().is_empty());
let _ = instrumented.get(&path).await.unwrap();
Expand All @@ -511,6 +546,29 @@ mod tests {
assert!(request.extra_display.is_none());
}

#[tokio::test]
async fn instrumented_store_list() {
    let (store, location) = setup_test_store().await;

    // In the default mode a list call must leave the request log untouched
    assert!(store.requests.lock().is_empty());
    let _ = store.list(Some(&location));
    assert!(store.requests.lock().is_empty());

    // Switching to trace mode records nothing by itself; the next list call
    // adds exactly one entry
    store.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
    assert!(store.requests.lock().is_empty());
    let _ = store.list(Some(&location));
    assert_eq!(store.requests.lock().len(), 1);

    // The recorded entry describes a list of the requested path and carries
    // no duration, size, range or extra display data
    let mut recorded = store.take_requests();
    let details = recorded.pop().unwrap();
    assert_eq!(details.op, Operation::List);
    assert_eq!(details.path, location);
    assert!(details.duration.is_none());
    assert!(details.size.is_none());
    assert!(details.range.is_none());
    assert!(details.extra_display.is_none());
}

#[test]
fn request_details() {
let rd = RequestDetails {
Expand Down