diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs
index 011db64aaf8a..4504c81daa06 100644
--- a/datafusion/src/physical_optimizer/repartition.rs
+++ b/datafusion/src/physical_optimizer/repartition.rs
@@ -110,7 +110,9 @@ mod tests {
     use super::*;
     use crate::datasource::datasource::Statistics;
-    use crate::physical_plan::parquet::{ParquetExec, ParquetPartition};
+    use crate::physical_plan::parquet::{
+        ParquetExec, ParquetExecMetrics, ParquetPartition,
+    };
     use crate::physical_plan::projection::ProjectionExec;
 
     #[test]
@@ -119,12 +121,13 @@ mod tests {
         let parquet_project = ProjectionExec::try_new(
             vec![],
             Arc::new(ParquetExec::new(
-                vec![ParquetPartition {
-                    filenames: vec!["x".to_string()],
-                    statistics: Statistics::default(),
-                }],
+                vec![ParquetPartition::new(
+                    vec!["x".to_string()],
+                    Statistics::default(),
+                )],
                 schema,
                 None,
+                ParquetExecMetrics::new(),
                 None,
                 2048,
                 None,
@@ -156,12 +159,13 @@ mod tests {
             Arc::new(ProjectionExec::try_new(
                 vec![],
                 Arc::new(ParquetExec::new(
-                    vec![ParquetPartition {
-                        filenames: vec!["x".to_string()],
-                        statistics: Statistics::default(),
-                    }],
+                    vec![ParquetPartition::new(
+                        vec!["x".to_string()],
+                        Statistics::default(),
+                    )],
                     schema,
                     None,
+                    ParquetExecMetrics::new(),
                     None,
                     2048,
                     None,
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index a940cbe7963a..d89eb1188504 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -297,6 +297,20 @@ pub fn visit_execution_plan(
     Ok(())
 }
 
+/// Recursively gather all execution metrics from this plan and all of its input plans
+pub fn plan_metrics(plan: Arc<dyn ExecutionPlan>) -> HashMap<String, SQLMetric> {
+    fn get_metrics_inner(
+        plan: &dyn ExecutionPlan,
+        mut metrics: HashMap<String, SQLMetric>,
+    ) -> HashMap<String, SQLMetric> {
+        metrics.extend(plan.metrics().into_iter());
+        plan.children().into_iter().fold(metrics, |metrics, child| {
+            get_metrics_inner(child.as_ref(), metrics)
+        })
+    }
+    get_metrics_inner(plan.as_ref(), HashMap::new())
+}
+
 /// Execute the [ExecutionPlan] and collect the results in memory
 pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
     match plan.output_partitioning().partition_count() {
diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs
index 3d20a9bf98c1..f31b921d663b 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -40,6 +40,8 @@ use arrow::{
     error::{ArrowError, Result as ArrowResult},
     record_batch::RecordBatch,
 };
+use hashbrown::HashMap;
+use log::debug;
 use parquet::file::{
     metadata::RowGroupMetaData,
     reader::{FileReader, SerializedFileReader},
@@ -59,6 +61,8 @@ use crate::datasource::datasource::{ColumnStatistics, Statistics};
 use async_trait::async_trait;
 use futures::stream::{Stream, StreamExt};
 
+use super::SQLMetric;
+
 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
@@ -72,6 +76,8 @@ pub struct ParquetExec {
     batch_size: usize,
     /// Statistics for the data set (sum of statistics for all partitions)
     statistics: Statistics,
+    /// Metrics for the overall execution
+    metrics: ParquetExecMetrics,
     /// Optional predicate builder
     predicate_builder: Option<PruningPredicate>,
     /// Optional limit of the number of rows
@@ -93,6 +99,24 @@ pub struct ParquetPartition {
     pub filenames: Vec<String>,
     /// Statistics for this partition
     pub statistics: Statistics,
+    /// Execution metrics
+    metrics: ParquetPartitionMetrics,
+}
+
+/// Stores metrics about the overall parquet execution
+#[derive(Debug, Clone)]
+pub struct ParquetExecMetrics {
+    /// Number of times the pruning predicate could not be created
+    pub predicate_creation_errors: Arc<SQLMetric>,
+}
+
+/// Stores metrics about the parquet execution for a particular ParquetPartition
+#[derive(Debug, Clone)]
+struct ParquetPartitionMetrics {
+    /// Number of times the predicate could not be evaluated
+    pub predicate_evaluation_errors: Arc<SQLMetric>,
+    /// Number of row groups pruned using the pruning predicate
+    pub row_groups_pruned: Arc<SQLMetric>,
 }
 
 impl ParquetExec {
@@ -140,6 +164,8 @@ impl ParquetExec {
         max_concurrency: usize,
         limit: Option<usize>,
     ) -> Result<Self> {
+        debug!("Creating ParquetExec, filenames: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
+               filenames, projection, predicate, limit);
         // build a list of Parquet partitions with statistics and gather all unique schemas
         // used in this data set
         let mut schemas: Vec<Schema> = vec![];
@@ -205,10 +231,7 @@ impl ParquetExec {
             };
             // remove files that are not needed in case of limit
            filenames.truncate(total_files);
-            partitions.push(ParquetPartition {
-                filenames,
-                statistics,
-            });
+            partitions.push(ParquetPartition::new(filenames, statistics));
             if limit_exhausted {
                 break;
             }
@@ -225,14 +248,27 @@ impl ParquetExec {
            )));
        }
        let schema = Arc::new(schemas.pop().unwrap());
+        let metrics = ParquetExecMetrics::new();
+
         let predicate_builder = predicate.and_then(|predicate_expr| {
-            PruningPredicate::try_new(&predicate_expr, schema.clone()).ok()
+            match PruningPredicate::try_new(&predicate_expr, schema.clone()) {
+                Ok(predicate_builder) => Some(predicate_builder),
+                Err(e) => {
+                    debug!(
+                        "Could not create pruning predicate for {:?}: {}",
+                        predicate_expr, e
+                    );
+                    metrics.predicate_creation_errors.add(1);
+                    None
+                }
+            }
         });
 
         Ok(Self::new(
             partitions,
             schema,
             projection,
+            metrics,
             predicate_builder,
             batch_size,
             limit,
@@ -244,6 +280,7 @@ impl ParquetExec {
         partitions: Vec<ParquetPartition>,
         schema: SchemaRef,
         projection: Option<Vec<usize>>,
+        metrics: ParquetExecMetrics,
         predicate_builder: Option<PruningPredicate>,
         batch_size: usize,
         limit: Option<usize>,
@@ -307,6 +344,7 @@ impl ParquetExec {
             partitions,
             schema: Arc::new(projected_schema),
             projection,
+            metrics,
             predicate_builder,
             batch_size,
             statistics,
@@ -341,6 +379,7 @@ impl ParquetPartition {
         Self {
             filenames,
             statistics,
+            metrics: ParquetPartitionMetrics::new(),
         }
     }
 
@@ -355,6 +394,25 @@ impl ParquetPartition {
     }
 }
 
+impl ParquetExecMetrics {
+    /// Create new metrics
+    pub fn new() -> Self {
+        Self {
+            predicate_creation_errors: SQLMetric::counter(),
+        }
+    }
+}
+
+impl ParquetPartitionMetrics {
+    /// Create new metrics
+    pub fn new() -> Self {
+        Self {
+            predicate_evaluation_errors: SQLMetric::counter(),
+            row_groups_pruned: SQLMetric::counter(),
+        }
+    }
+}
+
 #[async_trait]
 impl ExecutionPlan for ParquetExec {
     /// Return a reference to Any that can be used for downcasting
@@ -398,7 +456,9 @@ impl ExecutionPlan for ParquetExec {
             Receiver<ArrowResult<RecordBatch>>,
         ) = channel(2);
 
-        let filenames = self.partitions[partition].filenames.clone();
+        let partition = &self.partitions[partition];
+        let filenames = partition.filenames.clone();
+        let metrics = partition.metrics.clone();
         let projection = self.projection.clone();
         let predicate_builder = self.predicate_builder.clone();
         let batch_size = self.batch_size;
@@ -407,6 +467,7 @@ impl ExecutionPlan for ParquetExec {
         task::spawn_blocking(move || {
             if let Err(e) = read_files(
                 &filenames,
+                metrics,
                 &projection,
                 &predicate_builder,
                 batch_size,
@@ -448,6 +509,31 @@ impl ExecutionPlan for ParquetExec {
             }
         }
     }
+
+    fn metrics(&self) -> HashMap<String, SQLMetric> {
+        self.partitions
+            .iter()
+            .flat_map(|p| {
+                [
+                    (
+                        format!(
"numPredicateEvaluationErrors for {}", + p.filenames.join(",") + ), + p.metrics.predicate_evaluation_errors.as_ref().clone(), + ), + ( + format!("numRowGroupsPruned for {}", p.filenames.join(",")), + p.metrics.row_groups_pruned.as_ref().clone(), + ), + ] + }) + .chain(std::iter::once(( + "numPredicateCreationErrors".to_string(), + self.metrics.predicate_creation_errors.as_ref().clone(), + ))) + .collect() + } } fn send_result( @@ -547,6 +633,7 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { fn build_row_group_predicate( predicate_builder: &PruningPredicate, + metrics: ParquetPartitionMetrics, row_group_metadata: &[RowGroupMetaData], ) -> Box bool> { let parquet_schema = predicate_builder.schema().as_ref(); @@ -555,21 +642,28 @@ fn build_row_group_predicate( row_group_metadata, parquet_schema, }; - let predicate_values = predicate_builder.prune(&pruning_stats); - let predicate_values = match predicate_values { - Ok(values) => values, + match predicate_values { + Ok(values) => { + // NB: false means don't scan row group + let num_pruned = values.iter().filter(|&v| !v).count(); + metrics.row_groups_pruned.add(num_pruned); + Box::new(move |_, i| values[i]) + } // stats filter array could not be built // return a closure which will not filter out any row groups - _ => return Box::new(|_r, _i| true), - }; - - Box::new(move |_, i| predicate_values[i]) + Err(e) => { + debug!("Error evaluating row group predicate values {}", e); + metrics.predicate_evaluation_errors.add(1); + Box::new(|_r, _i| true) + } + } } fn read_files( filenames: &[String], + metrics: ParquetPartitionMetrics, projection: &[usize], predicate_builder: &Option, batch_size: usize, @@ -583,6 +677,7 @@ fn read_files( if let Some(predicate_builder) = predicate_builder { let row_group_predicate = build_row_group_predicate( predicate_builder, + metrics.clone(), file_reader.metadata().row_groups(), ); file_reader.filter_row_groups(&row_group_predicate); @@ -757,8 +852,11 @@ mod tests { vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)], ); let row_group_metadata = vec![rgm1, rgm2]; - let row_group_predicate = - build_row_group_predicate(&predicate_builder, &row_group_metadata); + let row_group_predicate = build_row_group_predicate( + &predicate_builder, + ParquetPartitionMetrics::new(), + &row_group_metadata, + ); let row_group_filter = row_group_metadata .iter() .enumerate() @@ -787,8 +885,11 @@ mod tests { vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)], ); let row_group_metadata = vec![rgm1, rgm2]; - let row_group_predicate = - build_row_group_predicate(&predicate_builder, &row_group_metadata); + let row_group_predicate = build_row_group_predicate( + &predicate_builder, + ParquetPartitionMetrics::new(), + &row_group_metadata, + ); let row_group_filter = row_group_metadata .iter() .enumerate() @@ -832,8 +933,11 @@ mod tests { ], ); let row_group_metadata = vec![rgm1, rgm2]; - let row_group_predicate = - build_row_group_predicate(&predicate_builder, &row_group_metadata); + let row_group_predicate = build_row_group_predicate( + &predicate_builder, + ParquetPartitionMetrics::new(), + &row_group_metadata, + ); let row_group_filter = row_group_metadata .iter() .enumerate() @@ -847,8 +951,11 @@ mod tests { // this bypasses the entire predicate expression and no row groups are filtered out let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2))); let predicate_builder = PruningPredicate::try_new(&expr, schema)?; - let row_group_predicate = - 
-            build_row_group_predicate(&predicate_builder, &row_group_metadata);
+        let row_group_predicate = build_row_group_predicate(
+            &predicate_builder,
+            ParquetPartitionMetrics::new(),
+            &row_group_metadata,
+        );
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -891,8 +998,11 @@
             ],
         );
         let row_group_metadata = vec![rgm1, rgm2];
-        let row_group_predicate =
-            build_row_group_predicate(&predicate_builder, &row_group_metadata);
+        let row_group_predicate = build_row_group_predicate(
+            &predicate_builder,
+            ParquetPartitionMetrics::new(),
+            &row_group_metadata,
+        );
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs
index 7ca7cc12d9ef..df3aec4a6850 100644
--- a/datafusion/src/test/mod.rs
+++ b/datafusion/src/test/mod.rs
@@ -251,11 +251,11 @@ pub fn make_timestamps() -> RecordBatch {
     let arr_names = StringArray::from(names);
 
     let schema = Schema::new(vec![
-        Field::new("nanos", arr_nanos.data_type().clone(), false),
-        Field::new("micros", arr_micros.data_type().clone(), false),
-        Field::new("millis", arr_millis.data_type().clone(), false),
-        Field::new("secs", arr_secs.data_type().clone(), false),
-        Field::new("name", arr_names.data_type().clone(), false),
+        Field::new("nanos", arr_nanos.data_type().clone(), true),
+        Field::new("micros", arr_micros.data_type().clone(), true),
+        Field::new("millis", arr_millis.data_type().clone(), true),
+        Field::new("secs", arr_secs.data_type().clone(), true),
+        Field::new("name", arr_names.data_type().clone(), true),
     ]);
     let schema = Arc::new(schema);
 
diff --git a/datafusion/tests/parquet_pruning.rs b/datafusion/tests/parquet_pruning.rs
new file mode 100644
index 000000000000..86b3946e4712
--- /dev/null
+++ b/datafusion/tests/parquet_pruning.rs
@@ -0,0 +1,343 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This file contains an end to end test of parquet pruning. It writes
+// data into a parquet file and then reads it back using queries whose
+// predicates should prune row groups, checking the reported pruning metrics.
+use std::sync::Arc;
+
+use arrow::{
+    array::{
+        Array, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
+        TimestampNanosecondArray, TimestampSecondArray,
+    },
+    datatypes::{Field, Schema},
+    record_batch::RecordBatch,
+    util::pretty::pretty_format_batches,
+};
+use chrono::Duration;
+use datafusion::{
+    physical_plan::{plan_metrics, SQLMetric},
+    prelude::ExecutionContext,
+};
+use hashbrown::HashMap;
+use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
+use tempfile::NamedTempFile;
+
+#[tokio::test]
+async fn prune_timestamps_nanos() {
+    let output = ContextWithParquet::new()
+        .await
+        .query("SELECT * FROM t where nanos < to_timestamp('2020-01-02 01:01:11Z')")
+        .await;
+    println!("{}", output.description());
+    // TODO: This should prune one row group without error
+    assert_eq!(output.predicate_evaluation_errors(), Some(1));
+    assert_eq!(output.row_groups_pruned(), Some(0));
+    assert_eq!(output.result_rows, 10, "{}", output.description());
+}
+
+#[tokio::test]
+async fn prune_timestamps_micros() {
+    let output = ContextWithParquet::new()
+        .await
+        .query(
+            "SELECT * FROM t where micros < to_timestamp_micros('2020-01-02 01:01:11Z')",
+        )
+        .await;
+    println!("{}", output.description());
+    // TODO: This should prune one row group without error
+    assert_eq!(output.predicate_evaluation_errors(), Some(1));
+    assert_eq!(output.row_groups_pruned(), Some(0));
+    assert_eq!(output.result_rows, 10, "{}", output.description());
+}
+
+#[tokio::test]
+async fn prune_timestamps_millis() {
+    let output = ContextWithParquet::new()
+        .await
+        .query(
+            "SELECT * FROM t where millis < to_timestamp_millis('2020-01-02 01:01:11Z')",
+        )
+        .await;
+    println!("{}", output.description());
+    // TODO: This should prune one row group without error
+    assert_eq!(output.predicate_evaluation_errors(), Some(1));
+    assert_eq!(output.row_groups_pruned(), Some(0));
+    assert_eq!(output.result_rows, 10, "{}", output.description());
+}
+
+#[tokio::test]
+async fn prune_timestamps_seconds() {
+    let output = ContextWithParquet::new()
+        .await
+        .query(
+            "SELECT * FROM t where seconds < to_timestamp_seconds('2020-01-02 01:01:11Z')",
+        )
+        .await;
+    println!("{}", output.description());
+    // TODO: This should prune one row group without error
+    assert_eq!(output.predicate_evaluation_errors(), Some(1));
+    assert_eq!(output.row_groups_pruned(), Some(0));
+    assert_eq!(output.result_rows, 10, "{}", output.description());
+}
+
+// ----------------------
+// Begin test fixture
+// ----------------------
+
+/// Test fixture that has an execution context with an external
+/// table "t" registered, pointing at a parquet file made with
+/// `make_test_file`
+struct ContextWithParquet {
+    file: NamedTempFile,
+    ctx: ExecutionContext,
+}
+
+/// The output of running one of the test cases
+struct TestOutput {
+    /// The input string
+    sql: String,
+    /// Normalized metrics (filename replaced by a constant)
+    metrics: HashMap<String, SQLMetric>,
+    /// number of rows in results
+    result_rows: usize,
+    /// the contents of the input, as a string
+    pretty_input: String,
+    /// the raw results, as a string
+    pretty_results: String,
+}
+
+impl TestOutput {
+    /// retrieve the value of the named metric, if any
+    fn metric_value(&self, metric_name: &str) -> Option<usize> {
+        self.metrics.get(metric_name).map(|m| m.value())
+    }
+
+    /// The number of times the pruning predicate could not be evaluated
+    fn predicate_evaluation_errors(&self) -> Option<usize> {
+        self.metric_value("numPredicateEvaluationErrors for PARQUET_FILE")
PARQUET_FILE") + } + + /// The number of times the pruning predicate evaluation errors + fn row_groups_pruned(&self) -> Option { + self.metric_value("numRowGroupsPruned for PARQUET_FILE") + } + + fn description(&self) -> String { + let metrics = self + .metrics + .iter() + .map(|(name, val)| format!(" {} = {:?}", name, val)) + .collect::>(); + + format!( + "Input:\n{}\nQuery:\n{}\nOutput:\n{}\nMetrics:\n{}", + self.pretty_input, + self.sql, + self.pretty_results, + metrics.join("\n") + ) + } +} + +/// Creates an execution context that has an external table "t" +/// registered pointing at a parquet file made with `make_test_file` +impl ContextWithParquet { + async fn new() -> Self { + let file = make_test_file().await; + + // now, setup a the file as a data source and run a query against it + let mut ctx = ExecutionContext::new(); + let parquet_path = file.path().to_string_lossy(); + ctx.register_parquet("t", &parquet_path) + .expect("registering"); + + Self { file, ctx } + } + + /// Runs the specified SQL query and returns the number of output + /// rows and normalized execution metrics + async fn query(&mut self, sql: &str) -> TestOutput { + println!("Planning sql {}", sql); + + let input = self + .ctx + .sql("SELECT * from t") + .expect("planning") + .collect() + .await + .expect("getting input"); + let pretty_input = pretty_format_batches(&input).unwrap(); + + let logical_plan = self.ctx.sql(sql).expect("planning").to_logical_plan(); + + let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing plan"); + let execution_plan = self + .ctx + .create_physical_plan(&logical_plan) + .expect("creating physical plan"); + + let results = datafusion::physical_plan::collect(execution_plan.clone()) + .await + .expect("Running"); + + // replace the path name, which varies test to test,a with some + // constant for test comparisons + let path = self.file.path(); + let path_name = path.to_string_lossy(); + let metrics = plan_metrics(execution_plan) + .into_iter() + .map(|(name, metric)| { + (name.replace(path_name.as_ref(), "PARQUET_FILE"), metric) + }) + .collect(); + + let result_rows = results.iter().map(|b| b.num_rows()).sum(); + + let pretty_results = pretty_format_batches(&results).unwrap(); + + let sql = sql.to_string(); + TestOutput { + sql, + metrics, + result_rows, + pretty_input, + pretty_results, + } + } +} + +/// Create a test parquet file with varioud data types +async fn make_test_file() -> NamedTempFile { + let output_file = tempfile::Builder::new() + .prefix("parquet_pruning") + .suffix(".parquet") + .tempfile() + .expect("tempfile creation"); + + let props = WriterProperties::builder() + .set_max_row_group_size(5) + .build(); + + let batches = vec![ + make_batch(Duration::seconds(0)), + make_batch(Duration::seconds(10)), + make_batch(Duration::minutes(10)), + make_batch(Duration::days(10)), + ]; + let schema = batches[0].schema(); + + let mut writer = ArrowWriter::try_new( + output_file + .as_file() + .try_clone() + .expect("cloning file descriptor"), + schema, + Some(props), + ) + .unwrap(); + + for batch in batches { + writer.write(&batch).expect("writing batch"); + } + writer.close().unwrap(); + + output_file +} + +/// Return record batch with a few rows of data for all of the supported timestamp types +/// values with the specified offset +/// +/// Columns are named: +/// "nanos" --> TimestampNanosecondArray +/// "micros" --> TimestampMicrosecondArray +/// "millis" --> TimestampMillisecondArray +/// "seconds" --> TimestampSecondArray +/// "names" --> StringArray +pub 
+pub fn make_batch(offset: Duration) -> RecordBatch {
+    let ts_strings = vec![
+        Some("2020-01-01T01:01:01.0000000000001"),
+        Some("2020-01-01T01:02:01.0000000000001"),
+        Some("2020-01-01T02:01:01.0000000000001"),
+        None,
+        Some("2020-01-02T01:01:01.0000000000001"),
+    ];
+
+    let offset_nanos = offset.num_nanoseconds().expect("non overflow nanos");
+
+    let ts_nanos = ts_strings
+        .into_iter()
+        .map(|t| {
+            t.map(|t| {
+                offset_nanos
+                    + t.parse::<chrono::NaiveDateTime>()
+                        .unwrap()
+                        .timestamp_nanos()
+            })
+        })
+        .collect::<Vec<_>>();
+
+    let ts_micros = ts_nanos
+        .iter()
+        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000))
+        .collect::<Vec<_>>();
+
+    let ts_millis = ts_nanos
+        .iter()
+        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000))
+        .collect::<Vec<_>>();
+
+    let ts_seconds = ts_nanos
+        .iter()
+        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000000))
+        .collect::<Vec<_>>();
+
+    let names = ts_nanos
+        .iter()
+        .enumerate()
+        .map(|(i, _)| format!("Row {} + {}", i, offset))
+        .collect::<Vec<_>>();
+
+    let arr_nanos = TimestampNanosecondArray::from_opt_vec(ts_nanos, None);
+    let arr_micros = TimestampMicrosecondArray::from_opt_vec(ts_micros, None);
+    let arr_millis = TimestampMillisecondArray::from_opt_vec(ts_millis, None);
+    let arr_seconds = TimestampSecondArray::from_opt_vec(ts_seconds, None);
+
+    let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
+    let arr_names = StringArray::from(names);
+
+    let schema = Schema::new(vec![
+        Field::new("nanos", arr_nanos.data_type().clone(), true),
+        Field::new("micros", arr_micros.data_type().clone(), true),
+        Field::new("millis", arr_millis.data_type().clone(), true),
+        Field::new("seconds", arr_seconds.data_type().clone(), true),
+        Field::new("name", arr_names.data_type().clone(), true),
+    ]);
+    let schema = Arc::new(schema);
+
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(arr_nanos),
+            Arc::new(arr_micros),
+            Arc::new(arr_millis),
+            Arc::new(arr_seconds),
+            Arc::new(arr_names),
+        ],
+    )
+    .unwrap()
+}
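
Usage note (not part of the diff above): a minimal sketch of how the new `plan_metrics` helper and the `ParquetExec` counters might be consumed after a query runs. The table name, file path, and predicate are hypothetical; the calls (`register_parquet`, `optimize`, `create_physical_plan`, `collect`, `plan_metrics`, `SQLMetric::value`) are the ones exercised by the `parquet_pruning.rs` test in this change.

```rust
use datafusion::error::Result;
use datafusion::physical_plan::{collect, plan_metrics};
use datafusion::prelude::ExecutionContext;

async fn show_pruning_metrics() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    // hypothetical parquet file registered as table "t"
    ctx.register_parquet("t", "/tmp/data.parquet")?;

    // plan and execute a query whose predicate can prune row groups
    let logical_plan = ctx
        .sql("SELECT * FROM t WHERE nanos < to_timestamp('2020-01-02 01:01:11Z')")?
        .to_logical_plan();
    let logical_plan = ctx.optimize(&logical_plan)?;
    let physical_plan = ctx.create_physical_plan(&logical_plan)?;
    let _batches = collect(physical_plan.clone()).await?;

    // plan_metrics walks the plan tree and merges each operator's metrics map,
    // so the ParquetExec counters (numRowGroupsPruned, numPredicateEvaluationErrors,
    // numPredicateCreationErrors) show up here keyed by name
    for (name, metric) in plan_metrics(physical_plan) {
        println!("{} = {}", name, metric.value());
    }
    Ok(())
}
```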