Skip to content
This repository has been archived by the owner on Aug 7, 2023. It is now read-only.

Commit

Permalink
ConnectionListener#onDisconnected_is_called_twice_for_every_disconnect,
Browse files Browse the repository at this point in the history
Fixes #164 & Fixes #219

- moved exception handler
- Add tests
- Don't reset HeartbeatHandler timeout on message send
  • Loading branch information
jkolobok authored and nicktindall committed Oct 28, 2022
1 parent e7f9dbe commit d924be6
Show file tree
Hide file tree
Showing 8 changed files with 467 additions and 143 deletions.
Expand Up @@ -18,15 +18,13 @@

package net.openhft.chronicle.network.cluster;

import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.network.VanillaNetworkContext;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VanillaClusteredNetworkContext<T extends VanillaClusteredNetworkContext<T, C>, C extends ClusterContext<C, T>>
extends VanillaNetworkContext<T> implements ClusteredNetworkContext<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(VanillaClusteredNetworkContext.class);

@NotNull
private final EventLoop eventLoop;
Expand Down Expand Up @@ -57,7 +55,7 @@ public C clusterContext() {
}

private boolean logMissedHeartbeat() {
LOGGER.warn("Missed heartbeat on network context " + socketChannel());
Jvm.warn().on(VanillaClusteredNetworkContext.class, "Missed heartbeat on network context " + socketChannel());
return false;
}
}
Expand Up @@ -46,9 +46,10 @@ public final class HeartbeatHandler<T extends ClusteredNetworkContext<T>> extend
private final long heartbeatIntervalMs;
private final long heartbeatTimeoutMs;
private final AtomicBoolean hasHeartbeats = new AtomicBoolean();
private final AtomicBoolean closed;
private volatile long lastTimeMessageReceived;
@Nullable
private ConnectionListener connectionMonitor;
private ConnectionListener connectionListener;
@Nullable
private Timer timer;

Expand All @@ -60,6 +61,7 @@ public HeartbeatHandler(@NotNull WireIn w) {
}

