Skip to content

Commit

Permalink
Allow payload-based stream channels to be created.
Browse files Browse the repository at this point in the history
Motivation:

To lift the requirement that stream creation order must match the order
of the first write on those channels we added a payload-based stream
channel. We need a way for users to create these channels and initialze
these channels when they are inbound.

Modifications:

- Adds a `createStreamChannel` using a stream initialzer without a
  stream ID
- Adds a new multiplexer `init` where the inbound stream initialzer does
  not use stream IDs
- Adds an additional 'configure' to MultiplexerAbstractChannel
- Replaces the `init` in `MultiplexerAbstractChannel` with two `static func`s:
  we can't switch on an optional stream ID since inbound streams always
  have a stream ID; we need to be be able to explicitly support choosing
  the type of channel we want.
- Minor refactoring in `HTTP2StreamChannel` to avoid duplication between the
  `configure` functions

Result:

- The multiplexer can create payload based streams manually and when a
  new stream is inbound.
  • Loading branch information
glbrntt committed Jul 29, 2020
1 parent cde820e commit 30e4d8d
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 44 deletions.
70 changes: 51 additions & 19 deletions Sources/NIOHTTP2/HTTP2StreamChannel.swift
Expand Up @@ -163,34 +163,66 @@ final class HTTP2StreamChannel<Message: HTTP2FramePayloadConvertible & HTTP2Fram
// 2. Calling the initializer, if provided.
// 3. Activating when complete.
// 4. Catching errors if they occur.
let f = self.parent!.getOption(ChannelOptions.autoRead).flatMap { autoRead -> EventLoopFuture<Void> in
self.getAutoReadFromParent().flatMap { autoRead in
self.autoRead = autoRead
// This initializer callback can only be invoked if we already have a stream ID.
// So we force-unwrap here.
return initializer?(self, self.streamID!) ?? self.eventLoop.makeSucceededFuture(())
}.map {
// This force unwrap is safe as parent is assigned in the initializer, and never unassigned.
// If parent is not active, we expect to receive a channelActive later.
if self.parent!.isActive {
self.performActivation()
}
self.postInitializerActivate(promise: promise)
}.whenFailure { error in
self.configurationFailed(withError: error, promise: promise)
}
}

// We aren't using cascade here to avoid the allocations it causes.
promise?.succeed(self)
internal func configure(initializer: ((Channel) -> EventLoopFuture<Void>)?, userPromise promise: EventLoopPromise<Channel>?){
// We need to configure this channel. This involves doing four things:
// 1. Setting our autoRead state from the parent
// 2. Calling the initializer, if provided.
// 3. Activating when complete.
// 4. Catching errors if they occur.
self.getAutoReadFromParent().flatMap { autoRead in
self.autoRead = autoRead
return initializer?(self) ?? self.eventLoop.makeSucceededFuture(())
}.map {
self.postInitializerActivate(promise: promise)
}.whenFailure { error in
self.configurationFailed(withError: error, promise: promise)
}
}

f.whenFailure { (error: Error) in
switch self.state {
case .idle, .localActive, .closed:
// The stream isn't open on the network, nothing to close.
self.errorEncountered(error: error)
case .remoteActive, .active, .closing, .closingNeverActivated:
// In all of these states the stream is still on the network and we may need to take action.
self.closedWhileOpen()
}
/// Gets the 'autoRead' option from the parent channel.
private func getAutoReadFromParent() -> EventLoopFuture<Bool> {
// This force unwrap is safe as parent is assigned in the initializer, and never unassigned.
// Note we also don't set the value here: the additional `map` causes an extra allocation
// when using a Swift 5.0 compiler.
return self.parent!.getOption(ChannelOptions.autoRead)
}

promise?.fail(error)
/// Activates the channel if the parent channel is active and succeeds the given `promise`.
private func postInitializerActivate(promise: EventLoopPromise<Channel>?) {
// This force unwrap is safe as parent is assigned in the initializer, and never unassigned.
// If parent is not active, we expect to receive a channelActive later.
if self.parent!.isActive {
self.performActivation()
}

// We aren't using cascade here to avoid the allocations it causes.
promise?.succeed(self)
}

/// Handle any error that occurred during configuration.
private func configurationFailed(withError error: Error, promise: EventLoopPromise<Channel>?) {
switch self.state {
case .idle, .localActive, .closed:
// The stream isn't open on the network, nothing to close.
self.errorEncountered(error: error)
case .remoteActive, .active, .closing, .closingNeverActivated:
// In all of these states the stream is still on the network and we may need to take action.
self.closedWhileOpen()
}

promise?.fail(error)
}

/// Activates this channel.
Expand Down Expand Up @@ -621,7 +653,7 @@ private extension HTTP2StreamChannel {

// Get a streamID from the multiplexer if we haven't got one already.
if self.streamID == nil {
self.streamID = self.multiplexer.requestStreamID()
self.streamID = self.multiplexer.requestStreamID(forChannel: self)
}

while self.pendingWrites.hasMark {
Expand Down
130 changes: 114 additions & 16 deletions Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift
Expand Up @@ -27,8 +27,11 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
public typealias OutboundIn = HTTP2Frame
public typealias OutboundOut = HTTP2Frame

// Streams which have a stream ID.
private var streams: [HTTP2StreamID: MultiplexerAbstractChannel] = [:]
private let inboundStreamStateInitializer: ((Channel, HTTP2StreamID) -> EventLoopFuture<Void>)?
// Streams which don't yet have a stream ID assigned to them.
private var pendingStreams: [ObjectIdentifier: MultiplexerAbstractChannel] = [:]
private let inboundStreamStateInitializer: MultiplexerAbstractChannel.InboundStreamStateInitializer
private let channel: Channel
private var context: ChannelHandlerContext!
private var nextOutboundStreamID: HTTP2StreamID
Expand Down Expand Up @@ -82,10 +85,12 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
streamID: streamID,
targetWindowSize: Int32(self.targetWindowSize),
outboundBytesHighWatermark: self.streamChannelOutboundBytesHighWatermark,
outboundBytesLowWatermark: self.streamChannelOutboundBytesLowWatermark
outboundBytesLowWatermark: self.streamChannelOutboundBytesLowWatermark,
inboundStreamStateInitializer: self.inboundStreamStateInitializer
)

self.streams[streamID] = channel
channel.configure(initializer: self.inboundStreamStateInitializer, userPromise: nil)
channel.configureInboundStream(initializer: self.inboundStreamStateInitializer)
channel.receiveInboundFrame(frame)

if !channel.inList {
Expand Down Expand Up @@ -216,7 +221,7 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
targetWindowSize: targetWindowSize,
outboundBufferSizeHighWatermark: 8192,
outboundBufferSizeLowWatermark: 4096,
inboundStreamStateInitializer: inboundStreamStateInitializer)
inboundStreamStateInitializer: .includesStreamID(inboundStreamStateInitializer))
}

