Skip to content

Commit

Permalink
ARTEMIS-4641 Support events for add of missing federation resources
Browse files Browse the repository at this point in the history
When an AMQP federation instance attempts to federate an address or queue
it can fail if the remote address or queue is not present or cannot be
created based on broker policy. A federation link can also closed if the
federated resource is removed from the remote broker by management etc.
In those cases the remote broker should note the resources that were
targets of federation and send alerts to the source federation broker to
notify it that these resources become available for federation and the
source should attempt again to create federation links if demand still
exists. This allows an AMQP federation instance to heal itself based on
updates from the remote.
  • Loading branch information
tabish121 authored and gemmellr committed Feb 12, 2024
1 parent 29781bd commit 94b6c0e
Show file tree
Hide file tree
Showing 23 changed files with 3,057 additions and 64 deletions.
Expand Up @@ -70,6 +70,9 @@ public abstract class AMQPFederation implements FederationInternal {
protected final String name;
protected final ActiveMQServer server;

protected AMQPFederationEventDispatcher eventDispatcher;
protected AMQPFederationEventProcessor eventProcessor;

// Connection and Session are updated after each reconnect.
protected volatile AMQPConnectionContext connection;
protected volatile AMQPSessionContext session;
Expand Down Expand Up @@ -168,6 +171,23 @@ public final synchronized void stop() throws ActiveMQException {
handleFederationStopped();
signalFederationStopped();
started = false;

try {
if (eventDispatcher != null) {
eventDispatcher.close();
}

if (eventProcessor != null) {
eventProcessor.close(false);
}
} catch (ActiveMQException amqEx) {
throw amqEx;
} catch (Exception ex) {
throw (ActiveMQException) new ActiveMQException(ex.getMessage()).initCause(ex);
} finally {
eventDispatcher = null;
eventProcessor = null;
}
}
}

Expand Down Expand Up @@ -253,6 +273,109 @@ public synchronized AMQPFederation addAddressMatchPolicy(FederationReceiveFromAd
return this;
}

/**
* Register an event sender instance with this federation for use in sending federation level
* events from this federation instance to the remote peer.
*
* @param dispatcher
* The event sender instance to be registered.
*/
synchronized void registerEventSender(AMQPFederationEventDispatcher dispatcher) {
if (eventDispatcher != null) {
throw new IllegalStateException("Federation event dipsatcher already registered on this federation instance.");
}

eventDispatcher = dispatcher;
}

/**
* Register an event receiver instance with this federation for use in receiving federation level
* events sent to this federation instance from the remote peer.
*
* @param dispatcher
* The event receiver instance to be registered.
*/
synchronized void registerEventReceiver(AMQPFederationEventProcessor processor) {
if (eventProcessor != null) {
throw new IllegalStateException("Federation event processor already registered on this federation instance.");
}

eventProcessor = processor;
}

/**
* Register an address by name that was either not present when an address federation consumer
* was initiated or was removed and the active address federation consumer was force closed.
* Upon (re)creation of the registered address a one time event will be sent to the remote
* federation instance which allows it to check if demand still exists and make another attempt
* at creating a consumer to federate messages from that address.
*
* @param address
* The address that is currently missing which should be watched for creation.
*/
synchronized void registerMissingAddress(String address) {
if (eventDispatcher != null) {
eventDispatcher.addAddressWatch(address);
}
}

/**
* Register a queue by name that was either not present when an queue federation consumer was
* initiated or was removed and the active queue federation consumer was force closed. Upon
* (re)creation of the registered address and queue a one time event will be sent to the remote
* federation instance which allows it to check if demand still exists and make another attempt
* at creating a consumer to federate messages from that queue.
*
* @param queue
* The queue that is currently missing which should be watched for creation.
*/
synchronized void registerMissingQueue(String queue) {
if (eventDispatcher != null) {
eventDispatcher.addQueueWatch(queue);
}
}

/**
* Triggers scan of federation address policies for local address demand on the given address
* that was added on the remote peer which was previously absent and could not be auto created
* or was removed while a federation receiver was attached and caused an existing federation
* receiver to be closed.
*
* @param addressName
* The address that has been added on the remote peer.
*/
synchronized void processRemoteAddressAdded(String addressName) {
addressMatchPolicies.values().forEach(policy -> {
try {
policy.afterRemoteAddressAdded(addressName);
} catch (Exception e) {
logger.warn("Error processing remote address added event: ", e);
signalError(e);
}
});
}

/**
* Triggers scan of federation queue policies for local queue demand on the given queue
* that was added on the remote peer which was previously absent at the time of a federation
* receiver attach or was removed and caused an existing federation receiver to be closed.
*
* @param addressName
* The address that has been added on the remote peer.
* @param queueName
* The queue that has been added on the remote peer.
*/
synchronized void processRemoteQueueAdded(String addressName, String queueName) {
queueMatchPolicies.values().forEach(policy -> {
try {
policy.afterRemoteQueueAdded(addressName, queueName);
} catch (Exception e) {
logger.warn("Error processing remote queue added event: ", e);
signalError(e);
}
});
}