private HeartbeatHandler(long heartbeatTimeoutMs, long heartbeatIntervalMs) {
closed = new AtomicBoolean(false);
this.heartbeatTimeoutMs = heartbeatTimeoutMs;
this.heartbeatIntervalMs = heartbeatIntervalMs;
validateHeartbeatParameters(this.heartbeatTimeoutMs, this.heartbeatIntervalMs);
Expand Down Expand Up @@ -100,7 +102,7 @@ public void onInitialize(@NotNull WireOut outWire) {

@NotNull final WriteMarshallable heartbeatMessage = new HeartbeatMessage();

connectionMonitor = nc().acquireConnectionListener();
connectionListener = nc().acquireConnectionListener();
timer = new Timer(nc().eventLoop());
startPeriodicHeartbeatCheck();
startPeriodicallySendingHeartbeats(heartbeatMessage);
Expand Down Expand Up @@ -134,12 +136,19 @@ public void onRead(@NotNull WireIn inWire, @NotNull WireOut outWire) {

@Override
public void close() {
if (connectionMonitor != null)
connectionMonitor.onDisconnected(localIdentifier(), remoteIdentifier(), nc().isAcceptor());
lastTimeMessageReceived = Long.MAX_VALUE;
Closeable closable = closable();
if (closable != null && !closable.isClosed()) {
Closeable.closeQuietly(closable);
if (closed.compareAndSet(false, true)) {
if (connectionListener != null) {
try {
connectionListener.onDisconnected(localIdentifier(), remoteIdentifier(), nc().isAcceptor());
} catch (Exception e) {
Jvm.error().on(getClass(), "Exception thrown by ConnectionListener#onDisconnected", e);
}
}
lastTimeMessageReceived = Long.MAX_VALUE;
Closeable closable = closable();
if (closable != null && !closable.isClosed()) {
Closeable.closeQuietly(closable);
}
}
}

Expand Down Expand Up @@ -224,9 +233,6 @@ public boolean action() throws InvalidEventHandlerException {

if (hasHeartbeats != prev) {
if (!hasHeartbeats) {
connectionMonitor.onDisconnected(HeartbeatHandler.this.localIdentifier(),
HeartbeatHandler.this.remoteIdentifier(), HeartbeatHandler.this.nc().isAcceptor());

final Runnable socketReconnector = HeartbeatHandler.this.nc().socketReconnector();
if (socketReconnector == null)
Jvm.warn().on(getClass(), "socketReconnector == null");
Expand All @@ -237,8 +243,12 @@ public boolean action() throws InvalidEventHandlerException {

throw newClosedInvalidEventHandlerException();
} else
connectionMonitor.onConnected(HeartbeatHandler.this.localIdentifier(),
HeartbeatHandler.this.remoteIdentifier(), HeartbeatHandler.this.nc().isAcceptor());
try {
connectionListener.onConnected(HeartbeatHandler.this.localIdentifier(),
HeartbeatHandler.this.remoteIdentifier(), HeartbeatHandler.this.nc().isAcceptor());
} catch (RuntimeException e) {
Jvm.error().on(HeartbeatCheckHandler.class, "Exception thrown by ConnectionListener#onConnected", e);
}
}

return true;
Expand Down Expand Up @@ -291,4 +301,4 @@ InvalidEventHandlerException newClosedInvalidEventHandlerException() {
return new InvalidEventHandlerException("closed");
}

}
}
Expand Up @@ -22,7 +22,6 @@
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.io.ClosedIllegalStateException;
import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.network.ConnectionListener;
import net.openhft.chronicle.network.api.session.SubHandler;
import net.openhft.chronicle.network.api.session.WritableSubHandler;
import net.openhft.chronicle.network.cluster.ClusteredNetworkContext;
Expand Down Expand Up @@ -149,17 +148,6 @@ protected void performClose() {
if (connectionChangedNotifier != null) {
eventEmitterToken = connectionChangedNotifier.onConnectionChanged(false, nc, eventEmitterToken);
}

try {
if (nc != null) {
final ConnectionListener listener = nc.acquireConnectionListener();
if (listener != null)
listener.onDisconnected(localIdentifier, remoteIdentifier(), nc.isAcceptor());
}
} catch (Exception e) {
Jvm.error().on(getClass(), "close:", e);
throw Jvm.rethrow(e);
}
Closeable.closeQuietly(writers);
writers.clear();
super.performClose();
Expand Down Expand Up @@ -187,7 +175,7 @@ protected void onRead(@NotNull final DocumentContext dc, @NotNull final WireOut
}
}

onMessageReceivedOrWritten();
onMessageReceived();

final Wire inWire = dc.wire();
if (dc.isMetaData()) {
Expand Down Expand Up @@ -250,7 +238,6 @@ public void performIdleWork() {

@Override
protected void onBytesWritten() {
onMessageReceivedOrWritten();
}

/**
Expand Down Expand Up @@ -286,7 +273,7 @@ protected void onWrite(@NotNull final WireOut outWire) {
}
}

private void onMessageReceivedOrWritten() {
private void onMessageReceived() {
final HeartbeatEventHandler heartbeatEventHandler = heartbeatEventHandler();
if (heartbeatEventHandler != null)
heartbeatEventHandler.onMessageReceived();
Expand All @@ -299,4 +286,4 @@ public String toString() {
", localIdentifier=" + localIdentifier +
'}';
}
}
}
@@ -0,0 +1,214 @@
package net.openhft.chronicle.network;

import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.threads.InvalidEventHandlerException;
import net.openhft.chronicle.network.cluster.HostDetails;
import net.openhft.chronicle.network.test.TestClusterContext;
import net.openhft.chronicle.testframework.Waiters;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;

import static net.openhft.chronicle.network.TCPRegistry.createServerSocketChannelFor;
import static net.openhft.chronicle.network.test.TestClusterContext.forHosts;
import static org.junit.jupiter.api.Assertions.assertEquals;

class ConnectionListenerTest extends NetworkTestCommon {

private HostDetails initiatorHost;
private HostDetails acceptorHost;
private CountingConnectionListener initiatorCounter;
private CountingConnectionListener acceptorCounter;

@BeforeEach
void setUp() throws IOException {
createServerSocketChannelFor("initiator", "acceptor");
initiatorHost = new HostDetails().hostId(2).connectUri("initiator");
acceptorHost = new HostDetails().hostId(1).connectUri("acceptor");
acceptorCounter = new CountingConnectionListener();
initiatorCounter = new CountingConnectionListener();
}

@Test
void onConnectAndOnDisconnectAreCalledOnce_OnOrderlyConnectionAndDisconnection() {
try (TestClusterContext acceptorCtx = forHosts(acceptorHost, initiatorHost);
TestClusterContext initiatorCtx = forHosts(initiatorHost, acceptorHost)) {

acceptorCtx.addConnectionListener(acceptorCounter);
initiatorCtx.addConnectionListener(initiatorCounter);

acceptorCtx.cluster().start(acceptorHost.hostId());
initiatorCtx.cluster().start(initiatorHost.hostId());

Waiters.waitForCondition("acceptor and initiator to connect",
() -> acceptorCounter.onConnectedCalls > 0 && initiatorCounter.onConnectedCalls > 0,
5_000);
}
assertEquals(1, acceptorCounter.onConnectedCalls);
assertEquals(1, acceptorCounter.onDisconnectedCalls);
assertEquals(1, initiatorCounter.onConnectedCalls);
assertEquals(1, initiatorCounter.onDisconnectedCalls);
}

@Test
void onConnectAndOnDisconnectAreCalledOnce_WhenConnectionTimesOut_InUberHandler() {
expectException("missed heartbeat, lastTimeMessageReceived=");
try (TestClusterContext acceptorCtx = forHosts(acceptorHost, initiatorHost);
TestClusterContext initiatorCtx = forHosts(initiatorHost, acceptorHost)) {
initiatorCtx.overrideNetworkContextTimeout(5_000); // we want the heartbeat handler to timeout
initiatorCtx.disableReconnect();
acceptorCtx.overrideNetworkContextTimeout(5_000); // we want the heartbeat handler to timeout
acceptorCtx.disableReconnect();

initiatorCtx.heartbeatTimeoutMs(1_000); // set to minimum, initiator dictates

acceptorCtx.addConnectionListener(acceptorCounter);
initiatorCtx.addConnectionListener(initiatorCounter);

acceptorCtx.cluster().start(acceptorHost.hostId());
initiatorCtx.cluster().start(initiatorHost.hostId());

Waiters.waitForCondition("acceptor and initiator to connect",
() -> acceptorCounter.onConnectedCalls > 0 && initiatorCounter.onConnectedCalls > 0,
5_000);
// jam up the acceptor event loop to trigger an initiator timeout
acceptorCtx.cluster().clusterContext().eventLoop().addHandler(() -> {
Jvm.pause(3_000);
throw InvalidEventHandlerException.reusable();
});
Waiters.waitForCondition("initiator to timeout",
() -> initiatorCounter.onDisconnectedCalls > 0, 3_000);
}
assertEquals(1, acceptorCounter.onConnectedCalls);
assertEquals(1, acceptorCounter.onDisconnectedCalls);
assertEquals(1, initiatorCounter.onConnectedCalls);
assertEquals(1, initiatorCounter.onDisconnectedCalls);
}

@Test
void onConnectAndOnDisconnectAreCalledOnce_WhenConnectionTimesOut_InTcpHandler() {
expectException("Missed heartbeat on network context");
try (TestClusterContext acceptorCtx = forHosts(acceptorHost, initiatorHost);
TestClusterContext initiatorCtx = forHosts(initiatorHost, acceptorHost)) {
initiatorCtx.overrideNetworkContextTimeout(1_000); // we want the TcpEventHandler to timeout
initiatorCtx.disableReconnect();
acceptorCtx.overrideNetworkContextTimeout(1_000); // we want the TcpEventHandler to timeout
acceptorCtx.disableReconnect();

initiatorCtx.heartbeatTimeoutMs(5_000);

acceptorCtx.addConnectionListener(acceptorCounter);
initiatorCtx.addConnectionListener(initiatorCounter);

acceptorCtx.cluster().start(acceptorHost.hostId());
initiatorCtx.cluster().start(initiatorHost.hostId());

Waiters.waitForCondition("acceptor and initiator to connect",
() -> acceptorCounter.onConnectedCalls > 0 && initiatorCounter.onConnectedCalls > 0,
5_000);

// jam up the acceptor event loop to trigger an initiator timeout
acceptorCtx.cluster().clusterContext().eventLoop().addHandler(() -> {
Jvm.pause(3_000);
throw InvalidEventHandlerException.reusable();
});
Waiters.waitForCondition("initiator to timeout",
() -> initiatorCounter.onDisconnectedCalls == 1, 3_000);
}
assertEquals(1, acceptorCounter.onConnectedCalls);
assertEquals(1, acceptorCounter.onDisconnectedCalls);
assertEquals(1, initiatorCounter.onConnectedCalls);
assertEquals(1, initiatorCounter.onDisconnectedCalls);
}

@Test
void onConnectAndOnDisconnectAreNotCalled_WhenNoConnectionIsEstablished_Initiator() {
try (TestClusterContext acceptorCtx = forHosts(acceptorHost, initiatorHost);
TestClusterContext initiatorCtx = forHosts(initiatorHost, acceptorHost)) {

acceptorCtx.addConnectionListener(acceptorCounter);
initiatorCtx.addConnectionListener(initiatorCounter);

// only start initiator
initiatorCtx.cluster().start(initiatorHost.hostId());
Jvm.pause(1_000);
}
assertEquals(0, acceptorCounter.onConnectedCalls);
assertEquals(0, acceptorCounter.onDisconnectedCalls);
assertEquals(0, initiatorCounter.onConnectedCalls);
assertEquals(0, initiatorCounter.onDisconnectedCalls);
}

@Test
void onConnectAndOnDisconnectAreNotCalled_WhenNoConnectionIsEstablished_Acceptor() {
try (TestClusterContext acceptorCtx = forHosts(acceptorHost, initiatorHost);
TestClusterContext initiatorCtx = forHosts(initiatorHost, acceptorHost)) {

acceptorCtx.addConnectionListener(acceptorCounter);
initiatorCtx.addConnectionListener(initiatorCounter);

// only start acceptor
acceptorCtx.cluster().start(initiatorHost.hostId());
Jvm.pause(1_000);
}
assertEquals(0, acceptorCounter.onConnectedCalls);
assertEquals(0, acceptorCounter.onDisconnectedCalls);
assertEquals(0, initiatorCounter.onConnectedCalls);
assertEquals(0, initiatorCounter.onDisconnectedCalls);
}

@Test
void onConnectAndOnDisconnect_WillLogWhenAnExceptionIsThrown() {
expectException("Something went wrong - onConnect");
expectException("Something went wrong - onDisconnect");
try (TestClusterContext acceptorCtx = forHosts(acceptorHost, initiatorHost);
TestClusterContext initiatorCtx = forHosts(initiatorHost, acceptorHost)) {

acceptorCtx.addConnectionListener(new ThrowingConnectionListener());
initiatorCtx.addConnectionListener(initiatorCounter);

acceptorCtx.cluster().start(acceptorHost.hostId());
initiatorCtx.cluster().start(initiatorHost.hostId());

Waiters.waitForCondition("acceptor and initiator to connect",
() -> initiatorCounter.onConnectedCalls > 0,
5_000);

// this shouldn't trigger a disconnect
Jvm.pause(1_000);
assertEquals(0, initiatorCounter.onDisconnectedCalls);
}
assertEquals(1, initiatorCounter.onConnectedCalls);
assertEquals(1, initiatorCounter.onDisconnectedCalls);
}

private static class ThrowingConnectionListener implements ConnectionListener {

@Override
public void onConnected(int localIdentifier, int remoteIdentifier, boolean isAcceptor) {
throw new RuntimeException("Something went wrong - onConnect");
}

@Override
public void onDisconnected(int localIdentifier, int remoteIdentifier, boolean isAcceptor) {
throw new RuntimeException("Something went wrong - onDisconnect");
}
}

private static class CountingConnectionListener implements ConnectionListener {

private int onConnectedCalls = 0;
private int onDisconnectedCalls = 0;

@Override
public void onConnected(int localIdentifier, int remoteIdentifier, boolean isAcceptor) {
onConnectedCalls++;
}

@Override
public void onDisconnected(int localIdentifier, int remoteIdentifier, boolean isAcceptor) {
onDisconnectedCalls++;
}
}
}

0 comments on commit d924be6

Please sign in to comment.