Skip to content

Commit

Permalink
Rename Cluster to EventProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
renedewaele committed Nov 19, 2015
1 parent ce79092 commit 9f4151f
Show file tree
Hide file tree
Showing 93 changed files with 1,505 additions and 1,581 deletions.
Expand Up @@ -22,32 +22,32 @@
public interface AMQPConsumerConfiguration {

/**
* The key of the property in the Cluster Meta Data that reflects the AMQPConsumerConfiguration instance for that
* cluster
* The key of the property in the Event Processor Meta Data that reflects the AMQPConsumerConfiguration instance for that
* event processor
*/
String AMQP_CONFIG_PROPERTY = "AMQP.Config";

/**
* Returns the Queue Name the Cluster should be connected to, or <code>null</code> if no explicit cluster is
* configured.
* Returns the Queue Name the EventProcessor should be connected to, or <code>null</code> if no explicit
* EventProcessor is configured.
*
* @return the Queue the cluster should be connected to, or <code>null</code> to revert to a default
* @return the Queue the event processor should be connected to, or <code>null</code> to revert to a default
*/
String getQueueName();

/**
* Indicates whether this Cluster wishes to be an exclusive consumer on a Queue. <code>null</code> indicated that
* no explicit preference is provided, and a default should be used.
* Indicates whether this EventProcessor wishes to be an exclusive consumer on a Queue. <code>null</code>
* indicated that no explicit preference is provided, and a default should be used.
*
* @return the exclusivity indicator for this cluster
* @return the exclusivity indicator for this event processor
*/
Boolean getExclusive();

/**
* Indicates how many messages this Cluster's connector may read read from the Queue before expecting messages to
* Indicates how many messages this EventProcessor's connector may read read from the Queue before expecting messages to
* be acknowledged. <code>null</code> means no specific value is provided and a default should be used.
*
* @return the number of messages a Cluster's connector may read ahead before waiting for acknowledgements.
* @return the number of messages a EventProcessor's connector may read ahead before waiting for acknowledgements.
*/
Integer getPrefetchCount();
}
Expand Up @@ -42,7 +42,7 @@ public interface AMQPMessageConverter {
*
* @param messageBody The body of the AMQP Message
* @param headers The headers attached to the AMQP Message
* @return The Event Message to publish on the local clusters
* @return The Event Message to publish on the local event processors
*/
EventMessage readAMQPMessage(byte[] messageBody, Map<String, Object> headers);
}
Expand Up @@ -30,7 +30,7 @@ public class DefaultAMQPConsumerConfiguration implements AMQPConsumerConfigurati
/**
* Initializes the configuration with the given <code>queueName</code>.
*
* @param queueName The name of the Queue a cluster should connect to
* @param queueName The name of the Queue a event processor should connect to
*/
public DefaultAMQPConsumerConfiguration(String queueName) {
this.queueName = queueName;
Expand Down
Expand Up @@ -17,8 +17,8 @@
package org.axonframework.eventhandling.amqp.spring;

import org.axonframework.common.Registration;
import org.axonframework.eventhandling.Cluster;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.eventhandling.amqp.AMQPMessageConverter;
import org.axonframework.serializer.UnknownSerializedTypeException;
import org.slf4j.Logger;
Expand All @@ -30,41 +30,41 @@
import java.util.concurrent.CopyOnWriteArrayList;

/**
* MessageListener implementation that deserializes incoming messages and forwards them to one or more clusters. The
* <code>byte[]</code> making up the message payload must the format as used by the {@link SpringAMQPEventBus}.
* MessageListener implementation that deserializes incoming messages and forwards them to one or more event processors.
* The <code>byte[]</code> making up the message payload must the format as used by the {@link SpringAMQPEventBus}.
*
* @author Allard Buijze
* @since 2.0
*/
public class ClusterMessageListener implements MessageListener {
public class EventProcessorMessageListener implements MessageListener {

private static final Logger logger = LoggerFactory.getLogger(ClusterMessageListener.class);
private static final Logger logger = LoggerFactory.getLogger(EventProcessorMessageListener.class);

private final List<Cluster> clusters = new CopyOnWriteArrayList<>();
private final List<EventProcessor> eventProcessors = new CopyOnWriteArrayList<>();
private final AMQPMessageConverter messageConverter;

/**
* Initializes a ClusterMessageListener with given <code>serializer</code> to deserialize the message's contents
* Initializes a EventProcessorMessageListener with given <code>serializer</code> to deserialize the message's contents
* into an EventMessage.
*
* @param messageConverter The message converter to use to convert AMQP Messages to Event Messages
*/
public ClusterMessageListener(AMQPMessageConverter messageConverter) {
public EventProcessorMessageListener(AMQPMessageConverter messageConverter) {
this.messageConverter = messageConverter;
}

@Override
public void onMessage(Message message) {
if (clusters.isEmpty()) {
if (eventProcessors.isEmpty()) {
return;
}

try {
EventMessage eventMessage = messageConverter.readAMQPMessage(message.getBody(),
message.getMessageProperties().getHeaders());
if (eventMessage != null) {
for (Cluster cluster : clusters) {
cluster.handle(eventMessage);
for (EventProcessor eventProcessor : eventProcessors) {
eventProcessor.handle(eventMessage);
}
}
} catch (UnknownSerializedTypeException e) {
Expand All @@ -73,17 +73,17 @@ public void onMessage(Message message) {
}

/**
* Registers an additional cluster. This cluster will receive messages once registered.
* Registers an additional event processor. This processor will receive messages once registered.
*
* @param cluster the cluster to add to the listener
* @return a handle to unsubscribe the <code>cluster</code>. When unsubscribed it will no longer receive messages.
* @param eventProcessor the event processor to add to the listener
* @return a handle to unsubscribe the <code>eventProcessor</code>. When unsubscribed it will no longer receive messages.
*/
public Registration addCluster(Cluster cluster) {
clusters.add(cluster);
return () -> clusters.remove(cluster);
public Registration addEventProcessor(EventProcessor eventProcessor) {
eventProcessors.add(eventProcessor);
return () -> eventProcessors.remove(eventProcessor);
}

public boolean isEmpty() {
return clusters.isEmpty();
return eventProcessors.isEmpty();
}
}
Expand Up @@ -119,7 +119,7 @@ public void afterPropertiesSet() throws Exception {
}

/**
* Sets the connection factory to use for the cluster
* Sets the connection factory to use for the event processor
*
* @param connectionFactory the connection factory to set
* @see SimpleMessageListenerContainer#setConnectionFactory(org.springframework.amqp.rabbit.connection.ConnectionFactory)
Expand Down
Expand Up @@ -18,7 +18,7 @@

import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.Cluster;
import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.eventhandling.amqp.AMQPConsumerConfiguration;
import org.axonframework.eventhandling.amqp.AMQPMessageConverter;
import org.slf4j.Logger;
Expand All @@ -34,7 +34,7 @@

/**
* Manages the lifecycle of the SimpleMessageListenerContainers that have been created to receive messages for
* Clusters. The ListenerContainerLifecycleManager starts each of the Listener Containers when the context is started
* Event Processors. The ListenerContainerLifecycleManager starts each of the Listener Containers when the context is started
* and will stop each of them when the context is being shut down.
* <p/>
* This class must be defined as a top-level Spring bean.
Expand All @@ -49,61 +49,61 @@ public class ListenerContainerLifecycleManager extends ListenerContainerFactory

// guarded by "this"
private final Map<String, SimpleMessageListenerContainer> containerPerQueue = new HashMap<>();
private final ConcurrentMap<Cluster, SimpleMessageListenerContainer> containerPerCluster = new ConcurrentHashMap<>();
private final ConcurrentMap<EventProcessor, SimpleMessageListenerContainer> containerPerProcessor = new ConcurrentHashMap<>();
// guarded by "this"
private boolean started = false;
private SpringAMQPConsumerConfiguration defaultConfiguration;
private int phase = Integer.MAX_VALUE;

/**
* Registers the given <code>cluster</code>, assigning it to a listener that listens to the given
* Registers the given <code>eventProcessor</code>, assigning it to a listener that listens to the given
* <code>queueName</code>. If no listener is present for the given <code>queueName</code>, it is created. If one
* already exists, it is assigned to the existing listener. Clusters that have been registered with the same
* already exists, it is assigned to the existing listener. Event processors that have been registered with the same
* <code>queueName</code> will each receive a copy of all message on that queue
*
* @param cluster The cluster to forward messages to
* @param config The configuration object for the cluster
* @param eventProcessor The event processor to forward messages to
* @param config The configuration object for the event processor
* @param messageConverter The message converter to use to convert the AMQP Message to an Event Message
* @return a handle to unsubscribe the <code>cluster</code>. When unsubscribed it will no longer receive messages.
* @return a handle to unsubscribe the <code>eventProcessor</code>. When unsubscribed it will no longer receive messages.
*/
public synchronized Registration registerCluster(Cluster cluster, AMQPConsumerConfiguration config,
AMQPMessageConverter messageConverter) {
public synchronized Registration registerEventProcessor(EventProcessor eventProcessor, AMQPConsumerConfiguration config,
AMQPMessageConverter messageConverter) {
SpringAMQPConsumerConfiguration amqpConfig = SpringAMQPConsumerConfiguration.wrap(config);
amqpConfig.setDefaults(defaultConfiguration);
String queueName = amqpConfig.getQueueName();
if (queueName == null) {
throw new AxonConfigurationException("The Cluster does not define a Queue Name, "
throw new AxonConfigurationException("The EventProcessor does not define a Queue Name, "
+ "nor is there a default Queue Name configured in the "
+ "ListenerContainerLifeCycleManager");
}
Registration registration;
if (containerPerQueue.containsKey(queueName)) {
final SimpleMessageListenerContainer container = containerPerQueue.get(queueName);
ClusterMessageListener existingListener = (ClusterMessageListener) container.getMessageListener();
registration = existingListener.addCluster(cluster);
containerPerCluster.put(cluster, container);
EventProcessorMessageListener existingListener = (EventProcessorMessageListener) container.getMessageListener();
registration = existingListener.addEventProcessor(eventProcessor);
containerPerProcessor.put(eventProcessor, container);
if (started && logger.isWarnEnabled()) {
logger.warn("A cluster was configured on queue [{}], "
logger.warn("An EventProcessor was configured on queue [{}], "
+ "while the Container for that queue was already processing events. "
+ "This may lead to Events not being published to all Clusters",
+ "This may lead to Events not being published to all EventProcessors",
queueName);
}
} else {
SimpleMessageListenerContainer newContainer = createContainer(amqpConfig);
newContainer.setQueueNames(queueName);
ClusterMessageListener newListener = new ClusterMessageListener(messageConverter);
registration = newListener.addCluster(cluster);
EventProcessorMessageListener newListener = new EventProcessorMessageListener(messageConverter);
registration = newListener.addEventProcessor(eventProcessor);
newContainer.setMessageListener(newListener);
containerPerQueue.put(queueName, newContainer);
containerPerCluster.put(cluster, newContainer);
containerPerProcessor.put(eventProcessor, newContainer);
if (started) {
newContainer.start();
}
}
return () -> {
if (registration.cancel()) {
SimpleMessageListenerContainer container = containerPerCluster.get(cluster);
final ClusterMessageListener listener = (ClusterMessageListener) container.getMessageListener();
SimpleMessageListenerContainer container = containerPerProcessor.get(eventProcessor);
final EventProcessorMessageListener listener = (EventProcessorMessageListener) container.getMessageListener();
if (listener.isEmpty()) {
container.stop();
}
Expand Down Expand Up @@ -179,10 +179,10 @@ public void setPhase(int phase) {
}

/**
* Sets the configuration with the entries to use as defaults in case a registered cluster does not provide
* Sets the configuration with the entries to use as defaults in case a registered event processor does not provide
* explicit values.
*
* @param defaultConfiguration The configuration instance containing defaults for each registered cluster
* @param defaultConfiguration The configuration instance containing defaults for each registered event processor
*/
public synchronized void setDefaultConfiguration(SpringAMQPConsumerConfiguration defaultConfiguration) {
this.defaultConfiguration = defaultConfiguration;
Expand Down

0 comments on commit 9f4151f

Please sign in to comment.