Skip to content

Commit

Permalink
AMQP-560: Idle Container Events
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/AMQP-560

Polishing and Docs

Polishing

Use `@EventListener`.
  • Loading branch information
garyrussell authored and artembilan committed Jan 7, 2016
1 parent bf412a2 commit e237d54
Show file tree
Hide file tree
Showing 13 changed files with 449 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015 the original author or authors.
* Copyright 2015-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,7 +22,7 @@
* Base class for events.
*
* @author Gary Russell
* @since 1,5
* @since 1.5
*
*/
@SuppressWarnings("serial")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014 the original author or authors.
* Copyright 2014-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -140,6 +140,7 @@ public C createListenerContainer(RabbitListenerEndpoint endpoint) {
if (this.phase != null) {
instance.setPhase(this.phase);
}
instance.setListenerId(endpoint.getId());

endpoint.setupListenerContainer(instance);
initializeContainer(instance);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -95,6 +95,8 @@ public class RabbitNamespaceUtils {

private static final String CONSUMER_TAG_STRATEGY = "consumer-tag-strategy";

private static final String IDLE_EVENT_INTERVAL = "idle-event-interval";

public static BeanDefinition parseContainer(Element containerEle, ParserContext parserContext) {
RootBeanDefinition containerDef = new RootBeanDefinition(SimpleMessageListenerContainer.class);
containerDef.setSource(parserContext.extractSource(containerEle));
Expand Down Expand Up @@ -253,6 +255,11 @@ public static BeanDefinition parseContainer(Element containerEle, ParserContext
new RuntimeBeanReference(consumerTagStrategy));
}

String idleEventInterval = containerEle.getAttribute(IDLE_EVENT_INTERVAL);
if (StringUtils.hasText(idleEventInterval)) {
containerDef.getPropertyValues().add("idleEventInterval", new TypedStringValue(idleEventInterval));
}

return containerDef;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2015 the original author or authors.
* Copyright 2014-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,8 @@
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;
Expand All @@ -40,7 +42,8 @@
* @since 1.4
*/
public class SimpleRabbitListenerContainerFactory
extends AbstractRabbitListenerContainerFactory<SimpleMessageListenerContainer> {
extends AbstractRabbitListenerContainerFactory<SimpleMessageListenerContainer>
implements ApplicationEventPublisherAware {

private Executor taskExecutor;

Expand Down Expand Up @@ -74,6 +77,10 @@ public class SimpleRabbitListenerContainerFactory

private ConsumerTagStrategy consumerTagStrategy;

private Long idleEventInterval;

private ApplicationEventPublisher applicationEventPublisher;

/**
* @param taskExecutor the {@link Executor} to use.
* @see SimpleMessageListenerContainer#setTaskExecutor
Expand Down Expand Up @@ -211,6 +218,18 @@ public void setConsumerTagStrategy(ConsumerTagStrategy consumerTagStrategy) {
this.consumerTagStrategy = consumerTagStrategy;
}

/**
* How often to publish idle container events.
* @param idleEventInterval the interval.
*/
public void setIdleEventInterval(Long idleEventInterval) {
this.idleEventInterval = idleEventInterval;
}

public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}

@Override
protected SimpleMessageListenerContainer createContainerInstance() {
return new SimpleMessageListenerContainer();
Expand Down Expand Up @@ -268,6 +287,12 @@ protected void initializeContainer(SimpleMessageListenerContainer instance) {
if (this.consumerTagStrategy != null) {
instance.setConsumerTagStrategy(this.consumerTagStrategy);
}
if (this.idleEventInterval != null) {
instance.setIdleEventInterval(this.idleEventInterval);
}
if (this.applicationEventPublisher != null) {
instance.setApplicationEventPublisher(this.applicationEventPublisher);
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -99,6 +99,8 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor

private volatile ApplicationContext applicationContext;

private String listenerId;

/**
* <p>
* Flag controlling the behaviour of the container with respect to message acknowledgement. The most common usage is
Expand Down Expand Up @@ -401,6 +403,18 @@ public ConnectionFactory getConnectionFactory() {
return connectionFactory;
}

/**
* The 'id' attribute of the listener.
* @return the id (or the container bean name if no id set).
*/
public String getListenerId() {
return this.listenerId != null ? this.listenerId : this.beanName;
}

public void setListenerId(String listenerId) {
this.listenerId = listenerId;
}

/**
* Delegates to {@link #validateConfiguration()} and {@link #initialize()}.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.amqp.rabbit.listener;

import org.springframework.amqp.event.AmqpEvent;

/**
* An event that is emitted when a container is idle if the container
* is configured to do so.
*
* @author Gary Russell
* @since 1.6
*
*/
@SuppressWarnings("serial")
public class ListenerContainerIdleEvent extends AmqpEvent {

private final long idleTime;

public ListenerContainerIdleEvent(SimpleMessageListenerContainer source, long idleTime) {
super(source);
this.idleTime = idleTime;
}

/**
* How long the container has been idle.
* @return the time in milliseconds.
*/
public long getIdleTime() {
return idleTime;
}

/**
* The queues the container is listening to.
* @return the queue names.
*/
public String[] getQueues() {
return ((SimpleMessageListenerContainer) getSource()).getQueueNames();
}

/**
* The id of the listener (if {@code @RabbitListener}) or the container bean name.
* @return the id.
*/
public String getListenerId() {
return ((SimpleMessageListenerContainer) getSource()).getListenerId();
}

@Override
public String toString() {
return "ListenerContainerIdleEvent [idleTime="
+ ((float) this.idleTime / 1000) + "s, listenerId=" + getListenerId()
+ ", container=" + getSource() + "]";
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
Expand Down Expand Up @@ -112,6 +113,8 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
*/
public static final long DEFAULT_RECOVERY_INTERVAL = 5000;

private final AtomicLong lastNoMessageAlert = new AtomicLong();

private volatile int prefetchCount = DEFAULT_PREFETCH_COUNT;

private volatile long startConsumerMinInterval = DEFAULT_START_CONSUMER_MIN_INTERVAL;
Expand Down Expand Up @@ -194,6 +197,10 @@ public void invokeListener(Channel channel, Message message) throws Exception {

private ConditionalExceptionLogger exclusiveConsumerExceptionLogger = new DefaultExclusiveConsumerLogger();

private Long idleEventInterval;

private volatile long lastReceive = System.currentTimeMillis();

/**
* Default constructor for convenient dependency injection via setters.
*/
Expand Down Expand Up @@ -643,6 +650,14 @@ public void setExclusiveConsumerExceptionLogger(ConditionalExceptionLogger exclu
this.exclusiveConsumerExceptionLogger = exclusiveConsumerExceptionLogger;
}

/**
* How often to emit {@link ListenerContainerIdleEvent}s in milliseconds.
* @param idleEventInterval the interval.
*/
public void setIdleEventInterval(long idleEventInterval) {
this.idleEventInterval = idleEventInterval;
}

/**
* Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent
* consumers.
Expand Down Expand Up @@ -1219,6 +1234,21 @@ public void run() {
}
}
}
if (idleEventInterval != null) {
if (receivedOk) {
lastReceive = System.currentTimeMillis();
}
else {
long now = System.currentTimeMillis();
long lastAlertAt = lastNoMessageAlert.get();
long lastReceive = SimpleMessageListenerContainer.this.lastReceive;
if (now > lastReceive + idleEventInterval
&& now > lastAlertAt + idleEventInterval
&& lastNoMessageAlert.compareAndSet(lastAlertAt, now)) {
publishIdleContainerEvent(now - lastReceive);
}
}
}
}
catch (ListenerExecutionFailedException ex) {
// Continue to process, otherwise re-throw
Expand All @@ -1240,7 +1270,7 @@ public void run() {
logger.debug("Consumer thread interrupted, processing stopped.");
Thread.currentThread().interrupt();
aborted = true;
publishEvent("Consumer thread interrupted, processing stopped", true, e);
publishConsumerFailedEvent("Consumer thread interrupted, processing stopped", true, e);
}
catch (QueuesNotAvailableException ex) {
if (SimpleMessageListenerContainer.this.missingQueuesFatal) {
Expand All @@ -1249,20 +1279,20 @@ public void run() {
// Fatal, but no point re-throwing, so just abort.
aborted = true;
}
publishEvent("Consumer queue(s) not available", aborted, ex);
publishConsumerFailedEvent("Consumer queue(s) not available", aborted, ex);
}
catch (FatalListenerStartupException ex) {
logger.error("Consumer received fatal exception on startup", ex);
this.startupException = ex;
// Fatal, but no point re-throwing, so just abort.
aborted = true;
publishEvent("Consumer received fatal exception on startup", true, ex);
publishConsumerFailedEvent("Consumer received fatal exception on startup", true, ex);
}
catch (FatalListenerExecutionException ex) {
logger.error("Consumer received fatal exception during processing", ex);
// Fatal, but no point re-throwing, so just abort.
aborted = true;
publishEvent("Consumer received fatal exception during processing", true, ex);
publishConsumerFailedEvent("Consumer received fatal exception during processing", true, ex);
}
catch (ShutdownSignalException e) {
if (RabbitUtils.isNormalShutdown(e)) {
Expand All @@ -1278,7 +1308,7 @@ public void run() {
if (e.getCause() instanceof IOException && e.getCause().getCause() instanceof ShutdownSignalException
&& e.getCause().getCause().getMessage().contains("in exclusive use")) {
exclusiveConsumerExceptionLogger.log(logger, "Exclusive consumer failure", e.getCause().getCause());
publishEvent("Consumer raised exception, attempting restart", false, e);
publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, e);
}
else {
this.logConsumerException(e);
Expand Down Expand Up @@ -1341,16 +1371,23 @@ private void logConsumerException(Throwable t) {
logger.warn("Consumer raised exception, processing can restart if the connection factory supports it. "
+ "Exception summary: " + t);
}
publishEvent("Consumer raised exception, attempting restart", false, t);
publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, t);
}

private void publishEvent(String reason, boolean fatal, Throwable t) {
private void publishConsumerFailedEvent(String reason, boolean fatal, Throwable t) {
if (applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(new ListenerContainerConsumerFailedEvent(
SimpleMessageListenerContainer.this, reason, t, fatal));
}
}

private void publishIdleContainerEvent(long idleTime) {
if (applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(
new ListenerContainerIdleEvent(SimpleMessageListenerContainer.this, idleTime));
}
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,16 @@
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="idle-event-interval" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
The time in milliseconds between application events generated when the container is idle (no
message received). A 'ListenerContainerIdleEvent' is published each time this interval elapses
until a message is again received. These events can be received by an 'ApplicationListener'.
Events are not published unless this is set.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>

<xsd:complexType name="listenerType">
Expand Down
Loading

0 comments on commit e237d54

Please sign in to comment.