Skip to content

Commit

Permalink
ccr: Added maximum translog limit that a single shard changes respons…
Browse files Browse the repository at this point in the history
…e can return.

This limit is based on the number of estimate bytes in each translog
operation that fall between the minimum and maximum request sequence number.

If this limit is met then the shard follow task executor will make sure
that a subsequent shard changes request will be performed to fetch the
remaining translog operations.

This limit is needed in order to protect against returning too many
translog operations in a single shard changes response.

Relates to #2436
  • Loading branch information
martijnvg committed Mar 29, 2018
1 parent 5506565 commit bbd8be0
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 64 deletions.
Expand Up @@ -59,6 +59,7 @@ public static class Request extends ActionRequest {
private String followIndex;
private long batchSize = ShardFollowTasksExecutor.DEFAULT_BATCH_SIZE;
private int concurrentProcessors = ShardFollowTasksExecutor.DEFAULT_CONCURRENT_PROCESSORS;
private long processorMaxTranslogBytes = ShardFollowTasksExecutor.DEFAULT_MAX_TRANSLOG_BYTES;

public String getLeaderIndex() {
return leaderIndex;
Expand Down Expand Up @@ -88,17 +89,20 @@ public void setBatchSize(long batchSize) {
this.batchSize = batchSize;
}

public int getConcurrentProcessors() {
return concurrentProcessors;
}

public void setConcurrentProcessors(int concurrentProcessors) {
if (concurrentProcessors < 1) {
throw new IllegalArgumentException("concurrent_processors must be larger than 0");
}
this.concurrentProcessors = concurrentProcessors;
}

public void setProcessorMaxTranslogBytes(long processorMaxTranslogBytes) {
if (processorMaxTranslogBytes <= 0) {
throw new IllegalArgumentException("processor_max_translog_bytes must be larger than 0");
}
this.processorMaxTranslogBytes = processorMaxTranslogBytes;
}

@Override
public ActionRequestValidationException validate() {
return null;
Expand All @@ -110,6 +114,7 @@ public void readFrom(StreamInput in) throws IOException {
leaderIndex = in.readString();
followIndex = in.readString();
batchSize = in.readVLong();
processorMaxTranslogBytes = in.readVLong();
}

@Override
Expand All @@ -118,6 +123,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(leaderIndex);
out.writeString(followIndex);
out.writeVLong(batchSize);
out.writeVLong(processorMaxTranslogBytes);
}
}

Expand Down Expand Up @@ -182,7 +188,8 @@ public void onResponse(ClusterStateResponse clusterStateResponse) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
ShardFollowTask shardFollowTask = new ShardFollowTask(new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId), request.batchSize, request.concurrentProcessors);
new ShardId(leaderIndexMetadata.getIndex(), shardId), request.batchSize, request.concurrentProcessors,
request.processorMaxTranslogBytes);
persistentTasksService.startPersistentTask(taskId, ShardFollowTask.NAME, shardFollowTask,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
@Override
Expand Down
Expand Up @@ -36,8 +36,13 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.TreeSet;

import static org.elasticsearch.action.ValidateActions.addValidationError;

Expand Down Expand Up @@ -65,6 +70,7 @@ public static class Request extends SingleShardRequest<Request> {
private long minSeqNo;
private long maxSeqNo;
private ShardId shardId;
private long maxTranslogsBytes = ShardFollowTasksExecutor.DEFAULT_MAX_TRANSLOG_BYTES;

public Request(ShardId shardId) {
super(shardId.getIndexName());
Expand Down Expand Up @@ -94,49 +100,64 @@ public void setMaxSeqNo(long maxSeqNo) {
this.maxSeqNo = maxSeqNo;
}

public long getMaxTranslogsBytes() {
return maxTranslogsBytes;
}

public void setMaxTranslogsBytes(long maxTranslogsBytes) {
this.maxTranslogsBytes = maxTranslogsBytes;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (minSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
validationException = addValidationError("minSeqNo cannot be unassigned", validationException);
if (minSeqNo < 0) {
validationException = addValidationError("minSeqNo [" + minSeqNo + "] cannot be lower than 0", validationException);
}
if (maxSeqNo < minSeqNo) {
validationException = addValidationError("minSeqNo [" + minSeqNo + "] cannot be larger than maxSeqNo ["
+ maxSeqNo + "]", validationException);
}
if (maxTranslogsBytes <= 0) {
validationException = addValidationError("maxTranslogsBytes [" + maxTranslogsBytes + "] must be larger than 0",
validationException);
}
return validationException;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
minSeqNo = in.readZLong();
maxSeqNo = in.readZLong();
minSeqNo = in.readVLong();
maxSeqNo = in.readVLong();
shardId = ShardId.readShardId(in);
maxTranslogsBytes = in.readVLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeZLong(minSeqNo);
out.writeZLong(maxSeqNo);
out.writeVLong(minSeqNo);
out.writeVLong(maxSeqNo);
shardId.writeTo(out);
out.writeVLong(maxTranslogsBytes);
}


@Override
public boolean equals(Object o) {
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
final Request request = (Request) o;
return minSeqNo == request.minSeqNo &&
maxSeqNo == request.maxSeqNo &&
Objects.equals(shardId, request.shardId);
Objects.equals(shardId, request.shardId) &&
maxTranslogsBytes == request.maxTranslogsBytes;
}

@Override
public int hashCode() {
return Objects.hash(minSeqNo, maxSeqNo, shardId);
return Objects.hash(minSeqNo, maxSeqNo, shardId, maxTranslogsBytes);
}
}

Expand Down Expand Up @@ -210,8 +231,7 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
IndexShard indexShard = indexService.getShard(request.getShard().id());

Translog.Operation[] operations = getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo);
return new Response(operations);
return getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes);
}

