22 changes: 13 additions & 9 deletions datafusion/src/physical_optimizer/repartition.rs
@@ -110,7 +110,9 @@ mod tests {

use super::*;
use crate::datasource::datasource::Statistics;
use crate::physical_plan::parquet::{ParquetExec, ParquetPartition};
use crate::physical_plan::parquet::{
ParquetExec, ParquetExecMetrics, ParquetPartition,
};
use crate::physical_plan::projection::ProjectionExec;

#[test]
@@ -119,12 +121,13 @@
let parquet_project = ProjectionExec::try_new(
vec![],
Arc::new(ParquetExec::new(
vec![ParquetPartition {
filenames: vec!["x".to_string()],
statistics: Statistics::default(),
}],
vec![ParquetPartition::new(
vec!["x".to_string()],
Statistics::default(),
)],
schema,
None,
ParquetExecMetrics::new(),
None,
2048,
None,
@@ -156,12 +159,13 @@
Arc::new(ProjectionExec::try_new(
vec![],
Arc::new(ParquetExec::new(
vec![ParquetPartition {
filenames: vec!["x".to_string()],
statistics: Statistics::default(),
}],
vec![ParquetPartition::new(
vec!["x".to_string()],
Statistics::default(),
)],
schema,
None,
ParquetExecMetrics::new(),
None,
2048,
None,
14 changes: 14 additions & 0 deletions datafusion/src/physical_plan/mod.rs
@@ -297,6 +297,20 @@ pub fn visit_execution_plan<V: ExecutionPlanVisitor>(
Ok(())
}

/// Recursively gather all execution metrics from this plan and all of its input plans
pub fn plan_metrics(plan: Arc<dyn ExecutionPlan>) -> HashMap<String, SQLMetric> {
fn get_metrics_inner(
plan: &dyn ExecutionPlan,
mut metrics: HashMap<String, SQLMetric>,
) -> HashMap<String, SQLMetric> {
metrics.extend(plan.metrics().into_iter());
plan.children().into_iter().fold(metrics, |metrics, child| {
get_metrics_inner(child.as_ref(), metrics)
})
}
get_metrics_inner(plan.as_ref(), HashMap::new())
}

/// Execute the [ExecutionPlan] and collect the results in memory
pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
match plan.output_partitioning().partition_count() {
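For orientation, a minimal sketch of how the new `plan_metrics` helper might be called; the `dump_metrics` wrapper and the printed format are illustrative only and not part of this change:

```rust
// Illustrative sketch (not part of this diff): walk a plan tree and print
// every metric gathered by the new `plan_metrics` helper.
use std::sync::Arc;

use datafusion::physical_plan::{plan_metrics, ExecutionPlan};

fn dump_metrics(plan: Arc<dyn ExecutionPlan>) {
    // `plan_metrics` folds each node's `metrics()` map over the whole tree.
    for (name, metric) in plan_metrics(plan) {
        println!("{}: {:?}", name, metric);
    }
}
```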
156 changes: 133 additions & 23 deletions datafusion/src/physical_plan/parquet.rs
@@ -40,6 +40,8 @@ use arrow::{
error::{ArrowError, Result as ArrowResult},
record_batch::RecordBatch,
};
use hashbrown::HashMap;
use log::debug;
use parquet::file::{
metadata::RowGroupMetaData,
reader::{FileReader, SerializedFileReader},
@@ -59,6 +61,8 @@ use crate::datasource::datasource::{ColumnStatistics, Statistics};
use async_trait::async_trait;
use futures::stream::{Stream, StreamExt};

use super::SQLMetric;
Contributor Author commented:
The changes to this file are to add metrics on the pruning (that are then used in the test)
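As a rough sketch of how a test might use these counters (not part of this diff): it assumes the plan scans a single Parquet file registered as "x" and that `SQLMetric` exposes a `value()` accessor; the metric key format matches the `metrics()` implementation added later in this file.

```rust
// Hypothetical test sketch: execute a plan with a pruning predicate and then
// assert that the ParquetExec recorded pruned row groups.
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::{collect, plan_metrics, ExecutionPlan};

async fn assert_row_groups_pruned(plan: Arc<dyn ExecutionPlan>) -> Result<()> {
    // Run the plan so each ParquetPartition updates its counters.
    let _batches = collect(plan.clone()).await?;

    // Key format follows ParquetExec::metrics():
    //   "numRowGroupsPruned for <comma-joined file names>"
    let metrics = plan_metrics(plan);
    let pruned = metrics
        .get("numRowGroupsPruned for x")
        .map(|m| m.value()) // `value()` is an assumed accessor on SQLMetric
        .unwrap_or(0);
    assert!(pruned > 0, "expected at least one row group to be pruned");
    Ok(())
}
```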


/// Execution plan for scanning one or more Parquet partitions
#[derive(Debug, Clone)]
pub struct ParquetExec {
@@ -72,6 +76,8 @@ pub struct ParquetExec {
batch_size: usize,
/// Statistics for the data set (sum of statistics for all partitions)
statistics: Statistics,
/// metrics for the overall execution
metrics: ParquetExecMetrics,
/// Optional predicate builder
predicate_builder: Option<PruningPredicate>,
/// Optional limit of the number of rows
@@ -93,6 +99,24 @@ pub struct ParquetPartition {
pub filenames: Vec<String>,
/// Statistics for this partition
pub statistics: Statistics,
/// Execution metrics
metrics: ParquetPartitionMetrics,
}

/// Stores metrics about the overall parquet execution
#[derive(Debug, Clone)]
pub struct ParquetExecMetrics {
/// Number of times the pruning predicate could not be created
pub predicate_creation_errors: Arc<SQLMetric>,
}

/// Stores metrics about the parquet execution for a particular ParquetPartition
#[derive(Debug, Clone)]
struct ParquetPartitionMetrics {
/// Number of times the predicate could not be evaluated
pub predicate_evaluation_errors: Arc<SQLMetric>,
/// Number of row groups pruned using the pruning predicate
pub row_groups_pruned: Arc<SQLMetric>,
}

impl ParquetExec {
@@ -140,6 +164,8 @@ impl ParquetExec {
max_concurrency: usize,
limit: Option<usize>,
) -> Result<Self> {
debug!("Creating ParquetExec, filenames: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
filenames, projection, predicate, limit);
// build a list of Parquet partitions with statistics and gather all unique schemas
// used in this data set
let mut schemas: Vec<Schema> = vec![];
@@ -205,10 +231,7 @@ impl ParquetExec {
};
// remove files that are not needed in case of limit
filenames.truncate(total_files);
partitions.push(ParquetPartition {
filenames,
statistics,
});
partitions.push(ParquetPartition::new(filenames, statistics));
if limit_exhausted {
break;
}
@@ -225,14 +248,27 @@ impl ParquetExec {
)));
}
let schema = Arc::new(schemas.pop().unwrap());
let metrics = ParquetExecMetrics::new();

let predicate_builder = predicate.and_then(|predicate_expr| {
PruningPredicate::try_new(&predicate_expr, schema.clone()).ok()
match PruningPredicate::try_new(&predicate_expr, schema.clone()) {
Ok(predicate_builder) => Some(predicate_builder),
Err(e) => {
debug!(
"Could not create pruning predicate for {:?}: {}",
predicate_expr, e
);
metrics.predicate_creation_errors.add(1);
None
}
}
});

Ok(Self::new(
partitions,
schema,
projection,
metrics,
predicate_builder,
batch_size,
limit,
@@ -244,6 +280,7 @@ impl ParquetExec {
partitions: Vec<ParquetPartition>,
schema: SchemaRef,
projection: Option<Vec<usize>>,
metrics: ParquetExecMetrics,
predicate_builder: Option<PruningPredicate>,
batch_size: usize,
limit: Option<usize>,
@@ -307,6 +344,7 @@ impl ParquetExec {
partitions,
schema: Arc::new(projected_schema),
projection,
metrics,
predicate_builder,
batch_size,
statistics,
@@ -341,6 +379,7 @@ impl ParquetPartition {
Self {
filenames,
statistics,
metrics: ParquetPartitionMetrics::new(),
}
}

@@ -355,6 +394,25 @@
}
}

impl ParquetExecMetrics {
/// Create new metrics
pub fn new() -> Self {
Self {
predicate_creation_errors: SQLMetric::counter(),
}
}
}

impl ParquetPartitionMetrics {
/// Create new metrics
pub fn new() -> Self {
Self {
predicate_evaluation_errors: SQLMetric::counter(),
row_groups_pruned: SQLMetric::counter(),
}
}
}

#[async_trait]
impl ExecutionPlan for ParquetExec {
/// Return a reference to Any that can be used for downcasting
@@ -398,7 +456,9 @@ impl ExecutionPlan for ParquetExec {
Receiver<ArrowResult<RecordBatch>>,
) = channel(2);

let filenames = self.partitions[partition].filenames.clone();
let partition = &self.partitions[partition];
let filenames = partition.filenames.clone();
let metrics = partition.metrics.clone();
let projection = self.projection.clone();
let predicate_builder = self.predicate_builder.clone();
let batch_size = self.batch_size;
@@ -407,6 +467,7 @@
task::spawn_blocking(move || {
if let Err(e) = read_files(
&filenames,
metrics,
&projection,
&predicate_builder,
batch_size,
@@ -448,6 +509,31 @@ impl ExecutionPlan for ParquetExec {
}
}
}

fn metrics(&self) -> HashMap<String, SQLMetric> {
self.partitions
.iter()
.flat_map(|p| {
[
(
format!(
"numPredicateEvaluationErrors for {}",
p.filenames.join(",")
),
p.metrics.predicate_evaluation_errors.as_ref().clone(),
),
(
format!("numRowGroupsPruned for {}", p.filenames.join(",")),
p.metrics.row_groups_pruned.as_ref().clone(),
),
]
})
.chain(std::iter::once((
"numPredicateCreationErrors".to_string(),
self.metrics.predicate_creation_errors.as_ref().clone(),
)))
.collect()
}
}

fn send_result(
Expand Down Expand Up @@ -547,6 +633,7 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {

fn build_row_group_predicate(
predicate_builder: &PruningPredicate,
metrics: ParquetPartitionMetrics,
row_group_metadata: &[RowGroupMetaData],
) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
let parquet_schema = predicate_builder.schema().as_ref();
@@ -555,21 +642,28 @@ fn build_row_group_predicate(
row_group_metadata,
parquet_schema,
};

let predicate_values = predicate_builder.prune(&pruning_stats);

let predicate_values = match predicate_values {
Ok(values) => values,
match predicate_values {
Ok(values) => {
// NB: false means don't scan row group
let num_pruned = values.iter().filter(|&v| !v).count();
metrics.row_groups_pruned.add(num_pruned);
Box::new(move |_, i| values[i])
}
// stats filter array could not be built
// return a closure which will not filter out any row groups
_ => return Box::new(|_r, _i| true),
};

Box::new(move |_, i| predicate_values[i])
Err(e) => {
debug!("Error evaluating row group predicate values {}", e);
metrics.predicate_evaluation_errors.add(1);
Box::new(|_r, _i| true)
}
}
}

fn read_files(
filenames: &[String],
metrics: ParquetPartitionMetrics,
projection: &[usize],
predicate_builder: &Option<PruningPredicate>,
batch_size: usize,
@@ -583,6 +677,7 @@ fn read_files(
if let Some(predicate_builder) = predicate_builder {
let row_group_predicate = build_row_group_predicate(
predicate_builder,
metrics.clone(),
file_reader.metadata().row_groups(),
);
file_reader.filter_row_groups(&row_group_predicate);
@@ -757,8 +852,11 @@ mod tests {
vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
);
let row_group_metadata = vec![rgm1, rgm2];
let row_group_predicate =
build_row_group_predicate(&predicate_builder, &row_group_metadata);
let row_group_predicate = build_row_group_predicate(
&predicate_builder,
ParquetPartitionMetrics::new(),
&row_group_metadata,
);
let row_group_filter = row_group_metadata
.iter()
.enumerate()
@@ -787,8 +885,11 @@
vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
);
let row_group_metadata = vec![rgm1, rgm2];
let row_group_predicate =
build_row_group_predicate(&predicate_builder, &row_group_metadata);
let row_group_predicate = build_row_group_predicate(
&predicate_builder,
ParquetPartitionMetrics::new(),
&row_group_metadata,
);
let row_group_filter = row_group_metadata
.iter()
.enumerate()
@@ -832,8 +933,11 @@
],
);
let row_group_metadata = vec![rgm1, rgm2];
let row_group_predicate =
build_row_group_predicate(&predicate_builder, &row_group_metadata);
let row_group_predicate = build_row_group_predicate(
&predicate_builder,
ParquetPartitionMetrics::new(),
&row_group_metadata,
);
let row_group_filter = row_group_metadata
.iter()
.enumerate()
@@ -847,8 +951,11 @@
// this bypasses the entire predicate expression and no row groups are filtered out
let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
let predicate_builder = PruningPredicate::try_new(&expr, schema)?;
let row_group_predicate =
build_row_group_predicate(&predicate_builder, &row_group_metadata);
let row_group_predicate = build_row_group_predicate(
&predicate_builder,
ParquetPartitionMetrics::new(),
&row_group_metadata,
);
let row_group_filter = row_group_metadata
.iter()
.enumerate()
@@ -891,8 +998,11 @@
],
);
let row_group_metadata = vec![rgm1, rgm2];
let row_group_predicate =
build_row_group_predicate(&predicate_builder, &row_group_metadata);
let row_group_predicate = build_row_group_predicate(
&predicate_builder,
ParquetPartitionMetrics::new(),
&row_group_metadata,
);
let row_group_filter = row_group_metadata
.iter()
.enumerate()