diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs
index 413604f51c..ba21892061 100644
--- a/crates/iceberg/src/spec/table_properties.rs
+++ b/crates/iceberg/src/spec/table_properties.rs
@@ -17,6 +17,8 @@
 
 use std::collections::HashMap;
 
+use parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
+
 // Helper function to parse a property from a HashMap
 // If the property is not found, use the default value
 fn parse_property(
@@ -47,6 +49,14 @@ pub struct TableProperties {
     pub commit_total_retry_timeout_ms: u64,
     /// The default format for files.
     pub write_format_default: String,
+    /// The compression codec for Parquet data files.
+    pub write_parquet_compression_codec: String,
+    /// The compression level for Parquet data files.
+    pub write_parquet_compression_level: Option<String>,
+    /// The compression codec for Parquet delete files.
+    pub write_delete_parquet_compression_codec: String,
+    /// The compression level for Parquet delete files.
+    pub write_delete_parquet_compression_level: Option<String>,
     /// The target file size for files.
     pub write_target_file_size_bytes: usize,
     /// Whether to use `FanoutWriter` for partitioned tables.
@@ -134,6 +144,18 @@ impl TableProperties {
     pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = "write.delete.format.default";
     /// Default value for data file format
     pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet";
+    /// Property key for the Parquet data file compression codec.
+    pub const PROPERTY_WRITE_PARQUET_COMPRESSION_CODEC: &str = "write.parquet.compression-codec";
+    /// Property key for the Parquet delete file compression codec.
+    pub const PROPERTY_WRITE_DELETE_PARQUET_COMPRESSION_CODEC: &str =
+        "write.delete.parquet.compression-codec";
+    /// Default value for Parquet data file compression.
+    pub const PROPERTY_WRITE_PARQUET_COMPRESSION_CODEC_DEFAULT: &str = "zstd";
+    /// Property key for the Parquet data file compression level.
+    pub const PROPERTY_WRITE_PARQUET_COMPRESSION_LEVEL: &str = "write.parquet.compression-level";
+    /// Property key for the Parquet delete file compression level.
+    pub const PROPERTY_WRITE_DELETE_PARQUET_COMPRESSION_LEVEL: &str =
+        "write.delete.parquet.compression-level";
 
     /// Target file size for newly written files.
     pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes";
@@ -144,6 +166,61 @@ impl TableProperties {
     pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED: &str = "write.datafusion.fanout.enabled";
     /// Default value for fanout writer enabled
     pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT: bool = true;
+
+    /// Returns the Parquet compression codec requested by table properties.
+    ///
+    /// Unknown or unparsable values intentionally fall back to ZSTD so table
+    /// writes remain possible even when catalogs provide unexpected strings.
+    pub fn write_parquet_compression(&self) -> Compression {
+        parse_parquet_compression(
+            &self.write_parquet_compression_codec,
+            self.write_parquet_compression_level.as_deref(),
+        )
+        .unwrap_or(Self::default_parquet_compression())
+    }
+
+    /// Returns the Parquet compression codec requested for delete files.
+    ///
+    /// Mirrors Java's write-conf behavior: delete compression inherits data
+    /// compression unless `write.delete.parquet.compression-codec` is set.
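+    ///
+    /// For example, with only `write.parquet.compression-codec = "gzip"` set,
+    /// both data and delete files are written with GZIP; additionally setting
+    /// `write.delete.parquet.compression-codec = "zstd"` switches delete files
+    /// to ZSTD while data files keep GZIP.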
+    pub fn write_delete_parquet_compression(&self) -> Compression {
+        parse_parquet_compression(
+            &self.write_delete_parquet_compression_codec,
+            self.write_delete_parquet_compression_level.as_deref(),
+        )
+        .unwrap_or(Self::default_parquet_compression())
+    }
+
+    fn default_parquet_compression() -> Compression {
+        Compression::ZSTD(ZstdLevel::default())
+    }
+}
+
+fn parse_parquet_compression(value: &str, level: Option<&str>) -> Option<Compression> {
+    let normalized = value.trim().to_ascii_lowercase().replace('-', "_");
+    let level = level.map(str::parse::<u32>).transpose().ok()?;
+
+    match normalized.as_str() {
+        "none" | "uncompressed" => Some(Compression::UNCOMPRESSED),
+        "snappy" => Some(Compression::SNAPPY),
+        "gzip" => Some(Compression::GZIP(match level {
+            Some(level) => GzipLevel::try_new(level).ok()?,
+            None => GzipLevel::default(),
+        })),
+        "lzo" => Some(Compression::LZO),
+        "brotli" => Some(Compression::BROTLI(match level {
+            Some(level) => BrotliLevel::try_new(level).ok()?,
+            None => BrotliLevel::default(),
+        })),
+        "lz4" => Some(Compression::LZ4),
+        "lz4_raw" => Some(Compression::LZ4_RAW),
+        "zstd" => Some(Compression::ZSTD(match level {
+            Some(level) => ZstdLevel::try_new(level as i32).ok()?,
+            None => ZstdLevel::default(),
+        })),
+        _ if level.is_none() => value.parse::<Compression>().ok(),
+        _ => None,
+    }
 }
 
 impl TryFrom<&HashMap<String, String>> for TableProperties {
@@ -151,6 +228,24 @@ impl TryFrom<&HashMap<String, String>> for TableProperties {
     type Error = anyhow::Error;
 
     fn try_from(props: &HashMap<String, String>) -> Result<Self, Self::Error> {
+        let write_parquet_compression_codec = parse_property(
+            props,
+            TableProperties::PROPERTY_WRITE_PARQUET_COMPRESSION_CODEC,
+            TableProperties::PROPERTY_WRITE_PARQUET_COMPRESSION_CODEC_DEFAULT.to_string(),
+        )?;
+        let write_parquet_compression_level = props
+            .get(TableProperties::PROPERTY_WRITE_PARQUET_COMPRESSION_LEVEL)
+            .cloned();
+        let write_delete_parquet_compression_codec = parse_property(
+            props,
+            TableProperties::PROPERTY_WRITE_DELETE_PARQUET_COMPRESSION_CODEC,
+            write_parquet_compression_codec.clone(),
+        )?;
+        let write_delete_parquet_compression_level = props
+            .get(TableProperties::PROPERTY_WRITE_DELETE_PARQUET_COMPRESSION_LEVEL)
+            .cloned()
+            .or_else(|| write_parquet_compression_level.clone());
+
         Ok(TableProperties {
             commit_num_retries: parse_property(
                 props,
@@ -177,6 +272,10 @@ impl TryFrom<&HashMap<String, String>> for TableProperties {
                 TableProperties::PROPERTY_DEFAULT_FILE_FORMAT,
                 TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string(),
             )?,
+            write_parquet_compression_codec,
+            write_parquet_compression_level,
+            write_delete_parquet_compression_codec,
+            write_delete_parquet_compression_level,
             write_target_file_size_bytes: parse_property(
                 props,
                 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
@@ -215,6 +314,18 @@ mod tests {
             table_properties.write_format_default,
             TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()
         );
+        assert_eq!(
+            table_properties.write_parquet_compression_codec,
+            TableProperties::PROPERTY_WRITE_PARQUET_COMPRESSION_CODEC_DEFAULT.to_string()
+        );
+        assert_eq!(
+            table_properties.write_parquet_compression(),
+            Compression::ZSTD(ZstdLevel::default())
+        );
+        assert_eq!(
+            table_properties.write_delete_parquet_compression(),
+            Compression::ZSTD(ZstdLevel::default())
+        );
         assert_eq!(
             table_properties.write_target_file_size_bytes,
             TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
@@ -236,6 +347,10 @@
                 TableProperties::PROPERTY_DEFAULT_FILE_FORMAT.to_string(),
                 "avro".to_string(),
             ),
+            (
+                TableProperties::PROPERTY_WRITE_PARQUET_COMPRESSION_CODEC.to_string(),
+                "snappy".to_string(),
+            ),
             (
                 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
                 "512".to_string(),
@@ -245,9 +360,80 @@
         assert_eq!(table_properties.commit_num_retries, 10);
         assert_eq!(table_properties.commit_max_retry_wait_ms, 20);
         assert_eq!(table_properties.write_format_default, "avro".to_string());
+        assert_eq!(
+            table_properties.write_parquet_compression(),
+            Compression::SNAPPY
+        );
+        assert_eq!(
+            table_properties.write_delete_parquet_compression(),
+            Compression::SNAPPY
+        );
         assert_eq!(table_properties.write_target_file_size_bytes, 512);
     }
 
+    #[test]
+    fn test_table_properties_parquet_compression_fallback() {
+        let invalid_compression = HashMap::from([(
+            TableProperties::PROPERTY_WRITE_PARQUET_COMPRESSION_CODEC.to_string(),
+            "not-a-codec".to_string(),
+        )]);
+        let table_properties = TableProperties::try_from(&invalid_compression).unwrap();
+        assert_eq!(
+            table_properties.write_parquet_compression(),
+            Compression::ZSTD(ZstdLevel::default())
+        );
+    }
+
+    #[test]
+    fn test_table_properties_parquet_compression_level() {
+        let props = HashMap::from([
+            (
+                TableProperties::PROPERTY_WRITE_PARQUET_COMPRESSION_CODEC.to_string(),
+                "zstd".to_string(),
+            ),
+            (
+                TableProperties::PROPERTY_WRITE_PARQUET_COMPRESSION_LEVEL.to_string(),
+                "5".to_string(),
+            ),
+        ]);
+        let table_properties = TableProperties::try_from(&props).unwrap();
+        assert_eq!(
+            table_properties.write_parquet_compression(),
+            Compression::ZSTD(ZstdLevel::try_new(5).unwrap())
+        );
+        assert_eq!(
+            table_properties.write_delete_parquet_compression(),
+            Compression::ZSTD(ZstdLevel::try_new(5).unwrap())
+        );
+    }
+
+    #[test]
+    fn test_table_properties_delete_parquet_compression_overrides_data_compression() {
+        let props = HashMap::from([
+            (
+                TableProperties::PROPERTY_WRITE_PARQUET_COMPRESSION_CODEC.to_string(),
+                "gzip".to_string(),
+            ),
+            (
+                TableProperties::PROPERTY_WRITE_DELETE_PARQUET_COMPRESSION_CODEC.to_string(),
+                "zstd".to_string(),
+            ),
+            (
+                TableProperties::PROPERTY_WRITE_DELETE_PARQUET_COMPRESSION_LEVEL.to_string(),
+                "3".to_string(),
+            ),
+        ]);
+        let table_properties = TableProperties::try_from(&props).unwrap();
+        assert_eq!(
+            table_properties.write_parquet_compression(),
+            Compression::GZIP(GzipLevel::default())
+        );
+        assert_eq!(
+            table_properties.write_delete_parquet_compression(),
+            Compression::ZSTD(ZstdLevel::try_new(3).unwrap())
+        );
+    }
+
     #[test]
     fn test_table_properties_invalid() {
         let invalid_retries = HashMap::from([(
diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs
index 0dea150d31..adf7b79997 100644
--- a/crates/integrations/datafusion/src/physical_plan/write.rs
+++ b/crates/integrations/datafusion/src/physical_plan/write.rs
@@ -227,7 +227,9 @@ impl ExecutionPlan for IcebergWriteExec {
 
         // Create data file writer builder
         let parquet_file_writer_builder = ParquetWriterBuilder::new_with_match_mode(
-            WriterProperties::default(),
+            WriterProperties::builder()
+                .set_compression(table_props.write_parquet_compression())
+                .build(),
             self.table.metadata().current_schema().clone(),
             FieldMatchMode::Name,
         );
@@ -324,10 +326,13 @@ mod tests {
     use futures::{StreamExt, stream};
     use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
     use iceberg::spec::{
-        DataFileFormat, NestedField, PrimitiveType, Schema, Type, deserialize_data_file_from_json,
+        DataFile, DataFileFormat, NestedField, PrimitiveType, Schema, TableProperties, Type,
+        deserialize_data_file_from_json,
     };
     use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, NamespaceIdent, Result, TableCreation};
     use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+    use parquet::basic::{Compression, ZstdLevel};
+    use parquet::file::metadata::ParquetMetaDataReader;
     use tempfile::TempDir;
 
     use super::*;
@@ -445,15 +450,85 @@
         location: impl ToString,
         name: impl ToString,
         schema: Schema,
+    ) -> TableCreation {
+        get_table_creation_with_properties(location, name, schema, HashMap::new())
+    }
+
+    fn get_table_creation_with_properties(
+        location: impl ToString,
+        name: impl ToString,
+        schema: Schema,
+        properties: HashMap<String, String>,
     ) -> TableCreation {
         TableCreation::builder()
             .location(location.to_string())
             .name(name.to_string())
-            .properties(HashMap::new())
+            .properties(properties)
             .schema(schema)
             .build()
     }
 
+    async fn write_test_data_file(table: iceberg::table::Table) -> Result<DataFile> {
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+            Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+        ]));
+
+        let id_array = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
+        let name_array = Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])) as ArrayRef;
+        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![id_array, name_array])
+            .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))?;
+
+        let input_plan = Arc::new(MockExecutionPlan::new(arrow_schema.clone(), vec![batch]));
+        let write_exec = IcebergWriteExec::new(table.clone(), input_plan, arrow_schema);
+        let task_ctx = Arc::new(TaskContext::default());
+        let mut stream = write_exec
+            .execute(0, task_ctx)
+            .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))?;
+
+        let batch = stream
+            .next()
+            .await
+            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Expected one result batch"))?
+            .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))?;
+
+        let data_file_json = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("Expected StringArray")
+            .value(0);
+
+        deserialize_data_file_from_json(
+            data_file_json,
+            table.metadata().default_partition_spec_id(),
+            table.metadata().default_partition_type(),
+            table.metadata().current_schema(),
+        )
+    }
+
+    async fn data_file_parquet_compression(
+        table: &iceberg::table::Table,
+        data_file: &DataFile,
+    ) -> Result<Compression> {
+        let bytes = table
+            .file_io()
+            .new_input(data_file.file_path())?
+            .read()
+            .await?;
+        let metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&bytes)
+            .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))?;
+
+        Ok(metadata.row_group(0).column(0).compression())
+    }
+
     #[tokio::test]
     async fn test_iceberg_write_exec() -> Result<()> {
         // 1. Set up test environment
@@ -610,4 +685,87 @@
         Ok(())
     }
+
+    /// Java ref: iceberg-1.10.1/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java:169
+    ///
+    /// Java setup: creates a table, sets `write.parquet.compression-codec`, writes rows, and
+    /// inspects the produced Parquet file footer.
+    /// Java asserts: the written data file uses the requested compression codec.
+    ///
+    /// What this tests: DataFusion writes honor the Iceberg table property
+    /// `write.parquet.compression-codec` when building Parquet writer properties.
+    ///
+    /// Faithfulness: FAITHFUL for data files. Java also covers delete files and Spark write/session
+    /// options, which are not part of this Rust DataFusion write path.
+    ///
+    /// Exercises: `IcebergWriteExec::execute` -> `ParquetWriterBuilder` -> Parquet file metadata.
+    #[tokio::test]
+    async fn test_iceberg_write_exec_uses_table_parquet_compression_property() -> Result<()> {
+        let iceberg_catalog = get_iceberg_catalog().await;
+        let namespace = NamespaceIdent::new("test_namespace".to_string());
+        iceberg_catalog
+            .create_namespace(&namespace, HashMap::new())
+            .await?;
+
+        let schema = get_test_schema()?;
+        let creation = get_table_creation_with_properties(
+            temp_path(),
+            "test_table",
+            schema,
+            HashMap::from([(
+                TableProperties::PROPERTY_WRITE_PARQUET_COMPRESSION_CODEC.to_string(),
+                "snappy".to_string(),
+            )]),
+        );
+        let table = iceberg_catalog.create_table(&namespace, creation).await?;
+
+        let data_file = write_test_data_file(table.clone()).await?;
+        let compression = data_file_parquet_compression(&table, &data_file).await?;
+
+        assert_eq!(compression, Compression::SNAPPY);
+        Ok(())
+    }
+
+    /// Java ref: iceberg-1.10.1/core/src/main/java/org/apache/iceberg/TableMetadata.java:90
+    /// Java ref: iceberg-1.10.1/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java:563
+    ///
+    /// Java setup: new Java tables persist `write.parquet.compression-codec=zstd`; Spark write
+    /// config falls back to the table/default value when no write override is present.
+    /// Java asserts: compression defaults are resolved before writing.
+    ///
+    /// What this tests: Rust's intentionally tolerant behavior: absent or unrecognized
+    /// `write.parquet.compression-codec` values fall back to ZSTD for actual Parquet writes.
+    ///
+    /// Faithfulness: Rust-specific. Java rejects invalid codec values later in Parquet config,
+    /// while this branch intentionally treats unrecognized values as ZSTD.
+    ///
+    /// Exercises: `TableProperties::write_parquet_compression` through the production
+    /// DataFusion write path and Parquet file metadata.
+    #[tokio::test]
+    async fn test_iceberg_write_exec_falls_back_to_zstd_for_invalid_parquet_compression_property()
+    -> Result<()> {
+        let iceberg_catalog = get_iceberg_catalog().await;
+        let namespace = NamespaceIdent::new("test_namespace".to_string());
+        iceberg_catalog
+            .create_namespace(&namespace, HashMap::new())
+            .await?;
+
+        let schema = get_test_schema()?;
+        let creation = get_table_creation_with_properties(
+            temp_path(),
+            "test_table",
+            schema,
+            HashMap::from([(
+                TableProperties::PROPERTY_WRITE_PARQUET_COMPRESSION_CODEC.to_string(),
+                "not-a-codec".to_string(),
+            )]),
+        );
+        let table = iceberg_catalog.create_table(&namespace, creation).await?;
+
+        let data_file = write_test_data_file(table.clone()).await?;
+        let compression = data_file_parquet_compression(&table, &data_file).await?;
+
+        assert_eq!(compression, Compression::ZSTD(ZstdLevel::default()));
+        Ok(())
+    }
 }