diff --git a/config/src/main/java/org/axonframework/config/EventProcessingModule.java b/config/src/main/java/org/axonframework/config/EventProcessingModule.java index 95c17692e8..12d89aaed0 100644 --- a/config/src/main/java/org/axonframework/config/EventProcessingModule.java +++ b/config/src/main/java/org/axonframework/config/EventProcessingModule.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2022. Axon Framework + * Copyright (c) 2010-2023. Axon Framework * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,6 +42,7 @@ import org.axonframework.eventhandling.tokenstore.inmemory.InMemoryTokenStore; import org.axonframework.messaging.Message; import org.axonframework.messaging.MessageHandlerInterceptor; +import org.axonframework.messaging.MessageHandlerInterceptorSupport; import org.axonframework.messaging.StreamableMessageSource; import org.axonframework.messaging.SubscribableMessageSource; import org.axonframework.messaging.annotation.HandlerDefinition; @@ -313,6 +314,7 @@ private DeadLetteringEventHandlerInvoker deadLetteringInvoker(String processorNa deadLetteringInvokerConfigs.getOrDefault(processingGroup, DeadLetteringInvokerConfiguration.noOp()) .apply(configuration, builder) .build(); + addInterceptors(processorName, deadLetteringInvoker); deadLetteringEventHandlerInvokers.put(processingGroup, deadLetteringInvoker); return deadLetteringInvoker; } @@ -362,21 +364,25 @@ private EventProcessor buildEventProcessor(List> processor){ handlerInterceptorsBuilders.getOrDefault(processorName, new ArrayList<>()) .stream() .map(hi -> hi.apply(configuration)) - .forEach(eventProcessor::registerHandlerInterceptor); + .forEach(processor::registerHandlerInterceptor); defaultHandlerInterceptors.stream() .map(f -> f.apply(configuration, processorName)) .filter(Objects::nonNull) - .forEach(eventProcessor::registerHandlerInterceptor); + .forEach(processor::registerHandlerInterceptor); - eventProcessor.registerHandlerInterceptor( + processor.registerHandlerInterceptor( new CorrelationDataInterceptor<>(configuration.correlationDataProviders()) ); - - return eventProcessor; } // diff --git a/config/src/test/java/org/axonframework/config/EventProcessingModuleTest.java b/config/src/test/java/org/axonframework/config/EventProcessingModuleTest.java index d9a0ea7236..43ca8767de 100644 --- a/config/src/test/java/org/axonframework/config/EventProcessingModuleTest.java +++ b/config/src/test/java/org/axonframework/config/EventProcessingModuleTest.java @@ -1357,6 +1357,34 @@ void sequencedDeadLetterProcessorReturnsForProcessingGroupWithDlq( assertFalse(eventProcessingConfig.sequencedDeadLetterProcessor("non-existing-group").isPresent()); } + @Test + void interceptorsOnDeadLetterProcessorShouldBePresent( + @Mock SequencedDeadLetterQueue> deadLetterQueue + ) throws NoSuchFieldException, IllegalAccessException { + String processingGroup = "pooled-streaming"; + StubInterceptor interceptor1 = new StubInterceptor(); + StubInterceptor interceptor2 = new StubInterceptor(); + + configurer.configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine()) + .eventProcessing() + .registerPooledStreamingEventProcessor(processingGroup) + .registerEventHandler(config -> new PooledStreamingEventHandler()) + .registerDeadLetterQueue(processingGroup, c -> deadLetterQueue) + .registerTransactionManager(processingGroup, c -> NoTransactionManager.INSTANCE) + .registerHandlerInterceptor(processingGroup, c -> interceptor1) + .registerDefaultHandlerInterceptor((c, n) -> interceptor2); + ; + + Configuration config = configurer.start(); + EventProcessingConfiguration eventProcessingConfig = config.eventProcessingConfiguration(); + + Optional>> optionalDeadLetterProcessor = + eventProcessingConfig.sequencedDeadLetterProcessor(processingGroup); + assertTrue(optionalDeadLetterProcessor.isPresent()); + List> interceptors = getField("interceptors", optionalDeadLetterProcessor.get()); + assertEquals(3, interceptors.size()); + } + private R getField(String fieldName, O object) throws NoSuchFieldException, IllegalAccessException { return getField(object.getClass(), fieldName, object); } @@ -1476,6 +1504,7 @@ public static class AnnotatedBeanSubclass extends AnnotatedBean { } private static class StubInterceptor implements MessageHandlerInterceptor> { + @Override public Object handle(@Nonnull UnitOfWork> unitOfWork, @Nonnull InterceptorChain interceptorChain) diff --git a/messaging/src/main/java/org/axonframework/eventhandling/deadletter/DeadLetteringEventHandlerInvoker.java b/messaging/src/main/java/org/axonframework/eventhandling/deadletter/DeadLetteringEventHandlerInvoker.java index 285df3efde..369c9d1e5e 100644 --- a/messaging/src/main/java/org/axonframework/eventhandling/deadletter/DeadLetteringEventHandlerInvoker.java +++ b/messaging/src/main/java/org/axonframework/eventhandling/deadletter/DeadLetteringEventHandlerInvoker.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2022. Axon Framework + * Copyright (c) 2010-2023. Axon Framework * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.axonframework.eventhandling.deadletter; import org.axonframework.common.AxonConfigurationException; +import org.axonframework.common.Registration; import org.axonframework.common.transaction.TransactionManager; import org.axonframework.eventhandling.EventHandlerInvoker; import org.axonframework.eventhandling.EventMessage; @@ -26,6 +27,9 @@ import org.axonframework.eventhandling.SimpleEventHandlerInvoker; import org.axonframework.eventhandling.async.SequencingPolicy; import org.axonframework.eventhandling.async.SequentialPerAggregatePolicy; +import org.axonframework.messaging.DefaultInterceptorChain; +import org.axonframework.messaging.MessageHandlerInterceptor; +import org.axonframework.messaging.MessageHandlerInterceptorSupport; import org.axonframework.messaging.deadletter.DeadLetter; import org.axonframework.messaging.deadletter.Decisions; import org.axonframework.messaging.deadletter.EnqueueDecision; @@ -33,12 +37,15 @@ import org.axonframework.messaging.deadletter.GenericDeadLetter; import org.axonframework.messaging.deadletter.SequencedDeadLetterProcessor; import org.axonframework.messaging.deadletter.SequencedDeadLetterQueue; +import org.axonframework.messaging.unitofwork.CurrentUnitOfWork; import org.axonframework.messaging.unitofwork.DefaultUnitOfWork; import org.axonframework.messaging.unitofwork.UnitOfWork; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Predicate; import javax.annotation.Nonnull; @@ -63,7 +70,7 @@ */ public class DeadLetteringEventHandlerInvoker extends SimpleEventHandlerInvoker - implements SequencedDeadLetterProcessor> { + implements SequencedDeadLetterProcessor>, MessageHandlerInterceptorSupport> { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -71,6 +78,7 @@ public class DeadLetteringEventHandlerInvoker private final EnqueuePolicy> enqueuePolicy; private final TransactionManager transactionManager; private final boolean allowReset; + private final List>> interceptors = new CopyOnWriteArrayList<>(); /** * Instantiate a dead-lettering {@link EventHandlerInvoker} based on the given {@link Builder builder}. Uses a @@ -121,7 +129,7 @@ public void handle(@Nonnull EventMessage message, @Nonnull Segment segment) t message, sequenceIdentifier); } try { - super.invokeHandlers(message); + handleWithInterceptors(message); } catch (Exception e) { DeadLetter> letter = new GenericDeadLetter<>(sequenceIdentifier, message, e); EnqueueDecision> decision = enqueuePolicy.decide(letter, e); @@ -134,6 +142,20 @@ public void handle(@Nonnull EventMessage message, @Nonnull Segment segment) t } } + @SuppressWarnings("unchecked") + private void handleWithInterceptors(@Nonnull EventMessage message) throws Exception { + UnitOfWork> unitOfWork = (UnitOfWork>) CurrentUnitOfWork.get(); + unitOfWork.transformMessage(m -> message); + new DefaultInterceptorChain>( + unitOfWork, + interceptors, + m -> { + super.invokeHandlers(m); + return null; + } + ).proceed(); + } + @Override public void performReset() { if (allowReset) { @@ -160,6 +182,13 @@ public boolean process(Predicate>> sequence return uow.executeWithResult(() -> queue.process(sequenceFilter, processingTask::process)).getPayload(); } + @Override + public Registration registerHandlerInterceptor( + @Nonnull MessageHandlerInterceptor> interceptor) { + interceptors.add(interceptor); + return () -> interceptors.remove(interceptor); + } + /** * Builder class to instantiate a {@link DeadLetteringEventHandlerInvoker}. *

diff --git a/messaging/src/main/java/org/axonframework/messaging/deadletter/SequencedDeadLetterProcessor.java b/messaging/src/main/java/org/axonframework/messaging/deadletter/SequencedDeadLetterProcessor.java index 34032081de..2ecb65e605 100644 --- a/messaging/src/main/java/org/axonframework/messaging/deadletter/SequencedDeadLetterProcessor.java +++ b/messaging/src/main/java/org/axonframework/messaging/deadletter/SequencedDeadLetterProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2022. Axon Framework + * Copyright (c) 2010-2023. Axon Framework * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/messaging/src/test/java/org/axonframework/eventhandling/deadletter/DeadLetteringEventHandlerInvokerTest.java b/messaging/src/test/java/org/axonframework/eventhandling/deadletter/DeadLetteringEventHandlerInvokerTest.java index 3ec1718078..0515c729d4 100644 --- a/messaging/src/test/java/org/axonframework/eventhandling/deadletter/DeadLetteringEventHandlerInvokerTest.java +++ b/messaging/src/test/java/org/axonframework/eventhandling/deadletter/DeadLetteringEventHandlerInvokerTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2022. Axon Framework + * Copyright (c) 2010-2023. Axon Framework * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,12 +28,15 @@ import org.axonframework.eventhandling.Segment; import org.axonframework.eventhandling.async.SequencingPolicy; import org.axonframework.eventhandling.async.SequentialPerAggregatePolicy; +import org.axonframework.messaging.MessageHandlerInterceptor; import org.axonframework.messaging.deadletter.DeadLetter; import org.axonframework.messaging.deadletter.Decisions; import org.axonframework.messaging.deadletter.EnqueueDecision; import org.axonframework.messaging.deadletter.EnqueuePolicy; import org.axonframework.messaging.deadletter.GenericDeadLetter; import org.axonframework.messaging.deadletter.SequencedDeadLetterQueue; +import org.axonframework.messaging.unitofwork.CurrentUnitOfWork; +import org.axonframework.messaging.unitofwork.DefaultUnitOfWork; import org.axonframework.utils.EventTestUtils; import org.junit.jupiter.api.*; import org.mockito.*; @@ -82,6 +85,14 @@ void setUp() { transactionManager = spy(new StubTransactionManager()); setTestSubject(createTestSubject()); + DefaultUnitOfWork.startAndGet(null); + } + + @AfterEach + void tearDown() { + while (CurrentUnitOfWork.isStarted()) { + CurrentUnitOfWork.clear(CurrentUnitOfWork.get()); + } } private void setTestSubject(DeadLetteringEventHandlerInvoker testSubject) { @@ -177,6 +188,31 @@ void handleMethodEnqueuesOnShouldEnqueueDecisionWhenDelegateThrowsAnException() verifyNoInteractions(transactionManager); } + @Test + void useInterceptorToHandleError() throws Exception { + GenericDeadLetter.clock = Clock.fixed(Instant.now(), ZoneId.systemDefault()); + + DeadLetter> expectedIfPresentLetter = new GenericDeadLetter<>(TEST_SEQUENCE_ID, TEST_EVENT); + + doThrow(new RuntimeException("some-cause")).when(handler).handle(TEST_EVENT); + when(queue.enqueueIfPresent(any(), any())).thenReturn(false); + testSubject.registerHandlerInterceptor(errorCatchingInterceptor()); + + testSubject.handle(TEST_EVENT, Segment.ROOT_SEGMENT); + + verify(sequencingPolicy, times(2)).getSequenceIdentifierFor(TEST_EVENT); + verify(handler).handle(TEST_EVENT); + + //noinspection unchecked + ArgumentCaptor>>> enqueueIfPresentCaptor = + ArgumentCaptor.forClass(Supplier.class); + verify(queue).enqueueIfPresent(eq(TEST_SEQUENCE_ID), enqueueIfPresentCaptor.capture()); + assertLetter(expectedIfPresentLetter, enqueueIfPresentCaptor.getValue().get()); + + verify(queue, never()).enqueue(eq(TEST_SEQUENCE_ID), any()); + verifyNoInteractions(transactionManager); + } + @Test void handleMethodDoesNotEnqueueForShouldNotEnqueueDecisionWhenDelegateThrowsAnException() throws Exception { when(enqueuePolicy.decide(any(), any())).thenReturn(Decisions.doNotEnqueue()); @@ -191,7 +227,6 @@ void handleMethodDoesNotEnqueueForShouldNotEnqueueDecisionWhenDelegateThrowsAnEx doThrow(testCause).when(handler).handle(TEST_EVENT); when(queue.enqueueIfPresent(any(), any())).thenReturn(false); - testSubject.handle(TEST_EVENT, Segment.ROOT_SEGMENT); verify(sequencingPolicy, times(2)).getSequenceIdentifierFor(TEST_EVENT); @@ -447,4 +482,15 @@ private static void assertLetter(DeadLetter> expected, assertEquals(expected.lastTouched(), result.lastTouched()); assertEquals(expected.diagnostics(), result.diagnostics()); } + + private MessageHandlerInterceptor> errorCatchingInterceptor() { + return (event, chain) -> { + try { + chain.proceed(); + } catch (RuntimeException e) { + return event; + } + return event; + }; + } }