Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions crates/iceberg/src/spec/table_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub struct TableProperties {
pub write_format_default: String,
/// The target file size for files.
pub write_target_file_size_bytes: usize,
/// Whether to use `FanoutWriter` for partitioned tables.
pub write_fanout_enabled: bool,
}

impl TableProperties {
Expand Down Expand Up @@ -137,6 +139,12 @@ impl TableProperties {
pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes";
/// Default target file size
pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; // 512 MB

/// Whether to use `FanoutWriter` for partitioned tables (handles unsorted data).
/// If false, uses `ClusteredWriter` (requires sorted data, more memory efficient).
pub const PROPERTY_WRITE_FANOUT_ENABLED: &str = "write.fanout.enabled";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think write.datafusion.fanout.enabled will be a better property name since Java has

public static final String SPARK_WRITE_PARTITIONED_FANOUT_ENABLED = "write.spark.fanout.enabled";

/// Default value for fanout writer enabled
pub const PROPERTY_WRITE_FANOUT_ENABLED_DEFAULT: bool = true;
}

impl TryFrom<&HashMap<String, String>> for TableProperties {
Expand Down Expand Up @@ -175,6 +183,11 @@ impl TryFrom<&HashMap<String, String>> for TableProperties {
TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
)?,
write_fanout_enabled: parse_property(
props,
TableProperties::PROPERTY_WRITE_FANOUT_ENABLED,
TableProperties::PROPERTY_WRITE_FANOUT_ENABLED_DEFAULT,
)?,
})
}
}
Expand Down
22 changes: 20 additions & 2 deletions crates/integrations/datafusion/src/physical_plan/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,26 @@ impl ExecutionPlan for IcebergWriteExec {
let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);

// Create TaskWriter
// TODO: Make fanout_enabled configurable via table properties
let fanout_enabled = true;
let fanout_enabled = self
.table
.metadata()
.properties()
.get(TableProperties::PROPERTY_WRITE_FANOUT_ENABLED)
.map(|value| {
value
.parse::<bool>()
.map_err(|e| {
Error::new(
ErrorKind::DataInvalid,
"Invalid value for write.fanout.enabled, expected 'true' or 'false'",
)
.with_source(e)
})
.map_err(to_datafusion_error)
})
.transpose()?
.unwrap_or(TableProperties::PROPERTY_WRITE_FANOUT_ENABLED_DEFAULT);

let schema = self.table.metadata().current_schema().clone();
let partition_spec = self.table.metadata().default_partition_spec().clone();
let task_writer = TaskWriter::try_new(
Expand Down
Loading