diff --git a/Sources/GRPC/GRPCWebToHTTP2ServerCodec.swift b/Sources/GRPC/GRPCWebToHTTP2ServerCodec.swift index 9ec784402..e6c3b2421 100644 --- a/Sources/GRPC/GRPCWebToHTTP2ServerCodec.swift +++ b/Sources/GRPC/GRPCWebToHTTP2ServerCodec.swift @@ -96,28 +96,16 @@ extension GRPCWebToHTTP2ServerCodec { self.scheme = scheme } - private mutating func withStateAvoidingCoWs(_ body: (inout State) -> Action) -> Action { - var state: State = ._modifying - swap(&self.state, &state) - defer { - swap(&self.state, &state) - } - return body(&state) - } - /// Process the inbound `HTTPServerRequestPart`. internal mutating func processInbound( serverRequestPart: HTTPServerRequestPart, allocator: ByteBufferAllocator ) -> Action { - let scheme = self.scheme - return self.withStateAvoidingCoWs { state in - state.processInbound( - serverRequestPart: serverRequestPart, - scheme: scheme, - allocator: allocator - ) - } + return self.state.processInbound( + serverRequestPart: serverRequestPart, + scheme: self.scheme, + allocator: allocator + ) } /// Process the outbound `HTTP2Frame.FramePayload`. @@ -126,9 +114,11 @@ extension GRPCWebToHTTP2ServerCodec { promise: EventLoopPromise?, allocator: ByteBufferAllocator ) -> Action { - return self.withStateAvoidingCoWs { state in - state.processOutbound(framePayload: framePayload, promise: promise, allocator: allocator) - } + return self.state.processOutbound( + framePayload: framePayload, + promise: promise, + allocator: allocator + ) } /// An action to take as a result of interaction with the state machine. @@ -174,6 +164,23 @@ extension GRPCWebToHTTP2ServerCodec { /// Not a real state. case _modifying + + private var isModifying: Bool { + switch self { + case ._modifying: + return true + case .idle, .fullyOpen, .clientClosedServerOpen, .clientOpenServerClosed: + return false + } + } + + private mutating func withStateAvoidingCoWs(_ body: (inout State) -> Action) -> Action { + self = ._modifying + defer { + assert(!self.isModifying) + } + return body(&self) + } } fileprivate struct InboundState { @@ -258,35 +265,37 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State { ) -> GRPCWebToHTTP2ServerCodec.StateMachine.Action { switch self { case .idle: - let normalized = HPACKHeaders(httpHeaders: head.headers, normalizeHTTPHeaders: true) - - // Regular headers need to come after the pseudo headers. Unfortunately, this means we need to - // allocate a second headers block to use the normalization provided by NIO HTTP/2. - // - // TODO: Use API provided by https://github.com/apple/swift-nio-http2/issues/254 to avoid the - // extra copy. - var headers = HPACKHeaders() - headers.reserveCapacity(normalized.count + 4) - headers.add(name: ":path", value: head.uri) - headers.add(name: ":method", value: head.method.rawValue) - headers.add(name: ":scheme", value: scheme) - if let host = head.headers.first(name: "host") { - headers.add(name: ":authority", value: host) - } - headers.add(contentsOf: normalized) + return self.withStateAvoidingCoWs { state in + let normalized = HPACKHeaders(httpHeaders: head.headers, normalizeHTTPHeaders: true) + + // Regular headers need to come after the pseudo headers. Unfortunately, this means we need to + // allocate a second headers block to use the normalization provided by NIO HTTP/2. + // + // TODO: Use API provided by https://github.com/apple/swift-nio-http2/issues/254 to avoid the + // extra copy. + var headers = HPACKHeaders() + headers.reserveCapacity(normalized.count + 4) + headers.add(name: ":path", value: head.uri) + headers.add(name: ":method", value: head.method.rawValue) + headers.add(name: ":scheme", value: scheme) + if let host = head.headers.first(name: "host") { + headers.add(name: ":authority", value: host) + } + headers.add(contentsOf: normalized) - // Check whether we're dealing with gRPC Web Text. No need to fully validate the content-type - // that will be done at the HTTP/2 level. - let contentType = headers.first(name: GRPCHeaderName.contentType).flatMap(ContentType.init) - let isWebText = contentType == .some(.webTextProtobuf) + // Check whether we're dealing with gRPC Web Text. No need to fully validate the content-type + // that will be done at the HTTP/2 level. + let contentType = headers.first(name: GRPCHeaderName.contentType).flatMap(ContentType.init) + let isWebText = contentType == .some(.webTextProtobuf) - let closeConnection = head.headers[canonicalForm: "connection"].contains("close") + let closeConnection = head.headers[canonicalForm: "connection"].contains("close") - self = .fullyOpen( - .init(isTextEncoded: isWebText, allocator: allocator), - .init(isTextEncoded: isWebText, closeConnection: closeConnection) - ) - return .fireChannelRead(.headers(.init(headers: headers))) + state = .fullyOpen( + .init(isTextEncoded: isWebText, allocator: allocator), + .init(isTextEncoded: isWebText, closeConnection: closeConnection) + ) + return .fireChannelRead(.headers(.init(headers: headers))) + } case .fullyOpen, .clientOpenServerClosed, .clientClosedServerOpen: preconditionFailure("Invalid state: already received request head") @@ -304,15 +313,19 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State { preconditionFailure("Invalid state: haven't received request head") case .fullyOpen(var inbound, let outbound): - let action = inbound.processInboundData(buffer: &buffer) - self = .fullyOpen(inbound, outbound) - return action + return self.withStateAvoidingCoWs { state in + let action = inbound.processInboundData(buffer: &buffer) + state = .fullyOpen(inbound, outbound) + return action + } case var .clientOpenServerClosed(inbound): // The server is already done, but it's not our place to drop the request. - let action = inbound.processInboundData(buffer: &buffer) - self = .clientOpenServerClosed(inbound) - return action + return self.withStateAvoidingCoWs { state in + let action = inbound.processInboundData(buffer: &buffer) + state = .clientOpenServerClosed(inbound) + return action + } case .clientClosedServerOpen: preconditionFailure("End of request stream already received") @@ -330,24 +343,28 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State { preconditionFailure("Invalid state: haven't received request head") case let .fullyOpen(_, outbound): - // We're done with inbound state. - self = .clientClosedServerOpen(outbound) + return self.withStateAvoidingCoWs { state in + // We're done with inbound state. + state = .clientClosedServerOpen(outbound) - // Send an empty DATA frame with the end stream flag set. - let empty = allocator.buffer(capacity: 0) - return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true))) + // Send an empty DATA frame with the end stream flag set. + let empty = allocator.buffer(capacity: 0) + return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true))) + } case .clientClosedServerOpen: preconditionFailure("End of request stream already received") case .clientOpenServerClosed: - // Both sides are closed now, back to idle. Don't forget to pass on the .end, as - // it's necessary to communicate to the other peers that the response is done. - self = .idle + return self.withStateAvoidingCoWs { state in + // Both sides are closed now, back to idle. Don't forget to pass on the .end, as + // it's necessary to communicate to the other peers that the response is done. + state = .idle - // Send an empty DATA frame with the end stream flag set. - let empty = allocator.buffer(capacity: 0) - return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true))) + // Send an empty DATA frame with the end stream flag set. + let empty = allocator.buffer(capacity: 0) + return .fireChannelRead(.data(.init(data: .byteBuffer(empty), endStream: true))) + } case ._modifying: preconditionFailure("Left in modifying state") @@ -368,39 +385,43 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State { preconditionFailure("Invalid state: haven't received request head") case .fullyOpen(let inbound, var outbound): - // Double check these are trailers. - assert(outbound.responseHeadersSent) + return self.withStateAvoidingCoWs { state in + // Double check these are trailers. + assert(outbound.responseHeadersSent) - // We haven't seen the end of the request stream yet. - self = .clientOpenServerClosed(inbound) + // We haven't seen the end of the request stream yet. + state = .clientOpenServerClosed(inbound) - // Avoid CoW-ing the buffers. - let responseBuffers = outbound.responseBuffer - outbound.responseBuffer = nil + // Avoid CoW-ing the buffers. + let responseBuffers = outbound.responseBuffer + outbound.responseBuffer = nil - return self.processTrailers( - responseBuffers: responseBuffers, - trailers: trailers, - promise: promise, - allocator: allocator, - closeChannel: outbound.closeConnection - ) + return Self.processTrailers( + responseBuffers: responseBuffers, + trailers: trailers, + promise: promise, + allocator: allocator, + closeChannel: outbound.closeConnection + ) + } case var .clientClosedServerOpen(state): - // Client is closed and now so is the server. - self = .idle + return self.withStateAvoidingCoWs { nextState in + // Client is closed and now so is the server. + nextState = .idle - // Avoid CoW-ing the buffers. - let responseBuffers = state.responseBuffer - state.responseBuffer = nil + // Avoid CoW-ing the buffers. + let responseBuffers = state.responseBuffer + state.responseBuffer = nil - return self.processTrailers( - responseBuffers: responseBuffers, - trailers: trailers, - promise: promise, - allocator: allocator, - closeChannel: state.closeConnection - ) + return Self.processTrailers( + responseBuffers: responseBuffers, + trailers: trailers, + promise: promise, + allocator: allocator, + closeChannel: state.closeConnection + ) + } case .clientOpenServerClosed: preconditionFailure("Already seen end of response stream") @@ -410,7 +431,7 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State { } } - private func processTrailers( + private static func processTrailers( responseBuffers: CircularBuffer?, trailers: HPACKHeaders, promise: EventLoopPromise?, @@ -447,40 +468,44 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State { preconditionFailure("Invalid state: haven't received request head") case let .fullyOpen(inbound, outbound): - // We still haven't seen the end of the request stream. - self = .clientOpenServerClosed(inbound) + return self.withStateAvoidingCoWs { state in + // We still haven't seen the end of the request stream. + state = .clientOpenServerClosed(inbound) - let head = GRPCWebToHTTP2ServerCodec.makeResponseHead( - hpackHeaders: trailers, - closeConnection: outbound.closeConnection - ) + let head = GRPCWebToHTTP2ServerCodec.makeResponseHead( + hpackHeaders: trailers, + closeConnection: outbound.closeConnection + ) - return .write( - .init( - part: .head(head), - additionalPart: .end(nil), - promise: promise, - closeChannel: outbound.closeConnection + return .write( + .init( + part: .head(head), + additionalPart: .end(nil), + promise: promise, + closeChannel: outbound.closeConnection + ) ) - ) + } case let .clientClosedServerOpen(outbound): - // We're done, back to idle. - self = .idle + return self.withStateAvoidingCoWs { state in + // We're done, back to idle. + state = .idle - let head = GRPCWebToHTTP2ServerCodec.makeResponseHead( - hpackHeaders: trailers, - closeConnection: outbound.closeConnection - ) + let head = GRPCWebToHTTP2ServerCodec.makeResponseHead( + hpackHeaders: trailers, + closeConnection: outbound.closeConnection + ) - return .write( - .init( - part: .head(head), - additionalPart: .end(nil), - promise: promise, - closeChannel: outbound.closeConnection + return .write( + .init( + part: .head(head), + additionalPart: .end(nil), + promise: promise, + closeChannel: outbound.closeConnection + ) ) - ) + } case .clientOpenServerClosed: preconditionFailure("Already seen end of response stream") @@ -499,24 +524,28 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State { preconditionFailure("Invalid state: haven't received request head") case .fullyOpen(let inbound, var outbound): - outbound.responseHeadersSent = true - self = .fullyOpen(inbound, outbound) + return self.withStateAvoidingCoWs { state in + outbound.responseHeadersSent = true + state = .fullyOpen(inbound, outbound) - let head = GRPCWebToHTTP2ServerCodec.makeResponseHead( - hpackHeaders: headers, - closeConnection: outbound.closeConnection - ) - return .write(.init(part: .head(head), promise: promise, closeChannel: false)) + let head = GRPCWebToHTTP2ServerCodec.makeResponseHead( + hpackHeaders: headers, + closeConnection: outbound.closeConnection + ) + return .write(.init(part: .head(head), promise: promise, closeChannel: false)) + } case var .clientClosedServerOpen(outbound): - outbound.responseHeadersSent = true - self = .clientClosedServerOpen(outbound) + return self.withStateAvoidingCoWs { state in + outbound.responseHeadersSent = true + state = .clientClosedServerOpen(outbound) - let head = GRPCWebToHTTP2ServerCodec.makeResponseHead( - hpackHeaders: headers, - closeConnection: outbound.closeConnection - ) - return .write(.init(part: .head(head), promise: promise, closeChannel: false)) + let head = GRPCWebToHTTP2ServerCodec.makeResponseHead( + hpackHeaders: headers, + closeConnection: outbound.closeConnection + ) + return .write(.init(part: .head(head), promise: promise, closeChannel: false)) + } case .clientOpenServerClosed: preconditionFailure("Already seen end of response stream") @@ -558,7 +587,7 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State { } } - private func processResponseData( + private static func processResponseData( _ payload: HTTP2Frame.FramePayload.Data, promise: EventLoopPromise?, state: inout GRPCWebToHTTP2ServerCodec.StateMachine.OutboundState @@ -590,14 +619,18 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State { preconditionFailure("Invalid state: haven't received request head") case .fullyOpen(let inbound, var outbound): - let action = self.processResponseData(payload, promise: promise, state: &outbound) - self = .fullyOpen(inbound, outbound) - return action + return self.withStateAvoidingCoWs { state in + let action = Self.processResponseData(payload, promise: promise, state: &outbound) + state = .fullyOpen(inbound, outbound) + return action + } case var .clientClosedServerOpen(outbound): - let action = self.processResponseData(payload, promise: promise, state: &outbound) - self = .clientClosedServerOpen(outbound) - return action + return self.withStateAvoidingCoWs { state in + let action = Self.processResponseData(payload, promise: promise, state: &outbound) + state = .clientClosedServerOpen(outbound) + return action + } case .clientOpenServerClosed: return .completePromise(promise, .failure(GRPCError.AlreadyComplete()))