From 032f0065bf78d24c53259d599d4e39caa8481085 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 14 Jan 2015 20:37:23 -0800 Subject: [PATCH] [SPARK-5193][SQL] Reconcile Java and Scala UDFRegistration. 1. Removed UDFRegistration as a mixin in SQLContext and made it a field ("udf"). 2. For Java UDFs, renamed dataType to returnType, and made that the 2nd argument since the UDF itself can be long. 3. Added all Java UDF registration methods to Scala's UDFRegistration. 4. Documentation --- .../org/apache/spark/sql/SQLContext.scala | 28 +- .../apache/spark/sql/UdfRegistration.scala | 384 ++++++++++++++++-- .../org/apache/spark/sql/SQLQuerySuite.scala | 2 +- .../scala/org/apache/spark/sql/UDFSuite.scala | 9 +- .../spark/sql/UserDefinedTypeSuite.scala | 2 +- .../sql/hive/execution/HiveUdfSuite.scala | 2 +- 6 files changed, 383 insertions(+), 44 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 279671ced0a17..cec36da8df35a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -54,7 +54,6 @@ class SQLContext(@transient val sparkContext: SparkContext) extends org.apache.spark.Logging with CacheManager with ExpressionConversions - with UDFRegistration with Serializable { self => @@ -338,6 +337,33 @@ class SQLContext(@transient val sparkContext: SparkContext) */ val experimental: ExperimentalMethods = new ExperimentalMethods(this) + /** + * A collection of methods for registering user-defined functions (UDF). + * + * The following example registers a Scala closure as UDF: + * {{{ + * sqlContext.udf.register("myUdf", (arg1: Int, arg2: String) => arg2 + arg1) + * }}} + * + * The following example registers a UDF in Java: + * {{{ + * sqlContext.udf().register("myUDF", DataTypes.StringType, + * new UDF2() { + * @Override + * public String call(Integer arg1, String arg2) { + * return arg2 + arg1; + * } + * }); + * }}} + * + * Or, to use Java 8 lambda syntax: + * {{{ + * sqlContext.udf().register("myUDF", DataTypes.StringType, + * (Integer arg1, String arg2) -> arg2 + arg1)); + * }}} + */ + val udf: UDFRegistration = new UDFRegistration(this) + protected[sql] class SparkPlanner extends SparkStrategies { val sparkContext: SparkContext = self.sparkContext diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala index 5fb472686c9e1..cde63110f8acf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala @@ -19,22 +19,26 @@ package org.apache.spark.sql import java.util.{List => JList, Map => JMap} +import scala.reflect.runtime.universe.TypeTag + import org.apache.spark.Accumulator import org.apache.spark.api.python.PythonBroadcast import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.api.java._ import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUdf} import org.apache.spark.sql.execution.PythonUDF +import org.apache.spark.sql.types.DataType -import scala.reflect.runtime.universe.{TypeTag, typeTag} /** - * Functions for registering scala lambda functions as UDFs in a SQLContext. + * Functions for registering user-defined functions. */ -private[sql] trait UDFRegistration { - self: SQLContext => +class UDFRegistration (sqlContext: SQLContext) extends org.apache.spark.Logging { + + private val functionRegistry = sqlContext.functionRegistry - private[spark] def registerPython( + protected[sql] def registerPython( name: String, command: Array[Byte], envVars: JMap[String, String], @@ -55,7 +59,7 @@ private[sql] trait UDFRegistration { """.stripMargin) - val dataType = parseDataType(stringDataType) + val dataType = sqlContext.parseDataType(stringDataType) def builder(e: Seq[Expression]) = PythonUDF( @@ -72,133 +76,443 @@ private[sql] trait UDFRegistration { functionRegistry.registerFunction(name, builder) } - /** registerFunction 0-22 were generated by this script + /* registerFunction 0-22 were generated by this script (0 to 22).map { x => val types = (1 to x).foldRight("T")((_, s) => {s"_, $s"}) - s""" - def registerFunction[T: TypeTag](name: String, func: Function$x[$types]): Unit = { + println(s""" + /** + * Register a Scala closure of ${x} arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function$x[$types]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) - } - """ + }""") } - */ + + (1 to 22).foreach { i => + val extTypeArgs = (1 to i).map(_ => "_").mkString(", ") + val anyTypeArgs = (1 to i).map(_ => "Any").mkString(", ") + val anyCast = s".asInstanceOf[UDF$i[$anyTypeArgs, Any]]" + val anyParams = (1 to i).map(_ => "_: Any").mkString(", ") + println(s""" + |/** + | * Register a user-defined function with ${i} arguments. + | */ + |def register(name: String, returnType: DataType, f: UDF$i[$extTypeArgs, _]) = { + | functionRegistry.registerFunction( + | name, + | (e: Seq[Expression]) => ScalaUdf(f$anyCast.call($anyParams), returnType, e)) + |}""".stripMargin) + } + */ // scalastyle:off - def registerFunction[T: TypeTag](name: String, func: Function0[T]): Unit = { + /** + * Register a Scala closure of 0 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function0[T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function1[_, T]): Unit = { + /** + * Register a Scala closure of 1 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function1[_, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function2[_, _, T]): Unit = { + /** + * Register a Scala closure of 2 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function2[_, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function3[_, _, _, T]): Unit = { + /** + * Register a Scala closure of 3 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function3[_, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function4[_, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 4 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function4[_, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function5[_, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 5 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function5[_, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function6[_, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 6 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function6[_, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function7[_, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 7 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function7[_, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function8[_, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 8 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function8[_, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function9[_, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 9 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function9[_, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function10[_, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 10 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function10[_, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function11[_, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 11 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function11[_, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function12[_, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 12 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function12[_, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function13[_, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 13 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function13[_, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 14 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 15 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 16 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 17 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 18 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 19 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 20 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 21 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 22 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } + + /** + * Register a user-defined function with 1 arguments. + */ + def register(name: String, returnType: DataType, f: UDF1[_, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF1[Any, Any]].call(_: Any), returnType, e)) + } + + /** + * Register a user-defined function with 2 arguments. + */ + def register(name: String, returnType: DataType, f: UDF2[_, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF2[Any, Any, Any]].call(_: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 3 arguments. + */ + def register(name: String, returnType: DataType, f: UDF3[_, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF3[Any, Any, Any, Any]].call(_: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 4 arguments. + */ + def register(name: String, returnType: DataType, f: UDF4[_, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF4[Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 5 arguments. + */ + def register(name: String, returnType: DataType, f: UDF5[_, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF5[Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 6 arguments. + */ + def register(name: String, returnType: DataType, f: UDF6[_, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF6[Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 7 arguments. + */ + def register(name: String, returnType: DataType, f: UDF7[_, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF7[Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 8 arguments. + */ + def register(name: String, returnType: DataType, f: UDF8[_, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF8[Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 9 arguments. + */ + def register(name: String, returnType: DataType, f: UDF9[_, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF9[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 10 arguments. + */ + def register(name: String, returnType: DataType, f: UDF10[_, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF10[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 11 arguments. + */ + def register(name: String, returnType: DataType, f: UDF11[_, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF11[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 12 arguments. + */ + def register(name: String, returnType: DataType, f: UDF12[_, _, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF12[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 13 arguments. + */ + def register(name: String, returnType: DataType, f: UDF13[_, _, _, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF13[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 14 arguments. + */ + def register(name: String, returnType: DataType, f: UDF14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF14[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 15 arguments. + */ + def register(name: String, returnType: DataType, f: UDF15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF15[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 16 arguments. + */ + def register(name: String, returnType: DataType, f: UDF16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF16[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 17 arguments. + */ + def register(name: String, returnType: DataType, f: UDF17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF17[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 18 arguments. + */ + def register(name: String, returnType: DataType, f: UDF18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF18[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 19 arguments. + */ + def register(name: String, returnType: DataType, f: UDF19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF19[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 20 arguments. + */ + def register(name: String, returnType: DataType, f: UDF20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF20[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 21 arguments. + */ + def register(name: String, returnType: DataType, f: UDF21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF21[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 22 arguments. + */ + def register(name: String, returnType: DataType, f: UDF22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF22[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + // scalastyle:on } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index cbdb3e64bb66b..6c95bad6974d1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -766,7 +766,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { } test("SPARK-3371 Renaming a function expression with group by gives error") { - registerFunction("len", (s: String) => s.length) + udf.register("len", (s: String) => s.length) checkAnswer( sql("SELECT len(value) as temp FROM testData WHERE key = 1 group by len(value)"), 1) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala index 720953ae3765a..0c98120031242 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala @@ -27,23 +27,22 @@ case class FunctionResult(f1: String, f2: String) class UDFSuite extends QueryTest { test("Simple UDF") { - registerFunction("strLenScala", (_: String).length) + udf.register("strLenScala", (_: String).length) assert(sql("SELECT strLenScala('test')").first().getInt(0) === 4) } test("ZeroArgument UDF") { - registerFunction("random0", () => { Math.random()}) + udf.register("random0", () => { Math.random()}) assert(sql("SELECT random0()").first().getDouble(0) >= 0.0) } test("TwoArgument UDF") { - registerFunction("strLenScala", (_: String).length + (_:Int)) + udf.register("strLenScala", (_: String).length + (_:Int)) assert(sql("SELECT strLenScala('test', 1)").first().getInt(0) === 5) } - test("struct UDF") { - registerFunction("returnStruct", (f1: String, f2: String) => FunctionResult(f1, f2)) + udf.register("returnStruct", (f1: String, f2: String) => FunctionResult(f1, f2)) val result= sql("SELECT returnStruct('test', 'test2') as ret") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala index a0d54d17f5f13..fbc8704f7837b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala @@ -81,7 +81,7 @@ class UserDefinedTypeSuite extends QueryTest { } test("UDTs and UDFs") { - registerFunction("testType", (d: MyDenseVector) => d.isInstanceOf[MyDenseVector]) + udf.register("testType", (d: MyDenseVector) => d.isInstanceOf[MyDenseVector]) pointsRDD.registerTempTable("points") checkAnswer( sql("SELECT testType(features) from points"), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala index 5fc8d8dbe3a9f..5dafcd6c0a76a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala @@ -50,7 +50,7 @@ class HiveUdfSuite extends QueryTest { import TestHive._ test("spark sql udf test that returns a struct") { - registerFunction("getStruct", (_: Int) => Fields(1, 2, 3, 4, 5)) + udf.register("getStruct", (_: Int) => Fields(1, 2, 3, 4, 5)) assert(sql( """ |SELECT getStruct(1).f1,