Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL] feat: support partial merge phase in aggregation #1330

Merged
merged 2 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -161,23 +161,77 @@ class TestOperator extends WholeStageTransformerSuite {
}

test("count") {
val df = runQueryAndCompare("select count(*) from lineitem " +
"where l_partkey in (1552, 674, 1062)") {
_ =>
}
checkLengthAndPlan(df, 1)
val df = runQueryAndCompare(
"select count(*) from lineitem where l_partkey in (1552, 674, 1062)") {
checkOperatorMatch[GlutenHashAggregateExecTransformer] }
runQueryAndCompare(
"select count(l_quantity), count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
}

test("avg") {
val df = runQueryAndCompare("select avg(l_partkey) from lineitem " +
"where l_partkey < 1000") { _ => }
checkLengthAndPlan(df, 1)
val df = runQueryAndCompare(
"select avg(l_partkey) from lineitem where l_partkey < 1000") {
checkOperatorMatch[GlutenHashAggregateExecTransformer] }
runQueryAndCompare(
"select avg(l_quantity), count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
runQueryAndCompare(
"select avg(cast (l_quantity as DECIMAL(12, 2))), " +
"count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
runQueryAndCompare(
"select avg(cast (l_quantity as DECIMAL(22, 2))), " +
"count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
}

test("sum") {
val df = runQueryAndCompare("select sum(l_partkey) from lineitem " +
"where l_partkey < 2000") { _ => }
checkLengthAndPlan(df, 1)
runQueryAndCompare(
"select sum(l_partkey) from lineitem where l_partkey < 2000") {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select sum(l_quantity), count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
runQueryAndCompare(
"select sum(cast (l_quantity as DECIMAL(22, 2))) from lineitem") {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select sum(cast (l_quantity as DECIMAL(12, 2))), " +
"count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
runQueryAndCompare(
"select sum(cast (l_quantity as DECIMAL(22, 2))), " +
"count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
}

test("min and max") {
runQueryAndCompare(
"select min(l_partkey), max(l_partkey) from lineitem where l_partkey < 2000") {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select min(l_partkey), max(l_partkey), count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
}

test("groupby") {
Expand Down Expand Up @@ -328,7 +382,7 @@ class TestOperator extends WholeStageTransformerSuite {
}

test("union_all three tables") {
val df = runQueryAndCompare(
runQueryAndCompare(
"""
|select count(orderkey) from (
| select l_orderkey as orderkey from lineitem
Expand Down Expand Up @@ -383,6 +437,11 @@ class TestOperator extends WholeStageTransformerSuite {
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select stddev_samp(l_quantity), count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
}

test("round") {
Expand All @@ -408,6 +467,11 @@ class TestOperator extends WholeStageTransformerSuite {
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select stddev_pop(l_quantity), count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
}

test("var_samp") {
Expand All @@ -424,6 +488,11 @@ class TestOperator extends WholeStageTransformerSuite {
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select var_samp(l_quantity), count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
}

test("var_pop") {
Expand All @@ -440,6 +509,11 @@ class TestOperator extends WholeStageTransformerSuite {
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select var_pop(l_quantity), count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
}

test("bit_and and bit_or") {
Expand All @@ -450,13 +524,23 @@ class TestOperator extends WholeStageTransformerSuite {
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select bit_and(l_linenumber), count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
runQueryAndCompare(
"""
|select bit_or(l_linenumber) from lineitem
|group by l_orderkey;
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select bit_or(l_linenumber), count(distinct l_partkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
}

test("bool scan") {
Expand Down Expand Up @@ -491,26 +575,39 @@ class TestOperator extends WholeStageTransformerSuite {
}

test("corr covar_pop covar_samp") {
withSQLConf("spark.sql.adaptive.enabled" -> "false") {
runQueryAndCompare(
"""
|select corr(l_partkey, l_suppkey) from lineitem;
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"""
|select covar_pop(l_partkey, l_suppkey) from lineitem;
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"""
|select covar_samp(l_partkey, l_suppkey) from lineitem;
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"""
|select corr(l_partkey, l_suppkey) from lineitem;
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select corr(l_partkey, l_suppkey), count(distinct l_orderkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
runQueryAndCompare(
"""
|select covar_pop(l_partkey, l_suppkey) from lineitem;
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select covar_pop(l_partkey, l_suppkey), count(distinct l_orderkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
runQueryAndCompare(
"""
|select covar_samp(l_partkey, l_suppkey) from lineitem;
|""".stripMargin) {
checkOperatorMatch[GlutenHashAggregateExecTransformer]
}
runQueryAndCompare(
"select covar_samp(l_partkey, l_suppkey), count(distinct l_orderkey) from lineitem") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]}) == 4)
}}
}

test("Cast double to decimal") {
Expand All @@ -521,4 +618,15 @@ class TestOperator extends WholeStageTransformerSuite {
assert(result.collect()(0).get(0).toString.equals("0.0345678900000000000000000000000000000"))
checkOperatorMatch[GlutenHashAggregateExecTransformer](result)
}

test("corr distinct") {
Seq((1, 1), (2, 2), (2, 2))
.toDF("a", "b").createOrReplaceTempView("view")
runQueryAndCompare("SELECT corr(DISTINCT a, b)," +
"corr(DISTINCT b, a), count(*) FROM view") { df => {
assert(getExecutedPlan(df).count(plan => {
plan.isInstanceOf[GlutenHashAggregateExecTransformer]
}) == 4)
}}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,24 +202,18 @@ class VeloxDataTypeValidationSuite extends WholeStageTransformerSuite {
runQueryAndCompare("select int, date from type1 " +
" group by grouping sets(int, date) sort by date, int limit 1") { df => {
val executedPlan = getExecutedPlan(df)
assert(executedPlan.exists(plan =>
plan.find(child => child.isInstanceOf[BatchScanExecTransformer]).isDefined))
assert(executedPlan.exists(plan =>
plan.find(child => child.isInstanceOf[ProjectExecTransformer]).isDefined))
assert(executedPlan.exists(plan =>
plan.find(child => child.isInstanceOf[GlutenHashAggregateExecTransformer]).isDefined))
assert(executedPlan.exists(plan =>
plan.find(child => child.isInstanceOf[SortExecTransformer]).isDefined))
assert(executedPlan.exists(plan => plan.isInstanceOf[BatchScanExecTransformer]))
assert(executedPlan.exists(plan => plan.isInstanceOf[ProjectExecTransformer]))
assert(executedPlan.exists(plan => plan.isInstanceOf[GlutenHashAggregateExecTransformer]))
assert(executedPlan.exists(plan => plan.isInstanceOf[SortExecTransformer]))
}}

// Validation: Expand, Filter.
runQueryAndCompare("select date, string, sum(int) from type1 where date > date '1990-01-09' " +
"group by rollup(date, string) order by date, string") { df => {
val executedPlan = getExecutedPlan(df)
assert(executedPlan.exists(plan =>
plan.find(child => child.isInstanceOf[ExpandExecTransformer]).isDefined))
assert(executedPlan.exists(plan =>
plan.find(child => child.isInstanceOf[GlutenFilterExecTransformer]).isDefined))
assert(executedPlan.exists(plan => plan.isInstanceOf[ExpandExecTransformer]))
assert(executedPlan.exists(plan => plan.isInstanceOf[GlutenFilterExecTransformer]))
}}

// Validation: Union.
Expand All @@ -231,9 +225,7 @@ class VeloxDataTypeValidationSuite extends WholeStageTransformerSuite {
| select date as d from type1
|);
|""".stripMargin) { df => {
val executedPlan = getExecutedPlan(df)
assert(executedPlan.exists(plan =>
plan.find(child => child.isInstanceOf[UnionExecTransformer]).isDefined))
assert(getExecutedPlan(df).exists(plan => plan.isInstanceOf[UnionExecTransformer]))
}}

// Validation: Limit.
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ JNIEXPORT jboolean JNICALL Java_io_glutenproject_vectorized_ExpressionEvaluatorJ
try {
return planValidator.validate(subPlan);
} catch (std::invalid_argument& e) {
LOG(INFO) << "Faled to validate substrait plan because " << e.what();
LOG(INFO) << "Failed to validate substrait plan because " << e.what();
return false;
}
JNI_METHOD_END(false)
Expand Down
Loading