Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CCR] Don't fail shard follow tasks in case of a non-retryable error #34404

Merged
merged 4 commits into from
Oct 16, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo));
private final LinkedHashMap<Long, Tuple<AtomicInteger, ElasticsearchException>> fetchExceptions;

private volatile ElasticsearchException fatalException;

ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers,
ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler, final LongSupplier relativeTimeProvider) {
super(id, type, action, description, parentTask, headers);
Expand Down Expand Up @@ -373,7 +375,7 @@ private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable tas
long delay = computeDelay(currentRetry, params.getPollTimeout().getMillis());
scheduler.accept(TimeValue.timeValueMillis(delay), task);
} else {
markAsFailed(e);
fatalException = ExceptionsHelper.convertToElastic(e);
}
}

Expand Down Expand Up @@ -423,7 +425,7 @@ protected void onCancelled() {
}

protected boolean isStopped() {
return isCancelled() || isCompleted();
return fatalException != null || isCancelled() || isCompleted();
}

public ShardId getFollowShardId() {
Expand Down Expand Up @@ -467,7 +469,8 @@ public synchronized ShardFollowNodeTaskStatus getStatus() {
.stream()
.collect(
Collectors.toMap(Map.Entry::getKey, e -> Tuple.tuple(e.getValue().v1().get(), e.getValue().v2())))),
timeSinceLastFetchMillis);
timeSinceLastFetchMillis,
fatalException);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand All @@ -18,6 +19,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
Expand All @@ -26,11 +28,13 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class TransportFollowStatsAction extends TransportTasksAction<
ShardFollowNodeTask,
Expand Down Expand Up @@ -90,11 +94,19 @@ protected FollowStatsAction.StatsResponse readTaskResponse(final StreamInput in)
@Override
protected void processTasks(final FollowStatsAction.StatsRequest request, final Consumer<ShardFollowNodeTask> operation) {
final ClusterState state = clusterService.state();
final Set<String> concreteIndices = new HashSet<>(Arrays.asList(resolver.concreteIndexNames(state, request)));
final PersistentTasksCustomMetaData persistentTasksMetaData = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
final Set<String> followerIndices = persistentTasksMetaData.tasks().stream()
.filter(persistentTask -> persistentTask.getTaskName().equals(ShardFollowTask.NAME))
.map(persistentTask -> {
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
return shardFollowTask.getFollowShardId().getIndexName();
})
.collect(Collectors.toSet());

for (final Task task : taskManager.getTasks().values()) {
if (task instanceof ShardFollowNodeTask) {
final ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task;
if (concreteIndices.contains(shardFollowNodeTask.getFollowShardId().getIndexName())) {
if (followerIndices.contains(shardFollowNodeTask.getFollowShardId().getIndexName())) {
operation.accept(shardFollowNodeTask);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
Expand All @@ -21,8 +20,10 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Collectors;

public class TransportPauseFollowAction extends HandledTransportAction<PauseFollowAction.Request, AcknowledgedResponse> {

Expand All @@ -48,18 +49,32 @@ protected void doExecute(
final ActionListener<AcknowledgedResponse> listener) {

client.admin().cluster().state(new ClusterStateRequest(), ActionListener.wrap(r -> {
IndexMetaData followIndexMetadata = r.getState().getMetaData().index(request.getFollowIndex());
if (followIndexMetadata == null) {
listener.onFailure(new IllegalArgumentException("follow index [" + request.getFollowIndex() + "] does not exist"));
PersistentTasksCustomMetaData persistentTasksMetaData = r.getState().metaData().custom(PersistentTasksCustomMetaData.TYPE);
if (persistentTasksMetaData == null) {
listener.onFailure(new IllegalArgumentException("no shard follow tasks [" + request.getFollowIndex() + "]"));
return;
}

final int numShards = followIndexMetadata.getNumberOfShards();
final AtomicInteger counter = new AtomicInteger(numShards);
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
for (int i = 0; i < numShards; i++) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
List<String> shardFollowTaskIds = persistentTasksMetaData.tasks().stream()
.filter(persistentTask -> ShardFollowTask.NAME.equals(persistentTask.getTaskName()))
.filter(persistentTask -> {
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
return shardFollowTask.getFollowShardId().getIndexName().equals(request.getFollowIndex());
})
.map(PersistentTasksCustomMetaData.PersistentTask::getId)
.collect(Collectors.toList());

if (shardFollowTaskIds.isEmpty()) {
listener.onFailure(new IllegalArgumentException("no shard follow tasks [" + request.getFollowIndex() + "]"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested wording for the error message: change "no shard follow tasks [...]" to "no shard follow tasks for [...]".

return;
}

final AtomicInteger counter = new AtomicInteger(shardFollowTaskIds.size());
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(shardFollowTaskIds.size());
int i = 0;

for (String taskId : shardFollowTaskIds) {
final int shardId = i++;
persistentTasksService.sendRemoveRequest(taskId,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,17 @@ public void testDeleteLeaderIndex() throws Exception {
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));

client().admin().indices().delete(new DeleteIndexRequest("index1")).actionGet();
assertBusy(() -> {
StatsResponses response = client().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTaskFailures(), empty());
assertThat(response.getStatsResponses(), hasSize(1));
assertThat(response.getStatsResponses().get(0).status().numberOfFailedFetches(), greaterThanOrEqualTo(1L));
ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException();
assertThat(fatalException, notNullValue());
assertThat(fatalException.getMessage(), equalTo("no such index"));
});
unfollowIndex("index2");
ensureNoCcrTasks();
}

Expand All @@ -663,6 +674,17 @@ public void testDeleteFollowerIndex() throws Exception {

client().admin().indices().delete(new DeleteIndexRequest("index2")).actionGet();
client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get();
assertBusy(() -> {
StatsResponses response = client().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTaskFailures(), empty());
assertThat(response.getStatsResponses(), hasSize(1));
assertThat(response.getStatsResponses().get(0).status().numberOfFailedBulkOperations(), greaterThanOrEqualTo(1L));
ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException();
assertThat(fatalException, notNullValue());
assertThat(fatalException.getMessage(), equalTo("no such index"));
});
unfollowIndex("index2");
ensureNoCcrTasks();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ protected ShardFollowNodeTaskStatus createTestInstance() {
randomNonNegativeLong(),
randomNonNegativeLong(),
randomReadExceptions(),
randomLong());
randomLong(),
randomBoolean() ? new ElasticsearchException("fatal error") : null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
Expand Down Expand Up @@ -48,7 +49,8 @@ protected FollowStatsAction.StatsResponses createTestInstance() {
randomNonNegativeLong(),
randomNonNegativeLong(),
Collections.emptyNavigableMap(),
randomLong());
randomLong(),
randomBoolean() ? new ElasticsearchException("fatal error") : null);
responses.add(new FollowStatsAction.StatsResponse(status));
}
return new FollowStatsAction.StatsResponses(Collections.emptyList(), Collections.emptyList(), responses);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ public void testToXContent() throws IOException {
numberOfFailedBulkOperations,
numberOfOperationsIndexed,
fetchExceptions,
timeSinceLastFetchMillis);
timeSinceLastFetchMillis,
new ElasticsearchException("fatal error"));
final FollowStatsMonitoringDoc document = new FollowStatsMonitoringDoc("_cluster", timestamp, intervalMillis, node, status);
final BytesReference xContent = XContentHelper.toXContent(document, XContentType.JSON, false);
assertThat(
Expand Down Expand Up @@ -181,7 +182,8 @@ public void testToXContent() throws IOException {
+ "}"
+ "}"
+ "],"
+ "\"time_since_last_fetch_millis\":" + timeSinceLastFetchMillis
+ "\"time_since_last_fetch_millis\":" + timeSinceLastFetchMillis + ","
+ "\"fatal_exception\":{\"type\":\"exception\",\"reason\":\"fatal error\"}"
+ "}"
+ "}"));
}
Expand Down Expand Up @@ -212,7 +214,8 @@ public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException {
0,
10,
fetchExceptions,
2);
2,
null);
XContentBuilder builder = jsonBuilder();
builder.value(status);
Map<String, Object> serializedStatus = XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
private static final ParseField NUMBER_OF_OPERATIONS_INDEXED_FIELD = new ParseField("number_of_operations_indexed");
private static final ParseField FETCH_EXCEPTIONS = new ParseField("fetch_exceptions");
private static final ParseField TIME_SINCE_LAST_FETCH_MILLIS_FIELD = new ParseField("time_since_last_fetch_millis");
private static final ParseField FATAL_EXCEPTION = new ParseField("fatal_exception");

@SuppressWarnings("unchecked")
static final ConstructingObjectParser<ShardFollowNodeTaskStatus, Void> STATUS_PARSER =
Expand Down Expand Up @@ -88,7 +89,8 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
((List<Map.Entry<Long, Tuple<Integer, ElasticsearchException>>>) args[21])
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
(long) args[22]));
(long) args[22],
(ElasticsearchException) args[23]));

public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry";

Expand Down Expand Up @@ -121,6 +123,9 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_OPERATIONS_INDEXED_FIELD);
STATUS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_PARSER, FETCH_EXCEPTIONS);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIME_SINCE_LAST_FETCH_MILLIS_FIELD);
STATUS_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> ElasticsearchException.fromXContent(p),
FATAL_EXCEPTION);
}

