Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x.x http2 idle state (version 2) #371

Open
wants to merge 2 commits into
base: 2.x.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
74 changes: 60 additions & 14 deletions Sources/HummingbirdHTTP2/HTTP2Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,27 @@
//
//===----------------------------------------------------------------------===//

import AsyncAlgorithms
import HTTPTypes
import HummingbirdCore
import Logging
import NIOConcurrencyHelpers
import NIOCore
import NIOHTTP2
import NIOHTTPTypes
import NIOHTTPTypesHTTP1
import NIOHTTPTypesHTTP2
import NIOPosix
import NIOSSL
import ServiceLifecycle

/// Child channel for processing HTTP1 with the option of upgrading to HTTP2
public struct HTTP2UpgradeChannel: HTTPChannelHandler {
public typealias Value = EventLoopFuture<NIONegotiatedHTTPVersion<HTTP1Channel.Value, (NIOAsyncChannel<HTTP2Frame, HTTP2Frame>, NIOHTTP2Handler.AsyncStreamMultiplexer<HTTP1Channel.Value>)>>

private let sslContext: NIOSSLContext
private let http1: HTTP1Channel
private let idleTimeout: Duration
private let additionalChannelHandlers: @Sendable () -> [any RemovableChannelHandler]
public var responder: @Sendable (HBRequest, Channel) async throws -> HBResponse { http1.responder }

Expand All @@ -39,12 +43,14 @@
/// - responder: Function returning a HTTP response for a HTTP request
public init(
tlsConfiguration: TLSConfiguration,
idleTimeout: Duration = .seconds(30),
additionalChannelHandlers: @escaping @Sendable () -> [any RemovableChannelHandler] = { [] },
responder: @escaping @Sendable (HBRequest, Channel) async throws -> HBResponse = { _, _ in throw HBHTTPError(.notImplemented) }
) throws {
var tlsConfiguration = tlsConfiguration
tlsConfiguration.applicationProtocols = NIOHTTP2SupportedALPNProtocols
self.sslContext = try NIOSSLContext(configuration: tlsConfiguration)
self.idleTimeout = idleTimeout
self.additionalChannelHandlers = additionalChannelHandlers
self.http1 = HTTP1Channel(responder: responder, additionalChannelHandlers: additionalChannelHandlers)
}
Expand Down Expand Up @@ -82,15 +88,11 @@
self.additionalChannelHandlers() + [
HBHTTPUserEventHandler(logger: logger),
]

return http2ChildChannel
.pipeline
.addHandler(HTTP2FramePayloadToHTTPServerCodec())
.flatMap {
http2ChildChannel.pipeline.addHandlers(childChannelHandlers)
}.flatMapThrowing {
try HTTP1Channel.Value(wrappingChannelSynchronously: http2ChildChannel)
}
return http2ChildChannel.eventLoop.makeCompletedFuture {
try http2ChildChannel.pipeline.syncOperations.addHandler(HTTP2FramePayloadToHTTPServerCodec())
try http2ChildChannel.pipeline.syncOperations.addHandlers(childChannelHandlers)
return try HTTP1Channel.Value(wrappingChannelSynchronously: http2ChildChannel)
}
}
}

