From c26d33609506eebb00aefc1ddf1dc2161cef45e5 Mon Sep 17 00:00:00 2001
From: Jack Kleeman
Date: Mon, 15 Sep 2025 21:25:42 +0100
Subject: [PATCH] proto: don't include parquet feature by default

---
 datafusion/proto/Cargo.toml                |   4 +-
 .../proto/src/logical_plan/file_formats.rs | 277 ++++++++++--------
 2 files changed, 150 insertions(+), 131 deletions(-)

diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml
index 9dc433f7efcc..9f35f4ff5f85 100644
--- a/datafusion/proto/Cargo.toml
+++ b/datafusion/proto/Cargo.toml
@@ -46,7 +46,7 @@ avro = ["datafusion/avro", "datafusion-common/avro"]
 [dependencies]
 arrow = { workspace = true }
 chrono = { workspace = true }
-datafusion = { workspace = true, default-features = false, features = ["parquet", "nested_expressions"] }
+datafusion = { workspace = true, default-features = false }
 datafusion-common = { workspace = true }
 datafusion-expr = { workspace = true }
 datafusion-proto-common = { workspace = true }
@@ -57,7 +57,7 @@ serde = { version = "1.0", optional = true }
 serde_json = { workspace = true, optional = true }
 
 [dev-dependencies]
-datafusion = { workspace = true, default-features = false, features = ["sql"] }
+datafusion = { workspace = true, default-features = false, features = ["sql", "nested_expressions"] }
 datafusion-functions = { workspace = true, default-features = true }
 datafusion-functions-aggregate = { workspace = true }
 datafusion-functions-window-common = { workspace = true }
diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs
index 492795855cf6..d648a609e633 100644
--- a/datafusion/proto/src/logical_plan/file_formats.rs
+++ b/datafusion/proto/src/logical_plan/file_formats.rs
@@ -18,13 +18,10 @@
 use std::sync::Arc;
 
 use datafusion::{
-    config::{
-        CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
-        TableParquetOptions,
-    },
+    config::{CsvOptions, JsonOptions},
     datasource::file_format::{
         arrow::ArrowFormatFactory, csv::CsvFormatFactory, json::JsonFormatFactory,
-        parquet::ParquetFormatFactory, FileFormatFactory,
+        FileFormatFactory,
     },
     prelude::SessionContext,
 };
@@ -34,12 +31,7 @@ use datafusion_common::{
 };
 
 use prost::Message;
 
-use crate::protobuf::{
-    parquet_column_options, parquet_options, CsvOptions as CsvOptionsProto,
-    JsonOptions as JsonOptionsProto, ParquetColumnOptions as ParquetColumnOptionsProto,
-    ParquetColumnSpecificOptions, ParquetOptions as ParquetOptionsProto,
-    TableParquetOptions as TableParquetOptionsProto,
-};
+use crate::protobuf::{CsvOptions as CsvOptionsProto, JsonOptions as JsonOptionsProto};
 
 use super::LogicalExtensionCodec;
@@ -355,16 +347,32 @@ impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
     }
 }
 
-impl TableParquetOptionsProto {
-    fn from_factory(factory: &ParquetFormatFactory) -> Self {
-        let global_options = if let Some(ref options) = factory.options {
-            options.clone()
-        } else {
-            return TableParquetOptionsProto::default();
-        };
+#[cfg(feature = "parquet")]
+mod parquet {
+    use super::*;
+
+    use crate::protobuf::{
+        parquet_column_options, parquet_options,
+        ParquetColumnOptions as ParquetColumnOptionsProto, ParquetColumnSpecificOptions,
+        ParquetOptions as ParquetOptionsProto,
+        TableParquetOptions as TableParquetOptionsProto,
+    };
+
+    use datafusion::{
+        config::{ParquetColumnOptions, ParquetOptions, TableParquetOptions},
+        datasource::file_format::parquet::ParquetFormatFactory,
+    };
+
+    impl TableParquetOptionsProto {
+        fn from_factory(factory: &ParquetFormatFactory) -> Self {
+            let global_options = if let Some(ref options) = factory.options {
+                options.clone()
+            } else {
+                return TableParquetOptionsProto::default();
+            };
 
-        let column_specific_options = global_options.column_specific_options;
-        #[allow(deprecated)] // max_statistics_size
+            let column_specific_options = global_options.column_specific_options;
+            #[allow(deprecated)] // max_statistics_size
         TableParquetOptionsProto {
             global: Some(ParquetOptionsProto {
                 enable_page_index: global_options.global.enable_page_index,
@@ -453,12 +461,12 @@ impl TableParquetOptionsProto {
                 })
                 .collect(),
         }
+    }
     }
-}
 
-impl From<&ParquetOptionsProto> for ParquetOptions {
-    fn from(proto: &ParquetOptionsProto) -> Self {
-        #[allow(deprecated)] // max_statistics_size
+    impl From<&ParquetOptionsProto> for ParquetOptions {
+        fn from(proto: &ParquetOptionsProto) -> Self {
+            #[allow(deprecated)] // max_statistics_size
         ParquetOptions {
             enable_page_index: proto.enable_page_index,
             pruning: proto.pruning,
@@ -511,12 +519,12 @@ impl From<&ParquetOptionsProto> for ParquetOptions {
                 parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(),
             }),
         }
+    }
     }
-}
 
-impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
-    fn from(proto: ParquetColumnOptionsProto) -> Self {
-        #[allow(deprecated)] // max_statistics_size
+    impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
+        fn from(proto: ParquetColumnOptionsProto) -> Self {
+            #[allow(deprecated)] // max_statistics_size
         ParquetColumnOptions {
             bloom_filter_enabled: proto.bloom_filter_enabled_opt.map(
                 |parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v)| v,
             ),
@@ -540,124 +548,135 @@ impl From<ParquetColumnOptionsProto> for ParquetColumnOptions {
             .bloom_filter_ndv_opt
             .map(|parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v)| v),
         }
+    }
     }
-}
 
-impl From<&TableParquetOptionsProto> for TableParquetOptions {
-    fn from(proto: &TableParquetOptionsProto) -> Self {
-        TableParquetOptions {
-            global: proto
-                .global
-                .as_ref()
-                .map(ParquetOptions::from)
-                .unwrap_or_default(),
-            column_specific_options: proto
-                .column_specific_options
-                .iter()
-                .map(|parquet_column_options| {
-                    (
-                        parquet_column_options.column_name.clone(),
-                        ParquetColumnOptions::from(
-                            parquet_column_options.options.clone().unwrap_or_default(),
-                        ),
-                    )
-                })
-                .collect(),
-            key_value_metadata: proto
-                .key_value_metadata
-                .iter()
-                .map(|(k, v)| (k.clone(), Some(v.clone())))
-                .collect(),
-            crypto: Default::default(),
+    impl From<&TableParquetOptionsProto> for TableParquetOptions {
+        fn from(proto: &TableParquetOptionsProto) -> Self {
+            TableParquetOptions {
+                global: proto
+                    .global
+                    .as_ref()
+                    .map(ParquetOptions::from)
+                    .unwrap_or_default(),
+                column_specific_options: proto
+                    .column_specific_options
+                    .iter()
+                    .map(|parquet_column_options| {
+                        (
+                            parquet_column_options.column_name.clone(),
+                            ParquetColumnOptions::from(
+                                parquet_column_options
+                                    .options
+                                    .clone()
+                                    .unwrap_or_default(),
+                            ),
+                        )
+                    })
+                    .collect(),
+                key_value_metadata: proto
+                    .key_value_metadata
+                    .iter()
+                    .map(|(k, v)| (k.clone(), Some(v.clone())))
+                    .collect(),
+                crypto: Default::default(),
+            }
         }
     }
-}
 
     #[derive(Debug)]
     pub struct ParquetLogicalExtensionCodec;
 
     // TODO! This is a placeholder for now and needs to be implemented for real.
     impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
        fn try_decode(
            &self,
            _buf: &[u8],
            _inputs: &[datafusion_expr::LogicalPlan],
            _ctx: &SessionContext,
        ) -> datafusion_common::Result<datafusion_expr::Extension> {
            not_impl_err!("Method not implemented")
        }

        fn try_encode(
            &self,
            _node: &datafusion_expr::Extension,
            _buf: &mut Vec<u8>,
        ) -> datafusion_common::Result<()> {
            not_impl_err!("Method not implemented")
        }

        fn try_decode_table_provider(
            &self,
            _buf: &[u8],
            _table_ref: &TableReference,
            _schema: arrow::datatypes::SchemaRef,
            _ctx: &SessionContext,
        ) -> datafusion_common::Result<Arc<dyn TableProvider>>
        {
            not_impl_err!("Method not implemented")
        }

        fn try_encode_table_provider(
            &self,
            _table_ref: &TableReference,
            _node: Arc<dyn TableProvider>,
            _buf: &mut Vec<u8>,
        ) -> datafusion_common::Result<()> {
            not_impl_err!("Method not implemented")
        }

        fn try_decode_file_format(
            &self,
            buf: &[u8],
            _ctx: &SessionContext,
        ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
            let proto = TableParquetOptionsProto::decode(buf).map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to decode TableParquetOptionsProto: {e:?}"
                ))
            })?;
            let options: TableParquetOptions = (&proto).into();
            Ok(Arc::new(
                datafusion::datasource::file_format::parquet::ParquetFormatFactory {
                    options: Some(options),
                },
            ))
        }

        fn try_encode_file_format(
            &self,
            buf: &mut Vec<u8>,
            node: Arc<dyn FileFormatFactory>,
        ) -> datafusion_common::Result<()> {
            use datafusion::datasource::file_format::parquet::ParquetFormatFactory;

            let options = if let Some(parquet_factory) =
                node.as_any().downcast_ref::<ParquetFormatFactory>()
            {
                parquet_factory.options.clone().unwrap_or_default()
            } else {
                return Err(DataFusionError::Execution(
                    "Unsupported FileFormatFactory type".to_string(),
                ));
            };

            let proto = TableParquetOptionsProto::from_factory(&ParquetFormatFactory {
                options: Some(options),
            });

            proto.encode(buf).map_err(|e| {
+                DataFusionError::Execution(format!(
+                    "Failed to encode TableParquetOptionsProto: {e:?}"
+                ))
+            })?;

            Ok(())
        }
    }
 }
+#[cfg(feature = "parquet")]
+pub use parquet::ParquetLogicalExtensionCodec;
 
 #[derive(Debug)]
 pub struct ArrowLogicalExtensionCodec;
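
Note: with this patch, parquet support in datafusion-proto becomes opt-in
rather than enabled by default, since the codec is now behind
#[cfg(feature = "parquet")]. A minimal sketch of a downstream Cargo.toml,
assuming the crate keeps exposing a "parquet" feature that enables the gated
module (the "*" version requirement is only a placeholder):

    [dependencies]
    # Opt back into the parquet codec; without this feature the
    # ParquetLogicalExtensionCodec re-export is compiled out.
    datafusion-proto = { version = "*", features = ["parquet"] }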
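For completeness, a minimal round-trip sketch of the gated codec, assuming
datafusion-proto is built with its "parquet" feature and that the codec stays
re-exported under datafusion_proto::logical_plan::file_formats as in the last
hunk above: encode a ParquetFormatFactory's options, then decode them back.

    use std::sync::Arc;

    use datafusion::config::TableParquetOptions;
    use datafusion::datasource::file_format::parquet::ParquetFormatFactory;
    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;
    use datafusion_proto::logical_plan::file_formats::ParquetLogicalExtensionCodec;
    use datafusion_proto::logical_plan::LogicalExtensionCodec;

    fn main() -> Result<()> {
        let ctx = SessionContext::new();
        let codec = ParquetLogicalExtensionCodec;

        // Flip one option away from its default so the round trip is visible.
        let mut options = TableParquetOptions::default();
        options.global.pruning = false;

        // Encode the factory's TableParquetOptions into a protobuf buffer.
        let mut buf = Vec::new();
        codec.try_encode_file_format(
            &mut buf,
            Arc::new(ParquetFormatFactory {
                options: Some(options),
            }),
        )?;

        // Decode the buffer back into a factory and check the flag survived.
        let decoded = codec.try_decode_file_format(&buf, &ctx)?;
        let factory = decoded
            .as_any()
            .downcast_ref::<ParquetFormatFactory>()
            .expect("decoded factory should be a ParquetFormatFactory");
        assert_eq!(
            factory.options.as_ref().map(|o| o.global.pruning),
            Some(false)
        );
        Ok(())
    }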