Skip to content

Commit

Permalink
Merge pull request #2276 from AxonFramework/enhancement/psep-config
Browse files Browse the repository at this point in the history
Pooled Streaming Event Processor configuration enhancement
  • Loading branch information
smcvb committed Jul 7, 2022
2 parents 25753a7 + a1c1862 commit 3990411
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 4 deletions.
Expand Up @@ -260,6 +260,25 @@ EventProcessingConfigurer registerTokenStore(String processorName,
*/
EventProcessingConfigurer usingPooledStreamingEventProcessors();

/**
* Defaults Event Processors builders to construct a {@link PooledStreamingEventProcessor} using the
* {@code configuration} to configure them.
* <p>
* The default behavior depends on the {@link EventBus} available in the {@link Configuration}. If the
* {@code EventBus} is a {@link StreamableMessageSource}, processors are Tracking by default. This method must be
* used to force the use of Pooled Streaming Processors, unless specifically overridden for individual processors.
*
* @param pooledStreamingProcessorConfiguration configuration used when constructing every
* {@link PooledStreamingEventProcessor}
* @return the current {@link EventProcessingConfigurer} instance, for fluent interfacing
*/
default EventProcessingConfigurer usingPooledStreamingEventProcessors(
PooledStreamingProcessorConfiguration pooledStreamingProcessorConfiguration
) {
return usingPooledStreamingEventProcessors()
.registerPooledStreamingEventProcessorConfiguration(pooledStreamingProcessorConfiguration);
}

/**
* Registers a {@link org.axonframework.eventhandling.SubscribingEventProcessor} with given {@code name} within this
* Configurer.
Expand Down
Expand Up @@ -849,13 +849,13 @@ protected EventProcessor pooledStreamingEventProcessor(
.transactionManager(transactionManager(name))
.coordinatorExecutor(processorName -> {
ScheduledExecutorService coordinatorExecutor =
defaultExecutor("Coordinator[" + processorName + "]");
defaultExecutor(1, "Coordinator[" + processorName + "]");
config.onShutdown(coordinatorExecutor::shutdown);
return coordinatorExecutor;
})
.workerExecutor(processorName -> {
ScheduledExecutorService workerExecutor =
defaultExecutor("WorkPackage[" + processorName + "]");
defaultExecutor(4, "WorkPackage[" + processorName + "]");
config.onShutdown(workerExecutor::shutdown);
return workerExecutor;
});
Expand All @@ -865,8 +865,8 @@ protected EventProcessor pooledStreamingEventProcessor(
.build();
}

private ScheduledExecutorService defaultExecutor(String factoryName) {
return Executors.newScheduledThreadPool(1, new AxonThreadFactory(factoryName));
private ScheduledExecutorService defaultExecutor(int poolSize, String factoryName) {
return Executors.newScheduledThreadPool(poolSize, new AxonThreadFactory(factoryName));
}

/**
Expand Down
Expand Up @@ -915,6 +915,40 @@ void testRegisterPooledStreamingEventProcessorConfigurationIsUsedDuringAllPsepCo
assertEquals(mockedSource, getField("messageSource", result));
}

@Test
void testUsingPooledStreamingEventProcessorWithConfigurationIsUsedDuringAllPsepConstructions(
@Mock StreamableMessageSource<TrackedEventMessage<?>> mockedSource
) throws NoSuchFieldException, IllegalAccessException {
String testName = "pooled-streaming";
int testCapacity = 24;
Object testHandler = new Object();

configurer.eventProcessing()
.usingPooledStreamingEventProcessors((config, builder) -> builder.maxClaimedSegments(testCapacity))
.configureDefaultStreamableMessageSource(config -> mockedSource)
.registerEventHandler(config -> new PooledStreamingEventHandler())
.byDefaultAssignTo("default")
.registerEventHandler(config -> testHandler);
Configuration config = configurer.start();

Optional<PooledStreamingEventProcessor> optionalResult =
config.eventProcessingConfiguration()
.eventProcessor(testName, PooledStreamingEventProcessor.class);

assertTrue(optionalResult.isPresent());
PooledStreamingEventProcessor result = optionalResult.get();
assertEquals(testCapacity, result.maxCapacity());
assertEquals(mockedSource, getField("messageSource", result));

optionalResult = config.eventProcessingConfiguration()
.eventProcessor("default", PooledStreamingEventProcessor.class);

assertTrue(optionalResult.isPresent());
result = optionalResult.get();
assertEquals(testCapacity, result.maxCapacity());
assertEquals(mockedSource, getField("messageSource", result));
}

@Test
void testRegisterPooledStreamingEventProcessorConfigurationForNameIsUsedDuringSpecificPsepConstruction(
@Mock StreamableMessageSource<TrackedEventMessage<?>> mockedSource
Expand Down

0 comments on commit 3990411

Please sign in to comment.