Skip to content

Commit

Permalink
https://issues.apache.org/jira/browse/AMQ-6373
Browse files Browse the repository at this point in the history
Adding a new OpenWire command called BrokerSubscriptionInfo in order to
help synchronize durable subs across a network bridge.  Added OpenWire
version 12.  For dynamicallyIncludedDestination durable subs will now be
synchronized on a bridge reconnect as long as the bridge supports
conduitSubscriptions and dynamicOnly=false
  • Loading branch information
cshannon committed Jul 25, 2016
1 parent a65f5e7 commit 3953b9a
Show file tree
Hide file tree
Showing 75 changed files with 10,324 additions and 42 deletions.
Expand Up @@ -20,14 +20,14 @@
import java.io.IOException;
import java.net.SocketException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
Expand All @@ -41,9 +41,12 @@

import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.BrokerSubscriptionInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionControl;
Expand Down Expand Up @@ -100,6 +103,7 @@
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
Expand Down Expand Up @@ -1373,20 +1377,58 @@ protected synchronized void setPendingStop(boolean pendingStop) {
this.pendingStop = pendingStop;
}

public static BrokerSubscriptionInfo getBrokerSubscriptionInfo(final BrokerService brokerService) {
RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
List<ConsumerInfo> subscriptionInfos = new ArrayList<>();
for (SubscriptionKey key : topicRegion.getDurableSubscriptions().keySet()) {
DurableTopicSubscription sub = topicRegion.getDurableSubscriptions().get(key);
if (sub != null) {
ConsumerInfo ci = sub.getConsumerInfo().copy();
ci.setClientId(key.getClientId());
subscriptionInfos.add(ci);
}
}
BrokerSubscriptionInfo bsi = new BrokerSubscriptionInfo(brokerService.getBrokerName());
bsi.setSubscriptionInfos(subscriptionInfos.toArray(new ConsumerInfo[0]));
return bsi;
}

private NetworkBridgeConfiguration getNetworkConfiguration(final BrokerInfo info) throws IOException {
Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
Map<String, String> props = createMap(properties);
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
IntrospectionSupport.setProperties(config, props, "");
return config;
}

