Skip to content

Commit

Permalink
Internal: split internal free context request used after scroll and s…
Browse files Browse the repository at this point in the history
…earch

We currently use the same internal request when we need to free the search context after a search and a scroll. The two original requests though diverged after #6933 as `SearchRequest` implements `IndicesRequest` while `SearchScrollRequest` and `ClearScrollRequest` don't. That said, with #7319 we made `SearchFreeContextRequest` implement `IndicesRequest` by making it hold the original indices taken from the original request, which are null if the free context was originated by a scroll or by a clear scroll call, and that is why original indices are optional there.

This commit introduces a separate free context request and transport action for scroll, which doesn't hold original indices. The new action is only used against nodes that expose it, the previous action name will be used for nodes older than 1.4.0.Beta1.

As a result, in 1.4 we have a new `indices:data/read/search[free_context/scroll]` action that is equivalent to the previous `indices:data/read/search[free_context]` whose request implements now `IndicesRequest` and holds the original indices coming from the original request. The original indices in the latter requests can only be null during a rolling upgrade (already existing version checks make sure that serialization is bw compatible), when some nodes are still < 1.4.

Closes #7856
  • Loading branch information
javanna committed Sep 24, 2014
1 parent afe42cd commit 02adf3d
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 21 deletions.
Expand Up @@ -57,6 +57,7 @@
*/
public class SearchServiceTransportAction extends AbstractComponent {

public static final String FREE_CONTEXT_SCROLL_ACTION_NAME = "indices:data/read/search[free_context/scroll]";
public static final String FREE_CONTEXT_ACTION_NAME = "indices:data/read/search[free_context]";
public static final String CLEAR_SCROLL_CONTEXTS_ACTION_NAME = "indices:data/read/search[clear_scroll_contexts]";
public static final String DFS_ACTION_NAME = "indices:data/read/search[phase/dfs]";
Expand Down Expand Up @@ -121,6 +122,7 @@ public SearchServiceTransportAction(Settings settings, ThreadPool threadPool, Tr
this.clusterService = clusterService;
this.searchService = searchService;

transportService.registerHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextTransportHandler());
transportService.registerHandler(FREE_CONTEXT_ACTION_NAME, new SearchFreeContextTransportHandler());
transportService.registerHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, new ClearScrollContextsTransportHandler());
transportService.registerHandler(DFS_ACTION_NAME, new SearchDfsTransportHandler());
Expand Down Expand Up @@ -148,7 +150,14 @@ public void sendFreeContext(DiscoveryNode node, long contextId, ClearScrollReque
final boolean freed = searchService.freeContext(contextId);
actionListener.onResponse(freed);
} else {
transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId), new FreeContextResponseHandler(actionListener));
if (node.getVersion().onOrAfter(Version.V_1_4_0_Beta1)) {
//use the separate action for scroll when possible
transportService.sendRequest(node, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(request, contextId), new FreeContextResponseHandler(actionListener));
} else {
//fallback to the previous action name if the new one is not supported by the node we are talking to.
//Do use the same request since it has the same binary format as the previous SearchFreeContextRequest (without the OriginalIndices addition).
transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new ScrollFreeContextRequest(request, contextId), new FreeContextResponseHandler(actionListener));
}
}
}

Expand Down Expand Up @@ -550,52 +559,75 @@ public void run() {
}
}

