From 6475881aea9ca0289e550bfb2a5aa476d68cacc7 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Fri, 6 Apr 2018 17:48:43 +0100 Subject: [PATCH] Initial commit --- .github/ISSUE_TEMPLATE.md | 21 + .github/PULL_REQUEST_TEMPLATE.md | 13 + .gitignore | 8 + CONTRIBUTING.md | 64 ++ LICENSE.txt | 202 +++++ Package.swift | 38 + README.md | 42 + Sources/NIOTSHTTPClient/main.swift | 71 ++ Sources/NIOTSHTTPServer/main.swift | 50 ++ .../NIOTSConnectionBootstrap.swift | 221 +++++ .../NIOTSConnectionChannel.swift | 756 ++++++++++++++++++ .../NIOTransportServices/NIOTSErrors.swift | 57 ++ .../NIOTransportServices/NIOTSEventLoop.swift | 199 +++++ .../NIOTSEventLoopGroup.swift | 80 ++ .../NIOTSListenerBootstrap.swift | 311 +++++++ .../NIOTSListenerChannel.swift | 385 +++++++++ .../NIOTSNetworkEvents.swift | 68 ++ .../SocketAddress+NWEndpoint.swift | 121 +++ .../StateManagedChannel.swift | 264 ++++++ .../TCPOptions+SocketChannelOption.swift | 84 ++ .../NIOTSConnectionChannelTests.swift | 428 ++++++++++ .../NIOTSEndToEndTests.swift | 472 +++++++++++ .../NIOTSEventLoopTests.swift | 89 +++ .../NIOTSListenerChannelTests.swift | 115 +++ .../NIOTSSocketOptionTests.swift | 164 ++++ .../NIOTSSocketOptionsOnChannelTests.swift | 124 +++ dev/git.commit.template | 14 + 27 files changed, 4461 insertions(+) create mode 100644 .github/ISSUE_TEMPLATE.md create mode 100644 .github/PULL_REQUEST_TEMPLATE.md create mode 100644 .gitignore create mode 100644 CONTRIBUTING.md create mode 100644 LICENSE.txt create mode 100644 Package.swift create mode 100644 README.md create mode 100644 Sources/NIOTSHTTPClient/main.swift create mode 100644 Sources/NIOTSHTTPServer/main.swift create mode 100644 Sources/NIOTransportServices/NIOTSConnectionBootstrap.swift create mode 100644 Sources/NIOTransportServices/NIOTSConnectionChannel.swift create mode 100644 Sources/NIOTransportServices/NIOTSErrors.swift create mode 100644 Sources/NIOTransportServices/NIOTSEventLoop.swift create mode 100644 Sources/NIOTransportServices/NIOTSEventLoopGroup.swift create mode 100644 Sources/NIOTransportServices/NIOTSListenerBootstrap.swift create mode 100644 Sources/NIOTransportServices/NIOTSListenerChannel.swift create mode 100644 Sources/NIOTransportServices/NIOTSNetworkEvents.swift create mode 100644 Sources/NIOTransportServices/SocketAddress+NWEndpoint.swift create mode 100644 Sources/NIOTransportServices/StateManagedChannel.swift create mode 100644 Sources/NIOTransportServices/TCPOptions+SocketChannelOption.swift create mode 100644 Tests/NIOTransportServicesTests/NIOTSConnectionChannelTests.swift create mode 100644 Tests/NIOTransportServicesTests/NIOTSEndToEndTests.swift create mode 100644 Tests/NIOTransportServicesTests/NIOTSEventLoopTests.swift create mode 100644 Tests/NIOTransportServicesTests/NIOTSListenerChannelTests.swift create mode 100644 Tests/NIOTransportServicesTests/NIOTSSocketOptionTests.swift create mode 100644 Tests/NIOTransportServicesTests/NIOTSSocketOptionsOnChannelTests.swift create mode 100644 dev/git.commit.template diff --git a/.github/ISSUE_TEMPLATE.md b/.github/ISSUE_TEMPLATE.md new file mode 100644 index 0000000..b98b9f5 --- /dev/null +++ b/.github/ISSUE_TEMPLATE.md @@ -0,0 +1,21 @@ +### Expected behavior +_[what you expected to happen]_ + +### Actual behavior +_[what actually happened]_ + +### Steps to reproduce + +1. ... +2. ... + +### If possible, minimal yet complete reproducer code (or URL to code) + +_[anything to help us reproducing the issue]_ + +### SwiftNIO version/commit hash + +_[the SwiftNIO tag/commit hash]_ + +### Swift & OS version (output of `swift --version && uname -a`) + diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 0000000..ab90c7b --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,13 @@ +_[One line description of your change]_ + +### Motivation: + +_[Explain here the context, and why you're making that change. What is the problem you're trying to solve.]_ + +### Modifications: + +_[Describe the modifications you've done.]_ + +### Result: + +_[After your change, what will change.]_ diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2a9e6aa --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +.DS_Store +/.build +/Packages +/*.xcodeproj +Package.pins +Package.resolved +*.pem +*.xcconfig diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..40ebb08 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,64 @@ +## Legal + +By submitting a pull request, you represent that you have the right to license +your contribution to Apple and the community, and agree by submitting the patch +that your contributions are licensed under the Apache 2.0 license (see +`LICENSE.txt`). + + +## How to submit a bug report + +Please ensure to specify the following: + +* NIO Transport Services commit hash +* Contextual information (e.g. what you were trying to achieve with NIOTS) +* Simplest possible steps to reproduce + * More complex the steps are, lower the priority will be. + * A pull request with failing test case is preferred, but it's just fine to paste the test case into the issue description. +* Anything that might be relevant in your opinion, such as: + * Swift version or the output of `swift --version` + * OS version and the output of `uname -a` + * Network configuration + + +### Example + +``` +NIOTS commit hash: 22ec043dc9d24bb011b47ece4f9ee97ee5be2757 + +Context: +While load testing my HTTP web server written with NIOTS, I noticed +that one file descriptor is leaked per request. + +Steps to reproduce: +1. ... +2. ... +3. ... +4. ... + +$ swift --version +Swift version 4.0.2 (swift-4.0.2-RELEASE) +Target: x86_64-unknown-linux-gnu + +Operating system: macOS 10.14.0 +``` + +## Writing a Patch + +A good NIOTS patch is: + +1. Concise, and contains as few changes as needed to achieve the end result. +2. Tested, ensuring that any tests provided failed before the patch and pass after it. +3. Documented, adding API documentation as needed to cover new functions and properties. +4. Accompanied by a great commit message, using our commit message template. + +### Commit Message Template + +We require that your commit messages match our template. The easiest way to do that is to get git to help you by explicitly using the template. To do that, `cd` to the root of our repository and run: + + git config commit.template dev/git.commit.template + + +## How to contribute your work + +Please open a pull request at https://github.com/apple/swift-nio-transport-services. Make sure the CI passes, and then wait for code review. diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/Package.swift b/Package.swift new file mode 100644 index 0000000..b4c3773 --- /dev/null +++ b/Package.swift @@ -0,0 +1,38 @@ +// swift-tools-version:4.0 +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import PackageDescription + +let package = Package( + name: "swift-nio-transport-services", + products: [ + .library(name: "NIOTransportServices", targets: ["NIOTransportServices"]), + .executable(name: "NIOTSHTTPClient", targets: ["NIOTSHTTPClient"]), + .executable(name: "NIOTSHTTPServer", targets: ["NIOTSHTTPServer"]), + ], + dependencies: [ + .package(url: "https://github.com/apple/swift-nio.git", from: "1.8.0"), + ], + targets: [ + .target(name: "NIOTransportServices", + dependencies: ["NIO", "NIOFoundationCompat", "NIOConcurrencyHelpers", "NIOTLS"]), + .target(name: "NIOTSHTTPClient", + dependencies: ["NIO", "NIOTransportServices", "NIOHTTP1"]), + .target(name: "NIOTSHTTPServer", + dependencies: ["NIO", "NIOTransportServices", "NIOHTTP1"]), + .testTarget(name: "NIOTransportServicesTests", + dependencies: ["NIO", "NIOTransportServices"]), + ] +) diff --git a/README.md b/README.md new file mode 100644 index 0000000..3ac02c1 --- /dev/null +++ b/README.md @@ -0,0 +1,42 @@ +# NIO Transport Services + +Extensions for [SwiftNIO](https://github.com/apple/swift-nio) to support Apple platforms as first-class citizens. + +## About NIO Transport Services + +NIO Transport Services is an extension to SwiftNIO that provides first-class support for Apple platforms by using [Network.framework](https://developer.apple.com/documentation/network) to provide network connectivity, and [Dispatch](https://developer.apple.com/documentation/dispatch) to provide concurrency. NIOTS provides an alternative [EventLoop](https://apple.github.io/swift-nio/docs/current/NIO/Protocols/EventLoop.html), [EventLoopGroup](https://apple.github.io/swift-nio/docs/current/NIO/Protocols/EventLoopGroup.html), and several alternative [Channels](https://apple.github.io/swift-nio/docs/current/NIO/Protocols/Channel.html) and Bootstraps. + +In addition to providing first-class support for Apple platforms, NIO Transport Services takes advantage of the richer API of Network.framework to provide more insight into the behaviour of the network than is normally available to NIO applications. This includes the ability to wait for connectivity until a network route is available, as well as all of the extra proxy and VPN support that is built directly into Network.framework. + +All regular NIO applications should work just fine with NIO Transport Services, simply by changing the event loops and bootstraps in use. + +## Why Transport Services? + +Network.framework is Apple's reference implementation of the [proposed post-sockets API](https://datatracker.ietf.org/wg/taps/charter/) that is currently being worked on by the Transport Services Working Group (taps) of the IETF. To indicate the proposed long-term future of interfaces like Network.framework, we decided to call this module NIOTransportServices. Also, NIONetworkFramework didn't appeal to us much as a name. + +## Limitations + +Network.framework is only available on macOS 10.14+, iOS 12+, and tvOS 12+. This does not match the current minimum deployment target for Swift Package Manager, so building this repository with Swift Package Manager requires that you pass custom build flags, e.g: + +``` +swift build -Xswiftc -target -Xswiftc x86_64-apple-macosx10.14 +``` + +Alternatively, if you want to use Xcode to build this repository, you can use the following xcconfig: + +``` +MACOSX_DEPLOYMENT_TARGET = 10.14 +``` + +For support in iOS or tvOS, change the targets as necessary. + +## Versioning + +This repository will remain in development throughout the beta period of macOS Mojave and iOS 12. Once those operating systems have gone GM, we will finalize the API and cut a stable release. + +## Developing NIO Transport Services + +For the most part, NIO Transport Services development is as straightforward as any other SwiftPM project. With that said, we do have a few processes that are worth understanding before you contribute. For details, please see `CONTRIBUTING.md` in this repository. + +Please note that all work on NIO Transport Services is covered by the [SwiftNIO Code of Conduct](https://github.com/apple/swift-nio/blob/master/CODE_OF_CONDUCT.md). + diff --git a/Sources/NIOTSHTTPClient/main.swift b/Sources/NIOTSHTTPClient/main.swift new file mode 100644 index 0000000..4b9e5a6 --- /dev/null +++ b/Sources/NIOTSHTTPClient/main.swift @@ -0,0 +1,71 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// swift-tools-version:4.0 +// +// swift-tools-version:4.0 +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import NIO +import NIOTransportServices +import NIOHTTP1 +import Network + +final class HTTP1ClientHandler: ChannelInboundHandler { + typealias OutboundOut = HTTPClientRequestPart + typealias InboundIn = HTTPClientResponsePart + + func channelActive(ctx: ChannelHandlerContext) { + var head = HTTPRequestHead(version: .init(major: 1, minor: 1), method: .GET, uri: "/get") + head.headers.add(name: "Host", value: "httpbin.org") + head.headers.add(name: "User-Agent", value: "SwiftNIO") + ctx.write(self.wrapOutboundOut(.head(head)), promise: nil) + ctx.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + + print("Connected to \(ctx.channel.remoteAddress!) from \(ctx.channel.localAddress!)") + } + + func channelRead(ctx: ChannelHandlerContext, data: NIOAny) { + let part = self.unwrapInboundIn(data) + + switch part { + case .head(let head): + self.printResponseHead(head) + case .body(let b): + print(b.getString(at: b.readerIndex, length: b.readableBytes)!, separator: "") + case .end: + // Print a newline. + print("") + ctx.close(promise: nil) + } + } + + private func printResponseHead(_ head: HTTPResponseHead) { + print("HTTP/\(head.version.major).\(head.version.minor) \(head.status.code) \(head.status.reasonPhrase)") + for (name, value) in head.headers { + print("\(name): \(value)") + } + print("") + } +} + +let group = NIOTSEventLoopGroup() +let channel = try! NIOTSConnectionBootstrap(group: group) + .connectTimeout(.hours(1)) + .channelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1) + .tlsOptions(NWProtocolTLS.Options()) + .channelInitializer { channel in + channel.pipeline.addHTTPClientHandlers().then { + channel.pipeline.add(handler: HTTP1ClientHandler()) + } + }.connect(host: "httpbin.org", port: 443).wait() + +// Wait for the request to complete +try! channel.closeFuture.wait() diff --git a/Sources/NIOTSHTTPServer/main.swift b/Sources/NIOTSHTTPServer/main.swift new file mode 100644 index 0000000..1d0bf7e --- /dev/null +++ b/Sources/NIOTSHTTPServer/main.swift @@ -0,0 +1,50 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// swift-tools-version:4.0 +// +// swift-tools-version:4.0 +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import NIO +import NIOTransportServices +import NIOHTTP1 +import Network + +final class HTTP1ServerHandler: ChannelInboundHandler { + typealias InboundIn = HTTPServerRequestPart + typealias OutboundOut = HTTPServerResponsePart + + func channelRead(ctx: ChannelHandlerContext, data: NIOAny) { + let part = self.unwrapInboundIn(data) + + guard case .head = part else { + return + } + + let responseHeaders = HTTPHeaders([("server", "nio-transport-services"), ("content-length", "0")]) + let responseHead = HTTPResponseHead(version: .init(major: 1, minor: 1), status: .ok, headers: responseHeaders) + ctx.write(self.wrapOutboundOut(.head(responseHead)), promise: nil) + ctx.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + } +} + +let group = NIOTSEventLoopGroup() +let channel = try! NIOTSListenerBootstrap(group: group) + .childChannelInitializer { channel in + channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: true, withErrorHandling: true).then { + channel.pipeline.add(handler: HTTP1ServerHandler()) + } + }.bind(host: "127.0.0.1", port: 8888).wait() + +print("Server listening on \(channel.localAddress!)") + +// Wait for the request to complete +try! channel.closeFuture.wait() diff --git a/Sources/NIOTransportServices/NIOTSConnectionBootstrap.swift b/Sources/NIOTransportServices/NIOTSConnectionBootstrap.swift new file mode 100644 index 0000000..f4883b9 --- /dev/null +++ b/Sources/NIOTransportServices/NIOTSConnectionBootstrap.swift @@ -0,0 +1,221 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// swift-tools-version:4.0 +// +// swift-tools-version:4.0 +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import NIO +import Dispatch +import Network + + +public final class NIOTSConnectionBootstrap { + private let group: NIOTSEventLoopGroup + private var channelInitializer: ((Channel) -> EventLoopFuture)? + private var connectTimeout: TimeAmount = TimeAmount.seconds(10) + private var channelOptions = ChannelOptionStorage() + private var qos: DispatchQoS? + private var tcpOptions: NWProtocolTCP.Options = .init() + private var tlsOptions: NWProtocolTLS.Options? + + /// Create a `NIOTSConnectionBootstrap` on the `NIOTSEventLoopGroup` `group`. + /// + /// - parameters: + /// - group: The `NIOTSEventLoopGroup` to use. + public init(group: NIOTSEventLoopGroup) { + self.group = group + } + + /// Initialize the connected `NIOTSConnectionChannel` with `initializer`. The most common task in initializer is to add + /// `ChannelHandler`s to the `ChannelPipeline`. + /// + /// The connected `Channel` will operate on `ByteBuffer` as inbound and `IOData` as outbound messages. + /// + /// - parameters: + /// - handler: A closure that initializes the provided `Channel`. + public func channelInitializer(_ handler: @escaping (Channel) -> EventLoopFuture) -> Self { + self.channelInitializer = handler + return self + } + + /// Specifies a `ChannelOption` to be applied to the `NIOTSConnectionChannel`. + /// + /// - parameters: + /// - option: The option to be applied. + /// - value: The value for the option. + public func channelOption(_ option: T, value: T.OptionType) -> Self { + channelOptions.put(key: option, value: value) + return self + } + + /// Specifies a timeout to apply to a connection attempt. + // + /// - parameters: + /// - timeout: The timeout that will apply to the connection attempt. + public func connectTimeout(_ timeout: TimeAmount) -> Self { + self.connectTimeout = timeout + return self + } + + /// Specifies a QoS to use for this connection, instead of the default QoS for the + /// event loop. + /// + /// This allows unusually high or low priority workloads to be appropriately scheduled. + public func withQoS(_ qos: DispatchQoS) -> Self { + self.qos = qos + return self + } + + /// Specifies the TCP options to use on the `Channel`s. + /// + /// To retrieve the TCP options from connected channels, use + /// `NIOTSChannelOptions.TCPConfiguration`. It is not possible to change the + /// TCP configuration after `connect` is called. + public func tcpOptions(_ options: NWProtocolTCP.Options) -> Self { + self.tcpOptions = options + return self + } + + /// Specifies the TLS options to use on the `Channel`s. + /// + /// To retrieve the TLS options from connected channels, use + /// `NIOTSChannelOptions.TLSConfiguration`. It is not possible to change the + /// TLS configuration after `connect` is called. + public func tlsOptions(_ options: NWProtocolTLS.Options) -> Self { + self.tlsOptions = options + return self + } + + /// Specify the `host` and `port` to connect to for the TCP `Channel` that will be established. + /// + /// - parameters: + /// - host: The host to connect to. + /// - port: The port to connect to. + /// - returns: An `EventLoopFuture` to deliver the `Channel` when connected. + public func connect(host: String, port: Int) -> EventLoopFuture { + guard let actualPort = NWEndpoint.Port(rawValue: UInt16(port)) else { + return self.group.next().newFailedFuture(error: NIOTSErrors.InvalidPort(port: port)) + } + return self.connect(endpoint: NWEndpoint.hostPort(host: .init(host), port: actualPort)) + } + + /// Specify the `address` to connect to for the TCP `Channel` that will be established. + /// + /// - parameters: + /// - address: The address to connect to. + /// - returns: An `EventLoopFuture` to deliver the `Channel` when connected. + public func connect(to address: SocketAddress) -> EventLoopFuture { + return self.connect { channel, promise in + channel.connect(to: address, promise: promise) + } + } + + /// Specify the `unixDomainSocket` path to connect to for the UDS `Channel` that will be established. + /// + /// - parameters: + /// - unixDomainSocketPath: The _Unix domain socket_ path to connect to. + /// - returns: An `EventLoopFuture` to deliver the `Channel` when connected. + public func connect(unixDomainSocketPath: String) -> EventLoopFuture { + do { + let address = try SocketAddress(unixDomainSocketPath: unixDomainSocketPath) + return connect(to: address) + } catch { + return group.next().newFailedFuture(error: error) + } + } + + /// Specify the `endpoint` to connect to for the TCP `Channel` that will be established. + public func connect(endpoint: NWEndpoint) -> EventLoopFuture { + return self.connect { channel, promise in + channel.triggerUserOutboundEvent(NIOTSNetworkEvents.ConnectToNWEndpoint(endpoint: endpoint), + promise: promise) + } + } + + private func connect(_ connectAction: @escaping (Channel, EventLoopPromise) -> Void) -> EventLoopFuture { + let conn: Channel = NIOTSConnectionChannel(eventLoop: self.group.next() as! NIOTSEventLoop, + qos: self.qos, + tcpOptions: self.tcpOptions, + tlsOptions: self.tlsOptions) + let initializer = self.channelInitializer ?? { _ in conn.eventLoop.newSucceededFuture(result: ()) } + let channelOptions = self.channelOptions + + return conn.eventLoop.submit { + return channelOptions.applyAll(channel: conn).then { + initializer(conn) + }.then { + conn.register() + }.then { + let connectPromise: EventLoopPromise = conn.eventLoop.newPromise() + connectAction(conn, connectPromise) + let cancelTask = conn.eventLoop.scheduleTask(in: self.connectTimeout) { + connectPromise.fail(error: ChannelError.connectTimeout(self.connectTimeout)) + conn.close(promise: nil) + } + + connectPromise.futureResult.whenComplete { + cancelTask.cancel() + } + return connectPromise.futureResult + }.map { conn }.thenIfErrorThrowing { + conn.close(promise: nil) + throw $0 + } + }.then { $0 } + } +} + +internal struct ChannelOptionStorage { + private var storage: [(Any, (Any, (Channel) -> (Any, Any) -> EventLoopFuture))] = [] + + mutating func put(key: K, + value newValue: K.OptionType) { + func applier(_ t: Channel) -> (Any, Any) -> EventLoopFuture { + return { (x, y) in + return t.setOption(option: x as! K, value: y as! K.OptionType) + } + } + var hasSet = false + self.storage = self.storage.map { typeAndValue in + let (type, value) = typeAndValue + if type is K { + hasSet = true + return (key, (newValue, applier)) + } else { + return (type, value) + } + } + if !hasSet { + self.storage.append((key, (newValue, applier))) + } + } + + func applyAll(channel: Channel) -> EventLoopFuture { + let applyPromise: EventLoopPromise = channel.eventLoop.newPromise() + var it = self.storage.makeIterator() + + func applyNext() { + guard let (key, (value, applier)) = it.next() else { + // If we reached the end, everything is applied. + applyPromise.succeed(result: ()) + return + } + + applier(channel)(key, value).map { + applyNext() + }.cascadeFailure(promise: applyPromise) + } + applyNext() + + return applyPromise.futureResult + } +} diff --git a/Sources/NIOTransportServices/NIOTSConnectionChannel.swift b/Sources/NIOTransportServices/NIOTSConnectionChannel.swift new file mode 100644 index 0000000..feeea52 --- /dev/null +++ b/Sources/NIOTransportServices/NIOTSConnectionChannel.swift @@ -0,0 +1,756 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// swift-tools-version:4.0 +// +// swift-tools-version:4.0 +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import Foundation +import NIO +import NIOConcurrencyHelpers +import NIOFoundationCompat +import NIOTLS +import Dispatch +import Network +import Security + +/// Execute the given function and synchronously complete the given `EventLoopPromise` (if not `nil`). +func executeAndComplete(_ promise: EventLoopPromise?, _ body: () throws -> T) { + do { + let result = try body() + promise?.succeed(result: result) + } catch let e { + promise?.fail(error: e) + } +} + +/// Merge two possible promises together such that firing the result will fire both. +private func mergePromises(_ first: EventLoopPromise?, _ second: EventLoopPromise?) -> EventLoopPromise? { + if let first = first { + if let second = second { + first.futureResult.cascade(promise: second) + } + return first + } else { + return second + } +} + + +/// Channel options for the connection channel. +private struct ConnectionChannelOptions { + /// Whether autoRead is enabled for this channel. + internal var autoRead: Bool = true + + /// Whether we support remote half closure. If not true, remote half closure will + /// cause connection drops. + internal var supportRemoteHalfClosure: Bool = false +} + + +private typealias PendingWrite = (data: ByteBuffer, promise: EventLoopPromise?) + + +/// A structure that manages backpressure signaling on this channel. +private struct BackpressureManager { + /// Whether the channel is writable, given the current watermark state. + /// + /// This is an atomic only because the channel writability flag needs to be safe to access from multiple + /// threads. All activity in this structure itself is expected to be thread-safe. + /// + /// All code that operates on this atomic uses load/store, not compareAndSwap. This is because we know + /// that this atomic is only ever written from one thread: the event loop thread. All unsynchronized + /// access is only reading. As a result, we don't have racing writes, and don't need CAS. This is good, + /// because in most cases these loads/stores will be free, as the user will never actually check the + /// channel writability from another thread, meaning this cache line is uncontended. CAS is never free: + /// it always has some substantial runtime cost over loads/stores. + let writable = Atomic(value: true) + + /// The number of bytes outstanding on the network. + private var outstandingBytes: Int = 0 + + /// The watermarks currently configured by the user. + private(set) var waterMarks: WriteBufferWaterMark = WriteBufferWaterMark(low: 32 * 1024, high: 64 * 1024) + + /// Adds `newBytes` to the queue of outstanding bytes, and returns whether this + /// has caused a writability change. + /// + /// - parameters: + /// - newBytes: the number of bytes queued to send, but not yet sent. + /// - returns: Whether the state changed. + mutating func writabilityChanges(whenQueueingBytes newBytes: Int) -> Bool { + self.outstandingBytes += newBytes + if self.outstandingBytes > self.waterMarks.high && self.writable.load() { + self.writable.store(false) + return true + } + + return false + } + + /// Removes `sentBytes` from the queue of outstanding bytes, and returns whether this + /// has caused a writability change. + /// + /// - parameters: + /// - newBytes: the number of bytes sent to the network. + /// - returns: Whether the state changed. + mutating func writabilityChanges(whenBytesSent sentBytes: Int) -> Bool { + self.outstandingBytes -= sentBytes + if self.outstandingBytes < self.waterMarks.low && !self.writable.load() { + self.writable.store(true) + return true + } + + return false + } + + /// Updates the watermarks to `waterMarks`, and returns whether this change has changed the + /// writability state of the channel. + /// + /// - parameters: + /// - waterMarks: The new waterMarks to use. + /// - returns: Whether the state changed. + mutating func writabilityChanges(whenUpdatingWaterMarks waterMarks: WriteBufferWaterMark) -> Bool { + let writable = self.writable.load() + self.waterMarks = waterMarks + + if writable && self.outstandingBytes > self.waterMarks.high { + self.writable.store(false) + return true + } else if !writable && self.outstandingBytes < self.waterMarks.low { + self.writable.store(true) + return true + } + + return false + } +} + + +internal final class NIOTSConnectionChannel { + /// The `ByteBufferAllocator` for this `Channel`. + public let allocator = ByteBufferAllocator() + + /// An `EventLoopFuture` that will complete when this channel is finally closed. + public var closeFuture: EventLoopFuture { + return self.closePromise.futureResult + } + + /// The parent `Channel` for this one, if any. + public let parent: Channel? + + /// The `EventLoop` this `Channel` belongs to. + internal let tsEventLoop: NIOTSEventLoop + + private var _pipeline: ChannelPipeline! = nil // this is really a constant (set in .init) but needs `self` to be constructed and therefore a `var`. Do not change as this needs to accessed from arbitrary threads. + + internal let closePromise: EventLoopPromise + + /// The underlying `NWConnection` that this `Channel` wraps. This is only non-nil + /// after the initial connection attempt has been made. + private var nwConnection: NWConnection? + + /// The `DispatchQueue` that socket events for this connection will be dispatched onto. + private let connectionQueue: DispatchQueue + + /// An `EventLoopPromise` that will be succeeded or failed when a connection attempt succeeds or fails. + private var connectPromise: EventLoopPromise? + + /// The TCP options for this connection. + private var tcpOptions: NWProtocolTCP.Options + + /// The TLS options for this connection, if any. + private var tlsOptions: NWProtocolTLS.Options? + + /// The state of this connection channel. + internal var state: ChannelState = .idle + + /// The kinds of channel activation this channel supports + internal let supportedActivationType: ActivationType = .connect + + /// Whether a call to NWConnection.receive has been made, but the completion + /// handler has not yet been invoked. + private var outstandingRead: Bool = false + + /// The options for this channel. + private var options: ConnectionChannelOptions = ConnectionChannelOptions() + + /// Any pending writes that have yet to be delivered to the network stack. + private var pendingWrites = CircularBuffer(initialRingCapacity: 8) + + /// An object to keep track of pending writes and manage our backpressure signaling. + private var backpressureManager = BackpressureManager() + + /// Create a `NIOTSConnectionChannel` on a given `NIOTSEventLoop`. + /// + /// Note that `NIOTSConnectionChannel` objects cannot be created on arbitrary loops types. + internal init(eventLoop: NIOTSEventLoop, + parent: Channel? = nil, + qos: DispatchQoS? = nil, + tcpOptions: NWProtocolTCP.Options, + tlsOptions: NWProtocolTLS.Options?) { + self.tsEventLoop = eventLoop + self.closePromise = eventLoop.newPromise() + self.parent = parent + self.connectionQueue = eventLoop.channelQueue(label: "nio.nioTransportServices.connectionchannel", qos: qos) + self.tcpOptions = tcpOptions + self.tlsOptions = tlsOptions + + // Must come last, as it requires self to be completely initialized. + self._pipeline = ChannelPipeline(channel: self) + } + + /// Create a `NIOTSConnectionChannel` with an already-established `NWConnection`. + internal convenience init(wrapping connection: NWConnection, + on eventLoop: NIOTSEventLoop, + parent: Channel, + qos: DispatchQoS? = nil, + tcpOptions: NWProtocolTCP.Options, + tlsOptions: NWProtocolTLS.Options?) { + self.init(eventLoop: eventLoop, + parent: parent, + qos: qos, + tcpOptions: tcpOptions, + tlsOptions: tlsOptions) + self.nwConnection = connection + } +} + + +// MARK:- NIOTSConnectionChannel implementation of Channel +extension NIOTSConnectionChannel: Channel { + /// The `ChannelPipeline` for this `Channel`. + public var pipeline: ChannelPipeline { + return self._pipeline + } + + /// The local address for this channel. + public var localAddress: SocketAddress? { + if self.eventLoop.inEventLoop { + return try? self.localAddress0() + } else { + return self.connectionQueue.sync { try? self.localAddress0() } + } + } + + /// The remote address for this channel. + public var remoteAddress: SocketAddress? { + if self.eventLoop.inEventLoop { + return try? self.remoteAddress0() + } else { + return self.connectionQueue.sync { try? self.remoteAddress0() } + } + } + + /// Whether this channel is currently writable. + public var isWritable: Bool { + return self.backpressureManager.writable.load() + } + + public var _unsafe: ChannelCore { + return self + } + + public func setOption(option: T, value: T.OptionType) -> EventLoopFuture where T : ChannelOption { + if eventLoop.inEventLoop { + let promise: EventLoopPromise = eventLoop.newPromise() + executeAndComplete(promise) { try setOption0(option: option, value: value) } + return promise.futureResult + } else { + return eventLoop.submit { try self.setOption0(option: option, value: value) } + } + } + + private func setOption0(option: T, value: T.OptionType) throws { + assert(eventLoop.inEventLoop) + + guard !self.closed else { + throw ChannelError.ioOnClosedChannel + } + + switch option { + case _ as AutoReadOption: + self.options.autoRead = value as! Bool + self.readIfNeeded0() + case _ as AllowRemoteHalfClosureOption: + self.options.supportRemoteHalfClosure = value as! Bool + case _ as SocketOption: + let optionValue = option as! SocketOption + try self.tcpOptions.applyChannelOption(option: optionValue, value: value as! SocketOptionValue) + case _ as WriteBufferWaterMarkOption: + if self.backpressureManager.writabilityChanges(whenUpdatingWaterMarks: value as! WriteBufferWaterMark) { + self.pipeline.fireChannelWritabilityChanged() + } + default: + fatalError("option \(option) not supported") + } + } + + public func getOption(option: T) -> EventLoopFuture where T : ChannelOption { + if eventLoop.inEventLoop { + let promise: EventLoopPromise = eventLoop.newPromise() + executeAndComplete(promise) { try getOption0(option: option) } + return promise.futureResult + } else { + return eventLoop.submit { try self.getOption0(option: option) } + } + } + + func getOption0(option: T) throws -> T.OptionType { + assert(eventLoop.inEventLoop) + + guard !self.closed else { + throw ChannelError.ioOnClosedChannel + } + + switch option { + case _ as AutoReadOption: + return self.options.autoRead as! T.OptionType + case _ as AllowRemoteHalfClosureOption: + return self.options.supportRemoteHalfClosure as! T.OptionType + case _ as SocketOption: + let optionValue = option as! SocketOption + return try self.tcpOptions.valueFor(socketOption: optionValue) as! T.OptionType + case _ as WriteBufferWaterMarkOption: + return self.backpressureManager.waterMarks as! T.OptionType + default: + fatalError("option \(option) not supported") + } + } +} + + +// MARK:- NIOTSConnectionChannel implementation of StateManagedChannel. +extension NIOTSConnectionChannel: StateManagedChannel { + typealias ActiveSubstate = TCPSubstate + + /// A TCP connection may be fully open or partially open. In the fully open state, both + /// peers may send data. In the partially open states, only one of the two peers may send + /// data. + /// + /// We keep track of this to manage the half-closure state of the TCP connection. + enum TCPSubstate: ActiveChannelSubstate { + /// Both peers may send. + case open + + /// This end of the connection has sent a FIN. We may only receive data. + case halfClosedLocal + + /// The remote peer has sent a FIN. We may still send data, but cannot expect to + /// receive more. + case halfClosedRemote + + /// The channel is "active", but there can be no forward momentum here. The only valid + /// thing to do in this state is drop the channel. + case closed + + init() { + self = .open + } + } + + public func localAddress0() throws -> SocketAddress { + guard let localEndpoint = self.nwConnection?.currentPath?.localEndpoint else { + throw NIOTSErrors.NoCurrentPath() + } + // TODO: Support wider range of address types. + return try SocketAddress(fromNWEndpoint: localEndpoint) + } + + public func remoteAddress0() throws -> SocketAddress { + guard let remoteEndpoint = self.nwConnection?.currentPath?.remoteEndpoint else { + throw NIOTSErrors.NoCurrentPath() + } + // TODO: Support wider range of address types. + return try SocketAddress(fromNWEndpoint: remoteEndpoint) + } + + internal func alreadyConfigured0(promise: EventLoopPromise?) { + guard let connection = nwConnection else { + promise?.fail(error: NIOTSErrors.NotPreConfigured()) + return + } + + guard case .setup = connection.state else { + promise?.fail(error: NIOTSErrors.NotPreConfigured()) + return + } + + connection.stateUpdateHandler = self.stateUpdateHandler(newState:) + connection.betterPathUpdateHandler = self.betterPathHandler + connection.pathUpdateHandler = self.pathChangedHandler(newPath:) + connection.start(queue: self.connectionQueue) + } + + internal func beginActivating0(to target: NWEndpoint, promise: EventLoopPromise?) { + assert(self.nwConnection == nil) + assert(self.connectPromise == nil) + self.connectPromise = promise + + let parameters = NWParameters(tls: self.tlsOptions, tcp: self.tcpOptions) + let connection = NWConnection(to: target, using: parameters) + connection.stateUpdateHandler = self.stateUpdateHandler(newState:) + connection.betterPathUpdateHandler = self.betterPathHandler + connection.pathUpdateHandler = self.pathChangedHandler(newPath:) + + // Ok, state is ready. Let's go! + self.nwConnection = connection + connection.start(queue: self.connectionQueue) + } + + public func write0(_ data: NIOAny, promise: EventLoopPromise?) { + guard self.isActive else { + promise?.fail(error: ChannelError.ioOnClosedChannel) + return + } + + // TODO: We would ideally support all of IOData here, gotta work out how to do that without HOL blocking + // all writes terribly. + // My best guess at this time is that Data(contentsOf:) may mmap the file in question, which would let us + // at least only block the network stack itself rather than our thread. I'm not certain though, especially + // on Linux. Should investigate. + let data = self.unwrapData(data, as: ByteBuffer.self) + self.pendingWrites.append((data, promise)) + + + /// This may cause our writability state to change. + if self.backpressureManager.writabilityChanges(whenQueueingBytes: data.readableBytes) { + self.pipeline.fireChannelWritabilityChanged() + } + } + + public func flush0() { + guard self.isActive else { + return + } + + guard let conn = self.nwConnection else { + preconditionFailure("nwconnection cannot be nil while channel is active") + } + + func completionCallback(promise: EventLoopPromise?, sentBytes: Int) -> ((NWError?) -> Void) { + return { error in + if let error = error { + promise?.fail(error: error) + } else { + promise?.succeed(result: ()) + } + + if self.backpressureManager.writabilityChanges(whenBytesSent: sentBytes) { + self.pipeline.fireChannelWritabilityChanged() + } + } + } + + conn.batch { + while self.pendingWrites.count > 0 { + let write = self.pendingWrites.removeFirst() + let buffer = write.data + let content = buffer.getData(at: buffer.readerIndex, length: buffer.readableBytes) + conn.send(content: content, completion: .contentProcessed(completionCallback(promise: write.promise, sentBytes: buffer.readableBytes))) + } + } + } + + /// Perform a read from the network. + /// + /// This method has a slightly strange semantic, because we do not allow multiple reads at once. As a result, this + /// is a *request* to read, and if there is a read already being processed then this method will do nothing. + public func read0() { + guard self.inboundStreamOpen && !self.outstandingRead else { + return + } + + guard let conn = self.nwConnection else { + preconditionFailure("Connection should not be nil") + } + + // TODO: Can we do something sensible with these numbers? + self.outstandingRead = true + conn.receive(minimumIncompleteLength: 1, maximumLength: 8192, completion: self.dataReceivedHandler(content:context:isComplete:error:)) + } + + public func doClose0(error: Error) { + guard let conn = self.nwConnection else { + // We don't have a connection to close here, so we're actually done. Our old state + // was idle. + assert(self.pendingWrites.count == 0) + return + } + + // Step 1 is to tell the network stack we're done. + // TODO: Does this drop the connection fully, or can we keep receiving data? Must investigate. + conn.cancel() + + // Step 2 is to fail all outstanding writes. + self.dropOutstandingWrites(error: error) + + // Step 3 is to cancel a pending connect promise, if any. + if let pendingConnect = self.connectPromise { + self.connectPromise = nil + pendingConnect.fail(error: error) + } + } + + public func doHalfClose0(error: Error, promise: EventLoopPromise?) { + guard let conn = self.nwConnection else { + // We don't have a connection to half close, so fail the promise. + promise?.fail(error: ChannelError.ioOnClosedChannel) + return + } + + + do { + try self.state.closeOutput() + } catch ChannelError.outputClosed { + // Here we *only* fail the promise, no need to blow up the connection. + promise?.fail(error: ChannelError.outputClosed) + return + } catch { + // For any other error, this is fatal. + self.close0(error: error, mode: .all, promise: promise) + return + } + + func completionCallback(for promise: EventLoopPromise?) -> ((NWError?) -> Void) { + return { error in + if let error = error { + promise?.fail(error: error) + } else { + promise?.succeed(result: ()) + } + } + } + + // It should not be possible to have a pending connect promise while we're doing half-closure. + assert(self.connectPromise == nil) + + // Step 1 is to tell the network stack we're done. + conn.send(content: nil, contentContext: .finalMessage, completion: .contentProcessed(completionCallback(for: promise))) + + // Step 2 is to fail all outstanding writes. + self.dropOutstandingWrites(error: error) + } + + public func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise?) { + switch event { + case let x as NIOTSNetworkEvents.ConnectToNWEndpoint: + self.connect0(to: x.endpoint, promise: promise) + default: + promise?.fail(error: ChannelError.operationUnsupported) + } + } + + public func channelRead0(_ data: NIOAny) { + // drop the data, do nothing + return + } + + public func errorCaught0(error: Error) { + // Currently we don't do anything with errors that pass through the pipeline + return + } + + /// A function that will trigger a socket read if necessary. + internal func readIfNeeded0() { + if self.options.autoRead { + self.read0() + } + } +} + + +// MARK:- Implementations of the callbacks passed to NWConnection. +extension NIOTSConnectionChannel { + /// Called by the underlying `NWConnection` when its internal state has changed. + private func stateUpdateHandler(newState: NWConnection.State) { + switch newState { + case .setup: + preconditionFailure("Should not be told about this state.") + case .waiting(let err): + if case .activating = self.state { + // This means the connection cannot currently be completed. We should notify the pipeline + // here, or support this with a channel option or something, but for now for the same of + // demos we will just allow ourselves into this stage. + break + } + + // In this state we've transitioned into waiting, presumably from active or closing. In this + // version of NIO this is an error, but we should aim to support this at some stage. + self.close0(error: err, mode: .all, promise: nil) + case .preparing: + // This just means connections are being actively established. We have no specific action + // here. + break + case .ready: + // Transitioning to ready means the connection was succeeded. Hooray! + self.connectionComplete0() + case .cancelled: + // This is the network telling us we're closed. We don't need to actually do anything here + // other than check our state is ok. + assert(self.closed) + self.nwConnection = nil + case .failed(let err): + // The connection has failed for some reason. + self.close0(error: err, mode: .all, promise: nil) + default: + // This clause is here to help the compiler out: it's otherwise not able to + // actually validate that the switch is exhaustive. Trust me, it is. + fatalError("Unreachable") + } + } + + /// Called by the underlying `NWConnection` when a network receive has completed. + /// + /// The state matrix here is large. If `content` is non-nil, some data was received: we need to send it down the pipeline + /// and call channelReadComplete. This may be nil, in which case we expect either `isComplete` to be `true` or `error` + /// to be non-nil. `isComplete` indicates half-closure on the read side of a connection. `error` is set if the receive + /// did not complete due to an error, though there may still be some data. + private func dataReceivedHandler(content: Data?, context: NWConnection.ContentContext?, isComplete: Bool, error: NWError?) { + precondition(self.outstandingRead) + self.outstandingRead = false + + guard self.isActive else { + // If we're already not active, we aren't going to process any of this: it's likely the result of an extra + // read somewhere along the line. + assert(content == nil) + return + } + + // First things first, if there's data we need to deliver it. + if let content = content { + // It would be nice if we didn't have to do this copy, but I'm not sure how to avoid it with the current Data + // APIs. + var buffer = self.allocator.buffer(capacity: content.count) + buffer.write(bytes: content) + self.pipeline.fireChannelRead(NIOAny(buffer)) + self.pipeline.fireChannelReadComplete() + } + + // Next, we want to check if there's an error. If there is, we're going to deliver it, and then close the connection with + // it. Otherwise, we're going to check if we read EOF, and if we did we'll close with that instead. + if let error = error { + self.pipeline.fireErrorCaught(error) + self.close0(error: error, mode: .all, promise: nil) + } else if isComplete { + self.didReadEOF() + } + + // Last, issue a new read automatically if we need to. + self.readIfNeeded0() + } + + /// Called by the underlying `NWConnection` when a better path for this connection is available. + /// + /// Notifies the channel pipeline of the new option. + private func betterPathHandler(available: Bool) { + if available { + self.pipeline.fireUserInboundEventTriggered(NIOTSNetworkEvents.BetterPathAvailable()) + } else { + self.pipeline.fireUserInboundEventTriggered(NIOTSNetworkEvents.BetterPathUnavailable()) + } + } + + /// Called by the underlying `NWConnection` when this connection changes its network path. + /// + /// Notifies the channel pipeline of the new path. + private func pathChangedHandler(newPath path: NWPath) { + self.pipeline.fireUserInboundEventTriggered(NIOTSNetworkEvents.PathChanged(newPath: path)) + } +} + + +// MARK:- Implementations of state management for the channel. +extension NIOTSConnectionChannel { + /// Whether the inbound side of the connection is still open. + private var inboundStreamOpen: Bool { + switch self.state { + case .active(.open), .active(.halfClosedLocal): + return true + case .idle, .registered, .activating, .active, .inactive: + return false + } + } + + /// Make the channel active. + private func connectionComplete0() { + let promise = self.connectPromise + self.connectPromise = nil + self.becomeActive0(promise: promise) + + if let metadata = self.nwConnection?.metadata(definition: NWProtocolTLS.definition) as? NWProtocolTLS.Metadata { + // This is a TLS connection, we may need to fire some other events. + let negotiatedProtocol = sec_protocol_metadata_get_negotiated_protocol(metadata.securityProtocolMetadata).map { + String(cString: $0) + } + self.pipeline.fireUserInboundEventTriggered(TLSUserEvent.handshakeCompleted(negotiatedProtocol: negotiatedProtocol)) + } + } + + /// Drop all outstanding writes. Must only be called in the inactive + /// state. + private func dropOutstandingWrites(error: Error) { + while self.pendingWrites.count > 0 { + self.pendingWrites.removeFirst().promise?.fail(error: error) + } + } + + /// Handle a read EOF. + /// + /// If the user has indicated they support half-closure, we will emit the standard half-closure + /// event. If they have not, we upgrade this to regular closure. + private func didReadEOF() { + if self.options.supportRemoteHalfClosure { + // This is a half-closure, but the connection is still valid. + do { + try self.state.closeInput() + } catch { + return self.close0(error: error, mode: .all, promise: nil) + } + + self.pipeline.fireUserInboundEventTriggered(ChannelEvent.inputClosed) + } else { + self.close0(error: ChannelError.eof, mode: .all, promise: nil) + } + } +} + + +// MARK:- Managing TCP substate. +fileprivate extension ChannelState where ActiveSubstate == NIOTSConnectionChannel.TCPSubstate { + /// Close the input side of the TCP state machine. + mutating func closeInput() throws { + switch self { + case .active(.open): + self = .active(.halfClosedRemote) + case .active(.halfClosedLocal): + self = .active(.closed) + case .idle, .registered, .activating, .active(.halfClosedRemote), .active(.closed), .inactive: + throw NIOTSErrors.InvalidChannelStateTransition() + } + } + + /// Close the output side of the TCP state machine. + mutating func closeOutput() throws { + switch self { + case .active(.open): + self = .active(.halfClosedLocal) + case .active(.halfClosedRemote): + self = .active(.closed) + case .active(.halfClosedLocal), .active(.closed): + // This is a special case for closing the output, as it's user-controlled. If they already + // closed it, we want to throw a special error to tell them. + throw ChannelError.outputClosed + case .idle, .registered, .activating, .inactive: + throw NIOTSErrors.InvalidChannelStateTransition() + } + } +} diff --git a/Sources/NIOTransportServices/NIOTSErrors.swift b/Sources/NIOTransportServices/NIOTSErrors.swift new file mode 100644 index 0000000..dda0f79 --- /dev/null +++ b/Sources/NIOTransportServices/NIOTSErrors.swift @@ -0,0 +1,57 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// swift-tools-version:4.0 +// +// swift-tools-version:4.0 +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import NIO + +/// A tag protocol that can be used to cover all errors thrown by `NIOTransportServices`. +/// +/// Users are strongly encouraged not to conform their own types to this protocol. +public protocol NIOTSError: Error, Equatable { } + +public enum NIOTSErrors { + /// `InvalidChannelStateTransition` is thrown when a channel has been asked to do something + /// that is incompatible with its current channel state: e.g. attempting to register an + /// already registered channel. + public struct InvalidChannelStateTransition: NIOTSError { } + + /// `NotPreConfigured` is thrown when a channel has had `registerAlreadyConfigured` + /// called on it, but has not had the appropriate underlying network object provided. + public struct NotPreConfigured: NIOTSError { } + + /// `UnsupportedSocketOption` is thrown when an attempt is made to configure a socket option that + /// is not supported by Network.framework. + public struct UnsupportedSocketOption: NIOTSError { + public let optionValue: SocketOption.AssociatedValueType + + public static func ==(lhs: UnsupportedSocketOption, rhs: UnsupportedSocketOption) -> Bool { + return lhs.optionValue == rhs.optionValue + } + } + + /// `NoCurrentPath` is thrown when an attempt is made to request path details from a channel and + /// that channel has no path available. This can manifest, for example, when asking for remote + /// or local addresses. + public struct NoCurrentPath: NIOTSError { } + + /// `InvalidPort` is thrown when the port passed to a method is not valid. + public struct InvalidPort: NIOTSError { + /// The provided port. + public let port: Int + } + + /// `UnableToResolveEndpoint` is thrown when an attempt is made to resolve a local endpoint, but + /// insufficient information is available to create it. + public struct UnableToResolveEndpoint: NIOTSError { } +} diff --git a/Sources/NIOTransportServices/NIOTSEventLoop.swift b/Sources/NIOTransportServices/NIOTSEventLoop.swift new file mode 100644 index 0000000..c316f84 --- /dev/null +++ b/Sources/NIOTransportServices/NIOTSEventLoop.swift @@ -0,0 +1,199 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// swift-tools-version:4.0 +// +// swift-tools-version:4.0 +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import Foundation +import NIO +import Dispatch +import Network + + +/// An `EventLoop` that interacts with `DispatchQoS` to help schedule upcoming work. +/// +/// `EventLoop`s that implement `QoSEventLoop` can interact with `Dispatch` to propagate information +/// about the QoS required for a specific task block. This allows tasks to be dispatched onto an +/// event loop with a different priority than the majority of tasks on that loop. +public protocol QoSEventLoop: EventLoop { + /// Submit a given task to be executed by the `EventLoop` at a given `qos`. + func execute(qos: DispatchQoS, _ task: @escaping () -> Void) -> Void + + /// Schedule a `task` that is executed by this `SelectableEventLoop` after the given amount of time at the + /// given `qos`. + func scheduleTask(in time: TimeAmount, qos: DispatchQoS, _ task: @escaping () throws -> T) -> Scheduled +} + + +/// The lifecycle state of a given event loop. +/// +/// Event loops have the ability to be shut down, and not restarted. When a loop is active it will accept +/// new registrations, and new scheduled work items. When a loop is shutting down it will no longer accept +/// new registrations, but it will continue to accept new scheduled work items. When a loop is closed, it +/// will accept neither new registrations nor new scheduled work items, but it will continue to process +/// the queue until it has drained. +fileprivate enum LifecycleState { + case active + case closing + case closed +} + + +internal class NIOTSEventLoop: QoSEventLoop { + private let loop: DispatchQueue + private let taskQueue: DispatchQueue + private let inQueueKey: DispatchSpecificKey + private let loopID: UUID + private let defaultQoS: DispatchQoS + + /// All the channels registered to this event loop. + /// + /// This array does two jobs. Firstly, it ensures that these channels stay alive for as long as + /// they are registered: they cannot leak. Secondly, it provides a notification mechanism for + /// this event loop to deliver them specific kinds of events: in particular, to request that + /// they quiesce or shut themselves down. + private var registeredChannels: [ObjectIdentifier: Channel] = [:] + + /// The state of this event loop. + private var state = LifecycleState.active + + /// Whether this event loop is accepting new channels. + private var open: Bool { + return self.state == .active + } + + public var inEventLoop: Bool { + return DispatchQueue.getSpecific(key: self.inQueueKey) == self.loopID + } + + public init(qos: DispatchQoS) { + self.loop = DispatchQueue(label: "nio.transportservices.eventloop.loop", qos: qos, autoreleaseFrequency: .workItem) + self.taskQueue = DispatchQueue(label: "nio.transportservices.eventloop.taskqueue", target: self.loop) + self.loopID = UUID() + self.inQueueKey = DispatchSpecificKey() + self.defaultQoS = qos + loop.setSpecific(key: inQueueKey, value: self.loopID) + } + + public func execute(_ task: @escaping () -> Void) { + self.execute(qos: self.defaultQoS, task) + } + + public func execute(qos: DispatchQoS, _ task: @escaping () -> Void) { + // Ideally we'd not accept new work while closed. Sadly, that's not possible with the current APIs for this. + self.taskQueue.async(qos: qos, execute: task) + } + + public func scheduleTask(in time: TimeAmount, _ task: @escaping () throws -> T) -> Scheduled { + return self.scheduleTask(in: time, qos: self.defaultQoS, task) + } + + public func scheduleTask(in time: TimeAmount, qos: DispatchQoS, _ task: @escaping () throws -> T) -> Scheduled { + let p: EventLoopPromise = self.newPromise() + + guard self.state != .closed else { + p.fail(error: EventLoopError.shutdown) + return Scheduled(promise: p, cancellationTask: { } ) + } + + // Dispatch has no support for cancellation, so instead we synchronize over this nice variable. + var cancelled = false + + self.taskQueue.asyncAfter(deadline: DispatchTime(uptimeNanoseconds: DispatchTime.now().uptimeNanoseconds + UInt64(time.nanoseconds)), qos: qos) { + guard !cancelled else { return } + do { + p.succeed(result: try task()) + } catch { + p.fail(error: error) + } + } + + return Scheduled(promise: p, cancellationTask: { self.taskQueue.async { cancelled = true } }) + } + + public func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) { + self.closeGently().map { + queue.async { callback(nil) } + }.whenFailure { error in + queue.async { callback(error) } + } + } +} + +extension NIOTSEventLoop { + /// Create a `DispatchQueue` to use for events on a given `Channel`. + /// + /// This `DispatchQueue` will be guaranteed to execute on this `EventLoop`, and + /// so is safe to use concurrently with the rest of the event loop. + internal func channelQueue(label: String, qos: DispatchQoS? = nil) -> DispatchQueue { + // If a QoS override is not requested, use the default. + let qos = qos ?? self.defaultQoS + return DispatchQueue(label: label, qos: qos, target: self.loop) + } +} + +extension NIOTSEventLoop { + internal func closeGently() -> EventLoopFuture { + let p: EventLoopPromise = self.newPromise() + self.taskQueue.async { + guard self.open else { + p.fail(error: EventLoopError.shutdown) + return + } + + // Ok, time to shut down. + self.state = .closing + + // We need to tell all currently-registered channels to close. + let futures: [EventLoopFuture] = self.registeredChannels.map { _, channel in + channel.close(promise: nil) + return channel.closeFuture.thenIfErrorThrowing { error in + if let error = error as? ChannelError, error == .alreadyClosed { + return () + } else { + throw error + } + } + } + + // The ordering here is important. + // We must not transition into the closed state until *after* the caller has been notified that the + // event loop is closed. Otherwise, this future is in real trouble, as if it needs to dispatch onto the + // event loop it will be forbidden from doing so. + let completionFuture = EventLoopFuture.andAll(futures, eventLoop: self) + completionFuture.cascade(promise: p) + completionFuture.whenComplete { + self.state = .closed + } + } + return p.futureResult + } +} + +extension NIOTSEventLoop { + /// Record a given channel with this event loop. + internal func register(_ channel: Channel) throws { + guard self.open else { + throw EventLoopError.shutdown + } + + assert(channel.eventLoop === self) + self.registeredChannels[ObjectIdentifier(channel)] = channel + } + + // We don't allow deregister to fail, as it doesn't make any sense. + internal func deregister(_ channel: Channel) { + assert(channel.eventLoop === self) + let oldChannel = self.registeredChannels.removeValue(forKey: ObjectIdentifier(channel)) + assert(oldChannel != nil) + } +} diff --git a/Sources/NIOTransportServices/NIOTSEventLoopGroup.swift b/Sources/NIOTransportServices/NIOTSEventLoopGroup.swift new file mode 100644 index 0000000..89ed6cb --- /dev/null +++ b/Sources/NIOTransportServices/NIOTSEventLoopGroup.swift @@ -0,0 +1,80 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// swift-tools-version:4.0 +// +// swift-tools-version:4.0 +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import Foundation +import NIO +import NIOConcurrencyHelpers +import Dispatch +import Network + + +/// An `EventLoopGroup` containing `EventLoop`s specifically designed for use with +/// Network.framework's post-sockets networking API. +/// +/// These `EventLoop`s provide highly optimised and powerful networking tools for +/// the Darwin platforms. They have a number of advantages over the regular +/// `SelectableEventLoop` that NIO uses on other platforms. In particular: +/// +/// - The use of `DispatchQueue`s to schedule tasks allows the Darwin kernels to make +/// intelligent scheduling decisions, as well as to maintain QoS and ensure that +/// tasks required to handle networking in your application are given appropriate +/// priority by the system. +/// - Network.framework provides powerful tools for observing network state and managing +/// connections on devices with highly fluid networking environments, such as laptops +/// and mobile devices. These tools can be exposed to `Channel`s using this backend. +/// - Network.framework brings the networking stack into userspace, reducing the overhead +/// of most network operations by removing syscalls, and greatly increasing the safety +/// and security of the network stack. +/// - The applications networking needs are more effectively communicated to the kernel, +/// allowing mobile devices to change radio configuration and behaviour as needed to +/// take advantage of the various interfaces available on mobile devices. +/// +/// In general, when building applications whose primary purpose is to be deployed on Darwin +/// platforms, the `NIOTSEventLoopGroup` should be preferred over the +/// `MultiThreadedEventLoopGroup`. In particular, on iOS, the `NIOTSEventLoopGroup` is the +/// preferred networking backend. +public final class NIOTSEventLoopGroup: EventLoopGroup { + private let index = Atomic(value: 0) + private let eventLoops: [NIOTSEventLoop] + + public init(loopCount: Int = 1, defaultQoS: DispatchQoS = .default) { + precondition(loopCount > 0) + self.eventLoops = (0.. EventLoop { + return self.eventLoops[abs(index.add(1) % self.eventLoops.count)] + } + + /// Shuts down all of the event loops, rendering them unable to perform further work. + public func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) { + let g = DispatchGroup() + let q = DispatchQueue(label: "nio.transportservices.shutdowngracefullyqueue", target: queue) + var error: Error? = nil + + for loop in self.eventLoops { + g.enter() + loop.closeGently().mapIfError { err in + q.sync { error = err } + }.whenComplete { + g.leave() + } + } + + g.notify(queue: q) { + callback(error) + } + } +} diff --git a/Sources/NIOTransportServices/NIOTSListenerBootstrap.swift b/Sources/NIOTransportServices/NIOTSListenerBootstrap.swift new file mode 100644 index 0000000..7f403c6 --- /dev/null +++ b/Sources/NIOTransportServices/NIOTSListenerBootstrap.swift @@ -0,0 +1,311 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// swift-tools-version:4.0 +// +// swift-tools-version:4.0 +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import NIO +import Dispatch +import Network + + +public final class NIOTSListenerBootstrap { + private let group: EventLoopGroup + private let childGroup: EventLoopGroup + private var serverChannelInit: ((Channel) -> EventLoopFuture)? + private var childChannelInit: ((Channel) -> EventLoopFuture)? + private var serverChannelOptions = ChannelOptionStorage() + private var childChannelOptions = ChannelOptionStorage() + private var serverQoS: DispatchQoS? + private var childQoS: DispatchQoS? + private var tcpOptions: NWProtocolTCP.Options = .init() + private var tlsOptions: NWProtocolTLS.Options? + + /// Create a `NIOTSListenerBootstrap` for the `EventLoopGroup` `group`. + /// + /// - parameters: + /// - group: The `EventLoopGroup` to use for the `ServerSocketChannel`. + public convenience init(group: NIOTSEventLoopGroup) { + self.init(group: group, childGroup: group) + } + + /// Create a `NIOTSListenerBootstrap`. + /// + /// - parameters: + /// - group: The `NIOTSEventLoopGroup` to use for the `bind` of the `NIOTSListenerChannel` + /// and to accept new `NIOTSConnectionChannel`s with. + /// - childGroup: The `NIOTSEventLoopGroup` to run the accepted `NIOTSConnectionChannel`s on. + public init(group: NIOTSEventLoopGroup, childGroup: NIOTSEventLoopGroup) { + self.group = group + self.childGroup = childGroup + } + + /// Initialize the `NIOTSListenerChannel` with `initializer`. The most common task in initializer is to add + /// `ChannelHandler`s to the `ChannelPipeline`. + /// + /// The `NIOTSListenerChannel` uses the accepted `NIOTSConnectionChannel`s as inbound messages. + /// + /// - note: To set the initializer for the accepted `NIOTSConnectionChannel`s, look at + /// `ServerBootstrap.childChannelInitializer`. + /// + /// - parameters: + /// - initializer: A closure that initializes the provided `Channel`. + public func serverChannelInitializer(_ initializer: @escaping (Channel) -> EventLoopFuture) -> Self { + self.serverChannelInit = initializer + return self + } + + /// Initialize the accepted `NIOTSConnectionChannel`s with `initializer`. The most common task in initializer is to add + /// `ChannelHandler`s to the `ChannelPipeline`. + /// + /// The accepted `Channel` will operate on `ByteBuffer` as inbound and `IOData` as outbound messages. + /// + /// - parameters: + /// - initializer: A closure that initializes the provided `Channel`. + public func childChannelInitializer(_ initializer: @escaping (Channel) -> EventLoopFuture) -> Self { + self.childChannelInit = initializer + return self + } + + /// Specifies a `ChannelOption` to be applied to the `NIOTSListenerChannel`. + /// + /// - note: To specify options for the accepted `NIOTSConnectionChannels`s, look at `NIOTSListenerBootstrap.childChannelOption`. + /// + /// - parameters: + /// - option: The option to be applied. + /// - value: The value for the option. + public func serverChannelOption(_ option: T, value: T.OptionType) -> Self { + self.serverChannelOptions.put(key: option, value: value) + return self + } + + /// Specifies a `ChannelOption` to be applied to the accepted `NIOTSConnectionChannel`s. + /// + /// - parameters: + /// - option: The option to be applied. + /// - value: The value for the option. + public func childChannelOption(_ option: T, value: T.OptionType) -> Self { + self.childChannelOptions.put(key: option, value: value) + return self + } + + + /// Specifies a QoS to use for the server channel, instead of the default QoS for the + /// event loop. + /// + /// This allows unusually high or low priority workloads to be appropriately scheduled. + public func serverQoS(_ qos: DispatchQoS) -> Self { + self.serverQoS = qos + return self + } + + + /// Specifies a QoS to use for the child connections created from the server channel, + /// instead of the default QoS for the event loop. + /// + /// This allows unusually high or low priority workloads to be appropriately scheduled. + public func childQoS(_ qos: DispatchQoS) -> Self { + self.childQoS = qos + return self + } + + /// Specifies the TCP options to use on the child `Channel`s. + /// + /// To retrieve the TCP options from connected channels, use + /// `NIOTSChannelOptions.TCPConfiguration`. It is not possible to change the + /// TCP configuration after `bind` is called. + public func tcpOptions(_ options: NWProtocolTCP.Options) -> Self { + self.tcpOptions = options + return self + } + + /// Specifies the TLS options to use on the child `Channel`s. + /// + /// To retrieve the TLS options from connected channels, use + /// `NIOTSChannelOptions.TLSConfiguration`. It is not possible to change the + /// TLS configuration after `bind` is called. + public func tlsOptions(_ options: NWProtocolTLS.Options) -> Self { + self.tlsOptions = options + return self + } + + /// Bind the `NIOTSListenerChannel` to `host` and `port`. + /// + /// - parameters: + /// - host: The host to bind on. + /// - port: The port to bind on. + public func bind(host: String, port: Int) -> EventLoopFuture { + return self.bind0 { channel in + let p: EventLoopPromise = channel.eventLoop.newPromise() + do { + // NWListener does not actually resolve hostname-based NWEndpoints + // for use with requiredLocalEndpoint, so we fall back to + // SocketAddress for this. + let address = try SocketAddress.newAddressResolving(host: host, port: port) + channel.bind(to: address, promise: p) + } catch { + p.fail(error: error) + } + return p.futureResult + } + } + + /// Bind the `NIOTSListenerChannel` to `address`. + /// + /// - parameters: + /// - address: The `SocketAddress` to bind on. + public func bind(to address: SocketAddress) -> EventLoopFuture { + return self.bind0 { channel in + channel.bind(to: address) + } + } + + /// Bind the `NIOTSListenerChannel` to a UNIX Domain Socket. + /// + /// - parameters: + /// - unixDomainSocketPath: The _Unix domain socket_ path to bind to. `unixDomainSocketPath` must not exist, it will be created by the system. + public func bind(unixDomainSocketPath: String) -> EventLoopFuture { + return self.bind0 { channel in + let p: EventLoopPromise = channel.eventLoop.newPromise() + do { + let address = try SocketAddress(unixDomainSocketPath: unixDomainSocketPath) + channel.bind(to: address, promise: p) + } catch { + p.fail(error: error) + } + return p.futureResult + } + } + + /// Bind the `NIOTSListenerChannel` to a given `NWEndpoint`. + /// + /// - parameters: + /// - endpoint: The `NWEndpoint` to bind this channel to. + public func bind(endpoint: NWEndpoint) -> EventLoopFuture { + return self.bind0 { channel in + channel.triggerUserOutboundEvent(NIOTSNetworkEvents.BindToNWEndpoint(endpoint: endpoint)) + } + } + + private func bind0(_ binder: @escaping (Channel) -> EventLoopFuture) -> EventLoopFuture { + let eventLoop = self.group.next() as! NIOTSEventLoop + let childEventLoopGroup = self.childGroup as! NIOTSEventLoopGroup + let serverChannelInit = self.serverChannelInit ?? { _ in eventLoop.newSucceededFuture(result: ()) } + let childChannelInit = self.childChannelInit + let serverChannelOptions = self.serverChannelOptions + let childChannelOptions = self.childChannelOptions + + let serverChannel = NIOTSListenerChannel(eventLoop: eventLoop, + qos: self.serverQoS, + tcpOptions: self.tcpOptions, + tlsOptions: self.tlsOptions) + + return eventLoop.submit { + return serverChannelOptions.applyAll(channel: serverChannel).then { + serverChannelInit(serverChannel) + }.then { + serverChannel.pipeline.add(handler: AcceptHandler(childChannelInitializer: childChannelInit, + childGroup: childEventLoopGroup, + childChannelOptions: childChannelOptions, + childChannelQoS: self.childQoS, + tcpOptions: self.tcpOptions, + tlsOptions: self.tlsOptions)) + }.then { + serverChannel.register() + }.then { + binder(serverChannel) + }.map { + serverChannel as Channel + }.thenIfError { error in + serverChannel.close0(error: error, mode: .all, promise: nil) + return eventLoop.newFailedFuture(error: error) + } + }.then { + $0 + } + } +} + + +private class AcceptHandler: ChannelInboundHandler { + typealias InboundIn = NWConnection + typealias InboundOut = NIOTSConnectionChannel + + private let childChannelInitializer: ((Channel) -> EventLoopFuture)? + private let childGroup: NIOTSEventLoopGroup + private let childChannelOptions: ChannelOptionStorage + private let childChannelQoS: DispatchQoS? + private let originalTCPOptions: NWProtocolTCP.Options + private let originalTLSOptions: NWProtocolTLS.Options? + + init(childChannelInitializer: ((Channel) -> EventLoopFuture)?, + childGroup: NIOTSEventLoopGroup, + childChannelOptions: ChannelOptionStorage, + childChannelQoS: DispatchQoS?, + tcpOptions: NWProtocolTCP.Options, + tlsOptions: NWProtocolTLS.Options?) { + self.childChannelInitializer = childChannelInitializer + self.childGroup = childGroup + self.childChannelOptions = childChannelOptions + self.childChannelQoS = childChannelQoS + self.originalTCPOptions = tcpOptions + self.originalTLSOptions = tlsOptions + } + + func channelRead(ctx: ChannelHandlerContext, data: NIOAny) { + let conn = self.unwrapInboundIn(data) + let childLoop = self.childGroup.next() as! NIOTSEventLoop + let ctxEventLoop = ctx.eventLoop + let childInitializer = self.childChannelInitializer ?? { _ in childLoop.newSucceededFuture(result: ()) } + let newChannel = NIOTSConnectionChannel(wrapping: conn, + on: childLoop, + parent: ctx.channel, + qos: self.childChannelQoS, + tcpOptions: self.originalTCPOptions, + tlsOptions: self.originalTLSOptions) + + @inline(__always) + func setupChildChannel() -> EventLoopFuture { + return self.childChannelOptions.applyAll(channel: newChannel).then { () -> EventLoopFuture in + assert(childLoop.inEventLoop) + return childInitializer(newChannel) + } + } + + @inline(__always) + func fireThroughPipeline(_ future: EventLoopFuture) { + assert(ctxEventLoop.inEventLoop) + future.then { (_) -> EventLoopFuture in + assert(ctxEventLoop.inEventLoop) + guard ctx.channel.isActive else { + return newChannel.close().thenThrowing { + throw ChannelError.ioOnClosedChannel + } + } + ctx.fireChannelRead(self.wrapInboundOut(newChannel)) + return ctx.eventLoop.newSucceededFuture(result: ()) + }.whenFailure { error in + assert(ctx.eventLoop.inEventLoop) + _ = newChannel.close() + ctx.fireErrorCaught(error) + } + } + + if childLoop === ctxEventLoop { + fireThroughPipeline(setupChildChannel()) + } else { + fireThroughPipeline(childLoop.submit { + return setupChildChannel() + }.then { $0 }.hopTo(eventLoop: ctxEventLoop)) + } + } +} diff --git a/Sources/NIOTransportServices/NIOTSListenerChannel.swift b/Sources/NIOTransportServices/NIOTSListenerChannel.swift new file mode 100644 index 0000000..7bcd4d8 --- /dev/null +++ b/Sources/NIOTransportServices/NIOTSListenerChannel.swift @@ -0,0 +1,385 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// swift-tools-version:4.0 +// +// swift-tools-version:4.0 +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import Foundation +import NIO +import NIOFoundationCompat +import Dispatch +import Network + +internal final class NIOTSListenerChannel { + /// The `ByteBufferAllocator` for this `Channel`. + public let allocator = ByteBufferAllocator() + + /// An `EventLoopFuture` that will complete when this channel is finally closed. + public var closeFuture: EventLoopFuture { + return self.closePromise.futureResult + } + + /// The parent `Channel` for this one, if any. + public let parent: Channel? = nil + + /// The `EventLoop` this `Channel` belongs to. + internal let tsEventLoop: NIOTSEventLoop + + private var _pipeline: ChannelPipeline! = nil // this is really a constant (set in .init) but needs `self` to be constructed and therefore a `var`. Do not change as this needs to accessed from arbitrary threads. + + internal let closePromise: EventLoopPromise + + /// The underlying `NWListener` that this `Channel` wraps. This is only non-nil + /// after the initial connection attempt has been made. + private var nwListener: NWListener? + + /// The TCP options for this listener. + private let tcpOptions: NWProtocolTCP.Options + + /// The TLS options for this listener. + private let tlsOptions: NWProtocolTLS.Options? + + /// The `DispatchQueue` that socket events for this connection will be dispatched onto. + private let connectionQueue: DispatchQueue + + /// An `EventLoopPromise` that will be succeeded or failed when a bind attempt succeeds or fails. + private var bindPromise: EventLoopPromise? + + /// The state of this connection channel. + internal var state: ChannelState = .idle + + /// The kinds of channel activation this channel supports + internal let supportedActivationType: ActivationType = .bind + + /// Whether a call to NWListener.receive has been made, but the completion + /// handler has not yet been invoked. + private var outstandingRead: Bool = false + + /// Whether autoRead is enabled for this channel. + private var autoRead: Bool = true + + /// Create a `NIOTSListenerChannel` on a given `NIOTSEventLoop`. + /// + /// Note that `NIOTSListenerChannel` objects cannot be created on arbitrary loops types. + internal init(eventLoop: NIOTSEventLoop, + qos: DispatchQoS? = nil, + tcpOptions: NWProtocolTCP.Options, + tlsOptions: NWProtocolTLS.Options?) { + self.tsEventLoop = eventLoop + self.closePromise = eventLoop.newPromise() + self.connectionQueue = eventLoop.channelQueue(label: "nio.transportservices.listenerchannel", qos: qos) + self.tcpOptions = tcpOptions + self.tlsOptions = tlsOptions + + // Must come last, as it requires self to be completely initialized. + self._pipeline = ChannelPipeline(channel: self) + } +} + + +// MARK:- NIOTSListenerChannel implementation of Channel +extension NIOTSListenerChannel: Channel { + /// The `ChannelPipeline` for this `Channel`. + public var pipeline: ChannelPipeline { + return self._pipeline + } + + /// The local address for this channel. + public var localAddress: SocketAddress? { + if self.eventLoop.inEventLoop { + return try? self.localAddress0() + } else { + return self.connectionQueue.sync { try? self.localAddress0() } + } + } + + /// The remote address for this channel. + public var remoteAddress: SocketAddress? { + if self.eventLoop.inEventLoop { + return try? self.remoteAddress0() + } else { + return self.connectionQueue.sync { try? self.remoteAddress0() } + } + } + + /// Whether this channel is currently writable. + public var isWritable: Bool { + // TODO: implement + return true + } + + + public var _unsafe: ChannelCore { + return self + } + + public func setOption(option: T, value: T.OptionType) -> EventLoopFuture where T : ChannelOption { + if eventLoop.inEventLoop { + let promise: EventLoopPromise = eventLoop.newPromise() + executeAndComplete(promise) { try setOption0(option: option, value: value) } + return promise.futureResult + } else { + return eventLoop.submit { try self.setOption0(option: option, value: value) } + } + } + + private func setOption0(option: T, value: T.OptionType) throws { + assert(eventLoop.inEventLoop) + + guard !self.closed else { + throw ChannelError.ioOnClosedChannel + } + + // TODO: Many more channel options, both from NIO and Network.framework. + switch option { + case _ as AutoReadOption: + // AutoRead is currently mandatory for TS listeners. + if value as! AutoReadOption.OptionType == false { + throw ChannelError.operationUnsupported + } + case _ as SocketOption: + let optionValue = option as! SocketOption + try self.tcpOptions.applyChannelOption(option: optionValue, value: value as! SocketOptionValue) + default: + fatalError("option \(option) not supported") + } + } + + public func getOption(option: T) -> EventLoopFuture where T : ChannelOption { + if eventLoop.inEventLoop { + let promise: EventLoopPromise = eventLoop.newPromise() + executeAndComplete(promise) { try getOption0(option: option) } + return promise.futureResult + } else { + return eventLoop.submit { try self.getOption0(option: option) } + } + } + + func getOption0(option: T) throws -> T.OptionType { + assert(eventLoop.inEventLoop) + + guard !self.closed else { + throw ChannelError.ioOnClosedChannel + } + + switch option { + case _ as AutoReadOption: + return autoRead as! T.OptionType + case _ as SocketOption: + let optionValue = option as! SocketOption + return try self.tcpOptions.valueFor(socketOption: optionValue) as! T.OptionType + default: + fatalError("option \(option) not supported") + } + } +} + + +// MARK:- NIOTSListenerChannel implementation of StateManagedChannel. +extension NIOTSListenerChannel: StateManagedChannel { + typealias ActiveSubstate = ListenerActiveSubstate + + /// Listener channels do not have active substates: they are either active or they + /// are not. + enum ListenerActiveSubstate: ActiveChannelSubstate { + case active + + init() { + self = .active + } + } + + func alreadyConfigured0(promise: EventLoopPromise?) { + fatalError("Not implemented") + } + + public func localAddress0() throws -> SocketAddress { + guard let listener = self.nwListener else { + throw ChannelError.ioOnClosedChannel + } + + guard let localEndpoint = listener.parameters.requiredLocalEndpoint else { + throw NIOTSErrors.UnableToResolveEndpoint() + } + + var address = try SocketAddress(fromNWEndpoint: localEndpoint) + + // If we were asked to bind port 0, we need to update that. + if let port = address.port, port == 0 { + // We were. Let's ask Network.framework what we got. Nothing is an unacceptable answer. + guard let actualPort = listener.port else { + throw NIOTSErrors.UnableToResolveEndpoint() + } + address.newPort(actualPort.rawValue) + } + + return address + } + + public func remoteAddress0() throws -> SocketAddress { + throw ChannelError.operationUnsupported + } + + internal func beginActivating0(to target: NWEndpoint, promise: EventLoopPromise?) { + assert(self.nwListener == nil) + assert(self.bindPromise == nil) + self.bindPromise = promise + + let parameters = NWParameters(tls: self.tlsOptions, tcp: self.tcpOptions) + + // If we have a target that is not for a Bonjour service, we treat this as a request for + // a specific local endpoint. That gets configured on the parameters. If this is a bonjour + // endpoint, we deal with that later, though if it has requested a specific interface we + // set that now. + switch target { + case .hostPort, .unix: + parameters.requiredLocalEndpoint = target + case .service(_, _, _, let interface): + parameters.requiredInterface = interface + } + + let listener: NWListener + do { + listener = try NWListener(using: parameters) + } catch { + self.close0(error: error, mode: .all, promise: nil) + return + } + + if case .service(let name, let type, let domain, _) = target { + // Ok, now we deal with Bonjour. + listener.service = NWListener.Service(name: name, type: type, domain: domain) + } + + listener.stateUpdateHandler = self.stateUpdateHandler(newState:) + listener.newConnectionHandler = self.newConnectionHandler(connection:) + + // Ok, state is ready. Let's go! + self.nwListener = listener + listener.start(queue: self.connectionQueue) + } + + public func write0(_ data: NIOAny, promise: EventLoopPromise?) { + promise?.fail(error: ChannelError.operationUnsupported) + } + + public func flush0() { + // Flush is not supported on listening channels. + } + + /// Perform a read from the network. + /// + /// This method has a slightly strange semantic, because we do not allow multiple reads at once. As a result, this + /// is a *request* to read, and if there is a read already being processed then this method will do nothing. + public func read0() { + // AutoRead is currently mandatory, so this method does nothing. + } + + public func doClose0(error: Error) { + guard let listener = self.nwListener else { + // We don't have a connection to close here, so we're actually done. Our old state + // was idle. + return + } + + // Step 1 is to tell the network stack we're done. + listener.cancel() + + // Step 2 is to cancel a pending bind promise, if any. + if let pendingBind = self.bindPromise { + self.bindPromise = nil + pendingBind.fail(error: error) + } + } + + public func doHalfClose0(error: Error, promise: EventLoopPromise?) { + promise?.fail(error: ChannelError.operationUnsupported) + } + + public func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise?) { + switch event { + case let x as NIOTSNetworkEvents.BindToNWEndpoint: + self.bind0(to: x.endpoint, promise: promise) + default: + promise?.fail(error: ChannelError.operationUnsupported) + } + } + + public func channelRead0(_ data: NIOAny) { + let channel = self.unwrapData(data, as: NIOTSConnectionChannel.self) + let p: EventLoopPromise = self.eventLoop.newPromise() + channel.registerAlreadyConfigured0(promise: p) + p.futureResult.whenFailure { (_: Error) in + channel.close(promise: nil) + } + } + + public func errorCaught0(error: Error) { + // Currently we don't do anything with errors that pass through the pipeline + return + } + + /// A function that will trigger a socket read if necessary. + internal func readIfNeeded0() { + // AutoRead is currently mandatory, so this does nothing. + } +} + + +// MARK:- Implementations of the callbacks passed to NWListener. +extension NIOTSListenerChannel { + /// Called by the underlying `NWListener` when its internal state has changed. + private func stateUpdateHandler(newState: NWListener.State) { + switch newState { + case .setup: + preconditionFailure("Should not be told about this state.") + case .waiting: + break + case .ready: + // Transitioning to ready means the bind succeeded. Hooray! + self.bindComplete0() + case .cancelled: + // This is the network telling us we're closed. We don't need to actually do anything here + // other than check our state is ok. + assert(self.closed) + self.nwListener = nil + case .failed(let err): + // The connection has failed for some reason. + self.close0(error: err, mode: .all, promise: nil) + default: + // This clause is here to help the compiler out: it's otherwise not able to + // actually validate that the switch is exhaustive. Trust me, it is. + fatalError("Unreachable") + } + } + + /// Called by the underlying `NWListener` when a new connection has been received. + private func newConnectionHandler(connection: NWConnection) { + guard self.isActive else { + return + } + + self.pipeline.fireChannelRead(NIOAny(connection)) + self.pipeline.fireChannelReadComplete() + } +} + + +// MARK:- Implementations of state management for the channel. +extension NIOTSListenerChannel { + /// Make the channel active. + private func bindComplete0() { + let promise = self.bindPromise + self.bindPromise = nil + self.becomeActive0(promise: promise) + } +} diff --git a/Sources/NIOTransportServices/NIOTSNetworkEvents.swift b/Sources/NIOTransportServices/NIOTSNetworkEvents.swift new file mode 100644 index 0000000..5afccf1 --- /dev/null +++ b/Sources/NIOTransportServices/NIOTSNetworkEvents.swift @@ -0,0 +1,68 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// swift-tools-version:4.0 +// +// swift-tools-version:4.0 +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import Network +import NIO + +/// A tag protocol that can be used to cover all network events emitted by `NIOTS`. +/// +/// Users are strongly encouraged not to conform their own types to this protocol. +public protocol NIOTSNetworkEvent: Equatable { } + +public enum NIOTSNetworkEvents { + /// This event is fired whenever the OS has informed NIO that there is a better + /// path available to the endpoint that this `Channel` is currently connected to, + /// e.g. the current connection is using an expensive cellular connection and + /// a cheaper WiFi connection has become available. + /// + /// If you can handle this event, you should make a new connection attempt, and then + /// transfer your work to that connection before closing this one. + public struct BetterPathAvailable: NIOTSNetworkEvent { } + + /// This event is fired when the OS has informed NIO that no better path to the + /// to the remote endpoint than the one currently being used by this `Channel` is + /// currently available. + public struct BetterPathUnavailable: NIOTSNetworkEvent { } + + /// This event is fired whenever the OS has informed NIO that a new path is in use + /// for this `Channel`. + public struct PathChanged: NIOTSNetworkEvent { + /// The new path for this `Channel`. + public let newPath: NWPath + + /// Create a new `PathChanged` event. + public init(newPath: NWPath) { + self.newPath = newPath + } + } + + /// This event is fired as an outbound event when NIO would like to ask Network.framework + /// to handle the connection logic (e.g. its own DNS resolution and happy eyeballs racing). + /// This is temporary workaround until NIO 2.0 which should allow us to use the regular + /// `Channel.connect` method instead. + public struct ConnectToNWEndpoint: NIOTSNetworkEvent { + /// The endpoint to which we want to connect. + public let endpoint: NWEndpoint + } + + /// This event is fired as an outbound event when NIO would like to ask Network.framework + /// to handle the binding logic (e.g. its own support for bonjour and interface selection). + /// This is temporary workaround until NIO 2.0 which should allow us to use the regular + /// `Channel.bind` method instead. + public struct BindToNWEndpoint: NIOTSNetworkEvent { + /// The endpoint to which we want to bind. + public let endpoint: NWEndpoint + } +} diff --git a/Sources/NIOTransportServices/SocketAddress+NWEndpoint.swift b/Sources/NIOTransportServices/SocketAddress+NWEndpoint.swift new file mode 100644 index 0000000..f5f349d --- /dev/null +++ b/Sources/NIOTransportServices/SocketAddress+NWEndpoint.swift @@ -0,0 +1,121 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// swift-tools-version:4.0 +// +// swift-tools-version:4.0 +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import Darwin +import Foundation +import NIO +import Network + +internal extension IPv4Address { + /// Create an `IPv4Address` object from a `sockaddr_in`. + internal init(fromSockAddr sockAddr: sockaddr_in) { + var localAddr = sockAddr + self = withUnsafeBytes(of: &localAddr.sin_addr) { + precondition($0.count == 4) + let addrData = Data(bytes: $0.baseAddress!, count: $0.count) + return IPv4Address(addrData)! + } + } +} + +internal extension IPv6Address { + internal init(fromSockAddr sockAddr: sockaddr_in6) { + var localAddr = sockAddr + + // TODO: We should check whether we can reliably pull an interface declaration out + // here to be more useful. + self = withUnsafeBytes(of: &localAddr.sin6_addr) { + precondition($0.count == 16) + let addrData = Data(bytes: $0.baseAddress!, count: $0.count) + return IPv6Address(addrData)! + } + } +} + +internal extension NWEndpoint { + /// Create an `NWEndpoint` value from a NIO `SocketAddress`. + internal init(fromSocketAddress socketAddress: SocketAddress) { + switch socketAddress { + case .unixDomainSocket(let uds): + var address = uds.address + let path: String = withUnsafeBytes(of: &address.sun_path) { ptr in + let ptr = ptr.baseAddress!.bindMemory(to: UInt8.self, capacity: 104) + return String(cString: ptr) + } + self = NWEndpoint.unix(path: path) + case .v4(let v4Addr): + let v4Address = IPv4Address(fromSockAddr: v4Addr.address) + let port = NWEndpoint.Port(rawValue: socketAddress.port!)! + self = NWEndpoint.hostPort(host: .ipv4(v4Address), port: port) + case .v6(let v6Addr): + let v6Address = IPv6Address(fromSockAddr: v6Addr.address) + let port = NWEndpoint.Port(rawValue: socketAddress.port!)! + self = NWEndpoint.hostPort(host: .ipv6(v6Address), port: port) + } + } +} + +// TODO: We'll want to get rid of this when we support returning NWEndpoint directly from +// the various address-handling functions. +internal extension SocketAddress { + internal init(fromNWEndpoint endpoint: NWEndpoint) throws { + switch endpoint { + case .hostPort(.ipv4(let host), let port): + var addr = sockaddr_in() + addr.sin_family = sa_family_t(AF_INET) + addr.sin_len = UInt8(MemoryLayout.size) + addr.sin_port = port.rawValue.bigEndian + host.rawValue.withUnsafeBytes { + precondition($0.count == 4) + memcpy(&addr.sin_addr, $0.baseAddress!, 4) + } + self = .init(addr, host: host.debugDescription) + case .hostPort(.ipv6(let host), let port): + var addr = sockaddr_in6() + addr.sin6_family = sa_family_t(AF_INET6) + addr.sin6_port = port.rawValue.bigEndian + addr.sin6_len = UInt8(MemoryLayout.size) + host.rawValue.withUnsafeBytes { + precondition($0.count == 16) + memcpy(&addr.sin6_addr, $0.baseAddress!, 16) + } + self = .init(addr, host: host.debugDescription) + case .unix(let path): + self = try .init(unixDomainSocketPath: path) + case .service: + preconditionFailure("Cannot represent service addresses in SocketAddress") + case .hostPort(.name, _): + preconditionFailure("Cannot represent host by name only as SocketAddress") + } + } +} + +internal extension SocketAddress { + /// Change the port on this `SocketAddress` to a new value. + mutating func newPort(_ port: UInt16) { + switch self { + case .v4(let addr): + var address = addr.address + address.sin_port = port.bigEndian + self = SocketAddress(address, host: addr.host) + case .v6(let addr): + var address = addr.address + address.sin6_port = port.bigEndian + self = SocketAddress(address, host: addr.host) + case .unixDomainSocket: + preconditionFailure("Cannot set new port on UDS") + } + } +} diff --git a/Sources/NIOTransportServices/StateManagedChannel.swift b/Sources/NIOTransportServices/StateManagedChannel.swift new file mode 100644 index 0000000..55000b4 --- /dev/null +++ b/Sources/NIOTransportServices/StateManagedChannel.swift @@ -0,0 +1,264 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// swift-tools-version:4.0 +// +// swift-tools-version:4.0 +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import Foundation +import NIO +import NIOFoundationCompat +import Dispatch +import Network + + +/// An object that conforms to this protocol represents the substate of a channel in the +/// active state. This can be used to provide more fine-grained tracking of states +/// within the active state of a channel. Example uses include for tracking TCP half-closure +/// state in a TCP stream channel. +internal protocol ActiveChannelSubstate { + /// Create the substate in its default initial state. + init() +} + + +/// A state machine enum that tracks the state of the connection channel. +internal enum ChannelState { + case idle + case registered + case activating + case active(ActiveSubstate) + case inactive + + fileprivate mutating func register() throws { + guard case .idle = self else { + throw NIOTSErrors.InvalidChannelStateTransition() + } + self = .registered + } + + fileprivate mutating func beginActivating() throws { + switch self { + case .registered: + self = .activating + case .idle, .activating, .active, .inactive: + throw NIOTSErrors.InvalidChannelStateTransition() + } + } + + fileprivate mutating func becomeActive() throws { + guard case .activating = self else { + throw NIOTSErrors.InvalidChannelStateTransition() + } + self = .active(ActiveSubstate()) + } + + fileprivate mutating func becomeInactive() throws -> ChannelState { + let oldState = self + + switch self { + case .idle, .registered, .activating, .active: + self = .inactive + case .inactive: + // In this state we're already closed. + throw ChannelError.alreadyClosed + } + + return oldState + } +} + + +/// The kinds of activation that a channel may support. +internal enum ActivationType { + case connect + case bind +} + + +/// A protocol for `Channel` implementations with a simple Network.framework +/// state management layer. +/// +/// This protocol provides default hooks for managing state appropriately for a +/// given channel. It also provides some default implementations of `Channel` methods +/// for simple behaviours. +internal protocol StateManagedChannel: Channel, ChannelCore { + associatedtype ActiveSubstate: ActiveChannelSubstate + + var state: ChannelState { get set } + + var tsEventLoop: NIOTSEventLoop { get } + + var closePromise: EventLoopPromise { get } + + var supportedActivationType: ActivationType { get } + + func beginActivating0(to: NWEndpoint, promise: EventLoopPromise?) -> Void + + func becomeActive0(promise: EventLoopPromise?) -> Void + + func alreadyConfigured0(promise: EventLoopPromise?) -> Void + + func doClose0(error: Error) -> Void + + func doHalfClose0(error: Error, promise: EventLoopPromise?) -> Void + + func readIfNeeded0() -> Void +} + +extension StateManagedChannel { + public var eventLoop: EventLoop { + return self.tsEventLoop + } + + /// Whether this channel is currently active. + public var isActive: Bool { + switch self.state { + case .active: + return true + case .idle, .registered, .activating, .inactive: + return false + } + } + + /// Whether this channel is currently closed. This is not necessary for the public + /// API, it's just a convenient helper. + internal var closed: Bool { + switch self.state { + case .inactive: + return true + case .idle, .registered, .activating, .active: + return false + } + } + + public func register0(promise: EventLoopPromise?) { + // TODO: does this need to do anything more than this? + do { + try self.state.register() + try self.tsEventLoop.register(self) + self.pipeline.fireChannelRegistered() + promise?.succeed(result: ()) + } catch { + promise?.fail(error: error) + self.close0(error: error, mode: .all, promise: nil) + } + } + + public func registerAlreadyConfigured0(promise: EventLoopPromise?) { + do { + try self.state.register() + try self.tsEventLoop.register(self) + self.pipeline.fireChannelRegistered() + try self.state.beginActivating() + promise?.succeed(result: ()) + } catch { + promise?.fail(error: error) + self.close0(error: error, mode: .all, promise: nil) + return + } + + // Ok, we are registered and ready to begin activating. Tell the channel: it must + // call becomeActive0 directly. + self.alreadyConfigured0(promise: promise) + } + + public func connect0(to address: SocketAddress, promise: EventLoopPromise?) { + self.activateWithType(type: .connect, to: NWEndpoint(fromSocketAddress: address), promise: promise) + } + + public func connect0(to endpoint: NWEndpoint, promise: EventLoopPromise?) { + self.activateWithType(type: .connect, to: endpoint, promise: promise) + } + + public func bind0(to address: SocketAddress, promise: EventLoopPromise?) { + self.activateWithType(type: .bind, to: NWEndpoint(fromSocketAddress: address), promise: promise) + } + + public func bind0(to endpoint: NWEndpoint, promise: EventLoopPromise?) { + self.activateWithType(type: .bind, to: endpoint, promise: promise) + } + + public func close0(error: Error, mode: CloseMode, promise: EventLoopPromise?) { + switch mode { + case .all: + let oldState: ChannelState + do { + oldState = try self.state.becomeInactive() + } catch let thrownError { + promise?.fail(error: thrownError) + return + } + + self.doClose0(error: error) + + if case .active = oldState { + self.pipeline.fireChannelInactive() + } + + // TODO: If we want slightly more complex state management, we can actually fire this only when the + // state transitions into .cancelled. For the moment I didn't think it was necessary. + self.tsEventLoop.deregister(self) + self.pipeline.fireChannelUnregistered() + + // Next we fire the promise passed to this method. + promise?.succeed(result: ()) + + // Now we schedule our final cleanup. We need to keep the channel pipeline alive for at least one more event + // loop tick, as more work might be using it. + self.eventLoop.execute { + self.removeHandlers(channel: self) + self.closePromise.succeed(result: ()) + } + + case .input: + promise?.fail(error: ChannelError.operationUnsupported) + + case .output: + self.doHalfClose0(error: error, promise: promise) + } + } + + public func becomeActive0(promise: EventLoopPromise?) { + // Here we crash if we cannot transition our state. That's because my understanding is that we + // should not be able to hit this. + do { + try self.state.becomeActive() + } catch { + self.close0(error: error, mode: .all, promise: promise) + return + } + + if let promise = promise { + promise.succeed(result: ()) + } + self.pipeline.fireChannelActive() + self.readIfNeeded0() + } + + /// A helper to handle the fact that activation is mostly common across connect and bind, and that both are + /// not supported by a single channel type. + private func activateWithType(type: ActivationType, to endpoint: NWEndpoint, promise: EventLoopPromise?) { + guard type == self.supportedActivationType else { + promise?.fail(error: ChannelError.operationUnsupported) + return + } + + do { + try self.state.beginActivating() + } catch { + promise?.fail(error: error) + return + } + + self.beginActivating0(to: endpoint, promise: promise) + } +} diff --git a/Sources/NIOTransportServices/TCPOptions+SocketChannelOption.swift b/Sources/NIOTransportServices/TCPOptions+SocketChannelOption.swift new file mode 100644 index 0000000..6aa49d8 --- /dev/null +++ b/Sources/NIOTransportServices/TCPOptions+SocketChannelOption.swift @@ -0,0 +1,84 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// swift-tools-version:4.0 +// +// swift-tools-version:4.0 +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import Foundation +import NIO +import Network + +internal extension NWProtocolTCP.Options { + /// Apply a given channel `SocketOption` to this protocol options state. + func applyChannelOption(option: SocketOption, value: SocketOptionValue) throws { + switch option.value { + case (IPPROTO_TCP, TCP_NODELAY): + self.noDelay = value != 0 + case (IPPROTO_TCP, TCP_NOPUSH): + self.noPush = value != 0 + case (IPPROTO_TCP, TCP_NOOPT): + self.noOptions = value != 0 + case (IPPROTO_TCP, TCP_KEEPCNT): + self.keepaliveCount = Int(value) + case (IPPROTO_TCP, TCP_KEEPALIVE): + self.keepaliveIdle = Int(value) + case (IPPROTO_TCP, TCP_KEEPINTVL): + self.keepaliveInterval = Int(value) + case (IPPROTO_TCP, TCP_MAXSEG): + self.maximumSegmentSize = Int(value) + case (IPPROTO_TCP, TCP_CONNECTIONTIMEOUT): + self.connectionTimeout = Int(value) + case (IPPROTO_TCP, TCP_RXT_CONNDROPTIME): + self.connectionDropTime = Int(value) + case (IPPROTO_TCP, TCP_RXT_FINDROP): + self.retransmitFinDrop = value != 0 + case (IPPROTO_TCP, TCP_SENDMOREACKS): + self.disableAckStretching = value != 0 + case (SOL_SOCKET, SO_KEEPALIVE): + self.enableKeepalive = value != 0 + default: + throw NIOTSErrors.UnsupportedSocketOption(optionValue: option.value) + } + } + + /// Obtain the given `SocketOption` value for this protocol options state. + func valueFor(socketOption option: SocketOption) throws -> SocketOptionValue { + switch option.value { + case (IPPROTO_TCP, TCP_NODELAY): + return self.noDelay ? 1 : 0 + case (IPPROTO_TCP, TCP_NOPUSH): + return self.noPush ? 1 : 0 + case (IPPROTO_TCP, TCP_NOOPT): + return self.noOptions ? 1 : 0 + case (IPPROTO_TCP, TCP_KEEPCNT): + return Int32(self.keepaliveCount) + case (IPPROTO_TCP, TCP_KEEPALIVE): + return Int32(self.keepaliveIdle) + case (IPPROTO_TCP, TCP_KEEPINTVL): + return Int32(self.keepaliveInterval) + case (IPPROTO_TCP, TCP_MAXSEG): + return Int32(self.maximumSegmentSize) + case (IPPROTO_TCP, TCP_CONNECTIONTIMEOUT): + return Int32(self.connectionTimeout) + case (IPPROTO_TCP, TCP_RXT_CONNDROPTIME): + return Int32(self.connectionDropTime) + case (IPPROTO_TCP, TCP_RXT_FINDROP): + return self.retransmitFinDrop ? 1 : 0 + case (IPPROTO_TCP, TCP_SENDMOREACKS): + return self.disableAckStretching ? 1 : 0 + case (SOL_SOCKET, SO_KEEPALIVE): + return self.enableKeepalive ? 1 : 0 + default: + throw NIOTSErrors.UnsupportedSocketOption(optionValue: option.value) + } + } +} diff --git a/Tests/NIOTransportServicesTests/NIOTSConnectionChannelTests.swift b/Tests/NIOTransportServicesTests/NIOTSConnectionChannelTests.swift new file mode 100644 index 0000000..9aa042a --- /dev/null +++ b/Tests/NIOTransportServicesTests/NIOTSConnectionChannelTests.swift @@ -0,0 +1,428 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// swift-tools-version:4.0 +// +// swift-tools-version:4.0 +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import XCTest +import Network +import NIO +import NIOTransportServices + + +final class ConnectRecordingHandler: ChannelOutboundHandler { + typealias OutboundIn = Any + typealias OutboundOut = Any + + var connectTargets: [SocketAddress] = [] + var endpointTargets: [NWEndpoint] = [] + + func connect(ctx: ChannelHandlerContext, to address: SocketAddress, promise: EventLoopPromise?) { + self.connectTargets.append(address) + ctx.connect(to: address, promise: promise) + } + + func triggerUserOutboundEvent(ctx: ChannelHandlerContext, event: Any, promise: EventLoopPromise?) { + switch event { + case let evt as NIOTSNetworkEvents.ConnectToNWEndpoint: + self.endpointTargets.append(evt.endpoint) + default: + break + } + ctx.triggerUserOutboundEvent(event, promise: promise) + } +} + + +final class FailOnReadHandler: ChannelInboundHandler { + typealias InboundIn = Any + + func channelRead(ctx: ChannelHandlerContext, data: NIOAny) { + XCTFail("Must not read") + ctx.fireChannelRead(data) + } +} + + +final class WritabilityChangedHandler: ChannelInboundHandler { + typealias InboundIn = Any + + private let cb: (Bool) -> Void + + init(_ cb: @escaping (Bool) -> Void) { + self.cb = cb + } + + func channelWritabilityChanged(ctx: ChannelHandlerContext) { + self.cb(ctx.channel.isWritable) + } +} + + +class NIOTSConnectionChannelTests: XCTestCase { + private var group: NIOTSEventLoopGroup! + + override func setUp() { + self.group = NIOTSEventLoopGroup() + } + + override func tearDown() { + XCTAssertNoThrow(try self.group.syncShutdownGracefully()) + } + + func testConnectingToSocketAddressTraversesPipeline() throws { + let connectRecordingHandler = ConnectRecordingHandler() + let listener = try NIOTSListenerBootstrap(group: self.group) + .bind(host: "localhost", port: 0).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + + let connectBootstrap = NIOTSConnectionBootstrap(group: self.group) + .channelInitializer { channel in channel.pipeline.add(handler: connectRecordingHandler) } + XCTAssertEqual(connectRecordingHandler.connectTargets, []) + XCTAssertEqual(connectRecordingHandler.endpointTargets, []) + + let connection = try connectBootstrap.connect(to: listener.localAddress!).wait() + defer { + XCTAssertNoThrow(try connection.close().wait()) + } + + try connection.eventLoop.submit { + XCTAssertEqual(connectRecordingHandler.connectTargets, [listener.localAddress!]) + XCTAssertEqual(connectRecordingHandler.endpointTargets, []) + }.wait() + } + + func testConnectingToHostPortSkipsPipeline() throws { + let connectRecordingHandler = ConnectRecordingHandler() + let listener = try NIOTSListenerBootstrap(group: self.group) + .bind(host: "localhost", port: 0).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + + let connectBootstrap = NIOTSConnectionBootstrap(group: self.group) + .channelInitializer { channel in channel.pipeline.add(handler: connectRecordingHandler) } + XCTAssertEqual(connectRecordingHandler.connectTargets, []) + XCTAssertEqual(connectRecordingHandler.endpointTargets, []) + + let connection = try connectBootstrap.connect(host: "localhost", port: Int(listener.localAddress!.port!)).wait() + defer { + XCTAssertNoThrow(try connection.close().wait()) + } + + try connection.eventLoop.submit { + XCTAssertEqual(connectRecordingHandler.connectTargets, []) + XCTAssertEqual(connectRecordingHandler.endpointTargets, [NWEndpoint.hostPort(host: "localhost", port: NWEndpoint.Port(rawValue: listener.localAddress!.port!)!)]) + }.wait() + } + + func testConnectingToEndpointSkipsPipeline() throws { + let connectRecordingHandler = ConnectRecordingHandler() + let listener = try NIOTSListenerBootstrap(group: self.group) + .bind(host: "localhost", port: 0).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + + let connectBootstrap = NIOTSConnectionBootstrap(group: self.group) + .channelInitializer { channel in channel.pipeline.add(handler: connectRecordingHandler) } + XCTAssertEqual(connectRecordingHandler.connectTargets, []) + XCTAssertEqual(connectRecordingHandler.endpointTargets, []) + + let target = NWEndpoint.hostPort(host: "localhost", port: NWEndpoint.Port(rawValue: listener.localAddress!.port!)!) + + let connection = try connectBootstrap.connect(endpoint: target).wait() + defer { + XCTAssertNoThrow(try connection.close().wait()) + } + + try connection.eventLoop.submit { + XCTAssertEqual(connectRecordingHandler.connectTargets, []) + XCTAssertEqual(connectRecordingHandler.endpointTargets, [target]) + }.wait() + } + + func testZeroLengthWritesHaveSatisfiedPromises() throws { + let listener = try NIOTSListenerBootstrap(group: self.group) + .childChannelInitializer { channel in channel.pipeline.add(handler: FailOnReadHandler())} + .bind(host: "localhost", port: 0).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + + let connection = try NIOTSConnectionBootstrap(group: self.group).connect(to: listener.localAddress!).wait() + defer { + XCTAssertNoThrow(try connection.close().wait()) + } + + let buffer = connection.allocator.buffer(capacity: 0) + XCTAssertNoThrow(try connection.writeAndFlush(buffer).wait()) + } + + func testSettingTCPOptionsWholesale() throws { + let tcpOptions = NWProtocolTCP.Options() + tcpOptions.disableAckStretching = true + + + let listener = try NIOTSListenerBootstrap(group: self.group) + .tcpOptions(tcpOptions) + .serverChannelInitializer { channel in + channel.getOption(option: ChannelOptions.socket(IPPROTO_TCP, TCP_SENDMOREACKS)).map { value in + XCTAssertEqual(value, 1) + } + } + .childChannelInitializer { channel in + channel.getOption(option: ChannelOptions.socket(IPPROTO_TCP, TCP_SENDMOREACKS)).map { value in + XCTAssertEqual(value, 1) + } + } + .bind(host: "localhost", port: 0).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + + let connection = try NIOTSConnectionBootstrap(group: self.group) + .tcpOptions(tcpOptions) + .channelInitializer { channel in + channel.getOption(option: ChannelOptions.socket(IPPROTO_TCP, TCP_SENDMOREACKS)).map { value in + XCTAssertEqual(value, 1) + } + } + .connect(to: listener.localAddress!).wait() + defer { + XCTAssertNoThrow(try connection.close().wait()) + } + + let buffer = connection.allocator.buffer(capacity: 0) + XCTAssertNoThrow(try connection.writeAndFlush(buffer).wait()) + } + + func testWatermarkSettingGetting() throws { + let listener = try NIOTSListenerBootstrap(group: self.group) + .bind(host: "localhost", port: 0).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + + let connection = try NIOTSConnectionBootstrap(group: self.group) + .connect(to: listener.localAddress!) + .wait() + defer { + XCTAssertNoThrow(try connection.close().wait()) + } + + try connection.getOption(option: ChannelOptions.writeBufferWaterMark).then { option -> EventLoopFuture in + XCTAssertEqual(option.high, 64 * 1024) + XCTAssertEqual(option.low, 32 * 1024) + + return connection.setOption(option: ChannelOptions.writeBufferWaterMark, value: WriteBufferWaterMark(low: 1, high: 101)) + }.then { + connection.getOption(option: ChannelOptions.writeBufferWaterMark) + }.map { + XCTAssertEqual($0.high, 101) + XCTAssertEqual($0.low, 1) + }.wait() + } + + func testWritabilityChangesAfterExceedingWatermarks() throws { + let listener = try NIOTSListenerBootstrap(group: self.group) + .bind(host: "localhost", port: 0).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + + var writabilities = [Bool]() + let handler = WritabilityChangedHandler { newValue in + writabilities.append(newValue) + } + + let connection = try NIOTSConnectionBootstrap(group: self.group) + .channelInitializer { channel in channel.pipeline.add(handler: handler) } + .connect(to: listener.localAddress!) + .wait() + + // We're going to set some helpful watermarks, and allocate a big buffer. + XCTAssertNoThrow(try connection.setOption(option: ChannelOptions.writeBufferWaterMark, value: WriteBufferWaterMark(low: 2, high: 2048)).wait()) + var buffer = connection.allocator.buffer(capacity: 2048) + buffer.write(bytes: repeatElement(UInt8(4), count: 2048)) + + // We're going to issue the following pattern of writes: + // a: 1 byte + // b: 1 byte + // c: 2045 bytes + // d: 1 byte + // e: 1 byte + // + // We will begin by issuing the writes and checking the writeability status of the channel. + // The channel will remain writable until after the write of e, at which point the channel will + // become non-writable. + // + // Then we will issue a flush. The writes will succeed in order. The channel will remain non-writable + // until after the promise for d has fired: by the time the promise for e has fired it will be writable + // again. + try connection.eventLoop.submit { + // Pre writing. + XCTAssertEqual(writabilities, []) + XCTAssertTrue(connection.isWritable) + + // Write a. After this write, we are still writable. When this write + // succeeds, we'll still be not writable. + connection.write(buffer.getSlice(at: 0, length: 1)).whenComplete { + XCTAssertEqual(writabilities, [false]) + XCTAssertFalse(connection.isWritable) + } + XCTAssertEqual(writabilities, []) + XCTAssertTrue(connection.isWritable) + + // Write b. After this write we are still writable. When this write + // succeeds we'll still be not writable. + connection.write(buffer.getSlice(at: 0, length: 1)).whenComplete { + XCTAssertEqual(writabilities, [false]) + XCTAssertFalse(connection.isWritable) + } + XCTAssertEqual(writabilities, []) + XCTAssertTrue(connection.isWritable) + + // Write c. After this write we are still writable (2047 bytes written). + // When this write succeeds we'll still be not writable (2 bytes outstanding). + connection.write(buffer.getSlice(at: 0, length: 2045)).whenComplete { + XCTAssertEqual(writabilities, [false]) + XCTAssertFalse(connection.isWritable) + } + XCTAssertEqual(writabilities, []) + XCTAssertTrue(connection.isWritable) + + // Write d. After this write we are still writable (2048 bytes written). + // When this write succeeds we'll become writable, but critically the promise fires before + // the state change, so we'll *appear* to be unwritable. + connection.write(buffer.getSlice(at: 0, length: 1)).whenComplete { + XCTAssertEqual(writabilities, [false]) + XCTAssertFalse(connection.isWritable) + } + XCTAssertEqual(writabilities, []) + XCTAssertTrue(connection.isWritable) + + // Write e. After this write we are now not writable (2049 bytes written). + // When this write succeeds we'll have already been writable, thanks to the previous + // write. + connection.write(buffer.getSlice(at: 0, length: 1)).whenComplete { + XCTAssertEqual(writabilities, [false, true]) + XCTAssertTrue(connection.isWritable) + + // We close after this succeeds. + connection.close(promise: nil) + } + XCTAssertEqual(writabilities, [false]) + XCTAssertFalse(connection.isWritable) + }.wait() + + // Now we're going to flush. This should fire all the writes. + connection.flush() + XCTAssertNoThrow(try connection.closeFuture.wait()) + + // Ok, check that the writability changes worked. + XCTAssertEqual(writabilities, [false, true]) + } + + func testWritabilityChangesAfterChangingWatermarks() throws { + let listener = try NIOTSListenerBootstrap(group: self.group) + .bind(host: "localhost", port: 0).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + + var writabilities = [Bool]() + let handler = WritabilityChangedHandler { newValue in + writabilities.append(newValue) + } + + let connection = try NIOTSConnectionBootstrap(group: self.group) + .channelInitializer { channel in channel.pipeline.add(handler: handler) } + .connect(to: listener.localAddress!) + .wait() + defer { + XCTAssertNoThrow(try connection.close().wait()) + } + + // We're going to allocate a buffer. + var buffer = connection.allocator.buffer(capacity: 256) + buffer.write(bytes: repeatElement(UInt8(4), count: 256)) + + // We're going to issue a 256-byte write. This write will not cause any change in channel writability + // state. + // + // Then we're going to set the high watermark to 256, and the low to 128. This will not change channel + // writability state. + // + // Then we're going to set the high watermark to 255 and the low to 128. This will make the channel + // not writable. + // + // Then we're going to set the high watermark to 256, and the low to 128. This will not change the + // channel writability state. + // + // Then we're going to set the high watermark to 1024, and the low to 256. This will not change the + // channel writability state. + // + // Then we're going to set the high watermark to 1024, and the low to 257. This will make the channel + // writable again. + // + // Then we're going to set the high watermark to 1024, and the low to 256. This will change nothing. + try connection.eventLoop.submit { + // Pre changes. + XCTAssertEqual(writabilities, []) + XCTAssertTrue(connection.isWritable) + + // Write. No writability change. + connection.write(buffer, promise: nil) + XCTAssertEqual(writabilities, []) + XCTAssertTrue(connection.isWritable) + }.wait() + + try connection.setOption(option: ChannelOptions.writeBufferWaterMark, value: WriteBufferWaterMark(low: 128, high: 256)).then { + // High to 256, low to 128. No writability change. + XCTAssertEqual(writabilities, []) + XCTAssertTrue(connection.isWritable) + + return connection.setOption(option: ChannelOptions.writeBufferWaterMark, value: WriteBufferWaterMark(low: 128, high: 255)) + }.then { + // High to 255, low to 127. Channel becomes not writable. + XCTAssertEqual(writabilities, [false]) + XCTAssertFalse(connection.isWritable) + + return connection.setOption(option: ChannelOptions.writeBufferWaterMark, value: WriteBufferWaterMark(low: 128, high: 256)) + }.then { + // High back to 256, low to 128. No writability change. + XCTAssertEqual(writabilities, [false]) + XCTAssertFalse(connection.isWritable) + + return connection.setOption(option: ChannelOptions.writeBufferWaterMark, value: WriteBufferWaterMark(low: 256, high: 1024)) + }.then { + // High to 1024, low to 128. No writability change. + XCTAssertEqual(writabilities, [false]) + XCTAssertFalse(connection.isWritable) + + return connection.setOption(option: ChannelOptions.writeBufferWaterMark, value: WriteBufferWaterMark(low: 257, high: 1024)) + }.then { + // Low to 257, channel becomes writable again. + XCTAssertEqual(writabilities, [false, true]) + XCTAssertTrue(connection.isWritable) + + return connection.setOption(option: ChannelOptions.writeBufferWaterMark, value: WriteBufferWaterMark(low: 256, high: 1024)) + }.map { + // Low back to 256, no writability change. + XCTAssertEqual(writabilities, [false, true]) + XCTAssertTrue(connection.isWritable) + }.wait() + } +} diff --git a/Tests/NIOTransportServicesTests/NIOTSEndToEndTests.swift b/Tests/NIOTransportServicesTests/NIOTSEndToEndTests.swift new file mode 100644 index 0000000..935828c --- /dev/null +++ b/Tests/NIOTransportServicesTests/NIOTSEndToEndTests.swift @@ -0,0 +1,472 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// swift-tools-version:4.0 +// +// swift-tools-version:4.0 +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import XCTest +import NIO +import NIOTransportServices +import Foundation +import Network + + +final class EchoHandler: ChannelInboundHandler { + typealias InboundIn = Any + typealias OutboundOut = Any + + func channelRead(ctx: ChannelHandlerContext, data: NIOAny) { + ctx.write(data, promise: nil) + } + + func channelReadComplete(ctx: ChannelHandlerContext) { + ctx.flush() + } +} + + +final class ReadExpecter: ChannelInboundHandler { + typealias InboundIn = ByteBuffer + + struct DidNotReadError: Error { } + + private var readPromise: EventLoopPromise? + private var cumulationBuffer: ByteBuffer? + private let expectedRead: ByteBuffer + + var readFuture: EventLoopFuture? { + return self.readPromise?.futureResult + } + + init(expecting: ByteBuffer) { + self.expectedRead = expecting + } + + func handlerAdded(ctx: ChannelHandlerContext) { + self.readPromise = ctx.eventLoop.newPromise() + } + + func handlerRemoved(ctx: ChannelHandlerContext) { + if let promise = self.readPromise { + promise.fail(error: DidNotReadError()) + } + } + + func channelRead(ctx: ChannelHandlerContext, data: NIOAny) { + var bytes = self.unwrapInboundIn(data) + if self.cumulationBuffer == nil { + self.cumulationBuffer = bytes + } else { + self.cumulationBuffer!.write(buffer: &bytes) + } + + self.maybeFulfillPromise() + } + + private func maybeFulfillPromise() { + if let promise = self.readPromise, self.cumulationBuffer! == self.expectedRead { + promise.succeed(result: ()) + self.readPromise = nil + } + } +} + + +final class CloseOnActiveHandler: ChannelInboundHandler { + typealias InboundIn = Never + typealias OutboundOut = Never + + func channelActive(ctx: ChannelHandlerContext) { + ctx.close(promise: nil) + } +} + + +final class HalfCloseHandler: ChannelInboundHandler { + typealias InboundIn = Never + typealias InboundOut = Never + + private let halfClosedPromise: EventLoopPromise + private var alreadyHalfClosed = false + private var closed = false + + init(_ halfClosedPromise: EventLoopPromise) { + self.halfClosedPromise = halfClosedPromise + } + + func userInboundEventTriggered(ctx: ChannelHandlerContext, event: Any) { + switch event { + case ChannelEvent.inputClosed: + XCTAssertFalse(self.alreadyHalfClosed) + XCTAssertFalse(self.closed) + self.alreadyHalfClosed = true + self.halfClosedPromise.succeed(result: ()) + + ctx.close(mode: .output, promise: nil) + default: + break + } + + ctx.fireUserInboundEventTriggered(event) + } + + func channelInactive(ctx: ChannelHandlerContext) { + XCTAssertTrue(self.alreadyHalfClosed) + XCTAssertFalse(self.closed) + self.closed = true + } +} + + +final class FailOnHalfCloseHandler: ChannelInboundHandler { + typealias InboundIn = Any + + func userInboundEventTriggered(ctx: ChannelHandlerContext, event: Any) { + switch event { + case ChannelEvent.inputClosed: + XCTFail("Must not receive half-closure") + ctx.close(promise: nil) + default: + break + } + + ctx.fireUserInboundEventTriggered(event) + } +} + + +extension Channel { + /// Expect that the given bytes will be received. + func expectRead(_ bytes: ByteBuffer) -> EventLoopFuture { + let expecter = ReadExpecter(expecting: bytes) + return self.pipeline.add(handler: expecter).then { + return expecter.readFuture! + } + } +} + +extension ByteBufferAllocator { + func bufferFor(string: String) -> ByteBuffer { + var buffer = self.buffer(capacity: string.count) + buffer.write(string: string) + return buffer + } +} + + +class NIOTSEndToEndTests: XCTestCase { + private var group: NIOTSEventLoopGroup! + + override func setUp() { + self.group = NIOTSEventLoopGroup() + } + + override func tearDown() { + XCTAssertNoThrow(try self.group.syncShutdownGracefully()) + } + + func testSimpleListener() throws { + let listener = try NIOTSListenerBootstrap(group: self.group) + .childChannelInitializer { channel in channel.pipeline.add(handler: EchoHandler())} + .bind(host: "localhost", port: 0).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + + let connection = try NIOTSConnectionBootstrap(group: self.group).connect(to: listener.localAddress!).wait() + defer { + XCTAssertNoThrow(try connection.close().wait()) + } + + let buffer = connection.allocator.bufferFor(string: "hello, world!") + let completeFuture = connection.expectRead(buffer) + connection.writeAndFlush(buffer, promise: nil) + XCTAssertNoThrow(try completeFuture.wait()) + } + + func testMultipleConnectionsOneListener() throws { + let listener = try NIOTSListenerBootstrap(group: self.group) + .childChannelInitializer { channel in channel.pipeline.add(handler: EchoHandler())} + .bind(host: "localhost", port: 0).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + + let bootstrap = NIOTSConnectionBootstrap(group: self.group) + + let completeFutures: [EventLoopFuture] = (0..<10).map { _ in + return bootstrap.connect(to: listener.localAddress!).then { channel -> EventLoopFuture in + let buffer = channel.allocator.bufferFor(string: "hello, world!") + let completeFuture = channel.expectRead(buffer) + channel.writeAndFlush(buffer, promise: nil) + return completeFuture + } + } + + let allDoneFuture = EventLoopFuture.andAll(completeFutures, eventLoop: self.group.next()) + XCTAssertNoThrow(try allDoneFuture.wait()) + } + + func testBasicConnectionTeardown() throws { + let listener = try NIOTSListenerBootstrap(group: self.group) + .childChannelInitializer { channel in channel.pipeline.add(handler: CloseOnActiveHandler())} + .bind(host: "localhost", port: 0).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + + let bootstrap = NIOTSConnectionBootstrap(group: self.group) + + let closeFutures: [EventLoopFuture] = (0..<10).map { _ in + bootstrap.connect(to: listener.localAddress!).then { channel in + channel.closeFuture + } + } + + let allClosed = EventLoopFuture.andAll(closeFutures, eventLoop: self.group.next()) + XCTAssertNoThrow(try allClosed.wait()) + } + + func testCloseFromClientSide() throws { + // This test is a little bit dicey, but we need 20 futures in this list. + let closeFutureSyncQueue = DispatchQueue(label: "closeFutureSyncQueue") + let closeFutureGroup = DispatchGroup() + var closeFutures: [EventLoopFuture] = [] + + let listener = try NIOTSListenerBootstrap(group: self.group) + .childChannelInitializer { channel in + closeFutureSyncQueue.sync { + closeFutures.append(channel.closeFuture) + } + closeFutureGroup.leave() + return channel.eventLoop.newSucceededFuture(result: ()) + } + .bind(host: "localhost", port: 0).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + + let bootstrap = NIOTSConnectionBootstrap(group: self.group).channelInitializer { channel in + channel.pipeline.add(handler: CloseOnActiveHandler()) + } + + for _ in (0..<10) { + // Each connection attempt needs to enter the group twice: each end will leave it once + // for us. + closeFutureGroup.enter(); closeFutureGroup.enter() + bootstrap.connect(to: listener.localAddress!).whenSuccess { channel in + closeFutureSyncQueue.sync { + closeFutures.append(channel.closeFuture) + } + closeFutureGroup.leave() + } + } + + closeFutureGroup.wait() + let allClosed = closeFutureSyncQueue.sync { + return EventLoopFuture.andAll(closeFutures, eventLoop: self.group.next()) + } + XCTAssertNoThrow(try allClosed.wait()) + } + + func testAgreeOnRemoteLocalAddresses() throws { + let serverSideConnectionPromise: EventLoopPromise = self.group.next().newPromise() + let listener = try NIOTSListenerBootstrap(group: self.group) + .childChannelInitializer { channel in + serverSideConnectionPromise.succeed(result: channel) + return channel.pipeline.add(handler: EchoHandler()) + } + .bind(host: "localhost", port: 0).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + + let connection = try NIOTSConnectionBootstrap(group: self.group).connect(to: listener.localAddress!).wait() + defer { + XCTAssertNoThrow(try connection.close().wait()) + } + + let serverSideConnection = try serverSideConnectionPromise.futureResult.wait() + + XCTAssertEqual(connection.remoteAddress, listener.localAddress) + XCTAssertEqual(connection.remoteAddress, serverSideConnection.localAddress) + XCTAssertEqual(connection.localAddress, serverSideConnection.remoteAddress) + } + + func testHalfClosureSupported() throws { + let halfClosedPromise: EventLoopPromise = self.group.next().newPromise() + let listener = try NIOTSListenerBootstrap(group: self.group) + .childChannelInitializer { channel in + channel.pipeline.add(handler: EchoHandler()).then { _ in + channel.pipeline.add(handler: HalfCloseHandler(halfClosedPromise)) + } + } + .childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true) + .bind(host: "localhost", port: 0).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + + let connection = try NIOTSConnectionBootstrap(group: self.group) + .channelOption(ChannelOptions.allowRemoteHalfClosure, value: true) + .connect(to: listener.localAddress!).wait() + + // First check the channel is working. + let buffer = connection.allocator.bufferFor(string: "hello, world!") + let completeFuture = connection.expectRead(buffer) + connection.writeAndFlush(buffer, promise: nil) + XCTAssertNoThrow(try completeFuture.wait()) + + // Ok, now half-close. This should propagate to the remote peer, which should also + // close its end, leading to complete shutdown of the connection. + XCTAssertNoThrow(try connection.close(mode: .output).wait()) + XCTAssertNoThrow(try halfClosedPromise.futureResult.wait()) + XCTAssertNoThrow(try connection.closeFuture.wait()) + } + + func testDisabledHalfClosureCausesFullClosure() throws { + let listener = try NIOTSListenerBootstrap(group: self.group) + .childChannelInitializer { channel in + channel.pipeline.add(handler: EchoHandler()).then { _ in + channel.pipeline.add(handler: FailOnHalfCloseHandler()) + } + } + .bind(host: "localhost", port: 0).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + + let connection = try NIOTSConnectionBootstrap(group: self.group) + .channelOption(ChannelOptions.allowRemoteHalfClosure, value: true) + .connect(to: listener.localAddress!).wait() + + // First check the channel is working. + let buffer = connection.allocator.bufferFor(string: "hello, world!") + let completeFuture = connection.expectRead(buffer) + connection.writeAndFlush(buffer, promise: nil) + XCTAssertNoThrow(try completeFuture.wait()) + + // Ok, now half-close. This should propagate to the remote peer, which should also + // close its end, leading to complete shutdown of the connection. + XCTAssertNoThrow(try connection.close(mode: .output).wait()) + XCTAssertNoThrow(try connection.closeFuture.wait()) + } + + func testHalfClosingTwiceFailsTheSecondTime() throws { + let listener = try NIOTSListenerBootstrap(group: self.group) + .childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true) + .bind(host: "localhost", port: 0).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + + let connection = try NIOTSConnectionBootstrap(group: self.group) + .connect(to: listener.localAddress!).wait() + + // Ok, now half-close. First one should be fine. + XCTAssertNoThrow(try connection.close(mode: .output).wait()) + + // Second one won't be. + do { + try connection.close(mode: .output).wait() + XCTFail("Did not throw") + } catch ChannelError.outputClosed { + // ok + } catch { + XCTFail("Threw unexpected error \(error)") + } + } + + func testHalfClosingInboundSideIsRejected() throws { + let listener = try NIOTSListenerBootstrap(group: self.group) + .childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true) + .bind(host: "localhost", port: 0).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + + let connection = try NIOTSConnectionBootstrap(group: self.group) + .channelOption(ChannelOptions.allowRemoteHalfClosure, value: true) + .connect(to: listener.localAddress!).wait() + + // Ok, now try to half-close the input. + do { + try connection.close(mode: .input).wait() + XCTFail("Did not throw") + } catch ChannelError.operationUnsupported { + // ok + } catch { + XCTFail("Threw unexpected error \(error)") + } + } + + func testBasicUnixSockets() throws { + // We don't use FileManager here because this code round-trips through sockaddr_un, and + // sockaddr_un cannot hold paths as long as the true temporary directories used by + // FileManager. + let udsPath = "/tmp/\(UUID().uuidString)_testBasicUnixSockets.sock" + + let listener = try NIOTSListenerBootstrap(group: self.group) + .childChannelInitializer { channel in channel.pipeline.add(handler: EchoHandler())} + .bind(unixDomainSocketPath: udsPath).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + + let connection = try NIOTSConnectionBootstrap(group: self.group) + .connect(unixDomainSocketPath: udsPath).wait() + defer { + XCTAssertNoThrow(try connection.close().wait()) + } + + XCTAssertEqual(listener.localAddress, connection.remoteAddress) + XCTAssertNil(connection.localAddress) + + let buffer = connection.allocator.bufferFor(string: "hello, world!") + let completeFuture = connection.expectRead(buffer) + connection.writeAndFlush(buffer, promise: nil) + XCTAssertNoThrow(try completeFuture.wait()) + } + + func testFancyEndpointSupport() throws { + // This test validates that we can use NWEndpoints properly by doing something core NIO + // cannot: setting up and connecting to a Bonjour service. To avoid the risk of multiple + // users running this test on the same network at the same time and getting in each others + // way we use a UUID to distinguish the service. + let name = UUID().uuidString + let serviceEndpoint = NWEndpoint.service(name: name, type: "_niots._tcp", domain: "local", interface: nil) + + let listener = try NIOTSListenerBootstrap(group: self.group) + .childChannelInitializer { channel in channel.pipeline.add(handler: EchoHandler())} + .bind(endpoint: serviceEndpoint).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + + let connection = try NIOTSConnectionBootstrap(group: self.group) + .connectTimeout(.hours(1)) + .connect(endpoint: serviceEndpoint).wait() + defer { + XCTAssertNoThrow(try connection.close().wait()) + } + + XCTAssertNotNil(connection.localAddress) + XCTAssertNotNil(connection.remoteAddress) + XCTAssertNil(listener.localAddress) + XCTAssertNil(listener.remoteAddress) + + let buffer = connection.allocator.bufferFor(string: "hello, world!") + let completeFuture = connection.expectRead(buffer) + connection.writeAndFlush(buffer, promise: nil) + XCTAssertNoThrow(try completeFuture.wait()) + } +} diff --git a/Tests/NIOTransportServicesTests/NIOTSEventLoopTests.swift b/Tests/NIOTransportServicesTests/NIOTSEventLoopTests.swift new file mode 100644 index 0000000..b7a2f95 --- /dev/null +++ b/Tests/NIOTransportServicesTests/NIOTSEventLoopTests.swift @@ -0,0 +1,89 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// swift-tools-version:4.0 +// +// swift-tools-version:4.0 +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import XCTest +import NIO +import NIOTransportServices + +class NIOTSEventLoopTest: XCTestCase { + func testIsInEventLoopWorks() throws { + let group = NIOTSEventLoopGroup() + let loop = group.next() + XCTAssertFalse(loop.inEventLoop) + try loop.scheduleTask(in: .nanoseconds(0)) { + XCTAssertTrue(loop.inEventLoop) + }.futureResult.wait() + } + + func testDelayedTask() throws { + let group = NIOTSEventLoopGroup() + let loop = group.next() + let now = DispatchTime.now() + + try loop.scheduleTask(in: .milliseconds(100)) { + let newNow = DispatchTime.now() + XCTAssertGreaterThan(newNow.uptimeNanoseconds - now.uptimeNanoseconds, + 100 * 1000 * 1000) + }.futureResult.wait() + } + + func testCancellingDelayedTask() throws { + let group = NIOTSEventLoopGroup() + let loop = group.next() + let now = DispatchTime.now() + + let firstTask = loop.scheduleTask(in: .milliseconds(30)) { + XCTFail("Must not be called") + } + let secondTask = loop.scheduleTask(in: .milliseconds(10)) { + firstTask.cancel() + } + let thirdTask = loop.scheduleTask(in: .milliseconds(50)) { } + firstTask.futureResult.whenComplete { + let newNow = DispatchTime.now() + XCTAssertLessThan(newNow.uptimeNanoseconds - now.uptimeNanoseconds, + 300 * 1000 * 1000) + } + + XCTAssertNoThrow(try secondTask.futureResult.wait()) + do { + try firstTask.futureResult.wait() + } catch EventLoopError.cancelled { + // ok + } catch { + XCTFail("Unexpected error: \(error)") + } + + // Wait just to be sure the cancelled job doesn't execute. + XCTAssertNoThrow(try thirdTask.futureResult.wait()) + } + + func testLoopsAreNotInEachOther() throws { + let group = NIOTSEventLoopGroup(loopCount: 2) + let firstLoop = group.next() + let secondLoop = group.next() + XCTAssertFalse(firstLoop === secondLoop) + + let firstTask = firstLoop.scheduleTask(in: .nanoseconds(0)) { + XCTAssertTrue(firstLoop.inEventLoop) + XCTAssertFalse(secondLoop.inEventLoop) + } + let secondTask = secondLoop.scheduleTask(in: .nanoseconds(0)) { + XCTAssertFalse(firstLoop.inEventLoop) + XCTAssertTrue(secondLoop.inEventLoop) + } + try EventLoopFuture.andAll([firstTask.futureResult, secondTask.futureResult], eventLoop: firstLoop).wait() + } +} diff --git a/Tests/NIOTransportServicesTests/NIOTSListenerChannelTests.swift b/Tests/NIOTransportServicesTests/NIOTSListenerChannelTests.swift new file mode 100644 index 0000000..1e55dbc --- /dev/null +++ b/Tests/NIOTransportServicesTests/NIOTSListenerChannelTests.swift @@ -0,0 +1,115 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// swift-tools-version:4.0 +// +// swift-tools-version:4.0 +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import XCTest +import Network +import NIO +import NIOTransportServices + + +final class BindRecordingHandler: ChannelOutboundHandler { + typealias OutboundIn = Any + typealias OutboundOut = Any + + var bindTargets: [SocketAddress] = [] + var endpointTargets: [NWEndpoint] = [] + + func bind(ctx: ChannelHandlerContext, to address: SocketAddress, promise: EventLoopPromise?) { + self.bindTargets.append(address) + ctx.bind(to: address, promise: promise) + } + + func triggerUserOutboundEvent(ctx: ChannelHandlerContext, event: Any, promise: EventLoopPromise?) { + switch event { + case let evt as NIOTSNetworkEvents.BindToNWEndpoint: + self.endpointTargets.append(evt.endpoint) + default: + break + } + ctx.triggerUserOutboundEvent(event, promise: promise) + } +} + + +class NIOTSListenerChannelTests: XCTestCase { + private var group: NIOTSEventLoopGroup! + + override func setUp() { + self.group = NIOTSEventLoopGroup(loopCount: 1) + } + + override func tearDown() { + XCTAssertNoThrow(try self.group.syncShutdownGracefully()) + } + + func testBindingToSocketAddressTraversesPipeline() throws { + let bindRecordingHandler = BindRecordingHandler() + let target = try SocketAddress.newAddressResolving(host: "localhost", port: 0) + let bindBootstrap = NIOTSListenerBootstrap(group: self.group) + .serverChannelInitializer { channel in channel.pipeline.add(handler: bindRecordingHandler)} + + XCTAssertEqual(bindRecordingHandler.bindTargets, []) + XCTAssertEqual(bindRecordingHandler.endpointTargets, []) + + let listener = try bindBootstrap.bind(to: target).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + + try self.group.next().submit { + XCTAssertEqual(bindRecordingHandler.bindTargets, [target]) + XCTAssertEqual(bindRecordingHandler.endpointTargets, []) + }.wait() + } + + func testConnectingToHostPortTraversesPipeline() throws { + let bindRecordingHandler = BindRecordingHandler() + let bindBootstrap = NIOTSListenerBootstrap(group: self.group) + .serverChannelInitializer { channel in channel.pipeline.add(handler: bindRecordingHandler)} + + XCTAssertEqual(bindRecordingHandler.bindTargets, []) + XCTAssertEqual(bindRecordingHandler.endpointTargets, []) + + let listener = try bindBootstrap.bind(host: "localhost", port: 0).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + + try self.group.next().submit { + XCTAssertEqual(bindRecordingHandler.bindTargets, [try SocketAddress.newAddressResolving(host: "localhost", port: 0)]) + XCTAssertEqual(bindRecordingHandler.endpointTargets, []) + }.wait() + } + + func testConnectingToEndpointSkipsPipeline() throws { + let endpoint = NWEndpoint.hostPort(host: .ipv4(.loopback), port: .any) + let bindRecordingHandler = BindRecordingHandler() + let bindBootstrap = NIOTSListenerBootstrap(group: self.group) + .serverChannelInitializer { channel in channel.pipeline.add(handler: bindRecordingHandler)} + + XCTAssertEqual(bindRecordingHandler.bindTargets, []) + XCTAssertEqual(bindRecordingHandler.endpointTargets, []) + + let listener = try bindBootstrap.bind(endpoint: endpoint).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + + try self.group.next().submit { + XCTAssertEqual(bindRecordingHandler.bindTargets, []) + XCTAssertEqual(bindRecordingHandler.endpointTargets, [endpoint]) + }.wait() + } +} diff --git a/Tests/NIOTransportServicesTests/NIOTSSocketOptionTests.swift b/Tests/NIOTransportServicesTests/NIOTSSocketOptionTests.swift new file mode 100644 index 0000000..ed6f36c --- /dev/null +++ b/Tests/NIOTransportServicesTests/NIOTSSocketOptionTests.swift @@ -0,0 +1,164 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// swift-tools-version:4.0 +// +// swift-tools-version:4.0 +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import XCTest +import NIO +import Network +@testable import NIOTransportServices + + +class NIOTSSocketOptionTests: XCTestCase { + private var options: NWProtocolTCP.Options! + + override func setUp() { + self.options = NWProtocolTCP.Options() + } + + override func tearDown() { + self.options = nil + } + + private func assertProperty(called path: KeyPath, + correspondsTo socketOption: SocketOption, + defaultsTo defaultValue: T, + and defaultSocketOptionValue: SocketOptionValue, + canBeSetTo unusualValue: SocketOptionValue, + whichLeadsTo newInnerValue: T) throws { + // Confirm the default is right. + let actualDefaultSocketOptionValue = try self.options.valueFor(socketOption: socketOption) + XCTAssertEqual(self.options[keyPath: path], defaultValue) + XCTAssertEqual(actualDefaultSocketOptionValue, defaultSocketOptionValue) + + // Confirm that we can set this to a new value, and that it leads to the right outcome. + try self.options.applyChannelOption(option: socketOption, value: unusualValue) + XCTAssertEqual(self.options[keyPath: path], newInnerValue) + XCTAssertEqual(try self.options.valueFor(socketOption: socketOption), unusualValue) + + // And confirm that we can set it back to the default. + try self.options.applyChannelOption(option: socketOption, value: actualDefaultSocketOptionValue) + XCTAssertEqual(self.options[keyPath: path], defaultValue) + XCTAssertEqual(actualDefaultSocketOptionValue, defaultSocketOptionValue) + } + + func testReadingAndSettingNoDelay() throws { + try self.assertProperty(called: \NWProtocolTCP.Options.noDelay, + correspondsTo: SocketOption(level: IPPROTO_TCP, name: TCP_NODELAY), + defaultsTo: false, and: 0, + canBeSetTo: 1, whichLeadsTo: true) + } + + func testReadingAndSettingNoPush() throws { + try self.assertProperty(called: \NWProtocolTCP.Options.noPush, + correspondsTo: SocketOption(level: IPPROTO_TCP, name: TCP_NOPUSH), + defaultsTo: false, and: 0, + canBeSetTo: 1, whichLeadsTo: true) + } + + func testReadingAndSettingNoOpt() throws { + try self.assertProperty(called: \NWProtocolTCP.Options.noOptions, + correspondsTo: SocketOption(level: IPPROTO_TCP, name: TCP_NOOPT), + defaultsTo: false, and: 0, + canBeSetTo: 1, whichLeadsTo: true) + } + + func testReadingAndSettingKeepaliveCount() throws { + try self.assertProperty(called: \NWProtocolTCP.Options.keepaliveCount, + correspondsTo: SocketOption(level: IPPROTO_TCP, name: TCP_KEEPCNT), + defaultsTo: 0, and: 0, + canBeSetTo: 5, whichLeadsTo: 5) + } + + func testReadingAndSettingKeepaliveIdle() throws { + try self.assertProperty(called: \NWProtocolTCP.Options.keepaliveIdle, + correspondsTo: SocketOption(level: IPPROTO_TCP, name: TCP_KEEPALIVE), + defaultsTo: 0, and: 0, + canBeSetTo: 5, whichLeadsTo: 5) + } + + func testReadingAndSettingKeepaliveInterval() throws { + try self.assertProperty(called: \NWProtocolTCP.Options.keepaliveInterval, + correspondsTo: SocketOption(level: IPPROTO_TCP, name: TCP_KEEPINTVL), + defaultsTo: 0, and: 0, + canBeSetTo: 5, whichLeadsTo: 5) + } + + func testReadingAndSettingMaxSeg() throws { + try self.assertProperty(called: \NWProtocolTCP.Options.maximumSegmentSize, + correspondsTo: SocketOption(level: IPPROTO_TCP, name: TCP_MAXSEG), + defaultsTo: 0, and: 0, + canBeSetTo: 5, whichLeadsTo: 5) + } + + func testReadingAndSettingConnectTimeout() throws { + try self.assertProperty(called: \NWProtocolTCP.Options.connectionTimeout, + correspondsTo: SocketOption(level: IPPROTO_TCP, name: TCP_CONNECTIONTIMEOUT), + defaultsTo: 0, and: 0, + canBeSetTo: 5, whichLeadsTo: 5) + } + + func testReadingAndSettingConnectDropTime() throws { + try self.assertProperty(called: \NWProtocolTCP.Options.connectionDropTime, + correspondsTo: SocketOption(level: IPPROTO_TCP, name: TCP_RXT_CONNDROPTIME), + defaultsTo: 0, and: 0, + canBeSetTo: 5, whichLeadsTo: 5) + } + + func testReadingAndSettingFinDrop() throws { + try self.assertProperty(called: \NWProtocolTCP.Options.retransmitFinDrop, + correspondsTo: SocketOption(level: IPPROTO_TCP, name: TCP_RXT_FINDROP), + defaultsTo: false, and: 0, + canBeSetTo: 1, whichLeadsTo: true) + } + + func testReadingAndSettingAckStretching() throws { + try self.assertProperty(called: \NWProtocolTCP.Options.disableAckStretching, + correspondsTo: SocketOption(level: IPPROTO_TCP, name: TCP_SENDMOREACKS), + defaultsTo: false, and: 0, + canBeSetTo: 1, whichLeadsTo: true) + } + + func testReadingAndSettingKeepalive() throws { + try self.assertProperty(called: \NWProtocolTCP.Options.enableKeepalive, + correspondsTo: SocketOption(level: SOL_SOCKET, name: SO_KEEPALIVE), + defaultsTo: false, and: 0, + canBeSetTo: 1, whichLeadsTo: true) + } + + func testWritingNonexistentSocketOption() { + let option = SocketOption(level: Int32.max, name: Int32.max) + + do { + try self.options.applyChannelOption(option: option, value: 0) + } catch let err as NIOTSErrors.UnsupportedSocketOption { + XCTAssertEqual(err.optionValue.0, Int32.max) + XCTAssertEqual(err.optionValue.1, Int32.max) + } catch { + XCTFail("Unexpected error \(error)") + } + } + + func testReadingNonexistentSocketOption() { + let option = SocketOption(level: Int32.max, name: Int32.max) + + do { + _ = try self.options.valueFor(socketOption: option) + } catch let err as NIOTSErrors.UnsupportedSocketOption { + XCTAssertEqual(err.optionValue.0, Int32.max) + XCTAssertEqual(err.optionValue.1, Int32.max) + } catch { + XCTFail("Unexpected error \(error)") + } + } +} diff --git a/Tests/NIOTransportServicesTests/NIOTSSocketOptionsOnChannelTests.swift b/Tests/NIOTransportServicesTests/NIOTSSocketOptionsOnChannelTests.swift new file mode 100644 index 0000000..e89eda6 --- /dev/null +++ b/Tests/NIOTransportServicesTests/NIOTSSocketOptionsOnChannelTests.swift @@ -0,0 +1,124 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// swift-tools-version:4.0 +// +// swift-tools-version:4.0 +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import XCTest +import NIO +import Network +@testable import NIOTransportServices + + +private extension Channel { + private func getSocketOption(_ option: SocketOption) -> EventLoopFuture { + return self.getOption(option: ChannelOptions.socket(option.value.0, option.value.1)) + } + + private func setSocketOption(_ option: SocketOption, to value: SocketOptionValue) -> EventLoopFuture { + return self.setOption(option: ChannelOptions.socket(option.value.0, option.value.1), value: value) + } + + /// Asserts that a given socket option has a default value, that its value can be changed to a new value, and that it can then be + /// switched back. + func assertOptionRoundTrips(option: SocketOption, initialValue: SocketOptionValue, testAlternativeValue: SocketOptionValue) -> EventLoopFuture { + return self.getSocketOption(option).then { actualInitialValue in + XCTAssertEqual(actualInitialValue, initialValue) + return self.setSocketOption(option, to: testAlternativeValue) + }.then { + self.getSocketOption(option) + }.then { actualNewValue in + XCTAssertEqual(actualNewValue, testAlternativeValue) + return self.setSocketOption(option, to: initialValue) + }.then { + self.getSocketOption(option) + }.map { returnedToValue in + XCTAssertEqual(returnedToValue, initialValue) + } + } +} + + +class NIOTSSocketOptionsOnChannelTests: XCTestCase { + private var group: NIOTSEventLoopGroup! + + override func setUp() { + self.group = NIOTSEventLoopGroup() + } + + override func tearDown() { + XCTAssertNoThrow(try self.group.syncShutdownGracefully()) + } + + func assertChannelOptionAfterCreation(option: SocketOption, initialValue: SocketOptionValue, testAlternativeValue: SocketOptionValue) throws { + let listener = try NIOTSListenerBootstrap(group: group).bind(host: "127.0.0.1", port: 0).wait() + defer { + XCTAssertNoThrow(try listener.close().wait()) + } + let connector = try NIOTSConnectionBootstrap(group: group).connect(to: listener.localAddress!).wait() + defer { + XCTAssertNoThrow(try connector.close().wait()) + } + + XCTAssertNoThrow(try listener.assertOptionRoundTrips(option: option, initialValue: initialValue, testAlternativeValue: testAlternativeValue).wait()) + XCTAssertNoThrow(try connector.assertOptionRoundTrips(option: option, initialValue: initialValue, testAlternativeValue: testAlternativeValue).wait()) + } + + func testNODELAY() throws { + try self.assertChannelOptionAfterCreation(option: SocketOption(level: IPPROTO_TCP, name: TCP_NODELAY), initialValue: 0, testAlternativeValue: 1) + } + + func testNOPUSH() throws { + try self.assertChannelOptionAfterCreation(option: SocketOption(level: IPPROTO_TCP, name: TCP_NOPUSH), initialValue: 0, testAlternativeValue: 1) + } + + func testNOOPT() throws { + try self.assertChannelOptionAfterCreation(option: SocketOption(level: IPPROTO_TCP, name: TCP_NOOPT), initialValue: 0, testAlternativeValue: 1) + } + + func testKEEPCNT() throws { + try self.assertChannelOptionAfterCreation(option: SocketOption(level: IPPROTO_TCP, name: TCP_KEEPCNT), initialValue: 0, testAlternativeValue: 5) + } + + func testKEEPALIVE() throws { + try self.assertChannelOptionAfterCreation(option: SocketOption(level: IPPROTO_TCP, name: TCP_KEEPALIVE), initialValue: 0, testAlternativeValue: 5) + } + + func testKEEPINTVL() throws { + try self.assertChannelOptionAfterCreation(option: SocketOption(level: IPPROTO_TCP, name: TCP_KEEPINTVL), initialValue: 0, testAlternativeValue: 5) + } + + func testMAXSEG() throws { + try self.assertChannelOptionAfterCreation(option: SocketOption(level: IPPROTO_TCP, name: TCP_MAXSEG), initialValue: 0, testAlternativeValue: 5) + } + + func testCONNECTIONTIMEOUT() throws { + try self.assertChannelOptionAfterCreation(option: SocketOption(level: IPPROTO_TCP, name: TCP_CONNECTIONTIMEOUT), initialValue: 0, testAlternativeValue: 5) + } + + func testRXT_CONNDROPTIME() throws { + try self.assertChannelOptionAfterCreation(option: SocketOption(level: IPPROTO_TCP, name: TCP_RXT_CONNDROPTIME), initialValue: 0, testAlternativeValue: 5) + } + + func testRXT_FINDROP() throws { + try self.assertChannelOptionAfterCreation(option: SocketOption(level: IPPROTO_TCP, name: TCP_RXT_FINDROP), initialValue: 0, testAlternativeValue: 1) + } + + func testSENDMOREACKS() throws { + try self.assertChannelOptionAfterCreation(option: SocketOption(level: IPPROTO_TCP, name: TCP_SENDMOREACKS), initialValue: 0, testAlternativeValue: 1) + } + + func testSO_KEEPALIVE() throws { + try self.assertChannelOptionAfterCreation(option: SocketOption(level: SOL_SOCKET, name: SO_KEEPALIVE), initialValue: 0, testAlternativeValue: 1) + } +} + diff --git a/dev/git.commit.template b/dev/git.commit.template new file mode 100644 index 0000000..c52bfa1 --- /dev/null +++ b/dev/git.commit.template @@ -0,0 +1,14 @@ +One line description of your change + +Motivation: + +Explain here the context, and why you're making that change. +What is the problem you're trying to solve. + +Modifications: + +Describe the modifications you've done. + +Result: + +After your change, what will change.