diff --git a/client/src/main/proto/spine/client/client.proto b/client/src/main/proto/spine/client/client.proto index a20e7cb2e7b..c9f439fd170 100644 --- a/client/src/main/proto/spine/client/client.proto +++ b/client/src/main/proto/spine/client/client.proto @@ -28,7 +28,6 @@ option java_outer_classname = "ClientProto"; option java_package = "org.spine3.client"; import "spine/annotations.proto"; -import "spine/base/event.proto"; // Version of the code executed on the client. message CodeVersion { diff --git a/client/src/main/proto/spine/client/client_service.proto b/client/src/main/proto/spine/client/client_service.proto index e8a347e8133..d17e8e03585 100644 --- a/client/src/main/proto/spine/client/client_service.proto +++ b/client/src/main/proto/spine/client/client_service.proto @@ -31,26 +31,11 @@ option java_generate_equals_and_hash = true; import "spine/annotations.proto"; import "spine/base/command.proto"; -import "spine/base/event.proto"; import "spine/base/response.proto"; -// A topic of interest the client can subscribe and unsubscribe. -message Topic { - //TODO:2016-01-14:alexander.yevsyukov: Define this type. E.g. there can be some structure, which describes many - // points of interest at once. See Pub-sub for possible API inspiration. Chances are it's going to be one of underlying - // implementations. - string value = 1; -} // A service for sending commands from clients. service ClientService { // Request to handle a command. rpc Post(base.Command) returns (base.Response); - - // Request to receive events on the topic of interest. - rpc Subscribe(Topic) returns (stream base.Event); - - // The request to unsubscribe from the topic. - // This should close the stream opened by `Subscribe` call with the same `Topic` value. - rpc Unsubscribe(Topic) returns (base.Response); } diff --git a/client/src/main/proto/spine/client/entities.proto b/client/src/main/proto/spine/client/entities.proto index 617ef40e6d2..ab9d3596f09 100644 --- a/client/src/main/proto/spine/client/entities.proto +++ b/client/src/main/proto/spine/client/entities.proto @@ -28,11 +28,8 @@ option java_outer_classname = "EntitiesProto"; option java_package = "org.spine3.client"; import "google/protobuf/any.proto"; -import "google/protobuf/field_mask.proto"; import "spine/annotations.proto"; -import "spine/ui/language.proto"; -import "spine/base/response.proto"; // Represents an ID of an entity. // diff --git a/client/src/main/proto/spine/client/query_service.proto b/client/src/main/proto/spine/client/query_service.proto index 4689c7d8c38..36e7afec695 100644 --- a/client/src/main/proto/spine/client/query_service.proto +++ b/client/src/main/proto/spine/client/query_service.proto @@ -27,7 +27,6 @@ option java_multiple_files = true; option java_outer_classname = "QueryServiceProto"; option java_generate_equals_and_hash = true; -import "google/protobuf/any.proto"; import "spine/annotations.proto"; import "spine/client/query.proto"; diff --git a/client/src/main/proto/spine/client/subscription.proto b/client/src/main/proto/spine/client/subscription.proto new file mode 100644 index 00000000000..ddd579cd8c0 --- /dev/null +++ b/client/src/main/proto/spine/client/subscription.proto @@ -0,0 +1,83 @@ +// +// Copyright 2016, TeamDev Ltd. All rights reserved. +// +// Redistribution and use in source and/or binary forms, with or without +// modification, must retain the above copyright notice and the following +// disclaimer. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +syntax = "proto3"; + +package spine.client; + +option (type_url_prefix) = "type.spine3.org"; +option java_generate_equals_and_hash = true; +option java_multiple_files = true; +option java_outer_classname = "SubscriptionProto"; +option java_package = "org.spine3.client"; + + +import "google/protobuf/any.proto"; +import "google/protobuf/field_mask.proto"; + +import "spine/annotations.proto"; +import "spine/base/response.proto"; +import "spine/client/entities.proto"; + +// An object defining a unit of subscription. +// +// Defines the target (entities and criteria) of subscription. +message Topic { + + // Defines the entity of interest, e.g. entity type URL and a set of subscription criteria. + Target target = 1; + + // Field mask to be applied to the entity updates applicable to this topic. + // + // Applied to each of the entity state messages before returning in scope of SubscriptionUpdate. + google.protobuf.FieldMask field_mask = 2; + + // Reserved for utility fields. + reserved 3 to 6; +} + +// Wrapped set of read-side entity updates on a topic with the specific subscription ID. +message SubscriptionUpdate { + + // The ID of the current subscription. + SubscriptionId id = 1; + + // Represents the base part of the response. I.e. whether the Topic subscription requires has been acked or not. + base.Response response = 2; + + // Reserved for more subscription update attributes. + reserved 3 to 9; + + // Entity updates packed as Any. + // + // Each of the update messages is affected by the field mask set for the current subscription. + repeated google.protobuf.Any updates = 10; +} + +// The ID of the subscription. +// +// Created when the client subscribes to a topic. +// See SubscriptionService#Subscribe(Topic). +message SubscriptionId { + + // Unique identifier of the subscription. + // + // Typically built using Java's UUID.toString() functionality. + string uuid = 1; +} diff --git a/client/src/main/proto/spine/client/subscription_service.proto b/client/src/main/proto/spine/client/subscription_service.proto new file mode 100644 index 00000000000..0f679b04671 --- /dev/null +++ b/client/src/main/proto/spine/client/subscription_service.proto @@ -0,0 +1,46 @@ +// +// Copyright 2016, TeamDev Ltd. All rights reserved. +// +// Redistribution and use in source and/or binary forms, with or without +// modification, must retain the above copyright notice and the following +// disclaimer. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +syntax = "proto3"; + +package spine.client; + +option (type_url_prefix) = "type.spine3.org"; +option java_package = "org.spine3.client.grpc"; +option java_multiple_files = true; +option java_outer_classname = "SubscriptionServiceProto"; +option java_generate_equals_and_hash = true; + + +import "spine/annotations.proto"; +import "spine/client/subscription.proto"; +import "spine/base/response.proto"; + +// A service for subscribing to the read-side changes by clients. +service SubscriptionService { + + // Subscribe to a particular read-side updates. + // + // Topic defines the target of subscription and other attributes (like field masks). + // The result is a SubscriptionUpdate stream, + rpc Subscribe (Topic) returns (stream SubscriptionUpdate); + + // Cancel the subscription by its ID. + rpc CancelSubscription (SubscriptionId) returns (base.Response); +} diff --git a/examples/src/main/java/org/spine3/examples/aggregate/ClientApp.java b/examples/src/main/java/org/spine3/examples/aggregate/ClientApp.java index c4599abcb6c..9df51a9117c 100644 --- a/examples/src/main/java/org/spine3/examples/aggregate/ClientApp.java +++ b/examples/src/main/java/org/spine3/examples/aggregate/ClientApp.java @@ -31,7 +31,6 @@ import org.spine3.base.Response; import org.spine3.client.CommandFactory; import org.spine3.client.grpc.ClientServiceGrpc; -import org.spine3.client.grpc.Topic; import org.spine3.examples.aggregate.command.AddOrderLine; import org.spine3.examples.aggregate.command.CreateOrder; import org.spine3.examples.aggregate.command.PayForOrder; @@ -66,9 +65,9 @@ public class ClientApp { private static final int SHUTDOWN_TIMEOUT_SEC = 5; private final CommandFactory commandFactory; - private final Topic topic = Topic.getDefaultInstance(); private final ManagedChannel channel; private final ClientServiceGrpc.ClientServiceBlockingStub blockingClient; + // TODO[alex.tymchenko]: switch to SubscriptionService instead. private final ClientServiceGrpc.ClientServiceStub nonBlockingClient; private final StreamObserver observer = new StreamObserver() { @@ -140,14 +139,10 @@ private Command payForOrder(OrderId orderId) { return commandFactory.create(msg); } - private void subscribe() { - nonBlockingClient.subscribe(topic, observer); - } /** Sends requests to the server. */ public static void main(String[] args) throws InterruptedException { final ClientApp client = new ClientApp(SERVICE_HOST, DEFAULT_CLIENT_SERVICE_PORT); - client.subscribe(); final List requests = client.generateRequests(); @@ -178,7 +173,6 @@ private List generateRequests() { * @throws InterruptedException if waiting is interrupted. */ private void shutdown() throws InterruptedException { - blockingClient.unsubscribe(topic); channel.shutdown().awaitTermination(SHUTDOWN_TIMEOUT_SEC, SECONDS); } diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 58e7b927f5d..6d19c1542b6 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-2.14.1-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-3.0-bin.zip diff --git a/server/src/main/java/org/spine3/server/ClientService.java b/server/src/main/java/org/spine3/server/ClientService.java index 7728c93007a..1ab5f73018b 100644 --- a/server/src/main/java/org/spine3/server/ClientService.java +++ b/server/src/main/java/org/spine3/server/ClientService.java @@ -26,10 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.spine3.base.Command; -import org.spine3.base.Event; import org.spine3.base.Response; -import org.spine3.base.Responses; -import org.spine3.client.grpc.Topic; import org.spine3.server.command.error.CommandException; import org.spine3.server.command.error.UnsupportedCommandException; import org.spine3.server.type.CommandClass; @@ -75,22 +72,6 @@ private static void handleUnsupported(Command request, StreamObserver responseObserver.onError(Statuses.invalidArgumentWithCause(unsupported)); } - @SuppressWarnings("RefusedBequest") // as we override default implementation with `unimplemented` status. - @Override - public void subscribe(Topic request, StreamObserver responseObserver) { - //TODO:2016-05-25:alexander.yevsyukov: Subscribe the client to the topic in the corresponding BoundedContext. - // This API is likely to change to support Firebase-like registration where listening is - // done by the client SDK implementation. - } - - @SuppressWarnings("RefusedBequest") // as we override default implementation with `unimplemented` status. - @Override - public void unsubscribe(Topic request, StreamObserver responseObserver) { - //TODO:2016-05-25:alexander.yevsyukov: Unsubscribe the client from the topic in the corresponding BoundedContext. - responseObserver.onNext(Responses.ok()); - responseObserver.onCompleted(); - } - public static class Builder { private final Set boundedContexts = Sets.newHashSet(); diff --git a/server/src/main/java/org/spine3/server/stand/Stand.java b/server/src/main/java/org/spine3/server/stand/Stand.java index 6f15d357b64..ecdc46bffc9 100644 --- a/server/src/main/java/org/spine3/server/stand/Stand.java +++ b/server/src/main/java/org/spine3/server/stand/Stand.java @@ -258,7 +258,7 @@ public void execute(Query query, StreamObserver responseObserver) private ImmutableCollection internalExecute(Query query) { - final ImmutableSet.Builder resultBuilder = ImmutableSet.builder(); + final ImmutableList.Builder resultBuilder = ImmutableList.builder(); final Target target = query.getTarget(); @@ -280,7 +280,7 @@ private ImmutableCollection internalExecute(Query query) { feedStateRecordsToBuilder(resultBuilder, stateRecords); } - final ImmutableSet result = resultBuilder.build(); + final ImmutableList result = resultBuilder.build(); return result; } @@ -293,7 +293,9 @@ private ImmutableCollection fetchFromStandStorage(Target ta } else { final EntityFilters filters = target.getFilters(); - if (filters != null && filters.getIdFilter() != null) { + + // TODO[alex.tymchenko]: do we need to check for null at all? How about, say, Python gRPC client? + if (filters != null && filters.getIdFilter() != null && !filters.getIdFilter().getIdsList().isEmpty()) { final EntityIdFilter idFilter = filters.getIdFilter(); final Collection stateIds = Collections2.transform(idFilter.getIdsList(), aggregateStateIdTransformer(typeUrl)); @@ -355,7 +357,7 @@ private static ImmutableCollection fetchFromEntityRepository(T return result; } - private static void feedEntitiesToBuilder(ImmutableSet.Builder resultBuilder, ImmutableCollection all) { + private static void feedEntitiesToBuilder(ImmutableList.Builder resultBuilder, ImmutableCollection all) { for (Entity record : all) { final Message state = record.getState(); final Any packedState = AnyPacker.pack(state); @@ -363,7 +365,7 @@ private static void feedEntitiesToBuilder(ImmutableSet.Builder resultBuilde } } - private static void feedStateRecordsToBuilder(ImmutableSet.Builder resultBuilder, ImmutableCollection all) { + private static void feedStateRecordsToBuilder(ImmutableList.Builder resultBuilder, ImmutableCollection all) { for (EntityStorageRecord record : all) { final Any state = record.getState(); resultBuilder.add(state); diff --git a/server/src/main/java/org/spine3/server/stand/StandFunnel.java b/server/src/main/java/org/spine3/server/stand/StandFunnel.java index 9e3cc659ce0..1c4e9afc294 100644 --- a/server/src/main/java/org/spine3/server/stand/StandFunnel.java +++ b/server/src/main/java/org/spine3/server/stand/StandFunnel.java @@ -108,8 +108,6 @@ public Stand getStand() { * *

