From 50bb48d6f2a59e2e88fe68699fceac308153e08a Mon Sep 17 00:00:00 2001 From: Sean Zhong Date: Sun, 12 Jun 2016 21:01:57 -0700 Subject: [PATCH 1/3] SPARK-15910: Check schema --- .../src/main/scala/org/apache/spark/sql/Encoders.scala | 9 +++++++-- .../test/scala/org/apache/spark/sql/DatasetSuite.scala | 9 +++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala index 673c587b18325..fb7e7acdaae16 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala @@ -25,8 +25,8 @@ import scala.reflect.runtime.universe.TypeTag import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder} +import org.apache.spark.sql.catalyst.expressions.{BoundReference, UpCast} import org.apache.spark.sql.catalyst.expressions.objects.{DecodeUsingSerializer, EncodeUsingSerializer} -import org.apache.spark.sql.catalyst.expressions.BoundReference import org.apache.spark.sql.types._ /** @@ -209,7 +209,12 @@ object Encoders { BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)), deserializer = DecodeUsingSerializer[T]( - GetColumnByOrdinal(0, BinaryType), classTag[T], kryo = useKryo), + UpCast( + GetColumnByOrdinal(0, BinaryType), + BinaryType, + Seq(s"root object (class: ${classOf[BinaryType]}, name: value}")), + classTag[T], + kryo = useKryo), clsTag = classTag[T] ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 4536a7356f017..3c3291ab81692 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -451,6 +451,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext { (KryoData(2), KryoData(2)))) } + test("Kryo encoder: check the schema mismatch when converting DataFrame to Dataset") { + implicit val kryoEncoder = Encoders.kryo[KryoData] + val df = Seq((1)).toDF("a") + val e = intercept[AnalysisException] { + df.as[KryoData] + }.message + assert(e.contains("cannot cast IntegerType to BinaryType")) + } + test("Java encoder") { implicit val kryoEncoder = Encoders.javaSerialization[JavaData] val ds = Seq(JavaData(1), JavaData(2)).toDS() From 35a1ee09b82d4d66a95d4e88d8dda4e056cd0e11 Mon Sep 17 00:00:00 2001 From: Sean Zhong Date: Mon, 13 Jun 2016 15:44:53 -0700 Subject: [PATCH 2/3] update --- sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala index fb7e7acdaae16..ed4c21f50a52c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala @@ -212,7 +212,7 @@ object Encoders { UpCast( GetColumnByOrdinal(0, BinaryType), BinaryType, - Seq(s"root object (class: ${classOf[BinaryType]}, name: value}")), + Seq(s"input object to be serialized")), classTag[T], kryo = useKryo), clsTag = classTag[T] From e889f85165acbb1d685e70c959abe04955d24a17 Mon Sep 17 00:00:00 2001 From: Sean Zhong Date: Mon, 13 Jun 2016 16:06:36 -0700 Subject: [PATCH 3/3] On Wenchen's comment --- .../src/main/scala/org/apache/spark/sql/Encoders.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala index ed4c21f50a52c..e72f67c48a296 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala @@ -25,7 +25,7 @@ import scala.reflect.runtime.universe.TypeTag import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder} -import org.apache.spark.sql.catalyst.expressions.{BoundReference, UpCast} +import org.apache.spark.sql.catalyst.expressions.{BoundReference, Cast} import org.apache.spark.sql.catalyst.expressions.objects.{DecodeUsingSerializer, EncodeUsingSerializer} import org.apache.spark.sql.types._ @@ -209,10 +209,7 @@ object Encoders { BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)), deserializer = DecodeUsingSerializer[T]( - UpCast( - GetColumnByOrdinal(0, BinaryType), - BinaryType, - Seq(s"input object to be serialized")), + Cast(GetColumnByOrdinal(0, BinaryType), BinaryType), classTag[T], kryo = useKryo), clsTag = classTag[T]