Expand All @@ -99,25 +101,69 @@
/// - value: Object to process input/output on child channel
/// - logger: Logger to use while processing messages
public func handle(value: Value, logger: Logger) async {
struct HTTP2StreamState {
var numberOfStreams: Int = 0
var lastClose: ContinuousClock.Instant = .now
}
do {
let channel = try await value.get()
switch channel {
case .http1_1(let http1):
await handleHTTP(asyncChannel: http1, logger: logger)
case .http2((let http2, let multiplexer)):
enum MergeResult {
case gracefulShutdown
case timer
case stream(HTTP1Channel.Value)
}
// sequence to trigger graceful shutdown
let (gracefulShutdownSequence, gracefulShutdownSource) = AsyncStream<Void>.makeStream()
// timer sequence
let timerSequence = AsyncTimerSequence(interval: self.idleTimeout, clock: .continuous)
// merge multiplexer with graceful shutdown and timer
let mergedSequence = merge(
multiplexer.inbound.map { MergeResult.stream($0) },
timerSequence.map { _ in .timer },
gracefulShutdownSequence.map { .gracefulShutdown }
)
do {
try await withThrowingDiscardingTaskGroup { group in
for try await client in multiplexer.inbound.cancelOnGracefulShutdown() {
group.addTask {
await handleHTTP(asyncChannel: client, logger: logger)
try await withGracefulShutdownHandler {
try await withThrowingDiscardingTaskGroup { group in
// stream state.
let streamState = NIOLockedValueBox(HTTP2StreamState())
loop:
for try await element in mergedSequence
{
switch element {
case .stream(let client):
streamState.withLockedValue {
$0.numberOfStreams += 1
}
group.addTask {
await handleHTTP(asyncChannel: client, logger: logger)
streamState.withLockedValue {
$0.numberOfStreams -= 1
$0.lastClose = .now
}
}
case .timer:
let state = streamState.withLockedValue { $0 }
if state.numberOfStreams == 0, state.lastClose + self.idleTimeout < .now {
break loop

Check warning on line 152 in Sources/HummingbirdHTTP2/HTTP2Channel.swift

View check run for this annotation

Codecov / codecov/patch

Sources/HummingbirdHTTP2/HTTP2Channel.swift#L152

Added line #L152 was not covered by tests
}
case .gracefulShutdown:
break loop
}
}
}
} onGracefulShutdown: {
gracefulShutdownSource.yield()
}
} catch {
logger.error("Error handling inbound connection for HTTP2 handler: \(error)")
}
// have to run this to ensure http2 channel outbound writer is closed
try await http2.executeThenClose { _,_ in}
try await http2.executeThenClose { _, _ in }
}
} catch {
logger.error("Error getting HTTP2 upgrade negotiated value: \(error)")
Expand Down
2 changes: 2 additions & 0 deletions Sources/HummingbirdHTTP2/HTTP2ChannelBuilder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ extension HBHTTPChannelBuilder {
/// - Returns: HTTPChannelHandler builder
public static func http2Upgrade(
tlsConfiguration: TLSConfiguration,
idleTimeout: Duration = .seconds(30),
additionalChannelHandlers: @autoclosure @escaping @Sendable () -> [any RemovableChannelHandler] = []
) throws -> HBHTTPChannelBuilder<HTTP2UpgradeChannel> {
return .init { responder in
return try HTTP2UpgradeChannel(
tlsConfiguration: tlsConfiguration,
idleTimeout: idleTimeout,
additionalChannelHandlers: additionalChannelHandlers,
responder: responder
)
Expand Down
29 changes: 29 additions & 0 deletions Tests/HummingbirdCoreTests/HTTP2Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,33 @@ class HummingBirdHTTP2Tests: XCTestCase {
XCTAssertEqual(response.status, .ok)
}
}

// test timeout doesn't kill long running task
func testTimeout() async throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
try await testServer(
responder: { _, _ in
try await Task.sleep(for: .seconds(2))
return .init(status: .ok, body: .init(byteBuffer: .init(string: "Hello")))
},
httpChannelSetup: .http2Upgrade(tlsConfiguration: getServerTLSConfiguration(), idleTimeout: .seconds(0.5)),
configuration: .init(address: .hostname(port: 0), serverName: testServerName),
eventLoopGroup: eventLoopGroup,
logger: Logger(label: "HB")
) { _, port in
var tlsConfiguration = try getClientTLSConfiguration()
// no way to override the SSL server name with AsyncHTTPClient so need to set
// hostname verification off
tlsConfiguration.certificateVerification = .noHostnameVerification
let httpClient = HTTPClient(
eventLoopGroupProvider: .shared(eventLoopGroup),
configuration: .init(tlsConfiguration: tlsConfiguration)
)
defer { try? httpClient.syncShutdown() }

let response = try await httpClient.get(url: "https://localhost:\(port)/").get()
XCTAssertEqual(response.status, .ok)
}
}
}