static final ParseField FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO = new ParseField("from_seq_no");
Expand Down Expand Up @@ -274,6 +279,12 @@ public long timeSinceLastFetchMillis() {
return timeSinceLastFetchMillis;
}

private final ElasticsearchException fatalException;

/**
 * Returns the fatal exception carried by this status.
 *
 * @return the fatal exception, or {@code null} when none was supplied to the constructor
 *         or read from the stream (the XContent output omits the field in that case)
 */
// NOTE(review): presumably this is the non-retryable failure that halted the shard follow
// task — confirm against ShardFollowNodeTask#getStatus, which supplies the value.
public ElasticsearchException getFatalException() {
return fatalException;
}

public ShardFollowNodeTaskStatus(
final String leaderIndex,
final String followerIndex,
Expand All @@ -297,7 +308,8 @@ public ShardFollowNodeTaskStatus(
final long numberOfFailedBulkOperations,
final long numberOfOperationsIndexed,
final NavigableMap<Long, Tuple<Integer, ElasticsearchException>> fetchExceptions,
final long timeSinceLastFetchMillis) {
final long timeSinceLastFetchMillis,
final ElasticsearchException fatalException) {
this.leaderIndex = leaderIndex;
this.followerIndex = followerIndex;
this.shardId = shardId;
Expand All @@ -321,6 +333,7 @@ public ShardFollowNodeTaskStatus(
this.numberOfOperationsIndexed = numberOfOperationsIndexed;
this.fetchExceptions = Objects.requireNonNull(fetchExceptions);
this.timeSinceLastFetchMillis = timeSinceLastFetchMillis;
this.fatalException = fatalException;
}

public ShardFollowNodeTaskStatus(final StreamInput in) throws IOException {
Expand Down Expand Up @@ -348,6 +361,7 @@ public ShardFollowNodeTaskStatus(final StreamInput in) throws IOException {
this.fetchExceptions =
new TreeMap<>(in.readMap(StreamInput::readVLong, stream -> Tuple.tuple(stream.readVInt(), stream.readException())));
this.timeSinceLastFetchMillis = in.readZLong();
this.fatalException = in.readException();
}

@Override
Expand Down Expand Up @@ -386,6 +400,7 @@ public void writeTo(final StreamOutput out) throws IOException {
stream.writeException(value.v2());
});
out.writeZLong(timeSinceLastFetchMillis);
out.writeException(fatalException);
}

@Override
Expand Down Expand Up @@ -451,6 +466,14 @@ public XContentBuilder toXContentFragment(final XContentBuilder builder, final P
TIME_SINCE_LAST_FETCH_MILLIS_FIELD.getPreferredName(),
"time_since_last_fetch",
new TimeValue(timeSinceLastFetchMillis, TimeUnit.MILLISECONDS));
if (fatalException != null) {
builder.field(FATAL_EXCEPTION.getPreferredName());
builder.startObject();
{
ElasticsearchException.generateThrowableXContent(builder, params, fatalException);
}
builder.endObject();
}
return builder;
}

