Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions Sources/NIO/Selector.swift
Original file line number Diff line number Diff line change
Expand Up @@ -494,13 +494,22 @@ internal extension Selector where R == NIORegistration {
}

let futures: [EventLoopFuture<Void>] = self.registrations.map { (_, reg: NIORegistration) -> EventLoopFuture<Void> in
// The futures will only be notified (of success) once also the closeFuture of each Channel is notified.
// This only happens after all other actions on the Channel is complete and all events are propagated through the
// ChannelPipeline. We do this to minimize the risk to left over any tasks / promises that are tied to the
// EventLoop itself.
func closeChannel(_ chan: Channel) -> EventLoopFuture<Void> {
chan.close(promise: nil)
return chan.closeFuture
}

switch reg {
case .serverSocketChannel(let chan, _):
return chan.close()
return closeChannel(chan)
case .socketChannel(let chan, _):
return chan.close()
return closeChannel(chan)
case .datagramChannel(let chan, _):
return chan.close()
return closeChannel(chan)
}
}

Expand Down
1 change: 1 addition & 0 deletions Tests/NIOTests/EventLoopTest+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ extension EventLoopTest {
("testEventLoopPinnedCPUIdsConstructor", testEventLoopPinnedCPUIdsConstructor),
("testCurrentEventLoop", testCurrentEventLoop),
("testShutdownWhileScheduledTasksNotReady", testShutdownWhileScheduledTasksNotReady),
("testCloseFutureNotifiedBeforeUnblock", testCloseFutureNotifiedBeforeUnblock),
]
}
}
Expand Down
27 changes: 27 additions & 0 deletions Tests/NIOTests/EventLoopTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -279,4 +279,31 @@ public class EventLoopTest : XCTestCase {
_ = eventLoop.scheduleTask(in: .hours(1)) { }
try group.syncShutdownGracefully()
}

public func testCloseFutureNotifiedBeforeUnblock() throws {
class AssertHandler: ChannelInboundHandler {
typealias InboundIn = Any

let groupIsShutdown = Atomic(value: false)
let removed = Atomic(value: false)

public func handlerRemoved(ctx: ChannelHandlerContext) {
XCTAssertFalse(groupIsShutdown.load())
XCTAssertTrue(removed.compareAndExchange(expected: false, desired: true))
}
}

let group = MultiThreadedEventLoopGroup(numThreads: 1)
let eventLoop = group.next()
let assertHandler = AssertHandler()
let channel = try SocketChannel(eventLoop: eventLoop as! SelectableEventLoop, protocolFamily: AF_INET)
try channel.pipeline.add(handler: assertHandler).wait()
try channel.register().wait()
XCTAssertFalse(channel.closeFuture.isFulfilled)
try group.syncShutdownGracefully()
XCTAssertTrue(assertHandler.groupIsShutdown.compareAndExchange(expected: false, desired: true))
XCTAssertTrue(assertHandler.removed.load())
XCTAssertTrue(channel.closeFuture.isFulfilled)
XCTAssertFalse(channel.isActive)
}
}