From f3c23af1931a5d334e5ad9b0a32d680d7763fdc3 Mon Sep 17 00:00:00 2001 From: javanna Date: Wed, 24 Sep 2014 11:32:27 +0200 Subject: [PATCH] Internal: split internal free context request used after scroll and search We currently use the same internal request when we need to free the search context after a search and a scroll. The two original requests though diverged after #6933 as `SearchRequest` implements `IndicesRequest` while `SearchScrollRequest` and `ClearScrollRequest` don't. That said, with #7319 we made `SearchFreeContextRequest` implement `IndicesRequest` by making it hold the original indices taken from the original request, which are null if the free context was originated by a scroll or by a clear scroll call, and that is why original indices are optional there. This commit introduces a separate free context request and transport action for scroll, which doesn't hold original indices. The new action is only used against nodes that expose it, the previous action name will be used for nodes older than 1.4.0.Beta1. As a result, in 1.4 we have a new `indices:data/read/search[free_context/scroll]` action that is equivalent to the previous `indices:data/read/search[free_context]` whose request implements now `IndicesRequest` and holds the original indices coming from the original request. The original indices in the latter requests can only be null during a rolling upgrade (already existing version checks make sure that serialization is bw compatible), when some nodes are still < 1.4. Closes #7856 --- .../action/SearchServiceTransportAction.java | 85 ++++++++++++++----- .../BasicBackwardsCompatibilityTest.java | 45 ++++++++++ ...ActionNamesBackwardsCompatibilityTest.java | 2 + .../transport/ActionNamesTests.java | 2 + 4 files changed, 113 insertions(+), 21 deletions(-) diff --git a/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java b/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java index 9fa8623bf81be..457a1520da420 100644 --- a/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java +++ b/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java @@ -57,6 +57,7 @@ */ public class SearchServiceTransportAction extends AbstractComponent { + public static final String FREE_CONTEXT_SCROLL_ACTION_NAME = "indices:data/read/search[free_context/scroll]"; public static final String FREE_CONTEXT_ACTION_NAME = "indices:data/read/search[free_context]"; public static final String CLEAR_SCROLL_CONTEXTS_ACTION_NAME = "indices:data/read/search[clear_scroll_contexts]"; public static final String DFS_ACTION_NAME = "indices:data/read/search[phase/dfs]"; @@ -121,6 +122,7 @@ public SearchServiceTransportAction(Settings settings, ThreadPool threadPool, Tr this.clusterService = clusterService; this.searchService = searchService; + transportService.registerHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextTransportHandler()); transportService.registerHandler(FREE_CONTEXT_ACTION_NAME, new SearchFreeContextTransportHandler()); transportService.registerHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, new ClearScrollContextsTransportHandler()); transportService.registerHandler(DFS_ACTION_NAME, new SearchDfsTransportHandler()); @@ -148,7 +150,14 @@ public void sendFreeContext(DiscoveryNode node, long contextId, ClearScrollReque final boolean freed = searchService.freeContext(contextId); actionListener.onResponse(freed); } else { - transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId), new FreeContextResponseHandler(actionListener)); + if (node.getVersion().onOrAfter(Version.V_1_4_0_Beta1)) { + //use the separate action for scroll when possible + transportService.sendRequest(node, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(request, contextId), new FreeContextResponseHandler(actionListener)); + } else { + //fallback to the previous action name if the new one is not supported by the node we are talking to. + //Do use the same request since it has the same binary format as the previous SearchFreeContextRequest (without the OriginalIndices addition). + transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new ScrollFreeContextRequest(request, contextId), new FreeContextResponseHandler(actionListener)); + } } } @@ -550,52 +559,75 @@ public void run() { } } - static class SearchFreeContextRequest extends TransportRequest implements IndicesRequest { - + static class ScrollFreeContextRequest extends TransportRequest { private long id; - private OriginalIndices originalIndices; - SearchFreeContextRequest() { + ScrollFreeContextRequest() { } - SearchFreeContextRequest(SearchRequest request, long id) { - super(request); - this.id = id; - this.originalIndices = new OriginalIndices(request); + ScrollFreeContextRequest(ClearScrollRequest request, long id) { + this((TransportRequest) request, id); } - SearchFreeContextRequest(TransportRequest request, long id) { + private ScrollFreeContextRequest(TransportRequest request, long id) { super(request); this.id = id; - this.originalIndices = OriginalIndices.EMPTY; } public long id() { return this.id; } + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + id = in.readLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeLong(id); + } + } + + static class SearchFreeContextRequest extends ScrollFreeContextRequest implements IndicesRequest { + private OriginalIndices originalIndices; + + SearchFreeContextRequest() { + } + + SearchFreeContextRequest(SearchRequest request, long id) { + super(request, id); + this.originalIndices = new OriginalIndices(request); + } + @Override public String[] indices() { + if (originalIndices == null) { + return null; + } return originalIndices.indices(); } @Override public IndicesOptions indicesOptions() { + if (originalIndices == null) { + return null; + } return originalIndices.indicesOptions(); } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - id = in.readLong(); - originalIndices = OriginalIndices.readOptionalOriginalIndices(in); + originalIndices = OriginalIndices.readOriginalIndices(in); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeLong(id); - OriginalIndices.writeOptionalOriginalIndices(originalIndices, out); + OriginalIndices.writeOriginalIndices(originalIndices, out); } } @@ -633,15 +665,12 @@ public void writeTo(StreamOutput out) throws IOException { } } - class SearchFreeContextTransportHandler extends BaseTransportRequestHandler { - + private abstract class BaseFreeContextTransportHandler extends BaseTransportRequestHandler { @Override - public SearchFreeContextRequest newInstance() { - return new SearchFreeContextRequest(); - } + public abstract FreeContextRequest newInstance(); @Override - public void messageReceived(SearchFreeContextRequest request, TransportChannel channel) throws Exception { + public void messageReceived(FreeContextRequest request, TransportChannel channel) throws Exception { boolean freed = searchService.freeContext(request.id()); channel.sendResponse(new SearchFreeContextResponse(freed)); } @@ -654,6 +683,20 @@ public String executor() { } } + class ScrollFreeContextTransportHandler extends BaseFreeContextTransportHandler { + @Override + public ScrollFreeContextRequest newInstance() { + return new ScrollFreeContextRequest(); + } + } + + class SearchFreeContextTransportHandler extends BaseFreeContextTransportHandler { + @Override + public SearchFreeContextRequest newInstance() { + return new SearchFreeContextRequest(); + } + } + static class ClearScrollContextsRequest extends TransportRequest { ClearScrollContextsRequest() { diff --git a/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java b/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java index e824c74da8c9a..13e11fce7c2c0 100644 --- a/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java +++ b/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java @@ -36,7 +36,9 @@ import org.elasticsearch.action.get.*; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.termvector.TermVectorResponse; import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.action.update.UpdateResponse; @@ -704,6 +706,49 @@ public void testMultiGet() throws ExecutionException, InterruptedException { } + @Test + public void testScroll() throws ExecutionException, InterruptedException { + createIndex("test"); + ensureYellow("test"); + + int numDocs = iterations(10, 100); + IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs]; + for (int i = 0; i < numDocs; i++) { + indexRequestBuilders[i] = client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + Integer.toString(i)); + } + indexRandom(true, indexRequestBuilders); + + int size = randomIntBetween(1, 10); + SearchRequestBuilder searchRequestBuilder = client().prepareSearch("test").setScroll("1m").setSize(size); + boolean scan = randomBoolean(); + if (scan) { + searchRequestBuilder.setSearchType(SearchType.SCAN); + } + + SearchResponse searchResponse = searchRequestBuilder.get(); + assertThat(searchResponse.getScrollId(), notNullValue()); + assertHitCount(searchResponse, numDocs); + int hits = 0; + if (scan) { + assertThat(searchResponse.getHits().getHits().length, equalTo(0)); + } else { + assertThat(searchResponse.getHits().getHits().length, greaterThan(0)); + hits += searchResponse.getHits().getHits().length; + } + + try { + do { + searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll("1m").get(); + assertThat(searchResponse.getScrollId(), notNullValue()); + assertHitCount(searchResponse, numDocs); + hits += searchResponse.getHits().getHits().length; + } while (searchResponse.getHits().getHits().length > 0); + assertThat(hits, equalTo(numDocs)); + } finally { + clearScroll(searchResponse.getScrollId()); + } + } + private static String indexOrAlias() { return randomBoolean() ? "test" : "alias"; } diff --git a/src/test/java/org/elasticsearch/transport/ActionNamesBackwardsCompatibilityTest.java b/src/test/java/org/elasticsearch/transport/ActionNamesBackwardsCompatibilityTest.java index 8dae7a473d7ee..9f2b101b85a3c 100644 --- a/src/test/java/org/elasticsearch/transport/ActionNamesBackwardsCompatibilityTest.java +++ b/src/test/java/org/elasticsearch/transport/ActionNamesBackwardsCompatibilityTest.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing; import org.elasticsearch.indices.store.IndicesStore; +import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.test.ElasticsearchBackwardsCompatIntegrationTest; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.threadpool.ThreadPool; @@ -128,5 +129,6 @@ private static boolean isActionNotFoundExpected(Version version, String action) actionsVersions.put(UnicastZenPing.ACTION_NAME_GTE_1_4, Version.V_1_4_0_Beta1); + actionsVersions.put(SearchServiceTransportAction.FREE_CONTEXT_SCROLL_ACTION_NAME, Version.V_1_4_0_Beta1); } } diff --git a/src/test/java/org/elasticsearch/transport/ActionNamesTests.java b/src/test/java/org/elasticsearch/transport/ActionNamesTests.java index 69cfddea39812..69bd945b5d9e8 100644 --- a/src/test/java/org/elasticsearch/transport/ActionNamesTests.java +++ b/src/test/java/org/elasticsearch/transport/ActionNamesTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.indices.get.GetIndexAction; import org.elasticsearch.action.exists.ExistsAction; import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing; +import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.junit.Test; @@ -131,5 +132,6 @@ public void testIncomingAction() { post_1_4_actions.add(ExistsAction.NAME + "[s]"); post_1_4_actions.add(GetIndexAction.NAME); post_1_4_actions.add(UnicastZenPing.ACTION_NAME_GTE_1_4); + post_1_4_actions.add(SearchServiceTransportAction.FREE_CONTEXT_SCROLL_ACTION_NAME); } }