Skip to content

Commit

Permalink
Merge branch 'axon-3.0.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
abuijze committed Apr 20, 2017
2 parents ee3fe6e + fabc2ae commit 7b308cd
Show file tree
Hide file tree
Showing 19 changed files with 301 additions and 80 deletions.
10 changes: 8 additions & 2 deletions core/src/main/java/org/axonframework/config/Configuration.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/*
* Copyright (c) 2010-2016. Axon Framework
*
* Copyright (c) 2010-2017. Axon Framework
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -175,6 +174,13 @@ default ParameterResolverFactory parameterResolverFactory() {
return getComponent(ParameterResolverFactory.class);
}

/**
* Returns all modules that have been registered with this Configuration.
*
* @return all modules that have been registered with this Configuration
*/
List<ModuleConfiguration> getModules();

/**
* Registers a handler to be executed when this Configuration is started.
* <p>
Expand Down
15 changes: 13 additions & 2 deletions core/src/main/java/org/axonframework/config/Configurer.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/*
* Copyright (c) 2010-2016. Axon Framework
*
* Copyright (c) 2010-2017. Axon Framework
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -227,4 +226,16 @@ default <A> Configurer configureAggregate(Class<A> aggregate) {
* @return the fully initialized Configuration
*/
Configuration buildConfiguration();

