Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throw CancellationError instead of returning nil during early cancellation. #2401

Merged
merged 14 commits into from Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -147,9 +147,8 @@ public struct NIOAsyncSequenceProducer<
backPressureStrategy: Strategy,
delegate: Delegate
) -> NewSequence {
let newSequence = NIOThrowingAsyncSequenceProducer.makeSequence(
let newSequence = NIOThrowingAsyncSequenceProducer.makeNonThrowingSequence(
elementType: Element.self,
failureType: Never.self,
backPressureStrategy: backPressureStrategy,
delegate: delegate
)
Expand Down
Expand Up @@ -98,6 +98,7 @@ public struct NIOThrowingAsyncSequenceProducer<
/// - backPressureStrategy: The back-pressure strategy of the sequence.
/// - delegate: The delegate of the sequence
/// - Returns: A ``NIOThrowingAsyncSequenceProducer/Source`` and a ``NIOThrowingAsyncSequenceProducer``.
@available(*, deprecated, message: "Support for a generic Failure type is deprecated. Failure type must be `any Swift.Error`.")
@inlinable
public static func makeSequence(
elementType: Element.Type = Element.self,
Expand All @@ -113,6 +114,51 @@ public struct NIOThrowingAsyncSequenceProducer<

return .init(source: source, sequence: sequence)
}

/// Initializes a new ``NIOThrowingAsyncSequenceProducer`` and a ``NIOThrowingAsyncSequenceProducer/Source``.
///
/// - Important: This method returns a struct containing a ``NIOThrowingAsyncSequenceProducer/Source`` and
/// a ``NIOThrowingAsyncSequenceProducer``. The source MUST be held by the caller and
/// used to signal new elements or finish. The sequence MUST be passed to the actual consumer and MUST NOT be held by the
/// caller. This is due to the fact that deiniting the sequence is used as part of a trigger to terminate the underlying source.
///
/// - Parameters:
/// - elementType: The element type of the sequence.
/// - failureType: The failure type of the sequence. Must be `Swift.Error`
/// - backPressureStrategy: The back-pressure strategy of the sequence.
/// - delegate: The delegate of the sequence
/// - Returns: A ``NIOThrowingAsyncSequenceProducer/Source`` and a ``NIOThrowingAsyncSequenceProducer``.
@inlinable
public static func makeSequence(
elementType: Element.Type = Element.self,
failureType: Failure.Type = Error.self,
backPressureStrategy: Strategy,
delegate: Delegate
) -> NewSequence where Failure == Error {
let sequence = Self(
backPressureStrategy: backPressureStrategy,
delegate: delegate
)
let source = Source(storage: sequence._storage)

return .init(source: source, sequence: sequence)
}

/// only used internally by``NIOAsyncSequenceProducer`` to reused most of the code
dnadoba marked this conversation as resolved.
Show resolved Hide resolved
@inlinable
internal static func makeNonThrowingSequence(
elementType: Element.Type = Element.self,
backPressureStrategy: Strategy,
delegate: Delegate
) -> NewSequence where Failure == Never {
let sequence = Self(
backPressureStrategy: backPressureStrategy,
delegate: delegate
)
let source = Source(storage: sequence._storage)

return .init(source: source, sequence: sequence)
}

@inlinable
/* private */ internal init(
Expand Down Expand Up @@ -499,7 +545,20 @@ extension NIOThrowingAsyncSequenceProducer {
return delegate

case .resumeContinuationWithCancellationErrorAndCallDidTerminate(let continuation):
continuation.resume(throwing: CancellationError())
// We have deprecated the generic Failure type in the public API and Failure should
// now be `Swift.Error`. However, if users have not migrated to the new API the could
dnadoba marked this conversation as resolved.
Show resolved Hide resolved
// still use a custom generic Error type and this cast might fail.
// In addition, we use `NIOThrowingAsyncSequenceProducer` in the implementation of the
// non-throwing variant `NIOAsyncSequenceProducer` where `Failure` will be `Never` and
// this cast will fail as well.
// Everything is marked @inlinable and the Failure type is know at compile time,
dnadoba marked this conversation as resolved.
Show resolved Hide resolved
// therefore this cast should be optimised away in release build.
if let failure = CancellationError() as? Failure {
continuation.resume(throwing: failure)
} else {
continuation.resume(returning: nil)
}

let delegate = self._delegate
self._delegate = nil

Expand Down Expand Up @@ -880,9 +939,26 @@ extension NIOThrowingAsyncSequenceProducer {
switch self._state {
case .initial(_, let iteratorInitialized):
// This can happen if the `Task` that calls `next()` is already cancelled.
self._state = .finished(iteratorInitialized: iteratorInitialized)

return .callDidTerminate

// We have deprecated the generic Failure type in the public API and Failure should
// now be `Swift.Error`. However, if users have not migrated to the new API the could
dnadoba marked this conversation as resolved.
Show resolved Hide resolved
// still use a custom generic Error type and this cast might fail.
// In addition, we use `NIOThrowingAsyncSequenceProducer` in the implementation of the
// non-throwing variant `NIOAsyncSequenceProducer` where `Failure` will be `Never` and
// this cast will fail as well.
// Everything is marked @inlinable and the Failure type is know at compile time,
dnadoba marked this conversation as resolved.
Show resolved Hide resolved
// therefore this cast should be optimised away in release build.
if let failure = CancellationError() as? Failure {
self._state = .sourceFinished(
buffer: .init(),
iteratorInitialized: iteratorInitialized,
failure: failure
)
} else {
self._state = .finished(iteratorInitialized: iteratorInitialized)
}

return .none

case .streaming(_, _, .some(let continuation), _, let iteratorInitialized):
// We have an outstanding continuation that needs to resumed
Expand Down
Expand Up @@ -19,7 +19,7 @@ import Atomics

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
final class NIOAsyncSequenceProducerBenchmark: AsyncBenchmark, NIOAsyncSequenceProducerDelegate, @unchecked Sendable {
fileprivate typealias SequenceProducer = NIOThrowingAsyncSequenceProducer<Int, Never, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, NIOAsyncSequenceProducerBenchmark>
fileprivate typealias SequenceProducer = NIOThrowingAsyncSequenceProducer<Int, Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, NIOAsyncSequenceProducerBenchmark>

private let iterations: Int
private var iterator: SequenceProducer.AsyncIterator!
Expand Down
Expand Up @@ -515,9 +515,11 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {

task.cancel()

let value = try await task.value
let result = await task.result

XCTAssertNil(value)
await XCTAssertThrowsError(try result.get()) { error in
XCTAssertTrue(error is CancellationError, "unexpected error \(error)")
}
}

// MARK: - Next
Expand Down