Skip to content

Commit

Permalink
Improve heartbeat implementation structure and its logging, being abl…
Browse files Browse the repository at this point in the history
…e to see when deadlines move
  • Loading branch information
CodeDrivenMitch committed Jul 8, 2022
1 parent 3c180cd commit 7020b64
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 116 deletions.
162 changes: 110 additions & 52 deletions src/main/java/io/axoniq/axonserver/connector/impl/HeartbeatMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class HeartbeatMonitor {

private final ScheduledExecutorService executor;
private final HeartbeatSender sender;
private final Runnable onHeartbeatMissed;
private final Runnable onConnectionCorrupted;

private final AtomicLong nextHeartbeatDeadline = new AtomicLong();
private final AtomicLong nextHeartbeat = new AtomicLong();
Expand All @@ -56,18 +56,18 @@ public class HeartbeatMonitor {
/**
* Constructs a {@link HeartbeatMonitor}.
*
* @param executor the {@link ScheduledExecutorService} used to schedule operations to validate if a
* heartbeat should be send with the given {@code heartbeatSender}
* @param heartbeatSender the {@link HeartbeatSender} used to send heartbeats with
* @param onHeartbeatMissed operation to perform if a heartbeat has been missed. Can be used to force a reconnect of
* a channel for example
* @param executor the {@link ScheduledExecutorService} used to schedule operations to validate if a
* heartbeat should be send with the given {@code heartbeatSender}
* @param heartbeatSender the {@link HeartbeatSender} used to send heartbeats with
* @param onConnectionCorrupted operation to perform if a heartbeat has been missed. Can be used to force a
* reconnect of a channel for example
*/
public HeartbeatMonitor(ScheduledExecutorService executor,
HeartbeatSender heartbeatSender,
Runnable onHeartbeatMissed) {
Runnable onConnectionCorrupted) {
this.executor = executor;
this.sender = heartbeatSender;
this.onHeartbeatMissed = onHeartbeatMissed;
this.onConnectionCorrupted = onConnectionCorrupted;
}

/**
Expand Down Expand Up @@ -102,12 +102,13 @@ public void disableHeartbeat() {

private void checkAndReschedule(int task) {
if (task != taskId.get()) {
// heartbeats should not be considered valid when a change was made
return;
}
long delay = Math.min(interval.get(), 1000);
try {
// heartbeats should not be considered valid when a change was made
checkBeat();
checkBeatDeadline();
sendBeatIfTimeElapsed();
logger.debug("Heartbeat status checked. Scheduling next heartbeat verification in {}ms", delay);
} catch (Exception e) {
logger.warn("Was unable to send heartbeat due to exception", e);
Expand Down Expand Up @@ -138,66 +139,123 @@ public void resume() {
}
}

private void checkBeat() {
/**
* Send a heartbeat if the next time a heartbeat should be sent has passed.
*/
private void sendBeatIfTimeElapsed() {
if (shouldSendBeat()) {
if (logger.isDebugEnabled()) {
logger.debug("Sending heartbeat due to elapsed next beat interval.");
}
sender.sendHeartbeat().whenComplete((r, e) -> handleHeartbeatCallResult(e));

Instant newNextHeartbeatTime = extendNextHeartbeatTime();
if (logger.isDebugEnabled()) {
logger.debug("Next heartbeat has been planned for {} due to sent heartbeat", newNextHeartbeatTime);
}
}
}

/**
* Checks whether the deadline has elapsed, marking the connection as dead and calling the provided
* {@link Runnable}.
*/
private void checkBeatDeadline() {
long now = clock.millis();
long nextDeadline = nextHeartbeatDeadline.get();
if (nextDeadline <= now) {
logger.info("Did not receive heartbeat acknowledgement within {}ms", this.timeout.get());
onHeartbeatMissed.run();
nextHeartbeatDeadline.compareAndSet(nextDeadline, now + this.interval.get());
onConnectionCorrupted.run();
extendHeartbeatDeadline();
}
if (planNextBeat(now, this.interval.get())) {
long currentInterval = this.interval.get();
long beatTimeout = this.timeout.get();
sender.sendHeartbeat().whenComplete((r, e) -> {

boolean success = e == null
|| (e instanceof AxonServerException && ((AxonServerException) e).getErrorCategory() == ErrorCategory.UNSUPPORTED_INSTRUCTION);
// if AxonServer indicates it doesn't know this instruction, we have at least reached it.
// We can assume the connection is alive
if (success) {
if (currentInterval != Long.MAX_VALUE) {
long newDeadline = nextHeartbeatDeadline.updateAndGet(
currentDeadline -> Math.max(now + beatTimeout + currentInterval, currentDeadline)
);
if (logger.isDebugEnabled()) {
logger.debug("Heartbeat Acknowledgement received. Extending deadline to {}",
Instant.ofEpochMilli(newDeadline));
}
} else if (logger.isDebugEnabled()) {
logger.debug("Heartbeat Acknowledgment received.");
}
}
});
}

/**
* Handles the result of the heartbeat call the client does with the server. In case of success, we extend the
* deadline. In case of error, we mark the connection as corrupted.
*
* @param e Optional error result of the heartbeat call initiated by this client.
*/
private void handleHeartbeatCallResult(Throwable e) {
boolean success = e == null || isUnsupportedInstructionError(e);
if (!success) {
logger.debug("Heartbeat call resulted in an error.");
onConnectionCorrupted.run();
return;
}
boolean heartbeatsDisabled = interval.get() == Long.MAX_VALUE;
if (heartbeatsDisabled) {
if (logger.isDebugEnabled()) {
logger.debug("Heartbeat Acknowledgment received but heartbeats were disabled.");
}
return;
}

Instant extendedDeadline = extendHeartbeatDeadline();
if (logger.isDebugEnabled()) {
logger.debug("Heartbeat call succeeded and extended deadline to {}", extendedDeadline);
}
}

/**
* Calculates the time for the next heartbeat based on the most recent heartbeat time and the interval, indicating
* whether the time has been reached to send a heartbeat message.
* Checks whether the given error is an Unsupported instruction error. If we get this while sending a heartbeat
* call, we are talking with an older version of the server. However, reaching it in the first place means the
* connection is alive, so we can count this as a valid heartbeat.
*/
private boolean isUnsupportedInstructionError(Throwable e) {
return e instanceof AxonServerException
&& ((AxonServerException) e).getErrorCategory() == ErrorCategory.UNSUPPORTED_INSTRUCTION;
}

/**
* Whether a beat should be sent, based on the current time and the next scheduled heartbeat.
*/
private boolean shouldSendBeat() {
long now = clock.millis();
return nextHeartbeat.get() <= now;
}

/**
* Set the next heartbeat send time to the configured interval in the future.
*
* @return The new time the next heartbeat will be sent
*/
private Instant extendNextHeartbeatTime() {
long now = clock.millis();
long newNextHeartbeat = nextHeartbeat.updateAndGet(current -> now + interval.get());
return Instant.ofEpochMilli(newNextHeartbeat);
}

/**
* Sets the heartbeat deadline to a new value, based on the current time and provided configuration.
*
* @param currentTime the current timestamp
* @param interval the interval at which heartbeats occur
* @return whether or not the time for a new heartbeat has elapsed
* @return The new heartbeat deadline, after which we can assume the connection is dead.
*/
private boolean planNextBeat(long currentTime, long interval) {
return nextHeartbeat.getAndAccumulate(
interval, (next, currentInterval) -> next <= currentTime ? currentTime + currentInterval : next
) <= currentTime;
private Instant extendHeartbeatDeadline() {
long now = clock.millis();
long newDeadline = nextHeartbeatDeadline.updateAndGet(
currentDeadline -> Math.max(now + timeout.get() + interval.get(), currentDeadline)
);
return Instant.ofEpochMilli(newDeadline);
}

/**
* Handler of {@link PlatformInboundInstruction} requesting a heartbeat from this connector. The given {@link
* ReplyChannel} is used to send responding heartbeat message with.
* Handler of {@link PlatformInboundInstruction} requesting a heartbeat from this connector. The given
* {@link ReplyChannel} is used to send responding heartbeat message with.
*
* @param reply the {@link ReplyChannel} to send a heartbeat reply message over
*/
public void handleIncomingBeat(ReplyChannel<PlatformInboundInstruction> reply) {
long now = clock.millis();
long currentInterval = this.interval.get();
// receiving a heartbeat from Server is equivalent to receiving an acknowledgement
planNextBeat(now, currentInterval);
nextHeartbeatDeadline.updateAndGet(current -> Math.max(now + currentInterval, current));
Instant newHeartbeatTime = extendNextHeartbeatTime();
Instant extendedDeadline = extendHeartbeatDeadline();

if (logger.isDebugEnabled()) {
logger.debug(
"Received heartbeat call from server, extending deadline to {} and planned next heartbeat for {}",
extendedDeadline,
newHeartbeatTime);
}

try {
reply.send(HEARTBEAT_MESSAGE);
} finally {
Expand Down

0 comments on commit 7020b64

Please sign in to comment.