Commit 3491f2f: Merge branch 'parquet_tests'
Davies Liu committed Aug 24, 2015 (2 parents: 809e164 + 808ae3f)
Showing 1 changed file with 93 additions and 39 deletions.

package org.apache.spark.sql.hive

import java.sql.Timestamp
import java.util.{Locale, TimeZone}

import org.apache.hadoop.hive.conf.HiveConf
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.{Row, SQLConf, SQLContext}

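/**
 * Verifies that Parquet files written through Hive's own Parquet support can be read back
 * correctly by Spark SQL's native Parquet reader.
 */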
class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with BeforeAndAfterAll {
override def _sqlContext: SQLContext = TestHive
private val sqlContext = _sqlContext

/**
 * The name of Hive's staging directory (HiveConf.ConfVars.STAGINGDIR). Files under it are
 * temporaries left behind by Hive inserts and are skipped when reading Parquet schemas below.
 */
private val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)

test("Read Parquet file generated by parquet-hive") {
private val originalTimeZone = TimeZone.getDefault
private val originalLocale = Locale.getDefault

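// Pin the default time zone and locale so values such as timestamps and formatted strings are
// deterministic across test environments; both are restored in afterAll().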
protected override def beforeAll(): Unit = {
TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
Locale.setDefault(Locale.US)
}

override protected def afterAll(): Unit = {
TimeZone.setDefault(originalTimeZone)
Locale.setDefault(originalLocale)
}

override protected def logParquetSchema(path: String): Unit = {
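// Ignore "_"-prefixed metadata files (e.g. _SUCCESS) and Hive's staging directories when
// selecting the Parquet files whose schema is logged.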
val schema = readParquetSchema(path, { path =>
!path.getName.startsWith("_") && !path.getName.startsWith(stagingDir)
})

logInfo(
s"""Schema of the Parquet file written by parquet-avro:
|$schema
""".stripMargin)
}

private def testParquetHiveCompatibility(row: Row, hiveTypes: String*): Unit = {
withTable("parquet_compat") {
withTempPath { dir =>
val path = dir.getCanonicalPath

// Hive columns are always nullable, so here we append an all-null row.
val rows = row :: Row(Seq.fill(row.length)(null): _*) :: Nil

// Don't convert Hive metastore Parquet tables to let Hive write those Parquet files.
withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
withTempTable("data") {
val fields = hiveTypes.zipWithIndex.map { case (typ, index) => s" col_$index $typ" }

val ddl =
s"""CREATE TABLE parquet_compat(
|${fields.mkString(",\n")}
|)
|STORED AS PARQUET
|LOCATION '$path'
""".stripMargin

logInfo(
s"""Creating testing Parquet table with the following DDL:
|$ddl
""".stripMargin)

sqlContext.sql(ddl)

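// Feed the rows to Hive through a temporary table, so that the INSERT below is executed by
// Hive and the resulting Parquet files on disk come from Hive's writer.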
val schema = sqlContext.table("parquet_compat").schema
val rowRDD = sqlContext.sparkContext.parallelize(rows).coalesce(1)
sqlContext.createDataFrame(rowRDD, schema).registerTempTable("data")
sqlContext.sql("INSERT INTO TABLE parquet_compat SELECT * FROM data")
}
}

logParquetSchema(path)

// Unfortunately parquet-hive doesn't add `UTF8` annotation to BINARY when writing strings.
// Have to assume all BINARY values are strings here.
withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") {
checkAnswer(sqlContext.read.parquet(path), rows)
}
}
}
}

test("simple primitives") {
testParquetHiveCompatibility(
Row(true, 1.toByte, 2.toShort, 3, 4.toLong, 5.1f, 6.1d, "foo"),
"BOOLEAN", "TINYINT", "SMALLINT", "INT", "BIGINT", "FLOAT", "DOUBLE", "STRING")
}

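// Disabled pending SPARK-10177: the Hive-written TIMESTAMP below does not round-trip
// correctly yet.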
ignore("SPARK-10177 timestamp") {
testParquetHiveCompatibility(Row(Timestamp.valueOf("2015-08-24 00:31:00")), "TIMESTAMP")
}

test("array") {
testParquetHiveCompatibility(
Row(
Seq[Integer](1: Integer, null, 2: Integer, null),
Seq[String]("foo", null, "bar", null),
Seq[Seq[Integer]](
Seq[Integer](1: Integer, null),
Seq[Integer](2: Integer, null))),
"ARRAY<INT>",
"ARRAY<STRING>",
"ARRAY<ARRAY<INT>>")
}

test("map") {
testParquetHiveCompatibility(
Row(
Map[Integer, String](
(1: Integer) -> "foo",
(2: Integer) -> null)),
"MAP<INT, STRING>")
}

// HIVE-11625: Parquet map entries with null keys are dropped by Hive
ignore("map entries with null keys") {
testParquetHiveCompatibility(
Row(
Map[Integer, String](
null.asInstanceOf[Integer] -> "bar",
null.asInstanceOf[Integer] -> null)),
"MAP<INT, STRING>")
}

test("struct") {
testParquetHiveCompatibility(
Row(Row(1, Seq("foo", "bar", null))),
"STRUCT<f0: INT, f1: ARRAY<STRING>>")
}
}
