Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
MohamedAbdeen21 committed May 8, 2024
1 parent 6a41392 commit 72f1395
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 113 deletions.
47 changes: 19 additions & 28 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ impl CommonSubexprEliminate {
agg_exprs.push(expr.alias(&name));
proj_exprs.push(Expr::Column(Column::from_name(name)));
} else {
let id = expr_identifier(&expr_rewritten, "".to_string());
let id = expr_identifier(&expr_rewritten);
let (qualifier, field) =
expr_rewritten.to_field(&new_input_schema)?;
let out_name = qualified_name(qualifier.as_ref(), field.name());
Expand Down Expand Up @@ -643,24 +643,16 @@ enum VisitRecord {
EnterMark(usize),
/// the node's children were skipped => jump to f_up on same node
JumpMark,
/// Accumulated identifier of sub expression.
ExprItem(Identifier),
}

impl ExprIdentifierVisitor<'_> {
/// Find the first `EnterMark` in the stack, and accumulates every `ExprItem`
/// before it.
fn pop_enter_mark(&mut self) -> Option<(usize, Identifier)> {
let mut desc = String::new();

fn pop_enter_mark(&mut self) -> Option<usize> {
while let Some(item) = self.visit_stack.pop() {
match item {
VisitRecord::EnterMark(idx) => {
return Some((idx, desc));
}
VisitRecord::ExprItem(id) => {
desc.push('|');
desc.push_str(&id);
return Some(idx);
}
VisitRecord::JumpMark => return None,
}
Expand Down Expand Up @@ -692,11 +684,11 @@ impl TreeNodeVisitor for ExprIdentifierVisitor<'_> {
}

fn f_up(&mut self, expr: &Expr) -> Result<TreeNodeRecursion> {
let Some((down_index, sub_expr_id)) = self.pop_enter_mark() else {
let Some(down_index) = self.pop_enter_mark() else {
return Ok(TreeNodeRecursion::Continue);
};

let expr_id = expr_identifier(expr, sub_expr_id);
let expr_id = expr_identifier(expr);

self.id_array[down_index].0 = self.up_index;
if !self.expr_mask.ignores(expr) {
Expand All @@ -711,15 +703,14 @@ impl TreeNodeVisitor for ExprIdentifierVisitor<'_> {
.or_insert((0, data_type));
*count += 1;
}
self.visit_stack.push(VisitRecord::ExprItem(expr_id));
self.up_index += 1;

Ok(TreeNodeRecursion::Continue)
}
}

fn expr_identifier(expr: &Expr, sub_expr_identifier: Identifier) -> Identifier {
format!("{{{expr}{sub_expr_identifier}}}")
fn expr_identifier(expr: &Expr) -> Identifier {
format!("#{{{expr}}}")
}

/// Go through an expression tree and generate identifier for every node in this tree.
Expand Down Expand Up @@ -872,15 +863,15 @@ mod test {
)?;

let expected = vec![
(8, "{(SUM(a + Int32(1)) - AVG(c)) * Int32(2)|{Int32(2)}|{SUM(a + Int32(1)) - AVG(c)|{AVG(c)|{c}}|{SUM(a + Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}}}"),
(6, "{SUM(a + Int32(1)) - AVG(c)|{AVG(c)|{c}}|{SUM(a + Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}}"),
(8, "#{(SUM(a + Int32(1)) - AVG(c)) * Int32(2)}"),
(6, "#{SUM(a + Int32(1)) - AVG(c)}"),
(3, ""),
(2, "{a + Int32(1)|{Int32(1)}|{a}}"),
(2, "#{a + Int32(1)}"),
(0, ""),
(1, ""),
(5, ""),
(4, ""),
(7, "")
(7, ""),
]
.into_iter()
.map(|(number, id)| (number, id.into()))
Expand All @@ -898,15 +889,15 @@ mod test {
)?;

let expected = vec![
(8, "{(SUM(a + Int32(1)) - AVG(c)) * Int32(2)|{Int32(2)}|{SUM(a + Int32(1)) - AVG(c)|{AVG(c)|{c}}|{SUM(a + Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}}}"),
(6, "{SUM(a + Int32(1)) - AVG(c)|{AVG(c)|{c}}|{SUM(a + Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}}"),
(3, "{SUM(a + Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}"),
(2, "{a + Int32(1)|{Int32(1)}|{a}}"),
(8, "#{(SUM(a + Int32(1)) - AVG(c)) * Int32(2)}"),
(6, "#{SUM(a + Int32(1)) - AVG(c)}"),
(3, "#{SUM(a + Int32(1))}"),
(2, "#{a + Int32(1)}"),
(0, ""),
(1, ""),
(5, "{AVG(c)|{c}}"),
(5, "#{AVG(c)}"),
(4, ""),
(7, "")
(7, ""),
]
.into_iter()
.map(|(number, id)| (number, id.into()))
Expand Down Expand Up @@ -991,8 +982,8 @@ mod test {
)?
.build()?;

let expected = "Projection: #{AVG(test.a)} AS AVG(test.a) AS col1, #{AVG(test.a)} AS AVG(test.a) AS col2, col3, AVG(test.c) AS AVG(test.c), #{my_agg(test.a)} AS my_agg(test.a) AS col4, #{my_agg(test.a)} AS my_agg(test.a) AS col5, col6, my_agg(test.c) AS my_agg(test.c)\
\n Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS #{AVG(test.a)}, my_agg(test.a) AS #{my_agg(test.a)}, AVG(test.b) AS col3, AVG(test.c) AS AVG(test.c), my_agg(test.b) AS col6, my_agg(test.c) AS my_agg(test.c)]]\
let expected = "Projection: #{AVG(test.a)} AS AVG(test.a) AS col1, #{AVG(test.a)} AS AVG(test.a) AS col2, col3, #{AVG(test.c)} AS AVG(test.c), #{my_agg(test.a)} AS my_agg(test.a) AS col4, #{my_agg(test.a)} AS my_agg(test.a) AS col5, col6, #{my_agg(test.c)} AS my_agg(test.c)\
\n Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS #{AVG(test.a)}, my_agg(test.a) AS #{my_agg(test.a)}, AVG(test.b) AS col3, AVG(test.c) AS #{AVG(test.c)}, my_agg(test.b) AS col6, my_agg(test.c) AS #{my_agg(test.c)}]]\
\n TableScan: test";

assert_optimized_plan_eq(expected, &plan);
Expand Down
8 changes: 4 additions & 4 deletions datafusion/sqllogictest/test_files/group_by.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4187,8 +4187,8 @@ EXPLAIN SELECT SUM(DISTINCT CAST(x AS DOUBLE)), MAX(DISTINCT CAST(x AS DOUBLE))
logical_plan
01)Projection: SUM(alias1) AS SUM(DISTINCT t1.x), MAX(alias1) AS MAX(DISTINCT t1.x)
02)--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(alias1), MAX(alias1)]]
03)----Aggregate: groupBy=[[t1.y, {CAST(t1.x AS Float64)|{t1.x}} AS t1.x AS alias1]], aggr=[[]]
04)------Projection: CAST(t1.x AS Float64) AS {CAST(t1.x AS Float64)|{t1.x}}, t1.y
03)----Aggregate: groupBy=[[t1.y, #{CAST(t1.x AS Float64)} AS t1.x AS alias1]], aggr=[[]]
04)------Projection: CAST(t1.x AS Float64) AS #{CAST(t1.x AS Float64)}, t1.y
05)--------TableScan: t1 projection=[x, y]
physical_plan
01)ProjectionExec: expr=[SUM(alias1)@1 as SUM(DISTINCT t1.x), MAX(alias1)@2 as MAX(DISTINCT t1.x)]
Expand All @@ -4200,8 +4200,8 @@ physical_plan
07)------------CoalesceBatchesExec: target_batch_size=2
08)--------------RepartitionExec: partitioning=Hash([y@0, alias1@1], 8), input_partitions=8
09)----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
10)------------------AggregateExec: mode=Partial, gby=[y@1 as y, {CAST(t1.x AS Float64)|{t1.x}}@0 as alias1], aggr=[]
11)--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as {CAST(t1.x AS Float64)|{t1.x}}, y@1 as y]
10)------------------AggregateExec: mode=Partial, gby=[y@1 as y, #{CAST(t1.x AS Float64)}@0 as alias1], aggr=[]
11)--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as #{CAST(t1.x AS Float64)}, y@1 as y]
12)----------------------MemoryExec: partitions=1, partition_sizes=[1]