static class SearchFreeContextRequest extends TransportRequest implements IndicesRequest {

static class ScrollFreeContextRequest extends TransportRequest {
private long id;
private OriginalIndices originalIndices;

SearchFreeContextRequest() {
ScrollFreeContextRequest() {
}

SearchFreeContextRequest(SearchRequest request, long id) {
super(request);
this.id = id;
this.originalIndices = new OriginalIndices(request);
ScrollFreeContextRequest(ClearScrollRequest request, long id) {
this((TransportRequest) request, id);
}

SearchFreeContextRequest(TransportRequest request, long id) {
private ScrollFreeContextRequest(TransportRequest request, long id) {
super(request);
this.id = id;
this.originalIndices = OriginalIndices.EMPTY;
}

public long id() {
return this.id;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
}
}

static class SearchFreeContextRequest extends ScrollFreeContextRequest implements IndicesRequest {
private OriginalIndices originalIndices;

SearchFreeContextRequest() {
}

SearchFreeContextRequest(SearchRequest request, long id) {
super(request, id);
this.originalIndices = new OriginalIndices(request);
}

@Override
public String[] indices() {
if (originalIndices == null) {
return null;
}
return originalIndices.indices();
}

@Override
public IndicesOptions indicesOptions() {
if (originalIndices == null) {
return null;
}
return originalIndices.indicesOptions();
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readLong();
originalIndices = OriginalIndices.readOptionalOriginalIndices(in);
originalIndices = OriginalIndices.readOriginalIndices(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
OriginalIndices.writeOptionalOriginalIndices(originalIndices, out);
OriginalIndices.writeOriginalIndices(originalIndices, out);
}
}

Expand Down Expand Up @@ -633,15 +665,12 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

class SearchFreeContextTransportHandler extends BaseTransportRequestHandler<SearchFreeContextRequest> {

private abstract class BaseFreeContextTransportHandler<FreeContextRequest extends ScrollFreeContextRequest> extends BaseTransportRequestHandler<FreeContextRequest> {
@Override
public SearchFreeContextRequest newInstance() {
return new SearchFreeContextRequest();
}
public abstract FreeContextRequest newInstance();

@Override
public void messageReceived(SearchFreeContextRequest request, TransportChannel channel) throws Exception {
public void messageReceived(FreeContextRequest request, TransportChannel channel) throws Exception {
boolean freed = searchService.freeContext(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
}
Expand All @@ -654,6 +683,20 @@ public String executor() {
}
}

class ScrollFreeContextTransportHandler extends BaseFreeContextTransportHandler<ScrollFreeContextRequest> {
@Override
public ScrollFreeContextRequest newInstance() {
return new ScrollFreeContextRequest();
}
}

class SearchFreeContextTransportHandler extends BaseFreeContextTransportHandler<SearchFreeContextRequest> {
@Override
public SearchFreeContextRequest newInstance() {
return new SearchFreeContextRequest();
}
}

static class ClearScrollContextsRequest extends TransportRequest {

ClearScrollContextsRequest() {
Expand Down
Expand Up @@ -36,7 +36,9 @@
import org.elasticsearch.action.get.*;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.termvector.TermVectorResponse;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse;
Expand Down Expand Up @@ -704,6 +706,49 @@ public void testMultiGet() throws ExecutionException, InterruptedException {

}

@Test
public void testScroll() throws ExecutionException, InterruptedException {
createIndex("test");
ensureYellow("test");

int numDocs = iterations(10, 100);
IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < numDocs; i++) {
indexRequestBuilders[i] = client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + Integer.toString(i));
}
indexRandom(true, indexRequestBuilders);

int size = randomIntBetween(1, 10);
SearchRequestBuilder searchRequestBuilder = client().prepareSearch("test").setScroll("1m").setSize(size);
boolean scan = randomBoolean();
if (scan) {
searchRequestBuilder.setSearchType(SearchType.SCAN);
}

SearchResponse searchResponse = searchRequestBuilder.get();
assertThat(searchResponse.getScrollId(), notNullValue());
assertHitCount(searchResponse, numDocs);
int hits = 0;
if (scan) {
assertThat(searchResponse.getHits().getHits().length, equalTo(0));
} else {
assertThat(searchResponse.getHits().getHits().length, greaterThan(0));
hits += searchResponse.getHits().getHits().length;
}

try {
do {
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll("1m").get();
assertThat(searchResponse.getScrollId(), notNullValue());
assertHitCount(searchResponse, numDocs);
hits += searchResponse.getHits().getHits().length;
} while (searchResponse.getHits().getHits().length > 0);
assertThat(hits, equalTo(numDocs));
} finally {
clearScroll(searchResponse.getScrollId());
}
}

private static String indexOrAlias() {
return randomBoolean() ? "test" : "alias";
}
Expand Down
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.test.ElasticsearchBackwardsCompatIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -138,5 +139,6 @@ private static boolean isActionNotFoundExpected(Version version, String action)

actionsVersions.put(UnicastZenPing.ACTION_NAME_GTE_1_4, Version.V_1_4_0_Beta1);

actionsVersions.put(SearchServiceTransportAction.FREE_CONTEXT_SCROLL_ACTION_NAME, Version.V_1_4_0_Beta1);
}
}
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.action.bench.BenchmarkStatusAction;
import org.elasticsearch.action.exists.ExistsAction;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;

Expand Down Expand Up @@ -141,5 +142,6 @@ public void testIncomingAction() {
post_1_4_actions.add(ExistsAction.NAME + "[s]");
post_1_4_actions.add(GetIndexAction.NAME);
post_1_4_actions.add(UnicastZenPing.ACTION_NAME_GTE_1_4);
post_1_4_actions.add(SearchServiceTransportAction.FREE_CONTEXT_SCROLL_ACTION_NAME);
}
}

0 comments on commit 02adf3d

Please sign in to comment.