Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make HTTP2StreamChannel generic over a message type #218

Merged
merged 2 commits into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Sources/NIOHTTP2/HTTP2Frame.swift
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ public struct HTTP2Frame {
}

extension HTTP2Frame: HTTP2FrameConvertible, HTTP2FramePayloadConvertible {
init(http2Frame: HTTP2Frame) {
self = http2Frame
}

func makeHTTP2Frame(streamID: HTTP2StreamID) -> HTTP2Frame {
assert(self.streamID == streamID, "streamID does not match")
return self
Expand All @@ -264,6 +268,10 @@ extension HTTP2Frame.FramePayload: HTTP2FrameConvertible, HTTP2FramePayloadConve
return self
}

init(http2Frame: HTTP2Frame) {
self = http2Frame.payload
}

func makeHTTP2Frame(streamID: HTTP2StreamID) -> HTTP2Frame {
return HTTP2Frame(streamID: streamID, payload: self)
}
Expand Down
39 changes: 39 additions & 0 deletions Sources/NIOHTTP2/HTTP2FrameConvertible.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
//===----------------------------------------------------------------------===//

protocol HTTP2FrameConvertible {
/// Initialize `Self` from an `HTTP2Frame`.
init(http2Frame: HTTP2Frame)

/// Makes an `HTTPFrame` with the given `streamID`.
///
/// - Parameter streamID: The `streamID` to use when constructing the frame.
Expand All @@ -23,3 +26,39 @@ protocol HTTP2FramePayloadConvertible {
/// Makes a `HTTP2Frame.FramePayload`.
var payload: HTTP2Frame.FramePayload { get }
}

extension HTTP2FrameConvertible where Self: HTTP2FramePayloadConvertible {
Lukasa marked this conversation as resolved.
Show resolved Hide resolved
/// A shorthand heuristic for how many bytes we assume a frame consumes on the wire.
///
/// Here we concern ourselves only with per-stream frames: that is, `HEADERS`, `DATA`,
/// `WINDOW_UDPATE`, `RST_STREAM`, and I guess `PRIORITY`. As a simple heuristic we
/// hard code fixed lengths for fixed length frames, use a calculated length for
/// variable length frames, and just ignore encoded headers because it's not worth doing a better
/// job.
var estimatedFrameSize: Int {
let frameHeaderSize = 9

switch self.payload {
case .data(let d):
let paddingBytes = d.paddingBytes.map { $0 + 1 } ?? 0
return d.data.readableBytes + paddingBytes + frameHeaderSize
case .headers(let h):
let paddingBytes = h.paddingBytes.map { $0 + 1 } ?? 0
return paddingBytes + frameHeaderSize
case .priority:
return frameHeaderSize + 5
case .pushPromise(let p):
// Like headers, this is variably size, and we just ignore the encoded headers because
// it's not worth having a heuristic.
let paddingBytes = p.paddingBytes.map { $0 + 1 } ?? 0
return paddingBytes + frameHeaderSize
case .rstStream:
return frameHeaderSize + 4
case .windowUpdate:
return frameHeaderSize + 4
default:
// Unknown or unexpected control frame: say 9 bytes.
return frameHeaderSize
}
}
}
82 changes: 34 additions & 48 deletions Sources/NIOHTTP2/HTTP2StreamChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,13 @@ private enum StreamChannelState {
}
}

/// An `HTTP2StreamChannel` which deals in `HTTPFrame`s.
typealias HTTP2FrameBasedStreamChannel = HTTP2StreamChannel<HTTP2Frame>

final class HTTP2StreamChannel: Channel, ChannelCore {
/// An `HTTP2StreamChannel` which reads and writes `HTTPFrame.FramePayload`s.
typealias HTTP2PayloadBasedStreamChannel = HTTP2StreamChannel<HTTP2Frame.FramePayload>

final class HTTP2StreamChannel<Message: HTTP2FramePayloadConvertible & HTTP2FrameConvertible>: Channel, ChannelCore {
internal init(allocator: ByteBufferAllocator,
parent: Channel,
multiplexer: HTTP2StreamMultiplexer,
Expand Down Expand Up @@ -350,7 +355,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
/// In the future this buffer will be used to manage interactions with read() and even, one day,
/// with flow control. For now, though, all this does is hold frames until we have set the
/// channel up.
private var pendingReads: CircularBuffer<HTTP2Frame> = CircularBuffer(initialCapacity: 8)
private var pendingReads: CircularBuffer<Message> = CircularBuffer(initialCapacity: 8)

/// Whether `autoRead` is enabled. By default, all `HTTP2StreamChannel` objects inherit their `autoRead`
/// state from their parent.
Expand All @@ -364,7 +369,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
///
/// To correctly respect flushes, we deliberately withold frames from the parent channel until this
/// stream is flushed, at which time we deliver them all. This buffer holds the pending ones.
private var pendingWrites: MarkedCircularBuffer<(HTTP2Frame, EventLoopPromise<Void>?)> = MarkedCircularBuffer(initialCapacity: 8)
private var pendingWrites: MarkedCircularBuffer<(Message, EventLoopPromise<Void>?)> = MarkedCircularBuffer(initialCapacity: 8)

/// A list node used to hold stream channels.
internal var streamChannelListNode: StreamChannelListNode = StreamChannelListNode()
Expand All @@ -390,13 +395,13 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
return
}

let frame = self.unwrapData(data, as: HTTP2Frame.self)
let outbound = self.unwrapData(data, as: Message.self)

// We need a promise to attach our flow control callback to.
// Regardless of whether the write succeeded or failed, we don't count
// the bytes any longer.
let promise = promise ?? self.eventLoop.makePromise()
let writeSize = frame.bufferBytes
let writeSize = outbound.estimatedFrameSize

// Right now we deal with this math by just attaching a callback to all promises. This is going
// to be annoyingly expensive, but for now it's the most straightforward approach.
Expand All @@ -405,7 +410,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
self.changeWritability(to: value)
}
}
self.pendingWrites.append((frame, promise))
self.pendingWrites.append((outbound, promise))

// Ok, we can make an outcall now, which means we can safely deal with the flow control.
if case .changed(newValue: let value) = self.writabilityManager.bufferedBytes(writeSize) {
Expand Down Expand Up @@ -511,7 +516,11 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
self.eventLoop.execute {
self.removeHandlers(channel: self)
self.closePromise.succeed(())
self.multiplexer.childChannelClosed(MultiplexerAbstractChannel(self))
if let streamID = self.streamID {
self.multiplexer.childChannelClosed(streamID: streamID)
} else {
self.multiplexer.childChannelClosed(channelID: ObjectIdentifier(self))
}
}
}

Expand All @@ -532,7 +541,11 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
self.eventLoop.execute {
self.removeHandlers(channel: self)
self.closePromise.fail(error)
self.multiplexer.childChannelClosed(MultiplexerAbstractChannel(self))
if let streamID = self.streamID {
self.multiplexer.childChannelClosed(streamID: streamID)
} else {
self.multiplexer.childChannelClosed(channelID: ObjectIdentifier(self))
}
}
}

