Permalink
Browse files

standardize `ELF.cascade*` collection of methods (#802)

Motivation:

The `ELF.cascade` methods have a parameter label `promise` that does not match Swift API Guidelines, and a way to cascade just successes is not available - while for failures there is.

Modifications:

`ELF.cascade*` methods that already exist have had their `promise` label renamed to `to`, and a new `ELF.cascadeSuccess` method has been added.

Result:

EventLoopFuture now has the cascade methods `ELF.cascade(to:)`, `ELF.cascadeFailure(to:)`, and `ELF.cascadeSuccess(to:)`
  • Loading branch information...
Mordil authored and Lukasa committed Feb 5, 2019
1 parent 9b9dceb commit caf9a3d8dacb18156491dac5a575e0827ec4a981
@@ -172,7 +172,7 @@ public func swiftMain() -> Int {
let respPart = self.unwrapInboundIn(data)
if case .end(nil) = respPart {
if self.remainingNumberOfRequests <= 0 {
ctx.channel.close().map { self.numberOfRequests - self.remainingNumberOfRequests }.cascade(promise: self.isDonePromise)
ctx.channel.close().map { self.numberOfRequests - self.remainingNumberOfRequests }.cascade(to: self.isDonePromise)
} else {
self.remainingNumberOfRequests -= 1
ctx.write(self.wrapOutboundOut(.head(RepeatedRequests.requestHead)), promise: nil)
@@ -802,7 +802,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
registerPromise.futureResult.whenFailure { (_: Error) in
self.close(promise: nil)
}
registerPromise.futureResult.cascadeFailure(promise: promise)
registerPromise.futureResult.cascadeFailure(to: promise)

if self.lifecycleManager.isPreRegistered {
// we expect kqueue/epoll registration to always succeed which is basically true, except for errors that
@@ -517,7 +517,7 @@ public final class ClientBootstrap {
}.flatMapError { error in
channel.close0(error: error, mode: .all, promise: nil)
return channel.eventLoop.makeFailedFuture(error)
}.cascade(promise: promise)
}.cascade(to: promise)
return promise.futureResult
}

@@ -729,7 +729,7 @@ public final class DatagramBootstrap {

applier(channel)(key, value).map {
applyNext()
}.cascadeFailure(promise: applyPromise)
}.cascadeFailure(to: applyPromise)
}
applyNext()

@@ -395,7 +395,7 @@ public final class ChannelPipeline: ChannelInvoker {
self.remove0(ctx: ctx, promise: promise)
}

contextFuture.cascadeFailure(promise: promise)
contextFuture.cascadeFailure(to: promise)
}

/// Remove a `ChannelHandler` from the `ChannelPipeline`.
@@ -410,7 +410,7 @@ public final class ChannelPipeline: ChannelInvoker {
self.remove0(ctx: ctx, promise: promise)
}

contextFuture.cascadeFailure(promise: promise)
contextFuture.cascadeFailure(to: promise)
}

/// Remove a `ChannelHandler` from the `ChannelPipeline`.
@@ -892,7 +892,7 @@ internal final class SelectableEventLoop: EventLoop {
} else {
let p = self.makePromise(of: Void.self)
self.execute {
closeGently0().cascade(promise: p)
closeGently0().cascade(to: p)
}
return p.futureResult
}
@@ -273,7 +273,7 @@ public struct EventLoopPromise<Value> {
/// At the end of an `EventLoopFuture` chain, you can use `whenSuccess()` or `whenFailure()` to add an
/// observer callback that will be invoked with the result or error at that point. (Note: If you ever
/// find yourself invoking `promise.succeed()` from inside a `whenSuccess()` callback, you probably should
/// use `flatMap()` or `cascade(promise:)` instead.)
/// use `flatMap()` or `cascade(to:)` instead.)
///
/// `EventLoopFuture` objects are typically obtained by:
/// * Using `EventLoopFuture<Value>.async` or a similar wrapper function.
@@ -440,7 +440,7 @@ extension EventLoopFuture {
next._setValue(value: futureU.value!)
}
} else {
futureU.cascade(promise: next)
futureU.cascade(to: next)
return CallbackList()
}
case .failure(let error):
@@ -557,7 +557,7 @@ extension EventLoopFuture {
next._setValue(value: t.value!)
}
} else {
t.cascade(promise: next)
t.cascade(to: next)
return CallbackList()
}
}
@@ -736,14 +736,16 @@ extension EventLoopFuture {
}
}

