diff --git a/src/java/org/apache/cassandra/net/InboundSockets.java b/src/java/org/apache/cassandra/net/InboundSockets.java index b0e9ad156a80..573cccbf143f 100644 --- a/src/java/org/apache/cassandra/net/InboundSockets.java +++ b/src/java/org/apache/cassandra/net/InboundSockets.java @@ -99,15 +99,28 @@ private Future open(Consumer pipelineInjector) throw new IllegalStateException(); binding = InboundConnectionInitiator.bind(settings, connections, pipelineInjector); } - - return binding.addListener(ignore -> { + // isOpen is defined as "listen.isOpen", but this is set AFTER the binding future is set + // to make sure the future returned does not complete until listen is set, need a new + // future to replicate "Future.map" behavior. + AsyncChannelPromise promise = new AsyncChannelPromise(binding.channel()); + binding.addListener(f -> { + if (!f.isSuccess()) + { + synchronized (this) + { + binding = null; + } + promise.setFailure(f.cause()); + return; + } synchronized (this) { - if (binding.isSuccess()) - listen = binding.channel(); + listen = binding.channel(); binding = null; } + promise.setSuccess(null); }); + return promise; } /** diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java index a82315b48b5f..871f5924abab 100644 --- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java +++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java @@ -310,9 +310,12 @@ public void listenRequiredSecureConnectionWithBroadcastAddrAndLegacyPort() throw @Test public void listenOptionalSecureConnection() throws InterruptedException { - ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions() - .withOptional(true); - listen(serverEncryptionOptions, false); + for (int i = 0; i < 500; i++) // test used to be flaky, so run in a loop to make sure stable (see CASSANDRA-17033) + { + ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions() + .withOptional(true); + listen(serverEncryptionOptions, false); + } } @Test @@ -339,8 +342,8 @@ private void listen(ServerEncryptionOptions serverEncryptionOptions, boolean lis InboundSockets connections = new InboundSockets(settings); try { - connections.open().await(); - Assert.assertTrue(connections.isListening()); + connections.open().sync(); + Assert.assertTrue("connections is not listening", connections.isListening()); Set expect = new HashSet<>(); expect.add(InetAddressAndPort.getByAddressOverrideDefaults(listenAddress, DatabaseDescriptor.getStoragePort()));