Skip to content

Commit

Permalink
Do not interrupt timer threads when cancelling timers.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jan 11, 2015
1 parent 66a0d97 commit fd25461
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 14 deletions.
Expand Up @@ -345,7 +345,6 @@ protected void applyEntry(long index) {
@Override @Override
public CompletableFuture<PollResponse> poll(PollRequest request) { public CompletableFuture<PollResponse> poll(PollRequest request) {
context.checkThread(); context.checkThread();
logger().debug("{} - Received {}", context.getLocalMember(), request);
return CompletableFuture.completedFuture(logResponse(handlePoll(logRequest(request)))); return CompletableFuture.completedFuture(logResponse(handlePoll(logRequest(request))));
} }


Expand Down
Expand Up @@ -88,7 +88,7 @@ private void resetTimer() {


// Cancel the current timer task and purge the election timer of cancelled tasks. // Cancel the current timer task and purge the election timer of cancelled tasks.
if (currentTimer != null) { if (currentTimer != null) {
currentTimer.cancel(true); currentTimer.cancel(false);
} }


// When the election timer is reset, increment the current term and // When the election timer is reset, increment the current term and
Expand Down
14 changes: 7 additions & 7 deletions core/src/main/java/net/kuujo/copycat/internal/FollowerState.java
Expand Up @@ -61,20 +61,20 @@ public CompletableFuture<Void> open() {
*/ */
private void startTimer() { private void startTimer() {
LOGGER.debug("{} - Starting heartbeat timer", context.getLocalMember()); LOGGER.debug("{} - Starting heartbeat timer", context.getLocalMember());
resetTimer(); resetHeartbeatTimer();
} }


/** /**
* Resets the heartbeat timer. * Resets the heartbeat timer.
*/ */
private void resetTimer() { private void resetHeartbeatTimer() {
context.checkThread(); context.checkThread();
if (isClosed()) return; if (isClosed()) return;


// If a timer is already set, cancel the timer. // If a timer is already set, cancel the timer.
if (currentTimer != null) { if (currentTimer != null) {
LOGGER.debug("{} - Reset heartbeat timeout", context.getLocalMember()); LOGGER.debug("{} - Reset heartbeat timeout", context.getLocalMember());
currentTimer.cancel(true); currentTimer.cancel(false);
} }


// Reset the last voted for candidate. // Reset the last voted for candidate.
Expand All @@ -92,20 +92,20 @@ private void resetTimer() {
transition(CopycatState.CANDIDATE); transition(CopycatState.CANDIDATE);
} else { } else {
// If the node voted for a candidate then reset the election timer. // If the node voted for a candidate then reset the election timer.
resetTimer(); resetHeartbeatTimer();
} }
}, delay, TimeUnit.MILLISECONDS); }, delay, TimeUnit.MILLISECONDS);
} }


@Override @Override
public CompletableFuture<PingResponse> ping(PingRequest request) { public CompletableFuture<PingResponse> ping(PingRequest request) {
resetTimer(); resetHeartbeatTimer();
return super.ping(request); return super.ping(request);
} }


@Override @Override
public CompletableFuture<AppendResponse> append(AppendRequest request) { public CompletableFuture<AppendResponse> append(AppendRequest request) {
resetTimer(); resetHeartbeatTimer();
return super.append(request); return super.append(request);
} }


Expand All @@ -115,7 +115,7 @@ public CompletableFuture<AppendResponse> append(AppendRequest request) {
private void cancelTimer() { private void cancelTimer() {
if (currentTimer != null) { if (currentTimer != null) {
LOGGER.debug("{} - Cancelling heartbeat timer", context.getLocalMember()); LOGGER.debug("{} - Cancelling heartbeat timer", context.getLocalMember());
currentTimer.cancel(true); currentTimer.cancel(false);
} }
} }


Expand Down
Expand Up @@ -251,7 +251,7 @@ public CompletableFuture<CommitResponse> commit(final CommitRequest request) {
private void cancelPingTimer() { private void cancelPingTimer() {
if (currentTimer != null) { if (currentTimer != null) {
LOGGER.debug("{} - Cancelling ping timer", context.getLocalMember()); LOGGER.debug("{} - Cancelling ping timer", context.getLocalMember());
currentTimer.cancel(true); currentTimer.cancel(false);
} }
} }


Expand Down
13 changes: 9 additions & 4 deletions core/src/main/java/net/kuujo/copycat/internal/PassiveState.java
Expand Up @@ -65,17 +65,17 @@ public CompletableFuture<Void> open() {
*/ */
private void startSyncTimer() { private void startSyncTimer() {
LOGGER.debug("{} - Setting sync timer", context.getLocalMember()); LOGGER.debug("{} - Setting sync timer", context.getLocalMember());
setSyncTimer(); resetSyncTimer();
} }


/** /**
* Sets the sync timer. * Sets the sync timer.
*/ */
private void setSyncTimer() { private void resetSyncTimer() {
currentTimer = context.executor().schedule(() -> { currentTimer = context.executor().schedule(() -> {
if (isOpen()) { if (isOpen()) {
sync(); sync();
setSyncTimer(); resetSyncTimer();
} }
}, context.getHeartbeatInterval(), TimeUnit.MILLISECONDS); }, context.getHeartbeatInterval(), TimeUnit.MILLISECONDS);
} }
Expand All @@ -84,6 +84,9 @@ private void setSyncTimer() {
* Synchronizes with random nodes via a gossip protocol. * Synchronizes with random nodes via a gossip protocol.
*/ */
private void sync() { private void sync() {
context.checkThread();
if (isClosed()) return;

// Create a list of currently active members. // Create a list of currently active members.
List<ReplicaInfo> activeMembers = new ArrayList<>(context.getMembers().size()); List<ReplicaInfo> activeMembers = new ArrayList<>(context.getMembers().size());
for (ReplicaInfo member : context.getMemberInfo()) { for (ReplicaInfo member : context.getMemberInfo()) {
Expand Down Expand Up @@ -136,6 +139,8 @@ private void sync() {


@Override @Override
public CompletableFuture<SyncResponse> sync(SyncRequest request) { public CompletableFuture<SyncResponse> sync(SyncRequest request) {
context.checkThread();

if (request.term() > context.getTerm()) { if (request.term() > context.getTerm()) {
context.setTerm(request.term()); context.setTerm(request.term());
context.setLeader(request.leader()); context.setLeader(request.leader());
Expand Down Expand Up @@ -176,7 +181,7 @@ public CompletableFuture<SyncResponse> sync(SyncRequest request) {
private void cancelSyncTimer() { private void cancelSyncTimer() {
if (currentTimer != null) { if (currentTimer != null) {
LOGGER.debug("{} - Cancelling sync timer", context.getLocalMember()); LOGGER.debug("{} - Cancelling sync timer", context.getLocalMember());
currentTimer.cancel(true); currentTimer.cancel(false);
} }
} }


Expand Down

0 comments on commit fd25461

Please sign in to comment.