diff --git a/.idea/codeStyleSettings.xml b/.idea/codeStyleSettings.xml
index 13da8a541b7..139e2ef95f0 100644
--- a/.idea/codeStyleSettings.xml
+++ b/.idea/codeStyleSettings.xml
@@ -11,8 +11,10 @@
+
+
+
-
diff --git a/.travis.yml b/.travis.yml
index c1fbb632799..5d97f7740a3 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -6,3 +6,5 @@ before_install:
after_success:
# See: https://github.com/codecov/example-java/blob/master/.travis.yml
- bash <(curl -s https://codecov.io/bash)
+
+script: ./gradlew check --stacktrace
diff --git a/build.gradle b/build.gradle
index a75eb0c2973..1059b75f4fa 100644
--- a/build.gradle
+++ b/build.gradle
@@ -54,7 +54,7 @@ allprojects {
apply plugin: 'idea'
group = 'org.spine3'
- version = '0.5.19-SNAPSHOT'
+ version = '0.6.0-SNAPSHOT'
}
ext {
diff --git a/client/src/main/java/org/spine3/base/Commands.java b/client/src/main/java/org/spine3/base/Commands.java
index 0af4c384d34..d29c1cfc3e0 100644
--- a/client/src/main/java/org/spine3/base/Commands.java
+++ b/client/src/main/java/org/spine3/base/Commands.java
@@ -47,7 +47,7 @@
import static org.spine3.base.CommandContext.newBuilder;
import static org.spine3.base.Identifiers.idToString;
import static org.spine3.protobuf.Timestamps.getCurrentTime;
-import static org.spine3.validate.Validate.checkIsPositive;
+import static org.spine3.validate.Validate.checkPositive;
import static org.spine3.validate.Validate.checkNotEmptyOrBlank;
import static org.spine3.validate.Validate.isNotDefault;
@@ -287,7 +287,7 @@ public static Command setSchedulingTime(Command command, Timestamp schedulingTim
*/
@Internal
public static Command setSchedule(Command command, Duration delay, Timestamp schedulingTime) {
- checkIsPositive(schedulingTime, "command scheduling time");
+ checkPositive(schedulingTime, "command scheduling time");
final CommandContext context = command.getContext();
final CommandContext.Schedule scheduleUpdated = context.getSchedule()
.toBuilder()
diff --git a/client/src/main/java/org/spine3/base/Queries.java b/client/src/main/java/org/spine3/base/Queries.java
new file mode 100644
index 00000000000..14cf708d807
--- /dev/null
+++ b/client/src/main/java/org/spine3/base/Queries.java
@@ -0,0 +1,226 @@
+/*
+ * Copyright 2016, TeamDev Ltd. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package org.spine3.base;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.FieldMask;
+import com.google.protobuf.Message;
+import org.spine3.client.EntityFilters;
+import org.spine3.client.EntityId;
+import org.spine3.client.EntityIdFilter;
+import org.spine3.client.Query;
+import org.spine3.client.Target;
+import org.spine3.protobuf.AnyPacker;
+import org.spine3.protobuf.KnownTypes;
+import org.spine3.protobuf.TypeUrl;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.Set;
+
+import static org.spine3.base.Queries.Targets.allOf;
+import static org.spine3.base.Queries.Targets.someOf;
+
+/**
+ * Client-side utilities for working with queries.
+ *
+ * @author Alex Tymchenko
+ * @author Dmytro Dashenkov
+ */
+public class Queries {
+
+ private Queries() {
+ }
+
+ /**
+ * Create a {@link Query} to read certain entity states by IDs with the {@link FieldMask}
+ * applied to each of the results.
+ *
+ *
Allows to specify a set of identifiers to be used during the {@code Query} processing. The processing
+ * results will contain only the entities, which IDs are present among the {@code ids}.
+ *
+ *
Allows to set property paths for a {@link FieldMask}, applied to each of the query results.
+ * This processing is performed according to the
+ * FieldMask specs.
+ *
+ *
In case the {@code paths} array contains entries inapplicable to the resulting entity
+ * (e.g. a {@code path} references a missing field), such invalid paths are silently ignored.
+ *
+ * @param entityClass the class of a target entity
+ * @param paths the property paths for the {@code FieldMask} applied to each of results
+ * @return an instance of {@code Query} formed according to the passed parameters
+ */
+ public static Query readAll(Class extends Message> entityClass, String... paths) {
+ final FieldMask fieldMask = FieldMask.newBuilder()
+ .addAllPaths(Arrays.asList(paths))
+ .build();
+ final Query result = composeQuery(entityClass, null, fieldMask);
+ return result;
+ }
+
+ /**
+ * Create a {@link Query} to read certain entity states by IDs.
+ *
+ *
Allows to specify a set of identifiers to be used during the {@code Query} processing. The processing
+ * results will contain only the entities, which IDs are present among the {@code ids}.
+ *
+ *
Unlike {@link Queries#readByIds(Class, Set, String...)}, the {@code Query} processing will not change
+ * the resulting entities.
+ *
+ * @param entityClass the class of a target entity
+ * @param ids the entity IDs of interest
+ * @return an instance of {@code Query} formed according to the passed parameters
+ */
+ public static Query readByIds(Class extends Message> entityClass, Set extends Message> ids) {
+ return composeQuery(entityClass, ids, null);
+ }
+
+ /**
+ * Create a {@link Query} to read all states of a certain entity.
+ *
+ *
Unlike {@link Queries#readAll(Class, String...)}, the {@code Query} processing will not change
+ * the resulting entities.
+ *
+ * @param entityClass the class of a target entity
+ * @return an instance of {@code Query} formed according to the passed parameters
+ */
+ public static Query readAll(Class extends Message> entityClass) {
+ return composeQuery(entityClass, null, null);
+ }
+
+ private static Query composeQuery(Class extends Message> entityClass, @Nullable Set extends Message> ids, @Nullable FieldMask fieldMask) {
+ final Target target = ids == null ? allOf(entityClass) : someOf(entityClass, ids);
+ final Query.Builder queryBuilder = Query.newBuilder()
+ .setTarget(target);
+ if (fieldMask != null) {
+ queryBuilder.setFieldMask(fieldMask);
+ }
+ final Query result = queryBuilder
+ .build();
+ return result;
+ }
+
+ /**
+ * Extract the type of {@link Target} for the given {@link Query}.
+ *
+ *
Returns null if the {@code Target} type is unknown to the application.
+ *
+ * @param query the query of interest.
+ * @return the type of the {@code Query#getTarget()} or null, if the type is unknown.
+ */
+ @Nullable
+ public static TypeUrl typeOf(Query query) {
+ final Target target = query.getTarget();
+ final String typeAsString = target.getType();
+ final TypeUrl type = KnownTypes.getTypeUrl(typeAsString);
+ return type;
+ }
+
+ /**
+ * Client-side utilities for working with {@link Query} and {@link org.spine3.client.Subscription} targets.
+ *
+ * @author Alex Tymchenko
+ * @author Dmytro Dashenkov
+ */
+ public static class Targets {
+
+ private Targets() {
+ }
+
+ /**
+ * Create a {@link Target} for a subset of the entity states by specifying their IDs.
+ *
+ * @param entityClass the class of a target entity
+ * @param ids the IDs of interest
+ * @return the instance of {@code Target} assembled according to the parameters.
+ */
+ public static Target someOf(Class extends Message> entityClass, Set extends Message> ids) {
+ final Target result = composeTarget(entityClass, ids);
+ return result;
+ }
+
+ /**
+ * Create a {@link Target} for all of the specified entity states.
+ *
+ * @param entityClass the class of a target entity
+ * @return the instance of {@code Target} assembled according to the parameters.
+ */
+ public static Target allOf(Class extends Message> entityClass) {
+ final Target result = composeTarget(entityClass, null);
+ return result;
+ }
+
+ /* package */
+ static Target composeTarget(Class extends Message> entityClass, @Nullable Set extends Message> ids) {
+ final TypeUrl type = TypeUrl.of(entityClass);
+
+ final boolean includeAll = ids == null;
+
+ final EntityIdFilter.Builder idFilterBuilder = EntityIdFilter.newBuilder();
+
+ if (!includeAll) {
+ for (Message rawId : ids) {
+ final Any packedId = AnyPacker.pack(rawId);
+ final EntityId entityId = EntityId.newBuilder()
+ .setId(packedId)
+ .build();
+ idFilterBuilder.addIds(entityId);
+ }
+ }
+ final EntityIdFilter idFilter = idFilterBuilder.build();
+ final EntityFilters filters = EntityFilters.newBuilder()
+ .setIdFilter(idFilter)
+ .build();
+ final Target.Builder builder = Target.newBuilder()
+ .setType(type.getTypeName());
+ if (includeAll) {
+ builder.setIncludeAll(true);
+ } else {
+ builder.setFilters(filters);
+ }
+
+ return builder.build();
+ }
+ }
+}
diff --git a/client/src/main/java/org/spine3/protobuf/KnownTypes.java b/client/src/main/java/org/spine3/protobuf/KnownTypes.java
index d4755da2bfe..e5e70721e7e 100644
--- a/client/src/main/java/org/spine3/protobuf/KnownTypes.java
+++ b/client/src/main/java/org/spine3/protobuf/KnownTypes.java
@@ -63,6 +63,7 @@
import org.spine3.protobuf.error.UnknownTypeException;
import org.spine3.type.ClassName;
+import javax.annotation.Nullable;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -105,8 +106,8 @@ public class KnownTypes {
*/
private static final ImmutableMap typeNameToUrlMap = buildTypeToUrlMap(knownTypes);
-
- private KnownTypes() {}
+ private KnownTypes() {
+ }
/** Retrieves Protobuf type URLs known to the application. */
public static ImmutableSet getTypeUrls() {
@@ -145,7 +146,8 @@ public static ClassName getClassName(TypeUrl typeUrl) throws UnknownTypeExceptio
* @throws IllegalStateException if there is no Protobuf type for the specified class
*/
public static TypeUrl getTypeUrl(ClassName className) {
- final TypeUrl result = knownTypes.inverse().get(className);
+ final TypeUrl result = knownTypes.inverse()
+ .get(className);
if (result == null) {
throw new IllegalStateException("No Protobuf type URL found for the Java class " + className);
}
@@ -153,6 +155,7 @@ public static TypeUrl getTypeUrl(ClassName className) {
}
/** Returns a Protobuf type URL by Protobuf type name. */
+ @Nullable
public static TypeUrl getTypeUrl(String typeName) {
final TypeUrl typeUrl = typeNameToUrlMap.get(typeName);
return typeUrl;
@@ -203,7 +206,8 @@ private void putProperties(Properties properties) {
*
This method needs to be updated with introduction of new Google Protobuf types
* after they are used in the framework.
*/
- @SuppressWarnings("OverlyLongMethod") // OK as there are many types in Protobuf and we want to keep this code in one place.
+ @SuppressWarnings("OverlyLongMethod")
+ // OK as there are many types in Protobuf and we want to keep this code in one place.
private Builder addStandardProtobufTypes() {
// Types from `any.proto`.
put(Any.class);
@@ -217,14 +221,14 @@ private Builder addStandardProtobufTypes() {
put(DescriptorProtos.FileDescriptorSet.class);
put(DescriptorProtos.FileDescriptorProto.class);
put(DescriptorProtos.DescriptorProto.class);
- // Inner types of `DescriptorProto`
- put(DescriptorProtos.DescriptorProto.ExtensionRange.class);
- put(DescriptorProtos.DescriptorProto.ReservedRange.class);
+ // Inner types of `DescriptorProto`
+ put(DescriptorProtos.DescriptorProto.ExtensionRange.class);
+ put(DescriptorProtos.DescriptorProto.ReservedRange.class);
put(DescriptorProtos.FieldDescriptorProto.class);
- putEnum(DescriptorProtos.FieldDescriptorProto.Type.getDescriptor(),
- DescriptorProtos.FieldDescriptorProto.Type.class);
- putEnum(DescriptorProtos.FieldDescriptorProto.Label.getDescriptor(),
+ putEnum(DescriptorProtos.FieldDescriptorProto.Type.getDescriptor(),
+ DescriptorProtos.FieldDescriptorProto.Type.class);
+ putEnum(DescriptorProtos.FieldDescriptorProto.Label.getDescriptor(),
DescriptorProtos.FieldDescriptorProto.Label.class);
put(DescriptorProtos.OneofDescriptorProto.class);
@@ -237,21 +241,21 @@ private Builder addStandardProtobufTypes() {
DescriptorProtos.FileOptions.OptimizeMode.class);
put(DescriptorProtos.MessageOptions.class);
put(DescriptorProtos.FieldOptions.class);
- putEnum(DescriptorProtos.FieldOptions.CType.getDescriptor(),
- DescriptorProtos.FieldOptions.CType.class);
- putEnum(DescriptorProtos.FieldOptions.JSType.getDescriptor(),
- DescriptorProtos.FieldOptions.JSType.class);
+ putEnum(DescriptorProtos.FieldOptions.CType.getDescriptor(),
+ DescriptorProtos.FieldOptions.CType.class);
+ putEnum(DescriptorProtos.FieldOptions.JSType.getDescriptor(),
+ DescriptorProtos.FieldOptions.JSType.class);
put(DescriptorProtos.EnumOptions.class);
put(DescriptorProtos.EnumValueOptions.class);
put(DescriptorProtos.ServiceOptions.class);
put(DescriptorProtos.MethodOptions.class);
put(DescriptorProtos.UninterpretedOption.class);
put(DescriptorProtos.SourceCodeInfo.class);
- // Inner types of `SourceCodeInfo`.
- put(DescriptorProtos.SourceCodeInfo.Location.class);
+ // Inner types of `SourceCodeInfo`.
+ put(DescriptorProtos.SourceCodeInfo.Location.class);
put(DescriptorProtos.GeneratedCodeInfo.class);
- // Inner types of `GeneratedCodeInfo`.
- put(DescriptorProtos.GeneratedCodeInfo.Annotation.class);
+ // Inner types of `GeneratedCodeInfo`.
+ put(DescriptorProtos.GeneratedCodeInfo.Annotation.class);
// Types from `duration.proto`.
put(Duration.class);
@@ -277,8 +281,8 @@ private Builder addStandardProtobufTypes() {
// Types from `type.proto`.
put(Type.class);
put(Field.class);
- putEnum(Field.Kind.getDescriptor(), Field.Kind.class);
- putEnum(Field.Cardinality.getDescriptor(), Field.Cardinality.class);
+ putEnum(Field.Kind.getDescriptor(), Field.Kind.class);
+ putEnum(Field.Cardinality.getDescriptor(), Field.Cardinality.class);
put(com.google.protobuf.Enum.class);
put(EnumValue.class);
put(Option.class);
@@ -313,8 +317,8 @@ private void putEnum(EnumDescriptor desc, Class extends EnumLite> enumClass) {
private void put(TypeUrl typeUrl, ClassName className) {
if (resultMap.containsKey(typeUrl)) {
log().warn("Duplicate key in the {} map: {}. " +
- "It may be caused by the `task.descriptorSetOptions.includeImports` option " +
- "set to `true` in the `build.gradle`.", KnownTypes.class.getName(), typeUrl);
+ "It may be caused by the `task.descriptorSetOptions.includeImports` option " +
+ "set to `true` in the `build.gradle`.", KnownTypes.class.getName(), typeUrl);
return;
}
resultMap.put(typeUrl, className);
diff --git a/client/src/main/java/org/spine3/protobuf/Timestamps.java b/client/src/main/java/org/spine3/protobuf/Timestamps.java
index bef01a178d8..e66d9c30840 100644
--- a/client/src/main/java/org/spine3/protobuf/Timestamps.java
+++ b/client/src/main/java/org/spine3/protobuf/Timestamps.java
@@ -203,9 +203,9 @@ public static int compare(@Nullable Timestamp t1, @Nullable Timestamp t2) {
return 1;
}
int result = Long.compare(t1.getSeconds(), t2.getSeconds());
- result = (result == 0) ?
- Integer.compare(t1.getNanos(), t2.getNanos()) :
- result;
+ result = (result == 0)
+ ? Integer.compare(t1.getNanos(), t2.getNanos())
+ : result;
return result;
}
diff --git a/client/src/main/java/org/spine3/protobuf/TypeUrl.java b/client/src/main/java/org/spine3/protobuf/TypeUrl.java
index 75bb0abbd25..f8eae45933e 100644
--- a/client/src/main/java/org/spine3/protobuf/TypeUrl.java
+++ b/client/src/main/java/org/spine3/protobuf/TypeUrl.java
@@ -122,9 +122,9 @@ public static TypeUrl of(EnumDescriptor descriptor) {
@Internal
public static TypeUrl of(String typeUrlOrName) {
checkNotEmptyOrBlank(typeUrlOrName, "type URL or name");
- final TypeUrl typeUrl = isTypeUrl(typeUrlOrName) ?
- ofTypeUrl(typeUrlOrName) :
- ofTypeName(typeUrlOrName);
+ final TypeUrl typeUrl = isTypeUrl(typeUrlOrName)
+ ? ofTypeUrl(typeUrlOrName)
+ : ofTypeName(typeUrlOrName);
return typeUrl;
}
diff --git a/client/src/main/java/org/spine3/validate/Validate.java b/client/src/main/java/org/spine3/validate/Validate.java
index f5a2a66e487..325edb680f0 100644
--- a/client/src/main/java/org/spine3/validate/Validate.java
+++ b/client/src/main/java/org/spine3/validate/Validate.java
@@ -187,7 +187,7 @@ public static String checkNotEmptyOrBlank(String stringToCheck, String fieldName
* @return the passed timestamp
* @throws IllegalArgumentException if any of the requirements are not met
*/
- public static Timestamp checkIsPositive(Timestamp timestamp, String nameToLog) {
+ public static Timestamp checkPositive(Timestamp timestamp, String nameToLog) {
checkNotNull(timestamp, nameToLog + " is null.");
checkArgument(timestamp.getSeconds() > 0, nameToLog + " must have a positive number of seconds.");
checkArgument(timestamp.getNanos() >= 0, nameToLog + " must not have a negative number of nanoseconds.");
diff --git a/client/src/main/proto/spine/client/command_service.proto b/client/src/main/proto/spine/client/command_service.proto
new file mode 100644
index 00000000000..c8e1b2a3ea3
--- /dev/null
+++ b/client/src/main/proto/spine/client/command_service.proto
@@ -0,0 +1,42 @@
+//
+// Copyright 2016, TeamDev Ltd. All rights reserved.
+//
+// Redistribution and use in source and/or binary forms, with or without
+// modification, must retain the above copyright notice and the following
+// disclaimer.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+syntax = "proto3";
+
+package spine.client;
+
+// We put gRPC-based classes into `grpc` sub-package, which is annotated as `@Internal`
+// to hide implementation details from the public API of the framework.
+option (type_url_prefix) = "type.spine3.org";
+option java_package = "org.spine3.client.grpc";
+option java_multiple_files = true;
+option java_outer_classname = "CommandServiceProto";
+option java_generate_equals_and_hash = true;
+
+import "spine/annotations.proto";
+import "spine/base/command.proto";
+import "spine/base/response.proto";
+
+
+// A service for sending commands from clients.
+service CommandService {
+
+ // Request to handle a command.
+ rpc Post(base.Command) returns (base.Response);
+}
diff --git a/client/src/main/proto/spine/client/entities.proto b/client/src/main/proto/spine/client/entities.proto
new file mode 100644
index 00000000000..ab9d3596f09
--- /dev/null
+++ b/client/src/main/proto/spine/client/entities.proto
@@ -0,0 +1,85 @@
+//
+// Copyright 2016, TeamDev Ltd. All rights reserved.
+//
+// Redistribution and use in source and/or binary forms, with or without
+// modification, must retain the above copyright notice and the following
+// disclaimer.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+syntax = "proto3";
+
+package spine.client;
+
+option (type_url_prefix) = "type.spine3.org";
+option java_generate_equals_and_hash = true;
+option java_multiple_files = true;
+option java_outer_classname = "EntitiesProto";
+option java_package = "org.spine3.client";
+
+import "google/protobuf/any.proto";
+
+import "spine/annotations.proto";
+
+// Represents an ID of an entity.
+//
+// Acts as an read-side API identifier for Projection state, Aggregate state and other entities.
+message EntityId {
+
+ // An ID of an entity.
+ google.protobuf.Any id = 1;
+}
+
+// Defines the entity type and filters.
+//
+// Use Target to specify and narrow down the source for Topic and Query by an entity type and various criteria.
+message Target {
+
+ // Represents TypeUrl for the entity of interest.
+ string type = 1;
+
+ // Modifies the entity collection of a specified type.
+ // Allows either to narrow it down by filtering out the entity objects or include all entity instances
+ // into the target.
+ oneof criterion {
+
+ // The instruction to include all objects of a given type.
+ bool include_all = 2;
+
+ // Filter the objects of a given type by certain criteria.
+ EntityFilters filters = 3;
+ }
+}
+
+// Set of filters used to modify the entity collection.
+//
+// To be used in scope of read-side API for specifying the target of query and subscription operations.
+message EntityFilters {
+
+ // Match entities by IDs.
+ EntityIdFilter id_filter = 1;
+
+ // Reserved for more filter types
+ reserved 2 to 40;
+}
+
+// Allows to add an ID filter for the read operations.
+//
+// Used to modify the collection of interest by filtering out the objects with identifiers not included into the filter.
+// SQL equivalent is "... where entity.id IN (...)".
+message EntityIdFilter {
+
+ // The collection of entity IDs.
+ repeated EntityId ids = 1;
+}
+
diff --git a/client/src/main/proto/spine/client/query.proto b/client/src/main/proto/spine/client/query.proto
index 0280acc61d9..3b8cb2c0a0c 100644
--- a/client/src/main/proto/spine/client/query.proto
+++ b/client/src/main/proto/spine/client/query.proto
@@ -27,11 +27,50 @@ option java_multiple_files = true;
option java_outer_classname = "QueryProto";
option java_package = "org.spine3.client";
+
+import "google/protobuf/any.proto";
+import "google/protobuf/field_mask.proto";
+
import "spine/annotations.proto";
import "spine/ui/language.proto";
+import "spine/base/response.proto";
+import "spine/client/entities.proto";
+
message QueryContext {
ui.Language language = 1;
//TODO:2015-12-16:alexander.yevsyukov: Finish
-}
\ No newline at end of file
+}
+
+// The main abstraction over the read-side API.
+//
+// Allows clients to form the requests to the read-side through the `QueryService`.
+// `Query` execution typically results in a QueryResponse object.
+message Query {
+
+ // Defines the entity of interest, e.g. entity type URL and a set of fetch criteria.
+ Target target = 1;
+
+ // Field mask to be applied to the items of the query result.
+ google.protobuf.FieldMask field_mask = 2;
+
+ // Reserved for utility fields like query creation date, required response timeframe etc.
+ reserved 3 to 6;
+}
+
+// The result of `Query` processing.
+//
+// Contains the actual processing results and other response attributes.
+// Used as a result of {@code QueryService#Read(Query)} gRPC method call.
+message QueryResponse {
+
+ // Represents the base part of the response. I.e. whether the `Query` has been acked or not.
+ base.Response response = 1;
+
+ // Reserved for more query response attributes, e.g. to describe paginated response etc.
+ reserved 2 to 4;
+
+ // Entity states (each packed as `Any`) returned to the API user as a result of Query execution.
+ repeated google.protobuf.Any messages = 5;
+}
diff --git a/client/src/main/proto/spine/client/query_service.proto b/client/src/main/proto/spine/client/query_service.proto
new file mode 100644
index 00000000000..36e7afec695
--- /dev/null
+++ b/client/src/main/proto/spine/client/query_service.proto
@@ -0,0 +1,39 @@
+//
+// Copyright 2016, TeamDev Ltd. All rights reserved.
+//
+// Redistribution and use in source and/or binary forms, with or without
+// modification, must retain the above copyright notice and the following
+// disclaimer.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+syntax = "proto3";
+
+package spine.client;
+
+option (type_url_prefix) = "type.spine3.org";
+option java_package = "org.spine3.client.grpc";
+option java_multiple_files = true;
+option java_outer_classname = "QueryServiceProto";
+option java_generate_equals_and_hash = true;
+
+
+import "spine/annotations.proto";
+import "spine/client/query.proto";
+
+// A service for querying the read-side from clients.
+service QueryService {
+
+ // Reads a certain data from the read-side by setting the criteria via Query.
+ rpc Read (Query) returns (QueryResponse);
+}
diff --git a/client/src/main/proto/spine/client/subscription.proto b/client/src/main/proto/spine/client/subscription.proto
new file mode 100644
index 00000000000..14e221a2546
--- /dev/null
+++ b/client/src/main/proto/spine/client/subscription.proto
@@ -0,0 +1,91 @@
+//
+// Copyright 2016, TeamDev Ltd. All rights reserved.
+//
+// Redistribution and use in source and/or binary forms, with or without
+// modification, must retain the above copyright notice and the following
+// disclaimer.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+syntax = "proto3";
+
+package spine.client;
+
+option (type_url_prefix) = "type.spine3.org";
+option java_generate_equals_and_hash = true;
+option java_multiple_files = true;
+option java_outer_classname = "SubscriptionProto";
+option java_package = "org.spine3.client";
+
+
+import "google/protobuf/any.proto";
+import "google/protobuf/field_mask.proto";
+
+import "spine/annotations.proto";
+import "spine/base/response.proto";
+import "spine/client/entities.proto";
+
+// An object defining a unit of subscription.
+//
+// Defines the target (entities and criteria) of subscription.
+message Topic {
+
+ // Defines the entity of interest, e.g. entity type URL and a set of subscription criteria.
+ Target target = 1;
+
+ // Field mask to be applied to the entity updates applicable to this topic.
+ //
+ // Applied to each of the entity state messages before returning in scope of SubscriptionUpdate.
+ google.protobuf.FieldMask field_mask = 2;
+
+ // Reserved for utility fields.
+ reserved 3 to 6;
+}
+
+// Wrapped collection of read-side entity updates on a topic with the specific subscription ID.
+message SubscriptionUpdate {
+
+ // The subscription in which scope this update is propagated.
+ Subscription subscription = 1;
+
+ // Represents the base part of the response. I.e. whether the Topic subscription requires has been acked or not.
+ base.Response response = 2;
+
+ // Reserved for more subscription update attributes.
+ reserved 3 to 9;
+
+ // Entity updates packed as Any.
+ //
+ // Each of the update messages is affected by the field mask set for the current subscription.
+ repeated google.protobuf.Any updates = 10;
+}
+
+// The subscription object.
+//
+// Created when the client subscribes to a topic inside the read-side implementation.
+// Generally should not be created in the client code.
+// See SubscriptionService#Subscribe(Topic).
+message Subscription {
+
+ // Unique identifier of the subscription.
+ //
+ // Typically built using Java's UUID.toString() functionality.
+ // Must be unique in scope of a bounded context.
+ string id = 1;
+
+ // Represents TypeUrl of the target entity for this subscription.
+ string type = 2;
+
+ // Reserved for subscription attributes.
+ reserved 3 to 10;
+}
diff --git a/client/src/main/proto/spine/client/client_service.proto b/client/src/main/proto/spine/client/subscription_service.proto
similarity index 53%
rename from client/src/main/proto/spine/client/client_service.proto
rename to client/src/main/proto/spine/client/subscription_service.proto
index 8c4a1da3ffd..5a501f0c22a 100644
--- a/client/src/main/proto/spine/client/client_service.proto
+++ b/client/src/main/proto/spine/client/subscription_service.proto
@@ -22,36 +22,36 @@ syntax = "proto3";
package spine.client;
option (type_url_prefix) = "type.spine3.org";
-// We put gRPC-based classes into `grpc` sub-package, which is annotated as `@Internal`
-// to hide implementation details from the public API of the framework.
option java_package = "org.spine3.client.grpc";
option java_multiple_files = true;
-option java_outer_classname = "ClientServiceProto";
+option java_outer_classname = "SubscriptionServiceProto";
option java_generate_equals_and_hash = true;
+
import "spine/annotations.proto";
-import "spine/base/command.proto";
-import "spine/base/event.proto";
+import "spine/client/subscription.proto";
import "spine/base/response.proto";
-// The topic of interest the client can subscribe and unsubscribe.
-message Topic {
- //TODO:2016-01-14:alexander.yevsyukov: Define this type. E.g. there can be some structure, which describes many
- // points of interest at once. See Pub-sub for possible API inspiration. Chances are it's going to be one of underlying
- // implementations.
- string value = 1;
-}
+// A service for subscribing to the read-side changes by clients.
+service SubscriptionService {
+
+ // Create the subscription to the particular read-side updates.
+ //
+ // Topic defines the target of subscription and other attributes (like field masks).
+ // NOTE: this method serves for Topic creation; to start listening for updates use #Activate(Subscription).
+ rpc Subscribe (Topic) returns (Subscription);
-// The service for sending commands from clients.
-service ClientService {
- // The request to handle a command.
- rpc Post(base.Command) returns (base.Response);
+ // Activate the subscription to the particular read-side updates.
+ //
+ // The client will start receiving the subscription updates upon new topic target changes in the read-side.
+ // Cancelled subscriptions cannot be activated.
+ rpc Activate(Subscription) returns (stream SubscriptionUpdate);
- // The request to receive events on the topic of interest.
- rpc Subscribe(Topic) returns (stream base.Event);
- // The request to unsubscribe from the topic.
- // This should close the stream opened by `Subscribe` call with the same `Topic` value.
- rpc Unsubscribe(Topic) returns (base.Response);
+ // Cancel the given subscription.
+ //
+ // Clients will stop receiving the SubscriptionUpdate after this method is called.
+ // The subscription is destroyed as a result of invocation.
+ rpc Cancel (Subscription) returns (base.Response);
}
diff --git a/client/src/test/java/org/spine3/base/QueriesShould.java b/client/src/test/java/org/spine3/base/QueriesShould.java
new file mode 100644
index 00000000000..0a711325774
--- /dev/null
+++ b/client/src/test/java/org/spine3/base/QueriesShould.java
@@ -0,0 +1,243 @@
+/*
+ * Copyright 2016, TeamDev Ltd. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package org.spine3.base;
+
+import com.google.protobuf.FieldMask;
+import com.google.protobuf.ProtocolStringList;
+import org.junit.Test;
+import org.spine3.client.EntityFilters;
+import org.spine3.client.EntityId;
+import org.spine3.client.EntityIdFilter;
+import org.spine3.client.Query;
+import org.spine3.client.Target;
+import org.spine3.protobuf.AnyPacker;
+import org.spine3.protobuf.TypeUrl;
+import org.spine3.test.queries.TestEntity;
+import org.spine3.test.queries.TestEntityId;
+
+import java.util.List;
+import java.util.Set;
+
+import static com.google.common.collect.Sets.newHashSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.spine3.test.Tests.hasPrivateUtilityConstructor;
+
+/**
+ * @author Alex Tymchenko
+ */
+@SuppressWarnings({"LocalVariableNamingConvention", "MagicNumber", "MethodParameterNamingConvention"})
+public class QueriesShould {
+
+ // See {@code queries_should.proto} for declaration.
+ private static final Class TARGET_ENTITY_CLASS = TestEntity.class;
+ private static final String TARGET_ENTITY_TYPE_URL = "type.spine3.org/spine.test.queries.TestEntity";
+
+ @Test
+ public void have_private_constructor() {
+ assertTrue(hasPrivateUtilityConstructor(Queries.class));
+ }
+
+ @Test
+ public void have_private_constructor_of_targets_class() {
+ assertTrue(hasPrivateUtilityConstructor(Queries.Targets.class));
+ }
+
+ @Test
+ public void compose_proper_read_all_query() {
+ final Class targetEntityClass = TestEntity.class;
+ final Query readAllQuery = Queries.readAll(targetEntityClass);
+ assertNotNull(readAllQuery);
+
+ checkTypeCorrectAndFiltersEmpty(targetEntityClass, readAllQuery);
+
+ checkFieldMaskEmpty(readAllQuery);
+ }
+
+ @Test
+ public void compose_proper_read_all_query_with_single_path() {
+ final Class targetEntityClass = TestEntity.class;
+ final String expectedEntityPath = singleTestEntityPath();
+ final Query readAllWithPathFilteringQuery = Queries.readAll(targetEntityClass, expectedEntityPath);
+ assertNotNull(readAllWithPathFilteringQuery);
+
+ checkTypeCorrectAndFiltersEmpty(targetEntityClass, readAllWithPathFilteringQuery);
+ verifySinglePathInQuery(expectedEntityPath, readAllWithPathFilteringQuery);
+ }
+
+ @Test
+ public void compose_proper_read_all_query_with_multiple_random_paths() {
+ final Class targetEntityClass = TestEntity.class;
+
+ final String[] paths = multipleRandomPaths();
+ final Query readAllWithPathFilteringQuery = Queries.readAll(targetEntityClass, paths);
+ assertNotNull(readAllWithPathFilteringQuery);
+
+ checkTypeCorrectAndFiltersEmpty(targetEntityClass, readAllWithPathFilteringQuery);
+ verifyMultiplePathsInQuery(paths, readAllWithPathFilteringQuery);
+ }
+
+ @Test
+ public void compose_proper_read_by_ids_query() {
+ final Set testEntityIds = multipleIds();
+ final Query readByIdsQuery = Queries.readByIds(TARGET_ENTITY_CLASS, testEntityIds);
+ assertNotNull(readByIdsQuery);
+
+ checkFieldMaskEmpty(readByIdsQuery);
+
+ final Target target = checkTarget(TARGET_ENTITY_CLASS, readByIdsQuery);
+
+ verifyIdFilter(testEntityIds, target.getFilters());
+ }
+
+ @Test
+ public void compose_proper_read_by_ids_query_with_single_path() {
+ final Set testEntityIds = multipleIds();
+ final String expectedPath = singleTestEntityPath();
+ final Query readByIdsWithSinglePathQuery = Queries.readByIds(
+ TARGET_ENTITY_CLASS,
+ testEntityIds,
+ expectedPath);
+ assertNotNull(readByIdsWithSinglePathQuery);
+
+ final Target target = checkTarget(TARGET_ENTITY_CLASS, readByIdsWithSinglePathQuery);
+
+ verifyIdFilter(testEntityIds, target.getFilters());
+ verifySinglePathInQuery(expectedPath, readByIdsWithSinglePathQuery);
+ }
+
+ @Test
+ public void compose_proper_read_by_ids_query_with_multiple_random_paths() {
+ final Set testEntityIds = multipleIds();
+ final String[] paths = multipleRandomPaths();
+ final Query readByIdsWithSinglePathQuery = Queries.readByIds(
+ TARGET_ENTITY_CLASS,
+ testEntityIds,
+ paths);
+ assertNotNull(readByIdsWithSinglePathQuery);
+
+ final Target target = checkTarget(TARGET_ENTITY_CLASS, readByIdsWithSinglePathQuery);
+
+ verifyIdFilter(testEntityIds, target.getFilters());
+ verifyMultiplePathsInQuery(paths, readByIdsWithSinglePathQuery);
+ }
+
+ @Test
+ public void return_proper_type_for_known_target() {
+ final Target target = Queries.Targets.allOf(TARGET_ENTITY_CLASS);
+ final Query query = Query.newBuilder()
+ .setTarget(target)
+ .build();
+ final TypeUrl type = Queries.typeOf(query);
+ assertNotNull(type);
+ assertEquals(TARGET_ENTITY_TYPE_URL, type.toString());
+ }
+
+ @Test
+ public void return_null_if_target_type_unknown() {
+ final Target target = Target.newBuilder()
+ .setType("Inexistent Message Type")
+ .build();
+ final Query query = Query.newBuilder()
+ .setTarget(target)
+ .build();
+ final TypeUrl type = Queries.typeOf(query);
+ assertNull(type);
+ }
+
+ private static void verifyMultiplePathsInQuery(String[] paths, Query readAllWithPathFilteringQuery) {
+ final FieldMask fieldMask = readAllWithPathFilteringQuery.getFieldMask();
+ assertEquals(paths.length, fieldMask.getPathsCount());
+ final ProtocolStringList pathsList = fieldMask.getPathsList();
+ for (String expectedPath : paths) {
+ assertTrue(pathsList.contains(expectedPath));
+ }
+ }
+
+ private static void verifySinglePathInQuery(String expectedEntityPath, Query query) {
+ final FieldMask fieldMask = query.getFieldMask();
+ assertEquals(1, fieldMask.getPathsCount()); // as we set the only path value.
+
+ final String firstPath = fieldMask.getPaths(0);
+ assertEquals(expectedEntityPath, firstPath);
+ }
+
+ private static String[] multipleRandomPaths() {
+ return new String[]{"some", "random", "paths"};
+ }
+
+ private static String singleTestEntityPath() {
+ return TestEntity.getDescriptor()
+ .getFields()
+ .get(1)
+ .getFullName();
+ }
+
+ private static Set multipleIds() {
+ return newHashSet(TestEntityId.newBuilder()
+ .setValue(1)
+ .build(),
+ TestEntityId.newBuilder()
+ .setValue(7)
+ .build(),
+ TestEntityId.newBuilder()
+ .setValue(15)
+ .build());
+ }
+
+ private static void verifyIdFilter(Set expectedIds, EntityFilters filters) {
+ assertNotNull(filters);
+ final EntityIdFilter idFilter = filters.getIdFilter();
+ assertNotNull(idFilter);
+ final List actualListOfIds = idFilter.getIdsList();
+ for (TestEntityId testEntityId : expectedIds) {
+ final EntityId expectedEntityId = EntityId.newBuilder()
+ .setId(AnyPacker.pack(testEntityId))
+ .build();
+ assertTrue(actualListOfIds.contains(expectedEntityId));
+ }
+ }
+
+ private static void checkFieldMaskEmpty(Query query) {
+ final FieldMask fieldMask = query.getFieldMask();
+ assertNotNull(fieldMask);
+ assertEquals(FieldMask.getDefaultInstance(), fieldMask);
+ }
+
+ private static void checkTypeCorrectAndFiltersEmpty(Class expectedTargetClass, Query query) {
+ final Target entityTarget = checkTarget(expectedTargetClass, query);
+
+ final EntityFilters filters = entityTarget.getFilters();
+ assertNotNull(filters);
+ assertEquals(EntityFilters.getDefaultInstance(), filters);
+ }
+
+ private static Target checkTarget(Class targetEntityClass, Query query) {
+ final Target entityTarget = query.getTarget();
+ assertNotNull(entityTarget);
+
+ final String expectedTypeName = TypeUrl.of(targetEntityClass)
+ .getTypeName();
+ assertEquals(expectedTypeName, entityTarget.getType());
+ return entityTarget;
+ }
+}
diff --git a/client/src/test/java/org/spine3/validate/ValidateShould.java b/client/src/test/java/org/spine3/validate/ValidateShould.java
index acbf3aea744..1fd88bb7c2a 100644
--- a/client/src/test/java/org/spine3/validate/ValidateShould.java
+++ b/client/src/test/java/org/spine3/validate/ValidateShould.java
@@ -81,34 +81,34 @@ public void return_non_default_value_on_check() {
@Test(expected = IllegalArgumentException.class)
public void throw_exception_if_timestamp_seconds_value_is_zero() {
- Validate.checkIsPositive(Timestamp.newBuilder()
- .setSeconds(0)
- .setNanos(5)
- .build(), "");
+ Validate.checkPositive(Timestamp.newBuilder()
+ .setSeconds(0)
+ .setNanos(5)
+ .build(), "");
}
@Test(expected = IllegalArgumentException.class)
public void throw_exception_if_timestamp_seconds_value_is_negative() {
- Validate.checkIsPositive(Timestamp.newBuilder()
- .setSeconds(-5)
- .setNanos(5)
- .build(), "");
+ Validate.checkPositive(Timestamp.newBuilder()
+ .setSeconds(-5)
+ .setNanos(5)
+ .build(), "");
}
@Test(expected = IllegalArgumentException.class)
public void throw_exception_if_timestamp_nanos_value_is_negative() {
- Validate.checkIsPositive(Timestamp.newBuilder()
- .setSeconds(5)
- .setNanos(-5)
- .build(), "");
+ Validate.checkPositive(Timestamp.newBuilder()
+ .setSeconds(5)
+ .setNanos(-5)
+ .build(), "");
}
@Test
public void do_not_throw_exception_if_timestamp_seconds_and_nanos_are_positive() {
- Validate.checkIsPositive(Timestamp.newBuilder()
- .setSeconds(5)
- .setNanos(5)
- .build(), "");
+ Validate.checkPositive(Timestamp.newBuilder()
+ .setSeconds(5)
+ .setNanos(5)
+ .build(), "");
}
@Test(expected = NullPointerException.class)
diff --git a/client/src/test/proto/spine/test/queries_should.proto b/client/src/test/proto/spine/test/queries_should.proto
new file mode 100644
index 00000000000..ec38a46decd
--- /dev/null
+++ b/client/src/test/proto/spine/test/queries_should.proto
@@ -0,0 +1,50 @@
+//
+// Copyright 2016, TeamDev Ltd. All rights reserved.
+//
+// Redistribution and use in source and/or binary forms, with or without
+// modification, must retain the above copyright notice and the following
+// disclaimer.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+syntax = "proto3";
+
+package spine.test.queries;
+
+option (type_url_prefix) = "type.spine3.org";
+option java_package="org.spine3.test.queries";
+option java_multiple_files = true;
+option java_outer_classname = "QueriesShouldProto";
+
+
+import "spine/annotations.proto";
+
+// Only for usage in `QueriesShould` class.
+
+// Simple ID for tests.
+message TestEntityId {
+
+ // Numeric value wrapped into this ID.
+ int32 value = 1;
+}
+
+// Test entity used as a `Target` for queries.
+message TestEntity {
+
+ // Entity ID.
+ TestEntityId id = 1;
+
+ string first_field = 2;
+
+ bool second_field = 3;
+}
diff --git a/examples/build.gradle b/examples/build.gradle
new file mode 100644
index 00000000000..4a0ed672fc7
--- /dev/null
+++ b/examples/build.gradle
@@ -0,0 +1,13 @@
+dependencies {
+ compile project(path: ':client')
+ compile project(path: ':server')
+ //Use jdk14 bindings for the sample application
+ compile group: 'org.slf4j', name: 'slf4j-jdk14', version: project.slf4JVersion
+}
+
+sourceSets.main.java.srcDir generatedSpineDir
+
+apply from: generateDescriptorSetPlugin
+
+apply plugin: spineProtobufPluginId
+
diff --git a/examples/src/main/java/org/spine3/examples/aggregate/ClientApp.java b/examples/src/main/java/org/spine3/examples/aggregate/ClientApp.java
new file mode 100644
index 00000000000..7a1d2d53335
--- /dev/null
+++ b/examples/src/main/java/org/spine3/examples/aggregate/ClientApp.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright 2016, TeamDev Ltd. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.spine3.examples.aggregate;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.spine3.base.Command;
+import org.spine3.base.Identifiers;
+import org.spine3.base.Queries;
+import org.spine3.base.Response;
+import org.spine3.client.CommandFactory;
+import org.spine3.client.Subscription;
+import org.spine3.client.SubscriptionUpdate;
+import org.spine3.client.Target;
+import org.spine3.client.Topic;
+import org.spine3.client.grpc.CommandServiceGrpc;
+import org.spine3.client.grpc.SubscriptionServiceGrpc;
+import org.spine3.examples.aggregate.command.AddOrderLine;
+import org.spine3.examples.aggregate.command.CreateOrder;
+import org.spine3.examples.aggregate.command.PayForOrder;
+import org.spine3.money.Money;
+import org.spine3.protobuf.AnyPacker;
+import org.spine3.protobuf.Messages;
+import org.spine3.time.ZoneOffsets;
+
+import java.util.List;
+
+import static com.google.common.collect.Lists.newLinkedList;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.spine3.client.ConnectionConstants.DEFAULT_CLIENT_SERVICE_PORT;
+import static org.spine3.client.UserUtil.newUserId;
+import static org.spine3.money.Currency.USD;
+import static org.spine3.money.MoneyUtil.newMoney;
+import static org.spine3.protobuf.Messages.toText;
+
+/**
+ * Sample of a client implementation.
+ *
+ * @author Mikhail Melnik
+ * @author Mikhail Mikhaylov
+ * @author Alexander Litus
+ */
+@SuppressWarnings("OverlyCoupledClass") // OK for a self-contained all-in-one example.
+public class ClientApp {
+
+ @SuppressWarnings("DuplicateStringLiteralInspection")
+ private static final String SERVICE_HOST = "localhost";
+
+ private static final String RPC_FAILED = "RPC failed";
+ private static final int SHUTDOWN_TIMEOUT_SEC = 5;
+
+ private final CommandFactory commandFactory;
+ private final ManagedChannel channel;
+ private final CommandServiceGrpc.CommandServiceBlockingStub blockingClient;
+ private final SubscriptionServiceGrpc.SubscriptionServiceStub subscriptionClient;
+
+ private final StreamObserver orderUpdateObserver = new StreamObserver() {
+ @Override
+ public void onNext(SubscriptionUpdate subscriptionUpdate) {
+ final String updateText = Messages.toText(subscriptionUpdate);
+ log().info(" + Order updated: {}", updateText);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ log().error("Subscription update delivery error occurred", throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ log().info("Subscription update delivery completed.");
+ }
+ };
+
+
+ /** Construct the client connecting to server at {@code host:port}. */
+ public ClientApp(String host, int port) {
+ commandFactory = CommandFactory.newBuilder()
+ .setActor(newUserId(Identifiers.newUuid()))
+ .setZoneOffset(ZoneOffsets.UTC)
+ .build();
+ channel = ManagedChannelBuilder
+ .forAddress(host, port)
+ .usePlaintext(true)
+ .build();
+ blockingClient = CommandServiceGrpc.newBlockingStub(channel);
+ subscriptionClient = SubscriptionServiceGrpc.newStub(channel);
+ }
+
+ private Command createOrder(OrderId orderId) {
+ final CreateOrder msg = CreateOrder.newBuilder()
+ .setOrderId(orderId)
+ .build();
+ return commandFactory.create(msg);
+ }
+
+ private Command addOrderLine(OrderId orderId) {
+ final int bookPriceUsd = 52;
+ final Book book = Book.newBuilder()
+ .setBookId(BookId.newBuilder()
+ .setISBN("978-0321125217")
+ .build())
+ .setAuthor("Eric Evans")
+ .setTitle("Domain Driven Design.")
+ .setPrice(newMoney(bookPriceUsd, USD))
+ .build();
+ final int quantity = 1;
+ final Money totalPrice = newMoney(bookPriceUsd * quantity, USD);
+ final OrderLine orderLine = OrderLine.newBuilder()
+ .setProductId(AnyPacker.pack(book.getBookId()))
+ .setQuantity(quantity)
+ .setPrice(totalPrice)
+ .build();
+ final AddOrderLine msg = AddOrderLine.newBuilder()
+ .setOrderId(orderId)
+ .setOrderLine(orderLine)
+ .build();
+ return commandFactory.create(msg);
+ }
+
+ private Command payForOrder(OrderId orderId) {
+ final BillingInfo billingInfo = BillingInfo.newBuilder()
+ .setInfo("Payment info is here.")
+ .build();
+ final PayForOrder msg = PayForOrder.newBuilder()
+ .setOrderId(orderId)
+ .setBillingInfo(billingInfo)
+ .build();
+ return commandFactory.create(msg);
+ }
+
+
+ private void subscribe() {
+ final Target allOrders = Queries.Targets.allOf(Order.class);
+ final Topic topic = Topic.newBuilder()
+ .setTarget(allOrders)
+ .build();
+ subscriptionClient.subscribe(topic, new StreamObserver() {
+
+ private Subscription latestSubscription;
+
+ @Override
+ public void onNext(Subscription subscription) {
+ this.latestSubscription = subscription;
+ final String eventText = Messages.toText(subscription);
+ log().info(eventText);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ log().error("Subscription error occurred", throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ log().info("Subscription request completed.");
+ subscriptionClient.activate(latestSubscription, orderUpdateObserver);
+ }
+
+ });
+ }
+
+
+ /** Sends requests to the server. */
+ public static void main(String[] args) throws InterruptedException {
+ final ClientApp client = new ClientApp(SERVICE_HOST, DEFAULT_CLIENT_SERVICE_PORT);
+ client.subscribe();
+
+ final List requests = client.generateRequests();
+
+ for (Command request : requests) {
+ log().info("Sending a request: " + request.getMessage()
+ .getTypeUrl() + "...");
+ final Response result = client.post(request);
+ log().info("Result: " + toText(result));
+ }
+
+ client.shutdown();
+ }
+
+ /** Creates several test requests. */
+ private List generateRequests() {
+ final List result = newLinkedList();
+ for (int i = 0; i < 10; i++) {
+ final OrderId orderId = OrderId.newBuilder()
+ .setValue(String.valueOf(i))
+ .build();
+ result.add(createOrder(orderId));
+ result.add(addOrderLine(orderId));
+ result.add(payForOrder(orderId));
+ }
+ return result;
+ }
+
+ /**
+ * Shutdown the connection channel.
+ *
+ * @throws InterruptedException if waiting is interrupted.
+ */
+ private void shutdown() throws InterruptedException {
+ channel.shutdown()
+ .awaitTermination(SHUTDOWN_TIMEOUT_SEC, SECONDS);
+ }
+
+ /** Sends a request to the server. */
+ private Response post(Command request) {
+ Response result = null;
+ try {
+ result = blockingClient.post(request);
+ } catch (RuntimeException e) {
+ log().warn(RPC_FAILED, e);
+ }
+ return result;
+ }
+
+ private enum LogSingleton {
+ INSTANCE;
+ @SuppressWarnings("NonSerializableFieldInSerializableClass")
+ private final Logger value = LoggerFactory.getLogger(ClientApp.class);
+ }
+
+ private static Logger log() {
+ return LogSingleton.INSTANCE.value;
+ }
+}
diff --git a/examples/src/main/java/org/spine3/examples/aggregate/server/Server.java b/examples/src/main/java/org/spine3/examples/aggregate/server/Server.java
new file mode 100644
index 00000000000..85412371d97
--- /dev/null
+++ b/examples/src/main/java/org/spine3/examples/aggregate/server/Server.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2016, TeamDev Ltd. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package org.spine3.examples.aggregate.server;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.spine3.server.BoundedContext;
+import org.spine3.server.CommandService;
+import org.spine3.server.SubscriptionService;
+import org.spine3.server.event.EventSubscriber;
+import org.spine3.server.storage.StorageFactory;
+import org.spine3.server.storage.memory.InMemoryStorageFactory;
+import org.spine3.server.transport.GrpcContainer;
+
+import java.io.IOException;
+
+import static org.spine3.client.ConnectionConstants.DEFAULT_CLIENT_SERVICE_PORT;
+
+/**
+ * Sample gRPC server implementation.
+ *
+ * @author Mikhail Melnik
+ * @author Alexander Litus
+ */
+public class Server {
+
+ private final GrpcContainer grpcContainer;
+ private final BoundedContext boundedContext;
+
+ public Server(StorageFactory storageFactory) {
+ // Create a bounded context.
+ this.boundedContext = BoundedContext.newBuilder()
+ .setStorageFactory(storageFactory)
+ .build();
+ // Create and register a repository with the bounded context.
+ final OrderRepository repository = new OrderRepository(boundedContext);
+ boundedContext.register(repository);
+
+ // Subscribe an event subscriber in the bounded context.
+ final EventSubscriber eventLogger = new EventLogger();
+ boundedContext.getEventBus()
+ .subscribe(eventLogger);
+
+ // Create a command service with this bounded context,
+ final CommandService commandService = CommandService.newBuilder()
+ .addBoundedContext(boundedContext)
+ .build();
+
+ // and a subscription service for the same bounded context.
+ final SubscriptionService subscriptionService = SubscriptionService.newBuilder()
+ .addBoundedContext(boundedContext)
+ .build();
+
+
+ // Create a gRPC server and schedule the client service instance for deployment.
+ this.grpcContainer = GrpcContainer.newBuilder()
+ .setPort(DEFAULT_CLIENT_SERVICE_PORT)
+ .addService(commandService)
+ .addService(subscriptionService)
+ .build();
+ }
+
+ public void start() throws IOException {
+ grpcContainer.start();
+ grpcContainer.addShutdownHook();
+ log().info("Server started, listening to commands on the port " + DEFAULT_CLIENT_SERVICE_PORT);
+ }
+
+ public void awaitTermination() {
+ grpcContainer.awaitTermination();
+ }
+
+ public void shutdown() throws Exception {
+ grpcContainer.shutdown();
+ boundedContext.close();
+ }
+
+ /** The entry point of the server application. */
+ public static void main(String[] args) throws IOException {
+ final Server server = new Server(InMemoryStorageFactory.getInstance());
+ server.start();
+ server.awaitTermination();
+ }
+
+ private static Logger log() {
+ return LogSingleton.INSTANCE.value;
+ }
+
+ private enum LogSingleton {
+ INSTANCE;
+ @SuppressWarnings("NonSerializableFieldInSerializableClass")
+ private final Logger value = LoggerFactory.getLogger(Server.class);
+ }
+}
diff --git a/examples/src/test/java/org/spine3/examples/aggregate/server/ServerShould.java b/examples/src/test/java/org/spine3/examples/aggregate/server/ServerShould.java
new file mode 100644
index 00000000000..d6cb30c5898
--- /dev/null
+++ b/examples/src/test/java/org/spine3/examples/aggregate/server/ServerShould.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2016, TeamDev Ltd. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.spine3.examples.aggregate.server;
+
+import org.junit.Test;
+import org.spine3.examples.aggregate.ClientApp;
+import org.spine3.server.storage.memory.InMemoryStorageFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Throwables.propagate;
+
+@SuppressWarnings("InstanceMethodNamingConvention")
+public class ServerShould {
+
+ @Test
+ public void run_on_in_memory_storage() throws Exception {
+ final Server[] serverRef = new Server[1];
+ final CountDownLatch serverStartLatch = new CountDownLatch(1);
+
+ final Thread serverThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ final Server server = new Server(InMemoryStorageFactory.getInstance());
+ serverRef[0] = server;
+ try {
+ server.start();
+ server.awaitTermination();
+ serverStartLatch.countDown();
+ } catch (IOException e) {
+ throw propagate(e);
+ }
+ }
+ });
+
+ serverThread.start();
+
+ serverStartLatch.await(5, TimeUnit.SECONDS);
+ //noinspection ZeroLengthArrayAllocation
+ ClientApp.main(new String[0]);
+
+ serverRef[0].shutdown();
+ }
+}
diff --git a/server/src/main/java/org/spine3/server/BoundedContext.java b/server/src/main/java/org/spine3/server/BoundedContext.java
index 46b78d3fc44..dd4bb7b1095 100644
--- a/server/src/main/java/org/spine3/server/BoundedContext.java
+++ b/server/src/main/java/org/spine3/server/BoundedContext.java
@@ -43,6 +43,9 @@
import org.spine3.server.integration.IntegrationEvent;
import org.spine3.server.integration.IntegrationEventContext;
import org.spine3.server.integration.grpc.IntegrationEventSubscriberGrpc;
+import org.spine3.server.stand.Stand;
+import org.spine3.server.stand.StandFunnel;
+import org.spine3.server.storage.StandStorage;
import org.spine3.server.storage.StorageFactory;
import org.spine3.validate.Validate;
@@ -64,7 +67,7 @@
* @author Mikhail Melnik
*/
public class BoundedContext extends IntegrationEventSubscriberGrpc.IntegrationEventSubscriberImplBase
- implements AutoCloseable {
+ implements AutoCloseable {
/** The default name for a {@code BoundedContext}. */
public static final String DEFAULT_NAME = "Main";
@@ -80,6 +83,8 @@ public class BoundedContext extends IntegrationEventSubscriberGrpc.IntegrationEv
private final StorageFactory storageFactory;
private final CommandBus commandBus;
private final EventBus eventBus;
+ private final Stand stand;
+ private final StandFunnel standFunnel;
private final List> repositories = Lists.newLinkedList();
@@ -89,6 +94,8 @@ private BoundedContext(Builder builder) {
this.storageFactory = builder.storageFactory;
this.commandBus = builder.commandBus;
this.eventBus = builder.eventBus;
+ this.stand = builder.stand;
+ this.standFunnel = builder.standFunnel;
}
/**
@@ -110,6 +117,7 @@ public static Builder newBuilder() {
*
Closes {@link EventBus}.
*
Closes {@link CommandStore}.
*
Closes {@link EventStore}.
+ *
Closes {@link Stand}.
*
Shuts down all registered repositories. Each registered repository is:
*
*
un-registered from {@link CommandBus}
@@ -125,6 +133,7 @@ public void close() throws Exception {
storageFactory.close();
commandBus.close();
eventBus.close();
+ stand.close();
shutDownRepositories();
@@ -183,6 +192,7 @@ public boolean isMultitenant() {
if (repository instanceof EventDispatcher) {
eventBus.register((EventDispatcher) repository);
}
+ stand.registerTypeSupplier(repository);
}
private void checkStorageAssigned(Repository repository) {
@@ -207,10 +217,10 @@ private static Event toEvent(IntegrationEvent integrationEvent) {
final IntegrationEventContext sourceContext = integrationEvent.getContext();
final StringValue producerId = newStringValue(sourceContext.getBoundedContextName());
final EventContext context = EventContext.newBuilder()
- .setEventId(sourceContext.getEventId())
- .setTimestamp(sourceContext.getTimestamp())
- .setProducerId(AnyPacker.pack(producerId))
- .build();
+ .setEventId(sourceContext.getEventId())
+ .setTimestamp(sourceContext.getTimestamp())
+ .setProducerId(AnyPacker.pack(producerId))
+ .build();
final Event result = Events.createEvent(integrationEvent.getMessage(), context);
return result;
}
@@ -227,6 +237,18 @@ public EventBus getEventBus() {
return this.eventBus;
}
+ /** Obtains instance of {@link StandFunnel} of this {@code BoundedContext}. */
+ @CheckReturnValue
+ public StandFunnel getStandFunnel() {
+ return this.standFunnel;
+ }
+
+ /** Obtains instance of {@link Stand} of this {@code BoundedContext}. */
+ @CheckReturnValue
+ public Stand getStand() {
+ return stand;
+ }
+
/**
* A builder for producing {@code BoundedContext} instances.
*
@@ -244,6 +266,9 @@ public static class Builder {
private EventBus eventBus;
private boolean multitenant;
private EventEnricher eventEnricher;
+ private Stand stand;
+ private StandFunnel standFunnel;
+ private Executor standFunnelExecutor;
/**
* Sets the name for a new bounded context.
@@ -371,6 +396,26 @@ public EventEnricher getEventEnricher() {
return eventEnricher;
}
+ public Builder setStand(Stand stand) {
+ this.stand = checkNotNull(stand);
+ return this;
+ }
+
+ @Nullable
+ public Stand getStand() {
+ return stand;
+ }
+
+ @Nullable
+ public Executor getStandFunnelExecutor() {
+ return standFunnelExecutor;
+ }
+
+ public Builder setStandFunnelExecutor(Executor standFunnelExecutor) {
+ this.standFunnelExecutor = standFunnelExecutor;
+ return this;
+ }
+
public BoundedContext build() {
checkNotNull(storageFactory, "storageFactory must be set");
@@ -388,6 +433,12 @@ public BoundedContext build() {
eventBus = createEventBus();
}
+ if (stand == null) {
+ stand = createStand(storageFactory);
+ }
+
+ standFunnel = createStandFunnel(standFunnelExecutor);
+
commandBus.setMultitenant(this.multitenant);
final BoundedContext result = new BoundedContext(this);
@@ -396,6 +447,21 @@ public BoundedContext build() {
return result;
}
+ private StandFunnel createStandFunnel(@Nullable Executor standFunnelExecutor) {
+ StandFunnel standFunnel;
+ if (standFunnelExecutor == null) {
+ standFunnel = StandFunnel.newBuilder()
+ .setStand(stand)
+ .build();
+ } else {
+ standFunnel = StandFunnel.newBuilder()
+ .setExecutor(standFunnelExecutor)
+ .setStand(stand)
+ .build();
+ }
+ return standFunnel;
+ }
+
private CommandStore createCommandStore() {
final CommandStore result = new CommandStore(storageFactory.createCommandStorage());
return result;
@@ -432,6 +498,14 @@ private EventBus createEventBus() {
.build();
return result;
}
+
+ private static Stand createStand(StorageFactory storageFactory) {
+ final StandStorage standStorage = storageFactory.createStandStorage();
+ final Stand result = Stand.newBuilder()
+ .setStorage(standStorage)
+ .build();
+ return result;
+ }
}
private enum LogSingleton {
diff --git a/server/src/main/java/org/spine3/server/ClientService.java b/server/src/main/java/org/spine3/server/ClientService.java
deleted file mode 100644
index 1797d36da44..00000000000
--- a/server/src/main/java/org/spine3/server/ClientService.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Copyright 2016, TeamDev Ltd. All rights reserved.
- *
- * Redistribution and use in source and/or binary forms, with or without
- * modification, must retain the above copyright notice and the following
- * disclaimer.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package org.spine3.server;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-import io.grpc.ServerBuilder;
-import io.grpc.ServerServiceDefinition;
-import io.grpc.stub.StreamObserver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.spine3.base.Command;
-import org.spine3.base.Event;
-import org.spine3.base.Response;
-import org.spine3.base.Responses;
-import org.spine3.client.ConnectionConstants;
-import org.spine3.client.grpc.Topic;
-import org.spine3.server.command.error.CommandException;
-import org.spine3.server.command.error.UnsupportedCommandException;
-import org.spine3.server.type.CommandClass;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.Set;
-
-import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.base.Throwables.propagate;
-
-/**
- * The {@code ClientService} allows client applications to post commands and
- * receive updates from the application backend.
- *
- * @author Alexander Yevsyukov
- */
-public class ClientService extends org.spine3.client.grpc.ClientServiceGrpc.ClientServiceImplBase {
-
- private static final String SERVICE_NOT_STARTED_MSG = "Service was not started or is shutdown already.";
-
- private final int port;
-
- private final ImmutableMap boundedContextMap;
-
- @Nullable
- private io.grpc.Server grpcServer;
-
- public static Builder newBuilder() {
- return new Builder();
- }
-
- protected ClientService(Builder builder) {
- this.port = builder.getPort();
- this.boundedContextMap = builder.getBoundedContextMap();
- }
-
- /**
- * Starts the service.
- *
- * @throws IOException if unable to bind
- */
- public void start() throws IOException {
- checkState(grpcServer == null, "Service is started already.");
- grpcServer = createGrpcServer(port);
- grpcServer.start();
- }
-
- /** Waits for the service to become terminated. */
- public void awaitTermination() {
- checkState(grpcServer != null, SERVICE_NOT_STARTED_MSG);
- try {
- grpcServer.awaitTermination();
- } catch (InterruptedException e) {
- throw propagate(e);
- }
- }
-
- /**
- * Makes the JVM shut down the service when it is shutting down itself.
- *
- *
Call this method when running the service in a separate JVM.
- */
- public void addShutdownHook() {
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- // Use stderr here since the logger may have been reset by its JVM shutdown hook.
- @SuppressWarnings("UseOfSystemOutOrSystemErr")
- @Override
- public void run() {
- final String serviceClass = ClientService.this.getClass().getName();
- try {
- if (!isShutdown()) {
- System.err.println("Shutting down " + serviceClass + " since JVM is shutting down...");
- shutdown();
- System.err.println(serviceClass + " shut down.");
- }
- } catch (RuntimeException e) {
- e.printStackTrace(System.err);
- }
- }
- }));
- }
-
- /**
- * Returns {@code true} if the service is shutdown or was not started at all, {@code false} otherwise.
- *
- * @see ClientService#shutdown()
- */
- public boolean isShutdown() {
- final boolean isShutdown = grpcServer == null;
- return isShutdown;
- }
-
- /** Initiates an orderly shutdown in which existing calls continue but new calls are rejected. */
- public void shutdown() {
- checkState(grpcServer != null, SERVICE_NOT_STARTED_MSG);
- grpcServer.shutdown();
- grpcServer = null;
- }
-
- @VisibleForTesting
- /* package */ io.grpc.Server createGrpcServer(int port) {
- final ServerServiceDefinition service = bindService();
- final ServerBuilder builder = ServerBuilder.forPort(port)
- .addService(service);
- return builder.build();
- }
-
- @SuppressWarnings("RefusedBequest") // as we override default implementation with `unimplemented` status.
- @Override
- public void post(Command request, StreamObserver responseObserver) {
- final CommandClass commandClass = CommandClass.of(request);
- final BoundedContext boundedContext = boundedContextMap.get(commandClass);
- if (boundedContext == null) {
- handleUnsupported(request, responseObserver);
- } else {
- boundedContext.getCommandBus().post(request, responseObserver);
- }
- }
-
- private static void handleUnsupported(Command request, StreamObserver responseObserver) {
- final CommandException unsupported = new UnsupportedCommandException(request);
- log().error("Unsupported command posted to ClientService", unsupported);
- responseObserver.onError(Statuses.invalidArgumentWithCause(unsupported));
- }
-
- @SuppressWarnings("RefusedBequest") // as we override default implementation with `unimplemented` status.
- @Override
- public void subscribe(Topic request, StreamObserver responseObserver) {
- //TODO:2016-05-25:alexander.yevsyukov: Subscribe the client to the topic in the corresponding BoundedContext.
- // This API is likely to change to support Firebase-like registration where listening is
- // done by the client SDK implementation.
- }
-
- @SuppressWarnings("RefusedBequest") // as we override default implementation with `unimplemented` status.
- @Override
- public void unsubscribe(Topic request, StreamObserver responseObserver) {
- //TODO:2016-05-25:alexander.yevsyukov: Unsubscribe the client from the topic in the corresponding BoundedContext.
- responseObserver.onNext(Responses.ok());
- responseObserver.onCompleted();
- }
-
- public static class Builder {
-
- private final Set boundedContexts = Sets.newHashSet();
- private ImmutableMap boundedContextMap;
-
- private int port = ConnectionConstants.DEFAULT_CLIENT_SERVICE_PORT;
-
- public Builder addBoundedContext(BoundedContext boundedContext) {
- // Save it to a temporary set so that it is easy to remove it if needed.
- boundedContexts.add(boundedContext);
- return this;
- }
-
- public Builder removeBoundedContext(BoundedContext boundedContext) {
- boundedContexts.remove(boundedContext);
- return this;
- }
-
- public Builder setPort(int port) {
- this.port = port;
- return this;
- }
-
- public int getPort() {
- return this.port;
- }
-
- @SuppressWarnings("ReturnOfCollectionOrArrayField") // is immutable
- public ImmutableMap getBoundedContextMap() {
- return boundedContextMap;
- }
-
- /**
- * Builds the {@link ClientService}.
- * {@link ConnectionConstants#DEFAULT_CLIENT_SERVICE_PORT} is used by default.
- */
- public ClientService build() {
- this.boundedContextMap = createBoundedContextMap();
- final ClientService result = new ClientService(this);
- return result;
- }
-
- private ImmutableMap createBoundedContextMap() {
- final ImmutableMap.Builder builder = ImmutableMap.builder();
- for (BoundedContext boundedContext : boundedContexts) {
- addBoundedContext(builder, boundedContext);
- }
- return builder.build();
- }
-
- private static void addBoundedContext(ImmutableMap.Builder mapBuilder,
- BoundedContext boundedContext) {
- final Set cmdClasses = boundedContext.getCommandBus().getSupportedCommandClasses();
- for (CommandClass commandClass : cmdClasses) {
- mapBuilder.put(commandClass, boundedContext);
- }
- }
- }
-
- private enum LogSingleton {
- INSTANCE;
- @SuppressWarnings("NonSerializableFieldInSerializableClass")
- private final Logger value = LoggerFactory.getLogger(ClientService.class);
- }
-
- private static Logger log() {
- return LogSingleton.INSTANCE.value;
- }
-}
diff --git a/server/src/main/java/org/spine3/server/CommandService.java b/server/src/main/java/org/spine3/server/CommandService.java
new file mode 100644
index 00000000000..07b523a00a8
--- /dev/null
+++ b/server/src/main/java/org/spine3/server/CommandService.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2016, TeamDev Ltd. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.spine3.server;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.spine3.base.Command;
+import org.spine3.base.Response;
+import org.spine3.client.grpc.CommandServiceGrpc;
+import org.spine3.server.command.error.CommandException;
+import org.spine3.server.command.error.UnsupportedCommandException;
+import org.spine3.server.type.CommandClass;
+
+import java.util.Set;
+
+/**
+ * The {@code CommandService} allows client applications to post commands and
+ * receive updates from the application backend.
+ *
+ * @author Alexander Yevsyukov
+ */
+public class CommandService
+ extends CommandServiceGrpc.CommandServiceImplBase {
+
+ private final ImmutableMap boundedContextMap;
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ protected CommandService(Builder builder) {
+ this.boundedContextMap = builder.getBoundedContextMap();
+ }
+
+ @SuppressWarnings("RefusedBequest") // as we override default implementation with `unimplemented` status.
+ @Override
+ public void post(Command request, StreamObserver responseObserver) {
+ final CommandClass commandClass = CommandClass.of(request);
+ final BoundedContext boundedContext = boundedContextMap.get(commandClass);
+ if (boundedContext == null) {
+ handleUnsupported(request, responseObserver);
+ } else {
+ boundedContext.getCommandBus()
+ .post(request, responseObserver);
+ }
+ }
+
+ private static void handleUnsupported(Command request, StreamObserver responseObserver) {
+ final CommandException unsupported = new UnsupportedCommandException(request);
+ log().error("Unsupported command posted to CommandService", unsupported);
+ responseObserver.onError(Statuses.invalidArgumentWithCause(unsupported));
+ }
+
+ public static class Builder {
+
+ private final Set boundedContexts = Sets.newHashSet();
+ private ImmutableMap boundedContextMap;
+
+ public Builder addBoundedContext(BoundedContext boundedContext) {
+ // Save it to a temporary set so that it is easy to remove it if needed.
+ boundedContexts.add(boundedContext);
+ return this;
+ }
+
+ public Builder removeBoundedContext(BoundedContext boundedContext) {
+ boundedContexts.remove(boundedContext);
+ return this;
+ }
+
+ @SuppressWarnings("ReturnOfCollectionOrArrayField") // is immutable
+ public ImmutableMap getBoundedContextMap() {
+ return boundedContextMap;
+ }
+
+ /**
+ * Builds the {@link CommandService}.
+ */
+ public CommandService build() {
+ this.boundedContextMap = createBoundedContextMap();
+ final CommandService result = new CommandService(this);
+ return result;
+ }
+
+ private ImmutableMap createBoundedContextMap() {
+ final ImmutableMap.Builder builder = ImmutableMap.builder();
+ for (BoundedContext boundedContext : boundedContexts) {
+ addBoundedContext(builder, boundedContext);
+ }
+ return builder.build();
+ }
+
+ private static void addBoundedContext(ImmutableMap.Builder mapBuilder,
+ BoundedContext boundedContext) {
+ final Set cmdClasses = boundedContext.getCommandBus()
+ .getSupportedCommandClasses();
+ for (CommandClass commandClass : cmdClasses) {
+ mapBuilder.put(commandClass, boundedContext);
+ }
+ }
+ }
+
+ private enum LogSingleton {
+ INSTANCE;
+ @SuppressWarnings("NonSerializableFieldInSerializableClass")
+ private final Logger value = LoggerFactory.getLogger(CommandService.class);
+ }
+
+ private static Logger log() {
+ return LogSingleton.INSTANCE.value;
+ }
+}
diff --git a/server/src/main/java/org/spine3/server/QueryService.java b/server/src/main/java/org/spine3/server/QueryService.java
new file mode 100644
index 00000000000..3111f110332
--- /dev/null
+++ b/server/src/main/java/org/spine3/server/QueryService.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2016, TeamDev Ltd. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package org.spine3.server;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.spine3.base.Queries;
+import org.spine3.client.Query;
+import org.spine3.client.QueryResponse;
+import org.spine3.client.grpc.QueryServiceGrpc;
+import org.spine3.protobuf.TypeUrl;
+
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The {@code QueryService} provides a synchronous way to fetch read-side state from the server.
+ *
+ *
For asynchronous read-side updates please see {@link SubscriptionService}.
+ *
+ * @author Alex Tymchenko
+ */
+public class QueryService extends QueryServiceGrpc.QueryServiceImplBase {
+
+ private final ImmutableMap typeToContextMap;
+
+ private QueryService(Builder builder) {
+ this.typeToContextMap = builder.getBoundedContextMap();
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ @SuppressWarnings("RefusedBequest") // as we override default implementation with `unimplemented` status.
+ @Override
+ public void read(Query query, StreamObserver responseObserver) {
+ log().debug("Incoming query: {}", query);
+
+ final TypeUrl type = Queries.typeOf(query);
+ checkNotNull(type, "Unknown type for query target");
+
+ final BoundedContext boundedContext = typeToContextMap.get(type);
+ try {
+ boundedContext.getStand()
+ .execute(query, responseObserver);
+ } catch (@SuppressWarnings("OverlyBroadCatchBlock") Exception e) {
+ log().error("Error processing query", e);
+ responseObserver.onError(e);
+ }
+ }
+
+ public static class Builder {
+ private final Set boundedContexts = Sets.newHashSet();
+ private ImmutableMap typeToContextMap;
+
+ public Builder addBoundedContext(BoundedContext boundedContext) {
+ // Save it to a temporary set so that it is easy to remove it if needed.
+ boundedContexts.add(boundedContext);
+ return this;
+ }
+
+ public Builder removeBoundedContext(BoundedContext boundedContext) {
+ boundedContexts.remove(boundedContext);
+ return this;
+ }
+
+ @SuppressWarnings("ReturnOfCollectionOrArrayField") // the collection returned is immutable
+ public ImmutableMap getBoundedContextMap() {
+ return typeToContextMap;
+ }
+
+ /**
+ * Builds the {@link QueryService}.
+ *
+ * @throws IllegalStateException if no bounded contexts were added.
+ */
+ public QueryService build() throws IllegalStateException {
+ if (boundedContexts.isEmpty()) {
+ throw new IllegalStateException("Query service must have at least one bounded context.");
+ }
+ this.typeToContextMap = createBoundedContextMap();
+ final QueryService result = new QueryService(this);
+ return result;
+ }
+
+ private ImmutableMap createBoundedContextMap() {
+ final ImmutableMap.Builder builder = ImmutableMap.builder();
+ for (BoundedContext boundedContext : boundedContexts) {
+ addBoundedContext(builder, boundedContext);
+ }
+ return builder.build();
+ }
+
+ private static void addBoundedContext(ImmutableMap.Builder mapBuilder,
+ BoundedContext boundedContext) {
+
+ final ImmutableSet exposedTypes = boundedContext.getStand()
+ .getExposedTypes();
+
+ for (TypeUrl availableType : exposedTypes) {
+ mapBuilder.put(availableType, boundedContext);
+ }
+ }
+ }
+
+ private static Logger log() {
+ return LogSingleton.INSTANCE.value;
+ }
+
+ private enum LogSingleton {
+ INSTANCE;
+ @SuppressWarnings("NonSerializableFieldInSerializableClass")
+ private final Logger value = LoggerFactory.getLogger(QueryService.class);
+ }
+
+}
diff --git a/server/src/main/java/org/spine3/server/SubscriptionService.java b/server/src/main/java/org/spine3/server/SubscriptionService.java
new file mode 100644
index 00000000000..b62c9f867e1
--- /dev/null
+++ b/server/src/main/java/org/spine3/server/SubscriptionService.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2016, TeamDev Ltd. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package org.spine3.server;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.google.protobuf.Any;
+import io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.spine3.base.Response;
+import org.spine3.base.Responses;
+import org.spine3.client.Subscription;
+import org.spine3.client.SubscriptionUpdate;
+import org.spine3.client.Target;
+import org.spine3.client.Topic;
+import org.spine3.client.grpc.SubscriptionServiceGrpc;
+import org.spine3.protobuf.KnownTypes;
+import org.spine3.protobuf.TypeUrl;
+import org.spine3.server.stand.Stand;
+
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The {@code SubscriptionService} provides an asynchronous way to fetch read-side state from the server.
+ *
+ *
At this point only {@link org.spine3.client.EntityIdFilter} is supported. All other filters are ignored.
+ *
+ *
Filtering by IDs set via {@code EntityIdFilter} is performed in the same way as by {@link #loadAll(Iterable)}.
+ *
+ *
NOTE: The storage must be assigned before calling this method.
+ *
+ * @param filters entity filters
+ * @param fieldMask mask to apply to the entities
+ * @return all the entities in this repository passed the filters.
+ */
+ @CheckReturnValue
+ public ImmutableCollection find(EntityFilters filters, FieldMask fieldMask) {
+ final List idsList = filters.getIdFilter()
+ .getIdsList();
+ final Class expectedIdClass = getIdClass();
+
+ final Collection domainIds = Collections2.transform(idsList, new Function() {
+ @Nullable
+ @Override
+ public I apply(@Nullable EntityId input) {
+ Preconditions.checkNotNull(input);
+ final Any idAsAny = input.getId();
+
+ final TypeUrl typeUrl = TypeUrl.ofEnclosed(idAsAny);
+ final Class messageClass = toMessageClass(typeUrl);
+
+ final boolean classIsSame = expectedIdClass.equals(messageClass);
+ checkState(classIsSame,
+ "Unexpected ID of type " + messageClass + " encountered. " + "Expected: " + expectedIdClass);
+
+ final Message idAsMessage = AnyPacker.unpack(idAsAny);
+
+ // As the message class is the same as expected, the conversion is safe.
+ @SuppressWarnings("unchecked")
+ final I id = (I) idAsMessage;
+ return id;
+ }
+ });
+
+ final ImmutableCollection result = loadAll(domainIds, fieldMask);
+ return result;
+ }
+
private E toEntity(I id, EntityStorageRecord record) {
+ return toEntity(id, record, FieldMask.getDefaultInstance());
+ }
+
+ private E toEntity(I id, EntityStorageRecord record, FieldMask fieldMask) {
final E entity = create(id);
- final M state = unpack(record.getState());
+ @SuppressWarnings("unchecked")
+ final M state = (M) FieldMasks.applyMask(fieldMask, unpack(record.getState()), getEntityStateType());
entity.setState(state, record.getVersion(), record.getWhenModified());
return entity;
}
@@ -104,9 +260,21 @@ private EntityStorageRecord toEntityRecord(E entity) {
final Timestamp whenModified = entity.whenModified();
final int version = entity.getVersion();
final EntityStorageRecord.Builder builder = EntityStorageRecord.newBuilder()
- .setState(stateAny)
- .setWhenModified(whenModified)
- .setVersion(version);
+ .setState(stateAny)
+ .setWhenModified(whenModified)
+ .setVersion(version);
return builder.build();
}
+
+ private Function, E> storageRecordToEntityTransformer() {
+ return new Function, E>() {
+ @Nullable
+ @Override
+ public E apply(@Nullable Map.Entry input) {
+ Preconditions.checkNotNull(input);
+ final E result = toEntity(input.getKey(), input.getValue());
+ return result;
+ }
+ };
+ }
}
diff --git a/server/src/main/java/org/spine3/server/entity/FieldMasks.java b/server/src/main/java/org/spine3/server/entity/FieldMasks.java
new file mode 100644
index 00000000000..3a1a8076d2a
--- /dev/null
+++ b/server/src/main/java/org/spine3/server/entity/FieldMasks.java
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2016, TeamDev Ltd. All rights reserved.
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.spine3.server.entity;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.FieldMask;
+import com.google.protobuf.Message;
+import com.google.protobuf.ProtocolStringList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.spine3.protobuf.KnownTypes;
+import org.spine3.protobuf.TypeUrl;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * A utility class for {@code FieldMask} processing against instances of {@link Message}.
+ *
+ * @author Dmytro Dashenkov
+ */
+@SuppressWarnings("UtilityClass")
+public class FieldMasks {
+
+ private static final String CONSTRUCTOR_INVOCATION_ERROR_LOGGING_PATTERN =
+ "Constructor for type %s could not be found or called: ";
+ private static final String BUILDER_CLASS_ERROR_LOGGING_PATTERN =
+ "Class for name %s could not be found. Try to rebuild the project. Make sure \"known_types.properties\" exists.";
+ private static final String TYPE_CAST_ERROR_LOGGING_PATTERN =
+ "Class %s must be assignable from com.google.protobuf.Message. Try to rebuild the project. Make sure type URL is valid.";
+
+ private FieldMasks() {
+ }
+
+ /**
+ * Applies the given {@code FieldMask} to given collection of {@link Message}s.
+ * Does not change the {@link Collection} itself.
+ *
+ *
In case the {@code FieldMask} instance contains invalid field declarations, they are ignored and
+ * do not affect the execution result.
+ *
+ * @param mask {@code FieldMask} to apply to each item of the input {@link Collection}.
+ * @param messages {@link Message}s to filter.
+ * @param type type of the {@link Message}s.
+ * @return messages with the {@code FieldMask} applied
+ */
+ @Nonnull
+ public static Collection applyMask(FieldMask mask,
+ Collection messages,
+ TypeUrl type) {
+ final List filtered = new LinkedList<>();
+ final ProtocolStringList filter = mask.getPathsList();
+ final Class builderClass = getBuilderForType(type);
+
+ if (filter.isEmpty() || builderClass == null) {
+ return Collections.unmodifiableCollection(messages);
+ }
+
+ try {
+ final Constructor builderConstructor = builderClass.getDeclaredConstructor();
+ builderConstructor.setAccessible(true);
+
+ for (Message wholeMessage : messages) {
+ final M message = messageForFilter(filter, builderConstructor, wholeMessage);
+ filtered.add(message);
+ }
+ } catch (NoSuchMethodException |
+ InvocationTargetException |
+ IllegalAccessException |
+ InstantiationException e) {
+ // If any reflection failure happens, return all the data without any mask applied.
+ log().warn(String.format(CONSTRUCTOR_INVOCATION_ERROR_LOGGING_PATTERN, builderClass.getCanonicalName()), e);
+ return Collections.unmodifiableCollection(messages);
+ }
+ return Collections.unmodifiableList(filtered);
+ }
+
+ /**
+ * Applies the {@code FieldMask} to the given {@link Message} the {@code mask} parameter is valid.
+ *
+ *
In case the {@code FieldMask} instance contains invalid field declarations, they are ignored and
+ * do not affect the execution result.
+ *
+ * @param mask the {@code FieldMask} to apply.
+ * @param message the {@link Message} to apply given mask to.
+ * @param typeUrl type of given {@link Message}.
+ * @return the message of the same type as the given one with only selected fields if the {@code mask} is valid,
+ * original message otherwise.
+ */
+ public static M applyMask(FieldMask mask, M message, TypeUrl typeUrl) {
+ if (!mask.getPathsList()
+ .isEmpty()) {
+ return doApply(mask, message, typeUrl);
+ }
+ return message;
+ }
+
+ private static M doApply(FieldMask mask, M message, TypeUrl type) {
+ final ProtocolStringList filter = mask.getPathsList();
+ final Class builderClass = getBuilderForType(type);
+
+ if (builderClass == null) {
+ return message;
+ }
+
+ try {
+ final Constructor builderConstructor = builderClass.getDeclaredConstructor();
+ builderConstructor.setAccessible(true);
+
+ final M result = messageForFilter(filter, builderConstructor, message);
+ return result;
+ } catch (NoSuchMethodException |
+ InvocationTargetException |
+ IllegalAccessException |
+ InstantiationException e) {
+ log().warn(String.format(CONSTRUCTOR_INVOCATION_ERROR_LOGGING_PATTERN, builderClass.getCanonicalName()), e);
+ return message;
+ }
+ }
+
+ private static M messageForFilter(
+ ProtocolStringList filter,
+ Constructor builderConstructor, Message wholeMessage)
+ throws InstantiationException,
+ IllegalAccessException,
+ InvocationTargetException {
+ final B builder = builderConstructor.newInstance();
+
+ final List fields = wholeMessage.getDescriptorForType()
+ .getFields();
+ for (Descriptors.FieldDescriptor field : fields) {
+ if (filter.contains(field.getFullName())) {
+ builder.setField(field, wholeMessage.getField(field));
+ }
+ }
+ @SuppressWarnings("unchecked") // It's fine as the constructor is of {@code MessageCls.Builder} type.
+ final M result = (M) builder.build();
+ return result;
+ }
+
+ @SuppressWarnings("unchecked") // We assume that {@code KnownTypes#getClassName(TypeUrl) works properly.
+ @Nullable
+ private static Class getBuilderForType(TypeUrl typeUrl) {
+ Class builderClass;
+ final String className = KnownTypes.getClassName(typeUrl)
+ .value();
+ try {
+ builderClass = (Class) Class.forName(className)
+ .getClasses()[0];
+ } catch (ClassNotFoundException e) {
+ final String message = String.format(
+ BUILDER_CLASS_ERROR_LOGGING_PATTERN,
+ className);
+ log().warn(message, e);
+ builderClass = null;
+ } catch (ClassCastException e) {
+ final String message = String.format(
+ TYPE_CAST_ERROR_LOGGING_PATTERN,
+ className);
+ log().warn(message, e);
+ builderClass = null;
+ }
+
+ return builderClass;
+ }
+
+ private static Logger log() {
+ return LogSingleton.INSTANCE.value;
+ }
+
+ private enum LogSingleton {
+ INSTANCE;
+ @SuppressWarnings("NonSerializableFieldInSerializableClass")
+ private final Logger value = LoggerFactory.getLogger(FieldMasks.class);
+ }
+}
diff --git a/server/src/main/java/org/spine3/server/entity/Repository.java b/server/src/main/java/org/spine3/server/entity/Repository.java
index 913930640cf..3047966918e 100644
--- a/server/src/main/java/org/spine3/server/entity/Repository.java
+++ b/server/src/main/java/org/spine3/server/entity/Repository.java
@@ -20,16 +20,20 @@
package org.spine3.server.entity;
+import org.spine3.protobuf.KnownTypes;
+import org.spine3.protobuf.TypeUrl;
import org.spine3.server.BoundedContext;
import org.spine3.server.reflect.Classes;
import org.spine3.server.storage.Storage;
import org.spine3.server.storage.StorageFactory;
+import org.spine3.type.ClassName;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.propagate;
import static java.lang.reflect.Modifier.isPrivate;
@@ -40,7 +44,6 @@
*
* @param the type of IDs of entities managed by the repository
* @param the entity type
- *
* @author Alexander Yevsyukov
*/
public abstract class Repository> implements AutoCloseable {
@@ -62,6 +65,20 @@ public abstract class Repository> implements AutoClose
/** The data storage for this repository. */
private Storage storage;
+ /**
+ * Cached value for the entity state type.
+ *
+ *
Used to optimise heavy {@link #getEntityStateType()} calls.
+ **/
+ private volatile TypeUrl entityStateType;
+
+ /**
+ * Cached value for the entity class.
+ *
+ *
Used to optimize heavy {@link #getEntityClass()} calls.
+ **/
+ private volatile Class entityClass;
+
/**
* Creates the repository in the passed {@link BoundedContext}.
*
@@ -112,7 +129,24 @@ protected Class getIdClass() {
/** Returns the class of entities managed by this repository. */
@CheckReturnValue
protected Class getEntityClass() {
- return Classes.getGenericParameterType(getClass(), ENTITY_CLASS_GENERIC_INDEX);
+ if (entityClass == null) {
+ entityClass = Classes.getGenericParameterType(getClass(), ENTITY_CLASS_GENERIC_INDEX);
+ }
+ checkNotNull(entityClass);
+ return entityClass;
+ }
+
+ /** Returns the {@link TypeUrl} for the state objects wrapped by entities managed by this repository */
+ @CheckReturnValue
+ public TypeUrl getEntityStateType() {
+ if (entityStateType == null) {
+ final Class entityClass = getEntityClass();
+ final Class