diff --git a/Sources/HummingbirdHTTP2/HTTP2Channel.swift b/Sources/HummingbirdHTTP2/HTTP2Channel.swift index 6628afda..2571018e 100644 --- a/Sources/HummingbirdHTTP2/HTTP2Channel.swift +++ b/Sources/HummingbirdHTTP2/HTTP2Channel.swift @@ -12,9 +12,11 @@ // //===----------------------------------------------------------------------===// +import AsyncAlgorithms import HTTPTypes import HummingbirdCore import Logging +import NIOConcurrencyHelpers import NIOCore import NIOHTTP2 import NIOHTTPTypes @@ -22,6 +24,7 @@ import NIOHTTPTypesHTTP1 import NIOHTTPTypesHTTP2 import NIOPosix import NIOSSL +import ServiceLifecycle /// Child channel for processing HTTP1 with the option of upgrading to HTTP2 public struct HTTP2UpgradeChannel: HTTPChannelHandler { @@ -29,6 +32,7 @@ public struct HTTP2UpgradeChannel: HTTPChannelHandler { private let sslContext: NIOSSLContext private let http1: HTTP1Channel + private let idleTimeout: Duration private let additionalChannelHandlers: @Sendable () -> [any RemovableChannelHandler] public var responder: @Sendable (HBRequest, Channel) async throws -> HBResponse { http1.responder } @@ -39,12 +43,14 @@ public struct HTTP2UpgradeChannel: HTTPChannelHandler { /// - responder: Function returning a HTTP response for a HTTP request public init( tlsConfiguration: TLSConfiguration, + idleTimeout: Duration = .seconds(30), additionalChannelHandlers: @escaping @Sendable () -> [any RemovableChannelHandler] = { [] }, responder: @escaping @Sendable (HBRequest, Channel) async throws -> HBResponse = { _, _ in throw HBHTTPError(.notImplemented) } ) throws { var tlsConfiguration = tlsConfiguration tlsConfiguration.applicationProtocols = NIOHTTP2SupportedALPNProtocols self.sslContext = try NIOSSLContext(configuration: tlsConfiguration) + self.idleTimeout = idleTimeout self.additionalChannelHandlers = additionalChannelHandlers self.http1 = HTTP1Channel(responder: responder, additionalChannelHandlers: additionalChannelHandlers) } @@ -82,15 +88,11 @@ public struct HTTP2UpgradeChannel: HTTPChannelHandler { self.additionalChannelHandlers() + [ HBHTTPUserEventHandler(logger: logger), ] - - return http2ChildChannel - .pipeline - .addHandler(HTTP2FramePayloadToHTTPServerCodec()) - .flatMap { - http2ChildChannel.pipeline.addHandlers(childChannelHandlers) - }.flatMapThrowing { - try HTTP1Channel.Value(wrappingChannelSynchronously: http2ChildChannel) - } + return http2ChildChannel.eventLoop.makeCompletedFuture { + try http2ChildChannel.pipeline.syncOperations.addHandler(HTTP2FramePayloadToHTTPServerCodec()) + try http2ChildChannel.pipeline.syncOperations.addHandlers(childChannelHandlers) + return try HTTP1Channel.Value(wrappingChannelSynchronously: http2ChildChannel) + } } } @@ -99,25 +101,69 @@ public struct HTTP2UpgradeChannel: HTTPChannelHandler { /// - value: Object to process input/output on child channel /// - logger: Logger to use while processing messages public func handle(value: Value, logger: Logger) async { + struct HTTP2StreamState { + var numberOfStreams: Int = 0 + var lastClose: ContinuousClock.Instant = .now + } do { let channel = try await value.get() switch channel { case .http1_1(let http1): await handleHTTP(asyncChannel: http1, logger: logger) case .http2((let http2, let multiplexer)): + enum MergeResult { + case gracefulShutdown + case timer + case stream(HTTP1Channel.Value) + } + // sequence to trigger graceful shutdown + let (gracefulShutdownSequence, gracefulShutdownSource) = AsyncStream.makeStream() + // timer sequence + let timerSequence = AsyncTimerSequence(interval: self.idleTimeout, clock: .continuous) + // merge multiplexer with graceful shutdown and timer + let mergedSequence = merge( + multiplexer.inbound.map { MergeResult.stream($0) }, + timerSequence.map { _ in .timer }, + gracefulShutdownSequence.map { .gracefulShutdown } + ) do { - try await withThrowingDiscardingTaskGroup { group in - for try await client in multiplexer.inbound.cancelOnGracefulShutdown() { - group.addTask { - await handleHTTP(asyncChannel: client, logger: logger) + try await withGracefulShutdownHandler { + try await withThrowingDiscardingTaskGroup { group in + // stream state. + let streamState = NIOLockedValueBox(HTTP2StreamState()) + loop: + for try await element in mergedSequence + { + switch element { + case .stream(let client): + streamState.withLockedValue { + $0.numberOfStreams += 1 + } + group.addTask { + await handleHTTP(asyncChannel: client, logger: logger) + streamState.withLockedValue { + $0.numberOfStreams -= 1 + $0.lastClose = .now + } + } + case .timer: + let state = streamState.withLockedValue { $0 } + if state.numberOfStreams == 0, state.lastClose + self.idleTimeout < .now { + break loop + } + case .gracefulShutdown: + break loop + } } } + } onGracefulShutdown: { + gracefulShutdownSource.yield() } } catch { logger.error("Error handling inbound connection for HTTP2 handler: \(error)") } // have to run this to ensure http2 channel outbound writer is closed - try await http2.executeThenClose { _,_ in} + try await http2.executeThenClose { _, _ in } } } catch { logger.error("Error getting HTTP2 upgrade negotiated value: \(error)") diff --git a/Sources/HummingbirdHTTP2/HTTP2ChannelBuilder.swift b/Sources/HummingbirdHTTP2/HTTP2ChannelBuilder.swift index f27448bb..85c83252 100644 --- a/Sources/HummingbirdHTTP2/HTTP2ChannelBuilder.swift +++ b/Sources/HummingbirdHTTP2/HTTP2ChannelBuilder.swift @@ -32,11 +32,13 @@ extension HBHTTPChannelBuilder { /// - Returns: HTTPChannelHandler builder public static func http2Upgrade( tlsConfiguration: TLSConfiguration, + idleTimeout: Duration = .seconds(30), additionalChannelHandlers: @autoclosure @escaping @Sendable () -> [any RemovableChannelHandler] = [] ) throws -> HBHTTPChannelBuilder { return .init { responder in return try HTTP2UpgradeChannel( tlsConfiguration: tlsConfiguration, + idleTimeout: idleTimeout, additionalChannelHandlers: additionalChannelHandlers, responder: responder ) diff --git a/Tests/HummingbirdCoreTests/HTTP2Tests.swift b/Tests/HummingbirdCoreTests/HTTP2Tests.swift index e58bd4b4..709c7046 100644 --- a/Tests/HummingbirdCoreTests/HTTP2Tests.swift +++ b/Tests/HummingbirdCoreTests/HTTP2Tests.swift @@ -51,4 +51,33 @@ class HummingBirdHTTP2Tests: XCTestCase { XCTAssertEqual(response.status, .ok) } } + + // test timeout doesn't kill long running task + func testTimeout() async throws { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + try await testServer( + responder: { _, _ in + try await Task.sleep(for: .seconds(2)) + return .init(status: .ok, body: .init(byteBuffer: .init(string: "Hello"))) + }, + httpChannelSetup: .http2Upgrade(tlsConfiguration: getServerTLSConfiguration(), idleTimeout: .seconds(0.5)), + configuration: .init(address: .hostname(port: 0), serverName: testServerName), + eventLoopGroup: eventLoopGroup, + logger: Logger(label: "HB") + ) { _, port in + var tlsConfiguration = try getClientTLSConfiguration() + // no way to override the SSL server name with AsyncHTTPClient so need to set + // hostname verification off + tlsConfiguration.certificateVerification = .noHostnameVerification + let httpClient = HTTPClient( + eventLoopGroupProvider: .shared(eventLoopGroup), + configuration: .init(tlsConfiguration: tlsConfiguration) + ) + defer { try? httpClient.syncShutdown() } + + let response = try await httpClient.get(url: "https://localhost:\(port)/").get() + XCTAssertEqual(response.status, .ok) + } + } }