[SPARK-18884][SQL] Throw an exception in compile time if Array[_] used in ScalaUDF #16605
Conversation
Test build #71449 has finished for PR 16605 at commit
I'm looking for another approach that doesn't break backward compatibility...
Test build #71454 has finished for PR 16605 at commit
Test build #71456 has finished for PR 16605 at commit
Test build #71468 has finished for PR 16605 at commit
@dongjoon-hyun Could you take time to review this before committers do? Thanks!
Sure, @maropu. I'll do that tomorrow morning (PST).
Many thanks!
Hi, @maropu. This part consists of a lot of generated code. So, could you update the template in the comment as well, and make the newly added code use the same generated syntax?
udf { (ar1: Seq[Int], ar2: Seq[Int], ar3: Seq[Int], ar4: Seq[Int], ar5: Seq[Int], ar6: Seq[Int], ar7: Seq[Int], ar8: Seq[Int], ar9: Seq[Int], ar10: Seq[Int]) => (ar1 ++ ar2 ++ ar3 ++ ar4 ++ ar5 ++ ar6 ++ ar7 ++ ar8 ++ ar9 ++ ar10).sum }
  )
).map { case (udf1, udf2, udf3, udf4, udf5, udf6, udf7, udf8, udf9, udf10) =>
  val arVal = functions.array(lit(1), lit(1))
Could you change this to access the column value instead of a `Literal`?
You mean something like this?
val testUdf = udf { (ar: Array[Long]) => ar.sum }
val df = spark.range(10).select(array('id, 'id).as("arVal"))
checkAnswer(df.select(testUdf('arVal)), (0L until 10L).map(i => Row(2 * i)))
+1. Yes.
okay!
val inputConverters = Try(
  ScalaReflection.scalaConverterFor(typeTag[A1]) ::
  Nil
).toOption
Please update the template in the comment and make `val inputConverters` a single line, like `val inputTypes` in line 3075.
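For reference, the requested single-line form would look roughly like this (a sketch only; `scalaConverterFor` is the helper this PR introduces, so its exact signature is an assumption, mirrored from the existing one-line `inputTypes` pattern):

```scala
// Assumed shape, matching the one-line style of inputTypes:
val inputConverters = Try(ScalaReflection.scalaConverterFor(typeTag[A1]) :: Nil).toOption
```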
okay, I'll update
@@ -137,7 +137,11 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
  def register[RT: TypeTag, A1: TypeTag](name: String, func: Function1[A1, RT]): UserDefinedFunction = {
    val dataType = ScalaReflection.schemaFor[RT].dataType
    val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: Nil).toOption
    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes.getOrElse(Nil))
    val inputConverters = Try(
Please insert `inputConverters` into the template comment and make `inputConverters` a single line, like line 139.
@@ -84,7 +86,9 @@ case class ScalaUDF(
    case 1 =>
      val func = function.asInstanceOf[(Any) => Any]
      val child0 = children(0)
      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
      lazy val converter0 = inputConverters.map {
Also, please update the template comment and follow a similar syntax.
Hi, I think you missed this comment.
oh, sorry. I'll do it soon.
okay, fixed!
@dongjoon-hyun Okay, I applied your comments to this PR. Could you check again whether it matches your intentions?
Test build #71568 has finished for PR 16605 at commit
Test build #71570 has finished for PR 16605 at commit
Hi, @maropu. First of all, I generally agree with you on the purpose of this PR. However, for your failure example, we can simply do the following in the current master. I'm wondering what you think about this.

```scala
scala> val df = Seq((0, 1)).toDF("a", "b").select(array($"a", $"b").as("ar"))

scala> val testArrayUdf = udf { (ar: scala.collection.mutable.WrappedArray[Int]) => ar.sum }
testArrayUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(ArrayType(IntegerType,false))))

scala> df.select(testArrayUdf($"ar")).show
+-------+
|UDF(ar)|
+-------+
|      1|
+-------+
```

PS. For
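The reason the `WrappedArray` signature works while `Array` fails comes down to how Catalyst hands array data back to Scala: the value arriving at the UDF is a `Seq` implementation, not a raw JVM array, so a parameter declared as `Array[_]` forces a cast that blows up at runtime. A minimal sketch of the mismatch in plain Scala, with no Spark required (the object name here is illustrative):

```scala
import scala.util.Try

object ArrayVsSeqUdf {
  def main(args: Array[String]): Unit = {
    // Catalyst hands array column values to a UDF as a Seq implementation
    // (a WrappedArray in the Scala versions Spark used at the time),
    // never as a raw JVM array.
    val fromCatalyst: Any = Array(1, 2, 3).toSeq

    // A UDF declared over Seq[Int] works: the value really is a Seq.
    println(fromCatalyst.asInstanceOf[Seq[Int]].sum)        // 6

    // A UDF declared over Array[Int] forces a cast to int[], which fails.
    println(Try(fromCatalyst.asInstanceOf[Array[Int]].sum)) // Failure(ClassCastException)
  }
}
```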
Oh, yeah. I didn't find that, and I think it's a good point. Anyway, thanks a lot for your reviews!
Sure, @maropu. Hi, @gatorsmile and @cloud-fan.
Well, it would be good if we could support this, but considering the size of this PR, I don't think it's worth it.
Test build #71631 has finished for PR 16605 at commit
Okay. But once this issue is finished, I'm planning to take SPARK-12823 in a similar way.
The workaround @cloud-fan mentioned is fine with me, but IMO the most critical issue here is that this cast exception happens not in the analysis phase but at runtime. So, at least, I think we should modify the code to throw an exception in the analysis phase, and the exception might include a message like "you should use …".
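One way to realize such an analysis-time check is to inspect the reflected parameter type when the UDF is defined. This is only a sketch: the method name, exception type, and message below are illustrative, not the code paths or wording the PR actually used.

```scala
// Hypothetical sketch of failing fast when a UDF is declared over Array[_].
// The real Spark code (UDFRegistration / ScalaReflection) is more involved.
import scala.reflect.runtime.universe._

def checkNoArrayArg[A1: TypeTag](): Unit = {
  val tpe = typeOf[A1]
  if (tpe.typeSymbol == symbolOf[Array[_]]) {
    throw new IllegalArgumentException(
      s"Array[_] is not supported as a UDF argument type ($tpe); " +
        "you should use Seq[_] instead")
  }
}

checkNoArrayArg[Seq[Int]]()   // passes
// checkNoArrayArg[Array[Int]]() would throw IllegalArgumentException
```

The point of the sketch is only that the declared type is available via `TypeTag` at UDF definition time, so the error can surface before any query runs.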
SGTM
Okay, I'll update this PR in that way, thanks!
Test build #71729 has finished for PR 16605 at commit
Test build #71934 has finished for PR 16605 at commit
Test build #71935 has finished for PR 16605 at commit
Force-pushed from a738158 to 94902ce
Test build #71939 has finished for PR 16605 at commit
Test build #71940 has finished for PR 16605 at commit
Test build #71952 has finished for PR 16605 at commit
This reverts commit 35715a4b6847f56f62038e9bbd77bf4a83250410.
Revert the previous 6 commits
Test build #72350 has finished for PR 16605 at commit
Test build #72354 has finished for PR 16605 at commit
Test build #72366 has finished for PR 16605 at commit
Test build #72369 has finished for PR 16605 at commit
@cloud-fan Could you give me more insights on this?
@cloud-fan ping
@cloud-fan ping
I'll close for now
What changes were proposed in this pull request?
This PR modified code to throw an exception at compile time if `Array[_]` is used in the arguments of `ScalaUDF`. Currently, a query throws an exception at runtime if we use the type in `ScalaUDF`.

How was this patch tested?
Added tests in `DataFrameSuite`.
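The description's example query is not preserved in this excerpt; based on the discussion above, the failing pattern looks roughly like this (an illustrative reconstruction, not the exact snippet from the PR):

```scala
// Declaring the UDF parameter as Array[_] compiles, but at runtime the
// array column arrives as a WrappedArray/Seq, so the cast to Array fails
// with a ClassCastException instead of being rejected during analysis.
val df = Seq((0, 1)).toDF("a", "b").select(array($"a", $"b").as("ar"))
val badUdf = udf { (ar: Array[Int]) => ar.sum }   // Array[_] parameter
df.select(badUdf($"ar")).show()                   // fails at runtime
```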