Adds ParquetHiveCompatibilitySuite

liancheng committed on Jul 8, 2015
1 parent c6fbc06, commit 360fe18

Showing 4 changed files with 98 additions and 10 deletions.
ParquetAvroCompatibilitySuite.scala

@@ -32,7 +32,7 @@ import org.apache.spark.sql.{Row, SQLContext}
 class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest {
   import ParquetCompatibilityTest._
 
-  override def sqlContext: SQLContext = TestSQLContext
+  override val sqlContext: SQLContext = TestSQLContext
 
   override protected def beforeAll(): Unit = {
     super.beforeAll()
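The one-line change in this suite (mirrored in ParquetThriftCompatibilitySuite below) turns the `sqlContext` override from a `def` into a `val`. It goes together with the next file, where the base class stops pinning `sqlContext` to `TestSQLContext`: each concrete suite now chooses its own context once, as a stable value, which is what lets the new Hive suite plug in `TestHive`. A self-contained analogy of the refactoring, with illustrative names rather than Spark's:

// Before, the abstract base hard-coded a single context, so a Hive-backed
// suite could not substitute its own. After, the base leaves the context
// abstract and every suite overrides it with a val. Names are illustrative.
trait Context
object CoreTestContext extends Context
object HiveTestContext extends Context

abstract class CompatSuite {
  def context: Context // no default any more; each subclass must choose
}

class AvroLikeSuite extends CompatSuite {
  override val context: Context = CoreTestContext
}

class HiveLikeSuite extends CompatSuite {
  override val context: Context = HiveTestContext
}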
ParquetCompatibilityTest.scala

@@ -25,13 +25,10 @@ import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.schema.MessageType
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.sql.test.TestSQLContext
-import org.apache.spark.sql.{QueryTest, SQLContext}
+import org.apache.spark.sql.QueryTest
 import org.apache.spark.util.Utils
 
 abstract class ParquetCompatibilityTest extends QueryTest with ParquetTest with BeforeAndAfterAll {
-  override def sqlContext: SQLContext = TestSQLContext
-
   protected var parquetStore: File = _
 
   override protected def beforeAll(): Unit = {
@@ -45,10 +42,9 @@ abstract class ParquetCompatibilityTest extends QueryTest with ParquetTest with
 
   def readParquetSchema(path: String): MessageType = {
     val fsPath = new Path(path)
-    val footers =
-      ParquetFileReader.readAllFootersInParallel(
-        configuration, fsPath.getFileSystem(configuration).listStatus(fsPath).toSeq, true)
-
+    val fs = fsPath.getFileSystem(configuration)
+    val parquetFiles = fs.listStatus(fsPath).toSeq.filterNot(_.getPath.getName.startsWith("_"))
+    val footers = ParquetFileReader.readAllFootersInParallel(configuration, parquetFiles, true)
     footers.head.getParquetMetadata.getFileMetaData.getSchema
   }
 }
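The rewritten `readParquetSchema` now ignores files whose names start with an underscore before reading footers. Parquet output directories typically contain bookkeeping entries such as `_SUCCESS` and the summary files `_metadata` and `_common_metadata` alongside the data files, and the suite only wants the schema of an actual data file. A minimal sketch of the filter's effect, with made-up file names:

// Illustrative listing of a Parquet output directory; the names are made up,
// but the filter is the one readParquetSchema now applies.
val listing = Seq("_SUCCESS", "_common_metadata", "_metadata", "part-r-00001.gz.parquet")

// Anything whose name starts with '_' is bookkeeping, not data.
val dataFiles = listing.filterNot(_.startsWith("_"))

assert(dataFiles == Seq("part-r-00001.gz.parquet"))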
ParquetThriftCompatibilitySuite.scala

@@ -33,7 +33,7 @@ import org.apache.spark.sql.{Row, SQLContext}
 class ParquetThriftCompatibilitySuite extends ParquetCompatibilityTest {
   import ParquetCompatibilityTest._
 
-  override def sqlContext: SQLContext = TestSQLContext
+  override val sqlContext: SQLContext = TestSQLContext
 
   override protected def beforeAll(): Unit = {
     super.beforeAll()
ParquetHiveCompatibilitySuite.scala (new file)

@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive

import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.parquet.ParquetCompatibilityTest
import org.apache.spark.sql.{Row, SQLConf, SQLContext}

class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest {
  import ParquetCompatibilityTest.makeNullable

  override val sqlContext: SQLContext = TestHive

  override protected def beforeAll(): Unit = {
    super.beforeAll()

    // Disable Spark's native Parquet conversion so that the INSERT below is
    // executed by Hive itself, i.e. the data files are written by
    // parquet-hive rather than by Spark SQL's own Parquet writer.
    withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
      withTempTable("data") {
        sqlContext.sql(
          s"""CREATE TABLE parquet_compat(
             | bool_column BOOLEAN,
             | byte_column TINYINT,
             | short_column SMALLINT,
             | int_column INT,
             | long_column BIGINT,
             | float_column FLOAT,
             | double_column DOUBLE,
             |
             | strings_column ARRAY<STRING>,
             | int_to_string_column MAP<INT, STRING>
             |)
             |STORED AS PARQUET
             |LOCATION '${parquetStore.getCanonicalPath}'
           """.stripMargin)

        val schema = sqlContext.table("parquet_compat").schema
        // Coalesce to a single partition so the table is backed by exactly
        // one Parquet data file.
        val rowRDD = sqlContext.sparkContext.parallelize(makeRows).coalesce(1)
        sqlContext.createDataFrame(rowRDD, schema).registerTempTable("data")
        sqlContext.sql("INSERT INTO TABLE parquet_compat SELECT * FROM data")
      }
    }
  }

  override protected def afterAll(): Unit = {
    sqlContext.sql("DROP TABLE parquet_compat")
  }

test("Read Parquet file generated by parquet-hive") {
logInfo(
s"""Schema of the Parquet file written by parquet-hive:
|${readParquetSchema(parquetStore.getCanonicalPath)}
""".stripMargin)

// Unfortunately parquet-hive doesn't add `UTF8` annotation to BINARY when writing strings.
// Have to assume all BINARY values are strings here.
withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") {
checkAnswer(sqlContext.read.parquet(parquetStore.getCanonicalPath), makeRows)
}
}

  def makeRows: Seq[Row] = {
    (0 until 10).map { i =>
      // Partially apply the row index: `nullable` nulls out some of the
      // values so that null handling is exercised too (see the sketch of
      // makeNullable after this class).
      def nullable[T <: AnyRef]: ( => T) => T = makeNullable[T](i)

      Row(
        nullable(i % 2 == 0: java.lang.Boolean),
        nullable(i.toByte: java.lang.Byte),
        nullable((i + 1).toShort: java.lang.Short),
        nullable(i + 2: Integer),
        nullable(i.toLong * 10: java.lang.Long),
        nullable(i.toFloat + 0.1f: java.lang.Float),
        nullable(i.toDouble + 0.2d: java.lang.Double),
        nullable(Seq.tabulate(3)(n => s"arr_${i + n}")),
        nullable(Seq.tabulate(3)(n => (i + n: Integer) -> s"val_${i + n}").toMap))
    }
  }
}
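Two sets of helpers used by this new suite are not defined in the diff itself. `withSQLConf` and `withTempTable` (used in `beforeAll` above) come from Spark's shared SQL test utilities; shapes consistent with how they are called here might look like this sketch (an assumption, not the committed source):

import org.apache.spark.sql.SQLContext

// Assumed sketches of the two test helpers used in beforeAll above. The real
// implementations live in Spark's SQL test utilities, not in this commit.
object TestHelperSketches {
  // Temporarily override SQL config entries, restoring prior values afterwards.
  def withSQLConf(sqlContext: SQLContext)(pairs: (String, String)*)(f: => Unit): Unit = {
    val originals = pairs.map { case (key, _) => key -> sqlContext.getConf(key, null) }
    pairs.foreach { case (key, value) => sqlContext.setConf(key, value) }
    try f finally originals.foreach {
      case (_, null) => // key was unset before; this sketch simply leaves it behind
      case (key, value) => sqlContext.setConf(key, value)
    }
  }

  // Run f, then drop the named temporary tables no matter what happened.
  def withTempTable(sqlContext: SQLContext)(tableNames: String*)(f: => Unit): Unit = {
    try f finally tableNames.foreach(sqlContext.dropTempTable)
  }
}

Likewise, `makeNullable` is imported from `ParquetCompatibilityTest`'s companion object and only partially applied here: `makeNullable[T](i)` fixes the row index and returns the `( => T) => T` function bound to `nullable` in `makeRows`, so that some rows carry nulls and null handling is exercised too. A plausible shape, consistent with that usage (again an assumption; the modulus is illustrative):

// Assumed sketch of ParquetCompatibilityTest.makeNullable; only the
// signature is implied by this diff, the null policy here is made up.
def makeNullable[T <: AnyRef](i: Int)(f: => T): T =
  if (i % 3 == 0) null.asInstanceOf[T] else f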