# create an unbounded table that contains ordered timestamp.
Expand Down
9 changes: 6 additions & 3 deletions datafusion/sqllogictest/test_files/repartition_scan.slt
Original file line number Diff line number Diff line change
Expand Up @@ -274,17 +274,20 @@ DROP TABLE arrow_table;

## Use pre-existing files we don't have a way to create avro files yet

statement error DataFusion error: This feature is not implemented: cannot read avro schema without the 'avro' feature enabled
statement ok
CREATE EXTERNAL TABLE avro_table
STORED AS AVRO
WITH HEADER ROW
LOCATION '../../testing/data/avro/simple_enum.avro'


# It would be great to see the file read as "4" groups with even sizes (offsets) eventually
query error DataFusion error: Error during planning: table 'datafusion\.public\.avro_table' not found
query TT
EXPLAIN SELECT * FROM avro_table
----
logical_plan TableScan: avro_table projection=[f1, f2, f3]
physical_plan AvroExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/avro/simple_enum.avro]]}, projection=[f1, f2, f3]

# Cleanup
statement error DataFusion error: Execution error: Table 'avro_table' doesn't exist\.
statement ok
DROP TABLE avro_table;
16 changes: 8 additions & 8 deletions datafusion/sqllogictest/test_files/select.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1426,12 +1426,12 @@ query TT
EXPLAIN SELECT x/2, x/2+1 FROM t;
----
logical_plan
01)Projection: {t.x / Int64(2)|{Int64(2)}|{t.x}} AS t.x / Int64(2), {t.x / Int64(2)|{Int64(2)}|{t.x}} AS t.x / Int64(2) + Int64(1)
02)--Projection: t.x / Int64(2) AS {t.x / Int64(2)|{Int64(2)}|{t.x}}
01)Projection: #{t.x / Int64(2)} AS t.x / Int64(2), #{t.x / Int64(2)} AS t.x / Int64(2) + Int64(1)
02)--Projection: t.x / Int64(2) AS #{t.x / Int64(2)}
03)----TableScan: t projection=[x]
physical_plan
01)ProjectionExec: expr=[{t.x / Int64(2)|{Int64(2)}|{t.x}}@0 as t.x / Int64(2), {t.x / Int64(2)|{Int64(2)}|{t.x}}@0 + 1 as t.x / Int64(2) + Int64(1)]
02)--ProjectionExec: expr=[x@0 / 2 as {t.x / Int64(2)|{Int64(2)}|{t.x}}]
01)ProjectionExec: expr=[#{t.x / Int64(2)}@0 as t.x / Int64(2), #{t.x / Int64(2)}@0 + 1 as t.x / Int64(2) + Int64(1)]
02)--ProjectionExec: expr=[x@0 / 2 as #{t.x / Int64(2)}]
03)----MemoryExec: partitions=1, partition_sizes=[1]

