diff --git a/Sources/GRPC/GRPCServerPipelineConfigurator.swift b/Sources/GRPC/GRPCServerPipelineConfigurator.swift index 451f11ac2..88e129301 100644 --- a/Sources/GRPC/GRPCServerPipelineConfigurator.swift +++ b/Sources/GRPC/GRPCServerPipelineConfigurator.swift @@ -15,6 +15,7 @@ */ import Logging import NIO +import NIOHPACK import NIOHTTP1 import NIOHTTP2 import NIOTLS @@ -78,7 +79,19 @@ final class GRPCServerPipelineConfigurator: ChannelInboundHandler, RemovableChan /// Makes an HTTP/2 handler. private func makeHTTP2Handler() -> NIOHTTP2Handler { - return .init(mode: .server) + return .init( + mode: .server, + initialSettings: [ + HTTP2Setting( + parameter: .maxConcurrentStreams, + value: self.configuration.httpMaxConcurrentStreams + ), + HTTP2Setting( + parameter: .maxHeaderListSize, + value: HPACKDecoder.defaultMaxHeaderListSize + ), + ] + ) } /// Makes an HTTP/2 multiplexer suitable handling gRPC requests. diff --git a/Sources/GRPC/Server.swift b/Sources/GRPC/Server.swift index 32023a426..3abf37fa0 100644 --- a/Sources/GRPC/Server.swift +++ b/Sources/GRPC/Server.swift @@ -324,6 +324,13 @@ extension Server { /// The HTTP/2 flow control target window size. Defaults to 65535. public var httpTargetWindowSize: Int = 65535 + /// The HTTP/2 max number of concurrent streams. Defaults to 100. Must be non-negative. + public var httpMaxConcurrentStreams: Int = 100 { + willSet { + precondition(newValue >= 0, "httpMaxConcurrentStreams must be non-negative") + } + } + /// The root server logger. Accepted connections will branch from this logger and RPCs on /// each connection will use a logger branched from the connections logger. This logger is made /// available to service providers via `context`. Defaults to a no-op logger. diff --git a/Sources/GRPC/ServerBuilder.swift b/Sources/GRPC/ServerBuilder.swift index 15dd29793..082da263b 100644 --- a/Sources/GRPC/ServerBuilder.swift +++ b/Sources/GRPC/ServerBuilder.swift @@ -160,6 +160,14 @@ extension Server.Builder { } } +extension Server.Builder { + @discardableResult + public func withHTTPMaxConcurrentStreams(_ httpMaxConcurrentStreams: Int) -> Self { + self.configuration.httpMaxConcurrentStreams = httpMaxConcurrentStreams + return self + } +} + extension Server.Builder { /// Sets the root server logger. Accepted connections will branch from this logger and RPCs on /// each connection will use a logger branched from the connections logger. This logger is made diff --git a/Tests/GRPCTests/HTTP2MaxConcurrentStreamsTests.swift b/Tests/GRPCTests/HTTP2MaxConcurrentStreamsTests.swift new file mode 100644 index 000000000..e109b3ca4 --- /dev/null +++ b/Tests/GRPCTests/HTTP2MaxConcurrentStreamsTests.swift @@ -0,0 +1,90 @@ +/* + * Copyright 2021, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import EchoImplementation +import EchoModel +@testable import GRPC +import NIO +import NIOHTTP2 +import XCTest + +class HTTP2MaxConcurrentStreamsTests: GRPCTestCase { + struct Constants { + static let testTimeout: TimeInterval = 10 + + static let defaultMaxNumberOfConcurrentStreams = + nioDefaultSettings.first(where: { $0.parameter == .maxConcurrentStreams })!.value + + static let testNumberOfConcurrentStreams: Int = defaultMaxNumberOfConcurrentStreams + 20 + } + + func testHTTP2MaxConcurrentStreamsSetting() { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let server = try! Server.insecure(group: eventLoopGroup) + .withLogger(self.serverLogger) + .withHTTPMaxConcurrentStreams(Constants.testNumberOfConcurrentStreams) + .withServiceProviders([EchoProvider()]) + .bind(host: "localhost", port: 0) + .wait() + + defer { XCTAssertNoThrow(try server.initiateGracefulShutdown().wait()) } + + let clientConnection = ClientConnection.insecure(group: eventLoopGroup) + .withBackgroundActivityLogger(self.clientLogger) + .connect(host: "localhost", port: server.channel.localAddress!.port!) + + defer { XCTAssertNoThrow(try clientConnection.close().wait()) } + + let echoClient = Echo_EchoClient( + channel: clientConnection, + defaultCallOptions: CallOptions(logger: self.clientLogger) + ) + + var clientStreamingCalls = + (0 ..< Constants.testNumberOfConcurrentStreams) + .map { _ in echoClient.collect() } + + let allMessagesSentExpectation = self.expectation(description: "all messages sent") + + let sendMessageFutures = clientStreamingCalls + .map { $0.sendMessage(.with { $0.text = "Hi!" }) } + + EventLoopFuture + .whenAllSucceed(sendMessageFutures, on: eventLoopGroup.next()) + .assertSuccess(fulfill: allMessagesSentExpectation) + + self.wait(for: [allMessagesSentExpectation], timeout: Constants.testTimeout) + + let lastCall = clientStreamingCalls.popLast()! + + let lastCallCompletedExpectation = self.expectation(description: "last call completed") + _ = lastCall.sendEnd() + + lastCall.status.assertSuccess(fulfill: lastCallCompletedExpectation) + + self.wait(for: [lastCallCompletedExpectation], timeout: Constants.testTimeout) + + let allCallsCompletedExpectation = self.expectation(description: "all calls completed") + let endFutures = clientStreamingCalls.map { $0.sendEnd() } + + EventLoopFuture + .whenAllSucceed(endFutures, on: eventLoopGroup.next()) + .assertSuccess(fulfill: allCallsCompletedExpectation) + + self.wait(for: [allCallsCompletedExpectation], timeout: Constants.testTimeout) + } +}