Skip to content

Commit

Permalink
Implement a NIOAsyncWriter (#2251)
Browse files Browse the repository at this point in the history
* Implement a `NIOAsyncWriter`

# Motivation
We previously added the `NIOAsyncProducer` to bridge between the NIO channel pipeline and the asynchronous world. However, we still need something to bridge writes from the asynchronous world back to the NIO channel pipeline.

# Modification
This PR adds a new `NIOAsyncWriter` type that allows us to asynchronously `yield` elements to it. On the other side, we can register a `NIOAsyncWriterDelegate` which will get informed about any written elements. Furthermore, the synchronous side can toggle the writability of the `AsyncWriter` which allows it to implement flow control.
A main goal of this type is to be as performant as possible. To achieve this I did the following things:
- Make everything generic and inlinable
- Use a class with a lock instead of an actor
- Provide methods to yield a sequence of things which allows users to reduce the amount of times the lock gets acquired.

# Result
We now have the means to bridge writes from the asynchronous world to the synchronous

* Remove the completion struct and incorporate code review comments

* Fixup some refactoring leftovers

* More code review comments

* Move to holding the lock around the delegate and moved the delegate into the state machine

* Comment fixups

* More doc fixes

* Call finish when the sink deinits

* Refactor the writer to only yield Deques and rename the delegate to NIOAsyncWriterSinkDelegate

* Review

* Fix some warnings

* Fix benchmark sendability

* Remove Failure generic parameter and allow sending of an error through the Sink
  • Loading branch information
FranzBusch committed Sep 21, 2022
1 parent 26afcec commit f144292
Show file tree
Hide file tree
Showing 9 changed files with 1,738 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ let swiftCollections: PackageDescription.Target.Dependency = .product(name: "Deq

var targets: [PackageDescription.Target] = [
.target(name: "NIOCore",
dependencies: ["NIOConcurrencyHelpers", "CNIOLinux", "CNIOWindows", swiftCollections]),
dependencies: ["NIOConcurrencyHelpers", "CNIOLinux", "CNIOWindows", swiftCollections, swiftAtomics]),
.target(name: "_NIODataStructures"),
.target(name: "NIOEmbedded",
dependencies: ["NIOCore",
Expand Down
2 changes: 1 addition & 1 deletion Package@swift-5.4.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ let swiftCollections: PackageDescription.Target.Dependency = .product(name: "Deq

var targets: [PackageDescription.Target] = [
.target(name: "NIOCore",
dependencies: ["NIOConcurrencyHelpers", "CNIOLinux", "CNIOWindows", swiftCollections]),
dependencies: ["NIOConcurrencyHelpers", "CNIOLinux", "CNIOWindows", swiftCollections, swiftAtomics]),
.target(name: "_NIODataStructures"),
.target(name: "NIOEmbedded",
dependencies: ["NIOCore",
Expand Down
2 changes: 1 addition & 1 deletion Package@swift-5.5.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ let swiftCollections: PackageDescription.Target.Dependency = .product(name: "Deq

var targets: [PackageDescription.Target] = [
.target(name: "NIOCore",
dependencies: ["NIOConcurrencyHelpers", "CNIOLinux", "CNIOWindows", swiftCollections]),
dependencies: ["NIOConcurrencyHelpers", "CNIOLinux", "CNIOWindows", swiftCollections, swiftAtomics]),
.target(name: "_NIODataStructures"),
.target(name: "NIOEmbedded",
dependencies: ["NIOCore",
Expand Down
1,060 changes: 1,060 additions & 0 deletions Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift

Large diffs are not rendered by default.

31 changes: 31 additions & 0 deletions Sources/NIOPerformanceTester/Benchmark.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
//
//===----------------------------------------------------------------------===//

import Dispatch

protocol Benchmark: AnyObject {
func setUp() throws
func tearDown()
Expand All @@ -27,3 +29,32 @@ func measureAndPrint<B: Benchmark>(desc: String, benchmark bench: B) throws {
return try bench.run()
}
}

#if compiler(>=5.5.2) && canImport(_Concurrency)
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
protocol AsyncBenchmark: AnyObject, Sendable {
func setUp() async throws
func tearDown()
func run() async throws -> Int
}

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
func measureAndPrint<B: AsyncBenchmark>(desc: String, benchmark bench: B) throws {
let group = DispatchGroup()
group.enter()
Task {
do {
try await bench.setUp()
defer {
bench.tearDown()
}
try await measureAndPrint(desc: desc) {
return try await bench.run()
}
}
group.leave()
}

group.wait()
}
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2022 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

#if compiler(>=5.5.2) && canImport(_Concurrency)
import NIOCore
import DequeModule
import Atomics

struct NoOpDelegate: NIOAsyncWriterSinkDelegate, @unchecked Sendable {
typealias Element = Int
let counter = ManagedAtomic(0)

func didYield(contentsOf sequence: Deque<Int>) {
counter.wrappingIncrement(by: sequence.count, ordering: .relaxed)
}

func didTerminate(error: Error?) {}
}

// This is unchecked Sendable because the Sink is not Sendable but the Sink is thread safe
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
final class NIOAsyncWriterSingleWritesBenchmark: AsyncBenchmark, @unchecked Sendable {
private let iterations: Int
private let delegate: NoOpDelegate
private let writer: NIOAsyncWriter<Int, NoOpDelegate>
private let sink: NIOAsyncWriter<Int, NoOpDelegate>.Sink

init(iterations: Int) {
self.iterations = iterations
self.delegate = .init()
let newWriter = NIOAsyncWriter<Int, NoOpDelegate>.makeWriter(isWritable: true, delegate: self.delegate)
self.writer = newWriter.writer
self.sink = newWriter.sink
}

func setUp() async throws {}
func tearDown() {}

func run() async throws -> Int {
for i in 0..<self.iterations {
try await self.writer.yield(i)
}
return self.delegate.counter.load(ordering: .sequentiallyConsistent)
}
}
#endif
42 changes: 42 additions & 0 deletions Sources/NIOPerformanceTester/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,37 @@ public func measureAndPrint(desc: String, fn: () throws -> Int) rethrows -> Void
}
}

#if compiler(>=5.5.2) && canImport(_Concurrency)
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public func measure(_ fn: () async throws -> Int) async rethrows -> [Double] {
func measureOne(_ fn: () async throws -> Int) async rethrows -> Double {
let start = DispatchTime.now().uptimeNanoseconds
_ = try await fn()
let end = DispatchTime.now().uptimeNanoseconds
return Double(end - start) / Double(TimeAmount.seconds(1).nanoseconds)
}

_ = try await measureOne(fn) /* pre-heat and throw away */
var measurements = Array(repeating: 0.0, count: 10)
for i in 0..<10 {
measurements[i] = try await measureOne(fn)
}

return measurements
}

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public func measureAndPrint(desc: String, fn: () async throws -> Int) async rethrows -> Void {
if limitSet.isEmpty || limitSet.contains(desc) {
print("measuring\(warning): \(desc): ", terminator: "")
let measurements = try await measure(fn)
print(measurements.reduce(into: "") { $0.append("\($1), ") })
} else {
print("skipping '\(desc)', limit set = \(limitSet)")
}
}
#endif

// MARK: Utilities

private final class SimpleHTTPServer: ChannelInboundHandler {
Expand Down Expand Up @@ -1053,3 +1084,14 @@ try measureAndPrint(
iterations: 1_000_000
)
)

#if compiler(>=5.5.2) && canImport(_Concurrency)
if #available(macOS 10.15, *) {
try measureAndPrint(
desc: "asyncwriter_single_writes_1M_times",
benchmark: NIOAsyncWriterSingleWritesBenchmark(
iterations: 1_000_000
)
)
}
#endif
Loading

0 comments on commit f144292

Please sign in to comment.