From 4a6d40533292af942c6ecb6d570ad35bd17fd7ef Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Thu, 17 Nov 2016 09:44:02 +0900 Subject: [PATCH 1/5] Support codegen for HiveUdf --- .../org/apache/spark/sql/hive/hiveUDFs.scala | 90 ++++++++++++++++++- 1 file changed, 88 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 4590197548104..73925a42d3f7d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -35,14 +35,14 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode} import org.apache.spark.sql.hive.HiveShim._ import org.apache.spark.sql.types._ private[hive] case class HiveSimpleUDF( name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) - extends Expression with HiveInspectors with CodegenFallback with Logging { + extends Expression with HiveInspectors with Logging { override def deterministic: Boolean = isUDFDeterministic @@ -85,6 +85,92 @@ private[hive] case class HiveSimpleUDF( @transient private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray + // Generate codes used to convert the arguments to Hive types for `HiveSimpleUDF` + private[this] def genCodeForConverter(ctx: CodegenContext, index: Int): String = { + val converterClassName = classOf[Any => Any].getName + val hiveUDFClassName = classOf[HiveSimpleUDF].getName + + val converterTerm = ctx.freshName("converter") + val expressionIdx = ctx.references.size - 1 + ctx.addMutableState(converterClassName, 
converterTerm, + s"this.$converterTerm = ($converterClassName)" + + s"(($hiveUDFClassName) references[$expressionIdx]).getWrapper($index);") + converterTerm + } + + protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val hiveUDF = ctx.addReferenceObj("hiveUDF", this) + val converterClassName = classOf[Any => Any].getName + + // Generate codes used to convert the returned value of `HiveSimpleUDF` to Catalyst type + val catalystConverterTerm = ctx.freshName("catalystConverter") + ctx.addMutableState(converterClassName, catalystConverterTerm, + s"this.$catalystConverterTerm = ($converterClassName)$hiveUDF.getUnwrapper();") + + val resultTerm = ctx.freshName("result") + + // This must be called before children expressions' codegen + // because ctx.references is used in genCodeForConverter + val converterTerms = children.indices.map(genCodeForConverter(ctx, _)) + + // codegen for children expressions + val evals = children.map(_.genCode(ctx)) + + // Generate the codes for expressions and calling `HiveSimpleUDF`. + // We need to get the boxedType of dataType's javaType here. Because for the dataType + // such as IntegerType, its javaType is `int` and the returned type of the function is Object. + // Trying to convert an Object to `int` will cause casting exception. + val evalCode = evals.map(_.code).mkString + val (converters, funcArguments) = converterTerms.zipWithIndex.map { case (converter, i) => + val eval = evals(i) + val argTerm = ctx.freshName("arg") + val convert = s"Object $argTerm = ${eval.isNull} ? 
null : $converter.apply(${eval.value});" + (convert, argTerm) + }.unzip + + val getFuncResult = s"$hiveUDF.callUdf(${funcArguments.mkString(", ")})" + val callFunc = + s""" + ${ctx.boxedType(dataType)} $resultTerm = null; + try { + $resultTerm = (${ctx.boxedType(dataType)})$catalystConverterTerm.apply($getFuncResult); + } catch (Exception e) { + throw new org.apache.spark.SparkException($hiveUDF.udfErrorMessage(), e); + } + """ + + ev.copy(code = s""" + $evalCode + ${converters.mkString("\n")} + $callFunc + + boolean ${ev.isNull} = $resultTerm == null; + ${ctx.boxedType(dataType)} ${ev.value} = ${ctx.defaultValue(dataType)}; + if (!${ev.isNull}) { + ${ev.value} = $resultTerm; + }""") + } + + def getChildren(): Seq[Expression] = children + + def getWrapper(index: Int): (Any) => Any = wrappers(index) + + def getUnwrapper(): (Any) => Any = unwrapper + + @scala.annotation.varargs + def callUdf(args: AnyRef*): AnyRef = { + FunctionRegistry.invoke( + method, + function, + conversionHelper.convertIfNecessary(args: _*): _*) + } + + lazy val udfErrorMessage = { + val funcCls = funcWrapper.functionClassName + val inputTypes = children.map(_.dataType.simpleString).mkString(", ") + s"Failed to execute user defined function($funcCls: ($inputTypes) => ${dataType.simpleString})" + } + // TODO: Finish input output types. 
override def eval(input: InternalRow): Any = { val inputs = wrap(children.map(_.eval(input)), wrappers, cached, inputDataTypes) From c29e5a3dd7a77d976f6c1bd0ca6bd5f15cdc74c9 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Thu, 17 Nov 2016 16:12:12 +0900 Subject: [PATCH 2/5] Add benchmark results --- .../sql/hive/execution/UDFLongToString.java | 26 +++++++++++ .../benchmark/HiveUDFBenchmark.scala | 44 +++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFLongToString.java create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/HiveUDFBenchmark.scala diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFLongToString.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFLongToString.java new file mode 100644 index 0000000000000..164d3f3120996 --- /dev/null +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFLongToString.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution; + +import org.apache.hadoop.hive.ql.exec.UDF; + +public class UDFLongToString extends UDF { + public String evaluate(Long l) { + return l.toString(); + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/HiveUDFBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/HiveUDFBenchmark.scala new file mode 100644 index 0000000000000..e1f2cba189174 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/HiveUDFBenchmark.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.spark.sql.hive.execution.UDFLongToString +import org.apache.spark.sql.hive.test.TestHiveSingleton + +class HiveUDFBenchmark extends BenchmarkBase with TestHiveSingleton { + + ignore("HiveSimpleUDF") { + val N = 2L << 26 + sparkSession.range(N).createOrReplaceTempView("t") + sparkSession.sql(s"CREATE TEMPORARY FUNCTION f AS '${classOf[UDFLongToString].getName}'") + + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_31-b13 on Mac OS X 10.10.2 + Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz + + Call Hive UDF: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + Call Hive UDF wholestage off 3 / 4 40941.7 0.0 1.0X + Call Hive UDF wholestage on 1 / 2 96620.3 0.0 2.4X + */ + runBenchmark("Call Hive UDF", N) { + sparkSession.sql("SELECT f(id) FROM t") + } + sparkSession.sql("DROP TEMPORARY FUNCTION IF EXISTS f") + } +} From 5eb3def2b88adc41b78799cfcd1e60d590276828 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Fri, 18 Nov 2016 01:55:30 +0900 Subject: [PATCH 3/5] Support codegen for HiveGenericUdf --- .../org/apache/spark/sql/hive/hiveUDFs.scala | 143 ++++++++++++++++-- .../sql/hive/execution/UDFLongToString.java | 26 ---- .../benchmark/HiveUDFBenchmark.scala | 44 ------ .../benchmark/HiveUDFsBenchmark.scala | 66 ++++++++ 4 files changed, 197 insertions(+), 82 deletions(-) delete mode 100644 sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFLongToString.java delete mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/HiveUDFBenchmark.scala create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/HiveUDFsBenchmark.scala diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 73925a42d3f7d..2fea3fab013b0 100644 --- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ConversionHelper import org.apache.hadoop.hive.serde2.objectinspector.{ConstantObjectInspector, ObjectInspector, ObjectInspectorFactory} import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions +import org.apache.spark.api.java.function.Function0 import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -100,9 +101,9 @@ private[hive] case class HiveSimpleUDF( protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val hiveUDF = ctx.addReferenceObj("hiveUDF", this) - val converterClassName = classOf[Any => Any].getName // Generate codes used to convert the returned value of `HiveSimpleUDF` to Catalyst type + val converterClassName = classOf[Any => Any].getName val catalystConverterTerm = ctx.freshName("catalystConverter") ctx.addMutableState(converterClassName, catalystConverterTerm, s"this.$catalystConverterTerm = ($converterClassName)$hiveUDF.getUnwrapper();") @@ -121,6 +122,7 @@ private[hive] case class HiveSimpleUDF( // such as IntegerType, its javaType is `int` and the returned type of the function is Object. // Trying to convert an Object to `int` will cause casting exception. 
val evalCode = evals.map(_.code).mkString + val (converters, funcArguments) = converterTerms.zipWithIndex.map { case (converter, i) => val eval = evals(i) val argTerm = ctx.freshName("arg") @@ -191,20 +193,19 @@ private[hive] case class HiveSimpleUDF( } // Adapter from Catalyst ExpressionResult to Hive DeferredObject -private[hive] class DeferredObjectAdapter(oi: ObjectInspector, dataType: DataType) - extends DeferredObject with HiveInspectors { +private[hive] class DeferredObjectAdapter extends DeferredObject { - private var func: () => Any = _ - def set(func: () => Any): Unit = { + private var func: Function0[AnyRef] = _ + def set(func: Function0[AnyRef]): Unit = { this.func = func } override def prepare(i: Int): Unit = {} - override def get(): AnyRef = wrap(func(), oi, dataType) + override def get(): AnyRef = func.call() } private[hive] case class HiveGenericUDF( name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) - extends Expression with HiveInspectors with CodegenFallback with Logging { + extends Expression with HiveInspectors with Logging { override def nullable: Boolean = true @@ -220,10 +221,15 @@ private[hive] case class HiveGenericUDF( private lazy val argumentInspectors = children.map(toInspector) @transient - private lazy val returnInspector = { + lazy val returnInspector = { function.initializeAndFoldConstants(argumentInspectors.toArray) } + @transient + private lazy val wrappers = argumentInspectors.zip(children).map { case (inspect, child) => + wrapperFor(inspect, child.dataType) + }.toArray + @transient private lazy val unwrapper = unwrapperFor(returnInspector) @@ -234,12 +240,122 @@ private[hive] case class HiveGenericUDF( } @transient - private lazy val deferredObjects = argumentInspectors.zip(children).map { case (inspect, child) => - new DeferredObjectAdapter(inspect, child.dataType) + private lazy val deferredObjects = Seq.tabulate(children.size) { i => + new DeferredObjectAdapter() }.toArray[DeferredObject] override 
lazy val dataType: DataType = inspectorToDataType(returnInspector) + // Generate codes used to convert the arguments to Hive types for `HiveGenericUDF` + private[this] def genCodeForConverter(ctx: CodegenContext, index: Int): String = { + val converterClassName = classOf[Any => Any].getName + val hiveUDFClassName = classOf[HiveGenericUDF].getName + + val converterTerm = ctx.freshName("converter") + val expressionIdx = ctx.references.size - 1 + ctx.addMutableState(converterClassName, converterTerm, + s"this.$converterTerm = ($converterClassName)" + + s"(($hiveUDFClassName) references[$expressionIdx]).getWrapper($index);") + converterTerm + } + + protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val hiveUDF = ctx.addReferenceObj("hiveUDF", this) + + // Generate codes used to convert the returned value of `HiveSimpleUDF` to Catalyst type + val converterClassName = classOf[Any => Any].getName + val catalystConverterTerm = ctx.freshName("catalystConverter") + ctx.addMutableState(converterClassName, catalystConverterTerm, + s"this.$catalystConverterTerm = ($converterClassName)$hiveUDF.getUnwrapper();") + + val resultTerm = ctx.freshName("result") + + // This must be called before children expressions' codegen + // because ctx.references is used in genCodeForConverter + val converterTerms = children.indices.map(genCodeForConverter(ctx, _)) + + // Generate codes used to convert input values into `DeferredObject`s + val deferredObjectsClassName = classOf[DeferredObject].getName + "[]" + val deferredObjectsTerm = ctx.freshName("deferredObjects") + ctx.addMutableState(deferredObjectsClassName, deferredObjectsTerm, + s"this.$deferredObjectsTerm = ($deferredObjectsClassName)$hiveUDF.getDeferredObjects();") + + // Make sure initialized + val objectInspectorClassName = classOf[ObjectInspector].getName + val objectInspectorTerm = ctx.freshName("objectInspector") + ctx.addMutableState(objectInspectorClassName, objectInspectorTerm, + 
s"this.$objectInspectorTerm = ($objectInspectorClassName)$hiveUDF.returnInspector();") + + // codegen for children expressions + val evals = children.map(_.genCode(ctx)) + + val (converters, funcArguments) = converterTerms.zipWithIndex.map { case (converter, i) => + val eval = evals(i) + val argTerm = ctx.freshName("arg") + val convert = s"final Object $argTerm = ${eval.isNull} ? " + + s"null : $converter.apply(${eval.value});" + (convert, argTerm) + }.unzip + + // Generate the codes for expressions and calling `HiveGenericUDF`. + val evalCode = evals.map(_.code).mkString + + val funcClassName = classOf[Function0[AnyRef]].getName + val deferredObjectAdapterClassName = classOf[DeferredObjectAdapter].getName + val argsTerm = evals.zipWithIndex.map { case (eval, i) => + s""" + (($deferredObjectAdapterClassName) this.$deferredObjectsTerm[$i]) + .set(new ${funcClassName}() { + @Override + public Object call() { + return ${funcArguments(i)}; + } + }); + """.stripMargin + } + + val getFuncResult = s"$hiveUDF.callUdf($deferredObjectsTerm)" + val callFunc = + s""" + ${ctx.boxedType(dataType)} $resultTerm = null; + try { + $resultTerm = (${ctx.boxedType(dataType)})$catalystConverterTerm.apply($getFuncResult); + } catch (Exception e) { + throw new org.apache.spark.SparkException($hiveUDF.udfErrorMessage(), e); + } + """ + + ev.copy(code = s""" + $evalCode + ${converters.mkString("\n")} + ${argsTerm.mkString("\n")} + $callFunc + + boolean ${ev.isNull} = $resultTerm == null; + ${ctx.boxedType(dataType)} ${ev.value} = ${ctx.defaultValue(dataType)}; + if (!${ev.isNull}) { + ${ev.value} = $resultTerm; + }""") + } + + def getChildren(): Seq[Expression] = children + + def getWrapper(index: Int): (Any) => Any = wrappers(index) + + def getUnwrapper(): (Any) => Any = unwrapper + + def getDeferredObjects(): Array[DeferredObject] = deferredObjects + + def callUdf(args: Array[DeferredObject]): AnyRef = { + function.evaluate(deferredObjects) + } + + lazy val udfErrorMessage = { + val 
funcCls = funcWrapper.functionClassName + val inputTypes = children.map(_.dataType.simpleString).mkString(", ") + s"Failed to execute user defined function($funcCls: ($inputTypes) => ${dataType.simpleString})" + } + override def eval(input: InternalRow): Any = { returnInspector // Make sure initialized. @@ -247,8 +363,11 @@ private[hive] case class HiveGenericUDF( val length = children.length while (i < length) { val idx = i - deferredObjects(i).asInstanceOf[DeferredObjectAdapter] - .set(() => children(idx).eval(input)) + deferredObjects(i).asInstanceOf[DeferredObjectAdapter].set(new Function0[AnyRef]() { + override def call(): AnyRef = { + wrappers(idx)(children(idx).eval(input)).asInstanceOf[AnyRef] + } + }) i += 1 } unwrapper(function.evaluate(deferredObjects)) diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFLongToString.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFLongToString.java deleted file mode 100644 index 164d3f3120996..0000000000000 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFLongToString.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.hive.execution; - -import org.apache.hadoop.hive.ql.exec.UDF; - -public class UDFLongToString extends UDF { - public String evaluate(Long l) { - return l.toString(); - } -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/HiveUDFBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/HiveUDFBenchmark.scala deleted file mode 100644 index e1f2cba189174..0000000000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/HiveUDFBenchmark.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.benchmark - -import org.apache.spark.sql.hive.execution.UDFLongToString -import org.apache.spark.sql.hive.test.TestHiveSingleton - -class HiveUDFBenchmark extends BenchmarkBase with TestHiveSingleton { - - ignore("HiveSimpleUDF") { - val N = 2L << 26 - sparkSession.range(N).createOrReplaceTempView("t") - sparkSession.sql(s"CREATE TEMPORARY FUNCTION f AS '${classOf[UDFLongToString].getName}'") - - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_31-b13 on Mac OS X 10.10.2 - Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz - - Call Hive UDF: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Call Hive UDF wholestage off 3 / 4 40941.7 0.0 1.0X - Call Hive UDF wholestage on 1 / 2 96620.3 0.0 2.4X - */ - runBenchmark("Call Hive UDF", N) { - sparkSession.sql("SELECT f(id) FROM t") - } - sparkSession.sql("DROP TEMPORARY FUNCTION IF EXISTS f") - } -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/HiveUDFsBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/HiveUDFsBenchmark.scala new file mode 100644 index 0000000000000..90a1f5e74b95c --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/HiveUDFsBenchmark.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.hadoop.hive.ql.udf.UDFToDouble +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs + +import org.apache.spark.sql.hive.test.TestHiveSingleton + +class HiveUDFsBenchmark extends BenchmarkBase with TestHiveSingleton { + + ignore("HiveSimpleUDF") { + val N = 2L << 26 + sparkSession.range(N).createOrReplaceTempView("t") + sparkSession.sql(s"CREATE TEMPORARY FUNCTION f AS '${classOf[UDFToDouble].getName}'") + + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_31-b13 on Mac OS X 10.10.2 + Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz + + Call Hive UDF: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ---------------------------------------------------------------------------------------------- + Call Hive UDF wholestage off 3 / 3 43794.0 0.0 1.0X + Call Hive UDF wholestage on 1 / 2 101551.3 0.0 2.3X + */ + runBenchmark("Call Hive UDF", N) { + sparkSession.sql("SELECT f(id) FROM t") + } + sparkSession.sql("DROP TEMPORARY FUNCTION IF EXISTS f") + } + + ignore("HiveGenericUDF") { + val N = 2L << 26 + sparkSession.range(N).createOrReplaceTempView("t") + sparkSession.sql(s"CREATE TEMPORARY FUNCTION f AS '${classOf[GenericUDFAbs].getName}'") + + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_31-b13 on Mac OS X 10.10.2 + Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz + + Call Hive generic UDF: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ---------------------------------------------------------------------------------------------- + Call Hive generic UDF wholestage off 2 / 2 86919.9 0.0 
1.0X + Call Hive generic UDF wholestage on 1 / 1 143314.2 0.0 1.6X + */ + runBenchmark("Call Hive generic UDF", N) { + sparkSession.sql("SELECT f(id) FROM t") + } + sparkSession.sql("DROP TEMPORARY FUNCTION IF EXISTS f") + } +} From b2530f7be0c176cdbe2e9d399f6cda026071d616 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Fri, 18 Nov 2016 14:29:37 +0900 Subject: [PATCH 4/5] Brush up code --- .../org/apache/spark/sql/hive/hiveUDFs.scala | 259 ++++++++---------- 1 file changed, 110 insertions(+), 149 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 2fea3fab013b0..6b5020e318982 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -41,9 +41,86 @@ import org.apache.spark.sql.hive.HiveShim._ import org.apache.spark.sql.types._ +/** Utility trait for [[HiveSimpleUDF]] and [[HiveGenericUDF]]. 
*/ +private[hive] trait HiveUDFCodegen { + self: Expression with HiveInspectors => + + val funcWrapper: HiveFunctionWrapper + val children: Seq[Expression] + + def getArgumentInspector(i: Int): ObjectInspector + + def getReturnInspector(): ObjectInspector + + @transient + protected lazy val wrappers = children.zipWithIndex.map { case (expr, i) => + wrapperFor(getArgumentInspector(i), expr.dataType) + }.toArray + + @transient + protected lazy val unwrapper = unwrapperFor(getReturnInspector()) + + lazy val udfErrorMessage = { + val funcCls = funcWrapper.functionClassName + val inputTypes = children.map(_.dataType.simpleString).mkString(", ") + s"Failed to execute user defined function($funcCls: ($inputTypes) => ${dataType.simpleString})" + } + + def getChildren(): Seq[Expression] = children + + def getWrapper(index: Int): (Any) => Any = wrappers(index) + + def getUnwrapper(): (Any) => Any = unwrapper + + def genCodeForWrapper(ctx: CodegenContext, evals: Seq[ExprCode], udf: String) + : (Seq[String], Seq[String]) = { + // Generate codes used to convert an argument to a Hive type + val converterTerms = children.indices.map { index => + val converterClassName = classOf[Any => Any].getName + val converterTerm = ctx.freshName("converter") + ctx.addMutableState(converterClassName, converterTerm, + s"this.$converterTerm = ($converterClassName) $udf.getWrapper($index);") + converterTerm + } + + converterTerms.zipWithIndex.map { case (converter, i) => + val eval = evals(i) + val argTerm = ctx.freshName("arg") + val convert = s"final Object $argTerm = ${eval.isNull} ? 
" + + s"null : $converter.apply(${eval.value});" + (convert, argTerm) + }.unzip + } + + def genCodeForUnwrapper(ctx: CodegenContext, ev: ExprCode, udf: String, callFunc: String) + : String = { + val resultTerm = ctx.freshName("result") + + // Generate codes used to convert the returned value of `HiveSimpleUDF` to Catalyst type + val converterClassName = classOf[Any => Any].getName + val catalystConverterTerm = ctx.freshName("catalystConverter") + ctx.addMutableState(converterClassName, catalystConverterTerm, + s"this.$catalystConverterTerm = ($converterClassName)$udf.getUnwrapper();") + + s""" + ${ctx.boxedType(dataType)} $resultTerm = null; + try { + $resultTerm = (${ctx.boxedType(dataType)})$catalystConverterTerm.apply($callFunc); + } catch (Exception e) { + throw new org.apache.spark.SparkException($udf.udfErrorMessage(), e); + } + boolean ${ev.isNull} = $resultTerm == null; + ${ctx.boxedType(dataType)} ${ev.value} = ${ctx.defaultValue(dataType)}; + if (!${ev.isNull}) { + ${ev.value} = $resultTerm; + } + """ + } +} + private[hive] case class HiveSimpleUDF( name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) - extends Expression with HiveInspectors with Logging { + extends Expression with HiveInspectors with HiveUDFCodegen with Logging { override def deterministic: Boolean = isUDFDeterministic @@ -73,12 +150,12 @@ private[hive] case class HiveSimpleUDF( override lazy val dataType = javaClassToDataType(method.getReturnType) - @transient - private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray + def getArgumentInspector(i: Int): ObjectInspector = toInspector(children(i)) - @transient - lazy val unwrapper = unwrapperFor(ObjectInspectorFactory.getReflectionObjectInspector( - method.getGenericReturnType, ObjectInspectorOptions.JAVA)) + def getReturnInspector(): ObjectInspector = { + ObjectInspectorFactory.getReflectionObjectInspector( + method.getGenericReturnType, ObjectInspectorOptions.JAVA) + } 
@transient private lazy val cached: Array[AnyRef] = new Array[AnyRef](children.length) @@ -86,34 +163,17 @@ private[hive] case class HiveSimpleUDF( @transient private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray - // Generate codes used to convert the arguments to Hive types for `HiveSimpleUDF` - private[this] def genCodeForConverter(ctx: CodegenContext, index: Int): String = { - val converterClassName = classOf[Any => Any].getName - val hiveUDFClassName = classOf[HiveSimpleUDF].getName - - val converterTerm = ctx.freshName("converter") - val expressionIdx = ctx.references.size - 1 - ctx.addMutableState(converterClassName, converterTerm, - s"this.$converterTerm = ($converterClassName)" + - s"(($hiveUDFClassName) references[$expressionIdx]).getWrapper($index);") - converterTerm + @scala.annotation.varargs + def callUdf(args: AnyRef*): AnyRef = { + FunctionRegistry.invoke( + method, + function, + conversionHelper.convertIfNecessary(args: _*): _*) } protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val hiveUDF = ctx.addReferenceObj("hiveUDF", this) - // Generate codes used to convert the returned value of `HiveSimpleUDF` to Catalyst type - val converterClassName = classOf[Any => Any].getName - val catalystConverterTerm = ctx.freshName("catalystConverter") - ctx.addMutableState(converterClassName, catalystConverterTerm, - s"this.$catalystConverterTerm = ($converterClassName)$hiveUDF.getUnwrapper();") - - val resultTerm = ctx.freshName("result") - - // This must be called before children expressions' codegen - // because ctx.references is used in genCodeForConverter - val converterTerms = children.indices.map(genCodeForConverter(ctx, _)) - // codegen for children expressions val evals = children.map(_.genCode(ctx)) @@ -123,54 +183,15 @@ private[hive] case class HiveSimpleUDF( // Trying to convert an Object to `int` will cause casting exception. 
val evalCode = evals.map(_.code).mkString - val (converters, funcArguments) = converterTerms.zipWithIndex.map { case (converter, i) => - val eval = evals(i) - val argTerm = ctx.freshName("arg") - val convert = s"Object $argTerm = ${eval.isNull} ? null : $converter.apply(${eval.value});" - (convert, argTerm) - }.unzip + val (converters, funcArguments) = genCodeForWrapper(ctx, evals, hiveUDF) - val getFuncResult = s"$hiveUDF.callUdf(${funcArguments.mkString(", ")})" - val callFunc = - s""" - ${ctx.boxedType(dataType)} $resultTerm = null; - try { - $resultTerm = (${ctx.boxedType(dataType)})$catalystConverterTerm.apply($getFuncResult); - } catch (Exception e) { - throw new org.apache.spark.SparkException($hiveUDF.udfErrorMessage(), e); - } - """ + val callFunc = s"$hiveUDF.callUdf(${funcArguments.mkString(", ")})" ev.copy(code = s""" $evalCode ${converters.mkString("\n")} - $callFunc - - boolean ${ev.isNull} = $resultTerm == null; - ${ctx.boxedType(dataType)} ${ev.value} = ${ctx.defaultValue(dataType)}; - if (!${ev.isNull}) { - ${ev.value} = $resultTerm; - }""") - } - - def getChildren(): Seq[Expression] = children - - def getWrapper(index: Int): (Any) => Any = wrappers(index) - - def getUnwrapper(): (Any) => Any = unwrapper - - @scala.annotation.varargs - def callUdf(args: AnyRef*): AnyRef = { - FunctionRegistry.invoke( - method, - function, - conversionHelper.convertIfNecessary(args: _*): _*) - } - - lazy val udfErrorMessage = { - val funcCls = funcWrapper.functionClassName - val inputTypes = children.map(_.dataType.simpleString).mkString(", ") - s"Failed to execute user defined function($funcCls: ($inputTypes) => ${dataType.simpleString})" + ${genCodeForUnwrapper(ctx, ev, hiveUDF, callFunc)} + """) } // TODO: Finish input output types. 
@@ -205,7 +226,7 @@ private[hive] class DeferredObjectAdapter extends DeferredObject { private[hive] case class HiveGenericUDF( name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) - extends Expression with HiveInspectors with Logging { + extends Expression with HiveInspectors with HiveUDFCodegen with Logging { override def nullable: Boolean = true @@ -221,18 +242,10 @@ private[hive] case class HiveGenericUDF( private lazy val argumentInspectors = children.map(toInspector) @transient - lazy val returnInspector = { + private lazy val returnInspector = { function.initializeAndFoldConstants(argumentInspectors.toArray) } - @transient - private lazy val wrappers = argumentInspectors.zip(children).map { case (inspect, child) => - wrapperFor(inspect, child.dataType) - }.toArray - - @transient - private lazy val unwrapper = unwrapperFor(returnInspector) - @transient private lazy val isUDFDeterministic = { val udfType = function.getClass.getAnnotation(classOf[HiveUDFType]) @@ -246,33 +259,18 @@ private[hive] case class HiveGenericUDF( override lazy val dataType: DataType = inspectorToDataType(returnInspector) - // Generate codes used to convert the arguments to Hive types for `HiveGenericUDF` - private[this] def genCodeForConverter(ctx: CodegenContext, index: Int): String = { - val converterClassName = classOf[Any => Any].getName - val hiveUDFClassName = classOf[HiveGenericUDF].getName - - val converterTerm = ctx.freshName("converter") - val expressionIdx = ctx.references.size - 1 - ctx.addMutableState(converterClassName, converterTerm, - s"this.$converterTerm = ($converterClassName)" + - s"(($hiveUDFClassName) references[$expressionIdx]).getWrapper($index);") - converterTerm - } + def getArgumentInspector(i: Int): ObjectInspector = argumentInspectors(i) - protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - val hiveUDF = ctx.addReferenceObj("hiveUDF", this) + def getReturnInspector(): ObjectInspector = returnInspector - // 
Generate codes used to convert the returned value of `HiveSimpleUDF` to Catalyst type - val converterClassName = classOf[Any => Any].getName - val catalystConverterTerm = ctx.freshName("catalystConverter") - ctx.addMutableState(converterClassName, catalystConverterTerm, - s"this.$catalystConverterTerm = ($converterClassName)$hiveUDF.getUnwrapper();") + def getDeferredObjects(): Array[DeferredObject] = deferredObjects - val resultTerm = ctx.freshName("result") + def callUdf(args: Array[DeferredObject]): AnyRef = { + function.evaluate(deferredObjects) + } - // This must be called before children expressions' codegen - // because ctx.references is used in genCodeForConverter - val converterTerms = children.indices.map(genCodeForConverter(ctx, _)) + protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val hiveUDF = ctx.addReferenceObj("hiveUDF", this) // Generate codes used to convert input values into `DeferredObject`s val deferredObjectsClassName = classOf[DeferredObject].getName + "[]" @@ -284,28 +282,23 @@ private[hive] case class HiveGenericUDF( val objectInspectorClassName = classOf[ObjectInspector].getName val objectInspectorTerm = ctx.freshName("objectInspector") ctx.addMutableState(objectInspectorClassName, objectInspectorTerm, - s"this.$objectInspectorTerm = ($objectInspectorClassName)$hiveUDF.returnInspector();") + s"this.$objectInspectorTerm = ($objectInspectorClassName)$hiveUDF.getReturnInspector();") // codegen for children expressions val evals = children.map(_.genCode(ctx)) - val (converters, funcArguments) = converterTerms.zipWithIndex.map { case (converter, i) => - val eval = evals(i) - val argTerm = ctx.freshName("arg") - val convert = s"final Object $argTerm = ${eval.isNull} ? " + - s"null : $converter.apply(${eval.value});" - (convert, argTerm) - }.unzip - // Generate the codes for expressions and calling `HiveGenericUDF`. 
val evalCode = evals.map(_.code).mkString + val (converters, funcArguments) = genCodeForWrapper(ctx, evals, hiveUDF) + val funcClassName = classOf[Function0[AnyRef]].getName val deferredObjectAdapterClassName = classOf[DeferredObjectAdapter].getName val argsTerm = evals.zipWithIndex.map { case (eval, i) => s""" (($deferredObjectAdapterClassName) this.$deferredObjectsTerm[$i]) .set(new ${funcClassName}() { + @Override public Object call() { return ${funcArguments(i)}; @@ -314,46 +307,14 @@ private[hive] case class HiveGenericUDF( """.stripMargin } - val getFuncResult = s"$hiveUDF.callUdf($deferredObjectsTerm)" - val callFunc = - s""" - ${ctx.boxedType(dataType)} $resultTerm = null; - try { - $resultTerm = (${ctx.boxedType(dataType)})$catalystConverterTerm.apply($getFuncResult); - } catch (Exception e) { - throw new org.apache.spark.SparkException($hiveUDF.udfErrorMessage(), e); - } - """ + val callFunc = s"$hiveUDF.callUdf($deferredObjectsTerm)" ev.copy(code = s""" $evalCode ${converters.mkString("\n")} ${argsTerm.mkString("\n")} - $callFunc - - boolean ${ev.isNull} = $resultTerm == null; - ${ctx.boxedType(dataType)} ${ev.value} = ${ctx.defaultValue(dataType)}; - if (!${ev.isNull}) { - ${ev.value} = $resultTerm; - }""") - } - - def getChildren(): Seq[Expression] = children - - def getWrapper(index: Int): (Any) => Any = wrappers(index) - - def getUnwrapper(): (Any) => Any = unwrapper - - def getDeferredObjects(): Array[DeferredObject] = deferredObjects - - def callUdf(args: Array[DeferredObject]): AnyRef = { - function.evaluate(deferredObjects) - } - - lazy val udfErrorMessage = { - val funcCls = funcWrapper.functionClassName - val inputTypes = children.map(_.dataType.simpleString).mkString(", ") - s"Failed to execute user defined function($funcCls: ($inputTypes) => ${dataType.simpleString})" + ${genCodeForUnwrapper(ctx, ev, hiveUDF, callFunc)} + """) } override def eval(input: InternalRow): Any = { From d7be870d338e70882b7fc000e450a6d20787c70d Mon Sep 17 00:00:00 
2001 From: Takeshi YAMAMURO Date: Fri, 18 Nov 2016 16:02:43 +0900 Subject: [PATCH 5/5] Add tests --- .../sql/hive/execution/HiveUDFSuite.scala | 28 +++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 58909ab9ea0aa..2e2ca786fdf79 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -22,7 +22,7 @@ import java.util.{ArrayList, Arrays, Properties} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.ql.exec.UDF -import org.apache.hadoop.hive.ql.udf.{UDAFPercentile, UDFType} +import org.apache.hadoop.hive.ql.udf.{UDAFPercentile, UDFToDouble, UDFType} import org.apache.hadoop.hive.ql.udf.generic._ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats} @@ -30,7 +30,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectIns import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory import org.apache.hadoop.io.{LongWritable, Writable} -import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} +import org.apache.spark.sql.execution.WholeStageCodegenExec import org.apache.spark.sql.functions.max import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils @@ -509,6 +510,29 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { checkAnswer(testData.selectExpr("statelessUDF() as s").agg(max($"s")), Row(1)) } } + + test("Hive UDFs should be included in WholeStageCodegen") { + import org.apache.spark.sql.functions._ + import spark.implicits._ + + def 
checkCodegenPlan(df: DataFrame): Unit = { + assert(df.queryExecution.executedPlan.find(_.isInstanceOf[WholeStageCodegenExec]).isDefined) + } + + spark.range(3).createOrReplaceTempView("t") + + sql(s"CREATE TEMPORARY FUNCTION f AS '${classOf[UDFToDouble].getName}'") + val df1 = spark.sql("SELECT f(id) FROM t") + checkCodegenPlan(df1) + assert(df1.collect() === Array(Row(0.0), Row(1.0), Row(2.0))) + sql("DROP TEMPORARY FUNCTION IF EXISTS f") + + sql(s"CREATE TEMPORARY FUNCTION f AS '${classOf[GenericUDFAbs].getName}'") + val df2 = spark.sql("SELECT f(id) FROM t") + checkCodegenPlan(df2) + assert(df2.collect() === Array(Row(0L), Row(1L), Row(2L))) + sql("DROP TEMPORARY FUNCTION IF EXISTS f") + } } class TestPair(x: Int, y: Int) extends Writable with Serializable {