@Override
Expand All @@ -236,24 +256,43 @@ protected Response newResponse() {

private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0];

static Translog.Operation[] getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo) throws IOException {
static Response getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo, long byteLimit) throws IOException {
if (indexShard.state() != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state());
}

long seenBytes = 0;
long nextExpectedSeqNo = minSeqNo;
final Queue<Translog.Operation> orderedOps = new PriorityQueue<>(Comparator.comparingLong(Translog.Operation::seqNo));

final List<Translog.Operation> operations = new ArrayList<>();
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(maxSeqNo, minSeqNo);
try (Translog.Snapshot snapshot = indexShard.getTranslog().getSnapshotBetween(minSeqNo, maxSeqNo)) {
for (Translog.Operation op = snapshot.next(); op != null; op = snapshot.next()) {
if (op.seqNo() >= minSeqNo && op.seqNo() <= maxSeqNo) {
operations.add(op);
tracker.markSeqNoAsCompleted(op.seqNo());
for (Translog.Operation unorderedOp = snapshot.next(); unorderedOp != null; unorderedOp = snapshot.next()) {
if (unorderedOp.seqNo() < minSeqNo || unorderedOp.seqNo() > maxSeqNo) {
continue;
}

orderedOps.add(unorderedOp);
while (orderedOps.peek() != null && orderedOps.peek().seqNo() == nextExpectedSeqNo) {
Translog.Operation orderedOp = orderedOps.poll();
if (seenBytes < byteLimit) {
nextExpectedSeqNo++;
seenBytes += orderedOp.estimateSize();
operations.add(orderedOp);
tracker.markSeqNoAsCompleted(orderedOp.seqNo());
if (nextExpectedSeqNo > maxSeqNo) {
return new Response(operations.toArray(EMPTY_OPERATIONS_ARRAY));
}
} else {
return new Response(operations.toArray(EMPTY_OPERATIONS_ARRAY));
}
}
}
}

if (tracker.getCheckpoint() == maxSeqNo) {
return operations.toArray(EMPTY_OPERATIONS_ARRAY);
return new Response(operations.toArray(EMPTY_OPERATIONS_ARRAY));
} else {
String message = "Not all operations between min_seq_no [" + minSeqNo + "] and max_seq_no [" + maxSeqNo +
"] found, tracker checkpoint [" + tracker.getCheckpoint() + "]";
Expand Down
Expand Up @@ -32,10 +32,11 @@ public class ShardFollowTask implements PersistentTaskParams {
static final ParseField LEADER_SHARD_SHARDID_FIELD = new ParseField("leader_shard_shard");
public static final ParseField MAX_CHUNK_SIZE = new ParseField("max_chunk_size");
public static final ParseField NUM_CONCURRENT_CHUNKS = new ParseField("max_concurrent_chunks");
public static final ParseField PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST = new ParseField("processor_max_translog_bytes");

public static ConstructingObjectParser<ShardFollowTask, Void> PARSER = new ConstructingObjectParser<>(NAME,
(a) -> new ShardFollowTask(new ShardId((String) a[0], (String) a[1], (int) a[2]),
new ShardId((String) a[3], (String) a[4], (int) a[5]), (long) a[6], (int) a[7]));
new ShardId((String) a[3], (String) a[4], (int) a[5]), (long) a[6], (int) a[7], (long) a[8]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_INDEX_FIELD);
Expand All @@ -46,25 +47,30 @@ public class ShardFollowTask implements PersistentTaskParams {
PARSER.declareInt(ConstructingObjectParser.constructorArg(), LEADER_SHARD_SHARDID_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAX_CHUNK_SIZE);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUM_CONCURRENT_CHUNKS);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST);
}

private final ShardId followShardId;
private final ShardId leaderShardId;
private final long maxChunkSize;
private final int numConcurrentChunks;
private final long processorMaxTranslogBytes;

ShardFollowTask(ShardId followShardId, ShardId leaderShardId, long maxChunkSize, int numConcurrentChunks) {
ShardFollowTask(ShardId followShardId, ShardId leaderShardId, long maxChunkSize, int numConcurrentChunks,
long processorMaxTranslogBytes) {
this.followShardId = followShardId;
this.leaderShardId = leaderShardId;
this.maxChunkSize = maxChunkSize;
this.numConcurrentChunks = numConcurrentChunks;
this.processorMaxTranslogBytes = processorMaxTranslogBytes;
}

public ShardFollowTask(StreamInput in) throws IOException {
this.followShardId = ShardId.readShardId(in);
this.leaderShardId = ShardId.readShardId(in);
this.maxChunkSize = in.readVLong();
this.numConcurrentChunks = in.readVInt();
this.processorMaxTranslogBytes = in.readVLong();
}

public ShardId getFollowShardId() {
Expand All @@ -83,6 +89,10 @@ public int getNumConcurrentChunks() {
return numConcurrentChunks;
}

public long getProcessorMaxTranslogBytes() {
return processorMaxTranslogBytes;
}

@Override
public String getWriteableName() {
return NAME;
Expand All @@ -94,6 +104,7 @@ public void writeTo(StreamOutput out) throws IOException {
leaderShardId.writeTo(out);
out.writeVLong(maxChunkSize);
out.writeVInt(numConcurrentChunks);
out.writeVLong(processorMaxTranslogBytes);
}

public static ShardFollowTask fromXContent(XContentParser parser) {
Expand All @@ -111,6 +122,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(LEADER_SHARD_SHARDID_FIELD.getPreferredName(), leaderShardId.id());
builder.field(MAX_CHUNK_SIZE.getPreferredName(), maxChunkSize);
builder.field(NUM_CONCURRENT_CHUNKS.getPreferredName(), numConcurrentChunks);
builder.field(PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName(), processorMaxTranslogBytes);
return builder.endObject();
}

Expand All @@ -122,12 +134,13 @@ public boolean equals(Object o) {
return Objects.equals(followShardId, that.followShardId) &&
Objects.equals(leaderShardId, that.leaderShardId) &&
maxChunkSize == that.maxChunkSize &&
numConcurrentChunks == that.numConcurrentChunks;
numConcurrentChunks == that.numConcurrentChunks &&
processorMaxTranslogBytes == that.processorMaxTranslogBytes;
}

@Override
public int hashCode() {
return Objects.hash(followShardId, leaderShardId, maxChunkSize, numConcurrentChunks);
return Objects.hash(followShardId, leaderShardId, maxChunkSize, numConcurrentChunks, processorMaxTranslogBytes);
}

public String toString() {
Expand Down

0 comments on commit bbd8be0

Please sign in to comment.