extension EventLoopFuture {
// MARK: cascade
/// Fulfill the given `EventLoopPromise` with the results from this `EventLoopFuture`.
extension EventLoopFuture {
/// Fulfills the given `EventLoopPromise` with the results from this `EventLoopFuture`.
///
/// This is useful when allowing users to provide promises for you to fulfill, but
/// when you are calling functions that return their own promises. They allow you to
/// tidy up your computational pipelines. For example:
/// tidy up your computational pipelines.
///
/// For example:
/// ```
/// doWork().flatMap {
/// doMoreWork($0)
@@ -753,37 +755,46 @@ extension EventLoopFuture {
/// maybeRecoverFromError($0)
/// }.map {
/// transformData($0)
/// }.cascade(promise: userPromise)
/// }.cascade(to: userPromise)
/// ```
///
/// - parameters:
/// - promise: The `EventLoopPromise` to fulfill with the results of this future.
public func cascade(promise: EventLoopPromise<Value>?) {
/// - Parameter to: The `EventLoopPromise` to fulfill with the results of this future.
public func cascade(to promise: EventLoopPromise<Value>?) {
guard let promise = promise else { return }
_whenCompleteWithValue { v in
switch v {
case .failure(let err):
promise.fail(err)
case .success(let value):
promise.succeed(value)
self.whenComplete { result in
switch result {
case let .success(value): promise.succeed(value)
case let .failure(error): promise.fail(error)
}
}
}

/// Fulfill the given `EventLoopPromise` with the error result from this `EventLoopFuture`,
/// if one exists.
/// Fulfills the given `EventLoopPromise` only when this `EventLoopFuture` succeeds.
///
/// If you are doing work that fulfills a type that doesn't match the expected `EventLoopPromise` value, add an
/// intermediate `map`.
///
/// For example:
/// ```
/// let boolPromise = eventLoop.makePromise(of: Bool.self)
/// doWorkReturningInt().map({ $0 >= 0 }).cascade(to: boolPromise)
/// ```
///
/// - Parameter to: The `EventLoopPromise` to fulfill when a successful result is available.
public func cascadeSuccess(to promise: EventLoopPromise<Value>?) {
guard let promise = promise else { return }
self.whenSuccess { promise.succeed($0) }
}

/// Fails the given `EventLoopPromise` with the error from this `EventLoopFuture` if encountered.
///
/// This is an alternative variant of `cascade` that allows you to potentially return early failures in
/// error cases, while passing the user `EventLoopPromise` onwards. In general, however, `cascade` is
/// more broadly useful.
/// error cases, while passing the user `EventLoopPromise` onwards.
///
/// - parameters:
/// - promise: The `EventLoopPromise` to fulfill with the results of this future.
public func cascadeFailure<NewValue>(promise: EventLoopPromise<NewValue>?) {
/// - Parameter to: The `EventLoopPromise` that should fail with the error of this `EventLoopFuture`.
public func cascadeFailure<NewValue>(to promise: EventLoopPromise<NewValue>?) {
guard let promise = promise else { return }
self.whenFailure { err in
promise.fail(err)
}
self.whenFailure { promise.fail($0) }
}
}

@@ -1075,7 +1086,7 @@ public extension EventLoopFuture {
return self
}
let hoppingPromise = target.makePromise(of: Value.self)
self.cascade(promise: hoppingPromise)
self.cascade(to: hoppingPromise)
return hoppingPromise.futureResult
}
}
@@ -239,7 +239,7 @@ private struct PendingDatagramWritesState {

switch (promiseFiller, thisWriteFiller) {
case (.some(let all), .some(let this)):
all.0.futureResult.cascade(promise: this.0)
all.0.futureResult.cascade(to: this.0)
case (.none, .some(let this)):
promiseFiller = this
case (.some, .none),
@@ -196,7 +196,7 @@ private struct PendingStreamWritesState {
/* we wrote at least the whole head item, so drop it and succeed the promise */
if let promise = self.fullyWrittenFirst() {
if let p = promise0 {
p.futureResult.cascade(promise: promise)
p.futureResult.cascade(to: promise)
} else {
promise0 = promise
}
@@ -230,7 +230,7 @@ private struct PendingStreamWritesState {
while !self.pendingWrites.isEmpty {
if let p = self.fullyWrittenFirst() {
if let promise = promise0 {
promise.futureResult.cascade(promise: p)
promise.futureResult.cascade(to: p)
} else {
promise0 = p
}
@@ -21,7 +21,7 @@ private func writeChunk(wrapOutboundOut: (IOData) -> NIOAny, ctx: ChannelHandler
case (true, .some(let p)):
/* chunked encoding and the user's interested: we need three promises and need to cascade into the users promise */
let (w1, w2, w3) = (ctx.eventLoop.makePromise() as EventLoopPromise<Void>, ctx.eventLoop.makePromise() as EventLoopPromise<Void>, ctx.eventLoop.makePromise() as EventLoopPromise<Void>)
w1.futureResult.and(w2.futureResult).and(w3.futureResult).map { (_: ((((), ()), ()))) in }.cascade(promise: p)
w1.futureResult.and(w2.futureResult).and(w3.futureResult).map { (_: ((((), ()), ()))) in }.cascade(to: p)
(mW1, mW2, mW3) = (w1, w2, w3)
case (false, .some(let p)):
/* not chunked, so just use the user's promise for the actual data */
@@ -124,11 +124,11 @@ public final class HTTPResponseCompressor: ChannelDuplexHandler {
responseHead.headers.replaceOrAdd(name: "Content-Encoding", value: algorithm!.rawValue)
initializeEncoder(encoding: algorithm!)
pendingResponse.bufferResponseHead(responseHead)
pendingWritePromise.futureResult.cascade(promise: promise)
pendingWritePromise.futureResult.cascade(to: promise)
case .body(let body):
if algorithm != nil {
pendingResponse.bufferBodyPart(body)
pendingWritePromise.futureResult.cascade(promise: promise)
pendingWritePromise.futureResult.cascade(to: promise)
} else {
ctx.write(data, promise: promise)
}
@@ -141,7 +141,7 @@ public final class HTTPResponseCompressor: ChannelDuplexHandler {
}

pendingResponse.bufferResponseEnd(httpData)
pendingWritePromise.futureResult.cascade(promise: promise)
pendingWritePromise.futureResult.cascade(to: promise)
emitPendingWrites(ctx: ctx)
algorithm = nil
deinitializeEncoder()
@@ -315,7 +315,7 @@ public final class HTTPServerPipelineHandler: ChannelDuplexHandler {
case .quiescingLastRequestEndReceived:
ctx.write(data).flatMap {
ctx.close()
}.cascade(promise: promise)
}.cascade(to: promise)
case .acceptingEvents, .quiescingWaitingForRequestEnd:
ctx.write(data, promise: promise)
}
@@ -168,7 +168,7 @@ final class RepeatedRequests: ChannelInboundHandler {
let reqPart = self.unwrapInboundIn(data)
if case .end(nil) = reqPart {
if self.remainingNumberOfRequests <= 0 {
ctx.channel.close().map { self.doneRequests }.cascade(promise: self.isDonePromise)
ctx.channel.close().map { self.doneRequests }.cascade(to: self.isDonePromise)
} else {
self.doneRequests += 1
self.remainingNumberOfRequests -= 1
@@ -135,6 +135,16 @@ extension EventLoopFuture {
line: UInt = #line) -> EventLoopFuture<(Value, OtherValue)> {
return self.and(value: result, file: file, line: line)
}

@available(*, deprecated, renamed: "cascade(to:)")
public func cascade(promise: EventLoopPromise<Value>?) {
self.cascade(to: promise)
}

@available(*, deprecated, renamed: "cascadeFailure(to:)")
public func cascadeFailure<NewValue>(promise: EventLoopPromise<NewValue>?) {
self.cascadeFailure(to: promise)
}
}

extension EventLoopPromise {
@@ -2475,7 +2475,7 @@ public class ChannelTests: XCTestCase {
func channelActive(ctx: ChannelHandlerContext) {
var buffer = ctx.channel.allocator.buffer(capacity: 1)
buffer.write(staticString: "X")
ctx.channel.writeAndFlush(self.wrapOutboundOut(buffer)).map { ctx.channel }.cascade(promise: self.channelAvailablePromise)
ctx.channel.writeAndFlush(self.wrapOutboundOut(buffer)).map { ctx.channel }.cascade(to: self.channelAvailablePromise)
}
}

@@ -370,7 +370,7 @@ public class EventLoopTest : XCTestCase {
}.flatMap {
// connecting here to stop epoll from throwing EPOLLHUP at us
channel.connect(to: serverChannel.localAddress!)
}.cascade(promise: connectPromise)
}.cascade(to: connectPromise)
}.wait()

// Wait for the connect to complete.
@@ -148,7 +148,7 @@ final class MulticastTest: XCTestCase {
// If we receive a datagram, or the reader promise fails, we must fail the timeoutPromise.
receivedMulticastDatagram.futureResult.map { (_: AddressedEnvelope<ByteBuffer>) in
timeoutPromise.fail(ReceivedDatagramError())
}.cascadeFailure(promise: timeoutPromise)
}.cascadeFailure(to: timeoutPromise)

var messageBuffer = sender.allocator.buffer(capacity: 24)
messageBuffer.write(staticString: "hello, world!")
@@ -286,7 +286,7 @@ class SelectorTest: XCTestCase {

// if all the new re-connected channels have read, then we're happy here.
EventLoopFuture<Void>.andAll(reconnectedChannelsHaveRead,
eventLoop: ctx.eventLoop).cascade(promise: self.everythingWasReadPromise)
eventLoop: ctx.eventLoop).cascade(to: self.everythingWasReadPromise)
// let's also remove all the channels so this code will not be triggered again.
self.allChannels.value.removeAll()
}
@@ -40,3 +40,5 @@
- `EventLoopPromise.succeed(result: Value)` lost its label so is now `EventLoopPromise.succeed(Value)`
- `EventLoopPromise.fail(error: Error)` lost its label so is now `EventLoopPromise.fail(Error)`
- renamed `HTTPProtocolUpgrader` to `HTTPServerProtocolUpgrader`
- `EventLoopFuture.cascade(promise: EventLoopPromise)` had its label changed to `EventLoopFuture.cascade(to: EventLoopPromise)`
- `EventLoopFuture.cascadeFailure(promise: EventLoopPromise)` had its label changed to `EventLoopFuture.cascade(to: EventLoopPromise)`

0 comments on commit caf9a3d

Please sign in to comment.