The value must not be null. * - *

If this method is not used, a default value will be used. - * * @param stand the instance of {@link Stand}. * @return {@code this} instance of {@code Builder} */ @@ -127,6 +125,8 @@ public Executor getExecutor() { * *

The value must not be {@code null}. * + *

If this method is not used, a default value will be used. + * * @param executor the instance of {@code Executor}. * @return {@code this} instance of {@code Builder} */ diff --git a/server/src/test/java/org/spine3/server/stand/Given.java b/server/src/test/java/org/spine3/server/stand/Given.java new file mode 100644 index 00000000000..2184ac0511d --- /dev/null +++ b/server/src/test/java/org/spine3/server/stand/Given.java @@ -0,0 +1,177 @@ +/* + * Copyright 2016, TeamDev Ltd. All rights reserved. + * + * Redistribution and use in source and/or binary forms, with or without + * modification, must retain the above copyright notice and the following + * disclaimer. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package org.spine3.server.stand; + +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Any; +import com.google.protobuf.Message; +import org.spine3.base.Command; +import org.spine3.base.CommandContext; +import org.spine3.base.Event; +import org.spine3.base.EventContext; +import org.spine3.base.EventId; +import org.spine3.base.Identifiers; +import org.spine3.protobuf.AnyPacker; +import org.spine3.protobuf.TypeUrl; +import org.spine3.server.BoundedContext; +import org.spine3.server.aggregate.Aggregate; +import org.spine3.server.aggregate.AggregateRepository; +import org.spine3.server.aggregate.Apply; +import org.spine3.server.command.Assign; +import org.spine3.server.event.Subscribe; +import org.spine3.server.projection.Projection; +import org.spine3.server.projection.ProjectionRepository; +import org.spine3.server.storage.memory.InMemoryStorageFactory; +import org.spine3.test.projection.Project; +import org.spine3.test.projection.ProjectId; +import org.spine3.test.projection.command.CreateProject; +import org.spine3.test.projection.event.ProjectCreated; + +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +/** + * @author Dmytro Dashenkov + */ +/*package*/ class Given { + + /*package*/ static final int THREADS_COUNT_IN_POOL_EXECUTOR = 10; + /*package*/ static final int SEVERAL = THREADS_COUNT_IN_POOL_EXECUTOR; + + private static final String PROJECT_UUID = Identifiers.newUuid(); + /*package*/ static final int AWAIT_SECONDS = 6; + + private Given() { + } + + /*package*/ static class StandTestProjectionRepository extends ProjectionRepository { + /*package*/ StandTestProjectionRepository(BoundedContext boundedContext) { + super(boundedContext); + } + + @Override + protected ProjectId getEntityId(Message event, EventContext context) { + return ProjectId.newBuilder().setId(PROJECT_UUID).build(); + } + } + + /*package*/ static class StandTestAggregateRepository extends AggregateRepository { + + /** + * Creates a new repository instance. + * + * @param boundedContext the bounded context to which this repository belongs + */ + /*package*/ StandTestAggregateRepository(BoundedContext boundedContext) { + super(boundedContext); + } + } + + private static class StandTestAggregate extends Aggregate { + + /** + * Creates a new aggregate instance. + * + * @param id the ID for the new aggregate + * @throws IllegalArgumentException if the ID is not of one of the supported types + */ + public StandTestAggregate(ProjectId id) { + super(id); + } + + @Assign + public List handle(CreateProject createProject, CommandContext context) { + return null; + } + + @Apply + public void handle(ProjectCreated event) { + // Do nothing + } + } + + /*package*/ static class StandTestProjection extends Projection { + /** + * Creates a new instance. + * + * Required to be public. + * + * @param id the ID for the new instance + * @throws IllegalArgumentException if the ID is not of one of the supported types + */ + public StandTestProjection(ProjectId id) { + super(id); + } + + @Subscribe + public void handle(ProjectCreated event, EventContext context) { + // Do nothing + } + } + + /*package*/ static Event validEvent() { + return Event.newBuilder() + .setMessage(AnyPacker.pack(ProjectCreated.newBuilder() + .setProjectId(ProjectId.newBuilder().setId("12345AD0")) + .build()) + .toBuilder() + .setTypeUrl(TypeUrl.SPINE_TYPE_URL_PREFIX + "/" + ProjectCreated.getDescriptor().getFullName()) + .build()) + .setContext(EventContext.newBuilder() + .setDoNotEnrich(true) + .setCommandContext(CommandContext.getDefaultInstance()) + .setEventId(EventId.newBuilder() + .setUuid(Identifiers.newUuid()) + .build())) + .build(); + } + + /*package*/ static Command validCommand() { + return Command.newBuilder() + .setMessage(AnyPacker.pack(CreateProject.getDefaultInstance())) + .setContext(CommandContext.getDefaultInstance()) + .build(); + } + + /*package*/ static ProjectionRepository projectionRepo(BoundedContext context) { + return new StandTestProjectionRepository(context); + } + + /*package*/ static AggregateRepository aggregateRepo(BoundedContext context) { + return new StandTestAggregateRepository(context); + } + + /*package*/ static BoundedContext boundedContext(Stand stand, int concurrentThreads) { + final Executor executor = concurrentThreads > 0 ? Executors.newFixedThreadPool(concurrentThreads) : + MoreExecutors.directExecutor(); + + return boundedContextBuilder(stand) + .setStandFunnelExecutor(executor) + .build(); + } + + private static BoundedContext.Builder boundedContextBuilder(Stand stand) { + return BoundedContext.newBuilder() + .setStand(stand) + .setStorageFactory(InMemoryStorageFactory.getInstance()); + } +} diff --git a/server/src/test/java/org/spine3/server/stand/StandFunnelShould.java b/server/src/test/java/org/spine3/server/stand/StandFunnelShould.java index 9e42f37e68e..d952c30ea84 100644 --- a/server/src/test/java/org/spine3/server/stand/StandFunnelShould.java +++ b/server/src/test/java/org/spine3/server/stand/StandFunnelShould.java @@ -22,28 +22,41 @@ package org.spine3.server.stand; import com.google.protobuf.Any; +import io.netty.util.internal.ConcurrentSet; import org.junit.Assert; import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.spine3.server.BoundedContext; +import org.spine3.server.aggregate.AggregateRepository; +import org.spine3.server.projection.ProjectionRepository; +import org.spine3.server.storage.memory.InMemoryStorageFactory; import org.spine3.testdata.TestStandFactory; +import java.security.SecureRandom; +import java.util.Random; +import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import static com.google.common.base.Preconditions.checkNotNull; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * @author Alex Tymchenko + * @author Dmytro Dashenkov */ public class StandFunnelShould { // **** Positive scenarios (unit) **** - /** - * - Initialize properly with various Builder options; - * - deliver mock updates to the stand (invoke proper methods with particular arguments) - test the delivery only. - */ - @Test public void initialize_properly_with_stand_only() { final Stand stand = TestStandFactory.create(); @@ -53,6 +66,45 @@ public void initialize_properly_with_stand_only() { Assert.assertNotNull(standFunnel); } + @Test + public void initialize_properly_with_all_builder_options() { + final Stand stand = TestStandFactory.create(); + final Executor executor = Executors.newSingleThreadExecutor(); + + final StandFunnel funnel = StandFunnel.newBuilder() + .setStand(stand) + .setExecutor(executor) + .build(); + Assert.assertNotNull(funnel); + } + + @Test + public void initialize_properly_with_no_executor() { + final Stand stand = TestStandFactory.create(); + + final StandFunnel funnelForBusyStand = StandFunnel.newBuilder() + .setStand(stand) + .build(); + Assert.assertNotNull(funnelForBusyStand); + } + + @Test + public void deliver_mock_updates_to_stand() { + final Object id = new Object(); + final Any state = Any.getDefaultInstance(); + + final Stand stand = mock(Stand.class); + doNothing().when(stand).update(id, state); + + final StandFunnel funnel = StandFunnel.newBuilder() + .setStand(stand) + .build(); + + funnel.post(id, state); + + verify(stand).update(id, state); + } + @Test public void use_executor_from_builder() { @@ -80,9 +132,14 @@ public void execute(Runnable command) { // **** Negative scenarios (unit) **** - /** - * - Fail to initialise with improper stand. - */ + @SuppressWarnings("ResultOfMethodCallIgnored") + @Test(expected = NullPointerException.class) + public void fail_to_initialize_with_improper_stand() { + @SuppressWarnings("ConstantConditions") // null is marked as improper with this warning + final StandFunnel.Builder builder = StandFunnel.newBuilder().setStand(null); + + builder.build(); + } @SuppressWarnings("ResultOfMethodCallIgnored") @Test(expected = IllegalStateException.class) @@ -93,11 +150,154 @@ public void fail_to_initialize_from_empty_builder() { // **** Integration scenarios ( -> StandFunnel -> Mock Stand) **** - /** - * - Deliver updates from projection repo on update; - * - deliver updates from aggregate repo on update; - * - deliver the updates from several projection and aggregate repositories. - */ + @Test + public void deliver_updates_from_projection_repository() { + checkUpdatesDelivery(false, projectionRepositoryDispatch()); + } + + @Test + public void deliver_updates_form_aggregate_repository() { + checkUpdatesDelivery(false, aggregateRepositoryDispatch()); + } + + @Test + public void deliver_updates_from_several_repositories_in_single_thread() { + checkUpdatesDelivery(false, getSeveralRepositoryDispatchCalls()); + } + + @Test + public void deliver_updates_from_several_repositories_in_multiple_threads() { + checkUpdatesDelivery(true, getSeveralRepositoryDispatchCalls()); + } + + private static BoundedContextAction[] getSeveralRepositoryDispatchCalls() { + final BoundedContextAction[] result = new BoundedContextAction[Given.SEVERAL]; + final Random random = new SecureRandom(); + + for (int i = 0; i < result.length; i++) { + result[i] = (i % 2 == 0) ? aggregateRepositoryDispatch() : projectionRepositoryDispatch(); + } + + return result; + } + + private static void checkUpdatesDelivery(boolean isConcurrent, BoundedContextAction... dispatchActions) { + checkNotNull(dispatchActions); + + final Stand stand = mock(Stand.class); + final BoundedContext boundedContext = spy(Given.boundedContext(stand, + isConcurrent ? + Given.THREADS_COUNT_IN_POOL_EXECUTOR : 0)); + for (BoundedContextAction dispatchAction : dispatchActions) { + dispatchAction.perform(boundedContext); + } + + // Was called as many times as there are dispatch actions. + verify(boundedContext, times(dispatchActions.length)).getStandFunnel(); + + if (isConcurrent) { + await(Given.AWAIT_SECONDS); + } + + verify(stand, times(dispatchActions.length)).update(ArgumentMatchers.any(), any(Any.class)); + } + + private static void await(int seconds) { + try { + Thread.sleep(seconds); + } catch (InterruptedException ignore) { + } + } + + private static BoundedContextAction aggregateRepositoryDispatch() { + return new BoundedContextAction() { + @Override + public void perform(BoundedContext context) { + // Init repository + final AggregateRepository repository = Given.aggregateRepo(context); + + repository.initStorage(InMemoryStorageFactory.getInstance()); + + try { + // Mock aggregate and mock stand are not able to handle events returned after command handling. + // This causes IllegalStateException to be thrown. + // Note that this is not the end of a test case, so we can't just "expect=IllegalStateException" + repository.dispatch(Given.validCommand()); + } catch (IllegalStateException e) { + // Handle null event dispatching after the command is handled. + + // Check if this error is caused by returning nuu or empty list after command processing. + // Proceed crash if it's not + if (!e.getMessage() + .contains("No record found for command ID: EMPTY")) { + throw e; + } + } + } + }; + } + + private static BoundedContextAction projectionRepositoryDispatch() { + return new BoundedContextAction() { + @Override + public void perform(BoundedContext context) { + // Init repository + final ProjectionRepository repository = Given.projectionRepo(context); + + repository.initStorage(InMemoryStorageFactory.getInstance()); + repository.setOnline(); + + // Dispatch an update from projection repo + repository.dispatch(Given.validEvent()); + } + }; + } + + + @SuppressWarnings("MethodWithMultipleLoops") + @Test + public void deliver_updates_through_several_threads() throws InterruptedException { + final int threadsCount = Given.THREADS_COUNT_IN_POOL_EXECUTOR; + @SuppressWarnings("LocalVariableNamingConvention") // Too long variable name + final int threadExecutionMaxAwaitSeconds = Given.AWAIT_SECONDS; + + final Set threadInvocationRegistry = new ConcurrentSet<>(); + + final Stand stand = mock(Stand.class); + doNothing().when(stand).update(ArgumentMatchers.any(), any(Any.class)); + + final StandFunnel standFunnel = StandFunnel.newBuilder() + .setStand(stand) + .build(); + + final ExecutorService executor = Executors.newFixedThreadPool(threadsCount); + + + final Runnable task = new Runnable() { + @Override + public void run() { + final String threadName = Thread.currentThread().getName(); + Assert.assertFalse(threadInvocationRegistry.contains(threadName)); + + standFunnel.post(new Object(), Any.getDefaultInstance()); + + threadInvocationRegistry.add(threadName); + } + }; + + for (int i = 0; i < threadsCount; i++) { + executor.execute(task); + } + + executor.awaitTermination(threadExecutionMaxAwaitSeconds, TimeUnit.SECONDS); + + Assert.assertEquals(threadInvocationRegistry.size(), threadsCount); + + } + + private interface BoundedContextAction { + void perform(BoundedContext context); + } } diff --git a/server/src/test/java/org/spine3/server/stand/StandShould.java b/server/src/test/java/org/spine3/server/stand/StandShould.java index 7880fe510a9..2dfcc386eb5 100644 --- a/server/src/test/java/org/spine3/server/stand/StandShould.java +++ b/server/src/test/java/org/spine3/server/stand/StandShould.java @@ -21,15 +21,19 @@ */ package org.spine3.server.stand; +import com.google.common.base.Function; +import com.google.common.collect.Collections2; +import com.google.common.collect.ImmutableCollection; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; import com.google.protobuf.Any; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import io.grpc.stub.StreamObserver; import org.junit.Test; import org.mockito.ArgumentMatcher; -import org.mockito.InOrder; +import org.mockito.ArgumentMatchers; import org.spine3.base.Responses; import org.spine3.client.EntityFilters; import org.spine3.client.EntityId; @@ -40,21 +44,30 @@ import org.spine3.protobuf.AnyPacker; import org.spine3.protobuf.TypeUrl; import org.spine3.server.BoundedContext; -import org.spine3.server.Given; -import org.spine3.server.projection.Projection; +import org.spine3.server.Given.CustomerAggregate; +import org.spine3.server.Given.CustomerAggregateRepository; import org.spine3.server.projection.ProjectionRepository; +import org.spine3.server.stand.Given.StandTestProjectionRepository; import org.spine3.server.storage.EntityStorageRecord; +import org.spine3.server.stand.Given.StandTestProjectionRepository; import org.spine3.server.storage.StandStorage; import org.spine3.test.clientservice.customer.Customer; import org.spine3.test.clientservice.customer.CustomerId; import org.spine3.test.projection.Project; import org.spine3.test.projection.ProjectId; +import javax.annotation.Nullable; +import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Random; import java.util.Set; +import java.util.UUID; import java.util.concurrent.Executor; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.collect.Maps.newHashMap; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -62,18 +75,21 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.calls; -import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.spine3.testdata.TestBoundedContextFactory.newBoundedContext; /** * @author Alex Tymchenko */ -@SuppressWarnings("OverlyCoupledClass") //It's OK for a test. +//It's OK for a test. +@SuppressWarnings({"OverlyCoupledClass", "InstanceMethodNamingConvention", "ClassWithTooManyMethods"}) public class StandShould { + private static final int TOTAL_CUSTOMERS_FOR_BATCH_READING = 10; + private static final int TOTAL_PROJECTS_FOR_BATCH_READING = 10; // **** Positive scenarios **** @@ -126,7 +142,7 @@ public void register_aggregate_repositories() { checkTypesEmpty(stand); - final Given.CustomerAggregateRepository customerAggregateRepo = new Given.CustomerAggregateRepository(boundedContext); + final CustomerAggregateRepository customerAggregateRepo = new CustomerAggregateRepository(boundedContext); stand.registerTypeSupplier(customerAggregateRepo); final Descriptors.Descriptor customerEntityDescriptor = Customer.getDescriptor(); @@ -134,7 +150,7 @@ public void register_aggregate_repositories() { checkHasExactlyOne(stand.getKnownAggregateTypes(), customerEntityDescriptor); @SuppressWarnings("LocalVariableNamingConvention") - final Given.CustomerAggregateRepository anotherCustomerAggregateRepo = new Given.CustomerAggregateRepository(boundedContext); + final CustomerAggregateRepository anotherCustomerAggregateRepo = new CustomerAggregateRepository(boundedContext); stand.registerTypeSupplier(anotherCustomerAggregateRepo); checkHasExactlyOne(stand.getAvailableTypes(), customerEntityDescriptor); checkHasExactlyOne(stand.getKnownAggregateTypes(), customerEntityDescriptor); @@ -143,7 +159,6 @@ public void register_aggregate_repositories() { @Test public void use_provided_executor_upon_update_of_watched_type() { final Executor executor = mock(Executor.class); - final InOrder executorInOrder = inOrder(executor); final Stand stand = Stand.newBuilder() .setCallbackExecutor(executor) .build(); @@ -154,28 +169,25 @@ public void use_provided_executor_upon_update_of_watched_type() { final TypeUrl projectProjectionType = TypeUrl.of(Project.class); stand.watch(projectProjectionType, emptyUpdateCallback()); - executorInOrder.verify(executor, never()) - .execute(any(Runnable.class)); + verify(executor, never()).execute(any(Runnable.class)); final Any someUpdate = AnyPacker.pack(Project.getDefaultInstance()); final Object someId = new Object(); stand.update(someId, someUpdate); - executorInOrder.verify(executor, calls(1)) - .execute(any(Runnable.class)); + verify(executor, times(1)).execute(any(Runnable.class)); } @Test public void operate_with_storage_provided_through_builder() { final StandStorage standStorageMock = mock(StandStorage.class); - final InOrder standStorageInOrder = inOrder(standStorageMock); final Stand stand = Stand.newBuilder() .setStorage(standStorageMock) .build(); assertNotNull(stand); final BoundedContext boundedContext = newBoundedContext(stand); - final Given.CustomerAggregateRepository customerAggregateRepo = new Given.CustomerAggregateRepository(boundedContext); + final CustomerAggregateRepository customerAggregateRepo = new CustomerAggregateRepository(boundedContext); stand.registerTypeSupplier(customerAggregateRepo); @@ -183,13 +195,12 @@ public void operate_with_storage_provided_through_builder() { final CustomerId customerId = CustomerId.newBuilder() .setNumber(numericIdValue) .build(); - final Given.CustomerAggregate customerAggregate = customerAggregateRepo.create(customerId); + final CustomerAggregate customerAggregate = customerAggregateRepo.create(customerId); final Customer customerState = customerAggregate.getState(); final Any packedState = AnyPacker.pack(customerState); final TypeUrl customerType = TypeUrl.of(Customer.class); - standStorageInOrder.verify(standStorageMock, never()) - .write(any(AggregateStateId.class), any(EntityStorageRecord.class)); + verify(standStorageMock, never()).write(any(AggregateStateId.class), any(EntityStorageRecord.class)); stand.update(customerId, packedState); @@ -197,20 +208,30 @@ public void operate_with_storage_provided_through_builder() { final EntityStorageRecord expectedRecord = EntityStorageRecord.newBuilder() .setState(packedState) .build(); - standStorageInOrder.verify(standStorageMock, calls(1)) - .write(eq(expectedAggregateStateId), recordStateMatcher(expectedRecord)); + verify(standStorageMock, times(1)).write(eq(expectedAggregateStateId), recordStateMatcher(expectedRecord)); } @Test public void return_empty_list_for_aggregate_read_all_on_empty_stand_storage() { - final StandStorage standStorageMock = mock(StandStorage.class); - // Return an empty collection on {@link StandStorage#readAllByType(TypeUrl)} call. - final ImmutableList emptyResultList = ImmutableList.builder().build(); - when(standStorageMock.readAllByType(any(TypeUrl.class))).thenReturn(emptyResultList); + final TypeUrl customerType = TypeUrl.of(Customer.class); + final Target customerTarget = Target.newBuilder() + .setIncludeAll(true) + .setType(customerType.getTypeName()) + .build(); - final Stand stand = prepareStandWithAggregateRepo(standStorageMock); + checkEmptyResultForTargetOnEmptyStorage(customerTarget); + } + + @Test + public void return_empty_list_to_unknown_type_reading() { + final Stand stand = Stand.newBuilder() + .build(); + + checkTypesEmpty(stand); + + // Customer type was NOT registered. final TypeUrl customerType = TypeUrl.of(Customer.class); final Target customerTarget = Target.newBuilder() .setIncludeAll(true) @@ -224,56 +245,403 @@ public void return_empty_list_for_aggregate_read_all_on_empty_stand_storage() { stand.execute(readAllCustomers, responseObserver); final List messageList = checkAndGetMessageList(responseObserver); - assertTrue("Query returned a non-empty response message list though the target was empty", messageList - .isEmpty()); + + assertTrue("Query returned a non-empty response message list for an unknown type", messageList.isEmpty()); + } + + @Test + public void return_empty_list_for_aggregate_read_by_ids_on_empty_stand_storage() { + + final TypeUrl customerType = TypeUrl.of(Customer.class); + + final EntityId firstId = wrapCustomerId(1); + final EntityId anotherId = wrapCustomerId(2); + final EntityIdFilter idFilter = EntityIdFilter.newBuilder() + .addIds(firstId) + .addIds(anotherId) + .build(); + final EntityFilters filters = EntityFilters.newBuilder() + .setIdFilter(idFilter) + .build(); + final Target customerTarget = Target.newBuilder() + .setIncludeAll(false) + .setType(customerType.getTypeName()) + .setFilters(filters) + .build(); + + checkEmptyResultForTargetOnEmptyStorage(customerTarget); + } + + @Test + public void return_empty_list_for_aggregate_reads_with_filters_not_set() { + + final TypeUrl customerType = TypeUrl.of(Customer.class); + final Target customerTarget = Target.newBuilder() + .setIncludeAll(false) + .setType(customerType.getTypeName()) + .build(); + checkEmptyResultOnNonEmptyStorageForQueryTarget(customerTarget); + } + + @Test + public void return_empty_list_for_aggregate_reads_with_id_filter_not_set() { + + final TypeUrl customerType = TypeUrl.of(Customer.class); + final EntityFilters filters = EntityFilters.newBuilder() + .build(); + final Target customerTarget = Target.newBuilder() + .setIncludeAll(false) + .setType(customerType.getTypeName()) + .setFilters(filters) + .build(); + checkEmptyResultOnNonEmptyStorageForQueryTarget(customerTarget); + } + + @Test + public void return_empty_list_for_aggregate_reads_with_empty_list_of_ids() { + + final TypeUrl customerType = TypeUrl.of(Customer.class); + final EntityFilters filters = EntityFilters.newBuilder() + .setIdFilter(EntityIdFilter.getDefaultInstance()) + .build(); + final Target customerTarget = Target.newBuilder() + .setIncludeAll(false) + .setType(customerType.getTypeName()) + .setFilters(filters) + .build(); + checkEmptyResultOnNonEmptyStorageForQueryTarget(customerTarget); } @Test public void return_single_result_for_aggregate_state_read_by_id() { + doCheckReadingCustomersById(1); + } - // Define the types and values used as a test data. + @Test + public void return_multiple_results_for_aggregate_state_batch_read_by_ids() { + doCheckReadingCustomersById(TOTAL_CUSTOMERS_FOR_BATCH_READING); + } + + + @Test + public void return_single_result_for_projection_read_by_id() { + doCheckReadingProjectsById(1); + } + + @Test + public void return_multiple_results_for_projection_batch_read_by_ids() { + doCheckReadingProjectsById(TOTAL_PROJECTS_FOR_BATCH_READING); + } + + @Test + public void trigger_callback_upon_change_of_watched_aggregate_state() { + final Stand stand = prepareStandWithAggregateRepo(mock(StandStorage.class)); final TypeUrl customerType = TypeUrl.of(Customer.class); - final Customer customer = Customer.getDefaultInstance(); - final Any customerState = AnyPacker.pack(customer); - final CustomerId customerId = CustomerId.newBuilder() - .setNumber(42) + + final MemoizeStandUpdateCallback memoizeCallback = new MemoizeStandUpdateCallback(); + stand.watch(customerType, memoizeCallback); + assertNull(memoizeCallback.newEntityState); + + final Map.Entry sampleData = fillSampleCustomers(1).entrySet() + .iterator() + .next(); + final CustomerId customerId = sampleData.getKey(); + final Customer customer = sampleData.getValue(); + final Any packedState = AnyPacker.pack(customer); + stand.update(customerId, packedState); + + assertEquals(packedState, memoizeCallback.newEntityState); + } + + private static void checkEmptyResultForTargetOnEmptyStorage(Target customerTarget) { + final StandStorage standStorageMock = mock(StandStorage.class); + // Return an empty collection on {@link StandStorage#readAllByType(TypeUrl)} call. + final ImmutableList emptyResultList = ImmutableList.builder().build(); + when(standStorageMock.readAllByType(any(TypeUrl.class))).thenReturn(emptyResultList); + + final Stand stand = prepareStandWithAggregateRepo(standStorageMock); + + final Query readAllCustomers = Query.newBuilder() + .setTarget(customerTarget) + .build(); + + final MemoizeQueryResponseObserver responseObserver = new MemoizeQueryResponseObserver(); + stand.execute(readAllCustomers, responseObserver); + + final List messageList = checkAndGetMessageList(responseObserver); + assertTrue("Query returned a non-empty response message list though the target was empty", messageList.isEmpty()); + } + + private static void doCheckReadingProjectsById(int numberOfProjects) { + // Define the types and values used as a test data. + final Map sampleProjects = newHashMap(); + final TypeUrl projectType = TypeUrl.of(Project.class); + fillSampleProjects(sampleProjects, numberOfProjects); + + final StandTestProjectionRepository projectionRepository = mock(StandTestProjectionRepository.class); + when(projectionRepository.getEntityStateType()).thenReturn(projectType); + setupExpectedFindAllBehaviour(sampleProjects, projectionRepository); + + final Stand stand = prepareStandWithProjectionRepo(projectionRepository); + + // Now we are ready to query. + final EntityIdFilter idFilter = idFilterForProjection(sampleProjects.keySet()); + + final Target projectTarget = Target.newBuilder() + .setFilters(EntityFilters.newBuilder() + .setIdFilter(idFilter)) + .setType(projectType.getTypeName()) + .build(); + final Query readMultipleProjects = Query.newBuilder() + .setTarget(projectTarget) .build(); - final AggregateStateId stateId = AggregateStateId.of(customerId, customerType); + final MemoizeQueryResponseObserver responseObserver = new MemoizeQueryResponseObserver(); + stand.execute(readMultipleProjects, responseObserver); + + final List messageList = checkAndGetMessageList(responseObserver); + assertEquals(sampleProjects.size(), messageList.size()); + final Collection allCustomers = sampleProjects.values(); + for (Any singleRecord : messageList) { + final Project unpackedSingleResult = AnyPacker.unpack(singleRecord); + assertTrue(allCustomers.contains(unpackedSingleResult)); + } + } + + private static void doCheckReadingCustomersById(int numberOfCustomers) { + // Define the types and values used as a test data. + final TypeUrl customerType = TypeUrl.of(Customer.class); + final Map sampleCustomers = fillSampleCustomers(numberOfCustomers); // Prepare the stand and its mock storage to act. final StandStorage standStorageMock = mock(StandStorage.class); - final EntityStorageRecord entityStorageRecord = EntityStorageRecord.newBuilder() - .setState(customerState) - .build(); - when(standStorageMock.read(eq(stateId))).thenReturn(entityStorageRecord); - final Stand stand = prepareStandWithAggregateRepo(standStorageMock); + setupExpectedBulkReadBehaviour(sampleCustomers, customerType, standStorageMock); - // Trigger the update. - stand.update(customerId, customerState); + final Stand stand = prepareStandWithAggregateRepo(standStorageMock); + triggerMultipleUpdates(sampleCustomers, stand); // Now we are ready to query. - final EntityIdFilter idFilter = EntityIdFilter.newBuilder() - .addIds(EntityId.newBuilder() - .setId(AnyPacker.pack(customerId))) - .build(); + final EntityIdFilter idFilter = idFilterForAggregate(sampleCustomers.keySet()); + final Target customerTarget = Target.newBuilder() .setFilters(EntityFilters.newBuilder() .setIdFilter(idFilter)) .setType(customerType.getTypeName()) .build(); - final Query readSingleCustomer = Query.newBuilder() + final Query readMultipleCustomers = Query.newBuilder() + .setTarget(customerTarget) + .build(); + + final MemoizeQueryResponseObserver responseObserver = new MemoizeQueryResponseObserver(); + stand.execute(readMultipleCustomers, responseObserver); + + final List messageList = checkAndGetMessageList(responseObserver); + assertEquals(sampleCustomers.size(), messageList.size()); + final Collection allCustomers = sampleCustomers.values(); + for (Any singleRecord : messageList) { + final Customer unpackedSingleResult = AnyPacker.unpack(singleRecord); + assertTrue(allCustomers.contains(unpackedSingleResult)); + } + } + + private static void checkEmptyResultOnNonEmptyStorageForQueryTarget(Target customerTarget) { + final StandStorage standStorageMock = mock(StandStorage.class); + + // Return non-empty results on any storage read call. + final EntityStorageRecord someRecord = EntityStorageRecord.getDefaultInstance(); + final ImmutableList nonEmptyList = ImmutableList.builder().add(someRecord) + .build(); + when(standStorageMock.readAllByType(any(TypeUrl.class))).thenReturn(nonEmptyList); + when(standStorageMock.read(any(AggregateStateId.class))).thenReturn(someRecord); + when(standStorageMock.readAll()).thenReturn(Maps.newHashMap()); + when(standStorageMock.readBulk(ArgumentMatchers.anyIterable())).thenReturn(nonEmptyList); + + final Stand stand = prepareStandWithAggregateRepo(standStorageMock); + + final Query queryWithNoFilters = Query.newBuilder() .setTarget(customerTarget) .build(); final MemoizeQueryResponseObserver responseObserver = new MemoizeQueryResponseObserver(); - stand.execute(readSingleCustomer, responseObserver); + stand.execute(queryWithNoFilters, responseObserver); final List messageList = checkAndGetMessageList(responseObserver); - assertEquals(1, messageList.size()); - final Any singleRecord = messageList.get(0); - final Message unpackedSingleResult = AnyPacker.unpack(singleRecord); - assertEquals(customer, unpackedSingleResult); + assertTrue("Query returned a non-empty response message list though the filter was not set", messageList.isEmpty()); + } + + + @SuppressWarnings("ConstantConditions") + private static void setupExpectedBulkReadBehaviour(Map sampleCustomers, TypeUrl customerType, + StandStorage standStorageMock) { + final ImmutableList.Builder stateIdsBuilder = ImmutableList.builder(); + final ImmutableList.Builder recordsBuilder = ImmutableList.builder(); + for (CustomerId customerId : sampleCustomers.keySet()) { + final AggregateStateId stateId = AggregateStateId.of(customerId, customerType); + final Customer customer = Customer.getDefaultInstance(); + final Any customerState = AnyPacker.pack(customer); + final EntityStorageRecord entityStorageRecord = EntityStorageRecord.newBuilder() + .setState(customerState) + .build(); + stateIdsBuilder.add(stateId); + recordsBuilder.add(entityStorageRecord); + + when(standStorageMock.read(eq(stateId))).thenReturn(entityStorageRecord); + } + + final ImmutableList stateIds = stateIdsBuilder.build(); + final ImmutableList records = recordsBuilder.build(); + + final Iterable matchingIds = argThat(aggregateIdsIterableMatcher(stateIds)); + when(standStorageMock.readBulk(matchingIds)).thenReturn(records); + } + + + @SuppressWarnings("ConstantConditions") + private static void setupExpectedFindAllBehaviour(Map sampleProjects, + StandTestProjectionRepository projectionRepository) { + + final Set projectIds = sampleProjects.keySet(); + final ImmutableCollection allResults = toProjectionCollection(projectIds); + + for (ProjectId projectId : projectIds) { + when(projectionRepository.find(eq(projectId))).thenReturn(new Given.StandTestProjection(projectId)); + } + + final Iterable matchingIds = argThat(projectionIdsIterableMatcher(projectIds)); + when(projectionRepository.findBulk(matchingIds)).thenReturn(allResults); + + when(projectionRepository.findAll()).thenReturn(allResults); + + final EntityFilters matchingFilter = argThat(entityFilterMatcher(projectIds)); + when(projectionRepository.findAll(matchingFilter)).thenReturn(allResults); + } + + @SuppressWarnings("OverlyComplexAnonymousInnerClass") + private static ArgumentMatcher entityFilterMatcher(final Set projectIds) { + // This argument matcher does NOT mimic the exact repository behavior. + // Instead, it only matches the EntityFilters instance in case it has EntityIdFilter with ALL the expected IDs. + return new ArgumentMatcher() { + @Override + public boolean matches(EntityFilters argument) { + boolean everyElementPresent = true; + for (EntityId entityId : argument.getIdFilter() + .getIdsList()) { + final Any idAsAny = entityId.getId(); + final Message rawId = AnyPacker.unpack(idAsAny); + if (rawId instanceof ProjectId) { + final ProjectId convertedProjectId = (ProjectId) rawId; + everyElementPresent = everyElementPresent && projectIds.contains(convertedProjectId); + } else { + everyElementPresent = false; + } + } + return everyElementPresent; + } + }; + } + + private static ImmutableCollection toProjectionCollection(Collection values) { + final Collection transformed = Collections2.transform(values, new Function() { + @Nullable + @Override + public Given.StandTestProjection apply(@Nullable ProjectId input) { + checkNotNull(input); + return new Given.StandTestProjection(input); + } + }); + final ImmutableList result = ImmutableList.copyOf(transformed); + return result; + } + + private static EntityId wrapCustomerId(int number) { + final CustomerId customerId = CustomerId.newBuilder() + .setNumber(number) + .build(); + final Any packedId = AnyPacker.pack(customerId); + return EntityId.newBuilder() + .setId(packedId) + .build(); + } + + private static ArgumentMatcher> projectionIdsIterableMatcher(final Set projectIds) { + return new ArgumentMatcher>() { + @Override + public boolean matches(Iterable argument) { + boolean everyElementPresent = true; + for (ProjectId projectId : argument) { + everyElementPresent = everyElementPresent && projectIds.contains(projectId); + } + return everyElementPresent; + } + }; + } + + private static ArgumentMatcher> aggregateIdsIterableMatcher(final List stateIds) { + return new ArgumentMatcher>() { + @Override + public boolean matches(Iterable argument) { + boolean everyElementPresent = true; + for (AggregateStateId aggregateStateId : argument) { + everyElementPresent = everyElementPresent && stateIds.contains(aggregateStateId); + } + return everyElementPresent; + } + }; + } + + private static EntityIdFilter idFilterForAggregate(Collection customerIds) { + final EntityIdFilter.Builder idFilterBuilder = EntityIdFilter.newBuilder(); + for (CustomerId id : customerIds) { + idFilterBuilder + .addIds(EntityId.newBuilder() + .setId(AnyPacker.pack(id))); + } + return idFilterBuilder.build(); + } + + private static EntityIdFilter idFilterForProjection(Collection projectIds) { + final EntityIdFilter.Builder idFilterBuilder = EntityIdFilter.newBuilder(); + for (ProjectId id : projectIds) { + idFilterBuilder + .addIds(EntityId.newBuilder() + .setId(AnyPacker.pack(id))); + } + return idFilterBuilder.build(); + } + + private static void triggerMultipleUpdates(Map sampleCustomers, Stand stand) { + // Trigger the aggregate state updates. + for (CustomerId id : sampleCustomers.keySet()) { + final Customer sampleCustomer = sampleCustomers.get(id); + final Any customerState = AnyPacker.pack(sampleCustomer); + stand.update(id, customerState); + } + } + + private static Map fillSampleCustomers(int numberOfCustomers) { + final Map sampleCustomers = newHashMap(); + for (int customerIndex = 0; customerIndex < numberOfCustomers; customerIndex++) { + final Customer customer = Customer.getDefaultInstance(); + + @SuppressWarnings("UnsecureRandomNumberGeneration") + final Random randomizer = new Random(); + final CustomerId customerId = CustomerId.newBuilder() + .setNumber(randomizer.nextInt()) + .build(); + sampleCustomers.put(customerId, customer); + } + return sampleCustomers; + } + + private static void fillSampleProjects(Map sampleProjects, int numberOfProjects) { + for (int projectIndex = 0; projectIndex < numberOfProjects; projectIndex++) { + final Project project = Project.getDefaultInstance(); + final ProjectId projectId = ProjectId.newBuilder() + .setId(UUID.randomUUID() + .toString()) + .build(); + sampleProjects.put(projectId, project); + } } private static List checkAndGetMessageList(MemoizeQueryResponseObserver responseObserver) { @@ -297,11 +665,19 @@ private static Stand prepareStandWithAggregateRepo(StandStorage standStorageMock assertNotNull(stand); final BoundedContext boundedContext = newBoundedContext(stand); - final Given.CustomerAggregateRepository customerAggregateRepo = new Given.CustomerAggregateRepository(boundedContext); + final org.spine3.server.Given.CustomerAggregateRepository customerAggregateRepo = new org.spine3.server.Given.CustomerAggregateRepository(boundedContext); stand.registerTypeSupplier(customerAggregateRepo); return stand; } + private static Stand prepareStandWithProjectionRepo(ProjectionRepository projectionRepository) { + final Stand stand = Stand.newBuilder() + .build(); + assertNotNull(stand); + stand.registerTypeSupplier(projectionRepository); + return stand; + } + private static EntityStorageRecord recordStateMatcher(final EntityStorageRecord expectedRecord) { return argThat(new ArgumentMatcher() { @Override @@ -347,26 +723,6 @@ public void onEntityStateUpdate(Any newEntityState) { // ***** Inner classes used for tests. ***** - private static class StandTestProjection extends Projection { - /** - * Creates a new instance. - * - * @param id the ID for the new instance - * @throws IllegalArgumentException if the ID is not of one of the supported types - */ - public StandTestProjection(ProjectId id) { - super(id); - } - } - - - private static class StandTestProjectionRepository extends ProjectionRepository { - protected StandTestProjectionRepository(BoundedContext boundedContext) { - super(boundedContext); - } - } - - /** * A {@link StreamObserver} storing the state of {@link Query} execution. */ @@ -392,4 +748,18 @@ public void onCompleted() { } } + + + /** + * A {@link StreamObserver} storing the state of {@link Query} execution. + */ + private static class MemoizeStandUpdateCallback implements Stand.StandUpdateCallback { + + private Any newEntityState; + + @Override + public void onEntityStateUpdate(Any newEntityState) { + this.newEntityState = newEntityState; + } + } } diff --git a/server/src/test/java/org/spine3/testdata/TestStandFactory.java b/server/src/test/java/org/spine3/testdata/TestStandFactory.java index d54df4fae6d..63477a00a7c 100644 --- a/server/src/test/java/org/spine3/testdata/TestStandFactory.java +++ b/server/src/test/java/org/spine3/testdata/TestStandFactory.java @@ -21,14 +21,15 @@ */ package org.spine3.testdata; +import org.mockito.Mockito; import org.spine3.server.stand.Stand; -import org.spine3.server.storage.memory.InMemoryStandStorage; /** * Creates stands for tests. * * @author Alex Tymchenko */ +@SuppressWarnings("UtilityClass") public class TestStandFactory { private TestStandFactory() {} @@ -38,4 +39,8 @@ public static Stand create() { .build(); return stand; } + + public static Stand createMock() { + return Mockito.mock(Stand.class); + } }