From 25fadb0579e47263ddf34bca0b7a92372fb57721 Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Thu, 7 Jul 2022 12:20:18 +0200 Subject: [PATCH 1/2] Allows config definition when defaulting to PSEP When invoking the EventProcessingConfigurer#usingPooledStreamingEventProcessors method, it would be beneficial to provide the default PSEP configuration as well. To that end we can easily introduce a default method that firstly invokes usingPooledStreamingEventProcessors and after that invokes registerPooledStreamingEventProcessorConfiguration. #enhancement/psep-config --- .../config/EventProcessingConfigurer.java | 23 ++++++++++-- .../config/EventProcessingModuleTest.java | 36 ++++++++++++++++++- 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/config/src/main/java/org/axonframework/config/EventProcessingConfigurer.java b/config/src/main/java/org/axonframework/config/EventProcessingConfigurer.java index c3ae068730..42aaf673e4 100644 --- a/config/src/main/java/org/axonframework/config/EventProcessingConfigurer.java +++ b/config/src/main/java/org/axonframework/config/EventProcessingConfigurer.java @@ -1,11 +1,11 @@ /* - * Copyright (c) 2010-2019. Axon Framework + * Copyright (c) 2010-2022. Axon Framework * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -260,6 +260,25 @@ EventProcessingConfigurer registerTokenStore(String processorName, */ EventProcessingConfigurer usingPooledStreamingEventProcessors(); + /** + * Defaults Event Processors builders to construct a {@link PooledStreamingEventProcessor} using the + * {@code configuration} to configure them. + *

+ * The default behavior depends on the {@link EventBus} available in the {@link Configuration}. If the + * {@code EventBus} is a {@link StreamableMessageSource}, processors are Tracking by default. This method must be + * used to force the use of Pooled Streaming Processors, unless specifically overridden for individual processors. + * + * @param pooledStreamingProcessorConfiguration configuration used when constructing every + * {@link PooledStreamingEventProcessor} + * @return the current {@link EventProcessingConfigurer} instance, for fluent interfacing + */ + default EventProcessingConfigurer usingPooledStreamingEventProcessors( + PooledStreamingProcessorConfiguration pooledStreamingProcessorConfiguration + ) { + return usingPooledStreamingEventProcessors() + .registerPooledStreamingEventProcessorConfiguration(pooledStreamingProcessorConfiguration); + } + /** * Registers a {@link org.axonframework.eventhandling.SubscribingEventProcessor} with given {@code name} within this * Configurer. diff --git a/config/src/test/java/org/axonframework/config/EventProcessingModuleTest.java b/config/src/test/java/org/axonframework/config/EventProcessingModuleTest.java index d5f283f216..038e46d574 100644 --- a/config/src/test/java/org/axonframework/config/EventProcessingModuleTest.java +++ b/config/src/test/java/org/axonframework/config/EventProcessingModuleTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2020. Axon Framework + * Copyright (c) 2010-2022. Axon Framework * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -892,6 +892,40 @@ void testRegisterPooledStreamingEventProcessorConfigurationIsUsedDuringAllPsepCo assertEquals(mockedSource, getField("messageSource", result)); } + @Test + void testUsingPooledStreamingEventProcessorWithConfigurationIsUsedDuringAllPsepConstructions( + @Mock StreamableMessageSource> mockedSource + ) throws NoSuchFieldException, IllegalAccessException { + String testName = "pooled-streaming"; + int testCapacity = 24; + Object testHandler = new Object(); + + configurer.eventProcessing() + .usingPooledStreamingEventProcessors((config, builder) -> builder.maxClaimedSegments(testCapacity)) + .configureDefaultStreamableMessageSource(config -> mockedSource) + .registerEventHandler(config -> new PooledStreamingEventHandler()) + .byDefaultAssignTo("default") + .registerEventHandler(config -> testHandler); + Configuration config = configurer.start(); + + Optional optionalResult = + config.eventProcessingConfiguration() + .eventProcessor(testName, PooledStreamingEventProcessor.class); + + assertTrue(optionalResult.isPresent()); + PooledStreamingEventProcessor result = optionalResult.get(); + assertEquals(testCapacity, result.maxCapacity()); + assertEquals(mockedSource, getField("messageSource", result)); + + optionalResult = config.eventProcessingConfiguration() + .eventProcessor("default", PooledStreamingEventProcessor.class); + + assertTrue(optionalResult.isPresent()); + result = optionalResult.get(); + assertEquals(testCapacity, result.maxCapacity()); + assertEquals(mockedSource, getField("messageSource", result)); + } + @Test void testRegisterPooledStreamingEventProcessorConfigurationForNameIsUsedDuringSpecificPsepConstruction( @Mock StreamableMessageSource> mockedSource From a1c186291feeca1f152510076f3a4da0c716ace0 Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Thu, 7 Jul 2022 12:21:52 +0200 Subject: [PATCH 2/2] Define default executors with a pool size The Spring Boot Auto Configuration defaults the workerExecutor thread pool to four, whereas the EventProcessingModule defaults it to one. Adjust this deviation by letting the EventProcessingModule construct an executor with a pool size. #enhancement/psep-config --- .../axonframework/config/EventProcessingModule.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/config/src/main/java/org/axonframework/config/EventProcessingModule.java b/config/src/main/java/org/axonframework/config/EventProcessingModule.java index b09128897a..b943ab5e6c 100644 --- a/config/src/main/java/org/axonframework/config/EventProcessingModule.java +++ b/config/src/main/java/org/axonframework/config/EventProcessingModule.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2021. Axon Framework + * Copyright (c) 2010-2022. Axon Framework * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -814,13 +814,13 @@ private PooledStreamingEventProcessor pooledStreamingEventProcessor( .transactionManager(transactionManager(name)) .coordinatorExecutor(processorName -> { ScheduledExecutorService coordinatorExecutor = - defaultExecutor("Coordinator[" + processorName + "]"); + defaultExecutor(1, "Coordinator[" + processorName + "]"); config.onShutdown(coordinatorExecutor::shutdown); return coordinatorExecutor; }) .workerExecutor(processorName -> { ScheduledExecutorService workerExecutor = - defaultExecutor("WorkPackage[" + processorName + "]"); + defaultExecutor(4, "WorkPackage[" + processorName + "]"); config.onShutdown(workerExecutor::shutdown); return workerExecutor; }); @@ -830,8 +830,8 @@ private PooledStreamingEventProcessor pooledStreamingEventProcessor( .build(); } - private ScheduledExecutorService defaultExecutor(String factoryName) { - return Executors.newScheduledThreadPool(1, new AxonThreadFactory(factoryName)); + private ScheduledExecutorService defaultExecutor(int poolSize, String factoryName) { + return Executors.newScheduledThreadPool(poolSize, new AxonThreadFactory(factoryName)); } /**