Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions Sources/GRPC/ClientConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,32 @@ public class ClientConnection {
)
}

/// Closes the connection to the server.
/// Close the channel, and any connections associated with it. Any ongoing RPCs may fail.
///
/// - Returns: Returns a future which will be resolved when shutdown has completed.
public func close() -> EventLoopFuture<Void> {
return self.connectionManager.shutdown()
let promise = self.eventLoop.makePromise(of: Void.self)
self.close(promise: promise)
return promise.futureResult
}

/// Close the channel, and any connections associated with it. Any ongoing RPCs may fail.
///
/// - Parameter promise: A promise which will be completed when shutdown has completed.
public func close(promise: EventLoopPromise<Void>) {
self.connectionManager.shutdown(mode: .forceful, promise: promise)
}

/// Attempt to gracefully shutdown the channel. New RPCs will be failed immediately and existing
/// RPCs may continue to run until they complete.
///
/// - Parameters:
/// - deadline: A point in time by which the graceful shutdown must have completed. If the
/// deadline passes and RPCs are still active then the connection will be closed forcefully
/// and any remaining in-flight RPCs may be failed.
/// - promise: A promise which will be completed when shutdown has completed.
public func closeGracefully(deadline: NIODeadline, promise: EventLoopPromise<Void>) {
return self.connectionManager.shutdown(mode: .graceful(deadline), promise: promise)
}

/// Populates the logger in `options` and appends a request ID header to the metadata, if
Expand Down
100 changes: 78 additions & 22 deletions Sources/GRPC/ConnectionManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -435,71 +435,126 @@ internal final class ConnectionManager {
return muxFuture
}

