diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 279671ced0a17..cec36da8df35a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -54,7 +54,6 @@ class SQLContext(@transient val sparkContext: SparkContext) extends org.apache.spark.Logging with CacheManager with ExpressionConversions - with UDFRegistration with Serializable { self => @@ -338,6 +337,33 @@ class SQLContext(@transient val sparkContext: SparkContext) */ val experimental: ExperimentalMethods = new ExperimentalMethods(this) + /** + * A collection of methods for registering user-defined functions (UDF). + * + * The following example registers a Scala closure as UDF: + * {{{ + * sqlContext.udf.register("myUdf", (arg1: Int, arg2: String) => arg2 + arg1) + * }}} + * + * The following example registers a UDF in Java: + * {{{ + * sqlContext.udf().register("myUDF", DataTypes.StringType, + * new UDF2() { + * @Override + * public String call(Integer arg1, String arg2) { + * return arg2 + arg1; + * } + * }); + * }}} + * + * Or, to use Java 8 lambda syntax: + * {{{ + * sqlContext.udf().register("myUDF", DataTypes.StringType, + * (Integer arg1, String arg2) -> arg2 + arg1)); + * }}} + */ + val udf: UDFRegistration = new UDFRegistration(this) + protected[sql] class SparkPlanner extends SparkStrategies { val sparkContext: SparkContext = self.sparkContext diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala index 5fb472686c9e1..cde63110f8acf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala @@ -19,22 +19,26 @@ package org.apache.spark.sql import java.util.{List => JList, Map => JMap} +import scala.reflect.runtime.universe.TypeTag + import org.apache.spark.Accumulator import org.apache.spark.api.python.PythonBroadcast import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.api.java._ import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUdf} import org.apache.spark.sql.execution.PythonUDF +import org.apache.spark.sql.types.DataType -import scala.reflect.runtime.universe.{TypeTag, typeTag} /** - * Functions for registering scala lambda functions as UDFs in a SQLContext. + * Functions for registering user-defined functions. */ -private[sql] trait UDFRegistration { - self: SQLContext => +class UDFRegistration (sqlContext: SQLContext) extends org.apache.spark.Logging { + + private val functionRegistry = sqlContext.functionRegistry - private[spark] def registerPython( + protected[sql] def registerPython( name: String, command: Array[Byte], envVars: JMap[String, String], @@ -55,7 +59,7 @@ private[sql] trait UDFRegistration { """.stripMargin) - val dataType = parseDataType(stringDataType) + val dataType = sqlContext.parseDataType(stringDataType) def builder(e: Seq[Expression]) = PythonUDF( @@ -72,133 +76,443 @@ private[sql] trait UDFRegistration { functionRegistry.registerFunction(name, builder) } - /** registerFunction 0-22 were generated by this script + /* registerFunction 0-22 were generated by this script (0 to 22).map { x => val types = (1 to x).foldRight("T")((_, s) => {s"_, $s"}) - s""" - def registerFunction[T: TypeTag](name: String, func: Function$x[$types]): Unit = { + println(s""" + /** + * Register a Scala closure of ${x} arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function$x[$types]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) - } - """ + }""") } - */ + + (1 to 22).foreach { i => + val extTypeArgs = (1 to i).map(_ => "_").mkString(", ") + val anyTypeArgs = (1 to i).map(_ => "Any").mkString(", ") + val anyCast = s".asInstanceOf[UDF$i[$anyTypeArgs, Any]]" + val anyParams = (1 to i).map(_ => "_: Any").mkString(", ") + println(s""" + |/** + | * Register a user-defined function with ${i} arguments. + | */ + |def register(name: String, returnType: DataType, f: UDF$i[$extTypeArgs, _]) = { + | functionRegistry.registerFunction( + | name, + | (e: Seq[Expression]) => ScalaUdf(f$anyCast.call($anyParams), returnType, e)) + |}""".stripMargin) + } + */ // scalastyle:off - def registerFunction[T: TypeTag](name: String, func: Function0[T]): Unit = { + /** + * Register a Scala closure of 0 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function0[T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function1[_, T]): Unit = { + /** + * Register a Scala closure of 1 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function1[_, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function2[_, _, T]): Unit = { + /** + * Register a Scala closure of 2 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function2[_, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function3[_, _, _, T]): Unit = { + /** + * Register a Scala closure of 3 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function3[_, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function4[_, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 4 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function4[_, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function5[_, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 5 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function5[_, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function6[_, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 6 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function6[_, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function7[_, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 7 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function7[_, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function8[_, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 8 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function8[_, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function9[_, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 9 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function9[_, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function10[_, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 10 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function10[_, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function11[_, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 11 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function11[_, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function12[_, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 12 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function12[_, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function13[_, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 13 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function13[_, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 14 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 15 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 16 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 17 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 18 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 19 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 20 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 21 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } - def registerFunction[T: TypeTag](name: String, func: Function22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { + /** + * Register a Scala closure of 22 arguments as user-defined function. + * @tparam T return type of user-defined function. + */ + def register[T: TypeTag](name: String, func: Function22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, T]): Unit = { def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[T].dataType, e) functionRegistry.registerFunction(name, builder) } + + /** + * Register a user-defined function with 1 arguments. + */ + def register(name: String, returnType: DataType, f: UDF1[_, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF1[Any, Any]].call(_: Any), returnType, e)) + } + + /** + * Register a user-defined function with 2 arguments. + */ + def register(name: String, returnType: DataType, f: UDF2[_, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF2[Any, Any, Any]].call(_: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 3 arguments. + */ + def register(name: String, returnType: DataType, f: UDF3[_, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF3[Any, Any, Any, Any]].call(_: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 4 arguments. + */ + def register(name: String, returnType: DataType, f: UDF4[_, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF4[Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 5 arguments. + */ + def register(name: String, returnType: DataType, f: UDF5[_, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF5[Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 6 arguments. + */ + def register(name: String, returnType: DataType, f: UDF6[_, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF6[Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 7 arguments. + */ + def register(name: String, returnType: DataType, f: UDF7[_, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF7[Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 8 arguments. + */ + def register(name: String, returnType: DataType, f: UDF8[_, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF8[Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 9 arguments. + */ + def register(name: String, returnType: DataType, f: UDF9[_, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF9[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 10 arguments. + */ + def register(name: String, returnType: DataType, f: UDF10[_, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF10[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 11 arguments. + */ + def register(name: String, returnType: DataType, f: UDF11[_, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF11[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 12 arguments. + */ + def register(name: String, returnType: DataType, f: UDF12[_, _, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF12[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 13 arguments. + */ + def register(name: String, returnType: DataType, f: UDF13[_, _, _, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF13[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 14 arguments. + */ + def register(name: String, returnType: DataType, f: UDF14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF14[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 15 arguments. + */ + def register(name: String, returnType: DataType, f: UDF15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF15[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 16 arguments. + */ + def register(name: String, returnType: DataType, f: UDF16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF16[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 17 arguments. + */ + def register(name: String, returnType: DataType, f: UDF17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF17[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 18 arguments. + */ + def register(name: String, returnType: DataType, f: UDF18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF18[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 19 arguments. + */ + def register(name: String, returnType: DataType, f: UDF19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF19[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 20 arguments. + */ + def register(name: String, returnType: DataType, f: UDF20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF20[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 21 arguments. + */ + def register(name: String, returnType: DataType, f: UDF21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF21[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + + /** + * Register a user-defined function with 22 arguments. + */ + def register(name: String, returnType: DataType, f: UDF22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]) = { + functionRegistry.registerFunction( + name, + (e: Seq[Expression]) => ScalaUdf(f.asInstanceOf[UDF22[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any), returnType, e)) + } + // scalastyle:on } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index cbdb3e64bb66b..6c95bad6974d1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -766,7 +766,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { } test("SPARK-3371 Renaming a function expression with group by gives error") { - registerFunction("len", (s: String) => s.length) + udf.register("len", (s: String) => s.length) checkAnswer( sql("SELECT len(value) as temp FROM testData WHERE key = 1 group by len(value)"), 1) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala index 720953ae3765a..0c98120031242 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala @@ -27,23 +27,22 @@ case class FunctionResult(f1: String, f2: String) class UDFSuite extends QueryTest { test("Simple UDF") { - registerFunction("strLenScala", (_: String).length) + udf.register("strLenScala", (_: String).length) assert(sql("SELECT strLenScala('test')").first().getInt(0) === 4) } test("ZeroArgument UDF") { - registerFunction("random0", () => { Math.random()}) + udf.register("random0", () => { Math.random()}) assert(sql("SELECT random0()").first().getDouble(0) >= 0.0) } test("TwoArgument UDF") { - registerFunction("strLenScala", (_: String).length + (_:Int)) + udf.register("strLenScala", (_: String).length + (_:Int)) assert(sql("SELECT strLenScala('test', 1)").first().getInt(0) === 5) } - test("struct UDF") { - registerFunction("returnStruct", (f1: String, f2: String) => FunctionResult(f1, f2)) + udf.register("returnStruct", (f1: String, f2: String) => FunctionResult(f1, f2)) val result= sql("SELECT returnStruct('test', 'test2') as ret") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala index a0d54d17f5f13..fbc8704f7837b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala @@ -81,7 +81,7 @@ class UserDefinedTypeSuite extends QueryTest { } test("UDTs and UDFs") { - registerFunction("testType", (d: MyDenseVector) => d.isInstanceOf[MyDenseVector]) + udf.register("testType", (d: MyDenseVector) => d.isInstanceOf[MyDenseVector]) pointsRDD.registerTempTable("points") checkAnswer( sql("SELECT testType(features) from points"), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala index 5fc8d8dbe3a9f..5dafcd6c0a76a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala @@ -50,7 +50,7 @@ class HiveUdfSuite extends QueryTest { import TestHive._ test("spark sql udf test that returns a struct") { - registerFunction("getStruct", (_: Int) => Fields(1, 2, 3, 4, 5)) + udf.register("getStruct", (_: Int) => Fields(1, 2, 3, 4, 5)) assert(sql( """ |SELECT getStruct(1).f1,