query II
Expand All @@ -1444,12 +1444,12 @@ query TT
EXPLAIN SELECT abs(x), abs(x) + abs(y) FROM t;
----
logical_plan
01)Projection: {abs(t.x)|{t.x}} AS abs(t.x), {abs(t.x)|{t.x}} AS abs(t.x) + abs(t.y)
02)--Projection: abs(t.x) AS {abs(t.x)|{t.x}}, t.y
01)Projection: #{abs(t.x)} AS abs(t.x), #{abs(t.x)} AS abs(t.x) + abs(t.y)
02)--Projection: abs(t.x) AS #{abs(t.x)}, t.y
03)----TableScan: t projection=[x, y]
physical_plan
01)ProjectionExec: expr=[{abs(t.x)|{t.x}}@0 as abs(t.x), {abs(t.x)|{t.x}}@0 + abs(y@1) as abs(t.x) + abs(t.y)]
02)--ProjectionExec: expr=[abs(x@0) as {abs(t.x)|{t.x}}, y@1 as y]
01)ProjectionExec: expr=[#{abs(t.x)}@0 as abs(t.x), #{abs(t.x)}@0 + abs(y@1) as abs(t.x) + abs(t.y)]
02)--ProjectionExec: expr=[abs(x@0) as #{abs(t.x)}, y@1 as y]
03)----MemoryExec: partitions=1, partition_sizes=[1]

