Skip to content

Commit

Permalink
partial merge
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Apr 13, 2023
1 parent 04132e0 commit cee9e48
Show file tree
Hide file tree
Showing 9 changed files with 426 additions and 202 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,23 +161,77 @@ class TestOperator extends WholeStageTransformerSuite {
}

test("count") {
val df = runQueryAndCompare("select count(*) from lineitem " +
"where l_partkey in (1552, 674, 1062)") {
_ =>
}
checkLengthAndPlan(df, 1)
val df = runQueryAndCompare(
"select count(*) from lineitem where l_partkey in (1552, 674, 1062)") {
checkOperatorMatch[GlutenHashAggregateExecTransformer] }
runQueryAndCompare(
"select count(l_quantity), count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
}

test("avg") {
val df = runQueryAndCompare("select avg(l_partkey) from lineitem " +
"where l_partkey < 1000") { _ => }
checkLengthAndPlan(df, 1)
val df = runQueryAndCompare(
"select avg(l_partkey) from lineitem where l_partkey < 1000") {
checkOperatorMatch[GlutenHashAggregateExecTransformer] }
runQueryAndCompare(
"select avg(l_quantity), count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
runQueryAndCompare(
"select avg(cast (l_quantity as DECIMAL(12, 2))), " +
"count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
runQueryAndCompare(
"select avg(cast (l_quantity as DECIMAL(22, 2))), " +
"count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
}

test("sum") {
val df = runQueryAndCompare("select sum(l_partkey) from lineitem " +
"where l_partkey < 2000") { _ => }
checkLengthAndPlan(df, 1)
runQueryAndCompare(
"select sum(l_partkey) from lineitem where l_partkey < 2000") {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select sum(l_quantity), count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
runQueryAndCompare(
"select sum(cast (l_quantity as DECIMAL(22, 2))) from lineitem") {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select sum(cast (l_quantity as DECIMAL(12, 2))), " +
"count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
runQueryAndCompare(
"select sum(cast (l_quantity as DECIMAL(22, 2))), " +
"count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
}

test("min and max") {
runQueryAndCompare(
"select min(l_partkey), max(l_partkey) from lineitem where l_partkey < 2000") {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select min(l_partkey), max(l_partkey), count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
}

test("groupby") {
Expand Down Expand Up @@ -328,7 +382,7 @@ class TestOperator extends WholeStageTransformerSuite {
}

test("union_all three tables") {
val df = runQueryAndCompare(
runQueryAndCompare(
"""
|select count(orderkey) from (
| select l_orderkey as orderkey from lineitem
Expand Down Expand Up @@ -383,6 +437,11 @@ class TestOperator extends WholeStageTransformerSuite {
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select stddev_samp(l_quantity), count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
}

test("round") {
Expand All @@ -408,6 +467,11 @@ class TestOperator extends WholeStageTransformerSuite {
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select stddev_pop(l_quantity), count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
}

test("var_samp") {
Expand All @@ -424,6 +488,11 @@ class TestOperator extends WholeStageTransformerSuite {
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select var_samp(l_quantity), count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
}

test("var_pop") {
Expand All @@ -440,6 +509,11 @@ class TestOperator extends WholeStageTransformerSuite {
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select var_pop(l_quantity), count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
}

test("bit_and and bit_or") {
Expand All @@ -450,13 +524,23 @@ class TestOperator extends WholeStageTransformerSuite {
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select bit_and(l_linenumber), count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
runQueryAndCompare(
"""
|select bit_or(l_linenumber) from lineitem
|group by l_orderkey;
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select bit_or(l_linenumber), count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
}

test("bool scan") {
Expand Down Expand Up @@ -491,26 +575,39 @@ class TestOperator extends WholeStageTransformerSuite {
}

test("corr covar_pop covar_samp") {
withSQLConf("spark.sql.adaptive.enabled" -> "false") {
runQueryAndCompare(
"""
|select corr(l_partkey, l_suppkey) from lineitem;
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"""
|select covar_pop(l_partkey, l_suppkey) from lineitem;
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"""
|select covar_samp(l_partkey, l_suppkey) from lineitem;
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"""
|select corr(l_partkey, l_suppkey) from lineitem;
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select corr(l_partkey, l_suppkey), count(distinct l_orderkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
runQueryAndCompare(
"""
|select covar_pop(l_partkey, l_suppkey) from lineitem;
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select covar_pop(l_partkey, l_suppkey), count(distinct l_orderkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
runQueryAndCompare(
"""
|select covar_samp(l_partkey, l_suppkey) from lineitem;
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select covar_samp(l_partkey, l_suppkey), count(distinct l_orderkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
}

test("Cast double to decimal") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,24 +202,18 @@ class VeloxDataTypeValidationSuite extends WholeStageTransformerSuite {
runQueryAndCompare("select int, date from type1 " +
" group by grouping sets(int, date) sort by date, int limit 1") { df => {
val executedPlan = getExecutedPlan(df)
assert(executedPlan.exists(plan =>
plan.find(child => child.isInstanceOf[BatchScanExecTransformer]).isDefined))
assert(executedPlan.exists(plan =>
plan.find(child => child.isInstanceOf[ProjectExecTransformer]).isDefined))
assert(executedPlan.exists(plan =>
plan.find(child => child.isInstanceOf[GlutenHashAggregateExecTransformer]).isDefined))
assert(executedPlan.exists(plan =>
plan.find(child => child.isInstanceOf[SortExecTransformer]).isDefined))
assert(executedPlan.exists(plan => plan.isInstanceOf[BatchScanExecTransformer]))
assert(executedPlan.exists(plan => plan.isInstanceOf[ProjectExecTransformer]))
assert(executedPlan.exists(plan => plan.isInstanceOf[GlutenHashAggregateExecTransformer]))
assert(executedPlan.exists(plan => plan.isInstanceOf[SortExecTransformer]))
}}

// Validation: Expand, Filter.
runQueryAndCompare("select date, string, sum(int) from type1 where date > date '1990-01-09' " +
"group by rollup(date, string) order by date, string") { df => {
val executedPlan = getExecutedPlan(df)
assert(executedPlan.exists(plan =>
plan.find(child => child.isInstanceOf[ExpandExecTransformer]).isDefined))
assert(executedPlan.exists(plan =>
plan.find(child => child.isInstanceOf[GlutenFilterExecTransformer]).isDefined))
assert(executedPlan.exists(plan => plan.isInstanceOf[ExpandExecTransformer]))
assert(executedPlan.exists(plan => plan.isInstanceOf[GlutenFilterExecTransformer]))
}}

// Validation: Union.
Expand All @@ -231,9 +225,7 @@ class VeloxDataTypeValidationSuite extends WholeStageTransformerSuite {
| select date as d from type1
|);
|""".stripMargin) { df => {
val executedPlan = getExecutedPlan(df)
assert(executedPlan.exists(plan =>
plan.find(child => child.isInstanceOf[UnionExecTransformer]).isDefined))
assert(getExecutedPlan(df).exists(plan => plan.isInstanceOf[UnionExecTransformer]))
}}

// Validation: Limit.
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ JNIEXPORT jboolean JNICALL Java_io_glutenproject_vectorized_ExpressionEvaluatorJ
try {
return planValidator.validate(subPlan);
} catch (std::invalid_argument& e) {
LOG(INFO) << "Faled to validate substrait plan because " << e.what();
LOG(INFO) << "Failed to validate substrait plan because " << e.what();
return false;
}
JNI_METHOD_END(false)
Expand Down
4 changes: 2 additions & 2 deletions ep/build-velox/src/get_velox.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

set -exu

VELOX_REPO=https://github.com/oap-project/velox.git
VELOX_BRANCH=main
VELOX_REPO=https://github.com/rui-mo/velox.git
VELOX_BRANCH=companion

#Set on run gluten on HDFS
ENABLE_HDFS=OFF
Expand Down
Loading

0 comments on commit cee9e48

Please sign in to comment.