Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/src/main/java/io/grpc/ServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public T handshakeTimeout(long timeout, TimeUnit unit) {
/**
* Sets the time without read activity before sending a keepalive ping. An unreasonably small
* value might be increased, and {@code Long.MAX_VALUE} nano seconds or an unreasonably large
* value will disable keepalive. The typical default is infinite when supported.
* value will disable keepalive. The typical default is two hours when supported.
*
* @throws IllegalArgumentException if time is not positive
* @throws UnsupportedOperationException if unsupported
Expand Down
11 changes: 9 additions & 2 deletions core/src/main/java/io/grpc/internal/AbstractServerStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected interface Sink {
* @param flush {@code true} if more data may not be arriving soon
* @param numMessages the number of messages this frame represents
*/
void writeFrame(@Nullable WritableBuffer frame, boolean flush, int numMessages);
void writeFrame(WritableBuffer frame, boolean flush, int numMessages);

/**
* Sends trailers to the remote end point. This call implies end of stream.
Expand Down Expand Up @@ -108,7 +108,14 @@ public final void deliverFrame(
WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
// Since endOfStream is triggered by the sending of trailers, avoid flush here and just flush
// after the trailers.
abstractServerStreamSink().writeFrame(frame, endOfStream ? false : flush, numMessages);
if (frame == null) {
assert endOfStream;
return;
}
if (endOfStream) {
flush = false;
}
abstractServerStreamSink().writeFrame(frame, flush, numMessages);
}

@Override
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/io/grpc/internal/GrpcUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,12 @@ public static void closeQuietly(@Nullable Closeable message) {
}
}

/** Reads {@code in} until end of stream. */
public static void exhaust(InputStream in) throws IOException {
byte[] buf = new byte[256];
while (in.read(buf) != -1) {}
}

/**
* Checks whether the given item exists in the iterable. This is copied from Guava Collect's
* {@code Iterables.contains()} because Guava Collect is not Android-friendly thus core can't
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,12 +408,13 @@ public void serverStartInterrupted() throws Exception {
}
assumeTrue("transport is not using InetSocketAddress", port != -1);
server.shutdown();
assertTrue(serverListener.waitForShutdown(TIMEOUT_MS, TimeUnit.MILLISECONDS));

server = newServer(port, Arrays.asList(serverStreamTracerFactory));
boolean success;
Thread.currentThread().interrupt();
try {
server.start(serverListener);
server.start(serverListener = new MockServerListener());
success = true;
} catch (Exception ex) {
success = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@

package io.grpc.testing.integration;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

import io.grpc.ChannelCredentials;
import io.grpc.Metadata;
import io.grpc.ServerBuilder;
import io.grpc.ServerCredentials;
import io.grpc.TlsChannelCredentials;
Expand All @@ -30,9 +26,7 @@
import io.grpc.netty.InternalNettyServerBuilder;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.MetadataUtils;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -84,36 +78,8 @@ protected NettyChannelBuilder createChannelBuilder() {
}
}

@Test
public void remoteAddr() {
InetSocketAddress isa = (InetSocketAddress) obtainRemoteClientAddr();
assertEquals(InetAddress.getLoopbackAddress(), isa.getAddress());
// It should not be the same as the server
assertNotEquals(((InetSocketAddress) getListenAddress()).getPort(), isa.getPort());
}

@Test
public void localAddr() throws Exception {
InetSocketAddress isa = (InetSocketAddress) obtainLocalServerAddr();
assertEquals(InetAddress.getLoopbackAddress(), isa.getAddress());
assertEquals(((InetSocketAddress) getListenAddress()).getPort(), isa.getPort());
}

@Test
public void tlsInfo() {
assertX500SubjectDn("CN=testclient, O=Internet Widgits Pty Ltd, ST=Some-State, C=AU");
}

@Test
public void contentLengthPermitted() throws Exception {
// Some third-party gRPC implementations (e.g., ServiceTalk) include Content-Length. The HTTP/2
// code starting in Netty 4.1.60.Final has special-cased handling of Content-Length, and may
// call uncommon methods on our custom headers implementation.
// https://github.com/grpc/grpc-java/issues/7953
Metadata contentLength = new Metadata();
contentLength.put(Metadata.Key.of("content-length", Metadata.ASCII_STRING_MARSHALLER), "5");
blockingStub
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(contentLength))
.emptyCall(EMPTY);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.testing.integration;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

import io.grpc.ChannelCredentials;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.ServerBuilder;
import io.grpc.ServerCredentials;
import io.grpc.TlsChannelCredentials;
import io.grpc.TlsServerCredentials;
import io.grpc.internal.testing.TestUtils;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.InternalNettyServerBuilder;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.okhttp.InternalOkHttpChannelBuilder;
import io.grpc.okhttp.InternalOkHttpServerBuilder;
import io.grpc.okhttp.OkHttpChannelBuilder;
import io.grpc.okhttp.OkHttpServerBuilder;
import io.grpc.stub.MetadataUtils;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Arrays;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

/**
* Integration tests for GRPC over the various HTTP2 transports.
*/
@RunWith(Parameterized.class)
public class Http2Test extends AbstractInteropTest {
@BeforeClass
public static void loadConscrypt() throws Exception {
// Load conscrypt if it is available. Either Conscrypt or Jetty ALPN needs to be available for
// OkHttp to negotiate.
TestUtils.installConscryptIfAvailable();
}

enum Transport {
NETTY, OKHTTP;
}

/** Parameterized test cases. */
@Parameters(name = "client={0},server={1}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{Transport.NETTY, Transport.NETTY},
{Transport.OKHTTP, Transport.OKHTTP},
{Transport.OKHTTP, Transport.NETTY},
{Transport.NETTY, Transport.OKHTTP},
});
}

