[SPARK-29140][SQL] Handle parameters having "array" of javaType properly in splitAggregateExpressions

### What changes were proposed in this pull request?

This patch fixes an issue introduced by [SPARK-21870](http://issues.apache.org/jira/browse/SPARK-21870): when generating the parameter types of split aggregate functions, the code does not account for array types in `javaType`. There is at least one such case: Spark should declare a BinaryType parameter as `byte[]`, but it emits `[B` instead, and the generated code fails to compile.
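
As a quick illustration of the mismatch described above, the following Java sketch shows what `Class.getName` returns for an array class. The names `ArrayClassNames` and `paramDecl` are hypothetical and not part of Spark; only the `java.lang.Class` calls are standard JDK API, and `paramDecl` merely mimics how the pre-fix code formatted a parameter.

```java
// Minimal sketch, not Spark code: ArrayClassNames and paramDecl are hypothetical names.
class ArrayClassNames {
    // Formats a parameter declaration the way the pre-fix code did,
    // using Class.getName() directly.
    static String paramDecl(Class<?> type, String name) {
        return type.getName() + " " + name;
    }

    public static void main(String[] args) {
        // For arrays, getName() returns the JVM-internal name, not Java source syntax.
        System.out.println(byte[].class.getName());                         // [B
        // The component type has an ordinary, source-legal name.
        System.out.println(byte[].class.getComponentType().getName());      // byte
        // So a byte[] parameter ends up declared with a type that is not valid Java source.
        System.out.println(paramDecl(byte[].class, "agg_expr_1_1"));        // [B agg_expr_1_1
        // Non-array types are unaffected.
        System.out.println(paramDecl(boolean.class, "agg_exprIsNull_1_1")); // boolean agg_exprIsNull_1_1
    }
}
```

The third line of output matches the malformed first parameter in the failing method signature.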

Below is the generated code which failed compilation (Line 380):

```
/* 380 */   private void agg_doAggregate_count_0([B agg_expr_1_1, boolean agg_exprIsNull_1_1, org.apache.spark.sql.catalyst.InternalRow agg_unsafeRowAggBuffer_1) throws java.io.IOException {
/* 381 */     // evaluate aggregate function for count
/* 382 */     boolean agg_isNull_26 = false;
/* 383 */     long agg_value_28 = -1L;
/* 384 */     if (!false && agg_exprIsNull_1_1) {
/* 385 */       long agg_value_31 = agg_unsafeRowAggBuffer_1.getLong(1);
/* 386 */       agg_isNull_26 = false;
/* 387 */       agg_value_28 = agg_value_31;
/* 388 */     } else {
/* 389 */       long agg_value_33 = agg_unsafeRowAggBuffer_1.getLong(1);
/* 390 */
/* 391 */       long agg_value_32 = -1L;
/* 392 */
/* 393 */       agg_value_32 = agg_value_33 + 1L;
/* 394 */       agg_isNull_26 = false;
/* 395 */       agg_value_28 = agg_value_32;
/* 396 */     }
/* 397 */     // update unsafe row buffer
/* 398 */     agg_unsafeRowAggBuffer_1.setLong(1, agg_value_28);
/* 399 */   }
```

No test for HashAggregateExec covered this case specifically, but the randomized test in ObjectHashAggregateSuite could hit it, which is why ObjectHashAggregateSuite has been flaky.

### Why are the changes needed?

Without the fix, generated code from HashAggregateExec may fail compilation.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Added a new unit test. Without the fix, the newly added test fails.

Closes #25830 from HeartSaVioR/SPARK-29140.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
HeartSaVioR authored and maropu committed Sep 21, 2019
1 parent a9ae262 commit f7cc695
Showing 3 changed files with 26 additions and 1 deletion.
@@ -1811,6 +1811,14 @@ object CodeGenerator extends Logging {

   def boxedType(dt: DataType): String = boxedType(javaType(dt))

+  def typeName(clazz: Class[_]): String = {
+    if (clazz.isArray) {
+      typeName(clazz.getComponentType) + "[]"
+    } else {
+      clazz.getName
+    }
+  }
+
   /**
    * Returns the representation of default value for a given Java Type.
    * @param jt the string name of the Java type
@@ -299,7 +299,9 @@ case class HashAggregateExec(
     if (inputVars.forall(_.isDefined)) {
       val splitCodes = inputVars.flatten.zipWithIndex.map { case (args, i) =>
         val doAggFunc = ctx.freshName(s"doAggregate_${aggNames(i)}")
-        val argList = args.map(v => s"${v.javaType.getName} ${v.variableName}").mkString(", ")
+        val argList = args.map { v =>
+          s"${CodeGenerator.typeName(v.javaType)} ${v.variableName}"
+        }.mkString(", ")
         val doAggFuncName = ctx.addNewFunction(doAggFunc,
           s"""
              |private void $doAggFunc($argList) throws java.io.IOException {
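
The effect of the two hunks above can be sketched in plain Java. This is an illustrative port under stated assumptions, not the Spark implementation: `TypeNameSketch`, `Param`, and `argList` are hypothetical names, and `Param` stands in for a codegen variable that carries a Java class and a variable name. `typeName` follows the same recursion as the Scala helper added to `CodeGenerator`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch only; TypeNameSketch, Param and argList are hypothetical names.
class TypeNameSketch {
    // Hypothetical stand-in for a codegen variable: its Java class and its name.
    static final class Param {
        final Class<?> javaType;
        final String variableName;

        Param(Class<?> javaType, String variableName) {
            this.javaType = javaType;
            this.variableName = variableName;
        }
    }

    // Source-level type name: recurse into the component type of an array
    // and append "[]"; fall back to getName() for everything else.
    static String typeName(Class<?> clazz) {
        if (clazz.isArray()) {
            return typeName(clazz.getComponentType()) + "[]";
        }
        return clazz.getName();
    }

    // Builds the comma-separated parameter list of a generated method.
    static String argList(List<Param> params) {
        return params.stream()
            .map(p -> typeName(p.javaType) + " " + p.variableName)
            .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        List<Param> params = Arrays.asList(
            new Param(byte[].class, "agg_expr_1_1"),
            new Param(boolean.class, "agg_exprIsNull_1_1"));
        // prints: byte[] agg_expr_1_1, boolean agg_exprIsNull_1_1
        System.out.println(argList(params));
        // The recursion also covers nested arrays: prints int[][]
        System.out.println(typeName(int[][].class));
    }
}
```

Recursing on the component type, rather than special-casing `byte[]`, keeps the helper correct for any array class that might appear as a parameter type.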
@@ -1028,6 +1028,21 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
       }
     }
   }
+
+  test("SPARK-29140: HashAggregateExec aggregating binary type doesn't break codegen compilation") {
+    val schema = new StructType().add("id", IntegerType, nullable = false)
+      .add("c1", BinaryType, nullable = true)
+
+    withSQLConf(
+      SQLConf.CODEGEN_SPLIT_AGGREGATE_FUNC.key -> "true",
+      SQLConf.CODEGEN_METHOD_SPLIT_THRESHOLD.key -> "1") {
+      val emptyRows = spark.sparkContext.parallelize(Seq.empty[Row], 1)
+      val aggDf = spark.createDataFrame(emptyRows, schema)
+        .groupBy($"id" % 10 as "group")
+        .agg(countDistinct($"c1"))
+      checkAnswer(aggDf, Seq.empty[Row])
+    }
+  }
 }
