diff --git a/benchmarks/src/bin/parquet_filter_pushdown.rs b/benchmarks/src/bin/parquet_filter_pushdown.rs
index f77cbc8fd680..0f347b704edb 100644
--- a/benchmarks/src/bin/parquet_filter_pushdown.rs
+++ b/benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -215,7 +215,7 @@ async fn exec_scan(
         enable_page_index,
     } = scan_options;
 
-    let mut config_options = ConfigOptions::new();
+    let config_options = Arc::new(ConfigOptions::new());
     config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
     config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
     config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);
@@ -233,7 +233,7 @@ async fn exec_scan(
         projection: None,
         limit: None,
         table_partition_cols: vec![],
-        config_options: config_options.into_shareable(),
+        config_options,
     };
 
     let df_schema = schema.clone().to_dfschema()?;
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index b2f2bf181787..71257d7b9647 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -431,7 +431,8 @@ async fn get_table(
         }
         "parquet" => {
             let path = format!("{}/{}", path, table);
-            let format = ParquetFormat::default().with_enable_pruning(true);
+            let format = ParquetFormat::new(ctx.config.config_options())
+                .with_enable_pruning(true);
 
             (Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
         }
diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs
index 88cc6f1c23be..16463185fab2 100644
--- a/datafusion-examples/examples/flight_server.rs
+++ b/datafusion-examples/examples/flight_server.rs
@@ -67,11 +67,12 @@ impl FlightService for FlightServiceImpl {
     ) -> Result<Response<SchemaResult>, Status> {
         let request = request.into_inner();
 
-        let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
+        let ctx = SessionContext::new();
+        let format = Arc::new(ParquetFormat::new(ctx.config_options()));
+        let listing_options = ListingOptions::new(format);
         let table_path =
             ListingTableUrl::parse(&request.path[0]).map_err(to_tonic_err)?;
 
-        let ctx = SessionContext::new();
         let schema = listing_options
             .infer_schema(&ctx.state(), &table_path)
             .await
diff --git a/datafusion-examples/examples/parquet_sql_multiple_files.rs b/datafusion-examples/examples/parquet_sql_multiple_files.rs
index 6004ce67df29..8bf2ea70891f 100644
--- a/datafusion-examples/examples/parquet_sql_multiple_files.rs
+++ b/datafusion-examples/examples/parquet_sql_multiple_files.rs
@@ -32,7 +32,7 @@ async fn main() -> Result<()> {
     let testdata = datafusion::test_util::parquet_test_data();
 
     // Configure listing options
-    let file_format = ParquetFormat::default().with_enable_pruning(true);
+    let file_format = ParquetFormat::new(ctx.config_options());
     let listing_options = ListingOptions {
         file_extension: FileType::PARQUET.get_ext(),
         format: Arc::new(file_format),
diff --git a/datafusion/core/src/catalog/information_schema.rs b/datafusion/core/src/catalog/information_schema.rs
index eca399e27093..189cf29688de 100644
--- a/datafusion/core/src/catalog/information_schema.rs
+++ b/datafusion/core/src/catalog/information_schema.rs
@@ -24,8 +24,6 @@ use std::{
     sync::{Arc, Weak},
 };
 
-use parking_lot::RwLock;
-
 use arrow::{
     array::{StringBuilder, UInt64Builder},
     datatypes::{DataType, Field, Schema},
@@ -53,7 +51,7 @@ const DF_SETTINGS: &str = "df_settings";
 /// schema that can introspect on tables in the catalog_list
 pub(crate) struct CatalogWithInformationSchema {
     catalog_list: Weak<dyn CatalogList>,
-    config_options: Weak<RwLock<ConfigOptions>>,
+    config_options: Weak<ConfigOptions>,
     /// wrapped provider
     inner: Arc<dyn CatalogProvider>,
 }
@@ -61,7 +59,7 @@ pub(crate) struct CatalogWithInformationSchema {
 impl CatalogWithInformationSchema {
     pub(crate) fn new(
         catalog_list: Weak<dyn CatalogList>,
-        config_options: Weak<RwLock<ConfigOptions>>,
+        config_options: Weak<ConfigOptions>,
         inner: Arc<dyn CatalogProvider>,
     ) -> Self {
         Self {
@@ -118,7 +116,7 @@ impl CatalogProvider for CatalogWithInformationSchema {
 /// table is queried.
 struct InformationSchemaProvider {
     catalog_list: Arc<dyn CatalogList>,
-    config_options: Arc<RwLock<ConfigOptions>>,
+    config_options: Arc<ConfigOptions>,
 }
 
 impl InformationSchemaProvider {
@@ -230,7 +228,7 @@ impl InformationSchemaProvider {
     fn make_df_settings(&self) -> Arc<dyn TableProvider> {
         let mut builder = InformationSchemaDfSettingsBuilder::new();
 
-        for (name, setting) in self.config_options.read().options() {
+        for (name, setting) in self.config_options.options().read().iter() {
            builder.add_setting(name, setting.to_string());
        }
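Reviewer note: with `ConfigOptions` now held behind a plain `Arc`, the information schema locks only the inner settings map rather than the whole struct. A minimal sketch of the new read path, assuming only the API shown in this patch (the variable name is illustrative):

```rust
use std::sync::Arc;
use datafusion::config::ConfigOptions;

let config_options = Arc::new(ConfigOptions::new());
// `options()` now returns &RwLock<HashMap<String, ScalarValue>>;
// the parking_lot read guard derefs to the map itself
for (name, setting) in config_options.options().read().iter() {
    println!("{} = {}", name, setting);
}
```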
diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs
index e058f5c72bd4..7a0eb2e6d864 100644
--- a/datafusion/core/src/config.rs
+++ b/datafusion/core/src/config.rs
@@ -24,7 +24,6 @@ use log::warn;
 use parking_lot::RwLock;
 use std::collections::HashMap;
 use std::env;
-use std::sync::Arc;
 
 /// Configuration option "datafusion.optimizer.filter_null_join_keys"
 pub const OPT_FILTER_NULL_JOIN_KEYS: &str = "datafusion.optimizer.filter_null_join_keys";
@@ -60,6 +59,16 @@ pub const OPT_PARQUET_REORDER_FILTERS: &str =
 pub const OPT_PARQUET_ENABLE_PAGE_INDEX: &str =
     "datafusion.execution.parquet.enable_page_index";
 
+/// Configuration option "datafusion.execution.parquet.pruning"
+pub const OPT_PARQUET_ENABLE_PRUNING: &str = "datafusion.execution.parquet.pruning";
+
+/// Configuration option "datafusion.execution.parquet.skip_metadata"
+pub const OPT_PARQUET_SKIP_METADATA: &str = "datafusion.execution.parquet.skip_metadata";
+
+/// Configuration option "datafusion.execution.parquet.metadata_size_hint"
+pub const OPT_PARQUET_METADATA_SIZE_HINT: &str =
+    "datafusion.execution.parquet.metadata_size_hint";
+
 /// Configuration option "datafusion.optimizer.skip_failed_rules"
 pub const OPT_OPTIMIZER_SKIP_FAILED_RULES: &str =
     "datafusion.optimizer.skip_failed_rules";
@@ -237,6 +246,29 @@ impl BuiltInConfigs {
                 to reduce the number of rows decoded.",
                 false,
             ),
+            ConfigDefinition::new_bool(
+                OPT_PARQUET_ENABLE_PRUNING,
+                "If true, the parquet reader attempts to skip entire row groups based \
+                 on the predicate in the query and the metadata (min/max values) stored in \
+                 the parquet file.",
+                true,
+            ),
+            ConfigDefinition::new_bool(
+                OPT_PARQUET_SKIP_METADATA,
+                "If true, the parquet reader skips the optional embedded metadata that may be in \
+                 the file Schema. This setting can help avoid schema conflicts when querying \
+                 multiple parquet files with schemas containing compatible types but different metadata.",
+                true,
+            ),
+            ConfigDefinition::new(
+                OPT_PARQUET_METADATA_SIZE_HINT,
+                "If specified, the parquet reader will try to fetch the last `size_hint` \
+                 bytes of the parquet file optimistically. If not specified, two reads are required: \
+                 One read to fetch the 8-byte parquet footer and \
+                 another to fetch the metadata length encoded in the footer.",
+                DataType::UInt64,
+                ScalarValue::UInt64(None),
+            ),
             ConfigDefinition::new_bool(
                 OPT_OPTIMIZER_SKIP_FAILED_RULES,
                 "When set to true, the logical plan optimizer will produce warning \
@@ -273,10 +305,26 @@ impl BuiltInConfigs {
         }
     }
 }
 
-/// Configuration options struct. This can contain values for built-in and custom options
-#[derive(Debug, Clone)]
+/// Configuration options for DataFusion.
+///
+/// `ConfigOptions` can contain values for built-in options as well as
+/// custom options, and can be configured programmatically or from the
+/// environment (see [`ConfigOptions::from_env()`]).
+///
+/// Example:
+/// ```
+/// # use std::sync::Arc;
+/// # use datafusion::config::{ConfigOptions, OPT_BATCH_SIZE};
+/// let config_options = Arc::new(ConfigOptions::new());
+///
+/// // set default batch size to 1000
+/// config_options.set_u64(OPT_BATCH_SIZE, 1000);
+/// assert_eq!(config_options.get_u64(OPT_BATCH_SIZE), Some(1000));
+/// ```
+///
+#[derive(Debug)]
 pub struct ConfigOptions {
-    options: HashMap<String, ScalarValue>,
+    options: RwLock<HashMap<String, ScalarValue>>,
 }
 
 impl Default for ConfigOptions {
@@ -293,12 +341,10 @@ impl ConfigOptions {
         for config_def in &built_in.config_definitions {
             options.insert(config_def.key.clone(), config_def.default_value.clone());
         }
-        Self { options }
-    }
-
-    /// Create a new [`ConfigOptions`] wrapped in an RwLock and Arc
-    pub fn into_shareable(self) -> Arc<RwLock<ConfigOptions>> {
-        Arc::new(RwLock::new(self))
+        Self {
+            options: RwLock::new(options),
+        }
     }
 
     /// Create new ConfigOptions struct, taking values from
@@ -329,27 +375,29 @@ impl ConfigOptions {
             };
             options.insert(config_def.key.clone(), config_value);
         }
-        Self { options }
+        Self {
+            options: RwLock::new(options),
+        }
     }
 
     /// set a configuration option
-    pub fn set(&mut self, key: &str, value: ScalarValue) {
-        self.options.insert(key.to_string(), value);
+    pub fn set(&self, key: &str, value: ScalarValue) {
+        self.options.write().insert(key.to_string(), value);
     }
 
     /// set a boolean configuration option
-    pub fn set_bool(&mut self, key: &str, value: bool) {
+    pub fn set_bool(&self, key: &str, value: bool) {
         self.set(key, ScalarValue::Boolean(Some(value)))
     }
 
     /// set a `u64` configuration option
-    pub fn set_u64(&mut self, key: &str, value: u64) {
+    pub fn set_u64(&self, key: &str, value: u64) {
         self.set(key, ScalarValue::UInt64(Some(value)))
     }
 
     /// get a configuration option
     pub fn get(&self, key: &str) -> Option<ScalarValue> {
-        self.options.get(key).cloned()
+        self.options.read().get(key).cloned()
     }
 
     /// get a boolean configuration option
@@ -367,14 +415,14 @@ impl ConfigOptions {
         get_conf_value!(self, Utf8, key, "string")
     }
 
-    /// Access the underlying hashmap
-    pub fn options(&self) -> &HashMap<String, ScalarValue> {
+    /// Get a reference to the underlying hashmap
+    pub fn options(&self) -> &RwLock<HashMap<String, ScalarValue>> {
         &self.options
     }
 
     /// Tests if the key exists in the configuration
     pub fn exists(&self, key: &str) -> bool {
-        self.options().contains_key(key)
+        self.options.read().contains_key(key)
     }
 }
 
@@ -398,7 +446,7 @@ mod test {
 
     #[test]
     fn get_then_set() {
-        let mut config = ConfigOptions::new();
+        let config = ConfigOptions::new();
         let config_key = "datafusion.optimizer.filter_null_join_keys";
         assert!(!config.get_bool(config_key).unwrap_or_default());
         config.set_bool(config_key, true);
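Reviewer note: the switch from `&mut self` to `&self` setters means a shared handle can be updated after it has been cloned, which is the point of this refactor. A minimal sketch under the API added above (not part of the patch itself):

```rust
use std::sync::Arc;
use datafusion::config::{ConfigOptions, OPT_PARQUET_ENABLE_PRUNING};

let config_options = Arc::new(ConfigOptions::new());
let shared = Arc::clone(&config_options);

// no `mut` binding required; the RwLock now lives inside the struct
config_options.set_bool(OPT_PARQUET_ENABLE_PRUNING, false);

// the update is visible through every clone of the Arc
assert_eq!(shared.get_bool(OPT_PARQUET_ENABLE_PRUNING), Some(false));
```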
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 6775117e2c5b..9e8a0f79849c 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -123,7 +123,7 @@ pub(crate) mod test_util {
                 projection,
                 limit,
                 table_partition_cols: vec![],
-                config_options: ConfigOptions::new().into_shareable(),
+                config_options: Arc::new(ConfigOptions::new()),
             },
             &[],
         )
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 07819bdf52cb..01484b4dd925 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -39,6 +39,10 @@ use crate::arrow::array::{
     BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array,
 };
 use crate::arrow::datatypes::{DataType, Field};
+use crate::config::ConfigOptions;
+use crate::config::OPT_PARQUET_ENABLE_PRUNING;
+use crate::config::OPT_PARQUET_METADATA_SIZE_HINT;
+use crate::config::OPT_PARQUET_SKIP_METADATA;
 use crate::datasource::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
 use crate::logical_expr::Expr;
@@ -52,26 +56,22 @@ pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";
 /// The Apache Parquet `FileFormat` implementation
 #[derive(Debug)]
 pub struct ParquetFormat {
-    enable_pruning: bool,
-    metadata_size_hint: Option<usize>,
-    skip_metadata: bool,
+    config_options: Arc<ConfigOptions>,
 }
 
-impl Default for ParquetFormat {
-    fn default() -> Self {
-        Self {
-            enable_pruning: true,
-            metadata_size_hint: None,
-            skip_metadata: true,
-        }
+impl ParquetFormat {
+    /// Construct a new `ParquetFormat` with the specified `ConfigOptions`
+    pub fn new(config_options: Arc<ConfigOptions>) -> Self {
+        Self { config_options }
     }
 }
 
 impl ParquetFormat {
     /// Activate statistics based row group level pruning
     /// - defaults to true
-    pub fn with_enable_pruning(mut self, enable: bool) -> Self {
-        self.enable_pruning = enable;
+    pub fn with_enable_pruning(self, enable: bool) -> Self {
+        self.config_options
+            .set_bool(OPT_PARQUET_ENABLE_PRUNING, enable);
         self
     }
 
@@ -79,32 +79,41 @@ impl ParquetFormat {
     /// the reader will try and fetch the last `size_hint` bytes of the parquet file optimistically.
     /// With out a hint, two read are required. One read to fetch the 8-byte parquet footer and then
     /// another read to fetch the metadata length encoded in the footer.
-    pub fn with_metadata_size_hint(mut self, size_hint: usize) -> Self {
-        self.metadata_size_hint = Some(size_hint);
+    pub fn with_metadata_size_hint(self, size_hint: usize) -> Self {
+        self.config_options
+            .set_u64(OPT_PARQUET_METADATA_SIZE_HINT, size_hint as u64);
+        self
+    }
+
+    /// Tell the parquet reader to skip any metadata that may be in
+    /// the file Schema. This can help avoid schema conflicts due to
+    /// metadata. Defaults to true.
+    pub fn with_skip_metadata(self, skip_metadata: bool) -> Self {
+        self.config_options
+            .set_bool(OPT_PARQUET_SKIP_METADATA, skip_metadata);
         self
     }
 
+    /// Return true if pruning is enabled
     pub fn enable_pruning(&self) -> bool {
-        self.enable_pruning
+        self.config_options
+            .get_bool(OPT_PARQUET_ENABLE_PRUNING)
+            .unwrap_or(false)
     }
 
     /// Return the metadata size hint if set
     pub fn metadata_size_hint(&self) -> Option<usize> {
-        self.metadata_size_hint
-    }
-
-    /// Tell the parquet reader to skip any metadata that may be in
-    /// the file Schema. This can help avoid schema conflicts due to
-    /// metadata. Defaults to true.
-    pub fn with_skip_metadata(mut self, skip_metadata: bool) -> Self {
-        self.skip_metadata = skip_metadata;
-        self
+        self.config_options
+            .get_u64(OPT_PARQUET_METADATA_SIZE_HINT)
+            .map(|u| u as usize)
     }
 
     /// returns true if schema metadata will be cleared prior to
     /// schema merging.
     pub fn skip_metadata(&self) -> bool {
-        self.skip_metadata
+        self.config_options
+            .get_bool(OPT_PARQUET_SKIP_METADATA)
+            .unwrap_or(false)
     }
 }
 
@@ -139,11 +148,11 @@ impl FileFormat for ParquetFormat {
         let mut schemas = Vec::with_capacity(objects.len());
         for object in objects {
             let schema =
-                fetch_schema(store.as_ref(), object, self.metadata_size_hint).await?;
+                fetch_schema(store.as_ref(), object, self.metadata_size_hint()).await?;
             schemas.push(schema)
         }
 
-        let schema = if self.skip_metadata {
+        let schema = if self.skip_metadata() {
             Schema::try_merge(clear_metadata(schemas))
         } else {
             Schema::try_merge(schemas)
@@ -162,7 +171,7 @@ impl FileFormat for ParquetFormat {
             store.as_ref(),
             table_schema,
             object,
-            self.metadata_size_hint,
+            self.metadata_size_hint(),
         )
         .await?;
         Ok(stats)
@@ -176,7 +185,7 @@ impl FileFormat for ParquetFormat {
         // If enable pruning then combine the filters to build the predicate.
         // If disable pruning then set the predicate to None, thus readers
         // will not prune data based on the statistics.
-        let predicate = if self.enable_pruning {
+        let predicate = if self.enable_pruning() {
             conjunction(filters.to_vec())
         } else {
             None
@@ -601,7 +610,8 @@ mod tests {
         let store = Arc::new(LocalFileSystem::new()) as _;
         let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
 
-        let format = ParquetFormat::default();
+        let ctx = SessionContext::new();
+        let format = ParquetFormat::new(ctx.config_options());
         let schema = format.infer_schema(&store, &meta).await.unwrap();
 
         let stats =
@@ -748,7 +758,8 @@ mod tests {
 
         assert_eq!(store.request_count(), 2);
 
-        let format = ParquetFormat::default().with_metadata_size_hint(9);
+        let ctx = SessionContext::new();
+        let format = ParquetFormat::new(ctx.config_options()).with_metadata_size_hint(9);
         let schema = format.infer_schema(&store.upcast(), &meta).await.unwrap();
 
         let stats =
@@ -775,7 +786,8 @@ mod tests {
         // ensure the requests were coalesced into a single request
         assert_eq!(store.request_count(), 1);
 
-        let format = ParquetFormat::default().with_metadata_size_hint(size_hint);
+        let format =
+            ParquetFormat::new(ctx.config_options()).with_metadata_size_hint(size_hint);
         let schema = format.infer_schema(&store.upcast(), &meta).await.unwrap();
         let stats = fetch_statistics(
             store.upcast().as_ref(),
@@ -812,7 +824,7 @@ mod tests {
         let config = SessionConfig::new().with_batch_size(2);
         let ctx = SessionContext::with_config(config);
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec = get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?;
         let task_ctx = ctx.task_ctx();
 
         let stream = exec.execute(0, task_ctx)?;
@@ -841,11 +853,12 @@ mod tests {
 
         // Read the full file
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec = get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?;
 
         // Read only one column. This should scan less data.
         let projection = Some(vec![0]);
-        let exec_projected = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec_projected =
+            get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let task_ctx = ctx.task_ctx();
 
@@ -863,7 +876,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", projection, Some(1)).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, Some(1)).await?;
 
         // note: even if the limit is set, the executor rounds up to the batch size
         assert_eq!(exec.statistics().num_rows, Some(8));
@@ -882,7 +896,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let x: Vec<String> = exec
             .schema()
@@ -920,7 +935,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![1]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -950,7 +966,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![0]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -977,7 +994,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![10]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -1004,7 +1022,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![6]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -1034,7 +1053,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![7]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -1064,7 +1084,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![9]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -1095,7 +1116,7 @@ mod tests {
         let task_ctx = session_ctx.task_ctx();
 
         // parquet use the int32 as the physical type to store decimal
-        let exec = get_exec("int32_decimal.parquet", None, None).await?;
+        let exec = get_exec(&session_ctx, "int32_decimal.parquet", None, None).await?;
         let batches = collect(exec, task_ctx.clone()).await?;
         assert_eq!(1, batches.len());
         assert_eq!(1, batches[0].num_columns());
@@ -1103,7 +1124,7 @@ mod tests {
         assert_eq!(&DataType::Decimal128(4, 2), column.data_type());
 
         // parquet use the int64 as the physical type to store decimal
-        let exec = get_exec("int64_decimal.parquet", None, None).await?;
+        let exec = get_exec(&session_ctx, "int64_decimal.parquet", None, None).await?;
         let batches = collect(exec, task_ctx.clone()).await?;
         assert_eq!(1, batches.len());
         assert_eq!(1, batches[0].num_columns());
@@ -1111,14 +1132,21 @@ mod tests {
         assert_eq!(&DataType::Decimal128(10, 2), column.data_type());
 
         // parquet use the fixed length binary as the physical type to store decimal
-        let exec = get_exec("fixed_length_decimal.parquet", None, None).await?;
+        let exec =
+            get_exec(&session_ctx, "fixed_length_decimal.parquet", None, None).await?;
         let batches = collect(exec, task_ctx.clone()).await?;
         assert_eq!(1, batches.len());
         assert_eq!(1, batches[0].num_columns());
         let column = batches[0].column(0);
         assert_eq!(&DataType::Decimal128(25, 2), column.data_type());
 
-        let exec = get_exec("fixed_length_decimal_legacy.parquet", None, None).await?;
+        let exec = get_exec(
+            &session_ctx,
+            "fixed_length_decimal_legacy.parquet",
+            None,
+            None,
+        )
+        .await?;
         let batches = collect(exec, task_ctx.clone()).await?;
         assert_eq!(1, batches.len());
         assert_eq!(1, batches[0].num_columns());
@@ -1128,7 +1156,7 @@ mod tests {
         // parquet use the fixed length binary as the physical type to store decimal
         // TODO: arrow-rs don't support convert the physical type of binary to decimal
         // https://github.com/apache/arrow-rs/pull/2160
-        // let exec = get_exec("byte_array_decimal.parquet", None, None).await?;
+        // let exec = get_exec(&session_ctx, "byte_array_decimal.parquet", None, None).await?;
 
         Ok(())
     }
@@ -1212,12 +1240,13 @@ mod tests {
     }
 
     async fn get_exec(
+        ctx: &SessionContext,
         file_name: &str,
         projection: Option<Vec<usize>>,
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let testdata = crate::test_util::parquet_test_data();
-        let format = ParquetFormat::default();
+        let format = ParquetFormat::new(ctx.config_options());
         scan_format(&format, &testdata, file_name, projection, limit).await
     }
 }
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index deaa09249e11..15337fa477ab 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -28,6 +28,7 @@ use object_store::path::Path;
 use object_store::ObjectMeta;
 use parking_lot::RwLock;
 
+use crate::config::ConfigOptions;
 use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
 use crate::datasource::{
     file_format::{
@@ -104,7 +105,10 @@ impl ListingTableConfig {
         }
     }
 
-    fn infer_format(path: &str) -> Result<(Arc<dyn FileFormat>, String)> {
+    fn infer_format(
+        config_options: Arc<ConfigOptions>,
+        path: &str,
+    ) -> Result<(Arc<dyn FileFormat>, String)> {
         let err_msg = format!("Unable to infer file type from path: {}", path);
 
         let mut exts = path.rsplit('.');
@@ -133,7 +137,7 @@ impl ListingTableConfig {
             FileType::JSON => Arc::new(
                 JsonFormat::default().with_file_compression_type(file_compression_type),
             ),
-            FileType::PARQUET => Arc::new(ParquetFormat::default()),
+            FileType::PARQUET => Arc::new(ParquetFormat::new(config_options)),
         };
 
         Ok((file_format, ext))
@@ -154,8 +158,10 @@ impl ListingTableConfig {
             .await
             .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;
 
-        let (format, file_extension) =
-            ListingTableConfig::infer_format(file.location.as_ref())?;
+        let (format, file_extension) = ListingTableConfig::infer_format(
+            ctx.config.config_options(),
+            file.location.as_ref(),
+        )?;
 
         let listing_options = ListingOptions {
             format,
@@ -538,7 +544,7 @@ mod tests {
         let ctx = SessionContext::new();
         let state = ctx.state();
 
-        let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
+        let opt = ListingOptions::new(Arc::new(ParquetFormat::new(ctx.config_options())));
         let schema = opt.infer_schema(&state, &table_path).await?;
         let config = ListingTableConfig::new(table_path)
             .with_listing_options(opt)
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index f140ce1c3b98..653976103fa2 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -21,6 +21,7 @@ use crate::{
         catalog::{CatalogList, MemoryCatalogList},
         information_schema::CatalogWithInformationSchema,
     },
+    config::OPT_PARQUET_ENABLE_PRUNING,
    datasource::listing::{ListingOptions, ListingTable},
    datasource::{
        file_format::{
@@ -212,6 +213,11 @@ impl SessionContext {
         self.state.read().runtime_env.clone()
     }
 
+    /// Return a handle to the shared configuration options
+    pub fn config_options(&self) -> Arc<ConfigOptions> {
+        self.state.read().config.config_options()
+    }
+
     /// Return the session_id of this Session
     pub fn session_id(&self) -> String {
         self.session_id.clone()
@@ -465,7 +471,7 @@ impl SessionContext {
                     .with_delimiter(cmd.delimiter as u8)
                     .with_file_compression_type(file_compression_type),
             ),
-            FileType::PARQUET => Arc::new(ParquetFormat::default()),
+            FileType::PARQUET => Arc::new(ParquetFormat::new(self.config_options())),
             FileType::AVRO => Arc::new(AvroFormat::default()),
             FileType::JSON => Arc::new(
                 JsonFormat::default().with_file_compression_type(file_compression_type),
@@ -680,7 +686,8 @@ impl SessionContext {
         let table_path = ListingTableUrl::parse(table_path)?;
         let target_partitions = self.copied_config().target_partitions;
 
-        let listing_options = options.to_listing_options(target_partitions);
+        let listing_options =
+            options.to_listing_options(self.config_options(), target_partitions);
 
         // with parquet we resolve the schema in all cases
         let resolved_schema = listing_options
@@ -799,13 +806,10 @@ impl SessionContext {
         table_path: &str,
         options: ParquetReadOptions<'_>,
     ) -> Result<()> {
-        let (target_partitions, parquet_pruning) = {
-            let conf = self.copied_config();
-            (conf.target_partitions, conf.parquet_pruning)
-        };
-        let listing_options = options
-            .parquet_pruning(parquet_pruning)
-            .to_listing_options(target_partitions);
+        let listing_options = options.to_listing_options(
+            self.config_options(),
+            self.copied_config().target_partitions,
+        );
 
         self.register_listing_table(name, table_path, listing_options, None, None)
             .await?;
@@ -1142,12 +1146,10 @@ pub struct SessionConfig {
     /// Should DataFusion repartition data using the partition keys to execute window functions in
     /// parallel using the provided `target_partitions` level
     pub repartition_windows: bool,
-    /// Should DataFusion parquet reader using the predicate to prune data
-    pub parquet_pruning: bool,
     /// Should DataFusion collect statistics after listing files
     pub collect_statistics: bool,
     /// Configuration options
-    pub config_options: Arc<RwLock<ConfigOptions>>,
+    pub config_options: Arc<ConfigOptions>,
     /// Opaque extensions.
     extensions: AnyMap,
 }
@@ -1163,9 +1165,8 @@ impl Default for SessionConfig {
             repartition_joins: true,
             repartition_aggregations: true,
             repartition_windows: true,
-            parquet_pruning: true,
             collect_statistics: false,
-            config_options: Arc::new(RwLock::new(ConfigOptions::new())),
+            config_options: Arc::new(ConfigOptions::new()),
             // Assume no extensions by default.
             extensions: HashMap::with_capacity_and_hasher(
                 0,
@@ -1184,14 +1185,14 @@ impl SessionConfig {
     /// Create an execution config with config options read from the environment
     pub fn from_env() -> Self {
         Self {
-            config_options: ConfigOptions::from_env().into_shareable(),
+            config_options: Arc::new(ConfigOptions::from_env()),
             ..Default::default()
         }
     }
 
     /// Set a configuration option
     pub fn set(self, key: &str, value: ScalarValue) -> Self {
-        self.config_options.write().set(key, value);
+        self.config_options.set(key, value);
         self
     }
 
@@ -1262,11 +1263,19 @@ impl SessionConfig {
     }
 
     /// Enables or disables the use of pruning predicate for parquet readers to skip row groups
-    pub fn with_parquet_pruning(mut self, enabled: bool) -> Self {
-        self.parquet_pruning = enabled;
+    pub fn with_parquet_pruning(self, enabled: bool) -> Self {
+        self.config_options
+            .set_bool(OPT_PARQUET_ENABLE_PRUNING, enabled);
         self
     }
 
+    /// Returns true if pruning predicate use is enabled for parquet reader
+    pub fn parquet_pruning(&self) -> bool {
+        self.config_options
+            .get_bool(OPT_PARQUET_ENABLE_PRUNING)
+            .unwrap_or(false)
+    }
+
     /// Enables or disables the collection of statistics after listing files
     pub fn with_collect_statistics(mut self, enabled: bool) -> Self {
         self.collect_statistics = enabled;
@@ -1276,7 +1285,6 @@ impl SessionConfig {
     /// Get the currently configured batch size
     pub fn batch_size(&self) -> usize {
         self.config_options
-            .read()
             .get_u64(OPT_BATCH_SIZE)
             .unwrap_or_default()
             .try_into()
@@ -1293,8 +1301,8 @@ impl SessionConfig {
     pub fn to_props(&self) -> HashMap<String, String> {
         let mut map = HashMap::new();
         // copy configs from config_options
-        for (k, v) in self.config_options.read().options() {
-            map.insert(k.to_string(), format!("{}", v));
+        for (k, v) in self.config_options.options().read().iter() {
+            map.insert(k.to_string(), v.to_string());
         }
         map.insert(
             TARGET_PARTITIONS.to_owned(),
@@ -1314,7 +1322,7 @@ impl SessionConfig {
         );
         map.insert(
             PARQUET_PRUNING.to_owned(),
-            format!("{}", self.parquet_pruning),
+            format!("{}", self.parquet_pruning()),
         );
         map.insert(
             COLLECT_STATISTICS.to_owned(),
@@ -1327,7 +1335,7 @@ impl SessionConfig {
     /// Return a handle to the shared configuration options.
     ///
     /// [`config_options`]: SessionContext::config_option
-    pub fn config_options(&self) -> Arc<RwLock<ConfigOptions>> {
+    pub fn config_options(&self) -> Arc<ConfigOptions> {
         self.config_options.clone()
     }
 
@@ -1471,7 +1479,6 @@ impl SessionState {
         let optimizer_config = OptimizerConfig::new().filter_null_keys(
             config
                 .config_options
-                .read()
                 .get_bool(OPT_FILTER_NULL_JOIN_KEYS)
                 .unwrap_or_default(),
         );
@@ -1482,14 +1489,12 @@ impl SessionState {
         ];
         if config
             .config_options
-            .read()
             .get_bool(OPT_COALESCE_BATCHES)
             .unwrap_or_default()
         {
             physical_optimizers.push(Arc::new(CoalesceBatches::new(
                 config
                     .config_options
-                    .read()
                     .get_u64(OPT_COALESCE_TARGET_BATCH_SIZE)
                     .unwrap_or_default()
                     .try_into()
@@ -1595,14 +1600,12 @@ impl SessionState {
             .with_skip_failing_rules(
                 self.config
                     .config_options
-                    .read()
                     .get_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES)
                     .unwrap_or_default(),
             )
             .with_max_passes(
                 self.config
                     .config_options
-                    .read()
                     .get_u64(OPT_OPTIMIZER_MAX_PASSES)
                     .unwrap_or_default() as u8,
             )
diff --git a/datafusion/core/src/execution/options.rs b/datafusion/core/src/execution/options.rs
index 150a20670ce2..85b047ebacde 100644
--- a/datafusion/core/src/execution/options.rs
+++ b/datafusion/core/src/execution/options.rs
@@ -21,6 +21,7 @@ use std::sync::Arc;
 
 use arrow::datatypes::{Schema, SchemaRef};
 
+use crate::config::ConfigOptions;
 use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
 use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
 use crate::datasource::file_format::file_type::FileCompressionType;
@@ -168,45 +169,18 @@ pub struct ParquetReadOptions<'a> {
     pub file_extension: &'a str,
     /// Partition Columns
     pub table_partition_cols: Vec<String>,
-    /// Should DataFusion parquet reader use the predicate to prune data,
-    /// overridden by value on execution::context::SessionConfig
-    // TODO move this into ConfigOptions
-    pub parquet_pruning: bool,
-    /// Tell the parquet reader to skip any metadata that may be in
-    /// the file Schema. This can help avoid schema conflicts due to
-    /// metadata. Defaults to true.
-    // TODO move this into ConfigOptions
-    pub skip_metadata: bool,
 }
 
 impl<'a> Default for ParquetReadOptions<'a> {
     fn default() -> Self {
-        let format_default = ParquetFormat::default();
-
         Self {
             file_extension: DEFAULT_PARQUET_EXTENSION,
             table_partition_cols: vec![],
-            parquet_pruning: format_default.enable_pruning(),
-            skip_metadata: format_default.skip_metadata(),
         }
     }
 }
 
 impl<'a> ParquetReadOptions<'a> {
-    /// Specify parquet_pruning
-    pub fn parquet_pruning(mut self, parquet_pruning: bool) -> Self {
-        self.parquet_pruning = parquet_pruning;
-        self
-    }
-
-    /// Tell the parquet reader to skip any metadata that may be in
-    /// the file Schema. This can help avoid schema conflicts due to
-    /// metadata. Defaults to true.
-    pub fn skip_metadata(mut self, skip_metadata: bool) -> Self {
-        self.skip_metadata = skip_metadata;
-        self
-    }
-
     /// Specify table_partition_cols for partition pruning
     pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self {
         self.table_partition_cols = table_partition_cols;
@@ -214,10 +188,12 @@ impl<'a> ParquetReadOptions<'a> {
     }
 
     /// Helper to convert these user facing options to `ListingTable` options
-    pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions {
-        let file_format = ParquetFormat::default()
-            .with_enable_pruning(self.parquet_pruning)
-            .with_skip_metadata(self.skip_metadata);
+    pub fn to_listing_options(
+        &self,
+        config_options: Arc<ConfigOptions>,
+        target_partitions: usize,
+    ) -> ListingOptions {
+        let file_format = ParquetFormat::new(config_options);
 
         ListingOptions {
             format: Arc::new(file_format),
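Reviewer note: callers that previously toggled `parquet_pruning` on `SessionConfig` or `ParquetReadOptions` now go through the config system; the `SessionConfig` builder still works but is backed by `OPT_PARQUET_ENABLE_PRUNING`. A sketch of the round trip:

```rust
use datafusion::config::OPT_PARQUET_ENABLE_PRUNING;
use datafusion::prelude::{SessionConfig, SessionContext};

let config = SessionConfig::new().with_parquet_pruning(false);
assert!(!config.parquet_pruning());

// the same value is observable through the shared handle on the context
let ctx = SessionContext::with_config(config);
assert_eq!(
    ctx.config_options().get_bool(OPT_PARQUET_ENABLE_PRUNING),
    Some(false)
);
```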
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 839908d0659b..13bd0da88f3d 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -270,7 +270,7 @@ mod tests {
             projection: None,
             limit: None,
             table_partition_cols: vec![],
-            config_options: ConfigOptions::new().into_shareable(),
+            config_options: Arc::new(ConfigOptions::new()),
         },
         None,
         None,
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 2aab84fadbcf..c0ef48cf1c16 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -238,7 +238,7 @@ mod tests {
             projection: Some(vec![0, 1, 2]),
             limit: None,
             table_partition_cols: vec![],
-            config_options: ConfigOptions::new().into_shareable(),
+            config_options: Arc::new(ConfigOptions::new()),
         });
 
         assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
@@ -308,7 +308,7 @@ mod tests {
             projection,
             limit: None,
             table_partition_cols: vec![],
-            config_options: ConfigOptions::new().into_shareable(),
+            config_options: Arc::new(ConfigOptions::new()),
         });
 
         assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
@@ -377,7 +377,7 @@ mod tests {
             statistics: Statistics::default(),
             limit: None,
             table_partition_cols: vec!["date".to_owned()],
-            config_options: ConfigOptions::new().into_shareable(),
+            config_options: Arc::new(ConfigOptions::new()),
         });
 
         assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index df12f310534e..a299a7292d31 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -367,7 +367,7 @@ mod tests {
             projection: None,
             limit,
             table_partition_cols: vec![],
-            config_options: ConfigOptions::new().into_shareable(),
+            config_options: Arc::new(ConfigOptions::new()),
         };
 
         let file_stream = FileStream::new(
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index c8c5d71bd73f..466df2665ec9 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -331,7 +331,7 @@ mod tests {
                 projection: None,
                 limit: Some(3),
                 table_partition_cols: vec![],
-                config_options: ConfigOptions::new().into_shareable(),
+                config_options: Arc::new(ConfigOptions::new()),
             },
             file_compression_type.to_owned(),
         );
@@ -407,7 +407,7 @@ mod tests {
                 projection: None,
                 limit: Some(3),
                 table_partition_cols: vec![],
-                config_options: ConfigOptions::new().into_shareable(),
+                config_options: Arc::new(ConfigOptions::new()),
             },
             file_compression_type.to_owned(),
        );
@@ -453,7 +453,7 @@ mod tests {
                 projection: Some(vec![0, 2]),
                 limit: None,
                 table_partition_cols: vec![],
-                config_options: ConfigOptions::new().into_shareable(),
+                config_options: Arc::new(ConfigOptions::new()),
             },
             file_compression_type.to_owned(),
         );
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index c33e2bc14701..5999261b30f5 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -42,7 +42,6 @@ pub use avro::AvroExec;
 pub use file_stream::{FileOpenFuture, FileOpener, FileStream};
 pub(crate) use json::plan_to_json;
 pub use json::NdJsonExec;
-use parking_lot::RwLock;
 
 use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl};
 use crate::{config::ConfigOptions, datasource::listing::FileRange};
@@ -91,7 +90,7 @@ pub struct FileScanConfig {
     /// The partitioning column names
     pub table_partition_cols: Vec<String>,
     /// Configuration options passed to the physical plans
-    pub config_options: Arc<RwLock<ConfigOptions>>,
+    pub config_options: Arc<ConfigOptions>,
 }
 
 impl FileScanConfig {
@@ -699,7 +698,7 @@ mod tests {
             projection,
             statistics,
             table_partition_cols,
-            config_options: ConfigOptions::new().into_shareable(),
+            config_options: Arc::new(ConfigOptions::new()),
         }
     }
 }
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index f5bd890591fd..e947dde86715 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -165,7 +165,6 @@ impl ParquetExec {
     pub fn with_pushdown_filters(self, pushdown_filters: bool) -> Self {
         self.base_config
             .config_options
-            .write()
             .set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
         self
     }
@@ -174,7 +173,6 @@ impl ParquetExec {
     pub fn pushdown_filters(&self) -> bool {
         self.base_config
             .config_options
-            .read()
             .get_bool(OPT_PARQUET_PUSHDOWN_FILTERS)
             // default to false
             .unwrap_or_default()
@@ -187,7 +185,6 @@ impl ParquetExec {
     pub fn with_reorder_filters(self, reorder_filters: bool) -> Self {
         self.base_config
             .config_options
-            .write()
             .set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
         self
     }
@@ -196,7 +193,6 @@ impl ParquetExec {
     pub fn reorder_filters(&self) -> bool {
         self.base_config
             .config_options
-            .read()
             .get_bool(OPT_PARQUET_REORDER_FILTERS)
             // default to false
             .unwrap_or_default()
@@ -209,7 +205,6 @@ impl ParquetExec {
     pub fn with_enable_page_index(self, enable_page_index: bool) -> Self {
         self.base_config
             .config_options
-            .write()
             .set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);
         self
     }
@@ -218,7 +213,6 @@ impl ParquetExec {
     pub fn enable_page_index(&self) -> bool {
         self.base_config
             .config_options
-            .read()
             .get_bool(OPT_PARQUET_ENABLE_PAGE_INDEX)
             // default to false
             .unwrap_or_default()
@@ -1164,6 +1158,7 @@ mod tests {
     use crate::config::ConfigOptions;
     use crate::datasource::file_format::parquet::test_util::store_parquet;
     use crate::datasource::file_format::test_util::scan_format;
+    use crate::datasource::file_format::FileFormat;
     use crate::datasource::listing::{FileRange, PartitionedFile};
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::execution::options::CsvReadOptions;
@@ -1171,8 +1166,7 @@ mod tests {
     use crate::test::object_store::local_unpartitioned_file;
     use crate::{
         assert_batches_sorted_eq, assert_contains,
-        datasource::file_format::{parquet::ParquetFormat, FileFormat},
-        physical_plan::collect,
+        datasource::file_format::parquet::ParquetFormat, physical_plan::collect,
     };
     use arrow::array::Float32Array;
     use arrow::datatypes::DataType::Decimal128;
@@ -1227,7 +1221,7 @@ mod tests {
                 projection,
                 limit: None,
                 table_partition_cols: vec![],
-                config_options: ConfigOptions::new().into_shareable(),
+                config_options: Arc::new(ConfigOptions::new()),
             },
             predicate,
             None,
@@ -1661,7 +1655,7 @@ mod tests {
     async fn parquet_exec_with_projection() -> Result<()> {
         let testdata = crate::test_util::parquet_test_data();
         let filename = "alltypes_plain.parquet";
-        let format = ParquetFormat::default();
+        let format = ParquetFormat::new(Arc::new(ConfigOptions::new()));
         let parquet_exec =
             scan_format(&format, &testdata, filename, Some(vec![0, 1, 2]), None)
                 .await
@@ -1718,7 +1712,7 @@ mod tests {
                 projection: None,
                 limit: None,
                 table_partition_cols: vec![],
-                config_options: ConfigOptions::new().into_shareable(),
+                config_options: Arc::new(ConfigOptions::new()),
             },
             None,
             None,
@@ -1743,7 +1737,7 @@ mod tests {
         let meta = local_unpartitioned_file(filename);
 
         let store = Arc::new(LocalFileSystem::new()) as _;
-        let file_schema = ParquetFormat::default()
+        let file_schema = ParquetFormat::new(session_ctx.config_options())
             .infer_schema(&store, &[meta.clone()])
             .await?;
 
@@ -1790,7 +1784,7 @@ mod tests {
         let meta = local_unpartitioned_file(filename);
 
-        let schema = ParquetFormat::default()
+        let schema = ParquetFormat::new(session_ctx.config_options())
             .infer_schema(&store, &[meta.clone()])
             .await
             .unwrap();
@@ -1820,7 +1814,7 @@ mod tests {
                 "month".to_owned(),
                 "day".to_owned(),
             ],
-            config_options: ConfigOptions::new().into_shareable(),
+            config_options: Arc::new(ConfigOptions::new()),
        },
        None,
        None,
@@ -1879,7 +1873,7 @@ mod tests {
             projection: None,
             limit: None,
             table_partition_cols: vec![],
-            config_options: ConfigOptions::new().into_shareable(),
+            config_options: Arc::new(ConfigOptions::new()),
         },
         None,
         None,
@@ -2478,7 +2472,7 @@ mod tests {
         let meta = local_unpartitioned_file(filename);
 
-        let schema = ParquetFormat::default()
+        let schema = ParquetFormat::new(session_ctx.config_options())
             .infer_schema(&store, &[meta.clone()])
             .await
             .unwrap();
@@ -2502,7 +2496,7 @@ mod tests {
             projection: None,
             limit: None,
             table_partition_cols: vec![],
-            config_options: ConfigOptions::new().into_shareable(),
+            config_options: Arc::new(ConfigOptions::new()),
         },
         Some(filter),
         None,
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 1995a6196eed..9d3efa9b388f 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1585,7 +1585,6 @@ impl DefaultPhysicalPlanner {
             if !session_state
                 .config
                 .config_options
-                .read()
                 .get_bool(OPT_EXPLAIN_PHYSICAL_PLAN_ONLY)
                 .unwrap_or_default()
             {
@@ -1597,7 +1596,6 @@ impl DefaultPhysicalPlanner {
             if !session_state
                 .config
                 .config_options
-                .read()
                 .get_bool(OPT_EXPLAIN_LOGICAL_PLAN_ONLY)
                 .unwrap_or_default()
             {
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index bce277676dea..ad27452f3c6e 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -166,7 +166,7 @@ pub fn partitioned_csv_config(
         projection: None,
         limit: None,
         table_partition_cols: vec![],
-        config_options: ConfigOptions::new().into_shareable(),
+        config_options: Arc::new(ConfigOptions::new()),
    })
}
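Reviewer note: the test changes above all follow the same mechanical rewrite: `FileScanConfig.config_options` is now a plain `Arc<ConfigOptions>`, and the `ParquetExec` toggles mutate it without taking a `.write()` lock. A sketch of the pattern in isolation (not a complete `FileScanConfig`, which also needs a schema, statistics, and so on):

```rust
use std::sync::Arc;
use datafusion::config::{ConfigOptions, OPT_PARQUET_PUSHDOWN_FILTERS};

let config_options = Arc::new(ConfigOptions::new());
// equivalent to what ParquetExec::with_pushdown_filters now does internally
config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, true);
assert_eq!(config_options.get_bool(OPT_PARQUET_PUSHDOWN_FILTERS), Some(true));
```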
diff --git a/datafusion/core/tests/custom_parquet_reader.rs b/datafusion/core/tests/custom_parquet_reader.rs
index ded5fad022db..955fa5fa6986 100644
--- a/datafusion/core/tests/custom_parquet_reader.rs
+++ b/datafusion/core/tests/custom_parquet_reader.rs
@@ -89,7 +89,7 @@ mod tests {
             projection: None,
             limit: None,
             table_partition_cols: vec![],
-            config_options: ConfigOptions::new().into_shareable(),
+            config_options: Arc::new(ConfigOptions::new()),
         },
         None,
         None,
diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs
index fca9b9a43b1c..fc2d3deae49f 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -462,7 +462,8 @@ async fn register_partitioned_alltypes_parquet(
         MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths),
     );
 
-    let mut options = ListingOptions::new(Arc::new(ParquetFormat::default()));
+    let format = Arc::new(ParquetFormat::new(ctx.config_options()));
+    let mut options = ListingOptions::new(format);
     options.table_partition_cols = partition_cols.iter().map(|&s| s.to_owned()).collect();
     options.collect_stat = true;
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 630c28a109c2..99dfd7c9a24c 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -34,7 +34,8 @@ async fn test_with_parquet() -> Result<()> {
     let session_ctx = SessionContext::new();
     let task_ctx = session_ctx.task_ctx();
     let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
-    let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+    let exec =
+        get_exec(&session_ctx, "alltypes_plain.parquet", &projection, None).await?;
     let schema = exec.schema().clone();
 
     let batches = collect(exec, task_ctx).await?;
@@ -55,7 +56,8 @@ async fn test_with_parquet_word_aligned() -> Result<()> {
     let session_ctx = SessionContext::new();
     let task_ctx = session_ctx.task_ctx();
     let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7]);
-    let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+    let exec =
+        get_exec(&session_ctx, "alltypes_plain.parquet", &projection, None).await?;
     let schema = exec.schema().clone();
 
     let batches = collect(exec, task_ctx).await?;
@@ -72,6 +74,7 @@
 }
 
 async fn get_exec(
+    ctx: &SessionContext,
     file_name: &str,
     projection: &Option<Vec<usize>>,
     limit: Option<usize>,
@@ -81,7 +84,7 @@
 
     let path = Path::from_filesystem_path(filename).unwrap();
 
-    let format = ParquetFormat::default();
+    let format = ParquetFormat::new(ctx.config_options());
     let object_store = Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>;
 
     let object_store_url = ObjectStoreUrl::local_filesystem();
@@ -106,7 +109,7 @@
             projection: projection.clone(),
             limit,
             table_partition_cols: vec![],
-            config_options: ConfigOptions::new().into_shareable(),
+            config_options: Arc::new(ConfigOptions::new()),
         },
         &[],
     )
diff --git a/datafusion/core/tests/sql/information_schema.rs b/datafusion/core/tests/sql/information_schema.rs
index ce318e6c2625..e68d077041f7 100644
--- a/datafusion/core/tests/sql/information_schema.rs
+++ b/datafusion/core/tests/sql/information_schema.rs
@@ -702,8 +702,11 @@ async fn show_all() {
         "| datafusion.execution.coalesce_batches           | true  |",
         "| datafusion.execution.coalesce_target_batch_size | 4096  |",
         "| datafusion.execution.parquet.enable_page_index  | false |",
+        "| datafusion.execution.parquet.metadata_size_hint | NULL  |",
+        "| datafusion.execution.parquet.pruning            | true  |",
         "| datafusion.execution.parquet.pushdown_filters   | false |",
         "| datafusion.execution.parquet.reorder_filters    | false |",
+        "| datafusion.execution.parquet.skip_metadata      | true  |",
         "| datafusion.execution.time_zone                  | UTC   |",
         "| datafusion.explain.logical_plan_only            | false |",
         "| datafusion.explain.physical_plan_only           | false |",
diff --git a/datafusion/core/tests/sql/parquet_schema.rs b/datafusion/core/tests/sql/parquet_schema.rs
index b5a891268d02..54ccdf468a90 100644
--- a/datafusion/core/tests/sql/parquet_schema.rs
+++ b/datafusion/core/tests/sql/parquet_schema.rs
@@ -23,6 +23,7 @@ use std::{
 };
 
 use ::parquet::arrow::ArrowWriter;
+use datafusion::config::OPT_PARQUET_SKIP_METADATA;
 use tempfile::TempDir;
 
 use super::*;
@@ -106,9 +107,6 @@ async fn schema_merge_can_preserve_metadata() {
     let tmp_dir = TempDir::new().unwrap();
     let table_dir = tmp_dir.path().join("parquet_test");
 
-    // explicitly disable schema clearing
-    let options = ParquetReadOptions::default().skip_metadata(false);
-
     let f1 = Field::new("id", DataType::Int32, true);
     let f2 = Field::new("name", DataType::Utf8, true);
 
@@ -141,6 +139,12 @@ async fn schema_merge_can_preserve_metadata() {
     let table_path = table_dir.to_str().unwrap().to_string();
 
     let ctx = SessionContext::new();
+
+    // explicitly disable schema clearing
+    ctx.config_options()
+        .set_bool(OPT_PARQUET_SKIP_METADATA, false);
+
+    let options = ParquetReadOptions::default();
     let df = ctx
         .read_parquet(&table_path, options.clone())
         .await
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index d61bb2d65bae..74e04752a5b9 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -368,7 +368,8 @@ impl AsLogicalPlan for LogicalPlanNode {
                 &FileFormatType::Parquet(protobuf::ParquetFormat {
                     enable_pruning,
                 }) => Arc::new(
-                    ParquetFormat::default().with_enable_pruning(enable_pruning),
+                    ParquetFormat::new(ctx.config_options())
+                        .with_enable_pruning(enable_pruning),
                ),
                FileFormatType::Csv(protobuf::CsvFormat {
                    has_header,