Skip to content

Commit

Permalink
Use cancel instead of timeout for aborting publications (#37670)
Browse files Browse the repository at this point in the history
When publications were cancelled because a node became a follower or candidate, the cancellation
was still reported as a timeout, which can be confusing in the logs. This change replaces the
improper call of onTimeout with a more general cancel method that takes the reason for the cancellation.
  • Loading branch information
ywelsch committed Jan 22, 2019
1 parent ef2f5e4 commit 2a7b7cc
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ void becomeCandidate(String method) {

if (mode != Mode.CANDIDATE) {
mode = Mode.CANDIDATE;
cancelActivePublication();
cancelActivePublication("become candidate: " + method);
joinAccumulator.close(mode);
joinAccumulator = joinHelper.new CandidateJoinAccumulator();

Expand Down Expand Up @@ -518,7 +518,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {
discoveryUpgradeService.deactivate();
clusterFormationFailureHelper.stop();
closePrevotingAndElectionScheduler();
cancelActivePublication();
cancelActivePublication("become follower: " + method);
preVoteCollector.update(getPreVoteResponse(), leaderNode);

if (restartLeaderChecker) {
Expand Down Expand Up @@ -902,7 +902,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
@Override
public void run() {
synchronized (mutex) {
publication.onTimeout();
publication.cancel("timed out after " + publishTimeout);
}
}

Expand Down Expand Up @@ -958,10 +958,10 @@ public void onFailure(Exception e) {
};
}

private void cancelActivePublication() {
private void cancelActivePublication(String reason) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
if (currentPublication.isPresent()) {
currentPublication.get().onTimeout();
currentPublication.get().cancel(reason);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public abstract class Publication {

private Optional<ApplyCommitRequest> applyCommitRequest; // set when state is committed
private boolean isCompleted; // set when publication is completed
private boolean timedOut; // set when publication timed out
private boolean cancelled; // set when publication is cancelled

public Publication(PublishRequest publishRequest, AckListener ackListener, LongSupplier currentTimeSupplier) {
this.publishRequest = publishRequest;
Expand All @@ -71,17 +71,17 @@ public void start(Set<DiscoveryNode> faultyNodes) {
publicationTargets.forEach(PublicationTarget::sendPublishRequest);
}

public void onTimeout() {
public void cancel(String reason) {
if (isCompleted) {
return;
}

assert timedOut == false;
timedOut = true;
assert cancelled == false;
cancelled = true;
if (applyCommitRequest.isPresent() == false) {
logger.debug("onTimeout: [{}] timed out before committing", this);
logger.debug("cancel: [{}] cancelled before committing (reason: {})", this, reason);
// fail all current publications
final Exception e = new ElasticsearchException("publication timed out before committing");
final Exception e = new ElasticsearchException("publication cancelled before committing: " + reason);
publicationTargets.stream().filter(PublicationTarget::isActive).forEach(pt -> pt.setFailed(e));
}
onPossibleCompletion();
Expand All @@ -101,7 +101,7 @@ private void onPossibleCompletion() {
return;
}

if (timedOut == false) {
if (cancelled == false) {
for (final PublicationTarget target : publicationTargets) {
if (target.isActive()) {
return;
Expand All @@ -125,8 +125,8 @@ private void onPossibleCompletion() {
}

// For assertions only: verify that this invariant holds
private boolean publicationCompletedIffAllTargetsInactiveOrTimedOut() {
if (timedOut == false) {
private boolean publicationCompletedIffAllTargetsInactiveOrCancelled() {
if (cancelled == false) {
for (final PublicationTarget target : publicationTargets) {
if (target.isActive()) {
return isCompleted == false;
Expand Down Expand Up @@ -222,7 +222,7 @@ void sendPublishRequest() {
state = PublicationTargetState.SENT_PUBLISH_REQUEST;
Publication.this.sendPublishRequest(discoveryNode, publishRequest, new PublishResponseHandler());
// TODO Can this ^ fail with an exception? Target should be failed if so.
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
assert publicationCompletedIffAllTargetsInactiveOrCancelled();
}

void handlePublishResponse(PublishResponse publishResponse) {
Expand All @@ -245,7 +245,7 @@ void sendApplyCommit() {
state = PublicationTargetState.SENT_APPLY_COMMIT;
assert applyCommitRequest.isPresent();
Publication.this.sendApplyCommit(discoveryNode, applyCommitRequest.get(), new ApplyCommitResponseHandler());
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
assert publicationCompletedIffAllTargetsInactiveOrCancelled();
}

void setAppliedCommit() {
Expand Down Expand Up @@ -300,7 +300,7 @@ private class PublishResponseHandler implements ActionListener<PublishWithJoinRe
public void onResponse(PublishWithJoinResponse response) {
if (isFailed()) {
logger.debug("PublishResponseHandler.handleResponse: already failed, ignoring response from [{}]", discoveryNode);
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
assert publicationCompletedIffAllTargetsInactiveOrCancelled();
return;
}

Expand All @@ -319,7 +319,7 @@ public void onResponse(PublishWithJoinResponse response) {
state = PublicationTargetState.WAITING_FOR_QUORUM;
handlePublishResponse(response.getPublishResponse());

assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
assert publicationCompletedIffAllTargetsInactiveOrCancelled();
}

@Override
Expand All @@ -330,7 +330,7 @@ public void onFailure(Exception e) {
assert ((TransportException) e).getRootCause() instanceof Exception;
setFailed((Exception) exp.getRootCause());
onPossibleCommitFailure();
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
assert publicationCompletedIffAllTargetsInactiveOrCancelled();
}

}
Expand All @@ -346,7 +346,7 @@ public void onResponse(TransportResponse.Empty ignored) {
}
setAppliedCommit();
onPossibleCompletion();
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
assert publicationCompletedIffAllTargetsInactiveOrCancelled();
}

@Override
Expand All @@ -357,7 +357,7 @@ public void onFailure(Exception e) {
assert ((TransportException) e).getRootCause() instanceof Exception;
setFailed((Exception) exp.getRootCause());
onPossibleCompletion();
assert publicationCompletedIffAllTargetsInactiveOrTimedOut();
assert publicationCompletedIffAllTargetsInactiveOrCancelled();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ public void testClusterStatePublishingFailsOrTimesOutBeforeCommit() throws Inter
publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> {
if (e.getKey().equals(n2)) {
if (timeOut) {
publication.onTimeout();
publication.cancel("timed out");
} else {
e.getValue().onFailure(new TransportException(new Exception("dummy failure")));
}
Expand Down Expand Up @@ -407,7 +407,7 @@ public void testClusterStatePublishingTimesOutAfterCommit() throws InterruptedEx
}
});

publication.onTimeout();
publication.cancel("timed out");
assertTrue(publication.completed);
assertTrue(publication.committed);
assertEquals(committingNodes, ackListener.await(0L, TimeUnit.SECONDS));
Expand Down

0 comments on commit 2a7b7cc

Please sign in to comment.