Skip to content

Commit

Permalink
Generic attribute of interceptors in DisruptorCommandBus is required …
Browse files Browse the repository at this point in the history
…to be a super of CommandMessage
  • Loading branch information
Steven van Beelen committed Jul 21, 2016
1 parent 8f7d665 commit 20a77ce
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 43 deletions.
Expand Up @@ -16,12 +16,13 @@

package org.axonframework.commandhandling.disruptor;

import com.lmax.disruptor.RingBuffer;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.lmax.disruptor.RingBuffer;

/**
* Wrapper for command handler Callbacks that detects blacklisted aggregates and starts a cleanup process when an
* aggregate is blacklisted.
Expand All @@ -35,7 +36,7 @@ public class BlacklistDetectingCallback<C, R> implements CommandCallback<C, R> {

private static final Logger logger = LoggerFactory.getLogger(BlacklistDetectingCallback.class);

private final CommandCallback<C, R> delegate;
private final CommandCallback<? super C, R> delegate;
private final RingBuffer<CommandHandlingEntry> ringBuffer;
private final DisruptorCommandBus commandBus;
private final boolean rescheduleOnCorruptState;
Expand All @@ -54,7 +55,7 @@ public class BlacklistDetectingCallback<C, R> implements CommandCallback<C, R> {
* @param rescheduleOnCorruptState Whether the command should be retried if it has been executed against corrupt
* state
*/
public BlacklistDetectingCallback(CommandCallback<C, R> delegate,
public BlacklistDetectingCallback(CommandCallback<? super C, R> delegate,
RingBuffer<CommandHandlingEntry> ringBuffer,
DisruptorCommandBus commandBus, boolean rescheduleOnCorruptState) {
this.delegate = delegate;
Expand Down
Expand Up @@ -16,14 +16,14 @@

package org.axonframework.commandhandling.disruptor;

import java.util.List;

import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.messaging.DefaultInterceptorChain;
import org.axonframework.messaging.InterceptorChain;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;

import java.util.List;

/**
* DataHolder for the DisruptorCommandBus. The CommandHandlingEntry maintains all information required for or produced
* by the command handling process.
Expand Down Expand Up @@ -173,8 +173,8 @@ public int getPublisherId() {
*/
public void reset(CommandMessage<?> newCommand, MessageHandler<? super CommandMessage<?>> newCommandHandler, // NOSONAR - Not important
int newInvokerSegmentId, int newPublisherSegmentId, BlacklistDetectingCallback newCallback,
List<MessageHandlerInterceptor<CommandMessage<?>>> invokerInterceptors,
List<MessageHandlerInterceptor<CommandMessage<?>>> publisherInterceptors) {
List<MessageHandlerInterceptor<? super CommandMessage<?>>> invokerInterceptors,
List<MessageHandlerInterceptor<? super CommandMessage<?>>> publisherInterceptors) {
this.invokerSegmentId = newInvokerSegmentId;
this.publisherSegmentId = newPublisherSegmentId;
this.callback = newCallback;
Expand Down
Expand Up @@ -16,9 +16,22 @@

package org.axonframework.commandhandling.disruptor;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import org.axonframework.commandhandling.*;
import static java.lang.String.format;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.CommandTargetResolver;
import org.axonframework.commandhandling.NoHandlerForCommandException;
import org.axonframework.commandhandling.model.Aggregate;
import org.axonframework.commandhandling.model.Repository;
import org.axonframework.common.Assert;
Expand All @@ -37,11 +50,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

import static java.lang.String.format;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

/**
* Asynchronous CommandBus implementation with very high performance characteristics. It divides the command handling
Expand Down Expand Up @@ -106,9 +116,9 @@ public class DisruptorCommandBus implements CommandBus {
private final ConcurrentMap<String, MessageHandler<? super CommandMessage<?>>> commandHandlers = new ConcurrentHashMap<>();
private final Disruptor<CommandHandlingEntry> disruptor;
private final CommandHandlerInvoker[] commandHandlerInvokers;
private final List<MessageDispatchInterceptor<CommandMessage<?>>> dispatchInterceptors;
private final List<MessageHandlerInterceptor<CommandMessage<?>>> invokerInterceptors;
private final List<MessageHandlerInterceptor<CommandMessage<?>>> publisherInterceptors;
private final List<MessageDispatchInterceptor<? super CommandMessage<?>>> dispatchInterceptors;
private final List<MessageHandlerInterceptor<? super CommandMessage<?>>> invokerInterceptors;
private final List<MessageHandlerInterceptor<? super CommandMessage<?>>> publisherInterceptors;
private final ExecutorService executorService;
private final boolean rescheduleOnCorruptState;
private final long coolingDownPeriod;
Expand Down Expand Up @@ -199,7 +209,7 @@ public <C> void dispatch(CommandMessage<C> command) {
public <C, R> void dispatch(CommandMessage<C> command, CommandCallback<? super C, R> callback) {
Assert.state(started, "CommandBus has been shut down. It is not accepting any Commands");
CommandMessage<? extends C> commandToDispatch = command;
for (MessageDispatchInterceptor<CommandMessage<?>> interceptor : dispatchInterceptors) {
for (MessageDispatchInterceptor<? super CommandMessage<?>> interceptor : dispatchInterceptors) {
commandToDispatch = (CommandMessage) interceptor.handle(commandToDispatch);
}
doDispatch(commandToDispatch, callback);
Expand Down
Expand Up @@ -16,9 +16,10 @@

package org.axonframework.commandhandling.disruptor;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

import org.axonframework.commandhandling.AnnotationCommandTargetResolver;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.CommandTargetResolver;
Expand All @@ -32,9 +33,9 @@
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;
import org.axonframework.serialization.Serializer;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;

/**
* Configuration object for the DisruptorCommandBus. The DisruptorConfiguration provides access to the options to tweak
Expand All @@ -59,9 +60,9 @@ public class DisruptorConfiguration {
private boolean rescheduleCommandsOnCorruptState;
private long coolingDownPeriod;
private Cache cache;
private final List<MessageHandlerInterceptor<CommandMessage<?>>> invokerInterceptors = new ArrayList<>();
private final List<MessageHandlerInterceptor<CommandMessage<?>>> publisherInterceptors = new ArrayList<>();
private final List<MessageDispatchInterceptor<CommandMessage<?>>> dispatchInterceptors = new ArrayList<>();
private final List<MessageHandlerInterceptor<? super CommandMessage<?>>> invokerInterceptors = new ArrayList<>();
private final List<MessageHandlerInterceptor<? super CommandMessage<?>>> publisherInterceptors = new ArrayList<>();
private final List<MessageDispatchInterceptor<? super CommandMessage<?>>> dispatchInterceptors = new ArrayList<>();
private TransactionManager transactionManager;
private CommandTargetResolver commandTargetResolver;
private int invokerThreadCount = 1;
Expand Down Expand Up @@ -148,7 +149,7 @@ public DisruptorConfiguration setExecutor(Executor executor) { //NOSONAR (setter
*
* @return the interceptors for the DisruptorCommandBus
*/
public List<MessageHandlerInterceptor<CommandMessage<?>>> getInvokerInterceptors() {
public List<MessageHandlerInterceptor<? super CommandMessage<?>>> getInvokerInterceptors() {
return invokerInterceptors;
}

Expand All @@ -163,7 +164,7 @@ public List<MessageHandlerInterceptor<CommandMessage<?>>> getInvokerInterceptors
* @return {@code this} for method chaining
*/
public DisruptorConfiguration setInvokerInterceptors(
List<MessageHandlerInterceptor<CommandMessage<?>>> invokerInterceptors) { //NOSONAR (setter may hide field)
List<MessageHandlerInterceptor<? super CommandMessage<?>>> invokerInterceptors) { //NOSONAR (setter may hide field)
this.invokerInterceptors.clear();
this.invokerInterceptors.addAll(invokerInterceptors);
return this;
Expand All @@ -174,7 +175,7 @@ public DisruptorConfiguration setInvokerInterceptors(
*
* @return the interceptors for the DisruptorCommandBus
*/
public List<MessageHandlerInterceptor<CommandMessage<?>>> getPublisherInterceptors() {
public List<MessageHandlerInterceptor<? super CommandMessage<?>>> getPublisherInterceptors() {
return publisherInterceptors;
}

Expand All @@ -197,7 +198,7 @@ public DisruptorConfiguration setPublisherInterceptors(
*
* @return the dispatch interceptors for the DisruptorCommandBus
*/
public List<MessageDispatchInterceptor<CommandMessage<?>>> getDispatchInterceptors() {
public List<MessageDispatchInterceptor<? super CommandMessage<?>>> getDispatchInterceptors() {
return dispatchInterceptors;
}

Expand Down
Expand Up @@ -16,31 +16,42 @@

package org.axonframework.commandhandling.disruptor;

import static org.axonframework.commandhandling.model.AggregateLifecycle.apply;
import static org.junit.Assert.assertSame;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyObject;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.function.Function;

import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.model.Aggregate;
import org.axonframework.commandhandling.model.AggregateIdentifier;
import org.axonframework.commandhandling.model.Repository;
import org.axonframework.commandhandling.model.inspection.ModelInspector;
import org.axonframework.common.caching.Cache;
import org.axonframework.eventsourcing.*;
import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.eventsourcing.EventSourcedAggregate;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.eventsourcing.EventStreamDecorator;
import org.axonframework.eventsourcing.GenericAggregateFactory;
import org.axonframework.eventsourcing.GenericDomainEventMessage;
import org.axonframework.eventsourcing.eventstore.DomainEventStream;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.internal.stubbing.answers.ReturnsArgumentAt;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.util.Collections;
import java.util.function.Function;

import static org.axonframework.commandhandling.model.AggregateLifecycle.apply;
import static org.junit.Assert.assertSame;
import static org.mockito.Mockito.*;

/**
*
*/
Expand All @@ -65,8 +76,8 @@ public void setUp() throws Exception {
mockCommandHandler = mock(MessageHandler.class);
commandHandlingEntry = new CommandHandlingEntry();
commandHandlingEntry.reset(mockCommandMessage, mockCommandHandler, 0, 0, null,
Collections.<MessageHandlerInterceptor<CommandMessage<?>>>emptyList(),
Collections.<MessageHandlerInterceptor<CommandMessage<?>>>emptyList());
Collections.emptyList(),
Collections.emptyList());
eventStreamDecorator = mock(EventStreamDecorator.class);
when(eventStreamDecorator.decorateForAppend(any(), any())).thenAnswer(new ReturnsArgumentAt(1));
when(eventStreamDecorator.decorateForRead(any(), any(DomainEventStream.class)))
Expand Down

0 comments on commit 20a77ce

Please sign in to comment.