Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import org.apache.hadoop.conf.Configuration
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.compat.FilterCompat._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Statistics}
import org.apache.parquet.filter2.predicate.UserDefinedPredicate
import org.apache.parquet.filter2.predicate._
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.OriginalType
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

import org.apache.spark.SparkEnv
import org.apache.spark.sql.catalyst.expressions._
Expand Down Expand Up @@ -197,6 +198,8 @@ private[sql] object ParquetFilters {
def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap

relaxParquetValidTypeMap

// NOTE:
//
// For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
Expand Down Expand Up @@ -239,6 +242,37 @@ private[sql] object ParquetFilters {
}
}

// !! HACK ALERT !!
//
// Workaround for PARQUET-201; drop this once we depend on parquet-mr 1.8.1+.
//
// parquet-mr restricts which column types may participate in filter push-down via its
// internal `ValidTypeMap`. Up to and including 1.7.0 that map is overly strict and
// rejects `BINARY (ENUM)` columns.
//
// Spark SQL has no dedicated type for Parquet's `ENUM` original type and always reads
// such columns as `StringType`, so a predicate over an `ENUM` field is pushed down as a
// plain string comparison — perfectly sound, except that `ValidTypeMap` vetoes it.
//
// To unblock this, we reflectively register `BINARY (ENUM)` in `ValidTypeMap`. Declared
// as a lazy val so the (idempotent) registration runs at most once per JVM.
private lazy val relaxParquetValidTypeMap: Unit = {
// `FullTypeDescriptor` is a private nested class of ValidTypeMap; reach it by name.
val descriptorClassName = classOf[ValidTypeMap].getCanonicalName + "$FullTypeDescriptor"
val ctor = Class
.forName(descriptorClassName)
.getDeclaredConstructor(classOf[PrimitiveTypeName], classOf[OriginalType])

ctor.setAccessible(true)
// Descriptor for a BINARY column carrying the ENUM original type.
val binaryEnumDescriptor = ctor
.newInstance(PrimitiveTypeName.BINARY, OriginalType.ENUM)
.asInstanceOf[AnyRef]

// `add` is package-private and static, hence the reflective lookup and null receiver.
val addMethod = classOf[ValidTypeMap].getDeclaredMethods.find(_.getName == "add").get
addMethod.setAccessible(true)
addMethod.invoke(null, classOf[Binary], binaryEnumDescriptor)
}

/**
* Converts Catalyst predicate expressions to Parquet filter predicates.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ private[sql] object ParquetRelation extends Logging {
val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
val serializedConf = new SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration)

// HACK ALERT:
// !! HACK ALERT !!
//
// Parquet requires `FileStatus`es to read footers. Here we try to send cached `FileStatus`es
// to executor side to avoid fetching them again. However, `FileStatus` is not `Serializable`
Expand Down
16 changes: 6 additions & 10 deletions sql/core/src/test/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,19 @@ The following directories and files are used for Parquet compatibility tests:
.
├── README.md # This file
├── avro
│   ├── parquet-compat.avdl # Testing Avro IDL
│   └── parquet-compat.avpr # !! NO TOUCH !! Protocol file generated from parquet-compat.avdl
│   ├── *.avdl # Testing Avro IDL(s)
│   └── *.avpr # !! NO TOUCH !! Protocol files generated from Avro IDL(s)
├── gen-java # !! NO TOUCH !! Generated Java code
├── scripts
│   └── gen-code.sh # Script used to generate Java code for Thrift and Avro
│   ├── gen-avro.sh # Script used to generate Java code for Avro
│   └── gen-thrift.sh # Script used to generate Java code for Thrift
└── thrift
└── parquet-compat.thrift # Testing Thrift schema
└── *.thrift # Testing Thrift schema(s)
```

Generated Java code is used in the following test suites:

- `org.apache.spark.sql.parquet.ParquetAvroCompatibilitySuite`
- `org.apache.spark.sql.parquet.ParquetThriftCompatibilitySuite`

To avoid code generation during build time, Java code generated from the testing Thrift schema and Avro IDL is also checked in.

When updating the testing Thrift schema and Avro IDL, please run `gen-code.sh` to update all the generated Java code.
When updating the testing Thrift schema or Avro IDL, please run `gen-thrift.sh` or `gen-avro.sh` as appropriate to regenerate the corresponding Java code.

## Prerequisites

Expand Down
13 changes: 12 additions & 1 deletion sql/core/src/test/avro/parquet-compat.avdl
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,19 @@
*/

