diff --git a/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift b/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift index 5be8f1025..18e36ab26 100644 --- a/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift +++ b/Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift @@ -67,16 +67,25 @@ extension GRPCClientStreamHandler { switch frameData.data { case .byteBuffer(let buffer): do { - try self.stateMachine.receive(buffer: buffer, endStream: endStream) - loop: while true { - switch self.stateMachine.nextInboundMessage() { - case .receiveMessage(let message): - context.fireChannelRead(self.wrapInboundOut(.message(message))) - case .awaitMoreMessages: - break loop - case .noMoreMessages: - context.fireUserInboundEventTriggered(ChannelEvent.inputClosed) - break loop + switch try self.stateMachine.receive(buffer: buffer, endStream: endStream) { + case .endRPCAndForwardErrorStatus(let status): + context.fireChannelRead(self.wrapInboundOut(.status(status, [:]))) + context.close(promise: nil) + + case .readInbound: + loop: while true { + switch self.stateMachine.nextInboundMessage() { + case .receiveMessage(let message): + context.fireChannelRead(self.wrapInboundOut(.message(message))) + case .awaitMoreMessages: + break loop + case .noMoreMessages: + // This could only happen if the server sends a data frame with EOS + // set, without sending status and trailers. + // If this happens, we should have forwarded an error status above + // so we should never reach this point. Do nothing. + break loop + } } } } catch { @@ -105,6 +114,7 @@ extension GRPCClientStreamHandler { case .receivedStatusAndMetadata(let status, let metadata): context.fireChannelRead(self.wrapInboundOut(.status(status, metadata))) + context.fireUserInboundEventTriggered(ChannelEvent.inputClosed) case .doNothing: () @@ -161,7 +171,29 @@ extension GRPCClientStreamHandler { func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise?) { switch mode { - case .output, .all: + case .input: + context.fireUserInboundEventTriggered(ChannelEvent.inputClosed) + promise?.succeed() + + case .output: + // We flush all pending messages and update the internal state machine's + // state, but we don't close the outbound end of the channel, because + // forwarding the close in this case would cause the HTTP2 stream handler + // to close the whole channel (as the mode is ignored in its implementation). + do { + try self.stateMachine.closeOutbound() + // Force a flush by calling _flush instead of flush + // (otherwise, we'd skip flushing if we're in a read loop) + self._flush(context: context) + promise?.succeed() + } catch { + promise?.fail(error) + context.fireErrorCaught(error) + } + + case .all: + // Since we're closing the whole channel here, we *do* forward the close + // down the pipeline. do { try self.stateMachine.closeOutbound() // Force a flush by calling _flush @@ -172,9 +204,6 @@ extension GRPCClientStreamHandler { promise?.fail(error) context.fireErrorCaught(error) } - - case .input: - context.close(mode: .input, promise: promise) } } diff --git a/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift b/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift index 1c8eca27f..4554f863e 100644 --- a/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift +++ b/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift @@ -292,6 +292,12 @@ private enum GRPCStreamStateMachineState { self.inboundMessageBuffer = previousState.inboundMessageBuffer } + init(previousState: ClientOpenServerOpenState) { + self.framer = previousState.framer + self.compressor = previousState.compressor + self.inboundMessageBuffer = previousState.inboundMessageBuffer + } + init(previousState: ClientOpenServerClosedState) { self.framer = previousState.framer self.compressor = previousState.compressor @@ -388,12 +394,24 @@ struct GRPCStreamStateMachine { } } - mutating func receive(buffer: ByteBuffer, endStream: Bool) throws { + enum OnBufferReceivedAction: Equatable { + case readInbound + + // Client-specific actions + + // This will be returned when the server sends a data frame with EOS set. + // This is invalid as per the protocol specification, because the server + // can only close by sending trailers, not by setting EOS when sending + // a message. + case endRPCAndForwardErrorStatus(Status) + } + + mutating func receive(buffer: ByteBuffer, endStream: Bool) throws -> OnBufferReceivedAction { switch self.configuration { case .client: - try self.clientReceive(buffer: buffer, endStream: endStream) + return try self.clientReceive(buffer: buffer, endStream: endStream) case .server: - try self.serverReceive(buffer: buffer, endStream: endStream) + return try self.serverReceive(buffer: buffer, endStream: endStream) } } @@ -729,7 +747,7 @@ extension GRPCStreamStateMachine { } let statusMessage = - metadata.firstString(forKey: .grpcStatusMessage) + metadata.firstString(forKey: .grpcStatusMessage, canonicalForm: false) .map { GRPCStatusMessageMarshaller.unmarshall($0) } ?? "" var convertedMetadata = Metadata(headers: metadata) @@ -860,38 +878,68 @@ extension GRPCStreamStateMachine { } } - private mutating func clientReceive(buffer: ByteBuffer, endStream: Bool) throws { + private mutating func clientReceive( + buffer: ByteBuffer, + endStream: Bool + ) throws -> OnBufferReceivedAction { // This is a message received by the client, from the server. switch self.state { case .clientIdleServerIdle: try self.invalidState( "Cannot have received anything from server if client is not yet open." ) + case .clientOpenServerIdle, .clientClosedServerIdle: try self.invalidState( "Server cannot have sent a message before sending the initial metadata." ) + case .clientOpenServerOpen(var state): + if endStream { + // This is invalid as per the protocol specification, because the server + // can only close by sending trailers, not by setting EOS when sending + // a message. + self.state = .clientClosedServerClosed(.init(previousState: state)) + return .endRPCAndForwardErrorStatus( + Status( + code: .internalError, + message: """ + Server sent EOS alongside a data frame, but server is only allowed \ + to close by sending status and trailers. + """ + ) + ) + } + try state.deframer.process(buffer: buffer) { deframedMessage in state.inboundMessageBuffer.append(deframedMessage) } + self.state = .clientOpenServerOpen(state) + return .readInbound + + case .clientClosedServerOpen(var state): if endStream { - self.state = .clientOpenServerClosed(.init(previousState: state)) - } else { - self.state = .clientOpenServerOpen(state) + self.state = .clientClosedServerClosed(.init(previousState: state)) + return .endRPCAndForwardErrorStatus( + Status( + code: .internalError, + message: """ + Server sent EOS alongside a data frame, but server is only allowed \ + to close by sending status and trailers. + """ + ) + ) } - case .clientClosedServerOpen(var state): + // The client may have sent the end stream and thus it's closed, // but the server may still be responding. // The client must have a deframer set up, so force-unwrap is okay. try state.deframer!.process(buffer: buffer) { deframedMessage in state.inboundMessageBuffer.append(deframedMessage) } - if endStream { - self.state = .clientClosedServerClosed(.init(previousState: state)) - } else { - self.state = .clientClosedServerOpen(state) - } + self.state = .clientClosedServerOpen(state) + return .readInbound + case .clientOpenServerClosed, .clientClosedServerClosed: try self.invalidState( "Cannot have received anything from a closed server." @@ -1314,7 +1362,10 @@ extension GRPCStreamStateMachine { } } - private mutating func serverReceive(buffer: ByteBuffer, endStream: Bool) throws { + private mutating func serverReceive( + buffer: ByteBuffer, + endStream: Bool + ) throws -> OnBufferReceivedAction { switch self.state { case .clientIdleServerIdle: try self.invalidState( @@ -1354,6 +1405,7 @@ extension GRPCStreamStateMachine { "Client can't send a message if closed." ) } + return .readInbound } private mutating func serverNextOutboundFrame() throws -> OnNextOutboundFrame { @@ -1443,10 +1495,11 @@ internal enum GRPCHTTP2Keys: String { } extension HPACKHeaders { - internal func firstString(forKey key: GRPCHTTP2Keys) -> String? { - self.values(forHeader: key.rawValue, canonicalForm: true).first(where: { _ in true }).map { - String($0) - } + internal func firstString(forKey key: GRPCHTTP2Keys, canonicalForm: Bool = true) -> String? { + self.values(forHeader: key.rawValue, canonicalForm: canonicalForm).first(where: { _ in true }) + .map { + String($0) + } } internal mutating func add(_ value: String, forKey key: GRPCHTTP2Keys) { diff --git a/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift b/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift index 14c576c20..3624679e9 100644 --- a/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift +++ b/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift @@ -64,16 +64,22 @@ extension GRPCServerStreamHandler { switch frameData.data { case .byteBuffer(let buffer): do { - try self.stateMachine.receive(buffer: buffer, endStream: endStream) - loop: while true { - switch self.stateMachine.nextInboundMessage() { - case .receiveMessage(let message): - context.fireChannelRead(self.wrapInboundOut(.message(message))) - case .awaitMoreMessages: - break loop - case .noMoreMessages: - context.fireUserInboundEventTriggered(ChannelEvent.inputClosed) - break loop + switch try self.stateMachine.receive(buffer: buffer, endStream: endStream) { + case .endRPCAndForwardErrorStatus: + preconditionFailure( + "OnBufferReceivedAction.endRPCAndForwardErrorStatus should never be returned for the server." + ) + case .readInbound: + loop: while true { + switch self.stateMachine.nextInboundMessage() { + case .receiveMessage(let message): + context.fireChannelRead(self.wrapInboundOut(.message(message))) + case .awaitMoreMessages: + break loop + case .noMoreMessages: + context.fireUserInboundEventTriggered(ChannelEvent.inputClosed) + break loop + } } } } catch { diff --git a/Tests/GRPCHTTP2CoreTests/Client/GRPCClientStreamHandlerTests.swift b/Tests/GRPCHTTP2CoreTests/Client/GRPCClientStreamHandlerTests.swift index 3b0d461b1..6e7d68acd 100644 --- a/Tests/GRPCHTTP2CoreTests/Client/GRPCClientStreamHandlerTests.swift +++ b/Tests/GRPCHTTP2CoreTests/Client/GRPCClientStreamHandlerTests.swift @@ -305,7 +305,10 @@ final class GRPCClientStreamHandlerTests: XCTestCase { buffer.writeInteger(UInt8(0)) // not compressed buffer.writeInteger(UInt32(42)) // message length buffer.writeRepeatingByte(0, count: 42) // message - let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true) + let clientDataPayload = HTTP2Frame.FramePayload.Data( + data: .byteBuffer(buffer), + endStream: false + ) XCTAssertThrowsError( ofType: RPCError.self, try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)) @@ -321,6 +324,74 @@ final class GRPCClientStreamHandlerTests: XCTestCase { XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self)) } + func testServerSendsEOSWhenSendingMessage_ResultsInErrorStatus() throws { + let handler = GRPCClientStreamHandler( + methodDescriptor: .init(service: "test", method: "test"), + scheme: .http, + outboundEncoding: .identity, + acceptedEncodings: [], + maximumPayloadSize: 100, + skipStateMachineAssertions: true + ) + + let channel = EmbeddedChannel(handler: handler) + + // Send client's initial metadata + XCTAssertNoThrow( + try channel.writeOutbound(RPCRequestPart.metadata(Metadata())) + ) + + // Make sure we have sent right metadata. + let writtenMetadata = try channel.assertReadHeadersOutbound() + + XCTAssertEqual( + writtenMetadata.headers, + [ + GRPCHTTP2Keys.method.rawValue: "POST", + GRPCHTTP2Keys.scheme.rawValue: "http", + GRPCHTTP2Keys.path.rawValue: "test/test", + GRPCHTTP2Keys.contentType.rawValue: "application/grpc", + GRPCHTTP2Keys.te.rawValue: "trailers", + ] + ) + + // Server sends initial metadata + let serverInitialMetadata: HPACKHeaders = [ + GRPCHTTP2Keys.status.rawValue: "200", + GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue, + ] + XCTAssertNoThrow( + try channel.writeInbound( + HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata)) + ) + ) + XCTAssertEqual( + try channel.readInbound(as: RPCResponsePart.self), + .metadata(Metadata(headers: serverInitialMetadata)) + ) + + // Server sends message with EOS set. + var buffer = ByteBuffer() + buffer.writeInteger(UInt8(0)) // not compressed + buffer.writeInteger(UInt32(42)) // message length + buffer.writeRepeatingByte(0, count: 42) // message + let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true) + XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))) + + // Make sure we got status + trailers with the right error. + XCTAssertEqual( + try channel.readInbound(as: RPCResponsePart.self), + .status( + Status( + code: .internalError, + message: + "Server sent EOS alongside a data frame, but server is only allowed to close by sending status and trailers." + ), + [:] + ) + ) + } + func testServerEndsStream() throws { let handler = GRPCClientStreamHandler( methodDescriptor: .init(service: "test", method: "test"), diff --git a/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift index 324d47fdb..a886d0053 100644 --- a/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift +++ b/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift @@ -456,14 +456,14 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue, GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.internalError.rawValue), GRPCHTTP2Keys.grpcStatusMessage.rawValue: GRPCStatusMessageMarshaller.marshall( - "Some status message" + "Some, status, message" )!, "custom-key": "custom-value", ] let trailers = try stateMachine.receive(headers: trailersOnlyResponse, endStream: true) switch trailers { case .receivedStatusAndMetadata(let status, let metadata): - XCTAssertEqual(status, Status(code: .internalError, message: "Some status message")) + XCTAssertEqual(status, Status(code: .internalError, message: "Some, status, message")) XCTAssertEqual( metadata, [ @@ -599,8 +599,22 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { for targetState in [TargetStateMachineState.clientOpenServerOpen, .clientClosedServerOpen] { var stateMachine = self.makeClientStateMachine(targetState: targetState) - XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: false)) - XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: true)) + XCTAssertEqual( + try stateMachine.receive(buffer: .init(), endStream: false), + .readInbound + ) + XCTAssertEqual( + try stateMachine.receive(buffer: .init(), endStream: true), + .endRPCAndForwardErrorStatus( + Status( + code: .internalError, + message: """ + Server sent EOS alongside a data frame, but server is only allowed \ + to close by sending status and trailers. + """ + ) + ) + ) } } @@ -776,7 +790,10 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(buffer: receivedBytes, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: receivedBytes, endStream: false), + .readInbound + ) XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42])) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) @@ -790,7 +807,10 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { let originalMessage = [UInt8]([42, 42, 43, 43]) let receivedBytes = try self.frameMessage(originalMessage, compress: true) - try stateMachine.receive(buffer: receivedBytes, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: receivedBytes, endStream: false), + .readInbound + ) XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(originalMessage)) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) @@ -804,7 +824,10 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(buffer: receivedBytes, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: receivedBytes, endStream: false), + .readInbound + ) // Close server XCTAssertNoThrow(try stateMachine.receive(headers: .serverTrailers, endStream: true)) @@ -821,7 +844,10 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(buffer: receivedBytes, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: receivedBytes, endStream: false), + .readInbound + ) // Close client XCTAssertNoThrow(try stateMachine.closeOutbound()) @@ -840,7 +866,10 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(buffer: receivedBytes, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: receivedBytes, endStream: false), + .readInbound + ) // Close server XCTAssertNoThrow(try stateMachine.receive(headers: .serverTrailers, endStream: true)) @@ -906,8 +935,14 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { let firstResponse = try self.frameMessage(firstResponseBytes, compress: false) let secondResponseBytes = [UInt8]([8, 9, 10]) let secondResponse = try self.frameMessage(secondResponseBytes, compress: false) - try stateMachine.receive(buffer: firstResponse, endStream: false) - try stateMachine.receive(buffer: secondResponse, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: firstResponse, endStream: false), + .readInbound + ) + XCTAssertEqual( + try stateMachine.receive(buffer: secondResponse, endStream: false), + .readInbound + ) // Make sure messages have arrived XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(firstResponseBytes)) @@ -988,8 +1023,14 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { let firstResponse = try self.frameMessage(firstResponseBytes, compress: false) let secondResponseBytes = [UInt8]([8, 9, 10]) let secondResponse = try self.frameMessage(secondResponseBytes, compress: false) - try stateMachine.receive(buffer: firstResponse, endStream: false) - try stateMachine.receive(buffer: secondResponse, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: firstResponse, endStream: false), + .readInbound + ) + XCTAssertEqual( + try stateMachine.receive(buffer: secondResponse, endStream: false), + .readInbound + ) // Make sure messages have arrived XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(firstResponseBytes)) @@ -1069,8 +1110,14 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { let firstResponse = try self.frameMessage(firstResponseBytes, compress: false) let secondResponseBytes = [UInt8]([8, 9, 10]) let secondResponse = try self.frameMessage(secondResponseBytes, compress: false) - try stateMachine.receive(buffer: firstResponse, endStream: false) - try stateMachine.receive(buffer: secondResponse, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: firstResponse, endStream: false), + .readInbound + ) + XCTAssertEqual( + try stateMachine.receive(buffer: secondResponse, endStream: false), + .readInbound + ) // Make sure messages have arrived XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(firstResponseBytes)) @@ -2061,7 +2108,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(buffer: receivedBytes, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: receivedBytes, endStream: false), + .readInbound + ) XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42])) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) @@ -2076,7 +2126,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { let originalMessage = [UInt8]([42, 42, 43, 43]) let receivedBytes = try self.frameMessage(originalMessage, compress: true) - try stateMachine.receive(buffer: receivedBytes, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: receivedBytes, endStream: false), + .readInbound + ) XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(originalMessage)) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) @@ -2090,7 +2143,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(buffer: receivedBytes, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: receivedBytes, endStream: false), + .readInbound + ) // Close server XCTAssertNoThrow( @@ -2117,7 +2173,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(buffer: receivedBytes, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: receivedBytes, endStream: false), + .readInbound + ) // Close client XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: true)) @@ -2136,7 +2195,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(buffer: receivedBytes, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: receivedBytes, endStream: false), + .readInbound + ) // Close server XCTAssertNoThrow( @@ -2189,9 +2251,15 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { let firstMessage = completeMessage.getSlice(at: 0, length: 4)! let secondMessage = completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)! - try stateMachine.receive(buffer: firstMessage, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: firstMessage, endStream: false), + .readInbound + ) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) - try stateMachine.receive(buffer: secondMessage, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: secondMessage, endStream: false), + .readInbound + ) XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(deframedMessage)) // Server sends response @@ -2226,7 +2294,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { try secondPromise.futureResult.assertSuccess().wait() // Client sends end - try stateMachine.receive(buffer: ByteBuffer(), endStream: true) + XCTAssertEqual( + try stateMachine.receive(buffer: ByteBuffer(), endStream: true), + .readInbound + ) // Server ends let response = try stateMachine.send( @@ -2259,13 +2330,22 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { let firstMessage = completeMessage.getSlice(at: 0, length: 4)! let secondMessage = completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)! - try stateMachine.receive(buffer: firstMessage, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: firstMessage, endStream: false), + .readInbound + ) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) - try stateMachine.receive(buffer: secondMessage, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: secondMessage, endStream: false), + .readInbound + ) XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(deframedMessage)) // Client sends end - try stateMachine.receive(buffer: ByteBuffer(), endStream: true) + XCTAssertEqual( + try stateMachine.receive(buffer: ByteBuffer(), endStream: true), + .readInbound + ) // Server sends initial metadata let sentInitialHeaders = try stateMachine.send(metadata: Metadata(headers: ["custom": "value"])) @@ -2324,9 +2404,15 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { let firstMessage = completeMessage.getSlice(at: 0, length: 4)! let secondMessage = completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)! - try stateMachine.receive(buffer: firstMessage, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: firstMessage, endStream: false), + .readInbound + ) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) - try stateMachine.receive(buffer: secondMessage, endStream: false) + XCTAssertEqual( + try stateMachine.receive(buffer: secondMessage, endStream: false), + .readInbound + ) XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(deframedMessage)) // Server sends initial metadata @@ -2342,7 +2428,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { ) // Client sends end - try stateMachine.receive(buffer: ByteBuffer(), endStream: true) + XCTAssertEqual( + try stateMachine.receive(buffer: ByteBuffer(), endStream: true), + .readInbound + ) // Server sends response let firstResponse = [UInt8]([5, 6, 7])