From 7f901979f6e3c6ed572c691d75d556087aa98876 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Wed, 19 Nov 2025 00:28:28 -0500 Subject: [PATCH 1/2] feat: Make `FanoutWriter` writer configurable --- crates/iceberg/src/spec/table_properties.rs | 6 +++++ .../datafusion/src/physical_plan/write.rs | 22 +++++++++++++++++-- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index 4975456010..2b4c9d000d 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -137,6 +137,12 @@ impl TableProperties { pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes"; /// Default target file size pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; // 512 MB + + /// Whether to use `FanoutWriter` for partitioned tables (handles unsorted data). + /// If false, uses `ClusteredWriter` (requires sorted data, more memory efficient). 
+ pub const PROPERTY_WRITE_FANOUT_ENABLED: &str = "write.fanout.enabled"; + /// Default value for fanout writer enabled + pub const PROPERTY_WRITE_FANOUT_ENABLED_DEFAULT: bool = true; } impl TryFrom<&HashMap<String, String>> for TableProperties { diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 9eb53c235f..71d3525986 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -266,8 +266,26 @@ impl ExecutionPlan for IcebergWriteExec { let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder); // Create TaskWriter - // TODO: Make fanout_enabled configurable via table properties - let fanout_enabled = true; + let fanout_enabled = self + .table + .metadata() + .properties() + .get(TableProperties::PROPERTY_WRITE_FANOUT_ENABLED) + .map(|value| { + value + .parse::<bool>() + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + "Invalid value for write.fanout.enabled, expected 'true' or 'false'", + ) + .with_source(e) + }) + .map_err(to_datafusion_error) + }) + .transpose()? 
+ .unwrap_or(TableProperties::PROPERTY_WRITE_FANOUT_ENABLED_DEFAULT); + let schema = self.table.metadata().current_schema().clone(); let partition_spec = self.table.metadata().default_partition_spec().clone(); let task_writer = TaskWriter::try_new( From 4308d0988992f3efe0a4f682056c6d3373bc22a5 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Thu, 20 Nov 2025 15:37:30 -0500 Subject: [PATCH 2/2] Add to `TableProperties` struct --- crates/iceberg/src/spec/table_properties.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index 2b4c9d000d..916577962c 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -49,6 +49,8 @@ pub struct TableProperties { pub write_format_default: String, /// The target file size for files. pub write_target_file_size_bytes: usize, + /// Whether to use `FanoutWriter` for partitioned tables. + pub write_fanout_enabled: bool, } impl TableProperties { @@ -181,6 +183,11 @@ impl TryFrom<&HashMap<String, String>> for TableProperties { TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES, TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, )?, + write_fanout_enabled: parse_property( + props, + TableProperties::PROPERTY_WRITE_FANOUT_ENABLED, + TableProperties::PROPERTY_WRITE_FANOUT_ENABLED_DEFAULT, + )?, }) } }