Skip to content

Commit

Permalink
Make HTTP2StreamChannel generic over a message type
Browse files Browse the repository at this point in the history
Motivation:

As part of #214 we need to make `HTTP2StreamChannel` generic over the type
of message it expects to read and write. This allows us to define an
`HTTP2StreamChannel` which uses `HTTP2Frame.FramePayload` as its
currency type, rather than `HTTP2Frame`.

Modifications:

- Make `HTTP2StreamChannel` generic over `Message` which is constrained
  by conformance to `HTTP2FrameConvertible` and
  `HTTP2FramePayloadConvertible`
- `HTTP2StreamChannel` now expects to read in `Message`s (as opposed to
  `HTTP2Frame`s) and have `Message`s written in to it.
- Added typealiases for frame and payload based stream channels.
- Added support for the payload based channel in
  `MultiplexerAbstractChannel` (although one can't be created yet).

Result:

We can create payload based stream channels.
  • Loading branch information
glbrntt committed Jul 28, 2020
1 parent 125cab7 commit bf8910a
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 50 deletions.
36 changes: 36 additions & 0 deletions Sources/NIOHTTP2/HTTP2FrameConvertible.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,39 @@ protocol HTTP2FramePayloadConvertible {
/// Makes a `HTTP2Frame.FramePayload`.
var payload: HTTP2Frame.FramePayload { get }
}

extension HTTP2FrameConvertible where Self: HTTP2FramePayloadConvertible {
/// A shorthand heuristic for how many bytes we assume a frame consumes on the wire.
///
/// Here we concern ourselves only with per-stream frames: that is, `HEADERS`, `DATA`,
/// `WINDOW_UDPATE`, `RST_STREAM`, and I guess `PRIORITY`. As a simple heuristic we
/// hard code fixed lengths for fixed length frames, use a calculated length for
/// variable length frames, and just ignore encoded headers because it's not worth doing a better
/// job.
var estimatedFrameSize: Int {
let frameHeaderSize = 9

switch self.payload {
case .data(let d):
let paddingBytes = d.paddingBytes.map { $0 + 1 } ?? 0
return d.data.readableBytes + paddingBytes + frameHeaderSize
case .headers(let h):
let paddingBytes = h.paddingBytes.map { $0 + 1 } ?? 0
return paddingBytes + frameHeaderSize
case .priority:
return frameHeaderSize + 5
case .pushPromise(let p):
// Like headers, this is variably size, and we just ignore the encoded headers because
// it's not worth having a heuristic.
let paddingBytes = p.paddingBytes.map { $0 + 1 } ?? 0
return paddingBytes + frameHeaderSize
case .rstStream:
return frameHeaderSize + 4
case .windowUpdate:
return frameHeaderSize + 4
default:
// Unknown or unexpected control frame: say 9 bytes.
return frameHeaderSize
}
}
}
66 changes: 18 additions & 48 deletions Sources/NIOHTTP2/HTTP2StreamChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,13 @@ private enum StreamChannelState {
}
}

/// An `HTTP2StreamChannel` which deals in `HTTPFrame`s.
typealias HTTP2FrameBasedStreamChannel = HTTP2StreamChannel<HTTP2Frame>

