diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs index 4c9ffb6951..481c4cf1d8 100644 --- a/crates/iceberg/src/delete_file_index.rs +++ b/crates/iceberg/src/delete_file_index.rs @@ -24,6 +24,7 @@ use std::task::{Context, Poll}; use futures::channel::mpsc::{channel, Sender}; use futures::StreamExt; +use tokio::sync::Notify; use crate::runtime::spawn; use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile}; @@ -38,7 +39,7 @@ pub(crate) struct DeleteFileIndex { #[derive(Debug)] enum DeleteFileIndexState { - Populating, + Populating(Arc), Populated(PopulatedDeleteFileIndex), } @@ -59,7 +60,10 @@ impl DeleteFileIndex { pub(crate) fn new() -> (DeleteFileIndex, Sender) { // TODO: what should the channel limit be? let (tx, rx) = channel(10); - let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating)); + let notify = Arc::new(Notify::new()); + let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating( + notify.clone(), + ))); let delete_file_stream = rx.boxed(); spawn({ @@ -69,8 +73,11 @@ impl DeleteFileIndex { let populated_delete_file_index = PopulatedDeleteFileIndex::new(delete_files); - let mut guard = state.write().unwrap(); - *guard = DeleteFileIndexState::Populated(populated_delete_file_index); + { + let mut guard = state.write().unwrap(); + *guard = DeleteFileIndexState::Populated(populated_delete_file_index); + } + notify.notify_waiters() } }); @@ -80,15 +87,29 @@ impl DeleteFileIndex { /// Gets all the delete files that apply to the specified data file. /// /// Returns a future that resolves to a Result> - pub(crate) fn get_deletes_for_data_file<'a>( + pub(crate) async fn get_deletes_for_data_file<'a>( &self, data_file: &'a DataFile, seq_num: Option, - ) -> DeletesForDataFile<'a> { - DeletesForDataFile { - state: self.state.clone(), - data_file, - seq_num, + ) -> Vec { + let notifier = { + let guard = self.state.read().unwrap(); + match *guard { + DeleteFileIndexState::Populating(ref notifier) => notifier.clone(), + DeleteFileIndexState::Populated(ref index) => { + return index.get_deletes_for_data_file(data_file, seq_num); + } + } + }; + + notifier.notified().await; + + let guard = self.state.read().unwrap(); + match guard.deref() { + DeleteFileIndexState::Populated(index) => { + index.get_deletes_for_data_file(data_file, seq_num) + } + _ => unreachable!("Cannot be any other state than loaded"), } } } @@ -193,26 +214,3 @@ impl PopulatedDeleteFileIndex { results } } - -/// Future for the `DeleteFileIndex::get_deletes_for_data_file` method -pub(crate) struct DeletesForDataFile<'a> { - state: Arc>, - data_file: &'a DataFile, - seq_num: Option, -} - -impl Future for DeletesForDataFile<'_> { - type Output = Result>; - - fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { - match self.state.try_read() { - Ok(guard) => match guard.deref() { - DeleteFileIndexState::Populated(idx) => Poll::Ready(Ok( - idx.get_deletes_for_data_file(self.data_file, self.seq_num) - )), - _ => Poll::Pending, - }, - Err(err) => Poll::Ready(Err(Error::new(ErrorKind::Unexpected, err.to_string()))), - } - } -} diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 6bfb12b23a..9870f5225c 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -111,7 +111,7 @@ impl ManifestEntryContext { self.manifest_entry.data_file(), self.manifest_entry.sequence_number(), ) - .await? + .await } else { vec![] };