
[SPARK-10978] [SQL] Allow data sources to eliminate filters #9399

Closed
liancheng wants to merge 9 commits into master from liancheng:spark-10978.unhandled-filters

Conversation

@liancheng
Contributor

liancheng commented Nov 1, 2015

This PR adds a new method unhandledFilters to BaseRelation. Data sources which implement this method properly may avoid the overhead of defensive filtering done by Spark SQL.
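
For illustration, here is a minimal sketch of a relation overriding the new hook. The relation and column names are hypothetical; the default unhandledFilters returns all filters, which preserves the old defensive behavior:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, Filter, GreaterThan}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical relation that evaluates GreaterThan filters on column "a"
// natively during its scan. Excluding them from unhandledFilters lets
// Spark SQL drop its defensive Filter operator for those predicates.
class GreaterThanAwareRelation(override val sqlContext: SQLContext)
  extends BaseRelation {

  override def schema: StructType =
    StructType(StructField("a", IntegerType) :: Nil)

  // Return only the filters this source cannot guarantee to apply fully.
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot {
      case GreaterThan("a", _) => true
      case _                   => false
    }
}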

@SparkQA

SparkQA commented Nov 1, 2015

Test build #44768 has finished for PR 9399 at commit 16f3ca3.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
relation: LogicalRelation,
projects: Seq[NamedExpression],
// Catalyst predicate expressions to be evaluated around the scan.
filterPredicates: Seq[Expression],
// The scan builder takes the requested columns, the Catalyst predicates
// pushed into the scan, and their translated public Filter counterparts.
scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter]) => RDD[InternalRow]) = {

@yhuai
Contributor

yhuai commented Nov 1, 2015

It is not obvious that we need both Seq[Expression] and Seq[Filter]. Can you add comments to explain what these are?

  val scan = execution.PhysicalRDD.createFromDataSource(
    projects.map(_.toAttribute),
-   scanBuilder(requestedColumns, pushedFilters),
+   scanBuilder(requestedColumns, candidatePredicates, pushedFilters),

@yhuai
Contributor

yhuai commented Nov 1, 2015

Here, it is not really necessary to pass in candidatePredicates, because a data source may reject some filters. We can just pass in the equivalent forms of pushedFilters, right?

@yhuai
Contributor

yhuai commented Nov 1, 2015

I mean that pushedFilters contains all of the filters, in Filter form, that can be handled by the data source. Why not narrow candidatePredicates to the Catalyst filters that can be handled by the data source?

@yhuai
Contributor

yhuai commented Nov 2, 2015

I think I understand what's going on. Actually, pushedFilters also contains filters that cannot be handled by the data source, and candidatePredicates contains filters that cannot be converted to the public Filter interface.

@liancheng
Contributor

liancheng commented Nov 2, 2015

Yeah, right, it's a little bit tricky. Adding a comment to explain this.
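
To make the categories in this thread concrete, here is a small self-contained sketch of the split being discussed (the names and toy types are illustrative, not the PR's actual code):

// Toy model: a Catalyst predicate either converts to the public Filter
// interface or it does not.
sealed trait CatalystPredicate
case class Convertible(filter: String) extends CatalystPredicate
case class Unconvertible(expr: String) extends CatalystPredicate

// candidatePredicates: every predicate offered to the scan.
// unhandledBySource: what the relation's unhandledFilters reported back.
def split(
    candidatePredicates: Seq[CatalystPredicate],
    unhandledBySource: Set[String]): (Seq[String], Seq[CatalystPredicate]) = {
  // pushedFilters: all convertible predicates in Filter form; this still
  // includes filters the source later declares unhandled.
  val pushedFilters = candidatePredicates.collect { case Convertible(f) => f }
  // What Spark must evaluate after the scan: unconvertible predicates plus
  // the convertible ones the source rejected.
  val postScanPredicates = candidatePredicates.filter {
    case Convertible(f)   => unhandledBySource.contains(f)
    case Unconvertible(_) => true
  }
  (pushedFilters, postScanPredicates)
}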

// Attributes referenced by predicates the data source cannot handle; these
// must stay in the scan output so Spark SQL can evaluate those predicates.
val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
// Attributes referenced only by handled predicates (and not by the project
// list or any unhandled predicate) need not be produced by the scan at all.
AttributeSet(handledPredicates.flatMap(_.references)) --
  (projectSet ++ unhandledSet).map(relation.attributeMap)
}

@yhuai
Contributor

yhuai commented Nov 1, 2015

Let's add comments?

@SparkQA

SparkQA commented Nov 2, 2015

Test build #44776 has finished for PR 9399 at commit fec7d25.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@chenghao-intel

Contributor

chenghao-intel commented Nov 2, 2015

One more consideration for this improvement: we will probably want to optimize the filters by constant-folding expressions, since the partition keys are actually constant values during execution, so simply adding unhandledFilters probably does not work for a partition-based data source. So I am wondering if we can leave unhandledFilters and handledFilters to the data source implementation itself; we could provide utilities or a default implementation for the common operations within buildScan.

@yhuai

Contributor

yhuai commented Nov 2, 2015

@chenghao-intel Can you give an example showing that unhandledFilters is insufficient? Also, regarding "I am wondering if we can leave unhandledFilters and handledFilters to the data source implementation itself; we could provide utilities or a default implementation for the common operations within buildScan", can you explain that?

@chenghao-intel

Contributor

chenghao-intel commented Nov 2, 2015

Actually, my point is that it will probably be troublesome to compute unhandledFilters if we plan to optimize cases where partition keys appear in a filter: the partition key is a constant value during execution for EACH PARTITION, and we may not be able to make such a filter evaluable at the planning stage; at the very least the code will be more complicated. Besides, I don't see much benefit in exposing unhandledFilters to DataSourceStrategy. So I am suggesting that we leave unhandledFilters to BaseRelation.buildScan, treating unhandledFilters as a private/protected method of BaseRelation, or that we provide some common helper functions to simplify the implementation for new data source developers.

Sorry if I missed something.

@liancheng

Contributor

liancheng commented Nov 2, 2015

@chenghao-intel Could you please give an example?

@chenghao-intel

Contributor

chenghao-intel commented Nov 2, 2015

Oh, for example: say we have a table src (key, value) partitioned by (p1), and a query like "SELECT value FROM src WHERE key > p1".

Assume the candidate values of p1 are 10 and 100, and the range of key is (0, 50).

  • If unhandledFilters returns Array.empty, this probably breaks for the partition p1 = 10: we may not be able to filter records during the scan without first reading all of them, or else buildScan has to add an extra filter operation on the RDD[Row].
  • If unhandledFilters returns key > p1, we lose the optimization for the partition p1 = 100: the concrete filter there is key > 100, so we could simply return an empty RDD[Row], given that the range of key is (0, 50).

I mean that it will be confusing for new data source developers to decide how to define unhandledFilters, since a partition key is not treated like a normal attribute; at the very least it takes extra work at the planning stage to obtain the concrete value and generate a different filter for each partition key. What is unhandledFilters supposed to return?

On the other hand, I am not sure it is really necessary to expose unhandledFilters at all. It would be a new data source API that developers must understand for optimization purposes, yet we already pass the filters down via def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] and its variants. Splitting the filter expressions into two parts executed in different operators (DataSourceStrategy and the data source implementation) seems to make things more complicated; even if we do such splitting inside the data source implementation, it is probably not wise to expose it externally.

@chenghao-intel

Contributor

chenghao-intel commented Nov 2, 2015

Sorry, I keep challenging this because it is about the API, which will be difficult to change once it is released; we'd better think it through, including the partition key cases.

@liancheng

Contributor

liancheng commented Nov 2, 2015

@chenghao-intel Thanks for the comment. That's a good point, and I didn't consider this situation when writing this PR. Fortunately, however, we don't even try to push down predicates that reference any partition columns (see here). In general, when implementing a data source, developers shouldn't need to worry about partitioning: a HadoopFsRelation data source is only responsible for returning the data within a single partition, and the query planner does the rest, including partition pruning. So I think this is fine?
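
As a tiny illustration of that separation (hypothetical names, not Spark's actual code): predicates referencing any partition column stay with the planner and are never offered to unhandledFilters:

// Toy model: split predicates by whether they reference a partition column.
case class Predicate(name: String, referencedColumns: Set[String])

def splitPartitionFilters(
    predicates: Seq[Predicate],
    partitionColumns: Set[String]): (Seq[Predicate], Seq[Predicate]) =
  // Left: kept for the planner (partition pruning and post-scan evaluation).
  // Right: pushdown candidates, later checked against unhandledFilters.
  predicates.partition(_.referencedColumns.exists(partitionColumns))

// "key > p1" references partition column p1, so it is never pushed down.
val (partitionFilters, dataFilters) = splitPartitionFilters(
  Seq(Predicate("key > p1", Set("key", "p1")), Predicate("value = 1", Set("value"))),
  partitionColumns = Set("p1"))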

@SparkQA

SparkQA commented Nov 2, 2015

Test build #44811 has finished for PR 9399 at commit b658aaa.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA

SparkQA commented Nov 2, 2015

Test build #44814 has finished for PR 9399 at commit 7c17dd1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@chenghao-intel

Contributor

chenghao-intel commented Nov 3, 2015

OK, actually I was planning to optimize expressions involving partition keys, which would introduce ConstantFolding, since a partition key is a constant value at runtime.

I know that, for a data source developer, a HadoopFsRelation only returns the data of a single partition as an RDD. That is exactly my question about how to define unhandledFilters: will a filter involving a partition key always be part of unhandledFilters?

@yhuai

Contributor

yhuai commented Nov 3, 2015

unhandledFilters will not see filters using partitioning columns.

@chenghao-intel

Contributor

chenghao-intel commented Nov 3, 2015

OK, thanks for the explanation.

@yhuai

Contributor

yhuai commented Nov 3, 2015

Overall LGTM. Once we update the FilteredScanSuite, we are good to go.

@liancheng

Contributor

liancheng commented Nov 3, 2015

retest this please

@yhuai

Contributor

yhuai commented Nov 3, 2015

test this please

@SparkQA

SparkQA commented Nov 3, 2015

Test build #44928 has finished for PR 9399 at commit 326ea24.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA

SparkQA commented Nov 3, 2015

Test build #44927 has finished for PR 9399 at commit ddac7ac.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@yhuai

Contributor

yhuai commented Nov 3, 2015

I will merge it once it passes Jenkins. Let's have a test to make sure the handled filters do not show up in the Filter operator.
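
A rough sketch of what such a test might look like, assuming a temporary table oneToTenFiltered in the style of FilteredScanSuite whose relation declares the pushed filter handled (at the time, the physical filter node was org.apache.spark.sql.execution.Filter):

// Once the source declares a filter handled, no Filter operator should
// remain in the executed physical plan.
val df = sqlContext.sql("SELECT a FROM oneToTenFiltered WHERE a = 1")
val filterNodes = df.queryExecution.executedPlan.collect {
  case f: org.apache.spark.sql.execution.Filter => f
}
assert(filterNodes.isEmpty, "handled filters should not show up in a Filter operator")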

@SparkQA

SparkQA commented Nov 3, 2015

Test build #44933 has finished for PR 9399 at commit 92dfc55.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@yhuai

Contributor

yhuai commented Nov 3, 2015

Thanks! Merging!

@asfgit asfgit closed this in ebf8b0b Nov 3, 2015

@yhuai

Contributor

yhuai commented Nov 3, 2015

Let's also add some test cases in which a column is used both in handled filters and in unhandled/unconvertible filters.
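
For example (illustrative only, reusing the hypothetical oneToTenFiltered table), a query where column a appears in both kinds of predicate:

// "a > 1" translates to the public GreaterThan filter and may be fully
// handled by the source, while "a % 2 = 0" has no public Filter form, so
// Spark must keep evaluating it in its own Filter operator.
val df = sqlContext.sql("SELECT a FROM oneToTenFiltered WHERE a > 1 AND a % 2 = 0")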

@SparkQA

SparkQA commented Nov 3, 2015

Test build #44934 has finished for PR 9399 at commit 92dfc55.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

vundela pushed a commit to vundela/spark that referenced this pull request Nov 3, 2015

[SPARK-10978][SQL] Allow data sources to eliminate filters
This PR adds a new method `unhandledFilters` to `BaseRelation`. Data sources which implement this method properly may avoid the overhead of defensive filtering done by Spark SQL.

Author: Cheng Lian <lian@databricks.com>

Closes apache#9399 from liancheng/spark-10978.unhandled-filters.

@liancheng liancheng deleted the liancheng:spark-10978.unhandled-filters branch Nov 4, 2015

liancheng added a commit to liancheng/spark that referenced this pull request Nov 4, 2015

asfgit pushed a commit that referenced this pull request Nov 6, 2015

[SPARK-10978][SQL][FOLLOW-UP] More comprehensive tests for PR #9399
This PR adds test cases that test various column pruning and filter push-down cases.

Author: Cheng Lian <lian@databricks.com>

Closes #9468 from liancheng/spark-10978.follow-up.

(cherry picked from commit c048929)
Signed-off-by: Yin Huai <yhuai@databricks.com>

asfgit pushed a commit that referenced this pull request Nov 6, 2015

[SPARK-10978][SQL][FOLLOW-UP] More comprehensive tests for PR #9399
This PR adds test cases that test various column pruning and filter push-down cases.

Author: Cheng Lian <lian@databricks.com>

Closes #9468 from liancheng/spark-10978.follow-up.

ajbozarth added a commit to ajbozarth/spark that referenced this pull request Nov 12, 2015

[SPARK-10978][SQL] Allow data sources to eliminate filters
This PR adds a new method `unhandledFilters` to `BaseRelation`. Data sources which implement this method properly may avoid the overhead of defensive filtering done by Spark SQL.

Author: Cheng Lian <lian@databricks.com>

Closes apache#9399 from liancheng/spark-10978.unhandled-filters.

ajbozarth added a commit to ajbozarth/spark that referenced this pull request Nov 12, 2015

[SPARK-10978][SQL][FOLLOW-UP] More comprehensive tests for PR apache#9399

This PR adds test cases that test various column pruning and filter push-down cases.

Author: Cheng Lian <lian@databricks.com>

Closes apache#9468 from liancheng/spark-10978.follow-up.

nakul02 added a commit to nakul02/spark that referenced this pull request Nov 16, 2015

[SPARK-10978][SQL][FOLLOW-UP] More comprehensive tests for PR apache#9399

This PR adds test cases that test various column pruning and filter push-down cases.

Author: Cheng Lian <lian@databricks.com>

Closes apache#9468 from liancheng/spark-10978.follow-up.

JoshRosen added a commit to databricks/spark-redshift that referenced this pull request Nov 24, 2015

Implement Spark 1.6's new unhandledFilters API
Spark 1.6 extends `BaseRelation` with a new API which allows data sources to tell Spark which filters they handle, allowing Spark to eliminate its own defensive filtering for filters that are handled by the data source (see apache/spark#9399 for more details).

This patch implements this new API in `spark-redshift` and adds tests.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #128 from JoshRosen/support-filter-skipping-in-spark-1.6.

mwws pushed a commit to mwws/spark that referenced this pull request Dec 10, 2015

[SPARK-10978][SQL][FOLLOW-UP] More comprehensive tests for PR apache#9399

This PR adds test cases that test various column pruning and filter push-down cases.

Author: Cheng Lian <lian@databricks.com>

Closes apache#9468 from liancheng/spark-10978.follow-up.