From c91d279aaf3895acb63871da1db0b810522c4e24 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 17 Nov 2015 17:52:45 +0900
Subject: [PATCH 1/4] [SPARK-11692] [SPARK-11694] [SQL] Backports #9658 and #9754

---
 .../datasources/parquet/ParquetIOSuite.scala  | 43 ++++++++++++-------
 1 file changed, 28 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 2aa5dca847c8f..177ab42f7767c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.util.Collections
-
 import org.apache.parquet.column.{Encoding, ParquetProperties}
 
 import scala.collection.JavaConverters._
@@ -33,7 +31,7 @@ import org.apache.parquet.example.data.{Group, GroupWriter}
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
-import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.parquet.io.api.RecordConsumer
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
@@ -243,15 +241,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       """.stripMargin)
 
     withTempPath { location =>
-      val extraMetadata = Map.empty[String, String].asJava
-      val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
       val path = new Path(location.getCanonicalPath)
-      val footer = List(
-        new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList()))
-      ).asJava
-
-      ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, path, footer)
-
+      val conf = sparkContext.hadoopConfiguration
+      writeMetadata(parquetSchema, path, conf)
       val errorMessage = intercept[Throwable] {
         sqlContext.read.parquet(path.toString).printSchema()
       }.toString
@@ -259,6 +251,25 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-11692 Support for Parquet logical types, JSON and BSON (embedded types)") {
+    val parquetSchema = MessageTypeParser.parseMessageType(
+      """message root {
+        |  required binary a(JSON);
+        |  required binary b(BSON);
+        |}
+      """.stripMargin)
+
+    val expectedSparkTypes = Seq(StringType, BinaryType)
+
+    withTempPath { location =>
+      val path = new Path(location.getCanonicalPath)
+      val conf = sparkContext.hadoopConfiguration
+      writeMetadata(parquetSchema, path, conf)
+      val sparkTypes = sqlContext.read.parquet(path.toString).schema.map(_.dataType)
+      assert(sparkTypes === expectedSparkTypes)
+    }
+  }
+
   test("compression codec") {
     def compressionCodecFor(path: String, codecName: String): String = {
       val codecs = for {
@@ -582,10 +593,12 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       sqlContext.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'i64_dec))
   }
 
-  // TODO Adds test case for reading dictionary encoded decimals written as `FIXED_LEN_BYTE_ARRAY`
-  // The Parquet writer version Spark 1.6 and prior versions use is `PARQUET_1_0`, which doesn't
-  // provide dictionary encoding support for `FIXED_LEN_BYTE_ARRAY`. Should add a test here once
-  // we upgrade to `PARQUET_2_0`.
+  test("read dictionary encoded decimals written as FIXED_LEN_BYTE_ARRAY") {
+    checkAnswer(
+      // Decimal column in this file is encoded using plain dictionary
+      readResourceParquetFile("dec-in-fixed-len.parquet"),
+      sqlContext.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'fixed_len_dec))
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)

From e14b40ecc85394bbd0fcc024673093b23036af49 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 17 Nov 2015 17:59:40 +0900
Subject: [PATCH 2/4] [SPARK-11692] [SPARK-11694] [SQL] Add a binary file for tests.

---
 .../src/test/resources/dec-in-fixed-len.parquet | Bin 0 -> 460 bytes
 1 file changed, 0 insertions(+), 0 deletions(-)
 create mode 100644 sql/core/src/test/resources/dec-in-fixed-len.parquet

diff --git a/sql/core/src/test/resources/dec-in-fixed-len.parquet b/sql/core/src/test/resources/dec-in-fixed-len.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..6ad37d5639511cdb430f33fa6165eb70cd9034c0
GIT binary patch
literal 460
zcmZuu%SyvQ6rI#oO3~7VwT&8yPVu5U4^0
z3hK5WJW4SNaflr_N}BXrgbo(V<<*j?`)dokQEarvSr7`N-{ZFH
k+I`P

From: hyukjinkwon
Date: Tue, 17 Nov 2015 18:03:50 +0900
Subject: [PATCH 3/4] [SPARK-11692] [SPARK-11694] [SQL] Add support for BSON and JSON

---
 .../datasources/parquet/CatalystSchemaConverter.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index f28a18e2756e4..5f9f9083098a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -170,9 +170,10 @@ private[parquet] class CatalystSchemaConverter(
 
     case BINARY =>
       originalType match {
-        case UTF8 | ENUM => StringType
+        case UTF8 | ENUM | JSON => StringType
         case null if assumeBinaryIsString => StringType
         case null => BinaryType
+        case BSON => BinaryType
         case DECIMAL => makeDecimalType()
         case _ => illegalType()
       }

From bedb89a28acd88251c418b113bcb431bfe676ccc Mon Sep 17 00:00:00 2001
From: HyukjinKwon
Date: Tue, 17 Nov 2015 20:02:58 +0900
Subject: [PATCH 4/4] [SPARK-11694] [SQL] Take out SPARK-11692.

---
 .../parquet/CatalystSchemaConverter.scala     |  3 +--
 .../datasources/parquet/ParquetIOSuite.scala  | 19 -------------------
 2 files changed, 1 insertion(+), 21 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 5f9f9083098a7..f28a18e2756e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -170,10 +170,9 @@ private[parquet] class CatalystSchemaConverter(
 
     case BINARY =>
       originalType match {
-        case UTF8 | ENUM | JSON => StringType
+        case UTF8 | ENUM => StringType
         case null if assumeBinaryIsString => StringType
         case null => BinaryType
-        case BSON => BinaryType
         case DECIMAL => makeDecimalType()
         case _ => illegalType()
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 177ab42f7767c..29a52827ae7a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -251,25 +251,6 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-11692 Support for Parquet logical types, JSON and BSON (embedded types)") {
-    val parquetSchema = MessageTypeParser.parseMessageType(
-      """message root {
-        |  required binary a(JSON);
-        |  required binary b(BSON);
-        |}
-      """.stripMargin)
-
-    val expectedSparkTypes = Seq(StringType, BinaryType)
-
-    withTempPath { location =>
-      val path = new Path(location.getCanonicalPath)
-      val conf = sparkContext.hadoopConfiguration
-      writeMetadata(parquetSchema, path, conf)
-      val sparkTypes = sqlContext.read.parquet(path.toString).schema.map(_.dataType)
-      assert(sparkTypes === expectedSparkTypes)
-    }
-  }
-
   test("compression codec") {
     def compressionCodecFor(path: String, codecName: String): String = {
       val codecs = for {