
[SPARK-26893][SQL] Allow partition pruning with subquery filters on file source #23802

Closed
wants to merge 13 commits

Conversation

peter-toth
Contributor

@peter-toth peter-toth commented Feb 15, 2019

What changes were proposed in this pull request?

This PR introduces leveraging subquery filters for partition pruning in the file source.

Currently, subquery expressions are not allowed to be used for partition pruning in FileSourceStrategy; instead, a FilterExec is added around the FileSourceScanExec to do the job.
This PR optimizes the process by allowing subquery expressions to be used as partition filters.
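As a minimal illustration of the optimization (a hypothetical example, assuming a local SparkSession `spark` and an illustrative path; this is not taken from the PR's test code, though the UT discussed below uses a similar `p=0` setup):

```scala
// Write a small table partitioned by column `p`, then query it with a
// scalar-subquery predicate on the partition column. Before this change the
// subquery filter could not act as a partition filter, so all partitions were
// scanned and a FilterExec applied the predicate afterwards; with this change
// only the matching partitions are read.
spark.range(10)
  .selectExpr("id", "id % 2 AS p")
  .write.partitionBy("p").parquet("/tmp/subquery-pruning-demo")

spark.read.parquet("/tmp/subquery-pruning-demo").createOrReplaceTempView("t")

// The subquery evaluates to 0, so only the p=0 partition files are scanned.
spark.sql("SELECT * FROM t WHERE p = (SELECT min(p) FROM t)").show()
```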

How was this patch tested?

Added a new UT and ran existing UTs, especially the SPARK-25482 and SPARK-24085 related ones.

@peter-toth
Contributor Author

cc. @mgaido91 @cloud-fan as you worked on #22518

a.withName(logicalRelation.output.find(_.semanticEquals(a)).get.name)
}
}
val normalizedFilters =
Contributor Author
@peter-toth peter-toth Feb 15, 2019

Reviewers please note that this change is not actually related to this PR. But I touched DataSourceStrategy.normalizeFilters(), so I thought this was a good occasion to refactor it.

@mgaido91
Contributor

@peter-toth I expressed my major concern on the JIRA actually. I don't see them addressed here and actually IIRC before that change subqueries were pushed down but not used for actually filtering out partitions (I don't see this change making them of use). I have not yet carefully reviewed this PR (will do as soon as I have time), so I may be missing something, I apologize in advance if I do.

@peter-toth
Contributor Author

peter-toth commented Feb 15, 2019

@peter-toth I expressed my major concern on the JIRA actually. I don't see them addressed here and actually IIRC before that change subqueries were pushed down but not used for actually filtering out partitions (I don't see this change making them of use). I have not yet carefully reviewed this PR (will do as soon as I have time), so I may be missing something, I apologize in advance if I do.

I think before your PR they were pushed down as dataFilters, and that was the reason for not using them and having them twice in the plan.

@cloud-fan
Contributor

do you have a prototype of how to leverage the subquery filters in the file source scan?

@peter-toth
Contributor Author

peter-toth commented Feb 16, 2019

I might be wrong but this PR does that.
FileSourceScanExec takes partitionFilters into account when computing selectedPartitions, so the inputRDD will be filtered.
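The pruning idea described here can be shown with a self-contained toy sketch (the names `PartitionDir` and `prunePartitions` are illustrative stand-ins, not Spark's actual internals): partition filters are evaluated against each partition's values, and only surviving partitions contribute files to the scan.

```scala
// Toy model of partition pruning: a partition carries its partition-column
// values and its files; pruning keeps only partitions passing all filters.
case class PartitionDir(values: Map[String, Any], files: Seq[String])

def prunePartitions(
    partitions: Seq[PartitionDir],
    partitionFilters: Seq[Map[String, Any] => Boolean]): Seq[PartitionDir] =
  partitions.filter(p => partitionFilters.forall(f => f(p.values)))

val parts = Seq(
  PartitionDir(Map("p" -> 0), Seq("p=0/part-00000.parquet")),
  PartitionDir(Map("p" -> 1), Seq("p=1/part-00000.parquet")))

// With a filter equivalent to "p = 0", only the p=0 files remain,
// so the RDD built from the selected partitions never touches p=1.
val pruned = prunePartitions(parts, Seq(v => v("p") == 0))
```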

@peter-toth
Contributor Author

peter-toth commented Feb 16, 2019

I extended the UT a bit. Now you can see that there is no additional filter above FileSourceScanExec, but the result is correct (the subquery expression is taken into account). Also, only p=0 partition files are scanned.

@viirya
Member

viirya commented Feb 16, 2019

I don't see that the subquery filters are actually pushed down to the file source. Looks like you just reverted #22518.

if partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
fs.inputRDDs().forall(
_.asInstanceOf[FileScanRDD].filePartitions.forall(
_.files.forall(_.filePath.contains("p=0")))) => true
Member
@viirya viirya Feb 16, 2019

This asserts that partition p=0 is read, but doesn't assert that other partitions are not read, does it?

Member

oh, nvm. i read it wrong.

Contributor
@mgaido91 mgaido91 left a comment

@peter-toth I checked, and the subquery is executed twice in your UT. Please set a breakpoint in updateResult of the scalar subquery. Honestly I am not sure why, since in the execution plan I indeed see only one subquery exec: could you please investigate this? Moreover, it would be great if we could enforce this in a UT: the only way I can think of is to create a dummy plan in the tests which extends ScalarSubquery and keeps a global counter of the number of times updateResult is invoked.

@@ -170,7 +173,7 @@ object FileSourceStrategy extends Strategy with Logging {
l.resolve(fsRelation.dataSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)

// Partition keys are not available in the statistics of the files.
-    val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
+    val dataFilters = normalizedFiltersWithoutSubqueries.filter(_.references.intersect(partitionSet).isEmpty)
Member

Why do data filters use normalizedFiltersWithoutSubqueries instead of normalizedFilters? Do subquery filters only apply to partition keys?

Member

Looks like this assumes that data filters are only used for pushing down filters to the data source. But FileIndex.listFiles also takes data filters as a parameter, and listFiles doesn't clearly define how it will use them...

Contributor Author
@peter-toth peter-toth Feb 16, 2019

That would bring back the #22518 issue again. If you check it, you will see that partitionKeyFilters were filtered to exclude subqueries even before #22518, so dataFilters must have caused that issue.

Member

Can you add a few lines of comment here and mention why we use normalizedFiltersWithoutSubqueries?

Contributor Author

Added.

@@ -147,7 +147,7 @@ object FileSourceStrategy extends Strategy with Logging {
// - filters that need to be evaluated again after the scan
val filterSet = ExpressionSet(filters)

-    val normalizedFilters = DataSourceStrategy.normalizeFilters(filters, l.output)
+    val normalizedFilters = DataSourceStrategy.normalizeFilters(filters, l.output, true)
Member

By doing this, does it mean the issue at #22518 is back again?

Contributor Author

Please see my comment above.

@peter-toth
Contributor Author

@peter-toth I checked, and the subquery is executed twice in your UT. Please set a breakpoint in updateResult of the scalar subquery. Honestly I am not sure why, since in the execution plan I indeed see only one subquery exec: could you please investigate this? Moreover, it would be great if we could enforce this in a UT: the only way I can think of is to create a dummy plan in the tests which extends ScalarSubquery and keeps a global counter of the number of times updateResult is invoked.

Hmm, thanks. I will look into it.
Actually I just used your SPARK-25482 UT to make sure this PR doesn't cause that issue again. If this double execution gets confirmed we might need to revise that UT as well...

@@ -433,8 +433,14 @@ object DataSourceStrategy {
*/
protected[sql] def normalizeFilters(
filters: Seq[Expression],
attributes: Seq[AttributeReference]): Seq[Expression] = {
filters.filterNot(SubqueryExpression.hasSubquery).map { e =>
Member

Filtering out subqueries is outside the purpose of normalizeFilters; it was not good to do this implicitly. I think we can either update the method documentation, or move the filtering of subqueries into a small helper method filterSubqueryFilters.

Then we can call normalizeFilters(filters, attributes) and normalizeFilters(filterSubqueryFilters(filters), attributes) individually.
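A minimal sketch of the suggested refactor, assuming Spark's catalyst types (the helper name comes from this review comment; the code that actually landed may differ):

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}

// Keep normalizeFilters focused on attribute normalization and make the
// exclusion of subquery filters an explicit, separate step.
def filterOutSubqueryFilters(filters: Seq[Expression]): Seq[Expression] =
  filters.filterNot(SubqueryExpression.hasSubquery)

// Call sites can then choose explicitly:
//   normalizeFilters(filters, attributes)                              // keeps subqueries
//   normalizeFilters(filterOutSubqueryFilters(filters), attributes)    // drops them
```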

Contributor Author

Done.

@viirya
Member

viirya commented Feb 16, 2019

@peter-toth Thanks for replying to the comments.

I now understand more about what this fix is meant to do. #22518 removed subquery filters in FileSourceStrategy to avoid re-execution of the subquery when pushing it down to the data source. But it also removed them from the partition filters. And this PR proposes to add subqueries back to the partition filters.

Can you update the PR description? I think the current description is unclear.

@peter-toth
Contributor Author

@peter-toth Thanks for replying to the comments.

I now understand more about what this fix is meant to do. #22518 removed subquery filters in FileSourceStrategy to avoid re-execution of the subquery when pushing it down to the data source. But it also removed them from the partition filters. And this PR proposes to add subqueries back to the partition filters.

Can you update the PR description? I think the current description is unclear.

Based on https://github.com/apache/spark/pull/22518/files#diff-7393042ad384133558061bee01daa3f7L166 subquery filters were never included in partition filtering.

Sorry for the description, I will make it more detailed.
But it seems @mgaido91 is right and somehow the subquery is executed twice, even though it appears only once in the plan. I want to understand that first.

@mgaido91
Contributor

Actually I just used your SPARK-25482 UT to make sure this PR doesn't cause that issue again. If this double execution gets confirmed we might need to revise that UT as well...

Well, the goal of that UT was to ensure the presence of a single scalar subquery instance in the plan, because a single subquery cannot be executed more than once. Here actually, although I saw only one subquery in the plan, 2 different instances are executed. This needs further investigation in order to understand the root cause: once we know that, we can decide how to move forward, also w.r.t. the UT.

subquery filters were never included in partition filtering.

Yes, that's my understanding too.

@peter-toth
Contributor Author

peter-toth commented Feb 17, 2019

@mgaido91 I debugged it and the double execution seems expected to me. I use checkAnswer in my UT and it executes the query twice, using 2 different QueryExecutions: first df.rdd.count() to check RDD serializability, and then a few rows later df.collect() to collect the results.

@peter-toth peter-toth changed the title [SPARK-26893][SQL] Allow pushdown of partition pruning subquery filters to file source [SPARK-26893][SQL] Allow partition pruning using subquery filters on file source Feb 17, 2019
@peter-toth peter-toth changed the title [SPARK-26893][SQL] Allow partition pruning using subquery filters on file source [SPARK-26893][SQL] Allow partition pruning with subquery filters on file source Feb 17, 2019
@peter-toth
Contributor Author

peter-toth commented Feb 17, 2019

I don't see that the subquery filters are actually pushed down to the file source. Looks like you just reverted #22518.

I changed the title and description to make it clear what this PR wants to do. I removed the term pushdown as now I see we use it to refer to something different.
Please let me know if you think it needs more elaboration.

@peter-toth peter-toth closed this Feb 17, 2019
@peter-toth peter-toth reopened this Feb 17, 2019
Contributor
@mgaido91 mgaido91 left a comment

ah, you're right. I checked with 2 collects and the subquery is executed only once; converting to an RDD creates a new logical plan and hence a new subquery plan. I was afraid that the transformations happening when normalizing filters could cause a re-execution, but I haven't found any case where this happens.

The change makes sense to me. @cloud-fan do you think we can trigger the tests?

@cloud-fan
Contributor

ok to test

@@ -185,6 +185,13 @@ case class FileSourceScanExec(
ret
}

@transient private lazy val selectedPartitionsCount =
if (partitionFilters.exists(ExecSubqueryExpression.hasSubquery)) {
None
Contributor

why do we do this?

Contributor Author
@peter-toth peter-toth Feb 18, 2019

This is invoked from FileSourceScanExec.metadata and is part of FileSourceScanExec's toString value too, so it would throw the

requirement failed: Subquery subquery250 has not finished
java.lang.IllegalArgumentException: requirement failed: Subquery subquery250 has not finished
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.sql.execution.ScalarSubquery.eval(subquery.scala:95)

exception when printing the executed plan.
And I didn't want to get rid of PartitionCount metadata entirely.

Contributor

is this problem present before https://issues.apache.org/jira/browse/SPARK-25482 ? i.e. Spark 2.4 and prior

Contributor Author
@peter-toth peter-toth Feb 18, 2019

No. This is required due to this PR. partitionFilters always filtered out subqueries. SPARK-25482 just filtered out subqueries from dataFilters (pushed down filters) because they were not used but caused double execution.

@SparkQA

SparkQA commented Feb 18, 2019

Test build #102468 has finished for PR 23802 at commit 4a65fad.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

if subquery filter can be a partition filter, why can't it be a data filter?

@mgaido91
Contributor

if subquery filter can be a partition filter, why can't it be a data filter?

the point is: when it is a partition filter it is present only once in the plan, while if it is a dataFilter it is present both in the FileScanExec and in the Filter (which cannot be removed in this case, because we cannot enforce that the data source performs the filtering properly according to the dataFilters).

@peter-toth
Contributor Author

peter-toth commented Feb 18, 2019

if subquery filter can be a partition filter, why can't it be a data filter?

DataSourceStrategy.translateFilter can't translate them to data source Filters, so they will not be pushed down to the data source.
(Partition filters are handled in FileSourceScanExec.selectedPartitions and there is no need to translate them to Filters.)
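The shape of the limitation can be sketched roughly (an illustrative simplification, not the actual Spark source): translateFilter only recognizes a fixed set of simple expression shapes, so anything containing a subquery falls through to None and is never handed to the data source.

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.sources

// Rough sketch of DataSourceStrategy.translateFilter's structure: only simple
// attribute-vs-literal predicates translate to data source Filters.
def translateFilterSketch(predicate: Expression): Option[sources.Filter] =
  predicate match {
    case EqualTo(a: Attribute, Literal(v, _))     => Some(sources.EqualTo(a.name, v))
    case GreaterThan(a: Attribute, Literal(v, _)) => Some(sources.GreaterThan(a.name, v))
    // ... other supported shapes elided ...
    case _ => None // subquery expressions (and anything else) are not translatable
  }
```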

@cloud-fan
Contributor

shall we turn subquery filter into normal filter before calling listFiles(partitionFilters, dataFilters)? Then we can even prune the partitions at the metastore side.

@mgaido91
Contributor

+1, I like the idea (it should be feasible I think). Maybe in a followup though?

…an't touch selectedPartitions before execution time if it contains subquery expression

Change-Id: I0188d007c858e0dc9f885731f2f7bfda2b7834ad
@@ -221,7 +221,8 @@ case class FileSourceScanExec(
val sortColumns =
spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)

-      val sortOrder = if (sortColumns.nonEmpty) {
+      val sortOrder = if (sortColumns.nonEmpty &&
+          !partitionFilters.exists(ExecSubqueryExpression.hasSubquery)) {
Contributor

why is it needed?

Contributor Author
@peter-toth peter-toth Feb 20, 2019

Because outputPartitioning and outputOrdering are used before execution (in EnsureRequirements as far as I see) and partitionFilters can't be evaluated before execution if they have subquery expressions, otherwise we get a Subquery ... has not finished exception.

Contributor
@cloud-fan cloud-fan Feb 20, 2019

partitionFilters.exists(ExecSubqueryExpression.hasSubquery) is used more than once, maybe we can create

private def partitionsOnlyAvailableAtRuntime: Boolean = {
  partitionFilters.exists(ExecSubqueryExpression.hasSubquery)
}

Contributor Author

Ok, done.

@peter-toth
Contributor Author

shall we turn subquery filter into normal filter before calling listFiles(partitionFilters, dataFilters)? Then we can even prune the partitions at the metastore side.

Sorry, I don't get this. Can you please elaborate on it? Where should they be turned into normal filters?

@peter-toth
Contributor Author

peter-toth commented Feb 20, 2019

does it really help with performance? HiveShim#convertFilters doesn't support subquery filters either, so partition pruning can't benefit from it.

Hmm, I don't see in the HiveTableScans strategy that it filters out subquery expressions from the partition pruning filters (pruningPredicates) before passing them to HiveTableScanExec and then to HiveShim#convertFilters. Nor do I see that it includes these filters in the FilterExec around HiveTableScanExec. Seems like a bug to me.

Never mind. I get it now.

@peter-toth
Contributor Author

shall we turn subquery filter into normal filter before calling listFiles(partitionFilters, dataFilters)? Then we can even prune the partitions at the metastore side.

Sorry, I don't get this. Can you please elaborate on it? Where should they be turned into normal filters?

I might be wrong but PruneFileSourcePartitions already filters at the metastore side (and it can't use subquery filters there) so does it make sense to convert subquery filters to normal filters before calling listFiles(partitionFilters, dataFilters)?

@mgaido91
Contributor

I might be wrong but PruneFileSourcePartitions already filters at the metastore side (and it can't use subquery filters there) so does it make sense to convert subquery filters to normal filters before calling listFiles(partitionFilters, dataFilters)?

@peter-toth I think what @cloud-fan is suggesting is to replace the subquery with its literal value before listing the partitions from the Hive metastore (please see HiveShim.listPartitionsByFilter). But IIUC, this should be done in HiveStrategies and I'd propose to do this in another PR since I consider it a different/independent change from this one. What do you think, guys?
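The suggested follow-up could look roughly like this hypothetical sketch (not part of this PR, and the helper name is invented here): once a scalar subquery has finished executing, it can be replaced with a Literal holding its result, yielding a plain predicate that metastore-side pruning could consume.

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.execution.ScalarSubquery

// Replace already-executed scalar subqueries with their literal results.
// Calling eval before the subquery has finished would fail with the
// "Subquery ... has not finished" error mentioned earlier in this thread,
// so this could only run at execution time.
def replaceFinishedSubqueries(filter: Expression): Expression =
  filter.transform {
    case s: ScalarSubquery => Literal.create(s.eval(), s.dataType)
  }
```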

@SparkQA

SparkQA commented Feb 20, 2019

Test build #102544 has finished for PR 23802 at commit c4fb9bc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@peter-toth
Contributor Author

@peter-toth I think what @cloud-fan is suggesting is to replace the subquery with its literal value before listing the partitions from the Hive metastore (please see HiveShim.listPartitionsByFilter). But IIUC, this should be done in HiveStrategies and I'd propose to do this in another PR since I consider it a different/indipendent change from this one. What do you think guys?

@mgaido91 thanks, I get it now and agree about independent change.

Change-Id: I0a2fc1752644e1983d8bc8e1cdc9518cf0654d93
@SparkQA

SparkQA commented Feb 20, 2019

Test build #102559 has finished for PR 23802 at commit 9b7edc8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor
@mgaido91 mgaido91 left a comment

this change seems reasonable to me; I have just one minor comment, otherwise it looks fine.

!partitionOnlyAvailableAtRuntime) {
metadata + ("PartitionCount" -> selectedPartitions.size.toString)
} else {
metadata
Contributor

I am wondering about adding "PartitionCount" -> "unknown" here, or something else which shows that we are reading partitioned data but don't know how many partitions. On the other hand, maybe people rely on this field being numeric, so I am honestly not sure about the best thing to do. @cloud-fan @viirya thoughts?

Member

I think the current way is good. We don't need to include unknown metadata.

@peter-toth
Contributor Author

@cloud-fan @viirya do you have any comments on this PR?

@cloud-fan
Contributor

retest this please

@cloud-fan
Contributor

LGTM pending jenkins

@@ -185,6 +185,10 @@ case class FileSourceScanExec(
ret
}

private def partitionOnlyAvailableAtRuntime: Boolean = {
partitionFilters.exists(ExecSubqueryExpression.hasSubquery)
}
Member

I don't get why this is named as partitionOnlyAvailableAtRuntime.

Contributor Author

Actually it should read partitionsOnlyAvailableAtRuntime, per the suggestion of @cloud-fan.
Will rename it soon.

Member

It took me a while to link the meaning of partitionOnlyAvailableAtRuntime to its implementation and to understand its usage below.

I'd prefer to rename this to something like hasPartitionsAvailableAtRunTime and add some comments to explain its usage.

Contributor Author

Ok. Renamed it and added some comments.

Member

thanks. LGTM.

Member
@viirya viirya left a comment

this LGTM but with a comment regarding code readability.

Change-Id: I9b80799dfbc2b63328380ae690c9a43d1cd027ec
Change-Id: Ia3c2a52c659537fe65fd92545ace79e1e110e815
Change-Id: I4f4fcea6faebe04261c8b534b1f768f5067893a8
@SparkQA

SparkQA commented Feb 28, 2019

Test build #102864 has finished for PR 23802 at commit 9b7edc8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 28, 2019

Test build #102867 has finished for PR 23802 at commit 1a06c70.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 28, 2019

Test build #102868 has finished for PR 23802 at commit 7d7d350.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 82820f8 Mar 4, 2019
@peter-toth
Contributor Author

Thanks @cloud-fan @mgaido91 and @viirya for the review!
