From e3d6dbfdf6af274090e3ba6cc5795745600d26c1 Mon Sep 17 00:00:00 2001 From: Michal Senkyr Date: Sun, 4 Feb 2018 23:08:25 +0100 Subject: [PATCH 1/3] Add checks for collection element Encoders Implicit methods providing Encoders for collections did not check for element Encoders, making related operations fail during run-time. --- .../scala/org/apache/spark/sql/SQLImplicits.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala index 05db292bd41b1..2d12fe02ce730 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql import scala.collection.Map -import scala.language.implicitConversions +import scala.language.{higherKinds, implicitConversions} import scala.reflect.runtime.universe.TypeTag import org.apache.spark.annotation.InterfaceStability @@ -165,11 +165,15 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits { def newProductSeqEncoder[A <: Product : TypeTag]: Encoder[Seq[A]] = ExpressionEncoder() /** @since 2.2.0 */ - implicit def newSequenceEncoder[T <: Seq[_] : TypeTag]: Encoder[T] = ExpressionEncoder() + implicit def newSequenceEncoder[T[_], E : Encoder] + (implicit ev: T[E] <:< Seq[E], tag: TypeTag[T[E]]): Encoder[T[E]] = + ExpressionEncoder() // Maps /** @since 2.3.0 */ - implicit def newMapEncoder[T <: Map[_, _] : TypeTag]: Encoder[T] = ExpressionEncoder() + implicit def newMapEncoder[T[_, _], K : Encoder, V : Encoder] + (implicit ev: T[K, V] <:< Map[K, V], tag: TypeTag[T[K, V]]): Encoder[T[K, V]] = + ExpressionEncoder() /** * Notice that we serialize `Set` to Catalyst array. The set property is only kept when @@ -179,7 +183,9 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits { * * @since 2.3.0 */ - implicit def newSetEncoder[T <: Set[_] : TypeTag]: Encoder[T] = ExpressionEncoder() + implicit def newSetEncoder[T[_], E : Encoder] + (implicit ev: T[E] <:< Set[E], tag: TypeTag[T[E]]): Encoder[T[E]] = + ExpressionEncoder() // Arrays From d2167607874c574961c490f188ca5c618a6864be Mon Sep 17 00:00:00 2001 From: Michal Senkyr Date: Mon, 5 Feb 2018 00:15:31 +0100 Subject: [PATCH 2/3] Add unit tests --- .../scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala index edcdd77908d3a..eb383a21ec3a8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala @@ -383,6 +383,12 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext { checkDataset(Seq(HSet(Set(1, 2), Set(3, 4))).toDS(), HSet(Set(1, 2), Set(3, 4))) } + test("collections without element encoders") { + assertTypeError("Seq(Seq(1: Any)).toDS()") + assertTypeError("Seq(Map(1 -> (1: Any))).toDS()") + assertTypeError("Seq(Set(1: Any)).toDS()") + } + test("package objects") { import packageobject._ checkDataset(Seq(PackageClass(1)).toDS(), PackageClass(1)) From cd7a1fffb76b7b6e2727bff2219247f868f58565 Mon Sep 17 00:00:00 2001 From: Michal Senkyr Date: Mon, 5 Feb 2018 18:37:36 +0100 Subject: [PATCH 3/3] Fix backward compatibility --- .../org/apache/spark/sql/SQLImplicits.scala | 54 +++++++++++++------ 1 file changed, 38 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala index 2d12fe02ce730..20c1fbee64f5a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala @@ -112,66 +112,88 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits { /** * @since 1.6.1 - * @deprecated use [[newSequenceEncoder]] + * @deprecated use [[newCheckedSequenceEncoder]] */ def newIntSeqEncoder: Encoder[Seq[Int]] = ExpressionEncoder() /** * @since 1.6.1 - * @deprecated use [[newSequenceEncoder]] + * @deprecated use [[newCheckedSequenceEncoder]] */ def newLongSeqEncoder: Encoder[Seq[Long]] = ExpressionEncoder() /** * @since 1.6.1 - * @deprecated use [[newSequenceEncoder]] + * @deprecated use [[newCheckedSequenceEncoder]] */ def newDoubleSeqEncoder: Encoder[Seq[Double]] = ExpressionEncoder() /** * @since 1.6.1 - * @deprecated use [[newSequenceEncoder]] + * @deprecated use [[newCheckedSequenceEncoder]] */ def newFloatSeqEncoder: Encoder[Seq[Float]] = ExpressionEncoder() /** * @since 1.6.1 - * @deprecated use [[newSequenceEncoder]] + * @deprecated use [[newCheckedSequenceEncoder]] */ def newByteSeqEncoder: Encoder[Seq[Byte]] = ExpressionEncoder() /** * @since 1.6.1 - * @deprecated use [[newSequenceEncoder]] + * @deprecated use [[newCheckedSequenceEncoder]] */ def newShortSeqEncoder: Encoder[Seq[Short]] = ExpressionEncoder() /** * @since 1.6.1 - * @deprecated use [[newSequenceEncoder]] + * @deprecated use [[newCheckedSequenceEncoder]] */ def newBooleanSeqEncoder: Encoder[Seq[Boolean]] = ExpressionEncoder() /** * @since 1.6.1 - * @deprecated use [[newSequenceEncoder]] + * @deprecated use [[newCheckedSequenceEncoder]] */ def newStringSeqEncoder: Encoder[Seq[String]] = ExpressionEncoder() /** * @since 1.6.1 - * @deprecated use [[newSequenceEncoder]] + * @deprecated use [[newCheckedSequenceEncoder]] */ def newProductSeqEncoder[A <: Product : TypeTag]: Encoder[Seq[A]] = ExpressionEncoder() - /** @since 2.2.0 */ - implicit def newSequenceEncoder[T[_], E : Encoder] + /** + * @since 2.2.0 + * @deprecated use [[newCheckedSequenceEncoder]] + */ + def newSequenceEncoder[T <: Seq[_] : TypeTag]: Encoder[T] = ExpressionEncoder() + + /** + * @since 2.3.0 + * @deprecated use [[newCheckedMapEncoder]] + */ + def newMapEncoder[T <: Map[_, _] : TypeTag]: Encoder[T] = ExpressionEncoder() + + /** + * Notice that we serialize `Set` to Catalyst array. The set property is only kept when + * manipulating the domain objects. The serialization format doesn't keep the set property. + * When we have a Catalyst array which contains duplicated elements and convert it to + * `Dataset[Set[T]]` by using the encoder, the elements will be de-duplicated. + * + * @since 2.3.0 + * @deprecated use [[newCheckedSetEncoder]] + */ + def newSetEncoder[T <: Set[_] : TypeTag]: Encoder[T] = ExpressionEncoder() + + /** @since 2.4.0 */ + implicit def newCheckedSequenceEncoder[T[_], E : Encoder] (implicit ev: T[E] <:< Seq[E], tag: TypeTag[T[E]]): Encoder[T[E]] = ExpressionEncoder() - // Maps - /** @since 2.3.0 */ - implicit def newMapEncoder[T[_, _], K : Encoder, V : Encoder] + /** @since 2.4.0 */ + implicit def newCheckedMapEncoder[T[_, _], K : Encoder, V : Encoder] (implicit ev: T[K, V] <:< Map[K, V], tag: TypeTag[T[K, V]]): Encoder[T[K, V]] = ExpressionEncoder() @@ -181,9 +203,9 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits { * When we have a Catalyst array which contains duplicated elements and convert it to * `Dataset[Set[T]]` by using the encoder, the elements will be de-duplicated. * - * @since 2.3.0 + * @since 2.4.0 */ - implicit def newSetEncoder[T[_], E : Encoder] + implicit def newCheckedSetEncoder[T[_], E : Encoder] (implicit ev: T[E] <:< Set[E], tag: TypeTag[T[E]]): Encoder[T[E]] = ExpressionEncoder()