Skip to content

Commit

Permalink
Added Upcasters to Configuration API
Browse files Browse the repository at this point in the history
  • Loading branch information
abuijze committed Mar 3, 2017
1 parent f7baf3b commit a78e2c2
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 30 deletions.
Expand Up @@ -29,6 +29,7 @@
import org.axonframework.messaging.correlation.CorrelationDataProvider;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;

import java.util.List;
import java.util.function.Supplier;
Expand Down Expand Up @@ -195,4 +196,11 @@ default ParameterResolverFactory parameterResolverFactory() {
* @see #onStart(Runnable)
*/
void onShutdown(Runnable shutdownHandler);

/**
* Returns the EventUpcasterChain with all registered upcasters.
*
* @return the EventUpcasterChain with all registered upcasters
*/
EventUpcasterChain upcasterChain();
}
9 changes: 9 additions & 0 deletions core/src/main/java/org/axonframework/config/Configurer.java
Expand Up @@ -26,6 +26,7 @@
import org.axonframework.messaging.correlation.CorrelationDataProvider;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcaster;

import java.util.List;
import java.util.function.BiFunction;
Expand All @@ -45,6 +46,14 @@
*/
public interface Configurer {

/**
* Registers an upcaster to be used to upcast Events to a newer version
*
* @param upcasterBuilder The function that returns an EventUpcaster based on the configuration
* @return the current instance of the Configurer, for chaining purposes
*/
Configurer registerEventUpcaster(Function<Configuration, EventUpcaster> upcasterBuilder);

/**
* Configure the Message Monitor to use for the Message processing components in this configuration. The builder
* function receives the type of component as well as its name as input, and is expected to return a MessageMonitor
Expand Down
Expand Up @@ -48,6 +48,8 @@
import org.axonframework.serialization.AnnotationRevisionResolver;
import org.axonframework.serialization.RevisionResolver;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcaster;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
import org.axonframework.serialization.xml.XStreamSerializer;

import java.util.*;
Expand All @@ -56,6 +58,8 @@
import java.util.function.Function;
import java.util.function.Supplier;

import static java.util.stream.Collectors.toList;

/**
* Entry point of the Axon Configuration API. It implements the Configurer interface, providing access to the methods to
* configure the default Axon components.
Expand All @@ -82,6 +86,11 @@ public class DefaultConfigurer implements Configurer {
c -> Collections.singletonList(new MessageOriginProvider()));

private final Map<Class<?>, Component<?>> components = new HashMap<>();
private final List<Component<EventUpcaster>> upcasters = new ArrayList<>();
private final Component<EventUpcasterChain> upcasterChain = new Component<>(
config, "eventUpcasterChain",
c -> new EventUpcasterChain(upcasters.stream().map(Component::get).collect(toList())));

private final Map<Class<?>, AggregateConfiguration> aggregateConfigurations = new HashMap<>();

private final List<Consumer<Configuration>> initHandlers = new ArrayList<>();
Expand Down Expand Up @@ -110,11 +119,16 @@ public static Configurer defaultConfiguration() {
public static Configurer jpaConfiguration(EntityManagerProvider entityManagerProvider) {
return new DefaultConfigurer().registerComponent(EntityManagerProvider.class, c -> entityManagerProvider)
.configureEmbeddedEventStore(c -> new JpaEventStorageEngine(
c.serializer(),
c.upcasterChain(),
null, null,
c.getComponent(EntityManagerProvider.class, () -> entityManagerProvider),
c.getComponent(TransactionManager.class, () -> NoTransactionManager.INSTANCE)))
c.getComponent(TransactionManager.class, () -> NoTransactionManager.INSTANCE),
null, null, true))
.registerComponent(TokenStore.class, c -> new JpaTokenStore(
c.getComponent(EntityManagerProvider.class, () -> entityManagerProvider), c.serializer()))
.registerComponent(SagaStore.class, c -> new JpaSagaStore(
c.serializer(),
c.getComponent(EntityManagerProvider.class, () -> entityManagerProvider)));
}

Expand Down Expand Up @@ -199,6 +213,12 @@ protected Serializer defaultSerializer(Configuration config) {
return new XStreamSerializer(config.getComponent(RevisionResolver.class, AnnotationRevisionResolver::new));
}

@Override
public Configurer registerEventUpcaster(Function<Configuration, EventUpcaster> upcasterBuilder) {
upcasters.add(new Component<>(config, "upcaster", upcasterBuilder));
return this;
}

@Override
public Configurer configureMessageMonitor(
Function<Configuration, BiFunction<Class<?>, String, MessageMonitor<Message<?>>>>
Expand Down Expand Up @@ -368,6 +388,11 @@ public void onShutdown(Runnable shutdownHandler) {
shutdownHandlers.add(shutdownHandler);
}

@Override
public EventUpcasterChain upcasterChain() {
return upcasterChain.get();
}

@Override
public void onStart(Runnable startHandler) {
startHandlers.add(startHandler);
Expand Down
Expand Up @@ -81,7 +81,25 @@ public JpaEventStorageEngine(EntityManagerProvider entityManagerProvider, Transa
*/
public JpaEventStorageEngine(Serializer serializer, EventUpcaster upcasterChain, DataSource dataSource,
EntityManagerProvider entityManagerProvider, TransactionManager transactionManager) throws SQLException {
this(serializer, upcasterChain, new SQLErrorCodesResolver(dataSource), null, entityManagerProvider, transactionManager,
this(serializer, upcasterChain, new SQLErrorCodesResolver(dataSource), entityManagerProvider, transactionManager);
}

/**
* Initializes an EventStorageEngine that uses JPA to store and load events. Events are fetched in batches of 100.
*
* @param serializer Used to serialize and deserialize event payload and metadata.
* @param upcasterChain Allows older revisions of serialized objects to be deserialized.
* @param persistenceExceptionResolver Detects concurrency exceptions from the backing database. If {@code null}
* persistence exceptions are not explicitly resolved.
* that represent concurrent access failures for most database types.
* @param entityManagerProvider Provider for the {@link EntityManager} used by this EventStorageEngine.
* @param transactionManager The instance managing transactions around fetching event data. Required by
* certain databases for reading blob data.
*/
public JpaEventStorageEngine(Serializer serializer, EventUpcaster upcasterChain,
PersistenceExceptionResolver persistenceExceptionResolver,
EntityManagerProvider entityManagerProvider, TransactionManager transactionManager) {
this(serializer, upcasterChain, persistenceExceptionResolver, null, entityManagerProvider, transactionManager,
null, null, true);
}

Expand Down
102 changes: 75 additions & 27 deletions core/src/test/java/org/axonframework/config/DefaultConfigurerTest.java
Expand Up @@ -16,30 +16,30 @@

package org.axonframework.config;

import org.axonframework.commandhandling.AsynchronousCommandBus;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.GenericCommandMessage;
import org.axonframework.commandhandling.*;
import org.axonframework.commandhandling.callbacks.FutureCallback;
import org.axonframework.commandhandling.model.AggregateIdentifier;
import org.axonframework.commandhandling.model.GenericJpaRepository;
import org.axonframework.common.jdbc.PersistenceExceptionResolver;
import org.axonframework.common.jpa.SimpleEntityManagerProvider;
import org.axonframework.common.transaction.Transaction;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;
import org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine;
import org.axonframework.messaging.GenericMessage;
import org.axonframework.messaging.interceptors.TransactionManagingInterceptor;
import org.junit.Assert;
import org.junit.Test;

import javax.persistence.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static org.axonframework.commandhandling.model.AggregateLifecycle.apply;
import static org.axonframework.config.AggregateConfigurer.defaultConfiguration;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

public class DefaultConfigurerTest {

Expand All @@ -59,28 +59,32 @@ public void defaultConfigurationWithEventSourcing() throws Exception {
}

@Test
public void defaultConfigurationWithJpaRepository() throws Exception {
Map<String, String> properties = new HashMap<>();
properties.put("hibernate.connection.url", "jdbc:hsqldb:mem:axontest");
properties.put("hibernate.hbm2ddl.auto", "create-drop");
EntityManagerFactory emf = Persistence.createEntityManagerFactory("eventStore", properties);
EntityManager em = emf.createEntityManager();
public void defaultConfigurationWithUpcaster() throws Exception {
EntityManager em = createEntityManager();
AtomicInteger counter = new AtomicInteger();
Configuration config = DefaultConfigurer.defaultConfiguration()
.configureTransactionManager(c -> () -> {
EntityTransaction tx = em.getTransaction();
tx.begin();
return new Transaction() {
@Override
public void commit() {
tx.commit();
}

@Override
public void rollback() {
tx.rollback();
}
};
.configureEmbeddedEventStore(c -> new JpaEventStorageEngine(c.serializer(), c.upcasterChain(), c.getComponent(PersistenceExceptionResolver.class), () -> em, c.getComponent(TransactionManager.class)))
.configureAggregate(defaultConfiguration(StubAggregate.class)
.configureCommandTargetResolver(c -> command -> new VersionedAggregateIdentifier(command.getPayload().toString(), null)))
.registerEventUpcaster(c -> events -> {
counter.incrementAndGet();
return events;
})
.configureTransactionManager(c -> new EntityManagerTransactionManager(em))
.buildConfiguration();
config.start();

config.commandGateway().sendAndWait(GenericCommandMessage.asCommandMessage("test"));
config.commandGateway().sendAndWait(new GenericCommandMessage<>(new GenericMessage<>("test"), "update"));
Assert.assertEquals(1, counter.get());
assertNotNull(config.repository(StubAggregate.class));
}

@Test
public void defaultConfigurationWithJpaRepository() throws Exception {
EntityManager em = createEntityManager();
Configuration config = DefaultConfigurer.defaultConfiguration()
.configureTransactionManager(c -> new EntityManagerTransactionManager(em))
.configureCommandBus(c -> {
AsynchronousCommandBus commandBus = new AsynchronousCommandBus();
commandBus.registerHandlerInterceptor(new TransactionManagingInterceptor<>(c.getComponent(TransactionManager.class)));
Expand All @@ -89,7 +93,8 @@ public void rollback() {
.configureAggregate(
defaultConfiguration(StubAggregate.class)
.configureRepository(c -> new GenericJpaRepository<>(new SimpleEntityManagerProvider(em),
StubAggregate.class, c.eventBus(), c.parameterResolverFactory())))
StubAggregate.class, c.eventBus(),
c.parameterResolverFactory())))
.buildConfiguration();

config.start();
Expand All @@ -99,6 +104,14 @@ public void rollback() {
assertNotNull(config.repository(StubAggregate.class));
}

private EntityManager createEntityManager() {
Map<String, String> properties = new HashMap<>();
properties.put("hibernate.connection.url", "jdbc:hsqldb:mem:axontest");
properties.put("hibernate.hbm2ddl.auto", "create-drop");
EntityManagerFactory emf = Persistence.createEntityManagerFactory("eventStore", properties);
return emf.createEntityManager();
}

@Entity(name = "StubAggregate")
private static class StubAggregate {

Expand All @@ -111,7 +124,6 @@ public StubAggregate() {

@CommandHandler
public StubAggregate(String command, CommandBus commandBus) {
assertTrue(commandBus instanceof AsynchronousCommandBus);
apply(command);
}

Expand All @@ -125,4 +137,40 @@ protected void on(String event) {
this.id = event;
}
}

private static class EntityManagerTransactionManager implements TransactionManager {
private final EntityManager em;

public EntityManagerTransactionManager(EntityManager em) {
this.em = em;
}

@Override
public Transaction startTransaction() {
EntityTransaction tx = em.getTransaction();
if (tx.isActive()) {
return new Transaction() {
@Override
public void commit() {
}

@Override
public void rollback() {
}
};
}
tx.begin();
return new Transaction() {
@Override
public void commit() {
tx.commit();
}

@Override
public void rollback() {
tx.rollback();
}
};
}
}
}
Expand Up @@ -11,6 +11,7 @@
import org.axonframework.messaging.Message;
import org.axonframework.messaging.correlation.CorrelationDataProvider;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
Expand All @@ -28,8 +29,8 @@
@org.springframework.context.annotation.Configuration("org.axonframework.spring.config.AxonConfiguration")
public class AxonConfiguration implements Configuration, InitializingBean, ApplicationContextAware, SmartLifecycle {

private Configuration config;
private final Configurer configurer;
private Configuration config;
private volatile boolean running = false;

/**
Expand Down Expand Up @@ -91,6 +92,11 @@ public List<CorrelationDataProvider> correlationDataProviders() {
return config.correlationDataProviders();
}

@Override
public EventUpcasterChain upcasterChain() {
return config.upcasterChain();
}

@Override
public void onStart(Runnable startHandler) {
config.onStart(startHandler);
Expand Down

0 comments on commit a78e2c2

Please sign in to comment.