[SPARK-30120][ML] Use BoundedPriorityQueue for small dataset in LSH approxNearestNeighbors #26858
Conversation
```scala
if (approxQuantile >= 1) {
  modelDatasetWithDist
// for a small dataset, use BoundedPriorityQueue
if (count < 1000) {
```
what is a good number to use here?
Test build #115214 has finished for PR 26858 at commit
I am afraid this PR is wrong.
Not for a small dataset, but for a small
Hm yeah might be worthwhile to optimize this case. I'd keep the size limit small. How much does it speed things up though?
```scala
modelDatasetWithDist
// for a small dataset, use BoundedPriorityQueue
if (count < 1000) {
  val queue = new BoundedPriorityQueue[Double](count.toInt)(Ordering[Double])
```
Shouldn't the queue just need `numNearestNeighbors` elements?
Sorry, I should use `numNearestNeighbors`. Will fix this.
```scala
}
var sortedDistCol = queue.toArray.sorted(Ordering[Double])
queue.clear()
modelDatasetWithDist.filter(col(distCol) <= sortedDistCol(numNearestNeighbors - 1))
```
I might pull `sortedDistCol(numNearestNeighbors - 1)` into a val, or else this has to send the whole array. (You don't have to clear the queue.)
Will fix this. Thanks!
```scala
// for a small dataset, use BoundedPriorityQueue
if (count < 1000) {
  val queue = new BoundedPriorityQueue[Double](count.toInt)(Ordering[Double])
  modelDatasetWithDist.collect().foreach { case Row(keys, values, distCol: Double) =>
```
`keys` and `values` can be `_`
@zhengruifeng I think the logic does have to depend on the size of the dataset; you don't want to collect 10M elements to find 10 nearest neighbors. But I do think the priority queue size needs to be `numNearestNeighbors`, yes, if that's what you mean.
```scala
modelDatasetWithDist
// for a small dataset, use BoundedPriorityQueue
if (count < 1000) {
  val queue = new BoundedPriorityQueue[Double](count.toInt)(Ordering[Double])
```
This place should be something like:

```scala
val exactThreshold = modelDatasetWithDist
  .select(distCol)
  .as[Double]
  .rdd
  .treeAggregate(new BoundedPriorityQueue[Double](numNearestNeighbors)(Ordering[Double].reverse))(
    seqOp = (q, v) => q += v,
    combOp = (q1, q2) => q1 ++= q2,
    depth = 2
  ).toArray.max
```

And this implementation should not depend on the size of the dataset.
This only depends on `numNearestNeighbors`, when it is small (maybe < 10000?).
On each partition, collect the minimum 10 values, then merge them with `treeAggregate`
to get the global minimum 10 values; the max value among them is the threshold.
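As an aside, the per-partition top-k merge described above can be sketched in plain Scala. This is only an illustration, not Spark code: Spark's `BoundedPriorityQueue` is an internal utility, so a hypothetical capped max-heap over `scala.collection.mutable.PriorityQueue` stands in for it, and plain `Seq`s stand in for RDD partitions.

```scala
import scala.collection.mutable

// A max-heap capped at k elements, so it retains only the k smallest values seen.
// Stands in for Spark's internal BoundedPriorityQueue with a reverse ordering.
final class BoundedMaxHeap(k: Int) {
  private val heap = mutable.PriorityQueue.empty[Double] // largest value at the head
  def +=(v: Double): this.type = {
    if (heap.size < k) heap += v
    else if (v < heap.head) { heap.dequeue(); heap += v }
    this
  }
  def ++=(other: BoundedMaxHeap): this.type = { other.heap.foreach(this += _); this }
  def max: Double = heap.head
}

val k = 3
// Three "partitions" of distances; in Spark these would be RDD partitions.
val partitions = Seq(Seq(9.0, 1.0, 7.0), Seq(4.0, 8.0, 2.0), Seq(6.0, 3.0, 5.0))
val merged = partitions
  .map(part => part.foldLeft(new BoundedMaxHeap(k))(_ += _)) // seqOp: per-partition top-k
  .reduce(_ ++= _)                                           // combOp: merge the heaps
val threshold = merged.max // the k-th smallest distance overall
println(threshold) // 3.0
```

The max of the merged heap is the exact distance threshold: here the three smallest distances overall are 1.0, 2.0, 3.0, so the threshold is 3.0.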
`BoundedPriorityQueue` only maintains the top-k entries, so it is safe to absorb a lot of entries.
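For instance, in a minimal plain-Scala illustration (again hypothetical, since `BoundedPriorityQueue` itself is Spark-internal), a heap capped at k entries can absorb an arbitrarily long stream while its size never exceeds k:

```scala
import scala.collection.mutable

// Capped max-heap: absorbs any number of values but only ever holds the
// k smallest seen so far, so memory stays O(k) regardless of input size.
val k = 10
val heap = mutable.PriorityQueue.empty[Double] // largest value at the head
for (v <- (1 to 1000000).map(_.toDouble)) {
  if (heap.size < k) heap += v
  else if (v < heap.head) { heap.dequeue(); heap += v }
}
println(heap.size)          // 10
println(heap.toSeq.sorted)  // the 10 smallest values: 1.0 through 10.0
```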
approxQuantile already kind of works this way, so I think the point of this PR is avoiding several passes of Spark jobs for the tree reduce in this case.
However, it's a fair point; I wonder if, overall, this approach is faster than approxQuantile? It already does something like what you're suggesting.
I tried a small test. I used the existing BucketedRandomProjectionLSHSuite but made the dataset bigger:

```scala
val data = {
  for (i <- -200 until 200; j <- -200 until 200) yield Vectors.dense(i * 10, j * 10)
}
dataset = spark.createDataFrame(data.map(Tuple1.apply)).toDF("keys")
```

So the dataset count is 160000, and I tested 10000, 9000, 8000, 7000, 6000, 5000, 4000, 3000, 2000 and 1000 nearest neighbors. I didn't see any performance gain using `BoundedPriorityQueue`.
I wrongly thought that approxNearestNeighbors only returns an approximate threshold, and that we could then use top-k to obtain an exact threshold.
Since approxNearestNeighbors already guarantees a sufficient threshold that takes the relative error into account, I guess we no longer need a top-k solution.
A `BoundedPriorityQueue` only maintains the top-k entries, so it should be much smaller than a `QuantileSummaries`; however, since there is only one column to process, there should be no performance gain.
A slight performance gain may come from the fact that `BoundedPriorityQueue` does not need a `count` job to compute the var `approxQuantile`.
I agree just implementing this in terms of BoundedPriorityQueue is also a viable solution. I don't know which one is faster. I had assumed the approximate quantile would be, as I think it does less work overall, but I haven't tested it. That is, I think it will hang on to fewer than k entries per partition.
I am trying to do some performance tests; however, I found that in the public approxNearestNeighbors methods, `singleProbe` is always true for now, so our changes in LSH cannot be reached in that code path.
I guess we do not need BoundedPriorityQueue any more, so maybe it is OK to close this PR?
I think the question here is different from the one in #26948: is it worthwhile to avoid the distributed operation entirely if the data set is small? It may or may not be faster, but it's more exact. I guess I'm neutral on it, just because it adds some complexity for a little gain, but I am not against it.
I will close this PR. Thanks for reviewing!
What changes were proposed in this pull request?
Use `BoundedPriorityQueue` for small datasets in `LSH.approxNearestNeighbors`.
Why are the changes needed?
For small datasets, we can get an exact result instead of using `approxQuantile`.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing unit tests.