Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benchmarks/src/bin/parquet_filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ async fn exec_scan(
enable_page_index,
} = scan_options;

let mut config_options = ConfigOptions::new();
let config_options = Arc::new(ConfigOptions::new());
config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);
Expand All @@ -233,7 +233,7 @@ async fn exec_scan(
projection: None,
limit: None,
table_partition_cols: vec![],
config_options: config_options.into_shareable(),
config_options,
};

let df_schema = schema.clone().to_dfschema()?;
Expand Down
3 changes: 2 additions & 1 deletion benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,8 @@ async fn get_table(
}
"parquet" => {
let path = format!("{}/{}", path, table);
let format = ParquetFormat::default().with_enable_pruning(true);
let format = ParquetFormat::new(ctx.config.config_options())
.with_enable_pruning(true);

(Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
}
Expand Down
5 changes: 3 additions & 2 deletions datafusion-examples/examples/flight_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ impl FlightService for FlightServiceImpl {
) -> Result<Response<SchemaResult>, Status> {
let request = request.into_inner();

let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
let ctx = SessionContext::new();
let format = Arc::new(ParquetFormat::new(ctx.config_options()));
let listing_options = ListingOptions::new(format);
let table_path =
ListingTableUrl::parse(&request.path[0]).map_err(to_tonic_err)?;

let ctx = SessionContext::new();
let schema = listing_options
.infer_schema(&ctx.state(), &table_path)
.await
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/parquet_sql_multiple_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn main() -> Result<()> {
let testdata = datafusion::test_util::parquet_test_data();

// Configure listing options
let file_format = ParquetFormat::default().with_enable_pruning(true);
let file_format = ParquetFormat::new(ctx.config_options());
let listing_options = ListingOptions {
file_extension: FileType::PARQUET.get_ext(),
format: Arc::new(file_format),
Expand Down
10 changes: 4 additions & 6 deletions datafusion/core/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ use std::{
sync::{Arc, Weak},
};

use parking_lot::RwLock;

use arrow::{
array::{StringBuilder, UInt64Builder},
datatypes::{DataType, Field, Schema},
Expand Down Expand Up @@ -53,15 +51,15 @@ const DF_SETTINGS: &str = "df_settings";
/// schema that can introspect on tables in the catalog_list
pub(crate) struct CatalogWithInformationSchema {
catalog_list: Weak<dyn CatalogList>,
config_options: Weak<RwLock<ConfigOptions>>,
config_options: Weak<ConfigOptions>,
/// wrapped provider
inner: Arc<dyn CatalogProvider>,
}

impl CatalogWithInformationSchema {
pub(crate) fn new(
catalog_list: Weak<dyn CatalogList>,
config_options: Weak<RwLock<ConfigOptions>>,
config_options: Weak<ConfigOptions>,
inner: Arc<dyn CatalogProvider>,
) -> Self {
Self {
Expand Down Expand Up @@ -118,7 +116,7 @@ impl CatalogProvider for CatalogWithInformationSchema {
/// table is queried.
struct InformationSchemaProvider {
catalog_list: Arc<dyn CatalogList>,
config_options: Arc<RwLock<ConfigOptions>>,
config_options: Arc<ConfigOptions>,
}

impl InformationSchemaProvider {
Expand Down Expand Up @@ -230,7 +228,7 @@ impl InformationSchemaProvider {
fn make_df_settings(&self) -> Arc<dyn TableProvider> {
let mut builder = InformationSchemaDfSettingsBuilder::new();

for (name, setting) in self.config_options.read().options() {
for (name, setting) in self.config_options.options().read().iter() {
builder.add_setting(name, setting.to_string());
}

Expand Down
86 changes: 67 additions & 19 deletions datafusion/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use log::warn;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::env;
use std::sync::Arc;

/// Configuration option "datafusion.optimizer.filter_null_join_keys"
pub const OPT_FILTER_NULL_JOIN_KEYS: &str = "datafusion.optimizer.filter_null_join_keys";
Expand Down Expand Up @@ -60,6 +59,16 @@ pub const OPT_PARQUET_REORDER_FILTERS: &str =
pub const OPT_PARQUET_ENABLE_PAGE_INDEX: &str =
"datafusion.execution.parquet.enable_page_index";

/// Configuration option "datafusion.execution.parquet.pruning"
pub const OPT_PARQUET_ENABLE_PRUNING: &str = "datafusion.execution.parquet.pruning";

/// Configuration option "datafusion.execution.parquet.skip_metadata"
pub const OPT_PARQUET_SKIP_METADATA: &str = "datafusion.execution.parquet.skip_metadata";

/// Configuration option "datafusion.execution.parquet.metadata_size_hint"
pub const OPT_PARQUET_METADATA_SIZE_HINT: &str =
"datafusion.execution.parquet.metadata_size_hint";

/// Configuration option "datafusion.optimizer.skip_failed_rules"
pub const OPT_OPTIMIZER_SKIP_FAILED_RULES: &str =
"datafusion.optimizer.skip_failed_rules";
Expand Down Expand Up @@ -237,6 +246,29 @@ impl BuiltInConfigs {
to reduce the number of rows decoded.",
false,
),
ConfigDefinition::new_bool(
OPT_PARQUET_ENABLE_PRUNING,
"If true, the parquet reader attempts to skip entire row groups based \
on the predicate in the query and the metadata (min/max values) stored in \
the parquet file.",
true,
),
ConfigDefinition::new_bool(
OPT_PARQUET_SKIP_METADATA,
"If true, the parquet reader skips the optional embedded metadata that may be in \
the file Schema. This setting can help avoid schema conflicts when querying \
multiple parquet files with schemas containing compatible types but different metadata.",
true,
),
ConfigDefinition::new(
OPT_PARQUET_METADATA_SIZE_HINT,
"If specified, the parquet reader will try and fetch the last `size_hint` \
bytes of the parquet file optimistically. If not specified, two reads are required: \
One read to fetch the 8-byte parquet footer and \
another to fetch the metadata length encoded in the footer.",
DataType::Boolean,
ScalarValue::Boolean(None),
),
ConfigDefinition::new_bool(
OPT_OPTIMIZER_SKIP_FAILED_RULES,
"When set to true, the logical plan optimizer will produce warning \
Expand Down Expand Up @@ -273,10 +305,26 @@ impl BuiltInConfigs {
}
}

/// Configuration options struct. This can contain values for built-in and custom options
#[derive(Debug, Clone)]
/// Configuration options for DataFusion.
///
/// `ConfigOptions` can contain values for built-in options as well as
/// custom options, and can be configured programmatically or from the
/// environment (see [`ConfigOptions::from_env()`]).
///
/// Example:
/// ```
/// # use std::sync::Arc;
/// # use datafusion::config::{ConfigOptions, OPT_BATCH_SIZE};
/// let config_options = Arc::new(ConfigOptions::new());
///
/// // set default batch size to 1000
/// config_options.set_u64(OPT_BATCH_SIZE, 1000);
/// assert_eq!(config_options.get_u64(OPT_BATCH_SIZE), Some(1000));
/// ```
///
#[derive(Debug)]
pub struct ConfigOptions {
options: HashMap<String, ScalarValue>,
options: RwLock<HashMap<String, ScalarValue>>,
}

impl Default for ConfigOptions {
Expand All @@ -293,12 +341,10 @@ impl ConfigOptions {
for config_def in &built_in.config_definitions {
options.insert(config_def.key.clone(), config_def.default_value.clone());
}
Self { options }
}

/// Create a new [`ConfigOptions`] wrapped in an RwLock and Arc
pub fn into_shareable(self) -> Arc<RwLock<Self>> {
Arc::new(RwLock::new(self))
Self {
options: RwLock::new(options),
}
}

/// Create new ConfigOptions struct, taking values from
Expand Down Expand Up @@ -329,27 +375,29 @@ impl ConfigOptions {
};
options.insert(config_def.key.clone(), config_value);
}
Self { options }
Self {
options: RwLock::new(options),
}
}

/// set a configuration option
pub fn set(&mut self, key: &str, value: ScalarValue) {
self.options.insert(key.to_string(), value);
pub fn set(&self, key: &str, value: ScalarValue) {
self.options.write().insert(key.to_string(), value);
}

/// set a boolean configuration option
pub fn set_bool(&mut self, key: &str, value: bool) {
pub fn set_bool(&self, key: &str, value: bool) {
self.set(key, ScalarValue::Boolean(Some(value)))
}

/// set a `u64` configuration option
pub fn set_u64(&mut self, key: &str, value: u64) {
pub fn set_u64(&self, key: &str, value: u64) {
self.set(key, ScalarValue::UInt64(Some(value)))
}

/// get a configuration option
pub fn get(&self, key: &str) -> Option<ScalarValue> {
self.options.get(key).cloned()
self.options.read().get(key).cloned()
}

/// get a boolean configuration option
Expand All @@ -367,14 +415,14 @@ impl ConfigOptions {
get_conf_value!(self, Utf8, key, "string")
}

/// Access the underlying hashmap
pub fn options(&self) -> &HashMap<String, ScalarValue> {
/// Get a reference to the underlying hashmap
pub fn options(&self) -> &RwLock<HashMap<String, ScalarValue>> {
&self.options
}

/// Tests if the key exists in the configuration
pub fn exists(&self, key: &str) -> bool {
self.options().contains_key(key)
self.options.read().contains_key(key)
}
}

Expand All @@ -398,7 +446,7 @@ mod test {

#[test]
fn get_then_set() {
let mut config = ConfigOptions::new();
let config = ConfigOptions::new();
let config_key = "datafusion.optimizer.filter_null_join_keys";
assert!(!config.get_bool(config_key).unwrap_or_default());
config.set_bool(config_key, true);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ pub(crate) mod test_util {
projection,
limit,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
config_options: Arc::new(ConfigOptions::new()),
},
&[],
)
Expand Down
Loading