Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Added support for Snapshotting in DisruptorCommandBus

The Repositories in the DisruptorCommandBus now support EventStreamDecorators,
meaning it is possible to specify a snapshotter trigger. The Spring XML support
has been adapted to allow configuration of a snapshotter trigger, similar to
the 'regular' repositories.

Issue #AXON-237 Fixed
commit a5ad5730342b27170f3b18bd048ddf5592913367 1 parent bb80d54
@abuijze abuijze authored
Showing with 2,541 additions and 43 deletions.
  1. +148 −0 core/src/main/java/org/axonframework/cache/WeakReferenceCache.java
  2. +17 −6 core/src/main/java/org/axonframework/commandhandling/disruptor/CommandHandlerInvoker.java
  3. +42 −2 core/src/main/java/org/axonframework/commandhandling/disruptor/DisruptorCommandBus.java
  4. +18 −2 core/src/main/java/org/axonframework/commandhandling/disruptor/DisruptorUnitOfWork.java
  5. +11 −5 core/src/main/java/org/axonframework/commandhandling/disruptor/EventPublisher.java
  6. +79 −9 core/src/main/java/org/axonframework/contextsupport/spring/DisruptorCommandBusBeanDefinitionParser.java
  7. +65 −0 core/src/main/java/org/axonframework/eventsourcing/CompositeEventStreamDecorator.java
  8. +2 −1  core/src/main/resources/META-INF/spring.schemas
  9. +1,792 −0 core/src/main/resources/org/axonframework/contextsupport/spring/axon-core-2.3.xsd
  10. +98 −0 core/src/test/java/org/axonframework/cache/WeakReferenceCacheTest.java
  11. +13 −5 core/src/test/java/org/axonframework/commandhandling/disruptor/CommandHandlerInvokerTest.java
  12. +57 −7 core/src/test/java/org/axonframework/commandhandling/disruptor/DisruptorCommandBusTest.java
  13. +24 −0 core/src/test/java/org/axonframework/contextsupport/spring/DisruptorCommandBusBeanDefinitionParserTest.java
  14. +84 −1 core/src/test/java/org/axonframework/contextsupport/spring/DisruptorContextConfigurationTest.java
  15. +71 −0 core/src/test/java/org/axonframework/eventsourcing/CompositeEventStreamDecoratorTest.java
  16. +12 −1 core/src/test/resources/contexts/disruptor-context.xml
  17. +8 −4 core/src/test/resources/contexts/simple-disruptor-context.xml
