diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs
index 4514f2d7ab..7e302db79e 100644
--- a/crates/catalog/glue/src/catalog.rs
+++ b/crates/catalog/glue/src/catalog.rs
@@ -471,7 +471,8 @@ impl Catalog for GlueCatalog {
             .build()?
             .metadata;
         let metadata_location =
-            MetadataLocation::new_with_table_location(location.clone()).to_string();
+            MetadataLocation::new_with_properties(location.clone(), metadata.properties())
+                .to_string();
 
         metadata.write_to(&self.file_io, &metadata_location).await?;
 
diff --git a/crates/catalog/glue/src/utils.rs b/crates/catalog/glue/src/utils.rs
index 457471b34a..086acb1f1f 100644
--- a/crates/catalog/glue/src/utils.rs
+++ b/crates/catalog/glue/src/utils.rs
@@ -306,8 +306,9 @@ mod tests {
     fn test_convert_to_glue_table() -> Result<()> {
         let table_name = "my_table".to_string();
         let location = "s3a://warehouse/hive".to_string();
-        let metadata_location = MetadataLocation::new_with_table_location(location).to_string();
         let properties = HashMap::new();
+        let metadata_location =
+            MetadataLocation::new_with_properties(location, &properties).to_string();
         let schema = Schema::builder()
             .with_schema_id(1)
             .with_fields(vec![
diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs
index b7d192210b..59cef3bf17 100644
--- a/crates/catalog/hms/src/catalog.rs
+++ b/crates/catalog/hms/src/catalog.rs
@@ -443,7 +443,8 @@ impl Catalog for HmsCatalog {
             .metadata;
 
         let metadata_location =
-            MetadataLocation::new_with_table_location(location.clone()).to_string();
+            MetadataLocation::new_with_properties(location.clone(), metadata.properties())
+                .to_string();
 
         metadata.write_to(&self.file_io, &metadata_location).await?;
 
diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs
index 096e792f61..241a9c137b 100644
--- a/crates/catalog/hms/src/utils.rs
+++ b/crates/catalog/hms/src/utils.rs
@@ -343,9 +343,9 @@ mod tests {
         let db_name = "my_db".to_string();
         let table_name = "my_table".to_string();
         let location = "s3a://warehouse/hms".to_string();
-        let metadata_location =
-            MetadataLocation::new_with_table_location(location.clone()).to_string();
         let properties = HashMap::new();
+        let metadata_location =
+            MetadataLocation::new_with_properties(location.clone(), &properties).to_string();
         let schema = Schema::builder()
             .with_schema_id(1)
             .with_fields(vec![
diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs
index 3606fac99a..815997f7b0 100644
--- a/crates/catalog/s3tables/src/catalog.rs
+++ b/crates/catalog/s3tables/src/catalog.rs
@@ -450,7 +450,7 @@ impl Catalog for S3TablesCatalog {
 
         // prepare metadata location. the warehouse location is generated by s3tables catalog,
         // which looks like: s3://e6c9bf20-991a-46fb-kni5xs1q2yxi3xxdyxzjzigdeop1quse2b--table-s3
-        let metadata_location = match &creation.location {
+        let table_location = match &creation.location {
             Some(_) => {
                 return Err(Error::new(
                     ErrorKind::DataInvalid,
@@ -467,16 +467,18 @@ impl Catalog for S3TablesCatalog {
                     .send()
                     .await
                     .map_err(from_aws_sdk_error)?;
-                let warehouse_location = get_resp.warehouse_location().to_string();
-                MetadataLocation::new_with_table_location(warehouse_location).to_string()
+                get_resp.warehouse_location().to_string()
             }
         };
 
         // write metadata to file
-        creation.location = Some(metadata_location.clone());
+        creation.location = Some(table_location.clone());
         let metadata = TableMetadataBuilder::from_table_creation(creation)?
             .build()?
             .metadata;
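+        // Compression settings in the new table's properties decide whether the
+        // metadata file name carries a ".gz" suffix (see MetadataLocation's Display impl).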
+        let metadata_location =
+            MetadataLocation::new_with_properties(table_location, metadata.properties())
+                .to_string();
 
         metadata.write_to(&self.file_io, &metadata_location).await?;
 
         // update metadata location
diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs
index 77b35a228f..8006533a90 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -826,7 +826,8 @@ impl Catalog for SqlCatalog {
             .build()?
             .metadata;
         let tbl_metadata_location =
-            MetadataLocation::new_with_table_location(location.clone()).to_string();
+            MetadataLocation::new_with_properties(location.clone(), tbl_metadata.properties())
+                .to_string();
 
         tbl_metadata
             .write_to(&self.fileio, &tbl_metadata_location)
diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs
index cfa3dc6b52..91e95de8a7 100644
--- a/crates/iceberg/src/catalog/memory/catalog.rs
+++ b/crates/iceberg/src/catalog/memory/catalog.rs
@@ -275,7 +275,8 @@ impl Catalog for MemoryCatalog {
         let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
             .build()?
             .metadata;
-        let metadata_location = MetadataLocation::new_with_table_location(location).to_string();
+        let metadata_location =
+            MetadataLocation::new_with_properties(location, metadata.properties()).to_string();
 
         metadata.write_to(&self.file_io, &metadata_location).await?;
 
diff --git a/crates/iceberg/src/catalog/metadata_location.rs b/crates/iceberg/src/catalog/metadata_location.rs
index 3705ee42dc..29386944eb 100644
--- a/crates/iceberg/src/catalog/metadata_location.rs
+++ b/crates/iceberg/src/catalog/metadata_location.rs
@@ -15,38 +15,94 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashMap;
 use std::fmt::Display;
 use std::str::FromStr;
 
 use uuid::Uuid;
 
+use crate::spec::TableProperties;
 use crate::{Error, ErrorKind, Result};
 
+/// The file extension suffix for gzip compressed metadata files
+const GZIP_SUFFIX: &str = ".gz";
+
 /// Helper for parsing a location of the format: `<table_location>/metadata/<version>-<uuid>.metadata.json`
+/// or with compression: `<table_location>/metadata/<version>-<uuid>.gz.metadata.json`
 #[derive(Clone, Debug, PartialEq)]
 pub struct MetadataLocation {
     table_location: String,
     version: i32,
     id: Uuid,
+    compression_suffix: Option<String>,
 }
 
 impl MetadataLocation {
+    /// Determines the compression suffix from table properties.
+    fn compression_suffix_from_properties(properties: &HashMap<String, String>) -> Option<String> {
+        properties
+            .get(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC)
+            .and_then(|codec| match codec.to_lowercase().as_str() {
+                "gzip" => Some(GZIP_SUFFIX.to_string()),
+                "none" | "" => None,
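+                // Unrecognized codecs yield no suffix here; TableMetadata::write_to
+                // rejects them when the metadata is actually written.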
+                _ => None,
+            })
+    }
+
     /// Creates a completely new metadata location starting at version 0.
     /// Only used for creating a new table. For updates, see `with_next_version`.
+    #[deprecated(
+        since = "0.8.0",
+        note = "Use new_with_properties instead to properly handle compression settings"
+    )]
     pub fn new_with_table_location(table_location: impl ToString) -> Self {
         Self {
             table_location: table_location.to_string(),
             version: 0,
             id: Uuid::new_v4(),
+            compression_suffix: None,
         }
     }
+
+    /// Creates a completely new metadata location starting at version 0,
+    /// with compression settings from the table's properties.
+    /// Only used for creating a new table. For updates, see `with_next_version_and_properties`.
+    pub fn new_with_properties(
+        table_location: impl ToString,
+        properties: &HashMap<String, String>,
+    ) -> Self {
+        Self {
+            table_location: table_location.to_string(),
+            version: 0,
+            id: Uuid::new_v4(),
+            compression_suffix: Self::compression_suffix_from_properties(properties),
+        }
+    }
 
     /// Creates a new metadata location for an updated metadata file.
+    /// Preserves the compression settings from the current location.
+    #[deprecated(
+        since = "0.8.0",
+        note = "Use with_next_version_and_properties instead to properly handle compression settings changes"
+    )]
     pub fn with_next_version(&self) -> Self {
         Self {
             table_location: self.table_location.clone(),
             version: self.version + 1,
             id: Uuid::new_v4(),
+            compression_suffix: self.compression_suffix.clone(),
         }
     }
+
+    /// Creates a new metadata location for an updated metadata file.
+    /// Takes table properties to determine compression settings, which may have changed
+    /// from the previous version.
+    pub fn with_next_version_and_properties(&self, properties: &HashMap<String, String>) -> Self {
+        Self {
+            table_location: self.table_location.clone(),
+            version: self.version + 1,
+            id: Uuid::new_v4(),
+            compression_suffix: Self::compression_suffix_from_properties(properties),
+        }
+    }
@@ -59,30 +115,41 @@ impl MetadataLocation {
         Ok(prefix.to_string())
     }
 
-    /// Parses a file name of the format `<version>-<uuid>.metadata.json`.
-    fn parse_file_name(file_name: &str) -> Result<(i32, Uuid)> {
-        let (version, id) = file_name
-            .strip_suffix(".metadata.json")
-            .ok_or(Error::new(
-                ErrorKind::Unexpected,
-                format!("Invalid metadata file ending: {file_name}"),
-            ))?
-            .split_once('-')
-            .ok_or(Error::new(
-                ErrorKind::Unexpected,
-                format!("Invalid metadata file name format: {file_name}"),
-            ))?;
-
-        Ok((version.parse::<i32>()?, Uuid::parse_str(id)?))
+    /// Parses a file name of the format `<version>-<uuid>.metadata.json`
+    /// or with compression: `<version>-<uuid>.gz.metadata.json`.
+    fn parse_file_name(file_name: &str) -> Result<(i32, Uuid, Option<String>)> {
+        let stripped = file_name.strip_suffix(".metadata.json").ok_or(Error::new(
+            ErrorKind::Unexpected,
+            format!("Invalid metadata file ending: {file_name}"),
+        ))?;
+
+        // Check for compression suffix (e.g., .gz)
+        let (stripped, compression_suffix) = if let Some(s) = stripped.strip_suffix(GZIP_SUFFIX) {
+            (s, Some(GZIP_SUFFIX.to_string()))
+        } else {
+            (stripped, None)
+        };
+
+        let (version, id) = stripped.split_once('-').ok_or(Error::new(
+            ErrorKind::Unexpected,
+            format!("Invalid metadata file name format: {file_name}"),
+        ))?;
+
+        Ok((
+            version.parse::<i32>()?,
+            Uuid::parse_str(id)?,
+            compression_suffix,
+        ))
     }
 }
 
 impl Display for MetadataLocation {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let suffix = self.compression_suffix.as_deref().unwrap_or("");
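+        // Renders e.g. "00001-<uuid>.gz.metadata.json" when the gzip codec is set.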
         write!(
             f,
-            "{}/metadata/{:0>5}-{}.metadata.json",
-            self.table_location, self.version, self.id
+            "{}/metadata/{:0>5}-{}{}.metadata.json",
+            self.table_location, self.version, self.id, suffix
         )
     }
 }
@@ -97,18 +164,20 @@ impl FromStr for MetadataLocation {
         ))?;
 
         let prefix = Self::parse_metadata_path_prefix(path)?;
-        let (version, id) = Self::parse_file_name(file_name)?;
+        let (version, id, compression_suffix) = Self::parse_file_name(file_name)?;
 
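+        // Carry the parsed compression suffix through so the location
+        // round-trips unchanged via Display.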
         Ok(MetadataLocation {
             table_location: prefix,
             version,
             id,
+            compression_suffix,
         })
     }
 }
 
 #[cfg(test)]
 mod test {
+    use std::collections::HashMap;
     use std::str::FromStr;
 
     use uuid::Uuid;
@@ -125,6 +194,7 @@ mod test {
                     table_location: "".to_string(),
                     version: 1234567,
                     id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(),
+                    compression_suffix: None,
                 }),
             ),
             // Some prefix
@@ -134,6 +204,7 @@ mod test {
                     table_location: "/abc".to_string(),
                     version: 1234567,
                     id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(),
+                    compression_suffix: None,
                 }),
             ),
             // Longer prefix
@@ -143,6 +214,7 @@ mod test {
                     table_location: "/abc/def".to_string(),
                     version: 1234567,
                     id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(),
+                    compression_suffix: None,
                 }),
             ),
             // Prefix with special characters
@@ -152,6 +224,7 @@ mod test {
                     table_location: "https://127.0.0.1".to_string(),
                     version: 1234567,
                     id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(),
+                    compression_suffix: None,
                 }),
             ),
             // Another id
@@ -161,6 +234,7 @@ mod test {
                     table_location: "/abc".to_string(),
                     version: 1234567,
                     id: Uuid::from_str("81056704-ce5b-41c4-bb83-eb6408081af6").unwrap(),
+                    compression_suffix: None,
                 }),
             ),
             // Version 0
@@ -170,6 +244,17 @@ mod test {
                     table_location: "/abc".to_string(),
                     version: 0,
                     id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(),
+                    compression_suffix: None,
                 }),
             ),
+            // With gzip compression
+            (
+                "/abc/metadata/1234567-2cd22b57-5127-4198-92ba-e4e67c79821b.gz.metadata.json",
+                Ok(MetadataLocation {
+                    table_location: "/abc".to_string(),
+                    version: 1234567,
+                    id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(),
+                    compression_suffix: Some(".gz".to_string()),
+                }),
+            ),
             // Negative version
@@ -215,6 +300,7 @@ mod test {
     }
 
     #[test]
+    #[allow(deprecated)]
     fn test_metadata_location_with_next_version() {
         let test_cases = vec![
             MetadataLocation::new_with_table_location("/abc"),
@@ -233,4 +319,91 @@ mod test {
             assert_ne!(next.id, input.id);
         }
     }
+
+    #[test]
+    fn test_metadata_location_new_with_properties() {
+        // Test with no compression
+        let props_none = HashMap::new();
+        let location = MetadataLocation::new_with_properties("/test/table", &props_none);
+
+        assert_eq!(location.table_location, "/test/table");
+        assert_eq!(location.version, 0);
+        assert_eq!(location.compression_suffix, None);
+        assert_eq!(
+            location.to_string(),
+            format!("/test/table/metadata/00000-{}.metadata.json", location.id)
+        );
+
+        // Test with gzip compression
+        let mut props_gzip = HashMap::new();
+        props_gzip.insert(
+            "write.metadata.compression-codec".to_string(),
+            "gzip".to_string(),
+        );
+        let location = MetadataLocation::new_with_properties("/test/table", &props_gzip);
+        assert_eq!(location.compression_suffix, Some(".gz".to_string()));
+        assert_eq!(
+            location.to_string(),
+            format!(
+                "/test/table/metadata/00000-{}.gz.metadata.json",
+                location.id
+            )
+        );
+
+        // Test with "none" codec (explicitly no compression)
+        let mut props_explicit_none = HashMap::new();
+        props_explicit_none.insert(
+            "write.metadata.compression-codec".to_string(),
+            "none".to_string(),
+        );
+        let location = MetadataLocation::new_with_properties("/test/table", &props_explicit_none);
+        assert_eq!(location.compression_suffix, None);
+
+        // Test case insensitivity
+        let mut props_gzip_upper = HashMap::new();
+        props_gzip_upper.insert(
+            "write.metadata.compression-codec".to_string(),
+            "GZIP".to_string(),
+        );
+        let location = MetadataLocation::new_with_properties("/test/table", &props_gzip_upper);
+        assert_eq!(location.compression_suffix, Some(".gz".to_string()));
+    }
+
+    #[test]
+    fn test_with_next_version_and_properties() {
+        // Start with a location without compression
+        let props_none = HashMap::new();
+        let location = MetadataLocation::new_with_properties("/test/table", &props_none);
+        assert_eq!(location.compression_suffix, None);
+        assert_eq!(location.version, 0);
+
+        // Update to next version with gzip compression
+        let mut props_gzip = HashMap::new();
+        props_gzip.insert(
+            "write.metadata.compression-codec".to_string(),
+            "gzip".to_string(),
+        );
+        let next_location = location.with_next_version_and_properties(&props_gzip);
+        assert_eq!(next_location.compression_suffix, Some(".gz".to_string()));
+        assert_eq!(next_location.version, 1);
+        assert_eq!(
+            next_location.to_string(),
+            format!(
+                "/test/table/metadata/00001-{}.gz.metadata.json",
+                next_location.id
+            )
+        );
+
+        // Update to next version without compression (changed from gzip)
+        let props_none_again = HashMap::new();
+        let final_location = next_location.with_next_version_and_properties(&props_none_again);
+        assert_eq!(final_location.compression_suffix, None);
+        assert_eq!(final_location.version, 2);
+        assert_eq!(
+            final_location.to_string(),
+            format!(
+                "/test/table/metadata/00002-{}.metadata.json",
+                final_location.id
+            )
+        );
+    }
 }
diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs
index 27d5edaedb..070a11dfb2 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -353,13 +353,16 @@ impl TableCommit {
             metadata_builder = update.apply(metadata_builder)?;
         }
 
-        // Bump the version of metadata
+        // Build the new metadata
+        let new_metadata = metadata_builder.build()?.metadata;
+
+        // Bump the version of metadata, using properties from the new metadata
         let new_metadata_location = MetadataLocation::from_str(current_metadata_location)?
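+            // Absent, "none", or empty codec: write uncompressed JSON to the
+            // location exactly as given.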
+            None | Some("none") | Some("") => (json_data, metadata_location.as_ref().to_string()),
+            Some(other) => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Unsupported metadata compression codec: {}", other),
+                ));
+            }
+        };
+
         file_io
-            .new_output(metadata_location)?
-            .write(serde_json::to_vec(self)?.into())
+            .new_output(actual_location)?
+            .write(data_to_write.into())
             .await
     }
 
@@ -1556,7 +1607,7 @@ mod tests {
        BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation,
         PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot,
         SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile,
-        Summary, Transform, Type, UnboundPartitionField,
+        Summary, TableProperties, Transform, Type, UnboundPartitionField,
     };
 
     fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
@@ -3584,6 +3635,59 @@ mod tests {
         assert!(result.is_err());
     }
 
+    #[tokio::test]
+    async fn test_table_metadata_write_with_gzip_compression() {
+        let temp_dir = TempDir::new().unwrap();
+        let temp_path = temp_dir.path().to_str().unwrap();
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+
+        // Get a test metadata and add gzip compression property
+        let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
+
+        // Modify properties to enable gzip compression (using mixed case to test case-insensitive matching)
+        let mut props = original_metadata.properties.clone();
+        props.insert(
+            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
+            "GziP".to_string(),
+        );
+        // Use builder to create new metadata with updated properties
+        let compressed_metadata =
+            TableMetadataBuilder::new_from_metadata(original_metadata.clone(), None)
+                .assign_uuid(original_metadata.table_uuid)
+                .set_properties(props.clone())
+                .unwrap()
+                .build()
+                .unwrap()
+                .metadata;
+
+        // Write the metadata with compression - note the location will be modified to add .gz
+        let metadata_location = format!("{temp_path}/00001-test.metadata.json");
+        compressed_metadata
+            .write_to(&file_io, &metadata_location)
+            .await
+            .unwrap();
+
+        // The actual file should be written with .gz.metadata.json extension
+        let expected_compressed_location = format!("{temp_path}/00001-test.gz.metadata.json");
+
+        // Verify the compressed file exists
+        assert!(std::path::Path::new(&expected_compressed_location).exists());
+
+        // Read the raw file and check it's gzip compressed
+        let raw_content = std::fs::read(&expected_compressed_location).unwrap();
+        assert!(raw_content.len() > 2);
+        assert_eq!(raw_content[0], 0x1F); // gzip magic number
+        assert_eq!(raw_content[1], 0x8B); // gzip magic number
+
+        // Read the metadata back using the compressed location
+        let read_metadata = TableMetadata::read_from(&file_io, &expected_compressed_location)
+            .await
+            .unwrap();
+
+        // Verify the complete round-trip: read metadata should match what we wrote
+        assert_eq!(read_metadata, compressed_metadata);
+    }
+
     #[test]
     fn test_partition_name_exists() {
         let schema = Schema::builder()
diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs
index 4975456010..c9258f2076 100644
--- a/crates/iceberg/src/spec/table_properties.rs
+++ b/crates/iceberg/src/spec/table_properties.rs
@@ -16,21 +16,28 @@
 // under the License.
 
 use std::collections::HashMap;
+use std::fmt::Display;
+use std::str::FromStr;
+
+use crate::error::{Error, ErrorKind, Result};
 
 // Helper function to parse a property from a HashMap
 // If the property is not found, use the default value
-fn parse_property<T: std::str::FromStr>(
+fn parse_property<T: FromStr>(
     properties: &HashMap<String, String>,
     key: &str,
     default: T,
-) -> Result<T, anyhow::Error>
+) -> Result<T>
 where
-    <T as std::str::FromStr>::Err: std::fmt::Display,
+    <T as FromStr>::Err: Display,
 {
     properties.get(key).map_or(Ok(default), |value| {
-        value
-            .parse::<T>()
-            .map_err(|e| anyhow::anyhow!("Invalid value for {key}: {e}"))
+        value.parse::<T>().map_err(|e| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!("Invalid value for {key}: {e}"),
+            )
+        })
     })
 }
@@ -49,6 +56,8 @@ pub struct TableProperties {
     pub write_format_default: String,
     /// The target file size for files.
     pub write_target_file_size_bytes: usize,
+    /// Compression codec for metadata files (JSON), None means no compression
+    pub metadata_compression_codec: Option<String>,
 }
 
 impl TableProperties {
@@ -137,13 +146,18 @@ impl TableProperties {
     pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes";
     /// Default target file size
     pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; // 512 MB
+
+    /// Compression codec for metadata files (JSON)
+    pub const PROPERTY_METADATA_COMPRESSION_CODEC: &str = "write.metadata.compression-codec";
+    /// Default metadata compression codec - uncompressed
+    pub const PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT: &str = "none";
 }
 
 impl TryFrom<&HashMap<String, String>> for TableProperties {
     // parse by entry key or use default value
-    type Error = anyhow::Error;
+    type Error = Error;
 
-    fn try_from(props: &HashMap<String, String>) -> Result<Self, Self::Error> {
+    fn try_from(props: &HashMap<String, String>) -> Result<Self> {
         Ok(TableProperties {
             commit_num_retries: parse_property(
                 props,
                 TableProperties::PROPERTY_COMMIT_NUM_RETRIES,
                 TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
             )?,
@@ -175,6 +189,12 @@ impl TryFrom<&HashMap<String, String>> for TableProperties {
                 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
                 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
             )?,
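+            // Stored lowercased; "none" and the empty string both mean no compression.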
+            metadata_compression_codec: props
+                .get(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC)
+                .and_then(|v| match v.to_lowercase().as_str() {
+                    "none" | "" => None,
+                    codec => Some(codec.to_string()),
+                }),
         })
     }
 }
@@ -207,6 +227,64 @@ mod tests {
             table_properties.write_target_file_size_bytes,
             TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
         );
+        // Test compression defaults (none means None)
+        assert_eq!(table_properties.metadata_compression_codec, None);
+    }
+
+    #[test]
+    fn test_table_properties_compression() {
+        let props = HashMap::from([(
+            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
+            "gzip".to_string(),
+        )]);
+        let table_properties = TableProperties::try_from(&props).unwrap();
+        assert_eq!(
+            table_properties.metadata_compression_codec,
+            Some("gzip".to_string())
+        );
+    }
+
+    #[test]
+    fn test_table_properties_compression_none() {
+        let props = HashMap::from([(
+            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
+            "none".to_string(),
+        )]);
+        let table_properties = TableProperties::try_from(&props).unwrap();
+        assert_eq!(table_properties.metadata_compression_codec, None);
+    }
+
+    #[test]
+    fn test_table_properties_compression_case_insensitive() {
+        // Test uppercase
+        let props_upper = HashMap::from([(
+            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
+            "GZIP".to_string(),
+        )]);
+        let table_properties = TableProperties::try_from(&props_upper).unwrap();
+        assert_eq!(
+            table_properties.metadata_compression_codec,
+            Some("gzip".to_string())
+        );
+
+        // Test mixed case
+        let props_mixed = HashMap::from([(
+            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
+            "GzIp".to_string(),
+        )]);
+        let table_properties = TableProperties::try_from(&props_mixed).unwrap();
+        assert_eq!(
+            table_properties.metadata_compression_codec,
+            Some("gzip".to_string())
+        );
+
+        // Test that "NONE" is also handled case-insensitively
+        let props_none_upper = HashMap::from([(
+            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
+            "NONE".to_string(),
+        )]);
+        let table_properties = TableProperties::try_from(&props_none_upper).unwrap();
+        assert_eq!(table_properties.metadata_compression_codec, None);
     }
 
     #[test]