Skip to content

Commit

Permalink
Rewrite shard follow node task logic (#31581)
Browse files Browse the repository at this point in the history
The current shard follow mechanism is complex and does not give us easy ways the have visibility into the system (e.g. why we are falling behind).
The main reason why it is complex is because the current design is highly asynchronous. Also in the current model it is hard to apply backpressure
other than reducing the concurrent reads from the leader shard.

This PR has the following changes:
* Rewrote the shard follow task to coordinate the shard follow mechanism between a leader and follow shard in a single threaded manner.
  This allows for better unit testing and makes it easier to add stats.
* All write operations read from the shard changes api should be added to a buffer instead of directly sending it to the bulk shard operations api.
  This allows to apply backpressure. In this PR there is a limit that controls how many write ops are allowed in the buffer after which no new reads
  will be performed until the number of ops is below that limit.
* The shard changes api includes the current global checkpoint on the leader shard copy. This allows reading to be a more self sufficient process;
  instead of relying on a background thread to fetch the leader shard's global checkpoint.
* Reading write operations from the leader shard (via shard changes api) is a separate step then writing the write operations (via bulk shards operations api).
  Whereas before a read would immediately result into a write.
* The bulk shard operations api returns the local checkpoint on the follow primary shard, to keep the shard follow task up to date with what has been written.
* Moved the shard follow logic that was previously in ShardFollowTasksExecutor to ShardFollowNodeTask.
* Moved over the changes from #31242 to make shard follow mechanism resilient from node and shard failures.

Relates to #30086
  • Loading branch information
martijnvg committed Jul 10, 2018
1 parent 8425a5c commit b260ef5
Show file tree
Hide file tree
Showing 28 changed files with 1,269 additions and 1,131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,15 @@ public static class WritePrimaryResult<ReplicaRequest extends ReplicatedWriteReq
implements RespondingWriteResult {
boolean finishedAsyncActions;
public final Location location;
public final IndexShard primary;
ActionListener<Response> listener = null;

public WritePrimaryResult(ReplicaRequest request, @Nullable Response finalResponse,
@Nullable Location location, @Nullable Exception operationFailure,
IndexShard primary, Logger logger) {
super(request, finalResponse, operationFailure);
this.location = location;
this.primary = primary;
assert location == null || operationFailure == null
: "expected either failure to be null or translog location to be null, " +
"but found: [" + location + "] translog location and [" + operationFailure + "] failure";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,14 @@ public Translog.Operation next() throws IOException {
private void rangeCheck(Translog.Operation op) {
if (op == null) {
if (lastSeenSeqNo < toSeqNo) {
throw new IllegalStateException("Not all operations between min_seqno [" + fromSeqNo + "] " +
"and max_seqno [" + toSeqNo + "] found; prematurely terminated last_seen_seqno [" + lastSeenSeqNo + "]");
throw new IllegalStateException("Not all operations between from_seqno [" + fromSeqNo + "] " +
"and to_seqno [" + toSeqNo + "] found; prematurely terminated last_seen_seqno [" + lastSeenSeqNo + "]");
}
} else {
final long expectedSeqNo = lastSeenSeqNo + 1;
if (op.seqNo() != expectedSeqNo) {
throw new IllegalStateException("Not all operations between min_seqno [" + fromSeqNo + "] " +
"and max_seqno [" + toSeqNo + "] found; expected seqno [" + expectedSeqNo + "]; found [" + op + "]");
throw new IllegalStateException("Not all operations between from_seqno [" + fromSeqNo + "] " +
"and to_seqno [" + toSeqNo + "] found; expected seqno [" + expectedSeqNo + "]; found [" + op + "]");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void testBasics() throws Exception {
try (Translog.Snapshot snapshot = engine.newLuceneChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true)) {
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
assertThat(error.getMessage(),
containsString("Not all operations between min_seqno [" + fromSeqNo + "] and max_seqno [" + toSeqNo + "] found"));
containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found"));
}
try (Translog.Snapshot snapshot = engine.newLuceneChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, false)) {
assertThat(snapshot, SnapshotMatchers.size(0));
Expand Down Expand Up @@ -99,19 +99,19 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f
searcher = null;
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
assertThat(error.getMessage(),
containsString("Not all operations between min_seqno [" + fromSeqNo + "] and max_seqno [" + toSeqNo + "] found"));
containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found"));
}finally {
IOUtils.close(searcher);
}
}else {
} else {
fromSeqNo = randomLongBetween(0, refreshedSeqNo);
toSeqNo = randomLongBetween(refreshedSeqNo + 1, numOps * 2);
Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, false)) {
searcher = null;
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, refreshedSeqNo));
}finally {
} finally {
IOUtils.close(searcher);
}
searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
Expand All @@ -120,7 +120,7 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f
searcher = null;
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
assertThat(error.getMessage(),
containsString("Not all operations between min_seqno [" + fromSeqNo + "] and max_seqno [" + toSeqNo + "] found"));
containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found"));
}finally {
IOUtils.close(searcher);
}
Expand All @@ -130,7 +130,7 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) {
searcher = null;
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo));
}finally {
} finally {
IOUtils.close(searcher);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void testFollowIndex() throws Exception {
assertThat(tasks.size(), equalTo(0));
assertThat(countCcrNodeTasks(), equalTo(0));
});