Expand Down Expand Up @@ -606,9 +619,16 @@ private extension HTTP2StreamChannel {
return
}

// Get a streamID from the multiplexer if we haven't got one already.
if self.streamID == nil {
self.streamID = self.multiplexer.requestStreamID()
}

while self.pendingWrites.hasMark {
let write = self.pendingWrites.removeFirst()
self.receiveOutboundFrame(write.0, promise: write.1)
let (outbound, promise) = self.pendingWrites.removeFirst()
// This unwrap is okay: we just ensured that `self.streamID` was set above.
let frame = outbound.makeHTTP2Frame(streamID: self.streamID!)
self.receiveOutboundFrame(frame, promise: promise)
}
self.multiplexer.childChannelFlush()
}
Expand All @@ -634,18 +654,20 @@ internal extension HTTP2StreamChannel {
return
}

let message = Message(http2Frame: frame)

if self.unsatisfiedRead {
// We don't need to account for this frame in the window manager: it's being delivered
// straight into the pipeline.
self.pipeline.fireChannelRead(NIOAny(frame))
self.pipeline.fireChannelRead(NIOAny(message))
} else {
// Record the size of the frame so that when we receive a window update event our
// calculation on whether we emit a WINDOW_UPDATE frame is based on the bytes we have
// actually delivered into the pipeline.
if case .data(let dataPayload) = frame.payload {
self.windowManager.bufferedFrameReceived(size: dataPayload.data.readableBytes)
}
self.pendingReads.append(frame)
self.pendingReads.append(message)
}
}

Expand Down Expand Up @@ -744,39 +766,3 @@ extension HTTP2StreamChannel {
return "HTTP2StreamChannel(streamID: \(String(describing: self.streamID)), isActive: \(self.isActive), isWritable: \(self.isWritable))"
}
}

extension HTTP2Frame {
/// A shorthand heuristic for how many bytes we assume a frame consumes on the wire.
///
/// Here we concern ourselves only with per-stream frames: that is, `HEADERS`, `DATA`,
/// `WINDOW_UDPATE`, `RST_STREAM`, and I guess `PRIORITY`. As a simple heuristic we
/// hard code fixed lengths for fixed length frames, use a calculated length for
/// variable length frames, and just ignore encoded headers because it's not worth doing a better
/// job.
fileprivate var bufferBytes: Int {
let frameHeaderSize = 9

switch self.payload {
case .data(let d):
let paddingBytes = d.paddingBytes.map { $0 + 1 } ?? 0
return d.data.readableBytes + paddingBytes + frameHeaderSize
case .headers(let h):
let paddingBytes = h.paddingBytes.map { $0 + 1 } ?? 0
return paddingBytes + frameHeaderSize
case .priority:
return frameHeaderSize + 5
case .pushPromise(let p):
// Like headers, this is variably size, and we just ignore the encoded headers because
// it's not worth having a heuristic.
let paddingBytes = p.paddingBytes.map { $0 + 1 } ?? 0
return paddingBytes + frameHeaderSize
case .rstStream:
return frameHeaderSize + 4
case .windowUpdate:
return frameHeaderSize + 4
default:
// Unknown or unexpected control frame: say 9 bytes.
return frameHeaderSize
}
}
}
12 changes: 6 additions & 6 deletions Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,12 @@ extension HTTP2StreamMultiplexer {

// MARK:- Child to parent calls
extension HTTP2StreamMultiplexer {
internal func childChannelClosed(_ channel: MultiplexerAbstractChannel) {
if let streamID = channel.streamID {
self.streams.removeValue(forKey: streamID)
} else {
preconditionFailure("Child channels always have stream IDs right now.")
}
internal func childChannelClosed(streamID: HTTP2StreamID) {
self.streams.removeValue(forKey: streamID)
}

internal func childChannelClosed(channelID: ObjectIdentifier) {
preconditionFailure("We don't currently support closing channels by 'channelID'")
}

internal func childChannelWrite(_ frame: HTTP2Frame, promise: EventLoopPromise<Void>?) {
Expand Down
Loading