/**
* Error signaling API that must be implemented by the specific federation implementation
* to handle error when creating a federation resource such as an outgoing receiver link.
Expand Down
Expand Up @@ -17,12 +17,12 @@

package org.apache.activemq.artemis.protocol.amqp.connect.federation;

import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederation.FEDERATION_INSTANCE_RECORD;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_DELAY;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_MSG_COUNT;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederation.FEDERATION_INSTANCE_RECORD;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.QUEUE_CAPABILITY;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.TOPIC_CAPABILITY;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.verifyOfferedCapabilities;
Expand All @@ -42,6 +42,7 @@
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotImplementedException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
Expand All @@ -66,7 +67,7 @@
*/
public final class AMQPFederationAddressSenderController extends AMQPFederationBaseSenderController {

public AMQPFederationAddressSenderController(AMQPSessionContext session) {
public AMQPFederationAddressSenderController(AMQPSessionContext session) throws ActiveMQAMQPException {
super(session);
}

Expand All @@ -80,10 +81,16 @@ public Consumer init(ProtonServerSenderContext senderContext) throws Exception {
final Connection protonConnection = sender.getSession().getConnection();
final org.apache.qpid.proton.engine.Record attachments = protonConnection.attachments();

if (attachments.get(FEDERATION_INSTANCE_RECORD, AMQPFederation.class) == null) {
AMQPFederation federation = attachments.get(FEDERATION_INSTANCE_RECORD, AMQPFederation.class);

if (federation == null) {
throw new ActiveMQAMQPIllegalStateException("Cannot create a federation link from non-federation connection");
}

if (source == null) {
throw new ActiveMQAMQPNotImplementedException("Null source lookup not supported on federation links.");
}

// Match the settlement mode of the remote instead of relying on the default of MIXED.
sender.setSenderSettleMode(sender.getRemoteSenderSettleMode());
// We don't currently support SECOND so enforce that the answer is always FIRST
Expand Down Expand Up @@ -139,6 +146,8 @@ public Consumer init(ProtonServerSenderContext senderContext) throws Exception {
}

if (!addressQueryResult.isExists()) {
federation.registerMissingAddress(address.toString());

throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
}

Expand Down Expand Up @@ -178,6 +187,11 @@ public Consumer init(ProtonServerSenderContext senderContext) throws Exception {
", but it is already mapped to a different address: " + queueQuery.getAddress());
}

// Configure an action to register a watcher for this federated address to be created if it is
// removed during the lifetime of the federation receiver, if restored an event will be sent
// to the remote to prompt it to create a new receiver.
resourceDeletedAction = (e) -> federation.registerMissingAddress(address.toString());

return (Consumer) sessionSPI.createSender(senderContext, queueName, null, false);
}

Expand Down
Expand Up @@ -17,10 +17,13 @@

package org.apache.activemq.artemis.protocol.amqp.connect.federation;

import java.util.function.Consumer;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPLargeMessageWriter;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPMessageWriter;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
Expand All @@ -29,6 +32,8 @@
import org.apache.activemq.artemis.protocol.amqp.proton.MessageWriter;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.proton.SenderController;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;

/**
* A base class abstract {@link SenderController} implementation for use by federation address and
Expand All @@ -46,7 +51,9 @@ public abstract class AMQPFederationBaseSenderController implements SenderContro

protected boolean tunnelCoreMessages; // only enabled if remote offers support.

public AMQPFederationBaseSenderController(AMQPSessionContext session) {
protected Consumer<ErrorCondition> resourceDeletedAction;

public AMQPFederationBaseSenderController(AMQPSessionContext session) throws ActiveMQAMQPException {
this.session = session;
this.sessionSPI = session.getSessionSPI();
}
Expand All @@ -64,6 +71,15 @@ public void close() throws Exception {
// Currently there isn't anything needed on close of this controller.
}

@Override
public void close(ErrorCondition error) {
if (error != null && AmqpError.RESOURCE_DELETED.equals(error.getCondition())) {
if (resourceDeletedAction != null) {
resourceDeletedAction.accept(error);
}
}
}

@Override
public MessageWriter selectOutgoingMessageWriter(ProtonServerSenderContext sender, MessageReference reference) {
final MessageWriter selected;
Expand Down
Expand Up @@ -41,6 +41,12 @@ public final class AMQPFederationConstants {
*/
public static final Symbol FEDERATION_CONTROL_LINK = Symbol.getSymbol("AMQ_FEDERATION_CONTROL_LINK");

/**
* A desired capability added to the federation events links that must be offered
* in return for a federation event link to be successfully established.
*/
public static final Symbol FEDERATION_EVENT_LINK = Symbol.getSymbol("AMQ_FEDERATION_EVENT_LINK");

/**
* Property name used to embed a nested map of properties meant to be applied if the federation
* resources created on the remote end of the control link if configured to do so. These properties
Expand Down Expand Up @@ -197,4 +203,37 @@ public final class AMQPFederationConstants {
*/
public static final String TRANSFORMER_PROPERTIES_MAP = "transformer-properties-map";

/**
* Events sent across the events link will each carry an event type to indicate
* the event type which controls how the remote reacts to the given event. The type of
* event infers the payload of the structure of the message payload.
*/
public static final Symbol EVENT_TYPE = Symbol.getSymbol("x-opt-amq-federation-ev-type");

/**
* Indicates that the message carries an address and queue name that was previously
* requested but did not exist, or that was federated but the remote consumer was closed
* due to removal of the queue on the target peer.
*/
public static final String REQUESTED_QUEUE_ADDED = "REQUESTED_QUEUE_ADDED_EVENT";

/**
* Indicates that the message carries an address name that was previously requested
* but did not exist, or that was federated but the remote consumer was closed due to
* removal of the address on the target peer.
*/
public static final String REQUESTED_ADDRESS_ADDED = "REQUESTED_ADDRESS_ADDED_EVENT";

/**
* Carries the name of a Queue that was either not present when a federation consumer was
* initiated and subsequently rejected, or was removed and has been recreated.
*/
public static final String REQUESTED_QUEUE_NAME = "REQUESTED_QUEUE_NAME";

/**
* Carries the name of an Address that was either not present when a federation consumer was
* initiated and subsequently rejected, or was removed and has been recreated.
*/
public static final String REQUESTED_ADDRESS_NAME = "REQUESTED_ADDRESS_NAME";

}

0 comments on commit 94b6c0e

Please sign in to comment.