Permalink
Browse files

EventLoopFuture: use Result type (#734)

Motivation:

Now that the stdlib has introduced the Result type, we can use it in the
implementation (and the whenComplete) function of EventLoopFuture

Modifications:

- replace EventLoopValue with Result
- make whenComplete provide the Result

Result:

use the new shiny stuff
  • Loading branch information...
weissi authored and Lukasa committed Jan 5, 2019
1 parent 08eb325 commit 684cad331cac8b14dadf826b5527e4b819addc8a
@@ -440,7 +440,7 @@ public final class ClientBootstrap {
channel.close(promise: nil)
}

connectPromise.futureResult.whenComplete {
connectPromise.futureResult.whenComplete { (_: Result<Void, Error>) in
cancelTask.cancel()
}
return connectPromise.futureResult
@@ -223,7 +223,7 @@ public class IdleStateHandler: ChannelDuplexHandler {
}

let writePromise = promise ?? ctx.eventLoop.makePromise()
writePromise.futureResult.whenComplete {
writePromise.futureResult.whenComplete { (_: Result<Void, Error>) in
self.lastWriteCompleteTime = DispatchTime.now()
}
ctx.write(data, promise: writePromise)
@@ -116,7 +116,7 @@ public final class RepeatedTask {
}

scheduled.futureResult.whenSuccess { future in
future.whenComplete {
future.whenComplete { (_: Result<Void, Error>) in
self.reschedule0()
}
}
@@ -957,7 +957,7 @@ final public class MultiThreadedEventLoopGroup: EventLoopGroup {
g.enter()
loop.closeGently().mapIfError { err in
q.sync { error = err }
}.whenComplete {
}.whenComplete { (_: Result<Void, Error>) in
g.leave()
}
}
@@ -14,14 +14,6 @@
import NIOConcurrencyHelpers


/// A `Result`-like type that is used to track the data through the
/// callback pipeline.
private enum EventLoopFutureValue<T> {
case success(T)
case failure(Error)
}

/// Internal list of callbacks.
///
/// Most of these are closures that pull a value from one future, call a user callback, push the
@@ -187,7 +179,7 @@ public struct EventLoopPromise<T> {
///
/// - parameters:
/// - value: The value to fire the future with.
private func _resolve(value: EventLoopFutureValue<T>) {
private func _resolve(value: Result<T, Error>) {
if futureResult.eventLoop.inEventLoop {
_setValue(value: value)._run()
} else {
@@ -202,7 +194,7 @@ public struct EventLoopPromise<T> {
/// - parameters:
/// - value: The result of the promise.
/// - returns: The callback list to run.
fileprivate func _setValue(value: EventLoopFutureValue<T>) -> CallbackList {
fileprivate func _setValue(value: Result<T, Error>) -> CallbackList {
return futureResult._setValue(value: value)
}
}
@@ -333,7 +325,7 @@ public struct EventLoopPromise<T> {
/// `EventLoopFuture` should be sufficient to guarantee thread-safety.
public final class EventLoopFuture<T> {
// TODO: Provide a tracing facility. It would be nice to be able to set '.debugTrace = true' on any EventLoopFuture or EventLoopPromise and have every subsequent chained EventLoopFuture report the success result or failure error. That would simplify some debugging scenarios.
fileprivate var value: EventLoopFutureValue<T>? {
fileprivate var value: Result<T, Error>? {
didSet {
_isFulfilled.store(true)
}
@@ -356,7 +348,7 @@ public final class EventLoopFuture<T> {
/// the entire chain from the top without recursing.
fileprivate var callbacks: CallbackList = CallbackList()

private init(eventLoop: EventLoop, value: EventLoopFutureValue<T>?, file: StaticString, line: UInt) {
private init(eventLoop: EventLoop, value: Result<T, Error>?, file: StaticString, line: UInt) {
self.eventLoop = eventLoop
self.value = value
self._isFulfilled = UnsafeEmbeddedAtomic(value: value != nil)
@@ -608,7 +600,7 @@ extension EventLoopFuture {
}
}

fileprivate func _whenCompleteWithValue(_ callback: @escaping (EventLoopFutureValue<T>) -> Void) {
fileprivate func _whenCompleteWithValue(_ callback: @escaping (Result<T, Error>) -> Void) {
_whenComplete {
callback(self.value!)
return CallbackList()
@@ -662,16 +654,16 @@ extension EventLoopFuture {
///
/// - parameters:
/// - callback: The callback that is called when the `EventLoopFuture` is fulfilled.
public func whenComplete(_ callback: @escaping () -> Void) {
public func whenComplete(_ callback: @escaping (Result<T, Error>) -> Void) {
_whenComplete {
callback()
callback(self.value!)
return CallbackList()
}
}


/// Internal: Set the value and return a list of callbacks that should be invoked as a result.
fileprivate func _setValue(value: EventLoopFutureValue<T>) -> CallbackList {
fileprivate func _setValue(value: Result<T, Error>) -> CallbackList {
self.eventLoop.assertInEventLoop()
if self.value == nil {
self.value = value
@@ -813,7 +805,7 @@ Further information:
precondition(MultiThreadedEventLoopGroup.currentEventLoop == nil, explainer(), file: file, line: line)
}

var v: EventLoopFutureValue <T>? = nil
var v: Result<T, Error>? = nil
let lock = ConditionLock(value: 0)
_whenComplete { () -> CallbackList in
lock.lock()
@@ -606,7 +606,7 @@ internal class HappyEyeballsConnector {
self.targets.aResultsAvailable(results)
}.mapIfError { err in
self.error.dnsAError = err
}.whenComplete {
}.whenComplete { (_: Result<Void, Error>) in
self.dnsResolutions += 1
self.processInput(.resolverACompleted)
}
@@ -618,7 +618,7 @@ internal class HappyEyeballsConnector {
self.targets.aaaaResultsAvailable(results)
}.mapIfError { err in
self.error.dnsAAAAError = err
}.whenComplete {
}.whenComplete { (_: Result<Void, Error>) in
// It's possible that we were waiting to time out here, so if we were we should
// cancel that.
self.resolutionTask?.cancel()
@@ -206,7 +206,7 @@ public class HTTPServerUpgradeHandler: ChannelInboundHandler {
if bufferedMessages.count > 0 {
ctx.fireChannelReadComplete()
}
}.whenComplete {
}.whenComplete { (_: Result<Void, Error>) in
ctx.pipeline.remove(ctx: ctx, promise: nil)
}
}
@@ -358,7 +358,7 @@ private final class HTTPHandler: ChannelInboundHandler {
} else {
return ctx.close()
}
}.whenComplete {
}.whenComplete { (_: Result<Void, Error>) in
_ = try? file.close()
}
case .sendfile:
@@ -370,7 +370,7 @@ private final class HTTPHandler: ChannelInboundHandler {
return p.futureResult
}.thenIfError { (_: Error) in
ctx.close()
}.whenComplete {
}.whenComplete { (_: Result<Void, Error>) in
_ = try? file.close()
}
}
@@ -387,7 +387,7 @@ private final class HTTPHandler: ChannelInboundHandler {

let promise = self.keepAlive ? promise : (promise ?? ctx.eventLoop.makePromise())
if !self.keepAlive {
promise!.futureResult.whenComplete { ctx.close(promise: nil) }
promise!.futureResult.whenComplete { (_: Result<Void, Error>) in ctx.close(promise: nil) }
}
self.handler = nil

@@ -118,7 +118,7 @@ public class ApplicationProtocolNegotiationHandler: ChannelInboundHandler {
}

let switchFuture = completionHandler(result)
switchFuture.whenComplete {
switchFuture.whenComplete { (_: Result<Void, Error>) in
self.unbuffer(context: context)
context.pipeline.remove(handler: self, promise: nil)
}
@@ -309,7 +309,7 @@ public final class WebSocketFrameDecoder: ByteToMessageDecoder {
let frame = WebSocketFrame(fin: true,
opcode: .connectionClose,
data: data)
ctx.writeAndFlush(self.wrapInboundOut(frame)).whenComplete {
ctx.writeAndFlush(self.wrapInboundOut(frame)).whenComplete { (_: Result<Void, Error>) in
ctx.close(promise: nil)
}
}
@@ -32,7 +32,7 @@ public final class WebSocketProtocolErrorHandler: ChannelInboundHandler {
let frame = WebSocketFrame(fin: true,
opcode: .connectionClose,
data: data)
ctx.writeAndFlush(self.wrapOutboundOut(frame)).whenComplete {
ctx.writeAndFlush(self.wrapOutboundOut(frame)).whenComplete { (_: Result<Void, Error>) in
ctx.close(promise: nil)
}
}
@@ -81,7 +81,7 @@ private final class HTTPHandler: ChannelInboundHandler {
headers: headers)
ctx.write(self.wrapOutboundOut(.head(responseHead)), promise: nil)
ctx.write(self.wrapOutboundOut(.body(.byteBuffer(self.responseBody))), promise: nil)
ctx.write(self.wrapOutboundOut(.end(nil))).whenComplete {
ctx.write(self.wrapOutboundOut(.end(nil))).whenComplete { (_: Result<Void, Error>) in
ctx.close(promise: nil)
}
ctx.flush()
@@ -95,7 +95,7 @@ private final class HTTPHandler: ChannelInboundHandler {
status: .methodNotAllowed,
headers: headers)
ctx.write(self.wrapOutboundOut(.head(head)), promise: nil)
ctx.write(self.wrapOutboundOut(.end(nil))).whenComplete {
ctx.write(self.wrapOutboundOut(.end(nil))).whenComplete { (_: Result<Void, Error>) in
ctx.close(promise: nil)
}
ctx.flush()
@@ -194,7 +194,7 @@ private final class WebSocketTimeHandler: ChannelInboundHandler {
var data = ctx.channel.allocator.buffer(capacity: 2)
data.write(webSocketErrorCode: .protocolError)
let frame = WebSocketFrame(fin: true, opcode: .connectionClose, data: data)
ctx.write(self.wrapOutboundOut(frame)).whenComplete {
ctx.write(self.wrapOutboundOut(frame)).whenComplete { (_: Result<Void, Error>) in
ctx.close(mode: .output, promise: nil)
}
awaitingClose = true
@@ -36,7 +36,7 @@ private class PromiseOrderer {
let thisPromiseIndex = promiseArray.count
promiseArray.append(promise)

promise.futureResult.whenComplete {
promise.futureResult.whenComplete { (_: Result<Void, Error>) in
let priorFutures = self.promiseArray[0..<thisPromiseIndex]
let subsequentFutures = self.promiseArray[(thisPromiseIndex + 1)...]
let allPriorFuturesFired = priorFutures.map { $0.futureResult.isFulfilled }.allSatisfy { $0 }
@@ -124,12 +124,12 @@ class HTTPServerClientTest : XCTestCase {
b.write(string: replyString)

let outbound = self.outboundBody(b)
ctx.write(self.wrapOutboundOut(outbound.body)).whenComplete {
ctx.write(self.wrapOutboundOut(outbound.body)).whenComplete { (_: Result<Void, Error>) in
outbound.destructor()
}
ctx.write(self.wrapOutboundOut(.end(nil))).mapIfError { error in
XCTFail("unexpected error \(error)")
}.whenComplete {
}.whenComplete { (_: Result<Void, Error>) in
self.sentEnd = true
self.maybeClose(ctx: ctx)
}
@@ -148,13 +148,13 @@ class HTTPServerClientTest : XCTestCase {
let outbound = self.outboundBody(b)
ctx.write(self.wrapOutboundOut(outbound.body)).mapIfError { error in
XCTFail("unexpected error \(error)")
}.whenComplete {
}.whenComplete { (_: Result<Void, Error>) in
outbound.destructor()
}
}
ctx.write(self.wrapOutboundOut(.end(nil))).mapIfError { error in
XCTFail("unexpected error \(error)")
}.whenComplete {
}.whenComplete { (_: Result<Void, Error>) in
self.sentEnd = true
self.maybeClose(ctx: ctx)
}
@@ -174,7 +174,7 @@ class HTTPServerClientTest : XCTestCase {
let outbound = self.outboundBody(b)
ctx.write(self.wrapOutboundOut(outbound.body)).mapIfError { error in
XCTFail("unexpected error \(error)")
}.whenComplete {
}.whenComplete { (_: Result<Void, Error>) in
outbound.destructor()
}
}
@@ -184,7 +184,7 @@ class HTTPServerClientTest : XCTestCase {
trailers.add(name: "X-Should-Trail", value: "sure")
ctx.write(self.wrapOutboundOut(.end(trailers))).mapIfError { error in
XCTFail("unexpected error \(error)")
}.whenComplete {
}.whenComplete { (_: Result<Void, Error>) in
self.sentEnd = true
self.maybeClose(ctx: ctx)
}
@@ -209,12 +209,12 @@ class HTTPServerClientTest : XCTestCase {
let outbound = self.outboundBody(buf)
ctx.writeAndFlush(self.wrapOutboundOut(outbound.body)).mapIfError { error in
XCTFail("unexpected error \(error)")
}.whenComplete {
}.whenComplete { (_: Result<Void, Error>) in
outbound.destructor()
}
ctx.write(self.wrapOutboundOut(.end(nil))).mapIfError { error in
XCTFail("unexpected error \(error)")
}.whenComplete {
}.whenComplete { (_: Result<Void, Error>) in
self.sentEnd = true
self.maybeClose(ctx: ctx)
}
@@ -227,7 +227,7 @@ class HTTPServerClientTest : XCTestCase {
}
ctx.write(self.wrapOutboundOut(.end(nil))).mapIfError { error in
XCTFail("unexpected error \(error)")
}.whenComplete {
}.whenComplete { (_: Result<Void, Error>) in
self.sentEnd = true
self.maybeClose(ctx: ctx)
}
@@ -239,7 +239,7 @@ class HTTPServerClientTest : XCTestCase {
}
ctx.write(self.wrapOutboundOut(.end(nil))).mapIfError { error in
XCTFail("unexpected error \(error)")
}.whenComplete {
}.whenComplete { (_: Result<Void, Error>) in
self.sentEnd = true
self.maybeClose(ctx: ctx)
}
@@ -972,7 +972,7 @@ public class ChannelTests: XCTestCase {
pwm.markFlushCheckpoint()
_ = pwm.add(data: .byteBuffer(buffer), promise: ps[2])

ps[0].futureResult.whenComplete {
ps[0].futureResult.whenComplete { (_: Result<Void, Error>) in
pwm.failAll(error: ChannelError.inputClosed, close: true)
}

@@ -350,7 +350,7 @@ class EchoServerClientTest : XCTestCase {
default:
XCTFail("unexpected error: \(err)")
}
}.whenComplete {
}.whenComplete { (_: Result<Void, Error>) in
self.channelInactivePromise.succeed(result: ())
}
}
@@ -370,7 +370,7 @@ class EchoServerClientTest : XCTestCase {
default:
XCTFail("unexpected error: \(err)")
}
}.whenComplete {
}.whenComplete { (_: Result<Void, Error>) in
self.channelUnregisteredPromise.succeed(result: ())
}
}
@@ -603,7 +603,7 @@ class EchoServerClientTest : XCTestCase {
}

private func writeUntilFailed(_ ctx: ChannelHandlerContext, _ buffer: ByteBuffer) {
ctx.writeAndFlush(NIOAny(buffer)).whenComplete {
ctx.writeAndFlush(NIOAny(buffer)).whenComplete { (_: Result<Void, Error>) in
ctx.eventLoop.execute {
self.writeUntilFailed(ctx, buffer)
}
@@ -635,7 +635,7 @@ class EchoServerClientTest : XCTestCase {
ctx.writeAndFlush(NIOAny(buffer))
}.then {
ctx.close()
}.whenComplete {
}.whenComplete { (_: Result<Void, Error>) in
self.dpGroup.leave()
}
}
@@ -160,14 +160,14 @@ class EmbeddedChannelTest: XCTestCase {
let channel = EmbeddedChannel()
XCTAssertFalse(channel.isActive)
let connectPromise = channel.eventLoop.makePromise(of: Void.self)
connectPromise.futureResult.whenComplete {
connectPromise.futureResult.whenComplete { (_: Result<Void, Error>) in
XCTAssertTrue(channel.isActive)
}
channel.connect(to: try SocketAddress(ipAddress: "127.0.0.1", port: 0), promise: connectPromise)
try connectPromise.futureResult.wait()

let closePromise = channel.eventLoop.makePromise(of: Void.self)
closePromise.futureResult.whenComplete {
closePromise.futureResult.whenComplete { (_: Result<Void, Error>) in
XCTAssertFalse(channel.isActive)
}

@@ -215,7 +215,7 @@ public class EmbeddedEventLoopTest: XCTestCase {
XCTFail("Scheduled future completed")
}.mapIfError { caughtErr in
XCTAssertTrue(err === caughtErr as? EmbeddedTestError)
}.whenComplete {
}.whenComplete { (_: Result<Void, Error>) in
fired = true
}

Oops, something went wrong.

0 comments on commit 684cad3

Please sign in to comment.