Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public FullClusterRestartSystemIndexCompatibilityIT(Version version) {
/**
* 1. creates an index on N-2 and performs async_search on it that is kept in system index
* 2. After update to N-1 (latest) perform a system index migration step, also write block the index
* 3. on N, check that async search results are still retrievable and we can write to the system index
* 3. on N, check that N-1 search results are still retrievable and we can write to the system index
*/
public void testAsyncSearchIndexMigration() throws Exception {
final String index = suffix("index");
Expand Down Expand Up @@ -112,7 +112,7 @@ public void testAsyncSearchIndexMigration() throws Exception {

if (isFullyUpgradedTo(VERSION_CURRENT)) {
assertThat(indexVersion(index, true), equalTo(VERSION_MINUS_2));
assertAsyncSearchHitCount(async_search_ids.get("n-2_id"), numDocs);
// n-2 results should no longer be readable
assertAsyncSearchHitCount(async_search_ids.get("n-1_id"), numDocs);

// check system index is still writeable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,11 @@ public int read() {
});
TransportVersion version = TransportVersion.readVersion(new InputStreamStreamInput(encodedIn));
assert version.onOrBefore(TransportVersion.current()) : version + " >= " + TransportVersion.current();
if (TransportVersion.isCompatible(version) == false) {
throw new IllegalArgumentException(
"Unable to retrieve async search results. Stored results were created with an incompatible version of Elasticsearch."
);
}
final StreamInput input;
input = CompressorFactory.COMPRESSOR.threadLocalStreamInput(encodedIn);
try (StreamInput in = new NamedWriteableAwareStreamInput(input, registry)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,23 @@
package org.elasticsearch.xpack.core.async;

import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.TransportVersionUtils;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.async.AsyncSearchIndexServiceTests.TestAsyncResponse;
Expand All @@ -37,6 +41,8 @@
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

public class AsyncResultsServiceTests extends ESSingleNodeTestCase {
private ClusterService clusterService;
Expand Down Expand Up @@ -132,7 +138,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId,

@Before
public void setup() {
clusterService = getInstanceFromNode(ClusterService.class);
clusterService = spy(getInstanceFromNode(ClusterService.class));
TransportService transportService = getInstanceFromNode(TransportService.class);
BigArrays bigArrays = getInstanceFromNode(BigArrays.class);
taskManager = transportService.getTaskManager();
Expand Down Expand Up @@ -326,4 +332,52 @@ public void testRetrieveFromDisk() throws Exception {
deleteService.deleteResponse(new DeleteAsyncResultRequest(task.getExecutionId().getEncoded()), deleteListener);
assertFutureThrows(deleteListener, ResourceNotFoundException.class);
}

public void testFailWithIncompatibleResults() throws Exception {
// force the search results to be serialized with an incompatible transport version
when(clusterService.state()).thenAnswer(invocation -> {
ClusterState state = (ClusterState) invocation.callRealMethod();
return ClusterState.builder(state)
.putCompatibilityVersions(
node().getNodeEnvironment().nodeId(),
TransportVersionUtils.getPreviousVersion(TransportVersion.minimumCompatible()),
Map.of()
)
.build();
});

AsyncResultsService<TestTask, TestAsyncResponse> service = createResultsService(true);
TestRequest request = new TestRequest("test request");
TestTask task = (TestTask) taskManager.register("test", "test", request);
try {
long startTime = System.currentTimeMillis();
task.setExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis());

// we need to store initial result
PlainActionFuture<DocWriteResponse> futureCreate = new PlainActionFuture<>();
indexService.createResponse(
task.getExecutionId().getDocId(),
task.getOriginHeaders(),
new TestAsyncResponse(null, task.getExpirationTime()),
futureCreate
);
futureCreate.actionGet(TimeValue.timeValueSeconds(10));

PlainActionFuture<UpdateResponse> futureUpdate = new PlainActionFuture<>();
indexService.updateResponse(
task.getExecutionId().getDocId(),
emptyMap(),
new TestAsyncResponse("final_response", task.getExpirationTime()),
futureUpdate
);
futureUpdate.actionGet(TimeValue.timeValueSeconds(10));
} finally {
taskManager.unregister(task);
}

PlainActionFuture<TestAsyncResponse> listener = new PlainActionFuture<>();
// not waiting for completion, so should return immediately with timeout
service.retrieveResult(new GetAsyncResultRequest(task.getExecutionId().getEncoded()), listener);
assertFutureThrows(listener, IllegalArgumentException.class, RestStatus.BAD_REQUEST);
}
}