private final Transport clientType;
private final Transport serverType;

public Http2Test(Transport clientType, Transport serverType) {
this.clientType = clientType;
this.serverType = serverType;
}

@Override
protected ServerBuilder<?> getServerBuilder() {
// Starts the server with HTTPS.
ServerCredentials serverCreds;
try {
serverCreds = TlsServerCredentials.create(
TestUtils.loadCert("server1.pem"), TestUtils.loadCert("server1.key"));
} catch (IOException ex) {
throw new RuntimeException(ex);
}
ServerBuilder<?> builder;
if (serverType == Transport.NETTY) {
NettyServerBuilder nettyBuilder = NettyServerBuilder.forPort(0, serverCreds)
.flowControlWindow(AbstractInteropTest.TEST_FLOW_CONTROL_WINDOW);
// Disable the default census stats tracer, use testing tracer instead.
InternalNettyServerBuilder.setStatsEnabled(nettyBuilder, false);
builder = nettyBuilder;
} else {
OkHttpServerBuilder okHttpBuilder = OkHttpServerBuilder.forPort(0, serverCreds)
.flowControlWindow(AbstractInteropTest.TEST_FLOW_CONTROL_WINDOW);
// Disable the default census stats tracer, use testing tracer instead.
InternalOkHttpServerBuilder.setStatsEnabled(okHttpBuilder, false);
builder = okHttpBuilder;
}
return builder
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.addStreamTracerFactory(createCustomCensusTracerFactory());
}

