Skip to content

Commit

Permalink
Add Builder pattern for SimpleEventScheduler
Browse files Browse the repository at this point in the history
-Add Builder pattern for SimpleEventScheduler
-Change usages of constructor to builder
-Reindent touched files

#754
  • Loading branch information
smcvb committed Sep 12, 2018
1 parent 64e7d91 commit 816c891
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 48 deletions.
Expand Up @@ -16,7 +16,7 @@


package org.axonframework.eventhandling.scheduling.java; package org.axonframework.eventhandling.scheduling.java;


import org.axonframework.common.Assert; import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.IdentifierFactory; import org.axonframework.common.IdentifierFactory;
import org.axonframework.common.transaction.NoTransactionManager; import org.axonframework.common.transaction.NoTransactionManager;
import org.axonframework.common.transaction.TransactionManager; import org.axonframework.common.transaction.TransactionManager;
Expand All @@ -34,7 +34,13 @@
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.Map; import java.util.Map;
import java.util.concurrent.*; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static org.axonframework.common.BuilderUtils.assertNonNull;


/** /**
* An {@link EventScheduler} implementation that uses Java's ScheduledExecutorService as scheduling and triggering * An {@link EventScheduler} implementation that uses Java's ScheduledExecutorService as scheduling and triggering
Expand All @@ -52,40 +58,38 @@ public class SimpleEventScheduler implements EventScheduler {


private static final Logger logger = LoggerFactory.getLogger(SimpleEventScheduler.class); private static final Logger logger = LoggerFactory.getLogger(SimpleEventScheduler.class);


private final ScheduledExecutorService executorService; private final ScheduledExecutorService scheduledExecutorService;
private final EventBus eventBus; private final EventBus eventBus;
private final TransactionManager transactionManager; private final TransactionManager transactionManager;

private final Map<String, Future<?>> tokens = new ConcurrentHashMap<>(); private final Map<String, Future<?>> tokens = new ConcurrentHashMap<>();


/** /**
* Initialize the SimpleEventScheduler using the given {@code executorService} as trigger and execution * Instantiate a {@link SimpleEventScheduler} based on the fields contained in the {@link Builder}.
* mechanism, and publishes events to the given {@code eventBus}. * <p>
* Will assert that the {@link ScheduledExecutorService}, {@link EventBus} and {@link TransactionManager} are not
* {@code null}, and will throw an {@link AxonConfigurationException} if any of them is {@code null}.
* *
* @param executorService The backing ScheduledExecutorService * @param builder the {@link Builder} used to instantiate a {@link SimpleEventScheduler} instance
* @param eventBus The Event Bus on which Events are to be published
*/ */
public SimpleEventScheduler(ScheduledExecutorService executorService, EventBus eventBus) { protected SimpleEventScheduler(Builder builder) {
this(executorService, eventBus, NoTransactionManager.INSTANCE); builder.validate();
this.scheduledExecutorService = builder.scheduledExecutorService;
this.eventBus = builder.eventBus;
this.transactionManager = builder.transactionManager;
} }


/** /**
* Initialize the SimpleEventScheduler using the given {@code executorService} as trigger and execution * Builder class to instantiate a {@link SimpleEventScheduler}.
* mechanism, and publishes events to the given {@code eventBus}. The {@code eventTriggerCallback} is * <p>
* invoked just before and after publication of a scheduled event. * The {@link TransactionManager} is defaulted to a {@link NoTransactionManager}. The
* {@link ScheduledExecutorService} and {@link EventBus} are a <b>hard requirements</b> and as such should be
* provided.
* *
* @param executorService The backing ScheduledExecutorService * @return a Builder to be able to create a {@link SimpleEventScheduler}
* @param eventBus The Event Bus on which Events are to be published
* @param transactionManager to manage the transaction around Event publication
*/ */
public SimpleEventScheduler(ScheduledExecutorService executorService, EventBus eventBus, public static Builder builder() {
TransactionManager transactionManager) { return new Builder();
Assert.notNull(executorService, () -> "executorService may not be null");
Assert.notNull(eventBus, () -> "eventBus may not be null");
Assert.notNull(transactionManager, () -> "transactionManager may not be null");

this.executorService = executorService;
this.eventBus = eventBus;
this.transactionManager = transactionManager;
} }


@Override @Override
Expand All @@ -96,9 +100,9 @@ public ScheduleToken schedule(Instant triggerDateTime, Object event) {
@Override @Override
public ScheduleToken schedule(Duration triggerDuration, Object event) { public ScheduleToken schedule(Duration triggerDuration, Object event) {
String tokenId = IdentifierFactory.getInstance().generateIdentifier(); String tokenId = IdentifierFactory.getInstance().generateIdentifier();
ScheduledFuture<?> future = executorService.schedule(new PublishEventTask(event, tokenId), ScheduledFuture<?> future = scheduledExecutorService.schedule(new PublishEventTask(event, tokenId),
triggerDuration.toMillis(), triggerDuration.toMillis(),
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);
tokens.put(tokenId, future); tokens.put(tokenId, future);
return new SimpleScheduleToken(tokenId); return new SimpleScheduleToken(tokenId);
} }
Expand Down Expand Up @@ -150,11 +154,86 @@ private EventMessage<?> createMessage() {
EventMessage<?> eventMessage; EventMessage<?> eventMessage;
if (event instanceof EventMessage) { if (event instanceof EventMessage) {
eventMessage = new GenericEventMessage<>(((EventMessage) event).getPayload(), eventMessage = new GenericEventMessage<>(((EventMessage) event).getPayload(),
((EventMessage) event).getMetaData()); ((EventMessage) event).getMetaData());
} else { } else {
eventMessage = new GenericEventMessage<>(event, MetaData.emptyInstance()); eventMessage = new GenericEventMessage<>(event, MetaData.emptyInstance());
} }
return eventMessage; return eventMessage;
} }
} }

/**
* Builder class to instantiate a {@link SimpleEventScheduler}.
* <p>
* The {@link TransactionManager} is defaulted to a {@link NoTransactionManager}. The
* {@link ScheduledExecutorService} and {@link EventBus} are a <b>hard requirements</b> and as such should be
* provided.
*/
public static class Builder {

private ScheduledExecutorService scheduledExecutorService;
private EventBus eventBus;
private TransactionManager transactionManager = NoTransactionManager.INSTANCE;

/**
* Sets the {@link EventBus} used to publish scheduled events on once the schedule has been met.
*
* @param eventBus a {@link EventBus} used to publish scheduled events on once the schedule has been met
* @return the current Builder instance, for a fluent interfacing
*/
public Builder eventBus(EventBus eventBus) {
assertNonNull(eventBus, "EventBus may not be null");
this.eventBus = eventBus;
return this;
}

/**
* Sets the {@link ScheduledExecutorService} used for scheduling and triggering events.
*
* @param scheduledExecutorService a {@link ScheduledExecutorService} used for scheduling and triggering
* events
* @return the current Builder instance, for a fluent interfacing
*/
public Builder scheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
assertNonNull(scheduledExecutorService, "ScheduledExecutorService may not be null");
this.scheduledExecutorService = scheduledExecutorService;
return this;
}

/**
* Sets the {@link TransactionManager} used to build transactions and ties them on event publication. Defaults
* to a {@link NoTransactionManager}.
*
* @param transactionManager a {@link TransactionManager} used to build transactions and ties them on event
* publication
* @return the current Builder instance, for a fluent interfacing
*/
public Builder transactionManager(TransactionManager transactionManager) {
assertNonNull(transactionManager, "TransactionManager may not be null");
this.transactionManager = transactionManager;
return this;
}

/**
* Initializes a {@link SimpleEventScheduler} as specified through this Builder.
*
* @return a {@link SimpleEventScheduler} as specified through this Builder
*/
public SimpleEventScheduler build() {
return new SimpleEventScheduler(this);
}

