Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 33 additions & 33 deletions Sources/GRPCHTTP2Core/GRPCStreamStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -367,25 +367,25 @@ struct GRPCStreamStateMachine {
case rejectRPC(trailers: HPACKHeaders)
}

mutating func receive(metadata: HPACKHeaders, endStream: Bool) throws -> OnMetadataReceived {
mutating func receive(headers: HPACKHeaders, endStream: Bool) throws -> OnMetadataReceived {
switch self.configuration {
case .client:
return try self.clientReceive(metadata: metadata, endStream: endStream)
return try self.clientReceive(headers: headers, endStream: endStream)
case .server(let serverConfiguration):
return try self.serverReceive(
metadata: metadata,
headers: headers,
endStream: endStream,
configuration: serverConfiguration
)
}
}

mutating func receive(message: ByteBuffer, endStream: Bool) throws {
mutating func receive(buffer: ByteBuffer, endStream: Bool) throws {
switch self.configuration {
case .client:
try self.clientReceive(bytes: message, endStream: endStream)
try self.clientReceive(buffer: buffer, endStream: endStream)
case .server:
try self.serverReceive(bytes: message, endStream: endStream)
try self.serverReceive(buffer: buffer, endStream: endStream)
}
}

Expand Down Expand Up @@ -722,12 +722,12 @@ extension GRPCStreamStateMachine {
}

private mutating func clientReceive(
metadata: HPACKHeaders,
headers: HPACKHeaders,
endStream: Bool
) throws -> OnMetadataReceived {
switch self.state {
case .clientOpenServerIdle(let state):
switch (self.clientValidateHeadersReceivedFromServer(metadata), endStream) {
switch (self.clientValidateHeadersReceivedFromServer(headers), endStream) {
case (.invalid(let action), true):
// The headers are invalid, but the server signalled that it was
// closing the stream, so close both client and server.
Expand All @@ -739,9 +739,9 @@ extension GRPCStreamStateMachine {
case (.valid, true):
// This is a trailers-only response: close server.
self.state = .clientOpenServerClosed(.init(previousState: state))
return try self.validateAndReturnStatusAndMetadata(metadata)
return try self.validateAndReturnStatusAndMetadata(headers)
case (.valid, false):
switch self.processInboundEncoding(metadata) {
switch self.processInboundEncoding(headers) {
case .error(let failure):
return failure
case .success(let inboundEncoding):
Expand All @@ -759,7 +759,7 @@ extension GRPCStreamStateMachine {
decompressor: decompressor
)
)
return .receivedMetadata(Metadata(headers: metadata))
return .receivedMetadata(Metadata(headers: headers))
}
}

Expand All @@ -772,10 +772,10 @@ extension GRPCStreamStateMachine {
if endStream {
self.state = .clientOpenServerClosed(.init(previousState: state))
}
return try self.validateAndReturnStatusAndMetadata(metadata)
return try self.validateAndReturnStatusAndMetadata(headers)

case .clientClosedServerIdle(let state):
switch (self.clientValidateHeadersReceivedFromServer(metadata), endStream) {
switch (self.clientValidateHeadersReceivedFromServer(headers), endStream) {
case (.invalid(let action), true):
// The headers are invalid, but the server signalled that it was
// closing the stream, so close the server side too.
Expand All @@ -787,9 +787,9 @@ extension GRPCStreamStateMachine {
case (.valid, true):
// This is a trailers-only response: close server.
self.state = .clientClosedServerClosed(.init(previousState: state))
return try self.validateAndReturnStatusAndMetadata(metadata)
return try self.validateAndReturnStatusAndMetadata(headers)
case (.valid, false):
switch self.processInboundEncoding(metadata) {
switch self.processInboundEncoding(headers) {
case .error(let failure):
return failure
case .success(let inboundEncoding):
Expand All @@ -799,7 +799,7 @@ extension GRPCStreamStateMachine {
decompressionAlgorithm: inboundEncoding
)
)
return .receivedMetadata(Metadata(headers: metadata))
return .receivedMetadata(Metadata(headers: headers))
}
}

Expand All @@ -812,7 +812,7 @@ extension GRPCStreamStateMachine {
if endStream {
self.state = .clientClosedServerClosed(.init(previousState: state))
}
return try self.validateAndReturnStatusAndMetadata(metadata)
return try self.validateAndReturnStatusAndMetadata(headers)

case .clientClosedServerClosed:
// We could end up here if we received a grpc-status header in a previous
Expand All @@ -821,7 +821,7 @@ extension GRPCStreamStateMachine {
// We wouldn't want to throw in that scenario, so we just ignore it.
// Note that we don't want to ignore it if EOS is not set here though, as
// then it would be an invalid payload.
if !endStream || metadata.count > 0 {
if !endStream || headers.count > 0 {
try self.invalidState(
"Server is closed, nothing could have been sent."
)
Expand All @@ -838,7 +838,7 @@ extension GRPCStreamStateMachine {
}
}

private mutating func clientReceive(bytes: ByteBuffer, endStream: Bool) throws {
private mutating func clientReceive(buffer: ByteBuffer, endStream: Bool) throws {
// This is a message received by the client, from the server.
switch self.state {
case .clientIdleServerIdle:
Expand All @@ -850,7 +850,7 @@ extension GRPCStreamStateMachine {
"Server cannot have sent a message before sending the initial metadata."
)
case .clientOpenServerOpen(var state):
try state.deframer.process(buffer: bytes) { deframedMessage in
try state.deframer.process(buffer: buffer) { deframedMessage in
state.inboundMessageBuffer.append(deframedMessage)
}
if endStream {
Expand All @@ -862,7 +862,7 @@ extension GRPCStreamStateMachine {
// The client may have sent the end stream and thus it's closed,
// but the server may still be responding.
// The client must have a deframer set up, so force-unwrap is okay.
try state.deframer!.process(buffer: bytes) { deframedMessage in
try state.deframer!.process(buffer: buffer) { deframedMessage in
state.inboundMessageBuffer.append(deframedMessage)
}
if endStream {
Expand Down Expand Up @@ -1106,13 +1106,13 @@ extension GRPCStreamStateMachine {
}

private mutating func serverReceive(
metadata: HPACKHeaders,
headers: HPACKHeaders,
endStream: Bool,
configuration: GRPCStreamStateMachineConfiguration.ServerConfiguration
) throws -> OnMetadataReceived {
switch self.state {
case .clientIdleServerIdle(let state):
let contentType = metadata.firstString(forKey: .contentType)
let contentType = headers.firstString(forKey: .contentType)
.flatMap { ContentType(value: $0) }
if contentType == nil {
self.state = .clientOpenServerClosed(.init(previousState: state))
Expand All @@ -1123,7 +1123,7 @@ extension GRPCStreamStateMachine {
return .rejectRPC(trailers: trailers)
}

let path = metadata.firstString(forKey: .path)
let path = headers.firstString(forKey: .path)
.flatMap { MethodDescriptor(fullyQualifiedMethod: $0) }
if path == nil {
return self.closeServerAndBuildRejectRPCAction(
Expand All @@ -1136,7 +1136,7 @@ extension GRPCStreamStateMachine {
)
}

let scheme = metadata.firstString(forKey: .scheme)
let scheme = headers.firstString(forKey: .scheme)
.flatMap { Scheme(rawValue: $0) }
if scheme == nil {
return self.closeServerAndBuildRejectRPCAction(
Expand All @@ -1149,7 +1149,7 @@ extension GRPCStreamStateMachine {
)
}

guard let method = metadata.firstString(forKey: .method), method == "POST" else {
guard let method = headers.firstString(forKey: .method), method == "POST" else {
return self.closeServerAndBuildRejectRPCAction(
currentState: state,
endStream: endStream,
Expand All @@ -1160,7 +1160,7 @@ extension GRPCStreamStateMachine {
)
}

guard let te = metadata.firstString(forKey: .te), te == "trailers" else {
guard let te = headers.firstString(forKey: .te), te == "trailers" else {
return self.closeServerAndBuildRejectRPCAction(
currentState: state,
endStream: endStream,
Expand All @@ -1178,7 +1178,7 @@ extension GRPCStreamStateMachine {
// Firstly, find out if we support the client's chosen encoding, and reject
// the RPC if we don't.
let inboundEncoding: CompressionAlgorithm
let encodingValues = metadata.values(
let encodingValues = headers.values(
forHeader: GRPCHTTP2Keys.encoding.rawValue,
canonicalForm: true
)
Expand Down Expand Up @@ -1236,7 +1236,7 @@ extension GRPCStreamStateMachine {
// Secondly, find a compatible encoding the server can use to compress outbound messages,
// based on the encodings the client has advertised.
var outboundEncoding: CompressionAlgorithm = .identity
let clientAdvertisedEncodings = metadata.values(
let clientAdvertisedEncodings = headers.values(
forHeader: GRPCHTTP2Keys.acceptEncoding.rawValue,
canonicalForm: true
)
Expand Down Expand Up @@ -1280,7 +1280,7 @@ extension GRPCStreamStateMachine {
)
}

return .receivedMetadata(Metadata(headers: metadata))
return .receivedMetadata(Metadata(headers: headers))
case .clientOpenServerIdle, .clientOpenServerOpen, .clientOpenServerClosed:
try self.invalidState(
"Client shouldn't have sent metadata twice."
Expand All @@ -1292,7 +1292,7 @@ extension GRPCStreamStateMachine {
}
}

private mutating func serverReceive(bytes: ByteBuffer, endStream: Bool) throws {
private mutating func serverReceive(buffer: ByteBuffer, endStream: Bool) throws {
switch self.state {
case .clientIdleServerIdle:
try self.invalidState(
Expand All @@ -1301,7 +1301,7 @@ extension GRPCStreamStateMachine {
case .clientOpenServerIdle(var state):
// Deframer must be present on the server side, as we know the decompression
// algorithm from the moment the client opens.
try state.deframer!.process(buffer: bytes) { deframedMessage in
try state.deframer!.process(buffer: buffer) { deframedMessage in
state.inboundMessageBuffer.append(deframedMessage)
}

Expand All @@ -1311,7 +1311,7 @@ extension GRPCStreamStateMachine {
self.state = .clientOpenServerIdle(state)
}
case .clientOpenServerOpen(var state):
try state.deframer.process(buffer: bytes) { deframedMessage in
try state.deframer.process(buffer: buffer) { deframedMessage in
state.inboundMessageBuffer.append(deframedMessage)
}

Expand Down
4 changes: 2 additions & 2 deletions Sources/GRPCHTTP2Core/Server/GRPCServerStreamHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ extension GRPCServerStreamHandler {
switch frameData.data {
case .byteBuffer(let buffer):
do {
try self.stateMachine.receive(message: buffer, endStream: endStream)
try self.stateMachine.receive(buffer: buffer, endStream: endStream)
loop: while true {
switch self.stateMachine.nextInboundMessage() {
case .receiveMessage(let message):
Expand All @@ -86,7 +86,7 @@ extension GRPCServerStreamHandler {
case .headers(let headers):
do {
let action = try self.stateMachine.receive(
metadata: headers.headers,
headers: headers.headers,
endStream: headers.endStream
)
switch action {
Expand Down
Loading