143 changes: 141 additions & 2 deletions crates/iceberg/src/writer/mod.rs
@@ -242,11 +242,150 @@
//! Ok(())
//! }
//! ```
//!
//! # Adding Partitioning to Data File Writers
//!
//! You can wrap a `DataFileWriter` with partitioning writers to handle partitioned tables.
//! Iceberg provides two partitioning strategies:
//!
//! ## FanoutWriter - For Unsorted Data
//!
//! Wraps the data file writer to handle unsorted data by maintaining multiple active writers.
//! Use this when your data is not pre-sorted by partition key. Writes to different partitions
//! can happen in any order, even interleaved.
//!
//! ```rust, no_run
//! # // Same setup as the simple example above...
//! # use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
//! # use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
//! # use iceberg::{Catalog, CatalogBuilder, Result, TableIdent};
//! # use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
//! # use iceberg::writer::file_writer::ParquetWriterBuilder;
//! # use iceberg::writer::file_writer::location_generator::{
//! # DefaultFileNameGenerator, DefaultLocationGenerator,
//! # };
//! # use parquet::file::properties::WriterProperties;
//! # use std::collections::HashMap;
//! # #[tokio::main]
//! # async fn main() -> Result<()> {
//! # let catalog = MemoryCatalogBuilder::default()
//! # .load("memory", HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), "file:///path/to/warehouse".to_string())]))
//! # .await?;
//! # let table = catalog.load_table(&TableIdent::from_strs(["hello", "world"])?).await?;
//! # let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
//! # let file_name_generator = DefaultFileNameGenerator::new("test".to_string(), None, iceberg::spec::DataFileFormat::Parquet);
//! # let parquet_writer_builder = ParquetWriterBuilder::new(WriterProperties::default(), table.metadata().current_schema().clone());
//! # let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
//! # parquet_writer_builder, table.file_io().clone(), location_generator, file_name_generator);
//! # let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
//!
//! // Wrap the data file writer with FanoutWriter for partitioning
//! use iceberg::writer::partitioning::fanout_writer::FanoutWriter;
//! use iceberg::writer::partitioning::PartitioningWriter;
//! use iceberg::spec::{Literal, PartitionKey, Struct};
//!
//! let mut fanout_writer = FanoutWriter::new(data_file_writer_builder);
//!
//! // Create partition keys for different regions
//! let schema = table.metadata().current_schema().clone();
//! let partition_spec = table.metadata().default_partition_spec().as_ref().clone();
//!
//! let partition_key_us = PartitionKey::new(
//! partition_spec.clone(),
//! schema.clone(),
//! Struct::from_iter([Some(Literal::string("US"))]),
//! );
//!
//! let partition_key_eu = PartitionKey::new(
//! partition_spec.clone(),
//! schema.clone(),
//! Struct::from_iter([Some(Literal::string("EU"))]),
//! );
//!
//! // Write to different partitions in any order - can interleave partition writes
//! // fanout_writer.write(partition_key_us.clone(), batch_us1).await?;
//! // fanout_writer.write(partition_key_eu.clone(), batch_eu1).await?;
//! // fanout_writer.write(partition_key_us.clone(), batch_us2).await?; // Back to US - OK!
//! // fanout_writer.write(partition_key_eu.clone(), batch_eu2).await?; // Back to EU - OK!
//!
//! let data_files = fanout_writer.close().await?;
//! # Ok(())
//! # }
//! ```
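//!
//! The `batch_us1`-style inputs above are Arrow `RecordBatch`es, elided here for
//! brevity. For illustration only, one might be built like this (a hypothetical
//! sketch assuming a two-column `(id: Int32, region: Utf8)` schema; real batches
//! must match the table's schema):
//!
//! ```rust,ignore
//! use std::sync::Arc;
//!
//! use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
//! use arrow_schema::{DataType, Field, Schema};
//!
//! // Hypothetical Arrow schema; in real code, derive it from the table's
//! // Iceberg schema so field types (and field IDs) line up.
//! let arrow_schema = Arc::new(Schema::new(vec![
//!     Field::new("id", DataType::Int32, false),
//!     Field::new("region", DataType::Utf8, false),
//! ]));
//!
//! // Two rows destined for the "US" partition.
//! let batch_us1 = RecordBatch::try_new(arrow_schema, vec![
//!     Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
//!     Arc::new(StringArray::from(vec!["US", "US"])) as ArrayRef,
//! ])?;
//! ```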
//!
//! ## ClusteredWriter - For Sorted Data
//!
//! Wraps the data file writer for pre-sorted data. This is more memory-efficient than
//! `FanoutWriter` because it keeps only one active writer at a time, but it requires
//! input sorted by partition key.
//!
//! ```rust, no_run
//! # // Same setup as the simple example above...
//! # use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
//! # use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
//! # use iceberg::{Catalog, CatalogBuilder, Result, TableIdent};
//! # use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
//! # use iceberg::writer::file_writer::ParquetWriterBuilder;
//! # use iceberg::writer::file_writer::location_generator::{
//! # DefaultFileNameGenerator, DefaultLocationGenerator,
//! # };
//! # use parquet::file::properties::WriterProperties;
//! # use std::collections::HashMap;
//! # #[tokio::main]
//! # async fn main() -> Result<()> {
//! # let catalog = MemoryCatalogBuilder::default()
//! # .load("memory", HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), "file:///path/to/warehouse".to_string())]))
//! # .await?;
//! # let table = catalog.load_table(&TableIdent::from_strs(["hello", "world"])?).await?;
//! # let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
//! # let file_name_generator = DefaultFileNameGenerator::new("test".to_string(), None, iceberg::spec::DataFileFormat::Parquet);
//! # let parquet_writer_builder = ParquetWriterBuilder::new(WriterProperties::default(), table.metadata().current_schema().clone());
//! # let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
//! # parquet_writer_builder, table.file_io().clone(), location_generator, file_name_generator);
//! # let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
//!
//! // Wrap the data file writer with ClusteredWriter for sorted partitioning
//! use iceberg::writer::partitioning::clustered_writer::ClusteredWriter;
//! use iceberg::writer::partitioning::PartitioningWriter;
//! use iceberg::spec::{Literal, PartitionKey, Struct};
//!
//! let mut clustered_writer = ClusteredWriter::new(data_file_writer_builder);
//!
//! // Create partition keys (must write in sorted order)
//! let schema = table.metadata().current_schema().clone();
//! let partition_spec = table.metadata().default_partition_spec().as_ref().clone();
//!
//! let partition_key_asia = PartitionKey::new(
//! partition_spec.clone(),
//! schema.clone(),
//! Struct::from_iter([Some(Literal::string("ASIA"))]),
//! );
//!
//! let partition_key_eu = PartitionKey::new(
//! partition_spec.clone(),
//! schema.clone(),
//! Struct::from_iter([Some(Literal::string("EU"))]),
//! );
//!
//! let partition_key_us = PartitionKey::new(
//! partition_spec.clone(),
//! schema.clone(),
//! Struct::from_iter([Some(Literal::string("US"))]),
//! );
//!
//! // Write to partitions in sorted order (ASIA -> EU -> US)
//! // clustered_writer.write(partition_key_asia, batch_asia).await?;
//! // clustered_writer.write(partition_key_eu, batch_eu).await?;
//! // clustered_writer.write(partition_key_us, batch_us).await?;
//! // Writing back to ASIA would fail since data must be sorted!
//!
//! let data_files = clustered_writer.close().await?;
//! # Ok(())
//! # }
//! ```
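//!
//! Because `ClusteredWriter` keeps only one active writer, it finishes the previous
//! partition's file when a new partition key arrives, so going back to an earlier
//! partition is rejected rather than buffered. A hypothetical sketch of that failure
//! (before `close`, with a made-up `batch_eu2`):
//!
//! ```rust,ignore
//! // US was the last partition written, so a second round of EU data should
//! // fail here, presumably as an Err from `write`.
//! assert!(clustered_writer.write(partition_key_eu, batch_eu2).await.is_err());
//! ```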

pub mod base_writer;
pub mod file_writer;
/// Provides partition-aware writers for partitioned tables
pub mod partitioning;

use arrow_array::RecordBatch;
6 changes: 6 additions & 0 deletions crates/iceberg/src/writer/partitioning/mod.rs
@@ -15,6 +15,12 @@
// specific language governing permissions and limitations
// under the License.

//! Partitioning writers for handling partitioned Iceberg tables.
//!
//! This module provides two strategies for writing to partitioned tables:
//! - [`FanoutWriter`](fanout_writer::FanoutWriter): Handles unsorted data by maintaining multiple active writers
//! - [`ClusteredWriter`](clustered_writer::ClusteredWriter): Optimized for pre-sorted data, keeping a single active writer

pub mod clustered_writer;
pub mod fanout_writer;
