Skip to content

Commit

Permalink
[CCR] Don't fail shard follow tasks in case of a non-retryable error (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
martijnvg committed Oct 16, 2018
1 parent 714aed6 commit 6de0a41
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 52 deletions.
Expand Up @@ -85,6 +85,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo));
private final LinkedHashMap<Long, Tuple<AtomicInteger, ElasticsearchException>> fetchExceptions;

private volatile ElasticsearchException fatalException;

ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers,
ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler, final LongSupplier relativeTimeProvider) {
super(id, type, action, description, parentTask, headers);
Expand Down Expand Up @@ -373,7 +375,7 @@ private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable tas
long delay = computeDelay(currentRetry, params.getPollTimeout().getMillis());
scheduler.accept(TimeValue.timeValueMillis(delay), task);
} else {
markAsFailed(e);
fatalException = ExceptionsHelper.convertToElastic(e);
}
}

Expand Down Expand Up @@ -423,7 +425,7 @@ protected void onCancelled() {
}

protected boolean isStopped() {
return isCancelled() || isCompleted();
return fatalException != null || isCancelled() || isCompleted();
}

public ShardId getFollowShardId() {
Expand Down Expand Up @@ -467,7 +469,8 @@ public synchronized ShardFollowNodeTaskStatus getStatus() {
.stream()
.collect(
Collectors.toMap(Map.Entry::getKey, e -> Tuple.tuple(e.getValue().v1().get(), e.getValue().v2())))),
timeSinceLastFetchMillis);
timeSinceLastFetchMillis,
fatalException);
}

}
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -26,19 +27,17 @@
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class TransportFollowStatsAction extends TransportTasksAction<
ShardFollowNodeTask,
FollowStatsAction.StatsRequest,
FollowStatsAction.StatsResponses, FollowStatsAction.StatsResponse> {

private final IndexNameExpressionResolver resolver;
private final CcrLicenseChecker ccrLicenseChecker;

@Inject
Expand All @@ -61,7 +60,6 @@ public TransportFollowStatsAction(
FollowStatsAction.StatsRequest::new,
FollowStatsAction.StatsResponses::new,
Ccr.CCR_THREAD_POOL_NAME);
this.resolver = Objects.requireNonNull(resolver);
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker);
}

