From 25cdd3d27dde8b601ff88e7174fa9896a982bc28 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 26 Mar 2018 09:50:57 +0200 Subject: [PATCH] Take closeFuture into account when closing EventLoopGroup Motiviation: When the Selector is closed we also close the registered Channels and notify the future once all the close operations complete. The problem here is that while the close operation may be complete already there may still be events flowing through the ChannelPipeline. The only way to ensure this not happens anymore is to take the closeFuture into account of each Channel (as the closeFuture will only be notified once all is done for a Channel). Modification: - Also take the closeFuture into account per Channel - Add test Result: Safer shutdown of EventLoopGroup. --- Sources/NIO/Selector.swift | 15 ++++++++++--- Tests/NIOTests/EventLoopTest+XCTest.swift | 1 + Tests/NIOTests/EventLoopTest.swift | 27 +++++++++++++++++++++++ 3 files changed, 40 insertions(+), 3 deletions(-) diff --git a/Sources/NIO/Selector.swift b/Sources/NIO/Selector.swift index 9d3d6557abf..99422f6ff0d 100644 --- a/Sources/NIO/Selector.swift +++ b/Sources/NIO/Selector.swift @@ -494,13 +494,22 @@ internal extension Selector where R == NIORegistration { } let futures: [EventLoopFuture] = self.registrations.map { (_, reg: NIORegistration) -> EventLoopFuture in + // The futures will only be notified (of success) once also the closeFuture of each Channel is notified. + // This only happens after all other actions on the Channel is complete and all events are propagated through the + // ChannelPipeline. We do this to minimize the risk to left over any tasks / promises that are tied to the + // EventLoop itself. + func closeChannel(_ chan: Channel) -> EventLoopFuture { + chan.close(promise: nil) + return chan.closeFuture + } + switch reg { case .serverSocketChannel(let chan, _): - return chan.close() + return closeChannel(chan) case .socketChannel(let chan, _): - return chan.close() + return closeChannel(chan) case .datagramChannel(let chan, _): - return chan.close() + return closeChannel(chan) } } diff --git a/Tests/NIOTests/EventLoopTest+XCTest.swift b/Tests/NIOTests/EventLoopTest+XCTest.swift index e10c6eef038..6055343f815 100644 --- a/Tests/NIOTests/EventLoopTest+XCTest.swift +++ b/Tests/NIOTests/EventLoopTest+XCTest.swift @@ -36,6 +36,7 @@ extension EventLoopTest { ("testEventLoopPinnedCPUIdsConstructor", testEventLoopPinnedCPUIdsConstructor), ("testCurrentEventLoop", testCurrentEventLoop), ("testShutdownWhileScheduledTasksNotReady", testShutdownWhileScheduledTasksNotReady), + ("testCloseFutureNotifiedBeforeUnblock", testCloseFutureNotifiedBeforeUnblock), ] } } diff --git a/Tests/NIOTests/EventLoopTest.swift b/Tests/NIOTests/EventLoopTest.swift index 9942409cecb..418641d4032 100644 --- a/Tests/NIOTests/EventLoopTest.swift +++ b/Tests/NIOTests/EventLoopTest.swift @@ -279,4 +279,31 @@ public class EventLoopTest : XCTestCase { _ = eventLoop.scheduleTask(in: .hours(1)) { } try group.syncShutdownGracefully() } + + public func testCloseFutureNotifiedBeforeUnblock() throws { + class AssertHandler: ChannelInboundHandler { + typealias InboundIn = Any + + let groupIsShutdown = Atomic(value: false) + let removed = Atomic(value: false) + + public func handlerRemoved(ctx: ChannelHandlerContext) { + XCTAssertFalse(groupIsShutdown.load()) + XCTAssertTrue(removed.compareAndExchange(expected: false, desired: true)) + } + } + + let group = MultiThreadedEventLoopGroup(numThreads: 1) + let eventLoop = group.next() + let assertHandler = AssertHandler() + let channel = try SocketChannel(eventLoop: eventLoop as! SelectableEventLoop, protocolFamily: AF_INET) + try channel.pipeline.add(handler: assertHandler).wait() + try channel.register().wait() + XCTAssertFalse(channel.closeFuture.isFulfilled) + try group.syncShutdownGracefully() + XCTAssertTrue(assertHandler.groupIsShutdown.compareAndExchange(expected: false, desired: true)) + XCTAssertTrue(assertHandler.removed.load()) + XCTAssertTrue(channel.closeFuture.isFulfilled) + XCTAssertFalse(channel.isActive) + } }