From 6f3ec75c2c52f3f9f5db21c782620e564fb9652f Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon" Date: Fri, 22 May 2026 18:46:10 -0400 Subject: [PATCH] Ensure connection info is processed before durable sync This update waits until the ConnectionInfo command is processed by the entire broker chain without error before sending the BrokerSubscriptionInfo command for durable sync back to a remote broker requesting it --- .../activemq/broker/TransportConnection.java | 46 ++- .../AbstractDurableSyncNetworkBridgeTest.java | 123 ++++++++ .../DurableSyncNetworkBridgeAuthTest.java | 270 ++++++++++++++++++ .../network/DurableSyncNetworkBridgeTest.java | 78 +---- 4 files changed, 431 insertions(+), 86 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/network/AbstractDurableSyncNetworkBridgeTest.java create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index 9079e24d676..f038f34771d 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -164,6 +165,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); private String duplexNetworkConnectorId; private final long connectedTimestamp; + private final CompletableFuture initialConnectionId = new CompletableFuture<>(); /** * @param taskRunnerFactory - can be null if you want direct dispatch to the transport @@ -852,11 +854,16 @@ public Response processAddConnection(ConnectionInfo info) throws Exception { try { broker.addConnection(context, info); + // Complete the future with the connectionId if we completed + // the broker.addConnection() chain successfully + initialConnectionId.complete(info.getConnectionId()); } catch (Exception e) { synchronized (brokerConnectionStates) { brokerConnectionStates.remove(info.getConnectionId()); } unregisterConnectionState(info.getConnectionId()); + // complete with the exception + initialConnectionId.completeExceptionally(e); LOG.warn("Failed to add Connection id={}, clientId={}, clientIP={} due to {}", info.getConnectionId(), clientId, info.getClientIp(), e.getLocalizedMessage()); //AMQ-6561 - stop for all exceptions on addConnection @@ -1390,13 +1397,10 @@ public Response processBrokerInfo(BrokerInfo info) { LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName()); } else if (info.isNetworkConnection() && !info.isDuplexConnection()) { try { - NetworkBridgeConfiguration config = getNetworkConfiguration(info); - if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { - LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo"); - dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config)); - } + // register durable sync to be sent after ConnectionInfo has been handled + registerDurableSync(getNetworkConfiguration(info), info); } catch (Exception e) { - LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e); + LOG.error("Failed to register durable sync for network bridge creation from broker {}", info.getBrokerId(), e); return null; } } else if (info.isNetworkConnection() && info.isDuplexConnection()) { @@ -1406,10 +1410,8 @@ public Response processBrokerInfo(BrokerInfo info) { NetworkBridgeConfiguration config = getNetworkConfiguration(info); config.setBrokerName(broker.getBrokerName()); - if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { - LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo"); - dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config)); - } + // register durable sync to be sent after ConnectionInfo has been handled + registerDurableSync(config, info); // check for existing duplex connection hanging about @@ -1475,6 +1477,30 @@ public Response processBrokerInfo(BrokerInfo info) { return null; } + private void registerDurableSync(final NetworkBridgeConfiguration config, final BrokerInfo info) { + if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { + // this will complete when the connection id has been set, or immediately if already set + initialConnectionId.whenComplete((connectionId, t) -> { + try { + if (t != null) { + LOG.warn("SyncDurableSubs will be skipped due to error {}", + t.getMessage()); + return; + } + // check connection still registered + if (lookupConnectionState(connectionId) != null) { + LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo"); + dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo( + this.broker.getBrokerService(), config)); + } + } catch (Exception e) { + LOG.error("Failed to respond to network bridge creation from broker {}", + info.getBrokerId(), e); + } + }); + } + } + @SuppressWarnings({"unchecked", "rawtypes"}) private HashMap createMap(Properties properties) { return new HashMap(properties); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/AbstractDurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/AbstractDurableSyncNetworkBridgeTest.java new file mode 100644 index 00000000000..47df7ef8baa --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/AbstractDurableSyncNetworkBridgeTest.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractDurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger( + AbstractDurableSyncNetworkBridgeTest.class); + + protected abstract void doSetUpLocalBroker(boolean deleteAllMessages, boolean startNetworkConnector, File dataDir) throws Exception; + + protected abstract void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir, int port) throws Exception; + + protected void restartLocalBroker(boolean startNetworkConnector) throws Exception { + stopLocalBroker(); + doSetUpLocalBroker(false, startNetworkConnector, localBroker.getDataDirectoryFile()); + } + + protected void restartRemoteBroker() throws Exception { + final int previousPort = remoteBroker.getTransportConnectors().get(0).getConnectUri().getPort(); + final File dataDir = remoteBroker.getDataDirectoryFile(); + stopRemoteBroker(); + try { + doSetUpRemoteBroker(false, dataDir, previousPort); + } catch (final IOException e) { + if (e.getCause() instanceof java.net.BindException) { + // Previous port still in TIME_WAIT — use a new ephemeral port + doSetUpRemoteBroker(false, dataDir, 0); + // Update the local broker's network connector to point to the new port + updateLocalNetworkConnectorUri(); + } else { + throw e; + } + } + } + + protected void restartBroker(BrokerService broker, boolean startNetworkConnector) throws Exception { + if (broker.getBrokerName().equals("localBroker")) { + restartLocalBroker(startNetworkConnector); + } else { + restartRemoteBroker(); + } + } + + protected void waitForBridgeFullyStarted() throws Exception { + waitForBridgeFullyStarted(TimeUnit.SECONDS.toMillis(15), true); + } + + protected void waitForBridgeFullyStarted(long millis, boolean duplex) throws Exception { + // Wait for the local bridge to be fully started (advisory consumers registered) + assertTrue("Local bridge should be fully started", Wait.waitFor(() -> { + if (localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty()) { + return false; + } + final NetworkBridge bridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next(); + if (bridge instanceof DemandForwardingBridgeSupport) { + return ((DemandForwardingBridgeSupport) bridge).startedLatch.getCount() == 0; + } + return true; + }, millis, 100)); + + // Also wait for the duplex bridge on the remote broker to be fully started. + // The duplex connector creates a separate DemandForwardingBridge on the remote side + // that also needs its advisory consumers registered before it can process events. + if (duplex) { + assertTrue("Duplex bridge should be fully started", Wait.waitFor(() -> { + final DemandForwardingBridge duplexBridge = findDuplexBridge( + remoteBroker.getTransportConnectors().get(0)); + return duplexBridge != null && duplexBridge.startedLatch.getCount() == 0; + }, millis, 100)); + } + } + + + /** + * When the remote broker restarts on a new ephemeral port (BindException fallback), + * any existing network connector on the local broker still points to the old port. + * This method stops the old connector and replaces it with one targeting the new URI. + */ + protected void updateLocalNetworkConnectorUri() throws Exception { + if (localBroker == null) { + return; + } + final List connectors = localBroker.getNetworkConnectors(); + if (connectors.isEmpty()) { + return; + } + final NetworkConnector oldConnector = connectors.get(0); + oldConnector.stop(); + localBroker.removeNetworkConnector(oldConnector); + final NetworkConnector newConnector = configureLocalNetworkConnector(); + localBroker.addNetworkConnector(newConnector); + newConnector.start(); + } + + protected abstract NetworkConnector configureLocalNetworkConnector() throws Exception; + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java new file mode 100644 index 00000000000..646dd4f184d --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.net.URI; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.BrokerSubscriptionInfo; +import org.apache.activemq.command.DiscoveryEvent; +import org.apache.activemq.security.AuthenticationUser; +import org.apache.activemq.security.SimpleAuthenticationPlugin; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFilter; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(Parameterized.class) +public class DurableSyncNetworkBridgeAuthTest extends AbstractDurableSyncNetworkBridgeTest { + + protected static final Logger LOG = LoggerFactory.getLogger(DurableSyncNetworkBridgeAuthTest.class); + + @Parameters(name="duplex={0}") + public static Collection data() { + return Arrays.asList(new Object[][] { + {true}, + {false} + }); + } + + @Rule + public Timeout globalTimeout = new Timeout(60, TimeUnit.SECONDS); + + public static final String KEYSTORE_TYPE = "jks"; + public static final String PASSWORD = "password"; + public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore"; + public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore"; + + static { + System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE); + System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD); + System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE); + System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE); + System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE); + System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD); + } + + private static final String USER_PASSWORD = "password"; + private final boolean duplex; + private final AtomicReference brokerSubInfo = new AtomicReference<>(); + private String ncPassword = USER_PASSWORD; + + public DurableSyncNetworkBridgeAuthTest(boolean duplex) { + this.duplex = duplex; + } + + @Before + public void setUp() throws Exception { + this.ncPassword = USER_PASSWORD; + this.brokerSubInfo.set(null); + } + + @After + public void tearDown() throws Exception { + doTearDown(); + } + + @Test + public void testAuthSuccess() throws Exception { + doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(), + TimeUnit.SECONDS.toMillis(15)); + + // When the local broker starts the bridge it will send its BrokerSubscriptionInfo list + // automatically on connect so the remote broker will always receive it. However, the + // remote broker should only send back its list after the connection is properly authenticated. + assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + + // Simulate a connection exception and reconnect, we should receive again + brokerSubInfo.set(null); + localBroker.getNetworkConnectors().get(0).activeBridges().stream() + .findFirst().orElseThrow().serviceRemoteException(new Exception()); + assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + } + + @Test + public void testAuthFailure() throws Exception { + this.ncPassword = "badpassword"; + try { + // set a shorter wait time, it won't connect with bad password + doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(), + TimeUnit.SECONDS.toMillis(5)); + throw new IllegalStateException("Should have received assertion error with bad password"); + } catch (AssertionError e) { + // expected + } + + // Because the local broker was not authenticated by the remote broker, the local broker + // should not have received back the BrokerSubscriptionInfo + assertNull(brokerSubInfo.get()); + } + + @Test + public void testRestartSync() throws Exception { + doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(), + TimeUnit.SECONDS.toMillis(15)); + + // When the local broker starts the bridge it will send its BrokerSubscriptionInfo list + // automatically on connect so the remote broker will always receive it. However, the + // remote broker should only send back its list after the connection is properly authenticated. + assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + + // Restart, should receive again with new connection + brokerSubInfo.set(null); + restartRemoteBroker(); + + // Wait for the reconnect and receive of BrokerSubInfo + assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + } + + protected void doSetUp(boolean deleteAllMessages, boolean startNetworkConnector, File localDataDir, + File remoteDataDir, long waitForStart) throws Exception { + doSetUpRemoteBroker(deleteAllMessages, remoteDataDir, 0); + doSetUpLocalBroker(deleteAllMessages, startNetworkConnector, localDataDir); + //Wait for the bridge to be fully started + if (startNetworkConnector) { + waitForBridgeFullyStarted(waitForStart, duplex); + } + } + + protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean startNetworkConnector, + File dataDir) throws Exception { + localBroker = createLocalBroker(dataDir, startNetworkConnector); + localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); + localBroker.start(); + localBroker.waitUntilStarted(); + + if (startNetworkConnector) { + // Best-effort wait for the bridge to appear. Do NOT use assertTrue here + // because some tests restart localBroker before remoteBroker is running, + // relying on the bridge connecting later when remoteBroker restarts. + // Tests that need the bridge to be fully started call assertBridgeStarted() explicitly. + // Keep timeout short (5s) to avoid growing the NC reconnect backoff too much, + // which would delay bridge formation when the remote broker starts later. + Wait.waitFor(() -> localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1, + TimeUnit.SECONDS.toMillis(5), 500); + } + + } + + protected void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir, int port) throws Exception { + remoteBroker = createRemoteBroker(dataDir, port); + remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); + remoteBroker.start(); + remoteBroker.waitUntilStarted(); + } + + protected BrokerService createLocalBroker(File dataDir, boolean startNetworkConnector) throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.setBrokerName("localBroker"); + brokerService.setDataDirectoryFile(dataDir); + KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); + adapter.setDirectory(dataDir); + adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name()); + brokerService.setPersistenceAdapter(adapter); + + if (startNetworkConnector) { + brokerService.addNetworkConnector(configureLocalNetworkConnector()); + } + + //Use auto+nio+ssl to test out the transport works with bridging + brokerService.addConnector("auto+nio+ssl://localhost:0"); + + return brokerService; + } + + @Override + protected NetworkConnector configureLocalNetworkConnector() throws Exception { + List transportConnectors = remoteBroker.getTransportConnectors(); + URI remoteURI = transportConnectors.get(0).getConnectUri(); + String uri = "static:(" + remoteURI + ")"; + NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri)) { + @Override + protected NetworkBridge createBridge(Transport localTransport, + Transport remoteTransport, DiscoveryEvent event) { + // Add a listener so we can capture if the remote broker sends + // back a BrokerSubscriptionInfo object + final Transport remoteFilter = new TransportFilter(remoteTransport) { + @Override + public void onCommand(Object command) { + if (command instanceof BrokerSubscriptionInfo) { + if (brokerSubInfo.get() != null) { + throw new IllegalStateException("Received BrokerSubscriptionInfo more than once."); + } + brokerSubInfo.set((BrokerSubscriptionInfo) command); + } + super.onCommand(command); + } + }; + return super.createBridge(localTransport, remoteFilter, event); + } + }; + connector.setName("networkConnector"); + connector.setUserName("user1"); + connector.setPassword(ncPassword); + connector.setDecreaseNetworkConsumerPriority(false); + connector.setConduitSubscriptions(true); + connector.setDuplex(duplex); + connector.setStaticBridge(false); + connector.setSyncDurableSubs(true); + connector.setDynamicallyIncludedDestinations(List.of(new ActiveMQTopic("include.test.>"))); + return connector; + } + + protected BrokerService createRemoteBroker(File dataDir, int port) throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.setBrokerName("remoteBroker"); + brokerService.setUseJmx(false); + brokerService.setDataDirectoryFile(dataDir); + KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); + adapter.setDirectory(dataDir); + adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name()); + brokerService.setPersistenceAdapter(adapter); + + // Add authentication to the remote broker + AuthenticationUser user = new AuthenticationUser("user1", USER_PASSWORD, "group1"); + SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(); + authenticationPlugin.setUsers(List.of(user)); + brokerService.setPlugins(new BrokerPlugin[] {authenticationPlugin}); + + brokerService.addConnector("auto+nio+ssl://localhost:" + port); + + return brokerService; + } + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java index 484c9b036f4..35d6e939ec0 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java @@ -63,7 +63,7 @@ import static org.junit.Assert.assertTrue; @RunWith(Parameterized.class) -public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { +public class DurableSyncNetworkBridgeTest extends AbstractDurableSyncNetworkBridgeTest { protected static final Logger LOG = LoggerFactory.getLogger(DurableSyncNetworkBridgeTest.class); @@ -706,14 +706,6 @@ protected CompositeTopic createCompositeTopic(String name, ActiveMQDestination.. return compositeTopic; } - protected void restartBroker(BrokerService broker, boolean startNetworkConnector) throws Exception { - if (broker.getBrokerName().equals("localBroker")) { - restartLocalBroker(startNetworkConnector); - } else { - restartRemoteBroker(); - } - } - protected void restartBrokers(boolean startNetworkConnector) throws Exception { doTearDown(); doSetUp(false, startNetworkConnector, localBroker.getDataDirectoryFile(), @@ -734,73 +726,6 @@ protected void doSetUp(boolean deleteAllMessages, boolean startNetworkConnector, } } - private void waitForBridgeFullyStarted() throws Exception { - // Wait for the local bridge to be fully started (advisory consumers registered) - assertTrue("Local bridge should be fully started", Wait.waitFor(() -> { - if (localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty()) { - return false; - } - final NetworkBridge bridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next(); - if (bridge instanceof DemandForwardingBridgeSupport) { - return ((DemandForwardingBridgeSupport) bridge).startedLatch.getCount() == 0; - } - return true; - }, TimeUnit.SECONDS.toMillis(15), 100)); - - // Also wait for the duplex bridge on the remote broker to be fully started. - // The duplex connector creates a separate DemandForwardingBridge on the remote side - // that also needs its advisory consumers registered before it can process events. - assertTrue("Duplex bridge should be fully started", Wait.waitFor(() -> { - final DemandForwardingBridge duplexBridge = findDuplexBridge( - remoteBroker.getTransportConnectors().get(0)); - return duplexBridge != null && duplexBridge.startedLatch.getCount() == 0; - }, TimeUnit.SECONDS.toMillis(15), 100)); - } - - protected void restartLocalBroker(boolean startNetworkConnector) throws Exception { - stopLocalBroker(); - doSetUpLocalBroker(false, startNetworkConnector, localBroker.getDataDirectoryFile()); - } - - protected void restartRemoteBroker() throws Exception { - final int previousPort = remoteBroker.getTransportConnectors().get(0).getConnectUri().getPort(); - final File dataDir = remoteBroker.getDataDirectoryFile(); - stopRemoteBroker(); - try { - doSetUpRemoteBroker(false, dataDir, previousPort); - } catch (final IOException e) { - if (e.getCause() instanceof java.net.BindException) { - // Previous port still in TIME_WAIT — use a new ephemeral port - doSetUpRemoteBroker(false, dataDir, 0); - // Update the local broker's network connector to point to the new port - updateLocalNetworkConnectorUri(); - } else { - throw e; - } - } - } - - /** - * When the remote broker restarts on a new ephemeral port (BindException fallback), - * any existing network connector on the local broker still points to the old port. - * This method stops the old connector and replaces it with one targeting the new URI. - */ - private void updateLocalNetworkConnectorUri() throws Exception { - if (localBroker == null) { - return; - } - final List connectors = localBroker.getNetworkConnectors(); - if (connectors.isEmpty()) { - return; - } - final NetworkConnector oldConnector = connectors.get(0); - oldConnector.stop(); - localBroker.removeNetworkConnector(oldConnector); - final NetworkConnector newConnector = configureLocalNetworkConnector(); - localBroker.addNetworkConnector(newConnector); - newConnector.start(); - } - protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean startNetworkConnector, File dataDir) throws Exception { localBroker = createLocalBroker(dataDir, startNetworkConnector); @@ -882,6 +807,7 @@ protected BrokerService createLocalBroker(File dataDir, boolean startNetworkConn return brokerService; } + @Override protected NetworkConnector configureLocalNetworkConnector() throws Exception { List transportConnectors = remoteBroker.getTransportConnectors(); URI remoteURI = transportConnectors.get(0).getConnectUri();