Skip to content

Commit

Permalink
Handle header with errors and endStream = true (#10384) (#10414)
Browse files Browse the repository at this point in the history
* Eliminate NPE by skipping further processing when stream is defined, but doesn't have a property for streamKey (header processing identified an error)

Fixes #10364

* Add unit test for missing content type
  • Loading branch information
larry-safran committed Jul 24, 2023
1 parent c96b7d5 commit 22de216
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2106,6 +2106,9 @@ private static void assertStatusEquals(Status expected, Status actual) {
* be present, and the cause should be stripped away.
*/
private static void checkClientStatus(Status expectedStatus, Status clientStreamStatus) {
if (!clientStreamStatus.isOk() && clientStreamStatus.getCode() != expectedStatus.getCode()) {
System.out.println("Full Status: " + clientStreamStatus);
}
assertEquals(expectedStatus.getCode(), clientStreamStatus.getCode());
assertEquals(expectedStatus.getDescription(), clientStreamStatus.getDescription());
assertNull(clientStreamStatus.getCause());
Expand Down
17 changes: 11 additions & 6 deletions netty/src/main/java/io/grpc/netty/NettyServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,9 @@ private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfSt
flowControlPing().onDataRead(data.readableBytes(), padding);
try {
NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
if (stream == null) {
return;
}
try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onDataRead")) {
PerfMark.attachTag(stream.tag());
stream.inboundDataReceived(data, endOfStream);
Expand Down Expand Up @@ -679,12 +682,14 @@ void returnProcessedBytes(Http2Stream http2Stream, int bytes) {

private void closeStreamWhenDone(ChannelPromise promise, int streamId) throws Http2Exception {
final NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
stream.complete();
}
});
if (stream != null) {
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
stream.complete();
}
});
}
}

/**
Expand Down
28 changes: 28 additions & 0 deletions netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,34 @@ public void headersWithMultipleHostsShouldFail() throws Exception {
any(ChannelPromise.class));
}

@Test
public void headersWithErrAndEndStreamReturnErrorButNotThrowNpe() throws Exception {
manualSetUp();
Http2Headers headers = new DefaultHttp2Headers()
.method(HTTP_METHOD)
.add(AsciiString.of("host"), AsciiString.of("example.com"))
.path(new AsciiString("/foo/bar"));
ByteBuf headersFrame = headersFrame(STREAM_ID, headers);
channelRead(headersFrame);
channelRead(emptyGrpcFrame(STREAM_ID, true));

Http2Headers responseHeaders = new DefaultHttp2Headers()
.set(InternalStatus.CODE_KEY.name(), String.valueOf(Code.INTERNAL.value()))
.set(InternalStatus.MESSAGE_KEY.name(), "Content-Type is missing from the request")
.status("" + 415)
.set(CONTENT_TYPE_HEADER, "text/plain; charset=utf-8");

verifyWrite()
.writeHeaders(
eq(ctx()),
eq(STREAM_ID),
eq(responseHeaders),
eq(0),
eq(false),
any(ChannelPromise.class));

}

@Test
public void headersWithAuthorityAndHostUsesAuthority() throws Exception {
manualSetUp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ public void testStaticStrideSchedulerNonIntegers1() {
Map<Integer, Integer> pickCount = new HashMap<>();
for (int i = 0; i < 1000; i++) {
int result = sss.pick();
pickCount.put(result, pickCount.getOrDefault(result, 0) + 1);
pickCount.merge(result, 1, (o, v) -> o + v);
}
for (int i = 0; i < 3; i++) {
assertThat(Math.abs(pickCount.getOrDefault(i, 0) / 1000.0 - weights[i] / totalWeight))
Expand Down

0 comments on commit 22de216

Please sign in to comment.