Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 186 additions & 0 deletions crates/iceberg/src/spec/table_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

use std::collections::HashMap;

use parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};

// Helper function to parse a property from a HashMap
// If the property is not found, use the default value
fn parse_property<T: std::str::FromStr>(
Expand Down Expand Up @@ -47,6 +49,14 @@ pub struct TableProperties {
pub commit_total_retry_timeout_ms: u64,
/// The default format for files.
pub write_format_default: String,
/// The default compression codec for Parquet data files.
pub write_parquet_compression_codec: String,
/// The default compression level for Parquet data files.
pub write_parquet_compression_level: Option<String>,
/// The default compression codec for Parquet delete files.
pub write_delete_parquet_compression_codec: String,
/// The default compression level for Parquet delete files.
pub write_delete_parquet_compression_level: Option<String>,
/// The target file size for files.
pub write_target_file_size_bytes: usize,
/// Whether to use `FanoutWriter` for partitioned tables.
Expand Down Expand Up @@ -134,6 +144,18 @@ impl TableProperties {
pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = "write.delete.format.default";
/// Default value for data file format
pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet";
/// Property key for the Parquet data-file compression codec.
pub const PROPERTY_WRITE_PARQUET_COMPRESSION_CODEC: &str = "write.parquet.compression-codec";
/// Property key for the Parquet delete-file compression codec.
pub const PROPERTY_WRITE_DELETE_PARQUET_COMPRESSION_CODEC: &str =
    "write.delete.parquet.compression-codec";
/// Default codec applied when no compression codec property is configured.
pub const PROPERTY_WRITE_PARQUET_COMPRESSION_CODEC_DEFAULT: &str = "zstd";
/// Property key for the Parquet data-file compression level.
pub const PROPERTY_WRITE_PARQUET_COMPRESSION_LEVEL: &str = "write.parquet.compression-level";
/// Property key for the Parquet delete-file compression level.
pub const PROPERTY_WRITE_DELETE_PARQUET_COMPRESSION_LEVEL: &str =
    "write.delete.parquet.compression-level";

/// Target file size for newly written files.
pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes";
Expand All @@ -144,13 +166,86 @@ impl TableProperties {
pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED: &str = "write.datafusion.fanout.enabled";
/// Default value for fanout writer enabled
pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT: bool = true;

/// Returns the Parquet compression codec requested by table properties.
///
/// Unknown or unparsable values intentionally fall back to ZSTD so table
/// writes remain possible even when catalogs provide unexpected strings.
pub fn write_parquet_compression(&self) -> Compression {
    parse_parquet_compression(
        &self.write_parquet_compression_codec,
        self.write_parquet_compression_level.as_deref(),
    )
    // Lazy fallback: only build the default when parsing failed
    // (clippy::or_fun_call).
    .unwrap_or_else(Self::default_parquet_compression)
}

/// Returns the Parquet compression codec requested for delete files.
///
/// Mirrors Java's write-conf behavior: delete compression inherits data
/// compression unless `write.delete.parquet.compression-codec` is set.
pub fn write_delete_parquet_compression(&self) -> Compression {
parse_parquet_compression(
&self.write_delete_parquet_compression_codec,
self.write_delete_parquet_compression_level.as_deref(),
)
.unwrap_or(Self::default_parquet_compression())
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

/// Fallback codec used whenever table properties name an unknown or
/// unparsable compression: ZSTD at the parquet crate's default level.
fn default_parquet_compression() -> Compression {
    Compression::ZSTD(ZstdLevel::default())
}
}

/// Translates an Iceberg compression-codec string (plus an optional level
/// string) into a parquet [`Compression`] value.
///
/// Returns `None` when the codec is unrecognized or the level is not a valid
/// `u32`; callers decide the fallback. Codec matching is case-insensitive and
/// treats `-` and `_` as equivalent. A level supplied for a codec that takes
/// none (e.g. snappy) is silently ignored, matching Java Iceberg behavior.
fn parse_parquet_compression(value: &str, level: Option<&str>) -> Option<Compression> {
    let codec = value.trim().to_ascii_lowercase().replace('-', "_");
    // An unparsable level string aborts parsing entirely, regardless of codec.
    let level = match level {
        Some(raw) => Some(raw.parse::<u32>().ok()?),
        None => None,
    };

    let compression = match codec.as_str() {
        "none" | "uncompressed" => Compression::UNCOMPRESSED,
        "snappy" => Compression::SNAPPY,
        "lzo" => Compression::LZO,
        "lz4" => Compression::LZ4,
        "lz4_raw" => Compression::LZ4_RAW,
        "gzip" => {
            let gzip_level = match level {
                Some(l) => GzipLevel::try_new(l).ok()?,
                None => GzipLevel::default(),
            };
            Compression::GZIP(gzip_level)
        }
        "brotli" => {
            let brotli_level = match level {
                Some(l) => BrotliLevel::try_new(l).ok()?,
                None => BrotliLevel::default(),
            };
            Compression::BROTLI(brotli_level)
        }
        "zstd" => {
            let zstd_level = match level {
                Some(l) => ZstdLevel::try_new(l as i32).ok()?,
                None => ZstdLevel::default(),
            };
            Compression::ZSTD(zstd_level)
        }
        // Last resort: let the parquet crate parse the raw string, but only
        // when no explicit level was supplied.
        _ if level.is_none() => return value.parse::<Compression>().ok(),
        _ => return None,
    };
    Some(compression)
}

impl TryFrom<&HashMap<String, String>> for TableProperties {
// parse by entry key or use default value
type Error = anyhow::Error;

fn try_from(props: &HashMap<String, String>) -> Result<Self, Self::Error> {
let write_parquet_compression_codec = parse_property(
props,
TableProperties::PROPERTY_WRITE_PARQUET_COMPRESSION_CODEC,
TableProperties::PROPERTY_WRITE_PARQUET_COMPRESSION_CODEC_DEFAULT.to_string(),
)?;
let write_parquet_compression_level = props
.get(TableProperties::PROPERTY_WRITE_PARQUET_COMPRESSION_LEVEL)
.cloned();
let write_delete_parquet_compression_codec = parse_property(
props,
TableProperties::PROPERTY_WRITE_DELETE_PARQUET_COMPRESSION_CODEC,
write_parquet_compression_codec.clone(),
)?;
let write_delete_parquet_compression_level = props
.get(TableProperties::PROPERTY_WRITE_DELETE_PARQUET_COMPRESSION_LEVEL)
.cloned()
.or_else(|| write_parquet_compression_level.clone());

Ok(TableProperties {
commit_num_retries: parse_property(
props,
Expand All @@ -177,6 +272,10 @@ impl TryFrom<&HashMap<String, String>> for TableProperties {
TableProperties::PROPERTY_DEFAULT_FILE_FORMAT,
TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string(),
)?,
write_parquet_compression_codec,
write_parquet_compression_level,
write_delete_parquet_compression_codec,
write_delete_parquet_compression_level,
write_target_file_size_bytes: parse_property(
props,
TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
Expand Down Expand Up @@ -215,6 +314,18 @@ mod tests {
table_properties.write_format_default,
TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()
);
assert_eq!(
table_properties.write_parquet_compression_codec,
TableProperties::PROPERTY_WRITE_PARQUET_COMPRESSION_CODEC_DEFAULT.to_string()
);
assert_eq!(
table_properties.write_parquet_compression(),
Compression::ZSTD(ZstdLevel::default())
);
assert_eq!(
table_properties.write_delete_parquet_compression(),
Compression::ZSTD(ZstdLevel::default())
);
assert_eq!(
table_properties.write_target_file_size_bytes,
TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
Expand All @@ -236,6 +347,10 @@ mod tests {
TableProperties::PROPERTY_DEFAULT_FILE_FORMAT.to_string(),
"avro".to_string(),
),
(
TableProperties::PROPERTY_WRITE_PARQUET_COMPRESSION_CODEC.to_string(),
"snappy".to_string(),
),
(
TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
"512".to_string(),
Expand All @@ -245,9 +360,80 @@ mod tests {
assert_eq!(table_properties.commit_num_retries, 10);
assert_eq!(table_properties.commit_max_retry_wait_ms, 20);
assert_eq!(table_properties.write_format_default, "avro".to_string());
assert_eq!(
table_properties.write_parquet_compression(),
Compression::SNAPPY
);
assert_eq!(
table_properties.write_delete_parquet_compression(),
Compression::SNAPPY
);
assert_eq!(table_properties.write_target_file_size_bytes, 512);
}

#[test]
fn test_table_properties_parquet_compression_fallback() {
    // An unrecognized codec string must not fail construction; the
    // accessor silently falls back to the ZSTD default.
    let props = HashMap::from([(
        TableProperties::PROPERTY_WRITE_PARQUET_COMPRESSION_CODEC.to_string(),
        "not-a-codec".to_string(),
    )]);
    let parsed = TableProperties::try_from(&props).unwrap();
    assert_eq!(
        parsed.write_parquet_compression(),
        Compression::ZSTD(ZstdLevel::default())
    );
}

#[test]
fn test_table_properties_parquet_compression_level() {
    // An explicit level applies to data files and is inherited by delete
    // files when no delete-specific configuration is present.
    let props: HashMap<String, String> = [
        (
            TableProperties::PROPERTY_WRITE_PARQUET_COMPRESSION_CODEC,
            "zstd",
        ),
        (
            TableProperties::PROPERTY_WRITE_PARQUET_COMPRESSION_LEVEL,
            "5",
        ),
    ]
    .into_iter()
    .map(|(k, v)| (k.to_string(), v.to_string()))
    .collect();
    let parsed = TableProperties::try_from(&props).unwrap();
    let expected = Compression::ZSTD(ZstdLevel::try_new(5).unwrap());
    assert_eq!(parsed.write_parquet_compression(), expected);
    assert_eq!(parsed.write_delete_parquet_compression(), expected);
}

#[test]
fn test_table_properties_delete_parquet_compression_overrides_data_compression() {
    // Delete-specific codec/level settings take precedence over the
    // data-file compression configuration.
    let props: HashMap<String, String> = [
        (
            TableProperties::PROPERTY_WRITE_PARQUET_COMPRESSION_CODEC,
            "gzip",
        ),
        (
            TableProperties::PROPERTY_WRITE_DELETE_PARQUET_COMPRESSION_CODEC,
            "zstd",
        ),
        (
            TableProperties::PROPERTY_WRITE_DELETE_PARQUET_COMPRESSION_LEVEL,
            "3",
        ),
    ]
    .into_iter()
    .map(|(k, v)| (k.to_string(), v.to_string()))
    .collect();
    let parsed = TableProperties::try_from(&props).unwrap();
    assert_eq!(
        parsed.write_parquet_compression(),
        Compression::GZIP(GzipLevel::default())
    );
    assert_eq!(
        parsed.write_delete_parquet_compression(),
        Compression::ZSTD(ZstdLevel::try_new(3).unwrap())
    );
}

#[test]
fn test_table_properties_invalid() {
let invalid_retries = HashMap::from([(
Expand Down
Loading