From 2458fc49731c54c93e7724a819e2362321b0a7e1 Mon Sep 17 00:00:00 2001 From: Quirin Schweigert Date: Sat, 17 Jul 2021 21:17:41 +0200 Subject: [PATCH 1/3] Add setting for max number of concurrent streams for HTTP/2 connection to circumvent the limitation of a maximum of 100 concurrent calls as imposed by the default settings of NIOHTTP2Handler --- Sources/GRPC/ClientConnection.swift | 15 +++++++++++++-- .../GRPC/ConnectionManagerChannelProvider.swift | 7 ++++++- Sources/GRPC/GRPCServerPipelineConfigurator.swift | 15 ++++++++++++++- Sources/GRPC/Server.swift | 5 +++++ Sources/GRPC/ServerBuilder.swift | 8 ++++++++ 5 files changed, 46 insertions(+), 4 deletions(-) diff --git a/Sources/GRPC/ClientConnection.swift b/Sources/GRPC/ClientConnection.swift index 7527b6ea1..07c6fe54c 100644 --- a/Sources/GRPC/ClientConnection.swift +++ b/Sources/GRPC/ClientConnection.swift @@ -16,6 +16,7 @@ import Foundation import Logging import NIO +import NIOHPACK import NIOHTTP2 import NIOSSL import NIOTLS @@ -343,6 +344,8 @@ extension ClientConnection { /// The HTTP/2 flow control target window size. Defaults to 65535. public var httpTargetWindowSize = 65535 + public var httpMaxConcurrentStreams: Int = 100 + /// The HTTP protocol used for this connection. public var httpProtocol: HTTP2FramePayloadToHTTP1ClientCodec.HTTPProtocol { return self.tlsConfiguration == nil ? .http : .https @@ -405,6 +408,7 @@ extension ClientConnection { connectionIdleTimeout: TimeAmount = .minutes(30), callStartBehavior: CallStartBehavior = .waitsForConnectivity, httpTargetWindowSize: Int = 65535, + httpMaxConcurrentStreams: Int = 100, backgroundActivityLogger: Logger = Logger( label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() } @@ -424,6 +428,7 @@ extension ClientConnection { self.httpTargetWindowSize = httpTargetWindowSize self.backgroundActivityLogger = backgroundActivityLogger self.debugChannelInitializer = debugChannelInitializer + self.httpMaxConcurrentStreams = httpMaxConcurrentStreams } private init(eventLoopGroup: EventLoopGroup, target: ConnectionTarget) { @@ -501,11 +506,17 @@ extension ChannelPipeline.SynchronousOperations { connectionIdleTimeout: TimeAmount, httpTargetWindowSize: Int, errorDelegate: ClientErrorDelegate?, - logger: Logger + logger: Logger, + maxConcurrentStreams: Int ) throws { + let initialSettings: HTTP2Settings = [ + HTTP2Setting(parameter: .maxConcurrentStreams, value: maxConcurrentStreams), + HTTP2Setting(parameter: .maxHeaderListSize, value: HPACKDecoder.defaultMaxHeaderListSize), + ] + // We could use 'configureHTTP2Pipeline' here, but we need to add a few handlers between the // two HTTP/2 handlers so we'll do it manually instead. - try self.addHandler(NIOHTTP2Handler(mode: .client)) + try self.addHandler(NIOHTTP2Handler(mode: .client, initialSettings: initialSettings)) let h2Multiplexer = HTTP2StreamMultiplexer( mode: .client, diff --git a/Sources/GRPC/ConnectionManagerChannelProvider.swift b/Sources/GRPC/ConnectionManagerChannelProvider.swift index a629843b7..f440d141c 100644 --- a/Sources/GRPC/ConnectionManagerChannelProvider.swift +++ b/Sources/GRPC/ConnectionManagerChannelProvider.swift @@ -49,6 +49,7 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { internal var tlsConfiguration: GRPCTLSConfiguration? internal var httpTargetWindowSize: Int + internal var httpMaxConcurrentStreams: Int internal var errorDelegate: Optional internal var debugChannelInitializer: Optional<(Channel) -> EventLoopFuture> @@ -60,6 +61,7 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { tlsMode: TLSMode, tlsConfiguration: GRPCTLSConfiguration?, httpTargetWindowSize: Int, + httpMaxConcurrentStreams: Int, errorDelegate: ClientErrorDelegate?, debugChannelInitializer: ((Channel) -> EventLoopFuture)? ) { @@ -71,6 +73,7 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { self.tlsConfiguration = tlsConfiguration self.httpTargetWindowSize = httpTargetWindowSize + self.httpMaxConcurrentStreams = httpMaxConcurrentStreams self.errorDelegate = errorDelegate self.debugChannelInitializer = debugChannelInitializer @@ -101,6 +104,7 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { tlsMode: tlsMode, tlsConfiguration: configuration.tlsConfiguration, httpTargetWindowSize: configuration.httpTargetWindowSize, + httpMaxConcurrentStreams: configuration.httpMaxConcurrentStreams, errorDelegate: configuration.errorDelegate, debugChannelInitializer: configuration.debugChannelInitializer ) @@ -170,7 +174,8 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { connectionIdleTimeout: self.connectionIdleTimeout, httpTargetWindowSize: self.httpTargetWindowSize, errorDelegate: self.errorDelegate, - logger: logger + logger: logger, + maxConcurrentStreams: self.httpMaxConcurrentStreams ) } catch { return channel.eventLoop.makeFailedFuture(error) diff --git a/Sources/GRPC/GRPCServerPipelineConfigurator.swift b/Sources/GRPC/GRPCServerPipelineConfigurator.swift index 451f11ac2..88e129301 100644 --- a/Sources/GRPC/GRPCServerPipelineConfigurator.swift +++ b/Sources/GRPC/GRPCServerPipelineConfigurator.swift @@ -15,6 +15,7 @@ */ import Logging import NIO +import NIOHPACK import NIOHTTP1 import NIOHTTP2 import NIOTLS @@ -78,7 +79,19 @@ final class GRPCServerPipelineConfigurator: ChannelInboundHandler, RemovableChan /// Makes an HTTP/2 handler. private func makeHTTP2Handler() -> NIOHTTP2Handler { - return .init(mode: .server) + return .init( + mode: .server, + initialSettings: [ + HTTP2Setting( + parameter: .maxConcurrentStreams, + value: self.configuration.httpMaxConcurrentStreams + ), + HTTP2Setting( + parameter: .maxHeaderListSize, + value: HPACKDecoder.defaultMaxHeaderListSize + ), + ] + ) } /// Makes an HTTP/2 multiplexer suitable handling gRPC requests. diff --git a/Sources/GRPC/Server.swift b/Sources/GRPC/Server.swift index 32023a426..610ddac20 100644 --- a/Sources/GRPC/Server.swift +++ b/Sources/GRPC/Server.swift @@ -324,6 +324,9 @@ extension Server { /// The HTTP/2 flow control target window size. Defaults to 65535. public var httpTargetWindowSize: Int = 65535 + /// The HTTP/2 max number of concurrent streams. Defaults to 100. + public var httpMaxConcurrentStreams: Int = 100 + /// The root server logger. Accepted connections will branch from this logger and RPCs on /// each connection will use a logger branched from the connections logger. This logger is made /// available to service providers via `context`. Defaults to a no-op logger. @@ -371,6 +374,7 @@ extension Server { connectionIdleTimeout: TimeAmount = .nanoseconds(.max), messageEncoding: ServerMessageEncoding = .disabled, httpTargetWindowSize: Int = 65535, + httpMaxConcurrentStreams: Int = 100, logger: Logger = Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), debugChannelInitializer: ((Channel) -> EventLoopFuture)? = nil ) { @@ -387,6 +391,7 @@ extension Server { self.connectionIdleTimeout = connectionIdleTimeout self.messageEncoding = messageEncoding self.httpTargetWindowSize = httpTargetWindowSize + self.httpMaxConcurrentStreams = httpMaxConcurrentStreams self.logger = logger self.debugChannelInitializer = debugChannelInitializer } diff --git a/Sources/GRPC/ServerBuilder.swift b/Sources/GRPC/ServerBuilder.swift index 15dd29793..c4e198854 100644 --- a/Sources/GRPC/ServerBuilder.swift +++ b/Sources/GRPC/ServerBuilder.swift @@ -85,6 +85,14 @@ extension Server.Builder { } } +extension Server.Builder { + @discardableResult + public func with(httpMaxConcurrentStreams: Int) -> Self { + self.configuration.httpMaxConcurrentStreams = httpMaxConcurrentStreams + return self + } +} + extension Server.Builder { /// The amount of time to wait before closing connections. The idle timeout will start only /// if there are no RPCs in progress and will be cancelled as soon as any RPCs start. Unless a From 37ff9882264c0d048d3451c0206407e52e1895e0 Mon Sep 17 00:00:00 2001 From: Quirin Schweigert Date: Mon, 19 Jul 2021 23:11:34 +0200 Subject: [PATCH 2/3] Remove client-side setting for maxConcurrentStreams; Add test case for maxConcurrentStreams; Refactor --- Sources/GRPC/ClientConnection.swift | 15 +--- .../ConnectionManagerChannelProvider.swift | 7 +- Sources/GRPC/Server.swift | 10 ++- Sources/GRPC/ServerBuilder.swift | 16 ++-- .../HTTP2MaxConcurrentStreamsTests.swift | 85 +++++++++++++++++++ 5 files changed, 102 insertions(+), 31 deletions(-) create mode 100644 Tests/GRPCTests/HTTP2MaxConcurrentStreamsTests.swift diff --git a/Sources/GRPC/ClientConnection.swift b/Sources/GRPC/ClientConnection.swift index 07c6fe54c..7527b6ea1 100644 --- a/Sources/GRPC/ClientConnection.swift +++ b/Sources/GRPC/ClientConnection.swift @@ -16,7 +16,6 @@ import Foundation import Logging import NIO -import NIOHPACK import NIOHTTP2 import NIOSSL import NIOTLS @@ -344,8 +343,6 @@ extension ClientConnection { /// The HTTP/2 flow control target window size. Defaults to 65535. public var httpTargetWindowSize = 65535 - public var httpMaxConcurrentStreams: Int = 100 - /// The HTTP protocol used for this connection. public var httpProtocol: HTTP2FramePayloadToHTTP1ClientCodec.HTTPProtocol { return self.tlsConfiguration == nil ? .http : .https @@ -408,7 +405,6 @@ extension ClientConnection { connectionIdleTimeout: TimeAmount = .minutes(30), callStartBehavior: CallStartBehavior = .waitsForConnectivity, httpTargetWindowSize: Int = 65535, - httpMaxConcurrentStreams: Int = 100, backgroundActivityLogger: Logger = Logger( label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() } @@ -428,7 +424,6 @@ extension ClientConnection { self.httpTargetWindowSize = httpTargetWindowSize self.backgroundActivityLogger = backgroundActivityLogger self.debugChannelInitializer = debugChannelInitializer - self.httpMaxConcurrentStreams = httpMaxConcurrentStreams } private init(eventLoopGroup: EventLoopGroup, target: ConnectionTarget) { @@ -506,17 +501,11 @@ extension ChannelPipeline.SynchronousOperations { connectionIdleTimeout: TimeAmount, httpTargetWindowSize: Int, errorDelegate: ClientErrorDelegate?, - logger: Logger, - maxConcurrentStreams: Int + logger: Logger ) throws { - let initialSettings: HTTP2Settings = [ - HTTP2Setting(parameter: .maxConcurrentStreams, value: maxConcurrentStreams), - HTTP2Setting(parameter: .maxHeaderListSize, value: HPACKDecoder.defaultMaxHeaderListSize), - ] - // We could use 'configureHTTP2Pipeline' here, but we need to add a few handlers between the // two HTTP/2 handlers so we'll do it manually instead. - try self.addHandler(NIOHTTP2Handler(mode: .client, initialSettings: initialSettings)) + try self.addHandler(NIOHTTP2Handler(mode: .client)) let h2Multiplexer = HTTP2StreamMultiplexer( mode: .client, diff --git a/Sources/GRPC/ConnectionManagerChannelProvider.swift b/Sources/GRPC/ConnectionManagerChannelProvider.swift index f440d141c..a629843b7 100644 --- a/Sources/GRPC/ConnectionManagerChannelProvider.swift +++ b/Sources/GRPC/ConnectionManagerChannelProvider.swift @@ -49,7 +49,6 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { internal var tlsConfiguration: GRPCTLSConfiguration? internal var httpTargetWindowSize: Int - internal var httpMaxConcurrentStreams: Int internal var errorDelegate: Optional internal var debugChannelInitializer: Optional<(Channel) -> EventLoopFuture> @@ -61,7 +60,6 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { tlsMode: TLSMode, tlsConfiguration: GRPCTLSConfiguration?, httpTargetWindowSize: Int, - httpMaxConcurrentStreams: Int, errorDelegate: ClientErrorDelegate?, debugChannelInitializer: ((Channel) -> EventLoopFuture)? ) { @@ -73,7 +71,6 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { self.tlsConfiguration = tlsConfiguration self.httpTargetWindowSize = httpTargetWindowSize - self.httpMaxConcurrentStreams = httpMaxConcurrentStreams self.errorDelegate = errorDelegate self.debugChannelInitializer = debugChannelInitializer @@ -104,7 +101,6 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { tlsMode: tlsMode, tlsConfiguration: configuration.tlsConfiguration, httpTargetWindowSize: configuration.httpTargetWindowSize, - httpMaxConcurrentStreams: configuration.httpMaxConcurrentStreams, errorDelegate: configuration.errorDelegate, debugChannelInitializer: configuration.debugChannelInitializer ) @@ -174,8 +170,7 @@ internal struct DefaultChannelProvider: ConnectionManagerChannelProvider { connectionIdleTimeout: self.connectionIdleTimeout, httpTargetWindowSize: self.httpTargetWindowSize, errorDelegate: self.errorDelegate, - logger: logger, - maxConcurrentStreams: self.httpMaxConcurrentStreams + logger: logger ) } catch { return channel.eventLoop.makeFailedFuture(error) diff --git a/Sources/GRPC/Server.swift b/Sources/GRPC/Server.swift index 610ddac20..3abf37fa0 100644 --- a/Sources/GRPC/Server.swift +++ b/Sources/GRPC/Server.swift @@ -324,8 +324,12 @@ extension Server { /// The HTTP/2 flow control target window size. Defaults to 65535. public var httpTargetWindowSize: Int = 65535 - /// The HTTP/2 max number of concurrent streams. Defaults to 100. - public var httpMaxConcurrentStreams: Int = 100 + /// The HTTP/2 max number of concurrent streams. Defaults to 100. Must be non-negative. + public var httpMaxConcurrentStreams: Int = 100 { + willSet { + precondition(newValue >= 0, "httpMaxConcurrentStreams must be non-negative") + } + } /// The root server logger. Accepted connections will branch from this logger and RPCs on /// each connection will use a logger branched from the connections logger. This logger is made @@ -374,7 +378,6 @@ extension Server { connectionIdleTimeout: TimeAmount = .nanoseconds(.max), messageEncoding: ServerMessageEncoding = .disabled, httpTargetWindowSize: Int = 65535, - httpMaxConcurrentStreams: Int = 100, logger: Logger = Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), debugChannelInitializer: ((Channel) -> EventLoopFuture)? = nil ) { @@ -391,7 +394,6 @@ extension Server { self.connectionIdleTimeout = connectionIdleTimeout self.messageEncoding = messageEncoding self.httpTargetWindowSize = httpTargetWindowSize - self.httpMaxConcurrentStreams = httpMaxConcurrentStreams self.logger = logger self.debugChannelInitializer = debugChannelInitializer } diff --git a/Sources/GRPC/ServerBuilder.swift b/Sources/GRPC/ServerBuilder.swift index c4e198854..082da263b 100644 --- a/Sources/GRPC/ServerBuilder.swift +++ b/Sources/GRPC/ServerBuilder.swift @@ -85,14 +85,6 @@ extension Server.Builder { } } -extension Server.Builder { - @discardableResult - public func with(httpMaxConcurrentStreams: Int) -> Self { - self.configuration.httpMaxConcurrentStreams = httpMaxConcurrentStreams - return self - } -} - extension Server.Builder { /// The amount of time to wait before closing connections. The idle timeout will start only /// if there are no RPCs in progress and will be cancelled as soon as any RPCs start. Unless a @@ -168,6 +160,14 @@ extension Server.Builder { } } +extension Server.Builder { + @discardableResult + public func withHTTPMaxConcurrentStreams(_ httpMaxConcurrentStreams: Int) -> Self { + self.configuration.httpMaxConcurrentStreams = httpMaxConcurrentStreams + return self + } +} + extension Server.Builder { /// Sets the root server logger. Accepted connections will branch from this logger and RPCs on /// each connection will use a logger branched from the connections logger. This logger is made diff --git a/Tests/GRPCTests/HTTP2MaxConcurrentStreamsTests.swift b/Tests/GRPCTests/HTTP2MaxConcurrentStreamsTests.swift new file mode 100644 index 000000000..56cb50098 --- /dev/null +++ b/Tests/GRPCTests/HTTP2MaxConcurrentStreamsTests.swift @@ -0,0 +1,85 @@ +/* + * Copyright 2021, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import EchoImplementation +import EchoModel +@testable import GRPC +import NIO +import NIOHTTP2 +import XCTest + +class HTTP2MaxConcurrentStreamsTests: GRPCTestCase { + struct Constants { + static let testTimeout: TimeInterval = 10 + + static let defaultMaxNumberOfConcurrentStreams = + nioDefaultSettings.first(where: { $0.parameter == .maxConcurrentStreams })!.value + + static let testNumberOfConcurrentStreams: Int = defaultMaxNumberOfConcurrentStreams + 20 + } + + func testHTTP2MaxConcurrentStreamsSetting() { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) + + let server = try! Server.insecure(group: eventLoopGroup) + .withLogger(self.serverLogger) + .withHTTPMaxConcurrentStreams(Constants.testNumberOfConcurrentStreams) + .withServiceProviders([EchoProvider()]) + .bind(host: "localhost", port: 0) + .wait() + + let clientConnection = ClientConnection.insecure(group: eventLoopGroup) + .withBackgroundActivityLogger(self.clientLogger) + .connect(host: "localhost", port: server.channel.localAddress!.port!) + + let echoClient = Echo_EchoClient( + channel: clientConnection, + defaultCallOptions: CallOptions(logger: self.clientLogger) + ) + + var clientStreamingCalls = + (0 ..< Constants.testNumberOfConcurrentStreams) + .map { _ in echoClient.collect() } + + let allMessagesSentExpectation = self.expectation(description: "all messages sent") + + let sendMessageFutures = clientStreamingCalls + .map { $0.sendMessage(.with { $0.text = "Hi!" }) } + + EventLoopFuture + .whenAllSucceed(sendMessageFutures, on: eventLoopGroup.next()) + .assertSuccess(fulfill: allMessagesSentExpectation) + + self.wait(for: [allMessagesSentExpectation], timeout: Constants.testTimeout) + + let lastCall = clientStreamingCalls.popLast()! + + let lastCallCompletedExpectation = self.expectation(description: "last call completed") + _ = lastCall.sendEnd() + + lastCall.status.assertSuccess(fulfill: lastCallCompletedExpectation) + + self.wait(for: [lastCallCompletedExpectation], timeout: Constants.testTimeout) + + let allCallsCompletedExpectation = self.expectation(description: "all calls completed") + let endFutures = clientStreamingCalls.map { $0.sendEnd() } + + EventLoopFuture + .whenAllSucceed(endFutures, on: eventLoopGroup.next()) + .assertSuccess(fulfill: allCallsCompletedExpectation) + + self.wait(for: [allCallsCompletedExpectation], timeout: Constants.testTimeout) + } +} From 3b07c047dae6f8562d45ef11b27e0d08baec58c4 Mon Sep 17 00:00:00 2001 From: Quirin Schweigert Date: Tue, 20 Jul 2021 14:23:06 +0200 Subject: [PATCH 3/3] Free ressources allocated for HTTP2MaxConcurrentStreamsTests --- Tests/GRPCTests/HTTP2MaxConcurrentStreamsTests.swift | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Tests/GRPCTests/HTTP2MaxConcurrentStreamsTests.swift b/Tests/GRPCTests/HTTP2MaxConcurrentStreamsTests.swift index 56cb50098..e109b3ca4 100644 --- a/Tests/GRPCTests/HTTP2MaxConcurrentStreamsTests.swift +++ b/Tests/GRPCTests/HTTP2MaxConcurrentStreamsTests.swift @@ -32,6 +32,7 @@ class HTTP2MaxConcurrentStreamsTests: GRPCTestCase { func testHTTP2MaxConcurrentStreamsSetting() { let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } let server = try! Server.insecure(group: eventLoopGroup) .withLogger(self.serverLogger) @@ -40,10 +41,14 @@ class HTTP2MaxConcurrentStreamsTests: GRPCTestCase { .bind(host: "localhost", port: 0) .wait() + defer { XCTAssertNoThrow(try server.initiateGracefulShutdown().wait()) } + let clientConnection = ClientConnection.insecure(group: eventLoopGroup) .withBackgroundActivityLogger(self.clientLogger) .connect(host: "localhost", port: server.channel.localAddress!.port!) + defer { XCTAssertNoThrow(try clientConnection.close().wait()) } + let echoClient = Echo_EchoClient( channel: clientConnection, defaultCallOptions: CallOptions(logger: self.clientLogger)