Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

standardize ELF.cascade* collection of methods #802

Merged
merged 2 commits into from Feb 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -172,7 +172,7 @@ public func swiftMain() -> Int {
let respPart = self.unwrapInboundIn(data)
if case .end(nil) = respPart {
if self.remainingNumberOfRequests <= 0 {
ctx.channel.close().map { self.numberOfRequests - self.remainingNumberOfRequests }.cascade(promise: self.isDonePromise)
ctx.channel.close().map { self.numberOfRequests - self.remainingNumberOfRequests }.cascade(to: self.isDonePromise)
} else {
self.remainingNumberOfRequests -= 1
ctx.write(self.wrapOutboundOut(.head(RepeatedRequests.requestHead)), promise: nil)
Expand Down
2 changes: 1 addition & 1 deletion Sources/NIO/BaseSocketChannel.swift
Expand Up @@ -802,7 +802,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
registerPromise.futureResult.whenFailure { (_: Error) in
self.close(promise: nil)
}
registerPromise.futureResult.cascadeFailure(promise: promise)
registerPromise.futureResult.cascadeFailure(to: promise)

if self.lifecycleManager.isPreRegistered {
// we expect kqueue/epoll registration to always succeed which is basically true, except for errors that
Expand Down
4 changes: 2 additions & 2 deletions Sources/NIO/Bootstrap.swift
Expand Up @@ -517,7 +517,7 @@ public final class ClientBootstrap {
}.flatMapError { error in
channel.close0(error: error, mode: .all, promise: nil)
return channel.eventLoop.makeFailedFuture(error)
}.cascade(promise: promise)
}.cascade(to: promise)
return promise.futureResult
}

Expand Down Expand Up @@ -729,7 +729,7 @@ public final class DatagramBootstrap {

applier(channel)(key, value).map {
applyNext()
}.cascadeFailure(promise: applyPromise)
}.cascadeFailure(to: applyPromise)
}
applyNext()

Expand Down
4 changes: 2 additions & 2 deletions Sources/NIO/ChannelPipeline.swift
Expand Up @@ -395,7 +395,7 @@ public final class ChannelPipeline: ChannelInvoker {
self.remove0(ctx: ctx, promise: promise)
}

contextFuture.cascadeFailure(promise: promise)
contextFuture.cascadeFailure(to: promise)
}

/// Remove a `ChannelHandler` from the `ChannelPipeline`.
Expand All @@ -410,7 +410,7 @@ public final class ChannelPipeline: ChannelInvoker {
self.remove0(ctx: ctx, promise: promise)
}

contextFuture.cascadeFailure(promise: promise)
contextFuture.cascadeFailure(to: promise)
}

/// Remove a `ChannelHandler` from the `ChannelPipeline`.
Expand Down
2 changes: 1 addition & 1 deletion Sources/NIO/EventLoop.swift
Expand Up @@ -892,7 +892,7 @@ internal final class SelectableEventLoop: EventLoop {
} else {
let p = self.makePromise(of: Void.self)
self.execute {
closeGently0().cascade(promise: p)
closeGently0().cascade(to: p)
}
return p.futureResult
}
Expand Down
65 changes: 38 additions & 27 deletions Sources/NIO/EventLoopFuture.swift
Expand Up @@ -273,7 +273,7 @@ public struct EventLoopPromise<Value> {
/// At the end of an `EventLoopFuture` chain, you can use `whenSuccess()` or `whenFailure()` to add an
/// observer callback that will be invoked with the result or error at that point. (Note: If you ever
/// find yourself invoking `promise.succeed()` from inside a `whenSuccess()` callback, you probably should
/// use `flatMap()` or `cascade(promise:)` instead.)
/// use `flatMap()` or `cascade(to:)` instead.)
///
/// `EventLoopFuture` objects are typically obtained by:
/// * Using `EventLoopFuture<Value>.async` or a similar wrapper function.
Expand Down Expand Up @@ -440,7 +440,7 @@ extension EventLoopFuture {
next._setValue(value: futureU.value!)
}
} else {
futureU.cascade(promise: next)
futureU.cascade(to: next)
return CallbackList()
}
case .failure(let error):
Expand Down Expand Up @@ -557,7 +557,7 @@ extension EventLoopFuture {
next._setValue(value: t.value!)
}
} else {
t.cascade(promise: next)
t.cascade(to: next)
return CallbackList()
}
}
Expand Down Expand Up @@ -736,14 +736,16 @@ extension EventLoopFuture {
}
}

