From ffb642da20170c43f923bfc2d3958c1b6ddc86c4 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 27 Mar 2024 12:10:36 +0000 Subject: [PATCH 1/9] Correctly handle server sending EOS in a data frame --- .../Client/GRPCClientStreamHandler.swift | 32 ++-- .../GRPCStreamStateMachine.swift | 80 ++++++++-- .../Server/GRPCServerStreamHandler.swift | 26 ++-- .../Client/GRPCClientStreamHandlerTests.swift | 72 ++++++++- .../GRPCStreamStateMachineTests.swift | 143 ++++++++++++++---- 5 files changed, 291 insertions(+), 62 deletions(-) diff --git a/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift b/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift index 5be8f1025..478059dcf 100644 --- a/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift +++ b/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift @@ -67,16 +67,27 @@ extension GRPCClientStreamHandler { switch frameData.data { case .byteBuffer(let buffer): do { - try self.stateMachine.receive(buffer: buffer, endStream: endStream) - loop: while true { - switch self.stateMachine.nextInboundMessage() { - case .receiveMessage(let message): - context.fireChannelRead(self.wrapInboundOut(.message(message))) - case .awaitMoreMessages: - break loop - case .noMoreMessages: - context.fireUserInboundEventTriggered(ChannelEvent.inputClosed) - break loop + switch try self.stateMachine.receive(buffer: buffer, endStream: endStream) { + case .endRPCAndForwardErrorStatus(let status): + if let rpcError = RPCError(status: status) { + context.fireErrorCaught(rpcError) + } + return + + case .readInbound: + loop: while true { + switch self.stateMachine.nextInboundMessage() { + case .receiveMessage(let message): + context.fireChannelRead(self.wrapInboundOut(.message(message))) + case .awaitMoreMessages: + break loop + case .noMoreMessages: + // This could only happen if the server sends a data frame with EOS + // set, without sending status and trailers. + // If this happens, we should have forwarded an error status above + // so we should never reach this point. Do nothing. + break loop + } } } } catch { @@ -105,6 +116,7 @@ extension GRPCClientStreamHandler { case .receivedStatusAndMetadata(let status, let metadata): context.fireChannelRead(self.wrapInboundOut(.status(status, metadata))) + context.fireUserInboundEventTriggered(ChannelEvent.inputClosed) case .doNothing: () diff --git a/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift b/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift index 1c8eca27f..580948800 100644 --- a/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift +++ b/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift @@ -292,6 +292,12 @@ private enum GRPCStreamStateMachineState { self.inboundMessageBuffer = previousState.inboundMessageBuffer } + init(previousState: ClientOpenServerOpenState) { + self.framer = previousState.framer + self.compressor = previousState.compressor + self.inboundMessageBuffer = previousState.inboundMessageBuffer + } + init(previousState: ClientOpenServerClosedState) { self.framer = previousState.framer self.compressor = previousState.compressor @@ -388,12 +394,24 @@ struct GRPCStreamStateMachine { } } - mutating func receive(buffer: ByteBuffer, endStream: Bool) throws { + enum OnBufferReceivedAction: Equatable { + case readInbound + + // Client-specific actions + + // This will be returned when the server sends a data frame with EOS set. + // This is invalid as per the protocol specification, because the server + // can only close by sending trailers, not by setting EOS when sending + // a message. + case endRPCAndForwardErrorStatus(Status) + } + + mutating func receive(buffer: ByteBuffer, endStream: Bool) throws -> OnBufferReceivedAction { switch self.configuration { case .client: - try self.clientReceive(buffer: buffer, endStream: endStream) + return try self.clientReceive(buffer: buffer, endStream: endStream) case .server: - try self.serverReceive(buffer: buffer, endStream: endStream) + return try self.serverReceive(buffer: buffer, endStream: endStream) } } @@ -860,38 +878,68 @@ extension GRPCStreamStateMachine { } } - private mutating func clientReceive(buffer: ByteBuffer, endStream: Bool) throws { + private mutating func clientReceive( + buffer: ByteBuffer, + endStream: Bool + ) throws -> OnBufferReceivedAction { // This is a message received by the client, from the server. switch self.state { case .clientIdleServerIdle: try self.invalidState( "Cannot have received anything from server if client is not yet open." ) + case .clientOpenServerIdle, .clientClosedServerIdle: try self.invalidState( "Server cannot have sent a message before sending the initial metadata." ) + case .clientOpenServerOpen(var state): + if endStream { + // This is invalid as per the protocol specification, because the server + // can only close by sending trailers, not by setting EOS when sending + // a message. + self.state = .clientClosedServerClosed(.init(previousState: state)) + return .endRPCAndForwardErrorStatus( + Status( + code: .internalError, + message: """ + Server sent EOS alongside a data frame, but server is only allowed \ + to close by sending status and trailers. + """ + ) + ) + } + try state.deframer.process(buffer: buffer) { deframedMessage in state.inboundMessageBuffer.append(deframedMessage) } + self.state = .clientOpenServerOpen(state) + return .readInbound + + case .clientClosedServerOpen(var state): if endStream { - self.state = .clientOpenServerClosed(.init(previousState: state)) - } else { - self.state = .clientOpenServerOpen(state) + self.state = .clientClosedServerClosed(.init(previousState: state)) + return .endRPCAndForwardErrorStatus( + Status( + code: .internalError, + message: """ + Server sent EOS alongside a data frame, but server is only allowed \ + to close by sending status and trailers. + """ + ) + ) } - case .clientClosedServerOpen(var state): + // The client may have sent the end stream and thus it's closed, // but the server may still be responding. // The client must have a deframer set up, so force-unwrap is okay. try state.deframer!.process(buffer: buffer) { deframedMessage in state.inboundMessageBuffer.append(deframedMessage) } - if endStream { - self.state = .clientClosedServerClosed(.init(previousState: state)) - } else { - self.state = .clientClosedServerOpen(state) - } + self.state = .clientClosedServerOpen(state) + return .readInbound + case .clientOpenServerClosed, .clientClosedServerClosed: try self.invalidState( "Cannot have received anything from a closed server." @@ -1314,7 +1362,10 @@ extension GRPCStreamStateMachine { } } - private mutating func serverReceive(buffer: ByteBuffer, endStream: Bool) throws { + private mutating func serverReceive( + buffer: ByteBuffer, + endStream: Bool + ) throws -> OnBufferReceivedAction { switch self.state { case .clientIdleServerIdle: try self.invalidState( @@ -1354,6 +1405,7 @@ extension GRPCStreamStateMachine { "Client can't send a message if closed." ) } + return .readInbound } private mutating func serverNextOutboundFrame() throws -> OnNextOutboundFrame { diff --git a/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift b/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift index 14c576c20..3624679e9 100644 --- a/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift +++ b/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift @@ -64,16 +64,22 @@ extension GRPCServerStreamHandler { switch frameData.data { case .byteBuffer(let buffer): do { - try self.stateMachine.receive(buffer: buffer, endStream: endStream) - loop: while true { - switch self.stateMachine.nextInboundMessage() { - case .receiveMessage(let message): - context.fireChannelRead(self.wrapInboundOut(.message(message))) - case .awaitMoreMessages: - break loop - case .noMoreMessages: - context.fireUserInboundEventTriggered(ChannelEvent.inputClosed) - break loop + switch try self.stateMachine.receive(buffer: buffer, endStream: endStream) { + case .endRPCAndForwardErrorStatus: + preconditionFailure( + "OnBufferReceivedAction.endRPCAndForwardErrorStatus should never be returned for the server." + ) + case .readInbound: + loop: while true { + switch self.stateMachine.nextInboundMessage() { + case .receiveMessage(let message): + context.fireChannelRead(self.wrapInboundOut(.message(message))) + case .awaitMoreMessages: + break loop + case .noMoreMessages: + context.fireUserInboundEventTriggered(ChannelEvent.inputClosed) + break loop + } } } } catch { diff --git a/Tests/GRPCHTTP2CoreTests/Client/GRPCClientStreamHandlerTests.swift b/Tests/GRPCHTTP2CoreTests/Client/GRPCClientStreamHandlerTests.swift index 3b0d461b1..3cdb5a01f 100644 --- a/Tests/GRPCHTTP2CoreTests/Client/GRPCClientStreamHandlerTests.swift +++ b/Tests/GRPCHTTP2CoreTests/Client/GRPCClientStreamHandlerTests.swift @@ -305,7 +305,10 @@ final class GRPCClientStreamHandlerTests: XCTestCase { buffer.writeInteger(UInt8(0)) // not compressed buffer.writeInteger(UInt32(42)) // message length buffer.writeRepeatingByte(0, count: 42) // message - let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true) + let clientDataPayload = HTTP2Frame.FramePayload.Data( + data: .byteBuffer(buffer), + endStream: false + ) XCTAssertThrowsError( ofType: RPCError.self, try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)) @@ -321,6 +324,73 @@ final class GRPCClientStreamHandlerTests: XCTestCase { XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self)) } + func testServerEndsStreamWhenSendingMessage_ResultsInErrorStatus() throws { + let handler = GRPCClientStreamHandler( + methodDescriptor: .init(service: "test", method: "test"), + scheme: .http, + outboundEncoding: .identity, + acceptedEncodings: [], + maximumPayloadSize: 100, + skipStateMachineAssertions: true + ) + + let channel = EmbeddedChannel(handler: handler) + + // Send client's initial metadata + XCTAssertNoThrow( + try channel.writeOutbound(RPCRequestPart.metadata(Metadata())) + ) + + // Make sure we have sent right metadata. + let writtenMetadata = try channel.assertReadHeadersOutbound() + + XCTAssertEqual( + writtenMetadata.headers, + [ + GRPCHTTP2Keys.method.rawValue: "POST", + GRPCHTTP2Keys.scheme.rawValue: "http", + GRPCHTTP2Keys.path.rawValue: "test/test", + GRPCHTTP2Keys.contentType.rawValue: "application/grpc", + GRPCHTTP2Keys.te.rawValue: "trailers", + ] + ) + + // Server sends initial metadata + let serverInitialMetadata: HPACKHeaders = [ + GRPCHTTP2Keys.status.rawValue: "200", + GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue, + ] + XCTAssertNoThrow( + try channel.writeInbound( + HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata)) + ) + ) + XCTAssertEqual( + try channel.readInbound(as: RPCResponsePart.self), + .metadata(Metadata(headers: serverInitialMetadata)) + ) + + // Server sends message with EOS set. + var buffer = ByteBuffer() + buffer.writeInteger(UInt8(0)) // not compressed + buffer.writeInteger(UInt32(42)) // message length + buffer.writeRepeatingByte(0, count: 42) // message + let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true) + XCTAssertThrowsError( + ofType: RPCError.self, + try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)) + ) { error in + XCTAssertEqual(error.code, .internalError) + XCTAssertEqual( + error.message, + "Server sent EOS alongside a data frame, but server is only allowed to close by sending status and trailers." + ) + } + + // Make sure we didn't read the received message + XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self)) + } + func testServerEndsStream() throws { let handler = GRPCClientStreamHandler( methodDescriptor: .init(service: "test", method: "test"), diff --git a/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift index 324d47fdb..cae705269 100644 --- a/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift +++ b/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift @@ -599,8 +599,22 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { for targetState in [TargetStateMachineState.clientOpenServerOpen, .clientClosedServerOpen] { var stateMachine = self.makeClientStateMachine(targetState: targetState) - XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: false)) - XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: true)) + XCTAssertEqual( + try stateMachine.receive(buffer: .init(), endStream: false), + .readInbound + ) + XCTAssertEqual( + try stateMachine.receive(buffer: .init(), endStream: true), + .endRPCAndForwardErrorStatus( + Status( + code: .internalError, + message: """ + Server sent EOS alongside a data frame, but server is only allowed \ + to close by sending status and trailers. + """ + ) + ) + ) } } @@ -776,7 +790,10 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(buffer: receivedBytes, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: receivedBytes, endStream: false), + .readInbound + ) XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42])) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) @@ -790,7 +807,10 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { let originalMessage = [UInt8]([42, 42, 43, 43]) let receivedBytes = try self.frameMessage(originalMessage, compress: true) - try stateMachine.receive(buffer: receivedBytes, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: receivedBytes, endStream: false), + .readInbound + ) XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(originalMessage)) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) @@ -804,7 +824,10 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(buffer: receivedBytes, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: receivedBytes, endStream: false), + .readInbound + ) // Close server XCTAssertNoThrow(try stateMachine.receive(headers: .serverTrailers, endStream: true)) @@ -821,7 +844,10 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(buffer: receivedBytes, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: receivedBytes, endStream: false), + .readInbound + ) // Close client XCTAssertNoThrow(try stateMachine.closeOutbound()) @@ -840,7 +866,10 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(buffer: receivedBytes, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: receivedBytes, endStream: false), + .readInbound + ) // Close server XCTAssertNoThrow(try stateMachine.receive(headers: .serverTrailers, endStream: true)) @@ -906,8 +935,14 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { let firstResponse = try self.frameMessage(firstResponseBytes, compress: false) let secondResponseBytes = [UInt8]([8, 9, 10]) let secondResponse = try self.frameMessage(secondResponseBytes, compress: false) - try stateMachine.receive(buffer: firstResponse, endStream: false) - try stateMachine.receive(buffer: secondResponse, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: firstResponse, endStream: false), + .readInbound + ) + XCTAssertEqual( + try stateMachine.receive(buffer: secondResponse, endStream: false), + .readInbound + ) // Make sure messages have arrived XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(firstResponseBytes)) @@ -988,8 +1023,14 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { let firstResponse = try self.frameMessage(firstResponseBytes, compress: false) let secondResponseBytes = [UInt8]([8, 9, 10]) let secondResponse = try self.frameMessage(secondResponseBytes, compress: false) - try stateMachine.receive(buffer: firstResponse, endStream: false) - try stateMachine.receive(buffer: secondResponse, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: firstResponse, endStream: false), + .readInbound + ) + XCTAssertEqual( + try stateMachine.receive(buffer: secondResponse, endStream: false), + .readInbound + ) // Make sure messages have arrived XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(firstResponseBytes)) @@ -1069,8 +1110,14 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { let firstResponse = try self.frameMessage(firstResponseBytes, compress: false) let secondResponseBytes = [UInt8]([8, 9, 10]) let secondResponse = try self.frameMessage(secondResponseBytes, compress: false) - try stateMachine.receive(buffer: firstResponse, endStream: false) - try stateMachine.receive(buffer: secondResponse, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: firstResponse, endStream: false), + .readInbound + ) + XCTAssertEqual( + try stateMachine.receive(buffer: secondResponse, endStream: false), + .readInbound + ) // Make sure messages have arrived XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(firstResponseBytes)) @@ -2061,7 +2108,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(buffer: receivedBytes, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: receivedBytes, endStream: false), + .readInbound + ) XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42])) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) @@ -2076,7 +2126,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { let originalMessage = [UInt8]([42, 42, 43, 43]) let receivedBytes = try self.frameMessage(originalMessage, compress: true) - try stateMachine.receive(buffer: receivedBytes, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: receivedBytes, endStream: false), + .readInbound + ) XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(originalMessage)) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) @@ -2090,7 +2143,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(buffer: receivedBytes, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: receivedBytes, endStream: false), + .readInbound + ) // Close server XCTAssertNoThrow( @@ -2117,7 +2173,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(buffer: receivedBytes, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: receivedBytes, endStream: false), + .readInbound + ) // Close client XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: true)) @@ -2136,7 +2195,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(buffer: receivedBytes, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: receivedBytes, endStream: false), + .readInbound + ) // Close server XCTAssertNoThrow( @@ -2189,9 +2251,15 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { let firstMessage = completeMessage.getSlice(at: 0, length: 4)! let secondMessage = completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)! - try stateMachine.receive(buffer: firstMessage, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: firstMessage, endStream: false), + .readInbound + ) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) - try stateMachine.receive(buffer: secondMessage, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: secondMessage, endStream: false), + .readInbound + ) XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(deframedMessage)) // Server sends response @@ -2226,7 +2294,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { try secondPromise.futureResult.assertSuccess().wait() // Client sends end - try stateMachine.receive(buffer: ByteBuffer(), endStream: true) + XCTAssertEqual( + try stateMachine.receive(buffer: ByteBuffer(), endStream: true), + .readInbound + ) // Server ends let response = try stateMachine.send( @@ -2259,13 +2330,22 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { let firstMessage = completeMessage.getSlice(at: 0, length: 4)! let secondMessage = completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)! - try stateMachine.receive(buffer: firstMessage, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: firstMessage, endStream: false), + .readInbound + ) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) - try stateMachine.receive(buffer: secondMessage, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: secondMessage, endStream: false), + .readInbound + ) XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(deframedMessage)) // Client sends end - try stateMachine.receive(buffer: ByteBuffer(), endStream: true) + XCTAssertEqual( + try stateMachine.receive(buffer: ByteBuffer(), endStream: true), + .readInbound + ) // Server sends initial metadata let sentInitialHeaders = try stateMachine.send(metadata: Metadata(headers: ["custom": "value"])) @@ -2324,9 +2404,15 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { let firstMessage = completeMessage.getSlice(at: 0, length: 4)! let secondMessage = completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)! - try stateMachine.receive(buffer: firstMessage, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: firstMessage, endStream: false), + .readInbound + ) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) - try stateMachine.receive(buffer: secondMessage, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: secondMessage, endStream: false), + .readInbound + ) XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(deframedMessage)) // Server sends initial metadata @@ -2342,7 +2428,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { ) // Client sends end - try stateMachine.receive(buffer: ByteBuffer(), endStream: true) + XCTAssertEqual( + try stateMachine.receive(buffer: ByteBuffer(), endStream: true), + .readInbound + ) // Server sends response let firstResponse = [UInt8]([5, 6, 7]) From 37471dc47872010347a02a8d9e695fd498bb0f08 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 27 Mar 2024 13:35:32 +0000 Subject: [PATCH 2/9] Only forward close when mode is all --- .../Client/GRPCClientStreamHandler.swift | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift b/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift index 478059dcf..0eaeef3c4 100644 --- a/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift +++ b/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift @@ -173,7 +173,27 @@ extension GRPCClientStreamHandler { func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise?) { switch mode { - case .output, .all: + case .input: + context.close(mode: .input, promise: promise) + + case .output: + // We flush all pending messages and update the internal state machine's + // state, but we don't close the outbound end of the channel, because + // forwarding the close in this case would cause the HTTP2 stream handler + // to close the whole channel (as the mode is ignored in its implementation). + do { + try self.stateMachine.closeOutbound() + // Force a flush by calling _flush instead of flush + // (otherwise, we'd skip flushing if we're in a read loop) + self._flush(context: context) + } catch { + promise?.fail(error) + context.fireErrorCaught(error) + } + + case .all: + // Since we're closing the whole channel here, we *do* forward the close + // down the pipeline. do { try self.stateMachine.closeOutbound() // Force a flush by calling _flush @@ -184,9 +204,6 @@ extension GRPCClientStreamHandler { promise?.fail(error) context.fireErrorCaught(error) } - - case .input: - context.close(mode: .input, promise: promise) } } From 062789b2386f417882c457b3d055bf3bd4bda87b Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 27 Mar 2024 13:40:43 +0000 Subject: [PATCH 3/9] Don't use canonical form for grpcStatusMessage header --- Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift | 2 +- Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift b/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift index 580948800..07b4f9fb0 100644 --- a/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift +++ b/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift @@ -747,7 +747,7 @@ extension GRPCStreamStateMachine { } let statusMessage = - metadata.firstString(forKey: .grpcStatusMessage) + metadata.first(name: GRPCHTTP2Keys.grpcStatusMessage.rawValue) .map { GRPCStatusMessageMarshaller.unmarshall($0) } ?? "" var convertedMetadata = Metadata(headers: metadata) diff --git a/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift index cae705269..a886d0053 100644 --- a/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift +++ b/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift @@ -456,14 +456,14 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue, GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.internalError.rawValue), GRPCHTTP2Keys.grpcStatusMessage.rawValue: GRPCStatusMessageMarshaller.marshall( - "Some status message" + "Some, status, message" )!, "custom-key": "custom-value", ] let trailers = try stateMachine.receive(headers: trailersOnlyResponse, endStream: true) switch trailers { case .receivedStatusAndMetadata(let status, let metadata): - XCTAssertEqual(status, Status(code: .internalError, message: "Some status message")) + XCTAssertEqual(status, Status(code: .internalError, message: "Some, status, message")) XCTAssertEqual( metadata, [ From f72626d7f3d270e63f9bca6d985834fee09b84df Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 27 Mar 2024 14:27:59 +0000 Subject: [PATCH 4/9] Stop leaking a promise --- Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift | 1 + 1 file changed, 1 insertion(+) diff --git a/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift b/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift index 0eaeef3c4..707744550 100644 --- a/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift +++ b/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift @@ -186,6 +186,7 @@ extension GRPCClientStreamHandler { // Force a flush by calling _flush instead of flush // (otherwise, we'd skip flushing if we're in a read loop) self._flush(context: context) + promise?.succeed() } catch { promise?.fail(error) context.fireErrorCaught(error) From 06b0b8d8692cebffedc0ab4b3fd58eb1454755c4 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 27 Mar 2024 14:28:16 +0000 Subject: [PATCH 5/9] Forward status instead of firing error --- Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift b/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift index 707744550..087a55ccc 100644 --- a/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift +++ b/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift @@ -69,10 +69,8 @@ extension GRPCClientStreamHandler { do { switch try self.stateMachine.receive(buffer: buffer, endStream: endStream) { case .endRPCAndForwardErrorStatus(let status): - if let rpcError = RPCError(status: status) { - context.fireErrorCaught(rpcError) - } - return + context.fireChannelRead(self.wrapInboundOut(.status(status, [:]))) + context.close(promise: nil) case .readInbound: loop: while true { From 9ab5d06cc91193da11109bfb074eeba47aada41d Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 27 Mar 2024 14:28:35 +0000 Subject: [PATCH 6/9] Add canonicalForm param to headers' firstString --- Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift b/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift index 07b4f9fb0..4554f863e 100644 --- a/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift +++ b/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift @@ -747,7 +747,7 @@ extension GRPCStreamStateMachine { } let statusMessage = - metadata.first(name: GRPCHTTP2Keys.grpcStatusMessage.rawValue) + metadata.firstString(forKey: .grpcStatusMessage, canonicalForm: false) .map { GRPCStatusMessageMarshaller.unmarshall($0) } ?? "" var convertedMetadata = Metadata(headers: metadata) @@ -1495,10 +1495,11 @@ internal enum GRPCHTTP2Keys: String { } extension HPACKHeaders { - internal func firstString(forKey key: GRPCHTTP2Keys) -> String? { - self.values(forHeader: key.rawValue, canonicalForm: true).first(where: { _ in true }).map { - String($0) - } + internal func firstString(forKey key: GRPCHTTP2Keys, canonicalForm: Bool = true) -> String? { + self.values(forHeader: key.rawValue, canonicalForm: canonicalForm).first(where: { _ in true }) + .map { + String($0) + } } internal mutating func add(_ value: String, forKey key: GRPCHTTP2Keys) { From 4847f3cd3de1a4830b5e868c8ba7c786fd8d5cc3 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 27 Mar 2024 14:47:43 +0000 Subject: [PATCH 7/9] Replace input close with user event fired --- Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift b/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift index 087a55ccc..0cc1994e5 100644 --- a/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift +++ b/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift @@ -172,7 +172,7 @@ extension GRPCClientStreamHandler { func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise?) { switch mode { case .input: - context.close(mode: .input, promise: promise) + context.fireUserInboundEventTriggered(ChannelEvent.inputClosed) case .output: // We flush all pending messages and update the internal state machine's From 5c13077df7cdce043760ed383165862d63ed9778 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 27 Mar 2024 14:47:48 +0000 Subject: [PATCH 8/9] Fix broken test --- .../Client/GRPCClientStreamHandlerTests.swift | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/Tests/GRPCHTTP2CoreTests/Client/GRPCClientStreamHandlerTests.swift b/Tests/GRPCHTTP2CoreTests/Client/GRPCClientStreamHandlerTests.swift index 3cdb5a01f..6e7d68acd 100644 --- a/Tests/GRPCHTTP2CoreTests/Client/GRPCClientStreamHandlerTests.swift +++ b/Tests/GRPCHTTP2CoreTests/Client/GRPCClientStreamHandlerTests.swift @@ -324,7 +324,7 @@ final class GRPCClientStreamHandlerTests: XCTestCase { XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self)) } - func testServerEndsStreamWhenSendingMessage_ResultsInErrorStatus() throws { + func testServerSendsEOSWhenSendingMessage_ResultsInErrorStatus() throws { let handler = GRPCClientStreamHandler( methodDescriptor: .init(service: "test", method: "test"), scheme: .http, @@ -376,19 +376,20 @@ final class GRPCClientStreamHandlerTests: XCTestCase { buffer.writeInteger(UInt32(42)) // message length buffer.writeRepeatingByte(0, count: 42) // message let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true) - XCTAssertThrowsError( - ofType: RPCError.self, - try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)) - ) { error in - XCTAssertEqual(error.code, .internalError) - XCTAssertEqual( - error.message, - "Server sent EOS alongside a data frame, but server is only allowed to close by sending status and trailers." - ) - } + XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))) - // Make sure we didn't read the received message - XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self)) + // Make sure we got status + trailers with the right error. + XCTAssertEqual( + try channel.readInbound(as: RPCResponsePart.self), + .status( + Status( + code: .internalError, + message: + "Server sent EOS alongside a data frame, but server is only allowed to close by sending status and trailers." + ), + [:] + ) + ) } func testServerEndsStream() throws { From 25429278746bba409af08ff72f6d82561f0aa640 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 27 Mar 2024 15:22:15 +0000 Subject: [PATCH 9/9] Succeed leaked promise --- Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift | 1 + 1 file changed, 1 insertion(+) diff --git a/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift b/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift index 0cc1994e5..18e36ab26 100644 --- a/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift +++ b/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift @@ -173,6 +173,7 @@ extension GRPCClientStreamHandler { switch mode { case .input: context.fireUserInboundEventTriggered(ChannelEvent.inputClosed) + promise?.succeed() case .output: // We flush all pending messages and update the internal state machine's