
[SPARK-39393][SQL] Parquet data source only supports push-down predicate filters for non-repeated primitive types #36781

Closed · wants to merge 1 commit from Borjianamin98/master

Conversation

@Borjianamin98 (Contributor) commented on Jun 6, 2022:

### What changes were proposed in this pull request?

In Spark 3.1.0 and newer, Spark creates extra filter predicates for repeated Parquet columns. According to the PARQUET-34 issue in the Parquet library, such fields cannot carry a filter predicate.

This PR works around the problem until the appropriate functionality is provided by Parquet.

Before this PR:

Assume the following Protocol Buffers schema:

```
message Model {
    string name = 1;
    repeated string keywords = 2;
}
```
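
For reference, parquet-protobuf writes a repeated scalar field as a repeated primitive column, so the resulting Parquet schema looks roughly like this (a sketch; the exact logical-type annotations vary by writer version):

```
message Model {
  optional binary name (UTF8);
  repeated binary keywords (UTF8);
}
```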

Suppose a Parquet file is created from a set of records in the above format with the help of the parquet-protobuf library.
With Spark 3.1.0 or newer, we get the following exception when running the query below in spark-shell:

```
val data = spark.read.parquet("/path/to/parquet")
data.registerTempTable("models")
spark.sql("select * from models where array_contains(keywords, 'X')").show(false)
```

```
Caused by: java.lang.IllegalArgumentException: FilterPredicates do not currently support repeated columns. Column keywords is repeated.
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:176)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:149)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:89)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:56)
  at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:192)
  at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:61)
  at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:95)
  at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:45)
  at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:149)
  at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:72)
  at org.apache.parquet.hadoop.ParquetFileReader.filterRowGroups(ParquetFileReader.java:870)
  at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:789)
  at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
  at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:162)
  at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:373)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
...
```

The root cause is a change in the data filtering conditions:

spark.sql("select * from log where array_contains(keywords, 'X')").explain(true);

// Spark 3.0.2 and older
== Physical Plan ==
... 
+- FileScan parquet [link#0,keywords#1]
  DataFilters: [array_contains(keywords#1, Google)]
  PushedFilters: []
  ...

// Spark 3.1.0 and newer
== Physical Plan == ... 
+- FileScan parquet [link#0,keywords#1]
  DataFilters: [isnotnull(keywords#1),  array_contains(keywords#1, Google)]
  PushedFilters: [IsNotNull(keywords)]
  ...

Pushing filters down for repeated Parquet columns is unnecessary, because the Parquet library does not support it for now. So we can exclude them from the pushed predicate filters and resolve the issue.
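
A minimal sketch of the idea in Scala (the real patch lives in Spark's `ParquetFilters`; `pushdownCandidates` is a hypothetical name used only for illustration): when collecting the primitive fields that are eligible for pushdown, skip any field whose repetition is `REPEATED`, so no predicate is ever built for it.

```
import scala.collection.JavaConverters._
import org.apache.parquet.schema.{GroupType, Type}
import org.apache.parquet.schema.Type.Repetition

// Sketch: keep only primitive, non-repeated leaf fields as pushdown candidates.
// Repeated primitives are skipped, so Spark never builds a FilterPredicate
// that parquet-mr's SchemaCompatibilityValidator would reject (PARQUET-34).
def pushdownCandidates(schema: GroupType): Seq[Type] =
  schema.getFields.asScala.toSeq.filter { field =>
    field.isPrimitive && !field.isRepetition(Repetition.REPEATED)
  }
```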

### Why are the changes needed?

Predicate filters pushed down to Parquet must not be created for repeated-type fields.

### Does this PR introduce any user-facing change?

No. It only fixes a bug; before this change, the Parquet library limitation made such queries fail outright.

### How was this patch tested?

Added a test to ensure the problem is solved.
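
The test exercises the scenario above. A hedged sketch of it (assuming a Parquet file whose `keywords` column is a repeated primitive, e.g. written by parquet-protobuf; the path is hypothetical and this is not the exact suite code):

```
// Reproduction harness, not the exact ParquetFilterSuite test.
val df = spark.read.parquet("/path/to/protobuf-written.parquet")
df.createOrReplaceTempView("models")

// Before the fix: fails with "FilterPredicates do not currently support
// repeated columns" while filtering row groups.
// After the fix: IsNotNull(keywords) is no longer pushed to Parquet, and
// the predicate is evaluated by Spark instead.
spark.sql("select * from models where array_contains(keywords, 'X')").show(false)
```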

The github-actions bot added the SQL label on Jun 6, 2022.
@huaxingao (Contributor) commented:

@Borjianamin98 Could you please add a test?

@Borjianamin98 (Contributor, Author) commented on Jun 7, 2022:

> @Borjianamin98 Could you please add a test?

Surely. I added one test for this.


@AmplabJenkins commented:

Can one of the admins verify this patch?

@LuciferYang (Contributor) left a comment:

LGTM

@Borjianamin98 force-pushed the master branch 2 times, most recently from 0e4801c to db7db35 on June 8, 2022 04:16.
@LuciferYang (Contributor) commented:

I think this PR should be backported to previous Spark versions, because when I ran `SPARK-39393: Do not push down predicate filters for repeated primitive fields` without this PR, I found the following error logs:

```
Last failure message: There are 1 possibly leaked file streams..
	at org.scalatest.enablers.Retrying$$anon$4.tryTryAgain$2(Retrying.scala:185)
	at org.scalatest.enablers.Retrying$$anon$4.retry(Retrying.scala:192)
	at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:402)
	at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:401)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFilterSuite.eventually(ParquetFilterSuite.scala:77)
	at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:312)
	at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:311)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFilterSuite.eventually(ParquetFilterSuite.scala:77)
	at org.apache.spark.sql.test.SharedSparkSessionBase.afterEach(SharedSparkSession.scala:164)
	at org.apache.spark.sql.test.SharedSparkSessionBase.afterEach$(SharedSparkSession.scala:158)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFilterSuite.afterEach(ParquetFilterSuite.scala:100)
	at org.scalatest.BeforeAndAfterEach.$anonfun$runTest$1(BeforeAndAfterEach.scala:247)
	at org.scalatest.Status.$anonfun$withAfterEffect$1(Status.scala:377)
	at org.scalatest.Status.$anonfun$withAfterEffect$1$adapted(Status.scala:373)
	at org.scalatest.FailedStatus$.whenCompleted(Status.scala:505)
	at org.scalatest.Status.withAfterEffect(Status.scala:373)
	at org.scalatest.Status.withAfterEffect$(Status.scala:371)
	at org.scalatest.FailedStatus$.withAfterEffect(Status.scala:477)
	at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:246)
	at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
	at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:64)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:233)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:233)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:232)
	at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1563)
	at org.scalatest.Suite.run(Suite.scala:1112)
	at org.scalatest.Suite.run$(Suite.scala:1094)
	at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1563)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:237)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
	at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:237)
	at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:236)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:64)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:64)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1320)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1314)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
	at org.scalatest.tools.Runner$.run(Runner.scala:798)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:38)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:25)
Caused by: java.lang.IllegalStateException: There are 1 possibly leaked file streams.
	at org.apache.spark.DebugFilesystem$.assertNoOpenStreams(DebugFilesystem.scala:54)
	at org.apache.spark.sql.test.SharedSparkSessionBase.$anonfun$afterEach$1(SharedSparkSession.scala:165)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.scalatest.enablers.Retrying$$anon$4.makeAValiantAttempt$1(Retrying.scala:150)
	at org.scalatest.enablers.Retrying$$anon$4.tryTryAgain$2(Retrying.scala:162)
	... 54 more
```

It seems that this issue is related to parquet-mr:

https://github.com/LuciferYang/parquet-mr/blob/a2da156b251d13bce1fa81eb95b555da04880bc1/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L790

Line 790 may throw an IOException without closing `f`. That needs to be fixed in parquet-mr itself, but this PR seems to protect Spark from hitting the issue.
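
For reference, the usual fix on the parquet-mr side is the close-on-failure pattern. An illustrative sketch of that pattern in Scala (the general idea only, not parquet-mr's actual code; `initOrClose` is a hypothetical helper):

```
import java.io.InputStream

// Close an already-opened stream if subsequent initialization throws, so the
// caller never leaks the handle when construction fails midway.
def initOrClose[T](in: InputStream)(init: InputStream => T): T =
  try init(in)
  catch {
    case e: Throwable =>
      try in.close()
      catch { case suppressed: Throwable => e.addSuppressed(suppressed) }
      throw e
  }
```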

@dcoliversun (Contributor) left a comment:

LGTM

@huaxingao (Contributor) commented:

The fix looks good, but the explain output bothers me. Here is what I got:

```
spark.read.parquet(dir.getCanonicalPath).filter("isnotnull(f)").explain(true)

== Physical Plan ==
*(1) Filter isnotnull(f#0)
+- *(1) ColumnarToRow
   +- FileScan parquet [f#0] Batched: true, DataFilters: [isnotnull(f#0)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/pt/_5f4sxy56x70dv9zpz032f0m0000gn/T/spark-42..., PartitionFilters: [], PushedFilters: [IsNotNull(f)], ReadSchema: struct<f:array<int>>
```

The explain output shows `PushedFilters: [IsNotNull(f)]`, but the filter is not actually pushed down.

The problem is that the pushed-down filter information in explain comes from here. As long as the data filters don't include any metadata column filters and can be translated OK, Spark assumes they can be pushed down.

I am wondering whether we should just fix the repeated primitive types for now and fix the explain output in another PR, or fix the explain problem in this PR too.

@huaxingao (Contributor) commented:

I thought it over. I think it's better to have a separate PR to fix the explain problem.

huaxingao pushed a commit that referenced this pull request Jun 8, 2022
…ate filters for non-repeated primitive types

Closes #36781 from Borjianamin98/master.

Authored-by: Amin Borjian <borjianamin98@outlook.com>
Signed-off-by: huaxingao <huaxin_gao@apple.com>
(cherry picked from commit ac2881a)
Signed-off-by: huaxingao <huaxin_gao@apple.com>
@huaxingao closed this in ac2881a on Jun 8, 2022.
huaxingao pushed a commit that referenced this pull request Jun 8, 2022
…ate filters for non-repeated primitive types

Closes #36781 from Borjianamin98/master.

Authored-by: Amin Borjian <borjianamin98@outlook.com>
Signed-off-by: huaxingao <huaxin_gao@apple.com>
(cherry picked from commit ac2881a)
Signed-off-by: huaxingao <huaxin_gao@apple.com>
huaxingao pushed a commit that referenced this pull request Jun 8, 2022
…ate filters for non-repeated primitive types

Closes #36781 from Borjianamin98/master.

Authored-by: Amin Borjian <borjianamin98@outlook.com>
Signed-off-by: huaxingao <huaxin_gao@apple.com>
(cherry picked from commit ac2881a)
Signed-off-by: huaxingao <huaxin_gao@apple.com>
@huaxingao (Contributor) commented:

Merged to master/3.3/3.2/3.1. Thanks @Borjianamin98 for your first contribution, and welcome to the Spark community!

Also thanks @LuciferYang @dcoliversun for reviewing!

@huaxingao (Contributor) commented:

@Borjianamin98 Do you have a jira account? I tried to assign the jira to you but can't find you.

@Borjianamin98 (Contributor, Author) commented on Jun 8, 2022:

> @Borjianamin98 Do you have a jira account? I tried to assign the jira to you but can't find you.

My JIRA username is borjianamin, the same account I used to create SPARK-39393. I do not have another account, if that's what you mean.

@huaxingao (Contributor) commented:

@Borjianamin98 I forgot that I need to add you to the contributors list first. I just did and assigned the jira OK :)

@dongjoon-hyun (Member) left a comment:

+1, LGTM. Thank you, @Borjianamin98, @huaxingao, @LuciferYang, @dcoliversun.

cc @sunchao and @viirya too

DeZepTup pushed a commit to DeZepTup/spark-custom that referenced this pull request Oct 31, 2022
…ate filters for non-repeated primitive types

Closes apache#36781 from Borjianamin98/master.

Authored-by: Amin Borjian <borjianamin98@outlook.com>
Signed-off-by: huaxingao <huaxin_gao@apple.com>
DeZepTup pushed a commit to DeZepTup/spark-custom that referenced this pull request Oct 31, 2022
…ate filters for non-repeated primitive types

Closes apache#36781 from Borjianamin98/master.

Authored-by: Amin Borjian <borjianamin98@outlook.com>
Signed-off-by: huaxingao <huaxin_gao@apple.com>
sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023
…ate filters for non-repeated primitive types

Closes apache#36781 from Borjianamin98/master.

Authored-by: Amin Borjian <borjianamin98@outlook.com>
Signed-off-by: huaxingao <huaxin_gao@apple.com>
(cherry picked from commit ac2881a)
Signed-off-by: huaxingao <huaxin_gao@apple.com>