Skip to content
This repository has been archived by the owner on Aug 7, 2023. It is now read-only.

Commit

Permalink
ConnectionListener#onDisconnected_is_called_twice_for_every_disconnect,
Browse files Browse the repository at this point in the history
Fixes #164 & Fixes #219

- moved exception handler
- Add tests
- Don't reset HeartbeatHandler timeout on message send
- Tolerate null ConnectionListener
  • Loading branch information
jkolobok authored and nicktindall committed Oct 28, 2022
1 parent e7f9dbe commit f80e3a0
Show file tree
Hide file tree
Showing 8 changed files with 508 additions and 143 deletions.
Expand Up @@ -18,15 +18,13 @@

package net.openhft.chronicle.network.cluster;

import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.network.VanillaNetworkContext;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VanillaClusteredNetworkContext<T extends VanillaClusteredNetworkContext<T, C>, C extends ClusterContext<C, T>>
extends VanillaNetworkContext<T> implements ClusteredNetworkContext<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(VanillaClusteredNetworkContext.class);

@NotNull
private final EventLoop eventLoop;
Expand Down Expand Up @@ -57,7 +55,7 @@ public C clusterContext() {
}

private boolean logMissedHeartbeat() {
LOGGER.warn("Missed heartbeat on network context " + socketChannel());
Jvm.warn().on(VanillaClusteredNetworkContext.class, "Missed heartbeat on network context " + socketChannel());
return false;
}
}
Expand Up @@ -46,9 +46,10 @@ public final class HeartbeatHandler<T extends ClusteredNetworkContext<T>> extend
private final long heartbeatIntervalMs;
private final long heartbeatTimeoutMs;
private final AtomicBoolean hasHeartbeats = new AtomicBoolean();
private final AtomicBoolean closed;
private volatile long lastTimeMessageReceived;
@Nullable
private ConnectionListener connectionMonitor;
private ConnectionListener connectionListener;
@Nullable
private Timer timer;

Expand All @@ -60,6 +61,7 @@ public HeartbeatHandler(@NotNull WireIn w) {
}

private HeartbeatHandler(long heartbeatTimeoutMs, long heartbeatIntervalMs) {
closed = new AtomicBoolean(false);
this.heartbeatTimeoutMs = heartbeatTimeoutMs;
this.heartbeatIntervalMs = heartbeatIntervalMs;
validateHeartbeatParameters(this.heartbeatTimeoutMs, this.heartbeatIntervalMs);
Expand Down Expand Up @@ -100,7 +102,7 @@ public void onInitialize(@NotNull WireOut outWire) {

@NotNull final WriteMarshallable heartbeatMessage = new HeartbeatMessage();

connectionMonitor = nc().acquireConnectionListener();
connectionListener = nc().acquireConnectionListener();
timer = new Timer(nc().eventLoop());
startPeriodicHeartbeatCheck();
startPeriodicallySendingHeartbeats(heartbeatMessage);
Expand Down Expand Up @@ -134,12 +136,19 @@ public void onRead(@NotNull WireIn inWire, @NotNull WireOut outWire) {

@Override
public void close() {
if (connectionMonitor != null)
connectionMonitor.onDisconnected(localIdentifier(), remoteIdentifier(), nc().isAcceptor());
lastTimeMessageReceived = Long.MAX_VALUE;
Closeable closable = closable();
if (closable != null && !closable.isClosed()) {
Closeable.closeQuietly(closable);
if (closed.compareAndSet(false, true)) {
if (connectionListener != null) {
try {
connectionListener.onDisconnected(localIdentifier(), remoteIdentifier(), nc().isAcceptor());
} catch (Exception e) {
Jvm.error().on(getClass(), "Exception thrown by ConnectionListener#onDisconnected", e);
}
}
lastTimeMessageReceived = Long.MAX_VALUE;
Closeable closable = closable();
if (closable != null && !closable.isClosed()) {
Closeable.closeQuietly(closable);
}
}
}

Expand Down Expand Up @@ -224,9 +233,6 @@ public boolean action() throws InvalidEventHandlerException {

if (hasHeartbeats != prev) {
if (!hasHeartbeats) {
connectionMonitor.onDisconnected(HeartbeatHandler.this.localIdentifier(),
HeartbeatHandler.this.remoteIdentifier(), HeartbeatHandler.this.nc().isAcceptor());

final Runnable socketReconnector = HeartbeatHandler.this.nc().socketReconnector();
if (socketReconnector == null)
Jvm.warn().on(getClass(), "socketReconnector == null");
Expand All @@ -237,8 +243,14 @@ public boolean action() throws InvalidEventHandlerException {

throw newClosedInvalidEventHandlerException();
} else
connectionMonitor.onConnected(HeartbeatHandler.this.localIdentifier(),
HeartbeatHandler.this.remoteIdentifier(), HeartbeatHandler.this.nc().isAcceptor());
try {
if (connectionListener != null) {
connectionListener.onConnected(HeartbeatHandler.this.localIdentifier(),
HeartbeatHandler.this.remoteIdentifier(), HeartbeatHandler.this.nc().isAcceptor());
}
} catch (RuntimeException e) {
Jvm.error().on(HeartbeatCheckHandler.class, "Exception thrown by ConnectionListener#onConnected", e);
}
}

return true;
Expand Down Expand Up @@ -291,4 +303,4 @@ InvalidEventHandlerException newClosedInvalidEventHandlerException() {
return new InvalidEventHandlerException("closed");
}

}
}
Expand Up @@ -22,7 +22,6 @@
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.io.ClosedIllegalStateException;
import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.network.ConnectionListener;
import net.openhft.chronicle.network.api.session.SubHandler;
import net.openhft.chronicle.network.api.session.WritableSubHandler;
import net.openhft.chronicle.network.cluster.ClusteredNetworkContext;
Expand Down Expand Up @@ -149,17 +148,6 @@ protected void performClose() {
if (connectionChangedNotifier != null) {
eventEmitterToken = connectionChangedNotifier.onConnectionChanged(false, nc, eventEmitterToken);
}

try {
if (nc != null) {
final ConnectionListener listener = nc.acquireConnectionListener();
if (listener != null)
listener.onDisconnected(localIdentifier, remoteIdentifier(), nc.isAcceptor());
}
} catch (Exception e) {
Jvm.error().on(getClass(), "close:", e);
throw Jvm.rethrow(e);
}
Closeable.closeQuietly(writers);
writers.clear();
super.performClose();
Expand Down Expand Up @@ -187,7 +175,7 @@ protected void onRead(@NotNull final DocumentContext dc, @NotNull final WireOut
}
}

onMessageReceivedOrWritten();
onMessageReceived();

final Wire inWire = dc.wire();
if (dc.isMetaData()) {
Expand Down Expand Up @@ -250,7 +238,6 @@ public void performIdleWork() {

@Override
protected void onBytesWritten() {
onMessageReceivedOrWritten();
}

/**
Expand Down Expand Up @@ -286,7 +273,7 @@ protected void onWrite(@NotNull final WireOut outWire) {
}
}

private void onMessageReceivedOrWritten() {
private void onMessageReceived() {
final HeartbeatEventHandler heartbeatEventHandler = heartbeatEventHandler();
if (heartbeatEventHandler != null)
heartbeatEventHandler.onMessageReceived();
Expand All @@ -299,4 +286,4 @@ public String toString() {
", localIdentifier=" + localIdentifier +
'}';
}
}
}

0 comments on commit f80e3a0

Please sign in to comment.