// This is a test protocol for testing parquet-avro compatibility.
@namespace("org.apache.spark.sql.parquet.test.avro")
@namespace("org.apache.spark.sql.execution.datasources.parquet.test.avro")
protocol CompatibilityTest {
/** Enum type used by the Parquet-Avro compatibility tests (maps to Parquet BINARY (ENUM)). */
enum Suit {
SPADES,
HEARTS,
DIAMONDS,
CLUBS
}

/** Record with a single enum-typed field, for round-tripping Avro enums through Parquet. */
record ParquetEnum {
Suit suit;
}

record Nested {
array<int> nested_ints_column;
string nested_string_column;
Expand Down
13 changes: 12 additions & 1 deletion sql/core/src/test/avro/parquet-compat.avpr
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
{
"protocol" : "CompatibilityTest",
"namespace" : "org.apache.spark.sql.parquet.test.avro",
"namespace" : "org.apache.spark.sql.execution.datasources.parquet.test.avro",
"types" : [ {
"type" : "enum",
"name" : "Suit",
"symbols" : [ "SPADES", "HEARTS", "DIAMONDS", "CLUBS" ]
}, {
"type" : "record",
"name" : "ParquetEnum",
"fields" : [ {
"name" : "suit",
"type" : "Suit"
} ]
}, {
"type" : "record",
"name" : "Nested",
"fields" : [ {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public interface CompatibilityTest {
public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"CompatibilityTest\",\"namespace\":\"org.apache.spark.sql.parquet.test.avro\",\"types\":[{\"type\":\"record\",\"name\":\"Nested\",\"fields\":[{\"name\":\"nested_ints_column\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"nested_string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]},{\"type\":\"record\",\"name\":\"ParquetAvroCompat\",\"fields\":[{\"name\":\"bool_column\",\"type\":\"boolean\"},{\"name\":\"int_column\",\"type\":\"int\"},{\"name\":\"long_column\",\"type\":\"long\"},{\"name\":\"float_column\",\"type\":\"float\"},{\"name\":\"double_column\",\"type\":\"double\"},{\"name\":\"binary_column\",\"type\":\"bytes\"},{\"name\":\"string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"maybe_bool_column\",\"type\":[\"null\",\"boolean\"]},{\"name\":\"maybe_int_column\",\"type\":[\"null\",\"int\"]},{\"name\":\"maybe_long_column\",\"type\":[\"null\",\"long\"]},{\"name\":\"maybe_float_column\",\"type\":[\"null\",\"float\"]},{\"name\":\"maybe_double_column\",\"type\":[\"null\",\"double\"]},{\"name\":\"maybe_binary_column\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"maybe_string_column\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}]},{\"name\":\"strings_column\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}},{\"name\":\"string_to_int_column\",\"type\":{\"type\":\"map\",\"values\":\"int\",\"avro.java.string\":\"String\"}},{\"name\":\"complex_column\",\"type\":{\"type\":\"map\",\"values\":{\"type\":\"array\",\"items\":\"Nested\"},\"avro.java.string\":\"String\"}}]}],\"messages\":{}}");
public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"CompatibilityTest\",\"namespace\":\"org.apache.spark.sql.execution.datasources.parquet.test.avro\",\"types\":[{\"type\":\"enum\",\"name\":\"Suit\",\"symbols\":[\"SPADES\",\"HEARTS\",\"DIAMONDS\",\"CLUBS\"]},{\"type\":\"record\",\"name\":\"ParquetEnum\",\"fields\":[{\"name\":\"suit\",\"type\":\"Suit\"}]},{\"type\":\"record\",\"name\":\"Nested\",\"fields\":[{\"name\":\"nested_ints_column\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"nested_string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]},{\"type\":\"record\",\"name\":\"ParquetAvroCompat\",\"fields\":[{\"name\":\"bool_column\",\"type\":\"boolean\"},{\"name\":\"int_column\",\"type\":\"int\"},{\"name\":\"long_column\",\"type\":\"long\"},{\"name\":\"float_column\",\"type\":\"float\"},{\"name\":\"double_column\",\"type\":\"double\"},{\"name\":\"binary_column\",\"type\":\"bytes\"},{\"name\":\"string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"maybe_bool_column\",\"type\":[\"null\",\"boolean\"]},{\"name\":\"maybe_int_column\",\"type\":[\"null\",\"int\"]},{\"name\":\"maybe_long_column\",\"type\":[\"null\",\"long\"]},{\"name\":\"maybe_float_column\",\"type\":[\"null\",\"float\"]},{\"name\":\"maybe_double_column\",\"type\":[\"null\",\"double\"]},{\"name\":\"maybe_binary_column\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"maybe_string_column\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}]},{\"name\":\"strings_column\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}},{\"name\":\"string_to_int_column\",\"type\":{\"type\":\"map\",\"values\":\"int\",\"avro.java.string\":\"String\"}},{\"name\":\"complex_column\",\"type\":{\"type\":\"map\",\"values\":{\"type\":\"array\",\"items\":\"Nested\"},\"avro.java.string\":\"String\"}}]}],\"messages\":{}}");

