Skip to content

Commit

Permalink
Add back pressure support to HBByteBufferStreamer (#33)
Browse files Browse the repository at this point in the history
* Add back pressure to HBByteBufferStreamer

* Add tests for HBByteBufferStreamer
  • Loading branch information
adam-fowler committed Nov 30, 2021
1 parent 1b3f345 commit a3f48ed
Show file tree
Hide file tree
Showing 4 changed files with 264 additions and 8 deletions.
44 changes: 41 additions & 3 deletions Sources/HummingbirdCore/Request/ByteBufferStreamer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//
//===----------------------------------------------------------------------===//

import Foundation
import NIOCore

/// Values returned when we consume the contents of the streamer
Expand Down Expand Up @@ -54,30 +55,62 @@ public final class HBByteBufferStreamer: HBStreamerProtocol {

/// Queue of promises for each ByteBuffer fed to the streamer. Last entry is always waiting for the next buffer or end tag
var queue: CircularBuffer<EventLoopPromise<HBStreamerOutput>>
/// back pressure promise
var backPressurePromise: EventLoopPromise<Void>?
/// EventLoop everything is running on
let eventLoop: EventLoop
/// called every time a ByteBuffer is consumed
var onConsume: ((HBByteBufferStreamer) -> Void)?
/// maximum allowed size to upload
let maxSize: Int
/// maximum size currently being streamed before back pressure is applied
let maxStreamingBufferSize: Int
/// current size in memory
var currentSize: Int
/// bytes fed to streamer so far
var sizeFed: Int
/// has request streamer data been dropped
var dropped: Bool

public init(eventLoop: EventLoop, maxSize: Int) {
self.queue = .init(initialCapacity: 8)
public init(eventLoop: EventLoop, maxSize: Int, maxStreamingBufferSize: Int? = nil) {
self.queue = .init()
self.backPressurePromise = nil
self.queue.append(eventLoop.makePromise())
self.eventLoop = eventLoop
self.sizeFed = 0
self.currentSize = 0
self.maxSize = maxSize
self.maxStreamingBufferSize = maxStreamingBufferSize ?? maxSize
self.onConsume = nil
self.dropped = false
}

/// Feed a ByteBuffer to the request, while applying back pressure
/// - Parameter result: Bytebuffer or end tag
public func feed(buffer: ByteBuffer) -> EventLoopFuture<Void> {
if self.eventLoop.inEventLoop {
return self._feed(buffer: buffer)
} else {
return self.eventLoop.flatSubmit {
self._feed(buffer: buffer)
}
}
}

/// Feed a ByteBuffer to the request
/// - Parameter result: Bytebuffer or end tag
private func _feed(buffer: ByteBuffer) -> EventLoopFuture<Void> {
self.eventLoop.assertInEventLoop()
if let backPressurePromise = backPressurePromise {
return backPressurePromise.futureResult.always { _ in
self._feed(.byteBuffer(buffer))
}
} else {
self._feed(.byteBuffer(buffer))
return self.eventLoop.makeSucceededVoidFuture()
}
}

/// Feed a ByteBuffer to the request
/// - Parameter result: Bytebuffer or end tag
public func feed(_ result: FeedInput) {
Expand Down Expand Up @@ -108,7 +141,9 @@ public final class HBByteBufferStreamer: HBStreamerProtocol {

self.sizeFed += byteBuffer.readableBytes
self.currentSize += byteBuffer.readableBytes

if self.currentSize > self.maxStreamingBufferSize {
self.backPressurePromise = self.eventLoop.makePromise()
}
if self.sizeFed > self.maxSize {
promise.fail(HBHTTPError(.payloadTooLarge))
} else {
Expand Down Expand Up @@ -205,6 +240,9 @@ public final class HBByteBufferStreamer: HBStreamerProtocol {
switch result {
case .byteBuffer(let buffer):
self.currentSize -= buffer.readableBytes
if self.currentSize < self.maxStreamingBufferSize {
self.backPressurePromise?.succeed(())
}
case .end:
assert(self.currentSize == 0)
}
Expand Down
4 changes: 2 additions & 2 deletions Tests/HummingbirdCoreTests/CoreTests+async.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ final class HummingBirdCoreAsyncTests: XCTestCase {
#endif
struct Responder: HBHTTPResponder {
func respond(to request: HBHTTPRequest, context: ChannelHandlerContext, onComplete: @escaping (Result<HBHTTPResponse, Error>) -> Void) {
let streamer = HBByteBufferStreamer(eventLoop: context.eventLoop, maxSize: 1024 * 2048)
let streamer = HBByteBufferStreamer(eventLoop: context.eventLoop, maxSize: 1024 * 2048, maxStreamingBufferSize: 32 * 1024)
Task {
do {
for try await buffer in request.body.stream!.sequence {
streamer.feed(.byteBuffer(buffer))
try await streamer.feed(buffer: buffer).get()
}
streamer.feed(.end)
} catch {
Expand Down
5 changes: 2 additions & 3 deletions Tests/HummingbirdCoreTests/CoreTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,9 @@ class HummingBirdCoreTests: XCTestCase {
func testStreamBody2() {
struct Responder: HBHTTPResponder {
func respond(to request: HBHTTPRequest, context: ChannelHandlerContext, onComplete: @escaping (Result<HBHTTPResponse, Error>) -> Void) {
let streamer = HBByteBufferStreamer(eventLoop: context.eventLoop, maxSize: 2048 * 1024)
let streamer = HBByteBufferStreamer(eventLoop: context.eventLoop, maxSize: 2048 * 1024, maxStreamingBufferSize: 32 * 1024)
request.body.stream?.consumeAll(on: context.eventLoop) { buffer in
streamer.feed(.byteBuffer(buffer))
return context.eventLoop.makeSucceededVoidFuture()
return streamer.feed(buffer: buffer)
}
.flatMapErrorThrowing { error in
streamer.feed(.error(error))
Expand Down
219 changes: 219 additions & 0 deletions Tests/HummingbirdCoreTests/StreamerTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2021-2021 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

@testable import HummingbirdCore
import NIOCore
import NIOPosix
import XCTest

class ByteBufferStreamerTests: XCTestCase {
var elg: EventLoopGroup!

override func setUp() {
self.elg = MultiThreadedEventLoopGroup(numberOfThreads: 2)
}

override func tearDown() {
XCTAssertNoThrow(try self.elg.syncShutdownGracefully())
}

func randomBuffer(size: Int) -> ByteBuffer {
var data = [UInt8](repeating: 0, count: size)
data = data.map { _ in UInt8.random(in: 0...255) }
return ByteBufferAllocator().buffer(bytes: data)
}

func feedStreamer(_ streamer: HBByteBufferStreamer, buffer: ByteBuffer) {
var buffer = buffer
while buffer.readableBytes > 0 {
let blockSize = min(buffer.readableBytes, 32 * 1024)
streamer.feed(.byteBuffer(buffer.readSlice(length: blockSize)!))
}
streamer.feed(.end)
}

func feedStreamer(_ streamer: HBByteBufferStreamer, buffer: ByteBuffer, eventLoop: EventLoop) {
var buffer = buffer
eventLoop.execute {
while buffer.readableBytes > 0 {
let blockSize = min(buffer.readableBytes, 32 * 1024)
streamer.feed(.byteBuffer(buffer.readSlice(length: blockSize)!))
}
streamer.feed(.end)
}
}

func feedStreamerWithBackPressure(_ streamer: HBByteBufferStreamer, buffer: ByteBuffer) {
var buffer = buffer
func _feed() {
let blockSize = min(buffer.readableBytes, 32 * 1024)
streamer.feed(buffer: buffer.readSlice(length: blockSize)!).whenComplete { _ in
XCTAssertLessThanOrEqual(streamer.currentSize, streamer.maxStreamingBufferSize + blockSize)
if buffer.readableBytes > 0 {
_feed()
} else {
streamer.feed(.end)
}
}
}
_feed()
}

func feedStreamerWithDelays(_ streamer: HBByteBufferStreamer, buffer: ByteBuffer, eventLoop: EventLoop) {
var buffer = buffer
func _feed() {
let blockSize = min(buffer.readableBytes, 32 * 1024)
streamer.feed(buffer: buffer.readSlice(length: blockSize)!).whenComplete { _ in
XCTAssertLessThanOrEqual(streamer.currentSize, streamer.maxStreamingBufferSize + blockSize)
if buffer.readableBytes > 0 {
eventLoop.scheduleTask(in: .microseconds(Int64.random(in: 0..<100_000))) {
_feed()
}
} else {
streamer.feed(.end)
}
}
}
_feed()
}

func consumeStreamer(_ streamer: HBByteBufferStreamer, eventLoop: EventLoop) -> EventLoopFuture<ByteBuffer> {
var consumeBuffer = ByteBuffer()
return streamer.consumeAll(on: eventLoop) { buffer in
var buffer = buffer
consumeBuffer.writeBuffer(&buffer)
return eventLoop.makeSucceededVoidFuture()
}.map { consumeBuffer }
}

func consumeStreamerWithDelays(_ streamer: HBByteBufferStreamer, eventLoop: EventLoop) -> EventLoopFuture<ByteBuffer> {
var consumeBuffer = ByteBuffer()
return streamer.consumeAll(on: eventLoop) { buffer in
var buffer = buffer
consumeBuffer.writeBuffer(&buffer)
return eventLoop.scheduleTask(in: .microseconds(Int64.random(in: 0..<100))) {}.futureResult
}.map { consumeBuffer }
}

/// Test can feed and then consume
func testFeedConsume() throws {
let buffer = self.randomBuffer(size: 128_000)
let eventLoop = self.elg.next()
let streamer = HBByteBufferStreamer(eventLoop: eventLoop, maxSize: 1024 * 1024)

self.feedStreamer(streamer, buffer: buffer, eventLoop: eventLoop)
let consumeBuffer = try consumeStreamer(streamer, eventLoop: eventLoop).wait()

XCTAssertEqual(buffer, consumeBuffer)
}

/// Test can feed from not the EventLoop and then consume
func testFeedOffEventLoop() throws {
let buffer = self.randomBuffer(size: 128_000)
let eventLoop = self.elg.next()
let streamer = HBByteBufferStreamer(eventLoop: eventLoop, maxSize: 1024 * 1024)

self.feedStreamer(streamer, buffer: buffer)
let consumeBuffer = try consumeStreamer(streamer, eventLoop: eventLoop).wait()

XCTAssertEqual(buffer, consumeBuffer)
}

/// Test can feed and then consume with back pressure applied
func testFeedWithBackPressure() throws {
let buffer = self.randomBuffer(size: 128_000)
let eventLoop = self.elg.next()
let streamer = HBByteBufferStreamer(eventLoop: eventLoop, maxSize: 1024 * 1024, maxStreamingBufferSize: 20 * 1024)

self.feedStreamerWithBackPressure(streamer, buffer: buffer)
let consumeBuffer = try consumeStreamer(streamer, eventLoop: eventLoop).wait()

XCTAssertEqual(buffer, consumeBuffer)
}

/// Test can feed and then consume with delays and back pressure applied
func testFeedWithBackPressureConsumeDelays() throws {
let buffer = self.randomBuffer(size: 600_000)
let eventLoop = self.elg.next()
let streamer = HBByteBufferStreamer(eventLoop: eventLoop, maxSize: 1024 * 1024, maxStreamingBufferSize: 64 * 1024)

self.feedStreamerWithBackPressure(streamer, buffer: buffer)
let consumeBuffer = try consumeStreamerWithDelays(streamer, eventLoop: eventLoop).wait()

XCTAssertEqual(buffer, consumeBuffer)
}

/// Test can feed and then consume
func testFeedWithBackPressureAndDelays() throws {
let buffer = self.randomBuffer(size: 400_000)
let eventLoop = self.elg.next()
let streamer = HBByteBufferStreamer(eventLoop: eventLoop, maxSize: 1024 * 1024, maxStreamingBufferSize: 64 * 1024)

self.feedStreamerWithDelays(streamer, buffer: buffer, eventLoop: eventLoop)
let consumeBuffer = try consumeStreamer(streamer, eventLoop: eventLoop).wait()

XCTAssertEqual(buffer, consumeBuffer)
}

/// Test can feed and then consume
func testFeedAndConsumeWithDelays() throws {
let buffer = self.randomBuffer(size: 550_000)
let eventLoop = self.elg.next()
let streamer = HBByteBufferStreamer(eventLoop: eventLoop, maxSize: 1024 * 1024, maxStreamingBufferSize: 64 * 1024)

self.feedStreamerWithDelays(streamer, buffer: buffer, eventLoop: eventLoop)
let consumeBuffer = try consumeStreamerWithDelays(streamer, eventLoop: eventLoop).wait()

XCTAssertEqual(buffer, consumeBuffer)
}

/// test max size works
func testMaxSize() throws {
let buffer = self.randomBuffer(size: 60000)
let eventLoop = self.elg.next()
let streamer = HBByteBufferStreamer(eventLoop: eventLoop, maxSize: 32 * 1024)
self.feedStreamer(streamer, buffer: buffer)
XCTAssertThrowsError(try self.consumeStreamer(streamer, eventLoop: eventLoop).wait()) { error in
switch error {
case let error as HBHTTPError:
XCTAssertEqual(error.status, .payloadTooLarge)
default:
XCTFail("\(error)")
}
}
}

/// test error is propagated
func testError() throws {
struct MyError: Error {}
var buffer = self.randomBuffer(size: 10000)
let eventLoop = self.elg.next()
let streamer = HBByteBufferStreamer(eventLoop: eventLoop, maxSize: 32 * 1024)

while buffer.readableBytes > 0 {
let blockSize = min(buffer.readableBytes, 32 * 1024)
streamer.feed(.byteBuffer(buffer.readSlice(length: blockSize)!))
}
streamer.feed(.error(MyError()))

XCTAssertThrowsError(try self.consumeStreamer(streamer, eventLoop: eventLoop).wait()) { error in
switch error {
case is MyError:
break
default:
XCTFail("\(error)")
}
}
}
}

0 comments on commit a3f48ed

Please sign in to comment.