/**
* Validate whether the fields contained in this Builder are set accordingly.
*
* @throws AxonConfigurationException if one field is asserted to be incorrect according to the Builder's
* specifications
*/
protected void validate() throws AxonConfigurationException {
assertNonNull(eventBus, "The EventBus is a hard requirement and should be provided");
assertNonNull(scheduledExecutorService,
"The ScheduledExecutorService is a hard requirement and should be provided");
assertNonNull(transactionManager, "The TransactionManager is a hard requirement and should be provided");
}
}
} }
Expand Up @@ -21,21 +21,22 @@
import org.axonframework.eventhandling.GenericEventMessage; import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.saga.Saga; import org.axonframework.eventhandling.saga.Saga;
import org.axonframework.eventhandling.scheduling.ScheduleToken; import org.axonframework.eventhandling.scheduling.ScheduleToken;
import org.junit.After; import org.junit.*;
import org.junit.Before; import org.mockito.*;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.quartz.SchedulerException;


import java.io.*; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.time.Duration; import java.time.Duration;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import static org.junit.Assert.assertTrue; import static org.junit.Assert.*;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;


/** /**
Expand All @@ -46,19 +47,22 @@ public class SimpleEventSchedulerTest {


private SimpleEventScheduler testSubject; private SimpleEventScheduler testSubject;
private EventBus eventBus; private EventBus eventBus;
private ScheduledExecutorService executorService; private ScheduledExecutorService scheduledExecutorService;


@Before @Before
public void setUp() { public void setUp() {
eventBus = mock(EventBus.class); eventBus = mock(EventBus.class);
executorService = Executors.newSingleThreadScheduledExecutor(); scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
testSubject = new SimpleEventScheduler(executorService, eventBus); testSubject = SimpleEventScheduler.builder()
.scheduledExecutorService(scheduledExecutorService)
.eventBus(eventBus)
.build();
} }


@After @After
public void tearDown() { public void tearDown() {
if (executorService != null) { if (scheduledExecutorService != null) {
executorService.shutdownNow(); scheduledExecutorService.shutdownNow();
} }
} }


Expand Down Expand Up @@ -107,9 +111,9 @@ public void testCancelJob() throws InterruptedException {
verify(eventBus).publish(argThat((ArgumentMatcher<EventMessage>) item -> (item != null) verify(eventBus).publish(argThat((ArgumentMatcher<EventMessage>) item -> (item != null)
&& event2.getPayload().equals(item.getPayload()) && event2.getPayload().equals(item.getPayload())
&& event2.getMetaData().equals(item.getMetaData()))); && event2.getMetaData().equals(item.getMetaData())));
executorService.shutdown(); scheduledExecutorService.shutdown();
assertTrue("Executor refused to shutdown within a second", assertTrue("Executor refused to shutdown within a second",
executorService.awaitTermination(1, TimeUnit.SECONDS)); scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS));
} }


private EventMessage<Object> createEvent() { private EventMessage<Object> createEvent() {
Expand Down
Expand Up @@ -78,13 +78,17 @@ public void afterPropertiesSet() {
if (eventBus == null) { if (eventBus == null) {
eventBus = applicationContext.getBean(EventBus.class); eventBus = applicationContext.getBean(EventBus.class);
} }
if (transactionManager == null) {
this.eventScheduler = new SimpleEventScheduler(executorService, eventBus); SimpleEventScheduler.Builder eventSchedulerBuilder =
} else { SimpleEventScheduler.builder()
this.eventScheduler = new SimpleEventScheduler( .scheduledExecutorService(executorService)
executorService, eventBus, .eventBus(eventBus);
new SpringTransactionManager(transactionManager, transactionDefinition)); if (transactionManager != null) {
eventSchedulerBuilder.transactionManager(
new SpringTransactionManager(transactionManager, transactionDefinition)
);
} }
this.eventScheduler = eventSchedulerBuilder.build();
} }


@Override @Override
Expand Down

0 comments on commit 816c891

Please sign in to comment.