[SPARK-13056][SQL] map column would throw NPE if value is null #10964
Conversation
Test build #50253 has finished for PR 10964 at commit
```diff
@@ -307,7 +307,7 @@ case class GetMapValue(child: Expression, key: Expression)
        }
      }
-     if ($found) {
+     if ($found && !$eval1.valueArray().isNullAt($index)) {
```
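The effect of the added guard can be illustrated outside of Spark with a minimal plain-Scala sketch (hypothetical names; the arrays stand in for the key and value arrays of a Catalyst map, and `.length` stands in for the size computation the unsafe writer performs): without the null check, touching a null map value throws an NPE; with it, the lookup degrades to a null result.

```scala
// Minimal sketch of the generated-code pattern, assuming a map stored as
// parallel key/value arrays where the value array may contain nulls.
object GetMapValueSketch {
  val keys   = Array("abc", "cba")
  val values = Array[String]("somestring", null) // "cba" maps to null

  // Buggy shape: mirrors `if ($found) { ... }` with no null check.
  // Measuring the value's size NPEs when the value is null.
  def lookupUnsafe(key: String): Int = {
    val index = keys.indexOf(key)
    if (index >= 0) values(index).length // NPE for key "cba"
    else -1
  }

  // Fixed shape: mirrors `if ($found && !valueArray.isNullAt(index))`.
  def lookupSafe(key: String): Option[Int] = {
    val index = keys.indexOf(key)
    if (index >= 0 && values(index) != null) Some(values(index).length)
    else None
  }
}
```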
can we assign `$eval1.valueArray()` to a local variable?
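The suggestion is standard codegen hygiene: evaluate `valueArray()` once and reuse the result, instead of calling the accessor in both the null check and the read. A plain-Scala sketch with hypothetical stand-in classes shows the difference in call counts:

```scala
// Hypothetical stand-in for an array of map values that may contain nulls.
class ValueArraySketch(data: Array[String]) {
  def isNullAt(i: Int): Boolean = data(i) == null
  def get(i: Int): String = data(i)
}

// Hypothetical stand-in for the map; counts accessor calls for illustration.
class MapDataSketch(values: ValueArraySketch) {
  var accesses = 0
  def valueArray(): ValueArraySketch = { accesses += 1; values }
}

// Without hoisting: valueArray() is evaluated in both the test and the read.
def lookupNoHoist(m: MapDataSketch, i: Int): Option[String] =
  if (!m.valueArray().isNullAt(i)) Some(m.valueArray().get(i)) else None

// With hoisting, as the reviewer suggests: one evaluation, then reuse.
def lookupHoisted(m: MapDataSketch, i: Int): Option[String] = {
  val values = m.valueArray()
  if (!values.isNullAt(i)) Some(values.get(i)) else None
}
```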
```scala
test("SPARK-13056: Null in map value causes NPE") {
  Seq((1, "abc=somestring,cba")).toDF("key", "value").registerTempTable("mapsrc")
  sql("""CREATE TABLE maptest AS SELECT str_to_map(value, ",", "=") as col1 FROM mapsrc""")
```
The `str_to_map` stuff makes this test a little hard to read. Can we create a DataFrame with

```scala
val df = Seq(1 -> Map("a" -> "1", "b" -> null)).toDF("key", "value")
```

and test it with the DataFrame APIs directly?
Test build #50339 has finished for PR 10964 at commit
```scala
test("SPARK-13056: Null in map value causes NPE") {
  val df = Seq(1 -> Map("abc" -> "somestring", "cba" -> null)).toDF("key", "value")
  df.registerTempTable("maptest")
```
Do we need to put the test in the hive module? I think `DataFrameSuite` in the sql core module is a good place for this test; we can just test `df.select($"value".apply("abc"))` instead of registering a temp table.
That suite mainly tests DataFrame API functionality, I think.
How about `SQLQuerySuite` in sql core? This bug has nothing to do with Hive, right?
Test build #50362 has finished for PR 10964 at commit
```scala
test("SPARK-13056: Null in map value causes NPE") {
  val df = Seq(1 -> Map("abc" -> "somestring", "cba" -> null)).toDF("key", "value")
  df.registerTempTable("maptest")
```
Use `withTempTable` to drop the table after the test.
```scala
val df = Seq(1 -> Map("abc" -> "somestring", "cba" -> null)).toDF("key", "value")
df.registerTempTable("maptest")
checkAnswer(sql("SELECT value['abc'] FROM maptest"), Row("somestring"))
checkAnswer(sql("SELECT value['cba'] FROM maptest"), Row(null))
```
Have you verified that this throws an NPE without the fix? The test runs fine on trunk:

```scala
scala> val df = Seq(1 -> Map("abc" -> "somestring", "cba" -> null)).toDF("key", "value")
df: org.apache.spark.sql.DataFrame = [key: int, value: map<string,string>]

scala> df.registerTempTable("maptest")

scala> sqlContext.sql("SELECT value['cba'] FROM maptest").collect()
res28: Array[org.apache.spark.sql.Row] = Array([null])

scala> sqlContext.sql("SELECT value['cba'] FROM maptest").foreach(println)
[null]
```
I think we need to use an `RDD` instead of a `Seq` to build the `DataFrame`, or the local optimization will evaluate it directly, without codegen.
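The distinction @cloud-fan is drawing can be hedged as a plain-Scala analogy (the real paths are Catalyst's local/interpreted evaluation versus generated unsafe projections): a direct map lookup tolerates a null value, while a "writer" that immediately measures the value's size does not, so a test has to force the codegen path to expose the bug.

```scala
// Analogy only. "Interpreted" path: hand back the raw value, null included.
def interpretedLookup(m: Map[String, String], k: String): String =
  m.getOrElse(k, null)

// "Codegen writer" path: measures the value's size before writing it,
// like UTF8StringWriter.getSize in the stack trace below -- NPEs on null.
def writerPath(m: Map[String, String], k: String): Int =
  m.getOrElse(k, null).length
```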
You need to modify the test case:

```scala
scala> sqlContext.sql("SELECT value['cba'] FROM maptest WHERE key = 1").collect()
16/01/31 21:22:13 ERROR Executor: Exception in task 15.0 in stage 2.0 (TID 47)
java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters$UTF8StringWriter.getSize(UnsafeRowWriters.java:90)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.execution.TungstenProject$$anonfun$3$$anonfun$apply$3.apply(basicOperators.scala:90)
	at org.apache.spark.sql.execution.TungstenProject$$anonfun$3$$anonfun$apply$3.apply(basicOperators.scala:88)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
	...
```
@tejasapatil OK, actually I used a UDF to generate the map and got your exception; later I changed it to this following the suggestion from @cloud-fan but didn't verify it myself. I'll modify the test case here. Thanks!
Test build #50472 has finished for PR 10964 at commit
Test build #50474 has finished for PR 10964 at commit
Test build #50477 has finished for PR 10964 at commit
```scala
val df = Seq(1 -> Map("abc" -> "somestring", "cba" -> null)).toDF("key", "value")
withTempTable("maptest") {
  df.registerTempTable("maptest")
  checkAnswer(sql("SELECT value['abc'] FROM maptest where key = 1"), Row("somestring"))
```
As I explained before, the problem is local optimization: #10964 (comment). Adding a filter here does fix the problem by breaking the local optimization, and we should add a comment saying so.
Yes, you are right.
LGTM, pending test
Test build #50483 has finished for PR 10964 at commit
retest this please.
Test build #50485 has finished for PR 10964 at commit
Merged to master and 1.6 |
Jira: https://issues.apache.org/jira/browse/SPARK-13056

Create a map like

```
{ "a": "somestring", "b": null }
```

then query like

```sql
SELECT col["b"] FROM t1;
```

and an NPE would be thrown.

Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #10964 from adrian-wang/npewriter.

(cherry picked from commit 358300c)
Signed-off-by: Michael Armbrust <michael@databricks.com>

Conflicts:
	sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala