Skip to content

Commit

Permalink
Replace DisruptorConfiguration for DisruptorCommandBus.Builder
Browse files Browse the repository at this point in the history
-Copy DisruptorConfiguration to DisruptorCommandBus.Builder
-Builder to be inline with other Builder implementations
-Remove DisruptorConfiguration
-Rename DisruptorConfigurationTest to DisruptorCommandBusBuilderTest
-Adjust all usages of DisruptorConfiguration constructors and the
DisruptorConfiguration for the new Builder approach
-Perform overall reindent and warning clean up of all touched files

#754
  • Loading branch information
smcvb committed Sep 10, 2018
1 parent 6552d71 commit e30a81d
Show file tree
Hide file tree
Showing 8 changed files with 618 additions and 676 deletions.

Large diffs are not rendered by default.

This file was deleted.

Expand Up @@ -42,7 +42,7 @@
import java.util.function.Consumer;

import static org.axonframework.commandhandling.GenericCommandMessage.asCommandMessage;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;

/**
* @author Allard Buijze
Expand All @@ -54,7 +54,7 @@ public class DisruptorCommandBusBenchmark {
public static void main(String[] args) throws InterruptedException {
InMemoryEventStore eventStore = new InMemoryEventStore();
StubHandler stubHandler = new StubHandler();
DisruptorCommandBus commandBus = new DisruptorCommandBus();
DisruptorCommandBus commandBus = DisruptorCommandBus.builder().build();
commandBus.subscribe(StubCommand.class.getName(), stubHandler);
stubHandler.setRepository(commandBus.createRepository(eventStore,
new GenericAggregateFactory<>(StubAggregate.class)));
Expand Down
Expand Up @@ -16,21 +16,18 @@

package org.axonframework.commandhandling.disruptor;

import org.junit.Test;
import org.axonframework.common.AxonConfigurationException;
import org.junit.*;

/**
* @author Allard Buijze
*/
public class DisruptorConfigurationTest {
public class DisruptorCommandBusBuilderTest {

@Test(expected = IllegalArgumentException.class)
@Test(expected = AxonConfigurationException.class)
public void testSetIllegalPublisherThreadCount() {
new DisruptorConfiguration().setPublisherThreadCount(0);
DisruptorCommandBus.builder().publisherThreadCount(0).build();
}

@Test(expected = IllegalArgumentException.class)
@Test(expected = AxonConfigurationException.class)
public void testSetIllegalInvokerThreadCount() {
new DisruptorConfiguration().setInvokerThreadCount(0);
DisruptorCommandBus.builder().invokerThreadCount(0).build();
}

}

Large diffs are not rendered by default.

Expand Up @@ -39,10 +39,8 @@
import org.axonframework.eventsourcing.eventstore.TrackingToken;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.stubbing.Answer;
import org.junit.*;
import org.mockito.stubbing.*;

import java.util.List;
import java.util.Map;
Expand All @@ -53,14 +51,8 @@

import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

/**
* @author Allard Buijze
Expand All @@ -77,18 +69,19 @@ public class DisruptorCommandBusTest_MultiThreaded {
public void setUp() {
StubHandler stubHandler = new StubHandler();
inMemoryEventStore = new InMemoryEventStore();
testSubject = new DisruptorCommandBus(
new DisruptorConfiguration().setBufferSize(4)
.setProducerType(ProducerType.MULTI)
.setWaitStrategy(new SleepingWaitStrategy())
.setRollbackConfiguration(RollbackConfigurationType.ANY_THROWABLE)
.setInvokerThreadCount(2)
.setPublisherThreadCount(3));
testSubject = DisruptorCommandBus.builder()
.bufferSize(4)
.producerType(ProducerType.MULTI)
.waitStrategy(new SleepingWaitStrategy())
.rollbackConfiguration(RollbackConfigurationType.ANY_THROWABLE)
.invokerThreadCount(2)
.publisherThreadCount(3)
.build();
testSubject.subscribe(StubCommand.class.getName(), stubHandler);
testSubject.subscribe(CreateCommand.class.getName(), stubHandler);
testSubject.subscribe(ErrorCommand.class.getName(), stubHandler);
spiedRepository = spy(testSubject.createRepository(inMemoryEventStore,
new GenericAggregateFactory<>(StubAggregate.class)));
new GenericAggregateFactory<>(StubAggregate.class)));
stubHandler.setRepository(spiedRepository);
}

Expand All @@ -105,8 +98,9 @@ public void testDispatchLargeNumberCommandForDifferentAggregates() throws Except
doAnswer(trackCreateAndLoad(garbageCollectionPrevention)).when(spiedRepository).load(isA(String.class));

List<String> aggregateIdentifiers = IntStream.range(0, AGGREGATE_COUNT)
.mapToObj(i -> IdentifierFactory.getInstance().generateIdentifier())
.collect(toList());
.mapToObj(i -> IdentifierFactory.getInstance()
.generateIdentifier())
.collect(toList());

CommandCallback mockCallback = mock(CommandCallback.class);

Expand All @@ -118,7 +112,7 @@ public void testDispatchLargeNumberCommandForDifferentAggregates() throws Except
assertEquals(10, inMemoryEventStore.loadCounter.get());
assertEquals(20, garbageCollectionPrevention.size());
assertEquals((COMMAND_COUNT * AGGREGATE_COUNT) + (2 * AGGREGATE_COUNT),
inMemoryEventStore.storedEventCounter.get());
inMemoryEventStore.storedEventCounter.get());
verify(mockCallback, times(1000)).onSuccess(any(), any());
verify(mockCallback, times(10)).onFailure(any(), isA(MockException.class));
}
Expand All @@ -133,21 +127,21 @@ private Answer trackCreateAndLoad(Map<Object, Object> garbageCollectionPreventio

private Stream<CommandMessage<Object>> generateCommands(List<String> aggregateIdentifiers) {
Stream<CommandMessage<Object>> create = aggregateIdentifiers.stream()
.map(CreateCommand::new)
.map(GenericCommandMessage::asCommandMessage);
.map(CreateCommand::new)
.map(GenericCommandMessage::asCommandMessage);
Stream<CommandMessage<Object>> head = IntStream.range(0, 10)
.mapToObj(k -> aggregateIdentifiers.stream()
.map(StubCommand::new)
.map(GenericCommandMessage::asCommandMessage))
.reduce(Stream.of(), Stream::concat);
.mapToObj(k -> aggregateIdentifiers.stream()
.map(StubCommand::new)
.map(GenericCommandMessage::asCommandMessage))
.reduce(Stream.of(), Stream::concat);
Stream<CommandMessage<Object>> errors = aggregateIdentifiers.stream()
.map(ErrorCommand::new)
.map(GenericCommandMessage::asCommandMessage);
.map(ErrorCommand::new)
.map(GenericCommandMessage::asCommandMessage);
Stream<CommandMessage<Object>> tail = IntStream.range(11, 100)
.mapToObj(k -> aggregateIdentifiers.stream()
.map(StubCommand::new)
.map(GenericCommandMessage::asCommandMessage))
.reduce(Stream.of(), Stream::concat);
.mapToObj(k -> aggregateIdentifiers.stream()
.map(StubCommand::new)
.map(GenericCommandMessage::asCommandMessage))
.reduce(Stream.of(), Stream::concat);

return Stream.of(create, head, errors, tail).flatMap(identity());
}
Expand Down Expand Up @@ -279,7 +273,8 @@ public Object handle(CommandMessage<?> command) throws Exception {
if (ExceptionCommand.class.isAssignableFrom(command.getPayloadType())) {
throw ((ExceptionCommand) command.getPayload()).getException();
} else if (CreateCommand.class.isAssignableFrom(command.getPayloadType())) {
Aggregate<StubAggregate> aggregate = repository.newInstance(() -> new StubAggregate(payload.getAggregateIdentifier().toString()));
Aggregate<StubAggregate> aggregate = repository
.newInstance(() -> new StubAggregate(payload.getAggregateIdentifier().toString()));
aggregate.execute(StubAggregate::doSomething);
} else {
Aggregate<StubAggregate> aggregate = repository.load(payload.getAggregateIdentifier().toString());
Expand Down
Expand Up @@ -40,7 +40,7 @@ public class DisruptorRepositoryTest {

@Test
public void testDisruptorCommandBusRepositoryNotAvailableOutsideOfInvokerThread() {
DisruptorCommandBus commandBus = new DisruptorCommandBus();
DisruptorCommandBus commandBus = DisruptorCommandBus.builder().build();
Repository<TestAggregate> repository = commandBus
.createRepository(eventStore, new GenericAggregateFactory<>(TestAggregate.class));

Expand Down
Expand Up @@ -32,8 +32,8 @@
import org.axonframework.spring.config.AxonConfiguration;
import org.axonframework.spring.stereotype.Aggregate;
import org.axonframework.spring.stereotype.Saga;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.*;
import org.junit.runner.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
Expand All @@ -47,12 +47,11 @@
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.*;

@ContextConfiguration(classes = AxonAutoConfigurationWithDisruptorTest.Context.class)
@EnableAutoConfiguration(exclude = {JmxAutoConfiguration.class, WebClientAutoConfiguration.class,
HibernateJpaAutoConfiguration.class, DataSourceAutoConfiguration.class})
HibernateJpaAutoConfiguration.class, DataSourceAutoConfiguration.class})
@RunWith(SpringRunner.class)
public class AxonAutoConfigurationWithDisruptorTest {

Expand Down Expand Up @@ -103,9 +102,10 @@ public CorrelationDataProvider correlationData2() {


@Bean
public DisruptorCommandBus commandBus(){
return new DisruptorCommandBus();
public DisruptorCommandBus commandBus() {
return DisruptorCommandBus.builder().build();
}

@Aggregate
public static class MyAggregate {

Expand All @@ -122,11 +122,11 @@ public void on(String type, SomeComponent test) {

@Saga
public static class MySaga {

@SagaEventHandler(associationProperty = "toString")
public void handle(String type, SomeComponent test) {

}

}

@Component
Expand All @@ -136,14 +136,11 @@ public static class SomeComponent {
public void handle(String event, SomeOtherComponent test) {

}

}

@Component
public static class SomeOtherComponent {
}



}
}
}

0 comments on commit e30a81d

Please sign in to comment.