Skip to content

Commit

Permalink
Initial commit for the new Event Store API
Browse files Browse the repository at this point in the history
Event store now supports streaming of events. Event store has an embedded storage to fetch and store events. Test code in core needs fixing.
  • Loading branch information
renedewaele committed Apr 11, 2016
1 parent 28ce88e commit fbec2f3
Show file tree
Hide file tree
Showing 237 changed files with 6,259 additions and 11,212 deletions.
@@ -1,12 +1,9 @@
/*
* Copyright (c) 2010-2014. Axon Framework
*
* Copyright (c) 2010-2016. Axon Framework
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -16,116 +13,48 @@

package org.axonframework.commandhandling;

import org.axonframework.domain.IdentifierFactory;
import org.axonframework.messaging.GenericMessage;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageDecorator;
import org.axonframework.messaging.metadata.MetaData;

import java.util.Map;

/**
* Implementation of the CommandMessage that takes all properties as constructor parameters.
*
* @param <T> The type of payload contained in this Message
* @author Allard Buijze
* @since 2.0
* @author Rene de Waele
*/
public class GenericCommandMessage<T> extends GenericMessage<T> implements CommandMessage<T> {

private static final long serialVersionUID = 8754588074137370013L;

public class GenericCommandMessage<T> extends MessageDecorator<T> implements CommandMessage<T> {
private final String commandName;

/**
* Returns the given command as a CommandMessage. If <code>command</code> already implements CommandMessage, it is
* returned as-is. Otherwise, the given <code>command</code> is wrapped into a GenericCommandMessage as its
* payload.
*
* @param command the command to wrap as CommandMessage
* @return a CommandMessage containing given <code>command</code> as payload, or <code>command</code> if it already
* implements CommandMessage.
*/
@SuppressWarnings("unchecked")
public static <C> CommandMessage<C> asCommandMessage(Object command) {
if (CommandMessage.class.isInstance(command)) {
return (CommandMessage<C>) command;
}
return new GenericCommandMessage<>((C) command);
}

/**
* Create a CommandMessage with the given <code>command</code> as payload and empty metaData
*
* @param payload the payload for the Message
*/
public GenericCommandMessage(T payload) {
this(payload, MetaData.emptyInstance());
return new GenericCommandMessage<>((C) command, MetaData.emptyInstance());
}

/**
* Create a CommandMessage with the given <code>command</code> as payload.
*
* @param payload the payload for the Message
* @param newMetaData The meta data for this message
*/
public GenericCommandMessage(T payload, Map<String, ?> newMetaData) {
this(payload.getClass().getName(), payload, newMetaData);
public GenericCommandMessage(T payload, Map<String, ?> metaData) {
this(new GenericMessage<>(payload, metaData), payload.getClass().getName());
}

/**
* Create a CommandMessage with the given <code>command</code> as payload.
*
* @param commandName The name of the command
* @param payload the payload for the Message
* @param newMetaData The meta data for this message
*/
public GenericCommandMessage(String commandName, T payload, Map<String, ?> newMetaData) {
this(IdentifierFactory.getInstance().generateIdentifier(), commandName, payload, newMetaData);
}

/**
* Create a CommandMessage with the given <code>command</code> as payload and a custom chosen
* <code>identifier</code>. Use this constructor to reconstruct instances of existing command messages, which have
* already been assigned an identifier.
*
* @param identifier the unique identifier of this message
* @param commandName The name of the command
* @param payload the payload for the Message
* @param newMetaData The meta data for this message (<code>null</code> results in empty meta data)
*/
public GenericCommandMessage(String identifier, String commandName, T payload, Map<String, ?> newMetaData) {
super(identifier, payload, newMetaData);
public GenericCommandMessage(Message<T> delegate, String commandName) {
super(delegate);
this.commandName = commandName;
}

private GenericCommandMessage(GenericCommandMessage<T> original, Map<String, ?> metaData) {
this(original.getIdentifier(), original.getCommandName(), original.getPayload(), metaData);
}

@Override
public String getCommandName() {
return commandName;
}

@Override
@SuppressWarnings("EqualsBetweenInconvertibleTypes")
public GenericCommandMessage<T> withMetaData(Map<String, ?> newMetaData) {
if (getMetaData().equals(newMetaData)) {
return this;
}
return new GenericCommandMessage<>(this, newMetaData);
public GenericCommandMessage<T> withMetaData(Map<String, ?> metaData) {
return new GenericCommandMessage<>(getDelegate().withMetaData(metaData), commandName);
}

@Override
public GenericCommandMessage<T> andMetaData(Map<String, ?> additionalMetaData) {
if (additionalMetaData.isEmpty()) {
return this;
}
return new GenericCommandMessage<>(this, getMetaData().mergedWith(additionalMetaData));
}


@Override
public String toString() {
return String.format("GenericCommandMessage[%s]", getPayload().toString());
public GenericCommandMessage<T> andMetaData(Map<String, ?> metaData) {
return new GenericCommandMessage<>(getDelegate().andMetaData(metaData), commandName);
}
}
Expand Up @@ -113,8 +113,7 @@ private <C> Object doDispatch(CommandMessage<C> command, MessageHandler<? super
logger.debug("Dispatching command [{}]", command.getCommandName());
}
UnitOfWork<CommandMessage<?>> unitOfWork = unitOfWorkFactory.createUnitOfWork(command);
InterceptorChain<CommandMessage<?>> chain = new DefaultInterceptorChain<>(
unitOfWork, handlerInterceptors, handler);
InterceptorChain chain = new DefaultInterceptorChain<>(unitOfWork, handlerInterceptors, handler);
return unitOfWork.executeWithResult(chain::proceed, rollbackConfiguration);
}

Expand Down
Expand Up @@ -27,13 +27,11 @@
import org.axonframework.commandhandling.model.inspection.EventSourcedAggregate;
import org.axonframework.commandhandling.model.inspection.ModelInspector;
import org.axonframework.common.Assert;
import org.axonframework.common.io.IOUtils;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.common.PeekingIterator;
import org.axonframework.eventsourcing.AggregateFactory;
import org.axonframework.eventsourcing.DomainEventStream;
import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.eventsourcing.EventStreamDecorator;
import org.axonframework.eventstore.EventStore;
import org.axonframework.eventstore.EventStreamNotFoundException;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -42,6 +40,7 @@
import java.util.WeakHashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

/**
* Component of the DisruptorCommandBus that invokes the command handler. The execution is done within a Unit Of Work.
Expand All @@ -60,7 +59,6 @@ public class CommandHandlerInvoker implements EventHandler<CommandHandlingEntry>
private final Cache cache;
private final int segmentId;
private final EventStore eventStore;
private final EventBus eventBus;

/**
* Create an aggregate invoker instance that uses the given <code>eventStore</code> and <code>cache</code> to
Expand All @@ -70,11 +68,10 @@ public class CommandHandlerInvoker implements EventHandler<CommandHandlingEntry>
* @param cache The cache temporarily storing aggregate instances
* @param segmentId The id of the segment this invoker should handle
*/
public CommandHandlerInvoker(EventStore eventStore, EventBus eventBus, Cache cache, int segmentId) {
public CommandHandlerInvoker(EventStore eventStore, Cache cache, int segmentId) {
this.eventStore = eventStore;
this.cache = cache;
this.segmentId = segmentId;
this.eventBus = eventBus;
}

/**
Expand Down Expand Up @@ -126,7 +123,7 @@ public <T> Repository<T> createRepository(AggregateFactory<T> aggregateFactory,
EventStreamDecorator decorator) {
return repositories.computeIfAbsent(aggregateFactory.getAggregateType(),
k -> new DisruptorRepository<>(aggregateFactory, cache,
eventStore, eventBus, decorator));
eventStore, decorator));
}

private void removeEntry(String aggregateIdentifier) {
Expand Down Expand Up @@ -154,19 +151,17 @@ public void onShutdown() {
static final class DisruptorRepository<T> implements Repository<T> {

private final EventStore eventStore;
private final EventBus eventBus;
private final EventStreamDecorator decorator;
private final AggregateFactory<T> aggregateFactory;
private final Map<EventSourcedAggregate<T>, Object> firstLevelCache = new WeakHashMap<>();
private final Cache cache;
private final AggregateModel<T> model;

private DisruptorRepository(AggregateFactory<T> aggregateFactory, Cache cache, EventStore eventStore,
EventBus eventBus, EventStreamDecorator decorator) {
EventStreamDecorator decorator) {
this.aggregateFactory = aggregateFactory;
this.cache = cache;
this.eventStore = eventStore;
this.eventBus = eventBus;
this.decorator = decorator;
this.model = ModelInspector.inspectAggregate(aggregateFactory.getAggregateType());
}
Expand Down Expand Up @@ -210,26 +205,20 @@ public Aggregate<T> load(String aggregateIdentifier) {
if (aggregateRoot == null) {
logger.debug("Aggregate {} not in first level cache, loading fresh one from Event Store",
aggregateIdentifier);
DomainEventStream events = null;
Stream<? extends DomainEventMessage<?>> events = eventStore.readEvents(aggregateIdentifier);
try {
events = decorator.decorateForRead(aggregateIdentifier,
eventStore.readEvents(aggregateIdentifier));
if (events.hasNext()) {
aggregateRoot = EventSourcedAggregate.initialize(aggregateFactory.createAggregate(
aggregateIdentifier, events.peek()), model, eventBus, eventStore);
aggregateRoot.initializeState(events);
events = decorator.decorateForRead(aggregateIdentifier, events);
PeekingIterator<? extends DomainEventMessage<?>> iterator = PeekingIterator.of(events.iterator());
if (!iterator.hasNext()) {
throw new AggregateNotFoundException(aggregateIdentifier,
"The aggregate was not found in the event store");
}
} catch (EventStreamNotFoundException e) {
throw new AggregateNotFoundException(
aggregateIdentifier,
"Aggregate not found. Possibly involves an aggregate being created, "
+ "or a command that was executed against an aggregate that did not yet "
+ "finish the creation process. It will be rescheduled for publication when it "
+ "attempts to load an aggregate",
e
);
aggregateRoot = EventSourcedAggregate
.initialize(aggregateFactory.createAggregate(aggregateIdentifier, iterator.peek()),
model, eventStore);
aggregateRoot.initializeState(iterator);
} finally {
IOUtils.closeQuietlyIfCloseable(events);
events.close();
}
firstLevelCache.put(aggregateRoot, PLACEHOLDER_VALUE);
cache.put(aggregateIdentifier, aggregateRoot);
Expand All @@ -240,7 +229,7 @@ public Aggregate<T> load(String aggregateIdentifier) {
@Override
public Aggregate<T> newInstance(Callable<T> factoryMethod) throws Exception {
EventSourcedAggregate<T> aggregate = EventSourcedAggregate.initialize(factoryMethod, model,
eventBus, eventStore);
eventStore);
firstLevelCache.put(aggregate, PLACEHOLDER_VALUE);
cache.put(aggregate.identifier(), aggregate);
return aggregate;
Expand Down
Expand Up @@ -37,8 +37,8 @@
public class CommandHandlingEntry extends DisruptorUnitOfWork<CommandMessage<?>> {

private final MessageHandler<CommandMessage<?>> repeatingCommandHandler;
private InterceptorChain<CommandMessage<?>> invocationInterceptorChain;
private InterceptorChain<CommandMessage<?>> publisherInterceptorChain;
private InterceptorChain invocationInterceptorChain;
private InterceptorChain publisherInterceptorChain;
private Exception exceptionResult;
private Object result;
private int publisherSegmentId;
Expand All @@ -63,7 +63,7 @@ public CommandHandlingEntry() {
*
* @return the InterceptorChain for the invocation process registered with this entry
*/
public InterceptorChain<CommandMessage<?>> getInvocationInterceptorChain() {
public InterceptorChain getInvocationInterceptorChain() {
return invocationInterceptorChain;
}

Expand Down

0 comments on commit fbec2f3

Please sign in to comment.