Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions Sources/Examples/Echo/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ func makeEchoClient(address: String, port: Int, ssl: Bool) -> Echo_EchoServiceCl
eventLoopGroup: eventLoopGroup,
tlsConfiguration: tlsConfiguration)

return try ClientConnection.start(configuration)
.map { Echo_EchoServiceClient(connection: $0) }
.wait()
return Echo_EchoServiceClient(connection: ClientConnection(configuration: configuration))
} catch {
print("Unable to create an EchoClient: \(error)")
return nil
Expand Down
8 changes: 6 additions & 2 deletions Sources/GRPC/ClientCalls/BaseClientCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,12 @@ open class BaseClientCall<RequestMessage: Message, ResponseMessage: Message> {
/// Creates and configures an HTTP/2 stream channel. The `self.subchannel` future will hold the
/// stream channel once it has been created.
private func createStreamChannel() {
self.connection.channel.eventLoop.execute {
self.connection.multiplexer.createStreamChannel(promise: self.streamPromise) { (subchannel, streamID) -> EventLoopFuture<Void> in
self.connection.multiplexer.whenFailure { error in
self.streamPromise.fail(error)
}

self.connection.multiplexer.whenSuccess { multiplexer in
multiplexer.createStreamChannel(promise: self.streamPromise) { (subchannel, streamID) -> EventLoopFuture<Void> in
subchannel.pipeline.addHandlers(
HTTP2ToHTTP1ClientCodec(streamID: streamID, httpProtocol: self.connection.configuration.httpProtocol),
HTTP1ToRawGRPCClientCodec(),
Expand Down
261 changes: 168 additions & 93 deletions Sources/GRPC/ClientConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -53,139 +53,209 @@ import NIOTLS
/// delegate associated with this connection (see `DelegatingErrorHandler`).
///
/// See `BaseClientCall` for a description of the remainder of the client pipeline.
open class ClientConnection {
/// Makes and configures a `ClientBootstrap` using the provided configuration.
///
/// Enables `SO_REUSEADDR` and `TCP_NODELAY` and configures the `channelInitializer` to use the
/// handlers detailed in the documentation for `ClientConnection`.
///
/// - Parameter configuration: The configuration to prepare the bootstrap with.
public class func makeBootstrap(configuration: Configuration) -> ClientBootstrapProtocol {
let bootstrap = GRPCNIO.makeClientBootstrap(group: configuration.eventLoopGroup)
// Enable SO_REUSEADDR and TCP_NODELAY.
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.channelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
.channelInitializer { channel in
let tlsConfigured = configuration.tlsConfiguration.map { tlsConfiguration in
channel.configureTLS(tlsConfiguration, errorDelegate: configuration.errorDelegate)
}
public class ClientConnection {
/// The configuration this connection was created using.
internal let configuration: ClientConnection.Configuration

return (tlsConfigured ?? channel.eventLoop.makeSucceededFuture(())).flatMap {
channel.configureHTTP2Pipeline(mode: .client)
}.flatMap { _ in
let errorHandler = DelegatingErrorHandler(delegate: configuration.errorDelegate)
return channel.pipeline.addHandler(errorHandler)
}
}
/// The channel which will handle gRPC calls.
internal var channel: EventLoopFuture<Channel>

return bootstrap
}
/// HTTP multiplexer from the `channel` handling gRPC calls.
internal var multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>

/// Verifies that a TLS handshake was successful by using the `TLSVerificationHandler`.
///
/// - Parameter channel: The channel to verify successful TLS setup on.
public class func verifyTLS(channel: Channel) -> EventLoopFuture<Void> {
return channel.pipeline.handler(type: TLSVerificationHandler.self).flatMap {
$0.verification
/// A monitor for the connectivity state.
public let connectivity: ConnectivityStateMonitor

/// Creates a new connection from the given configuration.
public init(configuration: ClientConnection.Configuration) {
let monitor = ConnectivityStateMonitor(delegate: configuration.connectivityStateDelegate)
let channel = ClientConnection.makeChannel(
configuration: configuration,
connectivityMonitor: monitor
)

self.channel = channel
self.multiplexer = channel.flatMap {
$0.pipeline.handler(type: HTTP2StreamMultiplexer.self)
}
self.connectivity = monitor
self.configuration = configuration

self.channel.whenSuccess { _ in
self.connectivity.state = .ready
}
self.replaceChannelAndMultiplexerOnClose(channel: channel)
}

/// Makes a `ClientConnection` from the given channel and configuration.
///
/// - Parameter channel: The channel to use for the connection.
/// - Parameter configuration: The configuration used to create the channel.
public class func makeClientConnection(
channel: Channel,
configuration: Configuration
) -> EventLoopFuture<ClientConnection> {
return channel.pipeline.handler(type: HTTP2StreamMultiplexer.self).map { multiplexer in
ClientConnection(channel: channel, multiplexer: multiplexer, configuration: configuration)
/// Registers a callback on the `closeFuture` of the given channel to replace this class's
/// channel and multiplexer.
private func replaceChannelAndMultiplexerOnClose(channel: EventLoopFuture<Channel>) {
channel.always { result in
// If we failed to get a channel then we've exhausted our backoff; we should `.shutdown`.
if case .failure = result {
self.connectivity.state = .shutdown
}
}.flatMap {
$0.closeFuture
}.whenComplete { _ in
// `.shutdown` is terminal so don't attempt a reconnection.
guard self.connectivity.state != .shutdown else {
return
}

let newChannel = ClientConnection.makeChannel(
configuration: self.configuration,
connectivityMonitor: self.connectivity
)

self.channel = newChannel
self.multiplexer = newChannel.flatMap {
$0.pipeline.handler(type: HTTP2StreamMultiplexer.self)
}

// Change the state if the connection was successful.
newChannel.whenSuccess { _ in
self.connectivity.state = .ready
}
self.replaceChannelAndMultiplexerOnClose(channel: newChannel)
}
}

/// Starts a client connection using the given configuration.
///
/// This involves: creating a `ClientBootstrap`, connecting to a target, verifying that the TLS
/// handshake was successful (if TLS was configured) and creating the `ClientConnection`.
/// See the individual functions for more information:
/// - `makeBootstrap(configuration:)`,
/// - `verifyTLS(channel:)`, and
/// - `makeClientConnection(channel:configuration:)`.
///
/// - Parameter configuration: The configuration to start the connection with.
public class func start(_ configuration: Configuration) -> EventLoopFuture<ClientConnection> {
return start(configuration, backoffIterator: configuration.connectionBackoff?.makeIterator())
/// The `EventLoop` this connection is using.
public var eventLoop: EventLoop {
return self.channel.eventLoop
}

/// Starts a client connection using the given configuration and backoff.
/// Closes the connection to the server.
public func close() -> EventLoopFuture<Void> {
if self.connectivity.state == .shutdown {
// We're already shutdown or in the process of shutting down.
return channel.flatMap { $0.closeFuture }
} else {
self.connectivity.state = .shutdown
return channel.flatMap { $0.close() }
}
}
}

extension ClientConnection {
/// Creates a `Channel` using the given configuration.
///
/// In addition to the steps taken in `start(configuration:)`, we _may_ additionally set a
/// connection timeout and schedule a retry attempt (should the connection fail) if a
/// This involves: creating a `ClientBootstrap`, connecting to a target and verifying that the TLS
/// handshake was successful (if TLS was configured). We _may_ additiionally set a connection
/// timeout and schedule a retry attempt (should the connection fail) if a
/// `ConnectionBackoff.Iterator` is provided.
///
/// See the individual functions for more information:
/// - `makeBootstrap(configuration:)`, and
/// - `verifyTLS(channel:)`.
///
/// - Parameter configuration: The configuration to start the connection with.
/// - Parameter backoffIterator: A `ConnectionBackoff` iterator which generates connection
/// timeouts and backoffs to use when attempting to retry the connection.
internal class func start(
_ configuration: Configuration,
/// - Parameter connectivityMonitor: A connectivity state monitor.
/// - Parameter backoffIterator: An `Iterator` for `ConnectionBackoff` providing a sequence of
/// connection timeouts and backoff to use when attempting to create a connection.
private class func makeChannel(
configuration: ClientConnection.Configuration,
connectivityMonitor: ConnectivityStateMonitor,
backoffIterator: ConnectionBackoff.Iterator?
) -> EventLoopFuture<ClientConnection> {
) -> EventLoopFuture<Channel> {
connectivityMonitor.state = .connecting
let timeoutAndBackoff = backoffIterator?.next()
var bootstrap = ClientConnection.makeBootstrap(configuration: configuration)

var bootstrap = makeBootstrap(configuration: configuration)
// Set a timeout, if we have one.
if let timeout = timeoutAndBackoff?.timeout {
bootstrap = bootstrap.connectTimeout(.seconds(timeInterval: timeout))
}

let connection = bootstrap.connect(to: configuration.target)
.flatMap { channel -> EventLoopFuture<ClientConnection> in
let tlsVerified: EventLoopFuture<Void>?
if configuration.tlsConfiguration != nil {
tlsVerified = verifyTLS(channel: channel)
} else {
tlsVerified = nil
}

return (tlsVerified ?? channel.eventLoop.makeSucceededFuture(())).flatMap {
makeClientConnection(channel: channel, configuration: configuration)
}
let channel = bootstrap.connect(to: configuration.target).flatMap { channel -> EventLoopFuture<Channel> in
if configuration.tlsConfiguration != nil {
return ClientConnection.verifyTLS(channel: channel).map { channel }
} else {
return channel.eventLoop.makeSucceededFuture(channel)
}
}.always { result in
switch result {
case .success:
// Update the state once the channel has been assigned, when it may be used for making
// RPCs.
break

case .failure:
// We might try again in a moment.
connectivityMonitor.state = timeoutAndBackoff == nil ? .shutdown : .transientFailure
}
}

guard let backoff = timeoutAndBackoff?.backoff else {
return connection
return channel
}

// If we're in error then schedule our next attempt.
return connection.flatMapError { error in
return channel.flatMapError { error in
// The `futureResult` of the scheduled task is of type
// `EventLoopFuture<EventLoopFuture<ClientConnection>>`, so we need to `flatMap` it to
// remove a level of indirection.
return connection.eventLoop.scheduleTask(in: .seconds(timeInterval: backoff)) {
return start(configuration, backoffIterator: backoffIterator)
return channel.eventLoop.scheduleTask(in: .seconds(timeInterval: backoff)) {
return makeChannel(
configuration: configuration,
connectivityMonitor: connectivityMonitor,
backoffIterator: backoffIterator
)
}.futureResult.flatMap { nextConnection in
return nextConnection
}
}
}

public let channel: Channel
public let multiplexer: HTTP2StreamMultiplexer
public let configuration: Configuration

init(channel: Channel, multiplexer: HTTP2StreamMultiplexer, configuration: Configuration) {
self.channel = channel
self.multiplexer = multiplexer
self.configuration = configuration
/// Creates a `Channel` using the given configuration amd state connectivity monitor.
///
/// See `makeChannel(configuration:connectivityMonitor:backoffIterator:)`.
private class func makeChannel(
configuration: ClientConnection.Configuration,
connectivityMonitor: ConnectivityStateMonitor
) -> EventLoopFuture<Channel> {
return makeChannel(
configuration: configuration,
connectivityMonitor: connectivityMonitor,
backoffIterator: configuration.connectionBackoff?.makeIterator()
)
}

/// Fired when the client shuts down.
public var onClose: EventLoopFuture<Void> {
return channel.closeFuture
/// Makes and configures a `ClientBootstrap` using the provided configuration.
///
/// Enables `SO_REUSEADDR` and `TCP_NODELAY` and configures the `channelInitializer` to use the
/// handlers detailed in the documentation for `ClientConnection`.
///
/// - Parameter configuration: The configuration to prepare the bootstrap with.
private class func makeBootstrap(configuration: Configuration) -> ClientBootstrapProtocol {
let bootstrap = GRPCNIO.makeClientBootstrap(group: configuration.eventLoopGroup)
// Enable SO_REUSEADDR and TCP_NODELAY.
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.channelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
.channelInitializer { channel in
let tlsConfigured = configuration.tlsConfiguration.map { tlsConfiguration in
channel.configureTLS(tlsConfiguration, errorDelegate: configuration.errorDelegate)
}

return (tlsConfigured ?? channel.eventLoop.makeSucceededFuture(())).flatMap {
channel.configureHTTP2Pipeline(mode: .client)
}.flatMap { _ in
let errorHandler = DelegatingErrorHandler(delegate: configuration.errorDelegate)
return channel.pipeline.addHandler(errorHandler)
}
}

return bootstrap
}

public func close() -> EventLoopFuture<Void> {
return channel.close(mode: .all)
/// Verifies that a TLS handshake was successful by using the `TLSVerificationHandler`.
///
/// - Parameter channel: The channel to verify successful TLS setup on.
private class func verifyTLS(channel: Channel) -> EventLoopFuture<Void> {
return channel.pipeline.handler(type: TLSVerificationHandler.self).flatMap {
$0.verification
}
}
}

Expand Down Expand Up @@ -222,6 +292,9 @@ extension ClientConnection {
/// cycle.
public var errorDelegate: ClientErrorDelegate?

/// A delegate which is called when the connectivity state is changed.
public var connectivityStateDelegate: ConnectivityStateDelegate?

/// TLS configuration for this connection. `nil` if TLS is not desired.
public var tlsConfiguration: TLSConfiguration?

Expand All @@ -240,19 +313,22 @@ extension ClientConnection {
/// - Parameter eventLoopGroup: The event loop group to run the connection on.
/// - Parameter errorDelegate: The error delegate, defaulting to a delegate which will log only
/// on debug builds.
/// - Parameter connectivityStateDelegate: A connectivity state delegate, defaulting to `nil`.
/// - Parameter tlsConfiguration: TLS configuration, defaulting to `nil`.
/// - Parameter connectionBackoff: The connection backoff configuration to use, defaulting
/// to `nil`.
public init(
target: ConnectionTarget,
eventLoopGroup: EventLoopGroup,
errorDelegate: ClientErrorDelegate? = DebugOnlyLoggingClientErrorDelegate.shared,
connectivityStateDelegate: ConnectivityStateDelegate? = nil,
tlsConfiguration: TLSConfiguration? = nil,
connectionBackoff: ConnectionBackoff? = nil
) {
self.target = target
self.eventLoopGroup = eventLoopGroup
self.errorDelegate = errorDelegate
self.connectivityStateDelegate = connectivityStateDelegate
self.tlsConfiguration = tlsConfiguration
self.connectionBackoff = connectionBackoff
}
Expand Down Expand Up @@ -309,8 +385,7 @@ fileprivate extension Channel {
context: configuration.sslContext,
serverHostname: configuration.hostnameOverride)

let verificationHandler = TLSVerificationHandler(errorDelegate: errorDelegate)
return self.pipeline.addHandlers(sslClientHandler, verificationHandler)
return self.pipeline.addHandlers(sslClientHandler, TLSVerificationHandler())
} catch {
return self.eventLoop.makeFailedFuture(error)
}
Expand Down
Loading