/**
* Builds the configuration and starts it immediately. It is not recommended to change any configuration on this
* Configurer once this method is called.
*
* @return The started configuration
*/
default Configuration start() {
Configuration configuration = buildConfiguration();
configuration.start();
return configuration;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/*
* Copyright (c) 2010-2016. Axon Framework
*
* Copyright (c) 2010-2017. Axon Framework
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -96,6 +95,7 @@ public class DefaultConfigurer implements Configurer {
private final List<Consumer<Configuration>> initHandlers = new ArrayList<>();
private final List<Runnable> startHandlers = new ArrayList<>();
private final List<Runnable> shutdownHandlers = new ArrayList<>();
private List<ModuleConfiguration> modules = new ArrayList<>();

private boolean initialized = false;

Expand Down Expand Up @@ -241,6 +241,7 @@ public Configurer registerModule(ModuleConfiguration module) {
} else {
initHandlers.add(module::initialize);
}
this.modules.add(module);
startHandlers.add(module::start);
shutdownHandlers.add(module::shutdown);
return this;
Expand Down Expand Up @@ -279,6 +280,7 @@ public Configurer configureEmbeddedEventStore(Function<Configuration, EventStora

@Override
public <A> Configurer configureAggregate(AggregateConfiguration<A> aggregateConfiguration) {
this.modules.add(aggregateConfiguration);
this.aggregateConfigurations.put(aggregateConfiguration.aggregateType(), aggregateConfiguration);
this.initHandlers.add(aggregateConfiguration::initialize);
this.startHandlers.add(aggregateConfiguration::start);
Expand Down Expand Up @@ -383,6 +385,11 @@ public List<CorrelationDataProvider> correlationDataProviders() {
return correlationProviders.get();
}

@Override
public List<ModuleConfiguration> getModules() {
return modules;
}

@Override
public void onShutdown(Runnable shutdownHandler) {
shutdownHandlers.add(shutdownHandler);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/*
* Copyright (c) 2010-2016. Axon Framework
*
* Copyright (c) 2010-2017. Axon Framework
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -373,6 +372,29 @@ public EventHandlingConfiguration registerSubscribingEventProcessor(
(c, n, eh) -> subscribingEventProcessor(c, n, eh, messageSource));
}

/**
* Returns a list of Event Processors that have been initialized. Note that an empty list may be returned if this
* configuration hasn't been {@link #initialize(Configuration) initialized} yet.
*
* @return a read-only list of processors initialized in this configuration.
*/
public List<EventProcessor> getProcessors() {
return Collections.unmodifiableList(initializedProcessors);
}


/**
* Returns the Event Processor with the given {@code name}, if present. This method also returns an unresolved
* optional if the Processor was configured, but it hasn't been assigned any Event Handlers.
*
* @param name The name of the processor to return
* @return an Optional referencing the processor, if present.
*/
public <T extends EventProcessor> Optional<T> getProcessor(String name) {
//noinspection unchecked
return (Optional<T>) initializedProcessors.stream().filter(p -> name.equals(p.getName())).findAny();
}

/**
* Interface describing a Builder function for Event Processors.
*
Expand Down
109 changes: 97 additions & 12 deletions core/src/main/java/org/axonframework/config/SagaConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@

package org.axonframework.config;

import org.axonframework.common.Assert;
import org.axonframework.common.transaction.NoTransactionManager;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.eventhandling.SubscribingEventProcessor;
import org.axonframework.eventhandling.TrackingEventProcessor;
import org.axonframework.eventhandling.*;
import org.axonframework.eventhandling.saga.AnnotatedSagaManager;
import org.axonframework.eventhandling.saga.SagaRepository;
import org.axonframework.eventhandling.saga.repository.AnnotatedSagaRepository;
Expand All @@ -29,6 +27,8 @@
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.inmemory.InMemoryTokenStore;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.messaging.interceptors.CorrelationDataInterceptor;

import java.util.ArrayList;
Expand Down Expand Up @@ -57,9 +57,23 @@ public class SagaConfiguration<S> implements ModuleConfiguration {
* @return a SagaConfiguration instance, ready for further configuration
*/
public static <S> SagaConfiguration<S> subscribingSagaManager(Class<S> sagaType) {
return new SagaConfiguration<>(sagaType);
return subscribingSagaManager(sagaType, Configuration::eventBus);
}

/**
* Initialize a configuration for a Saga of given {@code sagaType}, using a Subscribing Event Processor to process
* incoming Events from the message source provided by given {@code messageSourceBuilder}
*
* @param sagaType The type of Saga to handle events with
* @param messageSourceBuilder The function providing the message source based on the configuration
* @param <S> The type of Saga configured in this configuration
* @return a SagaConfiguration instance, ready for further configuration
*/
public static <S> SagaConfiguration<S> subscribingSagaManager(
Class<S> sagaType,
Function<Configuration, SubscribableMessageSource<EventMessage<?>>> messageSourceBuilder) {
return new SagaConfiguration<>(sagaType, messageSourceBuilder);
}

/**
* Initialize a configuration for a Saga of given {@code sagaType}, using a Tracking Event Processor to process
Expand All @@ -71,12 +85,29 @@ public static <S> SagaConfiguration<S> subscribingSagaManager(Class<S> sagaType)
* @return a SagaConfiguration instance, ready for further configuration
*/
public static <S> SagaConfiguration<S> trackingSagaManager(Class<S> sagaType) {
SagaConfiguration<S> configuration = new SagaConfiguration<>(sagaType);
return trackingSagaManager(sagaType, Configuration::eventBus);
}

/**
* Initialize a configuration for a Saga of given {@code sagaType}, using a Tracking Event Processor to process
* incoming Events from a Message Source provided by given {@code messageSourceBuilder}. Note that a Token Store
* should be configured in the global configuration, or the Saga Manager will default to an in-memory token store,
* which is not recommended for production environments.
*
* @param sagaType The type of Saga to handle events with
* @param messageSourceBuilder The function providing the message source based on the configuration
* @param <S> The type of Saga configured in this configuration
* @return a SagaConfiguration instance, ready for further configuration
*/
public static <S> SagaConfiguration<S> trackingSagaManager(
Class<S> sagaType,
Function<Configuration, StreamableMessageSource<TrackedEventMessage<?>>> messageSourceBuilder) {
SagaConfiguration<S> configuration = new SagaConfiguration<>(sagaType, c -> null);
configuration.processor.update(c -> {
TrackingEventProcessor processor = new TrackingEventProcessor(
sagaType.getSimpleName() + "Processor",
configuration.sagaManager.get(),
c.eventBus(),
messageSourceBuilder.apply(configuration.config),
c.getComponent(TokenStore.class, InMemoryTokenStore::new),
c.getComponent(TransactionManager.class, NoTransactionManager::instance));
processor.registerInterceptor(new CorrelationDataInterceptor<>(c.correlationDataProviders()));
Expand All @@ -86,7 +117,7 @@ public static <S> SagaConfiguration<S> trackingSagaManager(Class<S> sagaType) {
}

@SuppressWarnings("unchecked")
private SagaConfiguration(Class<S> sagaType) {
private SagaConfiguration(Class<S> sagaType, Function<Configuration, SubscribableMessageSource<EventMessage<?>>> messageSourceBuilder) {
String managerName = sagaType.getSimpleName() + "Manager";
String processorName = sagaType.getSimpleName() + "Processor";
String repositoryName = sagaType.getSimpleName() + "Repository";
Expand All @@ -98,7 +129,8 @@ private SagaConfiguration(Class<S> sagaType) {
c.parameterResolverFactory()));
processor = new Component<>(() -> config, processorName,
c -> {
SubscribingEventProcessor processor = new SubscribingEventProcessor(managerName, sagaManager.get(), c.eventBus());
SubscribingEventProcessor processor = new SubscribingEventProcessor(managerName, sagaManager.get(),
messageSourceBuilder.apply(c));
processor.registerInterceptor(new CorrelationDataInterceptor<>(c.correlationDataProviders()));
return processor;
});
Expand All @@ -118,11 +150,18 @@ public SagaConfiguration<S> configureSagaStore(Function<Configuration, SagaStore
return this;
}

public SagaConfiguration<S> registerHandlerInterceptor(Function<Configuration, MessageHandlerInterceptor<? super EventMessage<?>>> handlerInterceptor) {
/**
* Registers the handler interceptor provided by the given {@code handlerInterceptorBuilder} function with
* the processor defined in this configuration.
*
* @param handlerInterceptorBuilder The function to create the interceptor based on the current configuration
* @return this SagaConfiguration instance, ready for further configuration
*/
public SagaConfiguration<S> registerHandlerInterceptor(Function<Configuration, MessageHandlerInterceptor<? super EventMessage<?>>> handlerInterceptorBuilder) {
if (config != null) {
processor.get().registerInterceptor(handlerInterceptor.apply(config));
processor.get().registerInterceptor(handlerInterceptorBuilder.apply(config));
} else {
handlerInterceptors.add(handlerInterceptor);
handlerInterceptors.add(handlerInterceptorBuilder);
}
return this;
}
Expand All @@ -140,6 +179,52 @@ public void start() {
processor.get().start();
}

/**
* Returns the processor that processed events for the Saga in this Configuration.
*
* @return The EventProcessor defined in this Configuration
* @throws IllegalStateException when this configuration hasn't been initialized yet
*/
public EventProcessor getProcessor() {
Assert.state(config != null, () -> "Configuration is not initialized yet");
return processor.get();
}

/**
* Returns the Saga Store used by the Saga defined in this Configuration. If none has been explicitly defined,
* it will return the Saga Store of the main Configuration.
*
* @return The Saga Store defined in this Configuration
* @throws IllegalStateException when this configuration hasn't been initialized yet
*/
public SagaStore<? super S> getSagaStore() {
Assert.state(config != null, () -> "Configuration is not initialized yet");
return sagaStore.get();
}

/**
* Returns the SagaRepository instance used to load Saga instances in this Configuration.
*
* @return the SagaRepository defined in this Configuration
* @throws IllegalStateException when this configuration hasn't been initialized yet
*/
public SagaRepository<S> getSagaRepository() {
Assert.state(config != null, () -> "Configuration is not initialized yet");
return sagaRepository.get();
}

/**
* Returns the SagaManager responsible for managing the lifecycle and invocation of Saga instances of the type
* defined in this Configuration
*
* @return The SagaManager defined in this configuration
* @throws IllegalStateException when this configuration hasn't been initialized yet
*/
public AnnotatedSagaManager<S> getSagaManager() {
Assert.state(config != null, () -> "Configuration is not initialized yet");
return sagaManager.get();
}

@Override
public void shutdown() {
processor.get().shutDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException;
import org.axonframework.eventsourcing.eventstore.TrackingToken;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.StreamableMessageSource;
Expand Down Expand Up @@ -215,6 +216,12 @@ protected void processingLoop() {
eventStream = ensureEventStreamOpened(eventStream);
processBatch(eventStream);
errorWaitTime = 1;
} catch (UnableToClaimTokenException e) {
if (errorWaitTime == 1) {
logger.info("Token is owned by another node. Waiting for it to become available...");
}
errorWaitTime = 5;
waitFor(errorWaitTime);
} catch (Exception e) {
// make sure to start with a clean event stream. The exception may have cause an illegal state
if (errorWaitTime == 1) {
Expand All @@ -224,13 +231,7 @@ protected void processingLoop() {
releaseToken();
closeQuietly(eventStream);
eventStream = null;
try {
Thread.sleep(errorWaitTime * 1000);
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
logger.warn("Thread interrupted. Preparing to shut down event processor");
shutDown();
}
waitFor(errorWaitTime);
errorWaitTime = Math.min(errorWaitTime * 2, 60);
}
}
Expand All @@ -240,6 +241,16 @@ protected void processingLoop() {
}
}

private void waitFor(long errorWaitTime) {
try {
Thread.sleep(errorWaitTime * 1000);
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
logger.warn("Thread interrupted. Preparing to shut down event processor");
shutDown();
}
}

private void releaseToken() {
try {
transactionManager.executeInTransaction(() -> tokenStore.releaseClaim(getName(), 0));
Expand Down
Loading

0 comments on commit 7b308cd

Please sign in to comment.