Skip to content

Commit

Permalink
Invoke interceptors for dlq.
Browse files Browse the repository at this point in the history
  • Loading branch information
gklijs committed Mar 24, 2023
1 parent e5e1020 commit 7f08372
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 12 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2022. Axon Framework
* Copyright (c) 2010-2023. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,6 +42,7 @@
import org.axonframework.eventhandling.tokenstore.inmemory.InMemoryTokenStore;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.MessageHandlerInterceptorSupport;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.messaging.annotation.HandlerDefinition;
Expand Down Expand Up @@ -313,6 +314,7 @@ private DeadLetteringEventHandlerInvoker deadLetteringInvoker(String processorNa
deadLetteringInvokerConfigs.getOrDefault(processingGroup, DeadLetteringInvokerConfiguration.noOp())
.apply(configuration, builder)
.build();
addInterceptors(processorName, deadLetteringInvoker);
deadLetteringEventHandlerInvokers.put(processingGroup, deadLetteringInvoker);
return deadLetteringInvoker;
}
Expand Down Expand Up @@ -362,21 +364,25 @@ private EventProcessor buildEventProcessor(List<Function<Configuration, EventHan
.getOrDefault(processorName, defaultEventProcessorBuilder)
.build(processorName, configuration, multiEventHandlerInvoker);

addInterceptors(processorName, eventProcessor);

return eventProcessor;
}

private void addInterceptors(String processorName, MessageHandlerInterceptorSupport<EventMessage<?>> processor){
handlerInterceptorsBuilders.getOrDefault(processorName, new ArrayList<>())
.stream()
.map(hi -> hi.apply(configuration))
.forEach(eventProcessor::registerHandlerInterceptor);
.forEach(processor::registerHandlerInterceptor);

defaultHandlerInterceptors.stream()
.map(f -> f.apply(configuration, processorName))
.filter(Objects::nonNull)
.forEach(eventProcessor::registerHandlerInterceptor);
.forEach(processor::registerHandlerInterceptor);

eventProcessor.registerHandlerInterceptor(
processor.registerHandlerInterceptor(
new CorrelationDataInterceptor<>(configuration.correlationDataProviders())
);

return eventProcessor;
}

//<editor-fold desc="configuration methods">
Expand Down
Expand Up @@ -1357,6 +1357,34 @@ void sequencedDeadLetterProcessorReturnsForProcessingGroupWithDlq(
assertFalse(eventProcessingConfig.sequencedDeadLetterProcessor("non-existing-group").isPresent());
}

@Test
void interceptorsOnDeadLetterProcessorShouldBePresent(
@Mock SequencedDeadLetterQueue<EventMessage<?>> deadLetterQueue
) throws NoSuchFieldException, IllegalAccessException {
String processingGroup = "pooled-streaming";
StubInterceptor interceptor1 = new StubInterceptor();
StubInterceptor interceptor2 = new StubInterceptor();

configurer.configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine())
.eventProcessing()
.registerPooledStreamingEventProcessor(processingGroup)
.registerEventHandler(config -> new PooledStreamingEventHandler())
.registerDeadLetterQueue(processingGroup, c -> deadLetterQueue)
.registerTransactionManager(processingGroup, c -> NoTransactionManager.INSTANCE)
.registerHandlerInterceptor(processingGroup, c -> interceptor1)
.registerDefaultHandlerInterceptor((c, n) -> interceptor2);
;

Configuration config = configurer.start();
EventProcessingConfiguration eventProcessingConfig = config.eventProcessingConfiguration();

Optional<SequencedDeadLetterProcessor<EventMessage<?>>> optionalDeadLetterProcessor =
eventProcessingConfig.sequencedDeadLetterProcessor(processingGroup);
assertTrue(optionalDeadLetterProcessor.isPresent());
List<MessageHandlerInterceptor<?>> interceptors = getField("interceptors", optionalDeadLetterProcessor.get());
assertEquals(3, interceptors.size());
}

private <O, R> R getField(String fieldName, O object) throws NoSuchFieldException, IllegalAccessException {
return getField(object.getClass(), fieldName, object);
}
Expand Down Expand Up @@ -1476,6 +1504,7 @@ public static class AnnotatedBeanSubclass extends AnnotatedBean {
}

