Skip to content

Commit

Permalink
ARTEMIS-4419 Add federation support to AMQP broker connections
Browse files Browse the repository at this point in the history
Allows federation of addresses and queues over an outbound AMQP broker
connection and provide configuration via XML or broker propeties.
  • Loading branch information
tabish121 authored and clebertsuconic committed Sep 11, 2023
1 parent f860be4 commit d830f04
Show file tree
Hide file tree
Showing 78 changed files with 16,341 additions and 214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ public Map<String, FederationPolicy> getQueuePolicies() {
return federationPolicyMap;
}

// strange spelling!, it allows a type match for singular of correct plural Policies from properties
public FederationConfiguration addQueuePolicie(FederationQueuePolicyConfiguration federationPolicy) {
public FederationConfiguration addQueuePolicy(FederationQueuePolicyConfiguration federationPolicy) {
federationPolicyMap.put(federationPolicy.getName(), federationPolicy);
return this;
}
Expand All @@ -83,7 +82,7 @@ public Map<String, FederationPolicy> getAddressPolicies() {
return federationPolicyMap;
}

public FederationConfiguration addAddressPolicie(FederationAddressPolicyConfiguration federationPolicy) {
public FederationConfiguration addAddressPolicy(FederationAddressPolicyConfiguration federationPolicy) {
federationPolicyMap.put(federationPolicy.getName(), federationPolicy);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ public CoreMessageObjectPools getCoreMessageObjectPools() {
return coreMessageObjectPools;
}

public AMQPSessionContext getAMQPSessionContext() {
return protonSession;
}

public ProtonProtocolManager getProtocolManager() {
return manager;
}
Expand All @@ -154,10 +158,8 @@ public void withinSessionExecutor(Runnable run) {
logger.warn(e.getMessage(), e);
}
});

}


public void withinContext(Runnable run) throws Exception {
OperationContext context = recoverContext();
try {
Expand Down Expand Up @@ -305,33 +307,38 @@ public void createSharedVolatileQueue(SimpleString address,
}
}

public QueueQueryResult queueQuery(SimpleString queueName, RoutingType routingType, boolean autoCreate) throws Exception {
return queueQuery(queueName, routingType, autoCreate, null);
}

public QueueQueryResult queueQuery(SimpleString queueName, RoutingType routingType, boolean autoCreate, SimpleString filter) throws Exception {
QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(queueName);
public QueueQueryResult queueQuery(QueueConfiguration configuration, boolean autoCreate) throws Exception {
QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(configuration.getName());

if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateQueues() && autoCreate) {
try {
serverSession.createQueue(new QueueConfiguration(queueName).setRoutingType(routingType).setFilterString(filter).setAutoCreated(true));
serverSession.createQueue(configuration.setAutoCreated(true));
} catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing.
}
queueQueryResult = serverSession.executeQueueQuery(queueName);
queueQueryResult = serverSession.executeQueueQuery(configuration.getName());
}

// if auto-create we will return whatever type was used before
if (queueQueryResult.isExists() && !queueQueryResult.isAutoCreated()) {
//if routingType is null we bypass the check
if (routingType != null && queueQueryResult.getRoutingType() != routingType) {
throw new IllegalStateException("Incorrect Routing Type for queue " + queueName + ", expecting: " + routingType + " while it had " + queueQueryResult.getRoutingType());
final RoutingType desiredRoutingType = configuration.getRoutingType();
if (desiredRoutingType != null && queueQueryResult.getRoutingType() != desiredRoutingType) {
throw new IllegalStateException("Incorrect Routing Type for queried queue " + configuration.getName() +
", expecting: " + desiredRoutingType + " while the actual type was: " +
queueQueryResult.getRoutingType());
}
}

return queueQueryResult;
}

public QueueQueryResult queueQuery(SimpleString queueName, RoutingType routingType, boolean autoCreate) throws Exception {
return queueQuery(queueName, routingType, autoCreate, null);
}

public QueueQueryResult queueQuery(SimpleString queueName, RoutingType routingType, boolean autoCreate, SimpleString filter) throws Exception {
return queueQuery(new QueueConfiguration(queueName).setRoutingType(routingType).setFilterString(filter), autoCreate);
}

public boolean checkAddressAndAutocreateIfPossible(SimpleString address, RoutingType routingType) throws Exception {
AutoCreateResult autoCreateResult = serverSession.checkAutoCreate(new QueueConfiguration(address).setRoutingType(routingType));
Expand Down Expand Up @@ -447,7 +454,7 @@ public void serverSend(ProtonServerReceiverContext context,
Delivery delivery,
SimpleString address,
RoutingContext routingContext,
AMQPMessage message) throws Exception {
Message message) throws Exception {

context.incrementSettle();

Expand Down Expand Up @@ -711,6 +718,24 @@ public Transaction getCurrentTransaction() {
return null;
}

/**
* Adds key / value based metadata into the underlying server session implementation
* for use by the connection resources.
*
* @param key
* The key to add into the linked server session.
* @param value
* The value to add into the linked server session attached to the given key.
*
* @return this {@link AMQPSessionCallback} instance.
*
* @throws Exception if an error occurs while adding the metadata.
*/
public AMQPSessionCallback addMetaData(String key, String value) throws Exception {
serverSession.addMetaData(key, value);
return this;
}

public Transaction getTransaction(Binary txid, boolean remove) throws ActiveMQAMQPException {
return protonSPI.getTransaction(txid, remove);
}
Expand Down

0 comments on commit d830f04

Please sign in to comment.