diff --git a/build.gradle b/build.gradle index a74a9fe319..e476f444b2 100644 --- a/build.gradle +++ b/build.gradle @@ -387,6 +387,7 @@ project('spring-rabbit') { optionalApi "ch.qos.logback:logback-classic:$logbackVersion" optionalApi 'org.apache.logging.log4j:log4j-core' optionalApi 'io.micrometer:micrometer-core' + api 'io.micrometer:micrometer-observation' optionalApi 'io.micrometer:micrometer-tracing' // Spring Data projection message binding support optionalApi ("org.springframework.data:spring-data-commons") { diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java index 4e276dd05c..5a905f7639 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java @@ -63,6 +63,8 @@ import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter; import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException; import org.springframework.amqp.rabbit.support.MessagePropertiesConverter; +import org.springframework.amqp.rabbit.support.micrometer.MessageReceiverContext; +import org.springframework.amqp.rabbit.support.micrometer.RabbitListenerObservation; import org.springframework.amqp.support.ConditionalExceptionLogger; import org.springframework.amqp.support.ConsumerTagStrategy; import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils; @@ -91,6 +93,8 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.ShutdownSignalException; +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationRegistry; /** * @author Mark Pollack @@ -238,7 +242,9 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor private MicrometerHolder micrometerHolder; - private boolean micrometerEnabled = true; + private boolean micrometerEnabled = false; + + private boolean tracingEnabled = false; private boolean isBatchListener; @@ -254,6 +260,8 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor private MessageAckListener messageAckListener = (success, deliveryTag, cause) -> { }; + private ObservationRegistry observationRegistry; + @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { this.applicationEventPublisher = applicationEventPublisher; @@ -1151,7 +1159,8 @@ public void setMicrometerTags(Map tags) { } /** - * Set to false to disable micrometer listener timers. + * Set to true to enable micrometer listener timers. Ignored when + * {@link #setTracingEnabled(boolean)} is true. * @param micrometerEnabled false to disable. * @since 2.2 */ @@ -1159,6 +1168,15 @@ public void setMicrometerEnabled(boolean micrometerEnabled) { this.micrometerEnabled = micrometerEnabled; } + /** + * Enable tracing via micrometer. + * @param tracingEnabled true to enable. + * @since 3.0 + */ + public void setTracingEnabled(boolean tracingEnabled) { + this.tracingEnabled = tracingEnabled; + } + /** * Get the consumeDelay - a time to wait before consuming in ms. * @return the consume delay. @@ -1230,7 +1248,7 @@ public void afterPropertiesSet() { validateConfiguration(); initialize(); try { - if (this.micrometerHolder == null && MICROMETER_PRESENT && this.micrometerEnabled + if (this.micrometerHolder == null && MICROMETER_PRESENT && this.micrometerEnabled && !this.tracingEnabled && this.applicationContext != null) { String id = getListenerId(); if (id == null) { @@ -1246,6 +1264,7 @@ public void afterPropertiesSet() { if (this.isAsyncReplies() && !AcknowledgeMode.MANUAL.equals(this.acknowledgeMode)) { this.acknowledgeMode = AcknowledgeMode.MANUAL; } + // TODO - get obs registry from context if present } @Override @@ -1499,8 +1518,22 @@ protected void invokeErrorHandler(Throwable ex) { * @see #invokeListener * @see #handleListenerException */ - @SuppressWarnings(UNCHECKED) protected void executeListener(Channel channel, Object data) { + Observation observation; + if (!this.tracingEnabled || data instanceof List) { + observation = Observation.NOOP; + } + else { + observation = Observation.createNotStarted(RabbitListenerObservation.LISTENER_OBSERVATION.getName(), + new MessageReceiverContext((Message) data), this.observationRegistry) + .lowCardinalityKeyValue(RabbitListenerObservation.ListenerLowCardinalityTags.LISTENER_ID.asString(), + getListenerId()); + } + observation.observe(() -> executeListenerAndHandleException(channel, data)); + } + + @SuppressWarnings(UNCHECKED) + protected void executeListenerAndHandleException(Channel channel, Object data) { if (!isRunning()) { if (logger.isWarnEnabled()) { logger.warn( diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/micrometer/MessageReceiverContext.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/micrometer/MessageReceiverContext.java new file mode 100644 index 0000000000..6932017452 --- /dev/null +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/micrometer/MessageReceiverContext.java @@ -0,0 +1,36 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.amqp.rabbit.support.micrometer; + +import org.springframework.amqp.core.Message; + +import io.micrometer.observation.transport.ReceiverContext; + +/** + * {@link ReceiverContext} for {@link Message}s. + * + * @author Gary Russell + * @since 2.8 + * + */ +public class MessageReceiverContext extends ReceiverContext { + + public MessageReceiverContext(Message message) { + super((carrier, key) -> carrier.getMessageProperties().getHeader(key)); + } + +} diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/micrometer/RabbitListenerObservation.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/micrometer/RabbitListenerObservation.java new file mode 100644 index 0000000000..d561334d27 --- /dev/null +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/micrometer/RabbitListenerObservation.java @@ -0,0 +1,72 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.amqp.rabbit.support.micrometer; + +import io.micrometer.common.docs.KeyName; +import io.micrometer.observation.docs.DocumentedObservation; + +/** + * Spring Rabbit Observation for listeners. + * + * @author Gary Russell + * @since 3.0 + * + */ +public enum RabbitListenerObservation implements DocumentedObservation { + + /** + * Observation for Rabbit listeners. + */ + LISTENER_OBSERVATION { + + @Override + public String getName() { + return "spring.rabbit.listener"; + } + + @Override + public String getContextualName() { + return "RabbitListener Observation"; + } + + @Override + public KeyName[] getLowCardinalityKeyNames() { + return ListenerLowCardinalityTags.values(); + } + + }; + + /** + * Low cardinality tags. + */ + public enum ListenerLowCardinalityTags implements KeyName { + + /** + * Listener id. + */ + LISTENER_ID { + + @Override + public String asString() { + return "listener.id"; + } + + } + + } + +} diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/micrometer/package-info.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/micrometer/package-info.java new file mode 100644 index 0000000000..8131427dac --- /dev/null +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/micrometer/package-info.java @@ -0,0 +1,6 @@ +/** + * Provides classes for Micrometer support. + */ +@org.springframework.lang.NonNullApi +@org.springframework.lang.NonNullFields +package org.springframework.amqp.rabbit.support.micrometer;