followIndex("leader_cluster:" + allowedIndex, allowedIndex);
assertThat(countCcrNodeTasks(), equalTo(5));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_xpack/ccr/_unfollow")));
Expand All @@ -95,14 +95,14 @@ public void testFollowIndex() throws Exception {
assertThat(tasks.size(), equalTo(0));
assertThat(countCcrNodeTasks(), equalTo(0));
});

createAndFollowIndex("leader_cluster:" + unallowedIndex, unallowedIndex);
// Verify that nothing has been replicated and no node tasks are running
// These node tasks should have been failed due to the fact that the user
// has no sufficient priviledges.
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
verifyDocuments(adminClient(), unallowedIndex, 0);

followIndex("leader_cluster:" + unallowedIndex, unallowedIndex);
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
verifyDocuments(adminClient(), unallowedIndex, 0);
Expand Down Expand Up @@ -146,12 +146,14 @@ private static void refresh(String index) throws IOException {
private static void followIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_xpack/ccr/_follow");
request.addParameter("leader_index", leaderIndex);
request.addParameter("idle_shard_retry_delay", "10ms");
assertOK(client().performRequest(request));
}

private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_xpack/ccr/_create_and_follow");
request.addParameter("leader_index", leaderIndex);
request.addParameter("idle_shard_retry_delay", "10ms");
assertOK(client().performRequest(request));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,14 @@ private static void refresh(String index) throws IOException {
private static void followIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_xpack/ccr/_follow");
request.addParameter("leader_index", leaderIndex);
request.addParameter("idle_shard_retry_delay", "10ms");
assertOK(client().performRequest(request));
}

