[SPARK-8580] [SQL] Refactors ParquetHiveCompatibilitySuite and adds more test cases #8392

Closed
Changes from all commits
@@ -17,15 +17,17 @@
 package org.apache.spark.sql.hive
 
+import java.sql.Timestamp
+import java.util.{Locale, TimeZone}
+
 import org.apache.hadoop.hive.conf.HiveConf
+import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest
+import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.{Row, SQLConf, SQLContext}
 
-class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest {
-  import ParquetCompatibilityTest.makeNullable
-
+class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with BeforeAndAfterAll {
   override def _sqlContext: SQLContext = TestHive
   private val sqlContext = _sqlContext
 
@@ -35,69 +37,121 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest {
   */
   private val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
 
-  test("Read Parquet file generated by parquet-hive") {
+  private val originalTimeZone = TimeZone.getDefault
+  private val originalLocale = Locale.getDefault
+
+  protected override def beforeAll(): Unit = {
+    TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
+    Locale.setDefault(Locale.US)
+  }
+
+  override protected def afterAll(): Unit = {
+    TimeZone.setDefault(originalTimeZone)
+    Locale.setDefault(originalLocale)
+  }
+
+  override protected def logParquetSchema(path: String): Unit = {
+    val schema = readParquetSchema(path, { path =>
+      !path.getName.startsWith("_") && !path.getName.startsWith(stagingDir)
+    })
+
+    logInfo(
+      s"""Schema of the Parquet file written by parquet-hive:
+         |$schema
+       """.stripMargin)
+  }
+
+  private def testParquetHiveCompatibility(row: Row, hiveTypes: String*): Unit = {
     withTable("parquet_compat") {
       withTempPath { dir =>
         val path = dir.getCanonicalPath
 
+        // Hive columns are always nullable, so here we append an all-null row.
+        val rows = row :: Row(Seq.fill(row.length)(null): _*) :: Nil
+
         // Don't convert Hive metastore Parquet tables to let Hive write those Parquet files.
         withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
           withTempTable("data") {
-            sqlContext.sql(
+            val fields = hiveTypes.zipWithIndex.map { case (typ, index) => s"  col_$index $typ" }
+
+            val ddl =
              s"""CREATE TABLE parquet_compat(
-                 |  bool_column BOOLEAN,
-                 |  byte_column TINYINT,
-                 |  short_column SMALLINT,
-                 |  int_column INT,
-                 |  long_column BIGINT,
-                 |  float_column FLOAT,
-                 |  double_column DOUBLE,
-                 |
-                 |  strings_column ARRAY<STRING>,
-                 |  int_to_string_column MAP<INT, STRING>
+                 |${fields.mkString(",\n")}
                  |)
                  |STORED AS PARQUET
                  |LOCATION '$path'
-               """.stripMargin)
+               """.stripMargin
+
+            logInfo(
+              s"""Creating testing Parquet table with the following DDL:
+                 |$ddl
+               """.stripMargin)
+
+            sqlContext.sql(ddl)
 
             val schema = sqlContext.table("parquet_compat").schema
-            val rowRDD = sqlContext.sparkContext.parallelize(makeRows).coalesce(1)
+            val rowRDD = sqlContext.sparkContext.parallelize(rows).coalesce(1)
             sqlContext.createDataFrame(rowRDD, schema).registerTempTable("data")
             sqlContext.sql("INSERT INTO TABLE parquet_compat SELECT * FROM data")
           }
         }
 
-        val schema = readParquetSchema(path, { path =>
-          !path.getName.startsWith("_") && !path.getName.startsWith(stagingDir)
-        })
-
-        logInfo(
-          s"""Schema of the Parquet file written by parquet-hive:
-             |$schema
-           """.stripMargin)
+        logParquetSchema(path)
 
         // Unfortunately parquet-hive doesn't add `UTF8` annotation to BINARY when writing strings.
         // Have to assume all BINARY values are strings here.
         withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") {
-          checkAnswer(sqlContext.read.parquet(path), makeRows)
+          checkAnswer(sqlContext.read.parquet(path), rows)
         }
       }
     }
   }

-  def makeRows: Seq[Row] = {
-    (0 until 10).map { i =>
-      def nullable[T <: AnyRef]: ( => T) => T = makeNullable[T](i)
+  test("simple primitives") {
+    testParquetHiveCompatibility(
+      Row(true, 1.toByte, 2.toShort, 3, 4.toLong, 5.1f, 6.1d, "foo"),
+      "BOOLEAN", "TINYINT", "SMALLINT", "INT", "BIGINT", "FLOAT", "DOUBLE", "STRING")
+  }
+
+  ignore("SPARK-10177 timestamp") {
+    testParquetHiveCompatibility(Row(Timestamp.valueOf("2015-08-24 00:31:00")), "TIMESTAMP")
+  }
 
+  test("array") {
+    testParquetHiveCompatibility(
       Row(
-        nullable(i % 2 == 0: java.lang.Boolean),
-        nullable(i.toByte: java.lang.Byte),
-        nullable((i + 1).toShort: java.lang.Short),
-        nullable(i + 2: Integer),
-        nullable(i.toLong * 10: java.lang.Long),
-        nullable(i.toFloat + 0.1f: java.lang.Float),
-        nullable(i.toDouble + 0.2d: java.lang.Double),
-        nullable(Seq.tabulate(3)(n => s"arr_${i + n}")),
-        nullable(Seq.tabulate(3)(n => (i + n: Integer) -> s"val_${i + n}").toMap))
-    }
+        Seq[Integer](1: Integer, null, 2: Integer, null),
+        Seq[String]("foo", null, "bar", null),
+        Seq[Seq[Integer]](
+          Seq[Integer](1: Integer, null),
+          Seq[Integer](2: Integer, null))),
+      "ARRAY<INT>",
+      "ARRAY<STRING>",
+      "ARRAY<ARRAY<INT>>")
   }
+
+  test("map") {
+    testParquetHiveCompatibility(
+      Row(
+        Map[Integer, String](
+          (1: Integer) -> "foo",
+          (2: Integer) -> null)),
+      "MAP<INT, STRING>")
+  }
+
+  // HIVE-11625: Parquet map entries with null keys are dropped by Hive
+  ignore("map entries with null keys") {
+    testParquetHiveCompatibility(
+      Row(
+        Map[Integer, String](
+          null.asInstanceOf[Integer] -> "bar",
+          null.asInstanceOf[Integer] -> null)),
+      "MAP<INT, STRING>")
+  }
+
+  test("struct") {
+    testParquetHiveCompatibility(
+      Row(Row(1, Seq("foo", "bar", null))),
+      "STRUCT<f0: INT, f1: ARRAY<STRING>>")
+  }
 }
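
For reference, a minimal sketch of how a further case could sit on top of the refactored helper; the test below is hypothetical and not part of this PR. Because `testParquetHiveCompatibility` builds the DDL from the supplied Hive types and appends the all-null companion row itself, a new case only needs a `Row` of values plus the matching column type strings:

```scala
// Hypothetical follow-up case (not part of this PR): a map whose values are arrays.
// It would be placed inside ParquetHiveCompatibilitySuite alongside the other tests.
test("map of arrays (hypothetical)") {
  testParquetHiveCompatibility(
    Row(
      Map[Integer, Seq[String]](
        (1: Integer) -> Seq("foo", null),
        (2: Integer) -> Seq("bar"))),
    "MAP<INT, ARRAY<STRING>>")
}
```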