@Override
protected ManagedChannelBuilder<?> createChannelBuilder() {
ChannelCredentials channelCreds;
try {
channelCreds = TlsChannelCredentials.newBuilder()
.trustManager(TestUtils.loadCert("ca.pem"))
.build();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
int port = ((InetSocketAddress) getListenAddress()).getPort();
ManagedChannelBuilder<?> builder;
if (clientType == Transport.NETTY) {
NettyChannelBuilder nettyBuilder = NettyChannelBuilder
.forAddress("localhost", port, channelCreds)
.flowControlWindow(AbstractInteropTest.TEST_FLOW_CONTROL_WINDOW);
// Disable the default census stats interceptor, use testing interceptor instead.
InternalNettyChannelBuilder.setStatsEnabled(nettyBuilder, false);
builder = nettyBuilder;
} else {
OkHttpChannelBuilder okHttpBuilder = OkHttpChannelBuilder
.forAddress("localhost", port, channelCreds)
.flowControlWindow(AbstractInteropTest.TEST_FLOW_CONTROL_WINDOW);
// Disable the default census stats interceptor, use testing interceptor instead.
InternalOkHttpChannelBuilder.setStatsEnabled(okHttpBuilder, false);
builder = okHttpBuilder;
}
return builder
.overrideAuthority(TestUtils.TEST_SERVER_HOST)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.intercept(createCensusStatsClientInterceptor());
}

@Test
public void remoteAddr() {
InetSocketAddress isa = (InetSocketAddress) obtainRemoteClientAddr();
assertEquals(InetAddress.getLoopbackAddress(), isa.getAddress());
// It should not be the same as the server
assertNotEquals(((InetSocketAddress) getListenAddress()).getPort(), isa.getPort());
}

@Test
public void localAddr() throws Exception {
InetSocketAddress isa = (InetSocketAddress) obtainLocalServerAddr();
assertEquals(InetAddress.getLoopbackAddress(), isa.getAddress());
assertEquals(((InetSocketAddress) getListenAddress()).getPort(), isa.getPort());
}

@Test
public void contentLengthPermitted() throws Exception {
// Some third-party gRPC implementations (e.g., ServiceTalk) include Content-Length. The HTTP/2
// code starting in Netty 4.1.60.Final has special-cased handling of Content-Length, and may
// call uncommon methods on our custom headers implementation.
// https://github.com/grpc/grpc-java/issues/7953
Metadata contentLength = new Metadata();
contentLength.put(Metadata.Key.of("content-length", Metadata.ASCII_STRING_MARSHALLER), "5");
blockingStub
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(contentLength))
.emptyCall(EMPTY);
}
}
4 changes: 4 additions & 0 deletions netty/src/main/java/io/grpc/netty/NettyServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -854,6 +855,9 @@ public void onHeadersRead(ChannelHandlerContext ctx,
keepAliveManager.onDataReceived();
}
NettyServerHandler.this.onHeadersRead(ctx, streamId, headers);
if (endStream) {
NettyServerHandler.this.onDataRead(streamId, Unpooled.EMPTY_BUFFER, 0, endStream);
}
}

@Override
Expand Down
4 changes: 0 additions & 4 deletions netty/src/main/java/io/grpc/netty/NettyServerStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,6 @@ public void writeHeaders(Metadata headers) {

private void writeFrameInternal(WritableBuffer frame, boolean flush, final int numMessages) {
Preconditions.checkArgument(numMessages >= 0);
if (frame == null) {
writeQueue.scheduleFlush();
return;
}
ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf().touch();
final int numBytes = bytebuf.readableBytes();
// Add the bytes to outbound flow control.
Expand Down
1 change: 1 addition & 0 deletions okhttp/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ java_library(
deps = [
"//api",
"//core:internal",
"//core:util",
"@com_google_code_findbugs_jsr305//jar",
"@com_google_errorprone_error_prone_annotations//jar",
"@com_google_guava_guava//jar",
Expand Down
2 changes: 1 addition & 1 deletion okhttp/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ dependencies {
testImplementation project(':grpc-core').sourceSets.test.output,
project(':grpc-api').sourceSets.test.output,
project(':grpc-testing'),
project(':grpc-netty'),
libraries.netty.codec.http2,
libraries.okhttp
signature "org.codehaus.mojo.signature:java17:1.0@signature"
signature "net.sf.androidscents.signature:android-api-level-14:4.0_r4@signature"
Expand Down
Loading