Expand All @@ -463,6 +486,8 @@ public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final ShardFollowNodeTaskStatus that = (ShardFollowNodeTaskStatus) o;
String fatalExceptionMessage = fatalException != null ? fatalException.getMessage() : null;
String otherFatalExceptionMessage = that.fatalException != null ? that.fatalException.getMessage() : null;
return leaderIndex.equals(that.leaderIndex) &&
followerIndex.equals(that.followerIndex) &&
shardId == that.shardId &&
Expand Down Expand Up @@ -490,11 +515,13 @@ public boolean equals(final Object o) {
*/
fetchExceptions.keySet().equals(that.fetchExceptions.keySet()) &&
getFetchExceptionMessages(this).equals(getFetchExceptionMessages(that)) &&
timeSinceLastFetchMillis == that.timeSinceLastFetchMillis;
timeSinceLastFetchMillis == that.timeSinceLastFetchMillis &&
Objects.equals(fatalExceptionMessage, otherFatalExceptionMessage);
}

@Override
public int hashCode() {
String fatalExceptionMessage = fatalException != null ? fatalException.getMessage() : null;
return Objects.hash(
leaderIndex,
followerIndex,
Expand Down Expand Up @@ -522,7 +549,8 @@ public int hashCode() {
*/
fetchExceptions.keySet(),
getFetchExceptionMessages(this),
timeSinceLastFetchMillis);
timeSinceLastFetchMillis,
fatalExceptionMessage);
}

private static List<String> getFetchExceptionMessages(final ShardFollowNodeTaskStatus status) {
Expand Down