diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index 8b9aac9ea73b..509859e97749 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -329,11 +329,11 @@ impl LogicalPlan { LogicalPlan::Limit { input, .. } => vec![input], LogicalPlan::Extension { node } => node.inputs(), LogicalPlan::Union { inputs, .. } => inputs.iter().collect(), + LogicalPlan::Explain { plan, .. } => vec![plan], // plans without inputs LogicalPlan::TableScan { .. } | LogicalPlan::EmptyRelation { .. } - | LogicalPlan::CreateExternalTable { .. } - | LogicalPlan::Explain { .. } => vec![], + | LogicalPlan::CreateExternalTable { .. } => vec![], } } } @@ -438,11 +438,11 @@ impl LogicalPlan { } true } + LogicalPlan::Explain { plan, .. } => plan.accept(visitor)?, // plans without inputs LogicalPlan::TableScan { .. } | LogicalPlan::EmptyRelation { .. } - | LogicalPlan::CreateExternalTable { .. } - | LogicalPlan::Explain { .. } => true, + | LogicalPlan::CreateExternalTable { .. } => true, }; if !recurse { return Ok(false); diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs index 4c248e2b6483..6f6a1e8cad53 100644 --- a/datafusion/src/optimizer/filter_push_down.rs +++ b/datafusion/src/optimizer/filter_push_down.rs @@ -234,6 +234,10 @@ fn split_members<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a Expr>) { fn optimize(plan: &LogicalPlan, mut state: State) -> Result { match plan { + LogicalPlan::Explain { .. } => { + // push the optimization to the plan of this explain + push_down(&state, plan) + } LogicalPlan::Filter { input, predicate } => { let mut predicates = vec![]; split_members(predicate, &mut predicates); diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index eb50661b42e6..12d40adc85a9 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -1555,6 +1555,8 @@ fn create_join_context_qualified() -> Result { #[tokio::test] async fn csv_explain() { + // This test uses the execute function that create full plan cycle: logical, optimized logical, and physical, + // then execute the physical plan and return the final explain results let mut ctx = ExecutionContext::new(); register_aggregate_csv_by_sql(&mut ctx).await; let sql = "EXPLAIN SELECT c1 FROM aggregate_test_100 where c2 > 10"; @@ -1573,6 +1575,185 @@ async fn csv_explain() { assert_eq!(expected, actual); } +#[tokio::test] +async fn csv_explain_plans() { + // This test verify the look of each plan in its full cycle plan creation + + let mut ctx = ExecutionContext::new(); + register_aggregate_csv_by_sql(&mut ctx).await; + let sql = "EXPLAIN SELECT c1 FROM aggregate_test_100 where c2 > 10"; + + // Logical plan + // Create plan + let msg = format!("Creating logical plan for '{}'", sql); + let plan = ctx.create_logical_plan(&sql).expect(&msg); + let logical_schema = plan.schema(); + // + println!("SQL: {}", sql); + // + // Verify schema + let expected = vec![ + "Explain [plan_type:Utf8, plan:Utf8]", + " Projection: #c1 [c1:Utf8]", + " Filter: #c2 Gt Int64(10) [c1:Utf8, c2:Int32, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:Int64, c10:Utf8, c11:Float32, c12:Float64, c13:Utf8]", + " TableScan: aggregate_test_100 projection=None [c1:Utf8, c2:Int32, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:Int64, c10:Utf8, c11:Float32, c12:Float64, c13:Utf8]", + ]; + let formatted = plan.display_indent_schema().to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + assert_eq!( + expected, actual, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + expected, actual + ); + // + // Verify the text format of the plan + let expected = vec![ + "Explain", + " Projection: #c1", + " Filter: #c2 Gt Int64(10)", + " TableScan: aggregate_test_100 projection=None", + ]; + let formatted = plan.display_indent().to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + assert_eq!( + expected, actual, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + expected, actual + ); + // + // verify the grahviz format of the plan + let expected = vec![ + "// Begin DataFusion GraphViz Plan (see https://graphviz.org)", + "digraph {", + " subgraph cluster_1", + " {", + " graph[label=\"LogicalPlan\"]", + " 2[shape=box label=\"Explain\"]", + " 3[shape=box label=\"Projection: #c1\"]", + " 2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]", + " 4[shape=box label=\"Filter: #c2 Gt Int64(10)\"]", + " 3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]", + " 5[shape=box label=\"TableScan: aggregate_test_100 projection=None\"]", + " 4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]", + " }", + " subgraph cluster_6", + " {", + " graph[label=\"Detailed LogicalPlan\"]", + " 7[shape=box label=\"Explain\\nSchema: [plan_type:Utf8, plan:Utf8]\"]", + " 8[shape=box label=\"Projection: #c1\\nSchema: [c1:Utf8]\"]", + " 7 -> 8 [arrowhead=none, arrowtail=normal, dir=back]", + " 9[shape=box label=\"Filter: #c2 Gt Int64(10)\\nSchema: [c1:Utf8, c2:Int32, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:Int64, c10:Utf8, c11:Float32, c12:Float64, c13:Utf8]\"]", + " 8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]", + " 10[shape=box label=\"TableScan: aggregate_test_100 projection=None\\nSchema: [c1:Utf8, c2:Int32, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:Int64, c10:Utf8, c11:Float32, c12:Float64, c13:Utf8]\"]", + " 9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]", + " }", + "}", + "// End DataFusion GraphViz Plan", + ]; + let formatted = plan.display_graphviz().to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + assert_eq!( + expected, actual, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + expected, actual + ); + + // Optimized logical plan + // + let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan); + let plan = ctx.optimize(&plan).expect(&msg); + let optimized_logical_schema = plan.schema(); + // Both schema has to be the same + assert_eq!(logical_schema.as_ref(), optimized_logical_schema.as_ref()); + // + // Verify schema + let expected = vec![ + "Explain [plan_type:Utf8, plan:Utf8]", + " Projection: #c1 [c1:Utf8]", + " Filter: #c2 Gt Int64(10) [c1:Utf8, c2:Int32]", + " TableScan: aggregate_test_100 projection=Some([0, 1]) [c1:Utf8, c2:Int32]", + ]; + let formatted = plan.display_indent_schema().to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + assert_eq!( + expected, actual, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + expected, actual + ); + // + // Verify the text format of the plan + let expected = vec![ + "Explain", + " Projection: #c1", + " Filter: #c2 Gt Int64(10)", + " TableScan: aggregate_test_100 projection=Some([0, 1])", + ]; + let formatted = plan.display_indent().to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + assert_eq!( + expected, actual, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + expected, actual + ); + // + // verify the grahviz format of the plan + let expected = vec![ + "// Begin DataFusion GraphViz Plan (see https://graphviz.org)", + "digraph {", + " subgraph cluster_1", + " {", + " graph[label=\"LogicalPlan\"]", + " 2[shape=box label=\"Explain\"]", + " 3[shape=box label=\"Projection: #c1\"]", + " 2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]", + " 4[shape=box label=\"Filter: #c2 Gt Int64(10)\"]", + " 3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]", + " 5[shape=box label=\"TableScan: aggregate_test_100 projection=Some([0, 1])\"]", + " 4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]", + " }", + " subgraph cluster_6", + " {", + " graph[label=\"Detailed LogicalPlan\"]", + " 7[shape=box label=\"Explain\\nSchema: [plan_type:Utf8, plan:Utf8]\"]", + " 8[shape=box label=\"Projection: #c1\\nSchema: [c1:Utf8]\"]", + " 7 -> 8 [arrowhead=none, arrowtail=normal, dir=back]", + " 9[shape=box label=\"Filter: #c2 Gt Int64(10)\\nSchema: [c1:Utf8, c2:Int32]\"]", + " 8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]", + " 10[shape=box label=\"TableScan: aggregate_test_100 projection=Some([0, 1])\\nSchema: [c1:Utf8, c2:Int32]\"]", + " 9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]", + " }", + "}", + "// End DataFusion GraphViz Plan", + ]; + let formatted = plan.display_graphviz().to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + assert_eq!( + expected, actual, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + expected, actual + ); + + // Physical plan + // Create plan + let msg = format!("Creating physical plan for '{}': {:?}", sql, plan); + let plan = ctx.create_physical_plan(&plan).expect(&msg); + // + // Execute plan + let msg = format!("Executing physical plan for '{}': {:?}", sql, plan); + let results = collect(plan).await.expect(&msg); + let actual = result_vec(&results); + // flatten to a single string + let actual = actual.into_iter().map(|r| r.join("\t")).collect::(); + // Since the plan contains path that are environmentally dependant (e.g. full path of the test file), only verify important content + assert!(actual.contains("logical_plan"), "Actual: '{}'", actual); + assert!(actual.contains("Projection: #c1"), "Actual: '{}'", actual); + assert!( + actual.contains("Filter: #c2 Gt Int64(10)"), + "Actual: '{}'", + actual + ); +} + #[tokio::test] async fn csv_explain_verbose() { let mut ctx = ExecutionContext::new(); @@ -1591,6 +1772,195 @@ async fn csv_explain_verbose() { assert!(actual.contains("#c2 Gt Int64(10)"), "Actual: '{}'", actual); } +#[tokio::test] +async fn csv_explain_verbose_plans() { + // This test verify the look of each plan in its full cycle plan creation + + let mut ctx = ExecutionContext::new(); + register_aggregate_csv_by_sql(&mut ctx).await; + let sql = "EXPLAIN VERBOSE SELECT c1 FROM aggregate_test_100 where c2 > 10"; + + // Logical plan + // Create plan + let msg = format!("Creating logical plan for '{}'", sql); + let plan = ctx.create_logical_plan(&sql).expect(&msg); + let logical_schema = plan.schema(); + // + println!("SQL: {}", sql); + + // + // Verify schema + let expected = vec![ + "Explain [plan_type:Utf8, plan:Utf8]", + " Projection: #c1 [c1:Utf8]", + " Filter: #c2 Gt Int64(10) [c1:Utf8, c2:Int32, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:Int64, c10:Utf8, c11:Float32, c12:Float64, c13:Utf8]", + " TableScan: aggregate_test_100 projection=None [c1:Utf8, c2:Int32, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:Int64, c10:Utf8, c11:Float32, c12:Float64, c13:Utf8]", + ]; + let formatted = plan.display_indent_schema().to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + assert_eq!( + expected, actual, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + expected, actual + ); + // + // Verify the text format of the plan + let expected = vec![ + "Explain", + " Projection: #c1", + " Filter: #c2 Gt Int64(10)", + " TableScan: aggregate_test_100 projection=None", + ]; + let formatted = plan.display_indent().to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + assert_eq!( + expected, actual, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + expected, actual + ); + // + // verify the grahviz format of the plan + let expected = vec![ + "// Begin DataFusion GraphViz Plan (see https://graphviz.org)", + "digraph {", + " subgraph cluster_1", + " {", + " graph[label=\"LogicalPlan\"]", + " 2[shape=box label=\"Explain\"]", + " 3[shape=box label=\"Projection: #c1\"]", + " 2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]", + " 4[shape=box label=\"Filter: #c2 Gt Int64(10)\"]", + " 3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]", + " 5[shape=box label=\"TableScan: aggregate_test_100 projection=None\"]", + " 4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]", + " }", + " subgraph cluster_6", + " {", + " graph[label=\"Detailed LogicalPlan\"]", + " 7[shape=box label=\"Explain\\nSchema: [plan_type:Utf8, plan:Utf8]\"]", + " 8[shape=box label=\"Projection: #c1\\nSchema: [c1:Utf8]\"]", + " 7 -> 8 [arrowhead=none, arrowtail=normal, dir=back]", + " 9[shape=box label=\"Filter: #c2 Gt Int64(10)\\nSchema: [c1:Utf8, c2:Int32, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:Int64, c10:Utf8, c11:Float32, c12:Float64, c13:Utf8]\"]", + " 8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]", + " 10[shape=box label=\"TableScan: aggregate_test_100 projection=None\\nSchema: [c1:Utf8, c2:Int32, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:Int64, c10:Utf8, c11:Float32, c12:Float64, c13:Utf8]\"]", + " 9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]", + " }", + "}", + "// End DataFusion GraphViz Plan", + ]; + let formatted = plan.display_graphviz().to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + assert_eq!( + expected, actual, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + expected, actual + ); + + // Optimized logical plan + // + let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan); + let plan = ctx.optimize(&plan).expect(&msg); + let optimized_logical_schema = plan.schema(); + // Both schema has to be the same + assert_eq!(logical_schema.as_ref(), optimized_logical_schema.as_ref()); + // + // Verify schema + let expected = vec![ + "Explain [plan_type:Utf8, plan:Utf8]", + " Projection: #c1 [c1:Utf8]", + " Filter: #c2 Gt Int64(10) [c1:Utf8, c2:Int32]", + " TableScan: aggregate_test_100 projection=Some([0, 1]) [c1:Utf8, c2:Int32]", + ]; + let formatted = plan.display_indent_schema().to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + assert_eq!( + expected, actual, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + expected, actual + ); + // + // Verify the text format of the plan + let expected = vec![ + "Explain", + " Projection: #c1", + " Filter: #c2 Gt Int64(10)", + " TableScan: aggregate_test_100 projection=Some([0, 1])", + ]; + let formatted = plan.display_indent().to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + assert_eq!( + expected, actual, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + expected, actual + ); + // + // verify the grahviz format of the plan + let expected = vec![ + "// Begin DataFusion GraphViz Plan (see https://graphviz.org)", + "digraph {", + " subgraph cluster_1", + " {", + " graph[label=\"LogicalPlan\"]", + " 2[shape=box label=\"Explain\"]", + " 3[shape=box label=\"Projection: #c1\"]", + " 2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]", + " 4[shape=box label=\"Filter: #c2 Gt Int64(10)\"]", + " 3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]", + " 5[shape=box label=\"TableScan: aggregate_test_100 projection=Some([0, 1])\"]", + " 4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]", + " }", + " subgraph cluster_6", + " {", + " graph[label=\"Detailed LogicalPlan\"]", + " 7[shape=box label=\"Explain\\nSchema: [plan_type:Utf8, plan:Utf8]\"]", + " 8[shape=box label=\"Projection: #c1\\nSchema: [c1:Utf8]\"]", + " 7 -> 8 [arrowhead=none, arrowtail=normal, dir=back]", + " 9[shape=box label=\"Filter: #c2 Gt Int64(10)\\nSchema: [c1:Utf8, c2:Int32]\"]", + " 8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]", + " 10[shape=box label=\"TableScan: aggregate_test_100 projection=Some([0, 1])\\nSchema: [c1:Utf8, c2:Int32]\"]", + " 9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]", + " }", + "}", + "// End DataFusion GraphViz Plan", + ]; + let formatted = plan.display_graphviz().to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + assert_eq!( + expected, actual, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + expected, actual + ); + + // Physical plan + // Create plan + let msg = format!("Creating physical plan for '{}': {:?}", sql, plan); + let plan = ctx.create_physical_plan(&plan).expect(&msg); + // + // Execute plan + let msg = format!("Executing physical plan for '{}': {:?}", sql, plan); + let results = collect(plan).await.expect(&msg); + let actual = result_vec(&results); + // flatten to a single string + let actual = actual.into_iter().map(|r| r.join("\t")).collect::(); + // Since the plan contains path that are environmentally dependant(e.g. full path of the test file), only verify important content + assert!( + actual.contains("logical_plan after projection_push_down"), + "Actual: '{}'", + actual + ); + assert!(actual.contains("physical_plan"), "Actual: '{}'", actual); + assert!( + actual.contains("FilterExec: CAST(c2 AS Int64) > 10"), + "Actual: '{}'", + actual + ); + assert!( + actual.contains("ProjectionExec: expr=[c1]"), + "Actual: '{}'", + actual + ); +} + fn aggr_test_schema() -> SchemaRef { Arc::new(Schema::new(vec![ Field::new("c1", DataType::Utf8, false),