From 3ac86792f4ca090bf5894a42f0459092e5e90d98 Mon Sep 17 00:00:00 2001 From: Minh Vu <38443830+fallintoplace@users.noreply.github.com> Date: Fri, 22 May 2026 22:03:50 +0200 Subject: [PATCH 1/2] fix parquet proto writer version decode --- .../proto/src/logical_plan/file_formats.rs | 290 +++++++++++++----- 1 file changed, 208 insertions(+), 82 deletions(-) diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index d5af7be485f26..299e242b62e16 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use super::LogicalExtensionCodec; -use crate::convert::FromProto; +use crate::convert::{FromProto, TryFromProto}; use crate::protobuf::{ CsvOptions as CsvOptionsProto, CsvQuoteStyle as CsvQuoteStyleProto, JsonOptions as JsonOptionsProto, @@ -500,81 +500,144 @@ mod parquet { } } - impl FromProto<&ParquetOptionsProto> for ParquetOptions { - fn from_proto(proto: &ParquetOptionsProto) -> Self { - ParquetOptions { - enable_page_index: proto.enable_page_index, - pruning: proto.pruning, - skip_metadata: proto.skip_metadata, - metadata_size_hint: proto.metadata_size_hint_opt.as_ref().map(|opt| match opt { - parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => *size as usize, - }), - pushdown_filters: proto.pushdown_filters, - reorder_filters: proto.reorder_filters, - force_filter_selections: proto.force_filter_selections, - data_pagesize_limit: proto.data_pagesize_limit as usize, - write_batch_size: proto.write_batch_size as usize, - // TODO: Consider changing to TryFrom to avoid panic on invalid proto data - writer_version: proto.writer_version.parse().expect(" - Invalid parquet writer version in proto, expected '1.0' or '2.0' - "), - compression: proto.compression_opt.as_ref().map(|opt| match opt { - parquet_options::CompressionOpt::Compression(compression) => compression.clone(), - }), - dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| match opt { - parquet_options::DictionaryEnabledOpt::DictionaryEnabled(enabled) => *enabled, - }), - dictionary_page_size_limit: proto.dictionary_page_size_limit as usize, - statistics_enabled: proto.statistics_enabled_opt.as_ref().map(|opt| match opt { - parquet_options::StatisticsEnabledOpt::StatisticsEnabled(statistics) => statistics.clone(), - }), - max_row_group_size: proto.max_row_group_size as usize, - created_by: proto.created_by.clone(), - column_index_truncate_length: proto.column_index_truncate_length_opt.as_ref().map(|opt| match opt { - parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize, - }), - statistics_truncate_length: proto.statistics_truncate_length_opt.as_ref().map(|opt| match opt { - parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize, - }), - data_page_row_count_limit: proto.data_page_row_count_limit as usize, - encoding: proto.encoding_opt.as_ref().map(|opt| match opt { - parquet_options::EncodingOpt::Encoding(encoding) => encoding.clone(), - }), - bloom_filter_on_read: proto.bloom_filter_on_read, - bloom_filter_on_write: proto.bloom_filter_on_write, - bloom_filter_fpp: proto.bloom_filter_fpp_opt.as_ref().map(|opt| match opt { - parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp, - }), - bloom_filter_ndv: proto.bloom_filter_ndv_opt.as_ref().map(|opt| match opt { - parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv, - }), - allow_single_file_parallelism: proto.allow_single_file_parallelism, - maximum_parallel_row_group_writers: proto.maximum_parallel_row_group_writers as usize, - maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize, - schema_force_view_types: proto.schema_force_view_types, - binary_as_string: proto.binary_as_string, - skip_arrow_metadata: proto.skip_arrow_metadata, - coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt { - parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(), - }), - coerce_int96_tz: proto.coerce_int96_tz_opt.as_ref().map(|opt| match opt { - parquet_options::CoerceInt96TzOpt::CoerceInt96Tz(tz) => tz.clone(), - }), - max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt { - parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize, - }), - use_content_defined_chunking: proto.content_defined_chunking.map(|cdc| { - let defaults = CdcOptions::default(); - CdcOptions { - // proto3 uses 0 as the wire default for uint64; a zero chunk size is - // invalid, so treat it as "field not set" and fall back to the default. - min_chunk_size: if cdc.min_chunk_size != 0 { cdc.min_chunk_size as usize } else { defaults.min_chunk_size }, - max_chunk_size: if cdc.max_chunk_size != 0 { cdc.max_chunk_size as usize } else { defaults.max_chunk_size }, - // norm_level = 0 is a valid value (and the default), so pass it through directly. - norm_level: cdc.norm_level, - } - }), - } + impl TryFromProto<&ParquetOptionsProto> for ParquetOptions { + type Error = datafusion_common::DataFusionError; + + fn try_from_proto( + proto: &ParquetOptionsProto, + ) -> datafusion_common::Result { + let default_options = ParquetOptions::default(); + let writer_version = if proto.writer_version.is_empty() { + default_options.writer_version + } else { + proto.writer_version.parse()? + }; + + Ok(ParquetOptions { + enable_page_index: proto.enable_page_index, + pruning: proto.pruning, + skip_metadata: proto.skip_metadata, + metadata_size_hint: proto + .metadata_size_hint_opt + .as_ref() + .map(|opt| match opt { + parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => { + *size as usize + } + }), + pushdown_filters: proto.pushdown_filters, + reorder_filters: proto.reorder_filters, + force_filter_selections: proto.force_filter_selections, + data_pagesize_limit: proto.data_pagesize_limit as usize, + write_batch_size: proto.write_batch_size as usize, + writer_version, + compression: proto.compression_opt.as_ref().map(|opt| match opt { + parquet_options::CompressionOpt::Compression(compression) => { + compression.clone() + } + }), + dictionary_enabled: proto.dictionary_enabled_opt.as_ref().map(|opt| { + match opt { + parquet_options::DictionaryEnabledOpt::DictionaryEnabled( + enabled, + ) => *enabled, + } + }), + dictionary_page_size_limit: proto.dictionary_page_size_limit as usize, + statistics_enabled: proto.statistics_enabled_opt.as_ref().map( + |opt| match opt { + parquet_options::StatisticsEnabledOpt::StatisticsEnabled( + statistics, + ) => statistics.clone(), + }, + ), + max_row_group_size: proto.max_row_group_size as usize, + created_by: proto.created_by.clone(), + column_index_truncate_length: proto + .column_index_truncate_length_opt + .as_ref() + .map(|opt| match opt { + parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(length) => *length as usize, + }), + statistics_truncate_length: proto + .statistics_truncate_length_opt + .as_ref() + .map(|opt| match opt { + parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(length) => *length as usize, + }), + data_page_row_count_limit: proto.data_page_row_count_limit as usize, + encoding: proto.encoding_opt.as_ref().map(|opt| match opt { + parquet_options::EncodingOpt::Encoding(encoding) => { + encoding.clone() + } + }), + bloom_filter_on_read: proto.bloom_filter_on_read, + bloom_filter_on_write: proto.bloom_filter_on_write, + bloom_filter_fpp: proto + .bloom_filter_fpp_opt + .as_ref() + .map(|opt| match opt { + parquet_options::BloomFilterFppOpt::BloomFilterFpp(fpp) => *fpp, + }), + bloom_filter_ndv: proto + .bloom_filter_ndv_opt + .as_ref() + .map(|opt| match opt { + parquet_options::BloomFilterNdvOpt::BloomFilterNdv(ndv) => *ndv, + }), + allow_single_file_parallelism: proto.allow_single_file_parallelism, + maximum_parallel_row_group_writers: proto + .maximum_parallel_row_group_writers + as usize, + maximum_buffered_record_batches_per_stream: proto + .maximum_buffered_record_batches_per_stream + as usize, + schema_force_view_types: proto.schema_force_view_types, + binary_as_string: proto.binary_as_string, + skip_arrow_metadata: proto.skip_arrow_metadata, + coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt { + parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => { + coerce_int96.clone() + } + }), + coerce_int96_tz: proto + .coerce_int96_tz_opt + .as_ref() + .map(|opt| match opt { + parquet_options::CoerceInt96TzOpt::CoerceInt96Tz(tz) => { + tz.clone() + } + }), + max_predicate_cache_size: proto + .max_predicate_cache_size_opt + .as_ref() + .map(|opt| match opt { + parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize( + size, + ) => *size as usize, + }), + use_content_defined_chunking: proto.content_defined_chunking.map( + |cdc| { + let defaults = CdcOptions::default(); + CdcOptions { + // proto3 uses 0 as the wire default for uint64; a zero chunk size is + // invalid, so treat it as "field not set" and fall back to the default. + min_chunk_size: if cdc.min_chunk_size != 0 { + cdc.min_chunk_size as usize + } else { + defaults.min_chunk_size + }, + max_chunk_size: if cdc.max_chunk_size != 0 { + cdc.max_chunk_size as usize + } else { + defaults.max_chunk_size + }, + // norm_level = 0 is a valid value (and the default), so pass it through directly. + norm_level: cdc.norm_level, + } + }, + ), + }) } } @@ -606,13 +669,18 @@ mod parquet { } } - impl FromProto<&TableParquetOptionsProto> for TableParquetOptions { - fn from_proto(proto: &TableParquetOptionsProto) -> Self { - TableParquetOptions { + impl TryFromProto<&TableParquetOptionsProto> for TableParquetOptions { + type Error = datafusion_common::DataFusionError; + + fn try_from_proto( + proto: &TableParquetOptionsProto, + ) -> datafusion_common::Result { + Ok(TableParquetOptions { global: proto .global .as_ref() - .map(ParquetOptions::from_proto) + .map(ParquetOptions::try_from_proto) + .transpose()? .unwrap_or_default(), column_specific_options: proto .column_specific_options @@ -635,7 +703,7 @@ mod parquet { .map(|(k, v)| (k.clone(), Some(v.clone()))) .collect(), ..Default::default() - } + }) } } @@ -689,7 +757,7 @@ mod parquet { let proto = TableParquetOptionsProto::decode(buf).map_err(|e| { exec_datafusion_err!("Failed to decode TableParquetOptionsProto: {e:?}") })?; - let options = TableParquetOptions::from_proto(&proto); + let options = TableParquetOptions::try_from_proto(&proto)?; Ok(Arc::new( datafusion_datasource_parquet::file_format::ParquetFormatFactory { options: Some(options), @@ -723,6 +791,64 @@ mod parquet { Ok(()) } } + + #[cfg(test)] + mod tests { + use super::*; + + fn encode_table_options(proto: TableParquetOptionsProto) -> Vec { + let mut buf = Vec::new(); + proto.encode(&mut buf).expect("encode parquet options"); + buf + } + + #[test] + fn try_decode_file_format_errors_on_invalid_writer_version() { + let proto = TableParquetOptionsProto { + global: Some(ParquetOptionsProto { + writer_version: "3.0".to_string(), + ..Default::default() + }), + ..Default::default() + }; + + let result = ParquetLogicalExtensionCodec.try_decode_file_format( + &encode_table_options(proto), + &TaskContext::default(), + ); + + let err = result.err().expect("invalid writer version should error"); + assert!( + err.to_string() + .contains("Invalid parquet writer version: 3.0"), + "{err}" + ); + } + + #[test] + fn try_decode_file_format_defaults_empty_writer_version() { + let proto = TableParquetOptionsProto { + global: Some(ParquetOptionsProto::default()), + ..Default::default() + }; + + let factory = ParquetLogicalExtensionCodec + .try_decode_file_format( + &encode_table_options(proto), + &TaskContext::default(), + ) + .expect("decode parquet options"); + let parquet_factory = factory + .downcast_ref::() + .expect("parquet format factory"); + let options = parquet_factory.options.as_ref().expect("parquet options"); + + assert_eq!( + options.global.writer_version, + ParquetOptions::default().writer_version + ); + } + } } #[cfg(feature = "parquet")] pub use parquet::ParquetLogicalExtensionCodec; From 6ee499e8be790e8ecc140c1544c6235f4f42912d Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Mon, 25 May 2026 13:19:10 +0200 Subject: [PATCH 2/2] Clarify parquet writer version proto defaults --- datafusion/proto/src/logical_plan/file_formats.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 299e242b62e16..de54745155479 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -506,11 +506,12 @@ mod parquet { fn try_from_proto( proto: &ParquetOptionsProto, ) -> datafusion_common::Result { - let default_options = ParquetOptions::default(); - let writer_version = if proto.writer_version.is_empty() { - default_options.writer_version - } else { - proto.writer_version.parse()? + let writer_version = match proto.writer_version.as_str() { + // Proto3 decodes an omitted string field as the empty string. The + // schema documents writer_version's logical default as "1.0", so + // preserve that default when the field is absent on the wire. + "" => ParquetOptions::default().writer_version, + version => version.parse()?, }; Ok(ParquetOptions { @@ -817,7 +818,7 @@ mod parquet { &TaskContext::default(), ); - let err = result.err().expect("invalid writer version should error"); + let err = result.expect_err("invalid writer version should error"); assert!( err.to_string() .contains("Invalid parquet writer version: 3.0"),