Skip to content
Permalink
Browse files
Support router-api annotations for gRPC services (#912)
Motivation:

gRPC should support annotations that help to configure the execution
strategy per service or per route. Also, gRPC should compute the
effective execution strategy based on the main execution strategy
from the builder configuration and from the route configuration.

Modifications:

- Parse annotations from `route-api` module;
- Compute the difference between execution strategy on the builder
and on the route;
- Add tests to verify that offloading works correctly for different
execution strategy configurations;
- Add tests to verify that misconfiguration of `route-api` annotations
will be explained to the user;
- Adjust gRPC codegen for new features;
- Change `difference` functions to compare with `noOffloadsStrategy`
by configuration, not by reference;
- Make `NoOffloadsHttpExecutionStrategy` no op;
- Fix tests;

Result:

gRPC routes correctly compute per-route execution strategy
configuration.
  • Loading branch information
idelpivnitskiy committed Jan 17, 2020
1 parent 464554a commit 6bb2722b1643016c6c808bf8d0ae972bf7f222de
Showing 40 changed files with 1,859 additions and 282 deletions.
@@ -21,7 +21,8 @@ rootProject.subprojects.findAll { !it.name.contains("bom") && !it.name.contains(
dependencies.constraints.add("api", it)
}

// Keep publishing and signing configuration in sync with servicetalk-gradle-plugin-internal/src/main/groovy/io/servicetalk/gradle/plugin/internal/ServiceTalkLibraryPlugin.groovy
// Keep publishing and signing configuration in sync with ServiceTalkLibraryPlugin.groovy from
// servicetalk-gradle-plugin-internal
publishing {
publications {
mavenJava(MavenPublication) {
@@ -19,13 +19,15 @@ apply plugin: "io.servicetalk.servicetalk-gradle-plugin-internal-library"
dependencies {
api project(":servicetalk-concurrent-api")
api project(":servicetalk-http-api")
api project(":servicetalk-router-api")
api ("com.google.api.grpc:proto-google-common-protos:$protoGoogleCommonProtosVersion") {
exclude group: "com.google.protobuf"
}

implementation project(":servicetalk-annotations")
implementation project(":servicetalk-concurrent-api-internal")
implementation project(":servicetalk-concurrent-internal")
implementation project(":servicetalk-router-utils-internal")
implementation project(":servicetalk-utils-internal")
implementation "org.slf4j:slf4j-api:$slf4jVersion"
implementation "com.google.code.findbugs:jsr305:$jsr305Version"
@@ -72,7 +72,7 @@
import static io.servicetalk.grpc.api.GrpcUtils.readGrpcMessageEncoding;
import static io.servicetalk.grpc.api.GrpcUtils.setStatus;
import static io.servicetalk.http.api.HttpApiConversions.toStreamingHttpService;
import static io.servicetalk.http.api.HttpExecutionStrategies.noOffloadsStrategy;
import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
import static io.servicetalk.http.api.HttpRequestMethod.POST;
import static java.util.Collections.unmodifiableMap;
import static java.util.Objects.requireNonNull;
@@ -107,7 +107,7 @@ private GrpcRouter(final Map<String, RouteProvider> routes,
}

Single<ServerContext> bind(final ServerBinder binder, final ExecutionContext executionContext) {
CompositeCloseable closeable = AsyncCloseables.newCompositeCloseable();
final CompositeCloseable closeable = AsyncCloseables.newCompositeCloseable();
final Map<String, StreamingHttpService> allRoutes = new HashMap<>();
populateRoutes(executionContext, allRoutes, routes, closeable);
populateRoutes(executionContext, allRoutes, streamingRoutes, closeable);
@@ -255,7 +255,7 @@ <Req, Resp> Builder addStreamingRoute(
final StreamingRoute<Req, Resp> route, final Class<Req> requestClass,
final Class<Resp> responseClass, final GrpcSerializationProvider serializationProvider) {
streamingRoutes.put(path, new RouteProvider(executionContext -> {
StreamingHttpService service = new StreamingHttpService() {
final StreamingHttpService service = new StreamingHttpService() {
@Override
public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
final StreamingHttpRequest request,
@@ -295,7 +295,7 @@ public StreamingHttpService adaptor() {

@Override
public HttpExecutionStrategy serviceInvocationStrategy() {
return executionStrategy == null ? noOffloadsStrategy() : executionStrategy;
return executionStrategy == null ? defaultStrategy() : executionStrategy;
}
};
}, () -> route, () -> toRequestStreamingRoute(route), () -> toResponseStreamingRoute(route),
@@ -323,8 +323,7 @@ public Completable closeAsync() {
public Completable closeAsyncGracefully() {
return route.closeAsyncGracefully();
}
}, requestClass, responseClass,
serializationProvider);
}, requestClass, responseClass, serializationProvider);
}

/**
@@ -463,9 +462,9 @@ public void close() throws Exception {
public void closeGracefully() throws Exception {
route.closeGracefully();
}
}, strategy -> executionStrategy == null ? strategy : executionStrategy), () -> toStreaming(route),
() -> toRequestStreamingRoute(route), () -> toResponseStreamingRoute(route),
() -> toRoute(route), route));
}, strategy -> executionStrategy == null ? strategy : executionStrategy),
() -> toStreaming(route), () -> toRequestStreamingRoute(route),
() -> toResponseStreamingRoute(route), () -> toRoute(route), route));
return this;
}

0 comments on commit 6bb2722

Please sign in to comment.