Expand Down Expand Up @@ -94,11 +92,19 @@ protected FollowStatsAction.StatsResponse readTaskResponse(final StreamInput in)
@Override
protected void processTasks(final FollowStatsAction.StatsRequest request, final Consumer<ShardFollowNodeTask> operation) {
final ClusterState state = clusterService.state();
final Set<String> concreteIndices = new HashSet<>(Arrays.asList(resolver.concreteIndexNames(state, request)));
final PersistentTasksCustomMetaData persistentTasksMetaData = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
final Set<String> followerIndices = persistentTasksMetaData.tasks().stream()
.filter(persistentTask -> persistentTask.getTaskName().equals(ShardFollowTask.NAME))
.map(persistentTask -> {
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
return shardFollowTask.getFollowShardId().getIndexName();
})
.collect(Collectors.toSet());

for (final Task task : taskManager.getTasks().values()) {
if (task instanceof ShardFollowNodeTask) {
final ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task;
if (concreteIndices.contains(shardFollowNodeTask.getFollowShardId().getIndexName())) {
if (followerIndices.contains(shardFollowNodeTask.getFollowShardId().getIndexName())) {
operation.accept(shardFollowNodeTask);
}
}
Expand Down
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -22,8 +21,10 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Collectors;

public class TransportPauseFollowAction extends HandledTransportAction<PauseFollowAction.Request, AcknowledgedResponse> {

Expand All @@ -49,18 +50,32 @@ public TransportPauseFollowAction(
protected void doExecute(final PauseFollowAction.Request request, final ActionListener<AcknowledgedResponse> listener) {

client.admin().cluster().state(new ClusterStateRequest(), ActionListener.wrap(r -> {
IndexMetaData followIndexMetadata = r.getState().getMetaData().index(request.getFollowIndex());
if (followIndexMetadata == null) {
listener.onFailure(new IllegalArgumentException("follow index [" + request.getFollowIndex() + "] does not exist"));
PersistentTasksCustomMetaData persistentTasksMetaData = r.getState().metaData().custom(PersistentTasksCustomMetaData.TYPE);
if (persistentTasksMetaData == null) {
listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]"));
return;
}

final int numShards = followIndexMetadata.getNumberOfShards();
final AtomicInteger counter = new AtomicInteger(numShards);
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
for (int i = 0; i < numShards; i++) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
List<String> shardFollowTaskIds = persistentTasksMetaData.tasks().stream()
.filter(persistentTask -> ShardFollowTask.NAME.equals(persistentTask.getTaskName()))
.filter(persistentTask -> {
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
return shardFollowTask.getFollowShardId().getIndexName().equals(request.getFollowIndex());
})
.map(PersistentTasksCustomMetaData.PersistentTask::getId)
.collect(Collectors.toList());

if (shardFollowTaskIds.isEmpty()) {
listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]"));
return;
}

final AtomicInteger counter = new AtomicInteger(shardFollowTaskIds.size());
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(shardFollowTaskIds.size());
int i = 0;

for (String taskId : shardFollowTaskIds) {
final int shardId = i++;
persistentTasksService.sendRemoveRequest(taskId,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override
Expand Down
Expand Up @@ -647,6 +647,17 @@ public void testDeleteLeaderIndex() throws Exception {
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));

client().admin().indices().delete(new DeleteIndexRequest("index1")).actionGet();
assertBusy(() -> {
StatsResponses response = client().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTaskFailures(), empty());
assertThat(response.getStatsResponses(), hasSize(1));
assertThat(response.getStatsResponses().get(0).status().numberOfFailedFetches(), greaterThanOrEqualTo(1L));
ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException();
assertThat(fatalException, notNullValue());
assertThat(fatalException.getMessage(), equalTo("no such index"));
});
unfollowIndex("index2");
ensureNoCcrTasks();
}

Expand All @@ -666,6 +677,17 @@ public void testDeleteFollowerIndex() throws Exception {

client().admin().indices().delete(new DeleteIndexRequest("index2")).actionGet();
client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get();
assertBusy(() -> {
StatsResponses response = client().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTaskFailures(), empty());
assertThat(response.getStatsResponses(), hasSize(1));
assertThat(response.getStatsResponses().get(0).status().numberOfFailedBulkOperations(), greaterThanOrEqualTo(1L));
ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException();
assertThat(fatalException, notNullValue());
assertThat(fatalException.getMessage(), equalTo("no such index"));
});
unfollowIndex("index2");
ensureNoCcrTasks();
}

Expand Down
Expand Up @@ -56,7 +56,8 @@ protected ShardFollowNodeTaskStatus createTestInstance() {
randomNonNegativeLong(),
randomNonNegativeLong(),
randomReadExceptions(),
randomLong());
randomLong(),
randomBoolean() ? new ElasticsearchException("fatal error") : null);
}

@Override
Expand Down
Expand Up @@ -45,7 +45,6 @@

public class ShardFollowNodeTaskTests extends ESTestCase {

private Exception fatalError;
private List<long[]> shardChangesRequests;
private List<List<Translog.Operation>> bulkShardOperationRequests;
private BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> task.run();
Expand Down Expand Up @@ -345,7 +344,7 @@ public void testReceiveNonRetryableError() {
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));

assertTrue("task is stopped", task.isStopped());
assertThat(fatalError, sameInstance(failure));
assertThat(task.getStatus().getFatalException().getRootCause(), sameInstance(failure));
ShardFollowNodeTaskStatus status = task.getStatus();
assertThat(status.numberOfConcurrentReads(), equalTo(1));
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
Expand Down Expand Up @@ -791,19 +790,13 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con

@Override
protected boolean isStopped() {
return stopped.get();
return super.isStopped() || stopped.get();
}

@Override
public void markAsCompleted() {
stopped.set(true);
}

@Override
public void markAsFailed(Exception e) {
fatalError = e;
stopped.set(true);
}
};
}

Expand Down
Expand Up @@ -7,6 +7,7 @@

import com.carrotsearch.hppc.LongHashSet;
import com.carrotsearch.hppc.LongSet;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
Expand Down Expand Up @@ -46,7 +47,6 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
Expand Down Expand Up @@ -180,7 +180,8 @@ public void testChangeLeaderHistoryUUID() throws Exception {

assertBusy(() -> {
assertThat(shardFollowTask.isStopped(), is(true));
assertThat(shardFollowTask.getFailure().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
ElasticsearchException failure = shardFollowTask.getStatus().getFatalException();
assertThat(failure.getRootCause().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
"], actual [" + newHistoryUUID + "]"));
});
}
Expand Down Expand Up @@ -221,7 +222,8 @@ public void testChangeFollowerHistoryUUID() throws Exception {

assertBusy(() -> {
assertThat(shardFollowTask.isStopped(), is(true));
assertThat(shardFollowTask.getFailure().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
ElasticsearchException failure = shardFollowTask.getStatus().getFatalException();
assertThat(failure.getRootCause().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
"], actual [" + newHistoryUUID + "], shard is likely restored from snapshot or force allocated"));
});
}
Expand Down Expand Up @@ -325,7 +327,6 @@ private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup,

BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task);
AtomicBoolean stopped = new AtomicBoolean(false);
AtomicReference<Exception> failureHolder = new AtomicReference<>();
LongSet fetchOperations = new LongHashSet();
return new ShardFollowNodeTask(
1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, System::nanoTime) {
Expand Down Expand Up @@ -403,24 +404,14 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co

@Override
protected boolean isStopped() {
return stopped.get();
return super.isStopped() || stopped.get();
}

@Override
public void markAsCompleted() {
stopped.set(true);
}

@Override
public void markAsFailed(Exception e) {
failureHolder.set(e);
stopped.set(true);
}

@Override
public Exception getFailure() {
return failureHolder.get();
}
};
}

