diff --git a/Sources/NIO/Selector.swift b/Sources/NIO/Selector.swift index 9d3d6557abf..99422f6ff0d 100644 --- a/Sources/NIO/Selector.swift +++ b/Sources/NIO/Selector.swift @@ -494,13 +494,22 @@ internal extension Selector where R == NIORegistration { } let futures: [EventLoopFuture] = self.registrations.map { (_, reg: NIORegistration) -> EventLoopFuture in + // The futures will only be notified (of success) once also the closeFuture of each Channel is notified. + // This only happens after all other actions on the Channel is complete and all events are propagated through the + // ChannelPipeline. We do this to minimize the risk to left over any tasks / promises that are tied to the + // EventLoop itself. + func closeChannel(_ chan: Channel) -> EventLoopFuture { + chan.close(promise: nil) + return chan.closeFuture + } + switch reg { case .serverSocketChannel(let chan, _): - return chan.close() + return closeChannel(chan) case .socketChannel(let chan, _): - return chan.close() + return closeChannel(chan) case .datagramChannel(let chan, _): - return chan.close() + return closeChannel(chan) } } diff --git a/Tests/NIOTests/EventLoopTest+XCTest.swift b/Tests/NIOTests/EventLoopTest+XCTest.swift index e10c6eef038..6055343f815 100644 --- a/Tests/NIOTests/EventLoopTest+XCTest.swift +++ b/Tests/NIOTests/EventLoopTest+XCTest.swift @@ -36,6 +36,7 @@ extension EventLoopTest { ("testEventLoopPinnedCPUIdsConstructor", testEventLoopPinnedCPUIdsConstructor), ("testCurrentEventLoop", testCurrentEventLoop), ("testShutdownWhileScheduledTasksNotReady", testShutdownWhileScheduledTasksNotReady), + ("testCloseFutureNotifiedBeforeUnblock", testCloseFutureNotifiedBeforeUnblock), ] } } diff --git a/Tests/NIOTests/EventLoopTest.swift b/Tests/NIOTests/EventLoopTest.swift index 9942409cecb..418641d4032 100644 --- a/Tests/NIOTests/EventLoopTest.swift +++ b/Tests/NIOTests/EventLoopTest.swift @@ -279,4 +279,31 @@ public class EventLoopTest : XCTestCase { _ = eventLoop.scheduleTask(in: .hours(1)) { } try group.syncShutdownGracefully() } + + public func testCloseFutureNotifiedBeforeUnblock() throws { + class AssertHandler: ChannelInboundHandler { + typealias InboundIn = Any + + let groupIsShutdown = Atomic(value: false) + let removed = Atomic(value: false) + + public func handlerRemoved(ctx: ChannelHandlerContext) { + XCTAssertFalse(groupIsShutdown.load()) + XCTAssertTrue(removed.compareAndExchange(expected: false, desired: true)) + } + } + + let group = MultiThreadedEventLoopGroup(numThreads: 1) + let eventLoop = group.next() + let assertHandler = AssertHandler() + let channel = try SocketChannel(eventLoop: eventLoop as! SelectableEventLoop, protocolFamily: AF_INET) + try channel.pipeline.add(handler: assertHandler).wait() + try channel.register().wait() + XCTAssertFalse(channel.closeFuture.isFulfilled) + try group.syncShutdownGracefully() + XCTAssertTrue(assertHandler.groupIsShutdown.compareAndExchange(expected: false, desired: true)) + XCTAssertTrue(assertHandler.removed.load()) + XCTAssertTrue(channel.closeFuture.isFulfilled) + XCTAssertFalse(channel.isActive) + } }