diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index ed2fd1f376e..2ede7a547d1 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -163,6 +164,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); private String duplexNetworkConnectorId; private final long connectedTimestamp; + private final CompletableFuture initialConnectionId = new CompletableFuture<>(); /** * @param taskRunnerFactory - can be null if you want direct dispatch to the transport @@ -851,11 +853,16 @@ public Response processAddConnection(ConnectionInfo info) throws Exception { try { broker.addConnection(context, info); + // Complete the future with the connectionId if we completed + // the broker.addConnection() chain successfully + initialConnectionId.complete(info.getConnectionId()); } catch (Exception e) { synchronized (brokerConnectionStates) { brokerConnectionStates.remove(info.getConnectionId()); } unregisterConnectionState(info.getConnectionId()); + // complete with the exception + initialConnectionId.completeExceptionally(e); LOG.warn("Failed to add Connection id={}, clientId={}, clientIP={} due to {}", info.getConnectionId(), clientId, info.getClientIp(), e.getLocalizedMessage()); //AMQ-6561 - stop for all exceptions on addConnection @@ -1387,13 +1394,10 @@ public Response processBrokerInfo(BrokerInfo info) { LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName()); } else if (info.isNetworkConnection() && !info.isDuplexConnection()) { try { - NetworkBridgeConfiguration config = getNetworkConfiguration(info); - if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { - LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo"); - dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config)); - } + // register durable sync to be sent after ConnectionInfo has been handled + registerDurableSync(getNetworkConfiguration(info), info); } catch (Exception e) { - LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e); + LOG.error("Failed to register durable sync for network bridge creation from broker {}", info.getBrokerId(), e); return null; } } else if (info.isNetworkConnection() && info.isDuplexConnection()) { @@ -1403,10 +1407,8 @@ public Response processBrokerInfo(BrokerInfo info) { NetworkBridgeConfiguration config = getNetworkConfiguration(info); config.setBrokerName(broker.getBrokerName()); - if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { - LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo"); - dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config)); - } + // register durable sync to be sent after ConnectionInfo has been handled + registerDurableSync(config, info); // check for existing duplex connection hanging about @@ -1473,6 +1475,30 @@ public Response processBrokerInfo(BrokerInfo info) { return null; } + private void registerDurableSync(final NetworkBridgeConfiguration config, final BrokerInfo info) { + if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { + // this will complete when the connection id has been set, or immediately if already set + initialConnectionId.whenComplete((connectionId, t) -> { + try { + if (t != null) { + LOG.warn("SyncDurableSubs will be skipped due to error {}", + t.getMessage()); + return; + } + // check connection still registered + if (lookupConnectionState(connectionId) != null) { + LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo"); + dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo( + this.broker.getBrokerService(), config)); + } + } catch (Exception e) { + LOG.error("Failed to respond to network bridge creation from broker {}", + info.getBrokerId(), e); + } + }); + } + } + @SuppressWarnings({"unchecked", "rawtypes"}) private HashMap createMap(Properties properties) { return new HashMap(properties); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/AbstractDurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/AbstractDurableSyncNetworkBridgeTest.java new file mode 100644 index 00000000000..47df7ef8baa --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/AbstractDurableSyncNetworkBridgeTest.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractDurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger( + AbstractDurableSyncNetworkBridgeTest.class); + + protected abstract void doSetUpLocalBroker(boolean deleteAllMessages, boolean startNetworkConnector, File dataDir) throws Exception; + + protected abstract void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir, int port) throws Exception; + + protected void restartLocalBroker(boolean startNetworkConnector) throws Exception { + stopLocalBroker(); + doSetUpLocalBroker(false, startNetworkConnector, localBroker.getDataDirectoryFile()); + } + + protected void restartRemoteBroker() throws Exception { + final int previousPort = remoteBroker.getTransportConnectors().get(0).getConnectUri().getPort(); + final File dataDir = remoteBroker.getDataDirectoryFile(); + stopRemoteBroker(); + try { + doSetUpRemoteBroker(false, dataDir, previousPort); + } catch (final IOException e) { + if (e.getCause() instanceof java.net.BindException) { + // Previous port still in TIME_WAIT — use a new ephemeral port + doSetUpRemoteBroker(false, dataDir, 0); + // Update the local broker's network connector to point to the new port + updateLocalNetworkConnectorUri(); + } else { + throw e; + } + } + } + + protected void restartBroker(BrokerService broker, boolean startNetworkConnector) throws Exception { + if (broker.getBrokerName().equals("localBroker")) { + restartLocalBroker(startNetworkConnector); + } else { + restartRemoteBroker(); + } + } + + protected void waitForBridgeFullyStarted() throws Exception { + waitForBridgeFullyStarted(TimeUnit.SECONDS.toMillis(15), true); + } + + protected void waitForBridgeFullyStarted(long millis, boolean duplex) throws Exception { + // Wait for the local bridge to be fully started (advisory consumers registered) + assertTrue("Local bridge should be fully started", Wait.waitFor(() -> { + if (localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty()) { + return false; + } + final NetworkBridge bridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next(); + if (bridge instanceof DemandForwardingBridgeSupport) { + return ((DemandForwardingBridgeSupport) bridge).startedLatch.getCount() == 0; + } + return true; + }, millis, 100)); + + // Also wait for the duplex bridge on the remote broker to be fully started. + // The duplex connector creates a separate DemandForwardingBridge on the remote side + // that also needs its advisory consumers registered before it can process events. + if (duplex) { + assertTrue("Duplex bridge should be fully started", Wait.waitFor(() -> { + final DemandForwardingBridge duplexBridge = findDuplexBridge( + remoteBroker.getTransportConnectors().get(0)); + return duplexBridge != null && duplexBridge.startedLatch.getCount() == 0; + }, millis, 100)); + } + } + + + /** + * When the remote broker restarts on a new ephemeral port (BindException fallback), + * any existing network connector on the local broker still points to the old port. + * This method stops the old connector and replaces it with one targeting the new URI. + */ + protected void updateLocalNetworkConnectorUri() throws Exception { + if (localBroker == null) { + return; + } + final List connectors = localBroker.getNetworkConnectors(); + if (connectors.isEmpty()) { + return; + } + final NetworkConnector oldConnector = connectors.get(0); + oldConnector.stop(); + localBroker.removeNetworkConnector(oldConnector); + final NetworkConnector newConnector = configureLocalNetworkConnector(); + localBroker.addNetworkConnector(newConnector); + newConnector.start(); + } + + protected abstract NetworkConnector configureLocalNetworkConnector() throws Exception; + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java new file mode 100644 index 00000000000..646dd4f184d --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.net.URI; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.BrokerSubscriptionInfo; +import org.apache.activemq.command.DiscoveryEvent; +import org.apache.activemq.security.AuthenticationUser; +import org.apache.activemq.security.SimpleAuthenticationPlugin; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFilter; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(Parameterized.class) +public class DurableSyncNetworkBridgeAuthTest extends AbstractDurableSyncNetworkBridgeTest { + + protected static final Logger LOG = LoggerFactory.getLogger(DurableSyncNetworkBridgeAuthTest.class); + + @Parameters(name="duplex={0}") + public static Collection data() { + return Arrays.asList(new Object[][] { + {true}, + {false} + }); + } + + @Rule + public Timeout globalTimeout = new Timeout(60, TimeUnit.SECONDS); + + public static final String KEYSTORE_TYPE = "jks"; + public static final String PASSWORD = "password"; + public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore"; + public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore"; + + static { + System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE); + System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD); + System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE); + System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE); + System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE); + System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD); + } + + private static final String USER_PASSWORD = "password"; + private final boolean duplex; + private final AtomicReference brokerSubInfo = new AtomicReference<>(); + private String ncPassword = USER_PASSWORD; + + public DurableSyncNetworkBridgeAuthTest(boolean duplex) { + this.duplex = duplex; + } + + @Before + public void setUp() throws Exception { + this.ncPassword = USER_PASSWORD; + this.brokerSubInfo.set(null); + } + + @After + public void tearDown() throws Exception { + doTearDown(); + } + + @Test + public void testAuthSuccess() throws Exception { + doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(), + TimeUnit.SECONDS.toMillis(15)); + + // When the local broker starts the bridge it will send its BrokerSubscriptionInfo list + // automatically on connect so the remote broker will always receive it. However, the + // remote broker should only send back its list after the connection is properly authenticated. + assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + + // Simulate a connection exception and reconnect, we should receive again + brokerSubInfo.set(null); + localBroker.getNetworkConnectors().get(0).activeBridges().stream() + .findFirst().orElseThrow().serviceRemoteException(new Exception()); + assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + } + + @Test + public void testAuthFailure() throws Exception { + this.ncPassword = "badpassword"; + try { + // set a shorter wait time, it won't connect with bad password + doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(), + TimeUnit.SECONDS.toMillis(5)); + throw new IllegalStateException("Should have received assertion error with bad password"); + } catch (AssertionError e) { + // expected + } + + // Because the local broker was not authenticated by the remote broker, the local broker + // should not have received back the BrokerSubscriptionInfo + assertNull(brokerSubInfo.get()); + } + + @Test + public void testRestartSync() throws Exception { + doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(), + TimeUnit.SECONDS.toMillis(15)); + + // When the local broker starts the bridge it will send its BrokerSubscriptionInfo list + // automatically on connect so the remote broker will always receive it. However, the + // remote broker should only send back its list after the connection is properly authenticated. + assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + + // Restart, should receive again with new connection + brokerSubInfo.set(null); + restartRemoteBroker(); + + // Wait for the reconnect and receive of BrokerSubInfo + assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + } + + protected void doSetUp(boolean deleteAllMessages, boolean startNetworkConnector, File localDataDir, + File remoteDataDir, long waitForStart) throws Exception { + doSetUpRemoteBroker(deleteAllMessages, remoteDataDir, 0); + doSetUpLocalBroker(deleteAllMessages, startNetworkConnector, localDataDir); + //Wait for the bridge to be fully started + if (startNetworkConnector) { + waitForBridgeFullyStarted(waitForStart, duplex); + } + } + + protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean startNetworkConnector, + File dataDir) throws Exception { + localBroker = createLocalBroker(dataDir, startNetworkConnector); + localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); + localBroker.start(); + localBroker.waitUntilStarted(); + + if (startNetworkConnector) { + // Best-effort wait for the bridge to appear. Do NOT use assertTrue here + // because some tests restart localBroker before remoteBroker is running, + // relying on the bridge connecting later when remoteBroker restarts. + // Tests that need the bridge to be fully started call assertBridgeStarted() explicitly. + // Keep timeout short (5s) to avoid growing the NC reconnect backoff too much, + // which would delay bridge formation when the remote broker starts later. + Wait.waitFor(() -> localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1, + TimeUnit.SECONDS.toMillis(5), 500); + } + + } + + protected void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir, int port) throws Exception { + remoteBroker = createRemoteBroker(dataDir, port); + remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); + remoteBroker.start(); + remoteBroker.waitUntilStarted(); + } + + protected BrokerService createLocalBroker(File dataDir, boolean startNetworkConnector) throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.setBrokerName("localBroker"); + brokerService.setDataDirectoryFile(dataDir); + KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); + adapter.setDirectory(dataDir); + adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name()); + brokerService.setPersistenceAdapter(adapter); + + if (startNetworkConnector) { + brokerService.addNetworkConnector(configureLocalNetworkConnector()); + } + + //Use auto+nio+ssl to test out the transport works with bridging + brokerService.addConnector("auto+nio+ssl://localhost:0"); + + return brokerService; + } + + @Override + protected NetworkConnector configureLocalNetworkConnector() throws Exception { + List transportConnectors = remoteBroker.getTransportConnectors(); + URI remoteURI = transportConnectors.get(0).getConnectUri(); + String uri = "static:(" + remoteURI + ")"; + NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri)) { + @Override + protected NetworkBridge createBridge(Transport localTransport, + Transport remoteTransport, DiscoveryEvent event) { + // Add a listener so we can capture if the remote broker sends + // back a BrokerSubscriptionInfo object + final Transport remoteFilter = new TransportFilter(remoteTransport) { + @Override + public void onCommand(Object command) { + if (command instanceof BrokerSubscriptionInfo) { + if (brokerSubInfo.get() != null) { + throw new IllegalStateException("Received BrokerSubscriptionInfo more than once."); + } + brokerSubInfo.set((BrokerSubscriptionInfo) command); + } + super.onCommand(command); + } + }; + return super.createBridge(localTransport, remoteFilter, event); + } + }; + connector.setName("networkConnector"); + connector.setUserName("user1"); + connector.setPassword(ncPassword); + connector.setDecreaseNetworkConsumerPriority(false); + connector.setConduitSubscriptions(true); + connector.setDuplex(duplex); + connector.setStaticBridge(false); + connector.setSyncDurableSubs(true); + connector.setDynamicallyIncludedDestinations(List.of(new ActiveMQTopic("include.test.>"))); + return connector; + } + + protected BrokerService createRemoteBroker(File dataDir, int port) throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.setBrokerName("remoteBroker"); + brokerService.setUseJmx(false); + brokerService.setDataDirectoryFile(dataDir); + KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); + adapter.setDirectory(dataDir); + adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name()); + brokerService.setPersistenceAdapter(adapter); + + // Add authentication to the remote broker + AuthenticationUser user = new AuthenticationUser("user1", USER_PASSWORD, "group1"); + SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(); + authenticationPlugin.setUsers(List.of(user)); + brokerService.setPlugins(new BrokerPlugin[] {authenticationPlugin}); + + brokerService.addConnector("auto+nio+ssl://localhost:" + port); + + return brokerService; + } + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java index b849f083668..ede1593fd17 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java @@ -63,7 +63,7 @@ import static org.junit.Assert.assertTrue; @RunWith(Parameterized.class) -public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { +public class DurableSyncNetworkBridgeTest extends AbstractDurableSyncNetworkBridgeTest { protected static final Logger LOG = LoggerFactory.getLogger(DurableSyncNetworkBridgeTest.class); @@ -706,14 +706,6 @@ protected CompositeTopic createCompositeTopic(String name, ActiveMQDestination.. return compositeTopic; } - protected void restartBroker(BrokerService broker, boolean startNetworkConnector) throws Exception { - if (broker.getBrokerName().equals("localBroker")) { - restartLocalBroker(startNetworkConnector); - } else { - restartRemoteBroker(); - } - } - protected void restartBrokers(boolean startNetworkConnector) throws Exception { doTearDown(); doSetUp(false, startNetworkConnector, localBroker.getDataDirectoryFile(), @@ -734,73 +726,6 @@ protected void doSetUp(boolean deleteAllMessages, boolean startNetworkConnector, } } - private void waitForBridgeFullyStarted() throws Exception { - // Wait for the local bridge to be fully started (advisory consumers registered) - assertTrue("Local bridge should be fully started", Wait.waitFor(() -> { - if (localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty()) { - return false; - } - final NetworkBridge bridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next(); - if (bridge instanceof DemandForwardingBridgeSupport) { - return ((DemandForwardingBridgeSupport) bridge).startedLatch.getCount() == 0; - } - return true; - }, TimeUnit.SECONDS.toMillis(15), 100)); - - // Also wait for the duplex bridge on the remote broker to be fully started. - // The duplex connector creates a separate DemandForwardingBridge on the remote side - // that also needs its advisory consumers registered before it can process events. - assertTrue("Duplex bridge should be fully started", Wait.waitFor(() -> { - final DemandForwardingBridge duplexBridge = findDuplexBridge( - remoteBroker.getTransportConnectors().get(0)); - return duplexBridge != null && duplexBridge.startedLatch.getCount() == 0; - }, TimeUnit.SECONDS.toMillis(15), 100)); - } - - protected void restartLocalBroker(boolean startNetworkConnector) throws Exception { - stopLocalBroker(); - doSetUpLocalBroker(false, startNetworkConnector, localBroker.getDataDirectoryFile()); - } - - protected void restartRemoteBroker() throws Exception { - final int previousPort = remoteBroker.getTransportConnectors().get(0).getConnectUri().getPort(); - final File dataDir = remoteBroker.getDataDirectoryFile(); - stopRemoteBroker(); - try { - doSetUpRemoteBroker(false, dataDir, previousPort); - } catch (final IOException e) { - if (e.getCause() instanceof java.net.BindException) { - // Previous port still in TIME_WAIT — use a new ephemeral port - doSetUpRemoteBroker(false, dataDir, 0); - // Update the local broker's network connector to point to the new port - updateLocalNetworkConnectorUri(); - } else { - throw e; - } - } - } - - /** - * When the remote broker restarts on a new ephemeral port (BindException fallback), - * any existing network connector on the local broker still points to the old port. - * This method stops the old connector and replaces it with one targeting the new URI. - */ - private void updateLocalNetworkConnectorUri() throws Exception { - if (localBroker == null) { - return; - } - final List connectors = localBroker.getNetworkConnectors(); - if (connectors.isEmpty()) { - return; - } - final NetworkConnector oldConnector = connectors.get(0); - oldConnector.stop(); - localBroker.removeNetworkConnector(oldConnector); - final NetworkConnector newConnector = configureLocalNetworkConnector(); - localBroker.addNetworkConnector(newConnector); - newConnector.start(); - } - protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean startNetworkConnector, File dataDir) throws Exception { localBroker = createLocalBroker(dataDir, startNetworkConnector); @@ -882,6 +807,7 @@ protected BrokerService createLocalBroker(File dataDir, boolean startNetworkConn return brokerService; } + @Override protected NetworkConnector configureLocalNetworkConnector() throws Exception { List transportConnectors = remoteBroker.getTransportConnectors(); URI remoteURI = transportConnectors.get(0).getConnectUri();