diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md index 367d709e..573f50e3 100644 --- a/axon-framework/events/event-processors/README.md +++ b/axon-framework/events/event-processors/README.md @@ -175,12 +175,18 @@ public class AxonConfig { @Configuration public class AxonConfig { // ... - @Autowired - public void configureProcessingGroupErrorHandling(EventProcessingConfigurer processingConfigurer) { - // To configure a default ... - processingConfigurer.registerDefaultListenerInvocationErrorHandler(conf -> /* create listener error handler */) - // ... or for a specific processing group: - .registerListenerInvocationErrorHandler("my-processing-group", conf -> /* create listener error handler */); + @Bean + public ConfigurerModule processingGroupErrorHandlingConfigurerModule() { + return configurer -> configurer.eventProcessing( + processingConfigurer -> processingConfigurer.registerDefaultListenerInvocationErrorHandler( + conf -> /* create listener error handler */ + ) + // ... or for a specific processing group: + .registerListenerInvocationErrorHandler( + "my-processing-group", + conf -> /* create listener error handler */ + ) + ); } } ``` @@ -231,13 +237,16 @@ public class AxonConfig { ```java @Configuration public class AxonConfig { - // ... - @Autowired - public void configureProcessingGroupErrorHandling(EventProcessingConfigurer processingConfigurer) { - // To configure a default ... - processingConfigurer.registerDefaultErrorHandler(conf -> /* create error handler */) - // ... or for a specific processor: - .registerErrorHandler("my-processor", conf -> /* create error handler */); + @Bean + public ConfigurerModule processorErrorHandlingConfigurerModule() { + return configurer -> configurer.eventProcessing( + processingConfigurer -> processingConfigurer.registerDefaultErrorHandler(conf -> /* create error handler */) + // ... or for a specific processor: + .registerErrorHandler( + "my-processor", + conf -> /* create error handler */ + ) + ); } } ``` diff --git a/axon-framework/events/event-processors/streaming.md b/axon-framework/events/event-processors/streaming.md index b35a1641..2f8d3397 100644 --- a/axon-framework/events/event-processors/streaming.md +++ b/axon-framework/events/event-processors/streaming.md @@ -76,9 +76,9 @@ public class AxonConfig { @Configuration public class AxonConfig { // ... - @Autowired - public void configureProcessorDefault(EventProcessingConfigurer processingConfigurer) { - processingConfigurer.usingTrackingEventProcessors(); + @Bean + public ConfigurerModule processorDefaultConfigurerModule() { + return configurer -> configurer.eventProcessing(EventProcessingConfigurer::usingTrackingEventProcessors); } } ``` @@ -118,23 +118,27 @@ public class AxonConfig { @Configuration public class AxonConfig { // ... - @Autowired - public void configureTrackingProcessors(EventProcessingConfigurer processingConfigurer) { + @Bean + public ConfigurerModule trackingProcessorConfigurerModule() { // This configuration object allows for fine-grained control over the Tracking Processor TrackingEventProcessorConfiguration tepConfig = TrackingEventProcessorConfiguration.forSingleThreadedProcessing(); - - // To configure a processor to be tracking ... - processingConfigurer.registerTrackingEventProcessor("my-processor") - // ... to define a specific StreamableMessageSource ... - .registerTrackingEventProcessor( - "my-processor", conf -> /* create/return StreamableMessageSource */ - ) - // ... to provide additional configuration ... - .registerTrackingEventProcessor( - "my-processor", conf -> /* create/return StreamableMessageSource */, - conf -> tepConfig - ); + + return configurer -> configurer.eventProcessing( + // To configure a processor to be tracking ... + processingConfigurer -> processingConfigurer.registerTrackingEventProcessor("my-processor") + // ... to define a specific StreamableMessageSource ... + .registerTrackingEventProcessor( + "my-processor", + conf -> /* create/return StreamableMessageSource */ + ) + // ... to provide additional configuration ... + .registerTrackingEventProcessor( + "my-processor", + conf -> /* create/return StreamableMessageSource */, + conf -> tepConfig + ) + ); } } ``` @@ -184,15 +188,19 @@ public class AxonConfig { @Configuration public class AxonConfig { // ... - @Autowired - public void registerTrackingProcessorConfig(EventProcessingConfigurer processingConfigurer) { + @Bean + public ConfigurerModule trackingProcessorConfigurerModule() { TrackingEventProcessorConfiguration tepConfig = - TrackingEventProcessorConfiguration.forSingleThreadedProcessing(); - - // To register a default tracking config ... - processingConfigurer.registerTrackingEventProcessorConfiguration(config -> tepConfig) - // ... to register a config for a specific processor. - .registerTrackingEventProcessorConfiguration("my-processor", config -> tepConfig); + TrackingEventProcessorConfiguration.forSingleThreadedProcessing(); + + return configurer -> configurer.eventProcessing( + // To register a default tracking config ... + processingConfigurer -> processingConfigurer.registerTrackingEventProcessorConfiguration(config -> tepConfig) + // ... to register a config for a specific processor. + .registerTrackingEventProcessorConfiguration( + "my-processor", config -> tepConfig + ) + ); } } ``` @@ -220,9 +228,9 @@ public class AxonConfig { @Configuration public class AxonConfig { // ... - @Autowired - public void configureProcessorDefault(EventProcessingConfigurer processingConfigurer) { - processingConfigurer.usingPooledStreamingEventProcessors(); + @Bean + public ConfigurerModule processorDefaultConfigurerModule() { + return configurer -> configurer.eventProcessing(EventProcessingConfigurer::usingPooledStreamingEventProcessors); } } ``` @@ -261,22 +269,27 @@ public class AxonConfig { @Configuration public class AxonConfig { // ... - @Autowired - public void configurePooledStreamingProcessors(EventProcessingConfigurer processingConfigurer) { + @Bean + public ConfigurerModule pooledStreamingProcessorConfigurerModule() { // This configuration object allows for fine-grained control over the Pooled Streaming Processor EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig = - (config, builder) -> builder/* ... */; - - // To configure a processor to be pooled streaming ... - processingConfigurer.registerPooledStreamingEventProcessor("my-processor") - // ... to define a specific StreamableMessageSource ... - .registerPooledStreamingEventProcessor( - "my-processor", conf -> /* create/return StreamableMessageSource */ - ) - // ... to provide additional configuration ... - .registerPooledStreamingEventProcessor( - "my-processor", conf -> /* create/return StreamableMessageSource */, psepConfig - ); + (config, builder) -> builder/* ... */; + + return configurer -> configurer.eventProcessing( + // To configure a processor to be pooled streaming ... + processingConfigurer -> processingConfigurer.registerPooledStreamingEventProcessor("my-processor") + // ... to define a specific StreamableMessageSource ... + .registerPooledStreamingEventProcessor( + "my-processor", + conf -> /* create/return StreamableMessageSource */ + ) + // ... to provide additional configuration ... + .registerPooledStreamingEventProcessor( + "my-processor", + conf -> /* create/return StreamableMessageSource */, + psepConfig + ) + ); } } ``` @@ -326,15 +339,19 @@ public class AxonConfig { @Configuration public class AxonConfig { // ... - @Autowired - public void registerPooledStreamingProcessorConfig(EventProcessingConfigurer processingConfigurer) { + @Bean + public ConfigurerModule pooledStreamingProcessorConfigurerModule() { EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig = - (config, builder) -> builder/* ... */; - - // To register a default pooled streaming config ... - processingConfigurer.registerPooledStreamingEventProcessorConfiguration(psepConfig) - // ... to register a config for a specific processor. - .registerPooledStreamingEventProcessorConfiguration("my-processor", psepConfig); + (config, builder) -> builder/* ... */; + + return configurer -> configurer.eventProcessing( + // To register a default pooled streaming config ... + processingConfigurer -> processingConfigurer.registerPooledStreamingEventProcessorConfiguration(psepConfig) + // ... to register a config for a specific processor. + .registerPooledStreamingEventProcessorConfiguration( + "my-processor", psepConfig + ) + ); } } ``` @@ -454,13 +471,17 @@ public class AxonConfig { @Configuration public class AxonConfig { // ... - @Autowired - public void configureInitialTrackingToken(EventProcessingConfigurer processingConfigurer) { - TrackingEventProcessorConfiguration tepConfig = - TrackingEventProcessorConfiguration.forSingleThreadedProcessing() - .andInitialTrackingToken(StreamableMessageSource::createTailToken); - - processingConfigurer.registerTrackingEventProcessorConfiguration("my-processor", config -> tepConfig); + @Bean + public ConfigurerModule initialTrackingTokenConfigurerModule() { + TrackingEventProcessorConfiguration tepConfig = + TrackingEventProcessorConfiguration.forSingleThreadedProcessing() + .andInitialTrackingToken(StreamableMessageSource::createTailToken); + + return configurer -> configurer.eventProcessing( + processingConfigurer -> processingConfigurer.registerTrackingEventProcessorConfiguration( + "my-processor", config -> tepConfig + ) + ); } } ``` @@ -487,14 +508,18 @@ public class AxonConfig { @Configuration public class AxonConfig { // ... - @Autowired - public void configureInitialTrackingToken(EventProcessingConfigurer processingConfigurer) { - EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig = - (config, builder) -> builder.initialToken( - messageSource -> messageSource.createTokenSince(Duration.ofDays(31)) - ); - - processingConfigurer.registerPooledStreamingEventProcessorConfiguration("my-processor", psepConfig); + @Bean + public ConfigurerModule initialTrackingTokenConfigurerModule() { + EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig = + (config, builder) -> builder.initialToken( + messageSource -> messageSource.createTokenSince(Duration.ofDays(31)) + ); + + return configurer -> configurer.eventProcessing( + processingConfigurer -> processingConfigurer.registerPooledStreamingEventProcessorConfiguration( + "my-processor", psepConfig + ) + ); } } ``` @@ -541,14 +566,18 @@ public class AxonConfig { @Configuration public class AxonConfig { // ... - @Autowired - public void configureTokenClaimValues(EventProcessingConfigurer processingConfigurer) { - TrackingEventProcessorConfiguration tepConfig = - TrackingEventProcessorConfiguration.forSingleThreadedProcessing() - .andTokenClaimInterval(1000, TimeUnit.MILLISECONDS) - .andEventAvailabilityTimeout(2000, TimeUnit.MILLISECONDS); - - processingConfigurer.registerTrackingEventProcessorConfiguration("my-processor", config -> tepConfig); + @Bean + public ConfigurerModule tokenClaimValuesConfigurerModule() { + TrackingEventProcessorConfiguration tepConfig = + TrackingEventProcessorConfiguration.forSingleThreadedProcessing() + .andTokenClaimInterval(1000, TimeUnit.MILLISECONDS) + .andEventAvailabilityTimeout(2000, TimeUnit.MILLISECONDS); + + return configurer -> configurer.eventProcessing( + processingConfigurer -> processingConfigurer.registerTrackingEventProcessorConfiguration( + "my-processor", config -> tepConfig + ) + ); } } ``` @@ -574,13 +603,17 @@ public class AxonConfig { @Configuration public class AxonConfig { // ... - @Autowired - public void configureTokenClaimValues(EventProcessingConfigurer processingConfigurer) { - EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig = - (config, builder) -> builder.tokenClaimInterval(2000) - .claimExtensionThreshold(3000); - - processingConfigurer.registerPooledStreamingEventProcessorConfiguration("my-processor", psepConfig); + @Bean + public ConfigurerModule tokenClaimValuesConfigurerModule() { + EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig = + (config, builder) -> builder.tokenClaimInterval(2000) + .claimExtensionThreshold(3000); + + return configurer -> configurer.eventProcessing( + processingConfigurer -> processingConfigurer.registerPooledStreamingEventProcessorConfiguration( + "my-processor", psepConfig + ) + ); } } ``` @@ -747,15 +780,17 @@ Alternatively, inject the `EventProcessingConfigurer`, which allows more fine-gr @Configuration public class AxonConfig { // ... - @Autowired - public void registerTokenStore(EventProcessingConfigurer processingConfigurer) { - TokenStore tokenStore = JdbcTokenStore.builder() - // … - .build(); - - processingConfigurer.registerTokenStore(conf -> tokenStore) - // or, to define one for a specific processor: - .registerTokenStore("my-processor", conf -> tokenStore); + @Bean + public ConfigurerModule tokenStoreConfigurerModule() { + TokenStore tokenStore = JdbcTokenStore.builder() + // … + .build(); + + return configurer -> configurer.eventProcessing( + processingConfigurer -> processingConfigurer.registerTokenStore(conf -> tokenStore) + // or, to define one for a specific processor: + .registerTokenStore("my-processor", conf -> tokenStore) + ); } } ``` @@ -815,13 +850,17 @@ The default number of segments for a `TrackingEventProcessor` is one. @Configuration public class AxonConfig { // ... - @Autowired - public void configureSegmentCount(EventProcessingConfigurer processingConfigurer) { - TrackingEventProcessorConfiguration tepConfig = - TrackingEventProcessorConfiguration.forParallelProcessing(2) - .andInitialSegmentsCount(2); - - processingConfigurer.registerTrackingEventProcessorConfiguration("my-processor", config -> tepConfig); + @Bean + public ConfigurerModule segmentCountConfigurerModule() { + TrackingEventProcessorConfiguration tepConfig = + TrackingEventProcessorConfiguration.forParallelProcessing(2) + .andInitialSegmentsCount(2); + + return configurer -> configurer.eventProcessing( + processingConfigurer -> processingConfigurer.registerTrackingEventProcessorConfiguration( + "my-processor", config -> tepConfig + ) + ); } } ``` @@ -850,12 +889,16 @@ The default number of segments for a `PooledStreamingEventProcessor` is sixteen. @Configuration public class AxonConfig { // ... - @Autowired - public void configureSegmentCount(EventProcessingConfigurer processingConfigurer) { - EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig = - (config, builder) -> builder.initialSegmentCount(32); - - processingConfigurer.registerPooledStreamingEventProcessorConfiguration("my-processor", psepConfig); + @Bean + public ConfigurerModule segmentCountConfigurerModule() { + EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig = + (config, builder) -> builder.initialSegmentCount(32); + + return configurer -> configurer.eventProcessing( + processingConfigurer -> processingConfigurer.registerPooledStreamingEventProcessorConfiguration( + "my-processor", psepConfig + ) + ); } } ``` @@ -955,13 +998,13 @@ public class AxonConfig { @Configuration public class AxonConfig { // ... - @Autowired - public void configureSequencingPolicy(EventProcessingConfigurer processingConfigurer, - SequencingPolicy> mySequencingPolicy) { - - processingConfigurer.registerDefaultSequencingPolicy(config -> mySequencingPolicy) - // or, to define one for a specific processor: - .registerSequencingPolicy("my-processor", config -> mySequencingPolicy); + @Bean + public ConfigurerModule sequencingPolicyConfigurerModule(SequencingPolicy> mySequencingPolicy) { + return configurer -> configurer.eventProcessing( + processingConfigurer -> processingConfigurer.registerDefaultSequencingPolicy(config -> mySequencingPolicy) + // or, to define one for a specific processor: + .registerSequencingPolicy("my-processor", config -> mySequencingPolicy) + ); } @Bean @@ -1052,13 +1095,17 @@ public class AxonConfig { @Configuration public class AxonConfig { // ... - @Autowired - public void configureThreadCount(EventProcessingConfigurer processingConfigurer) { + @Bean + public ConfigurerModule threadCountConfigurerModule() { TrackingEventProcessorConfiguration tepConfig = - TrackingEventProcessorConfiguration.forParallelProcessing(4) - .andInitialSegmentsCount(4); + TrackingEventProcessorConfiguration.forParallelProcessing(4) + .andInitialSegmentsCount(4); - processingConfigurer.registerTrackingEventProcessorConfiguration("my-processor", config -> tepConfig); + return configurer -> configurer.eventProcessing( + processingConfigurer -> processingConfigurer.registerTrackingEventProcessorConfiguration( + "my-processor", config -> tepConfig + ) + ); } } ``` @@ -1120,21 +1167,25 @@ public class AxonConfig { @Configuration public class AxonConfig { // ... - @Autowired - public void configureThreadCount(EventProcessingConfigurer processingConfigurer) { + @Bean + public ConfigurerModule threadCountConfigurerModule() { // the "name" is the name of the processor, which can be used to define the thread factory name - Function coordinatorExecutorBuilder = - name -> Executors.newScheduledThreadPool(1, new AxonThreadFactory("Coordinator - " + name)); + Function coordinatorExecutorBuilder = + name -> Executors.newScheduledThreadPool(1, new AxonThreadFactory("Coordinator - " + name)); Function workerExecutorBuilder = - name -> Executors.newScheduledThreadPool(16, new AxonThreadFactory("Worker - " + name)); + name -> Executors.newScheduledThreadPool(16, new AxonThreadFactory("Worker - " + name)); EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig = - (config, builder) -> builder.coordinatorExecutor(coordinatorExecutorBuilder) - .workerExecutor(workerExecutorBuilder) - .initialSegmentCount(32); - - processingConfigurer.registerPooledStreamingEventProcessorConfiguration("my-processor", psepConfig); + (config, builder) -> builder.coordinatorExecutor(coordinatorExecutorBuilder) + .workerExecutor(workerExecutorBuilder) + .initialSegmentCount(32); + + return configurer -> configurer.eventProcessing( + processingConfigurer -> processingConfigurer.registerPooledStreamingEventProcessorConfiguration( + "my-processor", psepConfig + ) + ); } } ``` @@ -1536,10 +1587,12 @@ public class AxonConfig { @Configuration public class AxonConfig { // ... - @Autowired - public void configureTrackingProcessor(EventProcessingConfigurer processingConfigurer) { - processingConfigurer.registerTrackingEventProcessor( - "my-processor", config -> buildMultiStreamableMessageSource(/*...*/) + @Bean + public ConfigurerModule trackingProcessorConfigurerModule() { + return configurer -> configurer.eventProcessing( + processingConfigurer -> processingConfigurer.registerTrackingEventProcessor( + "my-processor", config -> buildMultiStreamableMessageSource(/*...*/) + ) ); } } @@ -1564,10 +1617,12 @@ public class AxonConfig { @Configuration public class AxonConfig { // ... - @Autowired - public void configurePooledStreamingProcessor(EventProcessingConfigurer processingConfigurer) { - processingConfigurer.registerPooledStreamingEventProcessor( - "my-processor", config -> buildMultiStreamableMessageSource(/*...*/) + @Bean + public ConfigurerModule pooledStreamingProcessorConfigurerModule() { + return configurer -> configurer.eventProcessing( + processingConfigurer -> processingConfigurer.registerPooledStreamingEventProcessor( + "my-processor", config -> buildMultiStreamableMessageSource(/*...*/) + ) ); } } diff --git a/axon-framework/events/event-processors/subscribing.md b/axon-framework/events/event-processors/subscribing.md index c3a0d11b..3f6d99da 100644 --- a/axon-framework/events/event-processors/subscribing.md +++ b/axon-framework/events/event-processors/subscribing.md @@ -52,9 +52,9 @@ public class AxonConfig { @Configuration public class AxonConfig { // ... - @Autowired - public void configureProcessorDefault(EventProcessingConfigurer processingConfigurer) { - processingConfigurer.usingSubscribingEventProcessors(); + @Bean + public ConfigurerModule processorDefaultConfigurerModule() { + return configurer -> configurer.eventProcessing(EventProcessingConfigurer::usingSubscribingEventProcessors); } } ``` @@ -83,12 +83,17 @@ public class AxonConfig { @Configuration public class AxonConfig { // ... - @Autowired - public void configureSubscribingProcessors(EventProcessingConfigurer processingConfigurer) { - // To configure a processor to be subscribing ... - processingConfigurer.registerSubscribingEventProcessor("my-processor") - // ... to define a specific SubscribableMessageSource ... - .registerSubscribingEventProcessor("my-processor", conf -> /* create/return SubscribableMessageSource */); + @Bean + public ConfigurerModule subscribingProcessorsConfigurerModule() { + return configurer -> configurer.eventProcessing( + // To configure a processor to be subscribing ... + processingConfigurer -> processingConfigurer.registerSubscribingEventProcessor("my-processor") + // ... to define a specific SubscribableMessageSource ... + .registerSubscribingEventProcessor( + "my-processor", + conf -> /* create/return SubscribableMessageSource */ + ) + ); } } ```