diff --git a/datafusion-cli/src/object_storage/instrumented.rs b/datafusion-cli/src/object_storage/instrumented.rs index cb96734f2464..8acece315f76 100644 --- a/datafusion-cli/src/object_storage/instrumented.rs +++ b/datafusion-cli/src/object_storage/instrumented.rs @@ -114,6 +114,11 @@ impl InstrumentedObjectStore { req.drain(..).collect() } + fn enabled(&self) -> bool { + self.instrument_mode.load(Ordering::Relaxed) + != InstrumentedObjectStoreMode::Disabled as u8 + } + async fn instrumented_get_opts( &self, location: &Path, @@ -138,6 +143,26 @@ impl InstrumentedObjectStore { Ok(ret) } + + fn instrumented_list( + &self, + prefix: Option<&Path>, + ) -> BoxStream<'static, Result> { + let timestamp = Utc::now(); + let ret = self.inner.list(prefix); + + self.requests.lock().push(RequestDetails { + op: Operation::List, + path: prefix.cloned().unwrap_or_else(|| Path::from("")), + timestamp, + duration: None, // list returns a stream, so the duration isn't meaningful + size: None, + range: None, + extra_display: None, + }); + + ret + } } impl fmt::Display for InstrumentedObjectStore { @@ -172,9 +197,7 @@ impl ObjectStore for InstrumentedObjectStore { } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { - if self.instrument_mode.load(Ordering::Relaxed) - != InstrumentedObjectStoreMode::Disabled as u8 - { + if self.enabled() { return self.instrumented_get_opts(location, options).await; } @@ -186,6 +209,10 @@ impl ObjectStore for InstrumentedObjectStore { } fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { + if self.enabled() { + return self.instrumented_list(prefix); + } + self.inner.list(prefix) } @@ -213,7 +240,7 @@ pub enum Operation { _Delete, Get, _Head, - _List, + List, _Put, } @@ -477,8 +504,9 @@ mod tests { assert_eq!(reg.stores().len(), 1); } - #[tokio::test] - async fn instrumented_store() { + // Returns an `InstrumentedObjectStore` with some data loaded for testing and the path to + // access the data + async fn setup_test_store() -> (InstrumentedObjectStore, Path) { let store = Arc::new(object_store::memory::InMemory::new()); let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8); let instrumented = InstrumentedObjectStore::new(store, mode); @@ -488,6 +516,13 @@ mod tests { let payload = PutPayload::from_static(b"test_data"); instrumented.put(&path, payload).await.unwrap(); + (instrumented, path) + } + + #[tokio::test] + async fn instrumented_store_get() { + let (instrumented, path) = setup_test_store().await; + // By default no requests should be instrumented/stored assert!(instrumented.requests.lock().is_empty()); let _ = instrumented.get(&path).await.unwrap(); @@ -511,6 +546,29 @@ mod tests { assert!(request.extra_display.is_none()); } + #[tokio::test] + async fn instrumented_store_list() { + let (instrumented, path) = setup_test_store().await; + + // By default no requests should be instrumented/stored + assert!(instrumented.requests.lock().is_empty()); + let _ = instrumented.list(Some(&path)); + assert!(instrumented.requests.lock().is_empty()); + + instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace); + assert!(instrumented.requests.lock().is_empty()); + let _ = instrumented.list(Some(&path)); + assert_eq!(instrumented.requests.lock().len(), 1); + + let request = instrumented.take_requests().pop().unwrap(); + assert_eq!(request.op, Operation::List); + assert_eq!(request.path, path); + assert!(request.duration.is_none()); + assert!(request.size.is_none()); + assert!(request.range.is_none()); + assert!(request.extra_display.is_none()); + } + #[test] fn request_details() { let rd = RequestDetails {