diff --git a/build.gradle b/build.gradle
index 4855b483023..ee199c9c158 100644
--- a/build.gradle
+++ b/build.gradle
@@ -15,7 +15,7 @@ allprojects {
apply plugin: 'jacoco'
group = 'org.spine3'
- version = '0.2'
+ version = '0.3'
}
project.ext {
diff --git a/client/build.gradle b/client/build.gradle
index 7491242a5e7..c775db48eb0 100644
--- a/client/build.gradle
+++ b/client/build.gradle
@@ -4,7 +4,7 @@ buildscript {
resolutionStrategy.cacheChangingModulesFor 0, 'seconds'
}
dependencies {
- classpath group: 'org.spine3.tools', name: 'protobuf-plugin', version: '1.2', changing: true
+ classpath group: 'org.spine3.tools', name: 'protobuf-plugin', version: '1.3.1', changing: true
}
}
diff --git a/client/src/main/java/org/spine3/base/Commands.java b/client/src/main/java/org/spine3/base/Commands.java
index b12db1d15b3..561e2d858cd 100644
--- a/client/src/main/java/org/spine3/base/Commands.java
+++ b/client/src/main/java/org/spine3/base/Commands.java
@@ -22,6 +22,7 @@
import com.google.common.base.Predicate;
import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.Duration;
import com.google.protobuf.Message;
import com.google.protobuf.Timestamp;
import org.spine3.protobuf.EntityPackagesMap;
@@ -39,6 +40,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.protobuf.util.TimeUtil.getCurrentTime;
+import static org.spine3.validate.Validate.isNotDefault;
/**
* Client-side utilities for working with commands.
@@ -48,11 +50,12 @@
public class Commands {
/**
- * A substring which the {@code .proto} file containing commands must have in its name.
+ * A suffix which the {@code .proto} file containing commands must have in its name.
*/
- public static final String COMMANDS_FILE_SUBSTRING = "commands";
+ public static final String FILE_NAME_SUFFIX = "commands";
- private static final char PROTO_FILE_SEPARATOR = '/';
+ private static final char FILE_PATH_SEPARATOR = '/';
+ private static final char FILE_EXTENSION_SEPARATOR = '.';
private Commands() {}
@@ -193,13 +196,14 @@ public static String formatMessageTypeAndId(String format, Message commandMessag
* Checks if the file is for commands.
*
* @param file a descriptor of a {@code .proto} file to check
- * @return {@code true} if the file name contains {@link #COMMANDS_FILE_SUBSTRING} substring, {@code false} otherwise
+ * @return {@code true} if the file name ends with the {@link #FILE_NAME_SUFFIX}, {@code false} otherwise
*/
public static boolean isCommandsFile(FileDescriptor file) {
final String fqn = file.getName();
- final int startIndexOfFileName = fqn.lastIndexOf(PROTO_FILE_SEPARATOR) + 1;
- final String fileName = fqn.substring(startIndexOfFileName);
- final boolean isCommandsFile = fileName.contains(COMMANDS_FILE_SUBSTRING);
+ final int startIndexOfFileName = fqn.lastIndexOf(FILE_PATH_SEPARATOR) + 1;
+ final int endIndexOfFileName = fqn.lastIndexOf(FILE_EXTENSION_SEPARATOR);
+ final String fileName = fqn.substring(startIndexOfFileName, endIndexOfFileName);
+ final boolean isCommandsFile = fileName.endsWith(FILE_NAME_SUFFIX);
return isCommandsFile;
}
@@ -216,4 +220,20 @@ public static boolean isEntityFile(FileDescriptor file) {
final boolean isCommandForEntity = EntityPackagesMap.contains(protoPackage);
return isCommandForEntity;
}
+
+ /**
+ * Checks if the command is scheduled to be delivered later.
+ *
+ * @param command a command to check
+ * @return {@code true} if the command context has a scheduling option set, {@code false} otherwise
+ */
+ public static boolean isScheduled(Command command) {
+ final Schedule schedule = command.getContext().getSchedule();
+ final Duration delay = schedule.getAfter();
+ if (isNotDefault(delay)) {
+ checkArgument(delay.getSeconds() > 0, "Command delay seconds must be a positive value.");
+ return true;
+ }
+ return false;
+ }
}
diff --git a/client/src/main/java/org/spine3/base/Responses.java b/client/src/main/java/org/spine3/base/Responses.java
index 5ef430a9663..d1eaf4ebac6 100644
--- a/client/src/main/java/org/spine3/base/Responses.java
+++ b/client/src/main/java/org/spine3/base/Responses.java
@@ -22,6 +22,8 @@
import com.google.protobuf.Empty;
+import static org.spine3.protobuf.Messages.fromAny;
+
/**
* Utilities for working with {@link org.spine3.base.Response} objects.
*
@@ -47,6 +49,8 @@ public static Response ok() {
}
/**
+ * Checks if the response is OK.
+ *
* @return {@code true} if the passed response represents `ok` status,
* {@code false} otherwise
*/
@@ -56,13 +60,31 @@ public static boolean isOk(Response response) {
}
/**
+ * Checks if the response is `unsupported command`.
+ *
* @return {@code true} if the passed response represents `unsupported command` error,
* {@code false} otherwise
*/
public static boolean isUnsupportedCommand(Response response) {
if (response.getStatusCase() == Response.StatusCase.ERROR) {
final Error error = response.getError();
- return error.getCode() == CommandValidationError.UNSUPPORTED_COMMAND.getNumber();
+ final boolean isUnsupported = error.getCode() == CommandValidationError.UNSUPPORTED_COMMAND.getNumber();
+ return isUnsupported;
+ }
+ return false;
+ }
+
+ /**
+ * Checks if the response is `invalid command`.
+ *
+ * @return {@code true} if the passed response represents `invalid command` error,
+ * {@code false} otherwise
+ */
+ public static boolean isInvalidCommand(Response response) {
+ if (response.getStatusCase() == Response.StatusCase.FAILURE) {
+ final ValidationFailure failure = fromAny(response.getFailure().getInstance());
+ final boolean isInvalid = !failure.getConstraintViolationList().isEmpty();
+ return isInvalid;
}
return false;
}
diff --git a/client/src/main/java/org/spine3/protobuf/Durations.java b/client/src/main/java/org/spine3/protobuf/Durations.java
index 27ea85ce63e..c14ea52ef16 100644
--- a/client/src/main/java/org/spine3/protobuf/Durations.java
+++ b/client/src/main/java/org/spine3/protobuf/Durations.java
@@ -19,7 +19,7 @@
/**
* Utility class for working with durations in addition to those available from {@link TimeUtil}.
- *
+ *
*
Use {@code import static org.spine3.protobuf.Durations.*} for compact initialization like this:
*
violations = newArrayList(ConstraintViolation.getDefaultInstance());
+ final ValidationFailure failureInstance = ValidationFailure.newBuilder()
+ .addAllConstraintViolation(violations)
+ .build();
+ final Failure.Builder failure = Failure.newBuilder()
+ .setInstance(toAny(failureInstance));
+ final Response response = Response.newBuilder()
+ .setFailure(failure)
+ .build();
+ return response;
+ }
}
diff --git a/client/src/test/java/org/spine3/protobuf/DurationsShould.java b/client/src/test/java/org/spine3/protobuf/DurationsShould.java
index dad06a894b1..34eb0fa4624 100644
--- a/client/src/test/java/org/spine3/protobuf/DurationsShould.java
+++ b/client/src/test/java/org/spine3/protobuf/DurationsShould.java
@@ -21,9 +21,9 @@
import com.google.protobuf.Duration;
+import com.google.protobuf.util.TimeUtil;
import org.junit.Test;
-import static com.google.protobuf.Duration.newBuilder;
import static org.junit.Assert.*;
import static org.spine3.protobuf.Durations.*;
import static org.spine3.test.Tests.hasPrivateUtilityConstructor;
@@ -46,6 +46,25 @@ public void have_ZERO_constant() {
}
+ @Test
+ public void convert_milliseconds_to_duration() {
+ convertMillisecondsToDurationTest(0);
+ convertMillisecondsToDurationTest(27);
+ convertMillisecondsToDurationTest(-384);
+ }
+
+ private static void convertMillisecondsToDurationTest(long millis) {
+ final Duration expected = TimeUtil.createDurationFromMillis(millis);
+ assertEquals(expected, ofMilliseconds(millis));
+ assertEquals(expected, milliseconds(millis));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void fail_to_convert_milliseconds_to_duration_if_input_is_too_big() {
+ ofMilliseconds(Long.MAX_VALUE);
+ }
+
+
@Test
public void convert_seconds_to_duration() {
convertSecondsToDurationTest(0);
@@ -384,6 +403,6 @@ private static long minutesToSeconds(long minutes) {
}
private static Duration durationFromSec(long seconds) {
- return newBuilder().setSeconds(seconds).build();
+ return Duration.newBuilder().setSeconds(seconds).build();
}
}
diff --git a/server/src/test/java/org/spine3/testdata/TestContextFactory.java b/client/src/test/java/org/spine3/testdata/TestContextFactory.java
similarity index 82%
rename from server/src/test/java/org/spine3/testdata/TestContextFactory.java
rename to client/src/test/java/org/spine3/testdata/TestContextFactory.java
index e16bf6b3ccc..370e15000ab 100644
--- a/server/src/test/java/org/spine3/testdata/TestContextFactory.java
+++ b/client/src/test/java/org/spine3/testdata/TestContextFactory.java
@@ -21,6 +21,7 @@
package org.spine3.testdata;
import com.google.protobuf.Any;
+import com.google.protobuf.Duration;
import com.google.protobuf.Message;
import com.google.protobuf.Timestamp;
import org.spine3.base.CommandContext;
@@ -29,6 +30,7 @@
import org.spine3.base.EventContext;
import org.spine3.base.EventId;
import org.spine3.base.Events;
+import org.spine3.base.Schedule;
import org.spine3.base.UserId;
import org.spine3.time.ZoneOffset;
@@ -36,7 +38,7 @@
import static org.spine3.base.Identifiers.newUuid;
import static org.spine3.client.UserUtil.newUserId;
import static org.spine3.protobuf.Messages.toAny;
-import static org.spine3.testdata.TestAggregateIdFactory.newProjectId;
+import static org.spine3.protobuf.Values.newStringValue;
/**
@@ -47,7 +49,7 @@
@SuppressWarnings({"UtilityClass", "OverloadedMethodsWithSameNumberOfParameters"})
public class TestContextFactory {
- private static final Any AGGREGATE_ID = toAny(newProjectId());
+ private static final Any AGGREGATE_ID = toAny(newStringValue(newUuid()));
private TestContextFactory() {
}
@@ -74,6 +76,29 @@ public static CommandContext createCommandContext(UserId userId, CommandId comma
return builder.build();
}
+ /**
+ * Creates a new context with the given delay before the delivery time.
+ */
+ public static CommandContext createCommandContext(Duration delay) {
+ final Schedule schedule = Schedule.newBuilder()
+ .setAfter(delay)
+ .build();
+ return createCommandContext(schedule);
+ }
+
+ /**
+ * Creates a new context with the given scheduling options.
+ */
+ public static CommandContext createCommandContext(Schedule schedule) {
+ final CommandContext.Builder builder = createCommandContext().toBuilder()
+ .setSchedule(schedule);
+ return builder.build();
+ }
+
+ /*
+ * Event context factory methods.
+ */
+
/**
* Creates a new {@link EventContext} with default properties.
*/
diff --git a/examples/src/main/java/org/spine3/examples/aggregate/server/Application.java b/examples/src/main/java/org/spine3/examples/aggregate/server/Application.java
index d6898b6abc6..6211a10d78b 100644
--- a/examples/src/main/java/org/spine3/examples/aggregate/server/Application.java
+++ b/examples/src/main/java/org/spine3/examples/aggregate/server/Application.java
@@ -31,6 +31,7 @@
import org.spine3.server.command.CommandBus;
import org.spine3.server.command.CommandStore;
import org.spine3.server.event.EventBus;
+import org.spine3.server.event.EventHandler;
import org.spine3.server.event.EventStore;
import org.spine3.server.storage.StorageFactory;
import org.spine3.server.storage.memory.InMemoryStorageFactory;
@@ -48,7 +49,7 @@ public class Application implements AutoCloseable {
private final StorageFactory storageFactory;
private final BoundedContext boundedContext;
- private final EventLogger eventLogger = new EventLogger();
+ private final EventHandler eventLogger = new EventLogger();
/**
* Creates a new sample with the specified storage factory.
@@ -65,7 +66,11 @@ public Application(StorageFactory storageFactory) {
}
private static CommandBus createCommandBus() {
- return CommandBus.create(new CommandStore(InMemoryStorageFactory.getInstance().createCommandStorage()));
+ final CommandStore store = new CommandStore(InMemoryStorageFactory.getInstance().createCommandStorage());
+ final CommandBus commandBus = CommandBus.newBuilder()
+ .setCommandStore(store)
+ .build();
+ return commandBus;
}
private static EventBus createEventBus(StorageFactory storageFactory) {
@@ -74,7 +79,6 @@ private static EventBus createEventBus(StorageFactory storageFactory) {
.setStorage(storageFactory.createEventStorage())
.setLogger(EventStore.log())
.build();
-
return EventBus.newInstance(eventStore);
}
diff --git a/server/src/main/java/org/spine3/server/aggregate/Aggregate.java b/server/src/main/java/org/spine3/server/aggregate/Aggregate.java
index 88f73600811..a4c13c28dc5 100644
--- a/server/src/main/java/org/spine3/server/aggregate/Aggregate.java
+++ b/server/src/main/java/org/spine3/server/aggregate/Aggregate.java
@@ -47,6 +47,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Throwables.propagate;
+import static java.util.Collections.singletonList;
import static org.spine3.base.Identifiers.idToAny;
/**
@@ -229,7 +230,7 @@ private void updateState() {
* which is called automatically by {@link AggregateRepository}.
*/
@VisibleForTesting
- protected final void testDispatch(Message command, CommandContext context) {
+ public final void dispatchForTest(Message command, CommandContext context) {
dispatch(command, context);
}
@@ -314,6 +315,32 @@ private void apply(Iterable extends Message> messages, CommandContext commandC
}
}
+ /**
+ * This method is provided only for the purpose of testing event appliers
+ * of an aggregate and must not be called from the production code.
+ *
+ * Calls {@link #apply(Iterable, CommandContext)}.
+ */
+ @VisibleForTesting
+ public final void applyForTest(Message message, CommandContext commandContext) {
+ try {
+ apply(singletonList(message), commandContext);
+ } catch (InvocationTargetException e) {
+ throw propagate(e);
+ }
+ }
+
+ /**
+ * This method is provided only for the purpose of testing an aggregate and
+ * must not be called from the production code.
+ *
+ *
Calls {@link #incrementState(Message)}.
+ */
+ @VisibleForTesting
+ public final void incrementStateForTest(S newState) {
+ incrementState(newState);
+ }
+
/**
* Applies an event to the aggregate.
*
diff --git a/server/src/main/java/org/spine3/server/command/CommandBus.java b/server/src/main/java/org/spine3/server/command/CommandBus.java
index 4062b3d0d43..b54da078e83 100644
--- a/server/src/main/java/org/spine3/server/command/CommandBus.java
+++ b/server/src/main/java/org/spine3/server/command/CommandBus.java
@@ -20,6 +20,7 @@
package org.spine3.server.command;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
import com.google.protobuf.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +36,7 @@
import org.spine3.server.validate.MessageValidator;
import org.spine3.validate.options.ConstraintViolation;
-import javax.annotation.CheckReturnValue;
+import javax.annotation.Nullable;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
@@ -43,12 +44,14 @@
import static org.spine3.base.Commands.*;
import static org.spine3.server.command.CommandValidation.invalidCommand;
import static org.spine3.server.command.CommandValidation.unsupportedCommand;
+import static org.spine3.validate.Validate.checkNotDefault;
/**
* Dispatches the incoming commands to the corresponding handler.
*
* @author Alexander Yevsyukov
* @author Mikhail Melnik
+ * @author Alexander Litus
*/
public class CommandBus implements AutoCloseable {
@@ -57,18 +60,28 @@ public class CommandBus implements AutoCloseable {
private final CommandStore commandStore;
- private final CommandStatusService commandStatusService;
-
+ @Nullable
+ private final CommandScheduler scheduler;
+
private ProblemLog problemLog = new ProblemLog();
+ private final CommandStatusService commandStatusService;
+ private MessageValidator messageValidator;
- @CheckReturnValue
- public static CommandBus create(CommandStore store) {
- return new CommandBus(checkNotNull(store));
+ private CommandBus(Builder builder) {
+ commandStore = builder.getCommandStore();
+ scheduler = builder.getScheduler();
+ if (scheduler != null) {
+ scheduler.setPostFunction(newPostFunction());
+ }
+ commandStatusService = new CommandStatusService(commandStore);
+ messageValidator = new MessageValidator();
}
- protected CommandBus(CommandStore commandStore) {
- this.commandStore = commandStore;
- this.commandStatusService = new CommandStatusService(commandStore);
+ /**
+ * Creates a new builder for command bus.
+ */
+ public static Builder newBuilder() {
+ return new Builder();
}
/**
@@ -129,13 +142,12 @@ public void unregister(CommandHandler handler) {
* {@link CommandValidation#unsupportedCommand(Message)} otherwise
*/
public Response validate(Message command) {
- checkNotNull(command);
+ checkNotDefault(command);
final CommandClass commandClass = CommandClass.of(command);
if (isUnsupportedCommand(commandClass)) {
return unsupportedCommand(command);
}
- final MessageValidator validator = new MessageValidator();
- final List violations = validator.validate(command);
+ final List violations = messageValidator.validate(command);
if (!violations.isEmpty()) {
return invalidCommand(command, violations);
}
@@ -150,30 +162,29 @@ private boolean isUnsupportedCommand(CommandClass commandClass) {
/**
* Directs a command request to the corresponding handler.
*
- * @param request the command request to be processed
+ * @param command the command to be processed
* @throws UnsupportedCommandException if there is neither handler nor dispatcher registered for
* the class of the passed command
*/
- public void post(Command request) {
- checkNotNull(request);
-
- store(request);
-
- final CommandClass commandClass = CommandClass.of(request);
-
+ public void post(Command command) {
+ checkNotDefault(command);
+ if (isScheduled(command)) {
+ schedule(command);
+ return;
+ }
+ store(command);
+ final CommandClass commandClass = CommandClass.of(command);
if (isDispatcherRegistered(commandClass)) {
- dispatch(request);
+ dispatch(command);
return;
}
-
if (isHandlerRegistered(commandClass)) {
- final Message command = getMessage(request);
- final CommandContext context = request.getContext();
- invokeHandler(command, context);
+ final Message message = getMessage(command);
+ final CommandContext context = command.getContext();
+ invokeHandler(message, context);
return;
}
-
- throw new UnsupportedCommandException(getMessage(request));
+ throw new UnsupportedCommandException(getMessage(command));
}
/**
@@ -187,7 +198,6 @@ private void dispatch(Command command) {
final CommandClass commandClass = CommandClass.of(command);
final CommandDispatcher dispatcher = getDispatcher(commandClass);
final CommandId commandId = command.getContext().getCommandId();
-
try {
dispatcher.dispatch(command);
} catch (Exception e) {
@@ -202,9 +212,7 @@ private void invokeHandler(Message msg, CommandContext context) {
final CommandId commandId = context.getCommandId();
try {
handler.handle(msg, context);
-
commandStatusService.setOk(commandId);
-
} catch (InvocationTargetException e) {
final Throwable cause = e.getCause();
//noinspection ChainOfInstanceofChecks
@@ -228,9 +236,13 @@ private CommandHandler getHandler(CommandClass commandClass) {
return handler;
}
- @VisibleForTesting
- /* package */ void setProblemLog(ProblemLog problemLog) {
- this.problemLog = problemLog;
+ private void schedule(Command command) {
+ if (scheduler != null) {
+ scheduler.schedule(command);
+ } else {
+ throw new IllegalStateException(
+ "Scheduled commands are not supported by this command bus: scheduler is not set.");
+ }
}
/**
@@ -262,8 +274,13 @@ private CommandHandler getHandler(CommandClass commandClass) {
}
@VisibleForTesting
- /* package */ ProblemLog getProblemLog() {
- return problemLog;
+ /* package */ void setProblemLog(ProblemLog problemLog) {
+ this.problemLog = problemLog;
+ }
+
+ @VisibleForTesting
+ /* package */ void setMessageValidator(MessageValidator messageValidator) {
+ this.messageValidator = messageValidator;
}
private void store(Command request) {
@@ -284,16 +301,31 @@ private CommandDispatcher getDispatcher(CommandClass commandClass) {
return dispatcherRegistry.getDispatcher(commandClass);
}
+ private Function newPostFunction() {
+ final Function function = new Function() {
+ @Nullable
+ @Override
+ public Command apply(@Nullable Command command) {
+ //noinspection ConstantConditions
+ post(command);
+ return command;
+ }
+ };
+ return function;
+ }
+
@Override
public void close() throws Exception {
dispatcherRegistry.unregisterAll();
handlerRegistry.unregisterAll();
commandStore.close();
+ if (scheduler != null) {
+ scheduler.shutdown();
+ }
}
private enum LogSingleton {
INSTANCE;
-
@SuppressWarnings("NonSerializableFieldInSerializableClass")
private final Logger value = LoggerFactory.getLogger(CommandBus.class);
}
@@ -305,4 +337,38 @@ private enum LogSingleton {
/* package */ static Logger log() {
return LogSingleton.INSTANCE.value;
}
+
+ /**
+ * Constructs a command bus.
+ */
+ public static class Builder {
+
+ private CommandStore commandStore;
+ private CommandScheduler scheduler;
+
+ public CommandBus build() {
+ checkNotNull(commandStore, "Command store must be set.");
+ final CommandBus commandBus = new CommandBus(this);
+ return commandBus;
+ }
+
+ public Builder setCommandStore(CommandStore commandStore) {
+ this.commandStore = commandStore;
+ return this;
+ }
+
+ public CommandStore getCommandStore() {
+ return commandStore;
+ }
+
+ public Builder setScheduler(CommandScheduler scheduler) {
+ this.scheduler = scheduler;
+ return this;
+ }
+
+ @Nullable
+ public CommandScheduler getScheduler() {
+ return scheduler;
+ }
+ }
}
diff --git a/server/src/main/java/org/spine3/server/command/CommandScheduler.java b/server/src/main/java/org/spine3/server/command/CommandScheduler.java
new file mode 100644
index 00000000000..45d4d23d4a5
--- /dev/null
+++ b/server/src/main/java/org/spine3/server/command/CommandScheduler.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2016, TeamDev Ltd. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.spine3.server.command;
+
+import com.google.common.base.Function;
+import org.spine3.base.Command;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Schedules commands delivering them to the target according to the scheduling options.
+ *
+ * @author Alexander Litus
+ */
+public abstract class CommandScheduler {
+
+ private boolean isActive = true;
+
+ private Function postFunction;
+
+ /**
+ * Schedule a command and deliver it to the target according to the scheduling options.
+ *
+ * NOTE: check if the command is scheduled already.
+ *
+ * @param command a command to deliver later
+ * @throws IllegalStateException if the scheduler is shut down
+ * @see #post(Command)
+ */
+ public void schedule(Command command) {
+ checkState(isActive, "Scheduler is shut down.");
+ }
+
+ /**
+ * Initiates an orderly shutdown in which previously scheduled commands will be delivered later,
+ * but no new commands will be accepted.
+ *
+ *
Invocation has no effect if the scheduler is already shut down.
+ */
+ public void shutdown() {
+ isActive = false;
+ }
+
+ /**
+ * Delivers a scheduled command to a target.
+ *
+ * @param command a command to deliver
+ */
+ protected void post(Command command) {
+ postFunction.apply(command);
+ }
+
+ /**
+ * Sets a function used to post scheduled commands.
+ */
+ /* package */ void setPostFunction(Function postFunction) {
+ this.postFunction = postFunction;
+ }
+}
diff --git a/server/src/main/java/org/spine3/server/command/ExecutorCommandScheduler.java b/server/src/main/java/org/spine3/server/command/ExecutorCommandScheduler.java
new file mode 100644
index 00000000000..21c2fcde3c2
--- /dev/null
+++ b/server/src/main/java/org/spine3/server/command/ExecutorCommandScheduler.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2016, TeamDev Ltd. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.spine3.server.command;
+
+import org.spine3.base.Command;
+import org.spine3.base.CommandId;
+import org.spine3.base.Schedule;
+
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static com.google.common.collect.Sets.newHashSet;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * The command scheduler implementation which uses basic Java task scheduling features.
+ *
+ * NOTE: please use another implementation
+ * in applications running under the Google App Engine.
+ *
+ * @see ScheduledExecutorService
+ * @author Alexander Litus
+ */
+public class ExecutorCommandScheduler extends CommandScheduler {
+
+ private static final int MIN_THEAD_POOL_SIZE = 5;
+
+ private static final Set SCHEDULED_COMMAND_IDS = newHashSet();
+
+ private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(MIN_THEAD_POOL_SIZE);
+
+ @Override
+ public void schedule(final Command command) {
+ super.schedule(command);
+ if (isScheduledAlready(command)) {
+ return;
+ }
+ final long delaySec = getDelaySeconds(command);
+ executorService.schedule(new Runnable() {
+ @Override
+ public void run() {
+ post(command);
+ }
+ }, delaySec, SECONDS);
+ SCHEDULED_COMMAND_IDS.add(command.getContext().getCommandId());
+ }
+
+ private static boolean isScheduledAlready(Command command) {
+ final CommandId id = command.getContext().getCommandId();
+ final boolean isScheduledAlready = SCHEDULED_COMMAND_IDS.contains(id);
+ return isScheduledAlready;
+ }
+
+ private static long getDelaySeconds(Command command) {
+ final Schedule schedule = command.getContext().getSchedule();
+ final long delaySec = schedule.getAfter().getSeconds();
+ return delaySec;
+ }
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ executorService.shutdown();
+ }
+}
diff --git a/server/src/main/java/org/spine3/server/reflect/MethodMap.java b/server/src/main/java/org/spine3/server/reflect/MethodMap.java
index 4d8b7932690..e097bb4065d 100644
--- a/server/src/main/java/org/spine3/server/reflect/MethodMap.java
+++ b/server/src/main/java/org/spine3/server/reflect/MethodMap.java
@@ -93,9 +93,8 @@ private static Map, Method> scan(Class> declaringClas
tempMap.put(messageClass, method);
}
}
- final ImmutableMap.Builder, Method> builder = ImmutableMap.builder();
- builder.putAll(tempMap);
- return builder.build();
+ final ImmutableMap, Method> result = ImmutableMap.copyOf(tempMap);
+ return result;
}
/**
diff --git a/server/src/test/java/org/spine3/server/BoundedContextBuilderShould.java b/server/src/test/java/org/spine3/server/BoundedContextBuilderShould.java
index 8ca78e80a7e..92a6ebbed5b 100644
--- a/server/src/test/java/org/spine3/server/BoundedContextBuilderShould.java
+++ b/server/src/test/java/org/spine3/server/BoundedContextBuilderShould.java
@@ -20,18 +20,17 @@
package org.spine3.server;
-import com.google.common.util.concurrent.MoreExecutors;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.spine3.server.command.CommandBus;
-import org.spine3.server.command.CommandStore;
import org.spine3.server.event.EventBus;
-import org.spine3.server.event.EventStore;
import org.spine3.server.storage.StorageFactory;
import org.spine3.server.storage.memory.InMemoryStorageFactory;
import static org.junit.Assert.*;
+import static org.spine3.testdata.TestCommands.newCommandBus;
+import static org.spine3.testdata.TestEventFactory.newEventBus;
@SuppressWarnings("InstanceMethodNamingConvention")
public class BoundedContextBuilderShould {
@@ -48,17 +47,6 @@ public void tearDown() throws Exception {
storageFactory.close();
}
- private static CommandBus newCommandDispatcher(StorageFactory storageFactory) {
- return CommandBus.create(new CommandStore(storageFactory.createCommandStorage()));
- }
-
- private static EventBus newEventBus(StorageFactory storageFactory) {
- return EventBus.newInstance(EventStore.newBuilder()
- .setStreamExecutor(MoreExecutors.directExecutor())
- .setStorage(storageFactory.createEventStorage())
- .build());
- }
-
@Test(expected = NullPointerException.class)
public void do_not_accept_null_StorageFactory() {
//noinspection ConstantConditions
@@ -80,7 +68,7 @@ public void do_not_accept_null_CommandDispatcher() {
@Test
public void return_CommandDispatcher_from_builder() {
- final CommandBus expected = newCommandDispatcher(storageFactory);
+ final CommandBus expected = newCommandBus(storageFactory);
final BoundedContext.Builder builder = BoundedContext.newBuilder().setCommandBus(expected);
assertEquals(expected, builder.getCommandBus());
}
@@ -101,7 +89,8 @@ public void return_if_multitenant_from_builder() {
@Test
public void be_not_multitenant_by_default() {
- assertFalse(BoundedContext.newBuilder().isMultitenant());
+ assertFalse(BoundedContext.newBuilder()
+ .isMultitenant());
}
@Test(expected = NullPointerException.class)
diff --git a/server/src/test/java/org/spine3/server/BoundedContextShould.java b/server/src/test/java/org/spine3/server/BoundedContextShould.java
index 27369164de5..56b9d0dfb24 100644
--- a/server/src/test/java/org/spine3/server/BoundedContextShould.java
+++ b/server/src/test/java/org/spine3/server/BoundedContextShould.java
@@ -20,7 +20,6 @@
package org.spine3.server;
-import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import com.google.protobuf.Empty;
import com.google.protobuf.Message;
@@ -40,12 +39,8 @@
import org.spine3.server.aggregate.AggregateRepository;
import org.spine3.server.aggregate.Apply;
import org.spine3.server.command.Assign;
-import org.spine3.server.command.CommandBus;
-import org.spine3.server.command.CommandStore;
import org.spine3.server.entity.IdFunction;
-import org.spine3.server.event.EventBus;
import org.spine3.server.event.EventHandler;
-import org.spine3.server.event.EventStore;
import org.spine3.server.event.GetProducerIdFromEvent;
import org.spine3.server.event.Subscribe;
import org.spine3.server.procman.CommandRouted;
@@ -74,6 +69,8 @@
import static org.spine3.base.Identifiers.newUuid;
import static org.spine3.client.UserUtil.newUserId;
import static org.spine3.testdata.TestCommands.createProject;
+import static org.spine3.testdata.TestCommands.newCommandBus;
+import static org.spine3.testdata.TestEventFactory.newEventBus;
import static org.spine3.testdata.TestEventMessageFactory.*;
/**
@@ -110,17 +107,6 @@ public void setUp() {
boundedContext = BoundedContextTestStubs.create(storageFactory);
}
- private static EventBus newEventBus(StorageFactory storageFactory) {
- return EventBus.newInstance(EventStore.newBuilder()
- .setStreamExecutor(MoreExecutors.directExecutor())
- .setStorage(storageFactory.createEventStorage())
- .build());
- }
-
- private static CommandBus newCommandBus(StorageFactory storageFactory) {
- return CommandBus.create(new CommandStore(storageFactory.createCommandStorage()));
- }
-
@After
public void tearDown() throws Exception {
if (handlersRegistered) {
diff --git a/server/src/test/java/org/spine3/server/BoundedContextTestStubs.java b/server/src/test/java/org/spine3/server/BoundedContextTestStubs.java
index d728f5d43a7..98da6e59f80 100644
--- a/server/src/test/java/org/spine3/server/BoundedContextTestStubs.java
+++ b/server/src/test/java/org/spine3/server/BoundedContextTestStubs.java
@@ -20,15 +20,14 @@
package org.spine3.server;
-import com.google.common.util.concurrent.MoreExecutors;
import org.spine3.server.command.CommandBus;
-import org.spine3.server.command.CommandStore;
import org.spine3.server.event.EventBus;
-import org.spine3.server.event.EventStore;
import org.spine3.server.storage.StorageFactory;
import org.spine3.server.storage.memory.InMemoryStorageFactory;
import static org.mockito.Mockito.spy;
+import static org.spine3.testdata.TestCommands.newCommandBus;
+import static org.spine3.testdata.TestEventFactory.newEventBus;
/**
* Creates stubs with instances of {@link BoundedContext} for testing purposes.
@@ -43,21 +42,14 @@ public static BoundedContext create() {
}
public static BoundedContext create(StorageFactory storageFactory) {
- final CommandBus commandBus = CommandBus.create(
- new CommandStore(storageFactory.createCommandStorage()));
-
- final EventBus eventBus = EventBus.newInstance(EventStore.newBuilder()
- .setStreamExecutor(MoreExecutors.directExecutor())
- .setStorage(storageFactory.createEventStorage())
- .build());
-
- return BoundedContext.newBuilder()
+ final CommandBus commandBus = newCommandBus(storageFactory);
+ final EventBus eventBus = newEventBus(storageFactory);
+ final BoundedContext.Builder builder = BoundedContext.newBuilder()
.setStorageFactory(storageFactory)
.setCommandBus(commandBus)
- .setEventBus(spy(eventBus))
- .build();
+ .setEventBus(spy(eventBus));
+ return builder.build();
}
- private BoundedContextTestStubs() {
- }
+ private BoundedContextTestStubs() {}
}
diff --git a/server/src/test/java/org/spine3/server/aggregate/AggregateShould.java b/server/src/test/java/org/spine3/server/aggregate/AggregateShould.java
index 72ce64cf399..54218c5f365 100644
--- a/server/src/test/java/org/spine3/server/aggregate/AggregateShould.java
+++ b/server/src/test/java/org/spine3/server/aggregate/AggregateShould.java
@@ -146,7 +146,7 @@ public void not_accept_to_constructor_id_of_unsupported_type() {
@Test
public void handle_one_command_and_apply_appropriate_event() {
- aggregate.testDispatch(createProject, COMMAND_CONTEXT);
+ aggregate.dispatchForTest(createProject, COMMAND_CONTEXT);
assertTrue(aggregate.isCreateProjectCommandHandled);
assertTrue(aggregate.isProjectCreatedEventApplied);
@@ -154,7 +154,7 @@ public void handle_one_command_and_apply_appropriate_event() {
@Test
public void handle_only_appropriate_command() {
- aggregate.testDispatch(createProject, COMMAND_CONTEXT);
+ aggregate.dispatchForTest(createProject, COMMAND_CONTEXT);
assertTrue(aggregate.isCreateProjectCommandHandled);
assertTrue(aggregate.isProjectCreatedEventApplied);
@@ -168,15 +168,15 @@ public void handle_only_appropriate_command() {
@Test
public void handle_appropriate_commands_sequentially() {
- aggregate.testDispatch(createProject, COMMAND_CONTEXT);
+ aggregate.dispatchForTest(createProject, COMMAND_CONTEXT);
assertTrue(aggregate.isCreateProjectCommandHandled);
assertTrue(aggregate.isProjectCreatedEventApplied);
- aggregate.testDispatch(addTask, COMMAND_CONTEXT);
+ aggregate.dispatchForTest(addTask, COMMAND_CONTEXT);
assertTrue(aggregate.isAddTaskCommandHandled);
assertTrue(aggregate.isTaskAddedEventApplied);
- aggregate.testDispatch(startProject, COMMAND_CONTEXT);
+ aggregate.dispatchForTest(startProject, COMMAND_CONTEXT);
assertTrue(aggregate.isStartProjectCommandHandled);
assertTrue(aggregate.isProjectStartedEventApplied);
}
@@ -186,7 +186,7 @@ public void throw_exception_if_missing_command_handler() {
final TestAggregateForCaseMissingHandlerOrApplier aggregate =
new TestAggregateForCaseMissingHandlerOrApplier(ID);
- aggregate.testDispatch(addTask, COMMAND_CONTEXT);
+ aggregate.dispatchForTest(addTask, COMMAND_CONTEXT);
}
@Test(expected = IllegalStateException.class)
@@ -194,7 +194,7 @@ public void throw_exception_if_missing_event_applier_for_non_state_neutral_event
final TestAggregateForCaseMissingHandlerOrApplier aggregate =
new TestAggregateForCaseMissingHandlerOrApplier(ID);
try {
- aggregate.testDispatch(createProject, COMMAND_CONTEXT);
+ aggregate.dispatchForTest(createProject, COMMAND_CONTEXT);
} catch (IllegalStateException e) { // expected exception
assertTrue(aggregate.isCreateProjectCommandHandled);
throw e;
@@ -228,7 +228,7 @@ public void return_default_state_by_default() {
@Test
public void return_current_state_after_dispatch() {
- aggregate.testDispatch(createProject, COMMAND_CONTEXT);
+ aggregate.dispatchForTest(createProject, COMMAND_CONTEXT);
final Project state = aggregate.getState();
@@ -238,10 +238,10 @@ public void return_current_state_after_dispatch() {
@Test
public void return_current_state_after_several_dispatches() {
- aggregate.testDispatch(createProject, COMMAND_CONTEXT);
+ aggregate.dispatchForTest(createProject, COMMAND_CONTEXT);
assertEquals(TestAggregate.STATUS_NEW, aggregate.getState().getStatus());
- aggregate.testDispatch(startProject, COMMAND_CONTEXT);
+ aggregate.dispatchForTest(startProject, COMMAND_CONTEXT);
assertEquals(TestAggregate.STATUS_STARTED, aggregate.getState().getStatus());
}
@@ -253,7 +253,7 @@ public void return_non_null_time_when_was_last_modified() {
@Test
public void return_time_when_was_last_modified() {
- aggregate.testDispatch(createProject, COMMAND_CONTEXT);
+ aggregate.dispatchForTest(createProject, COMMAND_CONTEXT);
final long expectedTimeSec = currentTimeSeconds();
final Timestamp whenLastModified = aggregate.whenModified();
@@ -272,11 +272,11 @@ public void play_events() {
@Test
public void play_snapshot_event_and_restore_state() {
- aggregate.testDispatch(createProject, COMMAND_CONTEXT);
+ aggregate.dispatchForTest(createProject, COMMAND_CONTEXT);
final Snapshot snapshotNewProject = aggregate.toSnapshot();
- aggregate.testDispatch(startProject, COMMAND_CONTEXT);
+ aggregate.dispatchForTest(startProject, COMMAND_CONTEXT);
assertEquals(TestAggregate.STATUS_STARTED, aggregate.getState().getStatus());
final List events = newArrayList(snapshotToEvent(snapshotNewProject));
@@ -332,7 +332,7 @@ public void clear_event_records_when_commit_after_dispatch() {
@Test
public void transform_current_state_to_snapshot_event() {
- aggregate.testDispatch(createProject, COMMAND_CONTEXT);
+ aggregate.dispatchForTest(createProject, COMMAND_CONTEXT);
final Snapshot snapshot = aggregate.toSnapshot();
final Project state = fromAny(snapshot.getState());
@@ -344,11 +344,11 @@ public void transform_current_state_to_snapshot_event() {
@Test
public void restore_state_from_snapshot() {
- aggregate.testDispatch(createProject, COMMAND_CONTEXT);
+ aggregate.dispatchForTest(createProject, COMMAND_CONTEXT);
final Snapshot snapshotNewProject = aggregate.toSnapshot();
- aggregate.testDispatch(startProject, COMMAND_CONTEXT);
+ aggregate.dispatchForTest(startProject, COMMAND_CONTEXT);
assertEquals(TestAggregate.STATUS_STARTED, aggregate.getState().getStatus());
aggregate.restore(snapshotNewProject);
@@ -460,7 +460,7 @@ public void dispatchCommands(Message... commands) {
final UserId userId = UserUtil.newUserId("aggregate_should@spine3.org");
for (Message cmd : commands) {
final CommandContext ctx = Commands.createContext(userId, ZoneOffsets.UTC);
- testDispatch(cmd, ctx);
+ dispatchForTest(cmd, ctx);
}
}
}
@@ -582,7 +582,7 @@ public void propagate_RuntimeException_when_handler_throws() {
final Command command = createProject();
try {
- faultyAggregate.testDispatch(command.getMessage(), command.getContext());
+ faultyAggregate.dispatchForTest(command.getMessage(), command.getContext());
} catch (RuntimeException e) {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored") // We need it for checking.
final Throwable cause = Throwables.getRootCause(e);
@@ -597,7 +597,7 @@ public void propagate_RuntimeException_when_applier_throws() {
final Command command = createProject();
try {
- faultyAggregate.testDispatch(command.getMessage(), command.getContext());
+ faultyAggregate.dispatchForTest(command.getMessage(), command.getContext());
} catch (RuntimeException e) {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored") // ... because we need it for checking.
final Throwable cause = Throwables.getRootCause(e);
diff --git a/server/src/test/java/org/spine3/server/command/CommandBusShould.java b/server/src/test/java/org/spine3/server/command/CommandBusShould.java
index 0b986ae9fca..4f70704a8da 100644
--- a/server/src/test/java/org/spine3/server/command/CommandBusShould.java
+++ b/server/src/test/java/org/spine3/server/command/CommandBusShould.java
@@ -20,6 +20,7 @@
package org.spine3.server.command;
+import com.google.protobuf.Duration;
import com.google.protobuf.Message;
import org.junit.Before;
import org.junit.Test;
@@ -28,31 +29,41 @@
import org.spine3.base.CommandId;
import org.spine3.base.Commands;
import org.spine3.base.Errors;
+import org.spine3.base.Response;
import org.spine3.base.Responses;
import org.spine3.client.CommandFactory;
import org.spine3.client.test.TestCommandFactory;
import org.spine3.server.error.UnsupportedCommandException;
import org.spine3.server.event.EventBus;
import org.spine3.server.failure.FailureThrowable;
+import org.spine3.server.storage.memory.InMemoryStorageFactory;
import org.spine3.server.type.CommandClass;
+import org.spine3.server.validate.MessageValidator;
import org.spine3.test.failures.Failures;
import org.spine3.test.project.command.AddTask;
import org.spine3.test.project.command.CreateProject;
import org.spine3.test.project.command.StartProject;
import org.spine3.test.project.event.ProjectCreated;
+import org.spine3.testdata.TestEventFactory;
+import org.spine3.validate.options.ConstraintViolation;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.Set;
+import static com.google.common.collect.Lists.newArrayList;
import static org.junit.Assert.*;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.*;
import static org.spine3.base.Identifiers.newUuid;
+import static org.spine3.base.Responses.isInvalidCommand;
import static org.spine3.base.Responses.isUnsupportedCommand;
+import static org.spine3.protobuf.Durations.milliseconds;
+import static org.spine3.protobuf.Durations.minutes;
import static org.spine3.testdata.TestCommands.*;
+import static org.spine3.testdata.TestContextFactory.createCommandContext;
-@SuppressWarnings({"InstanceMethodNamingConvention", "ClassWithTooManyMethods"})
+@SuppressWarnings({"InstanceMethodNamingConvention", "ClassWithTooManyMethods", "OverlyCoupledClass"})
public class CommandBusShould {
private CommandBus commandBus;
@@ -60,22 +71,28 @@ public class CommandBusShould {
private CommandFactory commandFactory;
private CommandBus.ProblemLog log;
private EventBus eventBus;
+ private ExecutorCommandScheduler scheduler;
+ private CreateProjectHandler handler;
@Before
public void setUp() {
- commandStore = mock(CommandStore.class);
-
- commandBus = CommandBus.create(commandStore);
- log = mock(CommandBus.ProblemLog.class);
+ final InMemoryStorageFactory storageFactory = InMemoryStorageFactory.getInstance();
+ commandStore = spy(new CommandStore(storageFactory.createCommandStorage()));
+ scheduler = spy(new ExecutorCommandScheduler());
+ commandBus = newCommandBus(commandStore, scheduler);
+ log = spy(new CommandBus.ProblemLog());
commandBus.setProblemLog(log);
- eventBus = mock(EventBus.class);
+ eventBus = spy(TestEventFactory.newEventBus(storageFactory));
commandFactory = TestCommandFactory.newInstance(CommandBusShould.class);
+ handler = new CreateProjectHandler(newUuid(), eventBus);
}
@Test(expected = NullPointerException.class)
public void do_not_accept_null_CommandStore_on_construction() {
//noinspection ConstantConditions,ResultOfMethodCallIgnored
- CommandBus.create(null);
+ CommandBus.newBuilder()
+ .setCommandStore(null)
+ .build();
}
//
@@ -100,13 +117,13 @@ public void do_not_accept_empty_dispatchers() {
@Test(expected = IllegalArgumentException.class)
public void do_not_accept_command_handlers_without_methods() {
- commandBus.register(new EmptyCommandHandler(eventBus));
+ commandBus.register(new EmptyCommandHandler(newUuid(), eventBus));
}
@SuppressWarnings("EmptyClass")
private static class EmptyCommandHandler extends CommandHandler {
- private EmptyCommandHandler(EventBus eventBus) {
- super(newUuid(), eventBus);
+ protected EmptyCommandHandler(String id, EventBus eventBus) {
+ super(id, eventBus);
}
}
@@ -179,43 +196,47 @@ public void dispatch(Command request) throws Exception {
assertTrue(isUnsupportedCommand(commandBus.validate(addTask(projectId))));
}
+ @Test
+ public void return_invalid_command_response_if_command_is_invalid() {
+ commandBus.register(handler);
+ final MessageValidator validator = mock(MessageValidator.class);
+ doReturn(newArrayList(ConstraintViolation.getDefaultInstance()))
+ .when(validator)
+ .validate(any(Message.class));
+ commandBus.setMessageValidator(validator);
+
+ final Response response = commandBus.validate(createProject(newUuid()));
+ assertTrue(isInvalidCommand(response));
+ }
+
//
// Tests for not overriding handlers by dispatchers and vice versa
//-------------------------------------------------------------------
@Test(expected = IllegalArgumentException.class)
public void do_not_allow_to_register_dispatcher_for_the_command_with_registered_handler() {
-
- final CommandHandler createProjectHandler = new CreateProjectHandler(eventBus);
final CommandDispatcher createProjectDispatcher = new CreateProjectDispatcher();
-
- commandBus.register(createProjectHandler);
-
+ commandBus.register(handler);
commandBus.register(createProjectDispatcher);
}
@Test(expected = IllegalArgumentException.class)
public void do_not_allow_to_register_handler_for_the_command_with_registered_dispatcher() {
-
- final CommandHandler createProjectHandler = new CreateProjectHandler(eventBus);
final CommandDispatcher createProjectDispatcher = new CreateProjectDispatcher();
-
commandBus.register(createProjectDispatcher);
-
- commandBus.register(createProjectHandler);
+ commandBus.register(handler);
}
private static class CreateProjectHandler extends CommandHandler {
private boolean handlerInvoked = false;
- private CreateProjectHandler(EventBus eventBus) {
- super(newUuid(), eventBus);
+ protected CreateProjectHandler(String id, EventBus eventBus) {
+ super(id, eventBus);
}
@Assign
- public ProjectCreated handle(CreateProject command, CommandContext ctx)
- throws TestFailure, TestThrowable {
+ public ProjectCreated handle(CreateProject command, CommandContext ctx) throws TestFailure, TestThrowable {
handlerInvoked = true;
return ProjectCreated.getDefaultInstance();
}
@@ -238,7 +259,6 @@ public void dispatch(Command request) throws Exception {
@Test
public void unregister_handler() {
- final CreateProjectHandler handler = new CreateProjectHandler(eventBus);
commandBus.register(handler);
commandBus.unregister(handler);
final String projectId = newUuid();
@@ -247,8 +267,7 @@ public void unregister_handler() {
@Test
public void validate_commands_both_dispatched_and_handled() {
- final CreateProjectHandler handler = new CreateProjectHandler(eventBus);
- final AddTaskDispatcher dispatcher = new AddTaskDispatcher();
+ final CommandDispatcher dispatcher = new AddTaskDispatcher();
commandBus.register(handler);
commandBus.register(dispatcher);
@@ -281,6 +300,11 @@ public void have_log() {
assertNotNull(CommandBus.log());
}
+ @Test // To improve coverage stats.
+ public void have_command_status_service() {
+ assertNotNull(commandBus.getCommandStatusService());
+ }
+
@Test
public void close_CommandStore_when_closed() throws Exception {
commandBus.close();
@@ -288,9 +312,15 @@ public void close_CommandStore_when_closed() throws Exception {
verify(commandStore, times(1)).close();
}
+ @Test
+ public void shutdown_CommandScheduler_when_closed() throws Exception {
+ commandBus.close();
+
+ verify(scheduler, times(1)).shutdown();
+ }
+
@Test
public void remove_all_handlers_on_close() throws Exception {
- final CreateProjectHandler handler = new CreateProjectHandler(eventBus);
commandBus.register(handler);
commandBus.close();
@@ -299,7 +329,6 @@ public void remove_all_handlers_on_close() throws Exception {
@Test
public void invoke_handler_when_command_posted() {
- final CreateProjectHandler handler = new CreateProjectHandler(eventBus);
commandBus.register(handler);
final Command command = commandFactory.create(createProject(newUuid()));
@@ -329,25 +358,19 @@ public void throw_exception_when_there_is_no_neither_handler_nor_dispatcher() {
@Test
public void set_command_status_to_OK_when_handler_returns() {
- final CreateProjectHandler handler = new CreateProjectHandler(eventBus);
commandBus.register(handler);
final Command command = commandFactory.create(createProject(newUuid()));
commandBus.post(command);
// See that we called CommandStore only once with the right command ID.
- verify(commandStore, times(1)).setCommandStatusOk(command.getContext().getCommandId());
+ verify(commandStore, times(1)).setCommandStatusOk(command.getContext()
+ .getCommandId());
}
@Test
public void set_command_status_to_error_when_dispatcher_throws() throws Exception {
- final CommandDispatcher throwingDispatcher = mock(CommandDispatcher.class);
- when(throwingDispatcher.getCommandClasses()).thenReturn(CommandClass.setOf(CreateProject.class));
- final IOException exception = new IOException("Unable to dispatch");
- doThrow(exception).when(throwingDispatcher)
- .dispatch(any(Command.class));
-
- commandBus.register(throwingDispatcher);
+ final IOException exception = givenThrowingDispatcher();
final Command command = commandFactory.create(createProject(newUuid()));
commandBus.post(command);
@@ -359,59 +382,18 @@ public void set_command_status_to_error_when_dispatcher_throws() throws Exceptio
verify(log, times(1)).errorDispatching(eq(exception), eq(command));
}
- private static class TestFailure extends FailureThrowable {
- private static final long serialVersionUID = 1L;
-
- private TestFailure() {
- super(Failures.UnableToHandle.newBuilder()
- .setMessage(TestFailure.class.getName())
- .build());
- }
- }
-
- @SuppressWarnings("serial")
- private static class TestThrowable extends Throwable {
- }
-
- /**
- * A stub handler that throws passed `Throwable` in the command handler method.
- *
- * @see #set_command_status_to_failure_when_handler_throws_failure
- * @see #set_command_status_to_failure_when_handler_throws_exception
- * @see #set_command_status_to_failure_when_handler_throws_unknown_Throwable
- */
- private static class ThrowingCreateProjectHandler extends CommandHandler {
-
- private final Throwable throwable;
-
- protected ThrowingCreateProjectHandler(EventBus eventBus, Throwable throwable) {
- super(newUuid(), eventBus);
- this.throwable = throwable;
- }
-
- @Assign
- public ProjectCreated handle(CreateProject msg, CommandContext context) throws Throwable {
- //noinspection ProhibitedExceptionThrown
- throw throwable;
- }
- }
-
@Test
- public void set_command_status_to_failure_when_handler_throws_failure() throws TestFailure, TestThrowable, InvocationTargetException {
- final FailureThrowable failure = new TestFailure();
- final CommandHandler handler = new ThrowingCreateProjectHandler(eventBus, failure);
-
- commandBus.register(handler);
- final Command command = commandFactory.create(createProject(newUuid()));
- final CommandId commandId = command.getContext()
- .getCommandId();
+ public void set_command_status_to_failure_when_handler_throws_failure() throws TestFailure, TestThrowable {
+ final TestFailure failure = new TestFailure();
+ final Command command = givenThrowingHandler(failure);
+ final CommandId commandId = command.getContext().getCommandId();
final Message commandMessage = Commands.getMessage(command);
commandBus.post(command);
// Verify we updated the status.
- verify(commandStore, times(1)).updateStatus(eq(commandId), eq(failure.toMessage()));
+ verify(commandStore, times(1)).updateStatus(eq(commandId), eq(failure.toMessage()));
// Verify we logged the failure.
verify(log, times(1)).failureHandling(eq(failure), eq(commandMessage), eq(commandId));
}
@@ -419,18 +401,13 @@ public void set_command_status_to_failure_when_handler_throws_failure() throws T
@Test
public void set_command_status_to_failure_when_handler_throws_exception() throws TestFailure, TestThrowable {
final RuntimeException exception = new IllegalStateException("handler throws");
- final CommandHandler handler = new ThrowingCreateProjectHandler(eventBus, exception);
-
- commandBus.register(handler);
- final Command command = commandFactory.create(createProject(newUuid()));
- final CommandId commandId = command.getContext()
- .getCommandId();
+ final Command command = givenThrowingHandler(exception);
+ final CommandId commandId = command.getContext().getCommandId();
final Message commandMessage = Commands.getMessage(command);
commandBus.post(command);
// Verify we updated the status.
-
verify(commandStore, times(1)).updateStatus(eq(commandId), eq(exception));
// Verify we logged the failure.
verify(log, times(1)).errorHandling(eq(exception), eq(commandMessage), eq(commandId));
@@ -439,20 +416,119 @@ public void set_command_status_to_failure_when_handler_throws_exception() throws
@Test
public void set_command_status_to_failure_when_handler_throws_unknown_Throwable() throws TestFailure, TestThrowable {
final Throwable throwable = new TestThrowable();
- final CommandHandler handler = new ThrowingCreateProjectHandler(eventBus, throwable);
-
- commandBus.register(handler);
- final Command command = commandFactory.create(createProject(newUuid()));
- final CommandId commandId = command.getContext()
- .getCommandId();
+ final Command command = givenThrowingHandler(throwable);
+ final CommandId commandId = command.getContext().getCommandId();
final Message commandMessage = Commands.getMessage(command);
commandBus.post(command);
// Verify we updated the status.
-
verify(commandStore, times(1)).updateStatus(eq(commandId), eq(Errors.fromThrowable(throwable)));
// Verify we logged the failure.
verify(log, times(1)).errorHandlingUnknown(eq(throwable), eq(commandMessage), eq(commandId));
}
+
+ private Command givenThrowingHandler(E throwable) throws TestThrowable, TestFailure {
+ final CommandHandler handler = new ThrowingCreateProjectHandler(eventBus, throwable);
+ commandBus.register(handler);
+ final CreateProject msg = createProject(newUuid());
+ final Command command = commandFactory.create(msg);
+ return command;
+ }
+
+ private E givenThrowingDispatcher() throws Exception {
+ final CommandDispatcher throwingDispatcher = mock(CommandDispatcher.class);
+ when(throwingDispatcher.getCommandClasses()).thenReturn(CommandClass.setOf(CreateProject.class));
+ final IOException exception = new IOException("Unable to dispatch");
+ doThrow(exception)
+ .when(throwingDispatcher)
+ .dispatch(any(Command.class));
+ commandBus.register(throwingDispatcher);
+ @SuppressWarnings("unchecked")
+ final E throwable = (E) exception;
+ return throwable;
+ }
+
+ @Test
+ public void schedule_command_if_delay_is_set() {
+ final int delayMsec = 1100;
+ final Command cmd = newCommand(/*delay=*/milliseconds(delayMsec));
+
+ commandBus.post(cmd);
+
+ verify(scheduler, times(1)).schedule(cmd);
+ verify(scheduler, never()).post(cmd);
+ verify(scheduler, after(delayMsec).times(1)).post(cmd);
+ }
+
+ @Test
+ public void do_not_schedule_command_if_no_scheduling_options_are_set() {
+ commandBus.register(new CreateProjectHandler(newUuid(), eventBus));
+ final Command cmd = commandFactory.create(createProject(newUuid()));
+
+ commandBus.post(cmd);
+
+ verify(scheduler, never()).schedule(cmd);
+ }
+
+ @Test
+ public void do_not_store_command_if_command_is_scheduled() {
+ final Command cmd = newCommand(/*delay=*/minutes(1));
+
+ commandBus.post(cmd);
+
+ verify(commandStore, never()).store(cmd);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void throw_exception_if_post_scheduled_cmd_and_no_scheduler_is_set() {
+ final CommandBus commandBus = CommandBus.newBuilder()
+ .setCommandStore(commandStore)
+ .build();
+ final Command cmd = newCommand(/*delay=*/minutes(1));
+
+ commandBus.post(cmd);
+ }
+
+ private static Command newCommand(Duration delay) {
+ final CommandContext context = createCommandContext(delay);
+ return Commands.create(createProject(newUuid()), context);
+ }
+
+ private static class TestFailure extends FailureThrowable {
+ private static final long serialVersionUID = 1L;
+
+ private TestFailure() {
+ super(Failures.UnableToHandle.newBuilder()
+ .setMessage(TestFailure.class.getName())
+ .build());
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static class TestThrowable extends Throwable {
+ }
+
+ /**
+ * A stub handler that throws passed `Throwable` in the command handler method.
+ *
+ * @see #set_command_status_to_failure_when_handler_throws_failure
+ * @see #set_command_status_to_failure_when_handler_throws_exception
+ * @see #set_command_status_to_failure_when_handler_throws_unknown_Throwable
+ */
+ private static class ThrowingCreateProjectHandler extends CommandHandler {
+
+ private final Throwable throwable;
+
+ protected ThrowingCreateProjectHandler(EventBus eventBus, Throwable throwable) {
+ super(newUuid(), eventBus);
+ this.throwable = throwable;
+ }
+
+ @Assign
+ public ProjectCreated handle(CreateProject msg, CommandContext context) throws Throwable {
+ //noinspection ProhibitedExceptionThrown
+ throw throwable;
+ }
+ }
}
diff --git a/server/src/test/java/org/spine3/server/command/ExecutorCommandSchedulerShould.java b/server/src/test/java/org/spine3/server/command/ExecutorCommandSchedulerShould.java
new file mode 100644
index 00000000000..e325da9cab7
--- /dev/null
+++ b/server/src/test/java/org/spine3/server/command/ExecutorCommandSchedulerShould.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2016, TeamDev Ltd. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.spine3.server.command;
+
+import com.google.common.base.Function;
+import com.google.protobuf.Duration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.spine3.base.Command;
+import org.spine3.base.CommandContext;
+import org.spine3.base.Commands;
+
+import javax.annotation.Nullable;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.*;
+import static org.spine3.base.Identifiers.newUuid;
+import static org.spine3.protobuf.Durations.milliseconds;
+import static org.spine3.testdata.TestCommands.addTask;
+import static org.spine3.testdata.TestCommands.createProject;
+import static org.spine3.testdata.TestContextFactory.createCommandContext;
+
+/**
+ * @author Alexander Litus
+ */
+@SuppressWarnings("InstanceMethodNamingConvention")
+public class ExecutorCommandSchedulerShould {
+
+ private static final long DELAY_MS = 1100;
+
+ private static final Duration DELAY = milliseconds(DELAY_MS);
+
+ private CommandScheduler scheduler;
+ private CommandContext context;
+
+ @Before
+ public void setUpTest() {
+ scheduler = spy(ExecutorCommandScheduler.class);
+ scheduler.setPostFunction(newStubPostFunction());
+ context = createCommandContext(DELAY);
+ }
+
+ @After
+ public void tearDownTest() {
+ scheduler.shutdown();
+ }
+
+ @Test
+ public void schedule_command_if_delay_is_set() {
+ final Command cmd = Commands.create(createProject(newUuid()), context);
+
+ scheduler.schedule(cmd);
+
+ verify(scheduler, never()).post(cmd);
+ verify(scheduler, after(DELAY_MS).times(1)).post(cmd);
+ }
+
+ @Test
+ public void not_schedule_command_with_same_id_twice() {
+ final String id = newUuid();
+ final Command expectedCmd = Commands.create(createProject(id), context);
+ final Command extraCmd = Commands.create(addTask(id), context);
+
+ scheduler.schedule(expectedCmd);
+ scheduler.schedule(extraCmd);
+
+ verify(scheduler, after(DELAY_MS).times(1)).post(expectedCmd);
+ verify(scheduler, never()).post(extraCmd);
+ }
+
+ @Test
+ public void throw_exception_if_is_shutdown() {
+ scheduler.shutdown();
+ try {
+ scheduler.schedule(createProject());
+ } catch (IllegalStateException expected) {
+ // is OK as it is shutdown
+ return;
+ }
+ fail("Must throw an exception as it is shutdown.");
+ }
+
+ private static Function newStubPostFunction() {
+ return new Function() {
+ @Nullable
+ @Override
+ public Command apply(@Nullable Command input) {
+ return input;
+ }
+ };
+ }
+}
diff --git a/server/src/test/java/org/spine3/testdata/TestCommands.java b/server/src/test/java/org/spine3/testdata/TestCommands.java
index 3380aa76067..9172fbec74e 100644
--- a/server/src/test/java/org/spine3/testdata/TestCommands.java
+++ b/server/src/test/java/org/spine3/testdata/TestCommands.java
@@ -28,6 +28,10 @@
import org.spine3.base.CommandId;
import org.spine3.base.Commands;
import org.spine3.base.UserId;
+import org.spine3.server.command.CommandBus;
+import org.spine3.server.command.CommandScheduler;
+import org.spine3.server.command.CommandStore;
+import org.spine3.server.storage.StorageFactory;
import org.spine3.test.project.ProjectId;
import org.spine3.test.project.command.AddTask;
import org.spine3.test.project.command.CreateProject;
@@ -112,8 +116,8 @@ public static CreateProject createProject(String projectId) {
return CreateProject.newBuilder()
.setProjectId(
ProjectId.newBuilder()
- .setId(projectId)
- .build())
+ .setId(projectId)
+ .build())
.build();
}
@@ -148,8 +152,30 @@ public static StartProject startProject(ProjectId id) {
public static StartProject startProject(String projectId) {
return StartProject.newBuilder()
.setProjectId(ProjectId.newBuilder()
- .setId(projectId)
- .build())
+ .setId(projectId)
+ .build())
.build();
}
+
+ /**
+ * Creates a new command bus with the given storage factory.
+ */
+ public static CommandBus newCommandBus(StorageFactory storageFactory) {
+ final CommandStore store = new CommandStore(storageFactory.createCommandStorage());
+ final CommandBus commandBus = CommandBus.newBuilder()
+ .setCommandStore(store)
+ .build();
+ return commandBus;
+ }
+
+ /**
+ * Creates a new command bus with the given command store and scheduler.
+ */
+ public static CommandBus newCommandBus(CommandStore store, CommandScheduler scheduler) {
+ final CommandBus commandBus = CommandBus.newBuilder()
+ .setCommandStore(store)
+ .setScheduler(scheduler)
+ .build();
+ return commandBus;
+ }
}
diff --git a/server/src/test/java/org/spine3/testdata/TestEventFactory.java b/server/src/test/java/org/spine3/testdata/TestEventFactory.java
index b613514b543..36ac3dcf3a7 100644
--- a/server/src/test/java/org/spine3/testdata/TestEventFactory.java
+++ b/server/src/test/java/org/spine3/testdata/TestEventFactory.java
@@ -20,8 +20,12 @@
package org.spine3.testdata;
+import com.google.common.util.concurrent.MoreExecutors;
import org.spine3.base.Event;
import org.spine3.base.EventContext;
+import org.spine3.server.event.EventBus;
+import org.spine3.server.event.EventStore;
+import org.spine3.server.storage.StorageFactory;
import org.spine3.test.project.ProjectId;
import org.spine3.test.project.event.ProjectCreated;
import org.spine3.test.project.event.ProjectStarted;
@@ -117,4 +121,16 @@ public static Event projectStarted(ProjectId projectId, EventContext eventContex
final Event.Builder builder = Event.newBuilder().setContext(eventContext).setMessage(toAny(event));
return builder.build();
}
+
+ /**
+ * Creates a new event bus with the given storage factory.
+ */
+ public static EventBus newEventBus(StorageFactory storageFactory) {
+ final EventStore store = EventStore.newBuilder()
+ .setStreamExecutor(MoreExecutors.directExecutor())
+ .setStorage(storageFactory.createEventStorage())
+ .build();
+ final EventBus eventBus = EventBus.newInstance(store);
+ return eventBus;
+ }
}
diff --git a/values/build.gradle b/values/build.gradle
index 504f638579f..310100b1162 100644
--- a/values/build.gradle
+++ b/values/build.gradle
@@ -28,15 +28,9 @@ sourceSets {
}
protobuf {
- plugins {
- grpc {
- artifact = 'io.grpc:protoc-gen-grpc-java:0.13.1'
- }
- }
generateProtoTasks {
all().each { final task ->
task.plugins {
- grpc {}
task.generateDescriptorSet = true
task.descriptorSetOptions.path = "${projectDir}/build/descriptors/${task.sourceSet.name}.desc"
task.descriptorSetOptions.includeImports = true