[SPARK-9876] [SQL] Bumps parquet-mr to 1.8.1 #9225

2 changes: 1 addition & 1 deletion pom.xml
@@ -140,7 +140,7 @@
<!-- Version used for internal directory structure -->
<hive.version.short>1.2.1</hive.version.short>
<derby.version>10.10.1.1</derby.version>
<parquet.version>1.7.0</parquet.version>
<parquet.version>1.8.1</parquet.version>
<hive.parquet.version>1.6.0</hive.parquet.version>
<jblas.version>1.2.4</jblas.version>
<jetty.version>8.1.14.v20131031</jetty.version>
@@ -25,11 +25,11 @@ import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema._

import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._

@@ -112,10 +112,30 @@ private[parquet] object CatalystReadSupport {
*/
def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
Types
.buildMessage()
.addFields(clippedParquetFields: _*)
.named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)

if (clippedParquetFields.isEmpty) {
// !! HACK ALERT !!
//
// PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing an empty GroupType,
// which means we can't build an empty requested schema (i.e., select no columns at all) for
// queries like `SELECT COUNT(*) FROM t`.
// This issue has been fixed in parquet-mr 1.8.2-SNAPSHOT.
//
// To work around this problem, we first construct a `MessageType` with a single dummy
// field, and then remove the field to obtain an empty `MessageType`.
//
// TODO Revert this change after upgrading parquet-mr to 1.8.2+
val messageType = Types
.buildMessage()
.addField(Types.required(INT32).named("dummy"))
.named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
messageType.getFields.clear()
messageType
} else {
Types
.buildMessage()
.addFields(clippedParquetFields: _*)
.named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
}
}

private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
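For clarity, here is a minimal stand-alone sketch (not part of the diff) of the empty-GroupType workaround used in `clipParquetSchema` above. It assumes parquet-mr 1.8.1 on the classpath; the message name is illustrative.

```scala
import org.apache.parquet.schema.{MessageType, Types}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32

// Under parquet-mr 1.8.1, `Types.buildMessage().named(...)` rejects an empty field
// list (PARQUET-363 / PARQUET-278), so the empty requested schema needed for
// `SELECT COUNT(*) FROM t` can't be built directly. The workaround: add a dummy
// field, then clear the field list of the resulting MessageType.
def emptyMessageType(name: String): MessageType = {
  val messageType = Types
    .buildMessage()
    .addField(Types.required(INT32).named("dummy"))
    .named(name)
  messageType.getFields.clear() // leaves a MessageType with zero fields
  messageType
}
```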
@@ -22,8 +22,6 @@ import java.io.Serializable
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.filter2.predicate._
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.OriginalType
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

import org.apache.spark.sql.sources
import org.apache.spark.sql.types._
@@ -53,18 +51,15 @@ private[sql] object ParquetFilters {
case DoubleType =>
(n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])

// See https://issues.apache.org/jira/browse/SPARK-11153
/*
// Binary.fromString and Binary.fromByteArray don't accept null values
// Binary.fromString and Binary.fromConstantByteArray don't accept null values
case StringType =>
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))).orNull)
Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
case BinaryType =>
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
*/
Option(v).map(b => Binary.fromConstantByteArray(v.asInstanceOf[Array[Byte]])).orNull)
}

private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -79,17 +74,15 @@
case DoubleType =>
(n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])

// See https://issues.apache.org/jira/browse/SPARK-11153
/*
// Binary.fromString and Binary.fromConstantByteArray don't accept null values
case StringType =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))).orNull)
Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
case BinaryType =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
*/
Option(v).map(b => Binary.fromConstantByteArray(v.asInstanceOf[Array[Byte]])).orNull)
}

private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -101,17 +94,12 @@
(n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])

// See https://issues.apache.org/jira/browse/SPARK-11153
/*
case StringType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n),
Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
FilterApi.lt(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
case BinaryType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
*/
FilterApi.lt(binaryColumn(n), Binary.fromConstantByteArray(v.asInstanceOf[Array[Byte]]))
}

private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -123,17 +111,12 @@
(n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])

// See https://issues.apache.org/jira/browse/SPARK-11153
/*
case StringType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n),
Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
FilterApi.ltEq(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
case BinaryType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
*/
FilterApi.ltEq(binaryColumn(n), Binary.fromConstantByteArray(v.asInstanceOf[Array[Byte]]))
}

private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -145,17 +128,12 @@
(n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])

// See https://issues.apache.org/jira/browse/SPARK-11153
/*
case StringType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n),
Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
FilterApi.gt(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
*/
FilterApi.gt(binaryColumn(n), Binary.fromConstantByteArray(v.asInstanceOf[Array[Byte]]))
}

private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -167,17 +145,12 @@
(n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])

// See https://issues.apache.org/jira/browse/SPARK-11153
/*
case StringType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n),
Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
FilterApi.gtEq(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
*/
FilterApi.gtEq(binaryColumn(n), Binary.fromConstantByteArray(v.asInstanceOf[Array[Byte]]))
}

private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
@@ -193,18 +166,14 @@
case DoubleType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))

// See https://issues.apache.org/jira/browse/SPARK-11153
/*
case StringType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(binaryColumn(n),
SetInFilter(v.map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8")))))
SetInFilter(v.map(s => Binary.fromString(s.asInstanceOf[String]))))
case BinaryType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(binaryColumn(n),
SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]]))))
*/
SetInFilter(v.map(e => Binary.fromConstantByteArray(e.asInstanceOf[Array[Byte]]))))
}

/**
@@ -213,8 +182,6 @@
def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap

relaxParquetValidTypeMap

// NOTE:
//
// For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
@@ -271,35 +238,4 @@
case _ => None
}
}

// !! HACK ALERT !!
//
// This lazy val is a workaround for PARQUET-201, and should be removed once we upgrade to
// parquet-mr 1.8.1 or higher versions.
//
// In Parquet, not all types of columns can be used for filter push-down optimization. The set
// of valid column types is controlled by `ValidTypeMap`. Unfortunately, in parquet-mr 1.7.0 and
// prior versions, the limitation is too strict, and doesn't allow `BINARY (ENUM)` columns to be
// pushed down.
//
// This restriction is problematic for Spark SQL, because Spark SQL doesn't have a type that maps
// to Parquet original type `ENUM` directly, and always converts `ENUM` to `StringType`. Thus,
// a predicate involving an `ENUM` field can be pushed down as a string column, which is perfectly
// legal except that it fails the `ValidTypeMap` check.
//
// Here we add `BINARY (ENUM)` into `ValidTypeMap` lazily via reflection to work around this issue.
private lazy val relaxParquetValidTypeMap: Unit = {
val constructor = Class
.forName(classOf[ValidTypeMap].getCanonicalName + "$FullTypeDescriptor")
.getDeclaredConstructor(classOf[PrimitiveTypeName], classOf[OriginalType])

constructor.setAccessible(true)
val enumTypeDescriptor = constructor
.newInstance(PrimitiveTypeName.BINARY, OriginalType.ENUM)
.asInstanceOf[AnyRef]

val addMethod = classOf[ValidTypeMap].getDeclaredMethods.find(_.getName == "add").get
addMethod.setAccessible(true)
addMethod.invoke(null, classOf[Binary], enumTypeDescriptor)
}
}
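As a rough illustration (not from the diff) of what the re-enabled `StringType` and `BinaryType` branches in `ParquetFilters` now build, the sketch below constructs two predicates directly with parquet-mr 1.8.1's `Binary` helpers; the column names are made up.

```scala
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.io.api.Binary

// Equality over a UTF-8 string column, as the StringType branch now does it
// (Binary.fromString replaces the removed Binary.fromByteArray call).
val stringEq: FilterPredicate =
  FilterApi.eq(FilterApi.binaryColumn("name"), Binary.fromString("parquet"))

// Less-than over a raw binary column, mirroring the BinaryType branch
// (Binary.fromConstantByteArray tells parquet-mr the byte array won't be mutated).
val binaryLt: FilterPredicate =
  FilterApi.lt(
    FilterApi.binaryColumn("payload"),
    Binary.fromConstantByteArray(Array[Byte](1, 2, 3)))
```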
@@ -17,7 +17,6 @@

package org.apache.spark.sql.execution.datasources.parquet

import java.io.File
import java.nio.ByteBuffer
import java.util.{List => JList, Map => JMap}

@@ -27,6 +26,7 @@ import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter

import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.datasources.parquet.test.avro._
@@ -35,14 +35,14 @@ import org.apache.spark.sql.test.SharedSQLContext
class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with SharedSQLContext {
private def withWriter[T <: IndexedRecord]
(path: String, schema: Schema)
(f: AvroParquetWriter[T] => Unit): Unit = {
(f: ParquetWriter[T] => Unit): Unit = {
logInfo(
s"""Writing Avro records with the following Avro schema into Parquet file:
|
|${schema.toString(true)}
""".stripMargin)

val writer = new AvroParquetWriter[T](new Path(path), schema)
val writer = AvroParquetWriter.builder[T](new Path(path)).withSchema(schema).build()
try f(writer) finally writer.close()
}
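
For reference, a self-contained sketch (not part of this suite) of the parquet-avro 1.8.x builder API that the `withWriter` helper above now uses in place of the deprecated `new AvroParquetWriter(...)` constructor; the schema literal and output path are illustrative.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter

// Illustrative Avro schema with a single int field.
val schema: Schema = new Schema.Parser().parse(
  """{"type": "record", "name": "Dummy", "fields": [{"name": "id", "type": "int"}]}""")

// Builder-style construction introduced in parquet-avro 1.8.x.
val writer: ParquetWriter[GenericRecord] =
  AvroParquetWriter
    .builder[GenericRecord](new Path("/tmp/dummy.parquet"))
    .withSchema(schema)
    .build()

try {
  val record = new GenericData.Record(schema)
  record.put("id", 42)
  writer.write(record)
} finally {
  writer.close()
}
```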

@@ -163,8 +163,56 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with SharedSQLContext {
}
}

ignore("nullable arrays (parquet-avro 1.7.0 does not properly support this)") {
// TODO Complete this test case after upgrading to parquet-mr 1.8+
test("nullable arrays") {
withTempPath { dir =>
import ParquetCompatibilityTest._

// This Parquet schema is translated from the following Avro schema, with Hadoop configuration
// `parquet.avro.write-old-list-structure` set to `false`:
//
// record AvroArrayOfOptionalInts {
// array<union { null, int }> f;
// }
val schema =
"""message AvroArrayOfOptionalInts {
| required group f (LIST) {
| repeated group list {
| optional int32 element;
| }
| }
|}
""".stripMargin

writeDirect(dir.getCanonicalPath, schema, { rc =>
rc.message {
rc.field("f", 0) {
rc.group {
rc.field("list", 0) {
rc.group {
rc.field("element", 0) {
rc.addInteger(0)
}
}

rc.group { /* null */ }

rc.group {
rc.field("element", 0) {
rc.addInteger(1)
}
}

rc.group { /* null */ }
}
}
}
}
})

checkAnswer(
sqlContext.read.parquet(dir.getCanonicalPath),
Row(Array(0: Integer, null, 1: Integer, null)))
}
}

test("SPARK-10136 array of primitive array") {
@@ -1385,21 +1385,25 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""".stripMargin)

testSchemaClipping(
"empty requested schema",

parquetSchema =
"""message root {
| required group f0 {
| required int32 f00;
| required int64 f01;
| }
|}
""".stripMargin,

catalystSchema = new StructType(),

expectedSchema = "message root {}")
// PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing an empty GroupType.
// Re-enable this test case after upgrading to parquet-mr 1.8.2 or a later version.
ignore("empty requested schema") {
testSchemaClipping(
"empty requested schema",

parquetSchema =
"""message root {
| required group f0 {
| required int32 f00;
| required int64 f01;
| }
|}
""".stripMargin,

catalystSchema = new StructType(),

expectedSchema = "message root {}")
}

testSchemaClipping(
"disjoint field sets",