@Override
public Response processBrokerInfo(BrokerInfo info) {
if (info.isSlaveBroker()) {
LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName());
} else if (info.isNetworkConnection() && !info.isDuplexConnection()) {
try {
NetworkBridgeConfiguration config = getNetworkConfiguration(info);
if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
dispatchSync(getBrokerSubscriptionInfo(this.broker.getBrokerService()));
}
} catch (Exception e) {
LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e);
return null;
}
} else if (info.isNetworkConnection() && info.isDuplexConnection()) {
// so this TransportConnection is the rear end of a network bridge
// We have been requested to create a two way pipe ...
try {
Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
Map<String, String> props = createMap(properties);
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
IntrospectionSupport.setProperties(config, props, "");
NetworkBridgeConfiguration config = getNetworkConfiguration(info);
config.setBrokerName(broker.getBrokerName());

if (config.isSyncDurableSubs() && protocolVersion.get() >= 12) {
LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
dispatchSync(getBrokerSubscriptionInfo(this.broker.getBrokerService()));
}

// check for existing duplex connection hanging about

// We first look if existing network connection already exists for the same broker Id and network connector name
Expand Down Expand Up @@ -1698,4 +1740,12 @@ private int getConsumerCount(ConnectionId connectionId) {
public WireFormatInfo getRemoteWireFormatInfo() {
return wireFormatInfo;
}

/* (non-Javadoc)
* @see org.apache.activemq.state.CommandVisitor#processBrokerSubscriptionInfo(org.apache.activemq.command.BrokerSubscriptionInfo)
*/
@Override
public Response processBrokerSubscriptionInfo(BrokerSubscriptionInfo info) throws Exception {
return null;
}
}
Expand Up @@ -57,7 +57,9 @@
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.BrokerSubscriptionInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
Expand Down Expand Up @@ -127,11 +129,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected ActiveMQDestination[] dynamicallyIncludedDestinations;
protected ActiveMQDestination[] staticallyIncludedDestinations;
protected ActiveMQDestination[] durableDestinations;
protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<>();
protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<>();
protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
protected final CountDownLatch startedLatch = new CountDownLatch(2);
protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
protected final CountDownLatch staticDestinationsLatch = new CountDownLatch(1);
protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
protected NetworkBridgeConfiguration configuration;
protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
Expand Down Expand Up @@ -311,6 +314,7 @@ public void run() {
startedLatch.countDown();
startedLatch.countDown();
localStartedLatch.countDown();
staticDestinationsLatch.countDown();

ss.throwFirstException();
}
Expand Down Expand Up @@ -440,6 +444,7 @@ private void doStartLocalAndRemoteBridges() {
try {
if (safeWaitUntilStarted()) {
setupStaticDestinations();
staticDestinationsLatch.countDown();
}
} catch (Throwable e) {
serviceLocalException(e);
Expand Down Expand Up @@ -549,6 +554,10 @@ protected void startRemoteBridge() throws Exception {
brokerInfo.setNetworkProperties(str);
brokerInfo.setBrokerId(this.localBrokerId);
remoteBroker.oneway(brokerInfo);
if (configuration.isSyncDurableSubs() &&
remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
remoteBroker.oneway(TransportConnection.getBrokerSubscriptionInfo(brokerService));
}
}
if (remoteConnectionInfo != null) {
remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
Expand Down Expand Up @@ -617,6 +626,31 @@ protected void serviceRemoteCommand(Command command) {
ackAdvisory(md.getMessage());
} else if (command.isBrokerInfo()) {
futureRemoteBrokerInfo.set((BrokerInfo) command);
} else if (command instanceof BrokerSubscriptionInfo) {
staticDestinationsLatch.await();
BrokerSubscriptionInfo subInfo = (BrokerSubscriptionInfo) command;
LOG.debug("Received Remote BrokerSubscriptionInfo on {} from {}",
this.brokerService.getBrokerName(), subInfo.getBrokerName());

if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions()
&& !configuration.isDynamicOnly() && subInfo.getSubscriptionInfos() != null) {
if (started.get()) {
for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
if(!info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) &&
matchesDynamicallyIncludedDestinations(info.getDestination())) {
serviceRemoteConsumerAdvisory(info);
}
}

//After re-added, clean up any empty durables
for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
DemandSubscription ds = i.next();
if (matchesDynamicallyIncludedDestinations(ds.getLocalInfo().getDestination())) {
cleanupDurableSub(ds, i);
}
}
}
}
} else if (command.getClass() == ConnectionError.class) {
ConnectionError ce = (ConnectionError) command;
serviceRemoteException(ce.getException());
Expand Down Expand Up @@ -831,24 +865,29 @@ public void run() {
DemandSubscription ds = i.next();
boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
if (removed) {
if (ds.getDurableRemoteSubs().isEmpty()) {
cleanupDurableSub(ds, i);
}
}
}
}

// deactivate subscriber
RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
localBroker.oneway(removeInfo);
private void cleanupDurableSub(final DemandSubscription ds,
Iterator<DemandSubscription> i) throws IOException {
if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty()) {

// remove subscriber
RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
sending.setClientId(localClientId);
sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName());
sending.setConnectionId(this.localConnectionInfo.getConnectionId());
localBroker.oneway(sending);
// deactivate subscriber
RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
localBroker.oneway(removeInfo);

//remove subscriber from map
i.remove();
}
}
}
// remove subscriber
RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
sending.setClientId(localClientId);
sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName());
sending.setConnectionId(this.localConnectionInfo.getConnectionId());
localBroker.oneway(sending);

//remove subscriber from map
i.remove();
}
}

