CCR: Following primary should process operations once #34288

Merged
merged 13 commits into elastic:master from dnhatn:ccr-index-once on Oct 10, 2018

Conversation

@dnhatn (Member) commented on Oct 4, 2018

Today we rewrite the operations from the leader with the term of the
following primary because the follower should own its history. The
problem is that a newly promoted primary may re-assign its term to
operations that the previous primary had already replicated to the
replicas. If this happens, some operations with the same seq_no may be
assigned different terms. This would not work well for the future
optimistic locking that uses a combination of seq_no and primary term.

This change ensures that the primary of a follower only processes an
operation if that operation was not processed before. The skipped
operations are guaranteed to be delivered to the replicas via either
primary-replica resync or peer recovery. However, the primary must not
acknowledge the request until the global checkpoint is at least the
highest seq_no of all skipped ops (i.e., until they have all been
processed on every replica).

Relates #31751
Relates #31113
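
A hedged sketch of the idea only, not the PR's actual code; hasProcessedBefore and waitForGlobalCheckpoint are hypothetical helpers standing in for the real bookkeeping:

    import org.elasticsearch.index.engine.Engine;
    import org.elasticsearch.index.seqno.SequenceNumbers;
    import org.elasticsearch.index.shard.IndexShard;
    import org.elasticsearch.index.translog.Translog;

    import java.io.IOException;
    import java.util.List;

    // Sketch only: apply leader operations on the following primary, skipping those
    // that an earlier primary already processed, and defer the acknowledgement until
    // the global checkpoint covers every skipped operation.
    static void applyLeaderOperations(IndexShard primary, List<Translog.Operation> operations) throws IOException {
        long maxSeqNoOfSkippedOps = SequenceNumbers.NO_OPS_PERFORMED;
        for (Translog.Operation op : operations) {
            if (hasProcessedBefore(primary, op)) { // hypothetical: op was seen under a previous term
                // Replicas are guaranteed to receive this op via primary-replica resync
                // or peer recovery, so the primary does not replicate it again.
                maxSeqNoOfSkippedOps = Math.max(maxSeqNoOfSkippedOps, op.seqNo());
            } else {
                primary.applyTranslogOperation(op, Engine.Operation.Origin.PRIMARY);
            }
        }
        // Only acknowledge once the global checkpoint is at least the highest skipped
        // seq_no, i.e. every skipped op has been processed on every replica.
        waitForGlobalCheckpoint(primary, maxSeqNoOfSkippedOps);
    }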

@dnhatn added the >non-issue and :Distributed/CCR (Issues around the Cross Cluster State Replication features) labels on Oct 4, 2018
@elasticmachine (Collaborator) commented

Pinging @elastic/es-distributed

@bleskes (Contributor) left a comment

Looking awesome. I left some nits and suggestions.

}
}
}
assert appliedOperations.size() == sourceOperations.size() || waitingForGlobalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO;
bleskes (Contributor):

can you add a message?

for (final Translog.Operation operation : request.getOperations()) {
final Engine.Result result = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA);
if (result.getResultType() != Engine.Result.Type.SUCCESS) {
assert false : "failure should never happens on replicas; op=[" + operation + "] error=" + result.getFailure() + "]";
bleskes (Contributor):

doc level failure (normal failures are OK from an algorithmic perspective).

listener.onFailure(e);
} else {
assert waitingForGlobalCheckpoint <= gcp : waitingForGlobalCheckpoint + " > " + gcp;
fillResponse.run();
bleskes (Contributor):

fillResponse can throw an already-closed exception. We should make sure we deal with exceptions here correctly.

bleskes (Contributor):

Maybe wrap the listener using ActionListener#wrap, which does the right things and will simplify the code here too.
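
For illustration, a minimal sketch of that suggestion, assuming a hypothetical buildResponse helper; ActionListener#wrap forwards any exception thrown while building the response to the delegate's onFailure:

    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.common.CheckedFunction;

    // Sketch only: if building the response throws (e.g. an already-closed exception),
    // ActionListener.wrap routes it to the delegate's onFailure instead of letting it escape.
    static <T> ActionListener<Long> respondWhenGlobalCheckpointAdvances(
            ActionListener<T> delegate, CheckedFunction<Long, T, Exception> buildResponse) {
        return ActionListener.wrap(
                globalCheckpoint -> delegate.onResponse(buildResponse.apply(globalCheckpoint)),
                delegate::onFailure);
    }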


@Override
public NoOpResult noOp(NoOp noOp) {
// TODO: Make sure we process NoOp once.
bleskes (Contributor):

why can't we do this now in this PR in the same way?

dnhatn (Member Author):

This is because NoOps don't have an _id and they are processed without the _id lock. I am not sure if we need to introduce a fake _id (for locking purposes) for NoOps, so I prefer to do it in a separate PR where we can see it more clearly.

bleskes (Contributor):

+1

}
}
for (IndexShard replica : follower.getReplicas()) {
try (Translog.Snapshot rSnapshot = replica.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
bleskes (Contributor):

same comment - can we check the content of the ops?

expectThrows(ElasticsearchTimeoutException.class, () -> listener.actionGet(TimeValue.timeValueMillis(1)));

shard.updateGlobalCheckpointOnReplica(randomLongBetween(waitingForGlobalCheckpoint, shard.getLocalCheckpoint()), "test");
assertThat(listener.actionGet(TimeValue.timeValueSeconds(5)).getMaxSeqNo(), equalTo(shard.seqNoStats().getMaxSeqNo()));
bleskes (Contributor):

can we make this just get()? I'm not so comfortable with 5s (it's short), but also we typically let the suite time out so we can get a thread dump (although I suspect it won't be that helpful here, it might be).

long waitingForGlobalCheckpoint = randomLongBetween(-1, shard.getGlobalCheckpoint());
CcrWritePrimaryResult primaryResult = new CcrWritePrimaryResult(request, null, shard, waitingForGlobalCheckpoint, logger);
primaryResult.respond(listener);
assertThat(listener.actionGet(TimeValue.timeValueSeconds(5)).getMaxSeqNo(), equalTo(shard.seqNoStats().getMaxSeqNo()));
bleskes (Contributor):

same comment

for (Engine.Operation op : operations) {
Engine.Operation.Origin nonPrimary = randomValueOtherThan(Engine.Operation.Origin.PRIMARY,
() -> randomFrom(Engine.Operation.Origin.values()));
Engine.Result result = applyOperation(followingEngine, op, nonPrimary);
bleskes (Contributor):

any chance we can also check that this wasn't indexed to lucene? maybe doc counts?
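
As an illustration, a hedged sketch of what such a check could look like, assuming the test's followingEngine and an expectedDocCount tracked elsewhere in the test:

    // Sketch only: verify the skipped operations were not indexed into Lucene by
    // comparing the live doc count of the following engine with the expected count.
    followingEngine.refresh("test");
    try (Engine.Searcher searcher = followingEngine.acquireSearcher("test")) {
        assertThat(searcher.reader().numDocs(), equalTo(expectedDocCount));
    }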

@dnhatn (Member Author) commented on Oct 9, 2018

@bleskes Thanks so much for taking the time to review this. I have addressed your suggestions. Could you please have another look?

@dnhatn requested a review from bleskes on October 9, 2018 18:49
@bleskes (Contributor) left a comment

LGTM

final SeqNoStats seqNoStats = primary.seqNoStats();
// return a fresh global checkpoint after the operations have been replicated for the shard follow task
bleskes (Contributor):

why lose the comment?

@dnhatn (Member Author) commented on Oct 10, 2018

Thanks @bleskes.

@dnhatn merged commit 33791ac into elastic:master on Oct 10, 2018
@dnhatn deleted the ccr-index-once branch on October 10, 2018 19:40
dnhatn added a commit that referenced this pull request Oct 10, 2018
This issue was resolved by #34288.

Closes #33337
Relates #34288
dnhatn added a commit that referenced this pull request Oct 11, 2018
dnhatn added a commit that referenced this pull request Oct 11, 2018
@dnhatn (Member Author) commented on Oct 11, 2018

Sadly, we might hit a deadlock if the FollowTask has more fetchers than writers.

Suppose the leader has two operations [seq#0, seq#1]; the FollowTask has two fetchers with fetch-size=1 and one writer with write-size=1.

  1. The FollowTask issues two concurrent fetch requests: {from_seq_no: 0, num_ops:1} and {from_seq_no: 1, num_ops:1}
  2. The request which fetches [seq#1] completes first; it then triggers a write request containing only seq#1
  3. The primary of the follower fails after it has replicated seq#1 to its replicas
  4. Since the old primary did not respond, the FollowTask resends the previous write request containing seq#1
  5. The new primary already has seq#1; thus it won't replicate seq#1 to the replicas but will wait for the global checkpoint to advance to at least seq#1.

The problem is that the FollowTask has only one writer, and that writer is waiting for seq#0, which won't be delivered until the writer completes.

One solution that I see is to delay a write request if there is a gap between the last write request and the next one (the fetched operations are already sorted by seq_no).
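
A hedged sketch of such a gap check (not the eventual fix; buffer and lastWrittenSeqNo stand in for the FollowTask's existing bookkeeping):

    import org.elasticsearch.index.translog.Translog;

    import java.util.ArrayList;
    import java.util.List;

    // Sketch only: hand a batch to the writer only while the buffered ops form a
    // contiguous run after the last written seq_no; stop at the first gap so the
    // writer is never blocked waiting for an op that has not been fetched yet.
    static List<Translog.Operation> nextWriteBatch(List<Translog.Operation> buffer, // sorted by seq_no
                                                   long lastWrittenSeqNo, int maxBatchSize) {
        List<Translog.Operation> batch = new ArrayList<>();
        long expectedSeqNo = lastWrittenSeqNo + 1;
        for (Translog.Operation op : buffer) {
            if (op.seqNo() != expectedSeqNo || batch.size() >= maxBatchSize) {
                break; // gap (or full batch): delay the write request until the missing ops arrive
            }
            batch.add(op);
            expectedSeqNo++;
        }
        return batch;
    }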

@dnhatn (Member Author) commented on Oct 11, 2018

dnhatn added a commit to dnhatn/elasticsearch that referenced this pull request Oct 11, 2018
dnhatn added a commit that referenced this pull request Oct 19, 2018
Since #34288, we might hit deadlock if the FollowTask has more fetchers
than writers. This can happen in the following scenario:

Suppose the leader has two operations [seq#0, seq#1]; the FollowTask has
two fetchers and one writer.

1. The FollowTask issues two concurrent fetch requests: {from_seq_no: 0,
num_ops:1} and {from_seq_no: 1, num_ops:1} to read seq#0 and seq#1
respectively.

2. The second request, which fetches seq#1, completes first and then
triggers a write request containing only seq#1.

3. The primary of a follower fails after it has replicated seq#1 to
replicas.

4. Since the old primary did not respond, the FollowTask issues another
write request containing seq#1 (resend the previous write request).

5. The new primary has seq#1 already; thus it won't replicate seq#1 to
replicas but will wait for the global checkpoint to advance at least
seq#1.

The problem is that the FollowTask has only one writer, and that writer
is waiting for seq#0, which won't be delivered until the writer completes.

This PR proposes to replicate existing operations with the old primary
term (instead of the current term) on the follower. In particular, when
the following primary detects that it has already processed an operation,
it looks up the term of the existing operation with the same seq_no
in the Lucene index and rewrites the incoming operation with that old
term before replicating it to the following replicas. This approach is
wait-free but requires soft-deletes on the follower.

Relates #34288
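
A hedged sketch of that approach; alreadyProcessed, lookupPrimaryTermForSeqNo, and rewriteWithTerm are hypothetical stand-ins for the engine machinery that soft-deletes enables:

    import org.elasticsearch.index.shard.IndexShard;
    import org.elasticsearch.index.translog.Translog;

    import java.io.IOException;

    // Sketch only: an operation the follower primary has already processed keeps the
    // term it was originally indexed with, so replicas always see one seq_no/term pair.
    static Translog.Operation resolveOperationForReplication(IndexShard followerPrimary,
                                                             long currentPrimaryTerm,
                                                             Translog.Operation op) throws IOException {
        if (alreadyProcessed(followerPrimary, op)) {
            // Look up the term of the existing op with the same seq_no in the Lucene index
            // (soft-deletes keep it addressable) and reuse it instead of the current term.
            long existingTerm = lookupPrimaryTermForSeqNo(followerPrimary, op.seqNo());
            return rewriteWithTerm(op, existingTerm);
        }
        // Not seen before: rewrite with the current term of the following primary as usual.
        return rewriteWithTerm(op, currentPrimaryTerm);
    }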
dnhatn added a commit that referenced this pull request Oct 20, 2018
dnhatn added a commit that referenced this pull request Oct 21, 2018
dnhatn added a commit that referenced this pull request Oct 21, 2018
kcm pushed a commit that referenced this pull request Oct 30, 2018
kcm pushed a commit that referenced this pull request Oct 30, 2018
kcm pushed a commit that referenced this pull request Oct 30, 2018
kcm pushed a commit that referenced this pull request Oct 30, 2018