Skip to content

Commit

Permalink
Create event store tracking EventProcessor
Browse files Browse the repository at this point in the history
Events can now be supplied to an EventProcessor in two ways: either by subscribing the processor directly to the event bus or by streaming events from the event store and supplying those events to the event processor. The event processor then passes the events on to an EventProcessingStrategy which may decide to handle the events directly or asynchronously. Finally, when the events are handled they are passed to registered EventListeners.
  • Loading branch information
renedewaele committed Jun 6, 2016
1 parent 654a289 commit e762b92
Show file tree
Hide file tree
Showing 65 changed files with 1,559 additions and 2,033 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void testMessageListenerInvokesAllEventProcessors() throws Exception {
outputStream.writeEventMessage(new GenericEventMessage<>("Event")); outputStream.writeEventMessage(new GenericEventMessage<>("Event"));
testSubject.onMessage(new Message(baos.toByteArray(), new MessageProperties())); testSubject.onMessage(new Message(baos.toByteArray(), new MessageProperties()));


verify(eventProcessor).handle(argThat(new TypeSafeMatcher<EventMessage>() { verify(eventProcessor).accept(argThat(new TypeSafeMatcher<EventMessage>() {
@Override @Override
public boolean matchesSafely(EventMessage item) { public boolean matchesSafely(EventMessage item) {
return "Event".equals(item.getPayload()); return "Event".equals(item.getPayload());
Expand All @@ -78,6 +78,6 @@ public void testMessageListenerIgnoredOnDeserializationFailure() throws Exceptio
.getBytes(Charset.forName("UTF-8")); .getBytes(Charset.forName("UTF-8"));
testSubject.onMessage(new Message(body, new MessageProperties())); testSubject.onMessage(new Message(body, new MessageProperties()));


verify(eventProcessor, never()).handle(any(EventMessage.class)); verify(eventProcessor, never()).accept(any(EventMessage.class));
} }
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.axonframework.eventhandling.amqp.spring; package org.axonframework.eventhandling.amqp.spring;


import org.axonframework.common.AxonConfigurationException; import org.axonframework.common.AxonConfigurationException;
import org.axonframework.eventhandling.SimpleEventProcessor;
import org.axonframework.eventhandling.amqp.DefaultAMQPConsumerConfiguration; import org.axonframework.eventhandling.amqp.DefaultAMQPConsumerConfiguration;
import org.axonframework.eventhandling.amqp.DefaultAMQPMessageConverter; import org.axonframework.eventhandling.amqp.DefaultAMQPMessageConverter;
import org.axonframework.serialization.Serializer; import org.axonframework.serialization.Serializer;
Expand Down
Original file line number Original file line Diff line number Diff line change
@@ -1,12 +1,9 @@
/* /*
* Copyright (c) 2010-2016. Axon Framework * Copyright (c) 2010-2016. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -16,129 +13,36 @@


package org.axonframework.eventhandling; package org.axonframework.eventhandling;


import org.axonframework.common.Assert;
import org.axonframework.common.Registration; import org.axonframework.common.Registration;
import org.axonframework.messaging.MessageHandlerInterceptor; import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;


import java.util.*; import java.util.List;
import java.util.concurrent.ConcurrentSkipListSet; import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;


import static java.util.Objects.requireNonNull;

/** /**
* Abstract {@code EventProcessor} implementation that keeps track of event processor members ({@link EventListener * @author Rene de Waele
* EventListeners}). This implementation is thread-safe.
*
* @author Allard Buijze
* @since 1.2
*/ */
public abstract class AbstractEventProcessor implements EventProcessor, EventProcessingMonitorSupport { public abstract class AbstractEventProcessor implements EventProcessor {

private final String name; private final String name;
private final Set<EventListener> eventListeners; private final EventProcessingStrategy processingStrategy;
private final Set<EventListener> immutableEventListeners; private final Set<MessageHandlerInterceptor<EventMessage<?>>> interceptors = new CopyOnWriteArraySet<>();
private final Set<MessageHandlerInterceptor<EventMessage<?>>> interceptors;
private final EventProcessingMonitorCollection subscribedMonitors = new EventProcessingMonitorCollection();
private final MultiplexingEventProcessingMonitor eventProcessingMonitor = new MultiplexingEventProcessingMonitor(
subscribedMonitors);

/**
* Initializes the event processor with given <code>name</code>. The order in which listeners are organized in the
* event processor is undefined.
*
* @param name The name of this event processor
*/
protected AbstractEventProcessor(String name) {
Assert.notNull(name, "name may not be null");
this.name = name;
eventListeners = new CopyOnWriteArraySet<>();
immutableEventListeners = Collections.unmodifiableSet(eventListeners);
interceptors = new CopyOnWriteArraySet<>();
}

/**
* Initializes the event processor with given <code>name</code>. The order in which listeners are organized in the
* event processor is undefined.
*
* @param name The name of this event processor
*/
protected AbstractEventProcessor(String name, EventListener... initialListeners) {
this(name);
eventListeners.addAll(Arrays.asList(initialListeners));
}

/**
* Initializes the event processor with given <code>name</code>, using given <code>comparator</code> to order the
* listeners in the event processor. The order of invocation of the members in this event processor is according the
* order provided by the comparator.
*
* @param name The name of this event processor
* @param comparator The comparator providing the ordering of the Event Listeners
*/
protected AbstractEventProcessor(String name, Comparator<EventListener> comparator) {
Assert.notNull(name, "name may not be null");
this.name = name;
eventListeners = new ConcurrentSkipListSet<>(comparator);
immutableEventListeners = Collections.unmodifiableSet(eventListeners);
interceptors = new CopyOnWriteArraySet<>();
}


@Override public AbstractEventProcessor(String name, EventProcessingStrategy processingStrategy) {
public void accept(List<? extends EventMessage<?>> events) { this.name = requireNonNull(name);
doPublish(events, immutableEventListeners, interceptors, eventProcessingMonitor); this.processingStrategy = requireNonNull(processingStrategy);
} }


/**
* Publish the given list of <code>events</code> to the given set of <code>eventListeners</code>, and notify the
* given <code>eventProcessingMonitor</code> after completion. The given set of <code>eventListeners</code> is a
* live view on the memberships of the event processor. Any subscription changes are immediately visible in this
* set. Iterators created on the set iterate over an immutable view reflecting the state at the moment the iterator
* was created.
* <p/>
* Before each event is given to the <code>eventListeners</code> the event message should be passed through a chain
* of given <code>interceptors</code>. Each of the interceptors may modify the event message, or stop publication
* altogether. Additionally, Interceptors are able to interact with the unit of work that is created to process each
* event message.
* <p/>
* When this method is invoked as part of a Unit of Work (see {@link CurrentUnitOfWork#isStarted()}), the monitor
* invocation should be postponed until the Unit of Work is committed or rolled back, to ensure any transactions are
* properly propagated when the monitor is invoked.
* <p/>
* It is the implementation's responsibility to ensure that &ndash;eventually&ndash; the each of the given
* <code>events</code> is provided to the <code>eventProcessingMonitor</code>, either to the {@link
* org.axonframework.eventhandling.EventProcessingMonitor#onEventProcessingCompleted(java.util.List)} or the {@link
* org.axonframework.eventhandling.EventProcessingMonitor#onEventProcessingFailed(java.util.List, Throwable)}
* method.
*
* @param events The events to publish
* @param eventListeners Immutable real-time view on subscribed event listeners
* @param interceptors Registered interceptors that need to intercept each event before it's handled
* @param eventProcessingMonitor The monitor to notify after completion.
*/
protected abstract void doPublish(List<? extends EventMessage<?>> events, Set<EventListener> eventListeners,
Set<MessageHandlerInterceptor<EventMessage<?>>> interceptors,
MultiplexingEventProcessingMonitor eventProcessingMonitor);

@Override @Override
public String getName() { public String getName() {
return name; return name;
} }


@Override @Override
public Registration subscribe(EventListener eventListener) { public void accept(List<? extends EventMessage<?>> events) {
eventListeners.add(eventListener); processingStrategy.handle(events, this::doHandle);
Registration monitorSubscription = eventListener instanceof EventProcessingMonitorSupport ?
((EventProcessingMonitorSupport) eventListener)
.subscribeEventProcessingMonitor(eventProcessingMonitor) : null;
return () -> {
if (eventListeners.remove(eventListener)) {
if (monitorSubscription != null) {
monitorSubscription.cancel();
}
return true;
}
return false;
};
} }


@Override @Override
Expand All @@ -147,18 +51,9 @@ public Registration registerInterceptor(MessageHandlerInterceptor<EventMessage<?
return () -> interceptors.remove(interceptor); return () -> interceptors.remove(interceptor);
} }


@Override protected abstract void doHandle(List<? extends EventMessage<?>> eventMessages);
public Registration subscribeEventProcessingMonitor(EventProcessingMonitor monitor) {
return subscribedMonitors.subscribeEventProcessingMonitor(monitor);
}


/** protected Set<MessageHandlerInterceptor<EventMessage<?>>> interceptors() {
* This implementation returns the name of the event processor. return interceptors;
* <p>
* {@inheritDoc}
*/
@Override
public String toString() {
return name;
} }
} }
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2010-2016. Axon Framework
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.eventhandling;

import java.util.List;
import java.util.function.Consumer;

/**
* @author Rene de Waele
*/
public enum DirectEventProcessingStrategy implements EventProcessingStrategy {
INSTANCE;

@Override
public void handle(List<? extends EventMessage<?>> events, Consumer<List<? extends EventMessage<?>>> processor) {
processor.accept(events);
}
}
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2010-2016. Axon Framework
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.eventhandling;

/**
* @author Rene de Waele
*/
public interface ErrorHandler {

/**
* Invoked after given <code>eventListener</code> failed to handle given <code>event</code>. Implementations have a
* choice of options for how to continue:
* <p>
* <ul> <li>To ignore this error no special action is required. Processing will continue for this and subsequent
* events.</li> <li>To retry processing the event, implementations can re-invoke {@link
* EventListener#handle(EventMessage)} on the eventListener once or multiple times.</li> <li>To retry processing at
* a later time schedule a task to re-invoke the eventListener.</li> <li>To terminate event processing altogether
* implementations may throw an exception. Given default configuration this will roll back the Unit of Work involved
* in the processing of this event and events that are processed in the same batch.</li></ul>
*
* @param exception
* @param event
* @param eventListener
* @param eventProcessor
* @throws Exception
*/
void onError(Exception exception, EventMessage<?> event, EventListener eventListener, String eventProcessor) throws Exception;

}
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2010-2016. Axon Framework
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.eventhandling;

import org.axonframework.common.AxonException;

/**
* Exception thrown when an {@link EventProcessor} failed to handle a batch of events.
*
* @author Allard Buijze
* @since 0.3
*/
public class EventProcessingException extends AxonException {

/**
* Initialize the exception with given <code>message</code> and <code>cause</code>.
*
* @param message Message describing the cause of the exception
* @param cause The exception that caused this exception to occur.
*/
public EventProcessingException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2010-2016. Axon Framework
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.eventhandling;

import java.util.List;
import java.util.function.Consumer;

/**
* @author Rene de Waele
*/
public interface EventProcessingStrategy {

void handle(List<? extends EventMessage<?>> events, Consumer<List<? extends EventMessage<?>>> processor);

}
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -55,21 +55,8 @@ public interface EventProcessor extends Consumer<List<? extends EventMessage<?>>
* *
* @param events The Events to publish in the event processor * @param events The Events to publish in the event processor
*/ */
default void handle(EventMessage<?>... events) { default void accept(EventMessage<?>... events) {
handle(Arrays.asList(events)); accept(Arrays.asList(events));
}

/**
* Publishes the given Events to the members of this event processor.
* <p/>
* Implementations may do this synchronously or asynchronously. Although {@link EventListener EventListeners} are
* discouraged to throw exceptions, it is possible that they are propagated through this method invocation. In that
* case, no guarantees can be given about the delivery of Events at all EventProcessor members.
*
* @param events The Events to publish in the event processor
*/
default void handle(List<? extends EventMessage<?>> events) {
accept(events);
} }


/** /**
Expand All @@ -84,18 +71,6 @@ default void handle(List<? extends EventMessage<?>> events) {
@Override @Override
void accept(List<? extends EventMessage<?>> events); void accept(List<? extends EventMessage<?>> events);


/**
* Subscribe the given {@code eventListener} to this event processor. If the listener is already subscribed, nothing
* happens.
* <p/>
* While the Event Listeners is subscribed, it will receive all messages published to the event processor.
*
* @param eventListener the Event Listener instance to subscribe
* @return a handle to unsubscribe the <code>eventListener</code>.
* When unsubscribed it will no longer receive events from this event processor.
*/
Registration subscribe(EventListener eventListener);

/** /**
* Registers the given <code>interceptor</code> to this event processor. The <code>interceptor</code> will * Registers the given <code>interceptor</code> to this event processor. The <code>interceptor</code> will
* receive each event message that is about to be published but before it has reached its event handlers. * receive each event message that is about to be published but before it has reached its event handlers.
Expand Down
Loading

0 comments on commit e762b92

Please sign in to comment.