
[SPARK-17765][SQL] Support for writing out user-defined type in ORC datasource #15361

Closed
wants to merge 1 commit into apache:master from HyukjinKwon:SPARK-17765

Conversation

@HyukjinKwon (Member) commented Oct 5, 2016

What changes were proposed in this pull request?

This PR adds support for UserDefinedType when writing out, instead of throwing a ClassCastException, in the ORC data source.

In more detail, OrcStruct is created based on the string from DataType.catalogString. For a user-defined type, catalogString returns sqlType.simpleString by default [1]. However, during type dispatching to match the output with the schema, it tries to cast to, for example, StructType [2].

So, running the code below (MyDenseVector was borrowed from [3]):

val data = Seq((1, new UDT.MyDenseVector(Array(0.25, 2.25, 4.25))))
val udtDF = data.toDF("id", "vectors")
udtDF.write.orc("/tmp/test.orc")

ends up throwing an exception as below:

java.lang.ClassCastException: org.apache.spark.sql.UDT$MyDenseVectorUDT cannot be cast to org.apache.spark.sql.types.ArrayType
    at org.apache.spark.sql.hive.HiveInspectors$class.wrapperFor(HiveInspectors.scala:381)
    at org.apache.spark.sql.hive.orc.OrcSerializer.wrapperFor(OrcFileFormat.scala:164)
...

So, this PR uses UserDefinedType.sqlType when finding the correct converter for writing out in the ORC data source.
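
For reference, a minimal sketch of the idea as an extra case inside HiveInspectors.wrapperFor (illustrative rather than the verbatim diff; the existing cases are elided):

``` scala
// Sketch only: a UDT stores its values in its underlying SQL type, so delegate
// to that type's converter instead of matching the UDT against the ObjectInspector.
protected def wrapperFor(oi: ObjectInspector, dataType: DataType): Any => Any = oi match {
  case _ if dataType.isInstanceOf[UserDefinedType[_]] =>
    val sqlType = dataType.asInstanceOf[UserDefinedType[_]].sqlType
    wrapperFor(oi, sqlType)
  // ... the existing cases for struct, array, map and primitive types follow ...
  case _ => identity[Any]
}
```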

[1] https://github.com/apache/spark/blob/dfdcab00c7b6200c22883baa3ebc5818be09556f/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala#L95

override def catalogString: String = sqlType.simpleString

[2] https://github.com/apache/spark/blob/d2dc8c4a162834818190ffd82894522c524ca3e5/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala#L326
case soi: StandardStructObjectInspector =>

[3] https://github.com/apache/spark/blob/2bfed1a0c5be7d0718fd574a4dad90f4f6b44be7/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala#L38-L70
@SQLUserDefinedType(udt = classOf[MyDenseVectorUDT])
private[sql] class MyDenseVector(val data: Array[Double]) extends Serializable {
  override def hashCode(): Int = java.util.Arrays.hashCode(data)
  override def equals(other: Any): Boolean = other match {
    case v: MyDenseVector => java.util.Arrays.equals(this.data, v.data)
    case _ => false
  }
}

private[sql] class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
  override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)
  override def serialize(features: MyDenseVector): ArrayData = {
    new GenericArrayData(features.data.map(_.asInstanceOf[Any]))
  }
  override def deserialize(datum: Any): MyDenseVector = {
    datum match {
      case data: ArrayData =>
        new MyDenseVector(data.toDoubleArray())
    }
  }
  override def userClass: Class[MyDenseVector] = classOf[MyDenseVector]
  private[spark] override def asNullable: MyDenseVectorUDT = this
  override def hashCode(): Int = getClass.hashCode()
  override def equals(other: Any): Boolean = other.isInstanceOf[MyDenseVectorUDT]
}

How was this patch tested?

Unit tests in OrcQuerySuite.
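
A sketch of the kind of round-trip test this adds, assuming the usual test helpers (withTempPath, checkAnswer) available in OrcQuerySuite; the actual test in the diff may differ slightly:

``` scala
test("SPARK-17765: write out and read back a user-defined type") {
  withTempPath { path =>
    // Write a DataFrame with a UDT column to ORC, then read it back with the
    // original schema and verify the values survive the round trip.
    val data = Seq((1, new UDT.MyDenseVector(Array(0.25, 2.25, 4.25))))
    val udtDF = data.toDF("id", "vectors")
    udtDF.write.orc(path.getAbsolutePath)
    val readBack = spark.read.schema(udtDF.schema).orc(path.getAbsolutePath)
    checkAnswer(udtDF, readBack)
  }
}
```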

@HyukjinKwon (Member Author)

@yhuai and @liancheng, do you mind if I ask you to review this, please?

@SparkQA commented Oct 5, 2016

Test build #66386 has finished for PR 15361 at commit 948a5ca.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kxepal (Member) commented Oct 6, 2016

@HyukjinKwon
Thanks for the patch, but unfortunately it doesn't solve the issue. Tested with Spark 2.0.0:

Caused by: java.lang.ClassCastException: org.apache.spark.mllib.linalg.VectorUDT cannot be cast to org.apache.spark.sql.types.StructType
  at org.apache.spark.sql.hive.HiveInspectors$class.wrap(HiveInspectors.scala:558)
  at org.apache.spark.sql.hive.orc.OrcSerializer.wrap(OrcFileFormat.scala:164)
  at org.apache.spark.sql.hive.orc.OrcSerializer.wrapOrcStruct(OrcFileFormat.scala:202)
  at org.apache.spark.sql.hive.orc.OrcSerializer.serialize(OrcFileFormat.scala:168)
  at org.apache.spark.sql.hive.orc.OrcOutputWriter.writeInternal(OrcFileFormat.scala:253)
  at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:255)
  at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
  at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
  at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)

Let me try to make a simple Scala test case that reproduces the issue from the shell. Maybe that will be more helpful.

@HyukjinKwon (Member Author)

@kxepal, sure, I will definitely try to reproduce it as soon as you do. Meanwhile, let me double-check this. Thanks.

@kxepal (Member) commented Oct 6, 2016

@HyukjinKwon
Ok, try something like this:

scala> val sv = org.apache.spark.mllib.linalg.Vectors.sparse(7, Array(0, 42), Array(-127, 128))
sv: org.apache.spark.mllib.linalg.Vector = (7,[0,42],[-127.0,128.0])

scala> val df = Seq(("thing", sv)).toDF("thing", "vector")
df: org.apache.spark.sql.DataFrame = [thing: string, vector: vector]

scala> df.write.format("orc").save("/tmp/thing.orc")

@HyukjinKwon (Member Author) commented Oct 6, 2016

@kxepal I will test this by tomorrow and, if there are more cases to handle, fix them here as well. Thanks for verifying this PR.

@kxepal (Member) commented Oct 6, 2016

@HyukjinKwon Thanks a lot! Staying tuned.

@HyukjinKwon (Member Author) commented Oct 7, 2016

@kxepal, I just tested (built, copied and pasted) the code below:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Spark Hive Example").enableHiveSupport().getOrCreate()

import spark.implicits._

val sv = org.apache.spark.mllib.linalg.Vectors.sparse(7, Array(0, 42), Array(-127, 128))
val df = Seq(("thing", sv)).toDF("thing", "vector")
df.write.format("orc").save("/tmp/thing.orc")
spark.read.schema(df.schema).format("orc").load("/tmp/thing.orc").show()

which prints:

+-----+--------------------+
|thing|              vector|
+-----+--------------------+
|thing|(7,[0,42],[-127.0...|
+-----+--------------------+

and it seems fine with the current master branch. Do you mind if I verify this again when we backport to branch-2.0, after this is hopefully merged?

@kxepal (Member) commented Oct 7, 2016

@HyukjinKwon
Oh, great news! It seems it was me who backported this patch to 2.0.0 incorrectly. I'm sorry for the false alarm then - unfortunately, I wasn't able to test it against master.

I'll give it one more try today, but so far it looks like you solved the problem \o/ Thank you!

@HyukjinKwon (Member Author)

@kxepal Sure, thanks for confirming!

@kxepal (Member) commented Oct 7, 2016

@HyukjinKwon
It works great! Thank you! My mistake was applying the changes to the same wrapperFor method, while in the 2.0.0 sources they have to go in the wrap method instead, with a small modification to pass the third argument in the recursive call.

@HyukjinKwon (Member Author)

Hi @chenghao-intel and @davies, it seems the related code paths were updated by you before. Do you mind if I ask you to take a look, please?

@chenghao-intel (Contributor)

yes, please go ahead. :)

@HyukjinKwon (Member Author)

@chenghao-intel @davies Are there any other things I should test or take care of?

@@ -246,6 +246,9 @@ private[hive] trait HiveInspectors {
* Wraps with Hive types based on object inspector.
*/
protected def wrapperFor(oi: ObjectInspector, dataType: DataType): Any => Any = oi match {
case _ if dataType.isInstanceOf[UserDefinedType[_]] =>
Review comment (Contributor):

  • This code path is shared by many things apart from ORC. Won't those be affected?
  • I would put this case at the very end. UserDefinedType is not that common compared to other types (esp. primitive types), so putting it lower in the match will be better for perf.

Reply (Member Author):

This code path is shared by many things apart from ORC. Won't those be affected?

It seems this path is used in hiveUDFs.scala and hiveWriterContainers.scala.
Actually, it'd be fine for a value converter for a UDT to use the converter for the equivalent type (its inner SQL type) internally. It is a common pattern for other data sources as well (see the illustrative sketch at the end of this comment).

I would put this case at the very end. UserDefinedType is not that common compared to other types (esp. primitive types), so putting it lower in the match will be better for perf.

Cool :) (BTW, this would only be executed once per task, as you might already know.)
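
As a rough illustration of that delegation pattern (not an excerpt from any particular Spark file; makeConverter is a made-up name for this example):

``` scala
import org.apache.spark.sql.types._

// Writer-side converters for a UDT column can simply delegate to the converter
// built for the UDT's underlying SQL type, since UDT values are serialized in
// that representation anyway.
def makeConverter(dataType: DataType): Any => Any = dataType match {
  case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
  case ArrayType(elementType, _) =>
    val convertElement = makeConverter(elementType)
    (value: Any) => value.asInstanceOf[Seq[Any]].map(convertElement)
  case _ => identity[Any]
}
```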

Reply (Member Author):

Oh, actually, I forgot that this path does pattern matching on the ObjectInspector, not on the DataType. I just remembered I put this case first for that reason (we should check the dataType first, regardless of the ObjectInspector).

I definitely could avoid this by fixing more than just this spot, but I guess it is fine as it is, because this is not on the critical path (it is executed once).

val data = Seq((1, new UDT.MyDenseVector(Array(0.25, 2.25, 4.25))))
val udtDF = data.toDF("id", "vectors")
udtDF.write.orc(path.getAbsolutePath)
val readBack = spark.read.schema(udtDF.schema).orc(path.getAbsolutePath)
Review comment (Contributor):

Curious: how does this work? I mean, you added support for UserDefinedType on the wrapper side, but on the unwrapper side I don't see UserDefinedType being handled.

Reply (Member Author):

It seems fine for reading because it refers to the schema from ORC (detecting the fields via field names). The internal projection for a UDT uses udt.sqlType, IIUC.

@HyukjinKwon (Member Author)

(gentle ping @chenghao-intel @davies)

@HyukjinKwon (Member Author)

ping ..

@kxepal (Member) commented Nov 21, 2016

@HyukjinKwon Maybe we can reach someone else with the commit bit? Do you know anyone to ping?

@HyukjinKwon (Member Author)

I think the recent related code was committed by @rxin. Do you mind if I ask you to take a look, please?

@kxepal (Member) commented Nov 21, 2016

@HyukjinKwon Please, do! Thanks a lot for helping here (:

@rxin (Contributor) commented Nov 21, 2016

Merging in master/branch-2.1.

@asfgit asfgit closed this in a2d4647 Nov 21, 2016
asfgit pushed a commit that referenced this pull request Nov 21, 2016
[SPARK-17765][SQL] Support for writing out user-defined type in ORC datasource

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #15361 from HyukjinKwon/SPARK-17765.

(cherry picked from commit a2d4647)
Signed-off-by: Reynold Xin <rxin@databricks.com>
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 2, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
@HyukjinKwon HyukjinKwon deleted the SPARK-17765 branch January 2, 2018 03:39