[SPARK-39393][SQL] Parquet data source only supports push-down predicate filters for non-repeated primitive types

In Spark version 3.1.0 and newer, Spark creates extra filter predicates for repeated parquet columns.
According to [PARQUET-34](https://issues.apache.org/jira/browse/PARQUET-34), the parquet library does not support filter predicates on such fields.

This PR works around the problem until the appropriate functionality is provided by the parquet library.

Before this PR:

Assume the following Protocol Buffers schema:

```
message Model {
    string name = 1;
    repeated string keywords = 2;
}
```
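With parquet-protobuf, a `repeated` scalar field is written as a repeated primitive leaf rather than a nested LIST group, which is exactly the shape PARQUET-34 rejects. The resulting parquet schema would look roughly like this (an illustrative sketch, not dumped from an actual file):

```
message Model {
  optional binary name (UTF8);
  repeated binary keywords (UTF8);
}
```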

Suppose a parquet file is created from a set of records in the above format with the help of the parquet-protobuf library.
Using Spark version 3.1.0 or newer, we get the following exception when running this query in spark-shell:

```
val data = spark.read.parquet("/path/to/parquet")
data.createOrReplaceTempView("models") // registerTempTable is deprecated
spark.sql("select * from models where array_contains(keywords, 'X')").show(false)
```

```
Caused by: java.lang.IllegalArgumentException: FilterPredicates do not currently support repeated columns. Column keywords is repeated.
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:176)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:149)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:89)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:56)
  at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:192)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:61)
  at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:95)
  at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:45)
  at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:149)
  at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:72)
  at org.apache.parquet.hadoop.ParquetFileReader.filterRowGroups(ParquetFileReader.java:870)
  at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:789)
  at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
  at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
  at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:373)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
...
```

The cause of the problem is a change in the generated data filtering conditions:

```
spark.sql("select * from models where array_contains(keywords, 'Google')").explain(true)

// Spark 3.0.2 and older
== Physical Plan ==
...
+- FileScan parquet [link#0,keywords#1]
  DataFilters: [array_contains(keywords#1, Google)]
  PushedFilters: []
  ...

// Spark 3.1.0 and newer
== Physical Plan == ...
+- FileScan parquet [link#0,keywords#1]
  DataFilters: [isnotnull(keywords#1), array_contains(keywords#1, Google)]
  PushedFilters: [IsNotNull(keywords)]
  ...
```
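For users stuck on an affected version, one possible workaround (a suggestion for the reader's setup, not part of this PR) is to disable parquet filter pushdown for the session, at the cost of losing pushdown for all columns; Spark then evaluates every predicate itself instead of handing any to the parquet reader:

```
// Disable parquet filter pushdown so no predicate reaches the parquet
// reader; the repeated-column condition is evaluated by Spark instead.
spark.conf.set("spark.sql.parquet.filterPushdown", "false")
spark.sql("select * from models where array_contains(keywords, 'X')").show(false)
```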

Pushing filters down for repeated parquet columns is not useful because the parquet library does not support it for now, so we can exclude such columns from the pushed predicate filters and resolve the issue.

Predicate filters that are pushed down to parquet should not be created on repeated-type fields.
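The idea can be sketched as a guard over the parquet schema. This is a rough illustration of the approach, not the exact Spark code; `pushableLeaves` is a hypothetical helper, and the parquet types used (`MessageType`, `PrimitiveType`, `Type.Repetition`) are from the parquet-mr schema API:

```
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.{MessageType, PrimitiveType}
import scala.collection.JavaConverters._

// Sketch: only non-repeated primitive leaves are eligible to receive a
// pushed-down filter predicate; repeated fields are skipped entirely.
def pushableLeaves(schema: MessageType): Seq[PrimitiveType] =
  schema.getFields.asScala.collect {
    case p: PrimitiveType if !p.isRepetition(Repetition.REPEATED) => p
  }.toSeq
```

Filtering at schema-collection time keeps the rest of the pushdown machinery unchanged: a repeated column simply never appears among the candidates, so no `FilterPredicate` is built for it.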

This introduces no user-facing change; it only fixes a bug. Before this, due to the limitations of the parquet library, such queries could not work at all.

An extra test was added to ensure the problem is solved.

Closes apache#36781 from Borjianamin98/master.

Authored-by: Amin Borjian <borjianamin98@outlook.com>
Signed-off-by: huaxingao <huaxin_gao@apple.com>
Borjianamin98 authored and DeZepTup committed Oct 31, 2022
1 parent 0793813 commit 80c41b0
