[SPARK-24167][SQL] ParquetFilters should not access SQLConf at executor side #21224
Conversation
cc @gatorsmile
Test build #90096 has finished for PR 21224 at commit
Test build #90110 has finished for PR 21224 at commit
@cloud-fan, the change seems fine, but would there be any clever trick to test this? It seems we could very easily make the same mistake elsewhere.
LGTM
Ah, I missed the PR description. So, this will be tested with TaskContext and a thread local. Sure, we can talk about it there.
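The testing approach mentioned here can be sketched outside Spark. The following is a hypothetical, heavily simplified stand-in (not Spark's real `TaskContext` or `SQLConf` implementation): a task thread installs a context into a `ThreadLocal`, and `SQLConf.get` refuses to serve the session conf whenever such a context is present.

```scala
// Hypothetical, simplified sketch of the TaskContext/thread-local guard
// discussed above (not Spark's real classes).
object TaskContext {
  private val ctx = new ThreadLocal[AnyRef]()
  def get: AnyRef = ctx.get()                  // null on the driver thread
  def setTaskContext(c: AnyRef): Unit = ctx.set(c)
  def unset(): Unit = ctx.remove()
}

object SQLConf {
  private val sessionConf = Map("spark.sql.parquet.filterPushdown" -> "true")
  def get: Map[String, String] = {
    // Fail fast if a task context is installed on this thread: the session
    // conf must only be read on the driver.
    require(TaskContext.get == null,
      "SQLConf should only be accessed on the driver")
    sessionConf
  }
}
```

With a guard like this in place, an accidental conf access inside a task fails immediately in local-mode tests instead of silently reading a driver-side value.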
LGTM too.
@@ -342,6 +342,7 @@ class ParquetFileFormat
       sparkSession.sessionState.conf.parquetFilterPushDown
     // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
     val returningBatch = supportBatch(sparkSession, resultSchema)
+    val pushDownDate = sqlConf.parquetFilterPushDownDate
Can we pass `pushed` instead of declaring a new `pushDownDate`? The following could be handled at line 345 here, not inside `(file: PartitionedFile) => {}`:
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
  filters
    // Collects all converted Parquet filter predicates. Notice that not all predicates can be
    // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
    // is used here.
    .flatMap(new ParquetFilters(pushDownDate).createFilter(requiredSchema, _))
    .reduceOption(FilterApi.and)
} else {
  None
}
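The constraint under discussion can be sketched in plain Scala, with no Spark dependency (hypothetical types; `Conf` stands in for `SQLConf` and the returned function stands in for the per-file reader closure): the per-file function is serialized and run on executors, so any conf value it needs must be read into a local `val` on the driver and captured by the closure.

```scala
// Sketch of the capture-on-the-driver pattern (hypothetical names, not
// Spark's actual API).
final case class Conf(parquetFilterPushDownDate: Boolean)

def buildReaderSketch(conf: Conf): String => Boolean = {
  // Read the conf value eagerly, on the "driver"...
  val pushDownDate = conf.parquetFilterPushDownDate
  // ...so the closure captures a plain Boolean rather than the conf object,
  // which would not be available on the executor side.
  (file: String) => pushDownDate
}
```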
No, we can't, see #21086.
Ah, I see. Thank you, @cloud-fan !
thanks, merging to master!
and branch-2.3 too?
I realized #21086 is only in master, so this bug doesn't exist in 2.3.
… on the driver

## What changes were proposed in this pull request?

This is a followup of apache#20136. apache#20136 didn't really work because in the test we are using the local backend, which shares the driver-side `SparkEnv`, so `SparkEnv.get.executorId == SparkContext.DRIVER_IDENTIFIER` doesn't work. This PR changes the check to `TaskContext.get != null`, moves the check to `SQLConf.get`, and fixes all the places that violate this check:

* `InMemoryTableScanExec#createAndDecompressColumn` is executed inside `rdd.map`, so we can't access `conf.offHeapColumnVectorEnabled` there. apache#21223 merged
* `DataType#sameType` may be executed on the executor side, for things like json schema inference, so we can't call `conf.caseSensitiveAnalysis` there. This contributes to most of the code changes, as we need to add a `caseSensitive` parameter to a lot of methods.
* `ParquetFilters` is used in the file scan function, which is executed on the executor side, so we can't call `conf.parquetFilterPushDownDate` there. apache#21224 merged
* `WindowExec#createBoundOrdering` is called on the executor side, so we can't use `conf.sessionLocalTimezone` there. apache#21225 merged
* `JsonToStructs` can be serialized to executors and evaluated there, so we should not call `SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA)` in the body. apache#21226 merged

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21190 from cloud-fan/minor.
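One of the fixes listed in the commit message above, the `DataType#sameType` change, can be illustrated with a hypothetical sketch (simplified stand-in types, not Spark's real `DataType` hierarchy): instead of reading `conf.caseSensitiveAnalysis` inside a method that may run on executors, the resolved flag is threaded through as a parameter from the driver.

```scala
// Hypothetical, simplified stand-ins for Spark's types.
sealed trait DataType
case object IntType extends DataType
final case class Field(name: String, dataType: DataType)

// The caseSensitive flag is resolved from the conf on the driver and passed
// down explicitly, so this method never touches the session conf itself.
def sameField(a: Field, b: Field, caseSensitive: Boolean): Boolean = {
  val namesMatch =
    if (caseSensitive) a.name == b.name
    else a.name.equalsIgnoreCase(b.name)
  namesMatch && a.dataType == b.dataType
}
```

The cost of this design, as the commit message notes, is mechanical: the parameter has to be added to every method on the call path down from the driver.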
What changes were proposed in this pull request?

This PR is extracted from #21190, to make it easier to backport.

`ParquetFilters` is used in the file scan function, which is executed on the executor side, so we can't call `conf.parquetFilterPushDownDate` there.

How was this patch tested?

It's tested in #21190.