
[SPARK-12258] [SQL] passing null into ScalaUDF #10259

Closed
wants to merge 1 commit

Conversation

@davies (Contributor) commented Dec 10, 2015

Check the nullability of the inputs and pass it into ScalaUDF.

Closes #10249

@yhuai (Contributor) commented Dec 10, 2015

test this please

@gatorsmile (Member)

Thank you, @davies!

I guess we might still have a bug in the code. As long as any input variable is null, the return result is null. Is that by design?

For example, one of the input variables is 3, but the return result is Row(null). We lose the whole row.

  test("SPARK-12258 UDF and Null value") {
    hiveContext.runSqlHive("CREATE TABLE test (ti TINYINT, si SMALLINT, i INT, bi BIGINT, " +
      "bo BOOLEAN, f FLOAT, d DOUBLE, s STRING, bin BINARY, t TIMESTAMP, da DATE)" +
      "STORED AS TEXTFILE")
    hiveContext.runSqlHive("INSERT INTO TABLE test VALUES(Null, Null, 3, Null, Null, " +
      "Null, Null, Null, Null, Null, Null)")
    hiveContext.udf.register("typeNullCheck",
      (ti: Byte, si: Short, i: Int, bi: Long, bo: Boolean, f: Float, d: Double, s: String,
       bin: Array[Byte], t: Timestamp, da: Date) =>
       (ti, si, i, bi, bo, f, d, s, bin, t, da))
    checkAnswer(
      sql("SELECT typeNullCheck(ti, si, i, bi, bo, f, d, s, bin, t, da) FROM test"),
      Row(null, null, 3, null, null, null, null, null, null, null, null))
  }

This is caused by our Analyzer rule. Below is the physical plan:

Project [if (((((((isnull(ti#0) || isnull(si#1)) || isnull(i#2)) || isnull(bi#3L)) || isnull(bo#4)) || isnull(f#5)) || isnull(d#6))) null else UDF(ti#0,si#1,i#2,bi#3L,bo#4,f#5,d#6,s#7,bin#8,t#9,da#10) AS _c0#11]
+- HiveTableScan [t#9,bin#8,bi#3L,i#2,bo#4,d#6,f#5,s#7,da#10,si#1,ti#0], MetastoreRelation default, test, None

@yhuai (Contributor) commented Dec 10, 2015

Does INSERT INTO TABLE test VALUES actually work?

@gatorsmile (Member)

Yeah. Below is the result of

sql("SELECT * FROM test").show();
+----+----+---+----+----+----+----+----+----+----+----+
|  ti|  si|  i|  bi|  bo|   f|   d|   s| bin|   t|  da|
+----+----+---+----+----+----+----+----+----+----+----+
|null|null|  3|null|null|null|null|null|null|null|null|
+----+----+---+----+----+----+----+----+----+----+----+

@yhuai (Contributor) commented Dec 11, 2015

The analyzer rule was introduced by #9770?

@gatorsmile (Member)

Yeah. That is my understanding.

inputsNullCheck.map(If(_, Literal.create(null, udf.dataType), udf)).getOrElse(udf)
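
For reference, here is a rough sketch of what that wrapping does. This is not the exact Spark source: the inputPrimitives flags and the wrapWithNullCheck helper are only illustrative.

  import org.apache.spark.sql.catalyst.expressions._

  // Sketch: OR together IsNull checks for the inputs that are bound to primitive
  // Scala parameters; if any of them is null, return a null literal instead of
  // evaluating the UDF at all.
  def wrapWithNullCheck(udf: ScalaUDF, inputPrimitives: Seq[Boolean]): Expression = {
    val inputsNullCheck = udf.children.zip(inputPrimitives)
      .collect { case (expr, isPrimitive) if isPrimitive && expr.nullable => IsNull(expr) }
      .reduceLeftOption[Expression](Or)

    inputsNullCheck
      .map(If(_, Literal.create(null, udf.dataType), udf))
      .getOrElse(udf)
  }

That also matches the physical plan above: the whole UDF call is guarded by a single isnull(...) || ... || isnull(...) condition, so a single null primitive input nulls out the entire result.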

@davies (Contributor, Author) commented Dec 11, 2015

@gatorsmile Because your UDF is using primitive types, we have no chance to pass null in. In order to get the behavior you expect, you should change the UDF to use boxed types as parameters. Does that work for you?
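
For illustration only, a minimal sketch of that workaround (the function name boxedNullCheck is made up; it assumes the same hiveContext and test table as above): with boxed parameter types, null column values reach the function body as null references instead of forcing the whole result to null.

  import java.lang.{Integer => JInt, Long => JLong}

  // Boxed parameters: each field keeps its own value, nulls included.
  hiveContext.udf.register("boxedNullCheck", (i: JInt, bi: JLong) => (i, bi))

  // sql("SELECT boxedNullCheck(i, bi) FROM test") should now yield Row(Row(3, null))
  // rather than Row(null).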

@gatorsmile (Member)

OK, I see your point. This is a possible workaround. It works well when the input values of the primitive types are not null, even if the input values of the Date and Timestamp columns are null.

I am just wondering how users would know this. We might see more related JIRAs in the future. Is it documented? Or is there any way to avoid it?

@yhuai (Contributor) commented Dec 11, 2015

I tried using java.lang types and the results look good.

@davies how about we update our docs (the Data Types section) to explain that those primitive types are not nullable? Specifically, for the Scala primitive types, even if users provide null, Scala will convert it to the type's default value. We can do the doc update in another PR.
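
A small illustration of that conversion (plain Scala, outside Spark): forcing null into a primitive yields the type's default value rather than staying null.

  // null unboxed into a Scala primitive becomes the default value
  val i = null.asInstanceOf[Int]      // 0
  val d = null.asInstanceOf[Double]   // 0.0
  val b = null.asInstanceOf[Boolean]  // false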

@yhuai (Contributor) commented Dec 11, 2015

LGTM pending jenkins.

@gatorsmile (Member)

LGTM

@SparkQA commented Dec 11, 2015

Test build #47552 has finished for PR 10259 at commit 4636fe3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Dec 11, 2015
Check nullability and passing them into ScalaUDF.

Closes #10249

Author: Davies Liu <davies@databricks.com>

Closes #10259 from davies/udf_null.

(cherry picked from commit b1b4ee7)
Signed-off-by: Yin Huai <yhuai@databricks.com>
asfgit closed this in b1b4ee7 Dec 11, 2015
@markhamstra (Contributor)

Sorry, but after this I am now seeing codegen errors, like this:

testTimestampArithmeticUDFs(com.clearstorydata.dataengine.udfs.TestTimestampArithmeticUDFs)  Time elapsed: 3.84 sec  <<< FAILURE!
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: 
execute, tree:
Exchange rangepartitioning(difference#0L ASC,8), None
+- ConvertToSafe
   +- Project [TIMESTAMPDIFF(second,ts#9,TIMESTAMPADD(second,-1,ts#9)) AS difference#0L]
      +- HiveTableScan [ts#9], MetastoreRelation default, ts_test, None

    at com.clearstorydata.dataengine.udfs.TestTimestampArithmeticUDFs.testTimestampArithmeticUDFs(TestTimestampArithmeticUDFs.scala:31)
Caused by: org.apache.spark.SparkException: 
Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 78, Column 154: Incompatible expression types "void" and "int"
/* 001 */ 
/* 002 */ public java.lang.Object generate(org.apache.spark.sql.catalyst.expressions.Expression[] exprs) {
/* 003 */   return new SpecificUnsafeProjection(exprs);
/* 004 */ }
/* 005 */ 
/* 006 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
/* 007 */   
/* 008 */   private org.apache.spark.sql.catalyst.expressions.Expression[] expressions;
/* 009 */   
/* 010 */   private scala.Function1 catalystConverter2;
/* 011 */   private scala.Function1 converter4;
/* 012 */   private scala.Function1 converter5;
/* 013 */   private scala.Function1 converter6;
/* 014 */   private scala.Function3 udf7;
/* 015 */   private scala.Function1 catalystConverter15;
/* 016 */   private scala.Function1 converter17;
/* 017 */   private scala.Function1 converter18;
/* 018 */   private scala.Function1 converter19;
/* 019 */   private scala.Function3 udf20;
/* 020 */   private UnsafeRow result28;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder bufferHolder29;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter30;
/* 023 */   
/* 024 */   
/* 025 */   
/* 026 */   public SpecificUnsafeProjection(org.apache.spark.sql.catalyst.expressions.Expression[] expressions) {
/* 027 */     this.expressions = expressions;
/* 028 */     this.catalystConverter2 = (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToCatalystConverter(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)expressions[0]).dataType());
/* 029 */     this.converter4 = (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)expressions[0]).getChildren().apply(0))).dataType());
/* 030 */     this.converter5 = (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)expressions[0]).getChildren().apply(1))).dataType());
/* 031 */     this.converter6 = (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)expressions[0]).getChildren().apply(2))).dataType());
/* 032 */     this.udf7 = (scala.Function3)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)expressions[0]).userDefinedFunc());
/* 033 */     this.catalystConverter15 = (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToCatalystConverter(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)expressions[2]).dataType());
/* 034 */     this.converter17 = (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)expressions[2]).getChildren().apply(0))).dataType());
/* 035 */     this.converter18 = (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)expressions[2]).getChildren().apply(1))).dataType());
/* 036 */     this.converter19 = (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)expressions[2]).getChildren().apply(2))).dataType());
/* 037 */     this.udf20 = (scala.Function3)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)expressions[2]).userDefinedFunc());
/* 038 */     this.result28 = new UnsafeRow();
/* 039 */     this.bufferHolder29 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder();
/* 040 */     this.rowWriter30 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter();
/* 041 */   }
/* 042 */   
/* 043 */   // Scala.Function1 need this
/* 044 */   public java.lang.Object apply(java.lang.Object row) {
/* 045 */     return apply((InternalRow) row);
/* 046 */   }
/* 047 */   
/* 048 */   public UnsafeRow apply(InternalRow i) {
/* 049 */     bufferHolder29.reset();
/* 050 */     
/* 051 */     rowWriter30.initialize(bufferHolder29, 1);
/* 052 */     
/* 053 */     /* TIMESTAMPDIFF(second,input[0, TimestampType],TIMESTAMPADD(second,-1,input[0, TimestampType])) */
/* 054 */     /* second */
/* 055 */     /* expression: second */
/* 056 */     java.lang.Object obj10 = expressions[1].eval(i);
/* 057 */     boolean isNull8 = obj10 == null;
/* 058 */     UTF8String primitive9 = null;
/* 059 */     if (!isNull8) {
/* 060 */       primitive9 = (UTF8String) obj10;
/* 061 */     }/* input[0, TimestampType] */
/* 062 */     boolean isNull11 = i.isNullAt(0);
/* 063 */     long primitive12 = isNull11 ? -1L : (i.getLong(0));/* TIMESTAMPADD(second,-1,input[0, TimestampType]) */
/* 064 */     /* second */
/* 065 */     /* expression: second */
/* 066 */     java.lang.Object obj23 = expressions[3].eval(i);
/* 067 */     boolean isNull21 = obj23 == null;
/* 068 */     UTF8String primitive22 = null;
/* 069 */     if (!isNull21) {
/* 070 */       primitive22 = (UTF8String) obj23;
/* 071 */     }/* -1 */
/* 072 */     /* input[0, TimestampType] */
/* 073 */     boolean isNull26 = i.isNullAt(0);
/* 074 */     long primitive27 = isNull26 ? -1L : (i.getLong(0));
/* 075 */     long primitive14 = -1L;
/* 076 */     Boolean isNull13;
/* 077 */     
/* 078 */     Long result16 = (Long)catalystConverter15.apply(udf20.apply(converter17.apply(isNull21 ? null : (UTF8String) primitive22),converter18.apply(false ? null : (Integer) -1),converter19.apply(isNull26 ? null : (Long) primitive27)));
/* 079 */     
/* 080 */     primitive14 = result16;
/* 081 */     isNull13 = result16 == null;
/* 082 */     long primitive1 = -1L;
/* 083 */     Boolean isNull0;
/* 084 */     
/* 085 */     Long result3 = (Long)catalystConverter2.apply(udf7.apply(converter4.apply(isNull8 ? null : (UTF8String) primitive9),converter5.apply(isNull11 ? null : (Long) primitive12),converter6.apply(isNull13 ? null : (Long) primitive14)));
/* 086 */     
/* 087 */     primitive1 = result3;
/* 088 */     isNull0 = result3 == null;
/* 089 */     if (isNull0) {
/* 090 */       rowWriter30.setNullAt(0);
/* 091 */     } else {
/* 092 */       rowWriter30.write(0, primitive1);
/* 093 */     }
/* 094 */     
/* 095 */     result28.pointTo(bufferHolder29.buffer, 1, bufferHolder29.totalSize());
/* 096 */     return result28;
/* 097 */   }
/* 098 */ }
/* 099 */ 

    at org.spark-project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
    at org.spark-project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
    at org.spark-project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
    at org.spark-project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
    at org.spark-project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
    at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
    at org.spark-project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at org.spark-project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
    at org.spark-project.guava.cache.LocalCache.get(LocalCache.java:4000)
    at org.spark-project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
    at org.spark-project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.compile(CodeGenerator.scala:515)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:358)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:313)
    at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:151)
    at org.apache.spark.sql.execution.Project$$anonfun$1.apply(basicOperators.scala:47)
    at org.apache.spark.sql.execution.Project$$anonfun$1.apply(basicOperators.scala:46)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 78, Column 154: Incompatible expression types "void" and "int"
.
.
.

@cloud-fan (Contributor)

Hi @markhamstra, can you share your test code so that we can reproduce it? Thanks!

@davies (Contributor, Author) commented Dec 11, 2015

@markhamstra Does #10266 fix it?

@markhamstra (Contributor)

@davies No -- see the other PR.

asfgit pushed a commit that referenced this pull request Dec 11, 2015
This is a follow-up PR for #10259

Author: Davies Liu <davies@databricks.com>

Closes #10266 from davies/null_udf2.

(cherry picked from commit c119a34)
Signed-off-by: Davies Liu <davies.liu@gmail.com>
asfgit pushed a commit that referenced this pull request Dec 11, 2015
This is a follow-up PR for #10259

Author: Davies Liu <davies@databricks.com>

Closes #10266 from davies/null_udf2.