Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-11879 [Rust][DataFusion] Make ExecutionContext::sql return dataframe with optimized plan #9639

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion rust/datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,10 @@ impl ExecutionContext {
))),
},

plan => Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))),
plan => Ok(Arc::new(DataFrameImpl::new(
self.state.clone(),
&self.optimize(&plan)?,
))),
}
}

Expand Down Expand Up @@ -1702,6 +1705,23 @@ mod tests {
}
Ok(())
}
#[test]
fn ctx_sql_should_optimize_plan() -> Result<()> {
let mut ctx = ExecutionContext::new();
let plan1 =
ctx.create_logical_plan("SELECT * FROM (SELECT 1) WHERE TRUE AND TRUE")?;

let opt_plan1 = ctx.optimize(&plan1)?;

let plan2 = ctx.sql("SELECT * FROM (SELECT 1) WHERE TRUE AND TRUE")?;
Copy link
Contributor Author

@Dandandan Dandandan Mar 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before the PR the test fails, as it doesn't optimize the plan (an optimized plan just returns the same as a plan for SELECT 1).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


assert_eq!(
format!("{:?}", opt_plan1),
format!("{:?}", plan2.to_logical_plan())
);

Ok(())
}

#[tokio::test]
async fn scalar_udf() -> Result<()> {
Expand Down
38 changes: 23 additions & 15 deletions rust/datafusion/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Projection Push Down optimizer rule ensures that only referenced columns are
//! loaded into memory

use crate::error::{DataFusionError, Result};
use crate::error::Result;
use crate::logical_plan::{DFField, DFSchema, DFSchemaRef, LogicalPlan, ToDFSchema};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
Expand Down Expand Up @@ -57,16 +57,9 @@ impl ProjectionPushDown {

fn get_projected_schema(
schema: &Schema,
projection: &Option<Vec<usize>>,
required_columns: &HashSet<String>,
has_projection: bool,
) -> Result<(Vec<usize>, DFSchemaRef)> {
if projection.is_some() {
return Err(DataFusionError::Internal(
"Cannot run projection push-down rule more than once".to_string(),
));
}

// once we reach the table scan, we can use the accumulated set of column
// names to construct the set of column indexes in the scan
//
Expand Down Expand Up @@ -242,16 +235,11 @@ fn optimize_plan(
LogicalPlan::TableScan {
table_name,
source,
projection,
filters,
..
} => {
let (projection, projected_schema) = get_projected_schema(
&source.schema(),
projection,
required_columns,
has_projection,
)?;
let (projection, projected_schema) =
get_projected_schema(&source.schema(), required_columns, has_projection)?;

// return the table scan with projection
Ok(LogicalPlan::TableScan {
Expand Down Expand Up @@ -491,6 +479,26 @@ mod tests {
Ok(())
}

/// tests that optimizing twice yields same plan
#[test]
fn test_double_optimization() -> Result<()> {
let table_scan = test_table_scan()?;

let plan = LogicalPlanBuilder::from(&table_scan)
.project(&[col("b")])?
.project(&[lit(1).alias("a")])?
.build()?;

let optimized_plan1 = optimize(&plan).expect("failed to optimize plan");
let optimized_plan2 =
optimize(&optimized_plan1).expect("failed to optimize plan");

let formatted_plan1 = format!("{:?}", optimized_plan1);
let formatted_plan2 = format!("{:?}", optimized_plan2);
assert_eq!(formatted_plan1, formatted_plan2);
Ok(())
}

/// tests that it removes an aggregate is never used downstream
#[test]
fn table_unused_aggregate() -> Result<()> {
Expand Down