[SPARK-6554] [SQL] Don't push down predicates which reference partition column(s) #5210
Conversation
Test build #29228 has started for PR 5210 at commit
Test build #29228 has finished for PR 5210 at commit
Test PASSed.
        val referencedColNames = pred.references.map(_.name).toSet
        referencedColNames.intersect(partitionColNames).isEmpty
      }
      .flatMap(ParquetFilters.createFilter)
Can you explain what is going on with the flatMap and reduceOption here? Is this the simplest way to write this?
This is a pretty common construct throughout the query planner, but we can certainly add comments.
It essentially takes any filters that we can convert to a Parquet filter (skipping those that Parquet does not support) and Ands them together into a single predicate. That predicate is set in the job conf, or nothing is done if no filters can be converted.
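The construct Michael describes can be sketched as a small, self-contained Scala example. Everything here is a hypothetical stand-in, not Spark's actual API: `toFilter` plays the role of `ParquetFilters.createFilter` (returning `None` for predicates the sink cannot handle), and the tiny `Filter` ADT stands in for Parquet's filter types.

```scala
// Minimal Filter ADT standing in for Parquet's FilterPredicate types.
sealed trait Filter
case class Eq(col: String, v: Int) extends Filter
case class And(left: Filter, right: Filter) extends Filter

object PushdownSketch {
  // Hypothetical converter (the role ParquetFilters.createFilter plays):
  // only "=" predicates are supported here; others yield None.
  def toFilter(pred: (String, String, Int)): Option[Filter] = pred match {
    case (col, "=", v) => Some(Eq(col, v))
    case _             => None // unsupported predicate, silently skipped
  }

  // flatMap keeps only the convertible predicates; reduceOption Ands them
  // together, yielding None (a no-op) when nothing could be converted.
  def combine(preds: Seq[(String, String, Int)]): Option[Filter] =
    preds
      .flatMap(toFilter)
      .reduceOption[Filter]((l, r) => And(l, r))
}
```

A caller would then do `combine(preds).foreach(setFilterPredicate)`: the side effect runs only when at least one predicate converted, with no separate emptiness check.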
Though I will agree the explicit if is much clearer than .filter(_ => sqlContext.conf.parquetFilterPushDown) from the previous implementation.
This is simply

    val pushdown = predicates
      .filter { pred =>
        val partitionColNames = partitionColumns.map(_.name).toSet
        val referencedColNames = pred.references.map(_.name).toSet
        referencedColNames.intersect(partitionColNames).isEmpty
      }
      .flatMap(ParquetFilters.createFilter)

    if (pushdown.nonEmpty) {
      ParquetInputFormat.setFilterPredicate(jobConf, pushdown.reduce(FilterApi.and))
    }
Yeah, Reynold's version seems clearer to me.
I think you guys are optimizing for the wrong thing here. reduceOption(<and>) is an incredibly common pattern when you are munging predicates; it occurs 15+ times in the code base.
In my experience, reduce is always a red flag: it means you aren't considering what to do in the case that the list is empty. There have been several bugs in Catalyst where this was the problem.
You are correct that you can add an explicit if check. However, now as I scan the code, I have to connect these two disparate lines and reason about them independently. When a developer uses reduceOption, I know that the compiler is going to yell at them if they aren't handling all cases correctly, so I can focus my review on the other logic.
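The failure mode being referenced is easy to show in isolation. This is a generic illustration (summing Ints rather than Anding predicates); `ReduceDemo` and its members are illustrative names, not Spark code.

```scala
object ReduceDemo {
  // reduce throws UnsupportedOperationException on an empty collection;
  // the empty case lives only in the caller's head, not in the types.
  def emptyReduceThrows: Boolean =
    try { Seq.empty[Int].reduce(_ + _); false }
    catch { case _: UnsupportedOperationException => true }

  // reduceOption moves the empty case into the return type, so the
  // compiler forces every caller to handle None explicitly.
  def sum(xs: Seq[Int]): Option[Int] = xs.reduceOption(_ + _)
}
```

The JIRA tickets listed below are all instances of the first function's behavior reaching production.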
Reasons why I think our style guide should prefer reduceOption to reduce/reduceLeft:
- SPARK-877: java.lang.UnsupportedOperationException: empty.reduceLeft in UI
- SPARK-744: BlockManagerUI with no RDD: java.lang.UnsupportedOperationException: empty.reduceLeft
- SPARK-4968: [SparkSQL] java.lang.UnsupportedOperationException when hive partition doesn't exist and order by and limit are used
- SPARK-4318: Fix empty sum distinct.
- SPARK-5852: Fail to convert a newly created empty metastore parquet table to a data source parquet table.
- SPARK-6245: jsonRDD() of empty RDD results in exception
OK, I think you are convincing me that we should ban reduce in the codebase.
I have to agree that if we're going to use reduce here, then reduceOption does seem clearer from a safety perspective. However, a transformation from Seq[T] to Option[U] is a bit confusing, since we're doing two things at once; it's pretty hard to accurately follow the typing information, and the fact that foreach can apply to either a Seq or an Option makes it somewhat harder to figure out what it's applying to and when.
This is one of those "trust the programmer" situations, where I just have to assume the programmer wrote it correctly and it does what it seems to; verifying that (i.e., reviewing the code) is made significantly harder.
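The Seq-to-Option-to-foreach chain being debated can be isolated in a few lines. `setCombined` and the Int-summing payload are illustrative stand-ins (the real code Ands Parquet filters and calls ParquetInputFormat.setFilterPredicate); the point is only that foreach on an Option runs its side effect zero or one times, replacing the explicit nonEmpty check.

```scala
object OptionForeachDemo {
  // reduceOption collapses the Seq to an Option; foreach then applies the
  // side effect `set` at most once, and only when the Option is defined.
  // This is the foreach that applies to an Option, not to a Seq.
  def setCombined(xs: Seq[Int])(set: Int => Unit): Unit =
    xs.reduceOption(_ + _).foreach(set)
}
```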
…on column(s)

There are two cases for the new Parquet data source:

1. Partition columns exist in the Parquet data files
   We don't need to push down these predicates since partition pruning already handles them.
2. Partition columns don't exist in the Parquet data files
   We can't push down these predicates since they are considered invalid columns by Parquet.

Author: Cheng Lian <lian@databricks.com>

Closes #5210 from liancheng/spark-6554 and squashes the following commits:

4f7ec03 [Cheng Lian] Adds comments
e134ced [Cheng Lian] Don't push down predicates which reference partition column(s)
Thanks! I've merged this to master and 1.3.
    predicates
      // Don't push down predicates which reference partition columns
      .filter { pred =>
        val partitionColNames = partitionColumns.map(_.name).toSet
btw we don't need to compute partitionColumns for every predicate, do we?
That's a good point, but likely inconsequential from a performance standpoint.
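The suggested micro-fix is to hoist the partition-column name set out of the filter closure so it is built once rather than once per predicate. A minimal sketch, with `Attribute` and `Predicate` as stand-ins for Catalyst's attribute and expression types (only `partitionColNames` and `referencedColNames` mirror the PR's actual code):

```scala
// Stand-ins for Catalyst's attribute/expression types.
case class Attribute(name: String)
case class Predicate(references: Seq[Attribute])

object HoistDemo {
  def nonPartitionPredicates(
      predicates: Seq[Predicate],
      partitionColumns: Seq[Attribute]): Seq[Predicate] = {
    // Built once, outside the closure, instead of per predicate.
    val partitionColNames = partitionColumns.map(_.name).toSet
    predicates.filter { pred =>
      val referencedColNames = pred.references.map(_.name).toSet
      referencedColNames.intersect(partitionColNames).isEmpty
    }
  }
}
```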
This PR addresses rxin's comments in PR #5210.

Author: Cheng Lian <lian@databricks.com>

Closes #5219 from liancheng/spark-6554-followup and squashes the following commits:

41f3a09 [Cheng Lian] Addresses comments in #5210

(cherry picked from commit d3944b6)
Signed-off-by: Reynold Xin <rxin@databricks.com>