diff --git a/RELEASING.md b/RELEASING.md index 228e2df422f..989426f328a 100644 --- a/RELEASING.md +++ b/RELEASING.md @@ -33,6 +33,7 @@ $ VERSION_FILES=( examples/example-jwt-auth/pom.xml examples/example-hostname/build.gradle examples/example-hostname/pom.xml + examples/example-servlet/build.gradle examples/example-tls/build.gradle examples/example-tls/pom.xml examples/example-xds/build.gradle diff --git a/all/build.gradle b/all/build.gradle index 08a3e32e159..e4d7e9085b5 100644 --- a/all/build.gradle +++ b/all/build.gradle @@ -19,6 +19,8 @@ def subprojects = [ project(':grpc-protobuf-lite'), project(':grpc-rls'), project(':grpc-services'), + project(':grpc-servlet'), + project(':grpc-servlet-jakarta'), project(':grpc-stub'), project(':grpc-testing'), project(':grpc-xds'), diff --git a/examples/example-servlet/README.md b/examples/example-servlet/README.md new file mode 100644 index 00000000000..f2e6188069a --- /dev/null +++ b/examples/example-servlet/README.md @@ -0,0 +1,37 @@ +# Hello World Example using Servlets + +This example uses Java Servlets instead of Netty for the gRPC server. This example requires `grpc-java` +and `protoc-gen-grpc-java` to already be built. You are strongly encouraged to check out a git release +tag, since these builds will already be available. + +```bash +git checkout v.. +``` +Otherwise, you must follow [COMPILING](../../COMPILING.md). + +To build the example, + +1. **[Install gRPC Java library SNAPSHOT locally, including code generation plugin](../../COMPILING.md) (Only need this step for non-released versions, e.g. master HEAD).** + +2. In this directory, build the war file +```bash +$ ../gradlew war +``` + +To run this, deploy the war, now found in `build/libs/example-servlet.war` to your choice of servlet +container. Note that this container must support the Servlet 4.0 spec, for this particular example must +use `javax.servlet` packages instead of the more modern `jakarta.servlet`, though there is a `grpc-servlet-jakarta` +artifact that can be used for Jakarta support. Be sure to enable http/2 support in the servlet container, +or clients will not be able to connect. + +To test that this is working properly, build the HelloWorldClient example and direct it to connect to your +http/2 server. From the parent directory: + +1. Build the executables: +```bash +$ ../gradlew installDist +``` +2. Run the client app, specifying the name to say hello to and the server's address: +```bash +$ ./build/install/examples/bin/hello-world-client World localhost:8080 +``` \ No newline at end of file diff --git a/examples/example-servlet/build.gradle b/examples/example-servlet/build.gradle new file mode 100644 index 00000000000..cf941065457 --- /dev/null +++ b/examples/example-servlet/build.gradle @@ -0,0 +1,46 @@ +plugins { + // ASSUMES GRADLE 5.6 OR HIGHER. Use plugin version 0.8.10 with earlier gradle versions + id 'com.google.protobuf' version '0.8.17' + // Generate IntelliJ IDEA's .idea & .iml project files + id 'idea' + id 'war' +} + +repositories { + maven { // The google mirror is less flaky than mavenCentral() + url "https://maven-central.storage-download.googleapis.com/maven2/" } + mavenLocal() +} + +sourceCompatibility = 1.8 +targetCompatibility = 1.8 + +def grpcVersion = '1.53.0-SNAPSHOT' // CURRENT_GRPC_VERSION +def protocVersion = '3.21.7' + +dependencies { + implementation "io.grpc:grpc-protobuf:${grpcVersion}", + "io.grpc:grpc-servlet:${grpcVersion}", + "io.grpc:grpc-stub:${grpcVersion}" + + providedImplementation "javax.servlet:javax.servlet-api:4.0.1", + "org.apache.tomcat:annotations-api:6.0.53" +} + +protobuf { + protoc { artifact = "com.google.protobuf:protoc:${protocVersion}" } + plugins { grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" } } + generateProtoTasks { + all()*.plugins { grpc {} } + } +} + +// Inform IDEs like IntelliJ IDEA, Eclipse or NetBeans about the generated code. +sourceSets { + main { + java { + srcDirs 'build/generated/source/proto/main/grpc' + srcDirs 'build/generated/source/proto/main/java' + } + } +} diff --git a/examples/example-servlet/settings.gradle b/examples/example-servlet/settings.gradle new file mode 100644 index 00000000000..273558dd9cf --- /dev/null +++ b/examples/example-servlet/settings.gradle @@ -0,0 +1,8 @@ +pluginManagement { + repositories { + maven { // The google mirror is less flaky than mavenCentral() + url "https://maven-central.storage-download.googleapis.com/maven2/" + } + gradlePluginPortal() + } +} diff --git a/examples/example-servlet/src/main/java/io/grpc/servlet/examples/helloworld/HelloWorldServlet.java b/examples/example-servlet/src/main/java/io/grpc/servlet/examples/helloworld/HelloWorldServlet.java new file mode 100644 index 00000000000..a970c26a119 --- /dev/null +++ b/examples/example-servlet/src/main/java/io/grpc/servlet/examples/helloworld/HelloWorldServlet.java @@ -0,0 +1,79 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.servlet.examples.helloworld; + +import io.grpc.stub.StreamObserver; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.grpc.servlet.ServletAdapter; +import io.grpc.servlet.ServletServerBuilder; +import java.io.IOException; +import javax.servlet.annotation.WebServlet; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +/** + * A servlet that hosts a gRPC server over HTTP/2 and shares the resource URI for the normal servlet + * clients over HTTP/1.0+. + * + *

For creating a servlet that solely serves gRPC services, do not follow this example, simply + * extend or register a {@link io.grpc.servlet.GrpcServlet} instead. + */ +@WebServlet(urlPatterns = {"/helloworld.Greeter/SayHello"}, asyncSupported = true) +public class HelloWorldServlet extends HttpServlet { + private static final long serialVersionUID = 1L; + + private final ServletAdapter servletAdapter = + new ServletServerBuilder().addService(new GreeterImpl()).buildServletAdapter(); + + private static final class GreeterImpl extends GreeterGrpc.GreeterImplBase { + GreeterImpl() {} + + @Override + public void sayHello(HelloRequest req, StreamObserver responseObserver) { + HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + } + + @Override + protected void doGet(HttpServletRequest request, HttpServletResponse response) + throws IOException { + response.setContentType("text/html"); + response.getWriter().println("

Hello World!

"); + } + + @Override + protected void doPost(HttpServletRequest request, HttpServletResponse response) + throws IOException { + if (ServletAdapter.isGrpc(request)) { + servletAdapter.doPost(request, response); + } else { + response.setContentType("text/html"); + response.getWriter().println("

Hello non-gRPC client!

"); + } + } + + @Override + public void destroy() { + servletAdapter.destroy(); + super.destroy(); + } +} diff --git a/examples/example-servlet/src/main/proto/helloworld/helloworld.proto b/examples/example-servlet/src/main/proto/helloworld/helloworld.proto new file mode 100644 index 00000000000..c60d9416f1f --- /dev/null +++ b/examples/example-servlet/src/main/proto/helloworld/helloworld.proto @@ -0,0 +1,37 @@ +// Copyright 2015 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "io.grpc.examples.helloworld"; +option java_outer_classname = "HelloWorldProto"; +option objc_class_prefix = "HLW"; + +package helloworld; + +// The greeting service definition. +service Greeter { + // Sends a greeting + rpc SayHello (HelloRequest) returns (HelloReply) {} +} + +// The request message containing the user's name. +message HelloRequest { + string name = 1; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} diff --git a/examples/example-servlet/src/main/webapp/WEB-INF/glassfish-web.xml b/examples/example-servlet/src/main/webapp/WEB-INF/glassfish-web.xml new file mode 100644 index 00000000000..426162a9d13 --- /dev/null +++ b/examples/example-servlet/src/main/webapp/WEB-INF/glassfish-web.xml @@ -0,0 +1,6 @@ + + + + + + diff --git a/examples/example-servlet/src/main/webapp/WEB-INF/jboss-web.xml b/examples/example-servlet/src/main/webapp/WEB-INF/jboss-web.xml new file mode 100644 index 00000000000..9c83263e0c9 --- /dev/null +++ b/examples/example-servlet/src/main/webapp/WEB-INF/jboss-web.xml @@ -0,0 +1,9 @@ + + + + / + diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index 6a272d3490d..76e333be4dd 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -244,11 +244,8 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata protected static final Empty EMPTY = Empty.getDefaultInstance(); - private void startServer() { - maybeStartHandshakerServer(); - ServerBuilder builder = getServerBuilder(); + private void configBuilder(@Nullable ServerBuilder builder) { if (builder == null) { - server = null; return; } testServiceExecutor = Executors.newScheduledThreadPool(2); @@ -266,6 +263,14 @@ private void startServer() { new TestServiceImpl(testServiceExecutor), allInterceptors)) .addStreamTracerFactory(serverStreamTracerFactory); + } + + protected void startServer(@Nullable ServerBuilder builder) { + maybeStartHandshakerServer(); + if (builder == null) { + server = null; + return; + } try { server = builder.build().start(); @@ -333,7 +338,9 @@ public ClientCall interceptCall( */ @Before public void setUp() { - startServer(); + ServerBuilder serverBuilder = getServerBuilder(); + configBuilder(serverBuilder); + startServer(serverBuilder); channel = createChannel(); blockingStub = diff --git a/netty/src/main/java/io/grpc/netty/InternalNettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/InternalNettyChannelBuilder.java index 72cb211ecf3..c5ad99181ef 100644 --- a/netty/src/main/java/io/grpc/netty/InternalNettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/InternalNettyChannelBuilder.java @@ -16,10 +16,12 @@ package io.grpc.netty; +import com.google.common.annotations.VisibleForTesting; import io.grpc.Internal; import io.grpc.internal.ClientTransportFactory; import io.grpc.internal.GrpcUtil; import io.grpc.internal.SharedResourcePool; +import io.grpc.internal.TransportTracer; import io.netty.channel.socket.nio.NioSocketChannel; /** @@ -107,5 +109,11 @@ public static ClientTransportFactory buildTransportFactory(NettyChannelBuilder b return builder.buildTransportFactory(); } + @VisibleForTesting + public static void setTransportTracerFactory( + NettyChannelBuilder builder, TransportTracer.Factory factory) { + builder.setTransportTracerFactory(factory); + } + private InternalNettyChannelBuilder() {} } diff --git a/servlet/build.gradle b/servlet/build.gradle new file mode 100644 index 00000000000..f5ef32ae11e --- /dev/null +++ b/servlet/build.gradle @@ -0,0 +1,111 @@ +plugins { + id "java-library" + id "maven-publish" +} + +description = "gRPC: Servlet" + +// javax.servlet-api 4.0 requires a minimum of Java 8, so we might as well use that source level +sourceCompatibility = 1.8 +targetCompatibility = 1.8 + +def jettyVersion = '10.0.7' + +configurations { + itImplementation.extendsFrom(implementation) + undertowTestImplementation.extendsFrom(itImplementation) + tomcatTestImplementation.extendsFrom(itImplementation) + jettyTestImplementation.extendsFrom(itImplementation) +} + +sourceSets { + // Create a test sourceset for each classpath - could be simplified if we made new test directories + undertowTest {} + tomcatTest {} + + // Only compile these tests if java 11+ is being used + if (JavaVersion.current().isJava11Compatible()) { + jettyTest {} + } +} + +dependencies { + api project(':grpc-api') + compileOnly 'javax.servlet:javax.servlet-api:4.0.1', + libraries.javax.annotation // java 9, 10 needs it + + implementation project(':grpc-core'), + libraries.guava + + testImplementation 'javax.servlet:javax.servlet-api:4.0.1', + 'org.jetbrains.kotlinx:lincheck:2.14.1' + + itImplementation project(':grpc-servlet'), + project(':grpc-netty'), + project(':grpc-core').sourceSets.test.runtimeClasspath, + libraries.junit + itImplementation(project(':grpc-interop-testing')) { + // Avoid grpc-netty-shaded dependency + exclude group: 'io.grpc', module: 'grpc-alts' + exclude group: 'io.grpc', module: 'grpc-xds' + } + + undertowTestImplementation 'io.undertow:undertow-servlet:2.2.14.Final' + + tomcatTestImplementation 'org.apache.tomcat.embed:tomcat-embed-core:9.0.56' + + jettyTestImplementation "org.eclipse.jetty:jetty-servlet:${jettyVersion}", + "org.eclipse.jetty.http2:http2-server:${jettyVersion}", + "org.eclipse.jetty:jetty-client:${jettyVersion}" + project(':grpc-testing') +} + +test { + if (JavaVersion.current().isJava9Compatible()) { + jvmArgs += [ + // required for Lincheck + '--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED', + '--add-exports=java.base/jdk.internal.util=ALL-UNNAMED', + ] + } +} + +// Set up individual classpaths for each test, to avoid any mismatch, +// and ensure they are only used when supported by the current jvm +check.dependsOn(tasks.register('undertowTest', Test) { + classpath = sourceSets.undertowTest.runtimeClasspath + testClassesDirs = sourceSets.undertowTest.output.classesDirs +}) +check.dependsOn(tasks.register('tomcat9Test', Test) { + classpath = sourceSets.tomcatTest.runtimeClasspath + testClassesDirs = sourceSets.tomcatTest.output.classesDirs + + // Provide a temporary directory for tomcat to be deleted after test finishes + def tomcatTempDir = "$buildDir/tomcat_catalina_base" + systemProperty 'catalina.base', tomcatTempDir + doLast { + file(tomcatTempDir).deleteDir() + } + + // tomcat-embed-core 9 presently performs illegal reflective access on + // java.io.ObjectStreamClass$Caches.localDescs and sun.rmi.transport.Target.ccl, + // see https://lists.apache.org/thread/s0xr7tk2kfkkxfjps9n7dhh4cypfdhyy + if (JavaVersion.current().isJava9Compatible()) { + jvmArgs += ['--add-opens=java.base/java.io=ALL-UNNAMED', '--add-opens=java.rmi/sun.rmi.transport=ALL-UNNAMED'] + } +}) + +// Only run these tests if java 11+ is being used +if (JavaVersion.current().isJava11Compatible()) { + check.dependsOn(tasks.register('jettyTest', Test) { + classpath = sourceSets.jettyTest.runtimeClasspath + testClassesDirs = sourceSets.jettyTest.output.classesDirs + }) +} + +jacocoTestReport { + executionData undertowTest, tomcat9Test + if (JavaVersion.current().isJava11Compatible()) { + executionData jettyTest + } +} diff --git a/servlet/jakarta/build.gradle b/servlet/jakarta/build.gradle new file mode 100644 index 00000000000..59f5ac78d80 --- /dev/null +++ b/servlet/jakarta/build.gradle @@ -0,0 +1,128 @@ +plugins { + id "java-library" + id "maven-publish" +} + +description = "gRPC: Jakarta Servlet" +sourceCompatibility = 1.8 +targetCompatibility = 1.8 + +// Set up classpaths and source directories for different servlet tests +configurations { + itImplementation.extendsFrom(implementation) + jettyTestImplementation.extendsFrom(itImplementation) + tomcatTestImplementation.extendsFrom(itImplementation) + undertowTestImplementation.extendsFrom(itImplementation) +} + +sourceSets { + undertowTest { + java { + include '**/Undertow*.java' + } + } + tomcatTest { + java { + include '**/Tomcat*.java' + } + } + // Only run these tests if java 11+ is being used + if (JavaVersion.current().isJava11Compatible()) { + jettyTest { + java { + include '**/Jetty*.java' + } + } + } +} + +// Mechanically transform sources from grpc-servlet to use the corrected packages +def migrate(String name, String inputDir, SourceSet sourceSet) { + def outputDir = layout.buildDirectory.dir('generated/sources/jakarta-' + name) + sourceSet.java.srcDir outputDir + return tasks.register('migrateSources' + name.capitalize(), Sync) { task -> + into(outputDir) + from("$inputDir/io/grpc/servlet") { + into('io/grpc/servlet/jakarta') + filter { String line -> + line.replaceAll('javax\\.servlet', 'jakarta.servlet') + .replaceAll('io\\.grpc\\.servlet', 'io.grpc.servlet.jakarta') + } + } + } +} + +compileJava.dependsOn migrate('main', '../src/main/java', sourceSets.main) + +sourcesJar.dependsOn migrateSourcesMain + +// Build the set of sourceSets and classpaths to modify, since Jetty 11 requires Java 11 +// and must be skipped +compileUndertowTestJava.dependsOn(migrate('undertowTest', '../src/undertowTest/java', sourceSets.undertowTest)) +compileTomcatTestJava.dependsOn(migrate('tomcatTest', '../src/tomcatTest/java', sourceSets.tomcatTest)) +if (JavaVersion.current().isJava11Compatible()) { + compileJettyTestJava.dependsOn(migrate('jettyTest', '../src/jettyTest/java', sourceSets.jettyTest)) +} + +// Disable checkstyle for this project, since it consists only of generated code +tasks.withType(Checkstyle) { + enabled = false +} + +dependencies { + api project(':grpc-api') + compileOnly 'jakarta.servlet:jakarta.servlet-api:5.0.0', + libraries.javax.annotation + + implementation project(':grpc-core'), + libraries.guava + + itImplementation project(':grpc-servlet-jakarta'), + project(':grpc-netty'), + project(':grpc-core').sourceSets.test.runtimeClasspath, + libraries.junit + itImplementation(project(':grpc-interop-testing')) { + // Avoid grpc-netty-shaded dependency + exclude group: 'io.grpc', module: 'grpc-alts' + exclude group: 'io.grpc', module: 'grpc-xds' + } + + tomcatTestImplementation 'org.apache.tomcat.embed:tomcat-embed-core:10.0.14' + + jettyTestImplementation "org.eclipse.jetty:jetty-servlet:11.0.7", + "org.eclipse.jetty.http2:http2-server:11.0.7" + + undertowTestImplementation 'io.undertow:undertow-servlet-jakartaee9:2.2.13.Final' +} + +// Set up individual classpaths for each test, to avoid any mismatch, +// and ensure they are only used when supported by the current jvm +check.dependsOn(tasks.register('undertowTest', Test) { + classpath = sourceSets.undertowTest.runtimeClasspath + testClassesDirs = sourceSets.undertowTest.output.classesDirs +}) +check.dependsOn(tasks.register('tomcat10Test', Test) { + classpath = sourceSets.tomcatTest.runtimeClasspath + testClassesDirs = sourceSets.tomcatTest.output.classesDirs + + // Provide a temporary directory for tomcat to be deleted after test finishes + def tomcatTempDir = "$buildDir/tomcat_catalina_base" + systemProperty 'catalina.base', tomcatTempDir + doLast { + file(tomcatTempDir).deleteDir() + } + + // tomcat-embed-core 10 presently performs illegal reflective access on + // java.io.ObjectStreamClass$Caches.localDescs and sun.rmi.transport.Target.ccl, + // see https://lists.apache.org/thread/s0xr7tk2kfkkxfjps9n7dhh4cypfdhyy + if (JavaVersion.current().isJava9Compatible()) { + jvmArgs += ['--add-opens=java.base/java.io=ALL-UNNAMED', '--add-opens=java.rmi/sun.rmi.transport=ALL-UNNAMED'] + } +}) +// Only run these tests if java 11+ is being used +if (JavaVersion.current().isJava11Compatible()) { + check.dependsOn(tasks.register('jetty11Test', Test) { + classpath = sourceSets.jettyTest.runtimeClasspath + testClassesDirs = sourceSets.jettyTest.output.classesDirs + }) +} diff --git a/servlet/src/jettyTest/java/io/grpc/servlet/GrpcServletSmokeTest.java b/servlet/src/jettyTest/java/io/grpc/servlet/GrpcServletSmokeTest.java new file mode 100644 index 00000000000..0208645706e --- /dev/null +++ b/servlet/src/jettyTest/java/io/grpc/servlet/GrpcServletSmokeTest.java @@ -0,0 +1,124 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.servlet; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import io.grpc.BindableService; +import io.grpc.Channel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.testing.GrpcCleanupRule; +import io.grpc.testing.integration.Messages.Payload; +import io.grpc.testing.integration.Messages.SimpleRequest; +import io.grpc.testing.integration.Messages.SimpleResponse; +import io.grpc.testing.integration.TestServiceGrpc; +import io.grpc.testing.integration.TestServiceImpl; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.http2.parser.RateControl; +import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Smoke test for {@link GrpcServlet}. */ +@RunWith(JUnit4.class) +public class GrpcServletSmokeTest { + private static final String HOST = "localhost"; + private static final String MYAPP = "/grpc.testing.TestService"; + + public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + private final ScheduledExecutorService scheduledExecutorService = + Executors.newSingleThreadScheduledExecutor(); + private int port; + private Server server; + + @Before + public void startServer() { + BindableService service = new TestServiceImpl(scheduledExecutorService); + GrpcServlet grpcServlet = new GrpcServlet(ImmutableList.of(service)); + server = new Server(0); + ServerConnector sc = (ServerConnector)server.getConnectors()[0]; + HTTP2CServerConnectionFactory factory = + new HTTP2CServerConnectionFactory(new HttpConfiguration()); + + // Explicitly disable safeguards against malicious clients, as some unit tests trigger this + factory.setRateControlFactory(new RateControl.Factory() {}); + + sc.addConnectionFactory(factory); + ServletContextHandler context = + new ServletContextHandler(ServletContextHandler.SESSIONS); + context.setContextPath(MYAPP); + context.addServlet(new ServletHolder(grpcServlet), "/*"); + server.setHandler(context); + + try { + server.start(); + } catch (Exception e) { + throw new AssertionError(e); + } + + port = sc.getLocalPort(); + } + + @After + public void tearDown() { + scheduledExecutorService.shutdown(); + try { + server.stop(); + } catch (Exception e) { + throw new AssertionError(e); + } + } + + @Test + public void unaryCall() { + Channel channel = cleanupRule.register( + ManagedChannelBuilder.forAddress(HOST, port).usePlaintext().build()); + SimpleResponse response = TestServiceGrpc.newBlockingStub(channel).unaryCall( + SimpleRequest.newBuilder() + .setResponseSize(1234) + .setPayload(Payload.newBuilder().setBody(ByteString.copyFromUtf8("hello foo"))) + .build()); + assertThat(response.getPayload().getBody().size()).isEqualTo(1234); + } + + @Test + public void httpGetRequest() throws Exception { + HttpClient httpClient = new HttpClient(); + try { + httpClient.start(); + ContentResponse response = + httpClient.GET("http://" + HOST + ":" + port + MYAPP + "/UnaryCall"); + assertThat(response.getStatus()).isEqualTo(405); + assertThat(response.getContentAsString()).contains("GET method not supported"); + } finally { + httpClient.stop(); + } + } +} diff --git a/servlet/src/jettyTest/java/io/grpc/servlet/JettyInteropTest.java b/servlet/src/jettyTest/java/io/grpc/servlet/JettyInteropTest.java new file mode 100644 index 00000000000..ebdf029fe27 --- /dev/null +++ b/servlet/src/jettyTest/java/io/grpc/servlet/JettyInteropTest.java @@ -0,0 +1,94 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.servlet; + +import io.grpc.ManagedChannelBuilder; +import io.grpc.ServerBuilder; +import io.grpc.netty.InternalNettyChannelBuilder; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.testing.integration.AbstractInteropTest; +import org.eclipse.jetty.http2.parser.RateControl; +import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.After; + +public class JettyInteropTest extends AbstractInteropTest { + + private static final String HOST = "localhost"; + private static final String MYAPP = "/grpc.testing.TestService"; + private int port; + private Server server; + + @After + @Override + public void tearDown() { + super.tearDown(); + try { + server.stop(); + } catch (Exception e) { + throw new AssertionError(e); + } + } + + @Override + protected ServerBuilder getServerBuilder() { + return new ServletServerBuilder().maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); + } + + @Override + protected void startServer(ServerBuilder builer) { + GrpcServlet grpcServlet = + new GrpcServlet(((ServletServerBuilder) builer).buildServletAdapter()); + server = new Server(0); + ServerConnector sc = (ServerConnector)server.getConnectors()[0]; + HTTP2CServerConnectionFactory factory = + new HTTP2CServerConnectionFactory(new HttpConfiguration()); + + // Explicitly disable safeguards against malicious clients, as some unit tests trigger this + factory.setRateControlFactory(new RateControl.Factory() {}); + + sc.addConnectionFactory(factory); + ServletContextHandler context = + new ServletContextHandler(ServletContextHandler.SESSIONS); + context.setContextPath(MYAPP); + context.addServlet(new ServletHolder(grpcServlet), "/*"); + server.setHandler(context); + + try { + server.start(); + } catch (Exception e) { + throw new AssertionError(e); + } + + port = sc.getLocalPort(); + } + + @Override + protected ManagedChannelBuilder createChannelBuilder() { + NettyChannelBuilder builder = + (NettyChannelBuilder) ManagedChannelBuilder.forAddress(HOST, port) + .usePlaintext() + .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); + InternalNettyChannelBuilder.setStatsEnabled(builder, false); + builder.intercept(createCensusStatsClientInterceptor()); + return builder; + } +} diff --git a/servlet/src/jettyTest/java/io/grpc/servlet/JettyTransportTest.java b/servlet/src/jettyTest/java/io/grpc/servlet/JettyTransportTest.java new file mode 100644 index 00000000000..7941afc9b4d --- /dev/null +++ b/servlet/src/jettyTest/java/io/grpc/servlet/JettyTransportTest.java @@ -0,0 +1,249 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.servlet; + +import io.grpc.InternalChannelz; +import io.grpc.InternalInstrumented; +import io.grpc.ServerStreamTracer; +import io.grpc.internal.AbstractTransportTest; +import io.grpc.internal.ClientTransportFactory; +import io.grpc.internal.FakeClock; +import io.grpc.internal.InternalServer; +import io.grpc.internal.ManagedClientTransport; +import io.grpc.internal.ServerListener; +import io.grpc.internal.ServerTransportListener; +import io.grpc.netty.InternalNettyChannelBuilder; +import io.grpc.netty.NegotiationType; +import io.grpc.netty.NettyChannelBuilder; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.eclipse.jetty.http2.parser.RateControl; +import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.Ignore; +import org.junit.Test; + + +public class JettyTransportTest extends AbstractTransportTest { + private static final String MYAPP = "/service"; + + private final FakeClock fakeClock = new FakeClock(); + private Server jettyServer; + private int port; + + + @Override + protected InternalServer newServer(List streamTracerFactories) { + return new InternalServer() { + final InternalServer delegate = + new ServletServerBuilder().buildTransportServers(streamTracerFactories); + + @Override + public void start(ServerListener listener) throws IOException { + delegate.start(listener); + ScheduledExecutorService scheduler = fakeClock.getScheduledExecutorService(); + ServerTransportListener serverTransportListener = + listener.transportCreated(new ServletServerBuilder.ServerTransportImpl(scheduler)); + ServletAdapter adapter = + new ServletAdapter(serverTransportListener, streamTracerFactories, + Integer.MAX_VALUE); + GrpcServlet grpcServlet = new GrpcServlet(adapter); + + jettyServer = new Server(0); + ServerConnector sc = (ServerConnector) jettyServer.getConnectors()[0]; + HttpConfiguration httpConfiguration = new HttpConfiguration(); + + // Must be set for several tests to pass, so that the request handling can begin before + // content arrives. + httpConfiguration.setDelayDispatchUntilContent(false); + + HTTP2CServerConnectionFactory factory = + new HTTP2CServerConnectionFactory(httpConfiguration); + factory.setRateControlFactory(new RateControl.Factory() { + }); + sc.addConnectionFactory(factory); + ServletContextHandler context = + new ServletContextHandler(ServletContextHandler.SESSIONS); + context.setContextPath(MYAPP); + context.addServlet(new ServletHolder(grpcServlet), "/*"); + jettyServer.setHandler(context); + + try { + jettyServer.start(); + } catch (Exception e) { + throw new AssertionError(e); + } + + port = sc.getLocalPort(); + } + + @Override + public void shutdown() { + delegate.shutdown(); + } + + @Override + public SocketAddress getListenSocketAddress() { + return delegate.getListenSocketAddress(); + } + + @Override + public InternalInstrumented getListenSocketStats() { + return delegate.getListenSocketStats(); + } + + @Override + public List getListenSocketAddresses() { + return delegate.getListenSocketAddresses(); + } + + @Nullable + @Override + public List> getListenSocketStatsList() { + return delegate.getListenSocketStatsList(); + } + }; + } + + @Override + protected InternalServer newServer(int port, + List streamTracerFactories) { + return newServer(streamTracerFactories); + } + + @Override + protected ManagedClientTransport newClientTransport(InternalServer server) { + NettyChannelBuilder nettyChannelBuilder = NettyChannelBuilder + // Although specified here, address is ignored because we never call build. + .forAddress("localhost", 0) + .flowControlWindow(65 * 1024) + .negotiationType(NegotiationType.PLAINTEXT); + InternalNettyChannelBuilder + .setTransportTracerFactory(nettyChannelBuilder, fakeClockTransportTracer); + ClientTransportFactory clientFactory = + InternalNettyChannelBuilder.buildTransportFactory(nettyChannelBuilder); + return clientFactory.newClientTransport( + new InetSocketAddress("localhost", port), + new ClientTransportFactory.ClientTransportOptions() + .setAuthority(testAuthority(server)) + .setEagAttributes(eagAttrs()), + transportLogger()); + } + + @Override + protected String testAuthority(InternalServer server) { + return "localhost:" + port; + } + + @Override + protected void advanceClock(long offset, TimeUnit unit) { + fakeClock.forwardTime(offset, unit); + } + + @Override + protected long fakeCurrentTimeNanos() { + return fakeClock.getTicker().read(); + } + + @Override + @Ignore("Skip the test, server lifecycle is managed by the container") + @Test + public void serverAlreadyListening() { + } + + @Override + @Ignore("Skip the test, server lifecycle is managed by the container") + @Test + public void openStreamPreventsTermination() { + } + + @Override + @Ignore("Skip the test, server lifecycle is managed by the container") + @Test + public void shutdownNowKillsServerStream() { + } + + @Override + @Ignore("Skip the test, server lifecycle is managed by the container") + @Test + public void serverNotListening() { + } + + // FIXME + @Override + @Ignore("Servlet flow control not implemented yet") + @Test + public void flowControlPushBack() { + } + + // FIXME + @Override + @Ignore("Jetty is broken on client RST_STREAM") + @Test + public void shutdownNowKillsClientStream() { + } + + @Override + @Ignore("Server side sockets are managed by the servlet container") + @Test + public void socketStats() { + } + + @Override + @Ignore("serverTransportListener will not terminate") + @Test + public void clientStartAndStopOnceConnected() { + } + + @Override + @Ignore("clientStreamTracer1.getInboundTrailers() is not null; listeners.poll() doesn't apply") + @Test + public void serverCancel() { + } + + @Override + @Ignore("This doesn't apply: Ensure that for a closed ServerStream, interactions are noops") + @Test + public void interactionsAfterServerStreamCloseAreNoops() { + } + + @Override + @Ignore("listeners.poll() doesn't apply") + @Test + public void interactionsAfterClientStreamCancelAreNoops() { + } + + @Override + @Ignore("assertNull(serverStatus.getCause()) isn't true") + @Test + public void clientCancel() { + } + + @Override + @Ignore("regression since bumping grpc v1.46 to v1.53") + @Test + public void messageProducerOnlyProducesRequestedMessages() {} +} diff --git a/servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java b/servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java new file mode 100644 index 00000000000..4f4e37fda87 --- /dev/null +++ b/servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java @@ -0,0 +1,282 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.servlet; + +import static com.google.common.base.Preconditions.checkState; +import static io.grpc.servlet.ServletServerStream.toHexString; +import static java.util.logging.Level.FINE; +import static java.util.logging.Level.FINEST; + +import com.google.common.annotations.VisibleForTesting; +import io.grpc.InternalLogId; +import io.grpc.servlet.ServletServerStream.ServletTransportState; +import java.io.IOException; +import java.time.Duration; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; +import java.util.function.BiFunction; +import java.util.function.BooleanSupplier; +import java.util.logging.Logger; +import javax.annotation.CheckReturnValue; +import javax.annotation.Nullable; +import javax.servlet.AsyncContext; +import javax.servlet.ServletOutputStream; + +/** Handles write actions from the container thread and the application thread. */ +final class AsyncServletOutputStreamWriter { + + /** + * Memory boundary for write actions. + * + *
+   * WriteState curState = writeState.get();  // mark a boundary
+   * doSomething();  // do something within the boundary
+   * boolean successful = writeState.compareAndSet(curState, newState); // try to mark a boundary
+   * if (successful) {
+   *   // state has not changed since
+   *   return;
+   * } else {
+   *   // state is changed by another thread while doSomething(), need recompute
+   * }
+   * 
+ * + *

There are two threads, the container thread (calling {@code onWritePossible()}) and the + * application thread (calling {@code runOrBuffer()}) that read and update the + * writeState. Only onWritePossible() may turn {@code readyAndDrained} from false to true, and + * only runOrBuffer() may turn it from true to false. + */ + private final AtomicReference writeState = new AtomicReference<>(WriteState.DEFAULT); + + private final Log log; + private final BiFunction writeAction; + private final ActionItem flushAction; + private final ActionItem completeAction; + private final BooleanSupplier isReady; + + /** + * New write actions will be buffered into this queue if the servlet output stream is not ready or + * the queue is not drained. + */ + // SPSC queue would do + private final Queue writeChain = new ConcurrentLinkedQueue<>(); + // for a theoretical race condition that onWritePossible() is called immediately after isReady() + // returns false and before writeState.compareAndSet() + @Nullable + private volatile Thread parkingThread; + + AsyncServletOutputStreamWriter( + AsyncContext asyncContext, + ServletTransportState transportState, + InternalLogId logId) throws IOException { + Logger logger = Logger.getLogger(AsyncServletOutputStreamWriter.class.getName()); + this.log = new Log() { + @Override + public void fine(String str, Object... params) { + if (logger.isLoggable(FINE)) { + logger.log(FINE, "[" + logId + "]" + str, params); + } + } + + @Override + public void finest(String str, Object... params) { + if (logger.isLoggable(FINEST)) { + logger.log(FINEST, "[" + logId + "] " + str, params); + } + } + }; + + ServletOutputStream outputStream = asyncContext.getResponse().getOutputStream(); + this.writeAction = (byte[] bytes, Integer numBytes) -> () -> { + outputStream.write(bytes, 0, numBytes); + transportState.runOnTransportThread(() -> transportState.onSentBytes(numBytes)); + log.finest("outbound data: length={0}, bytes={1}", numBytes, toHexString(bytes, numBytes)); + }; + this.flushAction = () -> { + log.finest("flushBuffer"); + asyncContext.getResponse().flushBuffer(); + }; + this.completeAction = () -> { + log.fine("call is completing"); + transportState.runOnTransportThread( + () -> { + transportState.complete(); + asyncContext.complete(); + log.fine("call completed"); + }); + }; + this.isReady = () -> outputStream.isReady(); + } + + /** + * Constructor without java.util.logging and javax.servlet.* dependency, so that Lincheck can run. + * + * @param writeAction Provides an {@link ActionItem} to write given bytes with specified length. + * @param isReady Indicates whether the writer can write bytes at the moment (asynchronously). + */ + @VisibleForTesting + AsyncServletOutputStreamWriter( + BiFunction writeAction, + ActionItem flushAction, + ActionItem completeAction, + BooleanSupplier isReady, + Log log) { + this.writeAction = writeAction; + this.flushAction = flushAction; + this.completeAction = completeAction; + this.isReady = isReady; + this.log = log; + } + + /** Called from application thread. */ + void writeBytes(byte[] bytes, int numBytes) throws IOException { + runOrBuffer(writeAction.apply(bytes, numBytes)); + } + + /** Called from application thread. */ + void flush() throws IOException { + runOrBuffer(flushAction); + } + + /** Called from application thread. */ + void complete() { + try { + runOrBuffer(completeAction); + } catch (IOException ignore) { + // actually completeAction does not throw IOException + } + } + + /** Called from the container thread {@link javax.servlet.WriteListener#onWritePossible()}. */ + void onWritePossible() throws IOException { + log.finest("onWritePossible: ENTRY. The servlet output stream becomes ready"); + assureReadyAndDrainedTurnsFalse(); + while (isReady.getAsBoolean()) { + WriteState curState = writeState.get(); + + ActionItem actionItem = writeChain.poll(); + if (actionItem != null) { + actionItem.run(); + continue; + } + + if (writeState.compareAndSet(curState, curState.withReadyAndDrained(true))) { + // state has not changed since. + log.finest( + "onWritePossible: EXIT. All data available now is sent out and the servlet output" + + " stream is still ready"); + return; + } + // else, state changed by another thread (runOrBuffer()), need to drain the writeChain + // again + } + log.finest("onWritePossible: EXIT. The servlet output stream becomes not ready"); + } + + private void assureReadyAndDrainedTurnsFalse() { + // readyAndDrained should have been set to false already. + // Just in case due to a race condition readyAndDrained is still true at this moment and is + // being set to false by runOrBuffer() concurrently. + while (writeState.get().readyAndDrained) { + parkingThread = Thread.currentThread(); + LockSupport.parkNanos(Duration.ofMinutes(1).toNanos()); // should return immediately + } + parkingThread = null; + } + + /** + * Either execute the write action directly, or buffer the action and let the container thread + * drain it. + * + *

Called from application thread. + */ + private void runOrBuffer(ActionItem actionItem) throws IOException { + WriteState curState = writeState.get(); + if (curState.readyAndDrained) { // write to the outputStream directly + actionItem.run(); + if (actionItem == completeAction) { + return; + } + if (!isReady.getAsBoolean()) { + boolean successful = + writeState.compareAndSet(curState, curState.withReadyAndDrained(false)); + LockSupport.unpark(parkingThread); + checkState(successful, "Bug: curState is unexpectedly changed by another thread"); + log.finest("the servlet output stream becomes not ready"); + } + } else { // buffer to the writeChain + writeChain.offer(actionItem); + if (!writeState.compareAndSet(curState, curState.withReadyAndDrained(false))) { + checkState( + writeState.get().readyAndDrained, + "Bug: onWritePossible() should have changed readyAndDrained to true, but not"); + ActionItem lastItem = writeChain.poll(); + if (lastItem != null) { + checkState(lastItem == actionItem, "Bug: lastItem != actionItem"); + runOrBuffer(lastItem); + } + } // state has not changed since + } + } + + /** Write actions, e.g. writeBytes, flush, complete. */ + @FunctionalInterface + @VisibleForTesting + interface ActionItem { + void run() throws IOException; + } + + @VisibleForTesting // Lincheck test can not run with java.util.logging dependency. + interface Log { + default void fine(String str, Object...params) {} + + default void finest(String str, Object...params) {} + } + + private static final class WriteState { + + static final WriteState DEFAULT = new WriteState(false); + + /** + * The servlet output stream is ready and the writeChain is empty. + * + *

readyAndDrained turns from false to true when: + * {@code onWritePossible()} exits while currently there is no more data to write, but the last + * check of {@link javax.servlet.ServletOutputStream#isReady()} is true. + * + *

readyAndDrained turns from true to false when: + * {@code runOrBuffer()} exits while either the action item is written directly to the + * servlet output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()} + * right after that returns false, or the action item is buffered into the writeChain. + */ + final boolean readyAndDrained; + + WriteState(boolean readyAndDrained) { + this.readyAndDrained = readyAndDrained; + } + + /** + * Only {@code onWritePossible()} can set readyAndDrained to true, and only {@code + * runOrBuffer()} can set it to false. + */ + @CheckReturnValue + WriteState withReadyAndDrained(boolean readyAndDrained) { + return new WriteState(readyAndDrained); + } + } +} diff --git a/servlet/src/main/java/io/grpc/servlet/GrpcServlet.java b/servlet/src/main/java/io/grpc/servlet/GrpcServlet.java new file mode 100644 index 00000000000..a73b1fdfe6d --- /dev/null +++ b/servlet/src/main/java/io/grpc/servlet/GrpcServlet.java @@ -0,0 +1,80 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.servlet; + +import com.google.common.annotations.VisibleForTesting; +import io.grpc.BindableService; +import io.grpc.ExperimentalApi; +import java.io.IOException; +import java.util.List; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +/** + * A simple servlet backed by a gRPC server. Must set {@code asyncSupported} to true. The {@code + * /contextRoot/urlPattern} must match the gRPC services' path, which is + * "/full-service-name/short-method-name". + * + *

The API is experimental. The authors would like to know more about the real usecases. Users + * are welcome to provide feedback by commenting on + * the tracking issue. + */ +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/5066") +public class GrpcServlet extends HttpServlet { + private static final long serialVersionUID = 1L; + + private final ServletAdapter servletAdapter; + + @VisibleForTesting + GrpcServlet(ServletAdapter servletAdapter) { + this.servletAdapter = servletAdapter; + } + + /** + * Instantiate the servlet serving the given list of gRPC services. ServerInterceptors can be + * added on each gRPC service by {@link + * io.grpc.ServerInterceptors#intercept(BindableService, io.grpc.ServerInterceptor...)} + */ + public GrpcServlet(List bindableServices) { + this(loadServices(bindableServices)); + } + + private static ServletAdapter loadServices(List bindableServices) { + ServletServerBuilder serverBuilder = new ServletServerBuilder(); + bindableServices.forEach(serverBuilder::addService); + return serverBuilder.buildServletAdapter(); + } + + @Override + protected final void doGet(HttpServletRequest request, HttpServletResponse response) + throws IOException { + servletAdapter.doGet(request, response); + } + + @Override + protected final void doPost(HttpServletRequest request, HttpServletResponse response) + throws IOException { + servletAdapter.doPost(request, response); + } + + @Override + public void destroy() { + servletAdapter.destroy(); + super.destroy(); + } +} diff --git a/servlet/src/main/java/io/grpc/servlet/ServletAdapter.java b/servlet/src/main/java/io/grpc/servlet/ServletAdapter.java new file mode 100644 index 00000000000..5a567916f99 --- /dev/null +++ b/servlet/src/main/java/io/grpc/servlet/ServletAdapter.java @@ -0,0 +1,333 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.servlet; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; +import static java.util.logging.Level.FINE; +import static java.util.logging.Level.FINEST; + +import com.google.common.io.BaseEncoding; +import io.grpc.Attributes; +import io.grpc.ExperimentalApi; +import io.grpc.Grpc; +import io.grpc.InternalLogId; +import io.grpc.InternalMetadata; +import io.grpc.Metadata; +import io.grpc.ServerStreamTracer; +import io.grpc.Status; +import io.grpc.internal.GrpcUtil; +import io.grpc.internal.ReadableBuffers; +import io.grpc.internal.ServerTransportListener; +import io.grpc.internal.StatsTraceContext; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Enumeration; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; +import javax.servlet.AsyncContext; +import javax.servlet.AsyncEvent; +import javax.servlet.AsyncListener; +import javax.servlet.ReadListener; +import javax.servlet.ServletInputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +/** + * An adapter that transforms {@link HttpServletRequest} into gRPC request and lets a gRPC server + * process it, and transforms the gRPC response into {@link HttpServletResponse}. An adapter can be + * instantiated by {@link ServletServerBuilder#buildServletAdapter()}. + * + *

In a servlet, calling {@link #doPost(HttpServletRequest, HttpServletResponse)} inside {@link + * javax.servlet.http.HttpServlet#doPost(HttpServletRequest, HttpServletResponse)} makes the servlet + * backed by the gRPC server associated with the adapter. The servlet must support Asynchronous + * Processing and must be deployed to a container that supports servlet 4.0 and enables HTTP/2. + * + *

The API is experimental. The authors would like to know more about the real usecases. Users + * are welcome to provide feedback by commenting on + * the tracking issue. + */ +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/5066") +public final class ServletAdapter { + + static final Logger logger = Logger.getLogger(ServletAdapter.class.getName()); + + private final ServerTransportListener transportListener; + private final List streamTracerFactories; + private final int maxInboundMessageSize; + private final Attributes attributes; + + ServletAdapter( + ServerTransportListener transportListener, + List streamTracerFactories, + int maxInboundMessageSize) { + this.transportListener = transportListener; + this.streamTracerFactories = streamTracerFactories; + this.maxInboundMessageSize = maxInboundMessageSize; + attributes = transportListener.transportReady(Attributes.EMPTY); + } + + /** + * Call this method inside {@link javax.servlet.http.HttpServlet#doGet(HttpServletRequest, + * HttpServletResponse)} to serve gRPC GET request. + * + *

This method is currently not implemented. + * + *

Note that in rare case gRPC client sends GET requests. + * + *

Do not modify {@code req} and {@code resp} before or after calling this method. However, + * calling {@code resp.setBufferSize()} before invocation is allowed. + */ + public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException { + resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED, "GET method not supported"); + } + + /** + * Call this method inside {@link javax.servlet.http.HttpServlet#doPost(HttpServletRequest, + * HttpServletResponse)} to serve gRPC POST request. + * + *

Do not modify {@code req} and {@code resp} before or after calling this method. However, + * calling {@code resp.setBufferSize()} before invocation is allowed. + */ + public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException { + checkArgument(req.isAsyncSupported(), "servlet does not support asynchronous operation"); + checkArgument(ServletAdapter.isGrpc(req), "the request is not a gRPC request"); + + InternalLogId logId = InternalLogId.allocate(ServletAdapter.class, null); + logger.log(FINE, "[{0}] RPC started", logId); + + AsyncContext asyncCtx = req.startAsync(req, resp); + + String method = req.getRequestURI().substring(1); // remove the leading "/" + Metadata headers = getHeaders(req); + + if (logger.isLoggable(FINEST)) { + logger.log(FINEST, "[{0}] method: {1}", new Object[] {logId, method}); + logger.log(FINEST, "[{0}] headers: {1}", new Object[] {logId, headers}); + } + + Long timeoutNanos = headers.get(TIMEOUT_KEY); + if (timeoutNanos == null) { + timeoutNanos = 0L; + } + asyncCtx.setTimeout(TimeUnit.NANOSECONDS.toMillis(timeoutNanos)); + StatsTraceContext statsTraceCtx = + StatsTraceContext.newServerContext(streamTracerFactories, method, headers); + + ServletServerStream stream = new ServletServerStream( + asyncCtx, + statsTraceCtx, + maxInboundMessageSize, + attributes.toBuilder() + .set( + Grpc.TRANSPORT_ATTR_REMOTE_ADDR, + new InetSocketAddress(req.getRemoteHost(), req.getRemotePort())) + .set( + Grpc.TRANSPORT_ATTR_LOCAL_ADDR, + new InetSocketAddress(req.getLocalAddr(), req.getLocalPort())) + .build(), + getAuthority(req), + logId); + + transportListener.streamCreated(stream, method, headers); + stream.transportState().runOnTransportThread(stream.transportState()::onStreamAllocated); + + asyncCtx.getRequest().getInputStream() + .setReadListener(new GrpcReadListener(stream, asyncCtx, logId)); + asyncCtx.addListener(new GrpcAsyncListener(stream, logId)); + } + + // This method must use Enumeration and its members, since that is the only way to read headers + // from the servlet api. + @SuppressWarnings("JdkObsolete") + private static Metadata getHeaders(HttpServletRequest req) { + Enumeration headerNames = req.getHeaderNames(); + checkNotNull( + headerNames, "Servlet container does not allow HttpServletRequest.getHeaderNames()"); + List byteArrays = new ArrayList<>(); + while (headerNames.hasMoreElements()) { + String headerName = headerNames.nextElement(); + Enumeration values = req.getHeaders(headerName); + if (values == null) { + continue; + } + while (values.hasMoreElements()) { + String value = values.nextElement(); + if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) { + byteArrays.add(headerName.getBytes(StandardCharsets.US_ASCII)); + byteArrays.add(BaseEncoding.base64().decode(value)); + } else { + byteArrays.add(headerName.getBytes(StandardCharsets.US_ASCII)); + byteArrays.add(value.getBytes(StandardCharsets.US_ASCII)); + } + } + } + return InternalMetadata.newMetadata(byteArrays.toArray(new byte[][]{})); + } + + // This method must use HttpRequest#getRequestURL or HttpUtils#getRequestURL, both of which + // can only return StringBuffer instances + @SuppressWarnings("JdkObsolete") + private static String getAuthority(HttpServletRequest req) { + try { + return new URI(req.getRequestURL().toString()).getAuthority(); + } catch (URISyntaxException e) { + logger.log(FINE, "Error getting authority from the request URL {0}", req.getRequestURL()); + return req.getServerName() + ":" + req.getServerPort(); + } + } + + /** + * Call this method when the adapter is no longer needed. The gRPC server will be terminated. + */ + public void destroy() { + transportListener.transportTerminated(); + } + + private static final class GrpcAsyncListener implements AsyncListener { + final InternalLogId logId; + final ServletServerStream stream; + + GrpcAsyncListener(ServletServerStream stream, InternalLogId logId) { + this.stream = stream; + this.logId = logId; + } + + @Override + public void onComplete(AsyncEvent event) {} + + @Override + public void onTimeout(AsyncEvent event) { + if (logger.isLoggable(FINE)) { + logger.log(FINE, String.format("[{%s}] Timeout: ", logId), event.getThrowable()); + } + // If the resp is not committed, cancel() to avoid being redirected to an error page. + // Else, the container will send RST_STREAM in the end. + if (!event.getAsyncContext().getResponse().isCommitted()) { + stream.cancel(Status.DEADLINE_EXCEEDED); + } else { + stream.transportState().runOnTransportThread( + () -> stream.transportState().transportReportStatus(Status.DEADLINE_EXCEEDED)); + } + } + + @Override + public void onError(AsyncEvent event) { + if (logger.isLoggable(FINE)) { + logger.log(FINE, String.format("[{%s}] Error: ", logId), event.getThrowable()); + } + + // If the resp is not committed, cancel() to avoid being redirected to an error page. + // Else, the container will send RST_STREAM at the end. + if (!event.getAsyncContext().getResponse().isCommitted()) { + stream.cancel(Status.fromThrowable(event.getThrowable())); + } else { + stream.transportState().runOnTransportThread( + () -> stream.transportState().transportReportStatus( + Status.fromThrowable(event.getThrowable()))); + } + } + + @Override + public void onStartAsync(AsyncEvent event) {} + } + + private static final class GrpcReadListener implements ReadListener { + final ServletServerStream stream; + final AsyncContext asyncCtx; + final ServletInputStream input; + final InternalLogId logId; + + GrpcReadListener( + ServletServerStream stream, + AsyncContext asyncCtx, + InternalLogId logId) throws IOException { + this.stream = stream; + this.asyncCtx = asyncCtx; + input = asyncCtx.getRequest().getInputStream(); + this.logId = logId; + } + + final byte[] buffer = new byte[4 * 1024]; + + @Override + public void onDataAvailable() throws IOException { + logger.log(FINEST, "[{0}] onDataAvailable: ENTRY", logId); + + while (input.isReady()) { + int length = input.read(buffer); + if (length == -1) { + logger.log(FINEST, "[{0}] inbound data: read end of stream", logId); + return; + } else { + if (logger.isLoggable(FINEST)) { + logger.log( + FINEST, + "[{0}] inbound data: length = {1}, bytes = {2}", + new Object[] {logId, length, ServletServerStream.toHexString(buffer, length)}); + } + + byte[] copy = Arrays.copyOf(buffer, length); + stream.transportState().runOnTransportThread( + () -> stream.transportState().inboundDataReceived(ReadableBuffers.wrap(copy), false)); + } + } + + logger.log(FINEST, "[{0}] onDataAvailable: EXIT", logId); + } + + @Override + public void onAllDataRead() { + logger.log(FINE, "[{0}] onAllDataRead", logId); + stream.transportState().runOnTransportThread(() -> + stream.transportState().inboundDataReceived(ReadableBuffers.empty(), true)); + } + + @Override + public void onError(Throwable t) { + if (logger.isLoggable(FINE)) { + logger.log(FINE, String.format("[{%s}] Error: ", logId), t); + } + // If the resp is not committed, cancel() to avoid being redirected to an error page. + // Else, the container will send RST_STREAM at the end. + if (!asyncCtx.getResponse().isCommitted()) { + stream.cancel(Status.fromThrowable(t)); + } else { + stream.transportState().runOnTransportThread( + () -> stream.transportState() + .transportReportStatus(Status.fromThrowable(t))); + } + } + } + + /** + * Checks whether an incoming {@code HttpServletRequest} may come from a gRPC client. + * + * @return true if the request comes from a gRPC client + */ + public static boolean isGrpc(HttpServletRequest request) { + return request.getContentType() != null + && request.getContentType().contains(GrpcUtil.CONTENT_TYPE_GRPC); + } +} diff --git a/servlet/src/main/java/io/grpc/servlet/ServletServerBuilder.java b/servlet/src/main/java/io/grpc/servlet/ServletServerBuilder.java new file mode 100644 index 00000000000..3e852ea3c09 --- /dev/null +++ b/servlet/src/main/java/io/grpc/servlet/ServletServerBuilder.java @@ -0,0 +1,268 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.servlet; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ListenableFuture; +import io.grpc.Attributes; +import io.grpc.ExperimentalApi; +import io.grpc.ForwardingServerBuilder; +import io.grpc.Internal; +import io.grpc.InternalChannelz.SocketStats; +import io.grpc.InternalInstrumented; +import io.grpc.InternalLogId; +import io.grpc.Metadata; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.ServerStreamTracer; +import io.grpc.Status; +import io.grpc.internal.GrpcUtil; +import io.grpc.internal.InternalServer; +import io.grpc.internal.ServerImplBuilder; +import io.grpc.internal.ServerListener; +import io.grpc.internal.ServerStream; +import io.grpc.internal.ServerTransport; +import io.grpc.internal.ServerTransportListener; +import io.grpc.internal.SharedResourceHolder; +import java.io.File; +import java.io.IOException; +import java.net.SocketAddress; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; + +/** + * Builder to build a gRPC server that can run as a servlet. This is for advanced custom settings. + * Normally, users should consider extending the out-of-box {@link GrpcServlet} directly instead. + * + *

The API is experimental. The authors would like to know more about the real usecases. Users + * are welcome to provide feedback by commenting on + * the tracking issue. + */ +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/5066") +@NotThreadSafe +public final class ServletServerBuilder extends ForwardingServerBuilder { + List streamTracerFactories; + int maxInboundMessageSize = DEFAULT_MAX_MESSAGE_SIZE; + + private final ServerImplBuilder serverImplBuilder; + + private ScheduledExecutorService scheduler; + private boolean internalCaller; + private boolean usingCustomScheduler; + private InternalServerImpl internalServer; + + public ServletServerBuilder() { + serverImplBuilder = new ServerImplBuilder(this::buildTransportServers); + } + + /** + * Builds a gRPC server that can run as a servlet. + * + *

The returned server will not be started or bound to a port. + * + *

Users should not call this method directly. Instead users should call + * {@link #buildServletAdapter()} which internally will call {@code build()} and {@code start()} + * appropriately. + * + * @throws IllegalStateException if this method is called by users directly + */ + @Override + public Server build() { + checkState(internalCaller, "build() method should not be called directly by an application"); + return super.build(); + } + + /** + * Creates a {@link ServletAdapter}. + */ + public ServletAdapter buildServletAdapter() { + return new ServletAdapter(buildAndStart(), streamTracerFactories, maxInboundMessageSize); + } + + private ServerTransportListener buildAndStart() { + Server server; + try { + internalCaller = true; + server = build().start(); + } catch (IOException e) { + // actually this should never happen + throw new RuntimeException(e); + } finally { + internalCaller = false; + } + + if (!usingCustomScheduler) { + scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE); + } + + // Create only one "transport" for all requests because it has no knowledge of which request is + // associated with which client socket. This "transport" does not do socket connection, the + // container does. + ServerTransportImpl serverTransport = new ServerTransportImpl(scheduler); + ServerTransportListener delegate = + internalServer.serverListener.transportCreated(serverTransport); + return new ServerTransportListener() { + @Override + public void streamCreated(ServerStream stream, String method, Metadata headers) { + delegate.streamCreated(stream, method, headers); + } + + @Override + public Attributes transportReady(Attributes attributes) { + return delegate.transportReady(attributes); + } + + @Override + public void transportTerminated() { + server.shutdown(); + delegate.transportTerminated(); + if (!usingCustomScheduler) { + SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, scheduler); + } + } + }; + } + + @VisibleForTesting + InternalServer buildTransportServers( + List streamTracerFactories) { + checkNotNull(streamTracerFactories, "streamTracerFactories"); + this.streamTracerFactories = streamTracerFactories; + internalServer = new InternalServerImpl(); + return internalServer; + } + + @Internal + @Override + protected ServerBuilder delegate() { + return serverImplBuilder; + } + + /** + * Throws {@code UnsupportedOperationException}. TLS should be configured by the servlet + * container. + */ + @Override + public ServletServerBuilder useTransportSecurity(File certChain, File privateKey) { + throw new UnsupportedOperationException("TLS should be configured by the servlet container"); + } + + @Override + public ServletServerBuilder maxInboundMessageSize(int bytes) { + checkArgument(bytes >= 0, "bytes must be >= 0"); + maxInboundMessageSize = bytes; + return this; + } + + /** + * Provides a custom scheduled executor service to the server builder. + * + * @return this + */ + public ServletServerBuilder scheduledExecutorService(ScheduledExecutorService scheduler) { + this.scheduler = checkNotNull(scheduler, "scheduler"); + usingCustomScheduler = true; + return this; + } + + private static final class InternalServerImpl implements InternalServer { + + ServerListener serverListener; + + InternalServerImpl() {} + + @Override + public void start(ServerListener listener) { + serverListener = listener; + } + + @Override + public void shutdown() { + if (serverListener != null) { + serverListener.serverShutdown(); + } + } + + @Override + public SocketAddress getListenSocketAddress() { + return new SocketAddress() { + @Override + public String toString() { + return "ServletServer"; + } + }; + } + + @Override + public InternalInstrumented getListenSocketStats() { + // sockets are managed by the servlet container, grpc is ignorant of that + return null; + } + + @Override + public List getListenSocketAddresses() { + return Collections.emptyList(); + } + + @Nullable + @Override + public List> getListenSocketStatsList() { + return null; + } + } + + @VisibleForTesting + static final class ServerTransportImpl implements ServerTransport { + + private final InternalLogId logId = InternalLogId.allocate(ServerTransportImpl.class, null); + private final ScheduledExecutorService scheduler; + + ServerTransportImpl(ScheduledExecutorService scheduler) { + this.scheduler = checkNotNull(scheduler, "scheduler"); + } + + @Override + public void shutdown() {} + + @Override + public void shutdownNow(Status reason) {} + + @Override + public ScheduledExecutorService getScheduledExecutorService() { + return scheduler; + } + + @Override + public ListenableFuture getStats() { + // does not support instrumentation + return null; + } + + @Override + public InternalLogId getLogId() { + return logId; + } + } +} diff --git a/servlet/src/main/java/io/grpc/servlet/ServletServerStream.java b/servlet/src/main/java/io/grpc/servlet/ServletServerStream.java new file mode 100644 index 00000000000..0415eea942e --- /dev/null +++ b/servlet/src/main/java/io/grpc/servlet/ServletServerStream.java @@ -0,0 +1,336 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.servlet; + +import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_GRPC; +import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY; +import static java.lang.Math.max; +import static java.lang.Math.min; +import static java.util.logging.Level.FINE; +import static java.util.logging.Level.FINEST; +import static java.util.logging.Level.WARNING; + +import com.google.common.io.BaseEncoding; +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.Attributes; +import io.grpc.InternalLogId; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.internal.AbstractServerStream; +import io.grpc.internal.GrpcUtil; +import io.grpc.internal.SerializingExecutor; +import io.grpc.internal.StatsTraceContext; +import io.grpc.internal.TransportFrameUtil; +import io.grpc.internal.TransportTracer; +import io.grpc.internal.WritableBuffer; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.logging.Logger; +import javax.annotation.Nullable; +import javax.servlet.AsyncContext; +import javax.servlet.WriteListener; +import javax.servlet.http.HttpServletResponse; + +final class ServletServerStream extends AbstractServerStream { + + private static final Logger logger = Logger.getLogger(ServletServerStream.class.getName()); + + private final ServletTransportState transportState; + private final Sink sink = new Sink(); + private final AsyncContext asyncCtx; + private final HttpServletResponse resp; + private final Attributes attributes; + private final String authority; + private final InternalLogId logId; + private final AsyncServletOutputStreamWriter writer; + + ServletServerStream( + AsyncContext asyncCtx, + StatsTraceContext statsTraceCtx, + int maxInboundMessageSize, + Attributes attributes, + String authority, + InternalLogId logId) throws IOException { + super(ByteArrayWritableBuffer::new, statsTraceCtx); + transportState = + new ServletTransportState(maxInboundMessageSize, statsTraceCtx, new TransportTracer()); + this.attributes = attributes; + this.authority = authority; + this.logId = logId; + this.asyncCtx = asyncCtx; + this.resp = (HttpServletResponse) asyncCtx.getResponse(); + this.writer = new AsyncServletOutputStreamWriter( + asyncCtx, transportState, logId); + resp.getOutputStream().setWriteListener(new GrpcWriteListener()); + } + + @Override + protected ServletTransportState transportState() { + return transportState; + } + + @Override + public Attributes getAttributes() { + return attributes; + } + + @Override + public String getAuthority() { + return authority; + } + + @Override + public int streamId() { + return -1; + } + + @Override + protected Sink abstractServerStreamSink() { + return sink; + } + + private void writeHeadersToServletResponse(Metadata metadata) { + // Discard any application supplied duplicates of the reserved headers + metadata.discardAll(CONTENT_TYPE_KEY); + metadata.discardAll(GrpcUtil.TE_HEADER); + metadata.discardAll(GrpcUtil.USER_AGENT_KEY); + + if (logger.isLoggable(FINE)) { + logger.log(FINE, "[{0}] writeHeaders {1}", new Object[] {logId, metadata}); + } + + resp.setStatus(HttpServletResponse.SC_OK); + resp.setContentType(CONTENT_TYPE_GRPC); + + byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(metadata); + for (int i = 0; i < serializedHeaders.length; i += 2) { + resp.addHeader( + new String(serializedHeaders[i], StandardCharsets.US_ASCII), + new String(serializedHeaders[i + 1], StandardCharsets.US_ASCII)); + } + } + + final class ServletTransportState extends TransportState { + + private final SerializingExecutor transportThreadExecutor = + new SerializingExecutor(MoreExecutors.directExecutor()); + + private ServletTransportState( + int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer) { + super(maxMessageSize, statsTraceCtx, transportTracer); + } + + @Override + public void runOnTransportThread(Runnable r) { + transportThreadExecutor.execute(r); + } + + @Override + public void bytesRead(int numBytes) { + // no-op + // no flow control yet + } + + @Override + public void deframeFailed(Throwable cause) { + if (logger.isLoggable(FINE)) { + logger.log(FINE, String.format("[{%s}] Exception processing message", logId), cause); + } + cancel(Status.fromThrowable(cause)); + } + } + + private static final class ByteArrayWritableBuffer implements WritableBuffer { + + private final int capacity; + final byte[] bytes; + private int index; + + ByteArrayWritableBuffer(int capacityHint) { + this.bytes = new byte[min(1024 * 1024, max(4096, capacityHint))]; + this.capacity = bytes.length; + } + + @Override + public void write(byte[] src, int srcIndex, int length) { + System.arraycopy(src, srcIndex, bytes, index, length); + index += length; + } + + @Override + public void write(byte b) { + bytes[index++] = b; + } + + @Override + public int writableBytes() { + return capacity - index; + } + + @Override + public int readableBytes() { + return index; + } + + @Override + public void release() {} + } + + private final class GrpcWriteListener implements WriteListener { + + @Override + public void onError(Throwable t) { + if (logger.isLoggable(FINE)) { + logger.log(FINE, String.format("[{%s}] Error: ", logId), t); + } + + // If the resp is not committed, cancel() to avoid being redirected to an error page. + // Else, the container will send RST_STREAM at the end. + if (!resp.isCommitted()) { + cancel(Status.fromThrowable(t)); + } else { + transportState.runOnTransportThread( + () -> transportState.transportReportStatus(Status.fromThrowable(t))); + } + } + + @Override + public void onWritePossible() throws IOException { + writer.onWritePossible(); + } + } + + private final class Sink implements AbstractServerStream.Sink { + final TrailerSupplier trailerSupplier = new TrailerSupplier(); + + @Override + public void writeHeaders(Metadata headers) { + writeHeadersToServletResponse(headers); + resp.setTrailerFields(trailerSupplier); + try { + writer.flush(); + } catch (IOException e) { + logger.log(WARNING, String.format("[{%s}] Exception when flushBuffer", logId), e); + cancel(Status.fromThrowable(e)); + } + } + + @Override + public void writeFrame(@Nullable WritableBuffer frame, boolean flush, int numMessages) { + if (frame == null && !flush) { + return; + } + + if (logger.isLoggable(FINEST)) { + logger.log( + FINEST, + "[{0}] writeFrame: numBytes = {1}, flush = {2}, numMessages = {3}", + new Object[]{logId, frame == null ? 0 : frame.readableBytes(), flush, numMessages}); + } + + try { + if (frame != null) { + int numBytes = frame.readableBytes(); + if (numBytes > 0) { + onSendingBytes(numBytes); + } + writer.writeBytes(((ByteArrayWritableBuffer) frame).bytes, frame.readableBytes()); + } + + if (flush) { + writer.flush(); + } + } catch (IOException e) { + logger.log(WARNING, String.format("[{%s}] Exception writing message", logId), e); + cancel(Status.fromThrowable(e)); + } + } + + @Override + public void writeTrailers(Metadata trailers, boolean headersSent, Status status) { + if (logger.isLoggable(FINE)) { + logger.log( + FINE, + "[{0}] writeTrailers: {1}, headersSent = {2}, status = {3}", + new Object[] {logId, trailers, headersSent, status}); + } + if (!headersSent) { + writeHeadersToServletResponse(trailers); + } else { + byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(trailers); + for (int i = 0; i < serializedHeaders.length; i += 2) { + String key = new String(serializedHeaders[i], StandardCharsets.US_ASCII); + String newValue = new String(serializedHeaders[i + 1], StandardCharsets.US_ASCII); + trailerSupplier.get().computeIfPresent(key, (k, v) -> v + "," + newValue); + trailerSupplier.get().putIfAbsent(key, newValue); + } + } + + writer.complete(); + } + + @Override + public void cancel(Status status) { + if (resp.isCommitted() && Code.DEADLINE_EXCEEDED == status.getCode()) { + return; // let the servlet timeout, the container will sent RST_STREAM automatically + } + transportState.runOnTransportThread(() -> transportState.transportReportStatus(status)); + // There is no way to RST_STREAM with CANCEL code, so write trailers instead + close(Status.CANCELLED.withCause(status.asRuntimeException()), new Metadata()); + CountDownLatch countDownLatch = new CountDownLatch(1); + transportState.runOnTransportThread(() -> { + asyncCtx.complete(); + countDownLatch.countDown(); + }); + try { + countDownLatch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + private static final class TrailerSupplier implements Supplier> { + final Map trailers = Collections.synchronizedMap(new HashMap<>()); + + TrailerSupplier() {} + + @Override + public Map get() { + return trailers; + } + } + + static String toHexString(byte[] bytes, int length) { + String hex = BaseEncoding.base16().encode(bytes, 0, min(length, 64)); + if (length > 80) { + hex += "..."; + } + if (length > 64) { + int offset = max(64, length - 16); + hex += BaseEncoding.base16().encode(bytes, offset, length - offset); + } + return hex; + } +} diff --git a/servlet/src/main/java/io/grpc/servlet/package-info.java b/servlet/src/main/java/io/grpc/servlet/package-info.java new file mode 100644 index 00000000000..13d521fdde5 --- /dev/null +++ b/servlet/src/main/java/io/grpc/servlet/package-info.java @@ -0,0 +1,26 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * API that implements gRPC server as a servlet. The API requires that the application container + * supports Servlet 4.0 and enables HTTP/2. + * + *

The API is experimental. The authors would like to know more about the real usecases. Users + * are welcome to provide feedback by commenting on + * the tracking issue. + */ +@io.grpc.ExperimentalApi("https://github.com/grpc/grpc-java/issues/5066") +package io.grpc.servlet; diff --git a/servlet/src/test/java/io/grpc/servlet/AsyncServletOutputStreamWriterConcurrencyTest.java b/servlet/src/test/java/io/grpc/servlet/AsyncServletOutputStreamWriterConcurrencyTest.java new file mode 100644 index 00000000000..61da2bf4c69 --- /dev/null +++ b/servlet/src/test/java/io/grpc/servlet/AsyncServletOutputStreamWriterConcurrencyTest.java @@ -0,0 +1,174 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.servlet; + +import static com.google.common.truth.Truth.assertWithMessage; +import static org.jetbrains.kotlinx.lincheck.strategy.managed.ManagedStrategyGuaranteeKt.forClasses; + +import io.grpc.servlet.AsyncServletOutputStreamWriter.ActionItem; +import io.grpc.servlet.AsyncServletOutputStreamWriter.Log; +import java.io.IOException; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import org.jetbrains.kotlinx.lincheck.LinChecker; +import org.jetbrains.kotlinx.lincheck.annotations.OpGroupConfig; +import org.jetbrains.kotlinx.lincheck.annotations.Operation; +import org.jetbrains.kotlinx.lincheck.annotations.Param; +import org.jetbrains.kotlinx.lincheck.paramgen.BooleanGen; +import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.ModelCheckingCTest; +import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.ModelCheckingOptions; +import org.jetbrains.kotlinx.lincheck.verifier.VerifierState; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Test concurrency correctness of {@link AsyncServletOutputStreamWriter} using model checking with + * Lincheck. + * + *

This test should only call AsyncServletOutputStreamWriter's API surface and not rely on any + * implementation detail such as whether it's using a lock-free approach or not. + * + *

The test executes two threads concurrently, one for write and flush, and the other for + * onWritePossible up to {@link #OPERATIONS_PER_THREAD} operations on each thread. Lincheck will + * test all possibly interleaves (on context switch) between the two threads, and then verify the + * operations are linearizable in each interleave scenario. + */ +@ModelCheckingCTest +@OpGroupConfig(name = "update", nonParallel = true) +@OpGroupConfig(name = "write", nonParallel = true) +@Param(name = "keepReady", gen = BooleanGen.class) +@RunWith(JUnit4.class) +public class AsyncServletOutputStreamWriterConcurrencyTest extends VerifierState { + private static final int OPERATIONS_PER_THREAD = 6; + + private final AsyncServletOutputStreamWriter writer; + private final boolean[] keepReadyArray = new boolean[OPERATIONS_PER_THREAD]; + + private volatile boolean isReady; + // when isReadyReturnedFalse, writer.onWritePossible() will be called. + private volatile boolean isReadyReturnedFalse; + private int producerIndex; + private int consumerIndex; + private int bytesWritten; + + /** Public no-args constructor. */ + public AsyncServletOutputStreamWriterConcurrencyTest() { + BiFunction writeAction = + (bytes, numBytes) -> () -> { + assertWithMessage("write should only be called while isReady() is true") + .that(isReady) + .isTrue(); + // The byte to be written must equal to consumerIndex, otherwise execution order is wrong + assertWithMessage("write in wrong order").that(bytes[0]).isEqualTo((byte) consumerIndex); + bytesWritten++; + writeOrFlush(); + }; + + ActionItem flushAction = () -> { + assertWithMessage("flush must only be called while isReady() is true").that(isReady).isTrue(); + writeOrFlush(); + }; + + writer = new AsyncServletOutputStreamWriter( + writeAction, + flushAction, + () -> { }, + this::isReady, + new Log() {}); + } + + private void writeOrFlush() { + boolean keepReady = keepReadyArray[consumerIndex]; + if (!keepReady) { + isReady = false; + } + consumerIndex++; + } + + private boolean isReady() { + if (!isReady) { + assertWithMessage("isReady() already returned false, onWritePossible() will be invoked") + .that(isReadyReturnedFalse).isFalse(); + isReadyReturnedFalse = true; + } + return isReady; + } + + /** + * Writes a single byte with value equal to {@link #producerIndex}. + * + * @param keepReady when the byte is written: + * the ServletOutputStream should remain ready if keepReady == true; + * the ServletOutputStream should become unready if keepReady == false. + */ + // @com.google.errorprone.annotations.Keep + @Operation(group = "write") + public void write(@Param(name = "keepReady") boolean keepReady) throws IOException { + keepReadyArray[producerIndex] = keepReady; + writer.writeBytes(new byte[]{(byte) producerIndex}, 1); + producerIndex++; + } + + /** + * Flushes the writer. + * + * @param keepReady when flushing: + * the ServletOutputStream should remain ready if keepReady == true; + * the ServletOutputStream should become unready if keepReady == false. + */ + // @com.google.errorprone.annotations.Keep // called by lincheck reflectively + @Operation(group = "write") + public void flush(@Param(name = "keepReady") boolean keepReady) throws IOException { + keepReadyArray[producerIndex] = keepReady; + writer.flush(); + producerIndex++; + } + + /** If the writer is not ready, let it turn ready and call writer.onWritePossible(). */ + // @com.google.errorprone.annotations.Keep // called by lincheck reflectively + @Operation(group = "update") + public void maybeOnWritePossible() throws IOException { + if (isReadyReturnedFalse) { + isReadyReturnedFalse = false; + isReady = true; + writer.onWritePossible(); + } + } + + @Override + protected Object extractState() { + return bytesWritten; + } + + @Test + public void linCheck() { + ModelCheckingOptions options = new ModelCheckingOptions() + .actorsBefore(0) + .threads(2) + .actorsPerThread(OPERATIONS_PER_THREAD) + .actorsAfter(0) + .addGuarantee( + forClasses( + ConcurrentLinkedQueue.class.getName(), + AtomicReference.class.getName()) + .allMethods() + .treatAsAtomic()); + LinChecker.check(AsyncServletOutputStreamWriterConcurrencyTest.class, options); + } +} diff --git a/servlet/src/test/java/io/grpc/servlet/ServletServerBuilderTest.java b/servlet/src/test/java/io/grpc/servlet/ServletServerBuilderTest.java new file mode 100644 index 00000000000..d571cfd45d5 --- /dev/null +++ b/servlet/src/test/java/io/grpc/servlet/ServletServerBuilderTest.java @@ -0,0 +1,91 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.servlet; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +import java.util.Enumeration; +import java.util.StringTokenizer; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import javax.servlet.AsyncContext; +import javax.servlet.ServletInputStream; +import javax.servlet.ServletOutputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Test for {@link ServletServerBuilder}. */ +@RunWith(JUnit4.class) +public class ServletServerBuilderTest { + + @Test + public void scheduledExecutorService() throws Exception { + ScheduledExecutorService scheduler = mock(ScheduledExecutorService.class); + HttpServletRequest request = mock(HttpServletRequest.class); + HttpServletResponse response = mock(HttpServletResponse.class); + AsyncContext asyncContext = mock(AsyncContext.class); + ServletInputStream inputStream = mock(ServletInputStream.class); + ServletOutputStream outputStream = mock(ServletOutputStream.class); + ScheduledFuture future = mock(ScheduledFuture.class); + + doReturn(future).when(scheduler).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)); + doReturn(true).when(request).isAsyncSupported(); + doReturn(asyncContext).when(request).startAsync(request, response); + doReturn("application/grpc").when(request).getContentType(); + doReturn("/hello/world").when(request).getRequestURI(); + @SuppressWarnings({"JdkObsolete", "unchecked"}) // Required by servlet API signatures. + // StringTokenizer is actually Enumeration + Enumeration headerNames = + (Enumeration) ((Enumeration) new StringTokenizer("grpc-timeout")); + @SuppressWarnings({"JdkObsolete", "unchecked"}) + Enumeration headers = + (Enumeration) ((Enumeration) new StringTokenizer("1m")); + doReturn(headerNames).when(request).getHeaderNames(); + doReturn(headers).when(request).getHeaders("grpc-timeout"); + doReturn(new StringBuffer("localhost:8080")).when(request).getRequestURL(); + doReturn(inputStream).when(request).getInputStream(); + doReturn("1.1.1.1").when(request).getLocalAddr(); + doReturn(8080).when(request).getLocalPort(); + doReturn("remote").when(request).getRemoteHost(); + doReturn(80).when(request).getRemotePort(); + doReturn(outputStream).when(response).getOutputStream(); + doReturn(request).when(asyncContext).getRequest(); + doReturn(response).when(asyncContext).getResponse(); + + ServletServerBuilder serverBuilder = + new ServletServerBuilder().scheduledExecutorService(scheduler); + ServletAdapter servletAdapter = serverBuilder.buildServletAdapter(); + servletAdapter.doPost(request, response); + + verify(asyncContext).setTimeout(1); + + // The following just verifies that scheduler is populated to the transport. + // It doesn't matter what tasks (such as handshake timeout and request deadline) are actually + // scheduled. + verify(scheduler, timeout(5000).atLeastOnce()) + .schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)); + } +} diff --git a/servlet/src/tomcatTest/java/io/grpc/servlet/TomcatInteropTest.java b/servlet/src/tomcatTest/java/io/grpc/servlet/TomcatInteropTest.java new file mode 100644 index 00000000000..f28a7419286 --- /dev/null +++ b/servlet/src/tomcatTest/java/io/grpc/servlet/TomcatInteropTest.java @@ -0,0 +1,140 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.servlet; + +import io.grpc.ManagedChannelBuilder; +import io.grpc.ServerBuilder; +import io.grpc.netty.InternalNettyChannelBuilder; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.testing.integration.AbstractInteropTest; +import java.io.File; +import org.apache.catalina.Context; +import org.apache.catalina.LifecycleException; +import org.apache.catalina.startup.Tomcat; +import org.apache.coyote.http2.Http2Protocol; +import org.apache.tomcat.util.http.fileupload.FileUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Interop test for Tomcat server and Netty client. + */ +public class TomcatInteropTest extends AbstractInteropTest { + + private static final String HOST = "localhost"; + private static final String MYAPP = "/grpc.testing.TestService"; + private int port; + private Tomcat server; + + @After + @Override + public void tearDown() { + super.tearDown(); + try { + server.stop(); + } catch (LifecycleException e) { + throw new AssertionError(e); + } + } + + @AfterClass + public static void cleanUp() throws Exception { + FileUtils.deleteDirectory(new File("tomcat.0")); + } + + @Override + protected ServerBuilder getServerBuilder() { + return new ServletServerBuilder().maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); + } + + @Override + protected void startServer(ServerBuilder builer) { + server = new Tomcat(); + server.setPort(0); + Context ctx = server.addContext(MYAPP, new File("build/tmp").getAbsolutePath()); + Tomcat + .addServlet( + ctx, "io.grpc.servlet.TomcatInteropTest", + new GrpcServlet(((ServletServerBuilder) builer).buildServletAdapter())) + .setAsyncSupported(true); + ctx.addServletMappingDecoded("/*", "io.grpc.servlet.TomcatInteropTest"); + + // Explicitly disable safeguards against malicious clients, as some unit tests trigger these + Http2Protocol http2Protocol = new Http2Protocol(); + http2Protocol.setOverheadCountFactor(0); + http2Protocol.setOverheadWindowUpdateThreshold(0); + http2Protocol.setOverheadContinuationThreshold(0); + http2Protocol.setOverheadDataThreshold(0); + + server.getConnector().addUpgradeProtocol(http2Protocol); + try { + server.start(); + } catch (LifecycleException e) { + throw new RuntimeException(e); + } + + port = server.getConnector().getLocalPort(); + } + + @Override + protected ManagedChannelBuilder createChannelBuilder() { + NettyChannelBuilder builder = + (NettyChannelBuilder) ManagedChannelBuilder.forAddress(HOST, port) + .usePlaintext() + .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); + InternalNettyChannelBuilder.setStatsEnabled(builder, false); + builder.intercept(createCensusStatsClientInterceptor()); + return builder; + } + + @Override + protected boolean metricsExpected() { + return false; // otherwise re-test will not work + } + + // FIXME + @Override + @Ignore("Tomcat is broken on client GOAWAY") + @Test + public void gracefulShutdown() {} + + // FIXME + @Override + @Ignore("Tomcat is not able to send trailer only") + @Test + public void specialStatusMessage() {} + + // FIXME + @Override + @Ignore("Tomcat is not able to send trailer only") + @Test + public void unimplementedMethod() {} + + // FIXME + @Override + @Ignore("Tomcat is not able to send trailer only") + @Test + public void statusCodeAndMessage() {} + + // FIXME + @Override + @Ignore("Tomcat is not able to send trailer only") + @Test + public void emptyStream() {} +} diff --git a/servlet/src/tomcatTest/java/io/grpc/servlet/TomcatTransportTest.java b/servlet/src/tomcatTest/java/io/grpc/servlet/TomcatTransportTest.java new file mode 100644 index 00000000000..43c69e13fdd --- /dev/null +++ b/servlet/src/tomcatTest/java/io/grpc/servlet/TomcatTransportTest.java @@ -0,0 +1,270 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.servlet; + +import io.grpc.InternalChannelz.SocketStats; +import io.grpc.InternalInstrumented; +import io.grpc.ServerStreamTracer; +import io.grpc.internal.AbstractTransportTest; +import io.grpc.internal.ClientTransportFactory; +import io.grpc.internal.FakeClock; +import io.grpc.internal.InternalServer; +import io.grpc.internal.ManagedClientTransport; +import io.grpc.internal.ServerListener; +import io.grpc.internal.ServerTransportListener; +import io.grpc.netty.InternalNettyChannelBuilder; +import io.grpc.netty.NegotiationType; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.servlet.ServletServerBuilder.ServerTransportImpl; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.catalina.Context; +import org.apache.catalina.LifecycleException; +import org.apache.catalina.startup.Tomcat; +import org.apache.coyote.http2.Http2Protocol; +import org.junit.After; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Transport test for Tomcat server and Netty client. + */ +public class TomcatTransportTest extends AbstractTransportTest { + private static final String MYAPP = "/service"; + + private final FakeClock fakeClock = new FakeClock(); + + private Tomcat tomcatServer; + private int port; + + @After + @Override + public void tearDown() throws InterruptedException { + super.tearDown(); + try { + tomcatServer.stop(); + } catch (LifecycleException e) { + throw new AssertionError(e); + } + } + + @Override + protected InternalServer newServer(List streamTracerFactories) { + return new InternalServer() { + final InternalServer delegate = + new ServletServerBuilder().buildTransportServers(streamTracerFactories); + + @Override + public void start(ServerListener listener) throws IOException { + delegate.start(listener); + ScheduledExecutorService scheduler = fakeClock.getScheduledExecutorService(); + ServerTransportListener serverTransportListener = + listener.transportCreated(new ServerTransportImpl(scheduler)); + ServletAdapter adapter = + new ServletAdapter(serverTransportListener, streamTracerFactories, Integer.MAX_VALUE); + GrpcServlet grpcServlet = new GrpcServlet(adapter); + + tomcatServer = new Tomcat(); + tomcatServer.setPort(0); + Context ctx = tomcatServer.addContext(MYAPP, new File("build/tmp").getAbsolutePath()); + Tomcat.addServlet(ctx, "TomcatTransportTest", grpcServlet) + .setAsyncSupported(true); + ctx.addServletMappingDecoded("/*", "TomcatTransportTest"); + tomcatServer.getConnector().addUpgradeProtocol(new Http2Protocol()); + try { + tomcatServer.start(); + } catch (LifecycleException e) { + throw new RuntimeException(e); + } + + port = tomcatServer.getConnector().getLocalPort(); + } + + @Override + public void shutdown() { + delegate.shutdown(); + } + + @Override + public SocketAddress getListenSocketAddress() { + return delegate.getListenSocketAddress(); + } + + @Override + public InternalInstrumented getListenSocketStats() { + return delegate.getListenSocketStats(); + } + + @Override + public List getListenSocketAddresses() { + return delegate.getListenSocketAddresses(); + } + + @Nullable + @Override + public List> getListenSocketStatsList() { + return delegate.getListenSocketStatsList(); + } + }; + } + + @Override + protected InternalServer newServer( + int port, List streamTracerFactories) { + return newServer(streamTracerFactories); + } + + @Override + protected ManagedClientTransport newClientTransport(InternalServer server) { + NettyChannelBuilder nettyChannelBuilder = NettyChannelBuilder + // Although specified here, address is ignored because we never call build. + .forAddress("localhost", 0) + .flowControlWindow(65 * 1024) + .negotiationType(NegotiationType.PLAINTEXT); + InternalNettyChannelBuilder + .setTransportTracerFactory(nettyChannelBuilder, fakeClockTransportTracer); + ClientTransportFactory clientFactory = + InternalNettyChannelBuilder.buildTransportFactory(nettyChannelBuilder); + return clientFactory.newClientTransport( + new InetSocketAddress("localhost", port), + new ClientTransportFactory.ClientTransportOptions() + .setAuthority(testAuthority(server)) + .setEagAttributes(eagAttrs()), + transportLogger()); + } + + @Override + protected String testAuthority(InternalServer server) { + return "localhost:" + port; + } + + @Override + protected void advanceClock(long offset, TimeUnit unit) { + fakeClock.forwardNanos(unit.toNanos(offset)); + } + + @Override + protected long fakeCurrentTimeNanos() { + return fakeClock.getTicker().read(); + } + + @Override + @Ignore("Skip the test, server lifecycle is managed by the container") + @Test + public void serverAlreadyListening() {} + + @Override + @Ignore("Skip the test, server lifecycle is managed by the container") + @Test + public void openStreamPreventsTermination() {} + + @Override + @Ignore("Skip the test, server lifecycle is managed by the container") + @Test + public void shutdownNowKillsServerStream() {} + + @Override + @Ignore("Skip the test, server lifecycle is managed by the container") + @Test + public void serverNotListening() {} + + // FIXME + @Override + @Ignore("Tomcat is broken on client GOAWAY") + @Test + public void newStream_duringShutdown() {} + + // FIXME + @Override + @Ignore("Tomcat is broken on client GOAWAY") + @Test + public void ping_duringShutdown() {} + + // FIXME + @Override + @Ignore("Tomcat is broken on client RST_STREAM") + @Test + public void frameAfterRstStreamShouldNotBreakClientChannel() {} + + // FIXME + @Override + @Ignore("Tomcat is broken on client RST_STREAM") + @Test + public void shutdownNowKillsClientStream() {} + + // FIXME + @Override + @Ignore("Servlet flow control not implemented yet") + @Test + public void flowControlPushBack() {} + + @Override + @Ignore("Server side sockets are managed by the servlet container") + @Test + public void socketStats() {} + + @Override + @Ignore("serverTransportListener will not terminate") + @Test + public void clientStartAndStopOnceConnected() {} + + @Override + @Ignore("clientStreamTracer1.getInboundTrailers() is not null; listeners.poll() doesn't apply") + @Test + public void serverCancel() {} + + @Override + @Ignore("This doesn't apply: Ensure that for a closed ServerStream, interactions are noops") + @Test + public void interactionsAfterServerStreamCloseAreNoops() {} + + @Override + @Ignore("listeners.poll() doesn't apply") + @Test + public void interactionsAfterClientStreamCancelAreNoops() {} + + @Override + @Ignore("assertNull(serverStatus.getCause()) isn't true") + @Test + public void clientCancel() {} + + @Override + @Ignore("Tomcat does not support trailers only") + @Test + public void earlyServerClose_noServerHeaders() {} + + @Override + @Ignore("Tomcat does not support trailers only") + @Test + public void earlyServerClose_serverFailure() {} + + @Override + @Ignore("Tomcat does not support trailers only") + @Test + public void earlyServerClose_serverFailure_withClientCancelOnListenerClosed() {} + + @Override + @Ignore("regression since bumping grpc v1.46 to v1.53") + @Test + public void messageProducerOnlyProducesRequestedMessages() {} +} diff --git a/servlet/src/undertowTest/java/io/grpc/servlet/UndertowInteropTest.java b/servlet/src/undertowTest/java/io/grpc/servlet/UndertowInteropTest.java new file mode 100644 index 00000000000..600400b14b8 --- /dev/null +++ b/servlet/src/undertowTest/java/io/grpc/servlet/UndertowInteropTest.java @@ -0,0 +1,128 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.servlet; + +import static io.undertow.servlet.Servlets.defaultContainer; +import static io.undertow.servlet.Servlets.deployment; +import static io.undertow.servlet.Servlets.servlet; + +import io.grpc.ManagedChannelBuilder; +import io.grpc.ServerBuilder; +import io.grpc.netty.InternalNettyChannelBuilder; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.testing.integration.AbstractInteropTest; +import io.undertow.Handlers; +import io.undertow.Undertow; +import io.undertow.UndertowOptions; +import io.undertow.server.HttpHandler; +import io.undertow.server.handlers.PathHandler; +import io.undertow.servlet.api.DeploymentInfo; +import io.undertow.servlet.api.DeploymentManager; +import io.undertow.servlet.api.InstanceFactory; +import io.undertow.servlet.util.ImmediateInstanceHandle; +import java.net.InetSocketAddress; +import javax.servlet.Servlet; +import javax.servlet.ServletException; +import org.junit.After; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Interop test for Undertow server and Netty client. + */ +public class UndertowInteropTest extends AbstractInteropTest { + private static final String HOST = "localhost"; + private static final String MYAPP = "/grpc.testing.TestService"; + private int port; + private Undertow server; + private DeploymentManager manager; + + @After + @Override + public void tearDown() { + super.tearDown(); + if (server != null) { + server.stop(); + } + if (manager != null) { + try { + manager.stop(); + } catch (ServletException e) { + throw new AssertionError("failed to stop container", e); + } + } + } + + @Override + protected ServletServerBuilder getServerBuilder() { + return new ServletServerBuilder().maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); + } + + @Override + protected void startServer(ServerBuilder builder) { + GrpcServlet grpcServlet = + new GrpcServlet(((ServletServerBuilder) builder).buildServletAdapter()); + InstanceFactory instanceFactory = + () -> new ImmediateInstanceHandle<>(grpcServlet); + DeploymentInfo servletBuilder = + deployment() + .setClassLoader(UndertowInteropTest.class.getClassLoader()) + .setContextPath(MYAPP) + .setDeploymentName("UndertowInteropTest.war") + .addServlets( + servlet("InteropTestServlet", GrpcServlet.class, instanceFactory) + .addMapping("/*") + .setAsyncSupported(true)); + + manager = defaultContainer().addDeployment(servletBuilder); + manager.deploy(); + + HttpHandler servletHandler; + try { + servletHandler = manager.start(); + } catch (ServletException e) { + throw new RuntimeException(e); + } + PathHandler path = Handlers.path(Handlers.redirect(MYAPP)) + .addPrefixPath("/", servletHandler); // for unimplementedService test + server = Undertow.builder() + .setServerOption(UndertowOptions.ENABLE_HTTP2, true) + .setServerOption(UndertowOptions.SHUTDOWN_TIMEOUT, 5000 /* 5 sec */) + .addHttpListener(0, HOST) + .setHandler(path) + .build(); + server.start(); + port = ((InetSocketAddress) server.getListenerInfo().get(0).getAddress()).getPort(); + } + + @Override + protected ManagedChannelBuilder createChannelBuilder() { + NettyChannelBuilder builder = (NettyChannelBuilder) ManagedChannelBuilder + .forAddress(HOST, port) + .usePlaintext() + .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); + InternalNettyChannelBuilder.setStatsEnabled(builder, false); + builder.intercept(createCensusStatsClientInterceptor()); + return builder; + } + + // FIXME + @Override + @Ignore("Undertow is broken on client GOAWAY") + @Test + public void gracefulShutdown() {} +} diff --git a/servlet/src/undertowTest/java/io/grpc/servlet/UndertowTransportTest.java b/servlet/src/undertowTest/java/io/grpc/servlet/UndertowTransportTest.java new file mode 100644 index 00000000000..9d894b5e3f2 --- /dev/null +++ b/servlet/src/undertowTest/java/io/grpc/servlet/UndertowTransportTest.java @@ -0,0 +1,304 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.servlet; + +import static io.undertow.servlet.Servlets.defaultContainer; +import static io.undertow.servlet.Servlets.deployment; +import static io.undertow.servlet.Servlets.servlet; + +import io.grpc.InternalChannelz.SocketStats; +import io.grpc.InternalInstrumented; +import io.grpc.ServerStreamTracer; +import io.grpc.internal.AbstractTransportTest; +import io.grpc.internal.ClientTransportFactory; +import io.grpc.internal.FakeClock; +import io.grpc.internal.InternalServer; +import io.grpc.internal.ManagedClientTransport; +import io.grpc.internal.ServerListener; +import io.grpc.internal.ServerTransportListener; +import io.grpc.netty.InternalNettyChannelBuilder; +import io.grpc.netty.NegotiationType; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.servlet.ServletServerBuilder.ServerTransportImpl; +import io.undertow.Handlers; +import io.undertow.Undertow; +import io.undertow.UndertowOptions; +import io.undertow.server.HttpHandler; +import io.undertow.server.handlers.PathHandler; +import io.undertow.servlet.api.DeploymentInfo; +import io.undertow.servlet.api.DeploymentManager; +import io.undertow.servlet.api.InstanceFactory; +import io.undertow.servlet.util.ImmediateInstanceHandle; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import javax.servlet.Servlet; +import javax.servlet.ServletException; +import org.junit.After; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Transport test for Undertow server and Netty client. + */ +public class UndertowTransportTest extends AbstractTransportTest { + + private static final String HOST = "localhost"; + private static final String MYAPP = "/service"; + + private final FakeClock fakeClock = new FakeClock(); + + private Undertow undertowServer; + private DeploymentManager manager; + private int port; + + @After + @Override + public void tearDown() throws InterruptedException { + super.tearDown(); + if (undertowServer != null) { + undertowServer.stop(); + } + if (manager != null) { + try { + manager.stop(); + } catch (ServletException e) { + throw new AssertionError("failed to stop container", e); + } + } + } + + @Override + protected InternalServer newServer(List + streamTracerFactories) { + return new InternalServer() { + final InternalServer delegate = + new ServletServerBuilder().buildTransportServers(streamTracerFactories); + + @Override + public void start(ServerListener listener) throws IOException { + delegate.start(listener); + ScheduledExecutorService scheduler = fakeClock.getScheduledExecutorService(); + ServerTransportListener serverTransportListener = + listener.transportCreated(new ServerTransportImpl(scheduler)); + ServletAdapter adapter = + new ServletAdapter(serverTransportListener, streamTracerFactories, Integer.MAX_VALUE); + GrpcServlet grpcServlet = new GrpcServlet(adapter); + InstanceFactory instanceFactory = + () -> new ImmediateInstanceHandle<>(grpcServlet); + DeploymentInfo servletBuilder = + deployment() + .setClassLoader(UndertowInteropTest.class.getClassLoader()) + .setContextPath(MYAPP) + .setDeploymentName("UndertowTransportTest.war") + .addServlets( + servlet("TransportTestServlet", GrpcServlet.class, instanceFactory) + .addMapping("/*") + .setAsyncSupported(true)); + + manager = defaultContainer().addDeployment(servletBuilder); + manager.deploy(); + + HttpHandler servletHandler; + try { + servletHandler = manager.start(); + } catch (ServletException e) { + throw new RuntimeException(e); + } + PathHandler path = + Handlers.path(Handlers.redirect(MYAPP)) + .addPrefixPath("/", servletHandler); // for unimplementedService test + undertowServer = + Undertow.builder() + .setServerOption(UndertowOptions.ENABLE_HTTP2, true) + .setServerOption(UndertowOptions.SHUTDOWN_TIMEOUT, 5000 /* 5 sec */) + .addHttpListener(0, HOST) + .setHandler(path) + .build(); + undertowServer.start(); + port = ((InetSocketAddress) undertowServer.getListenerInfo().get(0).getAddress()).getPort(); + } + + @Override + public void shutdown() { + delegate.shutdown(); + } + + @Override + public SocketAddress getListenSocketAddress() { + return delegate.getListenSocketAddress(); + } + + @Override + public InternalInstrumented getListenSocketStats() { + return delegate.getListenSocketStats(); + } + + @Override + public List getListenSocketAddresses() { + return delegate.getListenSocketAddresses(); + } + + @Nullable + @Override + public List> getListenSocketStatsList() { + return delegate.getListenSocketStatsList(); + } + }; + } + + @Override + protected InternalServer newServer(int port, + List streamTracerFactories) { + return newServer(streamTracerFactories); + } + + @Override + protected ManagedClientTransport newClientTransport(InternalServer server) { + NettyChannelBuilder nettyChannelBuilder = NettyChannelBuilder + // Although specified here, address is ignored because we never call build. + .forAddress("localhost", 0) + .flowControlWindow(65 * 1024) + .negotiationType(NegotiationType.PLAINTEXT); + InternalNettyChannelBuilder + .setTransportTracerFactory(nettyChannelBuilder, fakeClockTransportTracer); + ClientTransportFactory clientFactory = + InternalNettyChannelBuilder.buildTransportFactory(nettyChannelBuilder); + return clientFactory.newClientTransport( + new InetSocketAddress("localhost", port), + new ClientTransportFactory.ClientTransportOptions() + .setAuthority(testAuthority(server)) + .setEagAttributes(eagAttrs()), + transportLogger()); + } + + @Override + protected String testAuthority(InternalServer server) { + return "localhost:" + port; + } + + @Override + protected void advanceClock(long offset, TimeUnit unit) { + fakeClock.forwardNanos(unit.toNanos(offset)); + } + + @Override + protected long fakeCurrentTimeNanos() { + return fakeClock.getTicker().read(); + } + + @Override + @Ignore("Skip the test, server lifecycle is managed by the container") + @Test + public void serverAlreadyListening() {} + + @Override + @Ignore("Skip the test, server lifecycle is managed by the container") + @Test + public void openStreamPreventsTermination() {} + + @Override + @Ignore("Skip the test, server lifecycle is managed by the container") + @Test + public void shutdownNowKillsServerStream() {} + + @Override + @Ignore("Skip the test, server lifecycle is managed by the container") + @Test + public void serverNotListening() {} + + @Override + @Ignore("Skip the test, can not set HTTP/2 SETTINGS_MAX_HEADER_LIST_SIZE") + @Test + public void serverChecksInboundMetadataSize() {} + + // FIXME + @Override + @Ignore("Undertow is broken on client GOAWAY") + @Test + public void newStream_duringShutdown() {} + + // FIXME + @Override + @Ignore("Undertow is broken on client GOAWAY") + @Test + public void ping_duringShutdown() {} + + // FIXME + @Override + @Ignore("Undertow is broken on client RST_STREAM") + @Test + public void frameAfterRstStreamShouldNotBreakClientChannel() {} + + // FIXME + @Override + @Ignore("Undertow is broken on client RST_STREAM") + @Test + public void shutdownNowKillsClientStream() {} + + // FIXME: https://github.com/grpc/grpc-java/issues/8925 + @Override + @Ignore("flaky") + @Test + public void clientCancelFromWithinMessageRead() {} + + // FIXME + @Override + @Ignore("Servlet flow control not implemented yet") + @Test + public void flowControlPushBack() {} + + @Override + @Ignore("Server side sockets are managed by the servlet container") + @Test + public void socketStats() {} + + @Override + @Ignore("serverTransportListener will not terminate") + @Test + public void clientStartAndStopOnceConnected() {} + + @Override + @Ignore("clientStreamTracer1.getInboundTrailers() is not null; listeners.poll() doesn't apply") + @Test + public void serverCancel() {} + + @Override + @Ignore("This doesn't apply: Ensure that for a closed ServerStream, interactions are noops") + @Test + public void interactionsAfterServerStreamCloseAreNoops() {} + + @Override + @Ignore("listeners.poll() doesn't apply") + @Test + public void interactionsAfterClientStreamCancelAreNoops() {} + + + @Override + @Ignore("assertNull(serverStatus.getCause()) isn't true") + @Test + public void clientCancel() {} + + @Override + @Ignore("regression since bumping grpc v1.46 to v1.53") + @Test + public void messageProducerOnlyProducesRequestedMessages() {} +} diff --git a/settings.gradle b/settings.gradle index 25767e7e48b..92db19a8839 100644 --- a/settings.gradle +++ b/settings.gradle @@ -48,6 +48,8 @@ include ":grpc-all" include ":grpc-alts" include ":grpc-benchmarks" include ":grpc-services" +include ":grpc-servlet" +include ":grpc-servlet-jakarta" include ":grpc-xds" include ":grpc-bom" include ":grpc-rls" @@ -76,6 +78,8 @@ project(':grpc-all').projectDir = "$rootDir/all" as File project(':grpc-alts').projectDir = "$rootDir/alts" as File project(':grpc-benchmarks').projectDir = "$rootDir/benchmarks" as File project(':grpc-services').projectDir = "$rootDir/services" as File +project(':grpc-servlet').projectDir = "$rootDir/servlet" as File +project(':grpc-servlet-jakarta').projectDir = "$rootDir/servlet/jakarta" as File project(':grpc-xds').projectDir = "$rootDir/xds" as File project(':grpc-bom').projectDir = "$rootDir/bom" as File project(':grpc-rls').projectDir = "$rootDir/rls" as File