
Rewrite shard follow node task logic #31581

Merged
merged 46 commits into elastic:ccr from ccr_follow_shard_task_rewrite_2 on Jul 10, 2018

Conversation

martijnvg
Member

@martijnvg martijnvg commented Jun 26, 2018

The current shard follow mechanism is complex and does not give us easy ways to gain visibility into the system (e.g. why we are falling behind).
The main reason it is complex is that the current design is highly asynchronous. Also, in the current model it is hard to apply backpressure
other than by reducing the number of concurrent reads from the leader shard.

This PR has the following changes:

  • Rewrote the shard follow task to coordinate the shard follow mechanism between a leader and follower shard in a single-threaded manner.
    This allows for better unit testing and makes it easier to add stats.
  • All write operations read from the shard changes api are added to a buffer instead of being sent directly to the bulk shard operations api.
    This makes it possible to apply backpressure (see the sketch below): a limit controls how many write ops are allowed in the buffer, after which no new reads
    will be performed until the number of ops drops below that limit.
  • The shard changes api includes the current global checkpoint on the leader shard copy. This allows reading to be a more self-sufficient process,
    instead of relying on a background thread to fetch the leader shard's global checkpoint.
  • Reading write operations from the leader shard (via the shard changes api) is a separate step from writing those operations (via the bulk shard operations api),
    whereas before a read would immediately result in a write.
  • The bulk shard operations api returns the local checkpoint on the follow primary shard, to keep the shard follow task up to date with what has been written.
  • Moved the shard follow logic that was previously in ShardFollowTasksExecutor to ShardFollowNodeTask.
  • Moved over the changes from #31242 ([CCR] Made shard follow task more resilient against node failure) to make the shard follow mechanism resilient to node and shard failures.

Relates to #30086
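
As a rough illustration of the buffer-based backpressure described above, here is a sketch of the read-coordination loop. It is illustrative only, not the exact implementation; names such as hasReadBudget, maxConcurrentReadBatches and maxWriteBufferSize follow the ones discussed later in this review.

    // Illustrative sketch only; the real ShardFollowNodeTask differs in detail.
    private synchronized void coordinateReads() {
        while (hasReadBudget() && lastRequestedSeqno < leaderGlobalCheckpoint) {
            numConcurrentReads++;
            long from = lastRequestedSeqno + 1;
            long maxRequiredSeqNo = Math.min(leaderGlobalCheckpoint, from + maxReadSize - 1);
            sendShardChangesRequest(from, maxReadSize, maxRequiredSeqNo);
            lastRequestedSeqno = maxRequiredSeqNo;
        }
    }

    private boolean hasReadBudget() {
        assert Thread.holdsLock(this);
        // Backpressure: no new reads while the buffer holds more ops than the configured
        // limit, or while the maximum number of concurrent reads is already in flight.
        return numConcurrentReads < maxConcurrentReadBatches && buffer.size() < maxWriteBufferSize;
    }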

@martijnvg martijnvg added review :Distributed/CCR Issues around the Cross Cluster State Replication features labels Jun 26, 2018
@martijnvg martijnvg requested a review from bleskes June 26, 2018 14:06
@elasticmachine
Collaborator

Pinging @elastic/es-distributed

@@ -274,6 +287,15 @@ protected Response newResponse() {
break;
}
}
} catch (IllegalStateException e) {
// TODO: handle peek reads better.
Member Author

I know this is not cool. I'm still thinking about this.

Contributor

can you clarify what's happening here? if we move to maximum size semantics (rather than a hard limit) I don't think this is a problem? we just return empty.

Member Author

So I changed the size semantics, allowing fewer documents to be returned than were requested (as long as there are no gaps). But then I ran into a problem: when the shard follow task requested a specific range of ops it knew to be there, a replica shard copy may not have had them yet and thus fewer ops were returned. This was before I added the logic to deal with fewer operations being returned because the max_translog_bytes limit was met, and I realize now that this problem should be solved in the same way as the max translog bytes limit scenario. So I can safely change the maximum size semantics in the shard changes api.

Contributor

I don't get it. Can you point us to the place where the ISE is coming from? Is it due to the source not being available?

Member Author

This is gone now. I did make a change to LuceneChangesSnapshot's rangeCheck(...) method to not be strict about not returning up to toSeqNo.

@martijnvg martijnvg force-pushed the ccr_follow_shard_task_rewrite_2 branch from 038d90c to 516dcb7 Compare June 27, 2018 05:29
Contributor

@bleskes bleskes left a comment

Thanks @martijnvg for putting it up so quickly. I did an initial pass, mostly focused on ShardFollowNodeTask.

@@ -54,9 +54,9 @@ public Response newResponse() {
public static class Request extends SingleShardRequest<Request> {

private long minSeqNo;
private long maxSeqNo;
private Long maxSeqNo;
Contributor

can we switch everything for maxSeqNo to size? (I struggle to find a size name with an operation count in it; maybe people prefer maxOperationCount, and maxOperationSizeInBytes as the byte limiter.)

Contributor

@bleskes bleskes Jun 27, 2018

also I don't think this should be nullable (I'll comment separately about how to change the reason why it's null now)

this.operations = operations;
}

public long getIndexMetadataVersion() {
return indexMetadataVersion;
}

public long getLeaderGlobalCheckpoint() {
Contributor

can we just call it global checkpoint? everything here is leader.

Contributor

ping

@@ -274,6 +287,15 @@ protected Response newResponse() {
break;
}
}
} catch (IllegalStateException e) {
// TODO: handle peek reads better.
Contributor

can you clarify what's happening here? if we move to maximum size semantics (rather than a hard limit) I don't think this is a problem? we just return empty.

String description,
TaskId parentTask,
Map<String, String> headers,
Client leaderClient,
Contributor

Instead of passing the clients in the constructor, I would like to make this class abstract, where all the methods that require a client are abstract. Then the PersistentTaskExecutor can instantiate an implementation that delegates async requests via clients, but tests can do something else (synchronously return something, throw exceptions, or whatever).

Contributor

can't this be done with a filter client as well? I don't think we should do this with abstract classes; it will make things more complicated.
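
To make the two options concrete, here is a hypothetical sketch of the abstract-method seam; the method names and signatures are made up for this sketch, and the filter-client alternative would instead keep the class concrete and inject a wrapped Client in tests.

    // Hypothetical sketch of the testing seam discussed above.
    abstract class ShardFollowNodeTask extends AllocatedPersistentTask {

        // The production implementation delegates to the leader/follower clients;
        // unit tests can override these to return canned responses or throw exceptions.
        protected abstract void innerSendShardChangesRequest(long from, int maxOperationCount,
                                                             Consumer<ShardChangesAction.Response> handler,
                                                             Consumer<Exception> errorHandler);

        protected abstract void innerSendBulkShardOperationsRequest(List<Translog.Operation> operations,
                                                                    LongConsumer handler,
                                                                    Consumer<Exception> errorHandler);
    }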

void start(long leaderGlobalCheckpoint, long followGlobalCheckpoint) {
this.lastRequestedSeqno = followGlobalCheckpoint;
this.processedGlobalCheckpoint = followGlobalCheckpoint;
this.leaderGlobalCheckpoint = leaderGlobalCheckpoint;
Contributor

I think we can set this to the followGlobalCheckpoint and send a peek request. No need to preflight imo.


private synchronized void coordinateWrites() {
while (true) {
if (buffer.isEmpty()) {
Contributor

same request - please have a method called haveWriteBudget and do while(haveWriteBudget() && buffer.isEmpty() == false) {
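
A sketch of the suggested loop shape; hasWriteBudget and the batch-draining helper are illustrative, not the final code.

    private synchronized void coordinateWrites() {
        while (hasWriteBudget() && buffer.isEmpty() == false) {
            numConcurrentWrites++;
            // drainBatch() is a hypothetical helper that removes up to the configured
            // maximum number of operations (bounded by the byte limit) from the buffer.
            sendBulkShardOperationsRequest(drainBatch());
        }
    }

    private boolean hasWriteBudget() {
        assert Thread.holdsLock(this);
        return numConcurrentWrites < maxConcurrentWriteBatches;
    }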

}
}

private void sendShardChangesRequest(long from, Long to) {
Contributor

@bleskes bleskes Jun 27, 2018

the current to parameter represents a hard upper bound this request is responsible for. Can we name it something that reflects this, e.g. requireOperationsUpTo, and also never set it to null? if need be we can set it to from or the leader global checkpoint (and it shouldn't be used as the size limit of the request).

e -> handleFailure(e, () -> sendShardChangesRequest(from, to)));
}

private synchronized void handleResponse(long from, Long to, ShardChangesAction.Response response) {
Contributor

there is no need for synchronizing here; better to sync maybeUpdateMapping if need be?

private void handleFailure(Exception e, Runnable task) {
assert e != null;
if (shouldRetry(e)) {
if (isStopped() == false && retryCounter.incrementAndGet() <= RETRY_LIMIT) {
Contributor

@bleskes bleskes Jun 27, 2018

this retry counter is tricky as we need to have a budget that allows all current readers/writers to fail on a network hiccup. There's also the question of how people know what happened when the task has failed (where we might need support from persistent tasks). I think we can leave this for now but have to deal with it in a follow up.

Member Author

this retry counter is tricky as we need to have a budget that allows all current readers/writers to fail on a network hiccup.

Do you mean have a counter for reading, writing and mapping updates?

There's also the question of how people know what happened when the task has failed (where we might need support from persistent tasks).

👍

I think we can leave this for now but have to deal with it in a follow up.

Agreed

// TODO: What other exceptions should be retried?
return NetworkExceptionHelper.isConnectException(e) ||
NetworkExceptionHelper.isCloseConnectionException(e) ||
e instanceof ActionTransportException ||
Contributor

what is this one?

Contributor

do we want to reuse org.elasticsearch.action.support.TransportActions#isShardNotAvailableException ?

Contributor

++

Member Author

what is this one?

Not sure. I removed this one.

do we want to reuse TransportActions#isShardNotAvailableException?

Makes sense when TransportSingleShardAction bubbles this exception up.
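
So shouldRetry(...) would end up roughly like this; the exact exception set is still open in this thread, and this is only a sketch.

    private boolean shouldRetry(Exception e) {
        // Retry on transient network problems and on shard-not-available conditions
        // bubbled up by TransportSingleShardAction.
        return NetworkExceptionHelper.isConnectException(e) ||
            NetworkExceptionHelper.isCloseConnectionException(e) ||
            TransportActions.isShardNotAvailableException(e);
    }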

String description,
TaskId parentTask,
Map<String, String> headers,
Client leaderClient,
Contributor

can't this be done with a filter client as well? I don't think we should do this with abstract classes; it will make things more complicated.

numConcurrentReads++;
long from = lastRequestedSeqno + 1;
long to = from + maxReadSize <= leaderGlobalCheckpoint ? from + maxReadSize : leaderGlobalCheckpoint;
LOGGER.debug("{}[{}] read [{}/{}]", params.getFollowShardId(), numConcurrentReads, from, to);
Contributor

IMO let's drop them all. If you have to keep them, make them trace; you can also just add them back if you need it.

scheduler.accept(TimeValue.timeValueMillis(500), this::coordinateReads);
}
} else {
if (numConcurrentReads == 0) {
Contributor

++

// TODO: What other exceptions should be retried?
return NetworkExceptionHelper.isConnectException(e) ||
NetworkExceptionHelper.isCloseConnectionException(e) ||
e instanceof ActionTransportException ||
Contributor

++

request.setMinSeqNo(from);
request.setMaxSeqNo(to);
request.setMaxTranslogsBytes(params.getMaxTranslogBytes());
leaderClient.execute(ShardChangesAction.INSTANCE, request, new ActionListener<ShardChangesAction.Response>() {
Contributor

you can use ActionListener.wrap(handler::accept, errorHandler::accept) instead

Member Author

I will do that
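
For reference, the ActionListener.wrap shortcut mentioned above would look roughly like this; handler and errorHandler stand for the response and error consumers already passed into the method.

    leaderClient.execute(ShardChangesAction.INSTANCE, request,
        ActionListener.wrap(handler::accept, errorHandler::accept));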

this.processedGlobalCheckpoint = processedGlobalCheckpoint;
this.numberOfConcurrentReads = numberOfConcurrentReads;
Contributor

please make sure they are positive or don't use vint to serialize them

Member Author

there is validation for that in the api that creates the persistent task
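
To illustrate the serialization concern, a minimal sketch (out is a StreamOutput; whether to validate up front or switch encodings is the open question here):

    // writeVInt/writeVLong assume non-negative values, so either validate up front...
    if (numberOfConcurrentReads < 0) {
        throw new IllegalArgumentException("numberOfConcurrentReads must be >= 0");
    }
    out.writeVInt(numberOfConcurrentReads);
    // ...or use a signed-friendly encoding for values that may legitimately be negative,
    // e.g. a not-yet-assigned checkpoint:
    out.writeZLong(processedGlobalCheckpoint);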


ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers) {
ShardFollowNodeTask(long id,
String type,
Contributor

can we not have every arg on a separate line please?


private volatile int numConcurrentReads = 0;
private volatile int numConcurrentWrites = 0;
private volatile long processedGlobalCheckpoint = 0;
Contributor

those variables seem to be read together. I wonder if we should not make them volatile but rather synchronize their reading?

Member Author

Not sure. I don't want to make the getStatus() call synchronized, as it can be invoked when the list tasks api is used.


private static final Logger LOGGER = Loggers.getLogger(ShardFollowNodeTask.class);

final Client leaderClient;
Contributor

why is this pkg private?

Member Author

This is gone now. It was for something silly.

@@ -274,6 +287,15 @@ protected Response newResponse() {
break;
}
}
} catch (IllegalStateException e) {
// TODO: handle peek reads better.
Contributor

I don't get it. Can you point us to the place where the ISE is coming from? Is it due to the source not being available?

@martijnvg
Member Author

@bleskes @simonw I've updated the PR.

… response after its replication has been completed
@martijnvg
Member Author

@bleskes Thanks for reviewing. I think I've addressed all of your comments.

Contributor

@bleskes bleskes left a comment

Thx @martijnvg - I left some final comments based on our discussions.

if (maxConcurrentWriteBatches < 1) {
throw new IllegalArgumentException("maxConcurrentWriteBatches must be larger than 0");
}
if (maxWriteBufferSize < 1) {
Contributor

can we also check that the time values are non-null?

@@ -251,10 +272,21 @@ void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMe
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
for (int i = 0; i < numShards; i++) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
TimeValue retryTimeout = ShardFollowNodeTask.DEFAULT_RETRY_TIMEOUT;
if (request.retryTimeout != null) {
Contributor

I don't think these can be null?

this.idleShardChangesRequestDelay = idleShardChangesRequestDelay;
}

void start(long followGlobalCheckpoint) {
Contributor

nit: followerGlobalCheckpoint ?

while (hasReadBudget() && lastRequestedSeqno < globalCheckpoint) {
numConcurrentReads++;
long from = lastRequestedSeqno + 1;
LOGGER.trace("{}[{}] read [{}/{}]", params.getFollowShardId(), numConcurrentReads, from, maxReadSize);
Contributor

nit: move this after the maxRequiredSeqno and use maxRequiredSeqno for the range?

lastRequestedSeqno = maxRequiredSeqno;
}

if (numConcurrentReads == 0) {
Contributor

I just realized this has a bug - we need to check if we have budget here too (add this to the list of tests :)) - we may have no readers because the write buffer is full.

Member Author

In the case that the buffer is full and we don't start a read, the shard follow task comes to a halt, because when the write buffer has been consumed nothing else will happen, as this was the last point at which a read would have been started.

Member Author

so maybe when handling bulk shard operation responses we should also check whether a shard changes request should be fired off?

Contributor

so maybe when handling bulk shard operation responses we should also check whether a shard changes request should be fired off?

heh - I'm pretty sure I saw we did in some previous iterations. And yes - it must allow reads, because we block reads when we don't consume the write buffer. In a sense the right moment is when you coordinate writes and have consumed some buffer elements. Both are fine with me.

@@ -90,6 +358,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
{
builder.field(PROCESSED_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), processedGlobalCheckpoint);
}
{
Contributor

nit - why do we need these extra {}? just inline this in the previous block?

Member Author

I started doing this here, because I agreed with this comment: https://github.com/elastic/x-pack-elasticsearch/pull/4290#discussion_r179411015

Contributor

one set of {} is awesome. But you can inline all fields in one set?

Member Author

yes, that makes sense.

ops[i] = buffer.remove();
sumEstimatedSize += ops[i].estimateSize();
if (sumEstimatedSize > params.getMaxBatchSizeInBytes()) {
ops = Arrays.copyOf(ops, i + 1);
Contributor

I wonder if we should use an ArrayList with initial capacity. We can then change the request etc to use List<> instead of array

Member Author

In that case we always make a copy of the underlying array in ArrayList, while in this case we only make a copy if maxBatchSizeInBytes limit has been reached.

Contributor

In that case we always make a copy of the underlying array in ArrayList,

Not if you change the request and such to use lists.

while in this case we only make a copy if maxBatchSizeInBytes limit has been reached.

which may be frequent (i.e., every request), which is why I was considering making a change.
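
A sketch of the list-based variant being discussed; batchSize here stands in for the configured maximum operation count and the names mirror the quoted code above, but this is illustrative only.

    List<Translog.Operation> ops = new ArrayList<>(Math.min(batchSize, buffer.size()));
    long sumEstimatedSize = 0L;
    while (ops.size() < batchSize && buffer.isEmpty() == false) {
        Translog.Operation op = buffer.remove();
        ops.add(op);
        sumEstimatedSize += op.estimateSize();
        if (sumEstimatedSize > params.getMaxBatchSizeInBytes()) {
            // Same semantics as the array version: the op that crossed the byte budget
            // is still included, but no further ops are drained, and no array copy is needed.
            break;
        }
    }
    sendBulkShardOperationsRequest(ops);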

e -> handleFailure(e, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo)));
}

private void handleReadResponse(long from, int maxOperationCount, long maxRequiredSeqNo, ShardChangesAction.Response response) {
Contributor

What do you think of:

    private void handleReadResponse(long from, int maxOperationCount, long maxRequiredSeqNo, ShardChangesAction.Response response) {
        maybeUpdateMapping(response.getIndexMetadataVersion(), () -> {
            synchronized (ShardFollowNodeTask.this) {
                globalCheckpoint = Math.max(globalCheckpoint, response.getGlobalCheckpoint());
                final long newMinRequiredSeqNo;
                if (response.getOperations().length == 0) {
                    newMinRequiredSeqNo = from;
                } else {
                    assert response.getOperations()[0].seqNo() == from : 
                        "first operation is not what we asked for. From is [" + from + "], got " + response.getOperations()[0];
                    buffer.addAll(Arrays.asList(response.getOperations()));
                    final long maxSeqNo = response.getOperations()[response.getOperations().length - 1].seqNo();
                    assert maxSeqNo==
                        Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::seqNo).max().getAsLong();
                    newMinRequiredSeqNo = maxSeqNo + 1;
                    // update last requested seq no as we may have gotten more than we asked for and we don't want to ask it again.
                    lastRequestedSeqno = Math.max(lastRequestedSeqno, maxSeqNo);
                    assert lastRequestedSeqno <= globalCheckpoint: 
                        "lastRequestedSeqno [" + lastRequestedSeqno + "] is larger than the global checkpoint [" + globalCheckpoint + "]";
                    coordinateWrites();
                }
                
                if (newMinRequiredSeqNo < maxRequiredSeqNo) {
                    int newSize = (int) (maxRequiredSeqNo - newMinRequiredSeqNo) + 1;
                    LOGGER.trace("{} received [{}] ops, still missing [{}/{}], continuing to read...",
                        params.getFollowShardId(), response.getOperations().length, newMinRequiredSeqNo, maxRequiredSeqNo);
                    sendShardChangesRequest(newMinRequiredSeqNo, newSize, maxRequiredSeqNo);
                } else {
                    // read is completed, decrement
                    numConcurrentReads--;
                    if (response.getOperations().length == 0 && globalCheckpoint == lastRequestedSeqno)  {
                        // we got nothing and we have no reason to believe asking again will get us more, treat shard as idle and delay
                        // future requests
                        LOGGER.trace("{} received no ops and no known ops to fetch, scheduling to coordinate reads", 
                            params.getFollowShardId());
                        scheduler.accept(idleShardChangesRequestDelay, this::coordinateReads);
                    } else {
                        coordinateReads();
                    }
                }
            }
        });
    }

PS - note the difference in handling of lastRequestedSeqno - I think the way you had it had a bug.

}

public static class Status implements Task.Status {

public static final String NAME = "shard-follow-node-task-status";

static final ParseField PROCESSED_GLOBAL_CHECKPOINT_FIELD = new ParseField("processed_global_checkpoint");
static final ParseField NUMBER_OF_CONCURRENT_READS_FIELD = new ParseField("number_of_concurrent_reads");
Contributor

I'm still missing the buffer size, the max requested seq no, the leader global checkpoint, the follower global checkpoint, etc. I'm fine with a follow-up for those, but that's what I meant.

@Override
protected void respondIfPossible(Exception ex) {
assert Thread.holdsLock(this);
// maybe invoked multiple times, but that is ok as global checkpoint does not go backwards
Contributor

I wonder if we should override the respond method - this one is called once when the replication operation is completed (but post-write actions may still be in flight). Seems like cleaner logic to follow, and we can write a comment there as to why we do it (get a fresh global checkpoint once the current batch has been fully replicated).

@martijnvg
Member Author

@bleskes I've updated the PR.

ops.get(ops.size() - 1).seqNo(), ops.size());
sendBulkShardOperationsRequest(ops);

// In case that buffer is higher than max write buffer size then reads may all have been stopped,
Contributor

it feels a bit weird to have this here, in the coordinateWrites loop. Since it's always safe to call coordinateReads, how about just calling it once in handleWriteResponse and be done with it?
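
The suggestion amounts to something like the following sketch; handleWriteResponse, followerLocalCheckpoint and getLocalCheckpoint are illustrative names based on this PR's description of the bulk shard operations response, not the actual API.

    private synchronized void handleWriteResponse(BulkShardOperationsResponse response) {
        // The response carries the local checkpoint on the follow primary, which keeps the
        // task up to date with what has been written (accessor name is hypothetical).
        followerLocalCheckpoint = Math.max(followerLocalCheckpoint, response.getLocalCheckpoint());
        numConcurrentWrites--;
        assert numConcurrentWrites >= 0;
        coordinateWrites();
        // Calling coordinateReads() here is always safe and restarts reading if it was
        // paused because the write buffer had grown beyond the configured limit.
        coordinateReads();
    }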

Contributor

@bleskes bleskes left a comment

LGTM, assuming you agree with my feedback. There is no need for another cycle if you do.

}

public static class Status implements Task.Status {

public static final String NAME = "shard-follow-node-task-status";

static final ParseField PROCESSED_GLOBAL_CHECKPOINT_FIELD = new ParseField("processed_global_checkpoint");
static final ParseField GLOBAL_CHECKPOINT_FIELD = new ParseField("leader_global_checkpoint");
Contributor

can we align the name with the field (LEADER_GLOBAL_CHECKPOINT_FIELD & FOLLOWER_GLOBAL_CHECKPOINT_FIELD)?

@martijnvg martijnvg merged commit 8e1ef0c into elastic:ccr Jul 10, 2018
martijnvg added a commit that referenced this pull request Jul 10, 2018