final class HTTP2StreamChannel: Channel, ChannelCore {
/// An `HTTP2StreamChannel` which reads and writes `HTTPFrame.FramePayload`s.
typealias HTTP2PayloadBasedStreamChannel = HTTP2StreamChannel<HTTP2Frame.FramePayload>

final class HTTP2StreamChannel<Message: HTTP2FramePayloadConvertible & HTTP2FrameConvertible>: Channel, ChannelCore {
internal init(allocator: ByteBufferAllocator,
parent: Channel,
multiplexer: HTTP2StreamMultiplexer,
Expand Down Expand Up @@ -339,7 +344,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
/// In the future this buffer will be used to manage interactions with read() and even, one day,
/// with flow control. For now, though, all this does is hold frames until we have set the
/// channel up.
private var pendingReads: CircularBuffer<HTTP2Frame> = CircularBuffer(initialCapacity: 8)
private var pendingReads: CircularBuffer<Message> = CircularBuffer(initialCapacity: 8)

/// Whether `autoRead` is enabled. By default, all `HTTP2StreamChannel` objects inherit their `autoRead`
/// state from their parent.
Expand All @@ -353,7 +358,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
///
/// To correctly respect flushes, we deliberately withold frames from the parent channel until this
/// stream is flushed, at which time we deliver them all. This buffer holds the pending ones.
private var pendingWrites: MarkedCircularBuffer<(HTTP2Frame, EventLoopPromise<Void>?)> = MarkedCircularBuffer(initialCapacity: 8)
private var pendingWrites: MarkedCircularBuffer<(Message, EventLoopPromise<Void>?)> = MarkedCircularBuffer(initialCapacity: 8)

/// A list node used to hold stream channels.
internal var streamChannelListNode: StreamChannelListNode = StreamChannelListNode()
Expand All @@ -379,13 +384,13 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
return
}

let frame = self.unwrapData(data, as: HTTP2Frame.self)
let outbound = self.unwrapData(data, as: Message.self)

// We need a promise to attach our flow control callback to.
// Regardless of whether the write succeeded or failed, we don't count
// the bytes any longer.
let promise = promise ?? self.eventLoop.makePromise()
let writeSize = frame.bufferBytes
let writeSize = outbound.estimatedFrameSize

// Right now we deal with this math by just attaching a callback to all promises. This is going
// to be annoyingly expensive, but for now it's the most straightforward approach.
Expand All @@ -394,7 +399,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
self.changeWritability(to: value)
}
}
self.pendingWrites.append((frame, promise))
self.pendingWrites.append((outbound, promise))

