From 1bab899dacbf116866604dde51a65e4d478816fb Mon Sep 17 00:00:00 2001 From: Maksim Kashapov <56276969+crazyrokr@users.noreply.github.com> Date: Wed, 6 May 2026 21:09:47 +0200 Subject: [PATCH 01/11] propagate connection close to client --- .../exception/ConnectionClosedException.java | 12 +++++ .../rlib/network/impl/AbstractConnection.java | 35 ++++++++++++- .../impl/AbstractNetworkPacketReader.java | 12 +++-- .../impl/AbstractSslNetworkPacketReader.java | 1 + .../rlib/network/ConnectionCloseTest.java | 49 +++++++++++++++++++ 5 files changed, 103 insertions(+), 6 deletions(-) create mode 100644 rlib-network/src/main/java/javasabr/rlib/network/exception/ConnectionClosedException.java create mode 100644 rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java diff --git a/rlib-network/src/main/java/javasabr/rlib/network/exception/ConnectionClosedException.java b/rlib-network/src/main/java/javasabr/rlib/network/exception/ConnectionClosedException.java new file mode 100644 index 00000000..c5e90114 --- /dev/null +++ b/rlib-network/src/main/java/javasabr/rlib/network/exception/ConnectionClosedException.java @@ -0,0 +1,12 @@ +package javasabr.rlib.network.exception; + +public class ConnectionClosedException extends NetworkException { + + public ConnectionClosedException(String remoteAddress) { + super("Connection closed: %s".formatted(remoteAddress)); + } + + public ConnectionClosedException(String remoteAddress, Throwable cause) { + super("Connection closed: %s".formatted(remoteAddress), cause); + } +} diff --git a/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java b/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java index 7e8e04c6..3d4c2ee9 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java @@ -16,6 +16,7 @@ import javasabr.rlib.network.Connection; import javasabr.rlib.network.Network; import javasabr.rlib.network.UnsafeConnection; +import javasabr.rlib.network.exception.ConnectionClosedException; import javasabr.rlib.network.packet.NetworkPacketReader; import javasabr.rlib.network.packet.NetworkPacketWriter; import javasabr.rlib.network.packet.ReadableNetworkPacket; @@ -64,6 +65,7 @@ public WritablePacketWithFeedback(CompletableFuture attachment, Writabl final MutableArray>> validPacketSubscribers; final MutableArray>> invalidPacketSubscribers; + final MutableArray> activeSinks; final int maxPacketsByRead; @@ -84,6 +86,7 @@ public AbstractConnection( this.closed = new AtomicBoolean(false); this.validPacketSubscribers = ArrayFactory.copyOnModifyArray(BiConsumer.class); this.invalidPacketSubscribers = ArrayFactory.copyOnModifyArray(BiConsumer.class); + this.activeSinks = ArrayFactory.stampedLockBasedArray(FluxSink.class); this.remoteAddress = String.valueOf(NetworkUtils.getRemoteAddress(channel)); } @@ -134,10 +137,12 @@ protected void registerFluxOnReceivedEvents( validPacketSubscribers.add(validListener); invalidPacketSubscribers.add(invalidListener); + activeSinks.add(sink); sink.onDispose(() -> { validPacketSubscribers.remove(validListener); validPacketSubscribers.remove(invalidListener); + activeSinks.remove(sink); }); network.inNetworkThread(() -> packetReader().startRead()); @@ -146,14 +151,22 @@ protected void registerFluxOnReceivedEvents( protected void registerFluxOnReceivedValidPackets(FluxSink> sink) { BiConsumer> listener = (connection, packet) -> sink.next(packet); validPacketSubscribers.add(listener); - sink.onDispose(() -> validPacketSubscribers.remove(listener)); + activeSinks.add(sink); + sink.onDispose(() -> { + validPacketSubscribers.remove(listener); + activeSinks.remove(sink); + }); network.inNetworkThread(() -> packetReader().startRead()); } protected void registerFluxOnReceivedInvalidPackets(FluxSink> sink) { BiConsumer> listener = (connection, packet) -> sink.next(packet); invalidPacketSubscribers.add(listener); - sink.onDispose(() -> invalidPacketSubscribers.remove(listener)); + activeSinks.add(sink); + sink.onDispose(() -> { + invalidPacketSubscribers.remove(listener); + activeSinks.remove(sink); + }); network.inNetworkThread(() -> packetReader().startRead()); } @@ -184,6 +197,24 @@ protected void doClose() { clearWaitPackets(); packetReader().close(); packetWriter().close(); + notifySinksOnError(); + } + + protected void notifySinksOnError() { + if (activeSinks.isEmpty()) { + return; + } + ConnectionClosedException error = new ConnectionClosedException(remoteAddress); + activeSinks + .iterations() + .forEach(error, (sink, exc) -> { + try { + sink.error(exc); + } catch (RuntimeException e) { + log.error(e.getMessage(), "Failed to notify sink of connection closure: "::formatted); + } + }); + activeSinks.clear(); } /** diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java index de7e5b8a..ee683870 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java @@ -461,10 +461,14 @@ protected void handleFailedReceiving(Throwable exception, ByteBuffer readingBuff retryReadLater(); } } - case AsynchronousCloseException ex -> - log.info(remoteAddress(), "[%s] Connection was closed"::formatted); - case ClosedChannelException ex -> - log.info(remoteAddress(), "[%s] Connection was closed"::formatted); + case AsynchronousCloseException ex -> { + log.info(remoteAddress(), "[%s] Connection was closed"::formatted); + connection.close(); + } + case ClosedChannelException ex -> { + log.info(remoteAddress(), "[%s] Connection was closed"::formatted); + connection.close(); + } default -> { log.error(exception); connection.close(); diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketReader.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketReader.java index 6ab75309..51701e2f 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketReader.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketReader.java @@ -76,6 +76,7 @@ protected AbstractSslNetworkPacketReader( protected void handleReceivedData(int receivedBytes, ByteBuffer readingBuffer) { if (receivedBytes == -1) { doHandshake(sslNetworkBuffer(), -1); + connection.close(); return; } super.handleReceivedData(receivedBytes, readingBuffer); diff --git a/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java b/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java new file mode 100644 index 00000000..42a70f76 --- /dev/null +++ b/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java @@ -0,0 +1,49 @@ +package javasabr.rlib.network; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javasabr.rlib.network.exception.ConnectionClosedException; +import javasabr.rlib.network.impl.AbstractConnection; +import javasabr.rlib.network.impl.DefaultConnection; +import javasabr.rlib.network.packet.impl.DefaultReadableNetworkPacket; +import javasabr.rlib.network.packet.registry.ReadableNetworkPacketRegistry; +import org.junit.jupiter.api.Test; + +public class ConnectionCloseTest extends BaseNetworkTest { + + @Test + void shouldPropagateConnectionCloseToClient() throws InterruptedException { + // given + var packetRegistry = ReadableNetworkPacketRegistry.of( + DefaultReadableNetworkPacket.class, + DefaultConnection.class, + DefaultNetworkTest.ServerPackets.RequestEchoMessage.class, + DefaultNetworkTest.ServerPackets.RequestServerTime.class); + var serverNetwork = NetworkFactory.defaultServerNetwork(packetRegistry); + InetSocketAddress serverAddress = serverNetwork.start(); + serverNetwork.onAccept(AbstractConnection::close); + var clientNetwork = NetworkFactory.defaultClientNetwork(packetRegistry); + CountDownLatch closeLatch = new CountDownLatch(1); + + // when + clientNetwork + .connectReactive(serverAddress) + .flatMapMany(AbstractConnection::receivedEvents) + .doOnError(e -> { + if (e instanceof ConnectionClosedException) { + closeLatch.countDown(); + } + }) + .subscribe(); + + // then + assertThat(closeLatch.await(5000, TimeUnit.MILLISECONDS)) + .as("Client should be notified that connection is closed") + .isTrue(); + clientNetwork.shutdown(); + serverNetwork.shutdown(); + } +} From e4a1c33a1a2816e08d3220b6bc1dfa5f98aeca9c Mon Sep 17 00:00:00 2001 From: Maksim Kashapov <56276969+crazyrokr@users.noreply.github.com> Date: Thu, 7 May 2026 14:04:31 +0200 Subject: [PATCH 02/11] replace CopyOnWriteMutableArray with StampedLockBasedArray --- .../rlib/network/impl/AbstractConnection.java | 40 ++++++++++++------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java b/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java index 3d4c2ee9..712614dd 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java @@ -4,14 +4,18 @@ import java.nio.channels.AsynchronousChannel; import java.nio.channels.AsynchronousSocketChannel; +import java.util.Collection; import java.util.Deque; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.StampedLock; import java.util.function.BiConsumer; +import javasabr.rlib.collections.array.Array; import javasabr.rlib.collections.array.ArrayFactory; +import javasabr.rlib.collections.array.LockableArray; import javasabr.rlib.collections.array.MutableArray; import javasabr.rlib.collections.deque.DequeFactory; +import javasabr.rlib.collections.operation.LockableOperations; import javasabr.rlib.network.BufferAllocator; import javasabr.rlib.network.Connection; import javasabr.rlib.network.Network; @@ -65,7 +69,8 @@ public WritablePacketWithFeedback(CompletableFuture attachment, Writabl final MutableArray>> validPacketSubscribers; final MutableArray>> invalidPacketSubscribers; - final MutableArray> activeSinks; + final LockableArray> activeSinks; + final LockableOperations>> activeSinksOperations; final int maxPacketsByRead; @@ -87,6 +92,7 @@ public AbstractConnection( this.validPacketSubscribers = ArrayFactory.copyOnModifyArray(BiConsumer.class); this.invalidPacketSubscribers = ArrayFactory.copyOnModifyArray(BiConsumer.class); this.activeSinks = ArrayFactory.stampedLockBasedArray(FluxSink.class); + this.activeSinksOperations = activeSinks.operations(); this.remoteAddress = String.valueOf(NetworkUtils.getRemoteAddress(channel)); } @@ -137,12 +143,12 @@ protected void registerFluxOnReceivedEvents( validPacketSubscribers.add(validListener); invalidPacketSubscribers.add(invalidListener); - activeSinks.add(sink); + activeSinksOperations.inWriteLock(sink, Collection::add); sink.onDispose(() -> { validPacketSubscribers.remove(validListener); validPacketSubscribers.remove(invalidListener); - activeSinks.remove(sink); + activeSinksOperations.inWriteLock(sink, Collection::remove); }); network.inNetworkThread(() -> packetReader().startRead()); @@ -151,10 +157,10 @@ protected void registerFluxOnReceivedEvents( protected void registerFluxOnReceivedValidPackets(FluxSink> sink) { BiConsumer> listener = (connection, packet) -> sink.next(packet); validPacketSubscribers.add(listener); - activeSinks.add(sink); + activeSinksOperations.inWriteLock(sink, Collection::add); sink.onDispose(() -> { validPacketSubscribers.remove(listener); - activeSinks.remove(sink); + activeSinksOperations.inWriteLock(sink, Collection::remove); }); network.inNetworkThread(() -> packetReader().startRead()); } @@ -162,10 +168,10 @@ protected void registerFluxOnReceivedValidPackets(FluxSink> sink) { BiConsumer> listener = (connection, packet) -> sink.next(packet); invalidPacketSubscribers.add(listener); - activeSinks.add(sink); + activeSinksOperations.inWriteLock(sink, Collection::add); sink.onDispose(() -> { invalidPacketSubscribers.remove(listener); - activeSinks.remove(sink); + activeSinksOperations.inWriteLock(sink, Collection::remove); }); network.inNetworkThread(() -> packetReader().startRead()); } @@ -197,24 +203,28 @@ protected void doClose() { clearWaitPackets(); packetReader().close(); packetWriter().close(); - notifySinksOnError(); + notifyActiveSinks(); } - protected void notifySinksOnError() { - if (activeSinks.isEmpty()) { + protected void notifyActiveSinks() { + Boolean noActiveSinks = activeSinksOperations.getInReadLock(Array::isEmpty); + if (noActiveSinks) { return; } - ConnectionClosedException error = new ConnectionClosedException(remoteAddress); - activeSinks - .iterations() - .forEach(error, (sink, exc) -> { + notifySinksWithError(new ConnectionClosedException(remoteAddress)); + activeSinksOperations.inWriteLock(Collection::clear); + } + + protected void notifySinksWithError(Throwable error) { + Array> localActiveSinks = activeSinksOperations.getInReadLock(Array::copyOf); + localActiveSinks.iterations().forEach( + error, (sink, exc) -> { try { sink.error(exc); } catch (RuntimeException e) { log.error(e.getMessage(), "Failed to notify sink of connection closure: "::formatted); } }); - activeSinks.clear(); } /** From 6b19a970775b00d1ae40d09287e5ff834eed2c72 Mon Sep 17 00:00:00 2001 From: Maksim Kashapov <56276969+crazyrokr@users.noreply.github.com> Date: Thu, 7 May 2026 16:41:34 +0200 Subject: [PATCH 03/11] update ConnectionCloseTest --- .../rlib/network/ConnectionCloseTest.java | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java b/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java index 42a70f76..4b108fc0 100644 --- a/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java +++ b/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java @@ -2,14 +2,20 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.io.InputStream; import java.net.InetSocketAddress; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import javasabr.rlib.network.exception.ConnectionClosedException; import javasabr.rlib.network.impl.AbstractConnection; import javasabr.rlib.network.impl.DefaultConnection; import javasabr.rlib.network.packet.impl.DefaultReadableNetworkPacket; +import javasabr.rlib.network.packet.impl.StringWritableNetworkPacket; import javasabr.rlib.network.packet.registry.ReadableNetworkPacketRegistry; +import javasabr.rlib.network.util.NetworkUtils; +import javax.net.ssl.SSLContext; +import lombok.SneakyThrows; import org.junit.jupiter.api.Test; public class ConnectionCloseTest extends BaseNetworkTest { @@ -46,4 +52,47 @@ void shouldPropagateConnectionCloseToClient() throws InterruptedException { clientNetwork.shutdown(); serverNetwork.shutdown(); } + + @Test + @SneakyThrows + void shouldCloseServerConnectionWhenClientClosesTcpChannelAbruptly() { + // Given: established SSL connection with completed handshake + InputStream keystoreFile = ConnectionCloseTest.class.getResourceAsStream("/ssl/rlib_test_cert.p12"); + SSLContext serverSslContext = NetworkUtils.createSslContext(keystoreFile, "test"); + SSLContext clientSslContext = NetworkUtils.createAllTrustedClientSslContext(); + + try (var testNetwork = buildStringSSLNetwork(serverSslContext, clientSslContext)) { + var serverConnection = testNetwork.serverToClient; + var clientConnection = testNetwork.clientToServer; + + // Register handler to start reading on server side + CountDownLatch dataReceivedLatch = new CountDownLatch(1); + serverConnection.onReceiveValidPacket((conn, packet) -> dataReceivedLatch.countDown()); + + // Send data to complete SSL handshake and deliver a packet + clientConnection.sendInBackground(new StringWritableNetworkPacket<>("handshake")); + + // Wait for the handshake to complete and data to be received + assertThat(dataReceivedLatch.await(5, TimeUnit.SECONDS)) + .as("SSL handshake should complete and data should be received by server") + .isTrue(); + + // When: close client's raw TCP channel without SSL close_notify + clientConnection.channel().close(); + + assertThat(awaitMillis(5000, serverConnection::closed)) + .as("Server connection should be closed after receiving EOF from abruptly closed client channel") + .isTrue(); + } + } + + private static boolean awaitMillis(long millis, Supplier a) throws InterruptedException { + for (int i = 0; i < millis / 100; i++) { + if (a.get()) { + return true; + } + Thread.sleep(100); + } + return false; + } } From eaf92769868bf58e64bc9ebcc90116d20072315b Mon Sep 17 00:00:00 2001 From: Maksim Kashapov <56276969+crazyrokr@users.noreply.github.com> Date: Sat, 9 May 2026 15:16:15 +0200 Subject: [PATCH 04/11] fix closed connection detection --- .../javasabr/rlib/common/util/AwaitUtils.java | 38 ++++++++++ .../rlib/common/util/AwaitUtilsTest.java | 69 +++++++++++++++++++ .../impl/AbstractSslNetworkPacketReader.java | 2 +- .../rlib/network/ConnectionCloseTest.java | 39 ++++------- 4 files changed, 123 insertions(+), 25 deletions(-) create mode 100644 rlib-common/src/main/java/javasabr/rlib/common/util/AwaitUtils.java create mode 100644 rlib-common/src/test/java/javasabr/rlib/common/util/AwaitUtilsTest.java diff --git a/rlib-common/src/main/java/javasabr/rlib/common/util/AwaitUtils.java b/rlib-common/src/main/java/javasabr/rlib/common/util/AwaitUtils.java new file mode 100644 index 00000000..78b22dc7 --- /dev/null +++ b/rlib-common/src/main/java/javasabr/rlib/common/util/AwaitUtils.java @@ -0,0 +1,38 @@ +package javasabr.rlib.common.util; + +import java.time.temporal.ChronoUnit; +import java.util.function.Supplier; +import org.jspecify.annotations.NullMarked; + +/** + * The utility class to await some conditions. + * + * @author crazyrokr + */ +@NullMarked +public final class AwaitUtils { + + /** + * Await for the condition during the amount of time units. + * + * @param amount the amount of time units. + * @param unit the time unit. + * @param condition the condition. + * @return true if the condition was met. + * @throws InterruptedException if the current thread was interrupted. + */ + public static boolean await(long amount, ChronoUnit unit, Supplier condition) throws InterruptedException { + if (condition.get()) { + return true; + } + var timeoutMillis = unit.getDuration().toMillis() * amount; + var endTime = System.currentTimeMillis() + timeoutMillis; + while (System.currentTimeMillis() < endTime) { + if (condition.get()) { + return true; + } + Thread.sleep(Math.clamp(endTime - System.currentTimeMillis(), 1, 10)); + } + return condition.get(); + } +} diff --git a/rlib-common/src/test/java/javasabr/rlib/common/util/AwaitUtilsTest.java b/rlib-common/src/test/java/javasabr/rlib/common/util/AwaitUtilsTest.java new file mode 100644 index 00000000..90bb7cc4 --- /dev/null +++ b/rlib-common/src/test/java/javasabr/rlib/common/util/AwaitUtilsTest.java @@ -0,0 +1,69 @@ +package javasabr.rlib.common.util; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.temporal.ChronoUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.Test; + +/** + * Tests of {@link AwaitUtils} methods. + * + * @author crazyrokr + */ +public class AwaitUtilsTest { + + @Test + void shouldAwaitCondition() throws InterruptedException { + // given + var condition = new AtomicBoolean(false); + var thread = new Thread(() -> { + try { + Thread.sleep(100); + condition.set(true); + } catch (InterruptedException e) { + // ignore + } + }); + + // when + thread.start(); + boolean result = AwaitUtils.await(500, ChronoUnit.MILLIS, condition::get); + + // then + assertThat(result).isTrue(); + } + + @Test + void shouldTimeoutIfConditionNotMet() throws InterruptedException { + // given + var condition = new AtomicBoolean(false); + + // when + boolean result = AwaitUtils.await(100, ChronoUnit.MILLIS, condition::get); + + // then + assertThat(result).isFalse(); + } + + @Test + void shouldAwaitWithChronoUnit() throws InterruptedException { + // given + var condition = new AtomicBoolean(false); + var thread = new Thread(() -> { + try { + Thread.sleep(100); + condition.set(true); + } catch (InterruptedException e) { + // ignore + } + }); + + // when + thread.start(); + boolean result = AwaitUtils.await(1, ChronoUnit.SECONDS, condition::get); + + // then + assertThat(result).isTrue(); + } +} diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketReader.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketReader.java index 51701e2f..5b7033d8 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketReader.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketReader.java @@ -76,7 +76,7 @@ protected AbstractSslNetworkPacketReader( protected void handleReceivedData(int receivedBytes, ByteBuffer readingBuffer) { if (receivedBytes == -1) { doHandshake(sslNetworkBuffer(), -1); - connection.close(); + handleEmptyReadFromChannel(); return; } super.handleReceivedData(receivedBytes, readingBuffer); diff --git a/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java b/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java index 4b108fc0..c2057d96 100644 --- a/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java +++ b/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java @@ -4,9 +4,10 @@ import java.io.InputStream; import java.net.InetSocketAddress; +import java.time.temporal.ChronoUnit; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; +import javasabr.rlib.common.util.AwaitUtils; import javasabr.rlib.network.exception.ConnectionClosedException; import javasabr.rlib.network.impl.AbstractConnection; import javasabr.rlib.network.impl.DefaultConnection; @@ -18,6 +19,12 @@ import lombok.SneakyThrows; import org.junit.jupiter.api.Test; + +/** + * Checking that the connections are closed correctly + * + * @author crazyrokr + */ public class ConnectionCloseTest extends BaseNetworkTest { @Test @@ -56,43 +63,27 @@ void shouldPropagateConnectionCloseToClient() throws InterruptedException { @Test @SneakyThrows void shouldCloseServerConnectionWhenClientClosesTcpChannelAbruptly() { - // Given: established SSL connection with completed handshake + // given InputStream keystoreFile = ConnectionCloseTest.class.getResourceAsStream("/ssl/rlib_test_cert.p12"); SSLContext serverSslContext = NetworkUtils.createSslContext(keystoreFile, "test"); SSLContext clientSslContext = NetworkUtils.createAllTrustedClientSslContext(); - try (var testNetwork = buildStringSSLNetwork(serverSslContext, clientSslContext)) { var serverConnection = testNetwork.serverToClient; var clientConnection = testNetwork.clientToServer; - - // Register handler to start reading on server side CountDownLatch dataReceivedLatch = new CountDownLatch(1); serverConnection.onReceiveValidPacket((conn, packet) -> dataReceivedLatch.countDown()); - - // Send data to complete SSL handshake and deliver a packet clientConnection.sendInBackground(new StringWritableNetworkPacket<>("handshake")); + dataReceivedLatch.await(5, TimeUnit.SECONDS); - // Wait for the handshake to complete and data to be received - assertThat(dataReceivedLatch.await(5, TimeUnit.SECONDS)) - .as("SSL handshake should complete and data should be received by server") - .isTrue(); - - // When: close client's raw TCP channel without SSL close_notify + // when clientConnection.channel().close(); + assertThat(AwaitUtils.await(5000, ChronoUnit.MILLIS, clientConnection::closed)) + .as("Client connection should be closed prior server side verification").isTrue(); - assertThat(awaitMillis(5000, serverConnection::closed)) + // then + assertThat(AwaitUtils.await(5000, ChronoUnit.MILLIS, serverConnection::closed)) .as("Server connection should be closed after receiving EOF from abruptly closed client channel") .isTrue(); } } - - private static boolean awaitMillis(long millis, Supplier a) throws InterruptedException { - for (int i = 0; i < millis / 100; i++) { - if (a.get()) { - return true; - } - Thread.sleep(100); - } - return false; - } } From 1a0244c385955acf8bfd2c77d73923098ade0457 Mon Sep 17 00:00:00 2001 From: Maksim Kashapov <56276969+crazyrokr@users.noreply.github.com> Date: Sat, 9 May 2026 15:44:55 +0200 Subject: [PATCH 05/11] remove redundant test --- .../rlib/common/util/AwaitUtilsTest.java | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/rlib-common/src/test/java/javasabr/rlib/common/util/AwaitUtilsTest.java b/rlib-common/src/test/java/javasabr/rlib/common/util/AwaitUtilsTest.java index 90bb7cc4..032e2bad 100644 --- a/rlib-common/src/test/java/javasabr/rlib/common/util/AwaitUtilsTest.java +++ b/rlib-common/src/test/java/javasabr/rlib/common/util/AwaitUtilsTest.java @@ -45,25 +45,4 @@ void shouldTimeoutIfConditionNotMet() throws InterruptedException { // then assertThat(result).isFalse(); } - - @Test - void shouldAwaitWithChronoUnit() throws InterruptedException { - // given - var condition = new AtomicBoolean(false); - var thread = new Thread(() -> { - try { - Thread.sleep(100); - condition.set(true); - } catch (InterruptedException e) { - // ignore - } - }); - - // when - thread.start(); - boolean result = AwaitUtils.await(1, ChronoUnit.SECONDS, condition::get); - - // then - assertThat(result).isTrue(); - } } From e517502d5db015fd91fc7fcb47d082b77c788687 Mon Sep 17 00:00:00 2001 From: Maksim Kashapov <56276969+crazyrokr@users.noreply.github.com> Date: Sat, 9 May 2026 18:41:25 +0200 Subject: [PATCH 06/11] move AwaitUtils to testFixtures --- .../java/javasabr/rlib/common/util/AwaitUtils.java | 2 -- rlib-network/build.gradle | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) rename rlib-common/src/{main => testFixtures}/java/javasabr/rlib/common/util/AwaitUtils.java (94%) diff --git a/rlib-common/src/main/java/javasabr/rlib/common/util/AwaitUtils.java b/rlib-common/src/testFixtures/java/javasabr/rlib/common/util/AwaitUtils.java similarity index 94% rename from rlib-common/src/main/java/javasabr/rlib/common/util/AwaitUtils.java rename to rlib-common/src/testFixtures/java/javasabr/rlib/common/util/AwaitUtils.java index 78b22dc7..30c35828 100644 --- a/rlib-common/src/main/java/javasabr/rlib/common/util/AwaitUtils.java +++ b/rlib-common/src/testFixtures/java/javasabr/rlib/common/util/AwaitUtils.java @@ -2,14 +2,12 @@ import java.time.temporal.ChronoUnit; import java.util.function.Supplier; -import org.jspecify.annotations.NullMarked; /** * The utility class to await some conditions. * * @author crazyrokr */ -@NullMarked public final class AwaitUtils { /** diff --git a/rlib-network/build.gradle b/rlib-network/build.gradle index 8ddfd5d0..c8763542 100644 --- a/rlib-network/build.gradle +++ b/rlib-network/build.gradle @@ -11,4 +11,5 @@ dependencies { api libs.project.reactor.core testRuntimeOnly projects.rlibLoggerImpl loadTestRuntimeOnly projects.rlibLoggerImpl + testImplementation testFixtures(projects.rlibCommon) } From 8b7509c55ae1d6c49e02b8903ece044d83368b69 Mon Sep 17 00:00:00 2001 From: Maksim Kashapov <56276969+crazyrokr@users.noreply.github.com> Date: Sat, 9 May 2026 18:54:12 +0200 Subject: [PATCH 07/11] switch from ChronoUnit to TimeUnit --- .../test/java/javasabr/rlib/common/util/AwaitUtilsTest.java | 6 +++--- .../java/javasabr/rlib/common/util/AwaitUtils.java | 6 +++--- .../java/javasabr/rlib/network/ConnectionCloseTest.java | 5 ++--- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/rlib-common/src/test/java/javasabr/rlib/common/util/AwaitUtilsTest.java b/rlib-common/src/test/java/javasabr/rlib/common/util/AwaitUtilsTest.java index 032e2bad..92958d7b 100644 --- a/rlib-common/src/test/java/javasabr/rlib/common/util/AwaitUtilsTest.java +++ b/rlib-common/src/test/java/javasabr/rlib/common/util/AwaitUtilsTest.java @@ -2,7 +2,7 @@ import static org.assertj.core.api.Assertions.assertThat; -import java.time.temporal.ChronoUnit; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.Test; @@ -28,7 +28,7 @@ void shouldAwaitCondition() throws InterruptedException { // when thread.start(); - boolean result = AwaitUtils.await(500, ChronoUnit.MILLIS, condition::get); + boolean result = AwaitUtils.await(500, TimeUnit.MILLISECONDS, condition::get); // then assertThat(result).isTrue(); @@ -40,7 +40,7 @@ void shouldTimeoutIfConditionNotMet() throws InterruptedException { var condition = new AtomicBoolean(false); // when - boolean result = AwaitUtils.await(100, ChronoUnit.MILLIS, condition::get); + boolean result = AwaitUtils.await(100, TimeUnit.MILLISECONDS, condition::get); // then assertThat(result).isFalse(); diff --git a/rlib-common/src/testFixtures/java/javasabr/rlib/common/util/AwaitUtils.java b/rlib-common/src/testFixtures/java/javasabr/rlib/common/util/AwaitUtils.java index 30c35828..89be3c35 100644 --- a/rlib-common/src/testFixtures/java/javasabr/rlib/common/util/AwaitUtils.java +++ b/rlib-common/src/testFixtures/java/javasabr/rlib/common/util/AwaitUtils.java @@ -1,6 +1,6 @@ package javasabr.rlib.common.util; -import java.time.temporal.ChronoUnit; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; /** @@ -19,11 +19,11 @@ public final class AwaitUtils { * @return true if the condition was met. * @throws InterruptedException if the current thread was interrupted. */ - public static boolean await(long amount, ChronoUnit unit, Supplier condition) throws InterruptedException { + public static boolean await(long amount, TimeUnit unit, Supplier condition) throws InterruptedException { if (condition.get()) { return true; } - var timeoutMillis = unit.getDuration().toMillis() * amount; + var timeoutMillis = unit.toMillis(amount); var endTime = System.currentTimeMillis() + timeoutMillis; while (System.currentTimeMillis() < endTime) { if (condition.get()) { diff --git a/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java b/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java index c2057d96..597839dd 100644 --- a/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java +++ b/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java @@ -4,7 +4,6 @@ import java.io.InputStream; import java.net.InetSocketAddress; -import java.time.temporal.ChronoUnit; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javasabr.rlib.common.util.AwaitUtils; @@ -77,11 +76,11 @@ void shouldCloseServerConnectionWhenClientClosesTcpChannelAbruptly() { // when clientConnection.channel().close(); - assertThat(AwaitUtils.await(5000, ChronoUnit.MILLIS, clientConnection::closed)) + assertThat(AwaitUtils.await(5, TimeUnit.SECONDS, clientConnection::closed)) .as("Client connection should be closed prior server side verification").isTrue(); // then - assertThat(AwaitUtils.await(5000, ChronoUnit.MILLIS, serverConnection::closed)) + assertThat(AwaitUtils.await(5, TimeUnit.SECONDS, serverConnection::closed)) .as("Server connection should be closed after receiving EOF from abruptly closed client channel") .isTrue(); } From 35939cee7afbfc724d229ea08b733a7005393bdc Mon Sep 17 00:00:00 2001 From: Maksim Kashapov <56276969+crazyrokr@users.noreply.github.com> Date: Sat, 9 May 2026 18:58:03 +0200 Subject: [PATCH 08/11] apply formatting --- .../test/java/javasabr/rlib/network/ConnectionCloseTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java b/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java index 597839dd..5e4c3062 100644 --- a/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java +++ b/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java @@ -77,7 +77,8 @@ void shouldCloseServerConnectionWhenClientClosesTcpChannelAbruptly() { // when clientConnection.channel().close(); assertThat(AwaitUtils.await(5, TimeUnit.SECONDS, clientConnection::closed)) - .as("Client connection should be closed prior server side verification").isTrue(); + .as("Client connection should be closed prior server side verification") + .isTrue(); // then assertThat(AwaitUtils.await(5, TimeUnit.SECONDS, serverConnection::closed)) From 514b2e5e18fec2a96b2e678b90842557793c7a9b Mon Sep 17 00:00:00 2001 From: Maksim Kashapov <56276969+crazyrokr@users.noreply.github.com> Date: Sat, 9 May 2026 19:38:18 +0200 Subject: [PATCH 09/11] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../java/javasabr/rlib/network/impl/AbstractConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java b/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java index 712614dd..18fe3929 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java @@ -147,7 +147,7 @@ protected void registerFluxOnReceivedEvents( sink.onDispose(() -> { validPacketSubscribers.remove(validListener); - validPacketSubscribers.remove(invalidListener); + invalidPacketSubscribers.remove(invalidListener); activeSinksOperations.inWriteLock(sink, Collection::remove); }); From 78dedb9ec39d50d450dd8f903878fd872bfdce69 Mon Sep 17 00:00:00 2001 From: Maksim Kashapov <56276969+crazyrokr@users.noreply.github.com> Date: Sat, 9 May 2026 19:59:19 +0200 Subject: [PATCH 10/11] fixes according comments --- rlib-common/build.gradle | 1 + .../javasabr/rlib/common/util/AwaitUtils.java | 2 + .../exception/ConnectionClosedException.java | 16 ++++++ .../rlib/network/ConnectionCloseTest.java | 52 ++++++++++--------- 4 files changed, 47 insertions(+), 24 deletions(-) diff --git a/rlib-common/build.gradle b/rlib-common/build.gradle index a70affdd..c1ade836 100644 --- a/rlib-common/build.gradle +++ b/rlib-common/build.gradle @@ -5,4 +5,5 @@ plugins { dependencies { api projects.rlibLoggerApi + testFixturesImplementation libs.lombok } diff --git a/rlib-common/src/testFixtures/java/javasabr/rlib/common/util/AwaitUtils.java b/rlib-common/src/testFixtures/java/javasabr/rlib/common/util/AwaitUtils.java index 89be3c35..869b3d6e 100644 --- a/rlib-common/src/testFixtures/java/javasabr/rlib/common/util/AwaitUtils.java +++ b/rlib-common/src/testFixtures/java/javasabr/rlib/common/util/AwaitUtils.java @@ -2,12 +2,14 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import lombok.experimental.UtilityClass; /** * The utility class to await some conditions. * * @author crazyrokr */ +@UtilityClass public final class AwaitUtils { /** diff --git a/rlib-network/src/main/java/javasabr/rlib/network/exception/ConnectionClosedException.java b/rlib-network/src/main/java/javasabr/rlib/network/exception/ConnectionClosedException.java index c5e90114..6964489e 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/exception/ConnectionClosedException.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/exception/ConnectionClosedException.java @@ -1,11 +1,27 @@ package javasabr.rlib.network.exception; +/** + * Thrown when a network connection has been closed + * + * @since 10.0.0 + */ public class ConnectionClosedException extends NetworkException { + /** + * Creates a new exception for a closed connection + * + * @param remoteAddress the remote address + */ public ConnectionClosedException(String remoteAddress) { super("Connection closed: %s".formatted(remoteAddress)); } + /** + * Creates a new exception for a closed connection with a cause + * + * @param remoteAddress the remote address + * @param cause the cause + */ public ConnectionClosedException(String remoteAddress, Throwable cause) { super("Connection closed: %s".formatted(remoteAddress), cause); } diff --git a/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java b/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java index 5e4c3062..e831d302 100644 --- a/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java +++ b/rlib-network/src/test/java/javasabr/rlib/network/ConnectionCloseTest.java @@ -1,8 +1,9 @@ package javasabr.rlib.network; +import static javasabr.rlib.network.util.NetworkUtils.createAllTrustedClientSslContext; +import static javasabr.rlib.network.util.NetworkUtils.createSslContext; import static org.assertj.core.api.Assertions.assertThat; -import java.io.InputStream; import java.net.InetSocketAddress; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -13,12 +14,9 @@ import javasabr.rlib.network.packet.impl.DefaultReadableNetworkPacket; import javasabr.rlib.network.packet.impl.StringWritableNetworkPacket; import javasabr.rlib.network.packet.registry.ReadableNetworkPacketRegistry; -import javasabr.rlib.network.util.NetworkUtils; -import javax.net.ssl.SSLContext; import lombok.SneakyThrows; import org.junit.jupiter.api.Test; - /** * Checking that the connections are closed correctly * @@ -41,38 +39,44 @@ void shouldPropagateConnectionCloseToClient() throws InterruptedException { CountDownLatch closeLatch = new CountDownLatch(1); // when - clientNetwork - .connectReactive(serverAddress) - .flatMapMany(AbstractConnection::receivedEvents) - .doOnError(e -> { - if (e instanceof ConnectionClosedException) { - closeLatch.countDown(); - } - }) - .subscribe(); + try { + clientNetwork + .connectReactive(serverAddress) + .flatMapMany(AbstractConnection::receivedEvents) + .doOnError(e -> { + if (e instanceof ConnectionClosedException) { + closeLatch.countDown(); + } + }) + .subscribe(); - // then - assertThat(closeLatch.await(5000, TimeUnit.MILLISECONDS)) - .as("Client should be notified that connection is closed") - .isTrue(); - clientNetwork.shutdown(); - serverNetwork.shutdown(); + // then + assertThat(closeLatch.await(5000, TimeUnit.MILLISECONDS)) + .as("Client should be notified that connection is closed") + .isTrue(); + } finally { + // cleanup + clientNetwork.shutdown(); + serverNetwork.shutdown(); + } } @Test @SneakyThrows void shouldCloseServerConnectionWhenClientClosesTcpChannelAbruptly() { // given - InputStream keystoreFile = ConnectionCloseTest.class.getResourceAsStream("/ssl/rlib_test_cert.p12"); - SSLContext serverSslContext = NetworkUtils.createSslContext(keystoreFile, "test"); - SSLContext clientSslContext = NetworkUtils.createAllTrustedClientSslContext(); - try (var testNetwork = buildStringSSLNetwork(serverSslContext, clientSslContext)) { + try (var keystoreFile = ConnectionCloseTest.class.getResourceAsStream("/ssl/rlib_test_cert.p12"); + var testNetwork = buildStringSSLNetwork( + createSslContext(keystoreFile, "test"), + createAllTrustedClientSslContext())) { var serverConnection = testNetwork.serverToClient; var clientConnection = testNetwork.clientToServer; CountDownLatch dataReceivedLatch = new CountDownLatch(1); serverConnection.onReceiveValidPacket((conn, packet) -> dataReceivedLatch.countDown()); clientConnection.sendInBackground(new StringWritableNetworkPacket<>("handshake")); - dataReceivedLatch.await(5, TimeUnit.SECONDS); + assertThat(dataReceivedLatch.await(5, TimeUnit.SECONDS)) + .as("Client connection should be closed prior server side verification") + .isTrue(); // when clientConnection.channel().close(); From 13db757a9221a28e3548f9d19c0da636647843bb Mon Sep 17 00:00:00 2001 From: Maksim Kashapov <56276969+crazyrokr@users.noreply.github.com> Date: Sat, 9 May 2026 20:14:12 +0200 Subject: [PATCH 11/11] optimize forEach processing --- .../rlib/network/impl/AbstractConnection.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java b/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java index 18fe3929..fb12d4c5 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java @@ -217,14 +217,13 @@ protected void notifyActiveSinks() { protected void notifySinksWithError(Throwable error) { Array> localActiveSinks = activeSinksOperations.getInReadLock(Array::copyOf); - localActiveSinks.iterations().forEach( - error, (sink, exc) -> { - try { - sink.error(exc); - } catch (RuntimeException e) { - log.error(e.getMessage(), "Failed to notify sink of connection closure: "::formatted); - } - }); + for (FluxSink sink : localActiveSinks) { + try { + sink.error(error); + } catch (RuntimeException e) { + log.error(e.getMessage(), "Failed to notify sink of connection closure: "::formatted); + } + } } /**