Skip to content

Commit

Permalink
Merge pull request #2661 from AxonFramework/invoke_interceptoers_when…
Browse files Browse the repository at this point in the history
…_processing_dlq

[#2639] Handler Interceptor support for Dead Letter Processing
  • Loading branch information
gklijs committed Apr 25, 2023
2 parents 7c1afcf + 172a5b3 commit 1f8c324
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 19 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2022. Axon Framework
* Copyright (c) 2010-2023. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,6 +42,7 @@
import org.axonframework.eventhandling.tokenstore.inmemory.InMemoryTokenStore;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.MessageHandlerInterceptorSupport;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.messaging.annotation.HandlerDefinition;
Expand Down Expand Up @@ -313,6 +314,7 @@ private DeadLetteringEventHandlerInvoker deadLetteringInvoker(String processorNa
deadLetteringInvokerConfigs.getOrDefault(processingGroup, DeadLetteringInvokerConfiguration.noOp())
.apply(configuration, builder)
.build();
addInterceptors(processorName, deadLetteringInvoker);
deadLetteringEventHandlerInvokers.put(processingGroup, deadLetteringInvoker);
return deadLetteringInvoker;
}
Expand Down Expand Up @@ -362,21 +364,25 @@ private EventProcessor buildEventProcessor(List<Function<Configuration, EventHan
.getOrDefault(processorName, defaultEventProcessorBuilder)
.build(processorName, configuration, multiEventHandlerInvoker);

addInterceptors(processorName, eventProcessor);

return eventProcessor;
}

private void addInterceptors(String processorName, MessageHandlerInterceptorSupport<EventMessage<?>> processor){
handlerInterceptorsBuilders.getOrDefault(processorName, new ArrayList<>())
.stream()
.map(hi -> hi.apply(configuration))
.forEach(eventProcessor::registerHandlerInterceptor);
.forEach(processor::registerHandlerInterceptor);

defaultHandlerInterceptors.stream()
.map(f -> f.apply(configuration, processorName))
.filter(Objects::nonNull)
.forEach(eventProcessor::registerHandlerInterceptor);
.forEach(processor::registerHandlerInterceptor);

eventProcessor.registerHandlerInterceptor(
processor.registerHandlerInterceptor(
new CorrelationDataInterceptor<>(configuration.correlationDataProviders())
);

return eventProcessor;
}

//<editor-fold desc="configuration methods">
Expand Down
Expand Up @@ -1357,6 +1357,34 @@ void sequencedDeadLetterProcessorReturnsForProcessingGroupWithDlq(
assertFalse(eventProcessingConfig.sequencedDeadLetterProcessor("non-existing-group").isPresent());
}

@Test
void interceptorsOnDeadLetterProcessorShouldBePresent(
@Mock SequencedDeadLetterQueue<EventMessage<?>> deadLetterQueue
) throws NoSuchFieldException, IllegalAccessException {
String processingGroup = "pooled-streaming";
StubInterceptor interceptor1 = new StubInterceptor();
StubInterceptor interceptor2 = new StubInterceptor();

configurer.configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine())
.eventProcessing()
.registerPooledStreamingEventProcessor(processingGroup)
.registerEventHandler(config -> new PooledStreamingEventHandler())
.registerDeadLetterQueue(processingGroup, c -> deadLetterQueue)
.registerTransactionManager(processingGroup, c -> NoTransactionManager.INSTANCE)
.registerHandlerInterceptor(processingGroup, c -> interceptor1)
.registerDefaultHandlerInterceptor((c, n) -> interceptor2);
;

Configuration config = configurer.start();
EventProcessingConfiguration eventProcessingConfig = config.eventProcessingConfiguration();

Optional<SequencedDeadLetterProcessor<EventMessage<?>>> optionalDeadLetterProcessor =
eventProcessingConfig.sequencedDeadLetterProcessor(processingGroup);
assertTrue(optionalDeadLetterProcessor.isPresent());
List<MessageHandlerInterceptor<?>> interceptors = getField("interceptors", optionalDeadLetterProcessor.get());
assertEquals(3, interceptors.size());
}

private <O, R> R getField(String fieldName, O object) throws NoSuchFieldException, IllegalAccessException {
return getField(object.getClass(), fieldName, object);
}
Expand Down Expand Up @@ -1476,6 +1504,7 @@ public static class AnnotatedBeanSubclass extends AnnotatedBean {
}

private static class StubInterceptor implements MessageHandlerInterceptor<EventMessage<?>> {

@Override
public Object handle(@Nonnull UnitOfWork<? extends EventMessage<?>> unitOfWork,
@Nonnull InterceptorChain interceptorChain)
Expand Down
Expand Up @@ -20,6 +20,8 @@
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventMessageHandler;
import org.axonframework.messaging.DefaultInterceptorChain;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.deadletter.DeadLetter;
import org.axonframework.messaging.deadletter.Decisions;
import org.axonframework.messaging.deadletter.EnqueueDecision;
Expand Down Expand Up @@ -49,13 +51,16 @@ class DeadLetteredEventProcessingTask
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final List<EventMessageHandler> eventHandlingComponents;
private final List<MessageHandlerInterceptor<? super EventMessage<?>>> interceptors;
private final EnqueuePolicy<EventMessage<?>> enqueuePolicy;
private final TransactionManager transactionManager;

DeadLetteredEventProcessingTask(List<EventMessageHandler> eventHandlingComponents,
List<MessageHandlerInterceptor<? super EventMessage<?>>> interceptors,
EnqueuePolicy<EventMessage<?>> enqueuePolicy,
TransactionManager transactionManager) {
this.eventHandlingComponents = eventHandlingComponents;
this.interceptors = interceptors;
this.enqueuePolicy = enqueuePolicy;
this.transactionManager = transactionManager;
}
Expand Down Expand Up @@ -89,15 +94,28 @@ public EnqueueDecision<EventMessage<?>> process(DeadLetter<? extends EventMessag
.put(DeadLetter.class.getName(), letter);
unitOfWork.onPrepareCommit(uow -> decision.set(onCommit(letter)));
unitOfWork.onRollback(uow -> decision.set(onRollback(letter, uow.getExecutionResult().getExceptionResult())));
unitOfWork.executeWithResult(() -> handle(letter));
unitOfWork.executeWithResult(() -> handleWithInterceptors(unitOfWork));

return ObjectUtils.getOrDefault(decision.get(), Decisions::ignore);
}

private Object handle(DeadLetter<? extends EventMessage<?>> letter) throws Exception {
private void handle(EventMessage<?> eventMessage) throws Exception {
for (EventMessageHandler handler : eventHandlingComponents) {
handler.handle(letter.message());
handler.handle(eventMessage);
}
}

private Object handleWithInterceptors(
UnitOfWork<? extends EventMessage<?>> unitOfWork
) throws Exception {
new DefaultInterceptorChain<EventMessage<?>>(
unitOfWork,
interceptors,
m -> {
handle(m);
return null;
}
).proceed();
// There's no result of event handling to return here.
// We use this methods format to be able to define the Error Handler may throw Exceptions.
return null;
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2022. Axon Framework
* Copyright (c) 2010-2023. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.axonframework.eventhandling.deadletter;

import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.Registration;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
Expand All @@ -26,6 +27,8 @@
import org.axonframework.eventhandling.SimpleEventHandlerInvoker;
import org.axonframework.eventhandling.async.SequencingPolicy;
import org.axonframework.eventhandling.async.SequentialPerAggregatePolicy;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.MessageHandlerInterceptorSupport;
import org.axonframework.messaging.deadletter.DeadLetter;
import org.axonframework.messaging.deadletter.Decisions;
import org.axonframework.messaging.deadletter.EnqueueDecision;
Expand All @@ -39,6 +42,8 @@
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;
import javax.annotation.Nonnull;

Expand All @@ -63,14 +68,15 @@
*/
public class DeadLetteringEventHandlerInvoker
extends SimpleEventHandlerInvoker
implements SequencedDeadLetterProcessor<EventMessage<?>> {
implements SequencedDeadLetterProcessor<EventMessage<?>>, MessageHandlerInterceptorSupport<EventMessage<?>> {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final SequencedDeadLetterQueue<EventMessage<?>> queue;
private final EnqueuePolicy<EventMessage<?>> enqueuePolicy;
private final TransactionManager transactionManager;
private final boolean allowReset;
private final List<MessageHandlerInterceptor<? super EventMessage<?>>> interceptors = new CopyOnWriteArrayList<>();

/**
* Instantiate a dead-lettering {@link EventHandlerInvoker} based on the given {@link Builder builder}. Uses a
Expand Down Expand Up @@ -153,13 +159,22 @@ public <R> void performReset(R resetContext) {
@Override
public boolean process(Predicate<DeadLetter<? extends EventMessage<?>>> sequenceFilter) {
DeadLetteredEventProcessingTask processingTask =
new DeadLetteredEventProcessingTask(super.eventHandlers(), enqueuePolicy, transactionManager);

new DeadLetteredEventProcessingTask(super.eventHandlers(),
interceptors,
enqueuePolicy,
transactionManager);
UnitOfWork<?> uow = new DefaultUnitOfWork<>(null);
uow.attachTransaction(transactionManager);
return uow.executeWithResult(() -> queue.process(sequenceFilter, processingTask::process)).getPayload();
}

@Override
public Registration registerHandlerInterceptor(
@Nonnull MessageHandlerInterceptor<? super EventMessage<?>> interceptor) {
interceptors.add(interceptor);
return () -> interceptors.remove(interceptor);
}

/**
* Builder class to instantiate a {@link DeadLetteringEventHandlerInvoker}.
* <p>
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2022. Axon Framework
* Copyright (c) 2010-2023. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventMessageHandler;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.deadletter.DeadLetter;
import org.axonframework.messaging.deadletter.Decisions;
import org.axonframework.messaging.deadletter.DoNotEnqueue;
Expand All @@ -30,8 +31,10 @@
import org.junit.jupiter.api.*;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
Expand All @@ -53,21 +56,26 @@ class DeadLetteredEventProcessingTaskTest {
private EnqueuePolicy<EventMessage<?>> enqueuePolicy;
private TransactionManager transactionManager;

private List<EventMessageHandler> eventHandlingComponents;

private DeadLetteredEventProcessingTask testSubject;

@BeforeEach
void setUp() {
eventHandlerOne = mock(EventMessageHandler.class);
eventHandlerTwo = mock(EventMessageHandler.class);
List<EventMessageHandler> eventHandlingComponents = new ArrayList<>();
eventHandlingComponents = new ArrayList<>();
eventHandlingComponents.add(eventHandlerOne);
eventHandlingComponents.add(eventHandlerTwo);
//noinspection unchecked
enqueuePolicy = mock(EnqueuePolicy.class);
when(enqueuePolicy.decide(any(), any())).thenReturn(TEST_DECISION);
transactionManager = spy(new StubTransactionManager());

testSubject = new DeadLetteredEventProcessingTask(eventHandlingComponents, enqueuePolicy, transactionManager);
testSubject = new DeadLetteredEventProcessingTask(eventHandlingComponents,
Collections.emptyList(),
enqueuePolicy,
transactionManager);
}

@Test
Expand Down Expand Up @@ -105,6 +113,31 @@ void taskProcessesLetterUnsuccessfullyWhenHandlersThrowsAnException() throws Exc
verify(enqueuePolicy).decide(testLetter, testException);
}

@Test
void useInterceptorToHandleError() throws Exception {
AtomicBoolean invoked = new AtomicBoolean(false);
testSubject = new DeadLetteredEventProcessingTask(eventHandlingComponents,
Collections.singletonList(errorCatchingInterceptor(invoked)),
enqueuePolicy,
transactionManager);
//noinspection unchecked
DeadLetter<EventMessage<?>> testLetter = mock(DeadLetter.class);
//noinspection unchecked
when(testLetter.message()).thenReturn(TEST_EVENT);
Exception testException = new RuntimeException();

when(eventHandlerTwo.handle(TEST_EVENT)).thenThrow(testException);

EnqueueDecision<EventMessage<?>> result = testSubject.process(testLetter);

assertFalse(result.shouldEnqueue());
assertTrue(invoked.get());
verify(transactionManager).startTransaction();
verify(eventHandlerOne).handle(TEST_EVENT);
verify(eventHandlerTwo).handle(TEST_EVENT);
verify(enqueuePolicy, never()).decide(testLetter, testException);
}

// This stub TransactionManager is used for spying.
private static class StubTransactionManager implements TransactionManager {

Expand All @@ -113,4 +146,16 @@ public Transaction startTransaction() {
return NoTransactionManager.INSTANCE.startTransaction();
}
}

private MessageHandlerInterceptor<? super EventMessage<?>> errorCatchingInterceptor(AtomicBoolean invoked) {
return (unitOfWork, chain) -> {
invoked.set(true);
try {
chain.proceed();
} catch (RuntimeException e) {
return unitOfWork;
}
return unitOfWork;
};
}
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2022. Axon Framework
* Copyright (c) 2010-2023. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,6 +34,8 @@
import org.axonframework.messaging.deadletter.EnqueuePolicy;
import org.axonframework.messaging.deadletter.GenericDeadLetter;
import org.axonframework.messaging.deadletter.SequencedDeadLetterQueue;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.DefaultUnitOfWork;
import org.axonframework.utils.EventTestUtils;
import org.junit.jupiter.api.*;
import org.mockito.*;
Expand Down Expand Up @@ -82,6 +84,14 @@ void setUp() {
transactionManager = spy(new StubTransactionManager());

setTestSubject(createTestSubject());
DefaultUnitOfWork.startAndGet(null);
}

@AfterEach
void tearDown() {
while (CurrentUnitOfWork.isStarted()) {
CurrentUnitOfWork.clear(CurrentUnitOfWork.get());
}
}

private void setTestSubject(DeadLetteringEventHandlerInvoker testSubject) {
Expand Down Expand Up @@ -191,7 +201,6 @@ void handleMethodDoesNotEnqueueForShouldNotEnqueueDecisionWhenDelegateThrowsAnEx

doThrow(testCause).when(handler).handle(TEST_EVENT);
when(queue.enqueueIfPresent(any(), any())).thenReturn(false);

testSubject.handle(TEST_EVENT, Segment.ROOT_SEGMENT);

verify(sequencingPolicy, times(2)).getSequenceIdentifierFor(TEST_EVENT);
Expand Down

0 comments on commit 1f8c324

Please sign in to comment.