Skip to content

Commit

Permalink
Documented protected classes and methods
Browse files Browse the repository at this point in the history
  • Loading branch information
renedewaele committed Nov 21, 2016
1 parent 285c72a commit b051cbd
Show file tree
Hide file tree
Showing 45 changed files with 755 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ public A newInstance(Callable<T> factoryMethod) throws Exception {
return aggregate;
}

/**
* Creates a new aggregate instance using the given {@code factoryMethod}. Implementations should assume that this
* method is only called if a UnitOfWork is currently active.
*
* @param factoryMethod The method to create the aggregate's root instance
* @return an Aggregate instance describing the aggregate's state
* @throws Exception when the factoryMethod throws an exception
*/
protected abstract A doCreateNew(Callable<T> factoryMethod) throws Exception;

/**
Expand Down Expand Up @@ -154,6 +162,11 @@ protected void prepareForCommit(A aggregate) {
});
}

/**
* Returns the aggregate model stored by this repository.
*
* @return the aggregate model stored by this repository
*/
protected AggregateModel<T> aggregateModel() {
return aggregateModel;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static ApplyMore apply(Object payload, MetaData metaData) {
* aggregate and additional events need to be applied that depends on state changes brought about by the first event
* use the returned {@link ApplyMore} instance.
*
* @param payload the payload of the event to apply
* @param payload the payload of the event to apply
* @return a gizmo to apply additional events after the given event has been processed by the entire aggregate
* @see ApplyMore
*/
Expand All @@ -72,7 +72,8 @@ public static ApplyMore apply(Object payload) {
* are currently happening, as opposed to events representing historic decisions used to reconstruct the
* Aggregate's state.
*
* @return {@code true} if the aggregate is 'live', {@code false} if the aggregate is initializing state based on historic events
* @return {@code true} if the aggregate is 'live', {@code false} if the aggregate is initializing state based on
* historic events
*/
public static boolean isLive() {
return AggregateLifecycle.getInstance().getIsLive();
Expand All @@ -82,7 +83,8 @@ public static boolean isLive() {
* Indicates whether this Aggregate instance is 'live'. This means events currently applied represent events that
* are currently happening, as opposed to events representing historic decisions.
*
* @return {@code true} if the aggregate is 'live', {@code false} if the aggregate is initializing state based on historic events
* @return {@code true} if the aggregate is 'live', {@code false} if the aggregate is initializing state based on
* historic events
*/
protected abstract boolean getIsLive();

Expand All @@ -97,6 +99,12 @@ public static void markDeleted() {
getInstance().doMarkDeleted();
}

/**
* Returns the {@link AggregateLifecycle} for the current aggregate. If none was defined this method will throw
* an exception.
*
* @return the {@link AggregateLifecycle} for the current aggregate
*/
protected static AggregateLifecycle getInstance() {
AggregateLifecycle instance = CURRENT.get();
if (instance == null && CurrentUnitOfWork.isStarted()) {
Expand All @@ -112,14 +120,46 @@ protected static AggregateLifecycle getInstance() {
return instance;
}

/**
* Marks this aggregate as deleted. Implementations may react differently to aggregates marked for deletion.
* Typically, Event Sourced Repositories will ignore the marking and expect deletion to be provided as part of Event
* information.
*/
protected abstract void doMarkDeleted();

/**
* Registers this aggregate with the current unit of work if one is started.
*/
protected void registerWithUnitOfWork() {
CurrentUnitOfWork.ifStarted(u -> u.getOrComputeResource("ManagedAggregates", k -> new HashSet<>()).add(this));
}

/**
* Apply a {@link DomainEventMessage} with given payload and metadata (metadata from interceptors will be combined
* with the provided metadata). The event should be applied to the aggregate immediately and scheduled for
* publication to other event handlers.
* <p/>
* The event should be applied on all entities part of this aggregate. If the event is applied from an event handler
* of the aggregate and additional events need to be applied that depends on state changes brought about by the
* first event the returned {@link ApplyMore} instance should allow for additional events to be applied after this
* event.
*
* @param payload the payload of the event to apply
* @param metaData any meta-data that must be registered with the Event
* @return a gizmo to apply additional events after the given event has been processed by the entire aggregate
* @see ApplyMore
*/
protected abstract <T> ApplyMore doApply(T payload, MetaData metaData);

/**
* Executes the given task and returns the result of the task. While the task is being executed the current
* aggregate will be registered with the current thread as the 'current' aggregate.
*
* @param task the task to execute on the aggregate
* @param <V> the result of the task
* @return the task's result
* @throws Exception if executing the task causes an exception
*/
protected <V> V executeWithResult(Callable<V> task) throws Exception {
AggregateLifecycle existing = CURRENT.get();
CURRENT.set(this);
Expand All @@ -134,6 +174,12 @@ protected <V> V executeWithResult(Callable<V> task) throws Exception {
}
}

/**
* Executes the given task. While the task is being executed the current aggregate will be registered with the
* current thread as the 'current' aggregate.
*
* @param task the task to execute on the aggregate
*/
protected void execute(Runnable task) {
try {
executeWithResult(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
* @author Allard Buijze
* @since 0.3
*/
public abstract class LockingRepository<T, A extends Aggregate<T>> extends AbstractRepository<T, LockAwareAggregate<T, A>> {
public abstract class LockingRepository<T, A extends Aggregate<T>> extends AbstractRepository<T,
LockAwareAggregate<T, A>> {

private static final Logger logger = LoggerFactory.getLogger(LockingRepository.class);

Expand All @@ -63,8 +64,8 @@ protected LockingRepository(Class<T> aggregateType) {
/**
* Initialize a repository with a pessimistic locking strategy and a parameter resolver factory.
*
* @param aggregateType The type of aggregate stored in this repository
* @param parameterResolverFactory The parameter resolver factory used to resolve parameters of annotated handlers
* @param aggregateType The type of aggregate stored in this repository
* @param parameterResolverFactory The parameter resolver factory used to resolve parameters of annotated handlers
*/
protected LockingRepository(Class<T> aggregateType, ParameterResolverFactory parameterResolverFactory) {
this(aggregateType, new PessimisticLockFactory(), parameterResolverFactory);
Expand All @@ -74,7 +75,7 @@ protected LockingRepository(Class<T> aggregateType, ParameterResolverFactory par
* Initialize the repository with the given {@code LockFactory}.
*
* @param aggregateType The type of aggregate stored in this repository
* @param lockFactory the lock factory to use
* @param lockFactory the lock factory to use
*/
protected LockingRepository(Class<T> aggregateType, LockFactory lockFactory) {
super(aggregateType);
Expand All @@ -85,11 +86,12 @@ protected LockingRepository(Class<T> aggregateType, LockFactory lockFactory) {
/**
* Initialize the repository with the given {@code LockFactory} and {@code ParameterResolverFactory}.
*
* @param aggregateType The type of aggregate stored in this repository
* @param lockFactory The lock factory to use
* @param parameterResolverFactory The parameter resolver factory used to resolve parameters of annotated handlers
* @param aggregateType The type of aggregate stored in this repository
* @param lockFactory The lock factory to use
* @param parameterResolverFactory The parameter resolver factory used to resolve parameters of annotated handlers
*/
protected LockingRepository(Class<T> aggregateType, LockFactory lockFactory, ParameterResolverFactory parameterResolverFactory) {
protected LockingRepository(Class<T> aggregateType, LockFactory lockFactory,
ParameterResolverFactory parameterResolverFactory) {
super(aggregateType, parameterResolverFactory);
Assert.notNull(lockFactory, () -> "LockFactory may not be null");
this.lockFactory = lockFactory;
Expand All @@ -112,6 +114,14 @@ protected LockAwareAggregate<T, A> doCreateNew(Callable<T> factoryMethod) throws
return new LockAwareAggregate<>(aggregate, lock);
}

/**
* Creates a new aggregate instance using the given {@code factoryMethod}. Implementations should assume that this
* method is only called if a UnitOfWork is currently active.
*
* @param factoryMethod The method to create the aggregate's root instance
* @return an Aggregate instance describing the aggregate's state
* @throws Exception when the factoryMethod throws an exception
*/
protected abstract A doCreateNewForLock(Callable<T> factoryMethod) throws Exception;

/**
Expand Down Expand Up @@ -152,11 +162,10 @@ protected void prepareForCommit(LockAwareAggregate<T, A> aggregate) {
protected void doSave(LockAwareAggregate<T, A> aggregate) {
if (aggregate.version() != null && !aggregate.isLockHeld()) {
throw new ConcurrencyException(String.format(
"The aggregate of type [%s] with identifier [%s] could not be "
+ "saved, as a valid lock is not held. Either another thread has saved an aggregate, or "
+ "the current thread had released its lock earlier on.",
aggregate.getClass().getSimpleName(),
aggregate.identifierAsString()));
"The aggregate of type [%s] with identifier [%s] could not be " +
"saved, as a valid lock is not held. Either another thread has saved an aggregate, or " +
"the current thread had released its lock earlier on.",
aggregate.getClass().getSimpleName(), aggregate.identifierAsString()));
}
doSaveWithLock(aggregate.getWrappedAggregate());
}
Expand All @@ -171,11 +180,10 @@ protected void doSave(LockAwareAggregate<T, A> aggregate) {
protected final void doDelete(LockAwareAggregate<T, A> aggregate) {
if (aggregate.version() != null && !aggregate.isLockHeld()) {
throw new ConcurrencyException(String.format(
"The aggregate of type [%s] with identifier [%s] could not be "
+ "saved, as a valid lock is not held. Either another thread has saved an aggregate, or "
+ "the current thread had released its lock earlier on.",
aggregate.getClass().getSimpleName(),
aggregate.identifierAsString()));
"The aggregate of type [%s] with identifier [%s] could not be " +
"saved, as a valid lock is not held. Either another thread has saved an aggregate, or " +
"the current thread had released its lock earlier on.",
aggregate.getClass().getSimpleName(), aggregate.identifierAsString()));
}
doDeleteWithLock(aggregate.getWrappedAggregate());
}
Expand All @@ -194,5 +202,13 @@ protected final void doDelete(LockAwareAggregate<T, A> aggregate) {
*/
protected abstract void doDeleteWithLock(A aggregate);

/**
* Loads the aggregate with the given aggregateIdentifier. All necessary locks have been obtained.
*
* @param aggregateIdentifier the identifier of the aggregate to load
* @param expectedVersion The expected version of the aggregate to load
* @return a fully initialized aggregate
* @throws AggregateNotFoundException if the aggregate with given identifier does not exist
*/
protected abstract A doLoadWithLock(String aggregateIdentifier, Long expectedVersion);
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@ protected AnnotatedAggregate(AggregateModel<T> inspector, EventBus eventBus) {
this.eventBus = eventBus;
}

/**
* Registers the aggregate root created by the given {@code aggregateFactory} with this aggregate. Applies any
* delayed events that have not been applied to the aggregate yet.
* <p>
* This is method is commonly called while an aggregate is being initialized.
*
* @param aggregateFactory the factory to create the aggregate root
* @throws Exception if the aggregate factory fails to create the aggregate root
*/
protected void registerRoot(Callable<T> aggregateFactory) throws Exception {
this.aggregateRoot = executeWithResult(aggregateFactory);
execute(() -> {
Expand Down Expand Up @@ -179,11 +188,22 @@ protected void doMarkDeleted() {
this.isDeleted = true;
}

/**
* Publish an event to the aggregate root and its entities first and external event handlers (using the given
* event bus) later.
*
* @param msg the event message to publish
*/
protected void publish(EventMessage<?> msg) {
inspector.publish(msg, aggregateRoot);
publishOnEventBus(msg);
}

/**
* Publish an event to external event handlers using the given event bus.
*
* @param msg the event message to publish
*/
protected void publishOnEventBus(EventMessage<?> msg) {
if (eventBus != null) {
eventBus.publish(msg);
Expand All @@ -204,6 +224,7 @@ public Object handle(CommandMessage<?> msg) throws Exception {
});
}

@Override
protected <P> ApplyMore doApply(P payload, MetaData metaData) {
if (!applying && aggregateRoot != null) {
applying = true;
Expand All @@ -222,6 +243,14 @@ protected <P> ApplyMore doApply(P payload, MetaData metaData) {
return this;
}

/**
* Creates an {@link EventMessage} with given {@code payload} and {@code metaData}.
*
* @param payload payload of the resulting message
* @param metaData metadata of the resulting message
* @param <P> the payload type
* @return the resulting message
*/
protected <P> EventMessage<P> createMessage(P payload, MetaData metaData) {
return new GenericEventMessage<>(payload, metaData);
}
Expand All @@ -246,6 +275,13 @@ public ApplyMore andThenApply(Supplier<?> payloadOrMessageSupplier) {
return this;
}

/**
* Apply a new event message to the aggregate and then publish this message to external systems. If the given {@code
* payloadOrMessage} is an instance of a {@link Message} an event message is applied with the payload and metadata
* of the given message, otherwise an event message is applied with given payload and empty metadata.
*
* @param payloadOrMessage defines the payload and optionally metadata to apply to the aggregate
*/
protected void applyMessageOrPayload(Object payloadOrMessage) {
if (payloadOrMessage instanceof Message) {
Message message = (Message) payloadOrMessage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ public Long getVersion(T target) {
return null;
}

/**
* Returns the {@link MessageHandlingMember} that is capable of handling the given {@code message}. If no member is
* found an empty optional is returned.
*
* @param message the message to find a handler for
* @return the handler of the message if present on the model
*/
@SuppressWarnings("unchecked")
protected Optional<MessageHandlingMember<? super T>> getHandler(Message<?> message) {
for (MessageHandlingMember<? super T> handler : eventHandlers) {
Expand Down
Loading

0 comments on commit b051cbd

Please sign in to comment.