Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 111 additions & 45 deletions Sources/NIO/BaseSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ private struct SocketChannelLifecycleManager {
// MARK: Types
private enum State {
case fresh
case registered
case preRegistered // register() has been run but the selector doesn't know about it yet
case fullyRegistered // fully registered, ie. the selector knows about it
case activated
case closed
}

private enum Event {
case activate
case register
case beginRegistration
case finishRegistration
case close
}

Expand Down Expand Up @@ -66,8 +68,13 @@ private struct SocketChannelLifecycleManager {
}

@inline(__always) // we need to return a closure here and to not suffer from a potential allocation for that this must be inlined
internal mutating func register(promise: EventLoopPromise<Void>?) -> ((ChannelPipeline) -> Void) {
return self.moveState(event: .register, promise: promise)
internal mutating func beginRegistration(promise: EventLoopPromise<Void>?) -> ((ChannelPipeline) -> Void) {
return self.moveState(event: .beginRegistration, promise: promise)
}

@inline(__always) // we need to return a closure here and to not suffer from a potential allocation for that this must be inlined
internal mutating func finishRegistration(promise: EventLoopPromise<Void>?) -> ((ChannelPipeline) -> Void) {
return self.moveState(event: .finishRegistration, promise: promise)
}

@inline(__always) // we need to return a closure here and to not suffer from a potential allocation for that this must be inlined
Expand All @@ -87,21 +94,28 @@ private struct SocketChannelLifecycleManager {

switch (self.currentState, event) {
// origin: .fresh
case (.fresh, .register):
return self.doStateTransfer(newState: .registered, promise: promise) { pipeline in
case (.fresh, .beginRegistration):
return self.doStateTransfer(newState: .preRegistered, promise: promise) { pipeline in
pipeline.fireChannelRegistered0()
}

case (.fresh, .close):
return self.doStateTransfer(newState: .closed, promise: promise) { (_: ChannelPipeline) in }

// origin: .registered
case (.registered, .activate):
// origin: .preRegistered
case (.preRegistered, .finishRegistration):
return self.doStateTransfer(newState: .fullyRegistered, promise: promise) { pipeline in
// we don't tell the user about this
}

// origin: .fullyRegistered
case (.fullyRegistered, .activate):
return self.doStateTransfer(newState: .activated, promise: promise) { pipeline in
pipeline.fireChannelActive0()
}

case (.registered, .close):
// origin: .preRegistered || .fullyRegistered
case (.preRegistered, .close), (.fullyRegistered, .close):
return self.doStateTransfer(newState: .closed, promise: promise) { pipeline in
pipeline.fireChannelUnregistered0()
}
Expand All @@ -114,11 +128,16 @@ private struct SocketChannelLifecycleManager {
}

// bad transitions
case (.fresh, .activate), // should go through .registered first
(.registered, .register), // already registered
(.activated, .activate), // already activated
(.activated, .register), // already registered (and activated)
(.closed, _): // already closed
case (.fresh, .activate), // should go through .registered first
(.preRegistered, .activate), // need to first be fully registered
(.preRegistered, .beginRegistration), // already registered
(.fullyRegistered, .beginRegistration), // already registered
(.activated, .activate), // already activated
(.activated, .beginRegistration), // already fully registered (and activated)
(.activated, .finishRegistration), // already fully registered (and activated)
(.fullyRegistered, .finishRegistration), // already fully registered
(.fresh, .finishRegistration), // need to register lazily first
(.closed, _): // already closed
self.badTransition(event: event)
}
}
Expand All @@ -143,12 +162,22 @@ private struct SocketChannelLifecycleManager {
return self.currentState == .activated
}

internal var isRegistered: Bool {
internal var isPreRegistered: Bool {
assert(self.eventLoop.inEventLoop)
switch self.currentState {
case .fresh, .closed:
return false
case .registered, .activated:
case .preRegistered, .fullyRegistered, .activated:
return true
}
}

internal var isRegisteredFully: Bool {
assert(self.eventLoop.inEventLoop)
switch self.currentState {
case .fresh, .closed, .preRegistered:
return false
case .fullyRegistered, .activated:
return true
}
}
Expand Down Expand Up @@ -264,7 +293,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {

var isRegistered: Bool {
assert(self.eventLoop.inEventLoop)
return self.lifecycleManager.isRegistered
return self.lifecycleManager.isPreRegistered
}

internal var selectable: T {
Expand Down Expand Up @@ -463,7 +492,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {

// We only want to call read0() or pauseRead0() if we already registered to the EventLoop if not this will be automatically done
// once register0 is called. Beside this we also only need to do it when the value actually change.
if self.lifecycleManager.isRegistered && old != auto {
if self.lifecycleManager.isPreRegistered && old != auto {
if auto {
read0()
} else {
Expand Down Expand Up @@ -536,7 +565,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
promise?.fail(error: ChannelError.ioOnClosedChannel)
return
}
guard self.isRegistered else {
guard self.lifecycleManager.isPreRegistered else {
promise?.fail(error: ChannelLifecycleError.inappropriateOperationForState)
return
}
Expand Down Expand Up @@ -598,7 +627,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

if !isWritePending() && flushNow() == .register {
assert(self.lifecycleManager.isRegistered)
assert(self.lifecycleManager.isPreRegistered)
registerForWritable()
}
}
Expand All @@ -611,22 +640,22 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}
readPending = true

if self.lifecycleManager.isRegistered {
if self.lifecycleManager.isPreRegistered {
registerForReadable()
}
}

private final func pauseRead0() {
assert(eventLoop.inEventLoop)

if self.lifecycleManager.isRegistered {
if self.lifecycleManager.isPreRegistered {
unregisterForReadable()
}
}

private final func registerForReadable() {
assert(eventLoop.inEventLoop)
assert(self.lifecycleManager.isRegistered)
assert(self.lifecycleManager.isRegisteredFully)

guard !self.lifecycleManager.hasSeenEOFNotification else {
// we have seen an EOF notification before so there's no point in registering for reads
Expand All @@ -642,7 +671,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {

internal final func unregisterForReadable() {
assert(eventLoop.inEventLoop)
assert(self.lifecycleManager.isRegistered)
assert(self.lifecycleManager.isRegisteredFully)

guard self.interestedEvent.contains(.read) else {
return
Expand All @@ -651,6 +680,14 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
self.safeReregister(interested: self.interestedEvent.subtracting(.read))
}

/// Closes the this `BaseChannelChannel` and fulfills `promise` with the result of the _close_ operation.
/// So unless either the deregistration or the close itself fails, `promise` will be succeeded regardless of
/// `error`. `error` is used to fail outstanding writes (if any) and the `connectPromise` if set.
///
/// - parameters:
/// - error: The error to fail the outstanding (if any) writes/connect with.
/// - mode: The close mode, must be `.all` for `BaseSocketChannel`
/// - promise: The promise that gets notified about the result of the deregistration/close operations.
public func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) {
assert(eventLoop.inEventLoop)

Expand Down Expand Up @@ -731,27 +768,36 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
return
}

guard !self.lifecycleManager.isRegistered else {
guard !self.lifecycleManager.isPreRegistered else {
promise?.fail(error: ChannelLifecycleError.inappropriateOperationForState)
return
}

// Was not registered yet so do it now.
do {
// We always register with interested .none and will just trigger readIfNeeded0() later to re-register if needed.
try self.safeRegister(interested: [.readEOF, .reset])
self.lifecycleManager.register(promise: promise)(self.pipeline)
} catch {
guard self.selectableEventLoop.isOpen else {
let error = EventLoopError.shutdown
self.pipeline.fireErrorCaught0(error: error)
// `close0`'s error is about the result of the `close` operation, ...
self.close0(error: error, mode: .all, promise: nil)
// ... therefore we need to fail the registration `promise` separately.
promise?.fail(error: error)
return
}

// we can't fully register yet as epoll would give us EPOLLHUP if bind/connect wasn't called yet.
self.lifecycleManager.beginRegistration(promise: promise)(self.pipeline)
}

public final func registerAlreadyConfigured0(promise: EventLoopPromise<Void>?) {
assert(self.eventLoop.inEventLoop)
assert(self.isOpen)
assert(!self.lifecycleManager.isActive)
register0(promise: nil)
becomeActive0(promise: promise)
if self.lifecycleManager.isPreRegistered {
try! becomeFullyRegistered0()
if self.lifecycleManager.isRegisteredFully {
becomeActive0(promise: promise)
}
}
}

public final func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise<Void>?) {
Expand All @@ -772,7 +818,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {

private func finishConnect() {
assert(eventLoop.inEventLoop)
assert(self.lifecycleManager.isRegistered)
assert(self.lifecycleManager.isPreRegistered)

if let connectPromise = pendingConnect {
assert(!self.lifecycleManager.isActive)
Expand All @@ -799,7 +845,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
assert(eventLoop.inEventLoop)

if self.isOpen {
assert(self.lifecycleManager.isRegistered)
assert(self.lifecycleManager.isPreRegistered)
unregisterForWritable()
}
}
Expand All @@ -810,8 +856,8 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {

// we can't be not active but still registered here; this would mean that we got a notification about a
// channel before we're ready to receive them.
assert(self.lifecycleManager.isActive || !self.lifecycleManager.isRegistered,
"illegal state: active: \(self.lifecycleManager.isActive), registered: \(self.lifecycleManager.isRegistered)")
assert(self.lifecycleManager.isActive || !self.lifecycleManager.isPreRegistered,
"illegal state: \(self): active: \(self.lifecycleManager.isActive), pre-registered: \(self.lifecycleManager.isPreRegistered)")

self.readEOF0()

Expand All @@ -820,7 +866,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

final func readEOF0() {
if self.lifecycleManager.isRegistered {
if self.lifecycleManager.isRegisteredFully {
// we're unregistering from `readEOF` here as we want this to be one-shot. We're then synchronously
// reading all input until the EOF that we're guaranteed to see. After that `readEOF` becomes uninteresting
// and would anyway fire constantly.
Expand All @@ -835,7 +881,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
case .error:
// we should be unregistered and inactive now (as `readable0` would've called close).
assert(!self.lifecycleManager.isActive)
assert(!self.lifecycleManager.isRegistered)
assert(!self.lifecycleManager.isPreRegistered)
break loop
case .normal(.none):
preconditionFailure("got .readEOF and read returned not reading any bytes, nor EOF.")
Expand All @@ -854,7 +900,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
self.readEOF0()

if self.socket.isOpen {
assert(self.lifecycleManager.isRegistered)
assert(self.lifecycleManager.isPreRegistered)
let error: IOError
// if the socket is still registered (and therefore open), let's try to get the actual socket error from the socket
do {
Expand All @@ -873,7 +919,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
self.close0(error: error, mode: .all, promise: nil)
}
}
assert(!self.lifecycleManager.isRegistered)
assert(!self.lifecycleManager.isPreRegistered)
}

public final func readable() {
Expand Down Expand Up @@ -973,7 +1019,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
return
}

guard self.lifecycleManager.isRegistered else {
guard self.lifecycleManager.isPreRegistered else {
promise?.fail(error: ChannelLifecycleError.inappropriateOperationForState)
return
}
Expand All @@ -987,13 +1033,14 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
} else {
pendingConnect = eventLoop.newPromise()
}
try becomeFullyRegistered0()
registerForWritable()
} else {
self.updateCachedAddressesFromSocket()
becomeActive0(promise: promise)
}
} catch let error {
assert(self.lifecycleManager.isRegistered)
assert(self.lifecycleManager.isPreRegistered)
// We would like to have this assertion here, but we want to be able to go through this
// code path in cases where connect() is being called on channels that are already active.
//assert(!self.lifecycleManager.isActive)
Expand All @@ -1018,7 +1065,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {

private final func safeReregister(interested: SelectorEventSet) {
assert(eventLoop.inEventLoop)
assert(self.lifecycleManager.isRegistered)
assert(self.lifecycleManager.isRegisteredFully)

guard self.isOpen else {
assert(self.interestedEvent == .reset, "interestedEvent=\(self.interestedEvent) event though we're closed")
Expand All @@ -1039,7 +1086,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {

private func safeRegister(interested: SelectorEventSet) throws {
assert(eventLoop.inEventLoop)
assert(!self.lifecycleManager.isRegistered)
assert(!self.lifecycleManager.isRegisteredFully)

guard self.isOpen else {
throw ChannelError.ioOnClosedChannel
Expand All @@ -1055,8 +1102,27 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}
}

final func becomeFullyRegistered0() throws {
assert(self.eventLoop.inEventLoop)
assert(self.lifecycleManager.isPreRegistered)
assert(!self.lifecycleManager.isRegisteredFully)

// We always register with interested .none and will just trigger readIfNeeded0() later to re-register if needed.
try self.safeRegister(interested: [.readEOF, .reset])
self.lifecycleManager.finishRegistration(promise: nil)(self.pipeline)
}

final func becomeActive0(promise: EventLoopPromise<Void>?) {
assert(eventLoop.inEventLoop)
assert(self.eventLoop.inEventLoop)
assert(self.lifecycleManager.isPreRegistered)
if !self.lifecycleManager.isRegisteredFully {
do {
try self.becomeFullyRegistered0()
} catch {
self.close0(error: error, mode: .all, promise: promise)
return
}
}
self.lifecycleManager.activate(promise: promise)(self.pipeline)
self.readIfNeeded0()
}
Expand Down
6 changes: 6 additions & 0 deletions Sources/NIO/EventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,12 @@ internal final class SelectableEventLoop: EventLoop {
_addresses.deallocate()
}

/// Is this `SelectableEventLoop` still open (ie. not shutting down or shut down)
internal var isOpen: Bool {
assert(self.inEventLoop)
return self.lifecycleState == .open
}

/// Register the given `SelectableChannel` with this `SelectableEventLoop`. After this point all I/O for the `SelectableChannel` will be processed by this `SelectableEventLoop` until it
/// is deregistered by calling `deregister`.
public func register<C: SelectableChannel>(channel: C) throws {
Expand Down
Loading