/// Shutdown any connection which exists. This is a request from the application.
internal func shutdown() -> EventLoopFuture<Void> {
@usableFromInline
internal enum ShutdownMode {
/// Closes the underlying channel without waiting for existing RPCs to complete.
case forceful
/// Allows running RPCs to run their course before closing the underlying channel. No new
/// streams may be created.
case graceful(NIODeadline)
}

/// Shutdown the underlying connection.
///
/// - Note: Initiating a `forceful` shutdown after a `graceful` shutdown has no effect.
internal func shutdown(mode: ShutdownMode) -> EventLoopFuture<Void> {
let promise = self.eventLoop.makePromise(of: Void.self)
self.shutdown(mode: mode, promise: promise)
return promise.futureResult
}

/// Shutdown the underlying connection.
///
/// - Note: Initiating a `forceful` shutdown after a `graceful` shutdown has no effect.
internal func shutdown(mode: ShutdownMode, promise: EventLoopPromise<Void>) {
if self.eventLoop.inEventLoop {
return self._shutdown()
self._shutdown(mode: mode, promise: promise)
} else {
return self.eventLoop.flatSubmit {
return self._shutdown()
self.eventLoop.execute {
self._shutdown(mode: mode, promise: promise)
}
}
}

private func _shutdown() -> EventLoopFuture<Void> {
private func _shutdown(mode: ShutdownMode, promise: EventLoopPromise<Void>) {
self.logger.debug("shutting down connection", metadata: [
"connectivity_state": "\(self.state.label)",
"shutdown.mode": "\(mode)",
])
let shutdown: ShutdownState

switch self.state {
// We don't have a channel and we don't want one, easy!
case .idle:
shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
self.state = .shutdown(shutdown)
promise.succeed(())

// We're mid-connection: the application doesn't have any 'ready' channels so we'll succeed
// the shutdown future and deal with any fallout from the connecting channel without the
// application knowing.
case let .connecting(state):
shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
self.state = .shutdown(shutdown)

// Fail the ready channel mux promise: we're shutting down so even if we manage to successfully
// connect the application shouldn't have access to the channel or multiplexer.
state.readyChannelMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))
state.candidateMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))
// In case we do successfully connect, close immediately.
state.candidate.whenSuccess {
$0.close(mode: .all, promise: nil)

// Complete the shutdown promise when the connection attempt has completed.
state.candidate.whenComplete {
switch $0 {
case let .success(channel):
// In case we do successfully connect, close immediately.
channel.close(mode: .all, promise: nil)
promise.completeWith(channel.closeFuture.recoveringFromUncleanShutdown())

case .failure:
// We failed to connect, that's fine we still shutdown successfully.
promise.succeed(())
}
}

// We have an active channel but the application doesn't know about it yet. We'll do the same
// as for `.connecting`.
case let .active(state):
shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
self.state = .shutdown(shutdown)

// Fail the ready channel mux promise: we're shutting down so even if we manage to successfully
// connect the application shouldn't have access to the channel or multiplexer.
state.readyChannelMuxPromise.fail(GRPCStatus(code: .unavailable, message: nil))
// We have a channel, close it.
// We have a channel, close it. We only create streams in the ready state so there's no need
// to quiesce here.
state.candidate.close(mode: .all, promise: nil)
promise.completeWith(state.candidate.closeFuture.recoveringFromUncleanShutdown())

// The channel is up and running: the application could be using it. We can close it and
// return the `closeFuture`.
case let .ready(state):
shutdown = .shutdownByUser(closeFuture: state.channel.closeFuture)
let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
self.state = .shutdown(shutdown)

// We have a channel, close it.
state.channel.close(mode: .all, promise: nil)
switch mode {
case .forceful:
// We have a channel, close it.
state.channel.close(mode: .all, promise: nil)

case let .graceful(deadline):
// If we don't close by the deadline forcibly close the channel.
let scheduledForceClose = state.channel.eventLoop.scheduleTask(deadline: deadline) {
self.logger.info("shutdown timer expired, forcibly closing connection")
state.channel.close(mode: .all, promise: nil)
}

// Cancel the force close if we close normally first.
state.channel.closeFuture.whenComplete { _ in
scheduledForceClose.cancel()
}

// Tell the channel to quiesce. It will be picked up by the idle handler which will close
// the channel when all streams have been closed.
state.channel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
}

// Complete the promise when we eventually close.
promise.completeWith(state.channel.closeFuture.recoveringFromUncleanShutdown())

// Like `.connecting` and `.active` the application does not have a `.ready` channel. We'll
// do the same but also cancel any scheduled connection attempts and deal with any fallout
// if we cancelled too late.
case let .transientFailure(state):
shutdown = .shutdownByUser(closeFuture: self.eventLoop.makeSucceededFuture(()))
let shutdown: ShutdownState = .shutdownByUser(closeFuture: promise.futureResult)
self.state = .shutdown(shutdown)

// Stop the creation of a new channel, if we can. If we can't then the task to
Expand All @@ -510,12 +565,13 @@ internal final class ConnectionManager {
// connect the application shouldn't should have access to the channel.
state.readyChannelMuxPromise.fail(shutdown.reason)

// We're already shutdown; nothing to do.
// No active channel, so complete the shutdown promise now.
promise.succeed(())

// We're already shutdown; there's nothing to do.
case let .shutdown(state):
shutdown = state
promise.completeWith(state.closeFuture)
}

return shutdown.closeFuture
}

// MARK: - State changes from the channel handler.
Expand Down
21 changes: 13 additions & 8 deletions Sources/GRPC/ConnectionPool/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -245,14 +245,17 @@ internal final class ConnectionPool {
///
/// Existing waiters will be failed and all underlying connections will be shutdown. Subsequent
/// calls to `makeStream` will be failed immediately.
internal func shutdown() -> EventLoopFuture<Void> {
///
/// - Parameter mode: The mode to use when shutting down.
/// - Returns: A future indicated when shutdown has been completed.
internal func shutdown(mode: ConnectionManager.ShutdownMode) -> EventLoopFuture<Void> {
let promise = self.eventLoop.makePromise(of: Void.self)

if self.eventLoop.inEventLoop {
self._shutdown(promise: promise)
self._shutdown(mode: mode, promise: promise)
} else {
self.eventLoop.execute {
self._shutdown(promise: promise)
self._shutdown(mode: mode, promise: promise)
}
}

Expand Down Expand Up @@ -484,11 +487,11 @@ internal final class ConnectionPool {
}
}

/// See `shutdown()`.
/// See `shutdown(mode:)`.
///
/// - Parameter promise: A `promise` to complete when the pool has been shutdown.
@usableFromInline
internal func _shutdown(promise: EventLoopPromise<Void>) {
internal func _shutdown(mode: ConnectionManager.ShutdownMode, promise: EventLoopPromise<Void>) {
self.eventLoop.assertInEventLoop()

switch self._state {
Expand All @@ -503,11 +506,13 @@ internal final class ConnectionPool {
}

// Shutdown all the connections and remove them from the pool.
let allShutdown: [EventLoopFuture<Void>] = self._connections.values.map {
return $0.manager.shutdown()
}
let connections = self._connections
self._connections.removeAll()

let allShutdown = connections.values.map {
$0.manager.shutdown(mode: mode)
}

// Fail the outstanding waiters.
while let waiter = self.waiters.popFirst() {
waiter.fail(ConnectionPoolError.shutdown)
Expand Down
4 changes: 2 additions & 2 deletions Sources/GRPC/ConnectionPool/PoolManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ internal final class PoolManager {

/// Shutdown the pool manager and all connection pools it manages.
@usableFromInline
internal func shutdown(promise: EventLoopPromise<Void>) {
internal func shutdown(mode: ConnectionManager.ShutdownMode, promise: EventLoopPromise<Void>) {
let (action, pools): (PoolManagerStateMachine.ShutdownAction, [ConnectionPool]?) = self.lock
.withLock {
let action = self._state.shutdown(promise: promise)
Expand All @@ -313,7 +313,7 @@ internal final class PoolManager {
switch (action, pools) {
case let (.shutdownPools, .some(pools)):
promise.futureResult.whenComplete { _ in self.shutdownComplete() }
EventLoopFuture.andAllSucceed(pools.map { $0.shutdown() }, promise: promise)
EventLoopFuture.andAllSucceed(pools.map { $0.shutdown(mode: mode) }, promise: promise)

case let (.alreadyShuttingDown(future), .none):
promise.completeWith(future)
Expand Down
9 changes: 7 additions & 2 deletions Sources/GRPC/ConnectionPool/PooledChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,18 @@ internal final class PooledChannel: GRPCChannel {

@inlinable
internal func close(promise: EventLoopPromise<Void>) {
self._pool.shutdown(promise: promise)
self._pool.shutdown(mode: .forceful, promise: promise)
}

@inlinable
internal func close() -> EventLoopFuture<Void> {
let promise = self._configuration.eventLoopGroup.next().makePromise(of: Void.self)
self._pool.shutdown(promise: promise)
self.close(promise: promise)
return promise.futureResult
}

@usableFromInline
internal func closeGracefully(deadline: NIODeadline, promise: EventLoopPromise<Void>) {
self._pool.shutdown(mode: .graceful(deadline), promise: promise)
}
}
37 changes: 37 additions & 0 deletions Sources/GRPC/EventLoopFuture+RecoverFromUncleanShutdown.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2021, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import NIOCore
#if canImport(NIOSSL)
import NIOSSL
#endif

extension EventLoopFuture where Value == Void {
internal func recoveringFromUncleanShutdown() -> EventLoopFuture<Void> {
#if canImport(NIOSSL)
// We can ignore unclean shutdown since gRPC is self-terminated and therefore not prone to
// truncation attacks.
return self.flatMapErrorThrowing { error in
if let sslError = error as? NIOSSLError, sslError == .uncleanShutdown {
()
} else {
throw error
}
}
#else
return self
#endif
}
}
30 changes: 29 additions & 1 deletion Sources/GRPC/GRPCChannel/GRPCChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,36 @@ public protocol GRPCChannel {
interceptors: [ClientInterceptor<Request, Response>]
) -> Call<Request, Response>

/// Close the channel, and any connections associated with it.
/// Close the channel, and any connections associated with it. Any ongoing RPCs may fail.
///
/// - Returns: Returns a future which will be resolved when shutdown has completed.
func close() -> EventLoopFuture<Void>

/// Close the channel, and any connections associated with it. Any ongoing RPCs may fail.
///
/// - Parameter promise: A promise which will be completed when shutdown has completed.
func close(promise: EventLoopPromise<Void>)

/// Attempt to gracefully shutdown the channel. New RPCs will be failed immediately and existing
/// RPCs may continue to run until they complete.
///
/// - Parameters:
/// - deadline: A point in time by which the graceful shutdown must have completed. If the
/// deadline passes and RPCs are still active then the connection will be closed forcefully
/// and any remaining in-flight RPCs may be failed.
/// - promise: A promise which will be completed when shutdown has completed.
func closeGracefully(deadline: NIODeadline, promise: EventLoopPromise<Void>)
}

// Default implementations to avoid breaking API. Implementations provided by GRPC override these.
extension GRPCChannel {
public func close(promise: EventLoopPromise<Void>) {
promise.completeWith(self.close())
}

public func closeGracefully(deadline: NIODeadline, promise: EventLoopPromise<Void>) {
promise.completeWith(self.close())
}
}

extension GRPCChannel {
Expand Down
Loading