Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
292 changes: 283 additions & 9 deletions crates/iceberg/src/delete_file_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ enum DeleteFileIndexState {
#[derive(Debug)]
struct PopulatedDeleteFileIndex {
#[allow(dead_code)]
global_deletes: Vec<Arc<DeleteFileContext>>,
global_equality_deletes: Vec<Arc<DeleteFileContext>>,
eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
// TODO: do we need this?
Expand All @@ -65,7 +65,8 @@ impl DeleteFileIndex {
spawn({
let state = state.clone();
async move {
let delete_files = delete_file_stream.collect::<Vec<_>>().await;
let delete_files: Vec<DeleteFileContext> =
delete_file_stream.collect::<Vec<_>>().await;

let populated_delete_file_index = PopulatedDeleteFileIndex::new(delete_files);

Expand Down Expand Up @@ -114,15 +115,15 @@ impl PopulatedDeleteFileIndex {
///
/// 1. The partition information is extracted from each delete file's manifest entry.
/// 2. If the partition is empty and the delete file is not a positional delete,
/// it is added to the `global_deletes` vector
/// it is added to the `global_equality_deletes` vector
/// 3. Otherwise, the delete file is added to one of two hash maps based on its content type.
fn new(files: Vec<DeleteFileContext>) -> PopulatedDeleteFileIndex {
let mut eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
HashMap::default();
let mut pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
HashMap::default();

let mut global_deletes: Vec<Arc<DeleteFileContext>> = vec![];
let mut global_equality_deletes: Vec<Arc<DeleteFileContext>> = vec![];

files.into_iter().for_each(|ctx| {
let arc_ctx = Arc::new(ctx);
Expand All @@ -133,7 +134,7 @@ impl PopulatedDeleteFileIndex {
if partition.fields().is_empty() {
// TODO: confirm we're good to skip here if we encounter a pos del
if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes {
global_deletes.push(arc_ctx);
global_equality_deletes.push(arc_ctx);
return;
}
}
Expand All @@ -153,7 +154,7 @@ impl PopulatedDeleteFileIndex {
});

PopulatedDeleteFileIndex {
global_deletes,
global_equality_deletes,
eq_deletes_by_partition,
pos_deletes_by_partition,
}
Expand All @@ -167,12 +168,12 @@ impl PopulatedDeleteFileIndex {
) -> Vec<FileScanTaskDeleteFile> {
let mut results = vec![];

self.global_deletes
self.global_equality_deletes
.iter()
// filter that returns true if the provided delete file's sequence number is **greater than or equal to** `seq_num`
// filter that returns true if the provided delete file's sequence number is **greater than** `seq_num`
Copy link
Contributor Author

@amogh-jahagirdar amogh-jahagirdar Oct 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could just be missing something in the implementation, but right now seems like we're over applying in a way that would technically be incorrect. If a snapshot has some added data files and an equality delete, the equality deletes should only be applied to older seq numbers. We do this correctly for partition scoped eq deletes below, but for some reason the global equality delete matching is different; the seq number criteria for eq deletes applies regardless of scope of the delete.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed this when I was working on some optimizations for equality delete matching (eliminating eq deletes to apply based on bounds)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this. I think "greater than" is correct here. The global_deletes variable is used for gathering all the eq delete files for an unpartitioned table, but the same logic should still apply.

Looking at past commits, i did find this was previously discussed here

Also from https://iceberg.apache.org/spec/#scan-planning
Screenshot 2025-10-18 at 5 57 03 PM
This implies that we should not apply eq delete files that have equivalent sequence number

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, thanks for finding that github thread, looks like the confusion was what global_deletes meant, but really it's global equality deletes. Regardless any equality delete must be applied for files strictly older. I also fixed some of the partition scoped delete matching logic because it's technically possible to have tuples match but the spec IDs don't match; currently we only go off the tuple which isn't quite correct for matching as well.

.filter(|&delete| {
seq_num
.map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num))
.map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
.unwrap_or_else(|| true)
})
.for_each(|delete| results.push(delete.as_ref().into()));
Expand All @@ -185,6 +186,7 @@ impl PopulatedDeleteFileIndex {
seq_num
.map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
.unwrap_or_else(|| true)
&& data_file.partition_spec_id == delete.partition_spec_id
})
.for_each(|delete| results.push(delete.as_ref().into()));
}
Expand All @@ -201,10 +203,282 @@ impl PopulatedDeleteFileIndex {
seq_num
.map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num))
.unwrap_or_else(|| true)
&& data_file.partition_spec_id == delete.partition_spec_id
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Image

👍
partition value match is handled by self.pos_deletes_by_partition.get(data_file.partition())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly. We could also define a key which is tupled off the <spec ID, partition value> pair in the map (this is what the java implementation does).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets see if we can refactor this in a follow up PR and keep the changes here straightforward :)

})
.for_each(|delete| results.push(delete.as_ref().into()));
}

results
}
}

