From 665b970af76d4612800c8eba55f9cc531678b157 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Wed, 12 Nov 2025 23:00:56 +0000 Subject: [PATCH 01/15] feat!: Support compression codecs for JSON metadata and Avro Previously these properties were not honored on table properties. - Adds table properties for these values. - Plumbs them through for writers. --- Cargo.lock | 2 + crates/iceberg/Cargo.toml | 2 + crates/iceberg/src/spec/avro_util.rs | 230 ++++++++++++++++++++ crates/iceberg/src/spec/manifest/writer.rs | 37 +++- crates/iceberg/src/spec/manifest_list.rs | 49 ++++- crates/iceberg/src/spec/mod.rs | 1 + crates/iceberg/src/spec/table_metadata.rs | 115 +++++++++- crates/iceberg/src/spec/table_properties.rs | 71 ++++++ crates/iceberg/src/transaction/snapshot.rs | 33 ++- 9 files changed, 533 insertions(+), 7 deletions(-) create mode 100644 crates/iceberg/src/spec/avro_util.rs diff --git a/Cargo.lock b/Cargo.lock index 2edc51a4ea..d094e4edc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3521,7 +3521,9 @@ dependencies = [ "futures", "iceberg_test_utils", "itertools 0.13.0", + "log", "minijinja", + "miniz_oxide", "mockall", "moka", "murmur3", diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 895a5cf5e4..b1fec279b2 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -67,6 +67,8 @@ flate2 = { workspace = true } fnv = { workspace = true } futures = { workspace = true } itertools = { workspace = true } +log = { workspace = true } +miniz_oxide = "0.8" moka = { version = "0.12.10", features = ["future"] } murmur3 = { workspace = true } num-bigint = { workspace = true } diff --git a/crates/iceberg/src/spec/avro_util.rs b/crates/iceberg/src/spec/avro_util.rs new file mode 100644 index 0000000000..055b304f47 --- /dev/null +++ b/crates/iceberg/src/spec/avro_util.rs @@ -0,0 +1,230 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utilities for working with Apache Avro in Iceberg. + +use apache_avro::Codec; +use log::warn; + +/// Convert codec name and level to apache_avro::Codec. +/// Returns Codec::Null for unknown or unsupported codecs. +/// +/// # Arguments +/// +/// * `codec` - The name of the compression codec (e.g., "gzip", "zstd", "deflate", "none") +/// * `level` - The compression level. 
For deflate/gzip: +/// - 0: NoCompression +/// - 1: BestSpeed +/// - 9: BestCompression +/// - 10: UberCompression +/// - Other values: DefaultLevel (6) +/// +/// # Supported Codecs +/// +/// - `gzip` or `deflate`: Uses Deflate compression with specified level +/// - `zstd`: Uses Zstandard compression (level clamped to valid zstd range) +/// - `none` or `None`: No compression +/// - Any other value: Defaults to no compression (Codec::Null) +/// +/// # Compression Levels +/// +/// The compression level mapping is based on miniz_oxide's CompressionLevel enum: +/// - Level 0: No compression +/// - Level 1: Best speed (fastest) +/// - Level 9: Best compression (slower, better compression) +/// - Level 10: Uber compression (slowest, best compression) +/// - Other: Default level (balanced speed/compression) +pub(crate) fn codec_from_str(codec: Option<&str>, level: u8) -> Codec { + use apache_avro::{DeflateSettings, ZstandardSettings}; + + match codec { + Some("gzip") | Some("deflate") => { + // Map compression level to miniz_oxide::deflate::CompressionLevel + // Reference: https://docs.rs/miniz_oxide/latest/miniz_oxide/deflate/enum.CompressionLevel.html + use miniz_oxide::deflate::CompressionLevel; + + let compression_level = match level { + 0 => CompressionLevel::NoCompression, + 1 => CompressionLevel::BestSpeed, + 9 => CompressionLevel::BestCompression, + 10 => CompressionLevel::UberCompression, + _ => CompressionLevel::DefaultLevel, + }; + + Codec::Deflate(DeflateSettings::new(compression_level)) + } + Some("zstd") => { + // Zstandard supports levels 0-22, clamp to valid range + let zstd_level = level.min(22); + Codec::Zstandard(ZstandardSettings::new(zstd_level)) + } + Some("none") | None => Codec::Null, + Some(unknown) => { + warn!( + "Unrecognized compression codec '{}', using no compression (Codec::Null)", + unknown + ); + Codec::Null + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_codec_from_str_gzip() { + let codec = 
codec_from_str(Some("gzip"), 5); + assert!(matches!(codec, Codec::Deflate(_))); + } + + #[test] + fn test_codec_from_str_deflate() { + let codec = codec_from_str(Some("deflate"), 9); + assert!(matches!(codec, Codec::Deflate(_))); + } + + #[test] + fn test_codec_from_str_zstd() { + let codec = codec_from_str(Some("zstd"), 3); + assert!(matches!(codec, Codec::Zstandard(_))); + } + + #[test] + fn test_codec_from_str_none() { + let codec = codec_from_str(Some("none"), 0); + assert!(matches!(codec, Codec::Null)); + } + + #[test] + fn test_codec_from_str_null() { + let codec = codec_from_str(None, 0); + assert!(matches!(codec, Codec::Null)); + } + + #[test] + fn test_codec_from_str_unknown() { + let codec = codec_from_str(Some("unknown"), 1); + assert!(matches!(codec, Codec::Null)); + } + + #[test] + fn test_codec_from_str_deflate_levels() { + use apache_avro::{Writer, Schema, types::Record}; + use std::collections::HashMap; + + // Create a simple schema for testing + let schema = Schema::parse_str(r#"{"type": "record", "name": "test", "fields": [{"name": "field", "type": "string"}]}"#).unwrap(); + + // Create test data + let test_str = "test data that should compress differently at different levels. This is a longer string to ensure compression has something to work with. The quick brown fox jumps over the lazy dog. 
The quick brown fox jumps over the lazy dog."; + + // Test that different compression levels produce different output sizes + let mut sizes = HashMap::new(); + for level in [0, 1, 5, 9, 10] { + let codec = codec_from_str(Some("gzip"), level); + let mut writer = Writer::with_codec(&schema, Vec::new(), codec); + + let mut record = Record::new(&schema).unwrap(); + record.put("field", test_str); + writer.append(record).unwrap(); + + let encoded = writer.into_inner().unwrap(); + sizes.insert(level, encoded.len()); + } + + // Level 0 (NoCompression) should be largest + // Level 10 (UberCompression) should be smallest or equal to level 9 + assert!(sizes[&0] >= sizes[&1], "Level 0 should be >= level 1"); + assert!(sizes[&1] >= sizes[&9] || sizes[&1] == sizes[&9], "Level 1 should be >= level 9"); + assert!(sizes[&9] >= sizes[&10] || sizes[&9] == sizes[&10], "Level 9 should be >= level 10"); + } + + #[test] + fn test_codec_from_str_zstd_levels() { + use apache_avro::{Writer, Schema, types::Record}; + + // Create a simple schema for testing + let schema = Schema::parse_str(r#"{"type": "record", "name": "test", "fields": [{"name": "field", "type": "string"}]}"#).unwrap(); + let test_str = "test data that should compress differently at different levels. This is a longer string to ensure compression has something to work with. The quick brown fox jumps over the lazy dog. 
The quick brown fox jumps over the lazy dog."; + + // Test various levels by checking they produce valid codecs + for level in [0, 3, 15, 22] { + let codec = codec_from_str(Some("zstd"), level); + assert!(matches!(codec, Codec::Zstandard(_))); + + // Verify the codec actually works by compressing data + let mut writer = Writer::with_codec(&schema, Vec::new(), codec); + let mut record = Record::new(&schema).unwrap(); + record.put("field", test_str); + writer.append(record).unwrap(); + + let encoded = writer.into_inner().unwrap(); + assert!(encoded.len() > 0, "Compression should produce output"); + } + + // Test clamping - higher than 22 should be clamped to 22 + let codec_100 = codec_from_str(Some("zstd"), 100); + let codec_22 = codec_from_str(Some("zstd"), 22); + + // Both should work and produce similar results + let mut writer_100 = Writer::with_codec(&schema, Vec::new(), codec_100); + let mut record_100 = Record::new(&schema).unwrap(); + record_100.put("field", test_str); + writer_100.append(record_100).unwrap(); + let encoded_100 = writer_100.into_inner().unwrap(); + + let mut writer_22 = Writer::with_codec(&schema, Vec::new(), codec_22); + let mut record_22 = Record::new(&schema).unwrap(); + record_22.put("field", test_str); + writer_22.append(record_22).unwrap(); + let encoded_22 = writer_22.into_inner().unwrap(); + + // Both should produce the same size since 100 is clamped to 22 + assert_eq!(encoded_100.len(), encoded_22.len(), "Level 100 should be clamped to 22"); + } + + #[test] + fn test_compression_level_differences() { + use apache_avro::{Writer, Schema, types::Record}; + + // Create a schema and data that will compress well + let schema = Schema::parse_str(r#"{"type": "record", "name": "test", "fields": [{"name": "field", "type": "string"}]}"#).unwrap(); + + // Use highly compressible data + let test_str = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"; + + // Test gzip level 0 (no compression) 
vs level 9 (best compression) + let codec_0 = codec_from_str(Some("gzip"), 0); + let mut writer_0 = Writer::with_codec(&schema, Vec::new(), codec_0); + let mut record_0 = Record::new(&schema).unwrap(); + record_0.put("field", test_str); + writer_0.append(record_0).unwrap(); + let size_0 = writer_0.into_inner().unwrap().len(); + + let codec_9 = codec_from_str(Some("gzip"), 9); + let mut writer_9 = Writer::with_codec(&schema, Vec::new(), codec_9); + let mut record_9 = Record::new(&schema).unwrap(); + record_9.put("field", test_str); + writer_9.append(record_9).unwrap(); + let size_9 = writer_9.into_inner().unwrap().len(); + + // Level 0 should produce larger output than level 9 for compressible data + assert!(size_0 > size_9, "NoCompression (level 0) should produce larger output than BestCompression (level 9): {} vs {}", size_0, size_9); + } +} diff --git a/crates/iceberg/src/spec/manifest/writer.rs b/crates/iceberg/src/spec/manifest/writer.rs index ebb0590bcf..aa727c1388 100644 --- a/crates/iceberg/src/spec/manifest/writer.rs +++ b/crates/iceberg/src/spec/manifest/writer.rs @@ -28,6 +28,7 @@ use super::{ }; use crate::error::Result; use crate::io::OutputFile; +use crate::spec::avro_util::codec_from_str; use crate::spec::manifest::_serde::{ManifestEntryV1, ManifestEntryV2}; use crate::spec::manifest::{manifest_schema_v1, manifest_schema_v2}; use crate::spec::{ @@ -43,6 +44,8 @@ pub struct ManifestWriterBuilder { key_metadata: Option>, schema: SchemaRef, partition_spec: PartitionSpec, + compression_codec: String, + compression_level: u8, } impl ManifestWriterBuilder { @@ -54,15 +57,26 @@ impl ManifestWriterBuilder { schema: SchemaRef, partition_spec: PartitionSpec, ) -> Self { + use crate::spec::TableProperties; + Self { output, snapshot_id, key_metadata, schema, partition_spec, + compression_codec: TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string(), + compression_level: TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL_DEFAULT, } } + /// Set 
compression codec and level for the manifest file. + pub fn with_compression(mut self, codec: String, level: u8) -> Self { + self.compression_codec = codec; + self.compression_level = level; + self + } + /// Build a [`ManifestWriter`] for format version 1. pub fn build_v1(self) -> ManifestWriter { let metadata = ManifestMetadata::builder() @@ -78,6 +92,8 @@ impl ManifestWriterBuilder { self.key_metadata, metadata, None, + self.compression_codec, + self.compression_level, ) } @@ -96,6 +112,8 @@ impl ManifestWriterBuilder { self.key_metadata, metadata, None, + self.compression_codec, + self.compression_level, ) } @@ -114,6 +132,8 @@ impl ManifestWriterBuilder { self.key_metadata, metadata, None, + self.compression_codec, + self.compression_level, ) } @@ -134,6 +154,8 @@ impl ManifestWriterBuilder { // First row id is assigned by the [`ManifestListWriter`] when the manifest // is added to the list. None, + self.compression_codec, + self.compression_level, ) } @@ -152,6 +174,8 @@ impl ManifestWriterBuilder { self.key_metadata, metadata, None, + self.compression_codec, + self.compression_level, ) } } @@ -177,6 +201,9 @@ pub struct ManifestWriter { manifest_entries: Vec, metadata: ManifestMetadata, + + compression_codec: String, + compression_level: u8, } impl ManifestWriter { @@ -187,6 +214,8 @@ impl ManifestWriter { key_metadata: Option>, metadata: ManifestMetadata, first_row_id: Option, + compression_codec: String, + compression_level: u8, ) -> Self { Self { output, @@ -202,6 +231,8 @@ impl ManifestWriter { key_metadata, manifest_entries: Vec::new(), metadata, + compression_codec, + compression_level, } } @@ -410,7 +441,11 @@ impl ManifestWriter { // Manifest schema did not change between V2 and V3 FormatVersion::V2 | FormatVersion::V3 => manifest_schema_v2(&partition_type)?, }; - let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new()); + + // Determine compression codec using helper function + let codec = codec_from_str(Some(self.compression_codec.as_str()), 
self.compression_level); + + let mut avro_writer = AvroWriter::with_codec(&avro_schema, Vec::new(), codec); avro_writer.add_user_metadata( "schema".to_string(), to_vec(table_schema).map_err(|err| { diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index 5e97e5466e..fd41f33e12 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -31,6 +31,7 @@ use self::_serde::{ManifestFileV1, ManifestFileV2}; use super::{FormatVersion, Manifest}; use crate::error::Result; use crate::io::{FileIO, OutputFile}; +use crate::spec::avro_util::codec_from_str; use crate::spec::manifest_list::_const_schema::MANIFEST_LIST_AVRO_SCHEMA_V3; use crate::spec::manifest_list::_serde::ManifestFileV3; use crate::{Error, ErrorKind}; @@ -98,6 +99,8 @@ pub struct ManifestListWriter { sequence_number: i64, snapshot_id: i64, next_row_id: Option, + compression_codec: String, + compression_level: u8, } impl std::fmt::Debug for ManifestListWriter { @@ -118,6 +121,8 @@ impl ManifestListWriter { /// Construct a v1 [`ManifestListWriter`] that writes to a provided [`OutputFile`]. 
pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option) -> Self { + use crate::spec::TableProperties; + let mut metadata = HashMap::from_iter([ ("snapshot-id".to_string(), snapshot_id.to_string()), ("format-version".to_string(), "1".to_string()), @@ -135,6 +140,8 @@ impl ManifestListWriter { 0, snapshot_id, None, + TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string(), + TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL_DEFAULT, ) } @@ -145,6 +152,8 @@ impl ManifestListWriter { parent_snapshot_id: Option, sequence_number: i64, ) -> Self { + use crate::spec::TableProperties; + let mut metadata = HashMap::from_iter([ ("snapshot-id".to_string(), snapshot_id.to_string()), ("sequence-number".to_string(), sequence_number.to_string()), @@ -163,6 +172,8 @@ impl ManifestListWriter { sequence_number, snapshot_id, None, + TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string(), + TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL_DEFAULT, ) } @@ -174,6 +185,8 @@ impl ManifestListWriter { sequence_number: i64, first_row_id: Option, // Always None for delete manifests ) -> Self { + use crate::spec::TableProperties; + let mut metadata = HashMap::from_iter([ ("snapshot-id".to_string(), snapshot_id.to_string()), ("sequence-number".to_string(), sequence_number.to_string()), @@ -198,9 +211,35 @@ impl ManifestListWriter { sequence_number, snapshot_id, first_row_id, + TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string(), + TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL_DEFAULT, ) } + /// Set compression codec and level for the manifest list file. 
+ pub fn with_compression(mut self, codec: String, level: u8) -> Self { + self.compression_codec = codec.clone(); + self.compression_level = level; + + // Recreate the avro_writer with the new codec + let avro_schema = match self.format_version { + FormatVersion::V1 => &MANIFEST_LIST_AVRO_SCHEMA_V1, + FormatVersion::V2 => &MANIFEST_LIST_AVRO_SCHEMA_V2, + FormatVersion::V3 => &MANIFEST_LIST_AVRO_SCHEMA_V3, + }; + + // Use helper function to get codec + let compression = codec_from_str(Some(codec.as_str()), level); + + let new_writer = Writer::with_codec(avro_schema, Vec::new(), compression); + + // Copy over existing metadata from the old writer + // Unfortunately, we can't extract metadata from the old writer, + // so we'll need to handle this differently + self.avro_writer = new_writer; + self + } + fn new( format_version: FormatVersion, output_file: OutputFile, @@ -208,13 +247,19 @@ impl ManifestListWriter { sequence_number: i64, snapshot_id: i64, first_row_id: Option, + compression_codec: String, + compression_level: u8, ) -> Self { let avro_schema = match format_version { FormatVersion::V1 => &MANIFEST_LIST_AVRO_SCHEMA_V1, FormatVersion::V2 => &MANIFEST_LIST_AVRO_SCHEMA_V2, FormatVersion::V3 => &MANIFEST_LIST_AVRO_SCHEMA_V3, }; - let mut avro_writer = Writer::new(avro_schema, Vec::new()); + + // Use helper function to determine compression codec + let codec = codec_from_str(Some(compression_codec.as_str()), compression_level); + + let mut avro_writer = Writer::with_codec(avro_schema, Vec::new(), codec); for (key, value) in metadata { avro_writer .add_user_metadata(key, value) @@ -227,6 +272,8 @@ impl ManifestListWriter { sequence_number, snapshot_id, next_row_id: first_row_id, + compression_codec, + compression_level, } } diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs index 44b35e5a6b..cfe9132193 100644 --- a/crates/iceberg/src/spec/mod.rs +++ b/crates/iceberg/src/spec/mod.rs @@ -17,6 +17,7 @@ //! Spec for Iceberg. 
+mod avro_util; mod datatypes; mod encrypted_key; mod manifest; diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 06b32cc847..24c0fb99a5 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -461,9 +461,57 @@ impl TableMetadata { file_io: &FileIO, metadata_location: impl AsRef, ) -> Result<()> { + use std::io::Write as _; + + use flate2::write::GzEncoder; + + let json_data = serde_json::to_vec(self)?; + + // Check if compression is enabled via table properties + let codec = self + .properties + .get(crate::spec::table_properties::TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC) + .map(|s| s.as_str()) + .unwrap_or(crate::spec::table_properties::TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT); + + let (data_to_write, actual_location) = match codec { + "gzip" => { + let mut encoder = GzEncoder::new(Vec::new(), flate2::Compression::default()); + encoder.write_all(&json_data).map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + "Failed to compress metadata with gzip", + ) + .with_source(e) + })?; + let compressed_data = encoder.finish().map_err(|e| { + Error::new(ErrorKind::DataInvalid, "Failed to finish gzip compression") + .with_source(e) + })?; + + // Modify filename to add .gz before .metadata.json + let location = metadata_location.as_ref(); + let new_location = if location.ends_with(".metadata.json") { + location.replace(".metadata.json", ".gz.metadata.json") + } else { + // If it doesn't end with expected pattern, just append .gz + format!("{}.gz", location) + }; + + (compressed_data, new_location) + } + "none" | "" => (json_data, metadata_location.as_ref().to_string()), + other => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Unsupported metadata compression codec: {}", other), + )); + } + }; + file_io - .new_output(metadata_location)? - .write(serde_json::to_vec(self)?.into()) + .new_output(actual_location)? 
+ .write(data_to_write.into()) .await } @@ -1556,7 +1604,7 @@ mod tests { BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation, PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile, - Summary, Transform, Type, UnboundPartitionField, + Summary, TableProperties, Transform, Type, UnboundPartitionField, }; fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) { @@ -3584,6 +3632,67 @@ mod tests { assert!(result.is_err()); } + #[tokio::test] + async fn test_table_metadata_write_with_gzip_compression() { + let temp_dir = TempDir::new().unwrap(); + let temp_path = temp_dir.path().to_str().unwrap(); + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + + // Get a test metadata and add gzip compression property + let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json"); + + // Modify properties to enable gzip compression + let mut props = original_metadata.properties.clone(); + props.insert( + TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(), + "gzip".to_string(), + ); + // Use builder to create new metadata with updated properties + let compressed_metadata = + TableMetadataBuilder::new_from_metadata(original_metadata.clone(), None) + .assign_uuid(original_metadata.table_uuid) + .set_properties(props.clone()) + .unwrap() + .build() + .unwrap() + .metadata; + + // Write the metadata with compression - note the location will be modified to add .gz + let metadata_location = format!("{temp_path}/00001-test.metadata.json"); + compressed_metadata + .write_to(&file_io, &metadata_location) + .await + .unwrap(); + + // The actual file should be written with .gz.metadata.json extension + let expected_compressed_location = format!("{temp_path}/00001-test.gz.metadata.json"); + + // Verify the compressed file exists + 
assert!(std::path::Path::new(&expected_compressed_location).exists()); + + // Read the raw file and check it's gzip compressed + let raw_content = std::fs::read(&expected_compressed_location).unwrap(); + assert!(raw_content.len() > 2); + assert_eq!(raw_content[0], 0x1F); // gzip magic number + assert_eq!(raw_content[1], 0x8B); // gzip magic number + + // Read the metadata back using the compressed location + let read_metadata = TableMetadata::read_from(&file_io, &expected_compressed_location) + .await + .unwrap(); + + // Verify the properties include the compression codec + assert_eq!( + read_metadata + .properties + .get(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC), + Some(&"gzip".to_string()) + ); + + // Verify the complete round-trip: read metadata should match what we wrote + assert_eq!(read_metadata, compressed_metadata); + } + #[test] fn test_partition_name_exists() { let schema = Schema::builder() diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index 4975456010..10908362ca 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -49,6 +49,12 @@ pub struct TableProperties { pub write_format_default: String, /// The target file size for files. 
pub write_target_file_size_bytes: usize, + /// Compression codec for metadata files (JSON) + pub metadata_compression_codec: String, + /// Compression codec for Avro files (manifests, manifest lists) + pub avro_compression_codec: String, + /// Compression level for Avro files + pub avro_compression_level: u8, } impl TableProperties { @@ -137,6 +143,21 @@ impl TableProperties { pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes"; /// Default target file size pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; // 512 MB + + /// Compression codec for metadata files (JSON) + pub const PROPERTY_METADATA_COMPRESSION_CODEC: &str = "write.metadata.compression-codec"; + /// Default metadata compression codec - none + pub const PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT: &str = "none"; + + /// Compression codec for Avro files (manifests, manifest lists) + pub const PROPERTY_AVRO_COMPRESSION_CODEC: &str = "write.avro.compression-codec"; + /// Default Avro compression codec - gzip + pub const PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT: &str = "gzip"; + + /// Compression level for Avro files + pub const PROPERTY_AVRO_COMPRESSION_LEVEL: &str = "write.avro.compression-level"; + /// Default Avro compression level (9 = BestCompression) + pub const PROPERTY_AVRO_COMPRESSION_LEVEL_DEFAULT: u8 = 9; } impl TryFrom<&HashMap> for TableProperties { @@ -175,6 +196,21 @@ impl TryFrom<&HashMap> for TableProperties { TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES, TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, )?, + metadata_compression_codec: parse_property( + props, + TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC, + TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT.to_string(), + )?, + avro_compression_codec: parse_property( + props, + TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC, + TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string(), + )?, + 
avro_compression_level: parse_property( + props, + TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL, + TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL_DEFAULT, + )?, }) } } @@ -207,6 +243,41 @@ mod tests { table_properties.write_target_file_size_bytes, TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT ); + // Test compression defaults + assert_eq!( + table_properties.metadata_compression_codec, + TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT.to_string() + ); + assert_eq!( + table_properties.avro_compression_codec, + TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string() + ); + assert_eq!( + table_properties.avro_compression_level, + TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL_DEFAULT + ); + } + + #[test] + fn test_table_properties_compression() { + let props = HashMap::from([ + ( + TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(), + "gzip".to_string(), + ), + ( + TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC.to_string(), + "zstd".to_string(), + ), + ( + TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL.to_string(), + "3".to_string(), + ), + ]); + let table_properties = TableProperties::try_from(&props).unwrap(); + assert_eq!(table_properties.metadata_compression_codec, "gzip"); + assert_eq!(table_properties.avro_compression_codec, "zstd"); + assert_eq!(table_properties.avro_compression_level, 3); } #[test] diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 4f85962ff1..1d4fbd13dd 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -195,6 +195,19 @@ impl<'a> SnapshotProducer<'a> { DataFileFormat::Avro ); let output_file = self.table.file_io().new_output(new_manifest_path)?; + + // Get compression settings from table properties + let table_props = + TableProperties::try_from(self.table.metadata().properties()).map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + "Failed to parse table 
properties for compression settings", + ) + .with_source(e) + })?; + let codec = table_props.avro_compression_codec.clone(); + let level = table_props.avro_compression_level; + let builder = ManifestWriterBuilder::new( output_file, Some(self.snapshot_id), @@ -205,7 +218,9 @@ impl<'a> SnapshotProducer<'a> { .default_partition_spec() .as_ref() .clone(), - ); + ) + .with_compression(codec, level); + match self.table.metadata().format_version() { FormatVersion::V1 => Ok(builder.build_v1()), FormatVersion::V2 => match content { @@ -386,6 +401,19 @@ impl<'a> SnapshotProducer<'a> { let manifest_list_path = self.generate_manifest_list_file_path(0); let next_seq_num = self.table.metadata().next_sequence_number(); let first_row_id = self.table.metadata().next_row_id(); + + // Get compression settings from table properties + let table_props = + TableProperties::try_from(self.table.metadata().properties()).map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + "Failed to parse table properties for compression settings", + ) + .with_source(e) + })?; + let codec = table_props.avro_compression_codec.clone(); + let level = table_props.avro_compression_level; + let mut manifest_list_writer = match self.table.metadata().format_version() { FormatVersion::V1 => ManifestListWriter::v1( self.table @@ -411,7 +439,8 @@ impl<'a> SnapshotProducer<'a> { next_seq_num, Some(first_row_id), ), - }; + } + .with_compression(codec, level); // Calling self.summary() before self.manifest_file() is important because self.added_data_files // will be set to an empty vec after self.manifest_file() returns, resulting in an empty summary From 091d3bc5388d20b1bd84cea29df04947d3e89530 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Thu, 13 Nov 2025 00:50:57 +0000 Subject: [PATCH 02/15] fmt --- crates/iceberg/src/spec/avro_util.rs | 33 +++++++++++++++++----- crates/iceberg/src/spec/manifest/writer.rs | 5 +++- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git 
a/crates/iceberg/src/spec/avro_util.rs b/crates/iceberg/src/spec/avro_util.rs index 055b304f47..9d75a35575 100644 --- a/crates/iceberg/src/spec/avro_util.rs +++ b/crates/iceberg/src/spec/avro_util.rs @@ -125,9 +125,11 @@ mod tests { #[test] fn test_codec_from_str_deflate_levels() { - use apache_avro::{Writer, Schema, types::Record}; use std::collections::HashMap; + use apache_avro::types::Record; + use apache_avro::{Schema, Writer}; + // Create a simple schema for testing let schema = Schema::parse_str(r#"{"type": "record", "name": "test", "fields": [{"name": "field", "type": "string"}]}"#).unwrap(); @@ -151,13 +153,20 @@ mod tests { // Level 0 (NoCompression) should be largest // Level 10 (UberCompression) should be smallest or equal to level 9 assert!(sizes[&0] >= sizes[&1], "Level 0 should be >= level 1"); - assert!(sizes[&1] >= sizes[&9] || sizes[&1] == sizes[&9], "Level 1 should be >= level 9"); - assert!(sizes[&9] >= sizes[&10] || sizes[&9] == sizes[&10], "Level 9 should be >= level 10"); + assert!( + sizes[&1] >= sizes[&9] || sizes[&1] == sizes[&9], + "Level 1 should be >= level 9" + ); + assert!( + sizes[&9] >= sizes[&10] || sizes[&9] == sizes[&10], + "Level 9 should be >= level 10" + ); } #[test] fn test_codec_from_str_zstd_levels() { - use apache_avro::{Writer, Schema, types::Record}; + use apache_avro::types::Record; + use apache_avro::{Schema, Writer}; // Create a simple schema for testing let schema = Schema::parse_str(r#"{"type": "record", "name": "test", "fields": [{"name": "field", "type": "string"}]}"#).unwrap(); @@ -196,12 +205,17 @@ mod tests { let encoded_22 = writer_22.into_inner().unwrap(); // Both should produce the same size since 100 is clamped to 22 - assert_eq!(encoded_100.len(), encoded_22.len(), "Level 100 should be clamped to 22"); + assert_eq!( + encoded_100.len(), + encoded_22.len(), + "Level 100 should be clamped to 22" + ); } #[test] fn test_compression_level_differences() { - use apache_avro::{Writer, Schema, types::Record}; + use 
apache_avro::types::Record; + use apache_avro::{Schema, Writer}; // Create a schema and data that will compress well let schema = Schema::parse_str(r#"{"type": "record", "name": "test", "fields": [{"name": "field", "type": "string"}]}"#).unwrap(); @@ -225,6 +239,11 @@ mod tests { let size_9 = writer_9.into_inner().unwrap().len(); // Level 0 should produce larger output than level 9 for compressible data - assert!(size_0 > size_9, "NoCompression (level 0) should produce larger output than BestCompression (level 9): {} vs {}", size_0, size_9); + assert!( + size_0 > size_9, + "NoCompression (level 0) should produce larger output than BestCompression (level 9): {} vs {}", + size_0, + size_9 + ); } } diff --git a/crates/iceberg/src/spec/manifest/writer.rs b/crates/iceberg/src/spec/manifest/writer.rs index aa727c1388..2db3ca6356 100644 --- a/crates/iceberg/src/spec/manifest/writer.rs +++ b/crates/iceberg/src/spec/manifest/writer.rs @@ -443,7 +443,10 @@ impl ManifestWriter { }; // Determine compression codec using helper function - let codec = codec_from_str(Some(self.compression_codec.as_str()), self.compression_level); + let codec = codec_from_str( + Some(self.compression_codec.as_str()), + self.compression_level, + ); let mut avro_writer = AvroWriter::with_codec(&avro_schema, Vec::new(), codec); avro_writer.add_user_metadata( From 41a8c1c6fe27f61e8b33d5ae7d961268261c1fb1 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Thu, 13 Nov 2025 06:54:57 +0000 Subject: [PATCH 03/15] fix clippy --- crates/iceberg/src/spec/avro_util.rs | 31 ++++++++++++++ crates/iceberg/src/spec/manifest/writer.rs | 48 ++++++++-------------- crates/iceberg/src/spec/manifest_list.rs | 43 +++++++------------ crates/iceberg/src/spec/mod.rs | 1 + crates/iceberg/src/transaction/snapshot.rs | 24 ++++++----- 5 files changed, 77 insertions(+), 70 deletions(-) diff --git a/crates/iceberg/src/spec/avro_util.rs b/crates/iceberg/src/spec/avro_util.rs index 9d75a35575..12e1395d4d 100644 --- 
a/crates/iceberg/src/spec/avro_util.rs +++ b/crates/iceberg/src/spec/avro_util.rs @@ -20,6 +20,37 @@ use apache_avro::Codec; use log::warn; +/// Settings for compression codec and level. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CompressionSettings { + /// The compression codec name (e.g., "gzip", "zstd", "deflate", "none") + pub codec: String, + /// The compression level + pub level: u8, +} + +impl CompressionSettings { + /// Create a new CompressionSettings with the specified codec and level. + pub fn new(codec: String, level: u8) -> Self { + Self { codec, level } + } + + /// Convert to apache_avro::Codec using the codec_from_str helper function. + pub(crate) fn to_codec(&self) -> Codec { + codec_from_str(Some(&self.codec), self.level) + } +} + +impl Default for CompressionSettings { + fn default() -> Self { + use crate::spec::TableProperties; + Self { + codec: TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string(), + level: TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL_DEFAULT, + } + } +} + /// Convert codec name and level to apache_avro::Codec. /// Returns Codec::Null for unknown or unsupported codecs. 
/// diff --git a/crates/iceberg/src/spec/manifest/writer.rs b/crates/iceberg/src/spec/manifest/writer.rs index 2db3ca6356..2ced707efd 100644 --- a/crates/iceberg/src/spec/manifest/writer.rs +++ b/crates/iceberg/src/spec/manifest/writer.rs @@ -28,7 +28,7 @@ use super::{ }; use crate::error::Result; use crate::io::OutputFile; -use crate::spec::avro_util::codec_from_str; +use crate::spec::avro_util::CompressionSettings; use crate::spec::manifest::_serde::{ManifestEntryV1, ManifestEntryV2}; use crate::spec::manifest::{manifest_schema_v1, manifest_schema_v2}; use crate::spec::{ @@ -44,8 +44,7 @@ pub struct ManifestWriterBuilder { key_metadata: Option>, schema: SchemaRef, partition_spec: PartitionSpec, - compression_codec: String, - compression_level: u8, + compression: CompressionSettings, } impl ManifestWriterBuilder { @@ -57,23 +56,19 @@ impl ManifestWriterBuilder { schema: SchemaRef, partition_spec: PartitionSpec, ) -> Self { - use crate::spec::TableProperties; - Self { output, snapshot_id, key_metadata, schema, partition_spec, - compression_codec: TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string(), - compression_level: TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL_DEFAULT, + compression: CompressionSettings::default(), } } - /// Set compression codec and level for the manifest file. - pub fn with_compression(mut self, codec: String, level: u8) -> Self { - self.compression_codec = codec; - self.compression_level = level; + /// Set compression settings for the manifest file. 
+ pub fn with_compression(mut self, compression: CompressionSettings) -> Self { + self.compression = compression; self } @@ -92,8 +87,7 @@ impl ManifestWriterBuilder { self.key_metadata, metadata, None, - self.compression_codec, - self.compression_level, + self.compression, ) } @@ -112,8 +106,7 @@ impl ManifestWriterBuilder { self.key_metadata, metadata, None, - self.compression_codec, - self.compression_level, + self.compression, ) } @@ -132,8 +125,7 @@ impl ManifestWriterBuilder { self.key_metadata, metadata, None, - self.compression_codec, - self.compression_level, + self.compression, ) } @@ -154,8 +146,7 @@ impl ManifestWriterBuilder { // First row id is assigned by the [`ManifestListWriter`] when the manifest // is added to the list. None, - self.compression_codec, - self.compression_level, + self.compression, ) } @@ -174,8 +165,7 @@ impl ManifestWriterBuilder { self.key_metadata, metadata, None, - self.compression_codec, - self.compression_level, + self.compression, ) } } @@ -202,8 +192,7 @@ pub struct ManifestWriter { metadata: ManifestMetadata, - compression_codec: String, - compression_level: u8, + compression: CompressionSettings, } impl ManifestWriter { @@ -214,8 +203,7 @@ impl ManifestWriter { key_metadata: Option>, metadata: ManifestMetadata, first_row_id: Option, - compression_codec: String, - compression_level: u8, + compression: CompressionSettings, ) -> Self { Self { output, @@ -231,8 +219,7 @@ impl ManifestWriter { key_metadata, manifest_entries: Vec::new(), metadata, - compression_codec, - compression_level, + compression, } } @@ -442,11 +429,8 @@ impl ManifestWriter { FormatVersion::V2 | FormatVersion::V3 => manifest_schema_v2(&partition_type)?, }; - // Determine compression codec using helper function - let codec = codec_from_str( - Some(self.compression_codec.as_str()), - self.compression_level, - ); + // Determine compression codec using CompressionSettings + let codec = self.compression.to_codec(); let mut avro_writer = 
AvroWriter::with_codec(&avro_schema, Vec::new(), codec); avro_writer.add_user_metadata( diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index fd41f33e12..e89f3a0243 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -31,7 +31,7 @@ use self::_serde::{ManifestFileV1, ManifestFileV2}; use super::{FormatVersion, Manifest}; use crate::error::Result; use crate::io::{FileIO, OutputFile}; -use crate::spec::avro_util::codec_from_str; +use crate::spec::avro_util::CompressionSettings; use crate::spec::manifest_list::_const_schema::MANIFEST_LIST_AVRO_SCHEMA_V3; use crate::spec::manifest_list::_serde::ManifestFileV3; use crate::{Error, ErrorKind}; @@ -99,8 +99,7 @@ pub struct ManifestListWriter { sequence_number: i64, snapshot_id: i64, next_row_id: Option, - compression_codec: String, - compression_level: u8, + compression: CompressionSettings, } impl std::fmt::Debug for ManifestListWriter { @@ -121,8 +120,6 @@ impl ManifestListWriter { /// Construct a v1 [`ManifestListWriter`] that writes to a provided [`OutputFile`]. 
pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option) -> Self { - use crate::spec::TableProperties; - let mut metadata = HashMap::from_iter([ ("snapshot-id".to_string(), snapshot_id.to_string()), ("format-version".to_string(), "1".to_string()), @@ -140,8 +137,7 @@ impl ManifestListWriter { 0, snapshot_id, None, - TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string(), - TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL_DEFAULT, + CompressionSettings::default(), ) } @@ -152,8 +148,6 @@ impl ManifestListWriter { parent_snapshot_id: Option, sequence_number: i64, ) -> Self { - use crate::spec::TableProperties; - let mut metadata = HashMap::from_iter([ ("snapshot-id".to_string(), snapshot_id.to_string()), ("sequence-number".to_string(), sequence_number.to_string()), @@ -172,8 +166,7 @@ impl ManifestListWriter { sequence_number, snapshot_id, None, - TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string(), - TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL_DEFAULT, + CompressionSettings::default(), ) } @@ -185,8 +178,6 @@ impl ManifestListWriter { sequence_number: i64, first_row_id: Option, // Always None for delete manifests ) -> Self { - use crate::spec::TableProperties; - let mut metadata = HashMap::from_iter([ ("snapshot-id".to_string(), snapshot_id.to_string()), ("sequence-number".to_string(), sequence_number.to_string()), @@ -211,15 +202,13 @@ impl ManifestListWriter { sequence_number, snapshot_id, first_row_id, - TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string(), - TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL_DEFAULT, + CompressionSettings::default(), ) } - /// Set compression codec and level for the manifest list file. - pub fn with_compression(mut self, codec: String, level: u8) -> Self { - self.compression_codec = codec.clone(); - self.compression_level = level; + /// Set compression settings for the manifest list file. 
+ pub fn with_compression(mut self, compression: CompressionSettings) -> Self { + self.compression = compression.clone(); // Recreate the avro_writer with the new codec let avro_schema = match self.format_version { @@ -228,10 +217,10 @@ impl ManifestListWriter { FormatVersion::V3 => &MANIFEST_LIST_AVRO_SCHEMA_V3, }; - // Use helper function to get codec - let compression = codec_from_str(Some(codec.as_str()), level); + // Use CompressionSettings to get codec + let codec = compression.to_codec(); - let new_writer = Writer::with_codec(avro_schema, Vec::new(), compression); + let new_writer = Writer::with_codec(avro_schema, Vec::new(), codec); // Copy over existing metadata from the old writer // Unfortunately, we can't extract metadata from the old writer, @@ -247,8 +236,7 @@ impl ManifestListWriter { sequence_number: i64, snapshot_id: i64, first_row_id: Option, - compression_codec: String, - compression_level: u8, + compression: CompressionSettings, ) -> Self { let avro_schema = match format_version { FormatVersion::V1 => &MANIFEST_LIST_AVRO_SCHEMA_V1, @@ -256,8 +244,8 @@ impl ManifestListWriter { FormatVersion::V3 => &MANIFEST_LIST_AVRO_SCHEMA_V3, }; - // Use helper function to determine compression codec - let codec = codec_from_str(Some(compression_codec.as_str()), compression_level); + // Use CompressionSettings to determine compression codec + let codec = compression.to_codec(); let mut avro_writer = Writer::with_codec(avro_schema, Vec::new(), codec); for (key, value) in metadata { @@ -272,8 +260,7 @@ impl ManifestListWriter { sequence_number, snapshot_id, next_row_id: first_row_id, - compression_codec, - compression_level, + compression, } } diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs index cfe9132193..bb8a50d828 100644 --- a/crates/iceberg/src/spec/mod.rs +++ b/crates/iceberg/src/spec/mod.rs @@ -38,6 +38,7 @@ mod view_metadata; mod view_metadata_builder; mod view_version; +pub use avro_util::CompressionSettings; pub use 
datatypes::*; pub use encrypted_key::*; pub use manifest::*; diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 1d4fbd13dd..c1fde6232c 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -23,10 +23,10 @@ use uuid::Uuid; use crate::error::Result; use crate::spec::{ - DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry, - ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation, Snapshot, - SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, Summary, - TableProperties, update_snapshot_summaries, + CompressionSettings, DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, + ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, + Operation, Snapshot, SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, + StructType, Summary, TableProperties, update_snapshot_summaries, }; use crate::table::Table; use crate::transaction::ActionCommit; @@ -205,8 +205,10 @@ impl<'a> SnapshotProducer<'a> { ) .with_source(e) })?; - let codec = table_props.avro_compression_codec.clone(); - let level = table_props.avro_compression_level; + let compression = CompressionSettings::new( + table_props.avro_compression_codec.clone(), + table_props.avro_compression_level, + ); let builder = ManifestWriterBuilder::new( output_file, @@ -219,7 +221,7 @@ impl<'a> SnapshotProducer<'a> { .as_ref() .clone(), ) - .with_compression(codec, level); + .with_compression(compression); match self.table.metadata().format_version() { FormatVersion::V1 => Ok(builder.build_v1()), @@ -411,8 +413,10 @@ impl<'a> SnapshotProducer<'a> { ) .with_source(e) })?; - let codec = table_props.avro_compression_codec.clone(); - let level = table_props.avro_compression_level; + let compression = CompressionSettings::new( + 
table_props.avro_compression_codec.clone(), + table_props.avro_compression_level, + ); let mut manifest_list_writer = match self.table.metadata().format_version() { FormatVersion::V1 => ManifestListWriter::v1( @@ -440,7 +444,7 @@ impl<'a> SnapshotProducer<'a> { Some(first_row_id), ), } - .with_compression(codec, level); + .with_compression(compression); // Calling self.summary() before self.manifest_file() is important because self.added_data_files // will be set to an empty vec after self.manifest_file() returns, resulting in an empty summary From 51e781ea5cf85e37791965a8f81bb26496bbaf1e Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Thu, 13 Nov 2025 07:22:03 +0000 Subject: [PATCH 04/15] clippy again --- crates/iceberg/src/spec/avro_util.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/spec/avro_util.rs b/crates/iceberg/src/spec/avro_util.rs index 12e1395d4d..bf1e5e545b 100644 --- a/crates/iceberg/src/spec/avro_util.rs +++ b/crates/iceberg/src/spec/avro_util.rs @@ -215,7 +215,7 @@ mod tests { writer.append(record).unwrap(); let encoded = writer.into_inner().unwrap(); - assert!(encoded.len() > 0, "Compression should produce output"); + assert!(!encoded.is_empty(), "Compression should produce output"); } // Test clamping - higher than 22 should be clamped to 22 From 253bf59b98d5715fa9308578980d90bfe0e30c5f Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Sun, 16 Nov 2025 00:27:42 +0000 Subject: [PATCH 05/15] wip --- crates/iceberg/src/spec/avro_util.rs | 84 +++++++++++++------- crates/iceberg/src/spec/manifest/writer.rs | 9 +-- crates/iceberg/src/spec/manifest_list.rs | 33 ++------ crates/iceberg/src/spec/table_metadata.rs | 2 +- crates/iceberg/src/spec/table_properties.rs | 86 ++++++++++++++++++--- 5 files changed, 140 insertions(+), 74 deletions(-) diff --git a/crates/iceberg/src/spec/avro_util.rs b/crates/iceberg/src/spec/avro_util.rs index bf1e5e545b..3562054db1 100644 --- a/crates/iceberg/src/spec/avro_util.rs 
+++ b/crates/iceberg/src/spec/avro_util.rs @@ -23,15 +23,15 @@ use log::warn; /// Settings for compression codec and level. #[derive(Debug, Clone, PartialEq, Eq)] pub struct CompressionSettings { - /// The compression codec name (e.g., "gzip", "zstd", "deflate", "none") + /// The compression codec name (e.g., "gzip", "zstd", "deflate", "uncompressed") pub codec: String, - /// The compression level - pub level: u8, + /// The compression level (None uses codec-specific defaults: gzip=9, zstd=1) + pub level: Option, } impl CompressionSettings { /// Create a new CompressionSettings with the specified codec and level. - pub fn new(codec: String, level: u8) -> Self { + pub fn new(codec: String, level: Option) -> Self { Self { codec, level } } @@ -46,7 +46,7 @@ impl Default for CompressionSettings { use crate::spec::TableProperties; Self { codec: TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string(), - level: TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL_DEFAULT, + level: None, } } } @@ -56,8 +56,8 @@ impl Default for CompressionSettings { /// /// # Arguments /// -/// * `codec` - The name of the compression codec (e.g., "gzip", "zstd", "deflate", "none") -/// * `level` - The compression level. For deflate/gzip: +/// * `codec` - The name of the compression codec (e.g., "gzip", "zstd", "deflate", "uncompressed") +/// * `level` - The compression level (None uses codec defaults: gzip=9, zstd=1). 
For deflate/gzip: /// - 0: NoCompression /// - 1: BestSpeed /// - 9: BestCompression @@ -66,9 +66,9 @@ impl Default for CompressionSettings { /// /// # Supported Codecs /// -/// - `gzip` or `deflate`: Uses Deflate compression with specified level -/// - `zstd`: Uses Zstandard compression (level clamped to valid zstd range) -/// - `none` or `None`: No compression +/// - `gzip` or `deflate`: Uses Deflate compression with specified level (default: 9) +/// - `zstd`: Uses Zstandard compression (default: 1, level clamped to valid zstd range 0-22) +/// - `uncompressed` or `None`: No compression /// - Any other value: Defaults to no compression (Codec::Null) /// /// # Compression Levels @@ -79,16 +79,17 @@ impl Default for CompressionSettings { /// - Level 9: Best compression (slower, better compression) /// - Level 10: Uber compression (slowest, best compression) /// - Other: Default level (balanced speed/compression) -pub(crate) fn codec_from_str(codec: Option<&str>, level: u8) -> Codec { +pub(crate) fn codec_from_str(codec: Option<&str>, level: Option) -> Codec { use apache_avro::{DeflateSettings, ZstandardSettings}; match codec { Some("gzip") | Some("deflate") => { // Map compression level to miniz_oxide::deflate::CompressionLevel // Reference: https://docs.rs/miniz_oxide/latest/miniz_oxide/deflate/enum.CompressionLevel.html + // Default level for gzip/deflate is 9 (BestCompression) to match Java use miniz_oxide::deflate::CompressionLevel; - let compression_level = match level { + let compression_level = match level.unwrap_or(9) { 0 => CompressionLevel::NoCompression, 1 => CompressionLevel::BestSpeed, 9 => CompressionLevel::BestCompression, @@ -100,10 +101,11 @@ pub(crate) fn codec_from_str(codec: Option<&str>, level: u8) -> Codec { } Some("zstd") => { // Zstandard supports levels 0-22, clamp to valid range - let zstd_level = level.min(22); + // Default level for zstd is 1 to match Java + let zstd_level = level.unwrap_or(1).min(22); 
Codec::Zstandard(ZstandardSettings::new(zstd_level)) } - Some("none") | None => Codec::Null, + Some("uncompressed") | None => Codec::Null, Some(unknown) => { warn!( "Unrecognized compression codec '{}', using no compression (Codec::Null)", @@ -117,43 +119,65 @@ pub(crate) fn codec_from_str(codec: Option<&str>, level: u8) -> Codec { #[cfg(test)] mod tests { use super::*; + use apache_avro::{DeflateSettings, ZstandardSettings}; + use miniz_oxide::deflate::CompressionLevel; #[test] fn test_codec_from_str_gzip() { - let codec = codec_from_str(Some("gzip"), 5); - assert!(matches!(codec, Codec::Deflate(_))); + let codec = codec_from_str(Some("gzip"), Some(5)); + assert_eq!( + codec, + Codec::Deflate(DeflateSettings::new(CompressionLevel::DefaultLevel)) + ); } #[test] fn test_codec_from_str_deflate() { - let codec = codec_from_str(Some("deflate"), 9); - assert!(matches!(codec, Codec::Deflate(_))); + let codec = codec_from_str(Some("deflate"), Some(9)); + assert_eq!( + codec, + Codec::Deflate(DeflateSettings::new(CompressionLevel::BestCompression)) + ); } #[test] fn test_codec_from_str_zstd() { - let codec = codec_from_str(Some("zstd"), 3); - assert!(matches!(codec, Codec::Zstandard(_))); + let codec = codec_from_str(Some("zstd"), Some(3)); + assert_eq!(codec, Codec::Zstandard(ZstandardSettings::new(3))); } #[test] - fn test_codec_from_str_none() { - let codec = codec_from_str(Some("none"), 0); + fn test_codec_from_str_uncompressed() { + let codec = codec_from_str(Some("uncompressed"), None); assert!(matches!(codec, Codec::Null)); } #[test] fn test_codec_from_str_null() { - let codec = codec_from_str(None, 0); + let codec = codec_from_str(None, None); assert!(matches!(codec, Codec::Null)); } #[test] fn test_codec_from_str_unknown() { - let codec = codec_from_str(Some("unknown"), 1); + let codec = codec_from_str(Some("unknown"), Some(1)); assert!(matches!(codec, Codec::Null)); } + #[test] + fn test_codec_from_str_gzip_default_level() { + // Test that None level defaults to 
9 for gzip + let codec = codec_from_str(Some("gzip"), None); + assert_eq!(codec, Codec::Deflate(DeflateSettings::new(CompressionLevel::BestCompression))); + } + + #[test] + fn test_codec_from_str_zstd_default_level() { + // Test that None level defaults to 1 for zstd + let codec = codec_from_str(Some("zstd"), None); + assert_eq!(codec, Codec::Zstandard(ZstandardSettings::new(1))); + } + #[test] fn test_codec_from_str_deflate_levels() { use std::collections::HashMap; @@ -170,7 +194,7 @@ mod tests { // Test that different compression levels produce different output sizes let mut sizes = HashMap::new(); for level in [0, 1, 5, 9, 10] { - let codec = codec_from_str(Some("gzip"), level); + let codec = codec_from_str(Some("gzip"), Some(level)); let mut writer = Writer::with_codec(&schema, Vec::new(), codec); let mut record = Record::new(&schema).unwrap(); @@ -205,7 +229,7 @@ mod tests { // Test various levels by checking they produce valid codecs for level in [0, 3, 15, 22] { - let codec = codec_from_str(Some("zstd"), level); + let codec = codec_from_str(Some("zstd"), Some(level)); assert!(matches!(codec, Codec::Zstandard(_))); // Verify the codec actually works by compressing data @@ -219,8 +243,8 @@ mod tests { } // Test clamping - higher than 22 should be clamped to 22 - let codec_100 = codec_from_str(Some("zstd"), 100); - let codec_22 = codec_from_str(Some("zstd"), 22); + let codec_100 = codec_from_str(Some("zstd"), Some(100)); + let codec_22 = codec_from_str(Some("zstd"), Some(22)); // Both should work and produce similar results let mut writer_100 = Writer::with_codec(&schema, Vec::new(), codec_100); @@ -255,14 +279,14 @@ mod tests { let test_str = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"; // Test gzip level 0 (no compression) vs level 9 (best compression) - let codec_0 = codec_from_str(Some("gzip"), 0); + let codec_0 = codec_from_str(Some("gzip"), Some(0)); let mut writer_0 = Writer::with_codec(&schema, 
Vec::new(), codec_0); let mut record_0 = Record::new(&schema).unwrap(); record_0.put("field", test_str); writer_0.append(record_0).unwrap(); let size_0 = writer_0.into_inner().unwrap().len(); - let codec_9 = codec_from_str(Some("gzip"), 9); + let codec_9 = codec_from_str(Some("gzip"), Some(9)); let mut writer_9 = Writer::with_codec(&schema, Vec::new(), codec_9); let mut record_9 = Record::new(&schema).unwrap(); record_9.put("field", test_str); diff --git a/crates/iceberg/src/spec/manifest/writer.rs b/crates/iceberg/src/spec/manifest/writer.rs index 2ced707efd..5db54d27cb 100644 --- a/crates/iceberg/src/spec/manifest/writer.rs +++ b/crates/iceberg/src/spec/manifest/writer.rs @@ -55,6 +55,7 @@ impl ManifestWriterBuilder { key_metadata: Option>, schema: SchemaRef, partition_spec: PartitionSpec, + compression_settings : CompressionSettings ) -> Self { Self { output, @@ -62,16 +63,10 @@ impl ManifestWriterBuilder { key_metadata, schema, partition_spec, - compression: CompressionSettings::default(), + compression_settings, } } - /// Set compression settings for the manifest file. - pub fn with_compression(mut self, compression: CompressionSettings) -> Self { - self.compression = compression; - self - } - /// Build a [`ManifestWriter`] for format version 1. pub fn build_v1(self) -> ManifestWriter { let metadata = ManifestMetadata::builder() diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index e89f3a0243..f1096ccfaf 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -119,7 +119,7 @@ impl ManifestListWriter { } /// Construct a v1 [`ManifestListWriter`] that writes to a provided [`OutputFile`]. 
- pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option) -> Self { + pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option, compression_settings : CompressionSettings) -> Self { let mut metadata = HashMap::from_iter([ ("snapshot-id".to_string(), snapshot_id.to_string()), ("format-version".to_string(), "1".to_string()), @@ -137,7 +137,7 @@ impl ManifestListWriter { 0, snapshot_id, None, - CompressionSettings::default(), + compression_settings, ) } @@ -147,6 +147,7 @@ impl ManifestListWriter { snapshot_id: i64, parent_snapshot_id: Option, sequence_number: i64, + compression_settings : CompressionSettings ) -> Self { let mut metadata = HashMap::from_iter([ ("snapshot-id".to_string(), snapshot_id.to_string()), @@ -166,7 +167,7 @@ impl ManifestListWriter { sequence_number, snapshot_id, None, - CompressionSettings::default(), + compression_settings, ) } @@ -177,6 +178,7 @@ impl ManifestListWriter { parent_snapshot_id: Option, sequence_number: i64, first_row_id: Option, // Always None for delete manifests + compression_settings : CompressionSettings ) -> Self { let mut metadata = HashMap::from_iter([ ("snapshot-id".to_string(), snapshot_id.to_string()), @@ -202,33 +204,10 @@ impl ManifestListWriter { sequence_number, snapshot_id, first_row_id, - CompressionSettings::default(), + compression_settings, ) } - /// Set compression settings for the manifest list file. 
- pub fn with_compression(mut self, compression: CompressionSettings) -> Self { - self.compression = compression.clone(); - - // Recreate the avro_writer with the new codec - let avro_schema = match self.format_version { - FormatVersion::V1 => &MANIFEST_LIST_AVRO_SCHEMA_V1, - FormatVersion::V2 => &MANIFEST_LIST_AVRO_SCHEMA_V2, - FormatVersion::V3 => &MANIFEST_LIST_AVRO_SCHEMA_V3, - }; - - // Use CompressionSettings to get codec - let codec = compression.to_codec(); - - let new_writer = Writer::with_codec(avro_schema, Vec::new(), codec); - - // Copy over existing metadata from the old writer - // Unfortunately, we can't extract metadata from the old writer, - // so we'll need to handle this differently - self.avro_writer = new_writer; - self - } - fn new( format_version: FormatVersion, output_file: OutputFile, diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 24c0fb99a5..9407555f2c 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -500,7 +500,7 @@ impl TableMetadata { (compressed_data, new_location) } - "none" | "" => (json_data, metadata_location.as_ref().to_string()), + "uncompressed" | "" => (json_data, metadata_location.as_ref().to_string()), other => { return Err(Error::new( ErrorKind::DataInvalid, diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index 10908362ca..cad82a95f6 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -34,6 +34,25 @@ where }) } +// Helper function to parse an optional property from a HashMap +// If the property is not found, returns None +fn parse_optional_property( + properties: &HashMap, + key: &str, +) -> Result, anyhow::Error> +where + ::Err: std::fmt::Display, +{ + properties + .get(key) + .map(|value| { + value + .parse::() + .map_err(|e| anyhow::anyhow!("Invalid value for {key}: {e}")) + }) + .transpose() +} + 
/// TableProperties that contains the properties of a table. #[derive(Debug)] pub struct TableProperties { @@ -53,8 +72,8 @@ pub struct TableProperties { pub metadata_compression_codec: String, /// Compression codec for Avro files (manifests, manifest lists) pub avro_compression_codec: String, - /// Compression level for Avro files - pub avro_compression_level: u8, + /// Compression level for Avro files (None uses codec-specific defaults: gzip=9, zstd=1) + pub avro_compression_level: Option, } impl TableProperties { @@ -146,8 +165,8 @@ impl TableProperties { /// Compression codec for metadata files (JSON) pub const PROPERTY_METADATA_COMPRESSION_CODEC: &str = "write.metadata.compression-codec"; - /// Default metadata compression codec - none - pub const PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT: &str = "none"; + /// Default metadata compression codec - uncompressed + pub const PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT: &str = "uncompressed"; /// Compression codec for Avro files (manifests, manifest lists) pub const PROPERTY_AVRO_COMPRESSION_CODEC: &str = "write.avro.compression-codec"; @@ -156,8 +175,8 @@ impl TableProperties { /// Compression level for Avro files pub const PROPERTY_AVRO_COMPRESSION_LEVEL: &str = "write.avro.compression-level"; - /// Default Avro compression level (9 = BestCompression) - pub const PROPERTY_AVRO_COMPRESSION_LEVEL_DEFAULT: u8 = 9; + /// Default Avro compression level (None, uses codec-specific defaults: gzip=9, zstd=1) + pub const PROPERTY_AVRO_COMPRESSION_LEVEL_DEFAULT: Option = None; } impl TryFrom<&HashMap> for TableProperties { @@ -206,10 +225,9 @@ impl TryFrom<&HashMap> for TableProperties { TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC, TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string(), )?, - avro_compression_level: parse_property( + avro_compression_level: parse_optional_property( props, TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL, - TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL_DEFAULT, )?, }) 
} @@ -277,7 +295,7 @@ mod tests { let table_properties = TableProperties::try_from(&props).unwrap(); assert_eq!(table_properties.metadata_compression_codec, "gzip"); assert_eq!(table_properties.avro_compression_codec, "zstd"); - assert_eq!(table_properties.avro_compression_level, 3); + assert_eq!(table_properties.avro_compression_level, Some(3)); } #[test] @@ -352,4 +370,54 @@ mod tests { "Invalid value for write.target-file-size-bytes: invalid digit found in string" )); } + + #[test] + fn test_parse_optional_property() { + // Test when key is not present - should return None + let props = HashMap::new(); + let result: Option = parse_optional_property(&props, "missing-key").unwrap(); + assert_eq!(result, None); + + // Test when key is present with valid value - should return Some(value) + let props = HashMap::from([("test-key".to_string(), "42".to_string())]); + let result: Option = parse_optional_property(&props, "test-key").unwrap(); + assert_eq!(result, Some(42)); + + // Test when key is present with invalid value - should return error + let props = HashMap::from([("test-key".to_string(), "invalid".to_string())]); + let result = parse_optional_property::(&props, "test-key"); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Invalid value for test-key")); + } + + #[test] + fn test_table_properties_optional_compression_level() { + // Test that compression level is None when not specified + let props = HashMap::new(); + let table_properties = TableProperties::try_from(&props).unwrap(); + assert_eq!(table_properties.avro_compression_level, None); + + // Test that compression level is Some(value) when specified + let props = HashMap::from([( + TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL.to_string(), + "5".to_string(), + )]); + let table_properties = TableProperties::try_from(&props).unwrap(); + assert_eq!(table_properties.avro_compression_level, Some(5)); + + // Test that invalid compression level returns error + let props 
= HashMap::from([( + TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL.to_string(), + "invalid".to_string(), + )]); + let result = TableProperties::try_from(&props); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Invalid value for write.avro.compression-level")); + } } From 8bdb52dbf37de9aee8f537eebaf12638d07fd402 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Sun, 16 Nov 2025 00:41:10 +0000 Subject: [PATCH 06/15] address comments --- crates/iceberg/src/io/object_cache.rs | 7 ++- crates/iceberg/src/scan/mod.rs | 10 ++- crates/iceberg/src/spec/avro_util.rs | 8 ++- crates/iceberg/src/spec/manifest/mod.rs | 10 ++- crates/iceberg/src/spec/manifest/writer.rs | 5 +- crates/iceberg/src/spec/manifest_list.rs | 67 ++++++++++++++++----- crates/iceberg/src/spec/table_properties.rs | 20 +++--- crates/iceberg/src/transaction/snapshot.rs | 10 +-- 8 files changed, 101 insertions(+), 36 deletions(-) diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs index af297bebb5..36c8e3e6fd 100644 --- a/crates/iceberg/src/io/object_cache.rs +++ b/crates/iceberg/src/io/object_cache.rs @@ -194,8 +194,9 @@ mod tests { use crate::TableIdent; use crate::io::{FileIO, OutputFile}; use crate::spec::{ - DataContentType, DataFileBuilder, DataFileFormat, Literal, ManifestEntry, - ManifestListWriter, ManifestStatus, ManifestWriterBuilder, Struct, TableMetadata, + CompressionSettings, DataContentType, DataFileBuilder, DataFileFormat, Literal, + ManifestEntry, ManifestListWriter, ManifestStatus, ManifestWriterBuilder, Struct, + TableMetadata, }; use crate::table::Table; @@ -275,6 +276,7 @@ mod tests { None, current_schema.clone(), current_partition_spec.as_ref().clone(), + CompressionSettings::default(), ) .build_v2_data(); writer @@ -307,6 +309,7 @@ mod tests { current_snapshot.snapshot_id(), current_snapshot.parent_snapshot_id(), current_snapshot.sequence_number(), + CompressionSettings::default(), ); 
manifest_list_write .add_manifests(vec![data_file_manifest].into_iter()) diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 6884e00b9b..85009243b1 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -577,9 +577,9 @@ pub mod tests { use crate::io::{FileIO, OutputFile}; use crate::scan::FileScanTask; use crate::spec::{ - DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry, - ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec, - PrimitiveType, Schema, Struct, StructType, TableMetadata, Type, + CompressionSettings, DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, + ManifestEntry, ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, + PartitionSpec, PrimitiveType, Schema, Struct, StructType, TableMetadata, Type, }; use crate::table::Table; @@ -749,6 +749,7 @@ pub mod tests { None, current_schema.clone(), current_partition_spec.as_ref().clone(), + CompressionSettings::default(), ) .build_v2_data(); writer @@ -826,6 +827,7 @@ pub mod tests { current_snapshot.snapshot_id(), current_snapshot.parent_snapshot_id(), current_snapshot.sequence_number(), + CompressionSettings::default(), ); manifest_list_write .add_manifests(vec![data_file_manifest].into_iter()) @@ -961,6 +963,7 @@ pub mod tests { None, current_schema.clone(), current_partition_spec.as_ref().clone(), + CompressionSettings::default(), ) .build_v2_data(); @@ -1045,6 +1048,7 @@ pub mod tests { current_snapshot.snapshot_id(), current_snapshot.parent_snapshot_id(), current_snapshot.sequence_number(), + CompressionSettings::default(), ); manifest_list_write .add_manifests(vec![data_file_manifest].into_iter()) diff --git a/crates/iceberg/src/spec/avro_util.rs b/crates/iceberg/src/spec/avro_util.rs index 3562054db1..19221c4c9f 100644 --- a/crates/iceberg/src/spec/avro_util.rs +++ b/crates/iceberg/src/spec/avro_util.rs @@ -118,10 +118,11 @@ pub(crate) fn 
codec_from_str(codec: Option<&str>, level: Option) -> Codec { #[cfg(test)] mod tests { - use super::*; use apache_avro::{DeflateSettings, ZstandardSettings}; use miniz_oxide::deflate::CompressionLevel; + use super::*; + #[test] fn test_codec_from_str_gzip() { let codec = codec_from_str(Some("gzip"), Some(5)); @@ -168,7 +169,10 @@ mod tests { fn test_codec_from_str_gzip_default_level() { // Test that None level defaults to 9 for gzip let codec = codec_from_str(Some("gzip"), None); - assert_eq!(codec, Codec::Deflate(DeflateSettings::new(CompressionLevel::BestCompression))); + assert_eq!( + codec, + Codec::Deflate(DeflateSettings::new(CompressionLevel::BestCompression)) + ); } #[test] diff --git a/crates/iceberg/src/spec/manifest/mod.rs b/crates/iceberg/src/spec/manifest/mod.rs index 51219bfdb7..e3679433eb 100644 --- a/crates/iceberg/src/spec/manifest/mod.rs +++ b/crates/iceberg/src/spec/manifest/mod.rs @@ -166,7 +166,9 @@ mod tests { use super::*; use crate::io::FileIOBuilder; - use crate::spec::{Literal, NestedField, PrimitiveType, Struct, Transform, Type}; + use crate::spec::{ + CompressionSettings, Literal, NestedField, PrimitiveType, Struct, Transform, Type, + }; #[tokio::test] async fn test_parse_manifest_v2_unpartition() { @@ -272,6 +274,7 @@ mod tests { None, metadata.schema.clone(), metadata.partition_spec.clone(), + CompressionSettings::default(), ) .build_v2_data(); for entry in &entries { @@ -457,6 +460,7 @@ mod tests { None, metadata.schema.clone(), metadata.partition_spec.clone(), + CompressionSettings::default(), ) .build_v2_data(); for entry in &entries { @@ -554,6 +558,7 @@ mod tests { None, metadata.schema.clone(), metadata.partition_spec.clone(), + CompressionSettings::default(), ) .build_v1(); for entry in &entries { @@ -663,6 +668,7 @@ mod tests { None, metadata.schema.clone(), metadata.partition_spec.clone(), + CompressionSettings::default(), ) .build_v1(); for entry in &entries { @@ -771,6 +777,7 @@ mod tests { None, metadata.schema.clone(), 
metadata.partition_spec.clone(), + CompressionSettings::default(), ) .build_v2_data(); for entry in &entries { @@ -1050,6 +1057,7 @@ mod tests { None, metadata.schema.clone(), metadata.partition_spec.clone(), + CompressionSettings::default(), ) .build_v2_data(); for entry in &entries { diff --git a/crates/iceberg/src/spec/manifest/writer.rs b/crates/iceberg/src/spec/manifest/writer.rs index 5db54d27cb..db9853cc35 100644 --- a/crates/iceberg/src/spec/manifest/writer.rs +++ b/crates/iceberg/src/spec/manifest/writer.rs @@ -55,7 +55,7 @@ impl ManifestWriterBuilder { key_metadata: Option>, schema: SchemaRef, partition_spec: PartitionSpec, - compression_settings : CompressionSettings + compression: CompressionSettings, ) -> Self { Self { output, @@ -63,7 +63,7 @@ impl ManifestWriterBuilder { key_metadata, schema, partition_spec, - compression_settings, + compression, } } @@ -706,6 +706,7 @@ mod tests { None, metadata.schema.clone(), metadata.partition_spec.clone(), + CompressionSettings::default(), ) .build_v2_data(); writer.add_entry(entries[0].clone()).unwrap(); diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index f1096ccfaf..54dbc987e2 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -99,7 +99,6 @@ pub struct ManifestListWriter { sequence_number: i64, snapshot_id: i64, next_row_id: Option, - compression: CompressionSettings, } impl std::fmt::Debug for ManifestListWriter { @@ -119,7 +118,12 @@ impl ManifestListWriter { } /// Construct a v1 [`ManifestListWriter`] that writes to a provided [`OutputFile`]. 
- pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option, compression_settings : CompressionSettings) -> Self { + pub fn v1( + output_file: OutputFile, + snapshot_id: i64, + parent_snapshot_id: Option, + compression_settings: CompressionSettings, + ) -> Self { let mut metadata = HashMap::from_iter([ ("snapshot-id".to_string(), snapshot_id.to_string()), ("format-version".to_string(), "1".to_string()), @@ -147,7 +151,7 @@ impl ManifestListWriter { snapshot_id: i64, parent_snapshot_id: Option, sequence_number: i64, - compression_settings : CompressionSettings + compression_settings: CompressionSettings, ) -> Self { let mut metadata = HashMap::from_iter([ ("snapshot-id".to_string(), snapshot_id.to_string()), @@ -178,7 +182,7 @@ impl ManifestListWriter { parent_snapshot_id: Option, sequence_number: i64, first_row_id: Option, // Always None for delete manifests - compression_settings : CompressionSettings + compression_settings: CompressionSettings, ) -> Self { let mut metadata = HashMap::from_iter([ ("snapshot-id".to_string(), snapshot_id.to_string()), @@ -239,7 +243,6 @@ impl ManifestListWriter { sequence_number, snapshot_id, next_row_id: first_row_id, - compression, } } @@ -1381,8 +1384,8 @@ mod test { use crate::io::FileIOBuilder; use crate::spec::manifest_list::_serde::{ManifestListV1, ManifestListV3}; use crate::spec::{ - Datum, FieldSummary, ManifestContentType, ManifestFile, ManifestList, ManifestListWriter, - UNASSIGNED_SEQUENCE_NUMBER, + CompressionSettings, Datum, FieldSummary, ManifestContentType, ManifestFile, ManifestList, + ManifestListWriter, UNASSIGNED_SEQUENCE_NUMBER, }; #[tokio::test] @@ -1420,6 +1423,7 @@ mod test { file_io.new_output(full_path.clone()).unwrap(), 1646658105718557341, Some(1646658105718557341), + CompressionSettings::default(), ); writer @@ -1493,6 +1497,7 @@ mod test { 1646658105718557341, Some(1646658105718557341), 1, + CompressionSettings::default(), ); writer @@ -1567,6 +1572,7 @@ mod test { 
Some(377075049360453639), 1, Some(10), + CompressionSettings::default(), ); writer @@ -1703,7 +1709,12 @@ mod test { let io = FileIOBuilder::new_fs_io().build().unwrap(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0)); + let mut writer = ManifestListWriter::v1( + output_file, + 1646658105718557341, + Some(0), + CompressionSettings::default(), + ); writer .add_manifests(expected_manifest_list.entries.clone().into_iter()) .unwrap(); @@ -1750,7 +1761,13 @@ mod test { let io = FileIOBuilder::new_fs_io().build().unwrap(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let mut writer = ManifestListWriter::v2(output_file, snapshot_id, Some(0), seq_num); + let mut writer = ManifestListWriter::v2( + output_file, + snapshot_id, + Some(0), + seq_num, + CompressionSettings::default(), + ); writer .add_manifests(expected_manifest_list.entries.clone().into_iter()) .unwrap(); @@ -1798,8 +1815,14 @@ mod test { let io = FileIOBuilder::new_fs_io().build().unwrap(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let mut writer = - ManifestListWriter::v3(output_file, snapshot_id, Some(0), seq_num, Some(10)); + let mut writer = ManifestListWriter::v3( + output_file, + snapshot_id, + Some(0), + seq_num, + Some(10), + CompressionSettings::default(), + ); writer .add_manifests(expected_manifest_list.entries.clone().into_iter()) .unwrap(); @@ -1846,7 +1869,12 @@ mod test { let io = FileIOBuilder::new_fs_io().build().unwrap(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0)); + let mut writer = ManifestListWriter::v1( + output_file, + 1646658105718557341, + Some(0), + CompressionSettings::default(), + ); writer .add_manifests(expected_manifest_list.entries.clone().into_iter()) .unwrap(); @@ -1891,7 +1919,12 @@ mod test { let io = 
FileIOBuilder::new_fs_io().build().unwrap(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0)); + let mut writer = ManifestListWriter::v1( + output_file, + 1646658105718557341, + Some(0), + CompressionSettings::default(), + ); writer .add_manifests(expected_manifest_list.entries.clone().into_iter()) .unwrap(); @@ -1938,7 +1971,13 @@ mod test { let io = FileIOBuilder::new_fs_io().build().unwrap(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let mut writer = ManifestListWriter::v2(output_file, snapshot_id, Some(0), seq_num); + let mut writer = ManifestListWriter::v2( + output_file, + snapshot_id, + Some(0), + seq_num, + CompressionSettings::default(), + ); writer .add_manifests(expected_manifest_list.entries.clone().into_iter()) .unwrap(); diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index cad82a95f6..df477c6f73 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -387,10 +387,12 @@ mod tests { let props = HashMap::from([("test-key".to_string(), "invalid".to_string())]); let result = parse_optional_property::(&props, "test-key"); assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("Invalid value for test-key")); + assert!( + result + .unwrap_err() + .to_string() + .contains("Invalid value for test-key") + ); } #[test] @@ -415,9 +417,11 @@ mod tests { )]); let result = TableProperties::try_from(&props); assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("Invalid value for write.avro.compression-level")); + assert!( + result + .unwrap_err() + .to_string() + .contains("Invalid value for write.avro.compression-level") + ); } } diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index c1fde6232c..b0f4276338 100644 --- 
a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -220,8 +220,8 @@ impl<'a> SnapshotProducer<'a> { .default_partition_spec() .as_ref() .clone(), - ) - .with_compression(compression); + compression, + ); match self.table.metadata().format_version() { FormatVersion::V1 => Ok(builder.build_v1()), @@ -425,6 +425,7 @@ impl<'a> SnapshotProducer<'a> { .new_output(manifest_list_path.clone())?, self.snapshot_id, self.table.metadata().current_snapshot_id(), + compression.clone(), ), FormatVersion::V2 => ManifestListWriter::v2( self.table @@ -433,6 +434,7 @@ impl<'a> SnapshotProducer<'a> { self.snapshot_id, self.table.metadata().current_snapshot_id(), next_seq_num, + compression.clone(), ), FormatVersion::V3 => ManifestListWriter::v3( self.table @@ -442,9 +444,9 @@ impl<'a> SnapshotProducer<'a> { self.table.metadata().current_snapshot_id(), next_seq_num, Some(first_row_id), + compression, ), - } - .with_compression(compression); + }; // Calling self.summary() before self.manifest_file() is important because self.added_data_files // will be set to an empty vec after self.manifest_file() returns, resulting in an empty summary From 9d27116b75672544c3ee295123f9b63f693225bb Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Sun, 16 Nov 2025 00:52:36 +0000 Subject: [PATCH 07/15] no clone needed --- crates/iceberg/src/transaction/snapshot.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index b0f4276338..74dca8927b 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -425,7 +425,7 @@ impl<'a> SnapshotProducer<'a> { .new_output(manifest_list_path.clone())?, self.snapshot_id, self.table.metadata().current_snapshot_id(), - compression.clone(), + compression, ), FormatVersion::V2 => ManifestListWriter::v2( self.table @@ -434,7 +434,7 @@ impl<'a> SnapshotProducer<'a> { 
self.snapshot_id, self.table.metadata().current_snapshot_id(), next_seq_num, - compression.clone(), + compression, ), FormatVersion::V3 => ManifestListWriter::v3( self.table From d1ee0b26009625f06812de98741f50d279b2df25 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Sun, 16 Nov 2025 01:22:27 +0000 Subject: [PATCH 08/15] test compression works --- crates/iceberg/src/spec/manifest/writer.rs | 128 +++++++++++++++++++++ crates/iceberg/src/spec/manifest_list.rs | 92 +++++++++++++++ 2 files changed, 220 insertions(+) diff --git a/crates/iceberg/src/spec/manifest/writer.rs b/crates/iceberg/src/spec/manifest/writer.rs index db9853cc35..ccea427a0b 100644 --- a/crates/iceberg/src/spec/manifest/writer.rs +++ b/crates/iceberg/src/spec/manifest/writer.rs @@ -726,4 +726,132 @@ mod tests { entries[0].file_sequence_number = None; assert_eq!(actual_manifest, Manifest::new(metadata, entries)); } + + #[tokio::test] + async fn test_manifest_writer_with_compression() { + use std::fs; + + use tempfile::TempDir; + + use crate::io::FileIOBuilder; + use crate::spec::{ + DataContentType, DataFileBuilder, DataFileFormat, ManifestContentType, ManifestEntry, + ManifestMetadata, ManifestStatus, NestedField, PartitionSpec, PrimitiveType, Schema, + }; + + let metadata = { + let schema = Schema::builder() + .with_fields(vec![Arc::new(NestedField::required( + 1, + "id", + Type::Primitive(PrimitiveType::Int), + ))]) + .build() + .unwrap(); + + ManifestMetadata { + schema_id: 0, + schema: Arc::new(schema), + partition_spec: PartitionSpec::unpartition_spec(), + format_version: FormatVersion::V2, + content: ManifestContentType::Data, + } + }; + + // Write uncompressed manifest with multiple entries to make compression effective + let tmp_dir = TempDir::new().unwrap(); + let uncompressed_path = tmp_dir.path().join("uncompressed_manifest.avro"); + let io = FileIOBuilder::new_fs_io().build().unwrap(); + let output_file = io.new_output(uncompressed_path.to_str().unwrap()).unwrap(); + let 
uncompressed_settings = CompressionSettings::new("uncompressed".to_string(), None); + let mut writer = ManifestWriterBuilder::new( + output_file, + Some(1), + None, + metadata.schema.clone(), + metadata.partition_spec.clone(), + uncompressed_settings, + ) + .build_v2_data(); + // Add multiple entries with long paths to create compressible data + for i in 0..1000 { + let data_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(format!( + "/very/long/path/to/data/directory/with/many/subdirectories/file_{}.parquet", + i + )) + .file_format(DataFileFormat::Parquet) + .partition(Struct::empty()) + .file_size_in_bytes(100000 + i) + .record_count(1000 + i) + .build() + .unwrap(); + + let entry = ManifestEntry::builder() + .status(ManifestStatus::Added) + .snapshot_id(1) + .sequence_number(1) + .file_sequence_number(1) + .data_file(data_file) + .build(); + writer.add_entry(entry).unwrap(); + } + writer.write_manifest_file().await.unwrap(); + let uncompressed_size = fs::metadata(&uncompressed_path).unwrap().len(); + + // Write compressed manifest with gzip + let compressed_path = tmp_dir.path().join("compressed_manifest.avro"); + let output_file = io.new_output(compressed_path.to_str().unwrap()).unwrap(); + let compression = CompressionSettings::new("gzip".to_string(), Some(9)); + let mut writer = ManifestWriterBuilder::new( + output_file, + Some(1), + None, + metadata.schema.clone(), + metadata.partition_spec.clone(), + compression, + ) + .build_v2_data(); + // Add the same entries with long paths as the uncompressed version + for i in 0..1000 { + let data_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(format!( + "/very/long/path/to/data/directory/with/many/subdirectories/file_{}.parquet", + i + )) + .file_format(DataFileFormat::Parquet) + .partition(Struct::empty()) + .file_size_in_bytes(100000 + i) + .record_count(1000 + i) + .build() + .unwrap(); + + let entry = ManifestEntry::builder() + 
.status(ManifestStatus::Added) + .snapshot_id(1) + .sequence_number(1) + .file_sequence_number(1) + .data_file(data_file) + .build(); + writer.add_entry(entry).unwrap(); + } + writer.write_manifest_file().await.unwrap(); + let compressed_size = fs::metadata(&compressed_path).unwrap().len(); + + // Verify compression is actually working + assert!( + compressed_size < uncompressed_size, + "Compressed size ({}) should be less than uncompressed size ({})", + compressed_size, + uncompressed_size + ); + + // Verify the compressed file can be read back correctly + let compressed_bytes = fs::read(&compressed_path).unwrap(); + let manifest = Manifest::parse_avro(&compressed_bytes).unwrap(); + assert_eq!(manifest.metadata.format_version, FormatVersion::V2); + assert_eq!(manifest.entries.len(), 1000); + } } diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index 54dbc987e2..a5be5c2198 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -2096,4 +2096,96 @@ mod test { assert_eq!(v2_manifest.partitions, None); assert_eq!(v2_manifest.key_metadata, None); } + + #[tokio::test] + async fn test_manifest_list_writer_with_compression() { + use std::fs; + + use tempfile::TempDir; + + use crate::io::FileIOBuilder; + + // Create multiple manifest entries to make compression effective + let mut entries = Vec::new(); + for i in 0..100 { + entries.push(ManifestFile { + manifest_path: format!("/test/manifest{}.avro", i), + manifest_length: 1000 + i, + partition_spec_id: 0, + content: ManifestContentType::Data, + sequence_number: 1, + min_sequence_number: 1, + added_snapshot_id: 1646658105718557341, + added_files_count: Some(10), + existing_files_count: Some(5), + deleted_files_count: Some(2), + added_rows_count: Some(100), + existing_rows_count: Some(50), + deleted_rows_count: Some(20), + partitions: None, + key_metadata: None, + first_row_id: None, + }); + } + let manifest_list = ManifestList { 
entries }; + + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let tmp_dir = TempDir::new().unwrap(); + + // Write uncompressed manifest list + let uncompressed_path = tmp_dir + .path() + .join("uncompressed_manifest_list.avro") + .to_str() + .unwrap() + .to_string(); + let mut writer = ManifestListWriter::v2( + file_io.new_output(&uncompressed_path).unwrap(), + 1646658105718557341, + Some(0), + 1, + CompressionSettings::new("uncompressed".to_string(), None), + ); + writer + .add_manifests(manifest_list.entries.clone().into_iter()) + .unwrap(); + writer.close().await.unwrap(); + let uncompressed_size = fs::metadata(&uncompressed_path).unwrap().len(); + + // Write compressed manifest list with gzip + let compressed_path = tmp_dir + .path() + .join("compressed_manifest_list.avro") + .to_str() + .unwrap() + .to_string(); + let compression = CompressionSettings::new("gzip".to_string(), Some(9)); + let mut writer = ManifestListWriter::v2( + file_io.new_output(&compressed_path).unwrap(), + 1646658105718557341, + Some(0), + 1, + compression, + ); + writer + .add_manifests(manifest_list.entries.clone().into_iter()) + .unwrap(); + writer.close().await.unwrap(); + let compressed_size = fs::metadata(&compressed_path).unwrap().len(); + + // Verify compression is actually working + assert!( + compressed_size < uncompressed_size, + "Compressed size ({}) should be less than uncompressed size ({})", + compressed_size, + uncompressed_size + ); + + // Verify the compressed file can be read back correctly + let compressed_bytes = fs::read(&compressed_path).unwrap(); + let parsed_manifest_list = + ManifestList::parse_with_version(&compressed_bytes, crate::spec::FormatVersion::V2) + .unwrap(); + assert_eq!(manifest_list, parsed_manifest_list); + } } From 393622f6412697158e9413155f184214d651dc33 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Mon, 17 Nov 2025 21:51:19 +0000 Subject: [PATCH 09/15] comments --- Cargo.toml | 2 +- crates/iceberg/src/spec/avro_util.rs | 24 
++++++++++----------- crates/iceberg/src/spec/manifest/writer.rs | 1 - crates/iceberg/src/spec/table_metadata.rs | 6 ++++-- crates/iceberg/src/spec/table_properties.rs | 2 +- crates/iceberg/src/transaction/snapshot.rs | 4 ++-- 6 files changed, 20 insertions(+), 19 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c10c01d94a..349e33112d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,7 +40,7 @@ rust-version = "1.87" [workspace.dependencies] anyhow = "1.0.72" -apache-avro = { version = "0.20", features = ["zstandard"] } +apache-avro = { version = "0.20", features = ["zstandard", "snappy"] } array-init = "2" arrow-arith = "56.2" arrow-array = "56.2" diff --git a/crates/iceberg/src/spec/avro_util.rs b/crates/iceberg/src/spec/avro_util.rs index 19221c4c9f..5c90c3b94a 100644 --- a/crates/iceberg/src/spec/avro_util.rs +++ b/crates/iceberg/src/spec/avro_util.rs @@ -23,7 +23,7 @@ use log::warn; /// Settings for compression codec and level. #[derive(Debug, Clone, PartialEq, Eq)] pub struct CompressionSettings { - /// The compression codec name (e.g., "gzip", "zstd", "deflate", "uncompressed") + /// The compression codec name (e.g., "gzip", "zstd", "snappy", "uncompressed") pub codec: String, /// The compression level (None uses codec-specific defaults: gzip=9, zstd=1) pub level: Option, @@ -56,8 +56,8 @@ impl Default for CompressionSettings { /// /// # Arguments /// -/// * `codec` - The name of the compression codec (e.g., "gzip", "zstd", "deflate", "uncompressed") -/// * `level` - The compression level (None uses codec defaults: gzip=9, zstd=1). For deflate/gzip: +/// * `codec` - The name of the compression codec (e.g., "gzip", "zstd", "snappy", "uncompressed") +/// * `level` - The compression level (None uses codec defaults: gzip=9, zstd=1). 
For gzip: /// - 0: NoCompression /// - 1: BestSpeed /// - 9: BestCompression @@ -66,8 +66,9 @@ impl Default for CompressionSettings { /// /// # Supported Codecs /// -/// - `gzip` or `deflate`: Uses Deflate compression with specified level (default: 9) +/// - `gzip`: Uses Deflate compression with specified level (default: 9) /// - `zstd`: Uses Zstandard compression (default: 1, level clamped to valid zstd range 0-22) +/// - `snappy`: Uses Snappy compression (level parameter ignored) /// - `uncompressed` or `None`: No compression /// - Any other value: Defaults to no compression (Codec::Null) /// @@ -82,8 +83,9 @@ impl Default for CompressionSettings { pub(crate) fn codec_from_str(codec: Option<&str>, level: Option) -> Codec { use apache_avro::{DeflateSettings, ZstandardSettings}; - match codec { - Some("gzip") | Some("deflate") => { + // Use case-insensitive comparison to match Java implementation + match codec.map(|s| s.to_lowercase()).as_deref() { + Some("gzip") => { // Map compression level to miniz_oxide::deflate::CompressionLevel // Reference: https://docs.rs/miniz_oxide/latest/miniz_oxide/deflate/enum.CompressionLevel.html // Default level for gzip/deflate is 9 (BestCompression) to match Java @@ -105,6 +107,7 @@ pub(crate) fn codec_from_str(codec: Option<&str>, level: Option) -> Codec { let zstd_level = level.unwrap_or(1).min(22); Codec::Zstandard(ZstandardSettings::new(zstd_level)) } + Some("snappy") => Codec::Snappy, Some("uncompressed") | None => Codec::Null, Some(unknown) => { warn!( @@ -133,12 +136,9 @@ mod tests { } #[test] - fn test_codec_from_str_deflate() { - let codec = codec_from_str(Some("deflate"), Some(9)); - assert_eq!( - codec, - Codec::Deflate(DeflateSettings::new(CompressionLevel::BestCompression)) - ); + fn test_codec_from_str_snappy() { + let codec = codec_from_str(Some("snappy"), None); + assert_eq!(codec, Codec::Snappy); } #[test] diff --git a/crates/iceberg/src/spec/manifest/writer.rs b/crates/iceberg/src/spec/manifest/writer.rs index 
ccea427a0b..672e58407f 100644 --- a/crates/iceberg/src/spec/manifest/writer.rs +++ b/crates/iceberg/src/spec/manifest/writer.rs @@ -424,7 +424,6 @@ impl ManifestWriter { FormatVersion::V2 | FormatVersion::V3 => manifest_schema_v2(&partition_type)?, }; - // Determine compression codec using CompressionSettings let codec = self.compression.to_codec(); let mut avro_writer = AvroWriter::with_codec(&avro_schema, Vec::new(), codec); diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 9407555f2c..451002d1f4 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -474,7 +474,9 @@ impl TableMetadata { .map(|s| s.as_str()) .unwrap_or(crate::spec::table_properties::TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT); - let (data_to_write, actual_location) = match codec { + // Use case-insensitive comparison to match Java implementation + let codec_lower = codec.to_lowercase(); + let (data_to_write, actual_location) = match codec_lower.as_str() { "gzip" => { let mut encoder = GzEncoder::new(Vec::new(), flate2::Compression::default()); encoder.write_all(&json_data).map_err(|e| { @@ -500,7 +502,7 @@ impl TableMetadata { (compressed_data, new_location) } - "uncompressed" | "" => (json_data, metadata_location.as_ref().to_string()), + "none" | "" => (json_data, metadata_location.as_ref().to_string()), other => { return Err(Error::new( ErrorKind::DataInvalid, diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index df477c6f73..0a30bae119 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -166,7 +166,7 @@ impl TableProperties { /// Compression codec for metadata files (JSON) pub const PROPERTY_METADATA_COMPRESSION_CODEC: &str = "write.metadata.compression-codec"; /// Default metadata compression codec - uncompressed - pub const 
PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT: &str = "uncompressed"; + pub const PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT: &str = "none"; /// Compression codec for Avro files (manifests, manifest lists) pub const PROPERTY_AVRO_COMPRESSION_CODEC: &str = "write.avro.compression-codec"; diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 74dca8927b..5a3414ac02 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -206,7 +206,7 @@ impl<'a> SnapshotProducer<'a> { .with_source(e) })?; let compression = CompressionSettings::new( - table_props.avro_compression_codec.clone(), + table_props.avro_compression_codec, table_props.avro_compression_level, ); @@ -414,7 +414,7 @@ impl<'a> SnapshotProducer<'a> { .with_source(e) })?; let compression = CompressionSettings::new( - table_props.avro_compression_codec.clone(), + table_props.avro_compression_codec, table_props.avro_compression_level, ); From 46fdb8e779bfa2e23e856098a7faed1147a50f01 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Mon, 17 Nov 2025 21:57:27 +0000 Subject: [PATCH 10/15] update tests --- crates/iceberg/src/spec/avro_util.rs | 3 ++- crates/iceberg/src/spec/table_metadata.rs | 12 ++---------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/crates/iceberg/src/spec/avro_util.rs b/crates/iceberg/src/spec/avro_util.rs index 5c90c3b94a..c9f3cfffe0 100644 --- a/crates/iceberg/src/spec/avro_util.rs +++ b/crates/iceberg/src/spec/avro_util.rs @@ -128,7 +128,8 @@ mod tests { #[test] fn test_codec_from_str_gzip() { - let codec = codec_from_str(Some("gzip"), Some(5)); + // Test with mixed case to verify case-insensitive matching + let codec = codec_from_str(Some("GZip"), Some(5)); assert_eq!( codec, Codec::Deflate(DeflateSettings::new(CompressionLevel::DefaultLevel)) diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 451002d1f4..1d02c924e4 
100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -3643,11 +3643,11 @@ mod tests { // Get a test metadata and add gzip compression property let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json"); - // Modify properties to enable gzip compression + // Modify properties to enable gzip compression (using mixed case to test case-insensitive matching) let mut props = original_metadata.properties.clone(); props.insert( TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(), - "gzip".to_string(), + "GziP".to_string(), ); // Use builder to create new metadata with updated properties let compressed_metadata = @@ -3683,14 +3683,6 @@ mod tests { .await .unwrap(); - // Verify the properties include the compression codec - assert_eq!( - read_metadata - .properties - .get(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC), - Some(&"gzip".to_string()) - ); - // Verify the complete round-trip: read metadata should match what we wrote assert_eq!(read_metadata, compressed_metadata); } From d50cb7ddf21a20a139407bca30de6cd5ca01d3a1 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Wed, 19 Nov 2025 19:06:53 +0000 Subject: [PATCH 11/15] address comments --- crates/iceberg/src/spec/avro_util.rs | 3 +- crates/iceberg/src/spec/table_metadata.rs | 109 +------------------- crates/iceberg/src/spec/table_properties.rs | 43 +++----- 3 files changed, 18 insertions(+), 137 deletions(-) diff --git a/crates/iceberg/src/spec/avro_util.rs b/crates/iceberg/src/spec/avro_util.rs index c9f3cfffe0..8aaca29fd6 100644 --- a/crates/iceberg/src/spec/avro_util.rs +++ b/crates/iceberg/src/spec/avro_util.rs @@ -20,6 +20,8 @@ use apache_avro::Codec; use log::warn; +use crate::spec::TableProperties; + /// Settings for compression codec and level. 
#[derive(Debug, Clone, PartialEq, Eq)] pub struct CompressionSettings { @@ -43,7 +45,6 @@ impl CompressionSettings { impl Default for CompressionSettings { fn default() -> Self { - use crate::spec::TableProperties; Self { codec: TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string(), level: None, diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 1d02c924e4..06b32cc847 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -461,59 +461,9 @@ impl TableMetadata { file_io: &FileIO, metadata_location: impl AsRef, ) -> Result<()> { - use std::io::Write as _; - - use flate2::write::GzEncoder; - - let json_data = serde_json::to_vec(self)?; - - // Check if compression is enabled via table properties - let codec = self - .properties - .get(crate::spec::table_properties::TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC) - .map(|s| s.as_str()) - .unwrap_or(crate::spec::table_properties::TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT); - - // Use case-insensitive comparison to match Java implementation - let codec_lower = codec.to_lowercase(); - let (data_to_write, actual_location) = match codec_lower.as_str() { - "gzip" => { - let mut encoder = GzEncoder::new(Vec::new(), flate2::Compression::default()); - encoder.write_all(&json_data).map_err(|e| { - Error::new( - ErrorKind::DataInvalid, - "Failed to compress metadata with gzip", - ) - .with_source(e) - })?; - let compressed_data = encoder.finish().map_err(|e| { - Error::new(ErrorKind::DataInvalid, "Failed to finish gzip compression") - .with_source(e) - })?; - - // Modify filename to add .gz before .metadata.json - let location = metadata_location.as_ref(); - let new_location = if location.ends_with(".metadata.json") { - location.replace(".metadata.json", ".gz.metadata.json") - } else { - // If it doesn't end with expected pattern, just append .gz - format!("{}.gz", location) - }; - - 
(compressed_data, new_location) - } - "none" | "" => (json_data, metadata_location.as_ref().to_string()), - other => { - return Err(Error::new( - ErrorKind::DataInvalid, - format!("Unsupported metadata compression codec: {}", other), - )); - } - }; - file_io - .new_output(actual_location)? - .write(data_to_write.into()) + .new_output(metadata_location)? + .write(serde_json::to_vec(self)?.into()) .await } @@ -1606,7 +1556,7 @@ mod tests { BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation, PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile, - Summary, TableProperties, Transform, Type, UnboundPartitionField, + Summary, Transform, Type, UnboundPartitionField, }; fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) { @@ -3634,59 +3584,6 @@ mod tests { assert!(result.is_err()); } - #[tokio::test] - async fn test_table_metadata_write_with_gzip_compression() { - let temp_dir = TempDir::new().unwrap(); - let temp_path = temp_dir.path().to_str().unwrap(); - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); - - // Get a test metadata and add gzip compression property - let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json"); - - // Modify properties to enable gzip compression (using mixed case to test case-insensitive matching) - let mut props = original_metadata.properties.clone(); - props.insert( - TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(), - "GziP".to_string(), - ); - // Use builder to create new metadata with updated properties - let compressed_metadata = - TableMetadataBuilder::new_from_metadata(original_metadata.clone(), None) - .assign_uuid(original_metadata.table_uuid) - .set_properties(props.clone()) - .unwrap() - .build() - .unwrap() - .metadata; - - // Write the metadata with compression - note the location will be 
modified to add .gz - let metadata_location = format!("{temp_path}/00001-test.metadata.json"); - compressed_metadata - .write_to(&file_io, &metadata_location) - .await - .unwrap(); - - // The actual file should be written with .gz.metadata.json extension - let expected_compressed_location = format!("{temp_path}/00001-test.gz.metadata.json"); - - // Verify the compressed file exists - assert!(std::path::Path::new(&expected_compressed_location).exists()); - - // Read the raw file and check it's gzip compressed - let raw_content = std::fs::read(&expected_compressed_location).unwrap(); - assert!(raw_content.len() > 2); - assert_eq!(raw_content[0], 0x1F); // gzip magic number - assert_eq!(raw_content[1], 0x8B); // gzip magic number - - // Read the metadata back using the compressed location - let read_metadata = TableMetadata::read_from(&file_io, &expected_compressed_location) - .await - .unwrap(); - - // Verify the complete round-trip: read metadata should match what we wrote - assert_eq!(read_metadata, compressed_metadata); - } - #[test] fn test_partition_name_exists() { let schema = Schema::builder() diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index 0a30bae119..a77c7f5aab 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -16,39 +16,43 @@ // under the License. 
use std::collections::HashMap; +use std::fmt::Display; +use std::str::FromStr; + +use crate::error::{Error, ErrorKind}; // Helper function to parse a property from a HashMap // If the property is not found, use the default value -fn parse_property<T: std::str::FromStr>( +fn parse_property<T: FromStr>( properties: &HashMap<String, String>, key: &str, default: T, -) -> Result<T, anyhow::Error> +) -> crate::error::Result<T> where - <T as std::str::FromStr>::Err: std::fmt::Display, + <T as FromStr>::Err: Display, { properties.get(key).map_or(Ok(default), |value| { value .parse::<T>() - .map_err(|e| anyhow::anyhow!("Invalid value for {key}: {e}")) + .map_err(|e| Error::new(ErrorKind::DataInvalid, format!("Invalid value for {key}: {e}"))) }) } // Helper function to parse an optional property from a HashMap // If the property is not found, returns None -fn parse_optional_property<T: std::str::FromStr>( +fn parse_optional_property<T: FromStr>( properties: &HashMap<String, String>, key: &str, -) -> Result<Option<T>, anyhow::Error> +) -> crate::error::Result<Option<T>> where - <T as std::str::FromStr>::Err: std::fmt::Display, + <T as FromStr>::Err: Display, { properties .get(key) .map(|value| { value .parse::<T>() - .map_err(|e| anyhow::anyhow!("Invalid value for {key}: {e}")) + .map_err(|e| Error::new(ErrorKind::DataInvalid, format!("Invalid value for {key}: {e}"))) }) .transpose() } @@ -68,8 +72,6 @@ pub struct TableProperties { pub write_format_default: String, /// The target file size for files.
pub write_target_file_size_bytes: usize, - /// Compression codec for metadata files (JSON) - pub metadata_compression_codec: String, /// Compression codec for Avro files (manifests, manifest lists) pub avro_compression_codec: String, /// Compression level for Avro files (None uses codec-specific defaults: gzip=9, zstd=1) @@ -163,11 +165,6 @@ impl TableProperties { /// Default target file size pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; // 512 MB - /// Compression codec for metadata files (JSON) - pub const PROPERTY_METADATA_COMPRESSION_CODEC: &str = "write.metadata.compression-codec"; - /// Default metadata compression codec - uncompressed - pub const PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT: &str = "none"; - /// Compression codec for Avro files (manifests, manifest lists) pub const PROPERTY_AVRO_COMPRESSION_CODEC: &str = "write.avro.compression-codec"; /// Default Avro compression codec - gzip @@ -181,7 +178,7 @@ impl TableProperties { impl TryFrom<&HashMap<String, String>> for TableProperties { // parse by entry key or use default value - type Error = anyhow::Error; + type Error = crate::Error; fn try_from(props: &HashMap<String, String>) -> Result<Self, Self::Error> { Ok(TableProperties { @@ -215,11 +212,6 @@ impl TryFrom<&HashMap<String, String>> for TableProperties { TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES, TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, )?, - metadata_compression_codec: parse_property( - props, - TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC, - TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT.to_string(), - )?, avro_compression_codec: parse_property( props, TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC, @@ -262,10 +254,6 @@ mod tests { TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT ); // Test compression defaults - assert_eq!( - table_properties.metadata_compression_codec, - TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT.to_string() - ); assert_eq!(
table_properties.avro_compression_codec, TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string() @@ -279,10 +267,6 @@ mod tests { #[test] fn test_table_properties_compression() { let props = HashMap::from([ - ( - TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(), - "gzip".to_string(), - ), ( TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC.to_string(), "zstd".to_string(), @@ -293,7 +277,6 @@ mod tests { ), ]); let table_properties = TableProperties::try_from(&props).unwrap(); - assert_eq!(table_properties.metadata_compression_codec, "gzip"); assert_eq!(table_properties.avro_compression_codec, "zstd"); assert_eq!(table_properties.avro_compression_level, Some(3)); } From 5370f77594a62094923acd345573b6b37f46a2fd Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Wed, 19 Nov 2025 21:53:17 +0000 Subject: [PATCH 12/15] remove parse optional property --- crates/iceberg/src/spec/table_properties.rs | 59 +++++---------------- 1 file changed, 12 insertions(+), 47 deletions(-) diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index a77c7f5aab..ae36645312 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -38,25 +38,6 @@ where }) } -// Helper function to parse an optional property from a HashMap -// If the property is not found, returns None -fn parse_optional_property<T: FromStr>( - properties: &HashMap<String, String>, - key: &str, -) -> crate::error::Result<Option<T>> -where - <T as FromStr>::Err: Display, -{ - properties - .get(key) - .map(|value| { - value - .parse::<T>() - .map_err(|e| Error::new(ErrorKind::DataInvalid, format!("Invalid value for {key}: {e}"))) - }) - .transpose() -} - /// TableProperties that contains the properties of a table.
#[derive(Debug)] pub struct TableProperties { @@ -217,10 +198,18 @@ impl TryFrom<&HashMap> for TableProperties { TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC, TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string(), )?, - avro_compression_level: parse_optional_property( - props, - TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL, - )?, + avro_compression_level: { + let level = parse_property( + props, + TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL, + 255u8, + )?; + if level == 255 { + None + } else { + Some(level) + } + }, }) } } @@ -354,30 +343,6 @@ mod tests { )); } - #[test] - fn test_parse_optional_property() { - // Test when key is not present - should return None - let props = HashMap::new(); - let result: Option = parse_optional_property(&props, "missing-key").unwrap(); - assert_eq!(result, None); - - // Test when key is present with valid value - should return Some(value) - let props = HashMap::from([("test-key".to_string(), "42".to_string())]); - let result: Option = parse_optional_property(&props, "test-key").unwrap(); - assert_eq!(result, Some(42)); - - // Test when key is present with invalid value - should return error - let props = HashMap::from([("test-key".to_string(), "invalid".to_string())]); - let result = parse_optional_property::(&props, "test-key"); - assert!(result.is_err()); - assert!( - result - .unwrap_err() - .to_string() - .contains("Invalid value for test-key") - ); - } - #[test] fn test_table_properties_optional_compression_level() { // Test that compression level is None when not specified From 6b3d8ed0a3865a3304e2817de5864c78c4417c0d Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Wed, 19 Nov 2025 22:01:16 +0000 Subject: [PATCH 13/15] fmt --- crates/iceberg/src/spec/table_properties.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index ae36645312..01a6b772f3 100644 --- 
a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -32,9 +32,12 @@ where <T as FromStr>::Err: Display, { properties.get(key).map_or(Ok(default), |value| { - value - .parse::<T>() - .map_err(|e| Error::new(ErrorKind::DataInvalid, format!("Invalid value for {key}: {e}"))) + value.parse::<T>().map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid value for {key}: {e}"), + ) + }) }) } @@ -204,11 +207,7 @@ impl TryFrom<&HashMap<String, String>> for TableProperties { TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL, 255u8, )?; - if level == 255 { - None - } else { - Some(level) - } + if level == 255 { None } else { Some(level) } }, }) } From 5ec090f6a4b94fe7b897faa7a53e47c1dcfc8e8d Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Fri, 21 Nov 2025 18:56:17 +0000 Subject: [PATCH 14/15] wip --- crates/iceberg/src/spec/table_properties.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index 01a6b772f3..2007fb2a97 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -156,8 +156,6 @@ impl TableProperties { /// Compression level for Avro files pub const PROPERTY_AVRO_COMPRESSION_LEVEL: &str = "write.avro.compression-level"; - /// Default Avro compression level (None, uses codec-specific defaults: gzip=9, zstd=1) - pub const PROPERTY_AVRO_COMPRESSION_LEVEL_DEFAULT: Option<u8> = None; } impl TryFrom<&HashMap<String, String>> for TableProperties { @@ -246,10 +244,7 @@ mod tests { table_properties.avro_compression_codec, TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string() ); - assert_eq!( - table_properties.avro_compression_level, - TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL_DEFAULT - ); + assert_eq!(table_properties.avro_compression_level, None); } #[test] From 4337a3483b229e9352d2709e36d275a240613f99 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Fri, 21 Nov 2025
19:06:07 +0000 Subject: [PATCH 15/15] address comments --- crates/iceberg/src/spec/avro_util.rs | 22 ++++++++------------- crates/iceberg/src/spec/table_properties.rs | 2 +- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/crates/iceberg/src/spec/avro_util.rs b/crates/iceberg/src/spec/avro_util.rs index 8aaca29fd6..5e0c863a83 100644 --- a/crates/iceberg/src/spec/avro_util.rs +++ b/crates/iceberg/src/spec/avro_util.rs @@ -27,7 +27,7 @@ use crate::spec::TableProperties; pub struct CompressionSettings { /// The compression codec name (e.g., "gzip", "zstd", "snappy", "uncompressed") pub codec: String, - /// The compression level (None uses codec-specific defaults: gzip=9, zstd=1) + /// The compression level (None uses codec-specific defaults) pub level: Option<u8>, } @@ -58,29 +58,23 @@ impl Default for CompressionSettings { /// # Arguments /// /// * `codec` - The name of the compression codec (e.g., "gzip", "zstd", "snappy", "uncompressed") -/// * `level` - The compression level (None uses codec defaults: gzip=9, zstd=1). For gzip: +/// * `level` - The compression level. For gzip/deflate: /// - 0: NoCompression /// - 1: BestSpeed /// - 9: BestCompression /// - 10: UberCompression -/// - Other values: DefaultLevel (6) +/// - 6: DefaultLevel (balanced speed/compression) +/// - Other values: DefaultLevel +/// For zstd, level is clamped to valid range (0-22). +/// When `None`, uses codec-specific defaults.
/// /// # Supported Codecs /// -/// - `gzip`: Uses Deflate compression with specified level (default: 9) -/// - `zstd`: Uses Zstandard compression (default: 1, level clamped to valid zstd range 0-22) +/// - `gzip`: Uses Deflate compression with specified level +/// - `zstd`: Uses Zstandard compression (level clamped to valid zstd range 0-22) /// - `snappy`: Uses Snappy compression (level parameter ignored) /// - `uncompressed` or `None`: No compression /// - Any other value: Defaults to no compression (Codec::Null) -/// -/// # Compression Levels -/// -/// The compression level mapping is based on miniz_oxide's CompressionLevel enum: -/// - Level 0: No compression -/// - Level 1: Best speed (fastest) -/// - Level 9: Best compression (slower, better compression) -/// - Level 10: Uber compression (slowest, best compression) -/// - Other: Default level (balanced speed/compression) pub(crate) fn codec_from_str(codec: Option<&str>, level: Option<u8>) -> Codec { use apache_avro::{DeflateSettings, ZstandardSettings}; diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index 2007fb2a97..fb5d990122 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -58,7 +58,7 @@ pub struct TableProperties { pub write_target_file_size_bytes: usize, /// Compression codec for Avro files (manifests, manifest lists) pub avro_compression_codec: String, - /// Compression level for Avro files (None uses codec-specific defaults) + /// Compression level for Avro files (None uses codec-specific defaults) pub avro_compression_level: Option<u8>, }