From 808ae3bb284bae163634d708ee699aebb671c9f7 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Mon, 24 Aug 2015 15:32:35 +0800
Subject: [PATCH] Refactors ParquetHiveCompatibilitySuite and adds more test
 cases

---
 .../hive/ParquetHiveCompatibilitySuite.scala  | 132 ++++++++++++------
 1 file changed, 93 insertions(+), 39 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
index 13452e71a1b3b..bc30180cf0917 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
@@ -17,15 +17,17 @@
 
 package org.apache.spark.sql.hive
 
+import java.sql.Timestamp
+import java.util.{Locale, TimeZone}
+
 import org.apache.hadoop.hive.conf.HiveConf
+import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest
+import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.{Row, SQLConf, SQLContext}
 
-class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest {
-  import ParquetCompatibilityTest.makeNullable
-
+class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with BeforeAndAfterAll {
   override def _sqlContext: SQLContext = TestHive
   private val sqlContext = _sqlContext
 
@@ -35,69 +37,121 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest {
    */
   private val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
 
-  test("Read Parquet file generated by parquet-hive") {
+  private val originalTimeZone = TimeZone.getDefault
+  private val originalLocale = Locale.getDefault
+
+  override protected def beforeAll(): Unit = {
+    TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
+    Locale.setDefault(Locale.US)
+  }
+
+  override protected def afterAll(): Unit = {
+    TimeZone.setDefault(originalTimeZone)
+    Locale.setDefault(originalLocale)
+  }
+
+  override protected def logParquetSchema(path: String): Unit = {
+    val schema = readParquetSchema(path, { path =>
+      !path.getName.startsWith("_") && !path.getName.startsWith(stagingDir)
+    })
+
+    logInfo(
+      s"""Schema of the Parquet file written by parquet-hive:
+         |$schema
+       """.stripMargin)
+  }
+
+  private def testParquetHiveCompatibility(row: Row, hiveTypes: String*): Unit = {
     withTable("parquet_compat") {
       withTempPath { dir =>
         val path = dir.getCanonicalPath
 
+        // Hive columns are always nullable, so here we append an all-null row.
+        val rows = row :: Row(Seq.fill(row.length)(null): _*) :: Nil
+
+        // Don't convert Hive metastore Parquet tables, so that Hive itself writes these Parquet files.
         withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
           withTempTable("data") {
-            sqlContext.sql(
+            val fields = hiveTypes.zipWithIndex.map { case (typ, index) => s"  col_$index $typ" }
+
+            val ddl =
               s"""CREATE TABLE parquet_compat(
-                 |  bool_column BOOLEAN,
-                 |  byte_column TINYINT,
-                 |  short_column SMALLINT,
-                 |  int_column INT,
-                 |  long_column BIGINT,
-                 |  float_column FLOAT,
-                 |  double_column DOUBLE,
-                 |
-                 |  strings_column ARRAY<STRING>,
-                 |  int_to_string_column MAP<INT, STRING>
+                 |${fields.mkString(",\n")}
                  |)
                  |STORED AS PARQUET
                  |LOCATION '$path'
+               """.stripMargin
+
+            logInfo(
+              s"""Creating Parquet test table with the following DDL:
+                 |$ddl
                """.stripMargin)
 
+            sqlContext.sql(ddl)
+
             val schema = sqlContext.table("parquet_compat").schema
-            val rowRDD = sqlContext.sparkContext.parallelize(makeRows).coalesce(1)
+            val rowRDD = sqlContext.sparkContext.parallelize(rows).coalesce(1)
             sqlContext.createDataFrame(rowRDD, schema).registerTempTable("data")
             sqlContext.sql("INSERT INTO TABLE parquet_compat SELECT * FROM data")
           }
         }
 
-        val schema = readParquetSchema(path, { path =>
-          !path.getName.startsWith("_") && !path.getName.startsWith(stagingDir)
-        })
-
-        logInfo(
-          s"""Schema of the Parquet file written by parquet-hive:
-             |$schema
-           """.stripMargin)
+        logParquetSchema(path)
 
         // Unfortunately parquet-hive doesn't add `UTF8` annotation to BINARY when writing strings.
         // Have to assume all BINARY values are strings here.
         withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") {
-          checkAnswer(sqlContext.read.parquet(path), makeRows)
+          checkAnswer(sqlContext.read.parquet(path), rows)
         }
       }
     }
   }
 
-  def makeRows: Seq[Row] = {
-    (0 until 10).map { i =>
-      def nullable[T <: AnyRef]: ( => T) => T = makeNullable[T](i)
+  test("simple primitives") {
+    testParquetHiveCompatibility(
+      Row(true, 1.toByte, 2.toShort, 3, 4.toLong, 5.1f, 6.1d, "foo"),
+      "BOOLEAN", "TINYINT", "SMALLINT", "INT", "BIGINT", "FLOAT", "DOUBLE", "STRING")
+  }
 
+  ignore("SPARK-10177 timestamp") {
+    testParquetHiveCompatibility(Row(Timestamp.valueOf("2015-08-24 00:31:00")), "TIMESTAMP")
+  }
+
+  test("array") {
+    testParquetHiveCompatibility(
       Row(
-        nullable(i % 2 == 0: java.lang.Boolean),
-        nullable(i.toByte: java.lang.Byte),
-        nullable((i + 1).toShort: java.lang.Short),
-        nullable(i + 2: Integer),
-        nullable(i.toLong * 10: java.lang.Long),
-        nullable(i.toFloat + 0.1f: java.lang.Float),
-        nullable(i.toDouble + 0.2d: java.lang.Double),
-        nullable(Seq.tabulate(3)(n => s"arr_${i + n}")),
-        nullable(Seq.tabulate(3)(n => (i + n: Integer) -> s"val_${i + n}").toMap))
-    }
+        Seq[Integer](1: Integer, null, 2: Integer, null),
+        Seq[String]("foo", null, "bar", null),
+        Seq[Seq[Integer]](
+          Seq[Integer](1: Integer, null),
+          Seq[Integer](2: Integer, null))),
+      "ARRAY<INT>",
+      "ARRAY<STRING>",
+      "ARRAY<ARRAY<INT>>")
+  }
+
+  test("map") {
+    testParquetHiveCompatibility(
+      Row(
+        Map[Integer, String](
+          (1: Integer) -> "foo",
+          (2: Integer) -> null)),
+      "MAP<INT, STRING>")
+  }
+
+  // HIVE-11625: Parquet map entries with null keys are dropped by Hive
+  ignore("map entries with null keys") {
+    testParquetHiveCompatibility(
+      Row(
+        Map[Integer, String](
+          null.asInstanceOf[Integer] -> "bar",
+          null.asInstanceOf[Integer] -> null)),
+      "MAP<INT, STRING>")
+  }
+
+  test("struct") {
+    testParquetHiveCompatibility(
+      Row(Row(1, Seq("foo", "bar", null))),
+      "STRUCT<f0: INT, f1: ARRAY<STRING>>")
  }
 }
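The new testParquetHiveCompatibility(row, hiveTypes*) helper pairs one Row of in-memory
values with the matching Hive column types, lets Hive's Parquet SerDe write the files
(metastore conversion is disabled), and reads them back with Spark SQL's native Parquet
reader. A further compatibility case therefore only needs a value row plus its Hive DDL
types. A hypothetical sketch of such an addition (the column type and values below are
illustrative only, not part of this patch):

    // Hypothetical follow-up case, not part of this patch: struct values are
    // passed as nested Rows, exactly like the "struct" test above.
    test("map with struct values") {
      testParquetHiveCompatibility(
        Row(Map[Integer, Row](
          (1: Integer) -> Row("foo", 2),
          (2: Integer) -> null)),
        "MAP<INT, STRUCT<s: STRING, i: INT>>")
    }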