Expand Down
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
Expand Down Expand Up @@ -48,7 +49,8 @@ protected FollowStatsAction.StatsResponses createTestInstance() {
randomNonNegativeLong(),
randomNonNegativeLong(),
Collections.emptyNavigableMap(),
randomLong());
randomLong(),
randomBoolean() ? new ElasticsearchException("fatal error") : null);
responses.add(new FollowStatsAction.StatsResponse(status));
}
return new FollowStatsAction.StatsResponses(Collections.emptyList(), Collections.emptyList(), responses);
Expand Down
Expand Up @@ -130,7 +130,8 @@ public void testToXContent() throws IOException {
numberOfFailedBulkOperations,
numberOfOperationsIndexed,
fetchExceptions,
timeSinceLastFetchMillis);
timeSinceLastFetchMillis,
new ElasticsearchException("fatal error"));
final FollowStatsMonitoringDoc document = new FollowStatsMonitoringDoc("_cluster", timestamp, intervalMillis, node, status);
final BytesReference xContent = XContentHelper.toXContent(document, XContentType.JSON, false);
assertThat(
Expand Down Expand Up @@ -181,7 +182,8 @@ public void testToXContent() throws IOException {
+ "}"
+ "}"
+ "],"
+ "\"time_since_last_fetch_millis\":" + timeSinceLastFetchMillis
+ "\"time_since_last_fetch_millis\":" + timeSinceLastFetchMillis + ","
+ "\"fatal_exception\":{\"type\":\"exception\",\"reason\":\"fatal error\"}"
+ "}"
+ "}"));
}
Expand Down Expand Up @@ -212,7 +214,8 @@ public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException {
0,
10,
fetchExceptions,
2);
2,
null);
XContentBuilder builder = jsonBuilder();
builder.value(status);
Map<String, Object> serializedStatus = XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false);
Expand Down

0 comments on commit 6de0a41

Please sign in to comment.