Labels: enhancement (New feature or request)
Description
Is your feature request related to a problem or challenge?
Support for a DeltaWriter in the style of the one in the Java implementation.
Describe the solution you'd like
A DeltaWriter implementation with underlying data and delete writers, routing each row to the appropriate one. See the proposed API:
//! Delta writers handle row-level changes by combining data file and delete file writers.
//!
//! The delta writer has three sub-writers:
//! - A data file writer for new and updated rows.
//! - A position delete file writer for deleting rows that were previously written by this same writer.
//! - An equality delete file writer for deleting rows by equality condition (rows that may exist in other data files).
//!
//! # Input Data Format
//!
//! The `DeltaWriter` expects input data as Arrow `RecordBatch` with a specific structure:
//!
//! **Required Schema:**
//! - All data columns from your table schema (in order)
//! - A final column containing operation indicators as `Int32Array`:
//!   - [`OP_INSERT`] (`1`) = Insert/Update (write to data file)
//!   - [`OP_DELETE`] (`-1`) = Delete (write to delete file)
//!
//! **Example Schema:**
//! ```text
//! ┌─────────────┬──────────────┬──────────────┬──────┐
//! │ id (Int32)  │ name (Utf8)  │ value (Int32)│ _ops │
//! ├─────────────┼──────────────┼──────────────┼──────┤
//! │ 1           │ "Alice"      │ 100          │  1   │ <- Insert
//! │ 2           │ "Bob"        │ 200          │  1   │ <- Insert
//! │ 1           │ "Alice"      │ 150          │ -1   │ <- Delete
//! │ 3           │ "Charlie"    │ 300          │  1   │ <- Insert
//! └─────────────┴──────────────┴──────────────┴──────┘
//! ```
//!
//! # Unique Columns (Row Identity)
//!
//! The writer uses `unique_cols` (specified as Iceberg field IDs) to uniquely identify rows.
//! These columns form a composite key used for:
//! - Tracking rows written in this session (for position deletes)
//! - Generating equality delete predicates (for rows outside this session)
//!
//! Typically, this would be your table's primary key columns.
//!
//! # Memory Management
//!
//! The writer tracks recently written rows to enable efficient position deletes.
//! The `max_seen_rows` parameter controls this behavior:
//!
//! - **Default (100,000)**: Track up to 100K recently written rows
//!   - Deletes for tracked rows → position deletes (most efficient)
//!   - Deletes for older/evicted rows → equality deletes
//!
//! - **Custom value**: Adjust based on your workload
//!   - Higher = more position deletes, more memory usage
//!   - Lower = more equality deletes, less memory usage
//!
//! - **Zero (0)**: Disable row tracking completely
//!   - All deletes → equality deletes
//!   - No memory overhead, but slower reads
//!
//! # How It Works
//!
//! When you call `write()` with a batch:
//!
//! 1. The batch is partitioned by the operations column
//! 2. For each partition:
//!    - **Insert batches** (`ops = OP_INSERT`):
//!      - Written to the data file writer
//!      - Row positions recorded in memory (up to `max_seen_rows`)
//!    - **Delete batches** (`ops = OP_DELETE`):
//!      - If the row exists in the tracked positions → position delete file
//!      - If the row is unknown or evicted → equality delete file
//!
//! 3. On `close()`, all three writers are closed and their data files are returned
//!
//! # Example Usage
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! use arrow_array::{Int32Array, RecordBatch, StringArray};
//! use iceberg::writer::DeltaWriterBuilder;
//!
//! // Build a delta writer with unique columns [field_id: 1] (the "id" column)
//! let delta_writer = DeltaWriterBuilder::new(
//!     data_writer_builder,
//!     pos_delete_writer_builder,
//!     eq_delete_writer_builder,
//!     vec![1], // field IDs of unique columns
//! )
//! .with_max_seen_rows(50_000) // Track 50K rows
//! .build(None)
//! .await?;
//!
//! // Create a batch with inserts and deletes
//! let batch = RecordBatch::try_new(
//!     schema.clone(),
//!     vec![
//!         Arc::new(Int32Array::from(vec![1, 2, 1])), // id
//!         Arc::new(StringArray::from(vec!["Alice", "Bob", "Alice"])), // name
//!         Arc::new(Int32Array::from(vec![100, 200, -100])), // value
//!         Arc::new(Int32Array::from(vec![OP_INSERT, OP_INSERT, OP_DELETE])), // ops
//!     ],
//! )?;
//!
//! delta_writer.write(batch).await?;
//! let data_files = delta_writer.close().await?;
//! ```
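To make step 1 of "How It Works" concrete, here is a minimal std-only sketch of partitioning a batch by the trailing ops value. It deliberately avoids Arrow: a row is just a `Vec<i32>` paired with its op code, and `partition_by_ops` is an illustrative name, not part of the proposed API.

```rust
// Op codes as described in the proposal.
const OP_INSERT: i32 = 1;
const OP_DELETE: i32 = -1;

/// Split rows into insert and delete partitions by their op indicator.
/// (Illustrative sketch; the real writer would operate on Arrow arrays.)
fn partition_by_ops(rows: Vec<(Vec<i32>, i32)>) -> (Vec<Vec<i32>>, Vec<Vec<i32>>) {
    let mut inserts = Vec::new();
    let mut deletes = Vec::new();
    for (row, op) in rows {
        match op {
            OP_INSERT => inserts.push(row), // goes to the data file writer
            OP_DELETE => deletes.push(row), // goes to a delete file writer
            other => panic!("unknown op: {other}"),
        }
    }
    (inserts, deletes)
}
```

The real implementation would likely use Arrow's `filter` kernels over the `Int32Array` ops column rather than row-at-a-time dispatch, but the control flow is the same.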
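The `max_seen_rows` bookkeeping and the position-vs-equality decision could look roughly like the std-only sketch below. All names (`SeenRows`, `DeleteKind`, `classify_delete`) are illustrative assumptions, not the proposed API; the composite key stands in for the `unique_cols` values.

```rust
use std::collections::{HashMap, VecDeque};

/// How a delete for a given key should be written.
enum DeleteKind {
    /// Row was written by this writer and is still tracked: position delete.
    Position { file_seq: usize, row_pos: usize },
    /// Row is unknown or was evicted: fall back to an equality delete.
    Equality,
}

/// Bounded FIFO tracking of recently written rows, keyed by unique-column values.
struct SeenRows {
    max_seen_rows: usize,
    by_key: HashMap<Vec<i32>, (usize, usize)>, // key -> (file_seq, row_pos)
    order: VecDeque<Vec<i32>>,                 // eviction order (oldest first)
}

impl SeenRows {
    fn new(max_seen_rows: usize) -> Self {
        Self { max_seen_rows, by_key: HashMap::new(), order: VecDeque::new() }
    }

    /// Record an inserted row; evict the oldest entry once at capacity.
    fn record(&mut self, key: Vec<i32>, file_seq: usize, row_pos: usize) {
        if self.max_seen_rows == 0 {
            return; // tracking disabled: every delete becomes an equality delete
        }
        if self.order.len() >= self.max_seen_rows {
            if let Some(oldest) = self.order.pop_front() {
                self.by_key.remove(&oldest);
            }
        }
        self.by_key.insert(key.clone(), (file_seq, row_pos));
        self.order.push_back(key);
    }

    /// Decide how a delete for `key` should be written.
    fn classify_delete(&mut self, key: &[i32]) -> DeleteKind {
        match self.by_key.remove(key) {
            Some((file_seq, row_pos)) => DeleteKind::Position { file_seq, row_pos },
            None => DeleteKind::Equality,
        }
    }
}
```

This makes the memory trade-off visible: a larger `max_seen_rows` keeps more keys resident and yields more position deletes, while `0` short-circuits straight to equality deletes.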
Willingness to contribute
I can contribute to this feature independently