Skip to content

Commit

Permalink
fix some test failed and add some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
windpiger committed Feb 15, 2017
1 parent 446ff43 commit 4aac7dd
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ object ScalaReflection extends ScalaReflection {

/** Returns the current path or `GetColumnByOrdinal`. */
def getPath: Expression = {
val dataType = schemaFor(tpe).dataType
val dataType = schemaForDefaultBinaryType(tpe).dataType
if (path.isDefined) {
path.get
} else {
Expand Down Expand Up @@ -409,7 +409,8 @@ object ScalaReflection extends ScalaReflection {
val cls = getClassFromType(tpe)

val arguments = params.zipWithIndex.map { case ((fieldName, fieldType), i) =>
val Schema(dataType, nullable) = schemaFor(fieldType)
val Schema(dataType, nullablity) = schemaForDefaultBinaryType(fieldType)

val clsName = getClassNameFromType(fieldType)
val newTypePath = s"""- field (class: "$clsName", name: "$fieldName")""" +: walkedTypePath
// For tuples, we based grab the inner fields by ordinal instead of name.
Expand All @@ -424,7 +425,7 @@ object ScalaReflection extends ScalaReflection {
Some(addToPath(fieldName, dataType, newTypePath)),
newTypePath)

if (!nullable) {
if (!nullablity) {
AssertNotNull(constructor, newTypePath)
} else {
constructor
Expand All @@ -445,6 +446,7 @@ object ScalaReflection extends ScalaReflection {
}

case _ =>
// default kryo deserializer
DecodeUsingSerializer(getPath, ClassTag(getClassFromType(tpe)), true)
}
}
Expand Down Expand Up @@ -644,7 +646,8 @@ object ScalaReflection extends ScalaReflection {
val nullOutput = expressions.Literal.create(null, nonNullOutput.dataType)
expressions.If(IsNull(inputObject), nullOutput, nonNullOutput)

case other =>
case _ =>
// default kryo serializer
EncodeUsingSerializer(inputObject, true)
}

Expand Down Expand Up @@ -712,6 +715,13 @@ object ScalaReflection extends ScalaReflection {
s.toAttributes
}

/**
* Returns a catalyst DataType and its nullability for the given Scala Type using reflection.
* If the tpe mismatched in schemaFor function, the default BinaryType returned
*/
def schemaForDefaultBinaryType(tpe: `Type`): Schema = scala.util.Try(schemaFor(tpe)).toOption
.getOrElse(Schema(BinaryType, nullable = true))

/** Returns a catalyst DataType and its nullability for the given Scala Type using reflection. */
def schemaFor[T: TypeTag]: Schema = schemaFor(localTypeOf[T])

Expand Down Expand Up @@ -775,7 +785,8 @@ object ScalaReflection extends ScalaReflection {
StructField(fieldName, dataType, nullable)
}), nullable = true)
case other =>
Schema(BinaryType, nullable = false)
throw new UnsupportedOperationException(s"Schema for type $other is not supported")
// Schema(BinaryType, nullable = false)
}
}

Expand Down
15 changes: 15 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1136,8 +1136,23 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
assert(spark.range(1).map { x => new java.sql.Timestamp(100000) }.head ==
new java.sql.Timestamp(100000))
}

test("fallback to kryo for unknow classes in ExpressionEncoder") {
val ds = Seq(DefaultKryoEncoderForSubFiled("a", Seq(1), Some(Set(2))),
DefaultKryoEncoderForSubFiled("b", Seq(3), None)).toDS()
checkDataset(ds, DefaultKryoEncoderForSubFiled("a", Seq(1), Some(Set(2))),
DefaultKryoEncoderForSubFiled("b", Seq(3), None))

val df = ds.toDF()
val x = df.schema
assert(df.schema(0).dataType == StringType)
assert(df.schema(1).dataType == ArrayType(IntegerType, null = false))
assert(df.schema(2).dataType == BinaryType)
}
}

case class DefaultKryoEncoderForSubFiled(a: String, b: Seq[Int], c: Option[Set[Int]])

case class WithImmutableMap(id: String, map_test: scala.collection.immutable.Map[Long, String])
case class WithMap(id: String, map_test: scala.collection.Map[Long, String])

Expand Down

0 comments on commit 4aac7dd

Please sign in to comment.