Skip to content

Commit

Permalink
Resolve remaining TODO's and failing (non-integration) tests
Browse files Browse the repository at this point in the history
We still need to resolve ignored EventStreamDecorator#decorateForAppend() tests though.
  • Loading branch information
renedewaele committed Jun 10, 2016
1 parent a157a3d commit aca7609
Show file tree
Hide file tree
Showing 28 changed files with 216 additions and 472 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.axonframework.contextsupport.spring.amqp; package org.axonframework.contextsupport.spring.amqp;


import org.axonframework.eventhandling.amqp.spring.ListenerContainerLifecycleManager; import org.axonframework.eventhandling.amqp.spring.ListenerContainerLifecycleManager;
import org.axonframework.eventhandling.amqp.spring.SpringAMQPEventBus; import org.axonframework.eventhandling.amqp.spring.SpringAMQPTerminal;
import org.springframework.beans.MutablePropertyValues; import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.factory.config.RuntimeBeanReference; import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.AbstractBeanDefinition;
Expand Down Expand Up @@ -65,7 +65,7 @@ public class EventBusBeanDefinitionParser extends AbstractBeanDefinitionParser {
@Override @Override
protected AbstractBeanDefinition parseInternal(Element element, ParserContext parserContext) { protected AbstractBeanDefinition parseInternal(Element element, ParserContext parserContext) {
GenericBeanDefinition terminalDefinition = new GenericBeanDefinition(); GenericBeanDefinition terminalDefinition = new GenericBeanDefinition();
terminalDefinition.setBeanClass(SpringAMQPEventBus.class); terminalDefinition.setBeanClass(SpringAMQPTerminal.class);
GenericBeanDefinition listenerContainerDefinition = createContainerManager(element, parserContext); GenericBeanDefinition listenerContainerDefinition = createContainerManager(element, parserContext);
final String containerBeanName = resolveId(element, terminalDefinition, parserContext) final String containerBeanName = resolveId(element, terminalDefinition, parserContext)
+ CONTAINER_MANAGER_SUFFIX; + CONTAINER_MANAGER_SUFFIX;
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@


/** /**
* MessageListener implementation that deserializes incoming messages and forwards them to one or more event processors. * MessageListener implementation that deserializes incoming messages and forwards them to one or more event processors.
* The <code>byte[]</code> making up the message payload must the format as used by the {@link SpringAMQPEventBus}. * The <code>byte[]</code> making up the message payload must the format as used by the {@link SpringAMQPTerminal}.
* *
* @author Allard Buijze * @author Allard Buijze
* @since 2.0 * @since 2.0
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@
import org.axonframework.common.Assert; import org.axonframework.common.Assert;
import org.axonframework.common.AxonConfigurationException; import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.Registration; import org.axonframework.common.Registration;
import org.axonframework.eventhandling.AbstractEventBus; import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.EventMessage; import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.amqp.*; import org.axonframework.eventhandling.amqp.*;
import org.axonframework.eventsourcing.eventstore.TrackingEventStream;
import org.axonframework.eventsourcing.eventstore.TrackingToken;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork; import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.serialization.Serializer; import org.axonframework.serialization.Serializer;
import org.slf4j.Logger; import org.slf4j.Logger;
Expand All @@ -40,6 +38,7 @@
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.function.Consumer; import java.util.function.Consumer;


Expand All @@ -52,13 +51,13 @@
* @author Allard Buijze * @author Allard Buijze
* @since 2.0 * @since 2.0
*/ */
public class SpringAMQPEventBus extends AbstractEventBus implements InitializingBean, ApplicationContextAware { public class SpringAMQPTerminal implements InitializingBean, ApplicationContextAware {


//todo turn this into an event processor private static final Logger logger = LoggerFactory.getLogger(SpringAMQPTerminal.class);

private static final Logger logger = LoggerFactory.getLogger(SpringAMQPEventBus.class);
private static final String DEFAULT_EXCHANGE_NAME = "Axon.EventBus"; private static final String DEFAULT_EXCHANGE_NAME = "Axon.EventBus";


private final EventBus eventBus;

private ConnectionFactory connectionFactory; private ConnectionFactory connectionFactory;
private String exchangeName = DEFAULT_EXCHANGE_NAME; private String exchangeName = DEFAULT_EXCHANGE_NAME;
private boolean isTransactional = false; private boolean isTransactional = false;
Expand All @@ -70,9 +69,21 @@ public class SpringAMQPEventBus extends AbstractEventBus implements Initializing
private RoutingKeyResolver routingKeyResolver; private RoutingKeyResolver routingKeyResolver;
private boolean waitForAck; private boolean waitForAck;
private long publisherAckTimeout; private long publisherAckTimeout;
private Registration eventBusRegistration;


@Override public SpringAMQPTerminal(EventBus eventBus) {
protected void prepareCommit(List<? extends EventMessage<?>> events) { this.eventBus = eventBus;
}

public void start() {
eventBusRegistration = eventBus.subscribe(this::send);
}

public void shutDown() {
Optional.ofNullable(eventBusRegistration).ifPresent(Registration::cancel);
}

protected void send(List<? extends EventMessage<?>> events) {
Channel channel = connectionFactory.createConnection().createChannel(isTransactional); Channel channel = connectionFactory.createConnection().createChannel(isTransactional);
try { try {
if (isTransactional) { if (isTransactional) {
Expand Down Expand Up @@ -179,13 +190,6 @@ private void tryRollback(Channel channel) {
} }
} }


@Override
public TrackingEventStream streamEvents(TrackingToken trackingToken) {
//todo can be removed when this is modified to be an event processor
throw new UnsupportedOperationException();
}

@Override
public Registration subscribe(Consumer<List<? extends EventMessage<?>>> eventProcessor) { public Registration subscribe(Consumer<List<? extends EventMessage<?>>> eventProcessor) {
AMQPConsumerConfiguration config = new DefaultAMQPConsumerConfiguration(eventProcessor.toString()); AMQPConsumerConfiguration config = new DefaultAMQPConsumerConfiguration(eventProcessor.toString());
return getListenerContainerLifecycleManager().registerEventProcessor(eventProcessor, config, messageConverter); return getListenerContainerLifecycleManager().registerEventProcessor(eventProcessor, config, messageConverter);
Expand Down Expand Up @@ -289,7 +293,7 @@ public void setPublisherAckTimeout(long publisherAckTimeout) {
/** /**
* Sets the ConnectionFactory providing the Connections and Channels to send messages on. The SpringAMQPTerminal * Sets the ConnectionFactory providing the Connections and Channels to send messages on. The SpringAMQPTerminal
* does not cache or reuse connections. Providing a ConnectionFactory instance that caches connections will prevent * does not cache or reuse connections. Providing a ConnectionFactory instance that caches connections will prevent
* new connections to be opened for each invocation to {@link #publish(EventMessage[])} * new connections to be opened for each invocation to {@link #accept(List)}
* <p> * <p>
* Defaults to an autowired Connection Factory. * Defaults to an autowired Connection Factory.
* *
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
</xsd:documentation> </xsd:documentation>
<xsd:appinfo> <xsd:appinfo>
<tool:annotation> <tool:annotation>
<tool:exports type="org.axonframework.eventhandling.amqp.spring.SpringAMQPEventBus"/> <tool:exports type="org.axonframework.eventhandling.amqp.spring.SpringAMQPTerminal"/>
<tool:exports type="org.axonframework.eventhandling.amqp.spring.ListenerContainerLifecycleManager"/> <tool:exports type="org.axonframework.eventhandling.amqp.spring.ListenerContainerLifecycleManager"/>
</tool:annotation> </tool:annotation>
</xsd:appinfo> </xsd:appinfo>
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.axonframework.eventhandling.amqp.AMQPMessageConverter; import org.axonframework.eventhandling.amqp.AMQPMessageConverter;
import org.axonframework.eventhandling.amqp.RoutingKeyResolver; import org.axonframework.eventhandling.amqp.RoutingKeyResolver;
import org.axonframework.eventhandling.amqp.spring.SpringAMQPConsumerConfiguration; import org.axonframework.eventhandling.amqp.spring.SpringAMQPConsumerConfiguration;
import org.axonframework.eventhandling.amqp.spring.SpringAMQPEventBus; import org.axonframework.eventhandling.amqp.spring.SpringAMQPTerminal;
import org.axonframework.serialization.Serializer; import org.axonframework.serialization.Serializer;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
Expand Down Expand Up @@ -64,7 +64,7 @@ public void testContextStartup() {
assertNotNull(applicationContext); assertNotNull(applicationContext);
assertNotNull(beanFactory); assertNotNull(beanFactory);
BeanDefinition terminal1 = beanFactory.getBeanDefinition("terminal1"); BeanDefinition terminal1 = beanFactory.getBeanDefinition("terminal1");
assertEquals(SpringAMQPEventBus.class.getName(), terminal1.getBeanClassName()); assertEquals(SpringAMQPTerminal.class.getName(), terminal1.getBeanClassName());
assertTrue(property(terminal1, "listenerContainerLifecycleManager") instanceof BeanReference); assertTrue(property(terminal1, "listenerContainerLifecycleManager") instanceof BeanReference);


assertEquals("connectionFactory", property(terminal1, "connectionFactory", BeanReference.class).getBeanName()); assertEquals("connectionFactory", property(terminal1, "connectionFactory", BeanReference.class).getBeanName());
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
@ContextConfiguration(locations = "/META-INF/spring/messaging-context.xml") @ContextConfiguration(locations = "/META-INF/spring/messaging-context.xml")
@RunWith(SpringJUnit4ClassRunner.class) @RunWith(SpringJUnit4ClassRunner.class)
@Ignore @Ignore
public class SpringAMQPEventBusIntegrationTest { public class SpringAMQPTerminalIntegrationTest {


private static final int EVENT_COUNT = 100; private static final int EVENT_COUNT = 100;
private static final int THREAD_COUNT = 10; private static final int THREAD_COUNT = 10;
Expand All @@ -57,7 +57,7 @@ public class SpringAMQPEventBusIntegrationTest {
@Autowired @Autowired
private ConnectionFactory connectionFactory; private ConnectionFactory connectionFactory;
@Autowired @Autowired
private SpringAMQPEventBus terminal; private SpringAMQPTerminal terminal;


@Autowired @Autowired
@Qualifier("eventProcessor1") @Qualifier("eventProcessor1")
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import org.axonframework.eventhandling.GenericEventMessage; import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.SimpleEventBus;
import org.axonframework.eventhandling.amqp.DefaultAMQPMessageConverter; import org.axonframework.eventhandling.amqp.DefaultAMQPMessageConverter;
import org.axonframework.eventhandling.amqp.EventPublicationFailedException; import org.axonframework.eventhandling.amqp.EventPublicationFailedException;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork; import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
Expand All @@ -44,29 +45,33 @@
/** /**
* @author Allard Buijze * @author Allard Buijze
*/ */
public class SpringAMQPEventBusTest { public class SpringAMQPTerminalTest {


private static final Charset UTF_8 = Charset.forName("UTF-8"); private static final Charset UTF_8 = Charset.forName("UTF-8");
private SpringAMQPEventBus testSubject; private SpringAMQPTerminal testSubject;
private ConnectionFactory connectionFactory; private ConnectionFactory connectionFactory;
private Serializer serializer; private Serializer serializer;
private SimpleEventBus eventBus;


@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
testSubject = new SpringAMQPEventBus(); eventBus = new SimpleEventBus();
testSubject = new SpringAMQPTerminal(eventBus);
connectionFactory = mock(ConnectionFactory.class); connectionFactory = mock(ConnectionFactory.class);
serializer = mock(Serializer.class); serializer = mock(Serializer.class);
testSubject.setConnectionFactory(connectionFactory); testSubject.setConnectionFactory(connectionFactory);
testSubject.setExchangeName("mockExchange"); testSubject.setExchangeName("mockExchange");
testSubject.setTransactional(true); testSubject.setTransactional(true);
testSubject.setMessageConverter(new DefaultAMQPMessageConverter(serializer)); testSubject.setMessageConverter(new DefaultAMQPMessageConverter(serializer));
testSubject.start();
} }


@After @After
public void tearDown() { public void tearDown() {
while (CurrentUnitOfWork.isStarted()) { while (CurrentUnitOfWork.isStarted()) {
CurrentUnitOfWork.get().rollback(); CurrentUnitOfWork.get().rollback();
} }
testSubject.shutDown();
} }


@Test @Test
Expand All @@ -80,7 +85,7 @@ public void testSendMessage_NoUnitOfWork() throws IOException {
.thenReturn(new SimpleSerializedObject<>("Message".getBytes(UTF_8), byte[].class, "String", "0")); .thenReturn(new SimpleSerializedObject<>("Message".getBytes(UTF_8), byte[].class, "String", "0"));
when(serializer.serialize(message.getMetaData(), byte[].class)) when(serializer.serialize(message.getMetaData(), byte[].class))
.thenReturn(new SerializedMetaData<>(new byte[0], byte[].class)); .thenReturn(new SerializedMetaData<>(new byte[0], byte[].class));
testSubject.publish(message); eventBus.publish(message);


verify(transactionalChannel).basicPublish(eq("mockExchange"), eq("java.lang"), verify(transactionalChannel).basicPublish(eq("mockExchange"), eq("java.lang"),
eq(false), eq(false), eq(false), eq(false),
Expand All @@ -103,7 +108,7 @@ public void testSendMessage_WithTransactionalUnitOfWork() throws IOException {
.thenReturn(new SimpleSerializedObject<>("Message".getBytes(UTF_8), byte[].class, "String", "0")); .thenReturn(new SimpleSerializedObject<>("Message".getBytes(UTF_8), byte[].class, "String", "0"));
when(serializer.serialize(message.getMetaData(), byte[].class)) when(serializer.serialize(message.getMetaData(), byte[].class))
.thenReturn(new SerializedMetaData<>(new byte[0], byte[].class)); .thenReturn(new SerializedMetaData<>(new byte[0], byte[].class));
testSubject.publish(message); eventBus.publish(message);


uow.commit(); uow.commit();
verify(transactionalChannel).basicPublish(eq("mockExchange"), eq("java.lang"), verify(transactionalChannel).basicPublish(eq("mockExchange"), eq("java.lang"),
Expand All @@ -127,7 +132,7 @@ public void testSendMessage_WithTransactionalUnitOfWork_ChannelClosedBeforeCommi
.thenReturn(new SimpleSerializedObject<>("Message".getBytes(UTF_8), byte[].class, "String", "0")); .thenReturn(new SimpleSerializedObject<>("Message".getBytes(UTF_8), byte[].class, "String", "0"));
when(serializer.serialize(message.getMetaData(), byte[].class)) when(serializer.serialize(message.getMetaData(), byte[].class))
.thenReturn(new SerializedMetaData<>(new byte[0], byte[].class)); .thenReturn(new SerializedMetaData<>(new byte[0], byte[].class));
testSubject.publish(message); eventBus.publish(message);


try { try {
uow.commit(); uow.commit();
Expand Down Expand Up @@ -157,7 +162,7 @@ public void testSendMessage_WithUnitOfWorkRollback() throws IOException {
.thenReturn(new SimpleSerializedObject<>("Message".getBytes(UTF_8), byte[].class, "String", "0")); .thenReturn(new SimpleSerializedObject<>("Message".getBytes(UTF_8), byte[].class, "String", "0"));
when(serializer.serialize(message.getMetaData(), byte[].class)) when(serializer.serialize(message.getMetaData(), byte[].class))
.thenReturn(new SerializedMetaData<>(new byte[0], byte[].class)); .thenReturn(new SerializedMetaData<>(new byte[0], byte[].class));
testSubject.publish(message); eventBus.publish(message);


verify(transactionalChannel, never()).txRollback(); verify(transactionalChannel, never()).txRollback();
verify(transactionalChannel, never()).txCommit(); verify(transactionalChannel, never()).txCommit();
Expand Down Expand Up @@ -193,7 +198,7 @@ public void testSendMessageWithPublisherAck_UnitOfWorkCommitted()


UnitOfWork<?> uow = DefaultUnitOfWork.startAndGet(message); UnitOfWork<?> uow = DefaultUnitOfWork.startAndGet(message);


testSubject.publish(message); eventBus.publish(message);
verify(channel, never()).waitForConfirms(); verify(channel, never()).waitForConfirms();


uow.commit(); uow.commit();
Expand Down Expand Up @@ -239,7 +244,7 @@ public void testSendMessageWithPublisherAck_NoActiveUnitOfWork() throws Interrup
when(serializer.serialize(message.getMetaData(), byte[].class)) when(serializer.serialize(message.getMetaData(), byte[].class))
.thenReturn(new SerializedMetaData<>(new byte[0], byte[].class)); .thenReturn(new SerializedMetaData<>(new byte[0], byte[].class));


testSubject.publish(message); eventBus.publish(message);
verify(channel).confirmSelect(); verify(channel).confirmSelect();
verify(channel).basicPublish(eq("mockExchange"), eq("java.lang"), verify(channel).basicPublish(eq("mockExchange"), eq("java.lang"),
eq(false), eq(false), eq(false), eq(false),
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -71,20 +71,22 @@ public void accept(List<? extends EventMessage<?>> eventMessages) {
Map<? extends EventMessage<?>, MessageMonitor.MonitorCallback> monitorCallbacks = Map<? extends EventMessage<?>, MessageMonitor.MonitorCallback> monitorCallbacks =
eventMessages.stream().collect(toMap(Function.identity(), messageMonitor::onMessageIngested)); eventMessages.stream().collect(toMap(Function.identity(), messageMonitor::onMessageIngested));
UnitOfWork<? extends EventMessage<?>> unitOfWork = new BatchingUnitOfWork<>(eventMessages); UnitOfWork<? extends EventMessage<?>> unitOfWork = new BatchingUnitOfWork<>(eventMessages);
unitOfWork.onRollback(uow -> errorHandler
.handleError(getName(), uow.getExecutionResult().getExceptionResult(), eventMessages,
() -> accept(eventMessages)));
unitOfWork.onCleanup(uow -> {
MessageMonitor.MonitorCallback callback = monitorCallbacks.get(uow.getMessage());
if (uow.isRolledBack()) {
callback.reportFailure(uow.getExecutionResult().getExceptionResult());
} else {
callback.reportSuccess();
}
});
try { try {
unitOfWork.executeWithResult( unitOfWork.executeWithResult(
() -> new DefaultInterceptorChain<>(unitOfWork, interceptors, eventHandlerInvoker).proceed(), () -> {
unitOfWork.onRollback(uow -> errorHandler
.handleError(getName(), uow.getExecutionResult().getExceptionResult(), eventMessages,
() -> accept(eventMessages)));
unitOfWork.onCleanup(uow -> {
MessageMonitor.MonitorCallback callback = monitorCallbacks.get(uow.getMessage());
if (uow.isRolledBack()) {
callback.reportFailure(uow.getExecutionResult().getExceptionResult());
} else {
callback.reportSuccess();
}
});
return new DefaultInterceptorChain<>(unitOfWork, interceptors, eventHandlerInvoker).proceed();
},
rollbackConfiguration); rollbackConfiguration);
} catch (Exception e) { } catch (Exception e) {
throw new EventProcessingException( throw new EventProcessingException(
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ public AnnotatedSagaManager(Class<T> sagaType, SagaRepository<T> sagaRepository,
this(sagaType, sagaRepository, sagaFactory, new DefaultSagaMetaModelFactory().modelOf(sagaType)); this(sagaType, sagaRepository, sagaFactory, new DefaultSagaMetaModelFactory().modelOf(sagaType));
} }


/**
* TODO: Javadoc
*/
public AnnotatedSagaManager(Class<T> sagaType, SagaRepository<T> sagaRepository, Callable<T> sagaFactory, public AnnotatedSagaManager(Class<T> sagaType, SagaRepository<T> sagaRepository, Callable<T> sagaFactory,
SagaModel<T> sagaMetaModel) { SagaModel<T> sagaMetaModel) {
super(sagaType, sagaRepository, sagaFactory); super(sagaType, sagaRepository, sagaFactory);
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ public interface SagaRepository<T> {
*/ */
Saga<T> load(String sagaIdentifier); Saga<T> load(String sagaIdentifier);


/**
* TODO: Fix docs
*/
Saga<T> newInstance(Callable<T> factoryMethod); Saga<T> newInstance(Callable<T> factoryMethod);


} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ public abstract class AbstractSnapshotter implements Snapshotter {
private final TransactionManager transactionManager; private final TransactionManager transactionManager;


/** /**
* Initializes the Snapshotter to append snapshots in the given {@code eventStore}. This snapshotter will * Initializes the Snapshotter to append snapshots in the given {@code eventStore}. This snapshotter will create the
* create the snapshots in the process that triggers them, and save them into the Event Store without any * snapshots in the process that triggers them, and save them into the Event Store without any transaction.
* transaction.
* *
* @param eventStorageEngine the EventStore instance to store snapshots in * @param eventStorageEngine the EventStore instance to store snapshots in
*/ */
Expand All @@ -54,36 +53,37 @@ protected AbstractSnapshotter(EventStorageEngine eventStorageEngine) {
} }


/** /**
* Initializes the Snapshotter to append snapshots in the given {@code eventStore}. This snapshotter will create * Initializes the Snapshotter to append snapshots in the given {@code eventStore}. This snapshotter will create the
* the snapshots in the process that triggers them, and save them into the Event Store in a transaction managed by * snapshots in the process that triggers them, and save them into the Event Store in a transaction managed by the
* the given {@code transactionManager}. * given {@code transactionManager}.
* *
* @param eventStorageEngine the EventStore instance to store snapshots in * @param eventStorageEngine the EventStore instance to store snapshots in
* @param transactionManager The transaction manager to create the surrounding transaction with * @param transactionManager The transaction manager to create the surrounding transaction with
*/ */
protected AbstractSnapshotter(EventStorageEngine eventStorageEngine, TransactionManager transactionManager) { protected AbstractSnapshotter(EventStorageEngine eventStorageEngine, TransactionManager transactionManager) {
this(eventStorageEngine, DirectExecutor.INSTANCE, transactionManager); this(eventStorageEngine, DirectExecutor.INSTANCE, transactionManager);
} }


/** /**
* Initializes the Snapshotter to append snapshots in the given {@code eventStore}. This snapshotter will create * Initializes the Snapshotter to append snapshots in the given {@code eventStore}. This snapshotter will create the
* the snapshots in the process provided by the given {@code executor}, and save them into the Event Store in a * snapshots in the process provided by the given {@code executor}, and save them into the Event Store in a
* transaction managed by the given {@code transactionManager}. * transaction managed by the given {@code transactionManager}.
* *
* @param eventStorageEngine The EventStore instance to store snapshots in * @param eventStorageEngine The EventStore instance to store snapshots in
* @param executor The executor that handles the actual snapshot creation process * @param executor The executor that handles the actual snapshot creation process
* @param transactionManager The transaction manager to create the surrounding transaction with * @param transactionManager The transaction manager to create the surrounding transaction with
*/ */
protected AbstractSnapshotter(EventStorageEngine eventStorageEngine, Executor executor, TransactionManager transactionManager) { protected AbstractSnapshotter(EventStorageEngine eventStorageEngine, Executor executor,
TransactionManager transactionManager) {
this.eventStorageEngine = eventStorageEngine; this.eventStorageEngine = eventStorageEngine;
this.executor = executor; this.executor = executor;
this.transactionManager = transactionManager; this.transactionManager = transactionManager;
} }


@Override @Override
public void scheduleSnapshot(Class<?> aggregateType, String aggregateIdentifier) { public void scheduleSnapshot(Class<?> aggregateType, String aggregateIdentifier) {
executor.execute(() -> transactionManager executor.execute(new SilentTask(() -> transactionManager
.executeInTransaction(new SilentTask(createSnapshotterTask(aggregateType, aggregateIdentifier)))); .executeInTransaction(createSnapshotterTask(aggregateType, aggregateIdentifier))));
} }


/** /**
Expand All @@ -98,9 +98,8 @@ protected Runnable createSnapshotterTask(Class<?> aggregateType, String aggregat
} }


/** /**
* Creates a snapshot event for an aggregate of which passed events are available in the given * Creates a snapshot event for an aggregate of which passed events are available in the given {@code eventStream}.
* {@code eventStream}. May return {@code null} to indicate a snapshot event is not necessary or * May return {@code null} to indicate a snapshot event is not necessary or appropriate for the given event stream.
* appropriate for the given event stream.
* *
* @param aggregateType The aggregate's type identifier * @param aggregateType The aggregate's type identifier
* @param aggregateIdentifier The identifier of the aggregate to create a snapshot for * @param aggregateIdentifier The identifier of the aggregate to create a snapshot for
Expand Down Expand Up @@ -165,7 +164,7 @@ private CreateSnapshotTask(Class<?> aggregateType, String identifier) {


@Override @Override
public void run() { public void run() {
DomainEventStream eventStream = eventStream = eventStorageEngine.readEvents(identifier); DomainEventStream eventStream = eventStorageEngine.readEvents(identifier);
// a snapshot should only be stored if the snapshot replaces at least more than one event // a snapshot should only be stored if the snapshot replaces at least more than one event
long firstEventSequenceNumber = eventStream.peek().getSequenceNumber(); long firstEventSequenceNumber = eventStream.peek().getSequenceNumber();
DomainEventMessage snapshotEvent = createSnapshot(aggregateType, identifier, eventStream); DomainEventMessage snapshotEvent = createSnapshot(aggregateType, identifier, eventStream);
Expand Down
Loading

0 comments on commit aca7609

Please sign in to comment.