Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 33 additions & 24 deletions Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -325,20 +325,24 @@ struct GRPCStreamStateMachine {
}
}

mutating func send(message: [UInt8], endStream: Bool) throws {
mutating func send(message: [UInt8]) throws {
switch self.configuration {
case .client:
try self.clientSend(message: message, endStream: endStream)
try self.clientSend(message: message)
case .server:
if endStream {
try self.invalidState(
"Can't end response stream by sending a message - send(status:metadata:) must be called"
)
}
try self.serverSend(message: message)
}
}

mutating func closeOutbound() throws {
switch self.configuration {
case .client:
try self.clientCloseOutbound()
case .server:
try self.invalidState("Server cannot call close: it must send status and trailers.")
}
}

mutating func send(
status: Status,
metadata: Metadata
Expand Down Expand Up @@ -532,31 +536,36 @@ extension GRPCStreamStateMachine {
}
}

private mutating func clientSend(message: [UInt8], endStream: Bool) throws {
// Client sends message.
private mutating func clientSend(message: [UInt8]) throws {
switch self.state {
case .clientIdleServerIdle:
try self.invalidState("Client not yet open.")
case .clientOpenServerIdle(var state):
state.framer.append(message)
if endStream {
self.state = .clientClosedServerIdle(.init(previousState: state))
} else {
self.state = .clientOpenServerIdle(state)
}
self.state = .clientOpenServerIdle(state)
case .clientOpenServerOpen(var state):
state.framer.append(message)
if endStream {
self.state = .clientClosedServerOpen(.init(previousState: state))
} else {
self.state = .clientOpenServerOpen(state)
}
case .clientOpenServerClosed(let state):
self.state = .clientOpenServerOpen(state)
case .clientOpenServerClosed:
// The server has closed, so it makes no sense to send the rest of the request.
// However, do close if endStream is set.
if endStream {
self.state = .clientClosedServerClosed(.init(previousState: state))
}
()
case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
try self.invalidState(
"Client is closed, cannot send a message."
)
}
}

private mutating func clientCloseOutbound() throws {
switch self.state {
case .clientIdleServerIdle:
try self.invalidState("Client not yet open.")
case .clientOpenServerIdle(let state):
self.state = .clientClosedServerIdle(.init(previousState: state))
case .clientOpenServerOpen(let state):
self.state = .clientClosedServerOpen(.init(previousState: state))
case .clientOpenServerClosed(let state):
self.state = .clientClosedServerClosed(.init(previousState: state))
case .clientClosedServerIdle, .clientClosedServerOpen, .clientClosedServerClosed:
try self.invalidState(
"Client is closed, cannot send a message."
Expand Down
2 changes: 1 addition & 1 deletion Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ extension GRPCServerStreamHandler {

case .message(let message):
do {
try self.stateMachine.send(message: message, endStream: false)
try self.stateMachine.send(message: message)
// TODO: move the promise handling into the state machine
promise?.succeed()
} catch {
Expand Down
Loading