[SPARK-42664][CONNECT] Support `bloomFilter` function for `DataFrameStatFunctions` #42414
Conversation
cc @hvanhovell I made a clean one, let's restart this.

@LuciferYang does this return the same results as the one in sql/core?

Let me check again, this PR has been sitting for too long, I also can't remember clearly ...

@hvanhovell I generated some random sequences (covering the 5 data types that need to be supported) and used different parameters to compare the output results. So I think their results should be consistent.

@LuciferYang by consistent you mean exactly the same?

Yes. Have you found any cases with different results?
```scala
    fpp: Double): BloomFilter = {
  val agg = if (!fpp.isNaN) {
    Column.fn("bloom_filter_agg", col, lit(expectedNumItems), lit(fpp))
```
I don't really like the ambiguity here. Since we are managing this function ourselves, can we just have one way of invoking it? I kind of prefer `Column.fn("bloom_filter_agg", col, lit(expectedNumItems), lit(numBits))`.

Alternatively you pass all three, where you pick either fpp or numItems and pass null for the other field. Another idea would be to have different names.
Let me think about how to refactor.
fe958a6 changed to only use `Column.fn("bloom_filter_agg", col, lit(expectedNumItems), lit(numBits))`.
Maybe add a negative test case where mightContain evaluates to false?
6ffbfa0 Added checks for values that are definitely not included.
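The negative check being discussed can be illustrated with a toy filter. The following is a deliberately minimal two-hash sketch, not Spark's `org.apache.spark.util.sketch.BloomFilter`; the class and method names are invented for illustration. The key property: an inserted value always passes `mightContain`, while a value that was never inserted should almost always fail it.

```java
import java.util.BitSet;

// Toy two-hash Bloom filter, only to illustrate the negative-test idea:
// a value never inserted should (usually) fail mightContain.
// This is NOT Spark's BloomFilter implementation.
public class ToyBloomFilter {
    private final BitSet bits;
    private final int numBits;

    public ToyBloomFilter(int numBits) {
        this.numBits = numBits;
        this.bits = new BitSet(numBits);
    }

    private int[] positions(long v) {
        int h1 = Long.hashCode(v);
        int h2 = Long.hashCode(v * 0x9E3779B97F4A7C15L); // cheap second hash
        return new int[] { Math.floorMod(h1, numBits), Math.floorMod(h2, numBits) };
    }

    public void putLong(long v) {
        for (int pos : positions(v)) bits.set(pos);
    }

    public boolean mightContain(long v) {
        for (int pos : positions(v)) {
            if (!bits.get(pos)) return false; // definitely absent
        }
        return true; // all probed bits set: maybe present (false positives possible)
    }

    public static void main(String[] args) {
        ToyBloomFilter f = new ToyBloomFilter(1 << 16);
        for (long v = 0; v < 100; v++) f.putLong(v);
        // Inserted values are always reported present (no false negatives).
        System.out.println(f.mightContain(42L));
        // A value that was never inserted is very likely reported absent.
        System.out.println(f.mightContain(123_456L));
    }
}
```

Note the asymmetry: the test in this PR can assert `forall(mightContain)` exactly, but a negative assertion is only probabilistic, so it should use values far from the inserted set and a filter sized generously for the item count.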
```scala
/**
 * `BloomFilterHelper` is used to bridge helper methods in `BloomFilter`
 */
private[spark] object BloomFilterHelper {
```
Why can't you directly reference `BloomFilter.optimalNumOfBits(expectedNumItems, fpp)`? Alternatively you can hide a lot of this by creating dedicated constructors for the `BloomFilterAggregate`.
4709dd5 made `BloomFilter.optimalNumOfBits` public and called it directly.
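For reference, the sizing formula that `optimalNumOfBits` is understood to implement is the classic m = -n · ln(p) / (ln 2)². The sketch below is a standalone re-derivation for illustration (an assumption based on the standard Bloom-filter analysis, not a copy of Spark's source):

```java
// Standalone sketch of the classic Bloom-filter sizing formula:
//   m = -n * ln(p) / (ln 2)^2
// where n = expected number of items and p = target false positive rate.
public class BloomSizing {
    public static long optimalNumOfBits(long n, double p) {
        return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    public static void main(String[] args) {
        // For 1000 expected items at fpp = 0.03, roughly 7.3 bits per item.
        System.out.println(optimalNumOfBits(1000L, 0.03));
    }
}
```

This is why the client can normalize the `(expectedNumItems, fpp)` overload down to `(expectedNumItems, numBits)` before invoking the aggregate: `numBits` is fully determined by the other two parameters.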
```scala
    SQLConf.get.getConf(RUNTIME_BLOOM_FILTER_MAX_NUM_BITS))

// Mark as lazy so that `updater` is not evaluated during tree transformation.
private lazy val updater: BloomFilterUpdater = first.dataType match {
```
For the record, lazy vals are not free.
Yes, but I haven't thought of another way yet. This is similar to the cases of `estimatedNumItems` and `numBits`: if it's not lazy, there will be an `Invalid call to dataType on unresolved object` error.
Lines 143 to 151 in 55b07b1:

```scala
// Mark as lazy so that `estimatedNumItems` is not evaluated during tree transformation.
private lazy val estimatedNumItems: Long =
  Math.min(estimatedNumItemsExpression.eval().asInstanceOf[Number].longValue,
    SQLConf.get.getConf(RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS))
// Mark as lazy so that `numBits` is not evaluated during tree transformation.
private lazy val numBits: Long =
  Math.min(numBitsExpression.eval().asInstanceOf[Number].longValue,
    SQLConf.get.getConf(RUNTIME_BLOOM_FILTER_MAX_NUM_BITS))
```
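The deferred-evaluation behavior those lazy vals rely on can be sketched in isolation: nothing runs until first access, then the result is memoized. This is an illustrative wrapper, not Spark code; the `Lazy` class and its names are invented for the example (Scala's `lazy val` generates roughly this pattern, plus per-instance locking, which is part of why it "is not free"):

```java
import java.util.function.Supplier;

// Minimal memoizing wrapper: the supplier runs at most once, on first get().
// Analogous to a Scala lazy val guarding eval() during tree transformation.
public class Lazy<T> {
    private Supplier<T> supplier;
    private T value;
    private boolean computed = false;

    public Lazy(Supplier<T> supplier) { this.supplier = supplier; }

    public synchronized T get() {
        if (!computed) {
            value = supplier.get();
            computed = true;
            supplier = null; // allow the supplier (and its captures) to be GC'd
        }
        return value;
    }

    public static void main(String[] args) {
        int[] evals = {0};
        Lazy<Long> numBits = new Lazy<>(() -> { evals[0]++; return 67_108_864L; });
        // Nothing evaluated yet -- analogous to a transformation visiting the
        // expression tree without forcing eval() on unresolved children.
        System.out.println("evaluations before get(): " + evals[0]);
        numBits.get();
        numBits.get();
        System.out.println("evaluations after two gets: " + evals[0]);
    }
}
```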
hvanhovell left a comment:
Looks pretty good! Can you address the comments?
```scala
// Check expectedNumItems is LongType and value greater than 0L
val expectedNumItemsExpr = children(1)
val expectedNumItems = expectedNumItemsExpr match {
```
Changed to `Column.fn("bloom_filter_agg", col, lit(expectedNumItems), lit(numBits))`; the logic indeed appears simpler now, and I have a point for discussion.

@hvanhovell Do you think we should check the validity of the input here? By checking here, the error message can be exactly the same as the API in sql/core. However, if we use the validation mechanism of `BloomFilterAggregate`, the content of the error message will be different, but the code will be more concise.

Perhaps we don't need to ensure that the error message is the same as before?
We can do that in a follow-up.
```scala
val filter1 = df.stat.bloomFilter("id", 1000, 0.03)
assert(filter1.expectedFpp() - 0.03 < 1e-3)
assert(data.forall(filter1.mightContain))
assert(notContainValues.forall(n => !filter1.mightContain(n)))
```
Added checks for values that are definitely not included.
```scala
  numBits
}

if (fpp <= 0d || fpp >= 1d) {
```
In the subsequent process, fpp is no longer involved, so a check is added here. Otherwise, if the user passes an invalid fpp value, the error message will be "Number of bits must be positive", which is quite strange.
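A minimal sketch of the range check described here, with an invented class name and an error message modeled on the discussion (Spark's actual message may differ). Validating fpp up front means the user sees a message about fpp itself, rather than the downstream "Number of bits must be positive":

```java
// Illustrative client-side validation of the false positive probability,
// done before fpp is converted to numBits and dropped from the call.
public class FppCheck {
    public static void requireValidFpp(double fpp) {
        if (fpp <= 0d || fpp >= 1d) {
            throw new IllegalArgumentException(
                "False positive probability must be within range (0.0, 1.0)");
        }
    }

    public static void main(String[] args) {
        requireValidFpp(0.03); // valid: no exception
        try {
            requireValidFpp(1.5); // invalid: caught below
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```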
common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
```diff
  * @param p false positive rate (must be 0 < p < 1)
  */
-private static long optimalNumOfBits(long n, double p) {
+public static long optimalNumOfBits(long n, double p) {
```
Changed to public because `DataFrameStatFunctions#buildBloomFilter` needs this method to calculate the `numBits` from `expectedNumItems` and `fpp`.
If you find `(must be 0 < p < 1)` to be quite messy, we can try changing it to `(must be {@literal 0 < p < 1})`.
I am good.
Force-pushed from 0f2a7b1 to 80a6b4b.
hvanhovell left a comment:
LGTM
…tatFunctions`

### What changes were proposed in this pull request?
This PR uses `BloomFilterAggregate` to implement the `bloomFilter` function for `DataFrameStatFunctions`.

### Why are the changes needed?
Add Spark Connect JVM client API coverage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Add new test
- Manually check Scala 2.13

Closes #42414 from LuciferYang/SPARK-42664-backup.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
(cherry picked from commit b9f1114)
Signed-off-by: Herman van Hovell <herman@databricks.com>
Thanks @hvanhovell ~
What changes were proposed in this pull request?

This PR uses `BloomFilterAggregate` to implement the `bloomFilter` function for `DataFrameStatFunctions`.

Why are the changes needed?

Add Spark Connect JVM client API coverage.

Does this PR introduce any user-facing change?

No

How was this patch tested?

- Add new test
- Manually check Scala 2.13