Skip to content

Commit

Permalink
[SPARK-18122][SQL]Fallback to Kryo for unsupported encoder for class'…
Browse files Browse the repository at this point in the history
…s subfiled
  • Loading branch information
root authored and root committed Nov 17, 2016
1 parent c8c0906 commit bb11c93
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.catalyst

import scala.reflect.ClassTag

import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedAttribute, UnresolvedExtractValue}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.objects._
Expand Down Expand Up @@ -403,6 +405,9 @@ object ScalaReflection extends ScalaReflection {
} else {
newInstance
}

case _ =>
DecodeUsingSerializer(getPath, ClassTag(getClassFromType(tpe)), true)
}
}

Expand Down Expand Up @@ -583,8 +588,7 @@ object ScalaReflection extends ScalaReflection {
expressions.If(IsNull(inputObject), nullOutput, nonNullOutput)

case other =>
throw new UnsupportedOperationException(
s"No Encoder found for $tpe\n" + walkedTypePath.mkString("\n"))
EncodeUsingSerializer(inputObject, true)
}

}
Expand Down Expand Up @@ -701,7 +705,7 @@ object ScalaReflection extends ScalaReflection {
StructField(fieldName, dataType, nullable)
}), nullable = true)
case other =>
throw new UnsupportedOperationException(s"Schema for type $other is not supported")
Schema(BinaryType, nullable = false)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ case class NestedArray(a: Array[Array[Int]]) {
}
}

case class KryoUnsupportedEncoderForSubFiled(
a: String,
b: Seq[Int],
c: Option[Set[Int]])

case class BoxedData(
intField: java.lang.Integer,
longField: java.lang.Long,
Expand Down Expand Up @@ -179,6 +184,10 @@ class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
encodeDecodeTest(new KryoSerializable(15), "kryo object")(
encoderFor(Encoders.kryo[KryoSerializable]))

// use kryo to ser/deser the type which has a unsupported Encoder
encodeDecodeTest(Seq(KryoUnsupportedEncoderForSubFiled("a", Seq(1), Some(Set(2))),
KryoUnsupportedEncoderForSubFiled("b", Seq(3), None)), "type with unsupported encoder,use kryo")

// Java encoders
encodeDecodeTest("hello", "java string")(encoderFor(Encoders.javaSerialization[String]))
encodeDecodeTest(new JavaSerializable(15), "java object")(
Expand Down

0 comments on commit bb11c93

Please sign in to comment.