View
148 core/src/main/java/org/axonframework/cache/WeakReferenceCache.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright (c) 2010-2014. Axon Framework
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.axonframework.cache;
+
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * Cache implementation that keeps values in the cache until the garbage collector has removed them. Unlike the
+ * WeakHashMap, which uses weak references on the keys, this Cache uses weak references on the values.
+ * <p/>
+ * Values are Weakly referenced, which means they are not eligible for removal as long as any other references to the
+ * value exist.
+ * <p/>
+ * Items expire once the garbage collector has removed them. Some time after they have been removed, the entry
+ * listeners are being notified thereof. Note that notification are emitted when the cache is being accessed (either
+ * for reading or writing). If the cache is not being accessed for a longer period of time, it may occur that listeners
+ * are not notified.
+ *
+ * @author Allard Buijze
+ * @since 2.2.1
+ */
+public class WeakReferenceCache implements Cache {
+
+ private final ConcurrentMap<Object, Entry> cache = new ConcurrentHashMap<Object, Entry>();
+ private final ReferenceQueue<Object> referenceQueue = new ReferenceQueue<Object>();
+ private final Set<EntryListener> adapters = new CopyOnWriteArraySet<EntryListener>();
+
+ @Override
+ public void registerCacheEntryListener(EntryListener entryListener) {
+ this.adapters.add(entryListener);
+ }
+
+ @Override
+ public void unregisterCacheEntryListener(EntryListener entryListener) {
+ this.adapters.remove(entryListener);
+ }
+
+ @Override
+ public <K, V> V get(K key) {
+ purgeItems();
+ final Reference<Object> entry = cache.get(key);
+
+ final V returnValue = entry == null ? null : (V) entry.get();
+ if (returnValue != null) {
+ for (EntryListener adapter : adapters) {
+ adapter.onEntryRead(key, returnValue);
+ }
+ }
+ return returnValue;
+ }
+
+ @Override
+ public <K, V> void put(K key, V value) {
+ if (value == null) {
+ throw new IllegalArgumentException("Null values not supported");
+ }
+
+ purgeItems();
+ if (cache.put(key, new Entry(key, value)) != null) {
+ for (EntryListener adapter : adapters) {
+ adapter.onEntryUpdated(key, value);
+ }
+ } else {
+ for (EntryListener adapter : adapters) {
+ adapter.onEntryCreated(key, value);
+ }
+ }
+ }
+
+ @Override
+ public <K, V> boolean putIfAbsent(K key, V value) {
+ if (value == null) {
+ throw new IllegalArgumentException("Null values not supported");
+ }
+ purgeItems();
+ if (cache.putIfAbsent(key, new Entry(key, value)) == null) {
+ for (EntryListener adapter : adapters) {
+ adapter.onEntryCreated(key, value);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public <K> boolean remove(K key) {
+ if (cache.remove(key) != null) {
+ for (EntryListener adapter : adapters) {
+ adapter.onEntryRemoved(key);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public <K> boolean containsKey(K key) {
+ purgeItems();
+ final Reference<Object> entry = cache.get(key);
+
+ return entry != null && entry.get() != null;
+ }
+
+ private void purgeItems() {
+ Entry purgedEntry;
+ while ((purgedEntry = (Entry) referenceQueue.poll()) != null) {
+ if (cache.remove(purgedEntry.getKey()) != null) {
+ for (EntryListener adapter : adapters) {
+ adapter.onEntryExpired(purgedEntry.getKey());
+ }
+ }
+ }
+ }
+
+ private class Entry extends WeakReference<Object> {
+
+ private final Object key;
+
+ public Entry(Object key, Object value) {
+ super(value, referenceQueue);
+ this.key = key;
+ }
+
+ public Object getKey() {
+ return key;
+ }
+ }
+}
View
23 core/src/main/java/org/axonframework/commandhandling/disruptor/CommandHandlerInvoker.java
@@ -23,6 +23,7 @@
import org.axonframework.domain.DomainEventStream;
import org.axonframework.eventsourcing.AggregateFactory;
import org.axonframework.eventsourcing.EventSourcedAggregateRoot;
+import org.axonframework.eventsourcing.EventStreamDecorator;
import org.axonframework.eventstore.EventStore;
import org.axonframework.eventstore.EventStreamNotFoundException;
import org.axonframework.repository.AggregateNotFoundException;
@@ -107,14 +108,17 @@ public void onEvent(CommandHandlingEntry entry, long sequence, boolean endOfBatc
* repository must be sage to use by this invoker instance.
*
* @param aggregateFactory The factory creating aggregate instances
+ * @param decorator The decorator to decorate event streams with
* @param <T> The type of aggregate created by the factory
* @return A Repository instance for the given aggregate
*/
@SuppressWarnings("unchecked")
- public <T extends EventSourcedAggregateRoot> Repository<T> createRepository(AggregateFactory<T> aggregateFactory) {
+ public <T extends EventSourcedAggregateRoot> Repository<T> createRepository(AggregateFactory<T> aggregateFactory,
+ EventStreamDecorator decorator) {
String typeIdentifier = aggregateFactory.getTypeIdentifier();
if (!repositories.containsKey(typeIdentifier)) {
- DisruptorRepository<T> repository = new DisruptorRepository<T>(aggregateFactory, cache, eventStore);
+ DisruptorRepository<T> repository = new DisruptorRepository<T>(aggregateFactory, cache, eventStore,
+ decorator);
repositories.putIfAbsent(typeIdentifier, repository);
}
return repositories.get(typeIdentifier);
@@ -145,16 +149,19 @@ public void onShutdown() {
static final class DisruptorRepository<T extends EventSourcedAggregateRoot> implements Repository<T> {
private final EventStore eventStore;
+ private final EventStreamDecorator decorator;
private final AggregateFactory<T> aggregateFactory;
private final Map<T, Object> firstLevelCache = new WeakHashMap<T, Object>();
private final String typeIdentifier;
private final Cache cache;
- private DisruptorRepository(AggregateFactory<T> aggregateFactory, Cache cache, EventStore eventStore) {
+ private DisruptorRepository(AggregateFactory<T> aggregateFactory, Cache cache, EventStore eventStore,
+ EventStreamDecorator decorator) {
this.aggregateFactory = aggregateFactory;
this.cache = cache;
this.eventStore = eventStore;
- typeIdentifier = this.aggregateFactory.getTypeIdentifier();
+ this.decorator = decorator;
+ this.typeIdentifier = this.aggregateFactory.getTypeIdentifier();
}
@Override
@@ -188,7 +195,8 @@ public T load(Object aggregateIdentifier) {
aggregateIdentifier);
DomainEventStream events = null;
try {
- events = eventStore.readEvents(typeIdentifier, aggregateIdentifier);
+ events = decorator.decorateForRead(typeIdentifier, aggregateIdentifier,
+ eventStore.readEvents(typeIdentifier, aggregateIdentifier));
if (events.hasNext()) {
aggregateRoot = aggregateFactory.createAggregate(aggregateIdentifier, events.peek());
aggregateRoot.initializeState(events);
@@ -200,7 +208,8 @@ public T load(Object aggregateIdentifier) {
+ "or a command that was executed against an aggregate that did not yet "
+ "finish the creation process. It will be rescheduled for publication when it "
+ "attempts to load an aggregate",
- e);
+ e
+ );
} finally {
IOUtils.closeQuietlyIfCloseable(events);
}
@@ -210,6 +219,7 @@ public T load(Object aggregateIdentifier) {
if (aggregateRoot != null) {
DisruptorUnitOfWork unitOfWork = (DisruptorUnitOfWork) CurrentUnitOfWork.get();
unitOfWork.setAggregateType(typeIdentifier);
+ unitOfWork.setEventStreamDecorator(decorator);
unitOfWork.registerAggregate(aggregateRoot, null, null);
}
return aggregateRoot;
@@ -218,6 +228,7 @@ public T load(Object aggregateIdentifier) {
@Override
public void add(T aggregate) {
DisruptorUnitOfWork unitOfWork = (DisruptorUnitOfWork) CurrentUnitOfWork.get();
+ unitOfWork.setEventStreamDecorator(decorator);
unitOfWork.setAggregateType(typeIdentifier);
unitOfWork.registerAggregate(aggregate, null, null);
firstLevelCache.put(aggregate, PLACEHOLDER_VALUE);
View
44 core/src/main/java/org/axonframework/commandhandling/disruptor/DisruptorCommandBus.java
@@ -29,9 +29,11 @@
import org.axonframework.commandhandling.interceptors.SerializationOptimizingInterceptor;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonThreadFactory;
+import org.axonframework.domain.DomainEventStream;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventsourcing.AggregateFactory;
import org.axonframework.eventsourcing.EventSourcedAggregateRoot;
+import org.axonframework.eventsourcing.EventStreamDecorator;
import org.axonframework.eventstore.EventStore;
import org.axonframework.repository.Repository;
import org.axonframework.serializer.Serializer;
@@ -276,7 +278,8 @@ public void dispatch(final CommandMessage<?> command) {
event.reset(command, commandHandlers.get(command.getCommandName()), invokerSegment, publisherSegment,
serializerSegment, new BlacklistDetectingCallback<R>(callback, command, disruptor.getRingBuffer(),
this, rescheduleOnCorruptState),
- invokerInterceptors, publisherInterceptors);
+ invokerInterceptors, publisherInterceptors
+ );
ringBuffer.publish(sequence);
}
@@ -292,8 +295,28 @@ public void dispatch(final CommandMessage<?> command) {
* @return the repository that provides access to stored aggregates
*/
public <T extends EventSourcedAggregateRoot> Repository<T> createRepository(AggregateFactory<T> aggregateFactory) {
+ return createRepository(aggregateFactory, NoOpEventStreamDecorator.INSTANCE);
+ }
+
+ /**
+ * Creates a repository instance for an Event Sourced aggregate that is created by the given
+ * <code>aggregateFactory</code>. The given <code>decorator</code> is used to decorate event streams.
+ * <p/>
+ * The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate
+ * instances. Using any other repository instance may result in undefined outcome (a.k.a. concurrency problems).
+ * <p/>
+ * Note that a second invocation of this method with an aggregate factory for the same aggregate type <em>may</em>
+ * return the same instance as the first invocation, even if the given <code>decorator</code> is different.
+ *
+ * @param aggregateFactory The factory creating uninitialized instances of the Aggregate
+ * @param decorator The decorator to decorate events streams with
+ * @param <T> The type of aggregate to create the repository for
+ * @return the repository that provides access to stored aggregates
+ */
+ public <T extends EventSourcedAggregateRoot> Repository<T> createRepository(AggregateFactory<T> aggregateFactory,
+ EventStreamDecorator decorator) {
for (CommandHandlerInvoker invoker : commandHandlerInvokers) {
- invoker.createRepository(aggregateFactory);
+ invoker.createRepository(aggregateFactory, decorator);
}
return new DisruptorRepository<T>(aggregateFactory.getTypeIdentifier());
}
@@ -391,4 +414,21 @@ public void add(T aggregate) {
CommandHandlerInvoker.getRepository(typeIdentifier).add(aggregate);
}
}
+
+ private static class NoOpEventStreamDecorator implements EventStreamDecorator {
+
+ public static final EventStreamDecorator INSTANCE = new NoOpEventStreamDecorator();
+
+ @Override
+ public DomainEventStream decorateForRead(String aggregateType, Object aggregateIdentifier,
+ DomainEventStream eventStream) {
+ return eventStream;
+ }
+
+ @Override
+ public DomainEventStream decorateForAppend(String aggregateType, EventSourcedAggregateRoot aggregate,
+ DomainEventStream eventStream) {
+ return eventStream;
+ }
+ }
}
View
20 core/src/main/java/org/axonframework/commandhandling/disruptor/DisruptorUnitOfWork.java
@@ -24,6 +24,7 @@
import org.axonframework.domain.SimpleDomainEventStream;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventsourcing.EventSourcedAggregateRoot;
+import org.axonframework.eventsourcing.EventStreamDecorator;
import org.axonframework.unitofwork.CurrentUnitOfWork;
import org.axonframework.unitofwork.SaveAggregateCallback;
import org.axonframework.unitofwork.UnitOfWork;
@@ -58,6 +59,7 @@
private final boolean transactional;
private final Map<String, Object> resources = new HashMap<String, Object>();
private final Map<String, Object> inheritedResources = new HashMap<String, Object>();
+ private EventStreamDecorator eventStreamDecorator;
/**
* Creates a new Unit of Work for use in the DisruptorCommandBus.
@@ -111,6 +113,7 @@ public void onCleanup() {
// clear the lists of events to make them garbage-collectible
eventsToStore = EMPTY_DOMAIN_EVENT_STREAM;
eventsToPublish.clear();
+ eventStreamDecorator = null;
this.resources.clear();
this.inheritedResources.clear();
}
@@ -170,7 +173,8 @@ public void registerListener(UnitOfWorkListener listener) {
if (aggregate != null && aggregateRoot != aggregate) { // NOSONAR - Intentional equality check
throw new IllegalArgumentException(
"Cannot register more than one aggregate in this Unit Of Work. Either ensure each command "
- + "executes against at most one aggregate, or use another Command Bus implementation.");
+ + "executes against at most one aggregate, or use another Command Bus implementation."
+ );
}
aggregate = (EventSourcedAggregateRoot) aggregateRoot;
@@ -220,7 +224,10 @@ public void publishEvent(EventMessage event, EventBus eventBus) {
* @return the events that need to be stored as part of this Unit of Work
*/
public DomainEventStream getEventsToStore() {
- return eventsToStore;
+ if (eventStreamDecorator == null) {
+ return eventsToStore;
+ }
+ return eventStreamDecorator.decorateForAppend(aggregateType, aggregate, eventsToStore);
}
/**
@@ -265,4 +272,13 @@ public String getAggregateType() {
public void setAggregateType(String aggregateType) {
this.aggregateType = aggregateType;
}
+
+ /**
+ * Registers the EventStreamDecorator for events as part of this unit of work
+ *
+ * @param eventStreamDecorator The EventStreamDecorator to use for the event streams part of this unit of work
+ */
+ public void setEventStreamDecorator(EventStreamDecorator eventStreamDecorator) {
+ this.eventStreamDecorator = eventStreamDecorator;
+ }
}
View
16 core/src/main/java/org/axonframework/commandhandling/disruptor/EventPublisher.java
@@ -26,6 +26,7 @@
import org.axonframework.eventsourcing.EventSourcedAggregateRoot;
import org.axonframework.eventstore.EventStore;
import org.axonframework.repository.AggregateNotFoundException;
+import org.axonframework.unitofwork.CurrentUnitOfWork;
import org.axonframework.unitofwork.TransactionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,11 +94,16 @@ public void onEvent(CommandHandlingEntry entry, long sequence, boolean endOfBatc
reschedule(entry);
} else {
DisruptorUnitOfWork unitOfWork = entry.getUnitOfWork();
- EventSourcedAggregateRoot aggregate = unitOfWork.getAggregate();
- if (aggregate != null && blackListedAggregates.contains(aggregate.getIdentifier())) {
- rejectExecution(entry, unitOfWork, entry.getAggregateIdentifier());
- } else {
- processPublication(entry, unitOfWork, aggregate);
+ CurrentUnitOfWork.set(unitOfWork);
+ try {
+ EventSourcedAggregateRoot aggregate = unitOfWork.getAggregate();
+ if (aggregate != null && blackListedAggregates.contains(aggregate.getIdentifier())) {
+ rejectExecution(entry, unitOfWork, entry.getAggregateIdentifier());
+ } else {
+ processPublication(entry, unitOfWork, aggregate);
+ }
+ } finally {
+ CurrentUnitOfWork.clear(unitOfWork);
}
}
}
View
88 core/src/main/java/org/axonframework/contextsupport/spring/DisruptorCommandBusBeanDefinitionParser.java
@@ -22,14 +22,18 @@
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
-
+import org.axonframework.cache.WeakReferenceCache;
import org.axonframework.commandhandling.disruptor.DisruptorCommandBus;
import org.axonframework.commandhandling.disruptor.DisruptorConfiguration;
import org.axonframework.common.Assert;
import org.axonframework.eventsourcing.AggregateFactory;
+import org.axonframework.eventsourcing.CompositeEventStreamDecorator;
import org.axonframework.eventsourcing.EventSourcedAggregateRoot;
+import org.axonframework.eventsourcing.EventStreamDecorator;
import org.axonframework.eventsourcing.GenericAggregateFactory;
+import org.axonframework.eventsourcing.SnapshotterTrigger;
import org.axonframework.repository.Repository;
+import org.springframework.beans.PropertyValue;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Required;
@@ -41,6 +45,7 @@
import org.springframework.util.xml.DomUtils;
import org.w3c.dom.Element;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -69,6 +74,12 @@
private static final String ATTRIBUTE_PRODUCER_TYPE = "producer-type";
private static final String ELEMENT_REPOSITORIES = "repositories";
+ private static final String EVENT_PROCESSORS_ELEMENT = "event-processors";
+ private static final String SNAPSHOT_TRIGGER_ELEMENT = "snapshotter-trigger";
+
+ private static final String EVENT_STREAM_DECORATORS_PROPERTY = "eventStreamDecorators";
+ private static final String SNAPSHOTTER_TRIGGER_PROPERTY = "snapshotterTrigger";
+
private static final String ATTRIBUTE_TRANSACTION_MANAGER = "transaction-manager";
private static final String PROPERTY_TRANSACTION_MANAGER = "transactionManager";
@@ -76,6 +87,9 @@
private static final Map<String, String> REF_PROPERTY_MAPPING = new HashMap<String, String>();
private static final Map<String, String> LIST_PROPERTY_MAPPING = new HashMap<String, String>();
+ private final SnapshotterTriggerBeanDefinitionParser snapshotterTriggerParser =
+ new SnapshotterTriggerBeanDefinitionParser();
+
static {
REF_PROPERTY_MAPPING.put("cache", "cache");
REF_PROPERTY_MAPPING.put("executor", "executor");
@@ -98,17 +112,19 @@
@Override
protected AbstractBeanDefinition parseInternal(Element element, ParserContext parserContext) {
+ final BeanDefinition configurationDefinition = createConfiguration(element, parserContext);
final AbstractBeanDefinition definition =
genericBeanDefinition(DisruptorCommandBus.class)
.addConstructorArgReference(element.getAttribute(ATTRIBUTE_EVENT_STORE))
.addConstructorArgReference(element.getAttribute(ATTRIBUTE_EVENT_BUS))
- .addConstructorArgValue(createConfiguration(element, parserContext))
+ .addConstructorArgValue(configurationDefinition)
.getBeanDefinition();
Element repoElement = DomUtils.getChildElementByTagName(element, ELEMENT_REPOSITORIES);
List<Element> repositories = DomUtils.getChildElementsByTagName(repoElement, ELEMENT_REPOSITORY);
String id = super.resolveId(element, definition, parserContext);
for (Element repository : repositories) {
- parseRepository(repository, id, parserContext);
+ parseRepository(repository, id, parserContext,
+ configurationDefinition.getPropertyValues().getPropertyValue("cache"));
}
definition.setDestroyMethodName("stop");
return definition;
@@ -136,7 +152,8 @@ private BeanDefinition createConfiguration(Element element, ParserContext parser
if (interceptorsElement != null) {
builder.addPropertyValue(entry.getValue(),
parserContext.getDelegate().parseListElement(interceptorsElement,
- builder.getBeanDefinition()));
+ builder.getBeanDefinition())
+ );
}
}
return builder.getBeanDefinition();
@@ -149,7 +166,8 @@ private void parseTransactionManager(Element element, BeanDefinitionBuilder buil
BeanDefinitionBuilder.genericBeanDefinition(TransactionManagerFactoryBean.class)
.addPropertyReference(PROPERTY_TRANSACTION_MANAGER,
txManagerId)
- .getBeanDefinition());
+ .getBeanDefinition()
+ );
}
}
@@ -178,11 +196,13 @@ private void parseWaitStrategy(Element element, BeanDefinitionBuilder builder) {
builder.addPropertyValue(PROPERTY_WAIT_STRATEGY,
BeanDefinitionBuilder.genericBeanDefinition(WaitStrategyFactoryBean.class)
.addConstructorArgValue(waitStrategy)
- .getBeanDefinition());
+ .getBeanDefinition()
+ );
}
}
- private void parseRepository(Element repository, String commandBusId, ParserContext parserContext) {
+ private void parseRepository(Element repository, String commandBusId, ParserContext parserContext,
+ PropertyValue aggregateCache) {
String id = repository.getAttribute(ATTRIBUTE_ID);
BeanDefinitionBuilder definitionBuilder =
genericBeanDefinition(RepositoryFactoryBean.class)
@@ -199,7 +219,32 @@ private void parseRepository(Element repository, String commandBusId, ParserCont
.addConstructorArgValue(aggregateType)
.getBeanDefinition());
}
+
+ Element processorsElement = DomUtils.getChildElementByTagName(repository, EVENT_PROCESSORS_ELEMENT);
+
+ Element snapshotTriggerElement = DomUtils.getChildElementByTagName(repository, SNAPSHOT_TRIGGER_ELEMENT);
+ if (snapshotTriggerElement != null) {
+ BeanDefinition triggerDefinition = snapshotterTriggerParser.parse(snapshotTriggerElement, parserContext);
+ if (aggregateCache != null) {
+ triggerDefinition.getPropertyValues().add("aggregateCache", aggregateCache.getValue());
+ } else {
+ // the DisruptorCommandBus uses an internal cache. Not defining any cache on the snapshotter trigger
+ // would lead to undesired side-effects (mainly missing triggers).
+ triggerDefinition.getPropertyValues().add("aggregateCache", BeanDefinitionBuilder
+ .genericBeanDefinition(WeakReferenceCache.class).getBeanDefinition());
+ }
+ definitionBuilder = definitionBuilder.addPropertyValue(SNAPSHOTTER_TRIGGER_PROPERTY, triggerDefinition);
+ }
+
final AbstractBeanDefinition definition = definitionBuilder.getBeanDefinition();
+ if (processorsElement != null) {
+ //noinspection unchecked
+ List<Object> processorsList = parserContext.getDelegate().parseListElement(processorsElement,
+ definition);
+ if (!processorsList.isEmpty()) {
+ definition.getPropertyValues().add(EVENT_STREAM_DECORATORS_PROPERTY, processorsList);
+ }
+ }
parserContext.getRegistry().registerBeanDefinition(id, definition);
}
@@ -249,7 +294,8 @@ public void afterPropertiesSet() throws Exception {
public void setType(String type) {
Assert.isTrue("single-threaded".equals(type) || "multi-threaded".equals(type),
"The given value for producer type (" + type
- + ") is not valid. It must either be 'single-threaded' or 'multi-threaded'.");
+ + ") is not valid. It must either be 'single-threaded' or 'multi-threaded'."
+ );
this.type = type;
}
}
@@ -296,11 +342,15 @@ public boolean isSingleton() {
public static class RepositoryFactoryBean implements FactoryBean<Repository> {
private DisruptorCommandBus commandBus;
+ private List<EventStreamDecorator> eventStreamDecorators = new ArrayList<EventStreamDecorator>();
private AggregateFactory<? extends EventSourcedAggregateRoot> factory;
@Override
public Repository getObject() throws Exception {
- return commandBus.createRepository(factory);
+ if (eventStreamDecorators.isEmpty()) {
+ return commandBus.createRepository(factory);
+ }
+ return commandBus.createRepository(factory, new CompositeEventStreamDecorator(eventStreamDecorators));
}
@Override
@@ -334,5 +384,25 @@ public void setCommandBus(DisruptorCommandBus commandBus) {
public void setAggregateFactory(AggregateFactory<? extends EventSourcedAggregateRoot> factory) {
this.factory = factory;
}
+
+ /**
+ * Sets the (additional) decorators to use when loading and storing events from/to the Event Store.
+ *
+ * @param decorators the decorators to decorate event streams with
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public void setEventStreamDecorators(List<EventStreamDecorator> decorators) {
+ eventStreamDecorators.addAll(decorators);
+ }
+
+ /**
+ * The snapshotter trigger instance that will decide when to trigger a snapshot
+ *
+ * @param snapshotterTrigger The trigger to configure on the DisruptorCommandBus
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public void setSnapshotterTrigger(SnapshotterTrigger snapshotterTrigger) {
+ eventStreamDecorators.add(snapshotterTrigger);
+ }
}
}
View
65 core/src/main/java/org/axonframework/eventsourcing/CompositeEventStreamDecorator.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2010-2014. Axon Framework
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.axonframework.eventsourcing;
+
+import org.axonframework.domain.DomainEventStream;
+
+import java.util.Collection;
+
+/**
+ * EventStreamDecorator implementation that delegates to several other decorator instances.
+ *
+ * @author Allard Buijze
+ * @since 2.2.1
+ */
+public class CompositeEventStreamDecorator implements EventStreamDecorator {
+
+ private final EventStreamDecorator[] eventStreamDecorators;
+
+ /**
+ * Initialize the decorator, delegating to the given <code>eventStreamDecorators</code>. The decorators are
+ * invoked in the iterator's order on {@link #decorateForRead(String, Object, org.axonframework.domain.DomainEventStream)},
+ * and in revese order on {@link #decorateForAppend(String, EventSourcedAggregateRoot,
+ * org.axonframework.domain.DomainEventStream)}.
+ *
+ * @param eventStreamDecorators The decorators to decorate Event Streams with
+ */
+ public CompositeEventStreamDecorator(Collection<EventStreamDecorator> eventStreamDecorators) {
+ this.eventStreamDecorators = eventStreamDecorators.toArray(new EventStreamDecorator[eventStreamDecorators
+ .size()]);
+ }
+
+ @Override
+ public DomainEventStream decorateForRead(String aggregateType, Object aggregateIdentifier,
+ DomainEventStream eventStream) {
+ DomainEventStream events = eventStream;
+ for (EventStreamDecorator decorator : eventStreamDecorators) {
+ events = decorator.decorateForRead(aggregateType, aggregateIdentifier, events);
+ }
+ return events;
+ }
+
+ @Override
+ public DomainEventStream decorateForAppend(String aggregateType, EventSourcedAggregateRoot aggregate,
+ DomainEventStream eventStream) {
+ DomainEventStream events = eventStream;
+ for (int i = eventStreamDecorators.length - 1; i >= 0; i--) {
+ events = eventStreamDecorators[i].decorateForAppend(aggregateType, aggregate, events);
+ }
+ return events;
+ }
+}
View
3  core/src/main/resources/META-INF/spring.schemas
@@ -17,4 +17,5 @@
http\://www.axonframework.org/schema/axon-core-2.0.xsd=org/axonframework/contextsupport/spring/axon-core-2.0.xsd
http\://www.axonframework.org/schema/axon-core-2.1.xsd=org/axonframework/contextsupport/spring/axon-core-2.1.xsd
http\://www.axonframework.org/schema/axon-core-2.2.xsd=org/axonframework/contextsupport/spring/axon-core-2.2.xsd
-http\://www.axonframework.org/schema/axon-core.xsd=org/axonframework/contextsupport/spring/axon-core-2.2.xsd
+http\://www.axonframework.org/schema/axon-core-2.3.xsd=org/axonframework/contextsupport/spring/axon-core-2.3.xsd
+http\://www.axonframework.org/schema/axon-core.xsd=org/axonframework/contextsupport/spring/axon-core-2.3.xsd
View
1,792 core/src/main/resources/org/axonframework/contextsupport/spring/axon-core-2.3.xsd
1,792 additions, 0 deletions not shown
View
98 core/src/test/java/org/axonframework/cache/WeakReferenceCacheTest.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) 2010-2014. Axon Framework
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.axonframework.cache;
+
+import org.junit.*;
+
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import static java.util.Collections.singleton;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class WeakReferenceCacheTest {
+
+ private WeakReferenceCache testSubject;
+ private Cache.EntryListener mockListener;
+
+ @Before
+ public void setUp() throws Exception {
+ mockListener = mock(Cache.EntryListener.class);
+ testSubject = new WeakReferenceCache();
+ testSubject.registerCacheEntryListener(mockListener);
+ }
+
+ @Test
+ public void testItemPurgedWhenNoLongerReferenced() throws Exception {
+ // Mockito holds a reference to all parameters, preventing GC
+ testSubject.unregisterCacheEntryListener(mockListener);
+ final Set<String> expiredEntries = new CopyOnWriteArraySet<String>();
+ testSubject.registerCacheEntryListener(new Cache.EntryListenerAdapter() {
+ @Override
+ public void onEntryExpired(Object key) {
+ expiredEntries.add(key.toString());
+ }
+ });
+
+ Object value = new Object();
+ testSubject.put("test1", value);
+ assertSame(value, testSubject.get("test1"));
+
+ // dereference
+ value = null;
+ // hope for a true GC
+ System.gc();
+
+ for (int i = 0; i < 5 && testSubject.containsKey("test1"); i++) {
+ // try again
+ System.gc();
+ Thread.sleep(100);
+ }
+
+ assertNull(testSubject.get("test1"));
+
+ // the reference is gone, but it may take a 'while' for the reference to be queued
+ for (int i = 0; i < 5 && !expiredEntries.contains("test1"); i++) {
+ testSubject.get("test1");
+ Thread.sleep(100);
+ }
+
+ assertEquals(singleton("test1"), expiredEntries);
+ }
+
+ @Test
+ public void testEntryListenerNotifiedOfCreationUpdateAndDeletion() throws Exception {
+
+ Object value = new Object();
+ Object value2 = new Object();
+ testSubject.put("test1", value);
+ verify(mockListener).onEntryCreated("test1", value);
+
+ testSubject.put("test1", value2);
+ verify(mockListener).onEntryUpdated("test1", value2);
+
+ testSubject.get("test1");
+ verify(mockListener).onEntryRead("test1", value2);
+
+ testSubject.remove("test1");
+ verify(mockListener).onEntryRemoved("test1");
+
+ assertNull(testSubject.get("test1"));
+ verifyNoMoreInteractions(mockListener);
+ }
+}
View
18 core/src/test/java/org/axonframework/commandhandling/disruptor/CommandHandlerInvokerTest.java
@@ -7,6 +7,8 @@
import org.axonframework.domain.DomainEventStream;
import org.axonframework.domain.GenericDomainEventMessage;
import org.axonframework.domain.SimpleDomainEventStream;
+import org.axonframework.eventsourcing.EventSourcedAggregateRoot;
+import org.axonframework.eventsourcing.EventStreamDecorator;
import org.axonframework.eventsourcing.GenericAggregateFactory;
import org.axonframework.eventsourcing.annotation.AbstractAnnotatedAggregateRoot;
import org.axonframework.eventsourcing.annotation.AggregateIdentifier;
@@ -15,6 +17,7 @@
import org.axonframework.repository.Repository;
import org.axonframework.unitofwork.UnitOfWork;
import org.junit.*;
+import org.mockito.internal.stubbing.answers.*;
import org.mockito.invocation.*;
import org.mockito.stubbing.*;
@@ -35,6 +38,7 @@
private String aggregateIdentifier;
private CommandMessage<?> mockCommandMessage;
private CommandHandler mockCommandHandler;
+ private EventStreamDecorator eventStreamDecorator;
@Before
public void setUp() throws Exception {
@@ -48,12 +52,16 @@ public void setUp() throws Exception {
commandHandlingEntry.reset(mockCommandMessage, mockCommandHandler, 0, 0, 0, null,
Collections.<CommandHandlerInterceptor>emptyList(),
Collections.<CommandHandlerInterceptor>emptyList());
+ eventStreamDecorator = mock(EventStreamDecorator.class);
+ when(eventStreamDecorator.decorateForAppend(anyString(), any(EventSourcedAggregateRoot.class), any(DomainEventStream.class))).thenAnswer(new ReturnsArgumentAt(2));
+ when(eventStreamDecorator.decorateForRead(anyString(), any(), any(DomainEventStream.class))).thenAnswer(
+ new ReturnsArgumentAt(2));
}
@Test
public void testLoadFromRepositoryStoresLoadedAggregateInCache() throws Throwable {
final Repository<StubAggregate> repository = testSubject.createRepository(
- new GenericAggregateFactory<StubAggregate>(StubAggregate.class));
+ new GenericAggregateFactory<StubAggregate>(StubAggregate.class), eventStreamDecorator);
when(mockCommandHandler.handle(eq(mockCommandMessage), isA(UnitOfWork.class))).thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
@@ -73,7 +81,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
@Test
public void testLoadFromRepositoryLoadsFromCache() throws Throwable {
final Repository<StubAggregate> repository = testSubject.createRepository(
- new GenericAggregateFactory<StubAggregate>(StubAggregate.class));
+ new GenericAggregateFactory<StubAggregate>(StubAggregate.class), eventStreamDecorator);
when(mockCommandHandler.handle(eq(mockCommandMessage), isA(UnitOfWork.class))).thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
@@ -90,7 +98,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
@Test
public void testAddToRepositoryAddsInCache() throws Throwable {
final Repository<StubAggregate> repository = testSubject.createRepository(
- new GenericAggregateFactory<StubAggregate>(StubAggregate.class));
+ new GenericAggregateFactory<StubAggregate>(StubAggregate.class), eventStreamDecorator);
when(mockCommandHandler.handle(eq(mockCommandMessage), isA(UnitOfWork.class))).thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
@@ -121,9 +129,9 @@ public void testCacheEntryInvalidatedOnRecoveryEntry() throws Exception {
@Test
public void testCreateRepositoryReturnsSameInstanceOnSecondInvocation() {
final Repository<StubAggregate> repository1 = testSubject.createRepository(
- new GenericAggregateFactory<StubAggregate>(StubAggregate.class));
+ new GenericAggregateFactory<StubAggregate>(StubAggregate.class), eventStreamDecorator);
final Repository<StubAggregate> repository2= testSubject.createRepository(
- new GenericAggregateFactory<StubAggregate>(StubAggregate.class));
+ new GenericAggregateFactory<StubAggregate>(StubAggregate.class), eventStreamDecorator);
assertSame(repository1, repository2);
}
View
64 core/src/test/java/org/axonframework/commandhandling/disruptor/DisruptorCommandBusTest.java
@@ -18,7 +18,6 @@
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
-
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandDispatchInterceptor;
import org.axonframework.commandhandling.CommandHandler;
@@ -38,7 +37,9 @@
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.EventListener;
import org.axonframework.eventsourcing.AbstractEventSourcedAggregateRoot;
+import org.axonframework.eventsourcing.EventSourcedAggregateRoot;
import org.axonframework.eventsourcing.EventSourcedEntity;
+import org.axonframework.eventsourcing.EventStreamDecorator;
import org.axonframework.eventsourcing.GenericAggregateFactory;
import org.axonframework.eventstore.EventStore;
import org.axonframework.eventstore.EventStreamNotFoundException;
@@ -53,6 +54,7 @@
import org.junit.*;
import org.junit.internal.matchers.*;
import org.mockito.*;
+import org.mockito.internal.stubbing.answers.*;
import org.mockito.invocation.*;
import org.mockito.stubbing.*;
@@ -119,13 +121,14 @@ public void testCallbackInvokedBeforeUnitOfWorkCleanup() throws Throwable {
.setWaitStrategy(new SleepingWaitStrategy())
.setExecutor(customExecutor)
.setInvokerThreadCount(2)
- .setPublisherThreadCount(3));
+ .setPublisherThreadCount(3)
+ );
testSubject.subscribe(StubCommand.class.getName(), stubHandler);
stubHandler.setRepository(testSubject
.createRepository(new GenericAggregateFactory<StubAggregate>(StubAggregate.class)));
final UnitOfWorkListener mockUnitOfWorkListener = mock(UnitOfWorkListener.class);
when(mockUnitOfWorkListener.onEventRegistered(isA(UnitOfWork.class), any(EventMessage.class)))
- .thenAnswer(new Parameter(1));
+ .thenAnswer(new ReturnsArgumentAt(1));
when(mockHandlerInterceptor.handle(any(CommandMessage.class),
any(UnitOfWork.class),
any(InterceptorChain.class)))
@@ -161,6 +164,49 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
}
@Test
+ public void testEventStreamsDecoratedOnReadAndWrite() throws InterruptedException {
+ ExecutorService customExecutor = Executors.newCachedThreadPool();
+ testSubject = new DisruptorCommandBus(
+ inMemoryEventStore, eventBus,
+ new DisruptorConfiguration().setBufferSize(8)
+ .setProducerType(ProducerType.SINGLE)
+ .setWaitStrategy(new SleepingWaitStrategy())
+ .setExecutor(customExecutor)
+ .setInvokerThreadCount(2)
+ .setPublisherThreadCount(3)
+ );
+ testSubject.subscribe(StubCommand.class.getName(), stubHandler);
+ final EventStreamDecorator mockDecorator = mock(EventStreamDecorator.class);
+ when(mockDecorator.decorateForAppend(anyString(),
+ any(EventSourcedAggregateRoot.class),
+ any(DomainEventStream.class)))
+ .thenAnswer(new ReturnsArgumentAt(2));
+ when(mockDecorator.decorateForRead(anyString(), any(), any(DomainEventStream.class)))
+ .thenAnswer(new ReturnsArgumentAt(2));
+
+ stubHandler.setRepository(testSubject
+ .createRepository(new GenericAggregateFactory<StubAggregate>(StubAggregate.class),
+ mockDecorator));
+
+ CommandMessage<StubCommand> command = new GenericCommandMessage<StubCommand>(
+ new StubCommand(aggregateIdentifier));
+ CommandCallback mockCallback = mock(CommandCallback.class);
+ testSubject.dispatch(command, mockCallback);
+ testSubject.dispatch(command);
+
+ testSubject.stop();
+ assertFalse(customExecutor.awaitTermination(250, TimeUnit.MILLISECONDS));
+ customExecutor.shutdown();
+ assertTrue(customExecutor.awaitTermination(5, TimeUnit.SECONDS));
+
+ // invoked only once, because the second time, the aggregate comes from the 1st level cache
+ verify(mockDecorator).decorateForRead(eq("StubAggregate"), eq(aggregateIdentifier),
+ isA(DomainEventStream.class));
+ verify(mockDecorator, times(2)).decorateForAppend(eq("StubAggregate"), isA(EventSourcedAggregateRoot.class),
+ isA(DomainEventStream.class));
+ }
+
+ @Test
public void testEventPublicationExecutedWithinTransaction() throws Throwable {
CommandHandlerInterceptor mockInterceptor = mock(CommandHandlerInterceptor.class);
ExecutorService customExecutor = Executors.newCachedThreadPool();
@@ -187,7 +233,8 @@ public void testAggregatesBlacklistedAndRecoveredOnError_WithAutoReschedule() th
CommandCallback mockCallback = dispatchCommands(mockInterceptor,
customExecutor,
new GenericCommandMessage<ErrorCommand>(
- new ErrorCommand(aggregateIdentifier)));
+ new ErrorCommand(aggregateIdentifier))
+ );
assertFalse(customExecutor.awaitTermination(250, TimeUnit.MILLISECONDS));
customExecutor.shutdown();
assertTrue(customExecutor.awaitTermination(5, TimeUnit.SECONDS));
@@ -204,7 +251,8 @@ public void testAggregatesBlacklistedAndRecoveredOnError_WithoutReschedule() thr
CommandCallback mockCallback = dispatchCommands(mockInterceptor,
customExecutor,
new GenericCommandMessage<ErrorCommand>(
- new ErrorCommand(aggregateIdentifier)));
+ new ErrorCommand(aggregateIdentifier))
+ );
assertFalse(customExecutor.awaitTermination(250, TimeUnit.MILLISECONDS));
customExecutor.shutdown();
@@ -305,7 +353,8 @@ private CommandCallback dispatchCommands(CommandHandlerInterceptor mockIntercept
.setRollbackConfiguration(new RollbackOnAllExceptionsConfiguration())
.setInvokerThreadCount(2)
.setPublisherThreadCount(3)
- .setTransactionManager(mockTransactionManager));
+ .setTransactionManager(mockTransactionManager)
+ );
testSubject.subscribe(StubCommand.class.getName(), stubHandler);
testSubject.subscribe(CreateCommand.class.getName(), stubHandler);
testSubject.subscribe(ErrorCommand.class.getName(), stubHandler);
@@ -349,7 +398,8 @@ public void testCreateAggregate() {
.setProducerType(ProducerType.SINGLE)
.setWaitStrategy(new SleepingWaitStrategy())
.setInvokerThreadCount(2)
- .setPublisherThreadCount(3));
+ .setPublisherThreadCount(3)
+ );
testSubject.subscribe(StubCommand.class.getName(), stubHandler);
testSubject.subscribe(CreateCommand.class.getName(), stubHandler);
testSubject.subscribe(ErrorCommand.class.getName(), stubHandler);
View
24 core/src/test/java/org/axonframework/contextsupport/spring/DisruptorCommandBusBeanDefinitionParserTest.java
@@ -25,8 +25,10 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanDefinition;
+import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.config.BeanReference;
import org.springframework.beans.factory.config.ConstructorArgumentValues;
+import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.context.annotation.Bean;
@@ -88,6 +90,28 @@ public void testBeanProperties() {
assertEquals("org.dom4j.Document", config.getPropertyValues().getPropertyValue("serializedRepresentation").getValue());
}
+ @Test
+ public void testRepositoryContainsDecorator() {
+ GenericBeanDefinition beanDefinition = (GenericBeanDefinition)
+ beanFactory.getBeanDefinition("myAggregateRepository");
+ PropertyValue triggerValue = beanDefinition.getPropertyValues().getPropertyValue("snapshotterTrigger");
+ assertNotNull(triggerValue);
+ final GenericBeanDefinition snapshotterDefinition = (GenericBeanDefinition) triggerValue.getValue();
+ assertEquals("org.axonframework.eventsourcing.EventCountSnapshotterTrigger",
+ snapshotterDefinition.getBeanClassName());
+ assertEquals("10", snapshotterDefinition.getPropertyValues().getPropertyValue("trigger").getValue());
+ assertEquals(new RuntimeBeanReference("snapshotter"),
+ snapshotterDefinition.getPropertyValues().getPropertyValue("snapshotter").getValue());
+
+ PropertyValue decoratorsValue = beanDefinition.getPropertyValues().getPropertyValue("eventStreamDecorators");
+ assertNotNull(decoratorsValue);
+ final List<BeanDefinitionHolder> decorators = (List) decoratorsValue.getValue();
+ assertEquals(1, decorators.size());
+ assertEquals("org.axonframework.testutils.MockitoMockFactoryBean",
+ decorators.get(0).getBeanDefinition().getBeanClassName());
+
+ }
+
/**
* Spring configuration to test for <a href="http://issues.axonframework.org/youtrack/issue/AXON-159">AXON-159</a>
*/
View
85 core/src/test/java/org/axonframework/contextsupport/spring/DisruptorContextConfigurationTest.java
@@ -20,15 +20,28 @@
import org.axonframework.commandhandling.GenericCommandMessage;
import org.axonframework.commandhandling.annotation.CommandHandler;
import org.axonframework.commandhandling.annotation.TargetAggregateIdentifier;
+import org.axonframework.commandhandling.callbacks.FutureCallback;
+import org.axonframework.domain.DomainEventMessage;
+import org.axonframework.domain.DomainEventStream;
+import org.axonframework.domain.SimpleDomainEventStream;
+import org.axonframework.eventsourcing.Snapshotter;
import org.axonframework.eventsourcing.annotation.AbstractAnnotatedAggregateRoot;
import org.axonframework.eventsourcing.annotation.AggregateIdentifier;
import org.axonframework.eventsourcing.annotation.EventSourcingHandler;
+import org.axonframework.eventstore.EventStore;
+import org.axonframework.eventstore.EventStreamNotFoundException;
import org.junit.*;
import org.junit.runner.*;
+import org.mockito.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
/**
* @author Allard Buijze
*/
@@ -39,10 +52,38 @@
@Autowired
private CommandBus commandBus;
+ @Autowired
+ private Snapshotter mockSnaphotter;
+
+ @Before
+ public void setUp() throws Exception {
+ Mockito.reset(mockSnaphotter);
+ }
+
// Tests a scenario where the order in which command bus and event bus are declared could cause a circular dependency error in Spring
@Test
public void testCommandBus() throws Exception {
- commandBus.dispatch(GenericCommandMessage.asCommandMessage(new StubCommand("test")));
+ commandBus.dispatch(GenericCommandMessage.asCommandMessage(new StubCreateCommand("test")));
+ }
+
+ @Test
+ public void testSnapshotTriggeredAfterFiringCommands() {
+ commandBus.dispatch(GenericCommandMessage.asCommandMessage(new StubCreateCommand("snapshottest")));
+ commandBus.dispatch(GenericCommandMessage.asCommandMessage(new StubCommand("snapshottest")));
+ commandBus.dispatch(GenericCommandMessage.asCommandMessage(new StubCommand("snapshottest")));
+ commandBus.dispatch(GenericCommandMessage.asCommandMessage(new StubCommand("snapshottest")));
+ commandBus.dispatch(GenericCommandMessage.asCommandMessage(new StubCommand("snapshottest")));
+ commandBus.dispatch(GenericCommandMessage.asCommandMessage(new StubCommand("snapshottest")));
+ commandBus.dispatch(GenericCommandMessage.asCommandMessage(new StubCommand("snapshottest")));
+ commandBus.dispatch(GenericCommandMessage.asCommandMessage(new StubCommand("snapshottest")));
+ commandBus.dispatch(GenericCommandMessage.asCommandMessage(new StubCommand("snapshottest")));
+ commandBus.dispatch(GenericCommandMessage.asCommandMessage(new StubCommand("snapshottest")));
+
+ FutureCallback<Object> callback = new FutureCallback<Object>();
+ commandBus.dispatch(GenericCommandMessage.asCommandMessage(new StubCommand("snapshottest")), callback);
+ callback.awaitCompletion(1, TimeUnit.SECONDS);
+
+ Mockito.verify(mockSnaphotter).scheduleSnapshot("MyAggregate", "snapshottest");
}
public static class MyAggregate extends AbstractAnnotatedAggregateRoot {
@@ -54,6 +95,11 @@ public MyAggregate() {
}
@CommandHandler
+ public MyAggregate(StubCreateCommand command) {
+ apply(new SimpleEvent(command.id));
+ }
+
+ @CommandHandler
public MyAggregate(StubCommand command) {
apply(new SimpleEvent(command.id));
}
@@ -64,6 +110,16 @@ public void on(SimpleEvent event) {
}
}
+ public static class StubCreateCommand {
+
+ @TargetAggregateIdentifier
+ private final String id;
+
+ public StubCreateCommand(String id) {
+ this.id = id;
+ }
+ }
+
public static class StubCommand {
@TargetAggregateIdentifier
@@ -73,4 +129,31 @@ public StubCommand(String id) {
this.id = id;
}
}
+
+ public static class InMemoryEventStore implements EventStore {
+
+ private final Map<String, DomainEventMessage> storedEvents = new ConcurrentHashMap<String, DomainEventMessage>();
+
+ @Override
+ public void appendEvents(String type, DomainEventStream events) {
+ if (!events.hasNext()) {
+ return;
+ }
+ String key = events.peek().getAggregateIdentifier().toString();
+ DomainEventMessage<?> lastEvent = null;
+ while (events.hasNext()) {
+ lastEvent = events.next();
+ }
+ storedEvents.put(key, lastEvent);
+ }
+
+ @Override
+ public DomainEventStream readEvents(String type, Object identifier) {
+ DomainEventMessage message = storedEvents.get(identifier.toString());
+ if (message == null) {
+ throw new EventStreamNotFoundException(type, identifier);
+ }
+ return new SimpleDomainEventStream(Collections.singletonList(message));
+ }
+ }
}
View
71 core/src/test/java/org/axonframework/eventsourcing/CompositeEventStreamDecoratorTest.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2010-2014. Axon Framework
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.axonframework.eventsourcing;
+
+import org.axonframework.domain.DomainEventStream;
+import org.axonframework.domain.SimpleDomainEventStream;
+import org.junit.*;
+import org.mockito.*;
+import org.mockito.internal.stubbing.answers.*;
+
+import static java.util.Arrays.asList;
+import static org.mockito.Mockito.*;
+
+public class CompositeEventStreamDecoratorTest {
+
+ private CompositeEventStreamDecorator testSubject;
+ private EventStreamDecorator decorator1;
+ private EventStreamDecorator decorator2;
+
+ @Before
+ public void setUp() throws Exception {
+ decorator1 = mock(EventStreamDecorator.class);
+ decorator2 = mock(EventStreamDecorator.class);
+ when(decorator1.decorateForRead(anyString(), any(), any(DomainEventStream.class)))
+ .thenAnswer(new ReturnsArgumentAt(2));
+ when(decorator2.decorateForRead(anyString(), any(), any(DomainEventStream.class)))
+ .thenAnswer(new ReturnsArgumentAt(2));
+ when(decorator1.decorateForAppend(anyString(), any(EventSourcedAggregateRoot.class), any(DomainEventStream.class)))
+ .thenAnswer(new ReturnsArgumentAt(2));
+ when(decorator2.decorateForAppend(anyString(), any(EventSourcedAggregateRoot.class), any(DomainEventStream.class)))
+ .thenAnswer(new ReturnsArgumentAt(2));
+ testSubject = new CompositeEventStreamDecorator(asList(decorator1, decorator2));
+ }
+
+ @Test
+ public void testDecorateForRead() throws Exception {
+ final SimpleDomainEventStream eventStream = new SimpleDomainEventStream();
+ testSubject.decorateForRead("test", "id", eventStream);
+
+ InOrder inOrder = inOrder(decorator1, decorator2);
+ inOrder.verify(decorator1).decorateForRead("test", "id", eventStream);
+ inOrder.verify(decorator2).decorateForRead("test", "id", eventStream);
+ verifyNoMoreInteractions(decorator1, decorator2);
+ }
+
+ @Test
+ public void testDecorateForAppend() throws Exception {
+ final SimpleDomainEventStream eventStream = new SimpleDomainEventStream();
+ final EventSourcedAggregateRoot aggregate = mock(EventSourcedAggregateRoot.class);
+ testSubject.decorateForAppend("test", aggregate, eventStream);
+
+ InOrder inOrder = inOrder(decorator1, decorator2);
+ inOrder.verify(decorator2).decorateForAppend("test", aggregate, eventStream);
+ inOrder.verify(decorator1).decorateForAppend("test", aggregate, eventStream);
+ verifyNoMoreInteractions(decorator1, decorator2);
+ }
+}
View
13 core/src/test/resources/contexts/disruptor-context.xml
@@ -55,7 +55,14 @@
</bean>
</axon:publisher-interceptors>
<axon:repositories>
- <axon:repository id="myAggregateRepository" aggregate-factory="aggregateFactory"/>
+ <axon:repository id="myAggregateRepository" aggregate-factory="aggregateFactory">
+ <axon:snapshotter-trigger event-count-threshold="10" snapshotter-ref="snapshotter"/>
+ <axon:event-processors>
+ <bean class="org.axonframework.testutils.MockitoMockFactoryBean">
+ <property name="mockType" value="org.axonframework.eventsourcing.EventStreamDecorator"/>
+ </bean>
+ </axon:event-processors>
+ </axon:repository>
<axon:repository id="myOtherAggregateRepository"
aggregate-type="org.axonframework.contextsupport.spring.SimpleAnnotatedAggregate"/>
</axon:repositories>
@@ -65,6 +72,10 @@
<property name="mockType" value="org.axonframework.commandhandling.RollbackConfiguration"/>
</bean>
+ <bean id="snapshotter" class="org.axonframework.testutils.MockitoMockFactoryBean">
+ <property name="mockType" value="org.axonframework.eventsourcing.Snapshotter"/>
+ </bean>
+
<bean id="ctr" class="org.axonframework.testutils.MockitoMockFactoryBean">
<property name="mockType" value="org.axonframework.commandhandling.CommandTargetResolver"/>
</bean>
View
12 core/src/test/resources/contexts/simple-disruptor-context.xml
@@ -34,13 +34,17 @@
event-bus="eventBus"
command-target-resolver="commandTargetResolver">
<axon:repositories>
- <axon:repository id="exampleRepository" aggregate-type="org.axonframework.contextsupport.spring.DisruptorContextConfigurationTest.MyAggregate"/>
+ <axon:repository id="exampleRepository" aggregate-type="org.axonframework.contextsupport.spring.DisruptorContextConfigurationTest.MyAggregate">
+ <axon:snapshotter-trigger event-count-threshold="10" snapshotter-ref="mockSnapshotter"/>
+ </axon:repository>
</axon:repositories>
</axon:disruptor-command-bus>
- <axon:filesystem-event-store
- id="eventStore"
- base-dir="${java.io.tmpdir}"/>
+ <bean id="mockSnapshotter" class="org.axonframework.testutils.MockitoMockFactoryBean">
+ <property name="mockType" value="org.axonframework.eventsourcing.Snapshotter"/>
+ </bean>
+
+ <bean id="eventStore" class="org.axonframework.contextsupport.spring.DisruptorContextConfigurationTest$InMemoryEventStore"/>
<bean id="commandTargetResolver" class="org.axonframework.commandhandling.annotation.AnnotationCommandTargetResolver"/>
Please sign in to comment.
Something went wrong with that request. Please try again.