Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1442,7 +1442,7 @@ public Response processBrokerInfo(BrokerInfo info) {
}
MBeanNetworkListener listener = new MBeanNetworkListener(brokerService, config, brokerService.createDuplexNetworkConnectorObjectName(duplexName));
listener.setCreatedByDuplex(true);
duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
duplexBridge = config.getBridgeFactory().createNetworkBridge(config, localTransport, remoteBridgeTransport, listener);
duplexBridge.setBrokerService(brokerService);
//Need to set durableDestinations to properly restart subs when dynamicOnly=false
duplexBridge.setDurableDestinations(NetworkConnector.getDurableTopicDestinations(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.apache.activemq.network;

import org.apache.activemq.transport.Transport;

/**
* Encapsulation of bridge creation logic.
*
* This SPI interface is intended to customize or decorate existing bridge implementations.
*/
public interface BridgeFactory {

/**
* Create a network bridge between two specified transports.
*
* @param configuration Bridge configuration.
* @param localTransport Local side of bridge.
* @param remoteTransport Remote side of bridge.
* @param listener Bridge listener.
* @return the NetworkBridge
*/
DemandForwardingBridge createNetworkBridge(NetworkBridgeConfiguration configuration, Transport localTransport, Transport remoteTransport, final NetworkBridgeListener listener);

}
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public void bridgeFailed() {
}
NetworkBridgeListener listener = new DiscoverNetworkBridgeListener(getBrokerService(), getObjectName());

DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this, localTransport, remoteTransport, listener);
DemandForwardingBridge result = getBridgeFactory().createNetworkBridge(this, localTransport, remoteTransport, listener);
result.setBrokerService(getBrokerService());
return configureBridge(result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public class NetworkBridgeConfiguration {
private long gcSweepTime = 60 * 1000;
private boolean checkDuplicateMessagesOnDuplex = false;

/**
* Bridge factory implementation - by default backed by static factory, which is default implementation and will rely change.
*/
private BridgeFactory bridgeFactory = NetworkBridgeFactory.INSTANCE;

/**
* @return the conduitSubscriptions
*/
Expand Down Expand Up @@ -541,6 +546,14 @@ public boolean isUseVirtualDestSubs() {
return useVirtualDestSubs;
}

public BridgeFactory getBridgeFactory() {
return bridgeFactory;
}

public void setBridgeFactory(BridgeFactory bridgeFactory) {
this.bridgeFactory = bridgeFactory;
}

/**
* This was a typo, so this is deprecated as of 5.13.1
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

import java.net.URI;
import java.util.HashMap;
import org.apache.activemq.broker.Broker;
import java.util.LinkedHashSet;
import java.util.ServiceLoader;
import java.util.Set;

import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.URISupport;
Expand All @@ -28,34 +31,44 @@
*
*
*/
public final class NetworkBridgeFactory {
public final class NetworkBridgeFactory implements BridgeFactory {

public final static BridgeFactory INSTANCE = new NetworkBridgeFactory();

private NetworkBridgeFactory() {

}

@Override
public DemandForwardingBridge createNetworkBridge(NetworkBridgeConfiguration configuration, Transport localTransport, Transport remoteTransport, NetworkBridgeListener listener) {
if (configuration.isConduitSubscriptions()) {
// dynamicOnly determines whether durables are auto bridged
return attachListener(new DurableConduitBridge(configuration, localTransport, remoteTransport), listener);
}
return attachListener(new DemandForwardingBridge(configuration, localTransport, remoteTransport), listener);
}

private DemandForwardingBridge attachListener(DemandForwardingBridge bridge, NetworkBridgeListener listener) {
if (listener != null) {
bridge.setNetworkBridgeListener(listener);
}
return bridge;
}

/**
* create a network bridge
* Create a network bridge
*
* @param configuration
* @param localTransport
* @param remoteTransport
* @param listener
* @return the NetworkBridge
*/
@Deprecated
public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration configuration,
Transport localTransport, Transport remoteTransport,
final NetworkBridgeListener listener) {
DemandForwardingBridge result = null;
if (configuration.isConduitSubscriptions()) {
// dynamicOnly determines whether durables are auto bridged
result = new DurableConduitBridge(configuration, localTransport, remoteTransport);
} else {
result = new DemandForwardingBridge(configuration, localTransport, remoteTransport);
}
if (listener != null) {
result.setNetworkBridgeListener(listener);
}
return result;
return INSTANCE.createNetworkBridge(configuration, localTransport, remoteTransport, listener);
}

public static Transport createLocalTransport(NetworkBridgeConfiguration configuration, URI uri) throws Exception {
Expand All @@ -74,4 +87,5 @@ private static Transport createLocalTransport(URI uri, boolean async) throws Exc
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
return TransportFactory.connect(uri);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package org.apache.activemq.network;

import java.net.URI;

import javax.jms.Connection;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

public class BaseNetworkTest {

protected final Logger LOG = LoggerFactory.getLogger(getClass());

protected Connection localConnection;
protected Connection remoteConnection;
protected BrokerService localBroker;
protected BrokerService remoteBroker;
protected Session localSession;
protected Session remoteSession;

@Before
public final void setUp() throws Exception {
doSetUp(true);
}

@After
public final void tearDown() throws Exception {
doTearDown();
}

protected void doTearDown() throws Exception {
localConnection.close();
remoteConnection.close();
localBroker.stop();
remoteBroker.stop();
}

protected void doSetUp(boolean deleteAllMessages) throws Exception {
remoteBroker = createRemoteBroker();
remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
remoteBroker.start();
remoteBroker.waitUntilStarted();
localBroker = createLocalBroker();
localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
localBroker.start();
localBroker.waitUntilStarted();
URI localURI = localBroker.getVmConnectorURI();
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
fac.setAlwaysSyncSend(true);
fac.setDispatchAsync(false);
localConnection = fac.createConnection();
localConnection.setClientID("clientId");
localConnection.start();
URI remoteURI = remoteBroker.getVmConnectorURI();
fac = new ActiveMQConnectionFactory(remoteURI);
remoteConnection = fac.createConnection();
remoteConnection.setClientID("clientId");
remoteConnection.start();
localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}

protected String getRemoteBrokerURI() {
return "org/apache/activemq/network/remoteBroker.xml";
}

protected String getLocalBrokerURI() {
return "org/apache/activemq/network/localBroker.xml";
}

protected BrokerService createBroker(String uri) throws Exception {
Resource resource = new ClassPathResource(uri);
BrokerFactoryBean factory = new BrokerFactoryBean(resource);
resource = new ClassPathResource(uri);
factory = new BrokerFactoryBean(resource);
factory.afterPropertiesSet();
BrokerService result = factory.getBroker();
return result;
}

protected BrokerService createLocalBroker() throws Exception {
return createBroker(getLocalBrokerURI());
}

protected BrokerService createRemoteBroker() throws Exception {
return createBroker(getRemoteBrokerURI());
}
}
Loading