private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_xpack/ccr/_create_and_follow");
request.addParameter("leader_index", leaderIndex);
request.addParameter("idle_shard_retry_delay", "10ms");
assertOK(client().performRequest(request));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,15 @@ public static class Request extends AcknowledgedRequest<Request> {

private FollowIndexAction.Request followRequest;

public FollowIndexAction.Request getFollowRequest() {
return followRequest;
public Request(FollowIndexAction.Request followRequest) {
this.followRequest = Objects.requireNonNull(followRequest);
}

Request() {
}

public void setFollowRequest(FollowIndexAction.Request followRequest) {
this.followRequest = followRequest;
public FollowIndexAction.Request getFollowRequest() {
return followRequest;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexingSlowLog;
import org.elasticsearch.index.SearchSlowLog;
Expand Down Expand Up @@ -79,50 +80,57 @@ public static class Request extends ActionRequest {

private String leaderIndex;
private String followIndex;
private long batchSize = ShardFollowTasksExecutor.DEFAULT_BATCH_SIZE;
private int concurrentProcessors = ShardFollowTasksExecutor.DEFAULT_CONCURRENT_PROCESSORS;
private long processorMaxTranslogBytes = ShardFollowTasksExecutor.DEFAULT_MAX_TRANSLOG_BYTES;

public String getLeaderIndex() {
return leaderIndex;
}

public void setLeaderIndex(String leaderIndex) {
this.leaderIndex = leaderIndex;
}

public String getFollowIndex() {
return followIndex;
}
private int maxBatchOperationCount;
private int maxConcurrentReadBatches;
private long maxOperationSizeInBytes;
private int maxConcurrentWriteBatches;
private int maxWriteBufferSize;
private TimeValue retryTimeout;
private TimeValue idleShardRetryDelay;

public Request(String leaderIndex, String followIndex, int maxBatchOperationCount, int maxConcurrentReadBatches,
long maxOperationSizeInBytes, int maxConcurrentWriteBatches, int maxWriteBufferSize,
TimeValue retryTimeout, TimeValue idleShardRetryDelay) {
if (maxBatchOperationCount < 1) {
throw new IllegalArgumentException("maxBatchOperationCount must be larger than 0");
}
if (maxConcurrentReadBatches < 1) {
throw new IllegalArgumentException("concurrent_processors must be larger than 0");
}
if (maxOperationSizeInBytes <= 0) {
throw new IllegalArgumentException("processor_max_translog_bytes must be larger than 0");
}
if (maxConcurrentWriteBatches < 1) {
throw new IllegalArgumentException("maxConcurrentWriteBatches must be larger than 0");
}
if (maxWriteBufferSize < 1) {
throw new IllegalArgumentException("maxWriteBufferSize must be larger than 0");
}

public void setFollowIndex(String followIndex) {
this.followIndex = followIndex;
this.leaderIndex = Objects.requireNonNull(leaderIndex);
this.followIndex = Objects.requireNonNull(followIndex);
this.maxBatchOperationCount = maxBatchOperationCount;
this.maxConcurrentReadBatches = maxConcurrentReadBatches;
this.maxOperationSizeInBytes = maxOperationSizeInBytes;
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
this.maxWriteBufferSize = maxWriteBufferSize;
this.retryTimeout = Objects.requireNonNull(retryTimeout);
this.idleShardRetryDelay = Objects.requireNonNull(idleShardRetryDelay);
}

public long getBatchSize() {
return batchSize;
Request() {
}

public void setBatchSize(long batchSize) {
if (batchSize < 1) {
throw new IllegalArgumentException("Illegal batch_size [" + batchSize + "]");
}

this.batchSize = batchSize;
public String getLeaderIndex() {
return leaderIndex;
}

public void setConcurrentProcessors(int concurrentProcessors) {
if (concurrentProcessors < 1) {
throw new IllegalArgumentException("concurrent_processors must be larger than 0");
}
this.concurrentProcessors = concurrentProcessors;
public String getFollowIndex() {
return followIndex;
}

public void setProcessorMaxTranslogBytes(long processorMaxTranslogBytes) {
if (processorMaxTranslogBytes <= 0) {
throw new IllegalArgumentException("processor_max_translog_bytes must be larger than 0");
}
this.processorMaxTranslogBytes = processorMaxTranslogBytes;
public int getMaxBatchOperationCount() {
return maxBatchOperationCount;
}

@Override
Expand All @@ -135,36 +143,49 @@ public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
leaderIndex = in.readString();
followIndex = in.readString();
batchSize = in.readVLong();
concurrentProcessors = in.readVInt();
processorMaxTranslogBytes = in.readVLong();
maxBatchOperationCount = in.readVInt();
maxConcurrentReadBatches = in.readVInt();
maxOperationSizeInBytes = in.readVLong();
maxConcurrentWriteBatches = in.readVInt();
maxWriteBufferSize = in.readVInt();
retryTimeout = in.readOptionalTimeValue();
idleShardRetryDelay = in.readOptionalTimeValue();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(leaderIndex);
out.writeString(followIndex);
out.writeVLong(batchSize);
out.writeVInt(concurrentProcessors);
out.writeVLong(processorMaxTranslogBytes);
out.writeVInt(maxBatchOperationCount);
out.writeVInt(maxConcurrentReadBatches);
out.writeVLong(maxOperationSizeInBytes);
out.writeVInt(maxConcurrentWriteBatches);
out.writeVInt(maxWriteBufferSize);
out.writeOptionalTimeValue(retryTimeout);
out.writeOptionalTimeValue(idleShardRetryDelay);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return batchSize == request.batchSize &&
concurrentProcessors == request.concurrentProcessors &&
processorMaxTranslogBytes == request.processorMaxTranslogBytes &&
return maxBatchOperationCount == request.maxBatchOperationCount &&
maxConcurrentReadBatches == request.maxConcurrentReadBatches &&
maxOperationSizeInBytes == request.maxOperationSizeInBytes &&
maxConcurrentWriteBatches == request.maxConcurrentWriteBatches &&
maxWriteBufferSize == request.maxWriteBufferSize &&
Objects.equals(retryTimeout, request.retryTimeout) &&
Objects.equals(idleShardRetryDelay, request.idleShardRetryDelay) &&
Objects.equals(leaderIndex, request.leaderIndex) &&
Objects.equals(followIndex, request.followIndex);
}

@Override
public int hashCode() {
return Objects.hash(leaderIndex, followIndex, batchSize, concurrentProcessors, processorMaxTranslogBytes);
return Objects.hash(leaderIndex, followIndex, maxBatchOperationCount, maxConcurrentReadBatches, maxOperationSizeInBytes,
maxConcurrentWriteBatches, maxWriteBufferSize, retryTimeout, idleShardRetryDelay);
}
}

Expand Down Expand Up @@ -210,7 +231,7 @@ protected void doExecute(Request request, ActionListener<Response> listener) {
ClusterState localClusterState = clusterService.state();
IndexMetaData followIndexMetadata = localClusterState.getMetaData().index(request.followIndex);

String[] indices = new String[]{request.getLeaderIndex()};
String[] indices = new String[]{request.leaderIndex};
Map<String, List<String>> remoteClusterIndices = remoteClusterService.groupClusterIndices(indices, s -> false);
if (remoteClusterIndices.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
// Following an index in local cluster, so use local cluster state to fetch leader IndexMetaData:
Expand Down Expand Up @@ -264,10 +285,13 @@ void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMe
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));for (int i = 0; i < numShards; i++) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;

ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias,
new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId),
request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders);
request.maxBatchOperationCount, request.maxConcurrentReadBatches, request.maxOperationSizeInBytes,
request.maxConcurrentWriteBatches, request.maxWriteBufferSize, request.retryTimeout,
request.idleShardRetryDelay, filteredHeaders);
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
@Override
Expand Down
Loading

0 comments on commit b260ef5

Please sign in to comment.