From 5e29894f8e13016844e77ebc0fba77af497532ea Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Thu, 21 Mar 2024 21:06:06 +0000 Subject: [PATCH] Add a closeOutbound func to GRPCStreamStateMachine and remove endStream param from send(message:) --- .../GRPCStreamStateMachine.swift | 57 +++++++----- .../Server/GRPCServerStreamHandler.swift | 2 +- .../GRPCStreamStateMachineTests.swift | 93 ++++++++++--------- 3 files changed, 82 insertions(+), 70 deletions(-) diff --git a/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift b/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift index 8abd927f8..eb37b76ff 100644 --- a/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift +++ b/Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift @@ -325,20 +325,24 @@ struct GRPCStreamStateMachine { } } - mutating func send(message: [UInt8], endStream: Bool) throws { + mutating func send(message: [UInt8]) throws { switch self.configuration { case .client: - try self.clientSend(message: message, endStream: endStream) + try self.clientSend(message: message) case .server: - if endStream { - try self.invalidState( - "Can't end response stream by sending a message - send(status:metadata:) must be called" - ) - } try self.serverSend(message: message) } } + mutating func closeOutbound() throws { + switch self.configuration { + case .client: + try self.clientCloseOutbound() + case .server: + try self.invalidState("Server cannot call close: it must send status and trailers.") + } + } + mutating func send( status: Status, metadata: Metadata @@ -532,31 +536,36 @@ extension GRPCStreamStateMachine { } } - private mutating func clientSend(message: [UInt8], endStream: Bool) throws { - // Client sends message. + private mutating func clientSend(message: [UInt8]) throws { switch self.state { case .clientIdleServerIdle: try self.invalidState("Client not yet open.") case .clientOpenServerIdle(var state): state.framer.append(message) - if endStream { - self.state = .clientClosedServerIdle(.init(previousState: state)) - } else { - self.state = .clientOpenServerIdle(state) - } + self.state = .clientOpenServerIdle(state) case .clientOpenServerOpen(var state): state.framer.append(message) - if endStream { - self.state = .clientClosedServerOpen(.init(previousState: state)) - } else { - self.state = .clientOpenServerOpen(state) - } - case .clientOpenServerClosed(let state): + self.state = .clientOpenServerOpen(state) + case .clientOpenServerClosed: // The server has closed, so it makes no sense to send the rest of the request. - // However, do close if endStream is set. - if endStream { - self.state = .clientClosedServerClosed(.init(previousState: state)) - } + () + case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed: + try self.invalidState( + "Client is closed, cannot send a message." + ) + } + } + + private mutating func clientCloseOutbound() throws { + switch self.state { + case .clientIdleServerIdle: + try self.invalidState("Client not yet open.") + case .clientOpenServerIdle(let state): + self.state = .clientClosedServerIdle(.init(previousState: state)) + case .clientOpenServerOpen(let state): + self.state = .clientClosedServerOpen(.init(previousState: state)) + case .clientOpenServerClosed(let state): + self.state = .clientClosedServerClosed(.init(previousState: state)) case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed: try self.invalidState( "Client is closed, cannot send a message." diff --git a/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift b/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift index e7317df5a..22cd50330 100644 --- a/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift +++ b/Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift @@ -153,7 +153,7 @@ extension GRPCServerStreamHandler { case .message(let message): do { - try self.stateMachine.send(message: message, endStream: false) + try self.stateMachine.send(message: message) // TODO: move the promise handling into the state machine promise?.succeed() } catch { diff --git a/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift b/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift index 01e329420..29261257f 100644 --- a/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift +++ b/Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift @@ -170,21 +170,21 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Open client XCTAssertNoThrow(try stateMachine.send(metadata: [])) // Close client - XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true)) + XCTAssertNoThrow(try stateMachine.closeOutbound()) case .clientClosedServerOpen: // Open client XCTAssertNoThrow(try stateMachine.send(metadata: [])) // Open server XCTAssertNoThrow(try stateMachine.receive(headers: serverMetadata, endStream: false)) // Close client - XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true)) + XCTAssertNoThrow(try stateMachine.closeOutbound()) case .clientClosedServerClosed: // Open client XCTAssertNoThrow(try stateMachine.send(metadata: [])) // Open server XCTAssertNoThrow(try stateMachine.receive(headers: serverMetadata, endStream: false)) // Close client - XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true)) + XCTAssertNoThrow(try stateMachine.closeOutbound()) // Close server XCTAssertNoThrow(try stateMachine.receive(headers: .serverTrailers, endStream: true)) } @@ -238,7 +238,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Try to send a message without opening (i.e. without sending initial metadata) XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: [], endStream: false) + try stateMachine.send(message: []) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Client not yet open.") @@ -252,7 +252,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { var stateMachine = self.makeClientStateMachine(targetState: targetState) // Now send a message - XCTAssertNoThrow(try stateMachine.send(message: [], endStream: false)) + XCTAssertNoThrow(try stateMachine.send(message: [])) } } @@ -266,7 +266,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Try sending another message: it should fail XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: [], endStream: false) + try stateMachine.send(message: []) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Client is closed, cannot send a message.") @@ -637,7 +637,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) - XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false)) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) let expectedBytes: [UInt8] = [ 0, // compression flag: unset @@ -663,7 +663,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) let originalMessage = [UInt8]([42, 42, 43, 43]) - XCTAssertNoThrow(try stateMachine.send(message: originalMessage, endStream: false)) + XCTAssertNoThrow(try stateMachine.send(message: originalMessage)) let request = try stateMachine.nextOutboundMessage() let framedMessage = try self.frameMessage(originalMessage, compress: true) @@ -679,7 +679,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) let originalMessage = [UInt8]([42, 42, 43, 43]) - XCTAssertNoThrow(try stateMachine.send(message: originalMessage, endStream: false)) + XCTAssertNoThrow(try stateMachine.send(message: originalMessage)) let request = try stateMachine.nextOutboundMessage() let framedMessage = try self.frameMessage(originalMessage, compress: true) @@ -694,7 +694,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { // Queue a message, but assert the action is .noMoreMessages nevertheless, // because the server is closed. - XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false)) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) } @@ -702,7 +702,8 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerIdle) // Send a message and close client - XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: true)) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) + XCTAssertNoThrow(try stateMachine.closeOutbound()) // Make sure that getting the next outbound message _does_ return the message // we have enqueued. @@ -722,7 +723,8 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerOpen) // Send a message and close client - XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: true)) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) + XCTAssertNoThrow(try stateMachine.closeOutbound()) // Make sure that getting the next outbound message _does_ return the message // we have enqueued. @@ -741,13 +743,13 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { func testNextOutboundMessageWhenClientClosedAndServerClosed() throws { var stateMachine = self.makeClientStateMachine(targetState: .clientOpenServerOpen) // Send a message - XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false)) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) // Close server XCTAssertNoThrow(try stateMachine.receive(headers: .serverTrailers, endStream: true)) // Close client - XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true)) + XCTAssertNoThrow(try stateMachine.closeOutbound()) // Even though we have enqueued a message, don't send it, because the server // is closed. @@ -821,7 +823,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { try stateMachine.receive(buffer: receivedBytes, endStream: false) // Close client - XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true)) + XCTAssertNoThrow(try stateMachine.closeOutbound()) // Even though the client is closed, because it received a message while open, // we must get the message now. @@ -843,7 +845,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { XCTAssertNoThrow(try stateMachine.receive(headers: .serverTrailers, endStream: true)) // Close client - XCTAssertNoThrow(try stateMachine.send(message: [], endStream: true)) + XCTAssertNoThrow(try stateMachine.closeOutbound()) // Even though the client is closed, because it received a message while open, // we must get the message now. @@ -889,7 +891,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { let message = [UInt8]([1, 2, 3, 4]) let framedMessage = try self.frameMessage(message, compress: false) - try stateMachine.send(message: message, endStream: false) + try stateMachine.send(message: message) XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessage)) XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) @@ -909,7 +911,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) // Client sends end - try stateMachine.send(message: [], endStream: true) + XCTAssertNoThrow(try stateMachine.closeOutbound()) // Server ends let metadataReceivedAction = try stateMachine.receive( @@ -953,7 +955,8 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { let message = [UInt8]([1, 2, 3, 4]) let framedMessage = try self.frameMessage(message, compress: false) - try stateMachine.send(message: message, endStream: true) + XCTAssertNoThrow(try stateMachine.send(message: message)) + XCTAssertNoThrow(try stateMachine.closeOutbound()) XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessage)) XCTAssertEqual(try stateMachine.nextOutboundMessage(), .noMoreMessages) @@ -1028,7 +1031,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { let message = [UInt8]([1, 2, 3, 4]) let framedMessage = try self.frameMessage(message, compress: false) - try stateMachine.send(message: message, endStream: false) + try stateMachine.send(message: message) XCTAssertEqual(try stateMachine.nextOutboundMessage(), .sendMessage(framedMessage)) XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) @@ -1046,8 +1049,8 @@ final class GRPCStreamClientStateMachineTests: XCTestCase { ]) ) - // Client sends end - try stateMachine.send(message: [], endStream: true) + // Client closes + XCTAssertNoThrow(try stateMachine.closeOutbound()) // Server sends response XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages) @@ -1239,7 +1242,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: [], endStream: false) + try stateMachine.send(message: []) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual( @@ -1255,7 +1258,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Now send a message XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: [], endStream: false) + try stateMachine.send(message: []) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual( @@ -1269,7 +1272,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) // Now send a message - XCTAssertNoThrow(try stateMachine.send(message: [], endStream: false)) + XCTAssertNoThrow(try stateMachine.send(message: [])) } func testSendMessageWhenClientOpenAndServerClosed() { @@ -1278,7 +1281,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: [], endStream: false) + try stateMachine.send(message: []) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server can't send a message if it's closed.") @@ -1290,7 +1293,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: [], endStream: false) + try stateMachine.send(message: []) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual( @@ -1305,7 +1308,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending a message: even though client is closed, we should send it // because it may be expecting a response. - XCTAssertNoThrow(try stateMachine.send(message: [], endStream: false)) + XCTAssertNoThrow(try stateMachine.send(message: [])) } func testSendMessageWhenClientClosedAndServerClosed() { @@ -1314,7 +1317,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: [], endStream: false) + try stateMachine.send(message: []) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server can't send a message if it's closed.") @@ -1360,7 +1363,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail because server is now closed. XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: [], endStream: false) + try stateMachine.send(message: []) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server can't send a message if it's closed.") @@ -1382,7 +1385,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail because server is now closed. XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: [], endStream: false) + try stateMachine.send(message: []) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server can't send a message if it's closed.") @@ -1426,7 +1429,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail because server is now closed. XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: [], endStream: false) + try stateMachine.send(message: []) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server can't send a message if it's closed.") @@ -1448,7 +1451,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { // Try sending another message: it should fail because server is now closed. XCTAssertThrowsError( ofType: RPCError.self, - try stateMachine.send(message: [], endStream: false) + try stateMachine.send(message: []) ) { error in XCTAssertEqual(error.code, .internalError) XCTAssertEqual(error.message, "Server can't send a message if it's closed.") @@ -1906,7 +1909,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) - XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false)) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) let response = try stateMachine.nextOutboundMessage() let expectedBytes: [UInt8] = [ @@ -1929,7 +1932,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) let originalMessage = [UInt8]([42, 42, 43, 43]) - XCTAssertNoThrow(try stateMachine.send(message: originalMessage, endStream: false)) + XCTAssertNoThrow(try stateMachine.send(message: originalMessage)) let response = try stateMachine.nextOutboundMessage() let framedMessage = try self.frameMessage(originalMessage, compress: true) @@ -1940,7 +1943,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) // Send message and close server - XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false)) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) XCTAssertNoThrow( try stateMachine.send( status: .init(code: .ok, message: ""), @@ -1976,13 +1979,13 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerOpen) // Send a message - XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false)) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) // Close client XCTAssertNoThrow(try stateMachine.receive(buffer: .init(), endStream: true)) // Send another message - XCTAssertNoThrow(try stateMachine.send(message: [43, 43], endStream: false)) + XCTAssertNoThrow(try stateMachine.send(message: [43, 43])) // Make sure that getting the next outbound message _does_ return the message // we have enqueued. @@ -2006,7 +2009,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { var stateMachine = self.makeServerStateMachine(targetState: .clientClosedServerOpen) // Send a message and close server - XCTAssertNoThrow(try stateMachine.send(message: [42, 42], endStream: false)) + XCTAssertNoThrow(try stateMachine.send(message: [42, 42])) XCTAssertNoThrow( try stateMachine.send( status: .init(code: .ok, message: ""), @@ -2185,8 +2188,8 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { let firstResponse = [UInt8]([5, 6, 7]) let secondResponse = [UInt8]([8, 9, 10]) XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) - try stateMachine.send(message: firstResponse, endStream: false) - try stateMachine.send(message: secondResponse, endStream: false) + try stateMachine.send(message: firstResponse) + try stateMachine.send(message: secondResponse) // Make sure messages are outbound let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false) @@ -2250,8 +2253,8 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { let firstResponse = [UInt8]([5, 6, 7]) let secondResponse = [UInt8]([8, 9, 10]) XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) - try stateMachine.send(message: firstResponse, endStream: false) - try stateMachine.send(message: secondResponse, endStream: false) + try stateMachine.send(message: firstResponse) + try stateMachine.send(message: secondResponse) // Make sure messages are outbound let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false) @@ -2312,8 +2315,8 @@ final class GRPCStreamServerStateMachineTests: XCTestCase { let firstResponse = [UInt8]([5, 6, 7]) let secondResponse = [UInt8]([8, 9, 10]) XCTAssertEqual(try stateMachine.nextOutboundMessage(), .awaitMoreMessages) - try stateMachine.send(message: firstResponse, endStream: false) - try stateMachine.send(message: secondResponse, endStream: false) + try stateMachine.send(message: firstResponse) + try stateMachine.send(message: secondResponse) // Make sure messages are outbound let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false)