@SuppressWarnings("all")
public interface Callback extends CompatibilityTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
*
* DO NOT EDIT DIRECTLY
*/
package org.apache.spark.sql.execution.datasources.parquet.test.avro;
package org.apache.spark.sql.execution.datasources.parquet.test.avro;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Nested extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Nested\",\"namespace\":\"org.apache.spark.sql.parquet.test.avro\",\"fields\":[{\"name\":\"nested_ints_column\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"nested_string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}");
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Nested\",\"namespace\":\"org.apache.spark.sql.execution.datasources.parquet.test.avro\",\"fields\":[{\"name\":\"nested_ints_column\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"nested_string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
@Deprecated public java.util.List<java.lang.Integer> nested_ints_column;
@Deprecated public java.lang.String nested_string_column;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
*
* DO NOT EDIT DIRECTLY
*/
package org.apache.spark.sql.execution.datasources.parquet.test.avro;
package org.apache.spark.sql.execution.datasources.parquet.test.avro;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"ParquetAvroCompat\",\"namespace\":\"org.apache.spark.sql.parquet.test.avro\",\"fields\":[{\"name\":\"bool_column\",\"type\":\"boolean\"},{\"name\":\"int_column\",\"type\":\"int\"},{\"name\":\"long_column\",\"type\":\"long\"},{\"name\":\"float_column\",\"type\":\"float\"},{\"name\":\"double_column\",\"type\":\"double\"},{\"name\":\"binary_column\",\"type\":\"bytes\"},{\"name\":\"string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"maybe_bool_column\",\"type\":[\"null\",\"boolean\"]},{\"name\":\"maybe_int_column\",\"type\":[\"null\",\"int\"]},{\"name\":\"maybe_long_column\",\"type\":[\"null\",\"long\"]},{\"name\":\"maybe_float_column\",\"type\":[\"null\",\"float\"]},{\"name\":\"maybe_double_column\",\"type\":[\"null\",\"double\"]},{\"name\":\"maybe_binary_column\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"maybe_string_column\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}]},{\"name\":\"strings_column\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}},{\"name\":\"string_to_int_column\",\"type\":{\"type\":\"map\",\"values\":\"int\",\"avro.java.string\":\"String\"}},{\"name\":\"complex_column\",\"type\":{\"type\":\"map\",\"values\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Nested\",\"fields\":[{\"name\":\"nested_ints_column\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"nested_string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}},\"avro.java.string\":\"String\"}}]}");
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"ParquetAvroCompat\",\"namespace\":\"org.apache.spark.sql.execution.datasources.parquet.test.avro\",\"fields\":[{\"name\":\"bool_column\",\"type\":\"boolean\"},{\"name\":\"int_column\",\"type\":\"int\"},{\"name\":\"long_column\",\"type\":\"long\"},{\"name\":\"float_column\",\"type\":\"float\"},{\"name\":\"double_column\",\"type\":\"double\"},{\"name\":\"binary_column\",\"type\":\"bytes\"},{\"name\":\"string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"maybe_bool_column\",\"type\":[\"null\",\"boolean\"]},{\"name\":\"maybe_int_column\",\"type\":[\"null\",\"int\"]},{\"name\":\"maybe_long_column\",\"type\":[\"null\",\"long\"]},{\"name\":\"maybe_float_column\",\"type\":[\"null\",\"float\"]},{\"name\":\"maybe_double_column\",\"type\":[\"null\",\"double\"]},{\"name\":\"maybe_binary_column\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"maybe_string_column\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}]},{\"name\":\"strings_column\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}},{\"name\":\"string_to_int_column\",\"type\":{\"type\":\"map\",\"values\":\"int\",\"avro.java.string\":\"String\"}},{\"name\":\"complex_column\",\"type\":{\"type\":\"map\",\"values\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Nested\",\"fields\":[{\"name\":\"nested_ints_column\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"nested_string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}},\"avro.java.string\":\"String\"}}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
@Deprecated public boolean bool_column;
@Deprecated public int int_column;
Expand Down
Loading