Skip to content

Commit

Permalink
Avoid emitting WINDOW_UPDATE frames when receiving stream closed
Browse files Browse the repository at this point in the history
Motivation:

In some cases it's possible for a stream to emit a WINDOW_UPDATE frame
after receiving close from the network. This is unfortunate because in
the eyes of the `NIOHTTP2Handler` the stream no longer exists which
results in a connection-level error.

Modifications:

- Add a flag to `deliverPendingReads` telling it whether it is allowed
  to emit a WINDOW_UPDATE frame
- Use the above functionality when the stream is closing

Result:

WINDOW_UPDATE frames are no longer emitted from the stream as a result
of delivering pending reads while closing.
  • Loading branch information
glbrntt committed Aug 21, 2020
1 parent 9f4eff0 commit 007171c
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 5 deletions.
10 changes: 5 additions & 5 deletions Sources/NIOHTTP2/HTTP2StreamChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ final class HTTP2StreamChannel<Message: HTTP2FramePayloadConvertible & HTTP2Fram
}
}

private func tryToRead() {
private func tryToRead(willCloseImminently: Bool = false) {
// If there's no read to satisfy, no worries about it.
guard self.unsatisfiedRead else {
return
Expand All @@ -597,7 +597,7 @@ final class HTTP2StreamChannel<Message: HTTP2FramePayloadConvertible & HTTP2Fram

// Ok, we're satisfying a read here.
self.unsatisfiedRead = false
self.deliverPendingReads()
self.deliverPendingReads(mayEmitWindowUpdates: !willCloseImminently)
self.tryToAutoRead()
}

Expand Down Expand Up @@ -625,7 +625,7 @@ private extension HTTP2StreamChannel {
}

/// Deliver all pending reads to the channel.
private func deliverPendingReads() {
private func deliverPendingReads(mayEmitWindowUpdates: Bool) {
assert(self._isActive)
while self.pendingReads.count > 0 {
let frame = self.pendingReads.removeFirst()
Expand All @@ -639,7 +639,7 @@ private extension HTTP2StreamChannel {

self.pipeline.fireChannelRead(NIOAny(frame))

if let size = dataLength, let increment = self.windowManager.bufferedFrameEmitted(size: size) {
if let size = dataLength, let increment = self.windowManager.bufferedFrameEmitted(size: size), mayEmitWindowUpdates {
// To have a pending read, we must have a stream ID.
let frame = HTTP2Frame(streamID: self.streamID!, payload: .windowUpdate(windowSizeIncrement: increment))
self.receiveOutboundFrame(frame, promise: nil)
Expand Down Expand Up @@ -725,7 +725,7 @@ internal extension HTTP2StreamChannel {
/// - reason: The reason received from the network, if any.
func receiveStreamClosed(_ reason: HTTP2ErrorCode?) {
// The stream is closed, we should aim to deliver any read frames we have for it.
self.tryToRead()
self.tryToRead(willCloseImminently: true)

if let reason = reason {
// To receive from the network, it must be safe to force-unwrap here.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ extension HTTP2FramePayloadStreamMultiplexerTests {
("testInboundChannelWindowSizeIsCustomisable", testInboundChannelWindowSizeIsCustomisable),
("testWeCanCreateFrameAndPayloadBasedStreamsOnAMultiplexer", testWeCanCreateFrameAndPayloadBasedStreamsOnAMultiplexer),
("testReadWhenUsingAutoreadOnChildChannel", testReadWhenUsingAutoreadOnChildChannel),
("testWindowUpdateIsNotEmittedAfterStreamIsClosed", testWindowUpdateIsNotEmittedAfterStreamIsClosed),
]
}
}
Expand Down
43 changes: 43 additions & 0 deletions Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1805,4 +1805,47 @@ final class HTTP2FramePayloadStreamMultiplexerTests: XCTestCase {

XCTAssertNoThrow(try self.channel.finish())
}

func testWindowUpdateIsNotEmittedAfterStreamIsClosed() throws {
let targetWindowSize = 1024
let multiplexer = HTTP2StreamMultiplexer(mode: .client,
channel: self.channel,
targetWindowSize: targetWindowSize) { channel in
return channel.eventLoop.makeSucceededFuture(())
}
XCTAssertNoThrow(try self.channel.pipeline.addHandler(multiplexer).wait())

// We need to activate the underlying channel here.
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 80)).wait())

// Write a headers frame.
let headers = HPACKHeaders([(":path", "/"), (":method", "GET"), (":authority", "localhost"), (":scheme", "https")])
let headersFrame = HTTP2Frame(streamID: 1, payload: .headers(.init(headers: headers)))
self.channel.pipeline.fireChannelRead(NIOAny(headersFrame))

// Activate the stream.
self.activateStream(1)

// Send a window updated event.
let windowUpdated = NIOHTTP2WindowUpdatedEvent(streamID: 1, inboundWindowSize: 128, outboundWindowSize: nil)
self.channel.pipeline.fireUserInboundEventTriggered(windowUpdated)
self.channel.pipeline.fireChannelReadComplete()

// We expect the a WINDOW_UPDATE frame: our inbound window size is 128 but has a target of 1024.
let frame = try assertNoThrowWithValue(try self.channel.readOutbound(as: HTTP2Frame.self))!
frame.assertWindowUpdateFrame(streamID: 1, windowIncrement: 896)

// The inbound window size should now be our target: 1024. Write enough bytes to consume the
// inbound window.
let data = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(.init(repeating: 0, count: targetWindowSize + 1))))
let dataFrame = HTTP2Frame(streamID: 1, payload: data)
self.channel.pipeline.fireChannelRead(NIOAny(dataFrame))

self.channel.pipeline.fireUserInboundEventTriggered(StreamClosedEvent(streamID: 1, reason: nil))
self.channel.pipeline.fireChannelReadComplete()

// We've consumed the inbound window: normally we'd expect a WINDOW_UPDATE frame but since
// the stream has closed we don't expect to read anything out.
XCTAssertNil(try self.channel.readOutbound(as: HTTP2Frame.self))
}
}

0 comments on commit 007171c

Please sign in to comment.