private static class StubInterceptor implements MessageHandlerInterceptor<EventMessage<?>> {

@Override
public Object handle(@Nonnull UnitOfWork<? extends EventMessage<?>> unitOfWork,
@Nonnull InterceptorChain interceptorChain)
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2022. Axon Framework
* Copyright (c) 2010-2023. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.axonframework.eventhandling.deadletter;

import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.Registration;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
Expand All @@ -26,19 +27,25 @@
import org.axonframework.eventhandling.SimpleEventHandlerInvoker;
import org.axonframework.eventhandling.async.SequencingPolicy;
import org.axonframework.eventhandling.async.SequentialPerAggregatePolicy;
import org.axonframework.messaging.DefaultInterceptorChain;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.MessageHandlerInterceptorSupport;
import org.axonframework.messaging.deadletter.DeadLetter;
import org.axonframework.messaging.deadletter.Decisions;
import org.axonframework.messaging.deadletter.EnqueueDecision;
import org.axonframework.messaging.deadletter.EnqueuePolicy;
import org.axonframework.messaging.deadletter.GenericDeadLetter;
import org.axonframework.messaging.deadletter.SequencedDeadLetterProcessor;
import org.axonframework.messaging.deadletter.SequencedDeadLetterQueue;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.DefaultUnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;
import javax.annotation.Nonnull;

Expand All @@ -63,14 +70,15 @@
*/
public class DeadLetteringEventHandlerInvoker
extends SimpleEventHandlerInvoker
implements SequencedDeadLetterProcessor<EventMessage<?>> {
implements SequencedDeadLetterProcessor<EventMessage<?>>, MessageHandlerInterceptorSupport<EventMessage<?>> {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final SequencedDeadLetterQueue<EventMessage<?>> queue;
private final EnqueuePolicy<EventMessage<?>> enqueuePolicy;
private final TransactionManager transactionManager;
private final boolean allowReset;
private final List<MessageHandlerInterceptor<? super EventMessage<?>>> interceptors = new CopyOnWriteArrayList<>();

/**
* Instantiate a dead-lettering {@link EventHandlerInvoker} based on the given {@link Builder builder}. Uses a
Expand Down Expand Up @@ -121,7 +129,7 @@ public void handle(@Nonnull EventMessage<?> message, @Nonnull Segment segment) t
message, sequenceIdentifier);
}
try {
super.invokeHandlers(message);
handleWithInterceptors(message);
} catch (Exception e) {
DeadLetter<EventMessage<?>> letter = new GenericDeadLetter<>(sequenceIdentifier, message, e);
EnqueueDecision<EventMessage<?>> decision = enqueuePolicy.decide(letter, e);
Expand All @@ -134,6 +142,20 @@ public void handle(@Nonnull EventMessage<?> message, @Nonnull Segment segment) t
}
}

@SuppressWarnings("unchecked")
private void handleWithInterceptors(@Nonnull EventMessage<?> message) throws Exception {
UnitOfWork<? extends EventMessage<?>> unitOfWork = (UnitOfWork<? extends EventMessage<?>>) CurrentUnitOfWork.get();
unitOfWork.transformMessage(m -> message);
new DefaultInterceptorChain<EventMessage<?>>(
unitOfWork,
interceptors,
m -> {
super.invokeHandlers(m);
return null;
}
).proceed();
}

@Override
public void performReset() {
if (allowReset) {
Expand All @@ -160,6 +182,13 @@ public boolean process(Predicate<DeadLetter<? extends EventMessage<?>>> sequence
return uow.executeWithResult(() -> queue.process(sequenceFilter, processingTask::process)).getPayload();
}

@Override
public Registration registerHandlerInterceptor(
@Nonnull MessageHandlerInterceptor<? super EventMessage<?>> interceptor) {
interceptors.add(interceptor);
return () -> interceptors.remove(interceptor);
}

/**
* Builder class to instantiate a {@link DeadLetteringEventHandlerInvoker}.
* <p>
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2022. Axon Framework
* Copyright (c) 2010-2023. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2022. Axon Framework
* Copyright (c) 2010-2023. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,12 +28,15 @@
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.async.SequencingPolicy;
import org.axonframework.eventhandling.async.SequentialPerAggregatePolicy;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.deadletter.DeadLetter;
import org.axonframework.messaging.deadletter.Decisions;
import org.axonframework.messaging.deadletter.EnqueueDecision;
import org.axonframework.messaging.deadletter.EnqueuePolicy;
import org.axonframework.messaging.deadletter.GenericDeadLetter;
import org.axonframework.messaging.deadletter.SequencedDeadLetterQueue;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.DefaultUnitOfWork;
import org.axonframework.utils.EventTestUtils;
import org.junit.jupiter.api.*;
import org.mockito.*;
Expand Down Expand Up @@ -82,6 +85,14 @@ void setUp() {
transactionManager = spy(new StubTransactionManager());

setTestSubject(createTestSubject());
DefaultUnitOfWork.startAndGet(null);
}

@AfterEach
void tearDown() {
while (CurrentUnitOfWork.isStarted()) {
CurrentUnitOfWork.clear(CurrentUnitOfWork.get());
}
}

private void setTestSubject(DeadLetteringEventHandlerInvoker testSubject) {
Expand Down Expand Up @@ -177,6 +188,31 @@ void handleMethodEnqueuesOnShouldEnqueueDecisionWhenDelegateThrowsAnException()
verifyNoInteractions(transactionManager);
}

@Test
void useInterceptorToHandleError() throws Exception {
GenericDeadLetter.clock = Clock.fixed(Instant.now(), ZoneId.systemDefault());

DeadLetter<EventMessage<?>> expectedIfPresentLetter = new GenericDeadLetter<>(TEST_SEQUENCE_ID, TEST_EVENT);

doThrow(new RuntimeException("some-cause")).when(handler).handle(TEST_EVENT);
when(queue.enqueueIfPresent(any(), any())).thenReturn(false);
testSubject.registerHandlerInterceptor(errorCatchingInterceptor());

testSubject.handle(TEST_EVENT, Segment.ROOT_SEGMENT);

verify(sequencingPolicy, times(2)).getSequenceIdentifierFor(TEST_EVENT);
verify(handler).handle(TEST_EVENT);

//noinspection unchecked
ArgumentCaptor<Supplier<DeadLetter<? extends EventMessage<?>>>> enqueueIfPresentCaptor =
ArgumentCaptor.forClass(Supplier.class);
verify(queue).enqueueIfPresent(eq(TEST_SEQUENCE_ID), enqueueIfPresentCaptor.capture());
assertLetter(expectedIfPresentLetter, enqueueIfPresentCaptor.getValue().get());

verify(queue, never()).enqueue(eq(TEST_SEQUENCE_ID), any());
verifyNoInteractions(transactionManager);
}

@Test
void handleMethodDoesNotEnqueueForShouldNotEnqueueDecisionWhenDelegateThrowsAnException() throws Exception {
when(enqueuePolicy.decide(any(), any())).thenReturn(Decisions.doNotEnqueue());
Expand All @@ -191,7 +227,6 @@ void handleMethodDoesNotEnqueueForShouldNotEnqueueDecisionWhenDelegateThrowsAnEx

doThrow(testCause).when(handler).handle(TEST_EVENT);
when(queue.enqueueIfPresent(any(), any())).thenReturn(false);

testSubject.handle(TEST_EVENT, Segment.ROOT_SEGMENT);

verify(sequencingPolicy, times(2)).getSequenceIdentifierFor(TEST_EVENT);
Expand Down Expand Up @@ -447,4 +482,15 @@ private static void assertLetter(DeadLetter<? extends EventMessage<?>> expected,
assertEquals(expected.lastTouched(), result.lastTouched());
assertEquals(expected.diagnostics(), result.diagnostics());
}

private MessageHandlerInterceptor<? super EventMessage<?>> errorCatchingInterceptor() {
return (event, chain) -> {
try {
chain.proceed();
} catch (RuntimeException e) {
return event;
}
return event;
};
}
}

0 comments on commit 7f08372

Please sign in to comment.