diff --git a/config/src/main/java/org/axonframework/config/EventProcessingConfigurer.java b/config/src/main/java/org/axonframework/config/EventProcessingConfigurer.java index 43c090e283..85524fae0a 100644 --- a/config/src/main/java/org/axonframework/config/EventProcessingConfigurer.java +++ b/config/src/main/java/org/axonframework/config/EventProcessingConfigurer.java @@ -260,6 +260,25 @@ EventProcessingConfigurer registerTokenStore(String processorName, */ EventProcessingConfigurer usingPooledStreamingEventProcessors(); + /** + * Defaults Event Processors builders to construct a {@link PooledStreamingEventProcessor} using the + * {@code configuration} to configure them. + *

+ * The default behavior depends on the {@link EventBus} available in the {@link Configuration}. If the + * {@code EventBus} is a {@link StreamableMessageSource}, processors are Tracking by default. This method must be + * used to force the use of Pooled Streaming Processors, unless specifically overridden for individual processors. + * + * @param pooledStreamingProcessorConfiguration configuration used when constructing every + * {@link PooledStreamingEventProcessor} + * @return the current {@link EventProcessingConfigurer} instance, for fluent interfacing + */ + default EventProcessingConfigurer usingPooledStreamingEventProcessors( + PooledStreamingProcessorConfiguration pooledStreamingProcessorConfiguration + ) { + return usingPooledStreamingEventProcessors() + .registerPooledStreamingEventProcessorConfiguration(pooledStreamingProcessorConfiguration); + } + /** * Registers a {@link org.axonframework.eventhandling.SubscribingEventProcessor} with given {@code name} within this * Configurer. diff --git a/config/src/main/java/org/axonframework/config/EventProcessingModule.java b/config/src/main/java/org/axonframework/config/EventProcessingModule.java index cb12c2b2b3..3c82307622 100644 --- a/config/src/main/java/org/axonframework/config/EventProcessingModule.java +++ b/config/src/main/java/org/axonframework/config/EventProcessingModule.java @@ -849,13 +849,13 @@ protected EventProcessor pooledStreamingEventProcessor( .transactionManager(transactionManager(name)) .coordinatorExecutor(processorName -> { ScheduledExecutorService coordinatorExecutor = - defaultExecutor("Coordinator[" + processorName + "]"); + defaultExecutor(1, "Coordinator[" + processorName + "]"); config.onShutdown(coordinatorExecutor::shutdown); return coordinatorExecutor; }) .workerExecutor(processorName -> { ScheduledExecutorService workerExecutor = - defaultExecutor("WorkPackage[" + processorName + "]"); + defaultExecutor(4, "WorkPackage[" + processorName + "]"); config.onShutdown(workerExecutor::shutdown); return workerExecutor; }); @@ -865,8 +865,8 @@ protected EventProcessor pooledStreamingEventProcessor( .build(); } - private ScheduledExecutorService defaultExecutor(String factoryName) { - return Executors.newScheduledThreadPool(1, new AxonThreadFactory(factoryName)); + private ScheduledExecutorService defaultExecutor(int poolSize, String factoryName) { + return Executors.newScheduledThreadPool(poolSize, new AxonThreadFactory(factoryName)); } /** diff --git a/config/src/test/java/org/axonframework/config/EventProcessingModuleTest.java b/config/src/test/java/org/axonframework/config/EventProcessingModuleTest.java index 5cbcfa767f..9576087466 100644 --- a/config/src/test/java/org/axonframework/config/EventProcessingModuleTest.java +++ b/config/src/test/java/org/axonframework/config/EventProcessingModuleTest.java @@ -915,6 +915,40 @@ void testRegisterPooledStreamingEventProcessorConfigurationIsUsedDuringAllPsepCo assertEquals(mockedSource, getField("messageSource", result)); } + @Test + void testUsingPooledStreamingEventProcessorWithConfigurationIsUsedDuringAllPsepConstructions( + @Mock StreamableMessageSource> mockedSource + ) throws NoSuchFieldException, IllegalAccessException { + String testName = "pooled-streaming"; + int testCapacity = 24; + Object testHandler = new Object(); + + configurer.eventProcessing() + .usingPooledStreamingEventProcessors((config, builder) -> builder.maxClaimedSegments(testCapacity)) + .configureDefaultStreamableMessageSource(config -> mockedSource) + .registerEventHandler(config -> new PooledStreamingEventHandler()) + .byDefaultAssignTo("default") + .registerEventHandler(config -> testHandler); + Configuration config = configurer.start(); + + Optional optionalResult = + config.eventProcessingConfiguration() + .eventProcessor(testName, PooledStreamingEventProcessor.class); + + assertTrue(optionalResult.isPresent()); + PooledStreamingEventProcessor result = optionalResult.get(); + assertEquals(testCapacity, result.maxCapacity()); + assertEquals(mockedSource, getField("messageSource", result)); + + optionalResult = config.eventProcessingConfiguration() + .eventProcessor("default", PooledStreamingEventProcessor.class); + + assertTrue(optionalResult.isPresent()); + result = optionalResult.get(); + assertEquals(testCapacity, result.maxCapacity()); + assertEquals(mockedSource, getField("messageSource", result)); + } + @Test void testRegisterPooledStreamingEventProcessorConfigurationForNameIsUsedDuringSpecificPsepConstruction( @Mock StreamableMessageSource> mockedSource