From ee6fe73d940f3877703511d3c6ef8482722a1fa0 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 20 Mar 2024 16:51:59 +0000 Subject: [PATCH] Rename GRPCStreamStateMachine receive methods --- .../GRPCStreamStateMachine.swift | 66 +++--- .../Server/GRPCServerStreamHandler.swift | 4 +- .../GRPCStreamStateMachineTests.swift | 202 +++++++++--------- 3 files changed, 136 insertions(+), 136 deletions(-) diff --git a/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift b/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift index c3eda1363..8abd927f8 100644 --- a/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift +++ b/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift @@ -367,25 +367,25 @@ struct GRPCStreamStateMachine { case rejectRPC(trailers: HPACKHeaders) } - mutating func receive(metadata: HPACKHeaders, endStream: Bool) throws -> OnMetadataReceived { + mutating func receive(headers: HPACKHeaders, endStream: Bool) throws -> OnMetadataReceived { switch self.configuration { case .client: - return try self.clientReceive(metadata: metadata, endStream: endStream) + return try self.clientReceive(headers: headers, endStream: endStream) case .server(let serverConfiguration): return try self.serverReceive( - metadata: metadata, + headers: headers, endStream: endStream, configuration: serverConfiguration ) } } - mutating func receive(message: ByteBuffer, endStream: Bool) throws { + mutating func receive(buffer: ByteBuffer, endStream: Bool) throws { switch self.configuration { case .client: - try self.clientReceive(bytes: message, endStream: endStream) + try self.clientReceive(buffer: buffer, endStream: endStream) case .server: - try self.serverReceive(bytes: message, endStream: endStream) + try self.serverReceive(buffer: buffer, endStream: endStream) } } @@ -722,12 +722,12 @@ extension GRPCStreamStateMachine { } private mutating func clientReceive( - metadata: HPACKHeaders, + headers: HPACKHeaders, endStream: Bool ) throws -> OnMetadataReceived { switch self.state { case .clientOpenServerIdle(let state): - switch (self.clientValidateHeadersReceivedFromServer(metadata), endStream) { + switch (self.clientValidateHeadersReceivedFromServer(headers), endStream) { case (.invalid(let action), true): // The headers are invalid, but the server signalled that it was // closing the stream, so close both client and server. @@ -739,9 +739,9 @@ extension GRPCStreamStateMachine { case (.valid, true): // This is a trailers-only response: close server. self.state = .clientOpenServerClosed(.init(previousState: state)) - return try self.validateAndReturnStatusAndMetadata(metadata) + return try self.validateAndReturnStatusAndMetadata(headers) case (.valid, false): - switch self.processInboundEncoding(metadata) { + switch self.processInboundEncoding(headers) { case .error(let failure): return failure case .success(let inboundEncoding): @@ -759,7 +759,7 @@ extension GRPCStreamStateMachine { decompressor: decompressor ) ) - return .receivedMetadata(Metadata(headers: metadata)) + return .receivedMetadata(Metadata(headers: headers)) } } @@ -772,10 +772,10 @@ extension GRPCStreamStateMachine { if endStream { self.state = .clientOpenServerClosed(.init(previousState: state)) } - return try self.validateAndReturnStatusAndMetadata(metadata) + return try self.validateAndReturnStatusAndMetadata(headers) case .clientClosedServerIdle(let state): - switch (self.clientValidateHeadersReceivedFromServer(metadata), endStream) { + switch (self.clientValidateHeadersReceivedFromServer(headers), endStream) { case (.invalid(let action), true): // The headers are invalid, but the server signalled that it was // closing the stream, so close the server side too. @@ -787,9 +787,9 @@ extension GRPCStreamStateMachine { case (.valid, true): // This is a trailers-only response: close server. self.state = .clientClosedServerClosed(.init(previousState: state)) - return try self.validateAndReturnStatusAndMetadata(metadata) + return try self.validateAndReturnStatusAndMetadata(headers) case (.valid, false): - switch self.processInboundEncoding(metadata) { + switch self.processInboundEncoding(headers) { case .error(let failure): return failure case .success(let inboundEncoding): @@ -799,7 +799,7 @@ extension GRPCStreamStateMachine { decompressionAlgorithm: inboundEncoding ) ) - return .receivedMetadata(Metadata(headers: metadata)) + return .receivedMetadata(Metadata(headers: headers)) } } @@ -812,7 +812,7 @@ extension GRPCStreamStateMachine { if endStream { self.state = .clientClosedServerClosed(.init(previousState: state)) } - return try self.validateAndReturnStatusAndMetadata(metadata) + return try self.validateAndReturnStatusAndMetadata(headers) case .clientClosedServerClosed: // We could end up here if we received a grpc-status header in a previous @@ -821,7 +821,7 @@ extension GRPCStreamStateMachine { // We wouldn't want to throw in that scenario, so we just ignore it. // Note that we don't want to ignore it if EOS is not set here though, as // then it would be an invalid payload. - if !endStream || metadata.count > 0 { + if !endStream || headers.count > 0 { try self.invalidState( "Server is closed, nothing could have been sent." ) @@ -838,7 +838,7 @@ extension GRPCStreamStateMachine { } } - private mutating func clientReceive(bytes: ByteBuffer, endStream: Bool) throws { + private mutating func clientReceive(buffer: ByteBuffer, endStream: Bool) throws { // This is a message received by the client, from the server. switch self.state { case .clientIdleServerIdle: @@ -850,7 +850,7 @@ extension GRPCStreamStateMachine { "Server cannot have sent a message before sending the initial metadata." ) case .clientOpenServerOpen(var state): - try state.deframer.process(buffer: bytes) { deframedMessage in + try state.deframer.process(buffer: buffer) { deframedMessage in state.inboundMessageBuffer.append(deframedMessage) } if endStream { @@ -862,7 +862,7 @@ extension GRPCStreamStateMachine { // The client may have sent the end stream and thus it's closed, // but the server may still be responding. // The client must have a deframer set up, so force-unwrap is okay. - try state.deframer!.process(buffer: bytes) { deframedMessage in + try state.deframer!.process(buffer: buffer) { deframedMessage in state.inboundMessageBuffer.append(deframedMessage) } if endStream { @@ -1106,13 +1106,13 @@ extension GRPCStreamStateMachine { } private mutating func serverReceive( - metadata: HPACKHeaders, + headers: HPACKHeaders, endStream: Bool, configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration ) throws -> OnMetadataReceived { switch self.state { case .clientIdleServerIdle(let state): - let contentType = metadata.firstString(forKey: .contentType) + let contentType = headers.firstString(forKey: .contentType) .flatMap { ContentType(value: $0) } if contentType == nil { self.state = .clientOpenServerClosed(.init(previousState: state)) @@ -1123,7 +1123,7 @@ extension GRPCStreamStateMachine { return .rejectRPC(trailers: trailers) } - let path = metadata.firstString(forKey: .path) + let path = headers.firstString(forKey: .path) .flatMap { MethodDescriptor(fullyQualifiedMethod: $0) } if path == nil { return self.closeServerAndBuildRejectRPCAction( @@ -1136,7 +1136,7 @@ extension GRPCStreamStateMachine { ) } - let scheme = metadata.firstString(forKey: .scheme) + let scheme = headers.firstString(forKey: .scheme) .flatMap { Scheme(rawValue: $0) } if scheme == nil { return self.closeServerAndBuildRejectRPCAction( @@ -1149,7 +1149,7 @@ extension GRPCStreamStateMachine { ) } - guard let method = metadata.firstString(forKey: .method), method == "POST" else { + guard let method = headers.firstString(forKey: .method), method == "POST" else { return self.closeServerAndBuildRejectRPCAction( currentState: state, endStream: endStream, @@ -1160,7 +1160,7 @@ extension GRPCStreamStateMachine { ) } - guard let te = metadata.firstString(forKey: .te), te == "trailers" else { + guard let te = headers.firstString(forKey: .te), te == "trailers" else { return self.closeServerAndBuildRejectRPCAction( currentState: state, endStream: endStream, @@ -1178,7 +1178,7 @@ extension GRPCStreamStateMachine { // Firstly, find out if we support the client's chosen encoding, and reject // the RPC if we don't. let inboundEncoding: CompressionAlgorithm - let encodingValues = metadata.values( + let encodingValues = headers.values( forHeader: GRPCHTTP2Keys.encoding.rawValue, canonicalForm: true ) @@ -1236,7 +1236,7 @@ extension GRPCStreamStateMachine { // Secondly, find a compatible encoding the server can use to compress outbound messages, // based on the encodings the client has advertised. var outboundEncoding: CompressionAlgorithm = .identity - let clientAdvertisedEncodings = metadata.values( + let clientAdvertisedEncodings = headers.values( forHeader: GRPCHTTP2Keys.acceptEncoding.rawValue, canonicalForm: true ) @@ -1280,7 +1280,7 @@ extension GRPCStreamStateMachine { ) } - return .receivedMetadata(Metadata(headers: metadata)) + return .receivedMetadata(Metadata(headers: headers)) case .clientOpenServerIdle, .clientOpenServerOpen, .clientOpenServerClosed: try self.invalidState( "Client shouldn't have sent metadata twice." @@ -1292,7 +1292,7 @@ extension GRPCStreamStateMachine { } } - private mutating func serverReceive(bytes: ByteBuffer, endStream: Bool) throws { + private mutating func serverReceive(buffer: ByteBuffer, endStream: Bool) throws { switch self.state { case .clientIdleServerIdle: try self.invalidState( @@ -1301,7 +1301,7 @@ extension GRPCStreamStateMachine { case .clientOpenServerIdle(var state): // Deframer must be present on the server side, as we know the decompression // algorithm from the moment the client opens. - try state.deframer!.process(buffer: bytes) { deframedMessage in + try state.deframer!.process(buffer: buffer) { deframedMessage in state.inboundMessageBuffer.append(deframedMessage) } @@ -1311,7 +1311,7 @@ extension GRPCStreamStateMachine { self.state = .clientOpenServerIdle(state) } case .clientOpenServerOpen(var state): - try state.deframer.process(buffer: bytes) { deframedMessage in + try state.deframer.process(buffer: buffer) { deframedMessage in state.inboundMessageBuffer.append(deframedMessage) } diff --git a/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift b/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift index c4da0d4ed..e7317df5a 100644 --- a/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift +++ b/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift @@ -63,7 +63,7 @@ extension GRPCServerStreamHandler { switch frameData.data { case .byteBuffer(let buffer): do { - try self.stateMachine.receive(message: buffer, endStream: endStream) + try self.stateMachine.receive(buffer: buffer, endStream: endStream) loop: while true { switch self.stateMachine.nextInboundMessage() { case .receiveMessage(let message): @@ -86,7 +86,7 @@ extension GRPCServerStreamHandler { case .headers(let headers): do { let action = try self.stateMachine.receive( - metadata: headers.headers, + headers: headers.headers, endStream: headers.endStream ) switch action { diff --git a/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift index 53470c9aa..01e329420 100644 --- a/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift +++ b/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift @@ -158,14 +158,14 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Open client XCTAssertNoThrow(try stateMachine.send(metadata: [])) // Open server - XCTAssertNoThrow(try stateMachine.receive(metadata: serverMetadata, endStream: false)) + XCTAssertNoThrow(try stateMachine.receive(headers: serverMetadata, endStream: false)) case .clientOpenServerClosed: // Open client XCTAssertNoThrow(try stateMachine.send(metadata: [])) // Open server - XCTAssertNoThrow(try stateMachine.receive(metadata: serverMetadata, endStream: false)) + XCTAssertNoThrow(try stateMachine.receive(headers: serverMetadata, endStream: false)) // Close server - XCTAssertNoThrow(try stateMachine.receive(metadata: .serverTrailers, endStream: true)) + XCTAssertNoThrow(try stateMachine.receive(headers: .serverTrailers, endStream: true)) case .clientClosedServerIdle: // Open client XCTAssertNoThrow(try stateMachine.send(metadata: [])) @@ -175,18 +175,18 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Open client XCTAssertNoThrow(try stateMachine.send(metadata: [])) // Open server - XCTAssertNoThrow(try stateMachine.receive(metadata: serverMetadata, endStream: false)) + XCTAssertNoThrow(try stateMachine.receive(headers: serverMetadata, endStream: false)) // Close client XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true)) case .clientClosedServerClosed: // Open client XCTAssertNoThrow(try stateMachine.send(metadata: [])) // Open server - XCTAssertNoThrow(try stateMachine.receive(metadata: serverMetadata, endStream: false)) + XCTAssertNoThrow(try stateMachine.receive(headers: serverMetadata, endStream: false)) // Close client XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true)) // Close server - XCTAssertNoThrow(try stateMachine.receive(metadata: .serverTrailers, endStream: true)) + XCTAssertNoThrow(try stateMachine.receive(headers: .serverTrailers, endStream: true)) } return stateMachine @@ -301,7 +301,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.receive(metadata: .init(), endStream: false) + try stateMachine.receive(headers: .init(), endStream: false) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server cannot have sent metadata if the client is idle.") @@ -316,7 +316,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Receive metadata with unexpected non-200 status code let action = try stateMachine.receive( - metadata: [GRPCHTTP2Keys.status.rawValue: "300"], + headers: [GRPCHTTP2Keys.status.rawValue: "300"], endStream: false ) @@ -338,7 +338,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Receive metadata = open server let action = try stateMachine.receive( - metadata: [ + headers: [ GRPCHTTP2Keys.status.rawValue: "200", GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue, GRPCHTTP2Keys.encoding.rawValue: "deflate", @@ -369,7 +369,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, try stateMachine.receive( - metadata: [ + headers: [ GRPCHTTP2Keys.status.rawValue: "200", GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue, GRPCHTTP2Keys.encoding.rawValue: "deflate", @@ -388,7 +388,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Now make sure everything works well if we include grpc-status let action = try stateMachine.receive( - metadata: [ + headers: [ GRPCHTTP2Keys.status.rawValue: "200", GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.ok.rawValue), GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue, @@ -423,7 +423,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.receive(metadata: .init(), endStream: false) + try stateMachine.receive(headers: .init(), endStream: false) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server is closed, nothing could have been sent.") @@ -439,7 +439,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Receive an end trailer XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.receive(metadata: .init(), endStream: true) + try stateMachine.receive(headers: .init(), endStream: true) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server cannot have sent metadata if the client is idle.") @@ -459,7 +459,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { )!, "custom-key": "custom-value", ] - let trailers = try stateMachine.receive(metadata: trailersOnlyResponse, endStream: true) + let trailers = try stateMachine.receive(headers: trailersOnlyResponse, endStream: true) switch trailers { case .receivedStatusAndMetadata(let status, let metadata): XCTAssertEqual(status, Status(code: .internalError, message: "Some status message")) @@ -482,7 +482,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Receive an end trailer let action = try stateMachine.receive( - metadata: [ + headers: [ GRPCHTTP2Keys.status.rawValue: "200", GRPCHTTP2Keys.grpcStatus.rawValue: String(Status.Code.ok.rawValue), GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue, @@ -514,7 +514,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Receive another end trailer XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.receive(metadata: .init(), endStream: true) + try stateMachine.receive(headers: .init(), endStream: true) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server is closed, nothing could have been sent.") @@ -534,7 +534,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { )!, "custom-key": "custom-value", ] - let trailers = try stateMachine.receive(metadata: trailersOnlyResponse, endStream: true) + let trailers = try stateMachine.receive(headers: trailersOnlyResponse, endStream: true) switch trailers { case .receivedStatusAndMetadata(let status, let metadata): XCTAssertEqual(status, Status(code: .internalError, message: "Some status message")) @@ -557,7 +557,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Close server again (endStream = true) and assert we don't throw. // This can happen if the previous close was caused by a grpc-status header // and then the server sends an empty frame with EOS set. - XCTAssertEqual(try stateMachine.receive(metadata: .init(), endStream: true), .doNothing) + XCTAssertEqual(try stateMachine.receive(headers: .init(), endStream: true), .doNothing) } // - MARK: Receive message @@ -567,7 +567,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.receive(message: .init(), endStream: false) + try stateMachine.receive(buffer: .init(), endStream: false) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual( @@ -583,7 +583,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.receive(message: .init(), endStream: false) + try stateMachine.receive(buffer: .init(), endStream: false) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual( @@ -598,8 +598,8 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { for targetState in [TargetStateMachineState.clientOpenServerOpen, .clientClosedServerOpen] { var stateMachine = self.makeClientStateMachine(targetState: targetState) - XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: false)) - XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: true)) + XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: false)) + XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: true)) } } @@ -609,7 +609,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.receive(message: .init(), endStream: false) + try stateMachine.receive(buffer: .init(), endStream: false) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Cannot have received anything from a closed server.") @@ -744,7 +744,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false)) // Close server - XCTAssertNoThrow(try stateMachine.receive(metadata: .serverTrailers, endStream: true)) + XCTAssertNoThrow(try stateMachine.receive(headers: .serverTrailers, endStream: true)) // Close client XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true)) @@ -773,7 +773,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(message: receivedBytes, endStream: false) + try stateMachine.receive(buffer: receivedBytes, endStream: false) XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42])) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) @@ -787,7 +787,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { let originalMessage = [UInt8]([42, 42, 43, 43]) let receivedBytes = try self.frameMessage(originalMessage, compress: true) - try stateMachine.receive(message: receivedBytes, endStream: false) + try stateMachine.receive(buffer: receivedBytes, endStream: false) XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(originalMessage)) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) @@ -801,10 +801,10 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(message: receivedBytes, endStream: false) + try stateMachine.receive(buffer: receivedBytes, endStream: false) // Close server - XCTAssertNoThrow(try stateMachine.receive(metadata: .serverTrailers, endStream: true)) + XCTAssertNoThrow(try stateMachine.receive(headers: .serverTrailers, endStream: true)) XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42])) XCTAssertEqual(stateMachine.nextInboundMessage(), .noMoreMessages) @@ -818,7 +818,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(message: receivedBytes, endStream: false) + try stateMachine.receive(buffer: receivedBytes, endStream: false) // Close client XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true)) @@ -837,10 +837,10 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(message: receivedBytes, endStream: false) + try stateMachine.receive(buffer: receivedBytes, endStream: false) // Close server - XCTAssertNoThrow(try stateMachine.receive(metadata: .serverTrailers, endStream: true)) + XCTAssertNoThrow(try stateMachine.receive(headers: .serverTrailers, endStream: true)) // Close client XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true)) @@ -872,7 +872,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Server sends initial metadata let serverInitialHeadersAction = try stateMachine.receive( - metadata: .serverInitialMetadata, + headers: .serverInitialMetadata, endStream: false ) XCTAssertEqual( @@ -900,8 +900,8 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { let firstResponse = try self.frameMessage(firstResponseBytes, compress: false) let secondResponseBytes = [UInt8]([8, 9, 10]) let secondResponse = try self.frameMessage(secondResponseBytes, compress: false) - try stateMachine.receive(message: firstResponse, endStream: false) - try stateMachine.receive(message: secondResponse, endStream: false) + try stateMachine.receive(buffer: firstResponse, endStream: false) + try stateMachine.receive(buffer: secondResponse, endStream: false) // Make sure messages have arrived XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(firstResponseBytes)) @@ -913,7 +913,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Server ends let metadataReceivedAction = try stateMachine.receive( - metadata: .serverTrailers, + headers: .serverTrailers, endStream: true ) let receivedMetadata = { @@ -959,7 +959,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Server sends initial metadata let serverInitialHeadersAction = try stateMachine.receive( - metadata: .serverInitialMetadata, + headers: .serverInitialMetadata, endStream: false ) XCTAssertEqual( @@ -978,8 +978,8 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { let firstResponse = try self.frameMessage(firstResponseBytes, compress: false) let secondResponseBytes = [UInt8]([8, 9, 10]) let secondResponse = try self.frameMessage(secondResponseBytes, compress: false) - try stateMachine.receive(message: firstResponse, endStream: false) - try stateMachine.receive(message: secondResponse, endStream: false) + try stateMachine.receive(buffer: firstResponse, endStream: false) + try stateMachine.receive(buffer: secondResponse, endStream: false) // Make sure messages have arrived XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(firstResponseBytes)) @@ -988,7 +988,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Server ends let metadataReceivedAction = try stateMachine.receive( - metadata: .serverTrailers, + headers: .serverTrailers, endStream: true ) let receivedMetadata = { @@ -1034,7 +1034,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Server sends initial metadata let serverInitialHeadersAction = try stateMachine.receive( - metadata: .serverInitialMetadata, + headers: .serverInitialMetadata, endStream: false ) XCTAssertEqual( @@ -1056,8 +1056,8 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { let firstResponse = try self.frameMessage(firstResponseBytes, compress: false) let secondResponseBytes = [UInt8]([8, 9, 10]) let secondResponse = try self.frameMessage(secondResponseBytes, compress: false) - try stateMachine.receive(message: firstResponse, endStream: false) - try stateMachine.receive(message: secondResponse, endStream: false) + try stateMachine.receive(buffer: firstResponse, endStream: false) + try stateMachine.receive(buffer: secondResponse, endStream: false) // Make sure messages have arrived XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(firstResponseBytes)) @@ -1066,7 +1066,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Server ends let metadataReceivedAction = try stateMachine.receive( - metadata: .serverTrailers, + headers: .serverTrailers, endStream: true ) let receivedMetadata = { @@ -1110,15 +1110,15 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { break case .clientOpenServerIdle: // Open client - XCTAssertNoThrow(try stateMachine.receive(metadata: clientMetadata, endStream: false)) + XCTAssertNoThrow(try stateMachine.receive(headers: clientMetadata, endStream: false)) case .clientOpenServerOpen: // Open client - XCTAssertNoThrow(try stateMachine.receive(metadata: clientMetadata, endStream: false)) + XCTAssertNoThrow(try stateMachine.receive(headers: clientMetadata, endStream: false)) // Open server XCTAssertNoThrow(try stateMachine.send(metadata: Metadata(headers: .serverInitialMetadata))) case .clientOpenServerClosed: // Open client - XCTAssertNoThrow(try stateMachine.receive(metadata: clientMetadata, endStream: false)) + XCTAssertNoThrow(try stateMachine.receive(headers: clientMetadata, endStream: false)) // Open server XCTAssertNoThrow(try stateMachine.send(metadata: Metadata(headers: .serverInitialMetadata))) // Close server @@ -1130,23 +1130,23 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { ) case .clientClosedServerIdle: // Open client - XCTAssertNoThrow(try stateMachine.receive(metadata: clientMetadata, endStream: false)) + XCTAssertNoThrow(try stateMachine.receive(headers: clientMetadata, endStream: false)) // Close client - XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: true)) + XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: true)) case .clientClosedServerOpen: // Open client - XCTAssertNoThrow(try stateMachine.receive(metadata: clientMetadata, endStream: false)) + XCTAssertNoThrow(try stateMachine.receive(headers: clientMetadata, endStream: false)) // Open server XCTAssertNoThrow(try stateMachine.send(metadata: Metadata(headers: .serverInitialMetadata))) // Close client - XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: true)) + XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: true)) case .clientClosedServerClosed: // Open client - XCTAssertNoThrow(try stateMachine.receive(metadata: clientMetadata, endStream: false)) + XCTAssertNoThrow(try stateMachine.receive(headers: clientMetadata, endStream: false)) // Open server XCTAssertNoThrow(try stateMachine.send(metadata: Metadata(headers: .serverInitialMetadata))) // Close client - XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: true)) + XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: true)) // Close server XCTAssertNoThrow( try stateMachine.send( @@ -1475,7 +1475,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { func testReceiveMetadataWhenClientIdleAndServerIdle() throws { var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) - let action = try stateMachine.receive(metadata: .clientInitialMetadata, endStream: false) + let action = try stateMachine.receive(headers: .clientInitialMetadata, endStream: false) XCTAssertEqual( action, .receivedMetadata(Metadata(headers: .clientInitialMetadata)) @@ -1485,7 +1485,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { func testReceiveMetadataWhenClientIdleAndServerIdle_WithEndStream() throws { var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) - let action = try stateMachine.receive(metadata: .clientInitialMetadata, endStream: true) + let action = try stateMachine.receive(headers: .clientInitialMetadata, endStream: true) XCTAssertEqual( action, .receivedMetadata(Metadata(headers: .clientInitialMetadata)) @@ -1496,7 +1496,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) let action = try stateMachine.receive( - metadata: .receivedWithoutContentType, + headers: .receivedWithoutContentType, endStream: false ) @@ -1510,7 +1510,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) let action = try stateMachine.receive( - metadata: .receivedWithInvalidContentType, + headers: .receivedWithInvalidContentType, endStream: false ) @@ -1524,7 +1524,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) let action = try stateMachine.receive( - metadata: .receivedWithoutEndpoint, + headers: .receivedWithoutEndpoint, endStream: false ) @@ -1545,7 +1545,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) let action = try stateMachine.receive( - metadata: .receivedWithoutTE, + headers: .receivedWithoutTE, endStream: false ) @@ -1567,7 +1567,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) let action = try stateMachine.receive( - metadata: .receivedWithInvalidTE, + headers: .receivedWithInvalidTE, endStream: false ) @@ -1589,7 +1589,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) let action = try stateMachine.receive( - metadata: .receivedWithoutMethod, + headers: .receivedWithoutMethod, endStream: false ) @@ -1611,7 +1611,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) let action = try stateMachine.receive( - metadata: .receivedWithInvalidMethod, + headers: .receivedWithInvalidMethod, endStream: false ) @@ -1633,7 +1633,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) let action = try stateMachine.receive( - metadata: .receivedWithoutScheme, + headers: .receivedWithoutScheme, endStream: false ) @@ -1654,7 +1654,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientIdleServerIdle) let action = try stateMachine.receive( - metadata: .receivedWithInvalidScheme, + headers: .receivedWithInvalidScheme, endStream: false ) @@ -1677,7 +1677,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try opening client with a compression algorithm that is not accepted // by the server. let action = try stateMachine.receive( - metadata: .clientInitialMetadataWithGzipCompression, + headers: .clientInitialMetadataWithGzipCompression, endStream: false ) @@ -1705,7 +1705,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try receiving initial metadata again - should fail XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.receive(metadata: .clientInitialMetadata, endStream: false) + try stateMachine.receive(headers: .clientInitialMetadata, endStream: false) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Client shouldn't have sent metadata twice.") @@ -1717,7 +1717,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.receive(metadata: .clientInitialMetadata, endStream: false) + try stateMachine.receive(headers: .clientInitialMetadata, endStream: false) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Client shouldn't have sent metadata twice.") @@ -1729,7 +1729,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.receive(metadata: .clientInitialMetadata, endStream: false) + try stateMachine.receive(headers: .clientInitialMetadata, endStream: false) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Client shouldn't have sent metadata twice.") @@ -1741,7 +1741,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.receive(metadata: .clientInitialMetadata, endStream: false) + try stateMachine.receive(headers: .clientInitialMetadata, endStream: false) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Client can't have sent metadata if closed.") @@ -1753,7 +1753,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.receive(metadata: .clientInitialMetadata, endStream: false) + try stateMachine.receive(headers: .clientInitialMetadata, endStream: false) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Client can't have sent metadata if closed.") @@ -1765,7 +1765,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.receive(metadata: .clientInitialMetadata, endStream: false) + try stateMachine.receive(headers: .clientInitialMetadata, endStream: false) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Client can't have sent metadata if closed.") @@ -1779,7 +1779,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.receive(message: .init(), endStream: false) + try stateMachine.receive(buffer: .init(), endStream: false) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Can't have received a message if client is idle.") @@ -1790,13 +1790,13 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerIdle) // Receive messages successfully: the second one should close client. - XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: false)) - XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: true)) + XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: false)) + XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: true)) // Verify client is now closed XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.receive(message: .init(), endStream: false) + try stateMachine.receive(buffer: .init(), endStream: false) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Client can't send a message if closed.") @@ -1807,13 +1807,13 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) // Receive messages successfully: the second one should close client. - XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: false)) - XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: true)) + XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: false)) + XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: true)) // Verify client is now closed XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.receive(message: .init(), endStream: false) + try stateMachine.receive(buffer: .init(), endStream: false) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Client can't send a message if closed.") @@ -1824,7 +1824,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerClosed) // Client is not done sending request, don't fail. - XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: false)) + XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: false)) } func testReceiveMessageWhenClientClosedAndServerIdle() { @@ -1832,7 +1832,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.receive(message: .init(), endStream: false) + try stateMachine.receive(buffer: .init(), endStream: false) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Client can't send a message if closed.") @@ -1844,7 +1844,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.receive(message: .init(), endStream: false) + try stateMachine.receive(buffer: .init(), endStream: false) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Client can't send a message if closed.") @@ -1856,7 +1856,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.receive(message: .init(), endStream: false) + try stateMachine.receive(buffer: .init(), endStream: false) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Client can't send a message if closed.") @@ -1979,7 +1979,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false)) // Close client - XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: true)) + XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: true)) // Send another message XCTAssertNoThrow(try stateMachine.send(message: [43, 43], endStream: false)) @@ -2048,7 +2048,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(message: receivedBytes, endStream: false) + try stateMachine.receive(buffer: receivedBytes, endStream: false) XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage([42, 42])) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) @@ -2063,7 +2063,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { let originalMessage = [UInt8]([42, 42, 43, 43]) let receivedBytes = try self.frameMessage(originalMessage, compress: true) - try stateMachine.receive(message: receivedBytes, endStream: false) + try stateMachine.receive(buffer: receivedBytes, endStream: false) XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(originalMessage)) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) @@ -2077,7 +2077,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(message: receivedBytes, endStream: false) + try stateMachine.receive(buffer: receivedBytes, endStream: false) // Close server XCTAssertNoThrow( @@ -2104,10 +2104,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(message: receivedBytes, endStream: false) + try stateMachine.receive(buffer: receivedBytes, endStream: false) // Close client - XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: true)) + XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: true)) // Even though the client is closed, because the server received a message // while it was still open, we must get the message now. @@ -2123,7 +2123,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { 0, 0, 0, 2, // message length: 2 bytes 42, 42, // original message ]) - try stateMachine.receive(message: receivedBytes, endStream: false) + try stateMachine.receive(buffer: receivedBytes, endStream: false) // Close server XCTAssertNoThrow( @@ -2134,7 +2134,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { ) // Close client - XCTAssertNoThrow(try stateMachine.receive(message: .init(), endStream: true)) + XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: true)) // Even though the client and server are closed, because the server received // a message while the client was still open, we must get the message now. @@ -2149,7 +2149,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Client sends metadata let receiveMetadataAction = try stateMachine.receive( - metadata: .clientInitialMetadata, + headers: .clientInitialMetadata, endStream: false ) XCTAssertEqual( @@ -2176,9 +2176,9 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { let firstMessage = completeMessage.getSlice(at: 0, length: 4)! let secondMessage = completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)! - try stateMachine.receive(message: firstMessage, endStream: false) + try stateMachine.receive(buffer: firstMessage, endStream: false) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) - try stateMachine.receive(message: secondMessage, endStream: false) + try stateMachine.receive(buffer: secondMessage, endStream: false) XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(deframedMessage)) // Server sends response @@ -2193,7 +2193,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessages)) // Client sends end - try stateMachine.receive(message: ByteBuffer(), endStream: true) + try stateMachine.receive(buffer: ByteBuffer(), endStream: true) // Server ends let response = try stateMachine.send( @@ -2211,7 +2211,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Client sends metadata let receiveMetadataAction = try stateMachine.receive( - metadata: .clientInitialMetadata, + headers: .clientInitialMetadata, endStream: false ) XCTAssertEqual( @@ -2226,13 +2226,13 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { let firstMessage = completeMessage.getSlice(at: 0, length: 4)! let secondMessage = completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)! - try stateMachine.receive(message: firstMessage, endStream: false) + try stateMachine.receive(buffer: firstMessage, endStream: false) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) - try stateMachine.receive(message: secondMessage, endStream: false) + try stateMachine.receive(buffer: secondMessage, endStream: false) XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(deframedMessage)) // Client sends end - try stateMachine.receive(message: ByteBuffer(), endStream: true) + try stateMachine.receive(buffer: ByteBuffer(), endStream: true) // Server sends initial metadata let sentInitialHeaders = try stateMachine.send(metadata: Metadata(headers: ["custom": "value"])) @@ -2273,7 +2273,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Client sends metadata let receiveMetadataAction = try stateMachine.receive( - metadata: .clientInitialMetadata, + headers: .clientInitialMetadata, endStream: false ) XCTAssertEqual( @@ -2288,9 +2288,9 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { let firstMessage = completeMessage.getSlice(at: 0, length: 4)! let secondMessage = completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)! - try stateMachine.receive(message: firstMessage, endStream: false) + try stateMachine.receive(buffer: firstMessage, endStream: false) XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) - try stateMachine.receive(message: secondMessage, endStream: false) + try stateMachine.receive(buffer: secondMessage, endStream: false) XCTAssertEqual(stateMachine.nextInboundMessage(), .receiveMessage(deframedMessage)) // Server sends initial metadata @@ -2306,7 +2306,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { ) // Client sends end - try stateMachine.receive(message: ByteBuffer(), endStream: true) + try stateMachine.receive(buffer: ByteBuffer(), endStream: true) // Server sends response let firstResponse = [UInt8]([5, 6, 7])