Shared PQ Based Early Termination for Concurrent Search #854

Open · atris wants to merge 2 commits into base: master

Conversation

@atris (Contributor) commented Sep 4, 2019

NOCOMMIT.

This is a WIP PR that implements shared-PQ-based early termination. Each collector collects independently into a thread-local PQ and updates a global count of hits (which will be refactored once LUCENE-8939 is merged).

Once enough hits are accumulated, a global PQ is populated by all collectors, and the global PQ then serves as the benchmark for filtering further hits.

Several optimizations have been made, such as non-blocking construction of the global PQ, and skipping the merge work in CollectorManager.reduce, instead returning results directly from the global PQ.

I need some eyes on the overall logic, especially at two points: 1) across-segment value comparison and 2) local-to-global docID mapping during interactions between the thread-local PQs and the global PQ. I am pretty sure there is a bug in one of the two, so I would really appreciate a deep look.
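
To make the flow described above concrete, here is a simplified, self-contained model of the idea. This is illustrative only, not the actual patch; all class and method names are invented.

```java
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicInteger;

class SharedQueueModel {

  record Hit(int globalDoc, float score) {}

  static final int TOP_N = 10;

  // Global state shared by all collectors.
  static final AtomicInteger globalHitCount = new AtomicInteger();
  static final PriorityQueue<Hit> globalQueue =
      new PriorityQueue<>((a, b) -> Float.compare(a.score(), b.score())); // min-heap: head = worst kept hit

  // Per-collector (per-slice, thread-local) state.
  static class LocalCollector {
    final PriorityQueue<Hit> localQueue =
        new PriorityQueue<>((a, b) -> Float.compare(a.score(), b.score()));
    boolean mergedIntoGlobal = false;

    void collect(int docBase, int doc, float score) {
      globalHitCount.incrementAndGet();
      Hit hit = new Hit(docBase + doc, score); // local-to-global docID mapping

      if (!mergedIntoGlobal) {
        localQueue.add(hit);
        if (globalHitCount.get() >= TOP_N) {
          // Enough hits have been seen globally: fold this local queue into the shared queue.
          synchronized (globalQueue) {
            globalQueue.addAll(localQueue);
            while (globalQueue.size() > TOP_N) {
              globalQueue.poll(); // drop the worst entries beyond topN
            }
          }
          localQueue.clear();
          mergedIntoGlobal = true;
        }
      } else {
        // The global bottom is now the bar every further hit must clear.
        synchronized (globalQueue) {
          if (globalQueue.size() < TOP_N || score > globalQueue.peek().score()) {
            globalQueue.add(hit);
            if (globalQueue.size() > TOP_N) {
              globalQueue.poll();
            }
          }
        }
      }
    }
  }
}
```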

@atris (Contributor, Author) commented Sep 4, 2019

cc @jpountz.

This PR builds on top of #831

@jimczi (Contributor) commented Sep 4, 2019

I thought that the follow-up for LUCENE-8939 would be to allow sharing the minimum score (which could be extended to a minimum FieldDoc) across slices? Sharing the minimum score (or minimum FieldDoc) requires very little synchronization while a global priority queue seems much more costly. The other advantage is that we could add this ability to the current top docs collectors like we did for LUCENE-8939.

@atris (Contributor, Author) commented Sep 5, 2019

> I thought that the follow-up for LUCENE-8939 would be to allow sharing the minimum score (which could be extended to a minimum FieldDoc) across slices? Sharing the minimum score (or minimum FieldDoc) requires very little synchronization while a global priority queue seems much more costly. The other advantage is that we could add this ability to the current top docs collectors like we did for LUCENE-8939.

@jimczi Yes, that is the third follow-up PR that I am working on presently. The advantage of a shared priority queue is that it allows an accurate count of hits. For example, if a global hit is bested, then in a shared global priority queue it can be replaced by the better hit. However, in the case where we do not share the priority queue, the hit that just got bested will be hard to replace (since it is in a thread-local queue). The easiest way would be to let that hit live as well, collect the new hit, and then let TopDocs.merge do the job.

I think that is a great approach to have, and many use cases will be satisfied by it. However, for cases with a very large number of hits, or when the user wants precise counting, we might need to go with the globally shared PQ. So maybe we should have both approaches? WDYT?

@mikemccand (Member) left a review comment

> Sharing the minimum score (or minimum FieldDoc) requires very little synchronization while a global priority queue seems much more costly.

Is this proposal to continue collecting the full N per slice (thread work unit), until some/all of them have a full (top N) PQ and at that point you sync to find the "best bottom" across all of them? And every time one of the threads finds a newly competitive hit, increasing its bottom, we sync again to find the best bottom?

This would still require sync-per-insert, and would require collecting top N per thread instead of top N overall? It's hard to judge which is the better tradeoff w/o building both and running perf tests :)

I think yet another (future) option is to explore lockless PQ implementations, e.g. https://tstentz.github.io/418proposal/

valuesArray = new Object[1];
valuesArray[0] = value;

bottomValuesArray = new Object[1];

@mikemccand (Member) commented Sep 11, 2019

Hmm, spooky we must allocate two Object[] per collected hit in this case ...

bottomValuesArray[0] = bottomValues;
}

int cmp;

@mikemccand (Member) commented Sep 11, 2019

Move the int cmp declaration down to where cmp is assigned?

return newTopDocs(results);
}

public int compareAndUpdateBottom(int docBase, int doc, Object value) {

@mikemccand (Member) commented Sep 11, 2019

private? And add javadoc explaining what this returns?

* Indicates that input has ended for the collector. This allows the collector to perform
* post processing (if any).
*/
default void postProcess() {

@mikemccand (Member) commented Sep 11, 2019

Could we rename this to something like finishLeaf?

Entry returnedEntry = earlyTerminatingFieldCollectorManager.addCollectorToGlobalQueue(this, docBase);

if (returnedEntry != null) {
if (returnedEntry != null) {

@mikemccand (Member) commented Sep 11, 2019

You can remove one of these if (returnedEntry != null) {.

return;
}

updateMinCompetitiveScore(scorer);

@mikemccand (Member) commented Sep 11, 2019

Why can we updateMinCompetitiveScore here? The global queue may not yet be full at this point?

}
}
}
addedSelfToGlobalQueue = true;

@mikemccand (Member) commented Sep 11, 2019

I wish there were some way to swap in a different collector implementation once we absorb ourselves into the global queue ... the behavior of this collector is so modal depending on that.

++totalHits;

comparator.copy(slot, doc);
add(slot, doc, comparator.leafValue(doc));

@mikemccand (Member) commented Sep 11, 2019

The fact that we need leafValue on every insert will make this threaded implementation more costly ... but that is a good tradeoff for use cases that benefit from multi-threaded search.

@jimczi (Contributor) commented Sep 11, 2019

> Is this proposal to continue collecting the full N per slice (thread work unit), until some/all of them have a full (top N) PQ and at that point you sync to find the "best bottom" across all of them? And every time one of the threads finds a newly competitive hit, increasing its bottom, we sync again to find the best bottom?

Yes, that's another option that we should investigate, IMO.

> This would still require sync-per-insert, and would require collecting top N per thread instead of top N overall? It's hard to judge which is the better tradeoff w/o building both and running perf tests :)

If we keep separate PQs we'd not need sync-per-insert, and could use a volatile to record the shared min score within the top docs collectors. It seems appealing to me because top N should be small, so the overhead of collecting a full top N per slice is small compared to converging on the min score, which can be completely different per slice.
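
A minimal sketch of this kind of lightweight sharing, assuming a single shared holder that slices with a full local PQ write to and everyone else only reads (hypothetical names, not the Lucene API):

```java
class GlobalMinScore {

  private volatile float minCompetitiveScore = Float.NEGATIVE_INFINITY;

  /** Called only by a slice whose local pq is full and whose bottom just improved. */
  synchronized void publish(float localBottom) {
    if (localBottom > minCompetitiveScore) {
      minCompetitiveScore = localBottom;
    }
  }

  /** Cheap volatile read on the hot collect path; hits scoring below this are not competitive. */
  float get() {
    return minCompetitiveScore;
  }
}
```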

@atris (Contributor, Author) commented Sep 11, 2019

> Is this proposal to continue collecting the full N per slice (thread work unit), until some/all of them have a full (top N) PQ and at that point you sync to find the "best bottom" across all of them? And every time one of the threads finds a newly competitive hit, increasing its bottom, we sync again to find the best bottom?

> Yes, that's another option that we should investigate, IMO.

Curious: why do we need a full local PQ before publishing the global min score? We could build upon the approach that HitsThresholdChecker exposes and publish the bottom once we have globally collected enough hits?

@jimczi (Contributor) commented Sep 11, 2019

I think that you need to ensure that the local PQ of the slice publishing the new min score is full, otherwise the min score cannot be the global min score. If the other slices are not full, they can use the new min score immediately, but that assumes that all slices have the same topN size.

@atris (Contributor, Author) commented Sep 11, 2019

> I think that you need to ensure that the local PQ of the slice publishing the new min score is full, otherwise the min score cannot be the global min score. If the other slices are not full, they can use the new min score immediately, but that assumes that all slices have the same topN size.

How about all slices continuously publishing their minimum score into shared state, but without synchronization (maybe a shared array of numSlices length)? Once numHits are collected globally, we do a pass through the array and pick the minimum. This should have a minimal memory footprint, since the number of slices should be small.
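
A sketch of the per-slice array idea (hypothetical class, not part of the patch): slot i is written only by slice i, and the minimum is computed with a linear scan once enough hits have been collected globally.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

class PerSliceBottoms {

  private final float[] bottoms;                 // bottoms[i] = current bottom score of slice i
  private final AtomicInteger globalHitCount = new AtomicInteger();
  private final int numHits;

  PerSliceBottoms(int numSlices, int numHits) {
    this.bottoms = new float[numSlices];
    Arrays.fill(bottoms, Float.NEGATIVE_INFINITY);
    this.numHits = numHits;
  }

  /** Slot i has a single writer (slice i), so writes need no locking; note that plain
      array writes give no visibility guarantee, so a volatile/atomic array might be needed. */
  void onHitCollected(int slice, float localBottom) {
    globalHitCount.incrementAndGet();
    bottoms[slice] = localBottom;
  }

  /** O(numSlices) scan; only a meaningful bar once numHits have been collected globally. */
  float globalMinimum() {
    if (globalHitCount.get() < numHits) {
      return Float.NEGATIVE_INFINITY;
    }
    float min = Float.POSITIVE_INFINITY;
    for (float b : bottoms) {
      min = Math.min(min, b);
    }
    return min;
  }
}
```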

@jimczi (Contributor) commented Sep 11, 2019

I am not sure that synchronization would be an issue in this model, since the update of the global min score should be quick; we could also use double-checked locking or a volatile. But it's hard to say without benchmarking, as Mike said. It would be easier to discuss with a POC to compare with the current approach in this PR. I can work on one if you want to concentrate on the shared PQ path?

> Once numHits are collected globally, we do a pass through the array and pick the minimum.

If numHits means the global threshold, then that is just one of the conditions. The slice must also have a full PQ to publish a min score, and other slices can use it if they need to collect the same number of top docs.

@atris (Contributor, Author) commented Sep 11, 2019

> I am not sure that synchronization would be an issue in this model, since the update of the global min score should be quick; we could also use double-checked locking or a volatile. But it's hard to say without benchmarking, as Mike said.

Agreed.

> I can work on one if you want to concentrate on the shared PQ path?

Thanks for offering. I have a skeletal PR in flight for this approach that I plan to publish tomorrow -- maybe we can iterate on that?

> If numHits means the global threshold, then that is just one of the conditions. The slice must also have a full PQ to publish a min score, and other slices can use it if they need to collect the same number of top docs.

I am sure I am missing something here. If the user requested top N hits, then all slices can keep collecting hits in their thread-local PQs and update a global counter to reflect whether the total hits collected globally have reached N. Once we have reached N globally, each collector can publish the value of the bottom of its thread-local PQ. The minimum of all such values will be our global minimum score, since we know that, collectively, we have N hits available. After that, all collectors will use the global minimum score to filter hits. If a collector finds a competitive hit, it adds it to the local queue, updates its local minimum score, and triggers a resync, where the minimum of all minimum scores (if that makes sense) is taken and kept as the global worst hit.

Maintaining the per-collector worst score and calculating the minimum of them all can be done by storing those values in shared global state. We could store them in an array, but then each new competitive hit would incur an O(n) penalty for recalculating the worst score. Another option is storing them in a heap, but that would require a Fibonacci heap to be able to change the value of a key without replacing the node.

@jimczi (Contributor) commented Sep 11, 2019

> Thanks for offering. I have a skeletal PR in flight for this approach that I plan to publish tomorrow -- maybe we can iterate on that?

Sure, thanks.

> I am sure I am missing something here. If the user requested top N hits, then all slices can keep collecting hits in their thread-local PQs and update a global counter to reflect whether the total hits collected globally have reached N. Once we have reached N globally, each collector can publish the value of the bottom of its thread-local PQ. The minimum of all such values will be our global minimum score, since we know that, collectively, we have N hits available. After that, all collectors will use the global minimum score to filter hits. If a collector finds a competitive hit, it adds it to the local queue, updates its local minimum score, and triggers a resync, where the minimum of all minimum scores (if that makes sense) is taken and kept as the global worst hit.

I think it would be simpler to keep the maximum minimum score across slices. Each time a slice publishes a new minimum score, we can broadcast it to all the other top docs collectors via a listener, which would update their local minimum score if needed. Synchronization shouldn't be the bottleneck here, but happy to be proven wrong.
The global counter of total hits must be reached to publish any minimum score, but the publisher must also ensure that its local PQ is full before publishing, since it is possible to reach the total hits threshold while none of the local PQs are completely filled, which would break the contract.

@atris (Contributor, Author) commented Sep 11, 2019

> I think it would be simpler to keep the maximum minimum score across slices. Each time a slice publishes a new minimum score, we can broadcast it to all the other top docs collectors via a listener, which would update their local minimum score if needed.

Sure, that is a viable approach.

> The global counter of total hits must be reached to publish any minimum score

Agreed

> the publisher must also ensure that its local PQ is full before publishing

I am still missing the point as to why that is necessary.

> since it is possible to reach the total hits threshold while none of the local PQs are completely filled, which would break the contract.

Which contract would that be? I believe that today, the contract that updateMinCompetitiveScore is called only when the PQ is full is implied (since there is only one PQ), but I am still not able to understand why that is mandatory. Is there something that I am missing?

@jimczi (Contributor) commented Sep 11, 2019

> Which contract would that be? I believe that today, the contract that updateMinCompetitiveScore is called only when the PQ is full is implied (since there is only one PQ), but I am still not able to understand why that is mandatory. Is there something that I am missing?

We need to ensure that the minimum score of the slice can be set as the maximum minimum score for all slices; however, if the slice is not full, some documents with smaller scores can still enter the top N no matter what the top hits are in the other slices. So if the total hit threshold is 10, for instance, and you ask for 2 top documents per slice, you cannot set the minimum score if only 1 document has been inserted in each slice, otherwise you can miss documents.

@atris (Contributor, Author) commented Sep 11, 2019

> We need to ensure that the minimum score of the slice can be set as the maximum minimum score for all slices; however, if the slice is not full, some documents with smaller scores can still enter the top N no matter what the top hits are in the other slices. So if the total hit threshold is 10, for instance, and you ask for 2 top documents per slice, you cannot set the minimum score if only 1 document has been inserted in each slice, otherwise you can miss documents.

To clarify my understanding, consider the following case:

Top 10 hits have been requested, with totalHitsThreshold set to 25. The number of collectors is 3.

Assume that we have reached 10 hits globally, with a random distribution of hits among collectors: say collector 1 has 3 hits, collector 2 has 6, and collector 3 has 1. When a collector sees (for the first time) that numHits have been collected globally, it broadcasts its bottom PQ score to all collectors, and the collectors can update their bottom score if needed. Call the resulting global bottom value X.

Now, for each collector, a document will be added to the local PQ only if it has a score higher than X. If, theoretically, no document in any thread exceeds X, then we will cumulatively still have numHits. If there is a competitive hit, we can add it to the thread-local PQ and broadcast the new minimum score.

However, if a priority queue is full, then for a further hit to be inserted into that queue it needs to be better than that queue's local bottom score, i.e. just being better than the global bottom will not suffice.

Makes sense?

@jimczi (Contributor) commented Sep 11, 2019

I think we're talking about different approaches, hence the confusion. It is correct that we can start setting the minimum score when the global count of collected documents reaches the requested size, but if the local PQs are not full you can only use the minimum minimum score, i.e. the bottom of the minimum scores.
Requiring a queue to be filled completely before publishing a minimum score allows using the maximum minimum score among the slices that have a full PQ. We can mix the two approaches, switching from the minimum minimum to the maximum minimum once the PQs are filled, but I wonder if this is really needed since topN is a small value? Said differently, I wonder if checking the global minimum score before a single PQ is filled is a premature optimization?
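
To spell out the two aggregates being contrasted, here is a hypothetical helper (not part of the patch) that assumes the global hit-count threshold has already been reached and that all slices collect the same topN:

```java
class GlobalBottomChooser {

  /**
   * publishedBottoms[i] = current bottom score of slice i (NEGATIVE_INFINITY if empty),
   * pqFull[i]           = whether slice i's local pq already holds topN entries.
   */
  static float safeGlobalBottom(float[] publishedBottoms, boolean[] pqFull) {
    // "minimum minimum": safe once the global count of collected hits reaches topN.
    float minOfBottoms = Float.POSITIVE_INFINITY;
    // "maximum minimum": safe because a single full pq already proves topN better hits exist.
    float maxOfFullBottoms = Float.NEGATIVE_INFINITY;
    boolean anyFull = false;
    for (int i = 0; i < publishedBottoms.length; i++) {
      minOfBottoms = Math.min(minOfBottoms, publishedBottoms[i]);
      if (pqFull[i]) {
        anyFull = true;
        maxOfFullBottoms = Math.max(maxOfFullBottoms, publishedBottoms[i]);
      }
    }
    // The max over full pqs is always at least as tight as the min over all bottoms.
    return anyFull ? maxOfFullBottoms : minOfBottoms;
  }
}
```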

@atris (Contributor, Author) commented Sep 11, 2019

> I think we're talking about different approaches, hence the confusion. It is correct that we can start setting the minimum score when the global count of collected documents reaches the requested size, but if the local PQs are not full you can only use the minimum minimum score, i.e. the bottom of the minimum scores.
> Requiring a queue to be filled completely before publishing a minimum score allows using the maximum minimum score among the slices that have a full PQ. We can mix the two approaches, switching from the minimum minimum to the maximum minimum once the PQs are filled, but I wonder if this is really needed since topN is a small value? Said differently, I wonder if checking the global minimum score before a single PQ is filled is a premature optimization?

I see your point. So, what you are proposing is that we basically allow only the PQs that are full to publish the minimum score, and if there are multiple full PQs publishing, take the maximum of all bottom scores as the global minimum?

Sounds like a fair approach -- I can post a patch to start with.

My only concern (as you rightly captured) is the case where N in top N is large; in that scenario we might be suboptimal. Nevertheless, this approach will satisfy a large number of use cases.

RE: synchronization, what are your thoughts on using a global shared array where each collector publishes its bottom value, vs. message passing?

@jimczi (Contributor) commented Sep 11, 2019

> RE: synchronization, what are your thoughts on using a global shared array where each collector publishes its bottom value, vs. message passing?

Sure, that's a good way to keep track of the maximum minimum score without synchronization.
However, I think only benchmarks can reveal the cost of this, so my main concern is the API change that is required to share the minimum score globally. If we can come up with an approach that keeps the current top score docs collector simple and contained, but isn't optimized in terms of synchronization, I'd favor simplicity unless we have clear evidence in benchmarks that we can do better ;).

@mikemccand (Member) commented Sep 11, 2019

I agree the "maximum minimum" score (across all local PQs that are full) is compelling as it'd require minimal synchronization. Maybe it performs best overall for most use cases.

Its downside is that it's collecting N * M hits (N = requested topN, M = number of thread work units). When N is smallish (the common case), maybe the tradeoff is worthwhile?

The "minimum minimum" score (once N hits have been collected, summed across all thread work units) is closer to optimal, since it only collects N hits before skipping/pruning, but it requires more work to update the "minimum minimum" any time any thread collects a newly competitive hit. If it's a simple array, that's an O(M) scan each time; if it's a PQ, then O(log(M)).

@msokolov (Contributor) commented Sep 11, 2019

There is also the option of something in between. To avoid thread contention, could we just do uncoordinated reads/writes of a minimum minimum score? If a thread fails to write (its write is overwritten by some other thread with a worse minimum, which is still an improvement), all it costs you (or some other thread) is some extra work, perhaps cheaper than the coordination and priority queue or array scanning. You could also choose to write only occasionally, to further reduce contention.

@atris (Contributor, Author) commented Sep 12, 2019

> There is also the option of something in between. To avoid thread contention, could we just do uncoordinated reads/writes of a minimum minimum score? If a thread fails to write (its write is overwritten by some other thread with a worse minimum, which is still an improvement), all it costs you (or some other thread) is some extra work, perhaps cheaper than the coordination and priority queue or array scanning.

Like a CAS? That could be a good idea.

I wonder if we are overstressing the time complexity of calculating the global minimum/maximum. Ideally the number of slices should not be too high, so the worst-case complexity of the calculation (O(m)) should still be OK. Another advantage of this approach is that there is no need for a thread to redo work if the CAS fails.

> You could also choose to write only occasionally, to further reduce contention.

That could lead to potentially stale hits being collected?
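
A sketch of the CAS variant under discussion (hypothetical class, not the patch; assumes non-negative scores so the raw float bit patterns order the same way the scores do). Losing the CAS race only means another thread already raised the bar, and since the bar never goes down, correctness is preserved:

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasMinScore {

  private final AtomicInteger barBits = new AtomicInteger(Float.floatToIntBits(0f));

  /** Try to raise the shared minimum competitive score; never lowers it. */
  void maybeRaise(float candidate) {
    int proposed = Float.floatToIntBits(candidate);
    int current = barBits.get();
    while (proposed > current && !barBits.compareAndSet(current, proposed)) {
      current = barBits.get(); // someone else raised the bar; re-check and retry if still higher
    }
  }

  /** Read the shared bar; hits scoring below this cannot make the global top N. */
  float get() {
    return Float.intBitsToFloat(barBits.get());
  }
}
```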

@atris (Contributor, Author) commented Sep 12, 2019

> RE: synchronization, what are your thoughts on using a global shared array where each collector publishes its bottom value, vs. message passing?

> Sure, that's a good way to keep track of the maximum minimum score without synchronization.
> However, I think only benchmarks can reveal the cost of this, so my main concern is the API change that is required to share the minimum score globally. If we can come up with an approach that keeps the current top score docs collector simple and contained, but isn't optimized in terms of synchronization, I'd favor simplicity unless we have clear evidence in benchmarks that we can do better ;).

@jimczi, re: message passing, I am not sure how it would work efficiently. Would we want each collector to maintain its own message queue, so that a collector, in order to publish a bottom value, sends a message to every other collector's queue? Or would we have a global shared message queue?

I was thinking of a class similar to HitsThresholdChecker, allowing shared state, since, as you rightly said, the critical section should be small. WDYT?
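
As a sketch, such a shared-state helper could look roughly like the following (a hypothetical class loosely modeled on HitsThresholdChecker, not an existing Lucene class):

```java
import java.util.concurrent.atomic.AtomicLong;

class GlobalMinScoreChecker {

  private final long totalHitsThreshold;
  private final AtomicLong globalHitCount = new AtomicLong();
  private volatile float minCompetitiveScore = Float.NEGATIVE_INFINITY;

  GlobalMinScoreChecker(long totalHitsThreshold) {
    this.totalHitsThreshold = totalHitsThreshold;
  }

  /** Every collector bumps this on each collected hit. */
  void incrementHitCount() {
    globalHitCount.incrementAndGet();
  }

  /** A slice publishes its bottom score here; per the discussion above it should only
      do so once its own local pq is full. */
  synchronized void publishBottom(float localBottom) {
    if (globalHitCount.get() >= totalHitsThreshold && localBottom > minCompetitiveScore) {
      minCompetitiveScore = localBottom;
    }
  }

  /** Collectors compare candidate hits against this shared bar on the hot path. */
  float minCompetitiveScore() {
    return minCompetitiveScore;
  }
}
```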

@jimczi (Contributor) commented Sep 12, 2019

> I was thinking of a class similar to HitsThresholdChecker, allowing shared state, since, as you rightly said, the critical section should be small. WDYT?

This is a good idea; it could be used to publish the maximum minimum value, and collectors would use it to check against their local minimum too.