// Ok, we can make an outcall now, which means we can safely deal with the flow control.
if case .changed(newValue: let value) = self.writabilityManager.bufferedBytes(writeSize) {
Expand Down Expand Up @@ -594,8 +599,9 @@ private extension HTTP2StreamChannel {
}

while self.pendingWrites.hasMark {
let write = self.pendingWrites.removeFirst()
self.receiveOutboundFrame(write.0, promise: write.1)
let (outbound, promise) = self.pendingWrites.removeFirst()
let frame = outbound.makeHTTP2Frame(streamID: self.streamID)
self.receiveOutboundFrame(frame, promise: promise)
}
self.multiplexer.childChannelFlush()
}
Expand All @@ -615,7 +621,7 @@ internal extension HTTP2StreamChannel {
///
/// - parameters:
/// - frame: The `HTTP2Frame` received from the network.
func receiveInboundFrame(_ frame: HTTP2Frame) {
func receiveInbound(_ message: Message) {
guard self.state != .closed else {
// Do nothing
return
Expand All @@ -624,15 +630,15 @@ internal extension HTTP2StreamChannel {
if self.unsatisfiedRead {
// We don't need to account for this frame in the window manager: it's being delivered
// straight into the pipeline.
self.pipeline.fireChannelRead(NIOAny(frame))
self.pipeline.fireChannelRead(NIOAny(message))
} else {
// Record the size of the frame so that when we receive a window update event our
// calculation on whether we emit a WINDOW_UPDATE frame is based on the bytes we have
// actually delivered into the pipeline.
if case .data(let dataPayload) = frame.payload {
if case .data(let dataPayload) = message.payload {
self.windowManager.bufferedFrameReceived(size: dataPayload.data.readableBytes)
}
self.pendingReads.append(frame)
self.pendingReads.append(message)
}
}

Expand Down Expand Up @@ -728,39 +734,3 @@ extension HTTP2StreamChannel {
return "HTTP2StreamChannel(streamID: \(self.streamID), isActive: \(self.isActive), isWritable: \(self.isWritable))"
}
}

extension HTTP2Frame {
/// A shorthand heuristic for how many bytes we assume a frame consumes on the wire.
///
/// Here we concern ourselves only with per-stream frames: that is, `HEADERS`, `DATA`,
/// `WINDOW_UDPATE`, `RST_STREAM`, and I guess `PRIORITY`. As a simple heuristic we
/// hard code fixed lengths for fixed length frames, use a calculated length for
/// variable length frames, and just ignore encoded headers because it's not worth doing a better
/// job.
fileprivate var bufferBytes: Int {
let frameHeaderSize = 9

switch self.payload {
case .data(let d):
let paddingBytes = d.paddingBytes.map { $0 + 1 } ?? 0
return d.data.readableBytes + paddingBytes + frameHeaderSize
case .headers(let h):
let paddingBytes = h.paddingBytes.map { $0 + 1 } ?? 0
return paddingBytes + frameHeaderSize
case .priority:
return frameHeaderSize + 5
case .pushPromise(let p):
// Like headers, this is variably size, and we just ignore the encoded headers because
// it's not worth having a heuristic.
let paddingBytes = p.paddingBytes.map { $0 + 1 } ?? 0
return paddingBytes + frameHeaderSize
case .rstStream:
return frameHeaderSize + 4
case .windowUpdate:
return frameHeaderSize + 4
default:
// Unknown or unexpected control frame: say 9 bytes.
return frameHeaderSize
}
}
}
35 changes: 33 additions & 2 deletions Sources/NIOHTTP2/MultiplexerAbstractChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ struct MultiplexerAbstractChannel {

extension MultiplexerAbstractChannel {
enum BaseChannel {
case frameBased(HTTP2StreamChannel)
case frameBased(HTTP2FrameBasedStreamChannel)
case payloadBased(HTTP2PayloadBasedStreamChannel)
}
}

Expand All @@ -55,6 +56,8 @@ extension MultiplexerAbstractChannel {
switch self.baseChannel {
case .frameBased(let base):
return base.inList
case .payloadBased(let base):
return base.inList
}
}

Expand All @@ -63,12 +66,16 @@ extension MultiplexerAbstractChannel {
switch self.baseChannel {
case .frameBased(let base):
return base.streamChannelListNode
case .payloadBased(let base):
return base.streamChannelListNode
}
}
nonmutating set {
switch self.baseChannel {
case .frameBased(let base):
base.streamChannelListNode = newValue
case .payloadBased(let base):
base.streamChannelListNode = newValue
}
}
}
Expand All @@ -77,62 +84,80 @@ extension MultiplexerAbstractChannel {
switch self.baseChannel {
case .frameBased(let base):
base.configure(initializer: initializer, userPromise: promise)
case .payloadBased(let base):
base.configure(initializer: initializer, userPromise: promise)
}
}

func performActivation() {
switch self.baseChannel {
case .frameBased(let base):
base.performActivation()
case .payloadBased(let base):
base.performActivation()
}
}

func networkActivationReceived() {
switch self.baseChannel {
case .frameBased(let base):
base.networkActivationReceived()
case .payloadBased(let base):
base.networkActivationReceived()
}
}

func receiveInboundFrame(_ frame: HTTP2Frame) {
switch self.baseChannel {
case .frameBased(let base):
base.receiveInboundFrame(frame)
base.receiveInbound(frame)
case .payloadBased(let base):
base.receiveInbound(frame.payload)
}
}

func receiveParentChannelReadComplete() {
switch self.baseChannel {
case .frameBased(let base):
base.receiveParentChannelReadComplete()
case .payloadBased(let base):
base.receiveParentChannelReadComplete()
}
}

func initialWindowSizeChanged(delta: Int) {
switch self.baseChannel {
case .frameBased(let base):
base.initialWindowSizeChanged(delta: delta)
case .payloadBased(let base):
base.initialWindowSizeChanged(delta: delta)
}
}

func receiveWindowUpdatedEvent(_ windowSize: Int) {
switch self.baseChannel {
case .frameBased(let base):
base.receiveWindowUpdatedEvent(windowSize)
case .payloadBased(let base):
base.receiveWindowUpdatedEvent(windowSize)
}
}

func parentChannelWritabilityChanged(newValue: Bool) {
switch self.baseChannel {
case .frameBased(let base):
base.parentChannelWritabilityChanged(newValue: newValue)
case .payloadBased(let base):
base.parentChannelWritabilityChanged(newValue: newValue)
}
}

func receiveStreamClosed(_ reason: HTTP2ErrorCode?) {
switch self.baseChannel {
case .frameBased(let base):
base.receiveStreamClosed(reason)
case .payloadBased(let base):
base.receiveStreamClosed(reason)
}
}
}
Expand All @@ -142,6 +167,10 @@ extension MultiplexerAbstractChannel: Equatable {
switch (lhs.baseChannel, rhs.baseChannel) {
case (.frameBased(let lhs), .frameBased(let rhs)):
return lhs === rhs
case (.payloadBased(let lhs), .payloadBased(let rhs)):
return lhs === rhs
case (.frameBased, .payloadBased), (.payloadBased, .frameBased):
return false
}
}
}
Expand All @@ -151,6 +180,8 @@ extension MultiplexerAbstractChannel: Hashable {
switch self.baseChannel {
case .frameBased(let base):
hasher.combine(ObjectIdentifier(base))
case .payloadBased(let base):
hasher.combine(ObjectIdentifier(base))
}
}
}

0 comments on commit bf8910a

Please sign in to comment.