From 7d6bc03cbd9900025836187970b1c6492483b38b Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Fri, 17 Oct 2025 11:44:55 -0700
Subject: [PATCH 1/2] Add doc for partitioning writers

---
 crates/iceberg/src/writer/mod.rs              | 141 +++++++++++++++++-
 crates/iceberg/src/writer/partitioning/mod.rs |   6 +
 2 files changed, 145 insertions(+), 2 deletions(-)

diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs
index 1da3fa6790..90483c3276 100644
--- a/crates/iceberg/src/writer/mod.rs
+++ b/crates/iceberg/src/writer/mod.rs
@@ -242,11 +242,148 @@
 //! Ok(())
 //! }
 //! ```
+//!
+//! # Adding Partitioning to Data File Writers
+//!
+//! You can wrap a `DataFileWriter` with partitioning writers to handle partitioned tables.
+//! Iceberg provides two partitioning strategies:
+//!
+//! ## FanoutWriter - For Unsorted Data
+//!
+//! Wraps the data file writer to handle unsorted data by maintaining multiple active writers.
+//! Use this when your data is not pre-sorted by partition key.
+//!
+//! ```rust, no_run
+//! # // Same setup as the simple example above...
+//! # use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
+//! # use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
+//! # use iceberg::{Catalog, CatalogBuilder, Result, TableIdent};
+//! # use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+//! # use iceberg::writer::file_writer::ParquetWriterBuilder;
+//! # use iceberg::writer::file_writer::location_generator::{
+//! #     DefaultFileNameGenerator, DefaultLocationGenerator,
+//! # };
+//! # use parquet::file::properties::WriterProperties;
+//! # use std::collections::HashMap;
+//! # #[tokio::main]
+//! # async fn main() -> Result<()> {
+//! # let catalog = MemoryCatalogBuilder::default()
+//! #     .load("memory", HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), "file:///path/to/warehouse".to_string())]))
+//! #     .await?;
+//! # let table = catalog.load_table(&TableIdent::from_strs(["hello", "world"])?).await?;
+//! # let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
+//! # let file_name_generator = DefaultFileNameGenerator::new("test".to_string(), None, iceberg::spec::DataFileFormat::Parquet);
+//! # let parquet_writer_builder = ParquetWriterBuilder::new(WriterProperties::default(), table.metadata().current_schema().clone());
+//! # let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
+//! #     parquet_writer_builder, table.file_io().clone(), location_generator, file_name_generator);
+//! # let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
+//!
+//! // Wrap the data file writer with FanoutWriter for partitioning
+//! use iceberg::writer::partitioning::fanout_writer::FanoutWriter;
+//! use iceberg::writer::partitioning::PartitioningWriter;
+//! use iceberg::spec::{Literal, PartitionKey, Struct};
+//!
+//! let mut fanout_writer = FanoutWriter::new(data_file_writer_builder);
+//!
+//! // Create partition keys for different regions
+//! let schema = table.metadata().current_schema().clone();
+//! let partition_spec = table.metadata().default_partition_spec().as_ref().clone();
+//!
+//! let partition_key_us = PartitionKey::new(
+//!     partition_spec.clone(),
+//!     schema.clone(),
+//!     Struct::from_iter([Some(Literal::string("US"))]),
+//! );
+//!
+//! let partition_key_eu = PartitionKey::new(
+//!     partition_spec.clone(),
+//!     schema.clone(),
+//!     Struct::from_iter([Some(Literal::string("EU"))]),
+//! );
+//!
+//! // Write to different partitions in any order (unsorted)
+//! // fanout_writer.write(partition_key_us, batch_us).await?;
+//! // fanout_writer.write(partition_key_eu, batch_eu).await?;
+//! // fanout_writer.write(partition_key_us, batch_us2).await?; // Can go back to US
+//!
+//! let data_files = fanout_writer.close().await?;
+//! # Ok(())
+//! # }
+//! ```
+//!
+//! ## ClusteredWriter - For Sorted Data
+//!
+//! Wraps the data file writer for pre-sorted data. It is more memory-efficient, as it maintains
+//! only one active writer at a time, but it requires input sorted by partition key.
+//!
+//! ```rust, no_run
+//! # // Same setup as the simple example above...
+//! # use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
+//! # use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
+//! # use iceberg::{Catalog, CatalogBuilder, Result, TableIdent};
+//! # use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+//! # use iceberg::writer::file_writer::ParquetWriterBuilder;
+//! # use iceberg::writer::file_writer::location_generator::{
+//! #     DefaultFileNameGenerator, DefaultLocationGenerator,
+//! # };
+//! # use parquet::file::properties::WriterProperties;
+//! # use std::collections::HashMap;
+//! # #[tokio::main]
+//! # async fn main() -> Result<()> {
+//! # let catalog = MemoryCatalogBuilder::default()
+//! #     .load("memory", HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), "file:///path/to/warehouse".to_string())]))
+//! #     .await?;
+//! # let table = catalog.load_table(&TableIdent::from_strs(["hello", "world"])?).await?;
+//! # let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
+//! # let file_name_generator = DefaultFileNameGenerator::new("test".to_string(), None, iceberg::spec::DataFileFormat::Parquet);
+//! # let parquet_writer_builder = ParquetWriterBuilder::new(WriterProperties::default(), table.metadata().current_schema().clone());
+//! # let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
+//! #     parquet_writer_builder, table.file_io().clone(), location_generator, file_name_generator);
+//! # let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
+//!
+//! // Wrap the data file writer with ClusteredWriter for sorted partitioning
+//! use iceberg::writer::partitioning::clustered_writer::ClusteredWriter;
+//! use iceberg::writer::partitioning::PartitioningWriter;
+//! use iceberg::spec::{Literal, PartitionKey, Struct};
+//!
+//! let mut clustered_writer = ClusteredWriter::new(data_file_writer_builder);
+//!
+//! // Create partition keys (must write in sorted order)
+//! let schema = table.metadata().current_schema().clone();
+//! let partition_spec = table.metadata().default_partition_spec().as_ref().clone();
+//!
+//! let partition_key_asia = PartitionKey::new(
+//!     partition_spec.clone(),
+//!     schema.clone(),
+//!     Struct::from_iter([Some(Literal::string("ASIA"))]),
+//! );
+//!
+//! let partition_key_eu = PartitionKey::new(
+//!     partition_spec.clone(),
+//!     schema.clone(),
+//!     Struct::from_iter([Some(Literal::string("EU"))]),
+//! );
+//!
+//! let partition_key_us = PartitionKey::new(
+//!     partition_spec.clone(),
+//!     schema.clone(),
+//!     Struct::from_iter([Some(Literal::string("US"))]),
+//! );
+//!
+//! // Write to partitions in sorted order (ASIA -> EU -> US)
+//! // clustered_writer.write(partition_key_asia, batch_asia).await?;
+//! // clustered_writer.write(partition_key_eu, batch_eu).await?;
+//! // clustered_writer.write(partition_key_us, batch_us).await?;
+//! // Writing back to ASIA would fail since data must be sorted!
+//!
+//! let data_files = clustered_writer.close().await?;
+//!
+//! # Ok(())
+//! # }
+//! ```
 
 pub mod base_writer;
 pub mod file_writer;
-/// Provides partition-aware writers
-/// TODO examples
 pub mod partitioning;
 
 use arrow_array::RecordBatch;
diff --git a/crates/iceberg/src/writer/partitioning/mod.rs b/crates/iceberg/src/writer/partitioning/mod.rs
index 36acb2a987..f63a9d0d26 100644
--- a/crates/iceberg/src/writer/partitioning/mod.rs
+++ b/crates/iceberg/src/writer/partitioning/mod.rs
@@ -15,6 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+//! Partitioning writers for handling partitioned Iceberg tables.
+//!
+//! This module provides two strategies for writing to partitioned tables:
+//! - [`FanoutWriter`](fanout_writer::FanoutWriter): Handles unsorted data by maintaining multiple active writers
+//! - [`ClusteredWriter`](clustered_writer::ClusteredWriter): Optimized for pre-sorted data with a single active writer
+
 pub mod clustered_writer;
 pub mod fanout_writer;

From 6739cb3d586da980c7913ffddc3266ddc0857176 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Fri, 17 Oct 2025 12:00:01 -0700
Subject: [PATCH 2/2] minor

---
 crates/iceberg/src/writer/mod.rs | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs
index 90483c3276..a7892d49e1 100644
--- a/crates/iceberg/src/writer/mod.rs
+++ b/crates/iceberg/src/writer/mod.rs
@@ -251,7 +251,8 @@
 //! ## FanoutWriter - For Unsorted Data
 //!
 //! Wraps the data file writer to handle unsorted data by maintaining multiple active writers.
-//! Use this when your data is not pre-sorted by partition key.
+//! Use this when your data is not pre-sorted by partition key. Writes to different partitions
+//! can happen in any order, even interleaved.
 //!
 //! ```rust, no_run
 //! # // Same setup as the simple example above...
@@ -301,10 +302,11 @@
 //!     Struct::from_iter([Some(Literal::string("EU"))]),
 //! );
 //!
-//! // Write to different partitions in any order (unsorted)
-//! // fanout_writer.write(partition_key_us, batch_us).await?;
-//! // fanout_writer.write(partition_key_eu, batch_eu).await?;
-//! // fanout_writer.write(partition_key_us, batch_us2).await?; // Can go back to US
+//! // Write to different partitions in any order - can interleave partition writes
+//! // fanout_writer.write(partition_key_us.clone(), batch_us1).await?;
+//! // fanout_writer.write(partition_key_eu.clone(), batch_eu1).await?;
+//! // fanout_writer.write(partition_key_us.clone(), batch_us2).await?; // Back to US - OK!
+//! // fanout_writer.write(partition_key_eu.clone(), batch_eu2).await?; // Back to EU - OK!
 //!
 //! let data_files = fanout_writer.close().await?;
 //! # Ok(())
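
The doc tests above keep their `write` calls commented out because a `no_run` example has no live `RecordBatch` to feed in. For reference, here is a minimal sketch of how the documented `FanoutWriter` flow assembles into a complete program. The table and writer setup is copied from the hidden lines of the examples in the patch; the `incoming` vector of `(region, batch)` pairs is an illustrative assumption (in practice the batches would come from a query engine), and the `write`/`close` calls assume exactly the `PartitioningWriter` usage shown in the docs:

```rust
use std::collections::HashMap;

use arrow_array::RecordBatch;
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
use iceberg::spec::{Literal, PartitionKey, Struct};
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::partitioning::PartitioningWriter;
use iceberg::writer::partitioning::fanout_writer::FanoutWriter;
use iceberg::{Catalog, CatalogBuilder, Result, TableIdent};
use parquet::file::properties::WriterProperties;

#[tokio::main]
async fn main() -> Result<()> {
    // Table and writer setup, identical to the doc examples in this patch.
    let catalog = MemoryCatalogBuilder::default()
        .load(
            "memory",
            HashMap::from([(
                MEMORY_CATALOG_WAREHOUSE.to_string(),
                "file:///path/to/warehouse".to_string(),
            )]),
        )
        .await?;
    let table = catalog
        .load_table(&TableIdent::from_strs(["hello", "world"])?)
        .await?;
    let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
    let file_name_generator = DefaultFileNameGenerator::new(
        "test".to_string(),
        None,
        iceberg::spec::DataFileFormat::Parquet,
    );
    let parquet_writer_builder = ParquetWriterBuilder::new(
        WriterProperties::default(),
        table.metadata().current_schema().clone(),
    );
    let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
        parquet_writer_builder,
        table.file_io().clone(),
        location_generator,
        file_name_generator,
    );
    let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);

    let mut fanout_writer = FanoutWriter::new(data_file_writer_builder);

    let schema = table.metadata().current_schema().clone();
    let partition_spec = table.metadata().default_partition_spec().as_ref().clone();

    // Illustrative input: (region, batch) pairs in arbitrary, interleaved order.
    // Populating this vector is left to the caller; it is not a crate API.
    let incoming: Vec<(&str, RecordBatch)> = Vec::new();

    for (region, batch) in incoming {
        // Build the partition key for this batch's region, as in the docs above.
        let key = PartitionKey::new(
            partition_spec.clone(),
            schema.clone(),
            Struct::from_iter([Some(Literal::string(region))]),
        );
        // FanoutWriter accepts partitions in any order.
        fanout_writer.write(key, batch).await?;
    }

    // Close flushes every open partition writer and returns all data files.
    let _data_files = fanout_writer.close().await?;
    Ok(())
}
```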