#[cfg(test)]
mod tests {
use uuid::Uuid;

use super::*;
use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, Literal, ManifestEntry, ManifestStatus,
Struct,
};

#[test]
fn test_delete_file_index_unpartitioned() {
let deletes: Vec<ManifestEntry> = vec![
build_added_manifest_entry(4, &build_unpartitioned_eq_delete()),
build_added_manifest_entry(6, &build_unpartitioned_eq_delete()),
build_added_manifest_entry(5, &build_unpartitioned_pos_delete()),
build_added_manifest_entry(6, &build_unpartitioned_pos_delete()),
];

let delete_file_paths: Vec<String> = deletes
.iter()
.map(|file| file.file_path().to_string())
.collect();

let delete_contexts: Vec<DeleteFileContext> = deletes
.into_iter()
.map(|entry| DeleteFileContext {
manifest_entry: entry.into(),
partition_spec_id: 0,
})
.collect();

let delete_file_index = PopulatedDeleteFileIndex::new(delete_contexts);

let data_file = build_unpartitioned_data_file();

// All deletes apply to sequence 0
let delete_files_to_apply_for_seq_0 =
delete_file_index.get_deletes_for_data_file(&data_file, Some(0));
assert_eq!(delete_files_to_apply_for_seq_0.len(), 4);

// All deletes apply to sequence 3
let delete_files_to_apply_for_seq_3 =
delete_file_index.get_deletes_for_data_file(&data_file, Some(3));
assert_eq!(delete_files_to_apply_for_seq_3.len(), 4);

// Last 3 deletes apply to sequence 4
let delete_files_to_apply_for_seq_4 =
delete_file_index.get_deletes_for_data_file(&data_file, Some(4));
let actual_paths_to_apply_for_seq_4: Vec<String> = delete_files_to_apply_for_seq_4
.into_iter()
.map(|file| file.file_path)
.collect();

assert_eq!(
actual_paths_to_apply_for_seq_4,
delete_file_paths[delete_file_paths.len() - 3..]
);

// Last 3 deletes apply to sequence 5
let delete_files_to_apply_for_seq_5 =
delete_file_index.get_deletes_for_data_file(&data_file, Some(5));
let actual_paths_to_apply_for_seq_5: Vec<String> = delete_files_to_apply_for_seq_5
.into_iter()
.map(|file| file.file_path)
.collect();
assert_eq!(
actual_paths_to_apply_for_seq_5,
delete_file_paths[delete_file_paths.len() - 3..]
);

// Only the last position delete applies to sequence 6
let delete_files_to_apply_for_seq_6 =
delete_file_index.get_deletes_for_data_file(&data_file, Some(6));
let actual_paths_to_apply_for_seq_6: Vec<String> = delete_files_to_apply_for_seq_6
.into_iter()
.map(|file| file.file_path)
.collect();
assert_eq!(
actual_paths_to_apply_for_seq_6,
delete_file_paths[delete_file_paths.len() - 1..]
);

// The 2 global equality deletes should match against any partitioned file
let partitioned_file =
build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(100))]), 1);

let delete_files_to_apply_for_partitioned_file =
delete_file_index.get_deletes_for_data_file(&partitioned_file, Some(0));
let actual_paths_to_apply_for_partitioned_file: Vec<String> =
delete_files_to_apply_for_partitioned_file
.into_iter()
.map(|file| file.file_path)
.collect();
assert_eq!(
actual_paths_to_apply_for_partitioned_file,
delete_file_paths[..2]
);
}

#[test]
fn test_delete_file_index_partitioned() {
let partition_one = Struct::from_iter([Some(Literal::long(100))]);
let spec_id = 1;
let deletes: Vec<ManifestEntry> = vec![
build_added_manifest_entry(4, &build_partitioned_eq_delete(&partition_one, spec_id)),
build_added_manifest_entry(6, &build_partitioned_eq_delete(&partition_one, spec_id)),
build_added_manifest_entry(5, &build_partitioned_pos_delete(&partition_one, spec_id)),
build_added_manifest_entry(6, &build_partitioned_pos_delete(&partition_one, spec_id)),
];

let delete_file_paths: Vec<String> = deletes
.iter()
.map(|file| file.file_path().to_string())
.collect();

let delete_contexts: Vec<DeleteFileContext> = deletes
.into_iter()
.map(|entry| DeleteFileContext {
manifest_entry: entry.into(),
partition_spec_id: spec_id,
})
.collect();

let delete_file_index = PopulatedDeleteFileIndex::new(delete_contexts);

let partitioned_file =
build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(100))]), spec_id);

// All deletes apply to sequence 0
let delete_files_to_apply_for_seq_0 =
delete_file_index.get_deletes_for_data_file(&partitioned_file, Some(0));
assert_eq!(delete_files_to_apply_for_seq_0.len(), 4);

// All deletes apply to sequence 3
let delete_files_to_apply_for_seq_3 =
delete_file_index.get_deletes_for_data_file(&partitioned_file, Some(3));
assert_eq!(delete_files_to_apply_for_seq_3.len(), 4);

