Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
7f72bfb
feat(writer): Add PositionDeleteFileWriter for writing position delet…
claude Nov 19, 2025
a0e3426
refactor(writer): Move position delete field IDs to tests and add ref…
claude Nov 19, 2025
9084506
feat(transaction): Add AppendDeleteFilesAction for committing delete …
claude Nov 19, 2025
e5170ae
test(s3tables): Add end-to-end integration test for delete functionality
claude Nov 19, 2025
224b2d8
feat(writer): Implement referenced_data_file optimization for positio…
claude Nov 19, 2025
8b6e5dd
fix(writer): Fix critical bugs in referenced_data_file optimization
claude Nov 19, 2025
6fdc6f0
test(writer): Add critical tests for referenced_data_file edge cases
claude Nov 19, 2025
fc75ec2
fix(delete): Implement referenced_data_file matching for position del…
claude Nov 19, 2025
1acb1c5
feat: implement vectorized deletions with Puffin deletion vectors
claude Nov 19, 2025
28517de
feat: add deletion vector writer API and end-to-end integration tests
claude Nov 19, 2025
12dd32b
docs: add comprehensive deletion vectors analysis and S3 Tables integ…
claude Nov 19, 2025
5a8d846
test: add comprehensive deletion vector test suite
claude Nov 19, 2025
a5271db
Merge position delete writer foundation
EmilLindfors Nov 20, 2025
985c11b
Merge referenced_data_file optimization
EmilLindfors Nov 20, 2025
73fb117
Merge referenced_data_file matching fixes
EmilLindfors Nov 20, 2025
b2ca948
Merge Puffin deletion vectors
EmilLindfors Nov 20, 2025
e8b80ac
feat: Add comprehensive deletion support with RowDeltaAction
EmilLindfors Nov 22, 2025
e1a0679
fix: Correct doc test examples for RowDeltaAction and AppendDeleteFil…
EmilLindfors Nov 22, 2025
954439c
style: Run cargo fmt to fix formatting issues
EmilLindfors Nov 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/catalog/s3tables/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ iceberg = { workspace = true }


