-
Notifications
You must be signed in to change notification settings - Fork 24.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ensure query resources are fetched asynchronously during rewrite #25791
Changes from 12 commits
7856cec
ceea610
939222a
fe5435a
3fd130e
31aa7e9
52736a1
bb1d9c2
fd16b9b
36f233f
68c2908
4faa7d2
ebc9ec7
c080c70
aa0579c
d75def8
601d831
82ee01f
e008b7e
cd5ef94
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,34 +18,33 @@ | |
*/ | ||
package org.elasticsearch.index.query; | ||
|
||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.client.Client; | ||
import org.elasticsearch.common.util.concurrent.CountDown; | ||
import org.elasticsearch.common.xcontent.NamedXContentRegistry; | ||
import org.elasticsearch.common.xcontent.XContentParser; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.function.BiConsumer; | ||
import java.util.function.LongSupplier; | ||
|
||
/** | ||
* Context object used to rewrite {@link QueryBuilder} instances into simplified version. | ||
*/ | ||
public class QueryRewriteContext { | ||
|
||
private final NamedXContentRegistry xContentRegistry; | ||
protected final Client client; | ||
protected final LongSupplier nowInMillis; | ||
private final List<BiConsumer<Client, ActionListener<?>>> asyncActions = new ArrayList<>(); | ||
|
||
|
||
public QueryRewriteContext(NamedXContentRegistry xContentRegistry, Client client, LongSupplier nowInMillis) { | ||
this.xContentRegistry = xContentRegistry; | ||
this.client = client; | ||
this.nowInMillis = nowInMillis; | ||
} | ||
|
||
/** | ||
* Returns a clients to fetch resources from local or remove nodes. | ||
*/ | ||
public Client getClient() { | ||
return client; | ||
} | ||
|
||
/** | ||
* The registry used to build new {@link XContentParser}s. Contains registered named parsers needed to parse the query. | ||
*/ | ||
|
@@ -63,4 +62,41 @@ public long nowInMillis() { | |
public QueryShardContext convertToShardContext() { | ||
return null; | ||
} | ||
|
||
public void registerAsyncAction(BiConsumer<Client, ActionListener<?>> asyncAction) { | ||
asyncActions.add(asyncAction); | ||
} | ||
|
||
public boolean hasAsyncActions() { | ||
return asyncActions.isEmpty() == false; | ||
} | ||
|
||
public void executeAsyncActions(ActionListener listener) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
if (asyncActions.isEmpty()) { | ||
listener.onResponse(null); | ||
} else { | ||
CountDown done = new CountDown(asyncActions.size()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: maybe call it countDown instead of done? |
||
ActionListener internalListener = new ActionListener() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
@Override | ||
public void onResponse(Object o) { | ||
if (done.countDown()) { | ||
listener.onResponse(null); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
if (done.fastForward()) { | ||
listener.onFailure(e); | ||
} | ||
} | ||
}; | ||
ArrayList<BiConsumer<Client, ActionListener<?>>> biConsumers = new ArrayList<>(asyncActions); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: |
||
asyncActions.clear(); | ||
for (BiConsumer<Client, ActionListener<?>> action : biConsumers) { | ||
action.accept(client, internalListener); | ||
} | ||
} | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if
AtomicReference
is needed here given that we write from a single thread. It's just a visibility problem henceSetOnce
would be a good fit, which has also the advantage of checking that we do set it only once (thanks for the suggestion!)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did that already thanks for the idea