Skip to content

Commit

Permalink
Merge pull request #2533 from AxonFramework/bug/default-tep-config-sagas
Browse files Browse the repository at this point in the history
Ensure default `TrackingEventProcessorConfiguration` is taken into account for Sagas
  • Loading branch information
smcvb committed Dec 29, 2022
2 parents 89ab6cb + f660c8c commit 856b880
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 21 deletions.
Expand Up @@ -88,9 +88,11 @@
public class EventProcessingModule
implements ModuleConfiguration, EventProcessingConfiguration, EventProcessingConfigurer {

private static final String CONFIGURED_DEFAULT_TEP_CONFIG = "___DEFAULT_TEP_CONFIG";
private static final TrackingEventProcessorConfiguration DEFAULT_TEP_CONFIG =
TrackingEventProcessorConfiguration.forSingleThreadedProcessing();
private static final TrackingEventProcessorConfiguration DEFAULT_SAGA_TEP_CONFIG =
TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
.andInitialTrackingToken(StreamableMessageSource::createHeadToken);
DEFAULT_TEP_CONFIG.andInitialTrackingToken(StreamableMessageSource::createHeadToken);
private static final Function<Class<?>, String> DEFAULT_SAGA_PROCESSING_GROUP_FUNCTION =
c -> c.getSimpleName() + "Processor";

Expand Down Expand Up @@ -182,7 +184,7 @@ public class EventProcessingModule
"defaultSubscribableMessageSource",
Configuration::eventBus
);
private final Component<TrackingEventProcessorConfiguration> defaultTrackingEventProcessorConfiguration =
private final Component<TrackingEventProcessorConfiguration> defaultTepConfig =
new Component<>(
() -> configuration,
"trackingEventProcessorConfiguration",
Expand Down Expand Up @@ -344,7 +346,8 @@ private boolean noSagaProcessorCustomization(Class<?> type, String processingGro
return DEFAULT_SAGA_PROCESSING_GROUP_FUNCTION.apply(type).equals(processingGroup)
&& processingGroup.equals(processorName)
&& !eventProcessorBuilders.containsKey(processorName)
&& !tepConfigs.containsKey(processorName);
&& !tepConfigs.containsKey(processorName)
&& !tepConfigs.containsKey(CONFIGURED_DEFAULT_TEP_CONFIG);
}

private EventProcessor buildEventProcessor(List<Function<Configuration, EventHandlerInvoker>> builderFunctions,
Expand Down Expand Up @@ -791,7 +794,10 @@ public EventProcessingConfigurer registerTrackingEventProcessorConfiguration(
public EventProcessingConfigurer registerTrackingEventProcessorConfiguration(
Function<Configuration, TrackingEventProcessorConfiguration> trackingEventProcessorConfigurationBuilder
) {
this.defaultTrackingEventProcessorConfiguration.update(trackingEventProcessorConfigurationBuilder);
this.tepConfigs.put(CONFIGURED_DEFAULT_TEP_CONFIG,
new Component<>(() -> configuration,
"trackingEventProcessorConfiguration",
trackingEventProcessorConfigurationBuilder));
return this;
}

Expand Down Expand Up @@ -879,7 +885,10 @@ private EventProcessor defaultEventProcessor(String name,
}

private TrackingEventProcessorConfiguration trackingEventProcessorConfig(String name) {
return tepConfigs.getOrDefault(name, defaultTrackingEventProcessorConfiguration).get();
if (tepConfigs.containsKey(name)) {
return tepConfigs.get(name).get();
}
return tepConfigs.getOrDefault(CONFIGURED_DEFAULT_TEP_CONFIG, defaultTepConfig).get();
}

/**
Expand Down
Expand Up @@ -647,7 +647,7 @@ void customTrackingEventProcessingConfiguration(
}

@Test
void sagaTrackingProcessorsDefaultsToSagaTrackingEventProcessorConfigIfNoCustomizationIsPresent(
void sagaTrackingProcessorConstructionUsesDefaultSagaProcessorConfigIfNoCustomizationIsPresent(
@Mock StreamableMessageSource<TrackedEventMessage<?>> mockedSource,
@Mock StreamableMessageSource<TrackedEventMessage<?>> mockedSourceForVerification
) throws NoSuchFieldException {
Expand All @@ -661,8 +661,7 @@ void sagaTrackingProcessorsDefaultsToSagaTrackingEventProcessorConfigIfNoCustomi
config.eventProcessingConfiguration().eventProcessor("ObjectProcessor", TrackingEventProcessor.class);
assertTrue(resultTep.isPresent());
TrackingEventProcessor tep = resultTep.get();
int tepSegmentsSize =
getFieldValue(TrackingEventProcessor.class.getDeclaredField("segmentsSize"), tep);
int tepSegmentsSize = getFieldValue(TrackingEventProcessor.class.getDeclaredField("segmentsSize"), tep);
assertEquals(1, tepSegmentsSize);

Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> tepInitialTokenBuilder =
Expand All @@ -674,7 +673,7 @@ void sagaTrackingProcessorsDefaultsToSagaTrackingEventProcessorConfigIfNoCustomi
}

@Test
void sagaTrackingProcessorsDoesNotPickDefaultsSagaTrackingEventProcessorConfigForCustomProcessingGroup(
void sagaTrackingProcessorConstructionDoesNotPickDefaultSagaProcessorConfigForCustomProcessingGroup(
@Mock StreamableMessageSource<TrackedEventMessage<?>> mockedSource,
@Mock StreamableMessageSource<TrackedEventMessage<?>> mockedSourceForVerification
) throws NoSuchFieldException {
Expand All @@ -689,8 +688,7 @@ void sagaTrackingProcessorsDoesNotPickDefaultsSagaTrackingEventProcessorConfigFo
);
assertTrue(resultTep.isPresent());
TrackingEventProcessor tep = resultTep.get();
int tepSegmentsSize =
getFieldValue(TrackingEventProcessor.class.getDeclaredField("segmentsSize"), tep);
int tepSegmentsSize = getFieldValue(TrackingEventProcessor.class.getDeclaredField("segmentsSize"), tep);
assertEquals(1, tepSegmentsSize);

Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> tepInitialTokenBuilder =
Expand All @@ -702,7 +700,7 @@ void sagaTrackingProcessorsDoesNotPickDefaultsSagaTrackingEventProcessorConfigFo
}

@Test
void sagaTrackingProcessorsDoesNotPickDefaultsSagaTrackingEventProcessorConfigForCustomProcessor(
void sagaTrackingProcessorConstructionDoesNotPickDefaultSagaProcessorConfigForCustomProcessor(
@Mock StreamableMessageSource<TrackedEventMessage<?>> mockedSource,
@Mock StreamableMessageSource<TrackedEventMessage<?>> mockedSourceForVerification
) throws NoSuchFieldException {
Expand All @@ -717,8 +715,7 @@ void sagaTrackingProcessorsDoesNotPickDefaultsSagaTrackingEventProcessorConfigFo
);
assertTrue(resultTep.isPresent());
TrackingEventProcessor tep = resultTep.get();
int tepSegmentsSize =
getFieldValue(TrackingEventProcessor.class.getDeclaredField("segmentsSize"), tep);
int tepSegmentsSize = getFieldValue(TrackingEventProcessor.class.getDeclaredField("segmentsSize"), tep);
assertEquals(1, tepSegmentsSize);

Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> tepInitialTokenBuilder =
Expand All @@ -730,7 +727,7 @@ void sagaTrackingProcessorsDoesNotPickDefaultsSagaTrackingEventProcessorConfigFo
}

@Test
void sagaTrackingProcessorsDoesNotPickDefaultsSagaTrackingEventProcessorConfigForCustomTrackingProcessorBuilder(
void sagaTrackingProcessorConstructionDoesNotPickDefaultSagaProcessorConfigForCustomTrackingProcessorBuilder(
@Mock StreamableMessageSource<TrackedEventMessage<?>> mockedSource,
@Mock StreamableMessageSource<TrackedEventMessage<?>> mockedSourceForVerification
) throws NoSuchFieldException {
Expand All @@ -745,8 +742,7 @@ void sagaTrackingProcessorsDoesNotPickDefaultsSagaTrackingEventProcessorConfigFo
config.eventProcessingConfiguration().eventProcessor("ObjectProcessor", TrackingEventProcessor.class);
assertTrue(resultTep.isPresent());
TrackingEventProcessor tep = resultTep.get();
int tepSegmentsSize =
getFieldValue(TrackingEventProcessor.class.getDeclaredField("segmentsSize"), tep);
int tepSegmentsSize = getFieldValue(TrackingEventProcessor.class.getDeclaredField("segmentsSize"), tep);
assertEquals(3, tepSegmentsSize);

Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> tepInitialTokenBuilder =
Expand All @@ -758,7 +754,7 @@ void sagaTrackingProcessorsDoesNotPickDefaultsSagaTrackingEventProcessorConfigFo
}

@Test
void sagaTrackingProcessorsDoesNotPickDefaultsSagaTrackingEventProcessorConfigForCustomConfigInstance(
void sagaTrackingProcessorConstructionDoesNotPickDefaultSagaProcessorConfigForCustomConfigInstance(
@Mock StreamableMessageSource<TrackedEventMessage<?>> mockedSource,
@Mock StreamableMessageSource<TrackedEventMessage<?>> mockedSourceForVerification
) throws NoSuchFieldException {
Expand All @@ -775,8 +771,36 @@ void sagaTrackingProcessorsDoesNotPickDefaultsSagaTrackingEventProcessorConfigFo
config.eventProcessingConfiguration().eventProcessor("ObjectProcessor", TrackingEventProcessor.class);
assertTrue(resultTep.isPresent());
TrackingEventProcessor tep = resultTep.get();
int tepSegmentsSize =
getFieldValue(TrackingEventProcessor.class.getDeclaredField("segmentsSize"), tep);
int tepSegmentsSize = getFieldValue(TrackingEventProcessor.class.getDeclaredField("segmentsSize"), tep);
assertEquals(4, tepSegmentsSize);

Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> tepInitialTokenBuilder =
getFieldValue(TrackingEventProcessor.class.getDeclaredField("initialTrackingTokenBuilder"), tep);
tepInitialTokenBuilder.apply(mockedSourceForVerification);
// In absence of the default Saga Config, the stream starts at the tail
verify(mockedSourceForVerification).createTailToken();
verify(mockedSourceForVerification, times(0)).createHeadToken();
}

@Test
void sagaTrackingProcessorConstructionDoesNotPickDefaultSagaProcessorConfigForCustomDefaultConfig(
@Mock StreamableMessageSource<TrackedEventMessage<?>> mockedSource,
@Mock StreamableMessageSource<TrackedEventMessage<?>> mockedSourceForVerification
) throws NoSuchFieldException {
TrackingEventProcessorConfiguration testTepConfig =
TrackingEventProcessorConfiguration.forParallelProcessing(4);
configurer.eventProcessing()
.usingTrackingEventProcessors()
.configureDefaultStreamableMessageSource(config -> mockedSource)
.registerSaga(Object.class)
.registerTrackingEventProcessorConfiguration(config -> testTepConfig);
Configuration config = configurer.start();

Optional<TrackingEventProcessor> resultTep =
config.eventProcessingConfiguration().eventProcessor("ObjectProcessor", TrackingEventProcessor.class);
assertTrue(resultTep.isPresent());
TrackingEventProcessor tep = resultTep.get();
int tepSegmentsSize = getFieldValue(TrackingEventProcessor.class.getDeclaredField("segmentsSize"), tep);
assertEquals(4, tepSegmentsSize);

Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> tepInitialTokenBuilder =
Expand Down

0 comments on commit 856b880

Please sign in to comment.