Permalink
Browse files

Provide ChannelPipeline.remove methods with promises. (#651)

Motivation:

ChannelPipeline.remove0 contains a comment that indicates that users
were expected to be able to use the ChannelHandlerContext when both
handlerRemoved and the promise for channel removal call out.

This works when invoking remove() from outside the event loop, but if
a handler invokes remove on itself then it will only be able to attach
a callback to the future *after* the callout occurs. This means that a
ChannelHandler removing itself from the pipeline cannot rely on there
being a moment when it can still invoke the ChannelHandlerContext, but
when it is no longer a formal part of the pipeline.

This kind of behaviour is useful in some unusual cases, and it would
be nice to support it.

Modifications:

- Added remove() methods that accept promises as input.
- Rewrote the current remove() methods in terms of the new ones.
- Added tests.

Result:

ChannelHandlers can perform cleanup with a valid ChannelHandlerContext
but outside the Channel.
  • Loading branch information...
Lukasa committed Nov 7, 2018
1 parent 035962e commit a0b3b19cf43936c6831b7be7a2764ed7c1a69866
@@ -357,11 +357,7 @@ public final class ChannelPipeline: ChannelInvoker {
/// - returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed.
public func remove(handler: ChannelHandler) -> EventLoopFuture<Bool> {
let promise: EventLoopPromise<Bool> = self.eventLoop.newPromise()
context0({
return $0.handler === handler
}).map { ctx in
self.remove0(ctx: ctx, promise: promise)
}.cascadeFailure(promise: promise)
self.remove(handler: handler, promise: promise)
return promise.futureResult
}
@@ -372,9 +368,7 @@ public final class ChannelPipeline: ChannelInvoker {
/// - returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed.
public func remove(name: String) -> EventLoopFuture<Bool> {
let promise: EventLoopPromise<Bool> = self.eventLoop.newPromise()
context0({ $0.name == name }).map { ctx in
self.remove0(ctx: ctx, promise: promise)
}.cascadeFailure(promise: promise)
self.remove(name: name, promise: promise)
return promise.futureResult
}
@@ -385,14 +379,57 @@ public final class ChannelPipeline: ChannelInvoker {
/// - returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed.
public func remove(ctx: ChannelHandlerContext) -> EventLoopFuture<Bool> {
let promise: EventLoopPromise<Bool> = self.eventLoop.newPromise()
self.remove(ctx: ctx, promise: promise)
return promise.futureResult
}
/// Remove a `ChannelHandler` from the `ChannelPipeline`.
///
/// - parameters:
/// - handler: the `ChannelHandler` to remove.
/// - promise: An `EventLoopPromise` that will complete when the `ChannelHandler` is removed.
public func remove(handler: ChannelHandler, promise: EventLoopPromise<Bool>?) {
let contextFuture = self.context0 {
return $0.handler === handler
}.map { ctx in
self.remove0(ctx: ctx, promise: promise)
}
if let promise = promise {
contextFuture.cascadeFailure(promise: promise)
}
}
/// Remove a `ChannelHandler` from the `ChannelPipeline`.
///
/// - parameters:
/// - name: the name that was used to add the `ChannelHandler` to the `ChannelPipeline` before.
/// - promise: An `EventLoopPromise` that will complete when the `ChannelHandler` is removed.
public func remove(name: String, promise: EventLoopPromise<Bool>?) {
let contextFuture = self.context0 {
$0.name == name
}.map { ctx in
self.remove0(ctx: ctx, promise: promise)
}
if let promise = promise {
contextFuture.cascadeFailure(promise: promise)
}
}
/// Remove a `ChannelHandler` from the `ChannelPipeline`.
///
/// - parameters:
/// - ctx: the `ChannelHandlerContext` that belongs to `ChannelHandler` that should be removed.
/// - promise: An `EventLoopPromise` that will complete when the `ChannelHandler` is removed.
public func remove(ctx: ChannelHandlerContext, promise: EventLoopPromise<Bool>?) {
if self.eventLoop.inEventLoop {
remove0(ctx: ctx, promise: promise)
self.remove0(ctx: ctx, promise: promise)
} else {
self.eventLoop.execute {
self.remove0(ctx: ctx, promise: promise)
}
}
return promise.futureResult
}
/// Returns the `ChannelHandlerContext` that belongs to a `ChannelHandler`.
@@ -45,6 +45,12 @@ extension ChannelPipelineTest {
("testFindHandlerByTypeReturnsTheFirstOfItsType", testFindHandlerByTypeReturnsTheFirstOfItsType),
("testContextForHeadOrTail", testContextForHeadOrTail),
("testRemoveHeadOrTail", testRemoveHeadOrTail),
("testRemovingByContextWithPromiseStillInChannel", testRemovingByContextWithPromiseStillInChannel),
("testRemovingByContextWithFutureNotInChannel", testRemovingByContextWithFutureNotInChannel),
("testRemovingByNameWithPromiseStillInChannel", testRemovingByNameWithPromiseStillInChannel),
("testRemovingByNameWithFutureNotInChannel", testRemovingByNameWithFutureNotInChannel),
("testRemovingByReferenceWithPromiseStillInChannel", testRemovingByReferenceWithPromiseStillInChannel),
("testRemovingByReferenceWithFutureNotInChannel", testRemovingByReferenceWithFutureNotInChannel),
]
}
}
@@ -678,4 +678,228 @@ class ChannelPipelineTest: XCTestCase {
/// expected
}
}
func testRemovingByContextWithPromiseStillInChannel() throws {
class NoOpHandler: ChannelInboundHandler {
typealias InboundIn = Never
}
class DummyError: Error { }
let channel = EmbeddedChannel()
defer {
// This will definitely throw.
_ = try? channel.finish()
}
XCTAssertNoThrow(try channel.pipeline.add(handler: NoOpHandler()).wait())
let context = try assertNoThrowWithValue(channel.pipeline.context(handlerType: NoOpHandler.self).wait())
var buffer = channel.allocator.buffer(capacity: 1024)
buffer.write(staticString: "Hello, world!")
let removalPromise: EventLoopPromise<Bool> = channel.eventLoop.newPromise()
removalPromise.futureResult.whenSuccess { (_: Bool) in
context.writeAndFlush(NIOAny(buffer), promise: nil)
context.fireErrorCaught(DummyError())
}
XCTAssertNil(channel.readOutbound())
XCTAssertNoThrow(try channel.throwIfErrorCaught())
channel.pipeline.remove(ctx: context, promise: removalPromise)
guard case .some(.byteBuffer(let receivedBuffer)) = channel.readOutbound() else {
XCTFail("No buffer")
return
}
XCTAssertEqual(receivedBuffer, buffer)
do {
try channel.throwIfErrorCaught()
XCTFail("Did not throw")
} catch is DummyError {
// expected
} catch {
XCTFail("Unexpected error: \(error)")
}
}
func testRemovingByContextWithFutureNotInChannel() throws {
class NoOpHandler: ChannelInboundHandler {
typealias InboundIn = Never
}
class DummyError: Error { }
let channel = EmbeddedChannel()
defer {
// This will definitely throw.
XCTAssertFalse(try channel.finish())
}
XCTAssertNoThrow(try channel.pipeline.add(handler: NoOpHandler()).wait())
let context = try assertNoThrowWithValue(channel.pipeline.context(handlerType: NoOpHandler.self).wait())
var buffer = channel.allocator.buffer(capacity: 1024)
buffer.write(staticString: "Hello, world!")
XCTAssertNil(channel.readOutbound())
XCTAssertNoThrow(try channel.throwIfErrorCaught())
channel.pipeline.remove(ctx: context).whenSuccess { (_: Bool) in
context.writeAndFlush(NIOAny(buffer), promise: nil)
context.fireErrorCaught(DummyError())
}
XCTAssertNil(channel.readOutbound())
XCTAssertNoThrow(try channel.throwIfErrorCaught())
}
func testRemovingByNameWithPromiseStillInChannel() throws {
class NoOpHandler: ChannelInboundHandler {
typealias InboundIn = Never
}
class DummyError: Error { }
let channel = EmbeddedChannel()
defer {
// This will definitely throw.
_ = try? channel.finish()
}
XCTAssertNoThrow(try channel.pipeline.add(name: "TestHandler", handler: NoOpHandler()).wait())
let context = try assertNoThrowWithValue(channel.pipeline.context(handlerType: NoOpHandler.self).wait())
var buffer = channel.allocator.buffer(capacity: 1024)
buffer.write(staticString: "Hello, world!")
let removalPromise: EventLoopPromise<Bool> = channel.eventLoop.newPromise()
removalPromise.futureResult.whenSuccess { (_: Bool) in
context.writeAndFlush(NIOAny(buffer), promise: nil)
context.fireErrorCaught(DummyError())
}
XCTAssertNil(channel.readOutbound())
XCTAssertNoThrow(try channel.throwIfErrorCaught())
channel.pipeline.remove(name: "TestHandler", promise: removalPromise)
guard case .some(.byteBuffer(let receivedBuffer)) = channel.readOutbound() else {
XCTFail("No buffer")
return
}
XCTAssertEqual(receivedBuffer, buffer)
do {
try channel.throwIfErrorCaught()
XCTFail("Did not throw")
} catch is DummyError {
// expected
} catch {
XCTFail("Unexpected error: \(error)")
}
}
func testRemovingByNameWithFutureNotInChannel() throws {
class NoOpHandler: ChannelInboundHandler {
typealias InboundIn = Never
}
class DummyError: Error { }
let channel = EmbeddedChannel()
defer {
// This will definitely throw.
XCTAssertFalse(try channel.finish())
}
XCTAssertNoThrow(try channel.pipeline.add(name: "TestHandler", handler: NoOpHandler()).wait())
let context = try assertNoThrowWithValue(channel.pipeline.context(handlerType: NoOpHandler.self).wait())
var buffer = channel.allocator.buffer(capacity: 1024)
buffer.write(staticString: "Hello, world!")
XCTAssertNil(channel.readOutbound())
XCTAssertNoThrow(try channel.throwIfErrorCaught())
channel.pipeline.remove(name: "TestHandler").whenSuccess { (_: Bool) in
context.writeAndFlush(NIOAny(buffer), promise: nil)
context.fireErrorCaught(DummyError())
}
XCTAssertNil(channel.readOutbound())
XCTAssertNoThrow(try channel.throwIfErrorCaught())
}
func testRemovingByReferenceWithPromiseStillInChannel() throws {
class NoOpHandler: ChannelInboundHandler {
typealias InboundIn = Never
}
class DummyError: Error { }
let channel = EmbeddedChannel()
defer {
// This will definitely throw.
_ = try? channel.finish()
}
let handler = NoOpHandler()
XCTAssertNoThrow(try channel.pipeline.add(handler: handler).wait())
let context = try assertNoThrowWithValue(channel.pipeline.context(handlerType: NoOpHandler.self).wait())
var buffer = channel.allocator.buffer(capacity: 1024)
buffer.write(staticString: "Hello, world!")
let removalPromise: EventLoopPromise<Bool> = channel.eventLoop.newPromise()
removalPromise.futureResult.whenSuccess { (_: Bool) in
context.writeAndFlush(NIOAny(buffer), promise: nil)
context.fireErrorCaught(DummyError())
}
XCTAssertNil(channel.readOutbound())
XCTAssertNoThrow(try channel.throwIfErrorCaught())
channel.pipeline.remove(handler: handler, promise: removalPromise)
guard case .some(.byteBuffer(let receivedBuffer)) = channel.readOutbound() else {
XCTFail("No buffer")
return
}
XCTAssertEqual(receivedBuffer, buffer)
do {
try channel.throwIfErrorCaught()
XCTFail("Did not throw")
} catch is DummyError {
// expected
} catch {
XCTFail("Unexpected error: \(error)")
}
}
func testRemovingByReferenceWithFutureNotInChannel() throws {
class NoOpHandler: ChannelInboundHandler {
typealias InboundIn = Never
}
class DummyError: Error { }
let channel = EmbeddedChannel()
defer {
// This will definitely throw.
XCTAssertFalse(try channel.finish())
}
let handler = NoOpHandler()
XCTAssertNoThrow(try channel.pipeline.add(handler: handler).wait())
let context = try assertNoThrowWithValue(channel.pipeline.context(handlerType: NoOpHandler.self).wait())
var buffer = channel.allocator.buffer(capacity: 1024)
buffer.write(staticString: "Hello, world!")
XCTAssertNil(channel.readOutbound())
XCTAssertNoThrow(try channel.throwIfErrorCaught())
channel.pipeline.remove(handler: handler).whenSuccess { (_: Bool) in
context.writeAndFlush(NIOAny(buffer), promise: nil)
context.fireErrorCaught(DummyError())
}
XCTAssertNil(channel.readOutbound())
XCTAssertNoThrow(try channel.throwIfErrorCaught())
}
}

0 comments on commit a0b3b19

Please sign in to comment.