diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
index 03926c8cc38..428360dec65 100644
--- a/.idea/inspectionProfiles/Project_Default.xml
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -74,7 +74,6 @@
-
@@ -215,7 +214,6 @@
-
@@ -252,7 +250,6 @@
-
@@ -327,7 +324,6 @@
-
@@ -340,7 +336,6 @@
-
@@ -386,7 +381,6 @@
-
@@ -474,6 +468,7 @@
+
@@ -494,6 +489,7 @@
+
@@ -643,7 +639,6 @@
-
@@ -713,9 +708,6 @@
-
-
-
@@ -732,13 +724,11 @@
-
-
@@ -811,21 +801,13 @@
-
-
+
+
-
-
-
-
-
-
-
-
@@ -856,7 +838,7 @@
-
+
@@ -875,4 +857,4 @@
-
+
\ No newline at end of file
diff --git a/client/src/main/java/io/spine/client/Client.java b/client/src/main/java/io/spine/client/Client.java
index 862fa08458d..d9c7bc28bbd 100644
--- a/client/src/main/java/io/spine/client/Client.java
+++ b/client/src/main/java/io/spine/client/Client.java
@@ -22,6 +22,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.inprocess.InProcessChannelBuilder;
@@ -30,6 +31,7 @@
import io.spine.client.grpc.CommandServiceGrpc.CommandServiceBlockingStub;
import io.spine.client.grpc.QueryServiceGrpc;
import io.spine.client.grpc.QueryServiceGrpc.QueryServiceBlockingStub;
+import io.spine.core.Ack;
import io.spine.core.Command;
import io.spine.core.TenantId;
import io.spine.core.UserId;
@@ -102,6 +104,16 @@ public class Client implements AutoCloseable {
/** Active subscriptions maintained by the client. */
private final Subscriptions subscriptions;
+ /**
+ * The handler for errors that may occur during asynchronous requests initiated by this client.
+ */
+ private final @Nullable ErrorHandler streamingErrorHandler;
+
+ /**
+ * The handler for errors returned from server side in response to posted messages.
+ */
+ private final @Nullable ServerErrorHandler serverErrorHandler;
+
/**
* Creates a builder for a client connected to the specified address.
*
@@ -156,7 +168,9 @@ private Client(Builder builder) {
this.shutdownTimeout = checkNotNull(builder.shutdownTimeout);
this.commandService = CommandServiceGrpc.newBlockingStub(channel);
this.queryService = QueryServiceGrpc.newBlockingStub(channel);
- this.subscriptions = new Subscriptions(channel);
+ this.streamingErrorHandler = builder.streamingErrorHandler;
+ this.serverErrorHandler = builder.serverErrorHandler;
+ this.subscriptions = new Subscriptions(channel, streamingErrorHandler, serverErrorHandler);
}
/**
@@ -214,7 +228,14 @@ public boolean isOpen() {
*/
public ClientRequest onBehalfOf(UserId user) {
checkNotDefaultArg(user);
- return new ClientRequest(user, this);
+ ClientRequest request = new ClientRequest(user, this);
+ if (streamingErrorHandler != null) {
+ request.onStreamingError(streamingErrorHandler);
+ }
+ if (serverErrorHandler != null) {
+ request.onServerError(serverErrorHandler);
+ }
+ return request;
}
/**
@@ -269,8 +290,9 @@ ActorRequestFactory requestOf(UserId user) {
/**
* Posts the command to the {@code CommandService}.
*/
- void post(Command c) {
- commandService.post(c);
+ Ack post(Command c) {
+ Ack ack = commandService.post(c);
+ return ack;
}
/**
@@ -324,6 +346,9 @@ public static final class Builder {
/** The ID of the user for performing requests on behalf of a non-logged in user. */
private UserId guestUser = DEFAULT_GUEST_ID;
+ private @Nullable ErrorHandler streamingErrorHandler;
+ private @Nullable ServerErrorHandler serverErrorHandler;
+
private Builder(ManagedChannel channel) {
this.channel = checkNotNull(channel);
}
@@ -404,6 +429,27 @@ public Builder shutdownTimout(long timeout, TimeUnit timeUnit) {
return this;
}
+ /**
+ * Assigns a default handler for streaming errors for the asynchronous requests
+ * initiated by the client.
+ */
+ @CanIgnoreReturnValue
+ public Builder onStreamingError(ErrorHandler handler) {
+ this.streamingErrorHandler = checkNotNull(handler);
+ return this;
+ }
+
+ /**
+ * Assigns a default handler for an error occurred on the server-side (such as
+ * validation error) in response to a message posted by the client.
+ */
+ @CanIgnoreReturnValue
+ public Builder onServerError(ServerErrorHandler handler) {
+ checkNotNull(handler);
+ this.serverErrorHandler = handler;
+ return this;
+ }
+
/**
* Creates a new instance of the client.
*/
diff --git a/client/src/main/java/io/spine/client/ClientRequest.java b/client/src/main/java/io/spine/client/ClientRequest.java
index c8212e6e4a0..8497893e476 100644
--- a/client/src/main/java/io/spine/client/ClientRequest.java
+++ b/client/src/main/java/io/spine/client/ClientRequest.java
@@ -20,10 +20,13 @@
package io.spine.client;
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import com.google.errorprone.annotations.OverridingMethodsMustInvokeSuper;
import io.spine.base.CommandMessage;
import io.spine.base.EntityState;
import io.spine.base.EventMessage;
import io.spine.core.UserId;
+import org.checkerframework.checker.nullness.qual.Nullable;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.spine.util.Preconditions2.checkNotDefaultArg;
@@ -47,6 +50,9 @@ public class ClientRequest {
private final UserId user;
private final Client client;
+ private @Nullable ErrorHandler streamingErrorHandler;
+ private @Nullable ServerErrorHandler serverErrorHandler;
+
ClientRequest(UserId user, Client client) {
checkNotDefaultArg(user);
this.user = user;
@@ -55,12 +61,15 @@ public class ClientRequest {
ClientRequest(ClientRequest parent) {
this(parent.user, parent.client);
+ this.streamingErrorHandler = parent.streamingErrorHandler;
+ this.serverErrorHandler = parent.serverErrorHandler;
}
/**
* Creates a builder for customizing command request.
*/
public CommandRequest command(CommandMessage c) {
+ checkNotNull(c);
return new CommandRequest(this, c);
}
@@ -100,4 +109,36 @@ protected final UserId user() {
protected final Client client() {
return client;
}
+
+ /**
+ * Assigns a handler for errors occurred when delivering messages from the server.
+ *
+ *
If such an error occurs, no more results are expected from the server.
+ */
+ @CanIgnoreReturnValue
+ @OverridingMethodsMustInvokeSuper
+ public ClientRequest onStreamingError(ErrorHandler handler) {
+ this.streamingErrorHandler = checkNotNull(handler);
+ return this;
+ }
+
+ /**
+ * Assigns a handler for an error occurred on the server-side (such as validation error)
+ * in response to posting a request.
+ */
+ @CanIgnoreReturnValue
+ @OverridingMethodsMustInvokeSuper
+ public ClientRequest onServerError(ServerErrorHandler handler) {
+ checkNotNull(handler);
+ this.serverErrorHandler = handler;
+ return this;
+ }
+
+ final @Nullable ErrorHandler streamingErrorHandler() {
+ return streamingErrorHandler;
+ }
+
+ final @Nullable ServerErrorHandler serverErrorHandler() {
+ return serverErrorHandler;
+ }
}
diff --git a/client/src/main/java/io/spine/client/CommandRequest.java b/client/src/main/java/io/spine/client/CommandRequest.java
index ff7773b5d6a..7286485a319 100644
--- a/client/src/main/java/io/spine/client/CommandRequest.java
+++ b/client/src/main/java/io/spine/client/CommandRequest.java
@@ -23,14 +23,22 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import com.google.errorprone.annotations.OverridingMethodsMustInvokeSuper;
import io.spine.base.CommandMessage;
import io.spine.base.EventMessage;
+import io.spine.core.Ack;
import io.spine.core.Command;
+import io.spine.core.Status;
+import io.spine.logging.Logging;
import org.checkerframework.checker.nullness.qual.Nullable;
+import java.util.Optional;
import java.util.function.Consumer;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.protobuf.TextFormat.shortDebugString;
+import static io.spine.client.EventsAfterCommand.subscribe;
+import static io.spine.util.Exceptions.newIllegalStateException;
/**
* Allows to post a command optionally subscribing to events that are immediate results
@@ -48,20 +56,19 @@
* }
*
*
The subscription obtained from the {@link #post()} should be cancelled
- * to preserve both client-side and server-side resources. The moment of cancelling the
- * subscriptions
- * depends on the nature of the posted command and the outcome expected by the client application.
+ * to preserve both client-side and server-side resources. The moment of cancelling
+ * the subscriptions depends on the nature of the posted command and the outcome
+ * expected by the client application.
*/
-public final class CommandRequest extends ClientRequest {
+public final class CommandRequest extends ClientRequest implements Logging {
private final CommandMessage message;
- private final MultiEventConsumers.Builder builder;
- private @Nullable ErrorHandler streamingErrorHandler;
+ private final MultiEventConsumers.Builder eventConsumers;
CommandRequest(ClientRequest parent, CommandMessage c) {
super(parent);
this.message = c;
- this.builder = MultiEventConsumers.newBuilder();
+ this.eventConsumers = MultiEventConsumers.newBuilder();
}
/**
@@ -78,7 +85,7 @@ public final class CommandRequest extends ClientRequest {
public CommandRequest
observe(Class type, Consumer consumer) {
checkNotNull(consumer);
- builder.observe(type, consumer);
+ eventConsumers.observe(type, consumer);
return this;
}
@@ -96,7 +103,7 @@ public final class CommandRequest extends ClientRequest {
public CommandRequest
observe(Class type, EventConsumer consumer) {
checkNotNull(consumer);
- builder.observe(type, consumer);
+ eventConsumers.observe(type, consumer);
return this;
}
@@ -106,9 +113,36 @@ public final class CommandRequest extends ClientRequest {
*
If such an error occurs, no more events resulting from the posted command will be
* delivered to the consumers.
*/
+ @Override
@CanIgnoreReturnValue
public CommandRequest onStreamingError(ErrorHandler handler) {
- this.streamingErrorHandler = checkNotNull(handler);
+ super.onStreamingError(handler);
+ eventConsumers.onStreamingError(handler);
+ return this;
+ }
+
+ /**
+ * Assigns a handler for errors occurred in consumers of events.
+ *
+ *
After the passed handler is called, remaining event consumers will get the messages
+ * as usually. If not specified, the default implementation simply logs the error.
+ */
+ @CanIgnoreReturnValue
+ public CommandRequest onConsumingError(ConsumerErrorHandler handler) {
+ checkNotNull(handler);
+ eventConsumers.onConsumingError(handler);
+ return this;
+ }
+
+ /**
+ * Assigns a handler for an error occurred on the server-side (such as validation error)
+ * in response to posting a command.
+ */
+ @OverridingMethodsMustInvokeSuper
+ @CanIgnoreReturnValue
+ @Override
+ public CommandRequest onServerError(ServerErrorHandler handler) {
+ super.onServerError(handler);
return this;
}
@@ -120,8 +154,16 @@ public CommandRequest onStreamingError(ErrorHandler handler) {
* {@linkplain Subscriptions#cancel(Subscription) canceled} after the requesting code receives
* expected events, or after a reasonable timeout.
*
- * @return subscription to the events
- * @implNote Subscriptions should be cancelled to free up client and server resources
+ *
The method returns subscriptions to {@linkplain #observe(Class, EventConsumer) events}
+ * that the handling of the command may produce. A command may not be accepted
+ * for processing by the server, e.g. because of a validation error. In such a case, the
+ * method would report the error to the configured {@linkplain #onStreamingError(ErrorHandler)
+ * error handler}, and return an empty set.
+ *
+ * @return subscription to the {@linkplain #observe(Class, EventConsumer) observed events}
+ * if the command was successfully posted, or
+ * an empty set if posting caused an error
+ * @apiNote Subscriptions should be cancelled to free up client and server resources
* required for their maintenance. It is not possible to cancel the returned
* subscription in an automatic way because of the following.
* Subscriptions by nature are asynchronous and infinite requests.
@@ -132,28 +174,85 @@ public CommandRequest onStreamingError(ErrorHandler handler) {
*/
@CanIgnoreReturnValue
public ImmutableSet post() {
- ImmutableSet newSubscriptions = doPost();
- client().subscriptions()
- .addAll(newSubscriptions);
- return newSubscriptions;
- }
-
- private ImmutableSet doPost() {
- MultiEventConsumers consumers = builder.build();
- Client client = client();
- Command command = client.requestOf(user())
- .command()
- .create(message);
- ImmutableSet result =
- EventsAfterCommand.subscribe(client, command, consumers, streamingErrorHandler);
- //TODO:2020-04-17:alexander.yevsyukov: Check the returned Ack and throw an exception
- // in case of problems.
- client.post(command);
- return result;
+ PostOperation op = new PostOperation();
+ return op.perform();
}
@VisibleForTesting
CommandMessage message() {
return message;
}
+
+ /**
+ * Method object for posting a command.
+ */
+ private final class PostOperation {
+
+ private final Command command;
+ private final MultiEventConsumers consumers;
+ private @Nullable ImmutableSet subscriptions;
+
+ private PostOperation() {
+ this.command =
+ client().requestOf(user())
+ .command()
+ .create(message);
+ this.consumers = eventConsumers.build();
+ }
+
+ private ImmutableSet perform() {
+ subscribeToEvents();
+ Ack ack = client().post(command);
+ Status status = ack.getStatus();
+ switch (status.getStatusCase()) {
+ case OK:
+ return Optional.ofNullable(subscriptions)
+ .orElse(ImmutableSet.of());
+ case ERROR:
+ cancelVoidSubscriptions();
+ reportErrorWhenPosting(status);
+ return ImmutableSet.of();
+ case REJECTION:
+ /* This should not happen as a rejection can be raised when the command is
+ already dispatched. We include this case for the sake of completeness. */
+ case STATUS_NOT_SET:
+ /* The server sent an ack with invalid status. */
+ default:
+ throw newIllegalStateException(
+ "Cannot handle ack status `%s` when posting the command `%s`.",
+ shortDebugString(command));
+ }
+ }
+
+ private void subscribeToEvents() {
+ Client client = client();
+ this.subscriptions = subscribe(client, command, consumers, streamingErrorHandler());
+ checkNotNull(subscriptions);
+ client.subscriptions()
+ .addAll(subscriptions);
+ }
+
+ /**
+ * Cancels the passed subscriptions to events because the command could not be posted.
+ *
+ */
+ private void cancelVoidSubscriptions() {
+ Subscriptions activeSubscriptions = client().subscriptions();
+ if (subscriptions != null) {
+ subscriptions.forEach(activeSubscriptions::cancel);
+ }
+ }
+
+ private void reportErrorWhenPosting(Status status) {
+ errorHandler().accept(command, status.getError());
+ }
+
+ private ServerErrorHandler errorHandler() {
+ return Optional.ofNullable(serverErrorHandler())
+ .orElse(new LoggingServerErrorHandler(
+ CommandRequest.this.logger(),
+ "Unable to post the command `%s`. Returned error: `%s`.")
+ );
+ }
+ }
}
diff --git a/client/src/main/java/io/spine/client/Consumers.java b/client/src/main/java/io/spine/client/Consumers.java
index 86dac2746b5..9429bf7bec6 100644
--- a/client/src/main/java/io/spine/client/Consumers.java
+++ b/client/src/main/java/io/spine/client/Consumers.java
@@ -181,10 +181,11 @@ public void onCompleted() {
* @param
* the type of this builder for return type covariance
*/
+ @SuppressWarnings("rawtypes")
abstract static class Builder {
+ C extends MessageContext,
+ W extends Message,
+ B extends Builder> {
private final ImmutableSet.Builder> consumers =
ImmutableSet.builder();
diff --git a/client/src/main/java/io/spine/client/DelegatingConsumer.java b/client/src/main/java/io/spine/client/DelegatingConsumer.java
index 1b673a006ec..2e031274e82 100644
--- a/client/src/main/java/io/spine/client/DelegatingConsumer.java
+++ b/client/src/main/java/io/spine/client/DelegatingConsumer.java
@@ -62,12 +62,12 @@ private DelegatingConsumer(Consumer delegate) {
*/
static Object toRealConsumer(MessageConsumer, ?> consumer) {
return consumer instanceof DelegatingConsumer
- ? ((DelegatingConsumer) consumer).delegate()
+ ? ((DelegatingConsumer, ?>) consumer).delegate()
: consumer;
}
/** Obtains the consumer of the event message to which this consumer delegates. */
- final Consumer delegate() {
+ private Consumer delegate() {
return delegate;
}
@@ -89,6 +89,9 @@ private DelegatingEventConsumer(Consumer delegate) {
}
}
+ /**
+ * Adapts a {@code Consumer} of an {@code EntityState} to the {@link StateConsumer} interface.
+ */
private static final class DelegatingStateConsumer
extends DelegatingConsumer
implements StateConsumer {
diff --git a/client/src/main/java/io/spine/client/DeliveringMultiEventObserver.java b/client/src/main/java/io/spine/client/DeliveringMultiEventObserver.java
new file mode 100644
index 00000000000..17953fa0d73
--- /dev/null
+++ b/client/src/main/java/io/spine/client/DeliveringMultiEventObserver.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2020, TeamDev. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.client;
+
+import com.google.common.collect.ImmutableMap;
+import io.grpc.stub.StreamObserver;
+import io.spine.base.EventMessage;
+import io.spine.core.Event;
+import io.spine.logging.Logging;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Delivers events to their consumers.
+ *
+ * @see #onNext(Event)
+ * @see MultiEventConsumers
+ */
+final class DeliveringMultiEventObserver implements StreamObserver, Logging {
+
+ private final ImmutableMap, StreamObserver> observers;
+ private final ErrorHandler streamingErrorHandler;
+
+ /**
+ * Creates new instance.
+ *
+ * @param consumers
+ * event consumers
+ * @param streamingErrorHandler
+ * the handler for errors which is used when a {@linkplain #onError(Throwable)
+ * streaming error occurs}. When this handler is invoked no further delivery of
+ * events is expected. If {@code null} is passed, default implementation would
+ * log the error.
+ */
+ DeliveringMultiEventObserver(MultiEventConsumers consumers,
+ @Nullable ErrorHandler streamingErrorHandler) {
+ this.observers = consumers.toObservers();
+ this.streamingErrorHandler = nullToDefault(streamingErrorHandler);
+ }
+
+ private ErrorHandler nullToDefault(@Nullable ErrorHandler handler) {
+ if (handler != null) {
+ return handler;
+ }
+ return throwable -> _error().withCause(throwable).log("Error receiving event.");
+ }
+
+ @Override
+ public void onNext(Event e) {
+ StreamObserver observer = observers.get(e.type());
+ observer.onNext(e);
+ }
+
+ /** Passes the {@code Throwable} to the configured error handler. */
+ @Override
+ public void onError(Throwable t) {
+ streamingErrorHandler.accept(t);
+ }
+
+ /** Does nothing. */
+ @Override
+ public void onCompleted() {
+ // Do nothing.
+ }
+}
diff --git a/client/src/main/java/io/spine/client/ErrorHandler.java b/client/src/main/java/io/spine/client/ErrorHandler.java
index 36f7affb1d6..8a9a41319b5 100644
--- a/client/src/main/java/io/spine/client/ErrorHandler.java
+++ b/client/src/main/java/io/spine/client/ErrorHandler.java
@@ -48,6 +48,6 @@ public interface ErrorHandler extends Consumer {
*/
static ErrorHandler
logError(FluentLogger logger, String messageFormat, Class extends Message> type) {
- return new LoggingErrorHandler<>(logger, messageFormat, type);
+ return new LoggingTypeErrorHandler(logger, messageFormat, type);
}
}
diff --git a/client/src/main/java/io/spine/client/EventConsumers.java b/client/src/main/java/io/spine/client/EventConsumers.java
index d350291e0b6..9892ec32c40 100644
--- a/client/src/main/java/io/spine/client/EventConsumers.java
+++ b/client/src/main/java/io/spine/client/EventConsumers.java
@@ -78,7 +78,7 @@ Builder self() {
}
@Override
- Consumers build() {
+ EventConsumers build() {
return new EventConsumers<>(this);
}
}
diff --git a/client/src/main/java/io/spine/client/EventsAfterCommand.java b/client/src/main/java/io/spine/client/EventsAfterCommand.java
index 737a78404ab..3234b05622f 100644
--- a/client/src/main/java/io/spine/client/EventsAfterCommand.java
+++ b/client/src/main/java/io/spine/client/EventsAfterCommand.java
@@ -60,9 +60,7 @@ final class EventsAfterCommand implements Logging {
private EventsAfterCommand(Client client, Command cmd, MultiEventConsumers consumers) {
this.client = checkNotNull(client);
this.command = checkNotDefaultArg(cmd);
- this.user = cmd.getContext()
- .getActorContext()
- .getActor();
+ this.user = cmd.actor();
this.consumers = checkNotNull(consumers);
}
@@ -82,12 +80,14 @@ private ImmutableSet subscribeWith(@Nullable ErrorHandler errorHan
* as the origin.
*/
private ImmutableSet eventsOf(Command c) {
- Field pastMessage = EventContext.Field.pastMessage()
- .getField();
- String fieldName = Event.Field.context()
- .getField()
- .nested(pastMessage)
- .toString();
+ Field pastMessage =
+ EventContext.Field.pastMessage()
+ .getField();
+ String fieldName =
+ Event.Field.context()
+ .getField()
+ .nested(pastMessage)
+ .toString();
ImmutableSet> eventTypes = consumers.eventTypes();
TopicFactory topic = client.requestOf(user)
.topic();
diff --git a/client/src/main/java/io/spine/client/LoggingConsumerErrorHandler.java b/client/src/main/java/io/spine/client/LoggingConsumerErrorHandler.java
index dd59af514e1..e4d5619fbed 100644
--- a/client/src/main/java/io/spine/client/LoggingConsumerErrorHandler.java
+++ b/client/src/main/java/io/spine/client/LoggingConsumerErrorHandler.java
@@ -33,7 +33,7 @@
* the type of the messages delivered to the consumer
*/
final class LoggingConsumerErrorHandler
- extends LoggingHandler
+ extends LoggingHandlerWithType
implements ConsumerErrorHandler {
/**
@@ -44,6 +44,8 @@ final class LoggingConsumerErrorHandler
* @param messageFormat
* the formatting message where the first parameter is the consumer which caused
* the error, and the second parameter is the type of the message which caused the error
+ * @param type
+ * the type of the message which caused the error
*/
LoggingConsumerErrorHandler(FluentLogger logger,
String messageFormat,
@@ -54,8 +56,6 @@ final class LoggingConsumerErrorHandler
@Override
public void accept(MessageConsumer consumer, Throwable throwable) {
Object consumerToReport = toRealConsumer(consumer);
- logger().atSevere()
- .withCause(throwable)
- .log(messageFormat(), consumerToReport, type());
+ error(throwable).log(messageFormat(), consumerToReport, typeName());
}
}
diff --git a/client/src/main/java/io/spine/client/LoggingErrorHandler.java b/client/src/main/java/io/spine/client/LoggingErrorHandler.java
index f107047ee37..f318193212b 100644
--- a/client/src/main/java/io/spine/client/LoggingErrorHandler.java
+++ b/client/src/main/java/io/spine/client/LoggingErrorHandler.java
@@ -21,27 +21,19 @@
package io.spine.client;
import com.google.common.flogger.FluentLogger;
-import com.google.protobuf.Message;
+import org.checkerframework.checker.nullness.qual.Nullable;
-/**
- * Logs the fact of the error using the {@linkplain FluentLogger#atSevere() severe} level
- * of the passed logger.
- *
- * @param
- * the type of the messages delivered to the consumer
- */
-final class LoggingErrorHandler
- extends LoggingHandler
- implements ErrorHandler {
+final class LoggingErrorHandler extends LoggingHandler implements ErrorHandler{
+
+ private final @Nullable Object param;
- LoggingErrorHandler(FluentLogger logger, String messageFormat, Class extends Message> type) {
- super(logger, messageFormat, type);
+ LoggingErrorHandler(FluentLogger logger, String messageFormat, @Nullable Object param) {
+ super(logger, messageFormat);
+ this.param = param;
}
@Override
public void accept(Throwable throwable) {
- logger().atSevere()
- .withCause(throwable)
- .log(messageFormat(), type());
+ error(throwable).log(messageFormat(), param);
}
}
diff --git a/client/src/main/java/io/spine/client/LoggingHandler.java b/client/src/main/java/io/spine/client/LoggingHandler.java
index 0e1da21b5c4..f7eb7fe9c40 100644
--- a/client/src/main/java/io/spine/client/LoggingHandler.java
+++ b/client/src/main/java/io/spine/client/LoggingHandler.java
@@ -21,27 +21,27 @@
package io.spine.client;
import com.google.common.flogger.FluentLogger;
-import com.google.protobuf.Message;
+/**
+ * Abstract base for error handlers that write a message to the associated logger.
+ *
+ *
The handler always uses {@linkplain FluentLogger#atSevere() severe} level.
+ */
abstract class LoggingHandler {
private final FluentLogger logger;
private final String messageFormat;
- private final Class extends Message> type;
/**
* Creates new instance of the logging handler.
* @param logger
* the instance of the logger to use for reporting the error
* @param messageFormat
- * the formatting message where the first parameter is the consumer which caused
- * @param type
- * the type of messages obtained by a consumer
+ * the formatting message for an error text
*/
- LoggingHandler(FluentLogger logger, String messageFormat, Class extends Message> type) {
+ LoggingHandler(FluentLogger logger, String messageFormat) {
this.logger = logger;
this.messageFormat = messageFormat;
- this.type = type;
}
final FluentLogger logger() {
@@ -52,7 +52,13 @@ final String messageFormat() {
return messageFormat;
}
- final Class extends Message> type() {
- return type;
+ /** Obtains logging API at {@code sever} level. */
+ protected final FluentLogger.Api error() {
+ return logger().atSevere();
+ }
+
+ /** Obtains logging API at {@code sever} level and initializes it with the passed cause. */
+ protected final FluentLogger.Api error(Throwable cause) {
+ return error().withCause(cause);
}
}
diff --git a/client/src/main/java/io/spine/client/LoggingHandlerWithType.java b/client/src/main/java/io/spine/client/LoggingHandlerWithType.java
new file mode 100644
index 00000000000..d1410b818df
--- /dev/null
+++ b/client/src/main/java/io/spine/client/LoggingHandlerWithType.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2020, TeamDev. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.client;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.protobuf.Message;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Abstract base for logging handlers that contain information about a type.
+ */
+abstract class LoggingHandlerWithType extends LoggingHandler {
+
+ private final Class extends Message> type;
+
+ LoggingHandlerWithType(FluentLogger logger,
+ String messageFormat,
+ Class extends Message> type) {
+ super(logger, messageFormat);
+ this.type = checkNotNull(type);
+ }
+
+ final String typeName() {
+ return type.getName();
+ }
+}
diff --git a/client/src/main/java/io/spine/client/LoggingServerErrorHandler.java b/client/src/main/java/io/spine/client/LoggingServerErrorHandler.java
new file mode 100644
index 00000000000..d54e177c95b
--- /dev/null
+++ b/client/src/main/java/io/spine/client/LoggingServerErrorHandler.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2020, TeamDev. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.client;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.protobuf.Message;
+import io.spine.base.Error;
+
+import static com.google.protobuf.TextFormat.shortDebugString;
+
+/**
+ * A posting handler which logs the error.
+ */
+final class LoggingServerErrorHandler extends LoggingHandler implements ServerErrorHandler {
+
+ LoggingServerErrorHandler(FluentLogger logger, String messageFormat) {
+ super(logger, messageFormat);
+ }
+
+ @Override
+ public void accept(Message message, Error error) {
+ error().log(messageFormat(), shortDebugString(message), shortDebugString(error));
+ }
+}
diff --git a/client/src/main/java/io/spine/client/LoggingTypeErrorHandler.java b/client/src/main/java/io/spine/client/LoggingTypeErrorHandler.java
new file mode 100644
index 00000000000..2cb799f08b0
--- /dev/null
+++ b/client/src/main/java/io/spine/client/LoggingTypeErrorHandler.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2020, TeamDev. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.client;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.protobuf.Message;
+
+/**
+ * Logs the fact of an error caused by handling a message of the passed type.
+ */
+final class LoggingTypeErrorHandler extends LoggingHandlerWithType implements ErrorHandler {
+
+ /**
+ * Creates a new instance of the logging handler.
+ *
+ * @param logger
+ * the instance of the logger to use for reporting the error
+ * @param messageFormat
+ * the formatting message with one string parameter is the name of the message type
+ * which caused the error
+ * @param type
+ * the type of the message which caused the error
+ */
+ LoggingTypeErrorHandler(FluentLogger logger,
+ String messageFormat,
+ Class extends Message> type) {
+ super(logger, messageFormat, type);
+ }
+
+ @Override
+ public void accept(Throwable throwable) {
+ error(throwable).log(messageFormat(), typeName());
+ }
+}
diff --git a/client/src/main/java/io/spine/client/MultiEventConsumers.java b/client/src/main/java/io/spine/client/MultiEventConsumers.java
index 780a23addd2..fd683910ff9 100644
--- a/client/src/main/java/io/spine/client/MultiEventConsumers.java
+++ b/client/src/main/java/io/spine/client/MultiEventConsumers.java
@@ -20,11 +20,9 @@
package io.spine.client;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableCollection;
-import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.Maps;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.grpc.stub.StreamObserver;
import io.spine.base.EventMessage;
@@ -33,11 +31,11 @@
import io.spine.logging.Logging;
import org.checkerframework.checker.nullness.qual.Nullable;
+import java.util.HashMap;
+import java.util.Map;
import java.util.function.Consumer;
import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.protobuf.TextFormat.shortDebugString;
-import static io.spine.client.DelegatingConsumer.toRealConsumer;
/**
* An association of event types to their consumers which also delivers events.
@@ -48,14 +46,14 @@
final class MultiEventConsumers implements Logging {
private final
- ImmutableMultimap, EventConsumer extends EventMessage>> map;
+ ImmutableMap, EventConsumers extends EventMessage>> map;
static Builder newBuilder() {
return new Builder();
}
private MultiEventConsumers(Builder builder) {
- this.map = ImmutableMultimap.copyOf(builder.map);
+ this.map = ImmutableMap.copyOf(builder.toMap());
}
/**
@@ -65,6 +63,13 @@ ImmutableSet> eventTypes() {
return map.keySet();
}
+ /** Obtains all the consumers grouped by type of consumed events. */
+ ImmutableMap, StreamObserver> toObservers() {
+ Map, StreamObserver> observers =
+ Maps.transformValues(map, EventConsumers::toObserver);
+ return ImmutableMap.copyOf(observers);
+ }
+
/**
* Creates an observer that would deliver events to all the consumers.
*
@@ -73,43 +78,7 @@ ImmutableSet> eventTypes() {
* If null the error will be simply logged.
*/
StreamObserver toObserver(@Nullable ErrorHandler handler) {
- return new EventObserver(handler);
- }
-
- /**
- * Delivers the event to all the subscribed consumers.
- *
- *
If one of the consumers would cause an error when handling the event, the error will
- * be logged, and the event will be passed to remaining consumers.
- */
- void deliver(Event event) {
- EventMessage message = event.enclosedMessage();
- EventContext context = event.getContext();
- Class extends EventMessage> type = message.getClass();
- ImmutableCollection> consumers = map.get(type);
- consumers.forEach(c -> {
- try {
- @SuppressWarnings("unchecked") // Safe as we match the type when adding consumers.
- EventConsumer consumer = (EventConsumer) c;
- consumer.accept(message, context);
- } catch (Throwable throwable) {
- logError(c, event, throwable);
- }
- });
- }
-
- /**
- * Logs the fact that the passed event consumer cased the error.
- *
- *
If the passed consumer is an instance of {@code DelegatingEventConsumer}
- * the real consumer will be reported in the log.
- */
- private void logError(EventConsumer> consumer, Event event, Throwable throwable) {
- String eventDiags = shortDebugString(event);
- Object consumerToReport = toRealConsumer(consumer);
- _error().withCause(throwable)
- .log("The consumer `%s` could not handle the event `%s`.",
- consumerToReport, eventDiags);
+ return new DeliveringMultiEventObserver(this, handler);
}
/**
@@ -117,34 +86,99 @@ private void logError(EventConsumer> consumer, Event event, Throwable throwabl
*/
static final class Builder {
+ /** Maps a type of an event to the builder of {@code EventConsumers} of such events. */
private final
- Multimap, EventConsumer extends EventMessage>>
- map = HashMultimap.create();
+ Map, EventConsumers.Builder extends EventMessage>> map =
+ new HashMap<>();
+
+ /** The handler for streaming errors that may occur during gRPC calls. */
+ private @Nullable ErrorHandler streamingErrorHandler;
+
+ /** The common handler for errors of all consumed event types that may occur. */
+ private @Nullable ConsumerErrorHandler consumingErrorHandler;
- /**
- * Adds the consumer of the events message.
- */
@CanIgnoreReturnValue
- public
+
Builder observe(Class eventType, Consumer consumer) {
checkNotNull(eventType);
checkNotNull(consumer);
- map.put(eventType, EventConsumer.from(consumer));
- return this;
+ EventConsumer ec = EventConsumer.from(consumer);
+ return doPut(eventType, ec);
}
/**
* Adds the consumer of the event message and its context.
*/
@CanIgnoreReturnValue
- public
+
Builder observe(Class eventType, EventConsumer consumer) {
checkNotNull(eventType);
checkNotNull(consumer);
- map.put(eventType, consumer);
+ return doPut(eventType, consumer);
+ }
+
+ private
+ Builder doPut(Class eventType, EventConsumer ec) {
+ if (map.containsKey(eventType)) {
+ @SuppressWarnings("unchecked")
+ // The cast is protected by generic params of this method.
+ EventConsumers.Builder builder = (EventConsumers.Builder) map.get(eventType);
+ builder.add(ec);
+ } else {
+ map.put(eventType, EventConsumers.newBuilder().add(ec));
+ }
return this;
}
+ /**
+ * Produces a map from an event type to consumers of those events.
+ */
+ private ImmutableMap, EventConsumers extends EventMessage>>
+ toMap() {
+ ImmutableMap.Builder,
+ EventConsumers extends EventMessage>>
+ builder = ImmutableMap.builder();
+ for (Class extends EventMessage> eventType : map.keySet()) {
+ EventConsumers.Builder extends EventMessage> consumers = map.get(eventType);
+ if (streamingErrorHandler != null) {
+ consumers.onStreamingError(streamingErrorHandler);
+ }
+ if (consumingErrorHandler != null) {
+ consumers.onConsumingError(
+ new DelegatingEventConsumerHandler<>(consumingErrorHandler)
+ );
+ }
+ builder.put(eventType, consumers.build());
+ }
+ return builder.build();
+ }
+
+ /**
+ * Assigns a handler for the error reported to
+ * {@link StreamObserver#onError(Throwable)}.
+ *
+ *
Once this handler is called, no more messages will be delivered to consumers.
+ *
+ * @see #onConsumingError(ConsumerErrorHandler)
+ */
+ @CanIgnoreReturnValue
+ Builder onStreamingError(ErrorHandler handler) {
+ streamingErrorHandler = checkNotNull(handler);
+ return this;
+ }
+
+ /**
+ * Assigns a handler for an error that may occur in the code of one of the consumers.
+ *
+ *
After this handler called, remaining consumers will get the message as usually.
+ *
+ * @see #onStreamingError(ErrorHandler)
+ */
+ @CanIgnoreReturnValue
+ Builder onConsumingError(ConsumerErrorHandler handler) {
+ consumingErrorHandler = checkNotNull(handler);
+ return this;
+ }
/**
* Creates the new instance.
*/
@@ -154,36 +188,29 @@ MultiEventConsumers build() {
}
/**
- * Passes the event to listener once the subscription is updated, then cancels the subscription.
+ * Adapts generified {@code ConsumerErrorHandler} API to non-generified
+ * so that a common error handler can be used for all the consumers.
+ *
+ * @param
+ * the type of events observed by an instance of {@code EventConsumers}
+ * @see Builder#toMap()
*/
- private final class EventObserver implements StreamObserver {
-
- private final ErrorHandler errorHandler;
+ private static final class DelegatingEventConsumerHandler
+ implements ConsumerErrorHandler {
- private EventObserver(@Nullable ErrorHandler handler) {
- this.errorHandler = nullToDefault(handler);
- }
+ private final ConsumerErrorHandler delegate;
- private ErrorHandler nullToDefault(@Nullable ErrorHandler handler) {
- if (handler != null) {
- return handler;
- }
- return throwable -> _error().withCause(throwable).log("Error receiving event.");
- }
-
- @Override
- public void onNext(Event e) {
- deliver(e);
- }
-
- @Override
- public void onError(Throwable t) {
- errorHandler.accept(t);
+ private DelegatingEventConsumerHandler(ConsumerErrorHandler delegate) {
+ this.delegate = checkNotNull(delegate);
}
@Override
- public void onCompleted() {
- // Do nothing.
+ public void accept(MessageConsumer consumer, Throwable throwable) {
+ @SuppressWarnings("unchecked")
+ // The cast is protected by generic params of `EventConsumers`.
+ MessageConsumer cast =
+ (MessageConsumer) consumer;
+ delegate.accept(cast, throwable);
}
}
}
diff --git a/client/src/main/java/io/spine/client/ServerErrorHandler.java b/client/src/main/java/io/spine/client/ServerErrorHandler.java
new file mode 100644
index 00000000000..f8812be42bf
--- /dev/null
+++ b/client/src/main/java/io/spine/client/ServerErrorHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2020, TeamDev. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.client;
+
+import com.google.protobuf.Message;
+import io.spine.base.Error;
+
+import java.util.function.BiConsumer;
+
+/**
+ * A handler for an error occurred when posting a message for asynchronous processing
+ * at the server side.
+ *
+ *
The handler accepts a message which caused an error, and an {@link io.spine.base.Error Error}
+ * obtained from the server.
+ */
+public interface ServerErrorHandler extends BiConsumer {
+}
diff --git a/client/src/main/java/io/spine/client/SubscribingRequest.java b/client/src/main/java/io/spine/client/SubscribingRequest.java
index 136d6c64322..2b04275b72d 100644
--- a/client/src/main/java/io/spine/client/SubscribingRequest.java
+++ b/client/src/main/java/io/spine/client/SubscribingRequest.java
@@ -21,6 +21,7 @@
package io.spine.client;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import com.google.errorprone.annotations.OverridingMethodsMustInvokeSuper;
import com.google.protobuf.Message;
import io.grpc.stub.StreamObserver;
import io.spine.base.MessageContext;
@@ -53,8 +54,14 @@
super(parent, type);
}
+ /**
+ * Obtains the builder for collecting consumers of the subscribed messages.
+ */
abstract Consumers.Builder consumers();
+ /**
+ * Adapts the passed instance to the specific type of {@link MessageConsumer}.
+ */
abstract MessageConsumer toMessageConsumer(Consumer consumer);
/**
@@ -76,8 +83,11 @@ public B observe(Consumer consumer) {
*
* @see #onConsumingError(ConsumerErrorHandler)
*/
+ @Override
@CanIgnoreReturnValue
+ @OverridingMethodsMustInvokeSuper
public B onStreamingError(ErrorHandler handler) {
+ super.onStreamingError(handler);
consumers().onStreamingError(handler);
return self();
}
diff --git a/client/src/main/java/io/spine/client/Subscriptions.java b/client/src/main/java/io/spine/client/Subscriptions.java
index 4f06554d715..a5415ddabd5 100644
--- a/client/src/main/java/io/spine/client/Subscriptions.java
+++ b/client/src/main/java/io/spine/client/Subscriptions.java
@@ -20,20 +20,27 @@
package io.spine.client;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.flogger.FluentLogger;
import com.google.protobuf.Message;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
+import io.spine.base.Error;
import io.spine.base.Identifier;
import io.spine.client.grpc.SubscriptionServiceGrpc;
import io.spine.client.grpc.SubscriptionServiceGrpc.SubscriptionServiceBlockingStub;
import io.spine.client.grpc.SubscriptionServiceGrpc.SubscriptionServiceStub;
+import io.spine.core.Response;
+import io.spine.logging.Logging;
+import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.Collection;
import java.util.HashSet;
-import java.util.Iterator;
+import java.util.Optional;
import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.protobuf.TextFormat.shortDebugString;
import static java.lang.String.format;
import static java.util.Collections.synchronizedSet;
@@ -57,7 +64,7 @@
* @see ClientRequest#subscribeToEvent(Class)
* @see CommandRequest#post()
*/
-public final class Subscriptions {
+public final class Subscriptions implements Logging {
/**
* The format of all {@linkplain SubscriptionId Subscription identifiers}.
@@ -69,14 +76,20 @@ public final class Subscriptions {
*/
static final String SUBSCRIPTION_PRINT_FORMAT = "(ID: %s, target: %s)";
- private final SubscriptionServiceStub subscriptionService;
- private final SubscriptionServiceBlockingStub blockingSubscriptionService;
- private final Set subscriptions;
+ private final SubscriptionServiceStub service;
+ private final SubscriptionServiceBlockingStub blockingServiceStub;
+ private final Set items;
+ private final @Nullable ErrorHandler streamingErrorHandler;
+ private final @Nullable ServerErrorHandler serverErrorHandler;
- Subscriptions(ManagedChannel channel) {
- this.subscriptionService = SubscriptionServiceGrpc.newStub(channel);
- this.blockingSubscriptionService = SubscriptionServiceGrpc.newBlockingStub(channel);
- this.subscriptions = synchronizedSet(new HashSet<>());
+ Subscriptions(ManagedChannel channel,
+ @Nullable ErrorHandler streamingErrorHandler,
+ @Nullable ServerErrorHandler serverErrorHandler) {
+ this.service = SubscriptionServiceGrpc.newStub(channel);
+ this.blockingServiceStub = SubscriptionServiceGrpc.newBlockingStub(channel);
+ this.streamingErrorHandler = streamingErrorHandler;
+ this.serverErrorHandler = serverErrorHandler;
+ this.items = synchronizedSet(new HashSet<>());
}
/**
@@ -134,8 +147,8 @@ public static Subscription from(Topic topic) {
* @deprecated please use {@link Subscription#toShortString()}
*/
@Deprecated
- public static String toShortString(Subscription subscription) {
- return subscription.toShortString();
+ public static String toShortString(Subscription s) {
+ return s.toShortString();
}
/**
@@ -152,8 +165,8 @@ public static String toShortString(Subscription subscription) {
* @see #cancel(Subscription)
*/
Subscription subscribeTo(Topic topic, StreamObserver observer) {
- Subscription subscription = blockingSubscriptionService.subscribe(topic);
- subscriptionService.activate(subscription, new SubscriptionObserver<>(observer));
+ Subscription subscription = blockingServiceStub.subscribe(topic);
+ service.activate(subscription, new SubscriptionObserver<>(observer));
add(subscription);
return subscription;
}
@@ -165,44 +178,96 @@ void addAll(Collection newSubscriptions) {
/** Remembers the passed subscription for future use. */
private void add(Subscription s) {
- subscriptions.add(checkNotNull(s));
+ items.add(checkNotNull(s));
}
/**
- * Cancels the passed subscription.
+ * Requests cancellation the passed subscription.
+ *
+ *
The cancellation of the subscription is done asynchronously.
*
* @return {@code true} if the subscription was previously made
*/
- public boolean cancel(Subscription subscription) {
- checkNotNull(subscription);
- requestCancellation(subscription);
- return subscriptions.remove(subscription);
+ public boolean cancel(Subscription s) {
+ checkNotNull(s);
+ boolean isActive = items.contains(s);
+ if (isActive) {
+ requestCancellation(s);
+ }
+ return isActive;
}
- private void requestCancellation(Subscription subscription) {
- //TODO:2020-04-17:alexander.yevsyukov: Check response and report the error.
- blockingSubscriptionService.cancel(subscription);
+ private void requestCancellation(Subscription s) {
+ service.cancel(s, new CancellationObserver(s));
}
- /** Cancels all the subscriptions. */
+ /**
+ * Requests cancellation of all subscriptions.
+ */
public void cancelAll() {
- Iterator iterator = subscriptions.iterator();
- while (iterator.hasNext()) {
- Subscription subscription = iterator.next();
- requestCancellation(subscription);
- iterator.remove();
- }
+ // Create the copy for iterating to avoid `ConcurrentModificationException` on removal.
+ ImmutableSet.copyOf(items)
+ .forEach(this::requestCancellation);
}
@VisibleForTesting
boolean contains(Subscription s) {
- return subscriptions.contains(s);
+ return items.contains(s);
}
/**
* Verifies if there are any active subscriptions.
*/
public boolean isEmpty() {
- return subscriptions.isEmpty();
+ return items.isEmpty();
+ }
+
+ /**
+ * Handles responses of cancellation requests.
+ */
+ private final class CancellationObserver implements StreamObserver {
+
+ private static final String UNABLE_TO_CANCEL = "Unable to cancel the subscription `%s`.";
+ private final Subscription subscription;
+
+ private CancellationObserver(Subscription subscription) {
+ this.subscription = checkNotNull(subscription);
+ }
+
+ @Override
+ public void onNext(Response response) {
+ if (response.isError()) {
+ Error err = response.error();
+ serverErrorHandler().accept(subscription, err);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ streamingErrorHandler().accept(t);
+ }
+
+ @Override
+ public void onCompleted() {
+ items.remove(subscription);
+ }
+
+ private ErrorHandler streamingErrorHandler() {
+ return Optional.ofNullable(Subscriptions.this.streamingErrorHandler)
+ .orElse(new LoggingErrorHandler(
+ logger(), UNABLE_TO_CANCEL, shortDebugString(subscription)
+ ));
+ }
+
+ private ServerErrorHandler serverErrorHandler() {
+ return Optional.ofNullable(Subscriptions.this.serverErrorHandler)
+ .orElse(new LoggingServerErrorHandler(
+ logger(), UNABLE_TO_CANCEL + " Returned error: `%s`."
+ ));
+ }
+
+ private FluentLogger logger() {
+ return Subscriptions.this.logger();
+ }
}
}
diff --git a/client/src/main/java/io/spine/client/TargetBuilder.java b/client/src/main/java/io/spine/client/TargetBuilder.java
index 619d1ccf61e..f9258748aed 100644
--- a/client/src/main/java/io/spine/client/TargetBuilder.java
+++ b/client/src/main/java/io/spine/client/TargetBuilder.java
@@ -74,7 +74,7 @@
* @param
* a type of the builder implementations
*/
-public abstract class TargetBuilder {
+public abstract class TargetBuilder> {
private final Class extends Message> targetType;
@@ -329,7 +329,8 @@ private String queryString() {
String valueSeparator = "; ";
StringBuilder sb = new StringBuilder();
- Class extends TargetBuilder> builderCls = self().getClass();
+ @SuppressWarnings("unchecked") // Ensured by declaration of this class.
+ Class builderCls = (Class) self().getClass();
sb.append(builderCls.getSimpleName())
.append('(')
.append("SELECT ")
diff --git a/config b/config
index cb8f3e17002..8fab5d31ee0 160000
--- a/config
+++ b/config
@@ -1 +1 @@
-Subproject commit cb8f3e170024a9972d0b89cfa25044d83bfb9b3e
+Subproject commit 8fab5d31ee0b59cec4a17a670a7dcef074319ed4
diff --git a/core/src/main/java/io/spine/core/Acks.java b/core/src/main/java/io/spine/core/Acks.java
index 050180dc82e..507209f4b0d 100644
--- a/core/src/main/java/io/spine/core/Acks.java
+++ b/core/src/main/java/io/spine/core/Acks.java
@@ -47,7 +47,7 @@ public static CommandId toCommandId(Ack ack) {
Message unpacked = AnyPacker.unpack(ack.getMessageId());
if (!(unpacked instanceof CommandId)) {
throw newIllegalArgumentException(
- "The acknowledgement does not contain a command ID: `%s`.",
+ "Unable to get a command ID from the acknowledgement: `%s`.",
shortDebugString(ack)
);
}
diff --git a/core/src/main/java/io/spine/core/ByField.java b/core/src/main/java/io/spine/core/ByField.java
index 629e65835c9..a6065a57c18 100644
--- a/core/src/main/java/io/spine/core/ByField.java
+++ b/core/src/main/java/io/spine/core/ByField.java
@@ -28,7 +28,10 @@
*
This annotation should not be used directly to mark anything.
* Instead, use the annotation instances as
* a {@link io.spine.core.Subscribe#filter() @Subscribe.filter)} parameter.
+ *
+ * @deprecated Please use please use {@link Where @Where} annotation for the first method parameter.
*/
+@Deprecated
@Target({})
public @interface ByField {
diff --git a/core/src/main/java/io/spine/core/CommandContextMixin.java b/core/src/main/java/io/spine/core/CommandContextMixin.java
new file mode 100644
index 00000000000..0c06c0c0ebe
--- /dev/null
+++ b/core/src/main/java/io/spine/core/CommandContextMixin.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2020, TeamDev. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.core;
+
+import com.google.errorprone.annotations.Immutable;
+import io.spine.annotation.GeneratedMixin;
+
+/**
+ * Mix-in interface extending {@link CommandContext}.
+ */
+@GeneratedMixin
+@Immutable
+interface CommandContextMixin extends CommandContextOrBuilder, SignalContext {
+
+ @Override
+ default ActorContext actorContext() {
+ return getActorContext();
+ }
+}
diff --git a/core/src/main/java/io/spine/core/CommandMixin.java b/core/src/main/java/io/spine/core/CommandMixin.java
index 6ea862798be..7bc9eb18f9d 100644
--- a/core/src/main/java/io/spine/core/CommandMixin.java
+++ b/core/src/main/java/io/spine/core/CommandMixin.java
@@ -24,6 +24,7 @@
import com.google.protobuf.Descriptors;
import com.google.protobuf.Duration;
import com.google.protobuf.Timestamp;
+import io.spine.annotation.GeneratedMixin;
import io.spine.annotation.Internal;
import io.spine.base.CommandMessage;
import io.spine.protobuf.Messages;
@@ -37,6 +38,7 @@
/**
* Mixin interface for command objects.
*/
+@GeneratedMixin
@Immutable
interface CommandMixin
extends Signal,
@@ -44,16 +46,21 @@ interface CommandMixin
FieldAwareMessage {
/**
- * Obtains the ID of the tenant of the command.
+ * Obtains the time when the command was created.
*/
@Override
- default TenantId tenant() {
- return actorContext().getTenantId();
+ default Timestamp timestamp() {
+ return actorContext().getTimestamp();
}
- @Override
+ /**
+ * Obtains the time when the command was created.
+ *
+ * @deprecated please use {@link #timestamp()}
+ */
+ @Deprecated
default Timestamp time() {
- return actorContext().getTimestamp();
+ return timestamp();
}
@Override
diff --git a/core/src/main/java/io/spine/core/Commands.java b/core/src/main/java/io/spine/core/Commands.java
index 9a94be5a9a8..0fc488d7fb2 100644
--- a/core/src/main/java/io/spine/core/Commands.java
+++ b/core/src/main/java/io/spine/core/Commands.java
@@ -69,7 +69,7 @@ public static CommandMessage ensureMessage(Message commandOrMessage) {
*/
public static void sort(List commands) {
checkNotNull(commands);
- commands.sort((c1, c2) -> Timestamps.compare(c1.time(), c2.time()));
+ commands.sort((c1, c2) -> Timestamps.compare(c1.timestamp(), c2.timestamp()));
}
/**
diff --git a/core/src/main/java/io/spine/core/EnrichableMessageContext.java b/core/src/main/java/io/spine/core/EnrichableMessageContext.java
index f6616f94cbc..6240163584c 100644
--- a/core/src/main/java/io/spine/core/EnrichableMessageContext.java
+++ b/core/src/main/java/io/spine/core/EnrichableMessageContext.java
@@ -22,7 +22,6 @@
import com.google.errorprone.annotations.Immutable;
import com.google.protobuf.Message;
-import io.spine.annotation.GeneratedMixin;
import io.spine.base.MessageContext;
import io.spine.core.Enrichment.Container;
@@ -33,7 +32,6 @@
/**
* A common interface for message contexts that hold enrichments.
*/
-@GeneratedMixin
@Immutable
public interface EnrichableMessageContext extends MessageContext {
diff --git a/core/src/main/java/io/spine/core/EventContextMixin.java b/core/src/main/java/io/spine/core/EventContextMixin.java
index a95e632a78f..7a7af2267d3 100644
--- a/core/src/main/java/io/spine/core/EventContextMixin.java
+++ b/core/src/main/java/io/spine/core/EventContextMixin.java
@@ -23,13 +23,12 @@
import com.google.errorprone.annotations.Immutable;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Timestamp;
+import io.spine.annotation.GeneratedMixin;
import io.spine.annotation.Internal;
import io.spine.base.Identifier;
import io.spine.logging.Logging;
-import io.spine.time.InstantConverter;
import io.spine.validate.FieldAwareMessage;
-import java.time.Instant;
import java.util.Optional;
import static io.spine.util.Exceptions.newIllegalStateException;
@@ -37,9 +36,12 @@
/**
* Mixin interface for {@link EventContext}s.
*/
+@GeneratedMixin
@Immutable
-interface EventContextMixin extends EnrichableMessageContext,
- EventContextOrBuilder,
+interface EventContextMixin extends EventContextOrBuilder,
+ SignalContext,
+ WithTime,
+ EnrichableMessageContext,
FieldAwareMessage,
Logging {
@@ -58,6 +60,7 @@ interface EventContextMixin extends EnrichableMessageContext,
* thrown as it contradicts the Spine validation rules. See {@link EventContext} proto
* declaration.
*/
+ @Override
@SuppressWarnings({
"ClassReferencesSubclass", // which is the only impl.
"deprecation" // For backward compatibility.
@@ -132,27 +135,6 @@ default Optional rootMessage() {
}
}
- /**
- * Obtains the actor user ID.
- *
- *
The 'actor' is the user responsible for producing the given event.
- *
- *
It is obtained as follows:
- *
- *
For the events generated from commands, the actor context is taken from the
- * enclosing command context.
- *
For the event react chain, the command context of the topmost event is used.
- *
For the imported events, the separate import context contains information about an
- * actor.
- *
- *
- *
If the given event context contains no origin, an {@link IllegalArgumentException} is
- * thrown as it contradicts the Spine validation rules.
- */
- default UserId actor() {
- return actorContext().getActor();
- }
-
/**
* Obtains the ID of the entity which produced the event.
*/
@@ -167,21 +149,11 @@ default Object producer() {
*
* @see #instant()
*/
+ @Override
default Timestamp timestamp() {
return getTimestamp();
}
- /**
- * Obtains the time of the event as {@link Instant}.
- *
- * @see #timestamp()
- */
- default Instant instant() {
- Instant result = InstantConverter.reversed()
- .convert(getTimestamp());
- return result;
- }
-
/**
* Reads the values of the fields without using the reflection.
*
diff --git a/core/src/main/java/io/spine/core/EventMixin.java b/core/src/main/java/io/spine/core/EventMixin.java
index af6a5b2f60c..d5aacfe710a 100644
--- a/core/src/main/java/io/spine/core/EventMixin.java
+++ b/core/src/main/java/io/spine/core/EventMixin.java
@@ -43,17 +43,19 @@
interface EventMixin
extends Signal, FieldAwareMessage, EntityState {
- /**
- * Obtains the ID of the tenant of the event.
- */
@Override
- default TenantId tenant() {
- return actorContext().getTenantId();
+ default Timestamp timestamp() {
+ return context().getTimestamp();
}
- @Override
+ /**
+ * Obtains the time of the event.
+ *
+ * @deprecated please use {@link #timestamp()}
+ */
+ @Deprecated
default Timestamp time() {
- return context().getTimestamp();
+ return timestamp();
}
/**
diff --git a/core/src/main/java/io/spine/core/ResponseMixin.java b/core/src/main/java/io/spine/core/ResponseMixin.java
new file mode 100644
index 00000000000..254405cbd06
--- /dev/null
+++ b/core/src/main/java/io/spine/core/ResponseMixin.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2020, TeamDev. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.core;
+
+import com.google.errorprone.annotations.Immutable;
+import io.spine.annotation.GeneratedMixin;
+import io.spine.base.Error;
+
+/**
+ * Mixin interface for the {@link Response} objects.
+ */
+@GeneratedMixin
+@Immutable
+interface ResponseMixin extends ResponseOrBuilder {
+
+ /**
+ * Verifies if this response has the {@link Status.StatusCase#OK OK} status.
+ */
+ default boolean isOk() {
+ return getStatus().getStatusCase() == Status.StatusCase.OK;
+ }
+
+ /**
+ * Verifies if this response has the {@link Status.StatusCase#ERROR ERROR} status.
+ */
+ default boolean isError() {
+ return getStatus().getStatusCase() == Status.StatusCase.ERROR;
+ }
+
+ /**
+ * Obtains the error associated with the response or default instance is the response is not
+ * an error.
+ */
+ default Error error() {
+ return getStatus().getError();
+ }
+}
diff --git a/core/src/main/java/io/spine/core/Responses.java b/core/src/main/java/io/spine/core/Responses.java
index c31b9210a1b..9b135aa83ab 100644
--- a/core/src/main/java/io/spine/core/Responses.java
+++ b/core/src/main/java/io/spine/core/Responses.java
@@ -54,12 +54,10 @@ public static Status statusOk() {
/**
* Checks if the response is OK.
*
- * @return {@code true} if the passed response represents `ok` status,
- * {@code false} otherwise
+ * @deprecated please use {@link Response#isOk()}
*/
+ @Deprecated
public static boolean isOk(Response response) {
- boolean result = response.getStatus()
- .getStatusCase() == Status.StatusCase.OK;
- return result;
+ return response.isOk();
}
}
diff --git a/core/src/main/java/io/spine/core/Signal.java b/core/src/main/java/io/spine/core/Signal.java
index d283ff1d132..59b73c427bc 100644
--- a/core/src/main/java/io/spine/core/Signal.java
+++ b/core/src/main/java/io/spine/core/Signal.java
@@ -23,14 +23,12 @@
import com.google.errorprone.annotations.Immutable;
import com.google.protobuf.Any;
import com.google.protobuf.Message;
-import com.google.protobuf.Timestamp;
import io.spine.annotation.GeneratedMixin;
import io.spine.annotation.SPI;
import io.spine.base.KnownMessage;
import io.spine.base.MessageContext;
import io.spine.base.SerializableMessage;
import io.spine.protobuf.AnyPacker;
-import io.spine.time.TimestampTemporal;
import io.spine.type.TypeUrl;
import java.util.Optional;
@@ -62,7 +60,7 @@
public interface Signal
- extends SerializableMessage {
+ extends SerializableMessage, WithActor, WithTime {
/**
* Obtains the identifier of the message.
@@ -79,7 +77,6 @@ public interface Signal type() {
+ @SuppressWarnings("unchecked") // Safe as we obtain it from an instance of .
+ Class extends M> type = (Class extends M>) enclosedMessage().getClass();
+ return type;
+ }
+
/**
* Obtains the unpacked form of the enclosed message.
*
@@ -107,21 +114,6 @@ default C context() {
return getContext();
}
- /**
- * Obtains the ID of the tenant under which the message was created.
- */
- TenantId tenant();
-
- /**
- * Obtains the time when the message was created.
- */
- Timestamp time();
-
- /**
- * Obtains the data about the actor who started the message chain.
- */
- ActorContext actorContext();
-
/**
* Obtains the type URL of the enclosed message.
*/
@@ -140,44 +132,6 @@ default boolean is(Class extends Message> enclosedMessageClass) {
return result;
}
- /**
- * Verifies if the message was created after the point in time.
- */
- default boolean isAfter(Timestamp bound) {
- checkNotNull(bound);
- TimestampTemporal timeTemporal = TimestampTemporal.from(time());
- TimestampTemporal boundTemporal = TimestampTemporal.from(bound);
- return timeTemporal.isLaterThan(boundTemporal);
- }
-
- /**
- * Verifies if the message was created before the point in time.
- */
- default boolean isBefore(Timestamp bound) {
- checkNotNull(bound);
- TimestampTemporal timeTemporal = TimestampTemporal.from(time());
- TimestampTemporal boundTemporal = TimestampTemporal.from(bound);
- return timeTemporal.isEarlierThan(boundTemporal);
- }
-
- /**
- * Verifies if the message was created within the passed period of time.
- *
- * @param periodStart
- * lower bound, exclusive
- * @param periodEnd
- * higher bound, inclusive
- * @return {@code true} if the time point of the command creation lies in between the given two
- */
- default boolean isBetween(Timestamp periodStart, Timestamp periodEnd) {
- checkNotNull(periodStart);
- checkNotNull(periodEnd);
- TimestampTemporal timeTemporal = TimestampTemporal.from(time());
- TimestampTemporal start = TimestampTemporal.from(periodStart);
- TimestampTemporal end = TimestampTemporal.from(periodEnd);
- return timeTemporal.isBetween(start, end);
- }
-
/**
* Obtains the ID of this message.
*/
diff --git a/core/src/main/java/io/spine/core/SignalContext.java b/core/src/main/java/io/spine/core/SignalContext.java
new file mode 100644
index 00000000000..0204314e21b
--- /dev/null
+++ b/core/src/main/java/io/spine/core/SignalContext.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2020, TeamDev. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.core;
+
+import com.google.errorprone.annotations.Immutable;
+import io.spine.base.MessageContext;
+
+/**
+ * A context of a message initiated by a user.
+ */
+@Immutable
+public interface SignalContext extends MessageContext, WithActor {
+}
diff --git a/core/src/main/java/io/spine/core/WithActor.java b/core/src/main/java/io/spine/core/WithActor.java
new file mode 100644
index 00000000000..976da710173
--- /dev/null
+++ b/core/src/main/java/io/spine/core/WithActor.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2020, TeamDev. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.core;
+
+import com.google.errorprone.annotations.Immutable;
+
+/**
+ * An object associated with an acting user.
+ */
+@Immutable
+public interface WithActor {
+
+ /**
+ * The context of the associated user.
+ */
+ ActorContext actorContext();
+
+ /**
+ * The ID of the associated user.
+ */
+ default UserId actor() {
+ return actorContext().getActor();
+ }
+
+ /**
+ * Obtains the ID of the tenant under which the message was created.
+ */
+ default TenantId tenant() {
+ return actorContext().getTenantId();
+ }
+}
diff --git a/core/src/main/java/io/spine/core/WithTime.java b/core/src/main/java/io/spine/core/WithTime.java
new file mode 100644
index 00000000000..d42c3f4e164
--- /dev/null
+++ b/core/src/main/java/io/spine/core/WithTime.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2020, TeamDev. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.core;
+
+import com.google.protobuf.Timestamp;
+import io.spine.time.InstantConverter;
+import io.spine.time.TimestampTemporal;
+
+import java.time.Instant;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * An object with associated point in time.
+ */
+public interface WithTime {
+
+ /**
+ * Obtains the point in time associated with the object.
+ */
+ Timestamp timestamp();
+
+ /**
+ * Verifies if associated time is after the passed point in time.
+ */
+ default boolean isAfter(Timestamp bound) {
+ checkNotNull(bound);
+ TimestampTemporal timeTemporal = TimestampTemporal.from(timestamp());
+ TimestampTemporal boundTemporal = TimestampTemporal.from(bound);
+ return timeTemporal.isLaterThan(boundTemporal);
+ }
+
+ /**
+ * Verifies if the associated time is before the passed point in time.
+ */
+ default boolean isBefore(Timestamp bound) {
+ checkNotNull(bound);
+ TimestampTemporal timeTemporal = TimestampTemporal.from(timestamp());
+ TimestampTemporal boundTemporal = TimestampTemporal.from(bound);
+ return timeTemporal.isEarlierThan(boundTemporal);
+ }
+
+ /**
+ * Verifies if the associated time point is within the passed period of time.
+ *
+ * @param periodStart
+ * lower bound, exclusive
+ * @param periodEnd
+ * higher bound, inclusive
+ * @return {@code true} if the time point of the command creation lies in between the given two
+ */
+ default boolean isBetween(Timestamp periodStart, Timestamp periodEnd) {
+ checkNotNull(periodStart);
+ checkNotNull(periodEnd);
+ TimestampTemporal timeTemporal = TimestampTemporal.from(timestamp());
+ TimestampTemporal start = TimestampTemporal.from(periodStart);
+ TimestampTemporal end = TimestampTemporal.from(periodEnd);
+ return timeTemporal.isBetween(start, end);
+ }
+
+ /**
+ * Obtains the associated time point as {@link Instant}.
+ *
+ * @see #timestamp()
+ */
+ default Instant instant() {
+ Instant result = InstantConverter.reversed()
+ .convert(timestamp());
+ return checkNotNull(result);
+ }
+}
diff --git a/core/src/main/proto/spine/core/ack.proto b/core/src/main/proto/spine/core/ack.proto
index 47e586553bd..c1e0ed352d9 100644
--- a/core/src/main/proto/spine/core/ack.proto
+++ b/core/src/main/proto/spine/core/ack.proto
@@ -32,7 +32,7 @@ import "google/protobuf/any.proto";
import "spine/core/response.proto";
-// An acknowledgement of posting a message to a bus.
+// An acknowledgement of posting a message for asynchronous processing at the server-side.
//
// This response means that a message was either accepted for further processing, or rejected
// (e.g. because of a validation error).
diff --git a/core/src/main/proto/spine/core/command.proto b/core/src/main/proto/spine/core/command.proto
index d618e6919cb..58f7834ea9d 100644
--- a/core/src/main/proto/spine/core/command.proto
+++ b/core/src/main/proto/spine/core/command.proto
@@ -37,7 +37,7 @@ import "spine/core/diagnostics.proto";
// Command identifier.
message CommandId {
- option (is).java_type = "io.spine.core.CommandIdMixin";
+ option (is).java_type = "CommandIdMixin";
option (SPI_type) = true;
string uuid = 1;
@@ -59,7 +59,7 @@ message CommandId {
// There should be one and only one command handler associated with the type of the command.
//
message Command {
- option (is).java_type = "io.spine.core.CommandMixin";
+ option (is).java_type = "CommandMixin";
option (SPI_type) = true;
// The ID of the command.
@@ -97,7 +97,7 @@ message Command {
// Meta-information about the command and the environment, which generated the command.
message CommandContext {
- option (is).java_type = "io.spine.base.MessageContext";
+ option (is).java_type = "CommandContextMixin";
option (SPI_type) = true;
// Information about the environment of the user who created the command.
diff --git a/core/src/main/proto/spine/core/event.proto b/core/src/main/proto/spine/core/event.proto
index 6a602aff93f..59492f130d1 100644
--- a/core/src/main/proto/spine/core/event.proto
+++ b/core/src/main/proto/spine/core/event.proto
@@ -74,7 +74,7 @@ message EventId {
// See `EventContext.rejection` to determine the event kind.
//
message Event {
- option (is).java_type = "io.spine.core.EventMixin";
+ option (is).java_type = "EventMixin";
option (SPI_type) = true;
// The ID of the event.
@@ -89,7 +89,7 @@ message Event {
// Meta-information for an event.
message EventContext {
- option (is).java_type = "io.spine.core.EventContextMixin";
+ option (is).java_type = "EventContextMixin";
option (SPI_type) = true;
// When the event occurred.
diff --git a/core/src/main/proto/spine/core/response.proto b/core/src/main/proto/spine/core/response.proto
index 2666571f245..8838a31d48f 100644
--- a/core/src/main/proto/spine/core/response.proto
+++ b/core/src/main/proto/spine/core/response.proto
@@ -41,6 +41,7 @@ import "spine/core/event.proto";
// only upon receiving the `ok` status.
//
message Response {
+ option (is).java_type = "ResponseMixin";
// The response status.
Status status = 1 [(required) = true];
@@ -49,6 +50,8 @@ message Response {
// The response status.
message Status {
oneof status {
+ option (is_required) = true;
+
// The request was accepted to execution.
google.protobuf.Empty ok = 1;
diff --git a/core/src/test/java/io/spine/core/ResponsesTest.java b/core/src/test/java/io/spine/core/ResponsesTest.java
index 4c92c703cf0..d24aed5adc5 100644
--- a/core/src/test/java/io/spine/core/ResponsesTest.java
+++ b/core/src/test/java/io/spine/core/ResponsesTest.java
@@ -48,7 +48,9 @@ void returnOkResponse() {
@Test
@DisplayName("recognize OK response")
void recognizeOkResponse() {
- assertTrue(Responses.isOk(Responses.ok()));
+ Response ok = Responses.ok();
+ assertTrue(ok.isOk());
+ assertFalse(ok.isError());
}
@Test
@@ -60,6 +62,7 @@ void recognizeNotOkResponse() {
Response error = Response.newBuilder()
.setStatus(status)
.build();
- assertFalse(Responses.isOk(error));
+ assertFalse(error.isOk());
+ assertTrue(error.isError());
}
}
diff --git a/license-report.md b/license-report.md
index 03fc35c631c..95db3408f6b 100644
--- a/license-report.md
+++ b/license-report.md
@@ -1,6 +1,6 @@
-# Dependencies of `io.spine:spine-client:1.5.11`
+# Dependencies of `io.spine:spine-client:1.5.12`
## Runtime
1. **Group:** com.google.android **Name:** annotations **Version:** 4.1.1.4
@@ -405,12 +405,12 @@
The dependencies distributed under several licenses, are used according their commercial-use-friendly license.
-This report was generated on **Mon May 04 17:42:20 EEST 2020** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).
+This report was generated on **Thu May 14 15:29:50 EEST 2020** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).
-# Dependencies of `io.spine:spine-core:1.5.11`
+# Dependencies of `io.spine:spine-core:1.5.12`
## Runtime
1. **Group:** com.google.code.findbugs **Name:** jsr305 **Version:** 3.0.2
@@ -775,12 +775,12 @@ This report was generated on **Mon May 04 17:42:20 EEST 2020** using [Gradle-Lic
The dependencies distributed under several licenses, are used according their commercial-use-friendly license.
-This report was generated on **Mon May 04 17:42:20 EEST 2020** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).
+This report was generated on **Thu May 14 15:29:51 EEST 2020** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).
-# Dependencies of `io.spine.tools:spine-model-assembler:1.5.11`
+# Dependencies of `io.spine.tools:spine-model-assembler:1.5.12`
## Runtime
1. **Group:** com.google.android **Name:** annotations **Version:** 4.1.1.4
@@ -1180,12 +1180,12 @@ This report was generated on **Mon May 04 17:42:20 EEST 2020** using [Gradle-Lic
The dependencies distributed under several licenses, are used according their commercial-use-friendly license.
-This report was generated on **Mon May 04 17:42:20 EEST 2020** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).
+This report was generated on **Thu May 14 15:29:51 EEST 2020** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).
-# Dependencies of `io.spine.tools:spine-model-verifier:1.5.11`
+# Dependencies of `io.spine.tools:spine-model-verifier:1.5.12`
## Runtime
1. **Group:** com.google.android **Name:** annotations **Version:** 4.1.1.4
@@ -1645,12 +1645,12 @@ This report was generated on **Mon May 04 17:42:20 EEST 2020** using [Gradle-Lic
The dependencies distributed under several licenses, are used according their commercial-use-friendly license.
-This report was generated on **Mon May 04 17:42:21 EEST 2020** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).
+This report was generated on **Thu May 14 15:29:51 EEST 2020** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).
-# Dependencies of `io.spine:spine-server:1.5.11`
+# Dependencies of `io.spine:spine-server:1.5.12`
## Runtime
1. **Group:** com.google.android **Name:** annotations **Version:** 4.1.1.4
@@ -2067,12 +2067,12 @@ This report was generated on **Mon May 04 17:42:21 EEST 2020** using [Gradle-Lic
The dependencies distributed under several licenses, are used according their commercial-use-friendly license.
-This report was generated on **Mon May 04 17:42:21 EEST 2020** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).
+This report was generated on **Thu May 14 15:29:51 EEST 2020** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).
-# Dependencies of `io.spine:spine-testutil-client:1.5.11`
+# Dependencies of `io.spine:spine-testutil-client:1.5.12`
## Runtime
1. **Group:** com.google.android **Name:** annotations **Version:** 4.1.1.4
@@ -2526,12 +2526,12 @@ This report was generated on **Mon May 04 17:42:21 EEST 2020** using [Gradle-Lic
The dependencies distributed under several licenses, are used according their commercial-use-friendly license.
-This report was generated on **Mon May 04 17:42:23 EEST 2020** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).
+This report was generated on **Thu May 14 15:29:53 EEST 2020** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).
-# Dependencies of `io.spine:spine-testutil-core:1.5.11`
+# Dependencies of `io.spine:spine-testutil-core:1.5.12`
## Runtime
1. **Group:** com.google.android **Name:** annotations **Version:** 4.1.1.4
@@ -2993,12 +2993,12 @@ This report was generated on **Mon May 04 17:42:23 EEST 2020** using [Gradle-Lic
The dependencies distributed under several licenses, are used according their commercial-use-friendly license.
-This report was generated on **Mon May 04 17:42:24 EEST 2020** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).
+This report was generated on **Thu May 14 15:29:54 EEST 2020** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).
-# Dependencies of `io.spine:spine-testutil-server:1.5.11`
+# Dependencies of `io.spine:spine-testutil-server:1.5.12`
## Runtime
1. **Group:** com.google.android **Name:** annotations **Version:** 4.1.1.4
@@ -3496,4 +3496,4 @@ This report was generated on **Mon May 04 17:42:24 EEST 2020** using [Gradle-Lic
The dependencies distributed under several licenses, are used according their commercial-use-friendly license.
-This report was generated on **Mon May 04 17:42:27 EEST 2020** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).
\ No newline at end of file
+This report was generated on **Thu May 14 15:29:55 EEST 2020** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).
\ No newline at end of file
diff --git a/model/model-verifier/src/test/java/io/spine/model/verify/given/EditAggregate.java b/model/model-verifier/src/test/java/io/spine/model/verify/given/EditAggregate.java
index b5b8d85c3b7..a8f01af4d9d 100644
--- a/model/model-verifier/src/test/java/io/spine/model/verify/given/EditAggregate.java
+++ b/model/model-verifier/src/test/java/io/spine/model/verify/given/EditAggregate.java
@@ -35,7 +35,7 @@ PhotoEdited handle(EditPhoto command, CommandContext ctx) {
return PhotoEdited
.newBuilder()
.setNewPhoto(command.getNewPhoto())
- .setEditor(ctx.getActorContext().getActor().getValue())
+ .setEditor(ctx.actor().getValue())
.build();
}
diff --git a/pom.xml b/pom.xml
index 52ab20280a7..88fc8d750f5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,7 +12,7 @@ all modules and does not describe the project structure per-subproject.
io.spinespine-core-java
-1.5.11
+1.5.122015
diff --git a/server/src/main/java/io/spine/server/CommandService.java b/server/src/main/java/io/spine/server/CommandService.java
index 9f66f54fbdf..f8e8d5106ac 100644
--- a/server/src/main/java/io/spine/server/CommandService.java
+++ b/server/src/main/java/io/spine/server/CommandService.java
@@ -46,7 +46,7 @@ public final class CommandService
extends CommandServiceGrpc.CommandServiceImplBase
implements Logging {
- private final ImmutableMap boundedContextMap;
+ private final ImmutableMap commandToContext;
/**
* Constructs new instance using the map from a {@code CommandClass} to
@@ -54,7 +54,7 @@ public final class CommandService
*/
private CommandService(Map map) {
super();
- this.boundedContextMap = ImmutableMap.copyOf(map);
+ this.commandToContext = ImmutableMap.copyOf(map);
}
/**
@@ -67,11 +67,11 @@ public static Builder newBuilder() {
@Override
public void post(Command request, StreamObserver responseObserver) {
CommandClass commandClass = CommandClass.of(request);
- BoundedContext boundedContext = boundedContextMap.get(commandClass);
- if (boundedContext == null) {
+ BoundedContext context = commandToContext.get(commandClass);
+ if (context == null) {
handleUnsupported(request, responseObserver);
} else {
- CommandBus commandBus = boundedContext.commandBus();
+ CommandBus commandBus = context.commandBus();
commandBus.post(request, responseObserver);
}
}
@@ -86,17 +86,20 @@ private void handleUnsupported(Command command, StreamObserver responseObse
responseObserver.onCompleted();
}
+ /**
+ * The builder for a {@code CommandService}.
+ */
public static class Builder {
- private final Set boundedContexts = Sets.newHashSet();
+ private final Set contexts = Sets.newHashSet();
/**
* Adds the {@code BoundedContext} to the builder.
*/
@CanIgnoreReturnValue
- public Builder add(BoundedContext boundedContext) {
+ public Builder add(BoundedContext context) {
// Saves it to a temporary set so that it is easy to remove it if needed.
- boundedContexts.add(boundedContext);
+ contexts.add(context);
return this;
}
@@ -104,19 +107,19 @@ public Builder add(BoundedContext boundedContext) {
* Removes the {@code BoundedContext} from the builder.
*/
@CanIgnoreReturnValue
- public Builder remove(BoundedContext boundedContext) {
- boundedContexts.remove(boundedContext);
+ public Builder remove(BoundedContext context) {
+ contexts.remove(context);
return this;
}
/**
* Verifies if the passed {@code BoundedContext} was previously added to the builder.
*
- * @param boundedContext the instance to check
+ * @param context the instance to check
* @return {@code true} if the instance was added to the builder, {@code false} otherwise
*/
- public boolean contains(BoundedContext boundedContext) {
- boolean contains = boundedContexts.contains(boundedContext);
+ public boolean contains(BoundedContext context) {
+ boolean contains = contexts.contains(context);
return contains;
}
@@ -135,7 +138,7 @@ public CommandService build() {
*/
private ImmutableMap createMap() {
ImmutableMap.Builder builder = ImmutableMap.builder();
- for (BoundedContext boundedContext : boundedContexts) {
+ for (BoundedContext boundedContext : contexts) {
putIntoMap(boundedContext, builder);
}
return builder.build();
@@ -145,12 +148,12 @@ private ImmutableMap createMap() {
* Associates {@code CommandClass}es with the instance of {@code BoundedContext}
* that handles such commands.
*/
- private static void putIntoMap(BoundedContext boundedContext,
+ private static void putIntoMap(BoundedContext context,
ImmutableMap.Builder builder) {
- CommandBus commandBus = boundedContext.commandBus();
+ CommandBus commandBus = context.commandBus();
Set cmdClasses = commandBus.registeredCommandClasses();
for (CommandClass commandClass : cmdClasses) {
- builder.put(commandClass, boundedContext);
+ builder.put(commandClass, context);
}
}
}
diff --git a/core/src/main/java/io/spine/core/MessageRejection.java b/server/src/main/java/io/spine/server/MessageError.java
similarity index 72%
rename from core/src/main/java/io/spine/core/MessageRejection.java
rename to server/src/main/java/io/spine/server/MessageError.java
index 72984a8151b..b317b2b2970 100644
--- a/core/src/main/java/io/spine/core/MessageRejection.java
+++ b/server/src/main/java/io/spine/server/MessageError.java
@@ -18,31 +18,24 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
-package io.spine.core;
+package io.spine.server;
import io.spine.annotation.Internal;
import io.spine.base.Error;
/**
- * The report about a message being rejected from processing.
- *
- *
Such message could be:
- *
- *
an event;
- *
a command;
- *
an actor request (query, topic or subscription).
- *
+ * A common interface for errors that prevent a message from being processed by the server.
*/
@Internal
-public interface MessageRejection {
+public interface MessageError {
/**
- * Converts this {@code MessageRejection} into an {@link Error io.spine.base.Error}.
+ * Converts this instance into an {@link Error io.spine.base.Error}.
*/
Error asError();
/**
- * Converts this {@code MessageRejection} into a {@link Throwable java.lang.Throwable}.
+ * Converts this instance into a {@link Throwable java.lang.Throwable}.
*/
Throwable asThrowable();
}
diff --git a/core/src/main/java/io/spine/core/MessageInvalid.java b/server/src/main/java/io/spine/server/MessageInvalid.java
similarity index 79%
rename from core/src/main/java/io/spine/core/MessageInvalid.java
rename to server/src/main/java/io/spine/server/MessageInvalid.java
index f4fa20bcebd..dc5f48afc7a 100644
--- a/core/src/main/java/io/spine/core/MessageInvalid.java
+++ b/server/src/main/java/io/spine/server/MessageInvalid.java
@@ -18,15 +18,15 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
-package io.spine.core;
+package io.spine.server;
import io.spine.annotation.Internal;
/**
- * An interface for the {@link MessageRejection} types which report a message being invalid.
+ * An interface for the {@link MessageError} types which report a message being invalid.
*
- *
Except the methods declared in {@link MessageRejection}, this type is a marker interface.
+ *
Except the methods declared in {@link MessageError}, this type is a marker interface.
*/
@Internal
-public interface MessageInvalid extends MessageRejection {
+public interface MessageInvalid extends MessageError {
}
diff --git a/server/src/main/java/io/spine/server/aggregate/ImportValidator.java b/server/src/main/java/io/spine/server/aggregate/ImportValidator.java
index 5b1201009f4..c0b3ab36135 100644
--- a/server/src/main/java/io/spine/server/aggregate/ImportValidator.java
+++ b/server/src/main/java/io/spine/server/aggregate/ImportValidator.java
@@ -21,7 +21,7 @@
package io.spine.server.aggregate;
import io.spine.base.EventMessage;
-import io.spine.core.MessageInvalid;
+import io.spine.server.MessageInvalid;
import io.spine.server.bus.EnvelopeValidator;
import io.spine.server.type.EventEnvelope;
import io.spine.validate.ConstraintViolation;
diff --git a/server/src/main/java/io/spine/server/bus/EnvelopeValidator.java b/server/src/main/java/io/spine/server/bus/EnvelopeValidator.java
index 64f101eca5c..f6ffb1c175e 100644
--- a/server/src/main/java/io/spine/server/bus/EnvelopeValidator.java
+++ b/server/src/main/java/io/spine/server/bus/EnvelopeValidator.java
@@ -20,7 +20,7 @@
package io.spine.server.bus;
-import io.spine.core.MessageInvalid;
+import io.spine.server.MessageInvalid;
import io.spine.server.type.MessageEnvelope;
import java.util.Optional;
diff --git a/server/src/main/java/io/spine/server/bus/MessageUnhandled.java b/server/src/main/java/io/spine/server/bus/MessageUnhandled.java
index ab9da8975e0..0620b9ebd30 100644
--- a/server/src/main/java/io/spine/server/bus/MessageUnhandled.java
+++ b/server/src/main/java/io/spine/server/bus/MessageUnhandled.java
@@ -21,14 +21,14 @@
package io.spine.server.bus;
import io.spine.annotation.Internal;
-import io.spine.core.MessageRejection;
+import io.spine.server.MessageError;
/**
- * An interface for the {@link MessageRejection} types which report an unhandled message being
+ * An interface for the {@link MessageError} types which report an unhandled message being
* posted into a {@link Bus}.
*
- *
Except the methods declared in {@link MessageRejection}, this type is a marker interface.
+ *
Except the methods declared in {@link MessageError}, this type is a marker interface.
*/
@Internal
-public interface MessageUnhandled extends MessageRejection {
+public interface MessageUnhandled extends MessageError {
}
diff --git a/server/src/main/java/io/spine/server/bus/ValidatingFilter.java b/server/src/main/java/io/spine/server/bus/ValidatingFilter.java
index a0fb89ca733..da981dd5a38 100644
--- a/server/src/main/java/io/spine/server/bus/ValidatingFilter.java
+++ b/server/src/main/java/io/spine/server/bus/ValidatingFilter.java
@@ -25,7 +25,7 @@
import io.spine.base.Error;
import io.spine.base.Identifier;
import io.spine.core.Ack;
-import io.spine.core.MessageInvalid;
+import io.spine.server.MessageInvalid;
import io.spine.server.type.MessageEnvelope;
import java.util.Optional;
diff --git a/server/src/main/java/io/spine/server/commandbus/CommandException.java b/server/src/main/java/io/spine/server/commandbus/CommandException.java
index 5b6cc4f2b20..19bca5fa8e2 100644
--- a/server/src/main/java/io/spine/server/commandbus/CommandException.java
+++ b/server/src/main/java/io/spine/server/commandbus/CommandException.java
@@ -27,7 +27,7 @@
import io.spine.code.java.ClassName;
import io.spine.core.Command;
import io.spine.core.CommandValidationError;
-import io.spine.core.MessageRejection;
+import io.spine.server.MessageError;
import io.spine.server.type.CommandEnvelope;
import io.spine.type.TypeName;
@@ -38,7 +38,7 @@
/**
* Abstract base for exceptions related to commands.
*/
-public abstract class CommandException extends RuntimeException implements MessageRejection {
+public abstract class CommandException extends RuntimeException implements MessageError {
/**
* The name of the attribute of the command type reported in an error.
diff --git a/server/src/main/java/io/spine/server/commandbus/CommandValidator.java b/server/src/main/java/io/spine/server/commandbus/CommandValidator.java
index 4de283e4ab6..e7fbf65289e 100644
--- a/server/src/main/java/io/spine/server/commandbus/CommandValidator.java
+++ b/server/src/main/java/io/spine/server/commandbus/CommandValidator.java
@@ -26,8 +26,8 @@
import io.spine.base.CommandMessage;
import io.spine.core.Command;
import io.spine.core.CommandId;
-import io.spine.core.MessageInvalid;
import io.spine.core.TenantId;
+import io.spine.server.MessageInvalid;
import io.spine.server.bus.EnvelopeValidator;
import io.spine.server.type.CommandEnvelope;
import io.spine.validate.ConstraintViolation;
diff --git a/server/src/main/java/io/spine/server/commandbus/InvalidCommandException.java b/server/src/main/java/io/spine/server/commandbus/InvalidCommandException.java
index 8f80a760f18..20ff5a9d0e5 100644
--- a/server/src/main/java/io/spine/server/commandbus/InvalidCommandException.java
+++ b/server/src/main/java/io/spine/server/commandbus/InvalidCommandException.java
@@ -22,11 +22,12 @@
import com.google.protobuf.Message;
import com.google.protobuf.Value;
+import io.spine.base.CommandMessage;
import io.spine.base.Error;
import io.spine.base.Identifier;
import io.spine.core.Command;
import io.spine.core.CommandValidationError;
-import io.spine.core.MessageInvalid;
+import io.spine.server.MessageInvalid;
import io.spine.server.type.CommandClass;
import io.spine.server.type.CommandEnvelope;
import io.spine.type.TypeName;
@@ -35,6 +36,7 @@
import java.util.Map;
+import static com.google.protobuf.TextFormat.shortDebugString;
import static java.lang.String.format;
/**
@@ -78,7 +80,7 @@ public static InvalidCommandException missingTenantId(Command command) {
Message commandMessage = envelope.message();
String errMsg = format(
"The command (class: `%s`, type: `%s`, id: `%s`) is posted to " +
- "multitenant Command Bus, but has no `tenant_id` attribute in the context.",
+ "multi-tenant Bounded Context, but has no `tenant_id` attribute in the context.",
CommandClass.of(commandMessage)
.value()
.getName(),
@@ -103,21 +105,24 @@ public static Error unknownTenantError(Message commandMessage, String errorText)
return error;
}
+ /**
+ * Creates an exception for the command which specifies a tenant in a single-tenant context.
+ */
public static InvalidCommandException inapplicableTenantId(Command command) {
CommandEnvelope cmd = CommandEnvelope.of(command);
TypeName typeName = TypeName.of(cmd.message());
String errMsg = format(
- "The command (class: %s, type: %s, id: %s) was posted to single-tenant " +
- "CommandBus, but has tenant_id: %s attribute set in the command context.",
+ "The command (class: `%s`, type: `%s`, id: `%s`) was posted to a single-tenant " +
+ "Bounded Context, but has `tenant_id` (`%s`) attribute set in the command context.",
cmd.messageClass(),
typeName,
- cmd.id(),
- cmd.tenantId());
+ shortDebugString(cmd.id()),
+ shortDebugString(cmd.tenantId()));
Error error = inapplicableTenantError(cmd.message(), errMsg);
return new InvalidCommandException(errMsg, command, error);
}
- private static Error inapplicableTenantError(Message commandMessage, String errMsg) {
+ private static Error inapplicableTenantError(CommandMessage commandMessage, String errMsg) {
Error error = Error
.newBuilder()
.setType(InvalidCommandException.class.getCanonicalName())
diff --git a/server/src/main/java/io/spine/server/event/EventComparator.java b/server/src/main/java/io/spine/server/event/EventComparator.java
index 58b6f624fb3..7798cc384b5 100644
--- a/server/src/main/java/io/spine/server/event/EventComparator.java
+++ b/server/src/main/java/io/spine/server/event/EventComparator.java
@@ -65,7 +65,7 @@ private static final class Chronological extends EventComparator {
@Override
public int compare(Event e1, Event e2) {
int result = Comparator
- .comparing(Event::time, Timestamps.comparator())
+ .comparing(Event::timestamp, Timestamps.comparator())
.thenComparing((e) -> e.getContext()
.getVersion()
.getNumber())
diff --git a/server/src/main/java/io/spine/server/event/EventException.java b/server/src/main/java/io/spine/server/event/EventException.java
index 339deb4498b..88317dadc91 100644
--- a/server/src/main/java/io/spine/server/event/EventException.java
+++ b/server/src/main/java/io/spine/server/event/EventException.java
@@ -25,7 +25,7 @@
import com.google.protobuf.Value;
import io.spine.base.Error;
import io.spine.base.EventMessage;
-import io.spine.core.MessageRejection;
+import io.spine.server.MessageError;
import io.spine.type.TypeName;
import java.util.Map;
@@ -33,7 +33,7 @@
/**
* A base for exceptions related to events.
*/
-public abstract class EventException extends RuntimeException implements MessageRejection {
+public abstract class EventException extends RuntimeException implements MessageError {
private static final long serialVersionUID = 0L;
diff --git a/server/src/main/java/io/spine/server/event/EventValidator.java b/server/src/main/java/io/spine/server/event/EventValidator.java
index d3da4037b82..3ba8b159002 100644
--- a/server/src/main/java/io/spine/server/event/EventValidator.java
+++ b/server/src/main/java/io/spine/server/event/EventValidator.java
@@ -22,7 +22,7 @@
import io.spine.base.EventMessage;
import io.spine.core.Event;
-import io.spine.core.MessageInvalid;
+import io.spine.server.MessageInvalid;
import io.spine.server.bus.EnvelopeValidator;
import io.spine.server.type.EventEnvelope;
import io.spine.validate.ConstraintViolation;
diff --git a/server/src/main/java/io/spine/server/event/InvalidEventException.java b/server/src/main/java/io/spine/server/event/InvalidEventException.java
index 56b20a30f68..69e52dc2ef5 100644
--- a/server/src/main/java/io/spine/server/event/InvalidEventException.java
+++ b/server/src/main/java/io/spine/server/event/InvalidEventException.java
@@ -25,7 +25,7 @@
import io.spine.base.Error;
import io.spine.base.EventMessage;
import io.spine.core.EventValidationError;
-import io.spine.core.MessageInvalid;
+import io.spine.server.MessageInvalid;
import io.spine.server.type.EventClass;
import io.spine.validate.ConstraintViolation;
import io.spine.validate.ExceptionFactory;
diff --git a/server/src/main/java/io/spine/server/model/ArgumentFilter.java b/server/src/main/java/io/spine/server/model/ArgumentFilter.java
index 3eaef441853..4b3e106c877 100644
--- a/server/src/main/java/io/spine/server/model/ArgumentFilter.java
+++ b/server/src/main/java/io/spine/server/model/ArgumentFilter.java
@@ -28,7 +28,6 @@
import io.spine.base.EventMessage;
import io.spine.base.Field;
import io.spine.base.FieldPath;
-import io.spine.core.ByField;
import io.spine.core.Subscribe;
import io.spine.core.Where;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -83,12 +82,12 @@ private static ArgumentFilter acceptingAll() {
*
If the method is not annotated for filtering, the returned instance
* {@linkplain ArgumentFilter#acceptsAll() accepts all} arguments.
*/
- @SuppressWarnings("deprecation") // still need to support `ByField` when building older models.
public static ArgumentFilter createFilter(Method method) {
Subscribe annotation = method.getAnnotation(Subscribe.class);
checkAnnotated(method, annotation);
@Nullable Where where = filterAnnotationOf(method);
- ByField byField = annotation.filter();
+ @SuppressWarnings("deprecation") // still need `ByField` when building older models.
+ io.spine.core.ByField byField = annotation.filter();
boolean byFieldEmpty = byField.path().isEmpty();
String fieldPath;
String value;
@@ -112,7 +111,8 @@ public static ArgumentFilter createFilter(Method method) {
*/
private static void checkNoByFieldAnnotation(boolean byFieldEmpty, Method method) {
String where = Where.class.getName();
- String byField = ByField.class.getName();
+ @SuppressWarnings("deprecation") // still need `ByField` when building older models.
+ String byField = io.spine.core.ByField.class.getName();
checkState(
byFieldEmpty,
"The subscriber method `%s()` has `@%s` and `@%s`" +
diff --git a/server/src/main/java/io/spine/server/stand/InvalidRequestException.java b/server/src/main/java/io/spine/server/stand/InvalidRequestException.java
index 2f70b2d6e7c..d25585a4c9f 100644
--- a/server/src/main/java/io/spine/server/stand/InvalidRequestException.java
+++ b/server/src/main/java/io/spine/server/stand/InvalidRequestException.java
@@ -22,13 +22,13 @@
import com.google.protobuf.GeneratedMessageV3;
import com.google.protobuf.Message;
import io.spine.base.Error;
-import io.spine.core.MessageRejection;
+import io.spine.server.MessageError;
/**
* A base class for exceptions fired in case an invalid request
* has been submitted to {@linkplain Stand}.
*/
-public class InvalidRequestException extends RuntimeException implements MessageRejection {
+public class InvalidRequestException extends RuntimeException implements MessageError {
private static final long serialVersionUID = 0L;
diff --git a/server/src/main/java/io/spine/server/transport/Statuses.java b/server/src/main/java/io/spine/server/transport/Statuses.java
index ea8950be5d0..098a44799c7 100644
--- a/server/src/main/java/io/spine/server/transport/Statuses.java
+++ b/server/src/main/java/io/spine/server/transport/Statuses.java
@@ -25,8 +25,8 @@
import io.grpc.StatusRuntimeException;
import io.spine.annotation.Internal;
import io.spine.base.Error;
-import io.spine.core.MessageRejection;
import io.spine.grpc.MetadataConverter;
+import io.spine.server.MessageError;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.Status.INVALID_ARGUMENT;
@@ -46,13 +46,13 @@ private Statuses() {
* {@code Status.INVALID_ARGUMENT} with the passed cause.
*
*
Resulting {@code StatusRuntimeException} will contain the passed
- * {@link MessageRejection} transformed to
+ * {@link MessageError} transformed to
* the {@linkplain StatusRuntimeException#getTrailers() metadata}.
*
* @param cause the exception cause
* @return the constructed {@code StatusRuntimeException}
*/
- public static StatusRuntimeException invalidArgumentWithCause(MessageRejection cause) {
+ public static StatusRuntimeException invalidArgumentWithCause(MessageError cause) {
checkNotNull(cause);
return createException(cause.asThrowable(), cause.asError());
}
@@ -60,7 +60,7 @@ public static StatusRuntimeException invalidArgumentWithCause(MessageRejection c
/**
* Constructs the {@code StatusRuntimeException} with the given cause and error.
*
- * @see #invalidArgumentWithCause(MessageRejection)
+ * @see #invalidArgumentWithCause(MessageError)
*/
private static StatusRuntimeException createException(Throwable cause, Error error) {
Metadata metadata = MetadataConverter.toMetadata(error);
diff --git a/server/src/test/java/io/spine/client/AbstractClientTest.java b/server/src/test/java/io/spine/client/AbstractClientTest.java
index 20028a91b60..9132e70db7d 100644
--- a/server/src/test/java/io/spine/client/AbstractClientTest.java
+++ b/server/src/test/java/io/spine/client/AbstractClientTest.java
@@ -66,11 +66,14 @@ void createServerAndClient() throws IOException {
server = serverBuilder.build();
server.start();
- client = Client.inProcess(serverName)
- // When shutting down, terminate the client immediately since all
- // the requests made in tests are going to be complete by that time.
- .shutdownTimout(0, TimeUnit.SECONDS)
- .build();
+ client = newClientBuilder(serverName).build();
+ }
+
+ protected Client.Builder newClientBuilder(String serverName) {
+ return Client.inProcess(serverName)
+ // When shutting down, terminate the client immediately since all
+ // the requests made in tests are going to be complete by that time.
+ .shutdownTimout(0, TimeUnit.SECONDS);
}
@AfterEach
diff --git a/server/src/test/java/io/spine/client/ClientErrorHandlersTest.java b/server/src/test/java/io/spine/client/ClientErrorHandlersTest.java
new file mode 100644
index 00000000000..6c3b3e113b4
--- /dev/null
+++ b/server/src/test/java/io/spine/client/ClientErrorHandlersTest.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2020, TeamDev. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.client;
+
+import com.google.common.collect.ImmutableList;
+import io.spine.server.BoundedContextBuilder;
+import io.spine.test.client.ClientTestContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+@DisplayName("`Client` should pass error handlers to `ClientRequest` for")
+public class ClientErrorHandlersTest extends AbstractClientTest {
+
+ @SuppressWarnings("UnnecessaryLambda") /* We need the reference for later checking.
+ Using a method reference as advised by Error Prone is not practical for the purpose of
+ these tests. */
+ private final ErrorHandler errorHandler = (throwable) -> {};
+
+ @SuppressWarnings("UnnecessaryLambda")
+ private final ServerErrorHandler serverErrorHandler = (message, error) -> {};
+
+ private ClientRequest request;
+
+ @Override
+ protected ImmutableList contexts() {
+ return ImmutableList.of(ClientTestContext.users());
+ }
+
+ /**
+ * Adds custom error handlers for the client instance to be used in this test suite.
+ */
+ @Override
+ protected Client.Builder newClientBuilder(String serverName) {
+ Client.Builder builder = super.newClientBuilder(serverName);
+ builder.onStreamingError(errorHandler)
+ .onServerError(serverErrorHandler);
+ return builder;
+ }
+
+ @BeforeEach
+ void createRequest() {
+ request = client().asGuest();
+ }
+
+ @Test
+ @DisplayName("a streaming error handler")
+ void streamingHandler() {
+ assertThat(request.streamingErrorHandler())
+ .isEqualTo(errorHandler);
+ }
+
+ @Test
+ @DisplayName("a posting error handler")
+ void postingHandler() {
+ assertThat(request.serverErrorHandler())
+ .isEqualTo(serverErrorHandler);
+ }
+}
diff --git a/server/src/test/java/io/spine/client/CommandRequestTest.java b/server/src/test/java/io/spine/client/CommandRequestTest.java
index 1789546d995..8a36b3df5ac 100644
--- a/server/src/test/java/io/spine/client/CommandRequestTest.java
+++ b/server/src/test/java/io/spine/client/CommandRequestTest.java
@@ -21,29 +21,41 @@
package io.spine.client;
import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Message;
import io.spine.base.CommandMessage;
+import io.spine.base.Error;
+import io.spine.base.EventMessage;
+import io.spine.core.Command;
+import io.spine.protobuf.AnyPacker;
import io.spine.server.BoundedContextBuilder;
import io.spine.test.client.ClientTestContext;
import io.spine.test.client.users.command.LogInUser;
+import io.spine.test.client.users.command.UnsupportedCommand;
import io.spine.test.client.users.event.UserAccountCreated;
import io.spine.test.client.users.event.UserLoggedIn;
import io.spine.test.client.users.rejection.Rejections.UserAlreadyLoggedIn;
import io.spine.testing.core.given.GivenUserId;
import io.spine.testing.logging.MuteLogging;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.truth.extensions.proto.ProtoTruth.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
@MuteLogging
@DisplayName("`CommandRequest` should")
class CommandRequestTest extends AbstractClientTest {
+ /** The object under the test. */
+ private CommandRequest commandRequest;
+
/** Registers which event consumers were called. */
private final ConsumerCallCounter counter = new ConsumerCallCounter();
- private CommandRequest commandRequest;
@BeforeEach
void createCommandRequest() {
@@ -98,4 +110,107 @@ void rejections() {
assertThat(counter.contains(UserAlreadyLoggedIn.class))
.isTrue();
}
+
+ @Nested
+ @DisplayName("Allow setting custom streaming error handler")
+ class CustomStreamingErrorHandler {
+
+ @Test
+ @DisplayName("rejecting `null`")
+ void rejectingNull() {
+ assertThrows(NullPointerException.class, () -> commandRequest.onStreamingError(null));
+ }
+ }
+
+ @Nested
+ @DisplayName("Allow setting custom consumer error handler")
+ class CustomConsumerErrorHandler {
+
+ private boolean handlerInvoked;
+ private @Nullable Throwable passedThrowable;
+
+ @BeforeEach
+ void setup() {
+ handlerInvoked = false;
+ passedThrowable = null;
+ }
+
+ @Test
+ @DisplayName("rejecting `null`")
+ void rejectingNull() {
+ assertThrows(NullPointerException.class, () -> commandRequest.onConsumingError(null));
+ }
+
+ @Test
+ @DisplayName("invoking the handler when a consumer fails")
+ void invocation() {
+ ConsumerErrorHandler handler = (c, th) -> {
+ handlerInvoked = true;
+ passedThrowable = th;
+ };
+ RuntimeException exception = new RuntimeException("Consumer-generated error.");
+
+ commandRequest.onConsumingError(handler)
+ .observe(UserLoggedIn.class, e -> {
+ throw exception;
+ })
+ .post();
+
+ assertThat(handlerInvoked)
+ .isTrue();
+ assertThat(passedThrowable)
+ .isEqualTo(exception);
+ }
+ }
+
+ @Nested
+ @DisplayName("Allow setting custom posting error handler")
+ class CustomServerErrorHandler {
+
+ private @Nullable Message postedMessage;
+ private @Nullable Error returnedError;
+
+ @BeforeEach
+ void setup() {
+ postedMessage = null;
+ returnedError = null;
+ }
+
+ @Test
+ @DisplayName("rejecting `null`")
+ void rejectingNull() {
+ assertThrows(NullPointerException.class, () -> commandRequest.onServerError(null));
+ }
+
+ @Test
+ @DisplayName("invoking handler when an invalid command posted")
+ void invocation() {
+ ServerErrorHandler handler = (message, error) -> {
+ postedMessage = message;
+ returnedError = error;
+ };
+
+ UnsupportedCommand commandMessage = UnsupportedCommand
+ .newBuilder()
+ .setUser(GivenUserId.generated())
+ .build();
+ CommandRequest request =
+ client().asGuest()
+ .command(commandMessage)
+ .onServerError(handler);
+ request.post();
+
+ assertThat(returnedError)
+ .isNotNull();
+ assertThat(postedMessage)
+ .isInstanceOf(Command.class);
+ Command expected = Command
+ .newBuilder()
+ .setMessage(AnyPacker.pack(commandMessage))
+ .build();
+ assertThat(postedMessage)
+ .comparingExpectedFieldsOnly()
+ .isEqualTo(expected);
+ }
+ }
}
diff --git a/server/src/test/java/io/spine/client/ConsumerCallCounter.java b/server/src/test/java/io/spine/client/ConsumerCallCounter.java
index 5d2ab0e568f..c5362a38d29 100644
--- a/server/src/test/java/io/spine/client/ConsumerCallCounter.java
+++ b/server/src/test/java/io/spine/client/ConsumerCallCounter.java
@@ -42,7 +42,7 @@ void add(EventMessage e) {
add(e.getClass());
}
- void add(Class extends EventMessage> eventType) {
+ private void add(Class extends EventMessage> eventType) {
eventTypes.add(eventType);
}
diff --git a/server/src/test/java/io/spine/core/EventTest.java b/server/src/test/java/io/spine/core/EventTest.java
index 3c4fcaa3961..7fd52a67066 100644
--- a/server/src/test/java/io/spine/core/EventTest.java
+++ b/server/src/test/java/io/spine/core/EventTest.java
@@ -116,7 +116,7 @@ void message() {
void timestamp() {
Event event = GivenEvent.occurredMinutesAgo(1);
- assertThat(event.time())
+ assertThat(event.timestamp())
.isEqualTo(event.context()
.getTimestamp());
}
diff --git a/server/src/test/java/io/spine/server/StatusesTest.java b/server/src/test/java/io/spine/server/StatusesTest.java
index f8fe9b3709b..37feac94b67 100644
--- a/server/src/test/java/io/spine/server/StatusesTest.java
+++ b/server/src/test/java/io/spine/server/StatusesTest.java
@@ -24,7 +24,6 @@
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.spine.base.Error;
-import io.spine.core.MessageRejection;
import io.spine.grpc.MetadataConverter;
import io.spine.server.event.UnsupportedEventException;
import io.spine.server.transport.Statuses;
@@ -61,7 +60,7 @@ void passNullToleranceCheck() {
@Test
@DisplayName("create invalid argument status exception")
void createInvalidArgumentStatusEx() {
- MessageRejection rejection =
+ MessageError rejection =
new UnsupportedEventException(Sample.messageOfType(ProjectCreated.class));
StatusRuntimeException statusRuntimeEx = invalidArgumentWithCause(rejection);
@SuppressWarnings("OptionalGetWithoutIsPresent")
diff --git a/server/src/test/java/io/spine/server/event/EventValidatorTest.java b/server/src/test/java/io/spine/server/event/EventValidatorTest.java
index 6ce31751ad4..ce395e1801b 100644
--- a/server/src/test/java/io/spine/server/event/EventValidatorTest.java
+++ b/server/src/test/java/io/spine/server/event/EventValidatorTest.java
@@ -23,7 +23,7 @@
import io.spine.base.Error;
import io.spine.core.Event;
import io.spine.core.EventValidationError;
-import io.spine.core.MessageInvalid;
+import io.spine.server.MessageInvalid;
import io.spine.server.type.EventEnvelope;
import io.spine.test.event.ProjectCreated;
import org.junit.jupiter.api.DisplayName;
diff --git a/server/src/test/java/io/spine/server/given/groups/FilteredStateSubscriber.java b/server/src/test/java/io/spine/server/given/groups/FilteredStateSubscriber.java
index f4ad2d29967..0e087d4e73a 100644
--- a/server/src/test/java/io/spine/server/given/groups/FilteredStateSubscriber.java
+++ b/server/src/test/java/io/spine/server/given/groups/FilteredStateSubscriber.java
@@ -20,7 +20,6 @@
package io.spine.server.given.groups;
-import io.spine.core.ByField;
import io.spine.core.Subscribe;
import io.spine.server.event.AbstractEventSubscriber;
import io.spine.server.given.organizations.Organization;
@@ -39,7 +38,7 @@
public class FilteredStateSubscriber extends AbstractEventSubscriber {
@Subscribe(
- filter = @ByField(path = "head.value", value = "42") // <-- Error here. Shouldn't have a filter.
+ filter = @io.spine.core.ByField(path = "head.value", value = "42") // <-- Error here. Shouldn't have a filter.
)
void on(Organization organization) {
halt();
diff --git a/server/src/test/java/io/spine/server/integration/given/DocumentAggregate.java b/server/src/test/java/io/spine/server/integration/given/DocumentAggregate.java
index 5499cd64b7c..1e50246ceda 100644
--- a/server/src/test/java/io/spine/server/integration/given/DocumentAggregate.java
+++ b/server/src/test/java/io/spine/server/integration/given/DocumentAggregate.java
@@ -54,7 +54,7 @@ DocumentCreated handle(CreateDocument command, CommandContext context) {
return DocumentCreated
.newBuilder()
.setId(command.getId())
- .setOwner(context.getActorContext().getActor())
+ .setOwner(context.actor())
.setWhenCreated(Now.get().asLocalDateTime())
.vBuild();
}
@@ -63,7 +63,7 @@ DocumentCreated handle(CreateDocument command, CommandContext context) {
TextEdited handle(EditText command, CommandContext context) {
Edit edit = Edit
.newBuilder()
- .setEditor(context.getActorContext().getActor())
+ .setEditor(context.actor())
.setPosition(command.getPosition())
.setTextAdded(command.getNewText())
.setCharsDeleted(command.getCharsToDelete())
diff --git a/server/src/test/java/io/spine/server/model/given/filter/Bucket.java b/server/src/test/java/io/spine/server/model/given/filter/Bucket.java
index b2bc8a6d7b3..1d80335a8e5 100644
--- a/server/src/test/java/io/spine/server/model/given/filter/Bucket.java
+++ b/server/src/test/java/io/spine/server/model/given/filter/Bucket.java
@@ -20,7 +20,6 @@
package io.spine.server.model.given.filter;
-import io.spine.core.ByField;
import io.spine.core.Subscribe;
import io.spine.core.Where;
@@ -57,7 +56,7 @@ void onlyPeas(@Where(field = "kind", equals = "PEA") BeanAdded e) {
}
@SuppressWarnings("deprecation") // to be migrated during removal of `@ByField`.
- @Subscribe(filter = @ByField(path = "kind", value = "BEAN"))
+ @Subscribe(filter = @io.spine.core.ByField(path = "kind", value = "BEAN"))
void onlyBeans(BeanAdded e) {
beans = beans + e.getNumber();
}
diff --git a/server/src/test/java/io/spine/server/model/given/map/DupEventFilterValue.java b/server/src/test/java/io/spine/server/model/given/map/DupEventFilterValue.java
index 6335bc56d5b..2410ff1dce4 100644
--- a/server/src/test/java/io/spine/server/model/given/map/DupEventFilterValue.java
+++ b/server/src/test/java/io/spine/server/model/given/map/DupEventFilterValue.java
@@ -20,7 +20,6 @@
package io.spine.server.model.given.map;
-import io.spine.core.ByField;
import io.spine.core.Subscribe;
import io.spine.server.projection.Projection;
import io.spine.server.projection.given.SavedString;
@@ -42,12 +41,12 @@ private DupEventFilterValue(String id) {
super(id);
}
- @Subscribe(filter = @ByField(path = VALUE_FIELD_PATH, value = "1"))
+ @Subscribe(filter = @io.spine.core.ByField(path = VALUE_FIELD_PATH, value = "1"))
void onString1(Int32Imported event) {
halt();
}
- @Subscribe(filter = @ByField(path = VALUE_FIELD_PATH, value = "+1"))
+ @Subscribe(filter = @io.spine.core.ByField(path = VALUE_FIELD_PATH, value = "+1"))
void onStringOne(Int32Imported event) {
halt();
}
diff --git a/server/src/test/java/io/spine/server/route/given/sur/MagazineAggregate.java b/server/src/test/java/io/spine/server/route/given/sur/MagazineAggregate.java
index 4ebf910fe40..730f211e4bf 100644
--- a/server/src/test/java/io/spine/server/route/given/sur/MagazineAggregate.java
+++ b/server/src/test/java/io/spine/server/route/given/sur/MagazineAggregate.java
@@ -56,8 +56,7 @@ ArtistName author(PublishArticle cmd, CommandContext ctx) {
}
return ArtistName.newBuilder()
- .setValue(ctx.getActorContext()
- .getActor()
+ .setValue(ctx.actor()
.getValue())
.vBuild();
}
diff --git a/server/src/test/java/io/spine/server/type/CommandEnvelopeTest.java b/server/src/test/java/io/spine/server/type/CommandEnvelopeTest.java
index f6ba64c1189..d7ed11f201b 100644
--- a/server/src/test/java/io/spine/server/type/CommandEnvelopeTest.java
+++ b/server/src/test/java/io/spine/server/type/CommandEnvelopeTest.java
@@ -73,8 +73,7 @@ void getActorContext() {
CommandEnvelope envelope = toEnvelope(command);
assertThat(envelope.actorContext())
- .isEqualTo(command.context()
- .getActorContext());
+ .isEqualTo(command.actorContext());
}
@Test
diff --git a/server/src/test/java/io/spine/server/type/EventEnvelopeTest.java b/server/src/test/java/io/spine/server/type/EventEnvelopeTest.java
index 3b597d27025..64c9ea7c91c 100644
--- a/server/src/test/java/io/spine/server/type/EventEnvelopeTest.java
+++ b/server/src/test/java/io/spine/server/type/EventEnvelopeTest.java
@@ -67,7 +67,7 @@ void fromCommandContext() {
CommandContext commandContext = commandContext();
EventContext context = eventContext(commandContext);
EventEnvelope envelope = envelope(context);
- assertEquals(commandContext.getActorContext(), envelope.actorContext());
+ assertEquals(commandContext.actorContext(), envelope.actorContext());
}
@Test
@@ -77,7 +77,7 @@ void fromCommandContextOfOrigin() {
EventContext originContext = eventContext(commandContext);
EventContext context = eventContext(originContext);
EventEnvelope envelope = envelope(context);
- assertEquals(commandContext.getActorContext(), envelope.actorContext());
+ assertEquals(commandContext.actorContext(), envelope.actorContext());
}
@Test
diff --git a/server/src/test/proto/spine/test/client/users/commands.proto b/server/src/test/proto/spine/test/client/users/commands.proto
index 21dfbadec71..9f61cdd20e3 100644
--- a/server/src/test/proto/spine/test/client/users/commands.proto
+++ b/server/src/test/proto/spine/test/client/users/commands.proto
@@ -43,3 +43,9 @@ message CreateUserAccount {
core.UserId user = 1;
UserAccount account = 2 [(validate) = true, (required) = true];
}
+
+// A command which is not supported by the backend, and as such should
+// generate an error when posted.
+message UnsupportedCommand {
+ core.UserId user = 1;
+}
diff --git a/testutil-core/src/test/java/io/spine/testing/core/given/GivenCommandContextTest.java b/testutil-core/src/test/java/io/spine/testing/core/given/GivenCommandContextTest.java
index cb4c8b5f923..85736c5faa9 100644
--- a/testutil-core/src/test/java/io/spine/testing/core/given/GivenCommandContextTest.java
+++ b/testutil-core/src/test/java/io/spine/testing/core/given/GivenCommandContextTest.java
@@ -63,8 +63,8 @@ void createWithRandomActor() {
checkValid(first);
checkValid(second);
- ActorContext firstActorContext = first.getActorContext();
- ActorContext secondActorContext = second.getActorContext();
+ ActorContext firstActorContext = first.actorContext();
+ ActorContext secondActorContext = second.actorContext();
assertNotEquals(firstActorContext.getActor(), secondActorContext.getActor());
}
diff --git a/testutil-server/src/test/java/io/spine/testing/server/blackbox/given/BbProjectAggregate.java b/testutil-server/src/test/java/io/spine/testing/server/blackbox/given/BbProjectAggregate.java
index 160fc532285..dd20a179e5a 100644
--- a/testutil-server/src/test/java/io/spine/testing/server/blackbox/given/BbProjectAggregate.java
+++ b/testutil-server/src/test/java/io/spine/testing/server/blackbox/given/BbProjectAggregate.java
@@ -106,7 +106,7 @@ BbAssigneeAdded handle(BbAssignProject command) {
@Assign
BbAssigneeAdded handle(BbAssignSelf command, CommandContext context) {
- UserId assignee = context.getActorContext().getActor();
+ UserId assignee = context.actor();
return BbAssigneeAdded
.newBuilder()
.setId(id())
diff --git a/version.gradle b/version.gradle
index 2ee63d00e1b..c1423b716ef 100644
--- a/version.gradle
+++ b/version.gradle
@@ -25,7 +25,7 @@
* as we want to manage the versions in a single source.
*/
-final def spineVersion = '1.5.11'
+final def spineVersion = '1.5.12'
ext {
// The version of the modules in this project.