Expand Down Expand Up @@ -1002,7 +1041,7 @@ protected void serviceLocalCommand(Command command) {
});
if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
try {
// never request b/c they are eventually acked async
// never request b/c they are eventually acked async
remoteBroker.oneway(message);
} finally {
sub.decrementOutstandingResponses();
Expand Down Expand Up @@ -1064,6 +1103,8 @@ public void onCompletion(FutureResponse future) {
switch (command.getDataStructureType()) {
case WireFormatInfo.DATA_STRUCTURE_TYPE:
break;
case BrokerSubscriptionInfo.DATA_STRUCTURE_TYPE:
break;
default:
LOG.warn("Unexpected local command: {}", command);
}
Expand Down Expand Up @@ -1154,17 +1195,25 @@ protected boolean isPermissableDestination(ActiveMQDestination destination, bool
}

dests = dynamicallyIncludedDestinations;
if (dests != null && dests.length > 0) {
return matchesDynamicallyIncludedDestinations(destination);
}

return true;
}

private boolean matchesDynamicallyIncludedDestinations(ActiveMQDestination destination) {
ActiveMQDestination[] dests = dynamicallyIncludedDestinations;
if (dests != null && dests.length > 0) {
for (ActiveMQDestination dest : dests) {
DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
return true;
}
}

return false;
}
return true;

return false;
}

/**
Expand All @@ -1175,7 +1224,7 @@ protected void setupStaticDestinations() {
if (dests != null) {
for (ActiveMQDestination dest : dests) {
if (isPermissableDestination(dest)) {
DemandSubscription sub = createDemandSubscription(dest);
DemandSubscription sub = createDemandSubscription(dest, null);
sub.setStaticallyIncluded(true);
try {
addSubscription(sub);
Expand Down Expand Up @@ -1348,11 +1397,15 @@ protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throw
return result;
}

final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination, final String subscriptionName) {
ConsumerInfo info = new ConsumerInfo();
info.setNetworkSubscription(true);
info.setDestination(destination);

if (subscriptionName != null) {
info.setSubscriptionName(subscriptionName);
}

// Indicate that this subscription is being made on behalf of the remote broker.
info.setBrokerPath(new BrokerId[]{remoteBrokerId});

Expand Down
Expand Up @@ -73,7 +73,7 @@ protected void setupStaticDestinations() {
for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
String subName = subscription.getConsumerInfo().getSubscriptionName();
if (subName != null && subName.equals(candidateSubName)) {
DemandSubscription sub = createDemandSubscription(dest);
DemandSubscription sub = createDemandSubscription(dest, subName);
sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
sub.setStaticallyIncluded(true);
addSubscription(sub);
Expand Down
Expand Up @@ -31,6 +31,7 @@ public class NetworkBridgeConfiguration {
private boolean conduitSubscriptions = true;
private boolean useVirtualDestSubs;
private boolean dynamicOnly;
private boolean syncDurableSubs;
private boolean dispatchAsync = true;
private boolean decreaseNetworkConsumerPriority;
private int consumerPriorityBase = ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
Expand Down Expand Up @@ -98,6 +99,14 @@ public void setDynamicOnly(boolean dynamicOnly) {
this.dynamicOnly = dynamicOnly;
}

public boolean isSyncDurableSubs() {
return syncDurableSubs;
}

public void setSyncDurableSubs(boolean syncDurableSubs) {
this.syncDurableSubs = syncDurableSubs;
}

/**
* @return the bridgeTempDestinations
*/
Expand Down
2 changes: 1 addition & 1 deletion activemq-client/pom.xml
Expand Up @@ -308,7 +308,7 @@
<tasks>
<echo>Running OpenWire Generator</echo>
<taskdef name="generate" classname="org.apache.activemq.openwire.tool.JavaGeneratorTask" classpathref="maven.compile.classpath" />
<generate version="11" basedir="${basedir}" generateTests="false" />
<generate version="12" basedir="${basedir}" generateTests="false" />
</tasks>
</configuration>
<dependencies>
Expand Down

0 comments on commit 3953b9a

Please sign in to comment.