diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index 4975456010..916577962c 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -49,6 +49,8 @@ pub struct TableProperties { pub write_format_default: String, /// The target file size for files. pub write_target_file_size_bytes: usize, + /// Whether to use `FanoutWriter` for partitioned tables. + pub write_fanout_enabled: bool, } impl TableProperties { @@ -137,6 +139,12 @@ impl TableProperties { pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes"; /// Default target file size pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; // 512 MB + + /// Whether to use `FanoutWriter` for partitioned tables (handles unsorted data). + /// If false, uses `ClusteredWriter` (requires sorted data, more memory efficient). + pub const PROPERTY_WRITE_FANOUT_ENABLED: &str = "write.fanout.enabled"; + /// Default value for fanout writer enabled + pub const PROPERTY_WRITE_FANOUT_ENABLED_DEFAULT: bool = true; } impl TryFrom<&HashMap> for TableProperties { @@ -175,6 +183,11 @@ impl TryFrom<&HashMap> for TableProperties { TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES, TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, )?, + write_fanout_enabled: parse_property( + props, + TableProperties::PROPERTY_WRITE_FANOUT_ENABLED, + TableProperties::PROPERTY_WRITE_FANOUT_ENABLED_DEFAULT, + )?, }) } } diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 9eb53c235f..71d3525986 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -266,8 +266,26 @@ impl ExecutionPlan for IcebergWriteExec { let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder); // Create TaskWriter - // TODO: Make fanout_enabled configurable via table properties - let fanout_enabled = true; + let fanout_enabled = self + .table + .metadata() + .properties() + .get(TableProperties::PROPERTY_WRITE_FANOUT_ENABLED) + .map(|value| { + value + .parse::() + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + "Invalid value for write.fanout.enabled, expected 'true' or 'false'", + ) + .with_source(e) + }) + .map_err(to_datafusion_error) + }) + .transpose()? + .unwrap_or(TableProperties::PROPERTY_WRITE_FANOUT_ENABLED_DEFAULT); + let schema = self.table.metadata().current_schema().clone(); let partition_spec = self.table.metadata().default_partition_spec().clone(); let task_writer = TaskWriter::try_new(