query II
Expand Down
8 changes: 4 additions & 4 deletions datafusion/sqllogictest/test_files/subquery.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1072,8 +1072,8 @@ query TT
explain select a/2, a/2 + 1 from t
----
logical_plan
01)Projection: {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2), {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2) + Int64(1)
02)--Projection: t.a / Int64(2) AS {t.a / Int64(2)|{Int64(2)}|{t.a}}
01)Projection: #{t.a / Int64(2)} AS t.a / Int64(2), #{t.a / Int64(2)} AS t.a / Int64(2) + Int64(1)
02)--Projection: t.a / Int64(2) AS #{t.a / Int64(2)}
03)----TableScan: t projection=[a]

statement ok
Expand All @@ -1083,8 +1083,8 @@ query TT
explain select a/2, a/2 + 1 from t
----
logical_plan
01)Projection: {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2), {t.a / Int64(2)|{Int64(2)}|{t.a}} AS t.a / Int64(2) + Int64(1)
02)--Projection: t.a / Int64(2) AS {t.a / Int64(2)|{Int64(2)}|{t.a}}
01)Projection: #{t.a / Int64(2)} AS t.a / Int64(2), #{t.a / Int64(2)} AS t.a / Int64(2) + Int64(1)
02)--Projection: t.a / Int64(2) AS #{t.a / Int64(2)}
03)----TableScan: t projection=[a]

###
Expand Down
6 changes: 3 additions & 3 deletions datafusion/sqllogictest/test_files/tpch/q1.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ explain select
logical_plan
01)Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS LAST
02)--Projection: lineitem.l_returnflag, lineitem.l_linestatus, SUM(lineitem.l_quantity) AS sum_qty, SUM(lineitem.l_extendedprice) AS sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax) AS sum_charge, AVG(lineitem.l_quantity) AS avg_qty, AVG(lineitem.l_extendedprice) AS avg_price, AVG(lineitem.l_discount) AS avg_disc, COUNT(*) AS count_order
03)----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], aggr=[[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM({lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)|{Decimal128(Some(1),20,0) - lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}} AS lineitem.l_extendedprice * Decimal128(Some(1),20,0) - lineitem.l_discount) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM({lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)|{Decimal128(Some(1),20,0) - lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}} AS lineitem.l_extendedprice * Decimal128(Some(1),20,0) - lineitem.l_discount * (Decimal128(Some(1),20,0) + lineitem.l_tax)) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(Int64(1)) AS COUNT(*)]]
04)------Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS {lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)|{Decimal128(Some(1),20,0) - lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}}, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_tax, lineitem.l_returnflag, lineitem.l_linestatus
03)----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], aggr=[[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(#{lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)} AS lineitem.l_extendedprice * Decimal128(Some(1),20,0) - lineitem.l_discount) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(#{lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)} AS lineitem.l_extendedprice * Decimal128(Some(1),20,0) - lineitem.l_discount * (Decimal128(Some(1),20,0) + lineitem.l_tax)) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(Int64(1)) AS COUNT(*)]]
04)------Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS #{lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)}, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_tax, lineitem.l_returnflag, lineitem.l_linestatus
05)--------Filter: lineitem.l_shipdate <= Date32("10471")
06)----------TableScan: lineitem projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], partial_filters=[lineitem.l_shipdate <= Date32("10471")]
physical_plan
Expand All @@ -54,7 +54,7 @@ physical_plan
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([l_returnflag@0, l_linestatus@1], 4), input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(*)]
08)--------------ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as {lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)|{Decimal128(Some(1),20,0) - lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}}, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus]
08)--------------ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as #{lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)}, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus]
09)----------------CoalesceBatchesExec: target_batch_size=8192
10)------------------FilterExec: l_shipdate@6 <= 10471
11)--------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], has_header=false
Expand Down
Loading

0 comments on commit 72f1395

Please sign in to comment.