// Last 3 deletes apply to sequence 4
let delete_files_to_apply_for_seq_4 =
delete_file_index.get_deletes_for_data_file(&partitioned_file, Some(4));
let actual_paths_to_apply_for_seq_4: Vec<String> = delete_files_to_apply_for_seq_4
.into_iter()
.map(|file| file.file_path)
.collect();

assert_eq!(
actual_paths_to_apply_for_seq_4,
delete_file_paths[delete_file_paths.len() - 3..]
);

// Last 3 deletes apply to sequence 5
let delete_files_to_apply_for_seq_5 =
delete_file_index.get_deletes_for_data_file(&partitioned_file, Some(5));
let actual_paths_to_apply_for_seq_5: Vec<String> = delete_files_to_apply_for_seq_5
.into_iter()
.map(|file| file.file_path)
.collect();
assert_eq!(
actual_paths_to_apply_for_seq_5,
delete_file_paths[delete_file_paths.len() - 3..]
);

// Only the last position delete applies to sequence 6
let delete_files_to_apply_for_seq_6 =
delete_file_index.get_deletes_for_data_file(&partitioned_file, Some(6));
let actual_paths_to_apply_for_seq_6: Vec<String> = delete_files_to_apply_for_seq_6
.into_iter()
.map(|file| file.file_path)
.collect();
assert_eq!(
actual_paths_to_apply_for_seq_6,
delete_file_paths[delete_file_paths.len() - 1..]
);

// Data file with different partition tuples does not match any delete files
let partitioned_second_file =
build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(200))]), 1);
let delete_files_to_apply_for_different_partition =
delete_file_index.get_deletes_for_data_file(&partitioned_second_file, Some(0));
let actual_paths_to_apply_for_different_partition: Vec<String> =
delete_files_to_apply_for_different_partition
.into_iter()
.map(|file| file.file_path)
.collect();
assert!(actual_paths_to_apply_for_different_partition.is_empty());

// Data file with same tuple but different spec ID does not match any delete files
let partitioned_different_spec = build_partitioned_data_file(&partition_one, 2);
let delete_files_to_apply_for_different_spec =
delete_file_index.get_deletes_for_data_file(&partitioned_different_spec, Some(0));
let actual_paths_to_apply_for_different_spec: Vec<String> =
delete_files_to_apply_for_different_spec
.into_iter()
.map(|file| file.file_path)
.collect();
assert!(actual_paths_to_apply_for_different_spec.is_empty());
}

fn build_unpartitioned_eq_delete() -> DataFile {
build_partitioned_eq_delete(&Struct::empty(), 0)
}

fn build_partitioned_eq_delete(partition: &Struct, spec_id: i32) -> DataFile {
DataFileBuilder::default()
.file_path(format!("{}_equality_delete.parquet", Uuid::new_v4()))
.file_format(DataFileFormat::Parquet)
.content(DataContentType::EqualityDeletes)
.equality_ids(Some(vec![1]))
.record_count(1)
.partition(partition.clone())
.partition_spec_id(spec_id)
.file_size_in_bytes(100)
.build()
.unwrap()
}

fn build_unpartitioned_pos_delete() -> DataFile {
build_partitioned_pos_delete(&Struct::empty(), 0)
}

fn build_partitioned_pos_delete(partition: &Struct, spec_id: i32) -> DataFile {
DataFileBuilder::default()
.file_path(format!("{}-pos-delete.parquet", Uuid::new_v4()))
.file_format(DataFileFormat::Parquet)
.content(DataContentType::PositionDeletes)
.record_count(1)
.referenced_data_file(Some("/some-data-file.parquet".to_string()))
.partition(partition.clone())
.partition_spec_id(spec_id)
.file_size_in_bytes(100)
.build()
.unwrap()
}

fn build_unpartitioned_data_file() -> DataFile {
DataFileBuilder::default()
.file_path(format!("{}-data.parquet", Uuid::new_v4()))
.file_format(DataFileFormat::Parquet)
.content(DataContentType::Data)
.record_count(100)
.partition(Struct::empty())
.partition_spec_id(0)
.file_size_in_bytes(100)
.build()
.unwrap()
}

fn build_partitioned_data_file(partition_value: &Struct, spec_id: i32) -> DataFile {
DataFileBuilder::default()
.file_path(format!("{}-data.parquet", Uuid::new_v4()))
.file_format(DataFileFormat::Parquet)
.content(DataContentType::Data)
.record_count(100)
.partition(partition_value.clone())
.partition_spec_id(spec_id)
.file_size_in_bytes(100)
.build()
.unwrap()
}

fn build_added_manifest_entry(data_seq_number: i64, file: &DataFile) -> ManifestEntry {
ManifestEntry::builder()
.status(ManifestStatus::Added)
.sequence_number(data_seq_number)
.data_file(file.clone())
.build()
}
}