Skip to content

Commit

Permalink
Replaced protected method by use EventStreamLoadingStrategy in favor …
Browse files Browse the repository at this point in the history
…of making it Spring configurable
  • Loading branch information
André Bierwolf committed May 29, 2018
1 parent c621201 commit d4712e0
Show file tree
Hide file tree
Showing 8 changed files with 252 additions and 38 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.axonframework.common.jpa.EntityManagerProvider; import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.eventsourcing.*; import org.axonframework.eventsourcing.*;
import org.axonframework.eventsourcing.eventstore.EventStore; import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.eventsourcing.eventstore.EventStreamLoadingStrategyFactory;


import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
Expand All @@ -54,6 +55,7 @@ public class AggregateConfigurer<A> implements AggregateConfiguration<A> {
private final Component<CommandTargetResolver> commandTargetResolver; private final Component<CommandTargetResolver> commandTargetResolver;
private final Component<AggregateModel<A>> metaModel; private final Component<AggregateModel<A>> metaModel;
private final List<Registration> registrations = new ArrayList<>(); private final List<Registration> registrations = new ArrayList<>();
private final Component<EventStreamLoadingStrategyFactory> eventStreamLoadingStrategyFactory;
private Configuration parent; private Configuration parent;


/** /**
Expand All @@ -76,6 +78,8 @@ protected AggregateConfigurer(Class<A> aggregate) {
c -> NoSnapshotTriggerDefinition.INSTANCE); c -> NoSnapshotTriggerDefinition.INSTANCE);
aggregateFactory = aggregateFactory =
new Component<>(() -> parent, name("aggregateFactory"), c -> new GenericAggregateFactory<>(aggregate)); new Component<>(() -> parent, name("aggregateFactory"), c -> new GenericAggregateFactory<>(aggregate));
eventStreamLoadingStrategyFactory = new Component<>(() -> parent, name("eventStreamLoadingStrategyFactory"), c -> c
.getComponent(EventStreamLoadingStrategyFactory.class, () -> m -> EventSourcingRepository.DEFAULT_EVENT_STREAM_LOADING_STRATEGY));
repository = new Component<>(() -> parent, "Repository<" + aggregate.getSimpleName() + ">", c -> { repository = new Component<>(() -> parent, "Repository<" + aggregate.getSimpleName() + ">", c -> {
Assert.state(c.eventBus() instanceof EventStore, Assert.state(c.eventBus() instanceof EventStore,
() -> "Default configuration requires the use of event sourcing. Either configure an Event " + () -> "Default configuration requires the use of event sourcing. Either configure an Event " +
Expand All @@ -88,7 +92,7 @@ protected AggregateConfigurer(Class<A> aggregate) {
c.parameterResolverFactory()); c.parameterResolverFactory());
} }
return new EventSourcingRepository<>(metaModel.get(), aggregateFactory.get(), c.eventStore(), return new EventSourcingRepository<>(metaModel.get(), aggregateFactory.get(), c.eventStore(),
snapshotTriggerDefinition.get()); snapshotTriggerDefinition.get(),eventStreamLoadingStrategyFactory.get().create(metaModel.get()));
}); });
commandHandler = new Component<>(() -> parent, "aggregateCommandHandler<" + aggregate.getSimpleName() + ">", commandHandler = new Component<>(() -> parent, "aggregateCommandHandler<" + aggregate.getSimpleName() + ">",
c -> new AggregateAnnotationCommandHandler<>(repository.get(), c -> new AggregateAnnotationCommandHandler<>(repository.get(),
Expand Down Expand Up @@ -217,6 +221,20 @@ public AggregateConfigurer<A> configureSnapshotTrigger(Function<Configuration, S
return this; return this;
} }


/**
* Configures the factory for returning EventStreamLoadingStrategy.
* <p>
* Note that this configuration is ignored if a custom repository instance is configured.
*
* @param eventStreamLoadingStrategyFactory The factory for returning EventStreamLoadingStrategy instances
* @return this configurer instance for chaining
*/
public AggregateConfigurer<A> configureEventStreamLoadingStrategyFactory(Function<Configuration,
EventStreamLoadingStrategyFactory> eventStreamLoadingStrategyFactory) {
this.eventStreamLoadingStrategyFactory.update(eventStreamLoadingStrategyFactory);
return this;
}

@Override @Override
public void initialize(Configuration parent) { public void initialize(Configuration parent) {
this.parent = parent; this.parent = parent;
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.axonframework.common.lock.LockFactory; import org.axonframework.common.lock.LockFactory;
import org.axonframework.eventsourcing.eventstore.DomainEventStream; import org.axonframework.eventsourcing.eventstore.DomainEventStream;
import org.axonframework.eventsourcing.eventstore.EventStore; import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.eventsourcing.eventstore.EventStreamLoadingStrategy;
import org.axonframework.messaging.annotation.ParameterResolverFactory; import org.axonframework.messaging.annotation.ParameterResolverFactory;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork; import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;


Expand All @@ -44,9 +45,12 @@
*/ */
public class EventSourcingRepository<T> extends LockingRepository<T, EventSourcedAggregate<T>> { public class EventSourcingRepository<T> extends LockingRepository<T, EventSourcedAggregate<T>> {


public static final EventStreamLoadingStrategy DEFAULT_EVENT_STREAM_LOADING_STRATEGY = (i,e) -> e.readEvents(i);

private final EventStore eventStore; private final EventStore eventStore;
private final SnapshotTriggerDefinition snapshotTriggerDefinition; private final SnapshotTriggerDefinition snapshotTriggerDefinition;
private final AggregateFactory<T> aggregateFactory; private final AggregateFactory<T> aggregateFactory;
private final EventStreamLoadingStrategy eventStreamLoadingStrategy;


/** /**
* Initializes a repository with the default locking strategy, using a GenericAggregateFactory to create new * Initializes a repository with the default locking strategy, using a GenericAggregateFactory to create new
Expand Down Expand Up @@ -86,6 +90,22 @@ public EventSourcingRepository(final AggregateFactory<T> aggregateFactory, Event
this(aggregateFactory, eventStore, NoSnapshotTriggerDefinition.INSTANCE); this(aggregateFactory, eventStore, NoSnapshotTriggerDefinition.INSTANCE);
} }


/**
* Initializes a repository with the default locking strategy, using the given {@code aggregateFactory} to
* create new aggregate instances and {@code eventStreamLoadingStrategy} to load the events from
* {@code eventStore}.
*
* @param aggregateModel The meta model describing the aggregate's structure
* @param aggregateFactory The factory for new aggregate instances
* @param eventStore The event store that holds the event streams for this repository
* @param eventStreamLoadingStrategy The strategy for reading events from EventStore
* @see LockingRepository#LockingRepository(Class)
*/
public EventSourcingRepository(AggregateModel<T> aggregateModel, AggregateFactory<T> aggregateFactory, EventStore eventStore,
EventStreamLoadingStrategy eventStreamLoadingStrategy) {
this(aggregateModel, aggregateFactory, eventStore, NoSnapshotTriggerDefinition.INSTANCE, eventStreamLoadingStrategy);
}

/** /**
* Initializes a repository with the default locking strategy, using the given {@code aggregateFactory} to * Initializes a repository with the default locking strategy, using the given {@code aggregateFactory} to
* create new aggregate instances. * create new aggregate instances.
Expand All @@ -100,8 +120,9 @@ public EventSourcingRepository(AggregateModel<T> aggregateModel, AggregateFactor
} }


/** /**
* Initializes a repository with the default locking strategy, using the given {@code aggregateFactory} to * Initializes a repository with the default locking and event stream loading strategy, using the given
* create new aggregate instances and triggering snapshots using the given {@code snapshotTriggerDefinition} * {@code aggregateFactory} to create new aggregate instances and triggering snapshots using the given
* {@code snapshotTriggerDefinition}
* *
* @param aggregateFactory The factory for new aggregate instances * @param aggregateFactory The factory for new aggregate instances
* @param eventStore The event store that holds the event streams for this repository * @param eventStore The event store that holds the event streams for this repository
Expand All @@ -115,11 +136,13 @@ public EventSourcingRepository(final AggregateFactory<T> aggregateFactory, Event
this.aggregateFactory = aggregateFactory; this.aggregateFactory = aggregateFactory;
this.eventStore = eventStore; this.eventStore = eventStore;
this.snapshotTriggerDefinition = snapshotTriggerDefinition; this.snapshotTriggerDefinition = snapshotTriggerDefinition;
this.eventStreamLoadingStrategy = DEFAULT_EVENT_STREAM_LOADING_STRATEGY;
} }


/** /**
* Initializes a repository with the default locking strategy, using the given {@code aggregateFactory} to * Initializes a repository with the default locking and event stream loading strategy, using the given
* create new aggregate instances and triggering snapshots using the given {@code snapshotTriggerDefinition} * {@code aggregateFactory} to create new aggregate instances and triggering snapshots using the given
* {@code snapshotTriggerDefinition}
* *
* @param aggregateModel The meta model describing the aggregate's structure * @param aggregateModel The meta model describing the aggregate's structure
* @param aggregateFactory The factory for new aggregate instances * @param aggregateFactory The factory for new aggregate instances
Expand All @@ -134,12 +157,35 @@ public EventSourcingRepository(AggregateModel<T> aggregateModel, AggregateFactor
this.aggregateFactory = aggregateFactory; this.aggregateFactory = aggregateFactory;
this.eventStore = eventStore; this.eventStore = eventStore;
this.snapshotTriggerDefinition = snapshotTriggerDefinition; this.snapshotTriggerDefinition = snapshotTriggerDefinition;
this.eventStreamLoadingStrategy = DEFAULT_EVENT_STREAM_LOADING_STRATEGY;
} }



/** /**
* Initializes a repository with the default locking strategy, using the given {@code aggregateFactory} to * Initializes a repository with the default locking strategy, using the given {@code aggregateFactory} to
* create new aggregate instances. * create new aggregate instances, triggering snapshots using the given {@code snapshotTriggerDefinition}
* and {@code eventStreamLoadingStrategy} to load the events from {@code eventStore}.
*
* @param aggregateModel The meta model describing the aggregate's structure
* @param aggregateFactory The factory for new aggregate instances
* @param eventStore The event store that holds the event streams for this repository
* @param snapshotTriggerDefinition The definition describing when to trigger a snapshot
* @param eventStreamLoadingStrategy The strategy for reading events from EventStore
* @see LockingRepository#LockingRepository(Class)
*/
public EventSourcingRepository(AggregateModel<T> aggregateModel, AggregateFactory<T> aggregateFactory,
EventStore eventStore, SnapshotTriggerDefinition snapshotTriggerDefinition,
EventStreamLoadingStrategy eventStreamLoadingStrategy) {
super(aggregateModel);
Assert.notNull(eventStore, () -> "eventStore may not be null");
this.aggregateFactory = aggregateFactory;
this.eventStore = eventStore;
this.snapshotTriggerDefinition = snapshotTriggerDefinition;
this.eventStreamLoadingStrategy = eventStreamLoadingStrategy;
}

/**
* Initializes a repository with the default locking and event stream loading strategy, using the given
* {@code aggregateFactory} to create new aggregate instances.
* *
* @param aggregateFactory The factory for new aggregate instances * @param aggregateFactory The factory for new aggregate instances
* @param eventStore The event store that holds the event streams for this repository * @param eventStore The event store that holds the event streams for this repository
Expand All @@ -155,6 +201,7 @@ public EventSourcingRepository(AggregateFactory<T> aggregateFactory, EventStore
this.snapshotTriggerDefinition = snapshotTriggerDefinition; this.snapshotTriggerDefinition = snapshotTriggerDefinition;
this.eventStore = eventStore; this.eventStore = eventStore;
this.aggregateFactory = aggregateFactory; this.aggregateFactory = aggregateFactory;
this.eventStreamLoadingStrategy = DEFAULT_EVENT_STREAM_LOADING_STRATEGY;
} }


/** /**
Expand All @@ -172,6 +219,7 @@ public EventSourcingRepository(AggregateFactory<T> aggregateFactory, EventStore
this.eventStore = eventStore; this.eventStore = eventStore;
this.aggregateFactory = aggregateFactory; this.aggregateFactory = aggregateFactory;
this.snapshotTriggerDefinition = snapshotTriggerDefinition; this.snapshotTriggerDefinition = snapshotTriggerDefinition;
this.eventStreamLoadingStrategy = DEFAULT_EVENT_STREAM_LOADING_STRATEGY;
} }


/** /**
Expand All @@ -191,6 +239,7 @@ public EventSourcingRepository(AggregateFactory<T> aggregateFactory, EventStore
this.eventStore = eventStore; this.eventStore = eventStore;
this.aggregateFactory = aggregateFactory; this.aggregateFactory = aggregateFactory;
this.snapshotTriggerDefinition = snapshotTriggerDefinition; this.snapshotTriggerDefinition = snapshotTriggerDefinition;
this.eventStreamLoadingStrategy = DEFAULT_EVENT_STREAM_LOADING_STRATEGY;
} }


/** /**
Expand All @@ -204,7 +253,7 @@ public EventSourcingRepository(AggregateFactory<T> aggregateFactory, EventStore
*/ */
@Override @Override
protected EventSourcedAggregate<T> doLoadWithLock(String aggregateIdentifier, Long expectedVersion) { protected EventSourcedAggregate<T> doLoadWithLock(String aggregateIdentifier, Long expectedVersion) {
DomainEventStream eventStream = readEvents(aggregateIdentifier); DomainEventStream eventStream = eventStreamLoadingStrategy.readEvents(aggregateIdentifier, eventStore);
SnapshotTrigger trigger = snapshotTriggerDefinition.prepareTrigger(aggregateFactory.getAggregateType()); SnapshotTrigger trigger = snapshotTriggerDefinition.prepareTrigger(aggregateFactory.getAggregateType());
if (!eventStream.hasNext()) { if (!eventStream.hasNext()) {
throw new AggregateNotFoundException(aggregateIdentifier, "The aggregate was not found in the event store"); throw new AggregateNotFoundException(aggregateIdentifier, "The aggregate was not found in the event store");
Expand All @@ -219,17 +268,6 @@ protected EventSourcedAggregate<T> doLoadWithLock(String aggregateIdentifier, Lo
return aggregate; return aggregate;
} }


/**
* Reads the events for the given aggregateIdentifier from the eventStore. This method can be overridden
* to perform filtering on for example aggregate type.
*
* @param aggregateIdentifier the identifier of the aggregate to load
* @return the domain event stream for the given aggregateIdentifier
*/
protected DomainEventStream readEvents(String aggregateIdentifier) {
return eventStore.readEvents(aggregateIdentifier);
}

@Override @Override
protected void validateOnLoad(Aggregate<T> aggregate, Long expectedVersion) { protected void validateOnLoad(Aggregate<T> aggregate, Long expectedVersion) {
if (expectedVersion != null && expectedVersion < aggregate.version()) { if (expectedVersion != null && expectedVersion < aggregate.version()) {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -141,27 +141,12 @@ static DomainEventStream concat(DomainEventStream a, DomainEventStream b) {
} }


/** /**
* Filters a DomainEventStream. In the resulting stream events from stream * Returns a stream that provides the items of this stream that match the given {@code filter}.
* {@code a} will be filtered by {@code filter}.
*
* @param a The stream
* @param filter The filter to apply to the stream
* @return A filtered version of stream a
*/
static DomainEventStream filter(DomainEventStream a, Predicate<DomainEventMessage<?>> filter) {
Objects.requireNonNull(a);
Objects.requireNonNull(filter);
return new FilteringDomainEventStream(a, filter);
}

/**
* Apply a filter the current DomainEventStream. In the resulting stream events
* will be filtered by {@code filter}.
* *
* @param filter The filter to apply to the stream * @param filter The filter to apply to the stream
* @return A filtered version of this stream * @return A filtered version of this stream
*/ */
default DomainEventStream filter(Predicate<DomainEventMessage<?>> filter) { default DomainEventStream filter(Predicate<? super DomainEventMessage<?>> filter) {
Objects.requireNonNull(filter); Objects.requireNonNull(filter);
return new FilteringDomainEventStream(this, filter); return new FilteringDomainEventStream(this, filter);
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2010-2017. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.eventsourcing.eventstore;

/**
* Provides a mechanism to adjust the way the EventSourcingRepository reads events
* from the EventStore.
*
* @author André Bierwolf
* @since 3.3
*/
@FunctionalInterface
public interface EventStreamLoadingStrategy {

/**
* Reads the events for the given aggregateIdentifier from the eventStore.
*
* @param aggregateIdentifier the identifier of the aggregate to load
* @param eventStore the EventStore to read from
* @return the domain event stream for the given aggregateIdentifier
*/
public DomainEventStream readEvents(String aggregateIdentifier, EventStore eventStore);
}
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2010-2017. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.eventsourcing.eventstore;

import org.axonframework.commandhandling.model.inspection.AggregateModel;

/**
* Interface describing objects capable of creating instances of EventStreamLoadingStrategy.
*
* @author André Bierwolf
* @since 3.3
*/
@FunctionalInterface
public interface EventStreamLoadingStrategyFactory {

/**
* Instantiates an EventStreamLoadingStrategy based on the given aggregateModel.
*
* @param aggregateModel the aggregateModel to create the strategy for
* @return the strategy to use for the given aggregateModel
*/
public EventStreamLoadingStrategy create(AggregateModel<?> aggregateModel);
}
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
*/ */
public class FilteringDomainEventStream implements DomainEventStream { public class FilteringDomainEventStream implements DomainEventStream {
private final DomainEventStream delegate; private final DomainEventStream delegate;
private final Predicate<DomainEventMessage<?>> filter; private final Predicate<? super DomainEventMessage<?>> filter;
private Long lastSequenceNumber; private Long lastSequenceNumber;


/** /**
Expand All @@ -38,7 +38,7 @@ public class FilteringDomainEventStream implements DomainEventStream {
* @param delegate The stream providing the elements * @param delegate The stream providing the elements
* @param filter The filter to apply to the delegate stream * @param filter The filter to apply to the delegate stream
*/ */
public FilteringDomainEventStream(DomainEventStream delegate, Predicate<DomainEventMessage<?>> filter) { public FilteringDomainEventStream(DomainEventStream delegate, Predicate<? super DomainEventMessage<?>> filter) {
this.delegate = delegate; this.delegate = delegate;
this.filter = filter; this.filter = filter;
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.axonframework.eventsourcing.AggregateFactory; import org.axonframework.eventsourcing.AggregateFactory;
import org.axonframework.eventsourcing.SnapshotTriggerDefinition; import org.axonframework.eventsourcing.SnapshotTriggerDefinition;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine; import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.eventsourcing.eventstore.EventStreamLoadingStrategyFactory;
import org.axonframework.messaging.annotation.MessageHandler; import org.axonframework.messaging.annotation.MessageHandler;
import org.axonframework.messaging.annotation.ParameterResolverFactory; import org.axonframework.messaging.annotation.ParameterResolverFactory;
import org.axonframework.messaging.correlation.CorrelationDataProvider; import org.axonframework.messaging.correlation.CorrelationDataProvider;
Expand Down Expand Up @@ -334,6 +335,10 @@ private void registerAggregateBeanDefinitions(Configurer configurer, BeanDefinit
.getBean(aggregateAnnotation.commandTargetResolver(), CommandTargetResolver.class)); .getBean(aggregateAnnotation.commandTargetResolver(), CommandTargetResolver.class));
} }


if (beanFactory.containsBean("eventStreamLoadingStrategyFactory")) {
aggregateConf.configureEventStreamLoadingStrategyFactory(c -> beanFactory.getBean("eventStreamLoadingStrategyFactory",EventStreamLoadingStrategyFactory.class));
}

configurer.configureAggregate(aggregateConf); configurer.configureAggregate(aggregateConf);
} }
} }
Expand Down
Loading

0 comments on commit d4712e0

Please sign in to comment.