20 changes: 7 additions & 13 deletions Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift
@@ -143,24 +143,18 @@ extension GRPCClientStreamHandler {
do {
self.flushPending = true
let headers = try self.stateMachine.send(metadata: metadata)
context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: nil)
// TODO: move the promise handling into the state machine
promise?.succeed()
context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: promise)
} catch {
context.fireErrorCaught(error)
// TODO: move the promise handling into the state machine
promise?.fail(error)
context.fireErrorCaught(error)
}

case .message(let message):
do {
try self.stateMachine.send(message: message)
// TODO: move the promise handling into the state machine
promise?.succeed()
try self.stateMachine.send(message: message, promise: promise)
} catch {
context.fireErrorCaught(error)
// TODO: move the promise handling into the state machine
promise?.fail(error)
context.fireErrorCaught(error)
}
}
}
@@ -197,12 +191,12 @@ extension GRPCClientStreamHandler {
private func _flush(context: ChannelHandlerContext) {
do {
loop: while true {
switch try self.stateMachine.nextOutboundMessage() {
case .sendMessage(let byteBuffer):
switch try self.stateMachine.nextOutboundFrame() {
case .sendFrame(let byteBuffer, let promise):
self.flushPending = true
context.write(
self.wrapOutboundOut(.data(.init(data: .byteBuffer(byteBuffer)))),
promise: nil
promise: promise
)

case .noMoreMessages:
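Editor's note: the client handler change above stops succeeding the caller's write promise eagerly and instead forwards it into `context.write`, so the promise completes only once the HEADERS or DATA frame is actually written (or the write fails). A minimal NIOCore sketch of that pattern, using a hypothetical `PromiseForwardingHandler` and a plain `String` payload rather than the real gRPC types:

```swift
import NIOCore

// Hypothetical outbound handler illustrating the promise-forwarding pattern used
// in this PR: the caller's write promise is handed to context.write instead of
// being succeeded as soon as the payload has been accepted for framing.
final class PromiseForwardingHandler: ChannelOutboundHandler {
  typealias OutboundIn = String
  typealias OutboundOut = ByteBuffer

  func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
    let message = self.unwrapOutboundIn(data)
    do {
      let buffer = try self.encode(message, allocator: context.channel.allocator)
      // Completion now tracks the actual channel write, not the encoding step.
      context.write(self.wrapOutboundOut(buffer), promise: promise)
    } catch {
      // If encoding fails the frame is never written, so fail the promise here
      // and surface the error to the rest of the pipeline.
      promise?.fail(error)
      context.fireErrorCaught(error)
    }
  }

  private func encode(_ message: String, allocator: ByteBufferAllocator) throws -> ByteBuffer {
    var buffer = allocator.buffer(capacity: message.utf8.count)
    buffer.writeString(message)
    return buffer
  }
}
```

The same shape appears in both the client and server stream handlers in this PR: success is no longer reported when the state machine accepts the part, but when the frame reaches the channel.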
22 changes: 15 additions & 7 deletions Sources/GRPCHTTP2Core/GRPCMessageFramer.swift
@@ -32,7 +32,7 @@ struct GRPCMessageFramer {
/// reserves capacity in powers of 2. This way, we can take advantage of the whole buffer.
static let maximumWriteBufferLength = 65_536

private var pendingMessages: OneOrManyQueue<[UInt8]>
private var pendingMessages: OneOrManyQueue<(bytes: [UInt8], promise: EventLoopPromise<Void>?)>

private var writeBuffer: ByteBuffer

@@ -44,16 +44,18 @@

/// Queue the given bytes to be framed and potentially coalesced alongside other messages in a `ByteBuffer`.
/// The resulting data will be returned when calling ``GRPCMessageFramer/next()``.
mutating func append(_ bytes: [UInt8]) {
self.pendingMessages.append(bytes)
mutating func append(_ bytes: [UInt8], promise: EventLoopPromise<Void>?) {
self.pendingMessages.append((bytes, promise))
}

/// If there are pending messages to be framed, a `ByteBuffer` will be returned with the framed data.
/// Data may also be compressed (if configured) and multiple frames may be coalesced into the same `ByteBuffer`.
/// - Parameter compressor: An optional compressor: if present, payloads will be compressed; otherwise
/// they'll be framed as-is.
/// - Throws: If an error is encountered, such as a compression failure, an error will be thrown.
mutating func next(compressor: Zlib.Compressor? = nil) throws -> ByteBuffer? {
mutating func next(
compressor: Zlib.Compressor? = nil
) throws -> (bytes: ByteBuffer, promise: EventLoopPromise<Void>?)? {
if self.pendingMessages.isEmpty {
// Nothing pending: exit early.
return nil
Expand All @@ -69,15 +71,21 @@ struct GRPCMessageFramer {

var requiredCapacity = 0
for message in self.pendingMessages {
requiredCapacity += message.count + Self.metadataLength
requiredCapacity += message.bytes.count + Self.metadataLength
}
self.writeBuffer.clear(minimumCapacity: requiredCapacity)

var pendingWritePromise: EventLoopPromise<Void>?
while let message = self.pendingMessages.pop() {
try self.encode(message, compressor: compressor)
try self.encode(message.bytes, compressor: compressor)
if let existingPendingWritePromise = pendingWritePromise {
existingPendingWritePromise.futureResult.cascade(to: message.promise)
} else {
pendingWritePromise = message.promise
}
}

return self.writeBuffer
return (bytes: self.writeBuffer, promise: pendingWritePromise)
}

private mutating func encode(_ message: [UInt8], compressor: Zlib.Compressor?) throws {
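Editor's note: because `next()` may coalesce several queued messages into one `ByteBuffer`, the framer folds their individual write promises into a single promise: the first queued promise is returned with the frame and every later promise is cascaded from it. A small self-contained sketch of that folding (the promise names are illustrative):

```swift
import NIOCore
import NIOEmbedded

let loop = EmbeddedEventLoop()

// Three messages coalesced into one frame, each queued with its own write promise.
let promiseA = loop.makePromise(of: Void.self)
let promiseB = loop.makePromise(of: Void.self)
let promiseC = loop.makePromise(of: Void.self)

// Mirror the framer's folding: keep the first promise as the frame promise and
// cascade every later message promise from it.
var framePromise: EventLoopPromise<Void>?
for messagePromise in [promiseA, promiseB, promiseC] {
  if let existing = framePromise {
    existing.futureResult.cascade(to: messagePromise)
  } else {
    framePromise = messagePromise
  }
}

promiseC.futureResult.whenSuccess { print("last coalesced message promise completed") }

// Completing the single frame promise (e.g. when the DATA frame is written)
// completes every message promise that was folded into it.
framePromise?.succeed(())
try loop.syncShutdownGracefully()
```

When the handler later passes the returned frame promise to `context.write`, completing that one write completes (or fails) every message promise that was coalesced into the frame.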
63 changes: 35 additions & 28 deletions Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift
@@ -325,12 +325,12 @@ struct GRPCStreamStateMachine {
}
}

mutating func send(message: [UInt8]) throws {
mutating func send(message: [UInt8], promise: EventLoopPromise<Void>?) throws {
switch self.configuration {
case .client:
try self.clientSend(message: message)
try self.clientSend(message: message, promise: promise)
case .server:
try self.serverSend(message: message)
try self.serverSend(message: message, promise: promise)
}
}

@@ -397,23 +397,26 @@
}
}

/// The result of requesting the next outbound message.
enum OnNextOutboundMessage: Equatable {
/// Either the receiving party is closed, so we shouldn't send any more messages; or the sender is done
/// The result of requesting the next outbound frame, which may contain multiple messages.
enum OnNextOutboundFrame {
/// Either the receiving party is closed, so we shouldn't send any more frames; or the sender is done
/// writing messages (i.e. we are now closed).
case noMoreMessages
/// There isn't a message ready to be sent, but we could still receive more, so keep trying.
/// There isn't a frame ready to be sent, but we could still receive more messages, so keep trying.
case awaitMoreMessages
/// A message is ready to be sent.
case sendMessage(ByteBuffer)
/// A frame is ready to be sent.
case sendFrame(
frame: ByteBuffer,
promise: EventLoopPromise<Void>?
)
}

mutating func nextOutboundMessage() throws -> OnNextOutboundMessage {
mutating func nextOutboundFrame() throws -> OnNextOutboundFrame {
switch self.configuration {
case .client:
return try self.clientNextOutboundMessage()
return try self.clientNextOutboundFrame()
case .server:
return try self.serverNextOutboundMessage()
return try self.serverNextOutboundFrame()
}
}

@@ -540,15 +543,15 @@ extension GRPCStreamStateMachine {
}
}

private mutating func clientSend(message: [UInt8]) throws {
private mutating func clientSend(message: [UInt8], promise: EventLoopPromise<Void>?) throws {
switch self.state {
case .clientIdleServerIdle:
try self.invalidState("Client not yet open.")
case .clientOpenServerIdle(var state):
state.framer.append(message)
state.framer.append(message, promise: promise)
self.state = .clientOpenServerIdle(state)
case .clientOpenServerOpen(var state):
state.framer.append(message)
state.framer.append(message, promise: promise)
self.state = .clientOpenServerOpen(state)
case .clientOpenServerClosed:
// The server has closed, so it makes no sense to send the rest of the request.
@@ -577,31 +580,33 @@

/// Returns the client's next request to the server.
/// - Returns: The request to be made to the server.
private mutating func clientNextOutboundMessage() throws -> OnNextOutboundMessage {
private mutating func clientNextOutboundFrame() throws -> OnNextOutboundFrame {
switch self.state {
case .clientIdleServerIdle:
try self.invalidState("Client is not open yet.")
case .clientOpenServerIdle(var state):
let request = try state.framer.next(compressor: state.compressor)
self.state = .clientOpenServerIdle(state)
return request.map { .sendMessage($0) } ?? .awaitMoreMessages
return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
?? .awaitMoreMessages
case .clientOpenServerOpen(var state):
let request = try state.framer.next(compressor: state.compressor)
self.state = .clientOpenServerOpen(state)
return request.map { .sendMessage($0) } ?? .awaitMoreMessages
return request.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
?? .awaitMoreMessages
case .clientClosedServerIdle(var state):
let request = try state.framer.next(compressor: state.compressor)
self.state = .clientClosedServerIdle(state)
if let request {
return .sendMessage(request)
return .sendFrame(frame: request.bytes, promise: request.promise)
} else {
return .noMoreMessages
}
case .clientClosedServerOpen(var state):
let request = try state.framer.next(compressor: state.compressor)
self.state = .clientClosedServerOpen(state)
if let request {
return .sendMessage(request)
return .sendFrame(frame: request.bytes, promise: request.promise)
} else {
return .noMoreMessages
}
@@ -1003,17 +1008,17 @@ extension GRPCStreamStateMachine {
}
}

private mutating func serverSend(message: [UInt8]) throws {
private mutating func serverSend(message: [UInt8], promise: EventLoopPromise<Void>?) throws {
switch self.state {
case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle:
try self.invalidState(
"Server must have sent initial metadata before sending a message."
)
case .clientOpenServerOpen(var state):
state.framer.append(message)
state.framer.append(message, promise: promise)
self.state = .clientOpenServerOpen(state)
case .clientClosedServerOpen(var state):
state.framer.append(message)
state.framer.append(message, promise: promise)
self.state = .clientClosedServerOpen(state)
case .clientOpenServerClosed, .clientClosedServerClosed:
try self.invalidState(
@@ -1351,31 +1356,33 @@ extension GRPCStreamStateMachine {
}
}

private mutating func serverNextOutboundMessage() throws -> OnNextOutboundMessage {
private mutating func serverNextOutboundFrame() throws -> OnNextOutboundFrame {
switch self.state {
case .clientIdleServerIdle, .clientOpenServerIdle, .clientClosedServerIdle:
try self.invalidState("Server is not open yet.")
case .clientOpenServerOpen(var state):
let response = try state.framer.next(compressor: state.compressor)
self.state = .clientOpenServerOpen(state)
return response.map { .sendMessage($0) } ?? .awaitMoreMessages
return response.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
?? .awaitMoreMessages
case .clientClosedServerOpen(var state):
let response = try state.framer.next(compressor: state.compressor)
self.state = .clientClosedServerOpen(state)
return response.map { .sendMessage($0) } ?? .awaitMoreMessages
return response.map { .sendFrame(frame: $0.bytes, promise: $0.promise) }
?? .awaitMoreMessages
case .clientOpenServerClosed(var state):
let response = try state.framer?.next(compressor: state.compressor)
self.state = .clientOpenServerClosed(state)
if let response {
return .sendMessage(response)
return .sendFrame(frame: response.bytes, promise: response.promise)
} else {
return .noMoreMessages
}
case .clientClosedServerClosed(var state):
let response = try state.framer?.next(compressor: state.compressor)
self.state = .clientClosedServerClosed(state)
if let response {
return .sendMessage(response)
return .sendFrame(frame: response.bytes, promise: response.promise)
} else {
return .noMoreMessages
}
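Editor's note: `nextOutboundFrame()` replaces `nextOutboundMessage()` and now hands back the coalesced promise together with the frame, leaving interpretation of the three cases to the caller. A self-contained sketch of the intended consumer loop; the enum and functions below are local stand-ins, not the real `GRPCStreamStateMachine` API:

```swift
import NIOCore

// Local stand-in for the OnNextOutboundFrame contract introduced in this PR,
// showing how a consumer is expected to interpret the three cases.
enum OutboundFrameStep {
  case sendFrame(frame: ByteBuffer, promise: EventLoopPromise<Void>?)
  case awaitMoreMessages
  case noMoreMessages
}

func drainOutboundFrames(
  next: () throws -> OutboundFrameStep,
  write: (ByteBuffer, EventLoopPromise<Void>?) -> Void
) rethrows {
  loop: while true {
    switch try next() {
    case .sendFrame(let frame, let promise):
      // Each frame carries the single (possibly cascaded) promise for all the
      // messages coalesced into it; hand it to the channel write untouched.
      write(frame, promise)
    case .awaitMoreMessages:
      // Nothing is buffered right now but the stream is still open: stop and
      // try again on the next flush.
      break loop
    case .noMoreMessages:
      // The sending side is closed: nothing further will ever be produced, so
      // any buffered trailers can now be written.
      break loop
    }
  }
}
```

The handlers in this PR follow exactly this shape in their `_flush` implementations, passing the frame's promise straight through to `context.write`.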
35 changes: 15 additions & 20 deletions Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift
@@ -34,7 +34,8 @@ final class GRPCServerStreamHandler: ChannelDuplexHandler {
// We buffer the final status + trailers to avoid reordering issues (i.e.,
// if there are messages still not written into the channel because flush has
// not been called, but the server sends back trailers).
private var pendingTrailers: HTTP2Frame.FramePayload?
private var pendingTrailers:
(trailers: HTTP2Frame.FramePayload, promise: EventLoopPromise<Void>?)?

init(
scheme: Scheme,
@@ -142,37 +143,28 @@ extension GRPCServerStreamHandler {
do {
self.flushPending = true
let headers = try self.stateMachine.send(metadata: metadata)
context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: nil)
// TODO: move the promise handling into the state machine
promise?.succeed()
context.write(self.wrapOutboundOut(.headers(.init(headers: headers))), promise: promise)
} catch {
context.fireErrorCaught(error)
// TODO: move the promise handling into the state machine
promise?.fail(error)
context.fireErrorCaught(error)
}

case .message(let message):
do {
try self.stateMachine.send(message: message)
// TODO: move the promise handling into the state machine
promise?.succeed()
try self.stateMachine.send(message: message, promise: promise)
} catch {
context.fireErrorCaught(error)
// TODO: move the promise handling into the state machine
promise?.fail(error)
context.fireErrorCaught(error)
}

case .status(let status, let metadata):
do {
let headers = try self.stateMachine.send(status: status, metadata: metadata)
let response = HTTP2Frame.FramePayload.headers(.init(headers: headers, endStream: true))
self.pendingTrailers = response
// TODO: move the promise handling into the state machine
promise?.succeed()
self.pendingTrailers = (response, promise)
} catch {
context.fireErrorCaught(error)
// TODO: move the promise handling into the state machine
promise?.fail(error)
context.fireErrorCaught(error)
}
}
}
@@ -185,19 +177,22 @@

do {
loop: while true {
switch try self.stateMachine.nextOutboundMessage() {
case .sendMessage(let byteBuffer):
switch try self.stateMachine.nextOutboundFrame() {
case .sendFrame(let byteBuffer, let promise):
self.flushPending = true
context.write(
self.wrapOutboundOut(.data(.init(data: .byteBuffer(byteBuffer)))),
promise: nil
promise: promise
)

case .noMoreMessages:
if let pendingTrailers = self.pendingTrailers {
self.flushPending = true
self.pendingTrailers = nil
context.write(self.wrapOutboundOut(pendingTrailers), promise: nil)
context.write(
self.wrapOutboundOut(pendingTrailers.trailers),
promise: pendingTrailers.promise
)
}
break loop

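Editor's note: the server handler now parks the trailers together with their write promise and only writes them, promise attached, after every buffered DATA frame has been drained, keeping ordering and completion semantics consistent. A reduced sketch of that buffering, using the real `HTTP2Frame.FramePayload` type but a hypothetical `TrailerBuffer` wrapper:

```swift
import NIOCore
import NIOHTTP2

// Reduced sketch of the trailer buffering used by the server stream handler:
// the trailers frame is stored together with its write promise and is only
// written, promise attached, once all pending DATA frames have gone out.
final class TrailerBuffer {
  private var pendingTrailers:
    (trailers: HTTP2Frame.FramePayload, promise: EventLoopPromise<Void>?)?

  // Called when the status/trailers part is written by the caller.
  func buffer(_ trailers: HTTP2Frame.FramePayload, promise: EventLoopPromise<Void>?) {
    self.pendingTrailers = (trailers, promise)
  }

  // Called from the flush path after the frame queue reports .noMoreMessages.
  func writeIfPending(_ write: (HTTP2Frame.FramePayload, EventLoopPromise<Void>?) -> Void) {
    if let pending = self.pendingTrailers {
      self.pendingTrailers = nil
      write(pending.trailers, pending.promise)
    }
  }
}
```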