[dev-dependencies]
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
itertools = { workspace = true }
parquet = { workspace = true }
tokio = { workspace = true }
180 changes: 179 additions & 1 deletion crates/catalog/s3tables/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ where T: std::fmt::Debug {

#[cfg(test)]
mod tests {
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::spec::{ManifestContentType, NestedField, PrimitiveType, Schema, Type};
use iceberg::transaction::{ApplyTransactionAction, Transaction};

use super::*;
Expand Down Expand Up @@ -868,6 +868,184 @@ mod tests {
);
}

#[tokio::test]
/// End-to-end integration test for position-delete support against S3 Tables.
///
/// Flow exercised:
/// 1. create namespace + table,
/// 2. fast-append one (synthetic) data file,
/// 3. write a position-delete Parquet file via `PositionDeleteFileWriterBuilder`,
/// 4. commit it with the `append_delete_files` transaction action,
/// 5. verify the resulting snapshot carries a delete manifest with the entry.
///
/// NOTE(review): this test is skipped silently when S3 Tables credentials are
/// not present in the environment (`load_s3tables_catalog_from_env` → `Ok(None)`).
async fn test_s3tables_append_delete_files() {
    use std::sync::Arc;

    use iceberg::arrow::arrow_schema_to_schema;
    use iceberg::spec::{DataContentType, DataFileBuilder, DataFileFormat, Struct};
    use iceberg::transaction::ApplyTransactionAction;
    use iceberg::writer::base_writer::position_delete_writer::PositionDeleteFileWriterBuilder;
    use iceberg::writer::file_writer::ParquetWriterBuilder;
    use iceberg::writer::file_writer::location_generator::{
        DefaultFileNameGenerator, DefaultLocationGenerator,
    };
    use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
    use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};

    // Skip (not fail) when the S3 Tables environment is not configured;
    // panic only on a real configuration/load error.
    let catalog = match load_s3tables_catalog_from_env().await {
        Ok(Some(catalog)) => catalog,
        Ok(None) => return,
        Err(e) => panic!("Error loading catalog: {e}"),
    };

    // Create a test namespace and table
    let namespace = NamespaceIdent::new("test_s3tables_deletes".to_string());
    let table_ident = TableIdent::new(namespace.clone(), "test_s3tables_deletes".to_string());

    // Clean up any existing resources left behind by a previous failed run;
    // errors are ignored because the resources may simply not exist yet.
    catalog.drop_table(&table_ident).await.ok();
    catalog.drop_namespace(&namespace).await.ok();

    // Create namespace and table with simple schema
    catalog
        .create_namespace(&namespace, HashMap::new())
        .await
        .unwrap();

    let creation = {
        // Minimal two-column schema: required int `id` + required string `data`.
        let schema = Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(2, "data", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();
        TableCreation::builder()
            .name(table_ident.name().to_string())
            .properties(HashMap::new())
            .schema(schema)
            .build()
    };

    let table = catalog.create_table(&namespace, creation).await.unwrap();

    // Step 1: Append a data file to the table.
    // NOTE(review): the file path is synthetic — no actual Parquet object is
    // written at this location; only the metadata entry is exercised here.
    let data_file = DataFileBuilder::default()
        .content(DataContentType::Data)
        .file_path("s3://test-bucket/data/file1.parquet".to_string())
        .file_format(DataFileFormat::Parquet)
        .file_size_in_bytes(1024)
        .record_count(100)
        .partition(Struct::empty())
        .partition_spec_id(table.metadata().default_partition_spec_id())
        .build()
        .unwrap();

    let tx = Transaction::new(&table);
    let tx = tx
        .fast_append()
        .add_data_files(vec![data_file.clone()])
        .apply(tx)
        .unwrap();
    let table = tx.commit(&catalog).await.unwrap();

    // Step 2: Create position delete file using writer.
    // The Arrow schema mirrors the Iceberg position-delete layout:
    // field IDs 2147483546 (`file_path`) and 2147483545 (`pos`) are the
    // reserved IDs from the Iceberg spec, carried via PARQUET:field_id metadata.
    let pos_delete_schema = {
        use arrow_schema::{DataType, Field, Schema as ArrowSchema};

        Arc::new(ArrowSchema::new(vec![
            Field::new("file_path", DataType::Utf8, false).with_metadata(HashMap::from([(
                "PARQUET:field_id".to_string(),
                "2147483546".to_string(),
            )])),
            Field::new("pos", DataType::Int64, false).with_metadata(HashMap::from([(
                "PARQUET:field_id".to_string(),
                "2147483545".to_string(),
            )])),
        ]))
    };

    let iceberg_schema = Arc::new(arrow_schema_to_schema(&pos_delete_schema).unwrap());

    // Write position delete file: writer stack is
    // Parquet writer -> rolling writer (default rollover size) -> position-delete writer.
    let location_gen = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
    let file_name_gen =
        DefaultFileNameGenerator::new("pos-delete".to_string(), None, DataFileFormat::Parquet);

    let parquet_writer_builder = ParquetWriterBuilder::new(
        parquet::file::properties::WriterProperties::builder().build(),
        iceberg_schema,
    );
    let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
        parquet_writer_builder,
        table.file_io().clone(),
        location_gen,
        file_name_gen,
    );
    // `build(None)` — presumably no partition value for an unpartitioned table;
    // TODO confirm the meaning of the `None` argument against the builder API.
    let mut pos_delete_writer = PositionDeleteFileWriterBuilder::new(rolling_writer_builder)
        .build(None)
        .await
        .unwrap();

    // Create position delete batch (delete rows at positions 5, 10, 15),
    // all referencing the single data file appended in Step 1.
    let delete_batch = {
        use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};

        let file_paths = StringArray::from(vec![
            data_file.file_path(),
            data_file.file_path(),
            data_file.file_path(),
        ]);
        let positions = Int64Array::from(vec![5, 10, 15]);
        RecordBatch::try_new(pos_delete_schema.clone(), vec![
            Arc::new(file_paths) as ArrayRef,
            Arc::new(positions) as ArrayRef,
        ])
        .unwrap()
    };

    pos_delete_writer.write(delete_batch).await.unwrap();
    let delete_files = pos_delete_writer.close().await.unwrap();

    // One small batch must not roll over: exactly one delete file expected.
    assert_eq!(delete_files.len(), 1);
    assert_eq!(
        delete_files[0].content_type(),
        DataContentType::PositionDeletes
    );
    assert_eq!(delete_files[0].record_count(), 3);

    // Step 3: Append delete files using transaction
    let tx = Transaction::new(&table);
    let tx = tx
        .append_delete_files()
        .add_files(delete_files.clone())
        .apply(tx)
        .unwrap();
    let table = tx.commit(&catalog).await.unwrap();

    // Step 4: Verify the delete files are in the table metadata
    let snapshot = table.metadata().current_snapshot().unwrap();
    let manifest_list = snapshot
        .load_manifest_list(table.file_io(), table.metadata())
        .await
        .unwrap();

    // Find the manifest with delete files: it must both carry added files and
    // be marked with deletes content (distinguishing it from the data manifest
    // created in Step 1).
    let delete_manifest = manifest_list
        .entries()
        .iter()
        .find(|entry| entry.has_added_files() && entry.content == ManifestContentType::Deletes)
        .expect("Should have a delete manifest");

    let manifest = delete_manifest
        .load_manifest(table.file_io())
        .await
        .unwrap();

    // The delete manifest should contain exactly the one position-delete file
    // with the three deleted positions written above.
    assert_eq!(manifest.entries().len(), 1);
    assert_eq!(
        manifest.entries()[0].data_file().content_type(),
        DataContentType::PositionDeletes
    );
    assert_eq!(manifest.entries()[0].data_file().record_count(), 3);

    // Clean up; errors ignored so a verification failure above still surfaces
    // rather than being masked by teardown.
    catalog.drop_table(&table_ident).await.ok();
    catalog.drop_namespace(&namespace).await.ok();
}

#[tokio::test]
async fn test_builder_load_missing_bucket_arn() {
let builder = S3TablesCatalogBuilder::default();
Expand Down
2 changes: 2 additions & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ async-trait = { workspace = true }
backon = { workspace = true }
base64 = { workspace = true }
bimap = { workspace = true }
byteorder = "1.5"
bytes = { workspace = true }
chrono = { workspace = true }
crc32fast = "1.4"
derive_builder = { workspace = true }
expect-test = { workspace = true }
flate2 = { workspace = true }
Expand Down
Loading
Loading