[SPARK-15658][SQL] UDT serializer should declare its data type as udt instead of udt.sqlType

## What changes were proposed in this pull request?

When we build the serializer for a UDT object, we should declare its data type as the udt itself instead of udt.sqlType; otherwise, when we deserialize it again, we lose the information that it is a UDT object and throw an analysis exception.
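
For context, a rough sketch of where this declaration lives: the serializer for a UDT field is a Catalyst `Invoke` expression that calls the UDT's `serialize` method, and the third argument of `Invoke` is the declared result `DataType`. The helper below is hypothetical (`buildUdtSerializer` is not a real Spark function) and assumes access to Spark 2.x Catalyst internals; it only mirrors the pattern changed in the hunks below.

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, NewInstance}
import org.apache.spark.sql.types.{ObjectType, UserDefinedType}

// Hypothetical helper mirroring the pattern in ScalaReflection/RowEncoder:
// `udt` is the UserDefinedType instance found via @SQLUserDefinedType or
// UDTRegistration, and `inputObject` is the expression holding the user object.
def buildUdtSerializer(udt: UserDefinedType[_], inputObject: Expression): Expression = {
  // Instantiate the UDT class so its `serialize` method can be invoked.
  val obj = NewInstance(udt.getClass, Nil, dataType = ObjectType(udt.getClass))
  // Declaring the result type as `udt` (a UserDefinedType is itself a DataType)
  // rather than `udt.sqlType` keeps the "this column is a UDT" information,
  // so a later deserializer can resolve it instead of failing analysis.
  Invoke(obj, "serialize", udt, inputObject :: Nil)
}
```

With the UDT instance as the declared type, re-deriving an encoder for the mapped result (as the new test in `UserDefinedTypeSuite` does) still sees a UDT column, so analysis succeeds.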

## How was this patch tested?

new test in `UserDefinedTypeSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #13402 from cloud-fan/udt.
cloud-fan authored and yhuai committed May 31, 2016
1 parent d67c82e commit 2bfed1a
Showing 3 changed files with 7 additions and 3 deletions.
@@ -568,7 +568,7 @@ object ScalaReflection extends ScalaReflection {
           udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt(),
           Nil,
           dataType = ObjectType(udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt()))
-        Invoke(obj, "serialize", udt.sqlType, inputObject :: Nil)
+        Invoke(obj, "serialize", udt, inputObject :: Nil)
 
       case t if UDTRegistration.exists(getClassNameFromType(t)) =>
         val udt = UDTRegistration.getUDTFor(getClassNameFromType(t)).get.newInstance()
@@ -577,7 +577,7 @@ object ScalaReflection extends ScalaReflection {
           udt.getClass,
           Nil,
           dataType = ObjectType(udt.getClass))
-        Invoke(obj, "serialize", udt.sqlType, inputObject :: Nil)
+        Invoke(obj, "serialize", udt, inputObject :: Nil)
 
       case t if definedByConstructorParams(t) =>
         val params = getConstructorParameters(t)
@@ -88,7 +88,7 @@ object RowEncoder {
         udtClass,
         Nil,
         dataType = ObjectType(udtClass), false)
-      Invoke(obj, "serialize", udt.sqlType, inputObject :: Nil)
+      Invoke(obj, "serialize", udt, inputObject :: Nil)
 
     case TimestampType =>
       StaticInvoke(
@@ -188,6 +188,10 @@ class UserDefinedTypeSuite extends QueryTest with SharedSQLContext with ParquetT
 
     val toCatalystConverter = CatalystTypeConverters.createToCatalystConverter(udt)
     assert(toCatalystConverter(null) === null)
   }
 
+  test("SPARK-15658: Analysis exception if Dataset.map returns UDT object") {
+    // call `collect` to make sure this query can pass analysis.
+    pointsRDD.as[MyLabeledPoint].map(_.copy(label = 2.0)).collect()
+  }
 }
