Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure query resources are fetched asynchronously during rewrite #25791

Merged
merged 20 commits into from
Jul 20, 2017
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.BiFunction;
import java.util.function.Supplier;

Expand Down Expand Up @@ -317,23 +318,57 @@ public void messageReceived(TransportRequest.Empty request, TransportChannel cha
TransportActionProxy.registerProxyAction(transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
() -> TransportResponse.Empty.INSTANCE);

transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
DfsSearchResult result = searchService.executeDfsPhase(request, (SearchTask)task);
channel.sendResponse(result);
searchService.executeDfsPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
@Override
public void onResponse(SearchPhaseResult searchPhaseResult) {
try {
channel.sendResponse(searchPhaseResult);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
throw new UncheckedIOException(e1);
}
}
});

}
});
TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, DfsSearchResult::new);

transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
SearchPhaseResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
searchService.executeQueryPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
@Override
public void onResponse(SearchPhaseResult searchPhaseResult) {
try {
channel.sendResponse(searchPhaseResult);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
throw new UncheckedIOException(e1);
}
}
});
}
});
TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME, QuerySearchResult::new);
Expand Down Expand Up @@ -389,8 +424,8 @@ public void messageReceived(ShardFetchSearchRequest request, TransportChannel ch
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new);

// this is super cheap and should not hit thread-pool rejections
transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
false, true, new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
boolean canMatch = searchService.canMatch(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public void validateAliasFilter(String alias, byte[] filter, QueryShardContext q

private static void validateAliasFilter(XContentParser parser, QueryShardContext queryShardContext) throws IOException {
QueryBuilder parseInnerQueryBuilder = parseInnerQueryBuilder(parser);
QueryBuilder queryBuilder = Rewriteable.rewrite(parseInnerQueryBuilder, queryShardContext);
QueryBuilder queryBuilder = Rewriteable.rewrite(parseInnerQueryBuilder, queryShardContext, true);
queryBuilder.toFilter(queryShardContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.lucene.spatial.prefix.RecursivePrefixTreeStrategy;
import org.apache.lucene.spatial.query.SpatialArgs;
import org.apache.lucene.spatial.query.SpatialOperation;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
Expand All @@ -47,7 +48,11 @@
import org.elasticsearch.index.mapper.MappedFieldType;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
* {@link QueryBuilder} that builds a GeoShape Query
Expand Down Expand Up @@ -77,6 +82,7 @@ public class GeoShapeQueryBuilder extends AbstractQueryBuilder<GeoShapeQueryBuil
private final String fieldName;

private final ShapeBuilder shape;
private final Supplier<ShapeBuilder> supplier;

private SpatialStrategy strategy;

Expand Down Expand Up @@ -133,6 +139,15 @@ private GeoShapeQueryBuilder(String fieldName, ShapeBuilder shape, String indexe
this.shape = shape;
this.indexedShapeId = indexedShapeId;
this.indexedShapeType = indexedShapeType;
this.supplier = null;
}

private GeoShapeQueryBuilder(String fieldName, Supplier<ShapeBuilder> supplier, String indexedShapeId, String indexedShapeType) {
this.fieldName = fieldName;
this.shape = null;
this.supplier = supplier;
this.indexedShapeId = indexedShapeId;
this.indexedShapeType = indexedShapeType;
}

/**
Expand All @@ -155,10 +170,14 @@ public GeoShapeQueryBuilder(StreamInput in) throws IOException {
relation = ShapeRelation.readFromStream(in);
strategy = in.readOptionalWriteable(SpatialStrategy::readFromStream);
ignoreUnmapped = in.readBoolean();
supplier = null;
}

@Override
protected void doWriteTo(StreamOutput out) throws IOException {
if (supplier != null) {
throw new IllegalStateException("supplier must be null, can't serialize suppliers, missing a rewriteAndFetch?");
}
out.writeString(fieldName);
boolean hasShape = shape != null;
out.writeBoolean(hasShape);
Expand Down Expand Up @@ -312,7 +331,7 @@ public boolean ignoreUnmapped() {

@Override
protected Query doToQuery(QueryShardContext context) {
if (shape == null) {
if (shape == null || supplier != null) {
throw new UnsupportedOperationException("query must be rewritten first");
}
final ShapeBuilder shapeToQuery = shape;
Expand Down Expand Up @@ -361,47 +380,59 @@ protected Query doToQuery(QueryShardContext context) {
* @param path
* Name or path of the field in the Shape Document where the
* Shape itself is located
* @return Shape with the given ID
* @throws IOException
* Can be thrown while parsing the Shape Document and extracting
* the Shape
*/
private ShapeBuilder fetch(Client client, GetRequest getRequest, String path) throws IOException {
private void fetch(Client client, GetRequest getRequest, String path, ActionListener<ShapeBuilder> listener) {
if (ShapesAvailability.JTS_AVAILABLE == false) {
throw new IllegalStateException("JTS not available");
}
getRequest.preference("_local");
getRequest.operationThreaded(false);
GetResponse response = client.get(getRequest).actionGet();
if (!response.isExists()) {
throw new IllegalArgumentException("Shape with ID [" + getRequest.id() + "] in type [" + getRequest.type() + "] not found");
}
if (response.isSourceEmpty()) {
throw new IllegalArgumentException("Shape with ID [" + getRequest.id() + "] in type [" + getRequest.type() +
"] source disabled");
}
client.get(getRequest, new ActionListener<GetResponse>(){

@Override
public void onResponse(GetResponse response) {
try {
if (!response.isExists()) {
throw new IllegalArgumentException("Shape with ID [" + getRequest.id() + "] in type [" + getRequest.type()
+ "] not found");
}
if (response.isSourceEmpty()) {
throw new IllegalArgumentException("Shape with ID [" + getRequest.id() + "] in type [" + getRequest.type() +
"] source disabled");
}

String[] pathElements = path.split("\\.");
int currentPathSlot = 0;

// It is safe to use EMPTY here because this never uses namedObject
try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY, response.getSourceAsBytesRef())) {
XContentParser.Token currentToken;
while ((currentToken = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (currentToken == XContentParser.Token.FIELD_NAME) {
if (pathElements[currentPathSlot].equals(parser.currentName())) {
parser.nextToken();
if (++currentPathSlot == pathElements.length) {
return ShapeBuilder.parse(parser);
String[] pathElements = path.split("\\.");
int currentPathSlot = 0;

// It is safe to use EMPTY here because this never uses namedObject
try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY, response.getSourceAsBytesRef())) {
XContentParser.Token currentToken;
while ((currentToken = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (currentToken == XContentParser.Token.FIELD_NAME) {
if (pathElements[currentPathSlot].equals(parser.currentName())) {
parser.nextToken();
if (++currentPathSlot == pathElements.length) {
listener.onResponse(ShapeBuilder.parse(parser));
}
} else {
parser.nextToken();
parser.skipChildren();
}
}
}
} else {
parser.nextToken();
parser.skipChildren();
throw new IllegalStateException("Shape with name [" + getRequest.id() + "] found but missing " + path + " field");
}
} catch (Exception e) {
onFailure(e);
}
}
throw new IllegalStateException("Shape with name [" + getRequest.id() + "] found but missing " + path + " field");
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});

}

public static SpatialArgs getArgs(ShapeBuilder shape, ShapeRelation relation) {
Expand Down Expand Up @@ -573,14 +604,15 @@ protected boolean doEquals(GeoShapeQueryBuilder other) {
&& Objects.equals(indexedShapeType, other.indexedShapeType)
&& Objects.equals(relation, other.relation)
&& Objects.equals(shape, other.shape)
&& Objects.equals(supplier, other.supplier)
&& Objects.equals(strategy, other.strategy)
&& Objects.equals(ignoreUnmapped, other.ignoreUnmapped);
}

@Override
protected int doHashCode() {
return Objects.hash(fieldName, indexedShapeId, indexedShapeIndex,
indexedShapePath, indexedShapeType, relation, shape, strategy, ignoreUnmapped);
indexedShapePath, indexedShapeType, relation, shape, strategy, ignoreUnmapped, supplier);
}

@Override
Expand All @@ -589,11 +621,21 @@ public String getWriteableName() {
}

@Override
protected QueryBuilder doRewrite(QueryRewriteContext queryShardContext) throws IOException {
if (this.shape == null) {
GetRequest getRequest = new GetRequest(indexedShapeIndex, indexedShapeType, indexedShapeId);
ShapeBuilder shape = fetch(queryShardContext.getClient(), getRequest, indexedShapePath);
return new GeoShapeQueryBuilder(this.fieldName, shape).relation(relation).strategy(strategy);
protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws IOException {
if (supplier != null) {
return supplier.get() == null ? this : new GeoShapeQueryBuilder(this.fieldName, supplier.get()).relation(relation).strategy
(strategy);
} else if (this.shape == null) {
AtomicReference<ShapeBuilder> supplier = new AtomicReference<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if AtomicReference is needed here given that we write from a single thread. It's just a visibility problem hence SetOnce would be a good fit, which has also the advantage of checking that we do set it only once (thanks for the suggestion!)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did that already thanks for the idea

queryRewriteContext.registerAsyncAction((client, listener) -> {
GetRequest getRequest = new GetRequest(indexedShapeIndex, indexedShapeType, indexedShapeId);
fetch(client, getRequest, indexedShapePath, ActionListener.wrap(builder-> {
supplier.set(builder);
listener.onResponse(null);
}, listener::onFailure));
});
return new GeoShapeQueryBuilder(this.fieldName, supplier::get, this.indexedShapeId, this.indexedShapeType).relation(relation)
.strategy(strategy);
}
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,33 @@
*/
package org.elasticsearch.index.query;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;

/**
* Context object used to rewrite {@link QueryBuilder} instances into simplified version.
*/
public class QueryRewriteContext {

private final NamedXContentRegistry xContentRegistry;
protected final Client client;
protected final LongSupplier nowInMillis;
private final List<BiConsumer<Client, ActionListener<?>>> asyncActions = new ArrayList<>();


public QueryRewriteContext(NamedXContentRegistry xContentRegistry, Client client, LongSupplier nowInMillis) {
this.xContentRegistry = xContentRegistry;
this.client = client;
this.nowInMillis = nowInMillis;
}

/**
* Returns a clients to fetch resources from local or remove nodes.
*/
public Client getClient() {
return client;
}

/**
* The registry used to build new {@link XContentParser}s. Contains registered named parsers needed to parse the query.
*/
Expand All @@ -63,4 +62,41 @@ public long nowInMillis() {
public QueryShardContext convertToShardContext() {
return null;
}

public void registerAsyncAction(BiConsumer<Client, ActionListener<?>> asyncAction) {
asyncActions.add(asyncAction);
}

public boolean hasAsyncActions() {
return asyncActions.isEmpty() == false;
}

public void executeAsyncActions(ActionListener listener) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ActionListener<?> ?

if (asyncActions.isEmpty()) {
listener.onResponse(null);
} else {
CountDown done = new CountDown(asyncActions.size());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe call it countDown instead of done?

ActionListener internalListener = new ActionListener() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ActionListener<?> here too?

@Override
public void onResponse(Object o) {
if (done.countDown()) {
listener.onResponse(null);
}
}

@Override
public void onFailure(Exception e) {
if (done.fastForward()) {
listener.onFailure(e);
}
}
};
ArrayList<BiConsumer<Client, ActionListener<?>>> biConsumers = new ArrayList<>(asyncActions);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: List on the left side instead of ArrayList

asyncActions.clear();
for (BiConsumer<Client, ActionListener<?>> action : biConsumers) {
action.accept(client, internalListener);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public ParsedQuery toQuery(QueryBuilder queryBuilder) {
private ParsedQuery toQuery(QueryBuilder queryBuilder, CheckedFunction<QueryBuilder, Query, IOException> filterOrQuery) {
reset();
try {
QueryBuilder rewriteQuery = Rewriteable.rewrite(queryBuilder, this);
QueryBuilder rewriteQuery = Rewriteable.rewrite(queryBuilder, this, true);
return new ParsedQuery(filterOrQuery.apply(rewriteQuery), copyNamedQueries());
} catch(QueryShardException | ParsingException e ) {
throw e;
Expand Down Expand Up @@ -377,10 +377,9 @@ public final long nowInMillis() {
return super.nowInMillis();
}

@Override
public Client getClient() {
failIfFrozen(); // we somebody uses a terms filter with lookup for instance can't be cached...
return super.getClient();
return client;
}

public QueryBuilder parseInnerQueryBuilder(XContentParser parser) throws IOException {
Expand Down