From 53cef54fe3684c4ae5271563d3dcabe1241c2f31 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 27 Sep 2016 19:18:20 -0700 Subject: [PATCH 1/3] wip --- .../spark/sql/execution/datasources/DataSourceStrategy.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 6b4b3b84d4d14..391b5ffb79a0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -347,13 +347,16 @@ object DataSourceStrategy extends Strategy with Logging { // `Filter`s or cannot be handled by `relation`. val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And) + // These metadata values make scan plans uniquely identifiable for equality checking. + // TODO(ekl) using strings for equality checking is brittle val metadata: Map[String, String] = { val pairs = ArrayBuffer.empty[(String, String)] if (pushedFilters.nonEmpty) { pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]")) } - + pairs += ("ReadSchema" -> + StructType.fromAttributes(projects.map(_.toAttribute)).catalogString) pairs.toMap } From 287b690063e22364f447983b42c2030efe3017da Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 27 Sep 2016 20:11:54 -0700 Subject: [PATCH 2/3] Tue Sep 27 20:11:54 PDT 2016 --- .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index ec419e44e7930..1a6dba82b0e27 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -770,4 +770,12 @@ class JDBCSuite extends SparkFunSuite val schema = JdbcUtils.schemaString(df, "jdbc:mysql://localhost:3306/temp") assert(schema.contains("`order` TEXT")) } + + test("SPARK-17673: Exchange reuse respects differences in output schema") { + val df = sql("SELECT * FROM inttypes WHERE a IS NOT NULL") + val df1 = df.groupBy("a").agg("c" -> "min") + val df2 = df.groupBy("a").agg("d" -> "min") + val res = df1.union(df2) + assert(res.distinct().count() == 2) // would be 1 if the exchange was incorrectly reused + } } From dde4834120dcc0cc99ec8b52640f3ac4738ca404 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 27 Sep 2016 20:18:47 -0700 Subject: [PATCH 3/3] Tue Sep 27 20:18:47 PDT 2016 --- .../spark/sql/execution/datasources/DataSourceStrategy.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 391b5ffb79a0a..277969465e424 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -348,7 +348,7 @@ object DataSourceStrategy extends Strategy with Logging { val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And) // These metadata values make scan plans uniquely identifiable for equality checking. - // TODO(ekl) using strings for equality checking is brittle + // TODO(SPARK-17701) using strings for equality checking is brittle val metadata: Map[String, String] = { val pairs = ArrayBuffer.empty[(String, String)]