Skip to content

Commit

Permalink
Add number of shards statistic to PercolateContext instead of throwin…
Browse files Browse the repository at this point in the history
…g exception.

Certain features like significant_terms aggregation rely on this statistic for sizing heuristics.

Closes #6037
Closes #6123
  • Loading branch information
martijnvg committed May 22, 2014
1 parent 16f94a2 commit f83fb72
Show file tree
Hide file tree
Showing 15 changed files with 44 additions and 16 deletions.
Expand Up @@ -107,7 +107,7 @@ protected ShardClearIndicesCacheRequest newShardRequest() {
}

@Override
protected ShardClearIndicesCacheRequest newShardRequest(ShardRouting shard, ClearIndicesCacheRequest request) {
protected ShardClearIndicesCacheRequest newShardRequest(int numShards, ShardRouting shard, ClearIndicesCacheRequest request) {
return new ShardClearIndicesCacheRequest(shard.index(), shard.id(), request);
}

Expand Down
Expand Up @@ -99,7 +99,7 @@ protected ShardFlushRequest newShardRequest() {
}

@Override
protected ShardFlushRequest newShardRequest(ShardRouting shard, FlushRequest request) {
protected ShardFlushRequest newShardRequest(int numShards, ShardRouting shard, FlushRequest request) {
return new ShardFlushRequest(shard.index(), shard.id(), request);
}

Expand Down
Expand Up @@ -100,7 +100,7 @@ protected ShardOptimizeRequest newShardRequest() {
}

@Override
protected ShardOptimizeRequest newShardRequest(ShardRouting shard, OptimizeRequest request) {
protected ShardOptimizeRequest newShardRequest(int numShards, ShardRouting shard, OptimizeRequest request) {
return new ShardOptimizeRequest(shard.index(), shard.id(), request);
}

Expand Down
Expand Up @@ -136,7 +136,7 @@ protected ShardRecoveryRequest newShardRequest() {
}

@Override
protected ShardRecoveryRequest newShardRequest(ShardRouting shard, RecoveryRequest request) {
protected ShardRecoveryRequest newShardRequest(int numShards, ShardRouting shard, RecoveryRequest request) {
return new ShardRecoveryRequest(shard.index(), shard.id(), request);
}

Expand Down
Expand Up @@ -100,7 +100,7 @@ protected ShardRefreshRequest newShardRequest() {
}

@Override
protected ShardRefreshRequest newShardRequest(ShardRouting shard, RefreshRequest request) {
protected ShardRefreshRequest newShardRequest(int numShards, ShardRouting shard, RefreshRequest request) {
return new ShardRefreshRequest(shard.index(), shard.id(), request);
}

Expand Down
Expand Up @@ -124,7 +124,7 @@ protected IndexShardSegmentRequest newShardRequest() {
}

@Override
protected IndexShardSegmentRequest newShardRequest(ShardRouting shard, IndicesSegmentsRequest request) {
protected IndexShardSegmentRequest newShardRequest(int numShards, ShardRouting shard, IndicesSegmentsRequest request) {
return new IndexShardSegmentRequest(shard.index(), shard.id(), request);
}

Expand Down
Expand Up @@ -125,7 +125,7 @@ protected IndexShardStatsRequest newShardRequest() {
}

@Override
protected IndexShardStatsRequest newShardRequest(ShardRouting shard, IndicesStatsRequest request) {
protected IndexShardStatsRequest newShardRequest(int numShards, ShardRouting shard, IndicesStatsRequest request) {
return new IndexShardStatsRequest(shard.index(), shard.id(), request);
}

Expand Down
Expand Up @@ -108,7 +108,7 @@ protected ShardValidateQueryRequest newShardRequest() {
}

@Override
protected ShardValidateQueryRequest newShardRequest(ShardRouting shard, ValidateQueryRequest request) {
protected ShardValidateQueryRequest newShardRequest(int numShards, ShardRouting shard, ValidateQueryRequest request) {
String[] filteringAliases = clusterService.state().metaData().filteringAliases(shard.index(), request.indices());
return new ShardValidateQueryRequest(shard.index(), shard.id(), filteringAliases, request);
}
Expand Down
Expand Up @@ -112,7 +112,7 @@ protected ShardCountRequest newShardRequest() {
}

@Override
protected ShardCountRequest newShardRequest(ShardRouting shard, CountRequest request) {
protected ShardCountRequest newShardRequest(int numShards, ShardRouting shard, CountRequest request) {
String[] filteringAliases = clusterService.state().metaData().filteringAliases(shard.index(), request.indices());
return new ShardCountRequest(shard.index(), shard.id(), filteringAliases, request);
}
Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.percolate;

import org.elasticsearch.Version;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -35,6 +36,7 @@ public class PercolateShardRequest extends BroadcastShardOperationRequest {
private BytesReference source;
private BytesReference docSource;
private boolean onlyCount;
private int numberOfShards;

public PercolateShardRequest() {
}
Expand All @@ -43,12 +45,13 @@ public PercolateShardRequest(String index, int shardId) {
super(index, shardId);
}

public PercolateShardRequest(String index, int shardId, PercolateRequest request) {
public PercolateShardRequest(String index, int shardId, int numberOfShards, PercolateRequest request) {
super(index, shardId, request);
this.documentType = request.documentType();
this.source = request.source();
this.docSource = request.docSource();
this.onlyCount = request.onlyCount();
this.numberOfShards = numberOfShards;
}

public PercolateShardRequest(ShardId shardId, PercolateRequest request) {
Expand Down Expand Up @@ -91,13 +94,20 @@ void onlyCount(boolean onlyCount) {
this.onlyCount = onlyCount;
}

public int getNumberOfShards() {
return numberOfShards;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
documentType = in.readString();
source = in.readBytesReference();
docSource = in.readBytesReference();
onlyCount = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
numberOfShards = in.readVInt();
}
}

@Override
Expand All @@ -107,6 +117,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBytesReference(source);
out.writeBytesReference(docSource);
out.writeBoolean(onlyCount);
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
out.writeVInt(numberOfShards);
}
}

}
Expand Up @@ -173,8 +173,8 @@ protected PercolateShardRequest newShardRequest() {
}

@Override
protected PercolateShardRequest newShardRequest(ShardRouting shard, PercolateRequest request) {
return new PercolateShardRequest(shard.index(), shard.id(), request);
protected PercolateShardRequest newShardRequest(int numShards, ShardRouting shard, PercolateRequest request) {
return new PercolateShardRequest(shard.index(), shard.id(), numShards, request);
}

@Override
Expand Down
Expand Up @@ -93,7 +93,7 @@ protected ShardSuggestRequest newShardRequest() {
}

@Override
protected ShardSuggestRequest newShardRequest(ShardRouting shard, SuggestRequest request) {
protected ShardSuggestRequest newShardRequest(int numShards, ShardRouting shard, SuggestRequest request) {
return new ShardSuggestRequest(shard.index(), shard.id(), request);
}

Expand Down
Expand Up @@ -83,7 +83,7 @@ protected void doExecute(Request request, ActionListener<Response> listener) {

protected abstract ShardRequest newShardRequest();

protected abstract ShardRequest newShardRequest(ShardRouting shard, Request request);
protected abstract ShardRequest newShardRequest(int numShards, ShardRouting shard, Request request);

protected abstract ShardResponse newShardResponse();

Expand Down Expand Up @@ -161,7 +161,7 @@ void performOperation(final ShardIterator shardIt, final ShardRouting shard, fin
onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
} else {
try {
final ShardRequest shardRequest = newShardRequest(shard, request);
final ShardRequest shardRequest = newShardRequest(shardIt.size(), shard, request);
if (shard.currentNodeId().equals(nodes.localNodeId())) {
threadPool.executor(executor).execute(new Runnable() {
@Override
Expand Down
Expand Up @@ -93,6 +93,7 @@ public class PercolateContext extends SearchContext {
private final BigArrays bigArrays;
private final ScriptService scriptService;
private final ConcurrentMap<HashedBytesRef, Query> percolateQueries;
private final int numberOfShards;
private String[] types;

private Engine.Searcher docSearcher;
Expand Down Expand Up @@ -127,6 +128,7 @@ public PercolateContext(PercolateShardRequest request, SearchShardTarget searchS
this.engineSearcher = indexShard.acquireSearcher("percolate");
this.searcher = new ContextIndexSearcher(this, engineSearcher);
this.scriptService = scriptService;
this.numberOfShards = request.getNumberOfShards();
}

public IndexSearcher docSearcher() {
Expand Down Expand Up @@ -327,7 +329,7 @@ public SearchContext searchType(SearchType searchType) {

@Override
public int numberOfShards() {
throw new UnsupportedOperationException();
return numberOfShards;
}

@Override
Expand Down
Expand Up @@ -37,6 +37,7 @@
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertMatchCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;

Expand Down Expand Up @@ -122,4 +123,16 @@ public void testFacetsAndAggregations() throws Exception {
}
}

@Test
public void testSignificantAggs() throws Exception {
client().admin().indices().prepareCreate("test").execute().actionGet();
ensureGreen();
PercolateRequestBuilder percolateRequestBuilder = client().preparePercolate()
.setIndices("test").setDocumentType("type")
.setPercolateDoc(docBuilder().setDoc(jsonBuilder().startObject().field("field1", "value").endObject()))
.addAggregation(AggregationBuilders.significantTerms("a").field("field2"));
PercolateResponse response = percolateRequestBuilder.get();
assertNoFailures(response);
}

}

0 comments on commit f83fb72

Please sign in to comment.