
[SPARK-17123][SQL] Use type-widened encoder for DataFrame rather than existing encoder to allow type-widening from set operations #15072

Closed
wants to merge 2 commits

Conversation

HyukjinKwon
Member

@HyukjinKwon HyukjinKwon commented Sep 13, 2016

What changes were proposed in this pull request?

This PR fixes set operations on DataFrame so that they work without exceptions when the column types are not Scala-native types (e.g., TimestampType, DateType and DecimalType).

The problem is that set operations such as union, intersect and except use the encoder belonging to the calling Dataset.

So the caller's Dataset keeps its original ExpressionEncoder[Row] when the set operation is performed. However, the result types can actually be widened, so we should use an ExpressionEncoder[Row] constructed from the executed plan rather than reusing the existing one. Otherwise, incorrect code is generated via StaticInvoke.
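
For reference, a rough sketch of the idea, based on how Dataset.ofRows already builds its encoder (sparkSession and logicalPlan are assumed to be in scope, and the snippet only compiles inside the org.apache.spark.sql package where the Dataset constructor is accessible):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder

// Rebuild the row encoder from the analyzed plan's (possibly widened) schema
// instead of reusing the caller's ExpressionEncoder[Row].
val qe = sparkSession.sessionState.executePlan(logicalPlan)
qe.assertAnalyzed()
val widened = new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))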

Running the code below:

val dates = Seq(
  (new Date(0), BigDecimal.valueOf(1), new Timestamp(2)),
  (new Date(3), BigDecimal.valueOf(4), new Timestamp(5))
).toDF("date", "timestamp", "decimal")

val widenTypedRows = Seq(
  (new Timestamp(2), 10.5D, "string")
).toDF("date", "timestamp", "decimal")

val results = dates.union(widenTypedRows).collect()
results.foreach(println)

prints the following:

Before

23:08:54.490 ERROR org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 28, Column 107: No applicable constructor/method found for actual parameters "long"; candidates are: "public static java.sql.Date org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(int)"
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificSafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificSafeProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private MutableRow mutableRow;
/* 009 */   private Object[] values;
/* 010 */   private org.apache.spark.sql.types.StructType schema;
/* 011 */
/* 012 */
/* 013 */   public SpecificSafeProjection(Object[] references) {
/* 014 */     this.references = references;
/* 015 */     mutableRow = (MutableRow) references[references.length - 1];
/* 016 */
/* 017 */     this.schema = (org.apache.spark.sql.types.StructType) references[0];
/* 018 */   }
/* 019 */
/* 020 */   public java.lang.Object apply(java.lang.Object _i) {
/* 021 */     InternalRow i = (InternalRow) _i;
/* 022 */
/* 023 */     values = new Object[3];
/* 024 */
/* 025 */     boolean isNull2 = i.isNullAt(0);
/* 026 */     long value2 = isNull2 ? -1L : (i.getLong(0));
/* 027 */     boolean isNull1 = isNull2;
/* 028 */     final java.sql.Date value1 = isNull1 ? null : org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(value2);
/* 029 */     isNull1 = value1 == null;
/* 030 */     if (isNull1) {
/* 031 */       values[0] = null;
/* 032 */     } else {
/* 033 */       values[0] = value1;
/* 034 */     }
/* 035 */
/* 036 */     boolean isNull4 = i.isNullAt(1);
/* 037 */     double value4 = isNull4 ? -1.0 : (i.getDouble(1));
/* 038 */
/* 039 */     boolean isNull3 = isNull4;
/* 040 */     java.math.BigDecimal value3 = null;
/* 041 */     if (!isNull3) {
/* 042 */
/* 043 */       Object funcResult = null;
/* 044 */       funcResult = value4.toJavaBigDecimal();
/* 045 */       if (funcResult == null) {
/* 046 */         isNull3 = true;
/* 047 */       } else {
/* 048 */         value3 = (java.math.BigDecimal) funcResult;
/* 049 */       }
/* 050 */
/* 051 */     }
/* 052 */     isNull3 = value3 == null;
/* 053 */     if (isNull3) {
/* 054 */       values[1] = null;
/* 055 */     } else {
/* 056 */       values[1] = value3;
/* 057 */     }
/* 058 */
/* 059 */     boolean isNull6 = i.isNullAt(2);
/* 060 */     UTF8String value6 = isNull6 ? null : (i.getUTF8String(2));
/* 061 */     boolean isNull5 = isNull6;
/* 062 */     final java.sql.Timestamp value5 = isNull5 ? null : org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaTimestamp(value6);
/* 063 */     isNull5 = value5 == null;
/* 064 */     if (isNull5) {
/* 065 */       values[2] = null;
/* 066 */     } else {
/* 067 */       values[2] = value5;
/* 068 */     }
/* 069 */
/* 070 */     final org.apache.spark.sql.Row value = new org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema(values, schema);
/* 071 */     if (false) {
/* 072 */       mutableRow.setNullAt(0);
/* 073 */     } else {
/* 074 */
/* 075 */       mutableRow.update(0, value);
/* 076 */     }
/* 077 */
/* 078 */     return mutableRow;
/* 079 */   }
/* 080 */ }

After

[1969-12-31 00:00:00.0,1.0,1969-12-31 16:00:00.002]
[1969-12-31 00:00:00.0,4.0,1969-12-31 16:00:00.005]
[1969-12-31 16:00:00.002,10.5,string]

How was this patch tested?

Unit tests in DataFrameSuite
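
A sketch of the kind of test added (illustrative only; the actual test in DataFrameSuite may differ in name and assertions, and it assumes the suite's testImplicits is in scope):

import java.sql.{Date, Timestamp}

test("SPARK-17123: union, except and intersect should not fail on widened types") {
  import testImplicits._

  val dates = Seq(
    (new Date(0), BigDecimal.valueOf(1), new Timestamp(2)),
    (new Date(3), BigDecimal.valueOf(4), new Timestamp(5))
  ).toDF("date", "timestamp", "decimal")

  val widenTypedRows = Seq(
    (new Timestamp(2), 10.5D, "string")
  ).toDF("date", "timestamp", "decimal")

  // Each of these used to fail codegen with a CompileException before this change.
  dates.union(widenTypedRows).collect()
  dates.except(widenTypedRows).collect()
  dates.intersect(widenTypedRows).collect()
}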

@HyukjinKwon
Member Author

HyukjinKwon commented Sep 13, 2016

@rxin and @JoshRosen Actually, I made this patch a couple of weeks ago but hesitated to submit it because it seems hacky. However, I noticed it has not been fixed for about a month, so I decided to submit it. Any suggestions are welcome.
Also, I am happy to close this if it looks too hacky.

@HyukjinKwon
Member Author

cc @dongjoon-hyun too, who I believe took a look at this earlier.

@HyukjinKwon
Member Author

Maybe I should've left a comment in the JIRA first to ask if I could work on this. I also don't mind closing this if any of you is working on a better approach.

@SparkQA

SparkQA commented Sep 13, 2016

Test build #65302 has finished for PR 15072 at commit 6131810.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon HyukjinKwon changed the title [SPARK-17123][SQL] Use type-widen encoder for DataFrame rather than existing encoder to allow type-widening from set operations [SPARK-17123][SQL] Use type-widened encoder for DataFrame rather than existing encoder to allow type-widening from set operations Sep 13, 2016
@hvanhovell
Contributor

@HyukjinKwon I cannot reproduce the bug using the example you give in the description. I can however reproduce it if I use the example in the JIRA.

@hvanhovell
Contributor

Nvm - I need to call collect() instead of show()

@HyukjinKwon
Member Author

Yeap. BTW, that's because show() calls toDF internally (as you already know).

@HyukjinKwon
Member Author

Hi @hvanhovell, what do you think about this?
Is this something we might merge if we can't find a better way before the next release, or does it seem too hacky? I am hesitant about whether I should close this or not.

@davies
Contributor

davies commented Sep 19, 2016

@cloud-fan Could you review this one?

@SparkQA

SparkQA commented Sep 22, 2016

Test build #65770 has finished for PR 15072 at commit e27fe51.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

Hi @cloud-fan, could you take a look please?

@@ -53,7 +53,15 @@ import org.apache.spark.util.Utils

private[sql] object Dataset {
def apply[T: Encoder](sparkSession: SparkSession, logicalPlan: LogicalPlan): Dataset[T] = {
Contributor

Is it possible to add an overloaded version?

class RowEncoder extends ExpressionEncoder[Row]

def apply(sparkSession: SparkSession, logicalPlan: LogicalPlan)(implicit enc: RowEncoder): DataFrame

Member Author

Thank you for your feedback. I just tried the suggested change, but it failed.

@cloud-fan
Contributor

retest this please

@cloud-fan
Contributor

I don't have a better idea either, so this LGTM

cc @liancheng do you have any ideas?

@SparkQA

SparkQA commented Oct 5, 2016

Test build #66382 has finished for PR 15072 at commit e27fe51.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

ofRows(sparkSession, logicalPlan).asInstanceOf[Dataset[T]]
} else {
new Dataset(sparkSession, logicalPlan, encoder)
}
Contributor

Would it be cleaner and less hacky if we move this fix into all those set operator methods?

Member Author

@HyukjinKwon HyukjinKwon Oct 11, 2016

Could you please guide me a bit more? I am a bit uncertain about how/where to fix this.

Contributor

In transformation methods of Dataset, normally we will call withTypedPlan to generate a new Dataset. However, for set operator methods, we should call a different method and put this special logic in it, so that the scope of this hack is narrowed down to only set operator methods.

Contributor

We only need this for Union right? In all other cases we only return tuples from the first dataset.

Member Author

Hm, I tested manually. It seems except fails too. It seems fine with intersect.

Contributor

Why would except fail? Only rows from the first dataset should be returned, or am I missing something?

Member Author

Ah, here is the code I ran

val dates = Seq(
  (new Date(0), BigDecimal.valueOf(1), new Timestamp(2)),
  (new Date(3), BigDecimal.valueOf(4), new Timestamp(5))
).toDF("date", "timestamp", "decimal")

val widenTypedRows = Seq(
  (new Timestamp(2), 10.5D, "string")
).toDF("date", "timestamp", "decimal")

dates.except(widenTypedRows).collect()

and the error message:

23:10:05.331 ERROR org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 30, Column 107: No applicable constructor/method found for actual parameters "long"; candidates are: "public static java.sql.Date org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(int)"
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificSafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificSafeProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private InternalRow mutableRow;
/* 009 */   private Object[] values;
/* 010 */   private org.apache.spark.sql.types.StructType schema;
/* 011 */
/* 012 */   public SpecificSafeProjection(Object[] references) {
/* 013 */     this.references = references;
/* 014 */     mutableRow = (InternalRow) references[references.length - 1];
/* 015 */
/* 016 */     this.schema = (org.apache.spark.sql.types.StructType) references[0];
/* 017 */
/* 018 */   }
/* 019 */
/* 020 */
/* 021 */
/* 022 */   public java.lang.Object apply(java.lang.Object _i) {
/* 023 */     InternalRow i = (InternalRow) _i;
/* 024 */
/* 025 */     values = new Object[3];
/* 026 */
/* 027 */     boolean isNull2 = i.isNullAt(0);
/* 028 */     long value2 = isNull2 ? -1L : (i.getLong(0));
/* 029 */     boolean isNull1 = isNull2;
/* 030 */     final java.sql.Date value1 = isNull1 ? null : org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(value2);
/* 031 */     isNull1 = value1 == null;
/* 032 */     if (isNull1) {
/* 033 */       values[0] = null;
/* 034 */     } else {
/* 035 */       values[0] = value1;
/* 036 */     }
/* 037 */
/* 038 */     boolean isNull4 = i.isNullAt(1);
/* 039 */     double value4 = isNull4 ? -1.0 : (i.getDouble(1));
/* 040 */
/* 041 */     boolean isNull3 = isNull4;
/* 042 */     java.math.BigDecimal value3 = null;
/* 043 */     if (!isNull3) {
/* 044 */
/* 045 */       Object funcResult = null;
/* 046 */       funcResult = value4.toJavaBigDecimal();
/* 047 */       if (funcResult == null) {
/* 048 */         isNull3 = true;
/* 049 */       } else {
/* 050 */         value3 = (java.math.BigDecimal) funcResult;
/* 051 */       }
/* 052 */
/* 053 */     }
/* 054 */     isNull3 = value3 == null;
/* 055 */     if (isNull3) {
/* 056 */       values[1] = null;
/* 057 */     } else {
/* 058 */       values[1] = value3;
/* 059 */     }
/* 060 */
/* 061 */     boolean isNull6 = i.isNullAt(2);
/* 062 */     UTF8String value6 = isNull6 ? null : (i.getUTF8String(2));
/* 063 */     boolean isNull5 = isNull6;
/* 064 */     final java.sql.Timestamp value5 = isNull5 ? null : org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaTimestamp(value6);
/* 065 */     isNull5 = value5 == null;
/* 066 */     if (isNull5) {
/* 067 */       values[2] = null;
/* 068 */     } else {
/* 069 */       values[2] = value5;
/* 070 */     }
/* 071 */
/* 072 */     final org.apache.spark.sql.Row value = new org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema(values, schema);
/* 073 */     if (false) {
/* 074 */       mutableRow.setNullAt(0);
/* 075 */     } else {
/* 076 */
/* 077 */       mutableRow.update(0, value);
/* 078 */     }
/* 079 */
/* 080 */     return mutableRow;
/* 081 */   }
/* 082 */ }
/* 028 */     long value2 = isNull2 ? -1L : (i.getLong(0));
/* 029 */     boolean isNull1 = isNull2;
/* 030 */     final java.sql.Date value1 = isNull1 ? null : org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(value2);

Here, it seems toJavaDate takes an int, but a long is given from the widened Timestamp.
It (apparently) seems the widened schema is needed to compare both sides. I will look into this more deeply, but do you have any idea on this?

Member Author

Ah-ha, it seems we widen types for set operators [1].

val dates = Seq(
  (new Date(0), BigDecimal.valueOf(1), new Timestamp(2)),
  (new Date(3), BigDecimal.valueOf(4), new Timestamp(5))
).toDF("date", "timestamp", "decimal")

val widenTypedRows = Seq(
  (new Timestamp(2), 10.5D, "string")
).toDF("date", "timestamp", "decimal")

dates.printSchema()
dates.except(widenTypedRows).toDF().printSchema()

prints:

root
 |-- date: date (nullable = true)
 |-- timestamp: decimal(38,18) (nullable = true)
 |-- decimal: timestamp (nullable = true)

root
 |-- date: timestamp (nullable = true)
 |-- timestamp: double (nullable = true)
 |-- decimal: string (nullable = true)

[1]

 * This rule is only applied to Union/Except/Intersect
 */
object WidenSetOperationTypes extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case p if p.analyzed => p
    case s @ SetOperation(left, right) if s.childrenResolved &&
        left.output.length == right.output.length && !s.resolved =>
      val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(left :: right :: Nil)
      assert(newChildren.length == 2)
      s.makeCopy(Array(newChildren.head, newChildren.last))
    case s: Union if s.childrenResolved &&
        s.children.forall(_.output.length == s.children.head.output.length) && !s.resolved =>
      val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(s.children)
      s.makeCopy(Array(newChildren))
  }

Member Author

@HyukjinKwon HyukjinKwon Oct 11, 2016

Ah, the reason intersect seemed fine is apparently that the result did not contain any data. intersect also fails:

val dates = Seq(Tuple1(BigDecimal.valueOf(10.5))).toDF("decimal")
val widenTypedRows =  Seq(Tuple1(10.5D)).toDF("decimal")
dates.intersect(widenTypedRows).collect()

Contributor

Yeah, I forgot about type widening.

@HyukjinKwon
Member Author

@liancheng @hvanhovell @davies @cloud-fan Are there other things I should take care of?

@hvanhovell
Contributor

hvanhovell commented Oct 22, 2016

@HyukjinKwon can we move this into a set-specific method within Dataset itself? For example:

/** A convenient function to wrap a set based logical plan and produce a Dataset. */
@inline private def withSetOperator[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = {
  if (classTag.runtimeClass == classOf[Row]) {
    // Set operators widen types (change the schema), so we cannot reuse the row encoder.
    Dataset.ofRows(sparkSession, logicalPlan).asInstanceOf[Dataset[U]]
  } else {
    Dataset(sparkSession, logicalPlan)
  }
}
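
For context, the set operator methods in Dataset would then route through this helper instead of withTypedPlan, roughly like the sketch below (the exact method bodies in Dataset.scala may differ slightly):

def union(other: Dataset[T]): Dataset[T] = withSetOperator {
  // Flatten nested unions so chained unions don't build a deep plan tree.
  CombineUnions(Union(logicalPlan, other.logicalPlan))
}

def except(other: Dataset[T]): Dataset[T] = withSetOperator {
  Except(logicalPlan, other.logicalPlan)
}

def intersect(other: Dataset[T]): Dataset[T] = withSetOperator {
  Intersect(logicalPlan, other.logicalPlan)
}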

@HyukjinKwon
Member Author

HyukjinKwon commented Oct 22, 2016

@hvanhovell Oh, I think that would look cleaner and nicer. I will try.


/** A convenient function to wrap a set based logical plan and produce a Dataset. */
@inline private def withSetOperator[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = {
if (classTag.runtimeClass == classOf[Row]) {
Contributor

Should we also allow sub-classes?

Member Author

@HyukjinKwon HyukjinKwon Oct 22, 2016

I guess it'd be fine within Spark, since the clsTag always seems to be set to Row via RowEncoder.scala#L59 on the path that creates/loads a DataFrame; however, it seems potentially problematic if another encoder is implemented and used with a subclass of Row, as I guess you meant.

Let me try to handle this, at least, for safety. Thank you.
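
For what it's worth, one way to also cover subclasses of Row could look like the sketch below (illustrative only; the check that ends up merged may be written differently):

/** A convenient function to wrap a set based logical plan and produce a Dataset. */
@inline private def withSetOperator[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = {
  // classOf[Row].isAssignableFrom(c) is true when c is Row itself or any subclass of Row.
  if (classOf[Row].isAssignableFrom(classTag.runtimeClass)) {
    // Set operators widen types (change the schema), so we cannot reuse the row encoder.
    Dataset.ofRows(sparkSession, logicalPlan).asInstanceOf[Dataset[U]]
  } else {
    Dataset(sparkSession, logicalPlan)
  }
}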

@hvanhovell
Contributor

LGTM - pending jenkins

@HyukjinKwon
Member Author

(@hvanhovell Thank you for reviewing closely)

@SparkQA

SparkQA commented Oct 22, 2016

Test build #67392 has finished for PR 15072 at commit 8bcddc8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Contributor

hvanhovell commented Oct 22, 2016

Merging to master. Thanks!

@hvanhovell
Contributor

@HyukjinKwon I cannot cherry-pick this into 2.0. Can you open a PR against 2.0?

@asfgit asfgit closed this in 5fa9f87 Oct 22, 2016
@HyukjinKwon
Member Author

I definitely will tomorrow.

@SparkQA

SparkQA commented Oct 22, 2016

Test build #67393 has finished for PR 15072 at commit 1298e79.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Oct 23, 2016
… for set operations

## What changes were proposed in this pull request?

This PR backports #15072

Please note that the test code is a bit different from master's, as #14786 was only merged into master; this branch therefore does not support type-widening between `DateType` and `TimestampType`.

So, both types were taken out of the test.

## How was this patch tested?

Unit test in `DataFrameSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #15601 from HyukjinKwon/backport-17123.
robert3005 pushed a commit to palantir/spark that referenced this pull request Nov 1, 2016
… existing encoder to allow type-widening from set operations

uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
… existing encoder to allow type-widening from set operations

@HyukjinKwon HyukjinKwon deleted the SPARK-17123 branch January 2, 2018 03:44