From ad53071628a34a2a7ad6cad88b210d661ead963f Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 6 May 2016 17:18:45 +0800 Subject: [PATCH 1/5] Fixes SPARK-15112 --- .../spark/sql/catalyst/ScalaReflection.scala | 1 - .../sql/catalyst/optimizer/Optimizer.scala | 15 ++++++++++-- .../scala/org/apache/spark/sql/Dataset.scala | 5 ++-- .../org/apache/spark/sql/DatasetSuite.scala | 23 +++++++++++++++++++ .../datasources/json/JsonSuite.scala | 3 --- 5 files changed, 39 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index cb9a62dfd4e81..79ffaaddede24 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -189,7 +189,6 @@ object ScalaReflection extends ScalaReflection { case _ => UpCast(expr, expected, walkedTypePath) } - val className = getClassNameFromType(tpe) tpe match { case t if !dataTypeFor(t).isInstanceOf[ObjectType] => getPath diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index af7532e0c03ec..e40aaf29c21fe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -21,8 +21,9 @@ import scala.annotation.tailrec import scala.collection.immutable.HashSet import scala.collection.mutable.ArrayBuffer +import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} -import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubqueryAliases, EmptyFunctionRegistry} +import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -1572,7 +1573,17 @@ object EmbedSerializerInFilter extends Rule[LogicalPlan] { val newCondition = condition transform { case a: Attribute if a == d.output.head => d.deserializer } - Filter(newCondition, d.child) + + val newFilter = Filter(newCondition, d.child) + + // SPARK-15112 + val output = d.child.resolve(StructType.fromAttributes(s.output), caseSensitiveResolution) + + if (output == d.child.output) { + newFilter + } else { + Project(output, newFilter) + } } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 02dd6547a4adc..5427990816731 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -343,7 +343,8 @@ class Dataset[T] private[sql]( */ // This is declared with parentheses to prevent the Scala compiler from treating // `ds.toDF("1")` as invoking this toDF and then apply on the returned DataFrame. 
- def toDF(): DataFrame = new Dataset[Row](sparkSession, queryExecution, RowEncoder(schema)) + def toDF(): DataFrame = + new Dataset[Row](sparkSession, queryExecution, RowEncoder(logicalPlan.schema)) /** * :: Experimental :: @@ -397,7 +398,7 @@ class Dataset[T] private[sql]( * @group basic * @since 1.6.0 */ - def schema: StructType = queryExecution.analyzed.schema + def schema: StructType = resolvedTEncoder.schema /** * Prints the schema to the console in a nice tree format. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 3b9feae4a31c9..4f51c0e4ce603 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -24,6 +24,7 @@ import scala.language.postfixOps import org.scalatest.words.MatcherWords.be +import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder} import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.functions._ @@ -702,6 +703,28 @@ class DatasetSuite extends QueryTest with SharedSQLContext { assert(e.message.contains("already exists")) dataset.sparkSession.catalog.dropTempView("tempView") } + + test("SPARK-15112: Dataset should accept out of order input columns") { + val ds = Seq(1 -> "foo", 2 -> "bar").toDF("b", "a").as[ClassData] + + assertResult(ScalaReflection.schemaFor[ClassData].dataType) { + ds.schema + } + + assertResult(Seq(ClassData("foo", 1), ClassData("bar", 2))) { + ds.collect().toSeq + } + + assertResult( + new StructType() + .add("b", IntegerType, nullable = false) + .add("a", StringType, nullable = true) + ) { + ds.toDF().schema + } + + assert(ds.filter(_.b > 1).collect().toSeq == Seq(ClassData("bar", 2))) + } } case class Generic[T](id: T, value: Double) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 63fe4658d67d5..ba1be40cc81fa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -21,10 +21,7 @@ import java.io.{File, StringWriter} import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} -import scala.collection.JavaConverters._ - import com.fasterxml.jackson.core.JsonFactory -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.GzipCodec From 66522fb8494d1a447763b1fbe8b2ca810912ff6a Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 6 May 2016 18:08:37 +0800 Subject: [PATCH 2/5] More inline comment --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index e40aaf29c21fe..fe1d7b8479e86 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1576,7 +1576,9 @@ object EmbedSerializerInFilter extends Rule[LogicalPlan] { val newFilter = Filter(newCondition, 
d.child) - // SPARK-15112 + // SPARK-15112: Column order of input query plan may differ from output column order of the + // top-most `SerializeFromObject` operator. Here we add a projection to adjust column order + // when necessary. val output = d.child.resolve(StructType.fromAttributes(s.output), caseSensitiveResolution) if (output == d.child.output) { From a315229a9cdc9c7eab241f13d9ab43e3d144bf3f Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 6 May 2016 19:03:46 +0800 Subject: [PATCH 3/5] Makes sure schema of DataFrame returned by Dataset.toDF() is consistent with the original Dataset --- .../sql/catalyst/optimizer/Optimizer.scala | 5 ++--- .../scala/org/apache/spark/sql/Dataset.scala | 14 +++++++++++-- .../org/apache/spark/sql/DatasetSuite.scala | 21 +++++-------------- 3 files changed, 19 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index fe1d7b8479e86..ed0458a85c902 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1579,11 +1579,10 @@ object EmbedSerializerInFilter extends Rule[LogicalPlan] { // SPARK-15112: Column order of input query plan may differ from output column order of the // top-most `SerializeFromObject` operator. Here we add a projection to adjust column order // when necessary. - val output = d.child.resolve(StructType.fromAttributes(s.output), caseSensitiveResolution) - - if (output == d.child.output) { + if (s.schema == d.child.schema) { newFilter } else { + val output = d.child.resolve(StructType.fromAttributes(s.output), caseSensitiveResolution) Project(output, newFilter) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 5427990816731..d4f6774b3927f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -343,8 +343,18 @@ class Dataset[T] private[sql]( */ // This is declared with parentheses to prevent the Scala compiler from treating // `ds.toDF("1")` as invoking this toDF and then apply on the returned DataFrame. 
- def toDF(): DataFrame = - new Dataset[Row](sparkSession, queryExecution, RowEncoder(logicalPlan.schema)) + def toDF(): DataFrame = { + val rowEncoder = RowEncoder(schema) + + if (schema == logicalPlan.schema) { + new Dataset[Row](sparkSession, queryExecution, rowEncoder) + } else { + // SPARK-15112: Adjust output column order so that query plan schema and encoder schema are + // consistent in the result DataFrame + val output = schema.map(f => UnresolvedAttribute(f.name)) + new Dataset[Row](sparkSession, Project(output, logicalPlan), rowEncoder) + } + } /** * :: Experimental :: diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 4f51c0e4ce603..cd035b2e8608e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -706,24 +706,13 @@ class DatasetSuite extends QueryTest with SharedSQLContext { test("SPARK-15112: Dataset should accept out of order input columns") { val ds = Seq(1 -> "foo", 2 -> "bar").toDF("b", "a").as[ClassData] + val expectedSchema = ScalaReflection.schemaFor[ClassData].dataType - assertResult(ScalaReflection.schemaFor[ClassData].dataType) { - ds.schema - } - - assertResult(Seq(ClassData("foo", 1), ClassData("bar", 2))) { - ds.collect().toSeq - } - - assertResult( - new StructType() - .add("b", IntegerType, nullable = false) - .add("a", StringType, nullable = true) - ) { - ds.toDF().schema - } + assert(ds.schema == expectedSchema) + assert(ds.toDF().schema == expectedSchema) - assert(ds.filter(_.b > 1).collect().toSeq == Seq(ClassData("bar", 2))) + checkDataset(ds, ClassData("foo", 1), ClassData("bar", 2)) + checkDataset(ds.filter(_.b > 1), ClassData("bar", 2)) } } From 5dc57b46d802453beea6a063459861bb882db6e6 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 6 May 2016 23:23:09 +0800 Subject: [PATCH 4/5] Fixes result schema of SparkSession.range() --- .../scala/org/apache/spark/sql/SparkSession.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 301643772b5d6..30bb47212fc77 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.ui.SQLListener import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState} import org.apache.spark.sql.sources.BaseRelation -import org.apache.spark.sql.types.{DataType, LongType, StructType} +import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType} import org.apache.spark.sql.util.ExecutionListenerManager import org.apache.spark.util.Utils @@ -450,7 +450,12 @@ class SparkSession private( */ @Experimental def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long] = { - new Dataset(self, Range(start, end, step, numPartitions), Encoders.LONG) + val encoder = { + val schema = StructType(Seq(StructField("id", LongType, nullable = false))) + ExpressionEncoder[java.lang.Long]().copy[java.lang.Long](schema = schema) + } + + new Dataset(self, Range(start, end, step, numPartitions), encoder) } /** @@ -489,7 +494,7 @@ class SparkSession private( /* ------------------------ * | Catalog-related methods | - * 
----------------- ------ */ + * ------------------------ */ /** * Interface through which the user may create, drop, alter or query underlying From 728ea9920fc0fd2d95e5187de2f16a9a083153f2 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 9 May 2016 17:21:22 +0800 Subject: [PATCH 5/5] Reverts Dataset.toDF() changes --- .../scala/org/apache/spark/sql/Dataset.scala | 42 ++++++------------- .../org/apache/spark/sql/DatasetSuite.scala | 23 +++++++--- 2 files changed, 30 insertions(+), 35 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index d4f6774b3927f..312936d2f0864 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -150,7 +150,6 @@ private[sql] object Dataset { * @groupname func Functional Transformations * @groupname rdd RDD Operations * @groupname output Output Operations - * * @since 1.6.0 */ class Dataset[T] private[sql]( @@ -343,18 +342,8 @@ class Dataset[T] private[sql]( */ // This is declared with parentheses to prevent the Scala compiler from treating // `ds.toDF("1")` as invoking this toDF and then apply on the returned DataFrame. - def toDF(): DataFrame = { - val rowEncoder = RowEncoder(schema) - - if (schema == logicalPlan.schema) { - new Dataset[Row](sparkSession, queryExecution, rowEncoder) - } else { - // SPARK-15112: Adjust output column order so that query plan schema and encoder schema are - // consistent in the result DataFrame - val output = schema.map(f => UnresolvedAttribute(f.name)) - new Dataset[Row](sparkSession, Project(output, logicalPlan), rowEncoder) - } - } + def toDF(): DataFrame = + new Dataset[Row](sparkSession, queryExecution, RowEncoder(logicalPlan.schema)) /** * :: Experimental :: @@ -497,7 +486,6 @@ class Dataset[T] private[sql]( * }}} * * @param numRows Number of rows to show - * * @group action * @since 1.6.0 */ @@ -517,7 +505,6 @@ class Dataset[T] private[sql]( * * @param truncate Whether truncate long strings. If true, strings more than 20 characters will * be truncated and all cells will be aligned right - * * @group action * @since 1.6.0 */ @@ -533,10 +520,10 @@ class Dataset[T] private[sql]( * 1983 03 0.410516 0.442194 * 1984 04 0.450090 0.483521 * }}} + * * @param numRows Number of rows to show * @param truncate Whether truncate long strings. If true, strings more than 20 characters will * be truncated and all cells will be aligned right - * * @group action * @since 1.6.0 */ @@ -574,7 +561,6 @@ class Dataset[T] private[sql]( * Note that cartesian joins are very expensive without an extra filter that can be pushed down. * * @param right Right side of the join operation. - * * @group untypedrel * @since 2.0.0 */ @@ -599,7 +585,6 @@ class Dataset[T] private[sql]( * * @param right Right side of the join operation. * @param usingColumn Name of the column to join on. This column must exist on both sides. - * * @group untypedrel * @since 2.0.0 */ @@ -624,7 +609,6 @@ class Dataset[T] private[sql]( * * @param right Right side of the join operation. * @param usingColumns Names of the columns to join on. This columns must exist on both sides. - * * @group untypedrel * @since 2.0.0 */ @@ -645,7 +629,6 @@ class Dataset[T] private[sql]( * @param right Right side of the join operation. * @param usingColumns Names of the columns to join on. This columns must exist on both sides. * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`. 
- * * @group untypedrel * @since 2.0.0 */ @@ -696,7 +679,6 @@ class Dataset[T] private[sql]( * @param right Right side of the join. * @param joinExprs Join expression. * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`. - * * @group untypedrel * @since 2.0.0 */ @@ -759,7 +741,6 @@ class Dataset[T] private[sql]( * @param other Right side of the join. * @param condition Join expression. * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`. - * * @group typedrel * @since 1.6.0 */ @@ -799,7 +780,6 @@ class Dataset[T] private[sql]( * * @param other Right side of the join. * @param condition Join expression. - * * @group typedrel * @since 1.6.0 */ @@ -1228,6 +1208,7 @@ class Dataset[T] private[sql]( * "age" -> "max" * )) * }}} + * * @group untypedrel * @since 2.0.0 */ @@ -1343,6 +1324,7 @@ class Dataset[T] private[sql]( * "age" -> "max" * )) * }}} + * * @group untypedrel * @since 2.0.0 */ @@ -1483,7 +1465,6 @@ class Dataset[T] private[sql]( * @param withReplacement Sample with replacement or not. * @param fraction Fraction of rows to generate. * @param seed Seed for sampling. - * * @group typedrel * @since 1.6.0 */ @@ -1496,7 +1477,6 @@ class Dataset[T] private[sql]( * * @param withReplacement Sample with replacement or not. * @param fraction Fraction of rows to generate. - * * @group typedrel * @since 1.6.0 */ @@ -1511,7 +1491,6 @@ class Dataset[T] private[sql]( * @param seed Seed for sampling. * * For Java API, use [[randomSplitAsList]]. - * * @group typedrel * @since 2.0.0 */ @@ -1536,7 +1515,6 @@ class Dataset[T] private[sql]( * * @param weights weights for splits, will be normalized if they don't sum to 1. * @param seed Seed for sampling. - * * @group typedrel * @since 2.0.0 */ @@ -1865,7 +1843,6 @@ class Dataset[T] private[sql]( * * @note this method should only be used if the resulting array is expected to be small, as * all the data is loaded into the driver's memory. - * * @group action * @since 1.6.0 */ @@ -1875,6 +1852,7 @@ class Dataset[T] private[sql]( /** * Returns the first row. + * * @group action * @since 1.6.0 */ @@ -1882,6 +1860,7 @@ class Dataset[T] private[sql]( /** * Returns the first row. Alias for head(). + * * @group action * @since 1.6.0 */ @@ -2160,6 +2139,7 @@ class Dataset[T] private[sql]( /** * Returns the number of rows in the [[Dataset]]. + * * @group action * @since 1.6.0 */ @@ -2252,10 +2232,10 @@ class Dataset[T] private[sql]( /** * Persist this [[Dataset]] with the given storage level. + * * @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`, * `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`, * `MEMORY_AND_DISK_2`, etc. - * * @group basic * @since 1.6.0 */ @@ -2268,7 +2248,6 @@ class Dataset[T] private[sql]( * Mark the [[Dataset]] as non-persistent, and remove all blocks for it from memory and disk. * * @param blocking Whether to block until all blocks are deleted. - * * @group basic * @since 1.6.0 */ @@ -2301,6 +2280,7 @@ class Dataset[T] private[sql]( /** * Returns the content of the [[Dataset]] as a [[JavaRDD]] of [[Row]]s. + * * @group rdd * @since 1.6.0 */ @@ -2308,6 +2288,7 @@ class Dataset[T] private[sql]( /** * Returns the content of the [[Dataset]] as a [[JavaRDD]] of [[Row]]s. + * * @group rdd * @since 1.6.0 */ @@ -2362,6 +2343,7 @@ class Dataset[T] private[sql]( /** * Returns the content of the [[Dataset]] as a Dataset of JSON strings. 
+ * * @since 2.0.0 */ def toJSON: Dataset[String] = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index cd035b2e8608e..1c6f33e39af97 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -706,13 +706,26 @@ class DatasetSuite extends QueryTest with SharedSQLContext { test("SPARK-15112: Dataset should accept out of order input columns") { val ds = Seq(1 -> "foo", 2 -> "bar").toDF("b", "a").as[ClassData] - val expectedSchema = ScalaReflection.schemaFor[ClassData].dataType - assert(ds.schema == expectedSchema) - assert(ds.toDF().schema == expectedSchema) + assertResult(ScalaReflection.schemaFor[ClassData].dataType) { + ds.schema + } + + assertResult( + new StructType() + .add("b", IntegerType, nullable = false) + .add("a", StringType, nullable = true) + ) { + ds.toDF().schema + } - checkDataset(ds, ClassData("foo", 1), ClassData("bar", 2)) - checkDataset(ds.filter(_.b > 1), ClassData("bar", 2)) + assertResult(Seq(ClassData("foo", 1), ClassData("bar", 2))) { + ds.collect().toSeq + } + + assertResult(Seq(ClassData("bar", 2))) { + ds.filter(_.b > 1).collect().toSeq + } } }
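
For reference, the behaviour the series converges on can be sketched as a short spark-shell style session, adapted from the DatasetSuite test above. This is an illustrative sketch rather than part of the patch: it assumes a Spark 2.0-era SparkSession named `spark` with `spark.implicits._` imported, and the test file's `case class ClassData(a: String, b: Int)`.

    case class ClassData(a: String, b: Int)

    // Input columns arrive as (b, a), i.e. out of order with respect to ClassData's fields.
    val ds = Seq(1 -> "foo", 2 -> "bar").toDF("b", "a").as[ClassData]

    // Dataset.schema follows the resolved encoder, so it reports ClassData's field order,
    // while toDF() keeps the query plan's input column order, as asserted in the test above.
    ds.schema         // a: string (nullable), b: int (non-nullable)
    ds.toDF().schema  // b: int (non-nullable), a: string (nullable)

    // Deserialization resolves columns by name, and EmbedSerializerInFilter now adds a
    // Project whenever the child plan's column order differs from the serializer's output,
    // so typed operations return the expected rows.
    ds.collect()                  // Array(ClassData("foo", 1), ClassData("bar", 2))
    ds.filter(_.b > 1).collect()  // Array(ClassData("bar", 2))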
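
The patch 4 change to SparkSession.range() can be checked the same way. Again an illustrative sketch that assumes the same `spark` session, not part of the patch: with the encoder constructed in that hunk, the Dataset[java.lang.Long] returned by range() reports the non-nullable `id` column of the underlying Range operator.

    // range(start, end, step, numPartitions) overload from the patch 4 hunk.
    val ids = spark.range(0L, 10L, 1L, 2)
    ids.schema   // StructType(StructField("id", LongType, nullable = false))
    ids.count()  // 10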