[SPARK-24638][SQL] StringStartsWith support push down #21623
Conversation
cc @rdblue
Test build #92258 has finished for PR 21623 at commit
Just a question regarding sources.StringStartsWith("_1", null): if you have a nullable string column and some of the values are null, will this operator (parameterized with null) match against them?
LGTM otherwise.
@@ -270,6 +272,29 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean) {
    case sources.Not(pred) =>
      createFilter(schema, pred).map(FilterApi.not)

    case sources.StringStartsWith(name, prefix) if canMakeFilterOn(name) =>
What do you think about adding a configuration to control this, set to true by default? It basically depends on a user-defined predicate we wrote manually here.
assertResult(None) {
  parquetFilters.createFilter(
    df.schema,
    sources.StringStartsWith("_1", null))
Thanks @attilapiros, sources.StringStartsWith("_1", null) will not match them, same as before.
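The null-prefix behavior can be illustrated with a small self-contained sketch (a hypothetical helper, not Spark's actual createFilter): a null prefix simply produces no pushed-down filter, so null handling stays with Spark's normal predicate evaluation.

```scala
// Hypothetical stand-in for filter creation: a null prefix yields None,
// i.e. nothing is pushed down to Parquet and Spark evaluates the predicate
// itself, where null values never match startsWith.
def createStartsWithFilter(prefix: String): Option[String => Boolean] =
  Option(prefix).map(p => (value: String) => value != null && value.startsWith(p))

println(createStartsWithFilter(null).isDefined)  // false: no pushdown
println(createStartsWithFilter("str").isDefined) // true
```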
      comparator.compare(min.slice(0, math.min(size, min.length)), strToBinary) > 0
    }

    override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false
Why can't this evaluate the inverse of StartsWith? If the min and max values exclude the prefix, then this should be able to filter.
No.
Let me illustrate with an example: assume min="BBB" and max="DDD". canDrop() means that if your prefix sorts before "BBB" (like "A"), we can stop, as there is no reason to search within this range. The same holds for prefixes after "DDD" (like "E").
Now negate the operator. What can you say when your prefix is "C" and the range is ["BBB", "DDD"]? Can you drop it? No. And if the prefix is "A" or "E"? Still no. So you have to scan the range.
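The range check described here can be modeled in plain Scala (a simplified sketch using String statistics in place of Parquet's Binary; it mirrors the predicate in this PR but is not its actual code):

```scala
// Simplified model of canDrop(): a row group is skippable when the whole
// [min, max] range sorts strictly before or strictly after the prefix.
// Only the first prefix.length characters of the statistics are compared.
def canDrop(min: String, max: String, prefix: String): Boolean = {
  val size = prefix.length
  max.take(size) < prefix || // entire range sorts before the prefix
  min.take(size) > prefix    // entire range sorts after the prefix
}

println(canDrop("BBB", "DDD", "A")) // true: range is entirely after "A"
println(canDrop("BBB", "DDD", "E")) // true: range is entirely before "E"
println(canDrop("BBB", "DDD", "C")) // false: values starting with "C" may exist
```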
Sorry, I meant if the min and max both include the prefix, then we should be able to drop the range. The situation is where both min and max match, so all values must also match the filter. If we are looking for values that do not match the filter, then we can eliminate the row group.
The example is prefix=CCC and values are between min=CCCa and max=CCCZ: all values start with CCC, so the entire row group can be skipped.
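That row-group-skipping case for the negated filter can be sketched the same way (again a simplified String model, not the actual Statistics[Binary] code): when both min and max start with the prefix, every value in between must too, so NOT(startsWith) matches nothing in the group.

```scala
// Simplified model of inverseCanDrop(): for NOT(startsWith(prefix)) the
// row group can be skipped only when both statistics share the prefix,
// which forces every value in [min, max] to start with it as well.
def inverseCanDrop(min: String, max: String, prefix: String): Boolean =
  min.startsWith(prefix) && max.startsWith(prefix)

println(inverseCanDrop("CCCa", "CCCz", "CCC")) // true: whole group matches, NOT matches nothing
println(inverseCanDrop("BBB", "DDD", "CCC"))   // false: group must be scanned
```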
There is one rare case where you can drop it with the inverse: when both min and max start with the prefix. @wangyum please correct me if I am wrong.
@rdblue oh, sorry, I had not seen your reply. Yes, in that case we can, and you are right that it is worth doing.
Test build #92342 has finished for PR 21623 at commit
Test build #92362 has finished for PR 21623 at commit
@@ -660,6 +661,56 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
    assert(df.where("col > 0").count() === 2)
  }
}

test("filter pushdown - StringStartsWith") {
  withParquetDataFrame((1 to 4).map(i => Tuple1(i + "str" + i))) { implicit df =>
I think that all of these tests go through the keep method instead of canDrop and inverseCanDrop. I think those methods need to be tested. You can do that by constructing a Parquet file with row groups that have predictable statistics, but that would be difficult. An easier way is to define the predicate class elsewhere and create a unit test for it that passes in different statistics values.
Added testStringStartsWith to test the paths that go through canDrop and inverseCanDrop.
-private[parquet] class ParquetFilters(pushDownDate: Boolean) {
+private[parquet] class ParquetFilters() {

  val sqlConf: SQLConf = SQLConf.get
This should pass in pushDownDate and pushDownStartWith like the previous version did with just the date setting.
The SQLConf is already available in ParquetFileFormat and it would be better to pass it in. The problem is that this class is instantiated in the function ((file: PartitionedFile) => { ... }) that gets serialized and sent to executors. That means we don't want SQLConf and its references in the function's closure. The way we got around this before was to put boolean config vals in the closure instead. I think you should go with that approach.
I'm not sure what SQLConf.get is for or what a correct use would be. @gatorsmile, can you comment on the use of SQLConf.get?
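The workaround described here, capturing primitive config values instead of the config object, can be sketched as follows (a hypothetical Conf and reader function stand in for SQLConf and ParquetFileFormat's closure; this is not Spark's actual code):

```scala
// Hypothetical, non-serializable config holder standing in for SQLConf.
class Conf {
  val parquetFilterPushDownDate: Boolean = true
  val parquetFilterPushDownStartsWith: Boolean = true
}

def buildReader(conf: Conf): String => String = {
  // Copy the needed settings into local primitive vals BEFORE building the
  // closure, so the serialized function does not reference `conf` itself.
  val pushDownDate = conf.parquetFilterPushDownDate
  val pushDownStartsWith = conf.parquetFilterPushDownStartsWith

  (file: String) =>
    // Stand-in for constructing the filters with plain booleans, e.g.
    // new ParquetFilters(pushDownDate, pushDownStartsWith).
    s"read $file (date=$pushDownDate, startsWith=$pushDownStartsWith)"
}

println(buildReader(new Conf)("part-00000.parquet"))
```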
You are right. I hit a bug here.
Overall, I think this is close. The tests need to cover the row-group stats case, and we should update how configuration is passed to the filters. Thanks for working on this, @wangyum!
@@ -378,6 +378,14 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

  val PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED =
    buildConf("spark.sql.parquet.filterPushdown.string.startsWith")
It would be better if we added an .enabled suffix.
Test build #92449 has finished for PR 21623 at commit
Test build #92454 has finished for PR 21623 at commit
Jenkins, retest this please.
@@ -378,6 +378,14 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

  val PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED =
    buildConf("spark.sql.parquet.filterPushdown.string.startsWith")
      .doc("If true, enables Parquet filter push-down optimization for string starts with. " +
for string startsWith function
}

override def keep(value: Binary): Boolean = {
  UTF8String.fromBytes(value.getBytes).startsWith(UTF8String.fromString(v))
UTF8String.fromString(v) -> UTF8String.fromBytes(strToBinary.getBytes)?
val df = spark.read.parquet(path).filter(filter)
df.foreachPartition((it: Iterator[Row]) => it.foreach(v => accu.add(0)))
df.collect
What does this collect do? foreachPartition is already an action.
test("filter pushdown - StringStartsWith") {
  withParquetDataFrame((1 to 4).map(i => Tuple1(i + "str" + i))) { implicit df =>
    // Test canDrop()
To confirm, do they test canDrop or keep?
Both methods are executed, but it can't be confirmed which method took effect.
Test build #92463 has finished for PR 21623 at commit
Test build #92474 has finished for PR 21623 at commit
Benchmark result:
thanks, merging to master! @dongjoon-hyun is there something similar in ORC?
@cloud-fan. AFAIK, ORC doesn't support custom filters yet. I'll follow up on that in ORC.
What changes were proposed in this pull request?
Support push down for StringStartsWith. About 50% savings in compute time.

How was this patch tested?
Unit tests, manual tests and a performance test:
result: