Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 43 additions & 14 deletions Sources/GRPCHTTP2Core/Client/GRPCClientStreamHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,25 @@ extension GRPCClientStreamHandler {
switch frameData.data {
case .byteBuffer(let buffer):
do {
try self.stateMachine.receive(buffer: buffer, endStream: endStream)
loop: while true {
switch self.stateMachine.nextInboundMessage() {
case .receiveMessage(let message):
context.fireChannelRead(self.wrapInboundOut(.message(message)))
case .awaitMoreMessages:
break loop
case .noMoreMessages:
context.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
break loop
switch try self.stateMachine.receive(buffer: buffer, endStream: endStream) {
case .endRPCAndForwardErrorStatus(let status):
context.fireChannelRead(self.wrapInboundOut(.status(status, [:])))
context.close(promise: nil)

case .readInbound:
loop: while true {
switch self.stateMachine.nextInboundMessage() {
case .receiveMessage(let message):
context.fireChannelRead(self.wrapInboundOut(.message(message)))
case .awaitMoreMessages:
break loop
case .noMoreMessages:
// This could only happen if the server sends a data frame with EOS
// set, without sending status and trailers.
// If this happens, we should have forwarded an error status above
// so we should never reach this point. Do nothing.
break loop
}
}
}
} catch {
Expand Down Expand Up @@ -105,6 +114,7 @@ extension GRPCClientStreamHandler {

case .receivedStatusAndMetadata(let status, let metadata):
context.fireChannelRead(self.wrapInboundOut(.status(status, metadata)))
context.fireUserInboundEventTriggered(ChannelEvent.inputClosed)

case .doNothing:
()
Expand Down Expand Up @@ -161,7 +171,29 @@ extension GRPCClientStreamHandler {

func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise<Void>?) {
switch mode {
case .output, .all:
case .input:
context.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
promise?.succeed()

case .output:
// We flush all pending messages and update the internal state machine's
// state, but we don't close the outbound end of the channel, because
// forwarding the close in this case would cause the HTTP2 stream handler
// to close the whole channel (as the mode is ignored in its implementation).
do {
try self.stateMachine.closeOutbound()
// Force a flush by calling _flush instead of flush
// (otherwise, we'd skip flushing if we're in a read loop)
self._flush(context: context)
promise?.succeed()
} catch {
promise?.fail(error)
context.fireErrorCaught(error)
}

case .all:
// Since we're closing the whole channel here, we *do* forward the close
// down the pipeline.
do {
try self.stateMachine.closeOutbound()
// Force a flush by calling _flush
Expand All @@ -172,9 +204,6 @@ extension GRPCClientStreamHandler {
promise?.fail(error)
context.fireErrorCaught(error)
}

case .input:
context.close(mode: .input, promise: promise)
}
}

Expand Down
91 changes: 72 additions & 19 deletions Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,12 @@ private enum GRPCStreamStateMachineState {
self.inboundMessageBuffer = previousState.inboundMessageBuffer
}

init(previousState: ClientOpenServerOpenState) {
self.framer = previousState.framer
self.compressor = previousState.compressor
self.inboundMessageBuffer = previousState.inboundMessageBuffer
}

init(previousState: ClientOpenServerClosedState) {
self.framer = previousState.framer
self.compressor = previousState.compressor
Expand Down Expand Up @@ -388,12 +394,24 @@ struct GRPCStreamStateMachine {
}
}

mutating func receive(buffer: ByteBuffer, endStream: Bool) throws {
enum OnBufferReceivedAction: Equatable {
case readInbound

// Client-specific actions

// This will be returned when the server sends a data frame with EOS set.
// This is invalid as per the protocol specification, because the server
// can only close by sending trailers, not by setting EOS when sending
// a message.
case endRPCAndForwardErrorStatus(Status)
}

mutating func receive(buffer: ByteBuffer, endStream: Bool) throws -> OnBufferReceivedAction {
switch self.configuration {
case .client:
try self.clientReceive(buffer: buffer, endStream: endStream)
return try self.clientReceive(buffer: buffer, endStream: endStream)
case .server:
try self.serverReceive(buffer: buffer, endStream: endStream)
return try self.serverReceive(buffer: buffer, endStream: endStream)
}
}

Expand Down Expand Up @@ -729,7 +747,7 @@ extension GRPCStreamStateMachine {
}

let statusMessage =
metadata.firstString(forKey: .grpcStatusMessage)
metadata.firstString(forKey: .grpcStatusMessage, canonicalForm: false)
.map { GRPCStatusMessageMarshaller.unmarshall($0) } ?? ""

var convertedMetadata = Metadata(headers: metadata)
Expand Down Expand Up @@ -860,38 +878,68 @@ extension GRPCStreamStateMachine {
}
}

private mutating func clientReceive(buffer: ByteBuffer, endStream: Bool) throws {
private mutating func clientReceive(
buffer: ByteBuffer,
endStream: Bool
) throws -> OnBufferReceivedAction {
// This is a message received by the client, from the server.
switch self.state {
case .clientIdleServerIdle:
try self.invalidState(
"Cannot have received anything from server if client is not yet open."
)

case .clientOpenServerIdle, .clientClosedServerIdle:
try self.invalidState(
"Server cannot have sent a message before sending the initial metadata."
)

case .clientOpenServerOpen(var state):
if endStream {
// This is invalid as per the protocol specification, because the server
// can only close by sending trailers, not by setting EOS when sending
// a message.
self.state = .clientClosedServerClosed(.init(previousState: state))
return .endRPCAndForwardErrorStatus(
Status(
code: .internalError,
message: """
Server sent EOS alongside a data frame, but server is only allowed \
to close by sending status and trailers.
"""
)
)
}

try state.deframer.process(buffer: buffer) { deframedMessage in
state.inboundMessageBuffer.append(deframedMessage)
}
self.state = .clientOpenServerOpen(state)
return .readInbound

case .clientClosedServerOpen(var state):
if endStream {
self.state = .clientOpenServerClosed(.init(previousState: state))
} else {
self.state = .clientOpenServerOpen(state)
self.state = .clientClosedServerClosed(.init(previousState: state))
return .endRPCAndForwardErrorStatus(
Status(
code: .internalError,
message: """
Server sent EOS alongside a data frame, but server is only allowed \
to close by sending status and trailers.
"""
)
)
}
case .clientClosedServerOpen(var state):

// The client may have sent the end stream and thus it's closed,
// but the server may still be responding.
// The client must have a deframer set up, so force-unwrap is okay.
try state.deframer!.process(buffer: buffer) { deframedMessage in
state.inboundMessageBuffer.append(deframedMessage)
}
if endStream {
self.state = .clientClosedServerClosed(.init(previousState: state))
} else {
self.state = .clientClosedServerOpen(state)
}
self.state = .clientClosedServerOpen(state)
return .readInbound

case .clientOpenServerClosed, .clientClosedServerClosed:
try self.invalidState(
"Cannot have received anything from a closed server."
Expand Down Expand Up @@ -1314,7 +1362,10 @@ extension GRPCStreamStateMachine {
}
}

private mutating func serverReceive(buffer: ByteBuffer, endStream: Bool) throws {
private mutating func serverReceive(
buffer: ByteBuffer,
endStream: Bool
) throws -> OnBufferReceivedAction {
switch self.state {
case .clientIdleServerIdle:
try self.invalidState(
Expand Down Expand Up @@ -1354,6 +1405,7 @@ extension GRPCStreamStateMachine {
"Client can't send a message if closed."
)
}
return .readInbound
}

private mutating func serverNextOutboundFrame() throws -> OnNextOutboundFrame {
Expand Down Expand Up @@ -1443,10 +1495,11 @@ internal enum GRPCHTTP2Keys: String {
}

extension HPACKHeaders {
internal func firstString(forKey key: GRPCHTTP2Keys) -> String? {
self.values(forHeader: key.rawValue, canonicalForm: true).first(where: { _ in true }).map {
String($0)
}
internal func firstString(forKey key: GRPCHTTP2Keys, canonicalForm: Bool = true) -> String? {
self.values(forHeader: key.rawValue, canonicalForm: canonicalForm).first(where: { _ in true })
.map {
String($0)
}
}

internal mutating func add(_ value: String, forKey key: GRPCHTTP2Keys) {
Expand Down
26 changes: 16 additions & 10 deletions Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,22 @@ extension GRPCServerStreamHandler {
switch frameData.data {
case .byteBuffer(let buffer):
do {
try self.stateMachine.receive(buffer: buffer, endStream: endStream)
loop: while true {
switch self.stateMachine.nextInboundMessage() {
case .receiveMessage(let message):
context.fireChannelRead(self.wrapInboundOut(.message(message)))
case .awaitMoreMessages:
break loop
case .noMoreMessages:
context.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
break loop
switch try self.stateMachine.receive(buffer: buffer, endStream: endStream) {
case .endRPCAndForwardErrorStatus:
preconditionFailure(
"OnBufferReceivedAction.endRPCAndForwardErrorStatus should never be returned for the server."
)
case .readInbound:
loop: while true {
switch self.stateMachine.nextInboundMessage() {
case .receiveMessage(let message):
context.fireChannelRead(self.wrapInboundOut(.message(message)))
case .awaitMoreMessages:
break loop
case .noMoreMessages:
context.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
break loop
}
}
}
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,10 @@ final class GRPCClientStreamHandlerTests: XCTestCase {
buffer.writeInteger(UInt8(0)) // not compressed
buffer.writeInteger(UInt32(42)) // message length
buffer.writeRepeatingByte(0, count: 42) // message
let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
let clientDataPayload = HTTP2Frame.FramePayload.Data(
data: .byteBuffer(buffer),
endStream: false
)
XCTAssertThrowsError(
ofType: RPCError.self,
try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload))
Expand All @@ -321,6 +324,74 @@ final class GRPCClientStreamHandlerTests: XCTestCase {
XCTAssertNil(try channel.readInbound(as: RPCRequestPart.self))
}

func testServerSendsEOSWhenSendingMessage_ResultsInErrorStatus() throws {
let handler = GRPCClientStreamHandler(
methodDescriptor: .init(service: "test", method: "test"),
scheme: .http,
outboundEncoding: .identity,
acceptedEncodings: [],
maximumPayloadSize: 100,
skipStateMachineAssertions: true
)

let channel = EmbeddedChannel(handler: handler)

// Send client's initial metadata
XCTAssertNoThrow(
try channel.writeOutbound(RPCRequestPart.metadata(Metadata()))
)

// Make sure we have sent right metadata.
let writtenMetadata = try channel.assertReadHeadersOutbound()

XCTAssertEqual(
writtenMetadata.headers,
[
GRPCHTTP2Keys.method.rawValue: "POST",
GRPCHTTP2Keys.scheme.rawValue: "http",
GRPCHTTP2Keys.path.rawValue: "test/test",
GRPCHTTP2Keys.contentType.rawValue: "application/grpc",
GRPCHTTP2Keys.te.rawValue: "trailers",
]
)

// Server sends initial metadata
let serverInitialMetadata: HPACKHeaders = [
GRPCHTTP2Keys.status.rawValue: "200",
GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
]
XCTAssertNoThrow(
try channel.writeInbound(
HTTP2Frame.FramePayload.headers(.init(headers: serverInitialMetadata))
)
)
XCTAssertEqual(
try channel.readInbound(as: RPCResponsePart.self),
.metadata(Metadata(headers: serverInitialMetadata))
)

// Server sends message with EOS set.
var buffer = ByteBuffer()
buffer.writeInteger(UInt8(0)) // not compressed
buffer.writeInteger(UInt32(42)) // message length
buffer.writeRepeatingByte(0, count: 42) // message
let clientDataPayload = HTTP2Frame.FramePayload.Data(data: .byteBuffer(buffer), endStream: true)
XCTAssertNoThrow(try channel.writeInbound(HTTP2Frame.FramePayload.data(clientDataPayload)))

// Make sure we got status + trailers with the right error.
XCTAssertEqual(
try channel.readInbound(as: RPCResponsePart.self),
.status(
Status(
code: .internalError,
message:
"Server sent EOS alongside a data frame, but server is only allowed to close by sending status and trailers."
),
[:]
)
)
}

func testServerEndsStream() throws {
let handler = GRPCClientStreamHandler(
methodDescriptor: .init(service: "test", method: "test"),
Expand Down
Loading