From 341b55a58964b1966a1919ac0774c8be5d5e7251 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 5 Sep 2018 21:13:16 +0800
Subject: [PATCH] [SPARK-25044][SQL][FOLLOWUP] add back UserDefinedFunction.inputTypes

## What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/22259 .

A Scala case class has a wide public surface: apply, unapply, accessors, copy, etc.

In https://github.com/apache/spark/pull/22259 , we changed the type of `UserDefinedFunction.inputTypes` from `Option[Seq[DataType]]` to `Option[Seq[Schema]]`. This breaks backward compatibility.

This PR changes the type back, and uses a `var` to keep the new nullability info.

## How was this patch tested?

N/A

Closes #22319 from cloud-fan/revert.

Authored-by: Wenchen Fan
Signed-off-by: Wenchen Fan
---
 project/MimaExcludes.scala                    |   5 -
 .../apache/spark/sql/UDFRegistration.scala    | 196 +++++++++---------
 .../sql/expressions/UserDefinedFunction.scala |  29 ++-
 .../org/apache/spark/sql/functions.scala      |  78 +++----
 4 files changed, 163 insertions(+), 145 deletions(-)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 45cc5ccf2ea92..7ff783da130af 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -47,11 +47,6 @@ object MimaExcludes {
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.fpm.FPGrowthModel.this"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.fpm.AssociationRules#Rule.this"),
-    // [SPARK-25044] Address translation of LMF closure primitive args to Object in Scala 2.12
-    ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.expressions.UserDefinedFunction$"),
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.apply"),
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.copy"),
-
     // [SPARK-24296][CORE] Replicate large blocks as a stream.
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockRpcServer.this"),

     // [SPARK-23528] Add numIter to ClusteringSummary
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index 24ee46d0e8147..c37ba0c60c3d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUDF}
 import org.apache.spark.sql.execution.aggregate.ScalaUDAF
 import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
-import org.apache.spark.sql.expressions.{UserDefinedAggregateFunction, UserDefinedFunction}
+import org.apache.spark.sql.expressions.{SparkUserDefinedFunction, UserDefinedAggregateFunction, UserDefinedFunction}
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.util.Utils

@@ -113,7 +113,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
     (0 to 22).foreach { x =>
       val types = (1 to x).foldRight("RT")((i, s) => {s"A$i, $s"})
       val typeTags = (1 to x).map(i => s"A$i: TypeTag").foldLeft("RT: TypeTag")(_ + ", " + _)
-      val inputTypes = (1 to x).foldRight("Nil")((i, s) => {s"ScalaReflection.schemaFor[A$i] :: $s"})
+      val inputSchemas = (1 to x).foldRight("Nil")((i, s) => {s"ScalaReflection.schemaFor[A$i] :: $s"})
       println(s"""
         |/**
         | * Registers a deterministic Scala closure of $x arguments as user-defined function (UDF).
@@ -122,16 +122,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
         | */
         |def register[$typeTags](name: String, func: Function$x[$types]): UserDefinedFunction = {
         |  val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-        |  val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try($inputTypes).toOption
+        |  val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try($inputSchemas).toOption
         |  def builder(e: Seq[Expression]) = if (e.length == $x) {
-        |    ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        |      udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+        |    ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        |      udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
         |  } else {
         |    throw new AnalysisException("Invalid number of arguments for function " + name +
         |      ". Expected: $x; Found: " + e.length)
         |  }
         |  functionRegistry.createOrReplaceTempFunction(name, builder)
-        |  val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+        |  val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
         |  if (nullable) udf else udf.asNonNullable()
         |}""".stripMargin)
     }
@@ -168,16 +168,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag](name: String, func: Function0[RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 0) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 0; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

@@ -188,16 +188,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag](name: String, func: Function1[A1, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 1) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 1; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

@@ -208,16 +208,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag](name: String, func: Function2[A1, A2, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 2) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 2; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

@@ -228,16 +228,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag](name: String, func: Function3[A1, A2, A3, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 3) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 3; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

@@ -248,16 +248,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag](name: String, func: Function4[A1, A2, A3, A4, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 4) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 4; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

@@ -268,16 +268,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag](name: String, func: Function5[A1, A2, A3, A4, A5, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 5) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 5; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

@@ -288,16 +288,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag](name: String, func: Function6[A1, A2, A3, A4, A5, A6, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 6) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 6; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

@@ -308,16 +308,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag](name: String, func: Function7[A1, A2, A3, A4, A5, A6, A7, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 7) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 7; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

@@ -328,16 +328,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag](name: String, func: Function8[A1, A2, A3, A4, A5, A6, A7, A8, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 8) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 8; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

@@ -348,16 +348,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag](name: String, func: Function9[A1, A2, A3, A4, A5, A6, A7, A8, A9, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 9) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 9; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

@@ -368,16 +368,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag](name: String, func: Function10[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 10) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 10; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

@@ -388,16 +388,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag](name: String, func: Function11[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 11) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 11; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

@@ -408,16 +408,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag](name: String, func: Function12[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 12) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 12; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

@@ -428,16 +428,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag](name: String, func: Function13[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 13) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 13; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

@@ -448,16 +448,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag](name: String, func: Function14[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 14) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 14; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

@@ -468,16 +468,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag](name: String, func: Function15[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 15) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 15; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

@@ -488,16 +488,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag](name: String, func: Function16[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 16) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 16; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

@@ -508,16 +508,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag](name: String, func: Function17[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 17) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 17; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

@@ -528,16 +528,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag](name: String, func: Function18[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: ScalaReflection.schemaFor[A18] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: ScalaReflection.schemaFor[A18] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 18) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 18; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

@@ -548,16 +548,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag](name: String, func: Function19[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: ScalaReflection.schemaFor[A18] :: ScalaReflection.schemaFor[A19] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: ScalaReflection.schemaFor[A18] :: ScalaReflection.schemaFor[A19] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 19) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 19; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

@@ -568,16 +568,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag](name: String, func: Function20[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: ScalaReflection.schemaFor[A18] :: ScalaReflection.schemaFor[A19] :: ScalaReflection.schemaFor[A20] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: ScalaReflection.schemaFor[A18] :: ScalaReflection.schemaFor[A19] :: ScalaReflection.schemaFor[A20] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 20) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 20; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

@@ -588,16 +588,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag, A21: TypeTag](name: String, func: Function21[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, A21, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: ScalaReflection.schemaFor[A18] :: ScalaReflection.schemaFor[A19] :: ScalaReflection.schemaFor[A20] :: ScalaReflection.schemaFor[A21] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: ScalaReflection.schemaFor[A18] :: ScalaReflection.schemaFor[A19] :: ScalaReflection.schemaFor[A20] :: ScalaReflection.schemaFor[A21] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 21) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 21; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

@@ -608,16 +608,16 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag, A21: TypeTag, A22: TypeTag](name: String, func: Function22[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, A21, A22, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: ScalaReflection.schemaFor[A18] :: ScalaReflection.schemaFor[A19] :: ScalaReflection.schemaFor[A20] :: ScalaReflection.schemaFor[A21] :: ScalaReflection.schemaFor[A22] :: Nil).toOption
+    val inputSchemas: Option[Seq[ScalaReflection.Schema]] = Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: ScalaReflection.schemaFor[A3] :: ScalaReflection.schemaFor[A4] :: ScalaReflection.schemaFor[A5] :: ScalaReflection.schemaFor[A6] :: ScalaReflection.schemaFor[A7] :: ScalaReflection.schemaFor[A8] :: ScalaReflection.schemaFor[A9] :: ScalaReflection.schemaFor[A10] :: ScalaReflection.schemaFor[A11] :: ScalaReflection.schemaFor[A12] :: ScalaReflection.schemaFor[A13] :: ScalaReflection.schemaFor[A14] :: ScalaReflection.schemaFor[A15] :: ScalaReflection.schemaFor[A16] :: ScalaReflection.schemaFor[A17] :: ScalaReflection.schemaFor[A18] :: ScalaReflection.schemaFor[A19] :: ScalaReflection.schemaFor[A20] :: ScalaReflection.schemaFor[A21] :: ScalaReflection.schemaFor[A22] :: Nil).toOption
     def builder(e: Seq[Expression]) = if (e.length == 22) {
-      ScalaUDF(func, dataType, e, inputTypes.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
-        udfDeterministic = true, nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil))
+      ScalaUDF(func, dataType, e, inputSchemas.map(_.map(_.dataType)).getOrElse(Nil), Some(name), nullable,
+        udfDeterministic = true, nullableTypes = inputSchemas.map(_.map(_.nullable)).getOrElse(Nil))
     } else {
       throw new AnalysisException("Invalid number of arguments for function " + name +
         ". Expected: 22; Found: " + e.length)
     }
     functionRegistry.createOrReplaceTempFunction(name, builder)
-    val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name)
+    val udf = SparkUserDefinedFunction.create(func, dataType, inputSchemas).withName(name)
     if (nullable) udf else udf.asNonNullable()
   }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
index 7bd20dbe8f6d0..697757f8a73ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
@@ -41,12 +41,16 @@ import org.apache.spark.sql.types.DataType
 case class UserDefinedFunction protected[sql] (
     f: AnyRef,
     dataType: DataType,
-    inputTypes: Option[Seq[ScalaReflection.Schema]]) {
+    inputTypes: Option[Seq[DataType]]) {

   private var _nameOption: Option[String] = None
   private var _nullable: Boolean = true
   private var _deterministic: Boolean = true

+  // This is a `var` instead of in the constructor for backward compatibility of this case class.
+  // TODO: revisit this case class in Spark 3.0, and narrow down the public surface.
+  private[sql] var nullableTypes: Option[Seq[Boolean]] = None
+
   /**
    * Returns true when the UDF can return a nullable value.
    *
@@ -69,15 +73,19 @@ case class UserDefinedFunction protected[sql] (
    */
   @scala.annotation.varargs
   def apply(exprs: Column*): Column = {
+    if (inputTypes.isDefined && nullableTypes.isDefined) {
+      require(inputTypes.get.length == nullableTypes.get.length)
+    }
+
     Column(ScalaUDF(
       f,
       dataType,
       exprs.map(_.expr),
-      inputTypes.map(_.map(_.dataType)).getOrElse(Nil),
+      inputTypes.getOrElse(Nil),
       udfName = _nameOption,
       nullable = _nullable,
       udfDeterministic = _deterministic,
-      nullableTypes = inputTypes.map(_.map(_.nullable)).getOrElse(Nil)))
+      nullableTypes = nullableTypes.getOrElse(Nil)))
   }

   private def copyAll(): UserDefinedFunction = {
@@ -85,6 +93,7 @@ case class UserDefinedFunction protected[sql] (
     udf._nameOption = _nameOption
     udf._nullable = _nullable
     udf._deterministic = _deterministic
+    udf.nullableTypes = nullableTypes
     udf
   }

@@ -129,3 +138,17 @@ case class UserDefinedFunction protected[sql] (
     }
   }
 }
+
+// We have to use a name different from `UserDefinedFunction` here, to avoid breaking the binary
+// compatibility of the auto-generated UserDefinedFunction object.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index a261a7c1752d0..c120be469a268 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, ResolvedHint}
 import org.apache.spark.sql.execution.SparkSqlParser
-import org.apache.spark.sql.expressions.UserDefinedFunction
+import org.apache.spark.sql.expressions.{SparkUserDefinedFunction, UserDefinedFunction}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -3819,7 +3819,7 @@ object functions {
   (0 to 10).foreach { x =>
     val types = (1 to x).foldRight("RT")((i, s) => {s"A$i, $s"})
     val typeTags = (1 to x).map(i => s"A$i: TypeTag").foldLeft("RT: TypeTag")(_ + ", " + _)
-    val inputTypes = (1 to x).foldRight("Nil")((i, s) => {s"ScalaReflection.schemaFor(typeTag[A$i]) :: $s"})
+    val inputSchemas = (1 to x).foldRight("Nil")((i, s) => {s"ScalaReflection.schemaFor(typeTag[A$i]) :: $s"})
     println(s"""
       |/**
       | * Defines a Scala closure of $x arguments as user-defined function (UDF).
@@ -3832,8 +3832,8 @@ object functions {
       | */
       |def udf[$typeTags](f: Function$x[$types]): UserDefinedFunction = {
       |  val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-      |  val inputTypes = Try($inputTypes).toOption
-      |  val udf = UserDefinedFunction(f, dataType, inputTypes)
+      |  val inputSchemas = Try($inputSchemas).toOption
+      |  val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
       |  if (nullable) udf else udf.asNonNullable()
       |}""".stripMargin)
   }
@@ -3856,7 +3856,7 @@ object functions {
       | */
       |def udf(f: UDF$i[$extTypeArgs], returnType: DataType): UserDefinedFunction = {
       |  val func = f$anyCast.call($anyParams)
-      |  UserDefinedFunction($funcCall, returnType, inputTypes = None)
+      |  SparkUserDefinedFunction.create($funcCall, returnType, inputSchemas = None)
       |}""".stripMargin)
   }
 
@@ -3877,8 +3877,8 @@ object functions {
    */
   def udf[RT: TypeTag](f: Function0[RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes = Try(Nil).toOption
-    val udf = UserDefinedFunction(f, dataType, inputTypes)
+    val inputSchemas = Try(Nil).toOption
+    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -3893,8 +3893,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]) :: Nil).toOption
-    val udf = UserDefinedFunction(f, dataType, inputTypes)
+    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1]) :: Nil).toOption
+    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
     if (nullable) udf else udf.asNonNullable()
   }
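
From a caller's point of view the generated one-argument variant above is unchanged; what changed is that input nullability now rides along via `SparkUserDefinedFunction.create`. A minimal usage sketch (session setup and data are illustrative):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.udf

    object UdfUsage {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("udf-usage").getOrCreate()
        import spark.implicits._

        // schemaFor[Int] is non-nullable, so both the input's nullableTypes
        // entry and the result column's nullability end up as false.
        val square = udf((x: Int) => x * x)

        Seq(1, 2, 3).toDF("x").select(square($"x").as("x2")).show()
      }
    }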
@@ -3909,8 +3909,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: Nil).toOption
-    val udf = UserDefinedFunction(f, dataType, inputTypes)
+    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: Nil).toOption
+    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -3925,8 +3925,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag](f: Function3[A1, A2, A3, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: Nil).toOption
-    val udf = UserDefinedFunction(f, dataType, inputTypes)
+    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: Nil).toOption
+    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -3941,8 +3941,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag](f: Function4[A1, A2, A3, A4, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: Nil).toOption
-    val udf = UserDefinedFunction(f, dataType, inputTypes)
+    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: Nil).toOption
+    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -3957,8 +3957,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag](f: Function5[A1, A2, A3, A4, A5, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: Nil).toOption
-    val udf = UserDefinedFunction(f, dataType, inputTypes)
+    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: Nil).toOption
+    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
     if (nullable) udf else udf.asNonNullable()
   }
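
Every overload here wraps its `schemaFor` list in `Try(...).toOption`: `schemaFor` throws for types Catalyst cannot encode, and in that case the UDF is simply built without input schema information rather than failing at definition time. A rough illustration of that behavior (the `Opaque` class is a hypothetical stand-in for any unsupported argument type):

    import scala.reflect.runtime.universe.typeTag
    import scala.util.Try
    import org.apache.spark.sql.catalyst.ScalaReflection

    object SchemaForBehavior {
      // Supported type: Try succeeds and both the DataType and its
      // nullability are captured (IntegerType, nullable = false).
      val supported = Try(ScalaReflection.schemaFor(typeTag[Int]) :: Nil).toOption

      // Unsupported type: schemaFor throws, the whole list collapses to
      // None, and the UDF is built without inputTypes or nullableTypes.
      class Opaque
      val unsupported = Try(ScalaReflection.schemaFor(typeTag[Opaque]) :: Nil).toOption
    }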
@@ -3973,8 +3973,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag](f: Function6[A1, A2, A3, A4, A5, A6, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: ScalaReflection.schemaFor(typeTag[A6]) :: Nil).toOption
-    val udf = UserDefinedFunction(f, dataType, inputTypes)
+    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: ScalaReflection.schemaFor(typeTag[A6]) :: Nil).toOption
+    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -3989,8 +3989,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag](f: Function7[A1, A2, A3, A4, A5, A6, A7, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: ScalaReflection.schemaFor(typeTag[A6]) :: ScalaReflection.schemaFor(typeTag[A7]) :: Nil).toOption
-    val udf = UserDefinedFunction(f, dataType, inputTypes)
+    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: ScalaReflection.schemaFor(typeTag[A6]) :: ScalaReflection.schemaFor(typeTag[A7]) :: Nil).toOption
+    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -4005,8 +4005,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag](f: Function8[A1, A2, A3, A4, A5, A6, A7, A8, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: ScalaReflection.schemaFor(typeTag[A6]) :: ScalaReflection.schemaFor(typeTag[A7]) :: ScalaReflection.schemaFor(typeTag[A8]) :: Nil).toOption
-    val udf = UserDefinedFunction(f, dataType, inputTypes)
+    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: ScalaReflection.schemaFor(typeTag[A6]) :: ScalaReflection.schemaFor(typeTag[A7]) :: ScalaReflection.schemaFor(typeTag[A8]) :: Nil).toOption
+    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
     if (nullable) udf else udf.asNonNullable()
   }
@@ -4021,8 +4021,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag](f: Function9[A1, A2, A3, A4, A5, A6, A7, A8, A9, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: ScalaReflection.schemaFor(typeTag[A6]) :: ScalaReflection.schemaFor(typeTag[A7]) :: ScalaReflection.schemaFor(typeTag[A8]) :: ScalaReflection.schemaFor(typeTag[A9]) :: Nil).toOption
-    val udf = UserDefinedFunction(f, dataType, inputTypes)
+    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: ScalaReflection.schemaFor(typeTag[A6]) :: ScalaReflection.schemaFor(typeTag[A7]) :: ScalaReflection.schemaFor(typeTag[A8]) :: ScalaReflection.schemaFor(typeTag[A9]) :: Nil).toOption
+    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
     if (nullable) udf else udf.asNonNullable()
   }
 
@@ -4037,8 +4037,8 @@ object functions {
    */
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag](f: Function10[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, RT]): UserDefinedFunction = {
     val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
-    val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: ScalaReflection.schemaFor(typeTag[A6]) :: ScalaReflection.schemaFor(typeTag[A7]) :: ScalaReflection.schemaFor(typeTag[A8]) :: ScalaReflection.schemaFor(typeTag[A9]) :: ScalaReflection.schemaFor(typeTag[A10]) :: Nil).toOption
-    val udf = UserDefinedFunction(f, dataType, inputTypes)
+    val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1]) :: ScalaReflection.schemaFor(typeTag[A2]) :: ScalaReflection.schemaFor(typeTag[A3]) :: ScalaReflection.schemaFor(typeTag[A4]) :: ScalaReflection.schemaFor(typeTag[A5]) :: ScalaReflection.schemaFor(typeTag[A6]) :: ScalaReflection.schemaFor(typeTag[A7]) :: ScalaReflection.schemaFor(typeTag[A8]) :: ScalaReflection.schemaFor(typeTag[A9]) :: ScalaReflection.schemaFor(typeTag[A10]) :: Nil).toOption
+    val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
     if (nullable) udf else udf.asNonNullable()
   }
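
The Java-facing overloads that follow take only a `returnType`, so they go through `SparkUserDefinedFunction.create` with `inputSchemas = None`: no input DataTypes and no nullableTypes are available for analysis. A hedged sketch of that API from Scala (names illustrative):

    import org.apache.spark.sql.api.java.UDF2
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.types.LongType

    object JavaUdfUsage {
      val sum: UDF2[Long, Long, Long] = new UDF2[Long, Long, Long] {
        // UDF2.call is the single abstract method of the Java UDF interface.
        override def call(a: Long, b: Long): Long = a + b
      }

      // Only the return type is declared; with inputSchemas = None, Spark
      // cannot check or coerce the argument types of sumUdf(colA, colB).
      val sumUdf = udf(sum, LongType)
    }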
@@ -4057,7 +4057,7 @@ object functions {
    */
   def udf(f: UDF0[_], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF0[Any]].call()
-    UserDefinedFunction(() => func, returnType, inputTypes = None)
+    SparkUserDefinedFunction.create(() => func, returnType, inputSchemas = None)
   }
 
   /**
@@ -4071,7 +4071,7 @@ object functions {
    */
   def udf(f: UDF1[_, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF1[Any, Any]].call(_: Any)
-    UserDefinedFunction(func, returnType, inputTypes = None)
+    SparkUserDefinedFunction.create(func, returnType, inputSchemas = None)
   }
 
   /**
@@ -4085,7 +4085,7 @@ object functions {
    */
   def udf(f: UDF2[_, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF2[Any, Any, Any]].call(_: Any, _: Any)
-    UserDefinedFunction(func, returnType, inputTypes = None)
+    SparkUserDefinedFunction.create(func, returnType, inputSchemas = None)
   }
 
   /**
@@ -4099,7 +4099,7 @@ object functions {
    */
   def udf(f: UDF3[_, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF3[Any, Any, Any, Any]].call(_: Any, _: Any, _: Any)
-    UserDefinedFunction(func, returnType, inputTypes = None)
+    SparkUserDefinedFunction.create(func, returnType, inputSchemas = None)
   }
 
   /**
@@ -4113,7 +4113,7 @@ object functions {
    */
   def udf(f: UDF4[_, _, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF4[Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any)
-    UserDefinedFunction(func, returnType, inputTypes = None)
+    SparkUserDefinedFunction.create(func, returnType, inputSchemas = None)
   }
 
   /**
@@ -4127,7 +4127,7 @@ object functions {
    */
   def udf(f: UDF5[_, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF5[Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any)
-    UserDefinedFunction(func, returnType, inputTypes = None)
+    SparkUserDefinedFunction.create(func, returnType, inputSchemas = None)
   }
 
   /**
@@ -4141,7 +4141,7 @@ object functions {
    */
   def udf(f: UDF6[_, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF6[Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
-    UserDefinedFunction(func, returnType, inputTypes = None)
+    SparkUserDefinedFunction.create(func, returnType, inputSchemas = None)
   }
 
   /**
@@ -4155,7 +4155,7 @@ object functions {
    */
   def udf(f: UDF7[_, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF7[Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
-    UserDefinedFunction(func, returnType, inputTypes = None)
+    SparkUserDefinedFunction.create(func, returnType, inputSchemas = None)
   }
 
   /**
@@ -4169,7 +4169,7 @@ object functions {
    */
   def udf(f: UDF8[_, _, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF8[Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
-    UserDefinedFunction(func, returnType, inputTypes = None)
+    SparkUserDefinedFunction.create(func, returnType, inputSchemas = None)
   }
 
   /**
@@ -4183,7 +4183,7 @@ object functions {
    */
   def udf(f: UDF9[_, _, _, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF9[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
-    UserDefinedFunction(func, returnType, inputTypes = None)
+    SparkUserDefinedFunction.create(func, returnType, inputSchemas = None)
  }
 
   /**
@@ -4197,7 +4197,7 @@ object functions {
    */
   def udf(f: UDF10[_, _, _, _, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
     val func = f.asInstanceOf[UDF10[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
-    UserDefinedFunction(func, returnType, inputTypes = None)
+    SparkUserDefinedFunction.create(func, returnType, inputSchemas = None)
   }
 
   // scalastyle:on parameter.number
@@ -4216,7 +4216,7 @@ object functions {
    * @since 2.0.0
    */
   def udf(f: AnyRef, dataType: DataType): UserDefinedFunction = {
-    UserDefinedFunction(f, dataType, None)
+    SparkUserDefinedFunction.create(f, dataType, inputSchemas = None)
   }
 
   /**