From 539a2e1f6c94782d916e7eac12ed1614f0ebfc35 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Tue, 29 Jul 2014 21:37:23 -0700
Subject: [PATCH 1/4] Resolve original attributes in ParquetTableScan

---
 .../sql/parquet/ParquetTableOperations.scala |  7 +++++--
 .../spark/sql/parquet/HiveParquetSuite.scala | 17 +++++++++++++++++
 2 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index ea74320d06c86..9b88938efbca4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -53,12 +53,16 @@ import org.apache.spark.{Logging, SerializableWritable, TaskContext}
 case class ParquetTableScan(
     // note: output cannot be transient, see
     // https://issues.apache.org/jira/browse/SPARK-1367
-    output: Seq[Attribute],
+    attributes: Seq[Attribute],
     relation: ParquetRelation,
     columnPruningPred: Seq[Expression])(
     @transient val sqlContext: SQLContext)
   extends LeafNode {
 
+  // The resolution of Parquet attributes is case sensitive, so we resolve the original attributes
+  // by exprId.
+  val output = attributes.map(a => relation.output.find(o => o.exprId == a.exprId).get)
+
   override def execute(): RDD[Row] = {
     val sc = sqlContext.sparkContext
     val job = new Job(sc.hadoopConfiguration)
@@ -113,7 +117,6 @@ case class ParquetTableScan(
       ParquetTableScan(prunedAttributes, relation, columnPruningPred)(sqlContext)
     } else {
       sys.error("Warning: Could not validate Parquet schema projection in pruneColumns")
-      this
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
index 3bfe49a760be5..47526e3596e44 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.parquet
 
+import java.io.File
+
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row}
@@ -27,6 +29,8 @@ import org.apache.spark.util.Utils
 // Implicits
 import org.apache.spark.sql.hive.test.TestHive._
 
+case class Cases(lower: String, UPPER: String)
+
 class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
 
   val dirname = Utils.createTempDir()
@@ -55,6 +59,19 @@ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAft
     Utils.deleteRecursively(dirname)
   }
 
+  test("Case insensitive attribute names") {
+    val tempFile = File.createTempFile("parquet", "")
+    tempFile.delete()
+    sparkContext.parallelize(1 to 10)
+      .map(_.toString)
+      .map(i => Cases(i, i))
+      .saveAsParquetFile(tempFile.getCanonicalPath)
+
+    parquetFile(tempFile.getCanonicalPath).registerAsTable("cases")
+    assert(hql("SELECT upper FROM cases").collect().map(_.getString(0)) === (1 to 10).map(_.toString))
+    assert(hql("SELECT LOWER FROM cases").collect().map(_.getString(0)) === (1 to 10).map(_.toString))
+  }
+
   test("SELECT on Parquet table") {
     val rdd = hql("SELECT * FROM testsource").collect()
     assert(rdd != null)

From e6870bf4e3b7dcb4b75b6c177397cdf21bd52c49 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Wed, 30 Jul 2014 13:01:54 -0700
Subject: [PATCH 2/4] Better error message.
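
A minimal, self-contained sketch of the failure mode this patch addresses
(hypothetical simplified stand-in types, not Spark's own classes): the bare
Option.get introduced in the previous patch fails with a context-free
NoSuchElementException, while getOrElse(sys.error(...)) names the unresolved
attribute and the candidates it was matched against.

object BetterErrorSketch {
  // Hypothetical simplified stand-in for Catalyst's Attribute.
  case class Attribute(name: String, exprId: Long)

  val relationOutput = Seq(Attribute("lower", 1L), Attribute("UPPER", 2L))

  // Before this patch: an unresolvable attribute surfaces as a bare NoSuchElementException.
  def resolveOpaque(a: Attribute): Attribute =
    relationOutput.find(_.exprId == a.exprId).get

  // After this patch: the failure message names the attribute and the candidates.
  def resolveDescriptive(a: Attribute): Attribute =
    relationOutput
      .find(_.exprId == a.exprId)
      .getOrElse(sys.error(s"Invalid parquet attribute $a in ${relationOutput.mkString(",")}"))

  def main(args: Array[String]): Unit = {
    println(resolveDescriptive(Attribute("LOWER", 1L))) // resolves by exprId despite the case change
    // resolveDescriptive(Attribute("bogus", 99L))      // fails with the descriptive message above
  }
}
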
---
 .../apache/spark/sql/parquet/ParquetTableOperations.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 9b88938efbca4..975f49e88f11c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -61,7 +61,11 @@ case class ParquetTableScan(
 
   // The resolution of Parquet attributes is case sensitive, so we resolve the original attributes
   // by exprId.
-  val output = attributes.map(a => relation.output.find(o => o.exprId == a.exprId).get)
+  val output = attributes.map { a =>
+    relation.output
+      .find(o => o.exprId == a.exprId)
+      .getOrElse(sys.error(s"Invalid parquet attribute $a in ${relation.output.mkString(",")}"))
+  }
 
   override def execute(): RDD[Row] = {
     val sc = sqlContext.sparkContext

From bb35d5b130222a5d95f640dc6b91b4b9c4fd33aa Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Wed, 30 Jul 2014 13:02:07 -0700
Subject: [PATCH 3/4] Fix test case that produced an invalid plan.

---
 .../spark/sql/parquet/ParquetQuerySuite.scala | 13 +------------
 1 file changed, 1 insertion(+), 12 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 3c911e9a4e7b1..836184d8b4811 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -207,18 +207,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
   }
 
   test("Projection of simple Parquet file") {
-    val scanner = new ParquetTableScan(
-      ParquetTestData.testData.output,
-      ParquetTestData.testData,
-      Seq())(TestSQLContext)
-    val projected = scanner.pruneColumns(ParquetTypesConverter
-      .convertToAttributes(MessageTypeParser
-      .parseMessageType(ParquetTestData.subTestSchema)))
-    assert(projected.output.size === 2)
-    val result = projected
-      .execute()
-      .map(_.copy())
-      .collect()
+    val result = ParquetTestData.testData.select('myboolean, 'mylong).collect()
     result.zipWithIndex.foreach {
       case (row, index) => {
         if (index % 3 == 0)
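
Before the final patch, a note on the comment it relocates: per the SPARK-1367
reference in the code, a plan node's output cannot be @transient, because
operators are serialized and shipped to executors and a transient field does
not survive the trip. A minimal sketch of that cost, using plain Java
serialization and hypothetical class names:

import java.io._

object TransientCostSketch {
  // Hypothetical stand-in for a plan node; `dropped` plays the role of a @transient output.
  class Node(val kept: Seq[String], @transient val dropped: Seq[String]) extends Serializable

  def roundTrip[T <: Serializable](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(new Node(Seq("a"), Seq("b")))
    println(copy.kept)    // List(a) -- survives the round trip
    println(copy.dropped) // null   -- the transient field is lost on deserialization
  }
}
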
From a1799b74a0be98f5fd67cf5516a2d9f0762e7532 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Wed, 30 Jul 2014 20:03:42 -0700
Subject: [PATCH 4/4] move comment

---
 .../apache/spark/sql/parquet/ParquetTableOperations.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index e27722ba4dd79..759a2a586b926 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -51,15 +51,14 @@ import org.apache.spark.{Logging, SerializableWritable, TaskContext}
  * [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[Row]``.
  */
 case class ParquetTableScan(
-    // note: output cannot be transient, see
-    // https://issues.apache.org/jira/browse/SPARK-1367
     attributes: Seq[Attribute],
     relation: ParquetRelation,
     columnPruningPred: Seq[Expression])
   extends LeafNode {
 
   // The resolution of Parquet attributes is case sensitive, so we resolve the original attributes
-  // by exprId.
+  // by exprId. note: output cannot be transient, see
+  // https://issues.apache.org/jira/browse/SPARK-1367
   val output = attributes.map { a =>
     relation.output
       .find(o => o.exprId == a.exprId)
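
A final sketch of the idea the first patch rests on (same hypothetical
simplified Attribute stand-in as above): Hive hands the scan case-normalized
attribute names, Parquet resolution is case sensitive, and exprId is the
identity that survives the renaming.

object ExprIdResolutionSketch {
  // Hypothetical simplified stand-in for Catalyst's Attribute.
  case class Attribute(name: String, exprId: Long)

  def main(args: Array[String]): Unit = {
    // Attributes as the Parquet relation declared them (mixed case, as in the Cases test table).
    val relationOutput = Seq(Attribute("lower", 1L), Attribute("UPPER", 2L))
    // The same attributes as a case-insensitive analyzer hands them to the scan.
    val requested = Seq(Attribute("LOWER", 1L), Attribute("upper", 2L))

    // Case-sensitive lookup by name finds nothing...
    println(requested.flatMap(a => relationOutput.find(_.name == a.name)))     // List()
    // ...while lookup by exprId recovers the original, correctly cased attributes.
    println(requested.flatMap(a => relationOutput.find(_.exprId == a.exprId))) // List(Attribute(lower,1), Attribute(UPPER,2))
  }
}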