/// Create a new `HTTP2StreamMultiplexer`.
Expand All @@ -227,19 +232,64 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
/// - targetWindowSize: The target inbound connection and stream window size. Defaults to 65535 bytes.
/// - outboundBufferSizeHighWatermark: The high watermark for the number of bytes of writes that are
/// allowed to be un-sent on any child stream. This is broadly analogous to a regular socket send buffer.
/// Defaults to 8196 bytes.
/// - outboundBufferSizeLowWatermark: The low watermark for the number of bytes of writes that are
/// allowed to be un-sent on any child stream. This is broadly analogous to a regular socket send buffer.
/// Defaults to 4092 bytes.
/// - inboundStreamStateInitializer: A block that will be invoked to configure each new child stream
/// channel that is created by the remote peer. For servers, these are channels created by
/// receiving a `HEADERS` frame from a client. For clients, these are channels created by
/// receiving a `PUSH_PROMISE` frame from a server. To initiate a new outbound channel, use
/// `createStreamChannel`.
public init(mode: NIOHTTP2Handler.ParserMode,
channel: Channel,
targetWindowSize: Int = 65535,
outboundBufferSizeHighWatermark: Int,
outboundBufferSizeLowWatermark: Int,
inboundStreamStateInitializer: ((Channel, HTTP2StreamID) -> EventLoopFuture<Void>)? = nil) {
public convenience init(mode: NIOHTTP2Handler.ParserMode,
channel: Channel,
targetWindowSize: Int = 65535,
outboundBufferSizeHighWatermark: Int = 8196,
outboundBufferSizeLowWatermark: Int = 4092,
inboundStreamStateInitializer: ((Channel) -> EventLoopFuture<Void>)? = nil) {
self.init(mode: mode,
channel: channel,
targetWindowSize: targetWindowSize,
outboundBufferSizeHighWatermark: outboundBufferSizeHighWatermark,
outboundBufferSizeLowWatermark: outboundBufferSizeLowWatermark,
inboundStreamStateInitializer: .excludesStreamID(inboundStreamStateInitializer))
}

/// Create a new `HTTP2StreamMultiplexer`.
///
/// - parameters:
/// - mode: The mode of the HTTP/2 connection being used: server or client.
/// - channel: The Channel to which this `HTTP2StreamMultiplexer` belongs.
/// - targetWindowSize: The target inbound connection and stream window size. Defaults to 65535 bytes.
/// - outboundBufferSizeHighWatermark: The high watermark for the number of bytes of writes that are
/// allowed to be un-sent on any child stream. This is broadly analogous to a regular socket send buffer.
/// - outboundBufferSizeLowWatermark: The low watermark for the number of bytes of writes that are
/// allowed to be un-sent on any child stream. This is broadly analogous to a regular socket send buffer.
/// - inboundStreamStateInitializer: A block that will be invoked to configure each new child stream
/// channel that is created by the remote peer. For servers, these are channels created by
/// receiving a `HEADERS` frame from a client. For clients, these are channels created by
/// receiving a `PUSH_PROMISE` frame from a server. To initiate a new outbound channel, use
/// `createStreamChannel`.
public convenience init(mode: NIOHTTP2Handler.ParserMode,
channel: Channel,
targetWindowSize: Int = 65535,
outboundBufferSizeHighWatermark: Int,
outboundBufferSizeLowWatermark: Int,
inboundStreamStateInitializer: ((Channel, HTTP2StreamID) -> EventLoopFuture<Void>)? = nil) {
self.init(mode: mode,
channel: channel,
targetWindowSize: targetWindowSize,
outboundBufferSizeHighWatermark: outboundBufferSizeHighWatermark,
outboundBufferSizeLowWatermark: outboundBufferSizeLowWatermark,
inboundStreamStateInitializer: .includesStreamID(inboundStreamStateInitializer))
}

private init(mode: NIOHTTP2Handler.ParserMode,
channel: Channel,
targetWindowSize: Int = 65535,
outboundBufferSizeHighWatermark: Int,
outboundBufferSizeLowWatermark: Int,
inboundStreamStateInitializer: MultiplexerAbstractChannel.InboundStreamStateInitializer) {
self.inboundStreamStateInitializer = inboundStreamStateInitializer
self.channel = channel
self.targetWindowSize = max(0, min(targetWindowSize, Int(Int32.max)))
Expand Down Expand Up @@ -282,8 +332,37 @@ extension HTTP2StreamMultiplexer {
}



extension HTTP2StreamMultiplexer {
/// Create a new `Channel` for a new stream initiated by this peer.
///
/// This method is intended for situations where the NIO application is initiating the stream. For clients,
/// this is for all request streams. For servers, this is for pushed streams.
///
/// - note:
/// Resources for the stream will be freed after it has been closed.
///
/// - parameters:
/// - promise: An `EventLoopPromise` that will be succeeded with the new activated channel, or
/// failed if an error occurs.
/// - streamStateInitializer: A callback that will be invoked to allow you to configure the
/// `ChannelPipeline` for the newly created channel.
public func createStreamChannel(promise: EventLoopPromise<Channel>?, _ streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Void>) {
self.channel.eventLoop.execute {
let channel = MultiplexerAbstractChannel(
allocator: self.channel.allocator,
parent: self.channel,
multiplexer: self,
streamID: nil,
targetWindowSize: Int32(self.targetWindowSize),
outboundBytesHighWatermark: self.streamChannelOutboundBytesHighWatermark,
outboundBytesLowWatermark: self.streamChannelOutboundBytesLowWatermark,
inboundStreamStateInitializer: .excludesStreamID(nil)
)
self.pendingStreams[channel.channelID] = channel
channel.configure(initializer: streamStateInitializer, userPromise: promise)
}
}

/// Create a new `Channel` for a new stream initiated by this peer.
///
/// This method is intended for situations where the NIO application is initiating the stream. For clients,
Expand All @@ -299,20 +378,27 @@ extension HTTP2StreamMultiplexer {
/// `ChannelPipeline` for the newly created channel.
public func createStreamChannel(promise: EventLoopPromise<Channel>?, _ streamStateInitializer: @escaping (Channel, HTTP2StreamID) -> EventLoopFuture<Void>) {
self.channel.eventLoop.execute {
let streamID = self.requestStreamID()
let streamID = self.nextStreamID()
let channel = MultiplexerAbstractChannel(
allocator: self.channel.allocator,
parent: self.channel,
multiplexer: self,
streamID: streamID,
targetWindowSize: Int32(self.targetWindowSize),
outboundBytesHighWatermark: self.streamChannelOutboundBytesHighWatermark,
outboundBytesLowWatermark: self.streamChannelOutboundBytesLowWatermark
outboundBytesLowWatermark: self.streamChannelOutboundBytesLowWatermark,
inboundStreamStateInitializer: .includesStreamID(nil)
)
self.streams[streamID] = channel
channel.configure(initializer: streamStateInitializer, userPromise: promise)
}
}

private func nextStreamID() -> HTTP2StreamID {
let streamID = self.nextOutboundStreamID
self.nextOutboundStreamID = HTTP2StreamID(Int32(streamID) + 2)
return streamID
}
}


Expand All @@ -334,9 +420,21 @@ extension HTTP2StreamMultiplexer {
self.flush(context: context)
}

internal func requestStreamID() -> HTTP2StreamID {
let streamID = self.nextOutboundStreamID
self.nextOutboundStreamID = HTTP2StreamID(Int32(streamID) + 2)
/// Requests a `HTTP2StreamID` for the given `Channel`.
///
/// - Precondition: The `channel` must not already have a `streamID`.
internal func requestStreamID(forChannel channel: Channel) -> HTTP2StreamID {
let channelID = ObjectIdentifier(channel)

// This unwrap shouldn't fail: the multiplexer owns the stream and the stream only requests
// a streamID once.
guard let abstractChannel = self.pendingStreams.removeValue(forKey: channelID) else {
preconditionFailure("No pending streams have channelID \(channelID)")
}
assert(abstractChannel.channelID == channelID)

let streamID = self.nextStreamID()
self.streams[streamID] = abstractChannel
return streamID
}
}

0 comments on commit 30e4d8d

Please sign in to comment.