From e637bd1ab2b115a37620cea4e94faaf7b947533c Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Sat, 29 May 2021 19:24:51 +0200 Subject: [PATCH 01/13] Add tokomak optimizer --- datafusion/Cargo.toml | 1 + datafusion/src/execution/context.rs | 3 +- datafusion/src/optimizer/mod.rs | 1 + datafusion/src/optimizer/simplification.rs | 467 +++++++++++++++++++++ 4 files changed, 471 insertions(+), 1 deletion(-) create mode 100644 datafusion/src/optimizer/simplification.rs diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 0668ec016ba1..7bc25b4c1a53 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -53,6 +53,7 @@ paste = "^1.0" num_cpus = "1.13.0" chrono = "0.4" async-trait = "0.1.41" +egg = "0.6.0" futures = "0.3" pin-project-lite= "^0.2.0" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index cfd3b7194429..8a005ad9e3a3 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -22,7 +22,7 @@ use crate::{ information_schema::CatalogWithInformationSchema, }, optimizer::{ - eliminate_limit::EliminateLimit, hash_build_probe_order::HashBuildProbeOrder, + eliminate_limit::EliminateLimit, hash_build_probe_order::HashBuildProbeOrder,simplification::Tokomak }, physical_optimizer::optimizer::PhysicalOptimizerRule, }; @@ -648,6 +648,7 @@ impl ExecutionConfig { concurrency: num_cpus::get(), batch_size: 8192, optimizers: vec![ + Arc::new(Tokomak::new()), Arc::new(ConstantFolding::new()), Arc::new(EliminateLimit::new()), Arc::new(ProjectionPushDown::new()), diff --git a/datafusion/src/optimizer/mod.rs b/datafusion/src/optimizer/mod.rs index 2fb8a3d62950..018f5ef58794 100644 --- a/datafusion/src/optimizer/mod.rs +++ b/datafusion/src/optimizer/mod.rs @@ -25,4 +25,5 @@ pub mod hash_build_probe_order; pub mod limit_push_down; pub mod optimizer; pub mod projection_push_down; +pub mod simplification; pub mod utils; diff --git a/datafusion/src/optimizer/simplification.rs b/datafusion/src/optimizer/simplification.rs new file mode 100644 index 000000000000..51c8a5db1be4 --- /dev/null +++ b/datafusion/src/optimizer/simplification.rs @@ -0,0 +1,467 @@ +use std::vec; + +use crate::{ + logical_plan::LogicalPlan, optimizer::optimizer::OptimizerRule, scalar::ScalarValue, +}; +use crate::{logical_plan::Operator, optimizer::utils}; + +use crate::error::Result as DFResult; +use crate::logical_plan::Expr; +use crate::execution::context::ExecutionProps; +use egg::{rewrite as rw, *}; + +pub struct Tokomak {} + +impl Tokomak { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} +pub type EGraph = egg::EGraph; + +pub fn rules() -> Vec> { + return vec![ + rw!("commute-add"; "(+ ?x ?y)" => "(+ ?y ?x)"), + rw!("commute-mul"; "(* ?x ?y)" => "(* ?y ?x)"), + rw!("commute-and"; "(and ?x ?y)" => "(and ?y ?x)"), + rw!("commute-or"; "(or ?x ?y)" => "(or ?y ?x)"), + rw!("commute-eq"; "(= ?x ?y)" => "(= ?y ?x)"), + rw!("commute-neq"; "(<> ?x ?y)" => "(<> ?y ?x)"), + rw!("converse-gt"; "(> ?x ?y)"=> "(< ?y ?x)"), + rw!("converse-gte"; "(>= ?x ?y)"=> "(<= ?y ?x)"), + rw!("converse-lt"; "(< ?x ?y)"=> "(> ?y ?x)"), + rw!("converse-lte"; "(<= ?x ?y)"=> "(>= ?x ?y)"), + rw!("add-0"; "(+ ?x 0)" => "?x"), + rw!("add-assoc"; "(+ (+ ?a ?b) ?c)" => "(+ ?a (+ ?b ?c))"), + rw!("minus-0"; "(- ?x 0)" => "?x"), + rw!("mul-0"; "(* ?x 0)" => "0"), + rw!("mul-1"; "(* ?x 1)" => "?x"), + rw!("div-1"; "(/ ?x 1)" => "?x"), + rw!("dist-and-or"; "(or (and ?a ?b) (and ?a ?c))" => "(and ?a (or ?b ?c))"), + rw!("dist-or-and"; "(and (or ?a ?b) (or ?a ?c))" => "(or ?a (and ?b ?c))"), + rw!("not-not"; "(not (not ?x))" => "?x"), + rw!("or-same"; "(or ?x ?x)" => "?x"), + rw!("and-same"; "(and ?x ?x)" => "?x"), + rw!("cancel-sub"; "(- ?a ?a)" => "0"), + rw!("and-true"; "(and true ?x)"=> "?x"), + rw!("0-minus"; "(- 0 ?x)"=> "(negative ?x)"), + rw!("and-false"; "(and false ?x)"=> "false"), + rw!("or-false"; "(or false ?x)"=> "?x"), + rw!("or-true"; "(or true ?x)"=> "true"), + ]; +} + +define_language! { + /// Supported expressions in Tokomak + pub enum TokomakExpr { + "+" = Plus([Id; 2]), + "-" = Minus([Id; 2]), + "*" = Multiply([Id; 2]), + "/" = Divide([Id; 2]), + "%" = Modulus([Id; 2]), + "not" = Not(Id), + "or" = Or([Id; 2]), + "and" = And([Id; 2]), + "=" = Eq([Id; 2]), + "<>" = NotEq([Id; 2]), + "<" = Lt([Id; 2]), + "<=" = LtEq([Id; 2]), + ">" = Gt([Id; 2]), + ">=" = GtEq([Id; 2]), + + "is_not_null" = IsNotNull(Id), + "is_null" = IsNull(Id), + "negative" = Negative(Id), + "between" = Between([Id; 3]), + "between_inverted" = BetweenInverted([Id; 3]), + "like" = Like([Id; 2]), + "not_like" = NotLike([Id; 2]), + Bool(bool), + Int64(i64), + Utf8(String), + LargeUtf8(String), + Column(Symbol), + } +} + +pub fn to_tokomak_expr(rec_expr: &mut RecExpr, expr: Expr) -> Option { + match expr { + Expr::BinaryExpr { left, op, right } => { + let left = to_tokomak_expr(rec_expr, *left)?; + let right = to_tokomak_expr(rec_expr, *right)?; + let binary_expr = match op { + Operator::Eq => TokomakExpr::Eq, + Operator::NotEq => TokomakExpr::NotEq, + Operator::Lt => TokomakExpr::Lt, + Operator::LtEq => TokomakExpr::LtEq, + Operator::Gt => TokomakExpr::Gt, + Operator::GtEq => TokomakExpr::GtEq, + Operator::Plus => TokomakExpr::Plus, + Operator::Minus => TokomakExpr::Minus, + Operator::Multiply => TokomakExpr::Multiply, + Operator::Divide => TokomakExpr::Divide, + Operator::Modulus => TokomakExpr::Modulus, + Operator::And => TokomakExpr::And, + Operator::Or => TokomakExpr::Or, + Operator::Like => TokomakExpr::Like, + Operator::NotLike => TokomakExpr::NotLike, + }; + Some(rec_expr.add(binary_expr([left, right]))) + } + Expr::Column(c) => Some(rec_expr.add(TokomakExpr::Column(Symbol::from(c)))), + Expr::Literal(ScalarValue::Int64(Some(x))) => Some(rec_expr.add(TokomakExpr::Int64(x))), + Expr::Literal(ScalarValue::Utf8(Some(x))) => Some(rec_expr.add(TokomakExpr::Utf8(x))), + Expr::Literal(ScalarValue::LargeUtf8(Some(x))) => { + Some(rec_expr.add(TokomakExpr::LargeUtf8(x))) + } + Expr::Literal(ScalarValue::Boolean(Some(x))) => { + Some(rec_expr.add(TokomakExpr::Bool(x))) + } + Expr::Not(expr) => { + let left = to_tokomak_expr(rec_expr, *expr)?; + Some(rec_expr.add(TokomakExpr::Not(left))) + } + Expr::IsNull(expr) => { + let left = to_tokomak_expr(rec_expr, *expr)?; + Some(rec_expr.add(TokomakExpr::IsNull(left))) + } + Expr::IsNotNull(expr) => { + let left = to_tokomak_expr(rec_expr, *expr)?; + Some(rec_expr.add(TokomakExpr::IsNotNull(left))) + } + + // not yet supported + _ => None, + } +} + +fn to_exprs(rec_expr: &RecExpr, id: Id) -> Expr { + let refs = rec_expr.as_ref(); + let index: usize = id.into(); + match refs[index] { + TokomakExpr::Plus(ids) => { + let l = to_exprs(&rec_expr, ids[0]); + let r = to_exprs(&rec_expr, ids[1]); + + Expr::BinaryExpr { + left: Box::new(l), + op: Operator::Plus, + right: Box::new(r), + } + } + TokomakExpr::Minus(ids) => { + let l = to_exprs(&rec_expr, ids[0]); + let r = to_exprs(&rec_expr, ids[1]); + + Expr::BinaryExpr { + left: Box::new(l), + op: Operator::Minus, + right: Box::new(r), + } + } + TokomakExpr::Divide(ids) => { + let l = to_exprs(&rec_expr, ids[0]); + let r = to_exprs(&rec_expr, ids[1]); + + Expr::BinaryExpr { + left: Box::new(l), + op: Operator::Divide, + right: Box::new(r), + } + } + TokomakExpr::Modulus(ids) => { + let l = to_exprs(&rec_expr, ids[0]); + let r = to_exprs(&rec_expr, ids[1]); + + Expr::BinaryExpr { + left: Box::new(l), + op: Operator::Modulus, + right: Box::new(r), + } + } + TokomakExpr::Not(id) => { + let l = to_exprs(&rec_expr, id); + Expr::Not(Box::new(l)) + } + TokomakExpr::IsNotNull(id) => { + let l = to_exprs(&rec_expr, id); + Expr::IsNotNull(Box::new(l)) + } + TokomakExpr::IsNull(id) => { + let l = to_exprs(&rec_expr, id); + Expr::IsNull(Box::new(l)) + } + TokomakExpr::Negative(id) => { + let l = to_exprs(&rec_expr, id); + Expr::Negative(Box::new(l)) + } + + TokomakExpr::Between([expr, low, high]) => { + let left = to_exprs(&rec_expr, expr); + let low_expr = to_exprs(&rec_expr, low); + let high_expr = to_exprs(&rec_expr, high); + + Expr::Between{ + expr: Box::new(left), + negated: false, + low: Box::new(low_expr), + high: Box::new(high_expr), + } + } + TokomakExpr::BetweenInverted([expr, low, high]) => { + let left = to_exprs(&rec_expr, expr); + let low_expr = to_exprs(&rec_expr, low); + let high_expr = to_exprs(&rec_expr, high); + + Expr::Between{ + expr: Box::new(left), + negated: false, + low: Box::new(low_expr), + high: Box::new(high_expr), + } + } + TokomakExpr::Multiply(ids) => { + let l = to_exprs(&rec_expr, ids[0]); + let r = to_exprs(&rec_expr, ids[1]); + + Expr::BinaryExpr { + left: Box::new(l), + op: Operator::Multiply, + right: Box::new(r), + } + } + TokomakExpr::Or(ids) => { + let l = to_exprs(&rec_expr, ids[0]); + let r = to_exprs(&rec_expr, ids[1]); + + Expr::BinaryExpr { + left: Box::new(l), + op: Operator::Or, + right: Box::new(r), + } + } + TokomakExpr::And(ids) => { + let l = to_exprs(&rec_expr, ids[0]); + let r = to_exprs(&rec_expr, ids[1]); + + Expr::BinaryExpr { + left: Box::new(l), + op: Operator::And, + right: Box::new(r), + } + } + TokomakExpr::Eq(ids) => { + let l = to_exprs(&rec_expr, ids[0]); + let r = to_exprs(&rec_expr, ids[1]); + + Expr::BinaryExpr { + left: Box::new(l), + op: Operator::Eq, + right: Box::new(r), + } + } + TokomakExpr::NotEq(ids) => { + let l = to_exprs(&rec_expr, ids[0]); + let r = to_exprs(&rec_expr, ids[1]); + + Expr::BinaryExpr { + left: Box::new(l), + op: Operator::NotEq, + right: Box::new(r), + } + } + TokomakExpr::Lt(ids) => { + let l = to_exprs(&rec_expr, ids[0]); + let r = to_exprs(&rec_expr, ids[1]); + + Expr::BinaryExpr { + left: Box::new(l), + op: Operator::Lt, + right: Box::new(r), + } + } + TokomakExpr::LtEq(ids) => { + let l = to_exprs(&rec_expr, ids[0]); + let r = to_exprs(&rec_expr, ids[1]); + + Expr::BinaryExpr { + left: Box::new(l), + op: Operator::LtEq, + right: Box::new(r), + } + } + TokomakExpr::Gt(ids) => { + let l = to_exprs(&rec_expr, ids[0]); + let r = to_exprs(&rec_expr, ids[1]); + + Expr::BinaryExpr { + left: Box::new(l), + op: Operator::Gt, + right: Box::new(r), + } + } + TokomakExpr::GtEq(ids) => { + let l = to_exprs(&rec_expr, ids[0]); + let r = to_exprs(&rec_expr, ids[1]); + + Expr::BinaryExpr { + left: Box::new(l), + op: Operator::GtEq, + right: Box::new(r), + } + } + TokomakExpr::Like(ids) => { + let l = to_exprs(&rec_expr, ids[0]); + let r = to_exprs(&rec_expr, ids[1]); + + Expr::BinaryExpr { + left: Box::new(l), + op: Operator::Like, + right: Box::new(r), + } + } + TokomakExpr::NotLike(ids) => { + let l = to_exprs(&rec_expr, ids[0]); + let r = to_exprs(&rec_expr, ids[1]); + + Expr::BinaryExpr { + left: Box::new(l), + op: Operator::NotLike, + right: Box::new(r), + } + } + + TokomakExpr::Int64(i) => Expr::Literal(ScalarValue::Int64(Some(i))), + TokomakExpr::Utf8(ref i) => Expr::Literal(ScalarValue::Utf8(Some(i.clone()))), + TokomakExpr::LargeUtf8(ref i) => Expr::Literal(ScalarValue::LargeUtf8(Some(i.clone()))), + TokomakExpr::Column(col) => Expr::Column(col.to_string()), + TokomakExpr::Bool(b) => { + Expr::Literal(ScalarValue::Boolean(Some(b))) + } + } +} + +impl OptimizerRule for Tokomak { + fn optimize(&self, plan: &LogicalPlan, props: &ExecutionProps) -> DFResult { + let inputs = plan.inputs(); + let new_inputs: Vec = inputs + .iter() + .map(|plan| self.optimize(plan, props)) + .collect::>>()?; + // optimize all expressions individual (for now) + let mut exprs = vec![]; + for expr in plan.expressions().iter() { + let rec_expr = &mut RecExpr::default(); + let tok_expr = to_tokomak_expr(rec_expr, expr.clone()); + match tok_expr { + None => exprs.push(expr.clone()), + Some(_expr) => { + let runner = Runner::::default() + .with_expr(rec_expr) + .run(&rules()); + + let mut extractor = Extractor::new(&runner.egraph, AstSize); + let (_, best_expr) = extractor.find_best(runner.roots[0]); + let start = best_expr.as_ref().len() - 1; + exprs.push(to_exprs(&best_expr, start.into()).clone()); + } + } + } + + utils::from_plan(plan, &exprs, &new_inputs) + } + + fn name(&self) -> &str { + "tokomak" + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::prelude::{CsvReadOptions, ExecutionConfig, ExecutionContext}; + use egg::Runner; + + #[test] + fn test_add_0() { + let expr = "(+ 0 (x))".parse().unwrap(); + let runner = Runner::::default() + .with_expr(&expr) + .run(&rules()); + + let mut extractor = Extractor::new(&runner.egraph, AstSize); + + let (_best_cost, best_expr) = extractor.find_best(runner.roots[0]); + + assert_eq!(format!("{}", best_expr), "x") + } + + #[test] + fn test_dist_and_or() { + let expr = "(or (or (and (= 1 2) foo) (and (= 1 2) bar)) (and (= 1 2) boo))" + .parse() + .unwrap(); + let runner = Runner::::default() + .with_expr(&expr) + .run(&rules()); + + let mut extractor = Extractor::new(&runner.egraph, AstSize); + + let (_, best_expr) = extractor.find_best(runner.roots[0]); + + assert_eq!( + format!("{}", best_expr), + "(and (= 1 2) (or boo (or foo bar)))" + ) + } + + #[tokio::test] + async fn custom_optimizer() { + // register custom tokomak optimizer, verify that optimization took place + + let mut ctx = ExecutionContext::with_config( + ExecutionConfig::new().add_optimizer_rule(Arc::new(Tokomak::new())), + ); + + ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()) + .unwrap(); + + // create a plan to run a SQL query + let lp = ctx + .sql("SELECT price*0-price from example") + .unwrap() + .to_logical_plan(); + + assert_eq!( + format!("{}", lp.display_indent()), + "Projection: (- #price)\ + \n TableScan: example projection=Some([0])" + ) + } + + #[tokio::test] + async fn custom_optimizer_filter() { + // register custom tokomak optimizer, verify that optimization took place + + let mut ctx = ExecutionContext::with_config( + ExecutionConfig::new().add_optimizer_rule(Arc::new(Tokomak::new())), + ); + ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()) + .unwrap(); + + // create a plan to run a SQL query + let lp = ctx + .sql("SELECT price from example WHERE (price=1 AND price=2) OR (price=1 AND price=3)") + .unwrap() + .to_logical_plan(); + + assert_eq!( + format!("{}", lp.display_indent()), + "Filter: #price Eq Int64(1) And #price Eq Int64(2) Or #price Eq Int64(3)\ + \n TableScan: example projection=Some([0])" + ) + } +} From e1a982bc988afb519fe0e49b6b2ac6346296aec6 Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Sat, 29 May 2021 19:26:54 +0200 Subject: [PATCH 02/13] Add tokomak optimizer to datafusion --- datafusion/src/optimizer/simplification.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/datafusion/src/optimizer/simplification.rs b/datafusion/src/optimizer/simplification.rs index 51c8a5db1be4..778c6c3e5f4e 100644 --- a/datafusion/src/optimizer/simplification.rs +++ b/datafusion/src/optimizer/simplification.rs @@ -1,3 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Expression simplification optimizer. use std::vec; use crate::{ From 23736d95c96655ff756e263cac88aa08b091048a Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Sat, 29 May 2021 19:33:10 +0200 Subject: [PATCH 03/13] docs --- datafusion/src/optimizer/simplification.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/datafusion/src/optimizer/simplification.rs b/datafusion/src/optimizer/simplification.rs index 778c6c3e5f4e..569784fbc51b 100644 --- a/datafusion/src/optimizer/simplification.rs +++ b/datafusion/src/optimizer/simplification.rs @@ -16,6 +16,7 @@ // under the License. //! Expression simplification optimizer. +//! Rewrites expressions using equivalence rules and the egg optimization library use std::vec; use crate::{ @@ -28,17 +29,17 @@ use crate::logical_plan::Expr; use crate::execution::context::ExecutionProps; use egg::{rewrite as rw, *}; -pub struct Tokomak {} +pub struct ExprSimplifier {} -impl Tokomak { +impl ExprSimplifier { #[allow(missing_docs)] pub fn new() -> Self { Self {} } } -pub type EGraph = egg::EGraph; +pub type EGraph = egg::EGraph; -pub fn rules() -> Vec> { +pub fn rules() -> Vec> { return vec![ rw!("commute-add"; "(+ ?x ?y)" => "(+ ?y ?x)"), rw!("commute-mul"; "(* ?x ?y)" => "(* ?y ?x)"), @@ -71,8 +72,8 @@ pub fn rules() -> Vec> { } define_language! { - /// Supported expressions in Tokomak - pub enum TokomakExpr { + /// Supported expressions in ExprSimplifier + pub enum ExprSimplifierExpr { "+" = Plus([Id; 2]), "-" = Minus([Id; 2]), "*" = Multiply([Id; 2]), @@ -103,7 +104,7 @@ define_language! { } } -pub fn to_tokomak_expr(rec_expr: &mut RecExpr, expr: Expr) -> Option { +pub fn to_tokomak_expr(rec_expr: &mut RecExpr, expr: Expr) -> Option { match expr { Expr::BinaryExpr { left, op, right } => { let left = to_tokomak_expr(rec_expr, *left)?; From d0f441b5864670b0716006f4a056109b4150e5c1 Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Sat, 29 May 2021 19:35:34 +0200 Subject: [PATCH 04/13] Remove csv tests --- datafusion/src/optimizer/simplification.rs | 49 +--------------------- 1 file changed, 1 insertion(+), 48 deletions(-) diff --git a/datafusion/src/optimizer/simplification.rs b/datafusion/src/optimizer/simplification.rs index 569784fbc51b..8ffe3a0589f8 100644 --- a/datafusion/src/optimizer/simplification.rs +++ b/datafusion/src/optimizer/simplification.rs @@ -16,7 +16,7 @@ // under the License. //! Expression simplification optimizer. -//! Rewrites expressions using equivalence rules and the egg optimization library +//! Rewrites expressions using equivalence rules and the egg optimization library use std::vec; use crate::{ @@ -436,51 +436,4 @@ mod tests { "(and (= 1 2) (or boo (or foo bar)))" ) } - - #[tokio::test] - async fn custom_optimizer() { - // register custom tokomak optimizer, verify that optimization took place - - let mut ctx = ExecutionContext::with_config( - ExecutionConfig::new().add_optimizer_rule(Arc::new(Tokomak::new())), - ); - - ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()) - .unwrap(); - - // create a plan to run a SQL query - let lp = ctx - .sql("SELECT price*0-price from example") - .unwrap() - .to_logical_plan(); - - assert_eq!( - format!("{}", lp.display_indent()), - "Projection: (- #price)\ - \n TableScan: example projection=Some([0])" - ) - } - - #[tokio::test] - async fn custom_optimizer_filter() { - // register custom tokomak optimizer, verify that optimization took place - - let mut ctx = ExecutionContext::with_config( - ExecutionConfig::new().add_optimizer_rule(Arc::new(Tokomak::new())), - ); - ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()) - .unwrap(); - - // create a plan to run a SQL query - let lp = ctx - .sql("SELECT price from example WHERE (price=1 AND price=2) OR (price=1 AND price=3)") - .unwrap() - .to_logical_plan(); - - assert_eq!( - format!("{}", lp.display_indent()), - "Filter: #price Eq Int64(1) And #price Eq Int64(2) Or #price Eq Int64(3)\ - \n TableScan: example projection=Some([0])" - ) - } } From 4357a512646cca3ad76a788bca09e4ba0b100b62 Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Sat, 29 May 2021 19:52:56 +0200 Subject: [PATCH 05/13] undo --- datafusion/src/optimizer/simplification.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/src/optimizer/simplification.rs b/datafusion/src/optimizer/simplification.rs index 8ffe3a0589f8..a7c6e9550625 100644 --- a/datafusion/src/optimizer/simplification.rs +++ b/datafusion/src/optimizer/simplification.rs @@ -29,17 +29,17 @@ use crate::logical_plan::Expr; use crate::execution::context::ExecutionProps; use egg::{rewrite as rw, *}; -pub struct ExprSimplifier {} +pub struct Tokomak {} -impl ExprSimplifier { +impl Tokomak { #[allow(missing_docs)] pub fn new() -> Self { Self {} } } -pub type EGraph = egg::EGraph; +pub type EGraph = egg::EGraph; -pub fn rules() -> Vec> { +pub fn rules() -> Vec> { return vec![ rw!("commute-add"; "(+ ?x ?y)" => "(+ ?y ?x)"), rw!("commute-mul"; "(* ?x ?y)" => "(* ?y ?x)"), @@ -73,7 +73,7 @@ pub fn rules() -> Vec> { define_language! { /// Supported expressions in ExprSimplifier - pub enum ExprSimplifierExpr { + pub enum TokomakExpr { "+" = Plus([Id; 2]), "-" = Minus([Id; 2]), "*" = Multiply([Id; 2]), From 694d903def6a376ba684eb63763bd6c62c7489af Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Sat, 29 May 2021 19:56:08 +0200 Subject: [PATCH 06/13] Clean --- datafusion/src/optimizer/simplification.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/src/optimizer/simplification.rs b/datafusion/src/optimizer/simplification.rs index a7c6e9550625..9ae1dff1c9a6 100644 --- a/datafusion/src/optimizer/simplification.rs +++ b/datafusion/src/optimizer/simplification.rs @@ -104,7 +104,7 @@ define_language! { } } -pub fn to_tokomak_expr(rec_expr: &mut RecExpr, expr: Expr) -> Option { +pub fn to_tokomak_expr(rec_expr: &mut RecExpr, expr: Expr) -> Option { match expr { Expr::BinaryExpr { left, op, right } => { let left = to_tokomak_expr(rec_expr, *left)?; @@ -401,7 +401,6 @@ mod tests { use std::sync::Arc; use super::*; - use crate::prelude::{CsvReadOptions, ExecutionConfig, ExecutionContext}; use egg::Runner; #[test] From 26b9ea9a65f9ae42def7b5feb5225f3d7b1cc382 Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Sat, 29 May 2021 19:56:41 +0200 Subject: [PATCH 07/13] fmt --- datafusion/src/execution/context.rs | 3 ++- datafusion/src/optimizer/simplification.rs | 28 ++++++++++++++-------- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 8a005ad9e3a3..b6b6566af00a 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -22,7 +22,8 @@ use crate::{ information_schema::CatalogWithInformationSchema, }, optimizer::{ - eliminate_limit::EliminateLimit, hash_build_probe_order::HashBuildProbeOrder,simplification::Tokomak + eliminate_limit::EliminateLimit, hash_build_probe_order::HashBuildProbeOrder, + simplification::Tokomak, }, physical_optimizer::optimizer::PhysicalOptimizerRule, }; diff --git a/datafusion/src/optimizer/simplification.rs b/datafusion/src/optimizer/simplification.rs index 9ae1dff1c9a6..a0f863b1ee16 100644 --- a/datafusion/src/optimizer/simplification.rs +++ b/datafusion/src/optimizer/simplification.rs @@ -25,8 +25,8 @@ use crate::{ use crate::{logical_plan::Operator, optimizer::utils}; use crate::error::Result as DFResult; -use crate::logical_plan::Expr; use crate::execution::context::ExecutionProps; +use crate::logical_plan::Expr; use egg::{rewrite as rw, *}; pub struct Tokomak {} @@ -129,8 +129,12 @@ pub fn to_tokomak_expr(rec_expr: &mut RecExpr, expr: Expr) -> Optio Some(rec_expr.add(binary_expr([left, right]))) } Expr::Column(c) => Some(rec_expr.add(TokomakExpr::Column(Symbol::from(c)))), - Expr::Literal(ScalarValue::Int64(Some(x))) => Some(rec_expr.add(TokomakExpr::Int64(x))), - Expr::Literal(ScalarValue::Utf8(Some(x))) => Some(rec_expr.add(TokomakExpr::Utf8(x))), + Expr::Literal(ScalarValue::Int64(Some(x))) => { + Some(rec_expr.add(TokomakExpr::Int64(x))) + } + Expr::Literal(ScalarValue::Utf8(Some(x))) => { + Some(rec_expr.add(TokomakExpr::Utf8(x))) + } Expr::Literal(ScalarValue::LargeUtf8(Some(x))) => { Some(rec_expr.add(TokomakExpr::LargeUtf8(x))) } @@ -221,7 +225,7 @@ fn to_exprs(rec_expr: &RecExpr, id: Id) -> Expr { let low_expr = to_exprs(&rec_expr, low); let high_expr = to_exprs(&rec_expr, high); - Expr::Between{ + Expr::Between { expr: Box::new(left), negated: false, low: Box::new(low_expr), @@ -233,7 +237,7 @@ fn to_exprs(rec_expr: &RecExpr, id: Id) -> Expr { let low_expr = to_exprs(&rec_expr, low); let high_expr = to_exprs(&rec_expr, high); - Expr::Between{ + Expr::Between { expr: Box::new(left), negated: false, low: Box::new(low_expr), @@ -353,16 +357,20 @@ fn to_exprs(rec_expr: &RecExpr, id: Id) -> Expr { TokomakExpr::Int64(i) => Expr::Literal(ScalarValue::Int64(Some(i))), TokomakExpr::Utf8(ref i) => Expr::Literal(ScalarValue::Utf8(Some(i.clone()))), - TokomakExpr::LargeUtf8(ref i) => Expr::Literal(ScalarValue::LargeUtf8(Some(i.clone()))), - TokomakExpr::Column(col) => Expr::Column(col.to_string()), - TokomakExpr::Bool(b) => { - Expr::Literal(ScalarValue::Boolean(Some(b))) + TokomakExpr::LargeUtf8(ref i) => { + Expr::Literal(ScalarValue::LargeUtf8(Some(i.clone()))) } + TokomakExpr::Column(col) => Expr::Column(col.to_string()), + TokomakExpr::Bool(b) => Expr::Literal(ScalarValue::Boolean(Some(b))), } } impl OptimizerRule for Tokomak { - fn optimize(&self, plan: &LogicalPlan, props: &ExecutionProps) -> DFResult { + fn optimize( + &self, + plan: &LogicalPlan, + props: &ExecutionProps, + ) -> DFResult { let inputs = plan.inputs(); let new_inputs: Vec = inputs .iter() From a67feb06dd4444a8c50cd2aac9fa46c1f5ffeea5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 29 May 2021 22:54:46 +0200 Subject: [PATCH 08/13] Fix converse --- datafusion/src/optimizer/simplification.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/optimizer/simplification.rs b/datafusion/src/optimizer/simplification.rs index a0f863b1ee16..4e24b45867c6 100644 --- a/datafusion/src/optimizer/simplification.rs +++ b/datafusion/src/optimizer/simplification.rs @@ -50,7 +50,7 @@ pub fn rules() -> Vec> { rw!("converse-gt"; "(> ?x ?y)"=> "(< ?y ?x)"), rw!("converse-gte"; "(>= ?x ?y)"=> "(<= ?y ?x)"), rw!("converse-lt"; "(< ?x ?y)"=> "(> ?y ?x)"), - rw!("converse-lte"; "(<= ?x ?y)"=> "(>= ?x ?y)"), + rw!("converse-lte"; "(<= ?x ?y)"=> "(>= ?y ?x)"), rw!("add-0"; "(+ ?x 0)" => "?x"), rw!("add-assoc"; "(+ (+ ?a ?b) ?c)" => "(+ ?a (+ ?b ?c))"), rw!("minus-0"; "(- ?x 0)" => "?x"), From 7eac951a0dcc8fdec6c49a8dfbce08ee7cbfb9bd Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Sun, 30 May 2021 11:27:03 +0200 Subject: [PATCH 09/13] Cleanup, add commute-eq --- datafusion/src/optimizer/simplification.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/datafusion/src/optimizer/simplification.rs b/datafusion/src/optimizer/simplification.rs index a0f863b1ee16..03028ca7954e 100644 --- a/datafusion/src/optimizer/simplification.rs +++ b/datafusion/src/optimizer/simplification.rs @@ -29,6 +29,7 @@ use crate::execution::context::ExecutionProps; use crate::logical_plan::Expr; use egg::{rewrite as rw, *}; +/// Tokomak optimization rule pub struct Tokomak {} impl Tokomak { @@ -37,20 +38,20 @@ impl Tokomak { Self {} } } -pub type EGraph = egg::EGraph; -pub fn rules() -> Vec> { +fn rules() -> Vec> { return vec![ rw!("commute-add"; "(+ ?x ?y)" => "(+ ?y ?x)"), rw!("commute-mul"; "(* ?x ?y)" => "(* ?y ?x)"), + rw!("commute-eq"; "(= ?x ?y)" => "(= ?y ?x)"), rw!("commute-and"; "(and ?x ?y)" => "(and ?y ?x)"), rw!("commute-or"; "(or ?x ?y)" => "(or ?y ?x)"), rw!("commute-eq"; "(= ?x ?y)" => "(= ?y ?x)"), rw!("commute-neq"; "(<> ?x ?y)" => "(<> ?y ?x)"), - rw!("converse-gt"; "(> ?x ?y)"=> "(< ?y ?x)"), - rw!("converse-gte"; "(>= ?x ?y)"=> "(<= ?y ?x)"), - rw!("converse-lt"; "(< ?x ?y)"=> "(> ?y ?x)"), - rw!("converse-lte"; "(<= ?x ?y)"=> "(>= ?x ?y)"), + rw!("converse-gt"; "(> ?x ?y)" => "(< ?y ?x)"), + rw!("converse-gte"; "(>= ?x ?y)" => "(<= ?y ?x)"), + rw!("converse-lt"; "(< ?x ?y)" => "(> ?y ?x)"), + rw!("converse-lte"; "(<= ?x ?y)" => "(>= ?x ?y)"), rw!("add-0"; "(+ ?x 0)" => "?x"), rw!("add-assoc"; "(+ (+ ?a ?b) ?c)" => "(+ ?a (+ ?b ?c))"), rw!("minus-0"; "(- ?x 0)" => "?x"), @@ -73,7 +74,7 @@ pub fn rules() -> Vec> { define_language! { /// Supported expressions in ExprSimplifier - pub enum TokomakExpr { + enum TokomakExpr { "+" = Plus([Id; 2]), "-" = Minus([Id; 2]), "*" = Multiply([Id; 2]), @@ -104,7 +105,7 @@ define_language! { } } -pub fn to_tokomak_expr(rec_expr: &mut RecExpr, expr: Expr) -> Option { +fn to_tokomak_expr(rec_expr: &mut RecExpr, expr: Expr) -> Option { match expr { Expr::BinaryExpr { left, op, right } => { let left = to_tokomak_expr(rec_expr, *left)?; @@ -406,7 +407,6 @@ impl OptimizerRule for Tokomak { #[cfg(test)] mod tests { - use std::sync::Arc; use super::*; use egg::Runner; From 39b3e05d53ab77fbc8ec982d2f0500f0e78fdfcf Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Sun, 30 May 2021 11:48:41 +0200 Subject: [PATCH 10/13] Support date literals --- datafusion/src/optimizer/simplification.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/datafusion/src/optimizer/simplification.rs b/datafusion/src/optimizer/simplification.rs index da07b53570da..76c93b6469ad 100644 --- a/datafusion/src/optimizer/simplification.rs +++ b/datafusion/src/optimizer/simplification.rs @@ -100,6 +100,8 @@ define_language! { Bool(bool), Int64(i64), Utf8(String), + Date32(i32), + Date64(i64), LargeUtf8(String), Column(Symbol), } @@ -142,6 +144,12 @@ fn to_tokomak_expr(rec_expr: &mut RecExpr, expr: Expr) -> Option { Some(rec_expr.add(TokomakExpr::Bool(x))) } + Expr::Literal(ScalarValue::Date32(Some(x))) => { + Some(rec_expr.add(TokomakExpr::Date32(x))) + } + Expr::Literal(ScalarValue::Date64(Some(x))) => { + Some(rec_expr.add(TokomakExpr::Date64(x))) + } Expr::Not(expr) => { let left = to_tokomak_expr(rec_expr, *expr)?; Some(rec_expr.add(TokomakExpr::Not(left))) @@ -363,6 +371,8 @@ fn to_exprs(rec_expr: &RecExpr, id: Id) -> Expr { } TokomakExpr::Column(col) => Expr::Column(col.to_string()), TokomakExpr::Bool(b) => Expr::Literal(ScalarValue::Boolean(Some(b))), + TokomakExpr::Date32(b) => Expr::Literal(ScalarValue::Date32(Some(b))), + TokomakExpr::Date64(b) => Expr::Literal(ScalarValue::Date64(Some(b))), } } From dfb28abfa5e6d1397175431cd7575bf7e61bd709 Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Sun, 30 May 2021 12:39:18 +0200 Subject: [PATCH 11/13] Add between support --- datafusion/src/optimizer/simplification.rs | 31 +++++++++++++++++----- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/datafusion/src/optimizer/simplification.rs b/datafusion/src/optimizer/simplification.rs index 76c93b6469ad..d5f090bfaba7 100644 --- a/datafusion/src/optimizer/simplification.rs +++ b/datafusion/src/optimizer/simplification.rs @@ -151,16 +151,35 @@ fn to_tokomak_expr(rec_expr: &mut RecExpr, expr: Expr) -> Option { - let left = to_tokomak_expr(rec_expr, *expr)?; - Some(rec_expr.add(TokomakExpr::Not(left))) + let e = to_tokomak_expr(rec_expr, *expr)?; + Some(rec_expr.add(TokomakExpr::Not(e))) } Expr::IsNull(expr) => { - let left = to_tokomak_expr(rec_expr, *expr)?; - Some(rec_expr.add(TokomakExpr::IsNull(left))) + let e = to_tokomak_expr(rec_expr, *expr)?; + Some(rec_expr.add(TokomakExpr::IsNull(e))) } Expr::IsNotNull(expr) => { - let left = to_tokomak_expr(rec_expr, *expr)?; - Some(rec_expr.add(TokomakExpr::IsNotNull(left))) + let e = to_tokomak_expr(rec_expr, *expr)?; + Some(rec_expr.add(TokomakExpr::IsNotNull(e))) + } + Expr::Negative(expr) => { + let e = to_tokomak_expr(rec_expr, *expr)?; + Some(rec_expr.add(TokomakExpr::Negative(e))) + } + Expr::Between { + expr, + negated, + low, + high, + } => { + let e = to_tokomak_expr(rec_expr, *expr)?; + let low = to_tokomak_expr(rec_expr, *low)?; + let high = to_tokomak_expr(rec_expr, *high)?; + if negated { + Some(rec_expr.add(TokomakExpr::BetweenInverted([e, low, high]))) + } else { + Some(rec_expr.add(TokomakExpr::Between([e, low, high]))) + } } // not yet supported From ecf95c5549772ec9c660889638f6ff941d7de277 Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Sun, 30 May 2021 12:41:25 +0200 Subject: [PATCH 12/13] Remove rules that could be wrong in presence of nulls --- datafusion/src/optimizer/simplification.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/src/optimizer/simplification.rs b/datafusion/src/optimizer/simplification.rs index d5f090bfaba7..e8d6fff09e8e 100644 --- a/datafusion/src/optimizer/simplification.rs +++ b/datafusion/src/optimizer/simplification.rs @@ -55,7 +55,6 @@ fn rules() -> Vec> { rw!("add-0"; "(+ ?x 0)" => "?x"), rw!("add-assoc"; "(+ (+ ?a ?b) ?c)" => "(+ ?a (+ ?b ?c))"), rw!("minus-0"; "(- ?x 0)" => "?x"), - rw!("mul-0"; "(* ?x 0)" => "0"), rw!("mul-1"; "(* ?x 1)" => "?x"), rw!("div-1"; "(/ ?x 1)" => "?x"), rw!("dist-and-or"; "(or (and ?a ?b) (and ?a ?c))" => "(and ?a (or ?b ?c))"), @@ -63,7 +62,6 @@ fn rules() -> Vec> { rw!("not-not"; "(not (not ?x))" => "?x"), rw!("or-same"; "(or ?x ?x)" => "?x"), rw!("and-same"; "(and ?x ?x)" => "?x"), - rw!("cancel-sub"; "(- ?a ?a)" => "0"), rw!("and-true"; "(and true ?x)"=> "?x"), rw!("0-minus"; "(- 0 ?x)"=> "(negative ?x)"), rw!("and-false"; "(and false ?x)"=> "false"), From cad2aae8bb88fe245ea7a05fd9791be4ed3990d7 Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Mon, 31 May 2021 22:35:51 +0200 Subject: [PATCH 13/13] Casting --- datafusion/src/optimizer/simplification.rs | 77 +++++++++++++++++++++- 1 file changed, 74 insertions(+), 3 deletions(-) diff --git a/datafusion/src/optimizer/simplification.rs b/datafusion/src/optimizer/simplification.rs index e8d6fff09e8e..361b3044544f 100644 --- a/datafusion/src/optimizer/simplification.rs +++ b/datafusion/src/optimizer/simplification.rs @@ -17,8 +17,12 @@ //! Expression simplification optimizer. //! Rewrites expressions using equivalence rules and the egg optimization library +use std::fmt::Display; +use std::str::FromStr; use std::vec; - +use arrow::datatypes::DataType; +use log::debug; +use crate::error::DataFusionError; use crate::{ logical_plan::LogicalPlan, optimizer::optimizer::OptimizerRule, scalar::ScalarValue, }; @@ -43,7 +47,6 @@ fn rules() -> Vec> { return vec![ rw!("commute-add"; "(+ ?x ?y)" => "(+ ?y ?x)"), rw!("commute-mul"; "(* ?x ?y)" => "(* ?y ?x)"), - rw!("commute-eq"; "(= ?x ?y)" => "(= ?y ?x)"), rw!("commute-and"; "(and ?x ?y)" => "(and ?y ?x)"), rw!("commute-or"; "(or ?x ?y)" => "(or ?y ?x)"), rw!("commute-eq"; "(= ?x ?y)" => "(= ?y ?x)"), @@ -70,6 +73,32 @@ fn rules() -> Vec> { ]; } +define_language! { + enum TokomakDataType { + "date32" = Date32, + "date64" = Date64, + } +} + +impl Display for TokomakDataType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("{:?}", self)) + } +} + + +impl FromStr for TokomakDataType { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result { + match s { + "date32" => Ok(TokomakDataType::Date32), + "date64" => Ok(TokomakDataType::Date64), + _ => Err(DataFusionError::Internal("Parsing string as TokomakDataType failed".to_string())) + } + } +} + define_language! { /// Supported expressions in ExprSimplifier enum TokomakExpr { @@ -102,6 +131,9 @@ define_language! { Date64(i64), LargeUtf8(String), Column(Symbol), + // cast id as expr. Type is encoded as symbol + "cast" = Cast([Id; 2]), + Type(TokomakDataType), } } @@ -180,8 +212,30 @@ fn to_tokomak_expr(rec_expr: &mut RecExpr, expr: Expr) -> Option { + let ty = match data_type { + DataType::Date32 => TokomakDataType::Date32, + DataType::Date64 => TokomakDataType::Date64, + _ => { + debug!("Datetype not yet supported for Cast in tokomak optimizer {:?}", data_type); + + return None; + } + }; + let e = to_tokomak_expr(rec_expr, *expr)?; + let t = rec_expr.add(TokomakExpr::Type(ty)); + + Some(rec_expr.add(TokomakExpr::Cast([e, t]))) + } + // not yet supported - _ => None, + e => { + debug!("Expression not yet supported in tokomak optimizer {:?}", e); + None + }, } } @@ -390,6 +444,23 @@ fn to_exprs(rec_expr: &RecExpr, id: Id) -> Expr { TokomakExpr::Bool(b) => Expr::Literal(ScalarValue::Boolean(Some(b))), TokomakExpr::Date32(b) => Expr::Literal(ScalarValue::Date32(Some(b))), TokomakExpr::Date64(b) => Expr::Literal(ScalarValue::Date64(Some(b))), + TokomakExpr::Cast([e, ty]) => { + let l = to_exprs(&rec_expr, e); + let index:usize = ty.into(); + let dt = match &refs[index] { + TokomakExpr::Type(s) => s, + _ => panic!("Second argument of cast should be type") + }; + let dt = match dt { + TokomakDataType::Date32 => DataType::Date32, + TokomakDataType::Date64 => DataType::Date64, + }; + + Expr::Cast { expr: Box::new(l), data_type: dt} + } + TokomakExpr::Type(_) => { + panic!("Type should only be part of expression") + } } }