extension EventLoopFuture {
// MARK: cascade

/// Fulfill the given `EventLoopPromise` with the results from this `EventLoopFuture`.
extension EventLoopFuture {
/// Fulfills the given `EventLoopPromise` with the results from this `EventLoopFuture`.
///
/// This is useful when allowing users to provide promises for you to fulfill, but
/// when you are calling functions that return their own promises. They allow you to
/// tidy up your computational pipelines. For example:
/// tidy up your computational pipelines.
///
/// For example:
/// ```
/// doWork().flatMap {
/// doMoreWork($0)
Expand All @@ -753,37 +755,46 @@ extension EventLoopFuture {
/// maybeRecoverFromError($0)
/// }.map {
/// transformData($0)
/// }.cascade(promise: userPromise)
/// }.cascade(to: userPromise)
/// ```
///
/// - parameters:
/// - promise: The `EventLoopPromise` to fulfill with the results of this future.
public func cascade(promise: EventLoopPromise<Value>?) {
/// - Parameter to: The `EventLoopPromise` to fulfill with the results of this future.
public func cascade(to promise: EventLoopPromise<Value>?) {
Mordil marked this conversation as resolved.
Show resolved Hide resolved
guard let promise = promise else { return }
_whenCompleteWithValue { v in
switch v {
case .failure(let err):
promise.fail(err)
case .success(let value):
promise.succeed(value)
self.whenComplete { result in
switch result {
case let .success(value): promise.succeed(value)
case let .failure(error): promise.fail(error)
}
}
}

/// Fulfill the given `EventLoopPromise` with the error result from this `EventLoopFuture`,
/// if one exists.
/// Fulfills the given `EventLoopPromise` only when this `EventLoopFuture` succeeds.
///
/// If you are doing work that fulfills a type that doesn't match the expected `EventLoopPromise` value, add an
/// intermediate `map`.
///
/// For example:
/// ```
/// let boolPromise = eventLoop.makePromise(of: Bool.self)
/// doWorkReturningInt().map({ $0 >= 0 }).cascade(to: boolPromise)
/// ```
///
/// - Parameter to: The `EventLoopPromise` to fulfill when a successful result is available.
public func cascadeSuccess(to promise: EventLoopPromise<Value>?) {
guard let promise = promise else { return }
self.whenSuccess { promise.succeed($0) }
}

/// Fails the given `EventLoopPromise` with the error from this `EventLoopFuture` if encountered.
///
/// This is an alternative variant of `cascade` that allows you to potentially return early failures in
/// error cases, while passing the user `EventLoopPromise` onwards. In general, however, `cascade` is
/// more broadly useful.
/// error cases, while passing the user `EventLoopPromise` onwards.
///
/// - parameters:
/// - promise: The `EventLoopPromise` to fulfill with the results of this future.
public func cascadeFailure<NewValue>(promise: EventLoopPromise<NewValue>?) {
/// - Parameter to: The `EventLoopPromise` that should fail with the error of this `EventLoopFuture`.
public func cascadeFailure<NewValue>(to promise: EventLoopPromise<NewValue>?) {
guard let promise = promise else { return }
self.whenFailure { err in
promise.fail(err)
}
self.whenFailure { promise.fail($0) }
}
}

Expand Down Expand Up @@ -1075,7 +1086,7 @@ public extension EventLoopFuture {
return self
}
let hoppingPromise = target.makePromise(of: Value.self)
self.cascade(promise: hoppingPromise)
self.cascade(to: hoppingPromise)
return hoppingPromise.futureResult
}
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/NIO/PendingDatagramWritesManager.swift
Expand Up @@ -239,7 +239,7 @@ private struct PendingDatagramWritesState {

switch (promiseFiller, thisWriteFiller) {
case (.some(let all), .some(let this)):
all.0.futureResult.cascade(promise: this.0)
all.0.futureResult.cascade(to: this.0)
case (.none, .some(let this)):
promiseFiller = this
case (.some, .none),
Expand Down
4 changes: 2 additions & 2 deletions Sources/NIO/PendingWritesManager.swift
Expand Up @@ -196,7 +196,7 @@ private struct PendingStreamWritesState {
/* we wrote at least the whole head item, so drop it and succeed the promise */
if let promise = self.fullyWrittenFirst() {
if let p = promise0 {
p.futureResult.cascade(promise: promise)
p.futureResult.cascade(to: promise)
} else {
promise0 = promise
}
Expand Down Expand Up @@ -230,7 +230,7 @@ private struct PendingStreamWritesState {
while !self.pendingWrites.isEmpty {
if let p = self.fullyWrittenFirst() {
if let promise = promise0 {
promise.futureResult.cascade(promise: p)
promise.futureResult.cascade(to: p)
} else {
promise0 = p
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/NIOHTTP1/HTTPEncoder.swift
Expand Up @@ -21,7 +21,7 @@ private func writeChunk(wrapOutboundOut: (IOData) -> NIOAny, ctx: ChannelHandler
case (true, .some(let p)):
/* chunked encoding and the user's interested: we need three promises and need to cascade into the users promise */
let (w1, w2, w3) = (ctx.eventLoop.makePromise() as EventLoopPromise<Void>, ctx.eventLoop.makePromise() as EventLoopPromise<Void>, ctx.eventLoop.makePromise() as EventLoopPromise<Void>)
w1.futureResult.and(w2.futureResult).and(w3.futureResult).map { (_: ((((), ()), ()))) in }.cascade(promise: p)
w1.futureResult.and(w2.futureResult).and(w3.futureResult).map { (_: ((((), ()), ()))) in }.cascade(to: p)
(mW1, mW2, mW3) = (w1, w2, w3)
case (false, .some(let p)):
/* not chunked, so just use the user's promise for the actual data */
Expand Down
6 changes: 3 additions & 3 deletions Sources/NIOHTTP1/HTTPResponseCompressor.swift
Expand Up @@ -124,11 +124,11 @@ public final class HTTPResponseCompressor: ChannelDuplexHandler {
responseHead.headers.replaceOrAdd(name: "Content-Encoding", value: algorithm!.rawValue)
initializeEncoder(encoding: algorithm!)
pendingResponse.bufferResponseHead(responseHead)
pendingWritePromise.futureResult.cascade(promise: promise)
pendingWritePromise.futureResult.cascade(to: promise)
case .body(let body):
if algorithm != nil {
pendingResponse.bufferBodyPart(body)
pendingWritePromise.futureResult.cascade(promise: promise)
pendingWritePromise.futureResult.cascade(to: promise)
} else {
ctx.write(data, promise: promise)
}
Expand All @@ -141,7 +141,7 @@ public final class HTTPResponseCompressor: ChannelDuplexHandler {
}

pendingResponse.bufferResponseEnd(httpData)
pendingWritePromise.futureResult.cascade(promise: promise)
pendingWritePromise.futureResult.cascade(to: promise)
emitPendingWrites(ctx: ctx)
algorithm = nil
deinitializeEncoder()
Expand Down
2 changes: 1 addition & 1 deletion Sources/NIOHTTP1/HTTPServerPipelineHandler.swift
Expand Up @@ -315,7 +315,7 @@ public final class HTTPServerPipelineHandler: ChannelDuplexHandler {
case .quiescingLastRequestEndReceived:
ctx.write(data).flatMap {
ctx.close()
}.cascade(promise: promise)
}.cascade(to: promise)
case .acceptingEvents, .quiescingWaitingForRequestEnd:
ctx.write(data, promise: promise)
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/NIOPerformanceTester/main.swift
Expand Up @@ -168,7 +168,7 @@ final class RepeatedRequests: ChannelInboundHandler {
let reqPart = self.unwrapInboundIn(data)
if case .end(nil) = reqPart {
if self.remainingNumberOfRequests <= 0 {
ctx.channel.close().map { self.doneRequests }.cascade(promise: self.isDonePromise)
ctx.channel.close().map { self.doneRequests }.cascade(to: self.isDonePromise)
} else {
self.doneRequests += 1
self.remainingNumberOfRequests -= 1
Expand Down
10 changes: 10 additions & 0 deletions Sources/_NIO1APIShims/NIO1APIShims.swift
Expand Up @@ -135,6 +135,16 @@ extension EventLoopFuture {
line: UInt = #line) -> EventLoopFuture<(Value, OtherValue)> {
return self.and(value: result, file: file, line: line)
}

@available(*, deprecated, renamed: "cascade(to:)")
public func cascade(promise: EventLoopPromise<Value>?) {
self.cascade(to: promise)
}

@available(*, deprecated, renamed: "cascadeFailure(to:)")
public func cascadeFailure<NewValue>(promise: EventLoopPromise<NewValue>?) {
self.cascadeFailure(to: promise)
}
}

extension EventLoopPromise {
Expand Down
2 changes: 1 addition & 1 deletion Tests/NIOTests/ChannelTests.swift
Expand Up @@ -2475,7 +2475,7 @@ public class ChannelTests: XCTestCase {
func channelActive(ctx: ChannelHandlerContext) {
var buffer = ctx.channel.allocator.buffer(capacity: 1)
buffer.write(staticString: "X")
ctx.channel.writeAndFlush(self.wrapOutboundOut(buffer)).map { ctx.channel }.cascade(promise: self.channelAvailablePromise)
ctx.channel.writeAndFlush(self.wrapOutboundOut(buffer)).map { ctx.channel }.cascade(to: self.channelAvailablePromise)
}
}

Expand Down
2 changes: 1 addition & 1 deletion Tests/NIOTests/EventLoopTest.swift
Expand Up @@ -370,7 +370,7 @@ public class EventLoopTest : XCTestCase {
}.flatMap {
// connecting here to stop epoll from throwing EPOLLHUP at us
channel.connect(to: serverChannel.localAddress!)
}.cascade(promise: connectPromise)
}.cascade(to: connectPromise)
}.wait()

// Wait for the connect to complete.
Expand Down
2 changes: 1 addition & 1 deletion Tests/NIOTests/MulticastTest.swift
Expand Up @@ -148,7 +148,7 @@ final class MulticastTest: XCTestCase {
// If we receive a datagram, or the reader promise fails, we must fail the timeoutPromise.
receivedMulticastDatagram.futureResult.map { (_: AddressedEnvelope<ByteBuffer>) in
timeoutPromise.fail(ReceivedDatagramError())
}.cascadeFailure(promise: timeoutPromise)
}.cascadeFailure(to: timeoutPromise)

var messageBuffer = sender.allocator.buffer(capacity: 24)
messageBuffer.write(staticString: "hello, world!")
Expand Down
2 changes: 1 addition & 1 deletion Tests/NIOTests/SelectorTest.swift
Expand Up @@ -286,7 +286,7 @@ class SelectorTest: XCTestCase {

// if all the new re-connected channels have read, then we're happy here.
EventLoopFuture<Void>.andAll(reconnectedChannelsHaveRead,
eventLoop: ctx.eventLoop).cascade(promise: self.everythingWasReadPromise)
eventLoop: ctx.eventLoop).cascade(to: self.everythingWasReadPromise)
// let's also remove all the channels so this code will not be triggered again.
self.allChannels.value.removeAll()
}
Expand Down
2 changes: 2 additions & 0 deletions docs/public-api-changes-NIO1-to-NIO2.md
Expand Up @@ -40,3 +40,5 @@
- `EventLoopPromise.succeed(result: Value)` lost its label so is now `EventLoopPromise.succeed(Value)`
- `EventLoopPromise.fail(error: Error)` lost its label so is now `EventLoopPromise.fail(Error)`
- renamed `HTTPProtocolUpgrader` to `HTTPServerProtocolUpgrader`
- `EventLoopFuture.cascade(promise: EventLoopPromise)` had its label changed to `EventLoopFuture.cascade(to: EventLoopPromise)`
Mordil marked this conversation as resolved.
Show resolved Hide resolved
- `EventLoopFuture.cascadeFailure(promise: EventLoopPromise)` had its label changed to `EventLoopFuture.cascade(to: EventLoopPromise)`