diff --git a/Makefile b/Makefile index 771ef91eb..a5e699ea4 100644 --- a/Makefile +++ b/Makefile @@ -58,8 +58,8 @@ test-plugin: test-plugin-nio: swift build $(CFLAGS) --product protoc-gen-swiftgrpc - protoc Sources/Examples/Echo/echo.proto --proto_path=Sources/Examples/Echo --plugin=.build/debug/protoc-gen-swift --plugin=.build/debug/protoc-gen-swiftgrpc --swiftgrpc_out=/tmp --swiftgrpc_opt=Client=false,NIO=true - diff -u /tmp/echo.grpc.swift Tests/SwiftGRPCNIOTests/echo_nio.grpc.swift + protoc Sources/Examples/Echo/echo.proto --proto_path=Sources/Examples/Echo --plugin=.build/debug/protoc-gen-swift --plugin=.build/debug/protoc-gen-swiftgrpc --swiftgrpc_out=/tmp --swiftgrpc_opt=NIO=true + diff -u /tmp/echo.grpc.swift Sources/Examples/EchoNIO/Generated/echo.grpc.swift xcodebuild: project xcodebuild -project SwiftGRPC.xcodeproj -configuration "Debug" -parallelizeTargets -target SwiftGRPC -target Echo -target Simple -target protoc-gen-swiftgrpc build diff --git a/Sources/CgRPC/shim/channel.c b/Sources/CgRPC/shim/channel.c index d632add53..451c97d5c 100644 --- a/Sources/CgRPC/shim/channel.c +++ b/Sources/CgRPC/shim/channel.c @@ -57,9 +57,6 @@ cgrpc_channel *cgrpc_channel_create_secure(const char *address, c->channel = grpc_secure_channel_create(creds, address, &channel_args, NULL); c->completion_queue = grpc_completion_queue_create_for_next(NULL); - - grpc_channel_credentials_release(creds); - return c; } @@ -76,9 +73,6 @@ cgrpc_channel *cgrpc_channel_create_google(const char *address, c->channel = grpc_secure_channel_create(google_creds, address, &channel_args, NULL); c->completion_queue = grpc_completion_queue_create_for_next(NULL); - - grpc_channel_credentials_release(google_creds); - return c; } diff --git a/Sources/Examples/EchoNIO/Generated/echo.grpc.swift b/Sources/Examples/EchoNIO/Generated/echo.grpc.swift new file mode 100644 index 000000000..861c08a92 --- /dev/null +++ b/Sources/Examples/EchoNIO/Generated/echo.grpc.swift @@ -0,0 +1,144 @@ +// +// DO NOT EDIT. +// +// Generated by the protocol buffer compiler. +// Source: echo.proto +// + +// +// Copyright 2018, gRPC Authors All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +import Foundation +import NIO +import NIOHTTP1 +import SwiftGRPCNIO +import SwiftProtobuf + + +/// Usage: instantiate Echo_EchoService_NIOClient, then call methods of this protocol to make API calls. +internal protocol Echo_EchoService_NIO { + func get(_ request: Echo_EchoRequest, callOptions: CallOptions?) -> UnaryClientCall + func expand(_ request: Echo_EchoRequest, callOptions: CallOptions?, handler: @escaping (Echo_EchoResponse) -> Void) -> ServerStreamingClientCall + func collect(callOptions: CallOptions?) -> ClientStreamingClientCall + func update(callOptions: CallOptions?, handler: @escaping (Echo_EchoResponse) -> Void) -> BidirectionalStreamingClientCall +} + +internal final class Echo_EchoService_NIOClient: GRPCServiceClient, Echo_EchoService_NIO { + internal let client: GRPCClient + internal let service = "echo.Echo" + internal var defaultCallOptions: CallOptions + + /// Creates a client for the echo.Echo service. + /// + /// - Parameters: + /// - client: `GRPCClient` with a connection to the service host. + /// - defaultCallOptions: Options to use for each service call if the user doesn't provide them. Defaults to `client.defaultCallOptions`. + internal init(client: GRPCClient, defaultCallOptions: CallOptions? = nil) { + self.client = client + self.defaultCallOptions = defaultCallOptions ?? client.defaultCallOptions + } + + /// Asynchronous unary call to Get. + /// + /// - Parameters: + /// - request: Request to send to Get. + /// - callOptions: Call options; `self.defaultCallOptions` is used if `nil`. + /// - Returns: A `UnaryClientCall` with futures for the metadata, status and response. + internal func get(_ request: Echo_EchoRequest, callOptions: CallOptions? = nil) -> UnaryClientCall { + return UnaryClientCall(client: client, path: path(forMethod: "Get"), request: request, callOptions: callOptions ?? self.defaultCallOptions) + } + + /// Asynchronous server-streaming call to Expand. + /// + /// - Parameters: + /// - request: Request to send to Expand. + /// - callOptions: Call options; `self.defaultCallOptions` is used if `nil`. + /// - handler: A closure called when each response is received from the server. + /// - Returns: A `ServerStreamingClientCall` with futures for the metadata and status. + internal func expand(_ request: Echo_EchoRequest, callOptions: CallOptions? = nil, handler: @escaping (Echo_EchoResponse) -> Void) -> ServerStreamingClientCall { + return ServerStreamingClientCall(client: client, path: path(forMethod: "Expand"), request: request, callOptions: callOptions ?? self.defaultCallOptions, handler: handler) + } + + /// Asynchronous client-streaming call to Collect. + /// + /// Callers should use the `send` method on the returned object to send messages + /// to the server. The caller should send an `.end` after the final message has been sent. + /// + /// - Parameters: + /// - callOptions: Call options; `self.defaultCallOptions` is used if `nil`. + /// - Returns: A `ClientStreamingClientCall` with futures for the metadata, status and response. + internal func collect(callOptions: CallOptions? = nil) -> ClientStreamingClientCall { + return ClientStreamingClientCall(client: client, path: path(forMethod: "Collect"), callOptions: callOptions ?? self.defaultCallOptions) + } + + /// Asynchronous bidirectional-streaming call to Update. + /// + /// Callers should use the `send` method on the returned object to send messages + /// to the server. The caller should send an `.end` after the final message has been sent. + /// + /// - Parameters: + /// - callOptions: Call options; `self.defaultCallOptions` is used if `nil`. + /// - handler: A closure called when each response is received from the server. + /// - Returns: A `ClientStreamingClientCall` with futures for the metadata and status. + internal func update(callOptions: CallOptions? = nil, handler: @escaping (Echo_EchoResponse) -> Void) -> BidirectionalStreamingClientCall { + return BidirectionalStreamingClientCall(client: client, path: path(forMethod: "Update"), callOptions: callOptions ?? self.defaultCallOptions, handler: handler) + } + +} + +/// To build a server, implement a class that conforms to this protocol. +internal protocol Echo_EchoProvider_NIO: CallHandlerProvider { + func get(request: Echo_EchoRequest, context: StatusOnlyCallContext) -> EventLoopFuture + func expand(request: Echo_EchoRequest, context: StreamingResponseCallContext) -> EventLoopFuture + func collect(context: UnaryResponseCallContext) -> EventLoopFuture<(StreamEvent) -> Void> + func update(context: StreamingResponseCallContext) -> EventLoopFuture<(StreamEvent) -> Void> +} + +extension Echo_EchoProvider_NIO { + internal var serviceName: String { return "echo.Echo" } + + /// Determines, calls and returns the appropriate request handler, depending on the request's method. + /// Returns nil for methods not handled by this service. + internal func handleMethod(_ methodName: String, request: HTTPRequestHead, serverHandler: GRPCChannelHandler, channel: Channel, errorDelegate: ServerErrorDelegate?) -> GRPCCallHandler? { + switch methodName { + case "Get": + return UnaryCallHandler(channel: channel, request: request, errorDelegate: errorDelegate) { context in + return { request in + self.get(request: request, context: context) + } + } + + case "Expand": + return ServerStreamingCallHandler(channel: channel, request: request, errorDelegate: errorDelegate) { context in + return { request in + self.expand(request: request, context: context) + } + } + + case "Collect": + return ClientStreamingCallHandler(channel: channel, request: request, errorDelegate: errorDelegate) { context in + return self.collect(context: context) + } + + case "Update": + return BidirectionalStreamingCallHandler(channel: channel, request: request, errorDelegate: errorDelegate) { context in + return self.update(context: context) + } + + default: return nil + } + } +} + diff --git a/Sources/Examples/EchoNIO/Generated/echo.pb.swift b/Sources/Examples/EchoNIO/Generated/echo.pb.swift index c95f2daee..a313aab3f 120000 --- a/Sources/Examples/EchoNIO/Generated/echo.pb.swift +++ b/Sources/Examples/EchoNIO/Generated/echo.pb.swift @@ -1 +1 @@ -../../../../Tests/SwiftGRPCNIOTests/echo.pb.swift \ No newline at end of file +../../Echo/Generated/echo.pb.swift \ No newline at end of file diff --git a/Sources/Examples/EchoNIO/Generated/echo_nio.grpc.swift b/Sources/Examples/EchoNIO/Generated/echo_nio.grpc.swift deleted file mode 120000 index b6bf95ab4..000000000 --- a/Sources/Examples/EchoNIO/Generated/echo_nio.grpc.swift +++ /dev/null @@ -1 +0,0 @@ -../../../../Tests/SwiftGRPCNIOTests/echo_nio.grpc.swift \ No newline at end of file diff --git a/Sources/Examples/EchoNIO/main.swift b/Sources/Examples/EchoNIO/main.swift index 4247028f2..3d62df24e 100644 --- a/Sources/Examples/EchoNIO/main.swift +++ b/Sources/Examples/EchoNIO/main.swift @@ -1,5 +1,5 @@ /* - * Copyright 2018, gRPC Authors All rights reserved. + * Copyright 2019, gRPC Authors All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,9 +24,23 @@ func addressOption(_ address: String) -> Option { return Option("address", default: address, description: "address of server") } -let portOption = Option("port", - default: "8080", - description: "port of server") +let portOption = Option("port", default: 8080) +let messageOption = Option("message", + default: "Testing 1 2 3", + description: "message to send") + +/// Create en `EchoClient` and wait for it to initialize. Returns nil if initialisation fails. +func makeEchoClient(address: String, port: Int) -> Echo_EchoService_NIOClient? { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + do { + return try GRPCClient.start(host: address, port: port, eventLoopGroup: eventLoopGroup) + .map { client in Echo_EchoService_NIOClient(client: client) } + .wait() + } catch { + print("Unable to create an EchoClient: \(error)") + return nil + } +} Group { $0.command("serve", @@ -38,9 +52,9 @@ Group { print("starting insecure server") _ = try! GRPCServer.start(hostname: address, - port: Int(port)!, - eventLoopGroup: eventLoopGroup, - serviceProviders: [EchoProviderNIO()]) + port: port, + eventLoopGroup: eventLoopGroup, + serviceProviders: [EchoProviderNIO()]) .wait() // This blocks to keep the main thread from finishing while the server runs, @@ -48,4 +62,131 @@ Group { _ = sem.wait() } + $0.command( + "get", + addressOption("localhost"), + portOption, + messageOption, + description: "Perform a unary get()." + ) { address, port, message in + print("calling get") + guard let echo = makeEchoClient(address: address, port: port) else { return } + + var requestMessage = Echo_EchoRequest() + requestMessage.text = message + + print("get sending: \(requestMessage.text)") + let get = echo.get(requestMessage) + get.response.whenSuccess { response in + print("get received: \(response.text)") + } + + get.response.whenFailure { error in + print("get response failed with error: \(error)") + } + + // wait() on the status to stop the program from exiting. + do { + let status = try get.status.wait() + print("get completed with status: \(status)") + } catch { + print("get status failed with error: \(error)") + } + } + + $0.command( + "expand", + addressOption("localhost"), + portOption, + messageOption, + description: "Perform a server-streaming expand()." + ) { address, port, message in + print("calling expand") + guard let echo = makeEchoClient(address: address, port: port) else { return } + + let requestMessage = Echo_EchoRequest.with { $0.text = message } + + print("expand sending: \(requestMessage.text)") + let expand = echo.expand(requestMessage) { response in + print("expand received: \(response.text)") + } + + // wait() on the status to stop the program from exiting. + do { + let status = try expand.status.wait() + print("expand completed with status: \(status)") + } catch { + print("expand status failed with error: \(error)") + } + } + + $0.command( + "collect", + addressOption("localhost"), + portOption, + messageOption, + description: "Perform a client-streaming collect()." + ) { address, port, message in + print("calling collect") + guard let echo = makeEchoClient(address: address, port: port) else { return } + + let collect = echo.collect() + + var queue = collect.newMessageQueue() + for part in message.components(separatedBy: " ") { + var requestMessage = Echo_EchoRequest() + requestMessage.text = part + print("collect sending: \(requestMessage.text)") + queue = queue.then { collect.sendMessage(requestMessage) } + } + queue.whenSuccess { collect.sendEnd(promise: nil) } + + collect.response.whenSuccess { respone in + print("collect received: \(respone.text)") + } + + collect.response.whenFailure { error in + print("collect response failed with error: \(error)") + } + + // wait() on the status to stop the program from exiting. + do { + let status = try collect.status.wait() + print("collect completed with status: \(status)") + } catch { + print("collect status failed with error: \(error)") + } + } + + $0.command( + "update", + addressOption("localhost"), + portOption, + messageOption, + description: "Perform a bidirectional-streaming update()." + ) { address, port, message in + print("calling update") + guard let echo = makeEchoClient(address: address, port: port) else { return } + + let update = echo.update { response in + print("update received: \(response.text)") + } + + var queue = update.newMessageQueue() + for part in message.components(separatedBy: " ") { + var requestMessage = Echo_EchoRequest() + requestMessage.text = part + print("update sending: \(requestMessage.text)") + queue = queue.then { update.sendMessage(requestMessage) } + } + queue.whenSuccess { update.sendEnd(promise: nil) } + + // wait() on the status to stop the program from exiting. + do { + let status = try update.status.wait() + print("update completed with status: \(status)") + } catch { + print("update status failed with error: \(error)") + } + } }.run() diff --git a/Sources/SwiftGRPCNIO/CallHandlers/BaseCallHandler.swift b/Sources/SwiftGRPCNIO/CallHandlers/BaseCallHandler.swift index ae3986fd5..5aae56950 100644 --- a/Sources/SwiftGRPCNIO/CallHandlers/BaseCallHandler.swift +++ b/Sources/SwiftGRPCNIO/CallHandlers/BaseCallHandler.swift @@ -50,7 +50,7 @@ extension BaseCallHandler: ChannelInboundHandler { switch self.unwrapInboundIn(data) { case .head(let requestHead): // Head should have been handled by `GRPCChannelHandler`. - self.errorCaught(ctx: ctx, error: GRPCServerError.invalidState("unexpected request head received \(requestHead)")) + self.errorCaught(ctx: ctx, error: GRPCError.server(.invalidState("unexpected request head received \(requestHead)"))) case .message(let message): do { @@ -71,7 +71,7 @@ extension BaseCallHandler: ChannelOutboundHandler { public func write(ctx: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { guard serverCanWrite else { - promise?.fail(error: GRPCServerError.serverNotWritable) + promise?.fail(error: GRPCError.server(.serverNotWritable)) return } diff --git a/Sources/SwiftGRPCNIO/CallHandlers/ServerStreamingCallHandler.swift b/Sources/SwiftGRPCNIO/CallHandlers/ServerStreamingCallHandler.swift index 6374cea58..d3cfe5d61 100644 --- a/Sources/SwiftGRPCNIO/CallHandlers/ServerStreamingCallHandler.swift +++ b/Sources/SwiftGRPCNIO/CallHandlers/ServerStreamingCallHandler.swift @@ -10,9 +10,9 @@ import NIOHTTP1 public class ServerStreamingCallHandler: BaseCallHandler { public typealias EventObserver = (RequestMessage) -> EventLoopFuture private var eventObserver: EventObserver? - + private var context: StreamingResponseCallContext? - + public init(channel: Channel, request: HTTPRequestHead, errorDelegate: ServerErrorDelegate?, eventObserverFactory: (StreamingResponseCallContext) -> EventObserver) { super.init(errorDelegate: errorDelegate) let context = StreamingResponseCallContextImpl(channel: channel, request: request) @@ -24,14 +24,13 @@ public class ServerStreamingCallHandler public override func processMessage(_ message: RequestMessage) throws { guard let eventObserver = self.eventObserver, let context = self.context else { - throw GRPCServerError.requestCardinalityViolation + throw GRPCError.server(.requestCardinalityViolation) } let resultFuture = eventObserver(message) diff --git a/Sources/SwiftGRPCNIO/ClientCalls/BaseClientCall.swift b/Sources/SwiftGRPCNIO/ClientCalls/BaseClientCall.swift new file mode 100644 index 000000000..d47f76f4c --- /dev/null +++ b/Sources/SwiftGRPCNIO/ClientCalls/BaseClientCall.swift @@ -0,0 +1,234 @@ +/* + * Copyright 2019, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Foundation +import NIO +import NIOHTTP1 +import NIOHTTP2 +import SwiftProtobuf + +/// This class provides much of the boilerplate for the four types of gRPC call objects returned to framework +/// users. +/// +/// Setup includes: +/// - creation of an HTTP/2 stream for the call to execute on, +/// - configuration of the NIO channel handlers for the stream, and +/// - setting a call timeout, if one is provided. +/// +/// This class also provides much of the framework user facing functionality via conformance to `ClientCall`. +open class BaseClientCall { + /// The underlying `GRPCClient` providing the HTTP/2 channel and multiplexer. + internal let client: GRPCClient + + /// Promise for an HTTP/2 stream to execute the call on. + internal let streamPromise: EventLoopPromise + + /// Client channel handler. Handles internal state for reading/writing messages to the channel. + /// The handler also owns the promises for the futures that this class surfaces to the user (such as + /// `initialMetadata` and `status`). + internal let clientChannelHandler: GRPCClientChannelHandler + + /// Sets up a gRPC call. + /// + /// A number of actions are performed: + /// - a new HTTP/2 stream is created and configured using the channel and multiplexer provided by `client`, + /// - a callback is registered on the new stream (`subchannel`) to send the request head, + /// - a timeout is scheduled if one is set in the `callOptions`. + /// + /// - Parameters: + /// - client: client containing the HTTP/2 channel and multiplexer to use for this call. + /// - path: path for this RPC method. + /// - callOptions: options to use when configuring this call. + /// - responseObserver: observer for received messages. + init( + client: GRPCClient, + path: String, + callOptions: CallOptions, + responseObserver: ResponseObserver + ) { + self.client = client + self.streamPromise = client.channel.eventLoop.newPromise() + self.clientChannelHandler = GRPCClientChannelHandler( + initialMetadataPromise: client.channel.eventLoop.newPromise(), + statusPromise: client.channel.eventLoop.newPromise(), + responseObserver: responseObserver) + + self.createStreamChannel() + self.setTimeout(callOptions.timeout) + } +} + +extension BaseClientCall: ClientCall { + public var subchannel: EventLoopFuture { + return self.streamPromise.futureResult + } + + public var initialMetadata: EventLoopFuture { + return self.clientChannelHandler.initialMetadataPromise.futureResult + } + + public var status: EventLoopFuture { + return self.clientChannelHandler.statusPromise.futureResult + } + + public func cancel() { + self.client.channel.eventLoop.execute { + self.subchannel.whenSuccess { channel in + channel.close(mode: .all, promise: nil) + } + } + } +} + +extension BaseClientCall { + /// Creates and configures an HTTP/2 stream channel. `subchannel` will contain the stream channel when it is created. + /// + /// - Important: This should only ever be called once. + private func createStreamChannel() { + self.client.channel.eventLoop.execute { + self.client.multiplexer.createStreamChannel(promise: self.streamPromise) { (subchannel, streamID) -> EventLoopFuture in + subchannel.pipeline.addHandlers([HTTP2ToHTTP1ClientCodec(streamID: streamID, httpProtocol: .http), + HTTP1ToRawGRPCClientCodec(), + GRPCClientCodec(), + self.clientChannelHandler], + first: false) + } + } + } + + /// Send the request head once `subchannel` becomes available. + /// + /// - Important: This should only ever be called once. + /// + /// - Parameters: + /// - requestHead: The request head to send. + /// - promise: A promise to fulfill once the request head has been sent. + internal func sendHead(_ requestHead: HTTPRequestHead, promise: EventLoopPromise?) { + // The nghttp2 implementation of NIOHTTP2 has a known defect where "promises on control frame + // writes do not work and will be leaked. Promises on DATA frame writes work just fine and will + // be fulfilled correctly." Succeed the promise here as a temporary workaround. + //! TODO: remove this and pass the promise to `writeAndFlush` when NIOHTTP2 supports it. + promise?.succeed(result: ()) + self.subchannel.whenSuccess { channel in + channel.writeAndFlush(GRPCClientRequestPart.head(requestHead), promise: nil) + } + } + + /// Send the request head once `subchannel` becomes available. + /// + /// - Important: This should only ever be called once. + /// + /// - Parameter requestHead: The request head to send. + /// - Returns: A future which will be succeeded once the request head has been sent. + internal func sendHead(_ requestHead: HTTPRequestHead) -> EventLoopFuture { + let promise = client.channel.eventLoop.newPromise(of: Void.self) + self.sendHead(requestHead, promise: promise) + return promise.futureResult + } + + /// Send the given message once `subchannel` becomes available. + /// + /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use the non-underbarred name. + /// - Parameters: + /// - message: The message to send. + /// - promise: A promise to fulfil when the message reaches the network. + internal func _sendMessage(_ message: RequestMessage, promise: EventLoopPromise?) { + self.subchannel.whenSuccess { channel in + channel.writeAndFlush(GRPCClientRequestPart.message(message), promise: promise) + } + } + + /// Send the given message once `subchannel` becomes available. + /// + /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use the non-underbarred name. + /// - Returns: A future which will be fullfilled when the message reaches the network. + internal func _sendMessage(_ message: RequestMessage) -> EventLoopFuture { + let promise = client.channel.eventLoop.newPromise(of: Void.self) + self._sendMessage(message, promise: promise) + return promise.futureResult + } + + /// Send `end` once `subchannel` becomes available. + /// + /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use the non-underbarred name. + /// - Important: This should only ever be called once. + /// - Parameter promise: A promise to succeed once then end has been sent. + internal func _sendEnd(promise: EventLoopPromise?) { + // The nghttp2 implementation of NIOHTTP2 has a known defect where "promises on control frame + // writes do not work and will be leaked. Promises on DATA frame writes work just fine and will + // be fulfilled correctly." Succeed the promise here as a temporary workaround. + //! TODO: remove this and pass the promise to `writeAndFlush` when NIOHTTP2 supports it. + promise?.succeed(result: ()) + self.subchannel.whenSuccess { channel in + channel.writeAndFlush(GRPCClientRequestPart.end, promise: nil) + } + } + + /// Send `end` once `subchannel` becomes available. + /// + /// - Note: This is prefixed to allow for classes conforming to `StreamingRequestClientCall` to use the non-underbarred name. + /// - Important: This should only ever be called once. + ///- Returns: A future which will be succeeded once the end has been sent. + internal func _sendEnd() -> EventLoopFuture { + let promise = client.channel.eventLoop.newPromise(of: Void.self) + self._sendEnd(promise: promise) + return promise.futureResult + } + + /// Creates a client-side timeout for this call. + /// + /// - Important: This should only ever be called once. + private func setTimeout(_ timeout: GRPCTimeout) { + if timeout == .infinite { return } + + self.client.channel.eventLoop.scheduleTask(in: timeout.asNIOTimeAmount) { [weak self] in + self?.clientChannelHandler.observeError(.client(.deadlineExceeded(timeout))) + } + } + + /// Makes a new `HTTPRequestHead` for a call with this signature. + /// + /// - Parameters: + /// - path: path for this RPC method. + /// - host: the address of the host we are connected to. + /// - callOptions: options to use when configuring this call. + /// - Returns: `HTTPRequestHead` configured for this call. + internal func makeRequestHead(path: String, host: String, callOptions: CallOptions) -> HTTPRequestHead { + var requestHead = HTTPRequestHead(version: .init(major: 2, minor: 0), method: .POST, uri: path) + + callOptions.customMetadata.forEach { name, value in + requestHead.headers.add(name: name, value: value) + } + + // We're dealing with HTTP/1; the NIO HTTP2ToHTTP1Codec replaces "host" with ":authority". + requestHead.headers.add(name: "host", value: host) + + requestHead.headers.add(name: "content-type", value: "application/grpc") + + // Used to detect incompatible proxies, as per https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests + requestHead.headers.add(name: "te", value: "trailers") + + //! FIXME: Add a more specific user-agent, see: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#user-agents + requestHead.headers.add(name: "user-agent", value: "grpc-swift-nio") + + requestHead.headers.add(name: "grpc-accept-encoding", value: CompressionMechanism.acceptEncodingHeader) + + if callOptions.timeout != .infinite { + requestHead.headers.add(name: "grpc-timeout", value: String(describing: callOptions.timeout)) + } + + return requestHead + } +} diff --git a/Sources/SwiftGRPCNIO/ClientCalls/BidirectionalStreamingClientCall.swift b/Sources/SwiftGRPCNIO/ClientCalls/BidirectionalStreamingClientCall.swift new file mode 100644 index 000000000..deb5a1f5f --- /dev/null +++ b/Sources/SwiftGRPCNIO/ClientCalls/BidirectionalStreamingClientCall.swift @@ -0,0 +1,59 @@ +/* + * Copyright 2019, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Foundation +import SwiftProtobuf +import NIO + +/// A bidirectional-streaming gRPC call. Each response is passed to the provided observer block. +/// +/// Messages should be sent via the `send` method; an `.end` message should be sent +/// to indicate the final message has been sent. +/// +/// The following futures are available to the caller: +/// - `initialMetadata`: the initial metadata returned from the server, +/// - `status`: the status of the gRPC call after it has ended, +/// - `trailingMetadata`: any metadata returned from the server alongside the `status`. +public class BidirectionalStreamingClientCall: BaseClientCall, StreamingRequestClientCall { + private var messageQueue: EventLoopFuture + + public init(client: GRPCClient, path: String, callOptions: CallOptions, handler: @escaping (ResponseMessage) -> Void) { + self.messageQueue = client.channel.eventLoop.newSucceededFuture(result: ()) + super.init(client: client, path: path, callOptions: callOptions, responseObserver: .callback(handler)) + + let requestHead = self.makeRequestHead(path: path, host: client.host, callOptions: callOptions) + self.messageQueue = self.messageQueue.then { self.sendHead(requestHead) } + } + + public func sendMessage(_ message: RequestMessage) -> EventLoopFuture { + return self._sendMessage(message) + } + + public func sendMessage(_ message: RequestMessage, promise: EventLoopPromise?) { + self._sendMessage(message, promise: promise) + } + + public func sendEnd() -> EventLoopFuture { + return self._sendEnd() + } + + public func sendEnd(promise: EventLoopPromise?) { + self._sendEnd(promise: promise) + } + + public func newMessageQueue() -> EventLoopFuture { + return self.messageQueue + } +} diff --git a/Sources/SwiftGRPCNIO/ClientCalls/ClientCall.swift b/Sources/SwiftGRPCNIO/ClientCalls/ClientCall.swift new file mode 100644 index 000000000..49dc52c81 --- /dev/null +++ b/Sources/SwiftGRPCNIO/ClientCalls/ClientCall.swift @@ -0,0 +1,115 @@ +/* + * Copyright 2019, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Foundation +import NIO +import NIOHTTP1 +import NIOHTTP2 +import SwiftProtobuf + +/// Base protocol for a client call to a gRPC service. +public protocol ClientCall { + /// The type of the request message for the call. + associatedtype RequestMessage: Message + /// The type of the response message for the call. + associatedtype ResponseMessage: Message + + /// HTTP/2 stream that requests and responses are sent and received on. + var subchannel: EventLoopFuture { get } + + /// Initial response metadata. + var initialMetadata: EventLoopFuture { get } + + /// Status of this call which may be populated by the server or client. + /// + /// The client may populate the status if, for example, it was not possible to connect to the service. + /// + /// Note: despite `GRPCStatus` being an `Error`, the value will be __always__ delivered as a __success__ + /// result even if the status represents a __negative__ outcome. This future will __never__ be fulfilled + /// with an error. + var status: EventLoopFuture { get } + + /// Trailing response metadata. + /// + /// This is the same metadata as `GRPCStatus.trailingMetadata` returned by `status`. + var trailingMetadata: EventLoopFuture { get } + + /// Cancel the current call. + /// + /// Closes the HTTP/2 stream once it becomes available. Additional writes to the channel will be ignored. + /// Any unfulfilled promises will be failed with a cancelled status (excepting `status` which will be + /// succeeded, if not already succeeded). + func cancel() +} + +extension ClientCall { + public var trailingMetadata: EventLoopFuture { + return status.map { $0.trailingMetadata } + } +} + +/// A `ClientCall` with request streaming; i.e. client-streaming and bidirectional-streaming. +public protocol StreamingRequestClientCall: ClientCall { + /// Sends a message to the service. + /// + /// - Important: Callers must terminate the stream of messages by calling `sendEnd()` or `sendEnd(promise:)`. + /// + /// - Parameter message: The message to send. + /// - Returns: A future which will be fullfilled when the message has been sent. + func sendMessage(_ message: RequestMessage) -> EventLoopFuture + + /// Sends a message to the service. + /// + /// - Important: Callers must terminate the stream of messages by calling `sendEnd()` or `sendEnd(promise:)`. + /// + /// - Parameters: + /// - message: The message to send. + /// - promise: A promise to be fulfilled when the message has been sent. + func sendMessage(_ message: RequestMessage, promise: EventLoopPromise?) + + /// Returns a future which can be used as a message queue. + /// + /// Callers may use this as such: + /// ``` + /// var queue = call.newMessageQueue() + /// for message in messagesToSend { + /// queue = queue.then { call.sendMessage(message) } + /// } + /// ``` + /// + /// - Returns: A future which may be used as the head of a message queue. + func newMessageQueue() -> EventLoopFuture + + /// Terminates a stream of messages sent to the service. + /// + /// - Important: This should only ever be called once. + /// - Returns: A future which will be fullfilled when the end has been sent. + func sendEnd() -> EventLoopFuture + + /// Terminates a stream of messages sent to the service. + /// + /// - Important: This should only ever be called once. + /// - Parameter promise: A promise to be fulfilled when the end has been sent. + func sendEnd(promise: EventLoopPromise?) +} + +/// A `ClientCall` with a unary response; i.e. unary and client-streaming. +public protocol UnaryResponseClientCall: ClientCall { + /// The response message returned from the service if the call is successful. This may be failed + /// if the call encounters an error. + /// + /// Callers should rely on the `status` of the call for the canonical outcome. + var response: EventLoopFuture { get } +} diff --git a/Sources/SwiftGRPCNIO/ClientCalls/ClientStreamingClientCall.swift b/Sources/SwiftGRPCNIO/ClientCalls/ClientStreamingClientCall.swift new file mode 100644 index 000000000..6d40a6fa6 --- /dev/null +++ b/Sources/SwiftGRPCNIO/ClientCalls/ClientStreamingClientCall.swift @@ -0,0 +1,68 @@ +/* + * Copyright 2019, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Foundation +import SwiftProtobuf +import NIO + +/// A client-streaming gRPC call. +/// +/// Messages should be sent via the `send` method; an `.end` message should be sent +/// to indicate the final message has been sent. +/// +/// The following futures are available to the caller: +/// - `initialMetadata`: the initial metadata returned from the server, +/// - `response`: the response from the call, +/// - `status`: the status of the gRPC call after it has ended, +/// - `trailingMetadata`: any metadata returned from the server alongside the `status`. +public class ClientStreamingClientCall: BaseClientCall, StreamingRequestClientCall, UnaryResponseClientCall { + public let response: EventLoopFuture + private var messageQueue: EventLoopFuture + + public init(client: GRPCClient, path: String, callOptions: CallOptions) { + let responsePromise: EventLoopPromise = client.channel.eventLoop.newPromise() + self.response = responsePromise.futureResult + self.messageQueue = client.channel.eventLoop.newSucceededFuture(result: ()) + + super.init( + client: client, + path: path, + callOptions: callOptions, + responseObserver: .succeedPromise(responsePromise)) + + let requestHead = self.makeRequestHead(path: path, host: client.host, callOptions: callOptions) + self.messageQueue = self.messageQueue.then { self.sendHead(requestHead) } + } + + public func sendMessage(_ message: RequestMessage) -> EventLoopFuture { + return self._sendMessage(message) + } + + public func sendMessage(_ message: RequestMessage, promise: EventLoopPromise?) { + self._sendMessage(message, promise: promise) + } + + public func sendEnd() -> EventLoopFuture { + return self._sendEnd() + } + + public func sendEnd(promise: EventLoopPromise?) { + self._sendEnd(promise: promise) + } + + public func newMessageQueue() -> EventLoopFuture { + return self.messageQueue + } +} diff --git a/Sources/SwiftGRPCNIO/ClientCalls/ResponseObserver.swift b/Sources/SwiftGRPCNIO/ClientCalls/ResponseObserver.swift new file mode 100644 index 000000000..a1061d2cd --- /dev/null +++ b/Sources/SwiftGRPCNIO/ClientCalls/ResponseObserver.swift @@ -0,0 +1,48 @@ +/* + * Copyright 2019, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Foundation +import NIO +import SwiftProtobuf + +/// A response message observer. +/// +/// - succeedPromise: succeed the given promise on receipt of a message. +/// - callback: calls the given callback for each response observed. +public enum ResponseObserver { + case succeedPromise(EventLoopPromise) + case callback((ResponseMessage) -> Void) + + /// Observe the given message. + func observe(_ message: ResponseMessage) { + switch self { + case .callback(let callback): + callback(message) + + case .succeedPromise(let promise): + promise.succeed(result: message) + } + } + + var expectsMultipleResponses: Bool { + switch self { + case .callback: + return true + + case .succeedPromise: + return false + } + } +} diff --git a/Sources/SwiftGRPCNIO/ClientCalls/ServerStreamingClientCall.swift b/Sources/SwiftGRPCNIO/ClientCalls/ServerStreamingClientCall.swift new file mode 100644 index 000000000..0d3027714 --- /dev/null +++ b/Sources/SwiftGRPCNIO/ClientCalls/ServerStreamingClientCall.swift @@ -0,0 +1,35 @@ +/* + * Copyright 2019, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Foundation +import SwiftProtobuf +import NIO + +/// A server-streaming gRPC call. The request is sent on initialization, each response is passed to the provided observer block. +/// +/// The following futures are available to the caller: +/// - `initialMetadata`: the initial metadata returned from the server, +/// - `status`: the status of the gRPC call after it has ended, +/// - `trailingMetadata`: any metadata returned from the server alongside the `status`. +public class ServerStreamingClientCall: BaseClientCall { + public init(client: GRPCClient, path: String, request: RequestMessage, callOptions: CallOptions, handler: @escaping (ResponseMessage) -> Void) { + super.init(client: client, path: path, callOptions: callOptions, responseObserver: .callback(handler)) + + let requestHead = self.makeRequestHead(path: path, host: client.host, callOptions: callOptions) + self.sendHead(requestHead) + .then { self._sendMessage(request) } + .whenSuccess { self._sendEnd(promise: nil) } + } +} diff --git a/Sources/SwiftGRPCNIO/ClientCalls/UnaryClientCall.swift b/Sources/SwiftGRPCNIO/ClientCalls/UnaryClientCall.swift new file mode 100644 index 000000000..fbcb75316 --- /dev/null +++ b/Sources/SwiftGRPCNIO/ClientCalls/UnaryClientCall.swift @@ -0,0 +1,45 @@ +/* + * Copyright 2019, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Foundation +import SwiftProtobuf +import NIO + +/// A unary gRPC call. The request is sent on initialization. +/// +/// The following futures are available to the caller: +/// - `initialMetadata`: the initial metadata returned from the server, +/// - `response`: the response from the unary call, +/// - `status`: the status of the gRPC call after it has ended, +/// - `trailingMetadata`: any metadata returned from the server alongside the `status`. +public class UnaryClientCall: BaseClientCall, UnaryResponseClientCall { + public let response: EventLoopFuture + + public init(client: GRPCClient, path: String, request: RequestMessage, callOptions: CallOptions) { + let responsePromise: EventLoopPromise = client.channel.eventLoop.newPromise() + self.response = responsePromise.futureResult + + super.init( + client: client, + path: path, + callOptions: callOptions, + responseObserver: .succeedPromise(responsePromise)) + + let requestHead = self.makeRequestHead(path: path, host: client.host, callOptions: callOptions) + self.sendHead(requestHead) + .then { self._sendMessage(request) } + .whenSuccess { self._sendEnd(promise: nil) } + } +} diff --git a/Sources/SwiftGRPCNIO/ClientOptions.swift b/Sources/SwiftGRPCNIO/ClientOptions.swift new file mode 100644 index 000000000..6dc7e0691 --- /dev/null +++ b/Sources/SwiftGRPCNIO/ClientOptions.swift @@ -0,0 +1,31 @@ +/* + * Copyright 2019, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Foundation +import NIOHTTP1 + +/// Options to use for GRPC calls. +public struct CallOptions { + /// Additional metadata to send to the service. + public var customMetadata: HTTPHeaders + + /// The call timeout. + public var timeout: GRPCTimeout + + public init(customMetadata: HTTPHeaders = HTTPHeaders(), timeout: GRPCTimeout = GRPCTimeout.default) { + self.customMetadata = customMetadata + self.timeout = timeout + } +} diff --git a/Sources/SwiftGRPCNIO/CompressionMechanism.swift b/Sources/SwiftGRPCNIO/CompressionMechanism.swift new file mode 100644 index 000000000..f7a755bd9 --- /dev/null +++ b/Sources/SwiftGRPCNIO/CompressionMechanism.swift @@ -0,0 +1,81 @@ +/* + * Copyright 2019, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Foundation + +public enum CompressionError: Error { + case unsupported(CompressionMechanism) +} + +/// The mechanism to use for message compression. +public enum CompressionMechanism: String { + // No compression was indicated. + case none + + // Compression indicated via a header. + case gzip + case deflate + case snappy + // This is the same as `.none` but was indicated via a "grpc-encoding" and may be listed + // in the "grpc-accept-encoding" header. If this is the compression mechanism being used + // then the compression flag should be indicated in length-prefxied messages (see + // `LengthPrefixedMessageReader`). + case identity + + // Compression indicated via a header, but not one listed in the specification. + case unknown + + /// Whether the compression flag in gRPC length-prefixed messages should be set or not. + /// + /// See `LengthPrefixedMessageReader` for the message format. + var requiresFlag: Bool { + switch self { + case .none: + return false + + case .identity, .gzip, .deflate, .snappy, .unknown: + return true + } + } + + /// Whether the given compression is supported. + var supported: Bool { + switch self { + case .identity, .none: + return true + + case .gzip, .deflate, .snappy, .unknown: + return false + } + } + + /// A string containing the supported compression mechanisms to list in the "grpc-accept-encoding" header. + static let acceptEncodingHeader: String = { + return CompressionMechanism + .allCases + .filter { $0.supported && $0.requiresFlag } + .map { $0.rawValue } + .joined(separator: ", ") + }() +} + +#if swift(>=4.2) +extension CompressionMechanism: CaseIterable {} +//! FIXME: Remove this code once the CI is updated to 4.2. +#else +extension CompressionMechanism { + public static let allCases: [CompressionMechanism] = [.none, .identity, .gzip, .deflate, .snappy, .unknown] +} +#endif diff --git a/Sources/SwiftGRPCNIO/GRPCChannelHandler.swift b/Sources/SwiftGRPCNIO/GRPCChannelHandler.swift index d18b7a4dc..de586e60e 100644 --- a/Sources/SwiftGRPCNIO/GRPCChannelHandler.swift +++ b/Sources/SwiftGRPCNIO/GRPCChannelHandler.swift @@ -53,7 +53,7 @@ extension GRPCChannelHandler: ChannelInboundHandler { switch requestPart { case .head(let requestHead): guard let callHandler = getCallHandler(channel: ctx.channel, requestHead: requestHead) else { - errorCaught(ctx: ctx, error: GRPCServerError.unimplementedMethod(requestHead.uri)) + errorCaught(ctx: ctx, error: GRPCError.server(.unimplementedMethod(requestHead.uri))) return } diff --git a/Sources/SwiftGRPCNIO/GRPCClient.swift b/Sources/SwiftGRPCNIO/GRPCClient.swift new file mode 100644 index 000000000..d32684f74 --- /dev/null +++ b/Sources/SwiftGRPCNIO/GRPCClient.swift @@ -0,0 +1,89 @@ +/* + * Copyright 2019, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Foundation +import NIO +import NIOHTTP2 + +/// Underlying channel and HTTP/2 stream multiplexer. +/// +/// Different service clients implementing `GRPCServiceClient` may share an instance of this class. +open class GRPCClient { + public static func start( + host: String, + port: Int, + eventLoopGroup: EventLoopGroup + ) -> EventLoopFuture { + let bootstrap = ClientBootstrap(group: eventLoopGroup) + // Enable SO_REUSEADDR. + .channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1) + .channelInitializer { channel in + channel.pipeline.add(handler: HTTP2Parser(mode: .client)) + } + + return bootstrap.connect(host: host, port: port).then { (channel: Channel) -> EventLoopFuture in + let multiplexer = HTTP2StreamMultiplexer(inboundStreamStateInitializer: nil) + return channel.pipeline.add(handler: multiplexer) + .map { GRPCClient(channel: channel, multiplexer: multiplexer, host: host) } + } + } + + public let channel: Channel + public let multiplexer: HTTP2StreamMultiplexer + public let host: String + public var defaultCallOptions: CallOptions + + init(channel: Channel, multiplexer: HTTP2StreamMultiplexer, host: String, defaultCallOptions: CallOptions = CallOptions()) { + self.channel = channel + self.multiplexer = multiplexer + self.host = host + self.defaultCallOptions = defaultCallOptions + } + + /// Fired when the client shuts down. + public var onClose: EventLoopFuture { + return channel.closeFuture + } + + public func close() -> EventLoopFuture { + return channel.close(mode: .all) + } +} + +/// A GRPC client for a given service. +public protocol GRPCServiceClient { + /// The client providing the underlying HTTP/2 channel for this client. + var client: GRPCClient { get } + + /// Name of the service this client is for (e.g. "echo.Echo"). + var service: String { get } + + /// The call options to use should the user not provide per-call options. + var defaultCallOptions: CallOptions { get set } + + /// Return the path for the given method in the format "/Service-Name/Method-Name". + /// + /// This may be overriden if consumers require a different path format. + /// + /// - Parameter forMethod: name of method to return a path for. + /// - Returns: path for the given method used in gRPC request headers. + func path(forMethod method: String) -> String +} + +extension GRPCServiceClient { + public func path(forMethod method: String) -> String { + return "/\(service)/\(method)" + } +} diff --git a/Sources/SwiftGRPCNIO/GRPCClientChannelHandler.swift b/Sources/SwiftGRPCNIO/GRPCClientChannelHandler.swift new file mode 100644 index 000000000..141066cc9 --- /dev/null +++ b/Sources/SwiftGRPCNIO/GRPCClientChannelHandler.swift @@ -0,0 +1,219 @@ +/* + * Copyright 2019, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Foundation +import NIO +import NIOHTTP1 +import SwiftProtobuf + +/// The final client-side channel handler. +/// +/// This handler holds promises for the initial metadata and the status, as well as an observer +/// for responses. For unary and client-streaming calls the observer will succeed a response +/// promise. For server-streaming and bidirectional-streaming the observer will call the supplied +/// callback with each response received. +/// +/// Errors are also handled by the channel handler. Promises for the initial metadata and +/// response (if applicable) are failed with first error received. The status promise is __succeeded__ +/// with the error as a `GRPCStatus`. The stream is also closed and any inbound or outbound messages +/// are ignored. +internal class GRPCClientChannelHandler { + internal let initialMetadataPromise: EventLoopPromise + internal let statusPromise: EventLoopPromise + internal let responseObserver: ResponseObserver + + /// A promise for a unary response. + internal var responsePromise: EventLoopPromise? { + guard case .succeedPromise(let promise) = responseObserver else { return nil } + return promise + } + + /// Promise that the `HTTPRequestHead` has been sent to the network. + /// + /// If we attempt to close the stream before this has been fulfilled then the program will fatal + /// error because of an issue with nghttp2/swift-nio-http2. + /// + /// Since we need this promise to succeed before we can close the channel, `BaseClientCall` sends + /// the request head in `init` which will in turn initialize this promise in `write(ctx:data:promise:)`. + /// This means that this promise should never be nil in practice. + /// + /// See: https://github.com/apple/swift-nio-http2/issues/39. + private var requestHeadSentPromise: EventLoopPromise! + + private enum InboundState { + case expectingHeadersOrStatus + case expectingMessageOrStatus + case expectingStatus + case ignore + + var expectingStatus: Bool { + switch self { + case .expectingHeadersOrStatus, .expectingMessageOrStatus, .expectingStatus: + return true + + case .ignore: + return false + } + } + } + + private enum OutboundState { + case expectingHead + case expectingMessageOrEnd + case ignore + } + + private var inboundState: InboundState = .expectingHeadersOrStatus + private var outboundState: OutboundState = .expectingHead + + /// Creates a new `GRPCClientChannelHandler`. + /// + /// - Parameters: + /// - initialMetadataPromise: a promise to succeed on receiving the initial metadata from the service. + /// - statusPromise: a promise to succeed with the outcome of the call. + /// - responseObserver: an observer for response messages from the server; for unary responses this should + /// be the `succeedPromise` case. + public init( + initialMetadataPromise: EventLoopPromise, + statusPromise: EventLoopPromise, + responseObserver: ResponseObserver + ) { + self.initialMetadataPromise = initialMetadataPromise + self.statusPromise = statusPromise + self.responseObserver = responseObserver + } + + /// Observe the given status. + /// + /// The `status` promise is __succeeded__ with the given status despite `GRPCStatus` being an + /// `Error`. If `status.code != .ok` then the initial metadata and response promises (if applicable) + /// are failed with the given status. + /// + /// - Parameter status: the status to observe. + internal func observeStatus(_ status: GRPCStatus) { + if status.code != .ok { + self.initialMetadataPromise.fail(error: status) + self.responsePromise?.fail(error: status) + } + self.statusPromise.succeed(result: status) + } + + /// Observe the given error. + /// + /// Calls `observeStatus(status:)`. with `error.asGRPCStatus()`. + /// + /// - Parameter error: the error to observe. + internal func observeError(_ error: GRPCError) { + self.observeStatus(error.asGRPCStatus()) + } +} + +extension GRPCClientChannelHandler: ChannelInboundHandler { + public typealias InboundIn = GRPCClientResponsePart + + /// Reads inbound data. + /// + /// On receipt of: + /// - headers: the initial metadata promise is succeeded. + /// - message: the message observer is called with the message; for unary responses a response + /// promise is succeeded, otherwise a callback is called. + /// - status: the status promise is succeeded; if the status is not `ok` then the initial metadata + /// and response promise (if available) are failed with the status. The channel is then closed. + public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) { + guard self.inboundState != .ignore else { return } + + switch unwrapInboundIn(data) { + case .headers(let headers): + guard self.inboundState == .expectingHeadersOrStatus else { + self.errorCaught(ctx: ctx, error: GRPCError.client(.invalidState("received headers while in state \(self.inboundState)"))) + return + } + + self.initialMetadataPromise.succeed(result: headers) + self.inboundState = .expectingMessageOrStatus + + case .message(let message): + guard self.inboundState == .expectingMessageOrStatus else { + self.errorCaught(ctx: ctx, error: GRPCError.client(.responseCardinalityViolation)) + return + } + + self.responseObserver.observe(message) + self.inboundState = self.responseObserver.expectsMultipleResponses ? .expectingMessageOrStatus : .expectingStatus + + case .status(let status): + guard self.inboundState.expectingStatus else { + self.errorCaught(ctx: ctx, error: GRPCError.client(.invalidState("received status while in state \(self.inboundState)"))) + return + } + + self.observeStatus(status) + + // We don't expect any more requests/responses beyond this point. + self.close(ctx: ctx, mode: .all, promise: nil) + } + } +} + +extension GRPCClientChannelHandler: ChannelOutboundHandler { + public typealias OutboundIn = GRPCClientRequestPart + public typealias OutboundOut = GRPCClientRequestPart + + public func write(ctx: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { + guard self.outboundState != .ignore else { return } + + switch self.unwrapOutboundIn(data) { + case .head: + guard self.outboundState == .expectingHead else { + self.errorCaught(ctx: ctx, error: GRPCError.client(.invalidState("received headers while in state \(self.outboundState)"))) + return + } + + // See the documentation for `requestHeadSentPromise` for an explanation of this. + self.requestHeadSentPromise = promise ?? ctx.eventLoop.newPromise() + ctx.write(data, promise: self.requestHeadSentPromise) + self.outboundState = .expectingMessageOrEnd + + default: + guard self.outboundState == .expectingMessageOrEnd else { + self.errorCaught(ctx: ctx, error: GRPCError.client(.invalidState("received message or end while in state \(self.outboundState)"))) + return + } + + ctx.write(data, promise: promise) + } + } +} + +extension GRPCClientChannelHandler { + /// Closes the HTTP/2 stream. Inbound and outbound state are set to ignore. + public func close(ctx: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise?) { + self.observeError(GRPCError.client(.cancelledByClient)) + + requestHeadSentPromise.futureResult.whenComplete { + ctx.close(mode: mode, promise: promise) + } + + self.inboundState = .ignore + self.outboundState = .ignore + } + + /// Observe an error from the pipeline and close the channel. + public func errorCaught(ctx: ChannelHandlerContext, error: Error) { + //! TODO: Add an error handling delegate, similar to in the server. + self.observeError((error as? GRPCError) ?? GRPCError.unknown(error, origin: .client)) + ctx.close(mode: .all, promise: nil) + } +} diff --git a/Sources/SwiftGRPCNIO/GRPCClientCodec.swift b/Sources/SwiftGRPCNIO/GRPCClientCodec.swift new file mode 100644 index 000000000..266f90b67 --- /dev/null +++ b/Sources/SwiftGRPCNIO/GRPCClientCodec.swift @@ -0,0 +1,91 @@ +/* + * Copyright 2019, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Foundation +import NIO +import NIOHTTP1 +import SwiftProtobuf + +/// Outgoing gRPC package with a fixed message type. +public enum GRPCClientRequestPart { + case head(HTTPRequestHead) + case message(RequestMessage) + case end +} + +/// Incoming gRPC package with a fixed message type. +public enum GRPCClientResponsePart { + case headers(HTTPHeaders) + case message(ResponseMessage) + case status(GRPCStatus) +} + +/// This channel handler simply encodes and decodes protobuf messages into typed messages +/// and `Data`. +public final class GRPCClientCodec { + public init() {} +} + +extension GRPCClientCodec: ChannelInboundHandler { + public typealias InboundIn = RawGRPCClientResponsePart + public typealias InboundOut = GRPCClientResponsePart + + public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) { + let response = self.unwrapInboundIn(data) + + switch response { + case .headers(let headers): + ctx.fireChannelRead(self.wrapInboundOut(.headers(headers))) + + case .message(var messageBuffer): + // Force unwrapping is okay here; we're reading the readable bytes. + let messageAsData = messageBuffer.readData(length: messageBuffer.readableBytes)! + do { + ctx.fireChannelRead(self.wrapInboundOut(.message(try ResponseMessage(serializedData: messageAsData)))) + } catch { + ctx.fireErrorCaught(GRPCError.client(.responseProtoDeserializationFailure)) + } + + case .status(let status): + ctx.fireChannelRead(self.wrapInboundOut(.status(status))) + } + } +} + +extension GRPCClientCodec: ChannelOutboundHandler { + public typealias OutboundIn = GRPCClientRequestPart + public typealias OutboundOut = RawGRPCClientRequestPart + + public func write(ctx: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { + let request = self.unwrapOutboundIn(data) + + switch request { + case .head(let head): + ctx.write(self.wrapOutboundOut(.head(head)), promise: promise) + + case .message(let message): + do { + ctx.write(self.wrapOutboundOut(.message(try message.serializedData())), promise: promise) + } catch { + let error = GRPCError.client(.requestProtoSerializationFailure) + promise?.fail(error: error) + ctx.fireErrorCaught(error) + } + + case .end: + ctx.write(self.wrapOutboundOut(.end), promise: promise) + } + } +} diff --git a/Sources/SwiftGRPCNIO/GRPCError.swift b/Sources/SwiftGRPCNIO/GRPCError.swift new file mode 100644 index 000000000..d45feea89 --- /dev/null +++ b/Sources/SwiftGRPCNIO/GRPCError.swift @@ -0,0 +1,192 @@ +/* + * Copyright 2019, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Foundation +import NIOHTTP1 + +/// Wraps a gRPC error to provide contextual information about where it was thrown. +public struct GRPCError: Error, GRPCStatusTransformable { + public enum Origin { case client, server } + + /// The underlying error thrown by framework. + public let error: Error + + /// The origin of the error. + public let origin: Origin + + /// The file in which the error was thrown. + public let file: StaticString + + /// The line number in the `file` where the error was thrown. + public let line: Int + + public func asGRPCStatus() -> GRPCStatus { + return (error as? GRPCStatusTransformable)?.asGRPCStatus() ?? GRPCStatus.processingError + } + + private init(_ error: Error, origin: Origin, file: StaticString, line: Int) { + self.error = error + self.origin = origin + self.file = file + self.line = line + } + + /// Creates a `GRPCError` which may only be thrown from the client. + public static func client(_ error: GRPCClientError, file: StaticString = #file, line: Int = #line) -> GRPCError { + return GRPCError(error, origin: .client, file: file, line: line) + } + + /// Creates a `GRPCError` which was thrown from the client. + public static func client(_ error: GRPCCommonError, file: StaticString = #file, line: Int = #line) -> GRPCError { + return GRPCError(error, origin: .client, file: file, line: line) + } + + /// Creates a `GRPCError` which may only be thrown from the server. + public static func server(_ error: GRPCServerError, file: StaticString = #file, line: Int = #line) -> GRPCError { + return GRPCError(error, origin: .server, file: file, line: line) + } + + /// Creates a `GRPCError` which was thrown from the server. + public static func server(_ error: GRPCCommonError, file: StaticString = #file, line: Int = #line) -> GRPCError { + return GRPCError(error, origin: .server, file: file, line: line) + } + + /// Creates a `GRPCError` which was may be thrown by either the server or the client. + public static func common(_ error: GRPCCommonError, origin: Origin, file: StaticString = #file, line: Int = #line) -> GRPCError { + return GRPCError(error, origin: origin, file: file, line: line) + } + + public static func unknown(_ error: Error, origin: Origin) -> GRPCError { + return GRPCError(error, origin: origin, file: "", line: 0) + } +} + +/// An error which should only be thrown by the server. +public enum GRPCServerError: Error, Equatable { + /// The RPC method is not implemented on the server. + case unimplementedMethod(String) + + /// It was not possible to decode a base64 message (gRPC-Web only). + case base64DecodeError + + /// It was not possible to deserialize the request protobuf. + case requestProtoDeserializationFailure + + /// It was not possible to serialize the response protobuf. + case responseProtoSerializationFailure + + /// More than one request was sent for a unary-request call. + case requestCardinalityViolation + + /// The server received a message when it was not in a writable state. + case serverNotWritable +} + +/// An error which should only be thrown by the client. +public enum GRPCClientError: Error, Equatable { + /// The response status was not "200 OK". + case HTTPStatusNotOk(HTTPResponseStatus) + + /// The call was cancelled by the client. + case cancelledByClient + + /// It was not possible to deserialize the response protobuf. + case responseProtoDeserializationFailure + + /// It was not possible to serialize the request protobuf. + case requestProtoSerializationFailure + + /// More than one response was received for a unary-response call. + case responseCardinalityViolation + + /// The call deadline was exceeded. + case deadlineExceeded(GRPCTimeout) +} + +/// An error which should be thrown by either the client or server. +public enum GRPCCommonError: Error, Equatable { + /// An invalid state has been reached; something has gone very wrong. + case invalidState(String) + + /// Compression was indicated in the "grpc-message-encoding" header but not in the gRPC message compression flag, or vice versa. + case unexpectedCompression + + /// The given compression mechanism is not supported. + case unsupportedCompressionMechanism(String) +} + +extension GRPCServerError: GRPCStatusTransformable { + public func asGRPCStatus() -> GRPCStatus { + // These status codes are informed by: https://github.com/grpc/grpc/blob/master/doc/statuscodes.md + switch self { + case .unimplementedMethod(let method): + return GRPCStatus(code: .unimplemented, message: "unknown method \(method)") + + case .base64DecodeError: + return GRPCStatus(code: .internalError, message: "could not decode base64 message") + + case .requestProtoDeserializationFailure: + return GRPCStatus(code: .internalError, message: "could not parse request proto") + + case .responseProtoSerializationFailure: + return GRPCStatus(code: .internalError, message: "could not serialize response proto") + + case .requestCardinalityViolation: + return GRPCStatus(code: .unimplemented, message: "request cardinality violation; method requires exactly one request but client sent more") + + case .serverNotWritable: + return GRPCStatus.processingError + } + } +} + +extension GRPCClientError: GRPCStatusTransformable { + public func asGRPCStatus() -> GRPCStatus { + switch self { + case .HTTPStatusNotOk(let status): + return GRPCStatus(code: .unknown, message: "server returned \(status.code) \(status.reasonPhrase)") + + case .cancelledByClient: + return GRPCStatus(code: .cancelled, message: "client cancelled the call") + + case .responseCardinalityViolation: + return GRPCStatus(code: .unimplemented, message: "response cardinality violation; method requires exactly one response but server sent more") + + case .responseProtoDeserializationFailure: + return GRPCStatus(code: .internalError, message: "could not parse response proto") + + case .requestProtoSerializationFailure: + return GRPCStatus(code: .internalError, message: "could not serialize request proto") + + case .deadlineExceeded(let timeout): + return GRPCStatus(code: .deadlineExceeded, message: "call exceeded timeout of \(timeout)") + } + } +} + +extension GRPCCommonError: GRPCStatusTransformable { + public func asGRPCStatus() -> GRPCStatus { + switch self { + case .invalidState: + return GRPCStatus.processingError + + case .unexpectedCompression: + return GRPCStatus(code: .unimplemented, message: "compression was enabled for this gRPC message but not for this call") + + case .unsupportedCompressionMechanism(let mechanism): + return GRPCStatus(code: .unimplemented, message: "unsupported compression mechanism \(mechanism)") + } + } +} diff --git a/Sources/SwiftGRPCNIO/GRPCServerCodec.swift b/Sources/SwiftGRPCNIO/GRPCServerCodec.swift index 9193c9ea9..5cc6e2104 100644 --- a/Sources/SwiftGRPCNIO/GRPCServerCodec.swift +++ b/Sources/SwiftGRPCNIO/GRPCServerCodec.swift @@ -5,16 +5,16 @@ import NIOFoundationCompat import NIOHTTP1 /// Incoming gRPC package with a fixed message type. -public enum GRPCServerRequestPart { +public enum GRPCServerRequestPart { case head(HTTPRequestHead) - case message(MessageType) + case message(RequestMessage) case end } /// Outgoing gRPC package with a fixed message type. -public enum GRPCServerResponsePart { +public enum GRPCServerResponsePart { case headers(HTTPHeaders) - case message(MessageType) + case message(ResponseMessage) case status(GRPCStatus) } @@ -35,7 +35,7 @@ extension GRPCServerCodec: ChannelInboundHandler { do { ctx.fireChannelRead(self.wrapInboundOut(.message(try RequestMessage(serializedData: messageAsData)))) } catch { - ctx.fireErrorCaught(GRPCServerError.requestProtoParseFailure) + ctx.fireErrorCaught(GRPCError.server(.requestProtoDeserializationFailure)) } case .end: @@ -57,17 +57,15 @@ extension GRPCServerCodec: ChannelOutboundHandler { case .message(let message): do { let messageData = try message.serializedData() - var responseBuffer = ctx.channel.allocator.buffer(capacity: messageData.count) - responseBuffer.write(bytes: messageData) - ctx.write(self.wrapOutboundOut(.message(responseBuffer)), promise: promise) + ctx.write(self.wrapOutboundOut(.message(messageData)), promise: promise) } catch { - let error = GRPCServerError.responseProtoSerializationFailure + let error = GRPCError.server(.responseProtoSerializationFailure) promise?.fail(error: error) ctx.fireErrorCaught(error) } case .status(let status): - ctx.write(self.wrapOutboundOut(.status(status)), promise: promise) + ctx.writeAndFlush(self.wrapOutboundOut(.status(status)), promise: promise) } } } diff --git a/Sources/SwiftGRPCNIO/GRPCServerError.swift b/Sources/SwiftGRPCNIO/GRPCServerError.swift deleted file mode 100644 index 309fe3ad7..000000000 --- a/Sources/SwiftGRPCNIO/GRPCServerError.swift +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright 2019, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import Foundation - -public enum GRPCServerError: Error, Equatable { - /// The RPC method is not implemented on the server. - case unimplementedMethod(String) - - /// It was not possible to decode a base64 message (gRPC-Web only). - case base64DecodeError - - /// It was not possible to parse the request protobuf. - case requestProtoParseFailure - - /// It was not possible to serialize the response protobuf. - case responseProtoSerializationFailure - - /// The given compression mechanism is not supported. - case unsupportedCompressionMechanism(String) - - /// Compression was indicated in the gRPC message, but not for the call. - case unexpectedCompression - - /// More than one request was sent for a unary-request call. - case requestCardinalityViolation - - /// The server received a message when it was not in a writable state. - case serverNotWritable - - /// An invalid state has been reached; something has gone very wrong. - case invalidState(String) -} - -extension GRPCServerError: GRPCStatusTransformable { - public func asGRPCStatus() -> GRPCStatus { - // These status codes are informed by: https://github.com/grpc/grpc/blob/master/doc/statuscodes.md - switch self { - case .unimplementedMethod(let method): - return GRPCStatus(code: .unimplemented, message: "unknown method \(method)") - - case .base64DecodeError: - return GRPCStatus(code: .internalError, message: "could not decode base64 message") - - case .requestProtoParseFailure: - return GRPCStatus(code: .internalError, message: "could not parse request proto") - - case .responseProtoSerializationFailure: - return GRPCStatus(code: .internalError, message: "could not serialize response proto") - - case .unsupportedCompressionMechanism(let mechanism): - return GRPCStatus(code: .unimplemented, message: "unsupported compression mechanism \(mechanism)") - - case .unexpectedCompression: - return GRPCStatus(code: .unimplemented, message: "compression was enabled for this gRPC message but not for this call") - - case .requestCardinalityViolation: - return GRPCStatus(code: .unimplemented, message: "request cardinality violation; method requires exactly one request but client sent more") - - case .serverNotWritable, .invalidState: - return GRPCStatus.processingError - } - } -} diff --git a/Sources/SwiftGRPCNIO/GRPCStatus.swift b/Sources/SwiftGRPCNIO/GRPCStatus.swift index 7a023242c..7a7322051 100644 --- a/Sources/SwiftGRPCNIO/GRPCStatus.swift +++ b/Sources/SwiftGRPCNIO/GRPCStatus.swift @@ -6,18 +6,18 @@ public struct GRPCStatus: Error, Equatable { /// The code to return in the `grpc-status` header. public let code: StatusCode /// The message to return in the `grpc-message` header. - public let message: String + public let message: String? /// Additional HTTP headers to return in the trailers. public let trailingMetadata: HTTPHeaders - public init(code: StatusCode, message: String, trailingMetadata: HTTPHeaders = HTTPHeaders()) { + public init(code: StatusCode, message: String?, trailingMetadata: HTTPHeaders = HTTPHeaders()) { self.code = code self.message = message self.trailingMetadata = trailingMetadata } // Frequently used "default" statuses. - + /// The default status to return for succeeded calls. public static let ok = GRPCStatus(code: .ok, message: "OK") /// "Internal server error" status. diff --git a/Sources/SwiftGRPCNIO/GRPCTimeout.swift b/Sources/SwiftGRPCNIO/GRPCTimeout.swift new file mode 100644 index 000000000..013007ed2 --- /dev/null +++ b/Sources/SwiftGRPCNIO/GRPCTimeout.swift @@ -0,0 +1,142 @@ +import Foundation +import NIO + +public enum GRPCTimeoutError: String, Error { + case negative = "GRPCTimeout must be non-negative" + case tooManyDigits = "GRPCTimeout must be at most 8 digits" +} + +/// A timeout for a gRPC call. +/// +/// Timeouts must be positive and at most 8-digits long. +public struct GRPCTimeout: CustomStringConvertible, Equatable { + public static let `default`: GRPCTimeout = try! .minutes(1) + /// Creates an infinite timeout. This is a sentinel value which must __not__ be sent to a gRPC service. + public static let infinite: GRPCTimeout = GRPCTimeout(nanoseconds: Int64.max, description: "infinite") + + /// A description of the timeout in the format described in the + /// [gRPC protocol](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md). + public let description: String + public let nanoseconds: Int64 + + private init(nanoseconds: Int64, description: String) { + self.nanoseconds = nanoseconds + self.description = description + } + + private static func makeTimeout(_ amount: Int, _ unit: GRPCTimeoutUnit) throws -> GRPCTimeout { + // Timeouts must be positive and at most 8-digits. + if amount < 0 { throw GRPCTimeoutError.negative } + if amount >= 100_000_000 { throw GRPCTimeoutError.tooManyDigits } + + // See "Timeout" in https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests + let description = "\(amount) \(unit.rawValue)" + let nanoseconds = Int64(amount) * Int64(unit.asNanoseconds) + + return GRPCTimeout(nanoseconds: nanoseconds, description: description) + } + + /// Creates a new GRPCTimeout for the given amount of hours. + /// + /// `amount` must be positive and at most 8-digits. + /// + /// - Parameter amount: the amount of hours this `GRPCTimeout` represents. + /// - Returns: A `GRPCTimeout` representing the given number of hours. + /// - Throws: `GRPCTimeoutError` if the amount was negative or more than 8 digits long. + public static func hours(_ amount: Int) throws -> GRPCTimeout { + return try makeTimeout(amount, .hours) + } + + /// Creates a new GRPCTimeout for the given amount of minutes. + /// + /// `amount` must be positive and at most 8-digits. + /// + /// - Parameter amount: the amount of minutes this `GRPCTimeout` represents. + /// - Returns: A `GRPCTimeout` representing the given number of minutes. + /// - Throws: `GRPCTimeoutError` if the amount was negative or more than 8 digits long. + public static func minutes(_ amount: Int) throws -> GRPCTimeout { + return try makeTimeout(amount, .minutes) + } + + /// Creates a new GRPCTimeout for the given amount of seconds. + /// + /// `amount` must be positive and at most 8-digits. + /// + /// - Parameter amount: the amount of seconds this `GRPCTimeout` represents. + /// - Returns: A `GRPCTimeout` representing the given number of seconds. + /// - Throws: `GRPCTimeoutError` if the amount was negative or more than 8 digits long. + public static func seconds(_ amount: Int) throws -> GRPCTimeout { + return try makeTimeout(amount, .seconds) + } + + /// Creates a new GRPCTimeout for the given amount of milliseconds. + /// + /// `amount` must be positive and at most 8-digits. + /// + /// - Parameter amount: the amount of milliseconds this `GRPCTimeout` represents. + /// - Returns: A `GRPCTimeout` representing the given number of milliseconds. + /// - Throws: `GRPCTimeoutError` if the amount was negative or more than 8 digits long. + public static func milliseconds(_ amount: Int) throws -> GRPCTimeout { + return try makeTimeout(amount, .milliseconds) + } + + /// Creates a new GRPCTimeout for the given amount of microseconds. + /// + /// `amount` must be positive and at most 8-digits. + /// + /// - Parameter amount: the amount of microseconds this `GRPCTimeout` represents. + /// - Returns: A `GRPCTimeout` representing the given number of microseconds. + /// - Throws: `GRPCTimeoutError` if the amount was negative or more than 8 digits long. + public static func microseconds(_ amount: Int) throws -> GRPCTimeout { + return try makeTimeout(amount, .microseconds) + } + + /// Creates a new GRPCTimeout for the given amount of nanoseconds. + /// + /// `amount` must be positive and at most 8-digits. + /// + /// - Parameter amount: the amount of nanoseconds this `GRPCTimeout` represents. + /// - Returns: A `GRPCTimeout` representing the given number of nanoseconds. + /// - Throws: `GRPCTimeoutError` if the amount was negative or more than 8 digits long. + public static func nanoseconds(_ amount: Int) throws -> GRPCTimeout { + return try makeTimeout(amount, .nanoseconds) + } +} + +extension GRPCTimeout { + /// Returns a NIO `TimeAmount` representing the amount of time as this timeout. + public var asNIOTimeAmount: TimeAmount { + return TimeAmount.nanoseconds(numericCast(nanoseconds)) + } +} + +private enum GRPCTimeoutUnit: String { + case hours = "H" + case minutes = "M" + case seconds = "S" + case milliseconds = "m" + case microseconds = "u" + case nanoseconds = "n" + + internal var asNanoseconds: Int { + switch self { + case .hours: + return 60 * 60 * 1000 * 1000 * 1000 + + case .minutes: + return 60 * 1000 * 1000 * 1000 + + case .seconds: + return 1000 * 1000 * 1000 + + case .milliseconds: + return 1000 * 1000 + + case .microseconds: + return 1000 + + case .nanoseconds: + return 1 + } + } +} diff --git a/Sources/SwiftGRPCNIO/HTTP1ToRawGRPCClientCodec.swift b/Sources/SwiftGRPCNIO/HTTP1ToRawGRPCClientCodec.swift new file mode 100644 index 000000000..1886ddc7c --- /dev/null +++ b/Sources/SwiftGRPCNIO/HTTP1ToRawGRPCClientCodec.swift @@ -0,0 +1,159 @@ +/* + * Copyright 2019, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Foundation +import NIO +import NIOHTTP1 + +/// Outgoing gRPC package with an unknown message type (represented as the serialized protobuf message). +public enum RawGRPCClientRequestPart { + case head(HTTPRequestHead) + case message(Data) + case end +} + +/// Incoming gRPC package with an unknown message type (represented by a byte buffer). +public enum RawGRPCClientResponsePart { + case headers(HTTPHeaders) + case message(ByteBuffer) + case status(GRPCStatus) +} + +/// Codec for translating HTTP/1 responses from the server into untyped gRPC packages +/// and vice-versa. +/// +/// Most of the inbound processing is done by `LengthPrefixedMessageReader`; which +/// reads length-prefxied gRPC messages into `ByteBuffer`s containing serialized +/// Protobuf messages. +/// +/// The outbound processing transforms serialized Protobufs into length-prefixed +/// gRPC messages stored in `ByteBuffer`s. +/// +/// See `HTTP1ToRawGRPCServerCodec` for the corresponding server codec. +public final class HTTP1ToRawGRPCClientCodec { + public init() {} + + private enum State { + case expectingHeaders + case expectingBodyOrTrailers + case ignore + } + + private var state: State = .expectingHeaders + private let messageReader = LengthPrefixedMessageReader(mode: .client) + private let messageWriter = LengthPrefixedMessageWriter() + private var inboundCompression: CompressionMechanism = .none +} + +extension HTTP1ToRawGRPCClientCodec: ChannelInboundHandler { + public typealias InboundIn = HTTPClientResponsePart + public typealias InboundOut = RawGRPCClientResponsePart + + public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) { + if case .ignore = state { return } + + do { + switch self.unwrapInboundIn(data) { + case .head(let head): + state = try processHead(ctx: ctx, head: head) + + case .body(var message): + state = try processBody(ctx: ctx, messageBuffer: &message) + + case .end(let trailers): + state = try processTrailers(ctx: ctx, trailers: trailers) + } + } catch { + ctx.fireErrorCaught(error) + state = .ignore + } + } + + /// Forwards the headers from the request head to the next handler. + /// + /// - note: Requires the `.expectingHeaders` state. + private func processHead(ctx: ChannelHandlerContext, head: HTTPResponseHead) throws -> State { + guard case .expectingHeaders = state else { + throw GRPCError.client(.invalidState("received headers while in state \(state)")) + } + + guard head.status == .ok else { + throw GRPCError.client(.HTTPStatusNotOk(head.status)) + } + + if let encodingType = head.headers["grpc-encoding"].first { + self.inboundCompression = CompressionMechanism(rawValue: encodingType) ?? .unknown + } + + guard inboundCompression.supported else { + throw GRPCError.client(.unsupportedCompressionMechanism(inboundCompression.rawValue)) + } + + ctx.fireChannelRead(self.wrapInboundOut(.headers(head.headers))) + return .expectingBodyOrTrailers + } + + /// Processes the given buffer; if a complete message is read then it is forwarded to the + /// next channel handler. + /// + /// - note: Requires the `.expectingBodyOrTrailers` state. + private func processBody(ctx: ChannelHandlerContext, messageBuffer: inout ByteBuffer) throws -> State { + guard case .expectingBodyOrTrailers = state else { + throw GRPCError.client(.invalidState("received body while in state \(state)")) + } + + for message in try self.messageReader.consume(messageBuffer: &messageBuffer, compression: inboundCompression) { + ctx.fireChannelRead(self.wrapInboundOut(.message(message))) + } + + return .expectingBodyOrTrailers + } + + /// Forwards a `GRPCStatus` to the next handler. The status and message are extracted + /// from the trailers if they exist; the `.unknown` status code is used if no status exists. + private func processTrailers(ctx: ChannelHandlerContext, trailers: HTTPHeaders?) throws -> State { + guard case .expectingBodyOrTrailers = state else { + throw GRPCError.client(.invalidState("received trailers while in state \(state)")) + } + + let statusCode = trailers?["grpc-status"].first + .flatMap { Int($0) } + .flatMap { StatusCode(rawValue: $0) } + let statusMessage = trailers?["grpc-message"].first + + ctx.fireChannelRead(wrapInboundOut(.status(GRPCStatus(code: statusCode ?? .unknown, message: statusMessage)))) + return .ignore + } +} + +extension HTTP1ToRawGRPCClientCodec: ChannelOutboundHandler { + public typealias OutboundIn = RawGRPCClientRequestPart + public typealias OutboundOut = HTTPClientRequestPart + + public func write(ctx: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { + switch self.unwrapOutboundIn(data) { + case .head(let requestHead): + ctx.write(self.wrapOutboundOut(.head(requestHead)), promise: promise) + + case .message(let message): + var request = ctx.channel.allocator.buffer(capacity: LengthPrefixedMessageWriter.metadataLength) + messageWriter.write(message, into: &request, usingCompression: .none) + ctx.write(self.wrapOutboundOut(.body(.byteBuffer(request))), promise: promise) + + case .end: + ctx.write(self.wrapOutboundOut(.end(nil)), promise: promise) + } + } +} diff --git a/Sources/SwiftGRPCNIO/HTTP1ToRawGRPCServerCodec.swift b/Sources/SwiftGRPCNIO/HTTP1ToRawGRPCServerCodec.swift index 2b502398a..bcf202c33 100644 --- a/Sources/SwiftGRPCNIO/HTTP1ToRawGRPCServerCodec.swift +++ b/Sources/SwiftGRPCNIO/HTTP1ToRawGRPCServerCodec.swift @@ -10,10 +10,10 @@ public enum RawGRPCServerRequestPart { case end } -/// Outgoing gRPC package with an unknown message type (represented by a byte buffer). +/// Outgoing gRPC package with an unknown message type (represented by `Data`). public enum RawGRPCServerResponsePart { case headers(HTTPHeaders) - case message(ByteBuffer) + case message(Data) case status(GRPCStatus) } @@ -51,6 +51,9 @@ public final class HTTP1ToRawGRPCServerCodec { var inboundState = InboundState.expectingHeaders var outboundState = OutboundState.expectingHeaders + + var messageWriter = LengthPrefixedMessageWriter() + var messageReader = LengthPrefixedMessageReader(mode: .server) } extension HTTP1ToRawGRPCServerCodec { @@ -66,15 +69,9 @@ extension HTTP1ToRawGRPCServerCodec { enum InboundState { case expectingHeaders - case expectingBody(Body) + case expectingBody // ignore any additional messages; e.g. we've seen .end or we've sent an error and are waiting for the stream to close. case ignore - - enum Body { - case expectingCompressedFlag - case expectingMessageLength - case expectingMoreMessageBytes(UInt32) - } } enum OutboundState { @@ -110,7 +107,7 @@ extension HTTP1ToRawGRPCServerCodec: ChannelInboundHandler { func processHead(ctx: ChannelHandlerContext, requestHead: HTTPRequestHead) throws -> InboundState { guard case .expectingHeaders = inboundState else { - throw GRPCServerError.invalidState("expecteded state .expectingHeaders, got \(inboundState)") + throw GRPCError.server(.invalidState("expecteded state .expectingHeaders, got \(inboundState)")) } if let contentTypeHeader = requestHead.headers["content-type"].first { @@ -125,12 +122,12 @@ extension HTTP1ToRawGRPCServerCodec: ChannelInboundHandler { } ctx.fireChannelRead(self.wrapInboundOut(.head(requestHead))) - return .expectingBody(.expectingCompressedFlag) + return .expectingBody } func processBody(ctx: ChannelHandlerContext, body: inout ByteBuffer) throws -> InboundState { - guard case .expectingBody(let bodyState) = inboundState else { - throw GRPCServerError.invalidState("expecteded state .expectingBody(_), got \(inboundState)") + guard case .expectingBody = inboundState else { + throw GRPCError.server(.invalidState("expecteded state .expectingBody, got \(inboundState)")) } // If the contentType is text, then decode the incoming bytes as base64 encoded, and append @@ -145,72 +142,22 @@ extension HTTP1ToRawGRPCServerCodec: ChannelInboundHandler { let readyBytes = requestTextBuffer.readableBytes - (requestTextBuffer.readableBytes % 4) guard let base64Encoded = requestTextBuffer.readString(length: readyBytes), let decodedData = Data(base64Encoded: base64Encoded) else { - throw GRPCServerError.base64DecodeError + throw GRPCError.server(.base64DecodeError) } body.write(bytes: decodedData) } - return .expectingBody(try processBodyState(ctx: ctx, initialState: bodyState, messageBuffer: &body)) - } - - func processBodyState(ctx: ChannelHandlerContext, initialState: InboundState.Body, messageBuffer: inout ByteBuffer) throws -> InboundState.Body { - var bodyState = initialState - - // Iterate over all available incoming data, trying to read length-delimited messages. - // Each message has the following format: - // - 1 byte "compressed" flag (currently always zero, as we do not support for compression) - // - 4 byte signed-integer payload length (N) - // - N bytes payload (normally a valid wire-format protocol buffer) - while true { - switch bodyState { - case .expectingCompressedFlag: - guard let compressedFlag: Int8 = messageBuffer.readInteger() else { return .expectingCompressedFlag } - - // TODO: Add support for compression. - guard compressedFlag == 0 else { throw GRPCServerError.unexpectedCompression } - - bodyState = .expectingMessageLength - - case .expectingMessageLength: - guard let messageLength: UInt32 = messageBuffer.readInteger() else { return .expectingMessageLength } - bodyState = .expectingMoreMessageBytes(messageLength) - - case .expectingMoreMessageBytes(let bytesOutstanding): - // We need to account for messages being spread across multiple `ByteBuffer`s so buffer them - // into `buffer`. Note: when messages are contained within a single `ByteBuffer` we're just - // taking a slice so don't incur any extra writes. - guard messageBuffer.readableBytes >= bytesOutstanding else { - let remainingBytes = bytesOutstanding - numericCast(messageBuffer.readableBytes) - - if self.binaryRequestBuffer != nil { - self.binaryRequestBuffer.write(buffer: &messageBuffer) - } else { - messageBuffer.reserveCapacity(numericCast(bytesOutstanding)) - self.binaryRequestBuffer = messageBuffer - } - return .expectingMoreMessageBytes(remainingBytes) - } - - // We know buffer.readableBytes >= messageLength, so it's okay to force unwrap here. - var slice = messageBuffer.readSlice(length: numericCast(bytesOutstanding))! - - if self.binaryRequestBuffer != nil { - self.binaryRequestBuffer.write(buffer: &slice) - ctx.fireChannelRead(self.wrapInboundOut(.message(self.binaryRequestBuffer))) - } else { - ctx.fireChannelRead(self.wrapInboundOut(.message(slice))) - } - - self.binaryRequestBuffer = nil - bodyState = .expectingCompressedFlag - } + for message in try messageReader.consume(messageBuffer: &body, compression: .none) { + ctx.fireChannelRead(self.wrapInboundOut(.message(message))) } + + return .expectingBody } private func processEnd(ctx: ChannelHandlerContext, trailers: HTTPHeaders?) throws -> InboundState { if let trailers = trailers { - throw GRPCServerError.invalidState("unexpected trailers received \(trailers)") + throw GRPCError.server(.invalidState("unexpected trailers received \(trailers)")) } ctx.fireChannelRead(self.wrapInboundOut(.end)) @@ -244,25 +191,30 @@ extension HTTP1ToRawGRPCServerCodec: ChannelOutboundHandler { ctx.write(self.wrapOutboundOut(.head(HTTPResponseHead(version: version, status: .ok, headers: headers))), promise: promise) outboundState = .expectingBodyOrStatus - case .message(var messageBytes): + case .message(let messageBytes): guard case .expectingBodyOrStatus = outboundState else { return } - // Write out a length-delimited message payload. See `processBodyState` for the corresponding format. - var responseBuffer = ctx.channel.allocator.buffer(capacity: messageBytes.readableBytes + protobufMetadataSize) - responseBuffer.write(integer: Int8(0)) // Compression flag: no compression - responseBuffer.write(integer: UInt32(messageBytes.readableBytes)) - responseBuffer.write(buffer: &messageBytes) - if contentType == .text { - precondition(responseTextBuffer != nil) + precondition(self.responseTextBuffer != nil) + // Store the response into an independent buffer. We can't return the message directly as // it needs to be aggregated with all the responses plus the trailers, in order to have // the base64 response properly encoded in a single byte stream. - responseTextBuffer!.write(buffer: &responseBuffer) + #if swift(>=4.2) + messageWriter.write(messageBytes, into: &self.responseTextBuffer, usingCompression: .none) + #else + // Write into a temporary buffer to avoid: "error: cannot pass immutable value as inout argument: 'self' is immutable" + var responseBuffer = ctx.channel.allocator.buffer(capacity: LengthPrefixedMessageWriter.metadataLength) + messageWriter.write(messageBytes, into: &responseBuffer, usingCompression: .none) + responseTextBuffer.write(buffer: &responseBuffer) + #endif + // Since we stored the written data, mark the write promise as successful so that the // ServerStreaming provider continues sending the data. promise?.succeed(result: Void()) } else { + var responseBuffer = ctx.channel.allocator.buffer(capacity: LengthPrefixedMessageWriter.metadataLength) + messageWriter.write(messageBytes, into: &responseBuffer, usingCompression: .none) ctx.write(self.wrapOutboundOut(.body(.byteBuffer(responseBuffer))), promise: promise) } outboundState = .expectingBodyOrStatus @@ -277,7 +229,9 @@ extension HTTP1ToRawGRPCServerCodec: ChannelOutboundHandler { var trailers = status.trailingMetadata trailers.add(name: "grpc-status", value: String(describing: status.code.rawValue)) - trailers.add(name: "grpc-message", value: status.message) + if let message = status.message { + trailers.add(name: "grpc-message", value: message) + } if contentType == .text { precondition(responseTextBuffer != nil) diff --git a/Sources/SwiftGRPCNIO/LengthPrefixedMessageReader.swift b/Sources/SwiftGRPCNIO/LengthPrefixedMessageReader.swift new file mode 100644 index 000000000..b477eec08 --- /dev/null +++ b/Sources/SwiftGRPCNIO/LengthPrefixedMessageReader.swift @@ -0,0 +1,148 @@ +/* + * Copyright 2019, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Foundation +import NIO +import NIOHTTP1 + +/// This class reads and decodes length-prefixed gRPC messages. +/// +/// Messages are expected to be in the following format: +/// - compression flag: 0/1 as a 1-byte unsigned integer, +/// - message length: length of the message as a 4-byte unsigned integer, +/// - message: `message_length` bytes. +/// +/// Messages may span multiple `ByteBuffer`s, and `ByteBuffer`s may contain multiple +/// length-prefixed messages. +/// +/// - SeeAlso: +/// [gRPC Protocol](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md) +public class LengthPrefixedMessageReader { + public typealias Mode = GRPCError.Origin + + private let mode: Mode + private var buffer: ByteBuffer! + private var state: State = .expectingCompressedFlag + + private enum State { + case expectingCompressedFlag + case expectingMessageLength + case receivedMessageLength(Int) + case willBuffer(requiredBytes: Int) + case isBuffering(requiredBytes: Int) + } + + public init(mode: Mode) { + self.mode = mode + } + + /// Consumes all readable bytes from given buffer and returns all messages which could be read. + /// + /// - SeeAlso: `read(messageBuffer:compression:)` + public func consume(messageBuffer: inout ByteBuffer, compression: CompressionMechanism) throws -> [ByteBuffer] { + var messages: [ByteBuffer] = [] + + while messageBuffer.readableBytes > 0 { + if let message = try self.read(messageBuffer: &messageBuffer, compression: compression) { + messages.append(message) + } + } + + return messages + } + + /// Reads bytes from the given buffer until it is exhausted or a message has been read. + /// + /// Length prefixed messages may be split across multiple input buffers in any of the + /// following places: + /// 1. after the compression flag, + /// 2. after the message length field, + /// 3. at any point within the message. + /// + /// It is possible for the message length field to be split across multiple `ByteBuffer`s, + /// this is unlikely to happen in practice. + /// + /// - Note: + /// This method relies on state; if a message is _not_ returned then the next time this + /// method is called it expects to read the bytes which follow the most recently read bytes. + /// + /// - Parameters: + /// - messageBuffer: buffer to read from. + /// - compression: compression mechanism to decode message with. + /// - Returns: A buffer containing a message if one has been read, or `nil` if not enough + /// bytes have been consumed to return a message. + /// - Throws: Throws an error if the compression algorithm is not supported. + public func read(messageBuffer: inout ByteBuffer, compression: CompressionMechanism) throws -> ByteBuffer? { + while true { + switch state { + case .expectingCompressedFlag: + guard let compressionFlag: Int8 = messageBuffer.readInteger() else { return nil } + try handleCompressionFlag(enabled: compressionFlag != 0, mechanism: compression) + self.state = .expectingMessageLength + + case .expectingMessageLength: + //! FIXME: Support the message length being split across multiple byte buffers. + guard let messageLength: UInt32 = messageBuffer.readInteger() else { return nil } + self.state = .receivedMessageLength(numericCast(messageLength)) + + case .receivedMessageLength(let messageLength): + // If this holds true, we can skip buffering and return a slice. + guard messageLength <= messageBuffer.readableBytes else { + self.state = .willBuffer(requiredBytes: messageLength) + continue + } + + self.state = .expectingCompressedFlag + // We know messageBuffer.readableBytes >= messageLength, so it's okay to force unwrap here. + return messageBuffer.readSlice(length: messageLength)! + + case .willBuffer(let requiredBytes): + messageBuffer.reserveCapacity(requiredBytes) + self.buffer = messageBuffer + + let readableBytes = messageBuffer.readableBytes + // Move the reader index to avoid reading the bytes again. + messageBuffer.moveReaderIndex(forwardBy: readableBytes) + + self.state = .isBuffering(requiredBytes: requiredBytes - readableBytes) + return nil + + case .isBuffering(let requiredBytes): + guard requiredBytes <= messageBuffer.readableBytes else { + self.state = .isBuffering(requiredBytes: requiredBytes - self.buffer.write(buffer: &messageBuffer)) + return nil + } + + // We know messageBuffer.readableBytes >= requiredBytes, so it's okay to force unwrap here. + var slice = messageBuffer.readSlice(length: requiredBytes)! + self.buffer.write(buffer: &slice) + self.state = .expectingCompressedFlag + + defer { self.buffer = nil } + return buffer + } + } + } + + private func handleCompressionFlag(enabled flagEnabled: Bool, mechanism: CompressionMechanism) throws { + guard flagEnabled == mechanism.requiresFlag else { + throw GRPCError.common(.unexpectedCompression, origin: mode) + } + + guard mechanism.supported else { + throw GRPCError.common(.unsupportedCompressionMechanism(mechanism.rawValue), origin: mode) + } + } +} diff --git a/Sources/SwiftGRPCNIO/LengthPrefixedMessageWriter.swift b/Sources/SwiftGRPCNIO/LengthPrefixedMessageWriter.swift new file mode 100644 index 000000000..77ee4bd8b --- /dev/null +++ b/Sources/SwiftGRPCNIO/LengthPrefixedMessageWriter.swift @@ -0,0 +1,41 @@ +/* + * Copyright 2019, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Foundation +import NIO + +public class LengthPrefixedMessageWriter { + public static let metadataLength = 5 + + /// Writes the data into a `ByteBuffer` as a gRPC length-prefixed message. + /// + /// - Parameters: + /// - message: The serialized Protobuf message to write. + /// - buffer: The buffer to write the message into. + /// - compression: Compression mechanism to use; the mechansim must be supported. + /// - Returns: A `ByteBuffer` containing a gRPC length-prefixed message. + /// - Precondition: `compression.supported` is `true`. + /// - Note: See `LengthPrefixedMessageReader` for more details on the format. + func write(_ message: Data, into buffer: inout ByteBuffer, usingCompression compression: CompressionMechanism) { + precondition(compression.supported, "compression mechanism \(compression) is not supported") + + buffer.reserveCapacity(LengthPrefixedMessageWriter.metadataLength + message.count) + + //! TODO: Add compression support, use the length and compressed content. + buffer.write(integer: Int8(compression.requiresFlag ? 1 : 0)) + buffer.write(integer: UInt32(message.count)) + buffer.write(bytes: message) + } +} diff --git a/Sources/SwiftGRPCNIO/ServerCallContexts/UnaryResponseCallContext.swift b/Sources/SwiftGRPCNIO/ServerCallContexts/UnaryResponseCallContext.swift index 2f329842e..635aac805 100644 --- a/Sources/SwiftGRPCNIO/ServerCallContexts/UnaryResponseCallContext.swift +++ b/Sources/SwiftGRPCNIO/ServerCallContexts/UnaryResponseCallContext.swift @@ -3,7 +3,7 @@ import SwiftProtobuf import NIO import NIOHTTP1 -/// Abstract base class exposing a method that exposes a promise fot the RPC response. +/// Abstract base class exposing a method that exposes a promise for the RPC response. /// /// - When `responsePromise` is fulfilled, the call is closed and the provided response transmitted with status `responseStatus` (`.ok` by default). /// - If `statusPromise` is failed and the error is of type `GRPCStatus`, that error will be returned to the client. diff --git a/Sources/SwiftGRPCNIO/ServerErrorDelegate.swift b/Sources/SwiftGRPCNIO/ServerErrorDelegate.swift index 83521a6e3..efb3f0ccb 100644 --- a/Sources/SwiftGRPCNIO/ServerErrorDelegate.swift +++ b/Sources/SwiftGRPCNIO/ServerErrorDelegate.swift @@ -17,7 +17,7 @@ import Foundation import NIO public protocol ServerErrorDelegate: class { - //! FIXME: Provide more context about where the error was thrown. + //! FIXME: Provide more context about where the error was thrown, i.e. using `GRPCError`. /// Called when an error is thrown in the channel pipeline. func observe(_ error: Error) diff --git a/Sources/protoc-gen-swiftgrpc/Generator-Client.swift b/Sources/protoc-gen-swiftgrpc/Generator-Client.swift index 66b6d383a..d3cf3e269 100644 --- a/Sources/protoc-gen-swiftgrpc/Generator-Client.swift +++ b/Sources/protoc-gen-swiftgrpc/Generator-Client.swift @@ -20,6 +20,16 @@ import SwiftProtobufPluginLibrary extension Generator { internal func printClient(asynchronousCode: Bool, synchronousCode: Bool) { + if options.generateNIOImplementation { + printNIOGRPCClient() + } else { + printCGRPCClient(asynchronousCode: asynchronousCode, + synchronousCode: synchronousCode) + } + } + + private func printCGRPCClient(asynchronousCode: Bool, + synchronousCode: Bool) { for method in service.methods { self.method = method switch streamingType(method) { @@ -367,4 +377,141 @@ extension Generator { outdent() println("}") } + + private func printNIOGRPCClient() { + println() + printNIOServiceClientProtocol() + println() + printNIOServiceClientImplementation() + } + + private func printNIOServiceClientProtocol() { + println("/// Usage: instantiate \(serviceClassName)Client, then call methods of this protocol to make API calls.") + println("\(options.visibility.sourceSnippet) protocol \(serviceClassName) {") + indent() + for method in service.methods { + self.method = method + switch streamingType(method) { + case .unary: + println("func \(methodFunctionName)(_ request: \(methodInputName), callOptions: CallOptions?) -> UnaryClientCall<\(methodInputName), \(methodOutputName)>") + + case .serverStreaming: + println("func \(methodFunctionName)(_ request: \(methodInputName), callOptions: CallOptions?, handler: @escaping (\(methodOutputName)) -> Void) -> ServerStreamingClientCall<\(methodInputName), \(methodOutputName)>") + + case .clientStreaming: + println("func \(methodFunctionName)(callOptions: CallOptions?) -> ClientStreamingClientCall<\(methodInputName), \(methodOutputName)>") + + case .bidirectionalStreaming: + println("func \(methodFunctionName)(callOptions: CallOptions?, handler: @escaping (\(methodOutputName)) -> Void) -> BidirectionalStreamingClientCall<\(methodInputName), \(methodOutputName)>") + } + } + outdent() + println("}") + } + + private func printNIOServiceClientImplementation() { + println("\(access) final class \(serviceClassName)Client: GRPCServiceClient, \(serviceClassName) {") + indent() + println("\(access) let client: GRPCClient") + println("\(access) let service = \"\(servicePath)\"") + println("\(access) var defaultCallOptions: CallOptions") + println() + println("/// Creates a client for the \(servicePath) service.") + println("///") + printParameters() + println("/// - client: `GRPCClient` with a connection to the service host.") + println("/// - defaultCallOptions: Options to use for each service call if the user doesn't provide them. Defaults to `client.defaultCallOptions`.") + println("\(access) init(client: GRPCClient, defaultCallOptions: CallOptions? = nil) {") + indent() + println("self.client = client") + println("self.defaultCallOptions = defaultCallOptions ?? client.defaultCallOptions") + outdent() + println("}") + println() + + for method in service.methods { + self.method = method + switch streamingType(method) { + case .unary: + println("/// Asynchronous unary call to \(method.name).") + println("///") + printParameters() + printRequestParameter() + printCallOptionsParameter() + println("/// - Returns: A `UnaryClientCall` with futures for the metadata, status and response.") + println("\(access) func \(methodFunctionName)(_ request: \(methodInputName), callOptions: CallOptions? = nil) -> UnaryClientCall<\(methodInputName), \(methodOutputName)> {") + indent() + println("return UnaryClientCall(client: client, path: path(forMethod: \"\(method.name)\"), request: request, callOptions: callOptions ?? self.defaultCallOptions)") + outdent() + println("}") + + case .serverStreaming: + println("/// Asynchronous server-streaming call to \(method.name).") + println("///") + printParameters() + printRequestParameter() + printCallOptionsParameter() + printHandlerParameter() + println("/// - Returns: A `ServerStreamingClientCall` with futures for the metadata and status.") + println("\(access) func \(methodFunctionName)(_ request: \(methodInputName), callOptions: CallOptions? = nil, handler: @escaping (\(methodOutputName)) -> Void) -> ServerStreamingClientCall<\(methodInputName), \(methodOutputName)> {") + indent() + println("return ServerStreamingClientCall(client: client, path: path(forMethod: \"\(method.name)\"), request: request, callOptions: callOptions ?? self.defaultCallOptions, handler: handler)") + outdent() + println("}") + + case .clientStreaming: + println("/// Asynchronous client-streaming call to \(method.name).") + println("///") + printClientStreamingDetails() + println("///") + printParameters() + printCallOptionsParameter() + println("/// - Returns: A `ClientStreamingClientCall` with futures for the metadata, status and response.") + println("\(access) func \(methodFunctionName)(callOptions: CallOptions? = nil) -> ClientStreamingClientCall<\(methodInputName), \(methodOutputName)> {") + indent() + println("return ClientStreamingClientCall(client: client, path: path(forMethod: \"\(method.name)\"), callOptions: callOptions ?? self.defaultCallOptions)") + outdent() + println("}") + + case .bidirectionalStreaming: + println("/// Asynchronous bidirectional-streaming call to \(method.name).") + println("///") + printClientStreamingDetails() + println("///") + printParameters() + printCallOptionsParameter() + printHandlerParameter() + println("/// - Returns: A `ClientStreamingClientCall` with futures for the metadata and status.") + println("\(access) func \(methodFunctionName)(callOptions: CallOptions? = nil, handler: @escaping (\(methodOutputName)) -> Void) -> BidirectionalStreamingClientCall<\(methodInputName), \(methodOutputName)> {") + indent() + println("return BidirectionalStreamingClientCall(client: client, path: path(forMethod: \"\(method.name)\"), callOptions: callOptions ?? self.defaultCallOptions, handler: handler)") + outdent() + println("}") + } + println() + } + outdent() + println("}") + } + + private func printClientStreamingDetails() { + println("/// Callers should use the `send` method on the returned object to send messages") + println("/// to the server. The caller should send an `.end` after the final message has been sent.") + } + + private func printParameters() { + println("/// - Parameters:") + } + + private func printRequestParameter() { + println("/// - request: Request to send to \(method.name).") + } + + private func printCallOptionsParameter() { + println("/// - callOptions: Call options; `self.defaultCallOptions` is used if `nil`.") + } + + private func printHandlerParameter() { + println("/// - handler: A closure called when each response is received from the server.") + } } diff --git a/Sources/protoc-gen-swiftgrpc/Generator-Names.swift b/Sources/protoc-gen-swiftgrpc/Generator-Names.swift index 6fe3ddea1..34abbb73c 100644 --- a/Sources/protoc-gen-swiftgrpc/Generator-Names.swift +++ b/Sources/protoc-gen-swiftgrpc/Generator-Names.swift @@ -44,7 +44,11 @@ extension Generator { } internal var serviceClassName: String { - return nameForPackageService(file, service) + "Service" + if options.generateNIOImplementation { + return nameForPackageService(file, service) + "Service_NIO" + } else { + return nameForPackageService(file, service) + "Service" + } } internal var providerName: String { diff --git a/Sources/protoc-gen-swiftgrpc/Generator.swift b/Sources/protoc-gen-swiftgrpc/Generator.swift index bff68b30b..e722e1f70 100644 --- a/Sources/protoc-gen-swiftgrpc/Generator.swift +++ b/Sources/protoc-gen-swiftgrpc/Generator.swift @@ -105,9 +105,7 @@ class Generator { } println() - if options.generateClient { - guard !options.generateNIOImplementation else { fatalError("Generating client code is not yet supported for SwiftGRPC-NIO.") } - + if options.generateClient { for service in file.services { self.service = service printClient(asynchronousCode: options.generateAsynchronous, diff --git a/Tests/LinuxMain.swift b/Tests/LinuxMain.swift index 91d0a9807..f1c83e9d0 100644 --- a/Tests/LinuxMain.swift +++ b/Tests/LinuxMain.swift @@ -38,6 +38,8 @@ XCTMain([ // SwiftGRPCNIO testCase(NIOServerTests.allTests), + testCase(NIOClientCancellingTests.allTests), + testCase(NIOClientTimeoutTests.allTests), testCase(NIOServerWebTests.allTests), testCase(GRPCChannelHandlerTests.allTests), testCase(HTTP1ToRawGRPCServerCodecTests.allTests) diff --git a/Tests/SwiftGRPCNIOTests/EchoProvider.swift b/Tests/SwiftGRPCNIOTests/EchoProvider.swift deleted file mode 120000 index 7d14bfe94..000000000 --- a/Tests/SwiftGRPCNIOTests/EchoProvider.swift +++ /dev/null @@ -1 +0,0 @@ -../../Sources/Examples/Echo/EchoProvider.swift \ No newline at end of file diff --git a/Tests/SwiftGRPCNIOTests/EchoProviderNIO.swift b/Tests/SwiftGRPCNIOTests/EchoProviderNIO.swift new file mode 120000 index 000000000..81e19ce11 --- /dev/null +++ b/Tests/SwiftGRPCNIOTests/EchoProviderNIO.swift @@ -0,0 +1 @@ +../../Sources/Examples/EchoNIO/EchoProviderNIO.swift \ No newline at end of file diff --git a/Tests/SwiftGRPCNIOTests/GRPCChannelHandlerResponseCapturingTestCase.swift b/Tests/SwiftGRPCNIOTests/GRPCChannelHandlerResponseCapturingTestCase.swift index 0999801fd..e1db681cd 100644 --- a/Tests/SwiftGRPCNIOTests/GRPCChannelHandlerResponseCapturingTestCase.swift +++ b/Tests/SwiftGRPCNIOTests/GRPCChannelHandlerResponseCapturingTestCase.swift @@ -15,13 +15,25 @@ class CollectingChannelHandler: ChannelOutboundHandler { class CollectingServerErrorDelegate: ServerErrorDelegate { var errors: [Error] = [] + var asGRPCErrors: [GRPCError]? { + return self.errors as? [GRPCError] + } + + var asGRPCServerErrors: [GRPCServerError]? { + return (self.asGRPCErrors?.map { $0.error }) as? [GRPCServerError] + } + + var asGRPCCommonErrors: [GRPCCommonError]? { + return (self.asGRPCErrors?.map { $0.error }) as? [GRPCCommonError] + } + func observe(_ error: Error) { self.errors.append(error) } } class GRPCChannelHandlerResponseCapturingTestCase: XCTestCase { - static let echoProvider: [String: CallHandlerProvider] = ["echo.Echo": EchoProvider_NIO()] + static let echoProvider: [String: CallHandlerProvider] = ["echo.Echo": EchoProviderNIO()] class var defaultServiceProvider: [String: CallHandlerProvider] { return echoProvider } diff --git a/Tests/SwiftGRPCNIOTests/GRPCChannelHandlerTests.swift b/Tests/SwiftGRPCNIOTests/GRPCChannelHandlerTests.swift index b97c3f49f..a0f4f68ac 100644 --- a/Tests/SwiftGRPCNIOTests/GRPCChannelHandlerTests.swift +++ b/Tests/SwiftGRPCNIOTests/GRPCChannelHandlerTests.swift @@ -20,7 +20,7 @@ class GRPCChannelHandlerTests: GRPCChannelHandlerResponseCapturingTestCase { } let expectedError = GRPCServerError.unimplementedMethod("unimplementedMethodName") - XCTAssertEqual([expectedError], errorCollector.errors as? [GRPCServerError]) + XCTAssertEqual([expectedError], errorCollector.asGRPCServerErrors) XCTAssertNoThrow(try extractStatus(responses[0])) { status in XCTAssertEqual(status, expectedError.asGRPCStatus()) @@ -56,8 +56,8 @@ class GRPCChannelHandlerTests: GRPCChannelHandlerResponseCapturingTestCase { try channel.writeInbound(RawGRPCServerRequestPart.message(buffer)) } - let expectedError = GRPCServerError.requestProtoParseFailure - XCTAssertEqual([expectedError], errorCollector.errors as? [GRPCServerError]) + let expectedError = GRPCServerError.requestProtoDeserializationFailure + XCTAssertEqual([expectedError], errorCollector.asGRPCServerErrors) XCTAssertNoThrow(try extractHeaders(responses[0])) XCTAssertNoThrow(try extractStatus(responses[1])) { status in diff --git a/Tests/SwiftGRPCNIOTests/HTTP1ToRawGRPCServerCodecTests.swift b/Tests/SwiftGRPCNIOTests/HTTP1ToRawGRPCServerCodecTests.swift index bb17fef8e..39b92fee0 100644 --- a/Tests/SwiftGRPCNIOTests/HTTP1ToRawGRPCServerCodecTests.swift +++ b/Tests/SwiftGRPCNIOTests/HTTP1ToRawGRPCServerCodecTests.swift @@ -33,8 +33,8 @@ class HTTP1ToRawGRPCServerCodecTests: GRPCChannelHandlerResponseCapturingTestCas try channel.writeInbound(HTTPServerRequestPart.body(gRPCMessage(channel: channel, compression: true))) } - let expectedError = GRPCServerError.unexpectedCompression - XCTAssertEqual([expectedError], errorCollector.errors as? [GRPCServerError]) + let expectedError = GRPCCommonError.unexpectedCompression + XCTAssertEqual([expectedError], errorCollector.asGRPCCommonErrors) XCTAssertNoThrow(try extractHeaders(responses[0])) XCTAssertNoThrow(try extractStatus(responses[1])) { status in @@ -80,8 +80,8 @@ class HTTP1ToRawGRPCServerCodecTests: GRPCChannelHandlerResponseCapturingTestCas try channel.writeInbound(HTTPServerRequestPart.body(buffer)) } - let expectedError = GRPCServerError.requestProtoParseFailure - XCTAssertEqual([expectedError], errorCollector.errors as? [GRPCServerError]) + let expectedError = GRPCServerError.requestProtoDeserializationFailure + XCTAssertEqual([expectedError], errorCollector.asGRPCServerErrors) XCTAssertNoThrow(try extractHeaders(responses[0])) XCTAssertNoThrow(try extractStatus(responses[1])) { status in @@ -105,10 +105,10 @@ class HTTP1ToRawGRPCServerCodecTests: GRPCChannelHandlerResponseCapturingTestCas XCTAssertEqual(errorCollector.errors.count, 1) - if case .invalidState(let message)? = errorCollector.errors.first as? GRPCServerError { + if case .some(.invalidState(let message)) = errorCollector.asGRPCCommonErrors?.first { XCTAssert(message.contains("trailers")) } else { - XCTFail("\(String(describing: errorCollector.errors.first)) was not GRPCError.invalidState") + XCTFail("\(String(describing: errorCollector.errors.first)) was not .invalidState") } XCTAssertNoThrow(try extractHeaders(responses[0])) diff --git a/Tests/SwiftGRPCNIOTests/NIOBasicEchoTestCase.swift b/Tests/SwiftGRPCNIOTests/NIOBasicEchoTestCase.swift new file mode 100644 index 000000000..861821ea4 --- /dev/null +++ b/Tests/SwiftGRPCNIOTests/NIOBasicEchoTestCase.swift @@ -0,0 +1,71 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Dispatch +import Foundation +import NIO +@testable import SwiftGRPCNIO +import XCTest + +extension Echo_EchoRequest { + init(text: String) { + self.text = text + } +} + +extension Echo_EchoResponse { + init(text: String) { + self.text = text + } +} + +class NIOBasicEchoTestCase: XCTestCase { + var defaultTestTimeout: TimeInterval = 1.0 + + var serverEventLoopGroup: EventLoopGroup! + var server: GRPCServer! + + var clientEventLoopGroup: EventLoopGroup! + var client: Echo_EchoService_NIOClient! + + override func setUp() { + super.setUp() + + self.serverEventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + self.server = try! GRPCServer.start( + hostname: "localhost", port: 5050, eventLoopGroup: self.serverEventLoopGroup, serviceProviders: [EchoProviderNIO()]) + .wait() + + self.clientEventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + self.client = try! GRPCClient.start( + host: "localhost", port: 5050, eventLoopGroup: self.clientEventLoopGroup) + .map { Echo_EchoService_NIOClient(client: $0, defaultCallOptions: CallOptions(timeout: try! .seconds(5))) } + .wait() + } + + override func tearDown() { + XCTAssertNoThrow(try self.client.client.close().wait()) + XCTAssertNoThrow(try self.clientEventLoopGroup.syncShutdownGracefully()) + self.clientEventLoopGroup = nil + self.client = nil + + XCTAssertNoThrow(try self.server.close().wait()) + XCTAssertNoThrow(try self.serverEventLoopGroup.syncShutdownGracefully()) + self.serverEventLoopGroup = nil + self.server = nil + + super.tearDown() + } +} diff --git a/Tests/SwiftGRPCNIOTests/NIOClientCancellingTests.swift b/Tests/SwiftGRPCNIOTests/NIOClientCancellingTests.swift new file mode 100644 index 000000000..9725c41a4 --- /dev/null +++ b/Tests/SwiftGRPCNIOTests/NIOClientCancellingTests.swift @@ -0,0 +1,103 @@ +/* + * Copyright 2019, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Foundation +import SwiftGRPCNIO +import XCTest + +class NIOClientCancellingTests: NIOBasicEchoTestCase { + static var allTests: [(String, (NIOClientCancellingTests) -> () throws -> Void)] { + return [ + ("testUnary", testUnary), + ("testClientStreaming", testClientStreaming), + ("testServerStreaming", testServerStreaming), + ("testBidirectionalStreaming", testBidirectionalStreaming) + ] + } +} + +extension NIOClientCancellingTests { + func testUnary() { + let statusReceived = self.expectation(description: "status received") + let responseReceived = self.expectation(description: "response received") + + let call = client.get(Echo_EchoRequest(text: "foo bar baz")) + call.cancel() + + call.response.whenFailure { error in + XCTAssertEqual((error as? GRPCStatus)?.code, .cancelled) + responseReceived.fulfill() + } + + call.status.whenSuccess { status in + XCTAssertEqual(status.code, .cancelled) + statusReceived.fulfill() + } + + waitForExpectations(timeout: self.defaultTestTimeout) + } + + func testClientStreaming() throws { + let statusReceived = self.expectation(description: "status received") + let responseReceived = self.expectation(description: "response received") + + let call = client.collect() + call.cancel() + + call.response.whenFailure { error in + XCTAssertEqual((error as? GRPCStatus)?.code, .cancelled) + responseReceived.fulfill() + } + + call.status.whenSuccess { status in + XCTAssertEqual(status.code, .cancelled) + statusReceived.fulfill() + } + + waitForExpectations(timeout: self.defaultTestTimeout) + } + + func testServerStreaming() { + let statusReceived = self.expectation(description: "status received") + + let call = client.expand(Echo_EchoRequest(text: "foo bar baz")) { response in + XCTFail("response should not be received after cancelling call") + } + call.cancel() + + call.status.whenSuccess { status in + XCTAssertEqual(status.code, .cancelled) + statusReceived.fulfill() + } + + waitForExpectations(timeout: self.defaultTestTimeout) + } + + func testBidirectionalStreaming() { + let statusReceived = self.expectation(description: "status received") + + let call = client.update { response in + XCTFail("response should not be received after cancelling call") + } + call.cancel() + + call.status.whenSuccess { status in + XCTAssertEqual(status.code, .cancelled) + statusReceived.fulfill() + } + + waitForExpectations(timeout: self.defaultTestTimeout) + } +} diff --git a/Tests/SwiftGRPCNIOTests/NIOClientTimeoutTests.swift b/Tests/SwiftGRPCNIOTests/NIOClientTimeoutTests.swift new file mode 100644 index 000000000..7d18c2d52 --- /dev/null +++ b/Tests/SwiftGRPCNIOTests/NIOClientTimeoutTests.swift @@ -0,0 +1,131 @@ +/* + * Copyright 2019, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Foundation +import SwiftGRPCNIO +import NIO +import XCTest + +class NIOClientTimeoutTests: NIOBasicEchoTestCase { + let optionsWithShortTimeout = CallOptions(timeout: try! GRPCTimeout.milliseconds(10)) + let moreThanShortTimeout: TimeInterval = 0.011 + + static var allTests: [(String, (NIOClientTimeoutTests) -> () throws -> Void)] { + return [ + ("testUnaryTimeoutAfterSending", testUnaryTimeoutAfterSending), + ("testServerStreamingTimeoutAfterSending", testServerStreamingTimeoutAfterSending), + ("testClientStreamingTimeoutBeforeSending", testClientStreamingTimeoutBeforeSending), + ("testClientStreamingTimeoutAfterSending", testClientStreamingTimeoutAfterSending), + ("testBidirectionalStreamingTimeoutBeforeSending", testBidirectionalStreamingTimeoutBeforeSending), + ("testBidirectionalStreamingTimeoutAfterSending", testBidirectionalStreamingTimeoutAfterSending) + ] + } + + private func expectDeadlineExceeded(forStatus status: EventLoopFuture) { + let statusExpectation = self.expectation(description: "status received") + + status.whenSuccess { status in + XCTAssertEqual(status.code, .deadlineExceeded) + statusExpectation.fulfill() + } + + status.whenFailure { error in + XCTFail("unexpectedly received error for status: \(error)") + } + } + + private func expectDeadlineExceeded(forResponse response: EventLoopFuture) { + let responseExpectation = self.expectation(description: "response received") + + response.whenFailure { error in + XCTAssertEqual((error as? GRPCStatus)?.code, .deadlineExceeded) + responseExpectation.fulfill() + } + + response.whenSuccess { response in + XCTFail("response received after deadline") + } + } +} + +extension NIOClientTimeoutTests { + func testUnaryTimeoutAfterSending() { + // The request gets fired on call creation, so we need a very short timeout. + let callOptions = CallOptions(timeout: try! .milliseconds(1)) + let call = client.get(Echo_EchoRequest(text: "foo"), callOptions: callOptions) + + self.expectDeadlineExceeded(forStatus: call.status) + self.expectDeadlineExceeded(forResponse: call.response) + + waitForExpectations(timeout: defaultTestTimeout) + } + + func testServerStreamingTimeoutAfterSending() { + // The request gets fired on call creation, so we need a very short timeout. + let callOptions = CallOptions(timeout: try! .milliseconds(1)) + let call = client.expand(Echo_EchoRequest(text: "foo bar baz"), callOptions: callOptions) { _ in } + + self.expectDeadlineExceeded(forStatus: call.status) + + waitForExpectations(timeout: defaultTestTimeout) + } + + func testClientStreamingTimeoutBeforeSending() { + let call = client.collect(callOptions: optionsWithShortTimeout) + + self.expectDeadlineExceeded(forStatus: call.status) + self.expectDeadlineExceeded(forResponse: call.response) + + waitForExpectations(timeout: defaultTestTimeout) + } + + func testClientStreamingTimeoutAfterSending() { + let call = client.collect(callOptions: optionsWithShortTimeout) + + self.expectDeadlineExceeded(forStatus: call.status) + self.expectDeadlineExceeded(forResponse: call.response) + + call.sendMessage(Echo_EchoRequest(text: "foo"), promise: nil) + + // Timeout before sending `.end` + Thread.sleep(forTimeInterval: moreThanShortTimeout) + call.sendEnd(promise: nil) + + waitForExpectations(timeout: defaultTestTimeout) + } + + func testBidirectionalStreamingTimeoutBeforeSending() { + let call = client.update(callOptions: optionsWithShortTimeout) { _ in } + + self.expectDeadlineExceeded(forStatus: call.status) + + Thread.sleep(forTimeInterval: moreThanShortTimeout) + waitForExpectations(timeout: defaultTestTimeout) + } + + func testBidirectionalStreamingTimeoutAfterSending() { + let call = client.update(callOptions: optionsWithShortTimeout) { _ in } + + self.expectDeadlineExceeded(forStatus: call.status) + + call.sendMessage(Echo_EchoRequest(text: "foo"), promise: nil) + + // Timeout before sending `.end` + Thread.sleep(forTimeInterval: moreThanShortTimeout) + call.sendEnd(promise: nil) + + waitForExpectations(timeout: defaultTestTimeout) + } +} diff --git a/Tests/SwiftGRPCNIOTests/NIOServerTestCase.swift b/Tests/SwiftGRPCNIOTests/NIOServerTestCase.swift deleted file mode 100644 index cd0848214..000000000 --- a/Tests/SwiftGRPCNIOTests/NIOServerTestCase.swift +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright 2018, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import Dispatch -import Foundation -@testable import SwiftGRPC -import XCTest - -extension Echo_EchoRequest { - init(text: String) { - self.text = text - } -} - -extension Echo_EchoResponse { - init(text: String) { - self.text = text - } -} - -class NIOServerTestCase: XCTestCase { - func makeProvider() -> Echo_EchoProvider { return EchoProvider() } - - var provider: Echo_EchoProvider! - var client: Echo_EchoServiceClient! - - var defaultTimeout: TimeInterval { return 1.0 } - var address: String { return "localhost:5050" } - - override func setUp() { - super.setUp() - - provider = makeProvider() - - client = Echo_EchoServiceClient(address: address, secure: false) - - client.timeout = defaultTimeout - } - - override func tearDown() { - client = nil - - super.tearDown() - } -} diff --git a/Tests/SwiftGRPCNIOTests/NIOServerTests.swift b/Tests/SwiftGRPCNIOTests/NIOServerTests.swift index 475a2d63b..33c64a9b2 100644 --- a/Tests/SwiftGRPCNIOTests/NIOServerTests.swift +++ b/Tests/SwiftGRPCNIOTests/NIOServerTests.swift @@ -18,65 +18,10 @@ import Foundation import NIO import NIOHTTP1 import NIOHTTP2 -@testable import SwiftGRPC @testable import SwiftGRPCNIO import XCTest -// This class is what the SwiftGRPC user would actually implement to provide their service. -final class EchoProvider_NIO: Echo_EchoProvider_NIO { - func get(request: Echo_EchoRequest, context: StatusOnlyCallContext) -> EventLoopFuture { - var response = Echo_EchoResponse() - response.text = "Swift echo get: " + request.text - return context.eventLoop.newSucceededFuture(result: response) - } - - func collect(context: UnaryResponseCallContext) -> EventLoopFuture<(StreamEvent) -> Void> { - var parts: [String] = [] - return context.eventLoop.newSucceededFuture(result: { event in - switch event { - case .message(let message): - parts.append(message.text) - - case .end: - var response = Echo_EchoResponse() - response.text = "Swift echo collect: " + parts.joined(separator: " ") - context.responsePromise.succeed(result: response) - } - }) - } - - func expand(request: Echo_EchoRequest, context: StreamingResponseCallContext) -> EventLoopFuture { - var endOfSendOperationQueue = context.eventLoop.newSucceededFuture(result: ()) - let parts = request.text.components(separatedBy: " ") - for (i, part) in parts.enumerated() { - var response = Echo_EchoResponse() - response.text = "Swift echo expand (\(i)): \(part)" - endOfSendOperationQueue = endOfSendOperationQueue.then { context.sendResponse(response) } - } - return endOfSendOperationQueue.map { GRPCStatus.ok } - } - - func update(context: StreamingResponseCallContext) -> EventLoopFuture<(StreamEvent) -> Void> { - var endOfSendOperationQueue = context.eventLoop.newSucceededFuture(result: ()) - var count = 0 - return context.eventLoop.newSucceededFuture(result: { event in - switch event { - case .message(let message): - var response = Echo_EchoResponse() - response.text = "Swift echo update (\(count)): \(message.text)" - endOfSendOperationQueue = endOfSendOperationQueue.then { context.sendResponse(response) } - count += 1 - - case .end: - endOfSendOperationQueue - .map { GRPCStatus.ok } - .cascade(promise: context.statusPromise) - } - }) - } -} - -class NIOServerTests: NIOServerTestCase { +class NIOServerTests: NIOBasicEchoTestCase { static var allTests: [(String, (NIOServerTests) -> () throws -> Void)] { return [ ("testUnary", testUnary), @@ -92,223 +37,119 @@ class NIOServerTests: NIOServerTestCase { ] } - static let lotsOfStrings = (0..<1000).map { String(describing: $0) } - - var eventLoopGroup: MultiThreadedEventLoopGroup! - var server: GRPCServer! - - override func setUp() { - super.setUp() - - // This is how a GRPC server would actually be set up. - eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) - server = try! GRPCServer.start( - hostname: "localhost", port: 5050, eventLoopGroup: eventLoopGroup, serviceProviders: [EchoProvider_NIO()]) - .wait() - } - - override func tearDown() { - XCTAssertNoThrow(try server.close().wait()) - - XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) - eventLoopGroup = nil - - super.tearDown() - } + static let aFewStrings = ["foo", "bar", "baz"] + static let lotsOfStrings = (0..<5_000).map { String(describing: $0) } } extension NIOServerTests { - func testUnary() { - XCTAssertEqual("Swift echo get: foo", try! client.get(Echo_EchoRequest(text: "foo")).text) - } - - func testUnaryWithLargeData() throws { - // Default max frame size is: 16,384. We'll exceed this as we also have to send the size and compression flag. - let longMessage = String(repeating: "e", count: 16_384) - XCTAssertNoThrow(try client.get(Echo_EchoRequest(text: longMessage))) { response in - XCTAssertEqual("Swift echo get: \(longMessage)", response.text) - } + func testUnary() throws { + XCTAssertEqual(try client.get(Echo_EchoRequest(text: "foo")).response.wait().text, "Swift echo get: foo") } - func testUnaryLotsOfRequests() { + func testUnaryLotsOfRequests() throws { // Sending that many requests at once can sometimes trip things up, it seems. - client.timeout = 5.0 let clockStart = clock() let numberOfRequests = 2_000 + for i in 0.. 0 { print("\(i) requests sent so far, elapsed time: \(Double(clock() - clockStart) / Double(CLOCKS_PER_SEC))") } - XCTAssertEqual("Swift echo get: foo \(i)", try client.get(Echo_EchoRequest(text: "foo \(i)")).text) + XCTAssertEqual(try client.get(Echo_EchoRequest(text: "foo \(i)")).response.wait().text, "Swift echo get: foo \(i)") } print("total time for \(numberOfRequests) requests: \(Double(clock() - clockStart) / Double(CLOCKS_PER_SEC))") } + func testUnaryWithLargeData() throws { + // Default max frame size is: 16,384. We'll exceed this as we also have to send the size and compression flag. + let longMessage = String(repeating: "e", count: 16_384) + XCTAssertEqual(try client.get(Echo_EchoRequest(text: longMessage)).response.wait().text, "Swift echo get: \(longMessage)") + } + func testUnaryEmptyRequest() throws { - XCTAssertNoThrow(try client.get(Echo_EchoRequest())) + XCTAssertNoThrow(try client.get(Echo_EchoRequest()).response.wait()) } } extension NIOServerTests { - func testClientStreaming() { - let completionHandlerExpectation = expectation(description: "final completion handler called") - let call = try! client.collect { callResult in - XCTAssertEqual(.ok, callResult.statusCode) - completionHandlerExpectation.fulfill() - } - - XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: "foo"))) - XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: "bar"))) - XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: "baz"))) - call.waitForSendOperationsToFinish() - - let response = try! call.closeAndReceive() - XCTAssertEqual("Swift echo collect: foo bar baz", response.text) - - waitForExpectations(timeout: defaultTimeout) - } + func doTestClientStreaming(messages: [String], file: StaticString = #file, line: UInt = #line) throws { + let call = client.collect() - func testClientStreamingLotsOfMessages() { - let completionHandlerExpectation = expectation(description: "completion handler called") - let call = try! client.collect { callResult in - XCTAssertEqual(.ok, callResult.statusCode) - completionHandlerExpectation.fulfill() + var queue = call.newMessageQueue() + for message in messages { + queue = queue.then { call.sendMessage(Echo_EchoRequest(text: message)) } } + queue.whenSuccess { call.sendEnd(promise: nil) } - for string in NIOServerTests.lotsOfStrings { - XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: string))) - } - call.waitForSendOperationsToFinish() + XCTAssertEqual("Swift echo collect: " + messages.joined(separator: " "), try call.response.wait().text, file: file, line: line) + XCTAssertEqual(.ok, try call.status.wait().code, file: file, line: line) + } - let response = try! call.closeAndReceive() - XCTAssertEqual("Swift echo collect: " + NIOServerTests.lotsOfStrings.joined(separator: " "), response.text) + func testClientStreaming() { + XCTAssertNoThrow(try doTestClientStreaming(messages: NIOServerTests.aFewStrings)) + } - waitForExpectations(timeout: defaultTimeout) + func testClientStreamingLotsOfMessages() throws { + XCTAssertNoThrow(try doTestClientStreaming(messages: NIOServerTests.lotsOfStrings)) } } extension NIOServerTests { - func testServerStreaming() { - let completionHandlerExpectation = expectation(description: "completion handler called") - let call = try! client.expand(Echo_EchoRequest(text: "foo bar baz")) { callResult in - XCTAssertEqual(.ok, callResult.statusCode) - completionHandlerExpectation.fulfill() + func doTestServerStreaming(messages: [String], file: StaticString = #file, line: UInt = #line) throws { + var index = 0 + let call = client.expand(Echo_EchoRequest.with { $0.text = messages.joined(separator: " ") }) { response in + XCTAssertEqual("Swift echo expand (\(index)): \(messages[index])", response.text, file: file, line: line) + index += 1 } - XCTAssertEqual("Swift echo expand (0): foo", try! call.receive()!.text) - XCTAssertEqual("Swift echo expand (1): bar", try! call.receive()!.text) - XCTAssertEqual("Swift echo expand (2): baz", try! call.receive()!.text) - XCTAssertNil(try! call.receive()) + XCTAssertEqual(try call.status.wait().code, .ok, file: file, line: line) + XCTAssertEqual(index, messages.count) + } - waitForExpectations(timeout: defaultTimeout) + func testServerStreaming() { + XCTAssertNoThrow(try doTestServerStreaming(messages: NIOServerTests.aFewStrings)) } func testServerStreamingLotsOfMessages() { - let completionHandlerExpectation = expectation(description: "completion handler called") - let call = try! client.expand(Echo_EchoRequest(text: NIOServerTests.lotsOfStrings.joined(separator: " "))) { callResult in - XCTAssertEqual(.ok, callResult.statusCode) - completionHandlerExpectation.fulfill() - } - - for string in NIOServerTests.lotsOfStrings { - XCTAssertEqual("Swift echo expand (\(string)): \(string)", try! call.receive()!.text) - } - XCTAssertNil(try! call.receive()) - - waitForExpectations(timeout: defaultTimeout) + XCTAssertNoThrow(try doTestServerStreaming(messages: NIOServerTests.lotsOfStrings)) } } extension NIOServerTests { - func testBidirectionalStreamingBatched() { - let finalCompletionHandlerExpectation = expectation(description: "final completion handler called") - let call = try! client.update { callResult in - XCTAssertEqual(.ok, callResult.statusCode) - finalCompletionHandlerExpectation.fulfill() - } - - XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: "foo"))) - XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: "bar"))) - XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: "baz"))) - - call.waitForSendOperationsToFinish() - - XCTAssertNoThrow(try call.closeSend()) + private func doTestBidirectionalStreaming(messages: [String], waitForEachResponse: Bool = false, timeout: GRPCTimeout? = nil, file: StaticString = #file, line: UInt = #line) throws { + let responseReceived = waitForEachResponse ? DispatchSemaphore(value: 0) : nil + var index = 0 - XCTAssertEqual("Swift echo update (0): foo", try! call.receive()!.text) - XCTAssertEqual("Swift echo update (1): bar", try! call.receive()!.text) - XCTAssertEqual("Swift echo update (2): baz", try! call.receive()!.text) - XCTAssertNil(try! call.receive()) - - waitForExpectations(timeout: defaultTimeout) - } - - func testBidirectionalStreamingPingPong() { - let finalCompletionHandlerExpectation = expectation(description: "final completion handler called") - let call = try! client.update { callResult in - XCTAssertEqual(.ok, callResult.statusCode) - finalCompletionHandlerExpectation.fulfill() + let callOptions = timeout.map { CallOptions(timeout: $0) } + let call = client.update(callOptions: callOptions) { response in + XCTAssertEqual("Swift echo update (\(index)): \(messages[index])", response.text, file: file, line: line) + responseReceived?.signal() + index += 1 } - XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: "foo"))) - XCTAssertEqual("Swift echo update (0): foo", try! call.receive()!.text) - - XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: "bar"))) - XCTAssertEqual("Swift echo update (1): bar", try! call.receive()!.text) - - XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: "baz"))) - XCTAssertEqual("Swift echo update (2): baz", try! call.receive()!.text) - - call.waitForSendOperationsToFinish() - - XCTAssertNoThrow(try call.closeSend()) - - XCTAssertNil(try! call.receive()) - - waitForExpectations(timeout: defaultTimeout) - } - - func testBidirectionalStreamingLotsOfMessagesBatched() { - let finalCompletionHandlerExpectation = expectation(description: "final completion handler called") - let call = try! client.update { callResult in - XCTAssertEqual(.ok, callResult.statusCode) - finalCompletionHandlerExpectation.fulfill() - } - - for string in NIOServerTests.lotsOfStrings { - XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: string))) + messages.forEach { part in + call.sendMessage(Echo_EchoRequest(text: part), promise: nil) + XCTAssertNotEqual(responseReceived?.wait(timeout: .now() + .seconds(1)), .some(.timedOut), file: file, line: line) } + call.sendEnd(promise: nil) - call.waitForSendOperationsToFinish() - - XCTAssertNoThrow(try call.closeSend()) - - for string in NIOServerTests.lotsOfStrings { - XCTAssertEqual("Swift echo update (\(string)): \(string)", try! call.receive()!.text) - } - XCTAssertNil(try! call.receive()) - - waitForExpectations(timeout: defaultTimeout) + XCTAssertEqual(try call.status.wait().code, .ok, file: file, line: line) + XCTAssertEqual(index, messages.count) } - func testBidirectionalStreamingLotsOfMessagesPingPong() { - let finalCompletionHandlerExpectation = expectation(description: "final completion handler called") - let call = try! client.update { callResult in - XCTAssertEqual(.ok, callResult.statusCode) - finalCompletionHandlerExpectation.fulfill() - } - - for string in NIOServerTests.lotsOfStrings { - XCTAssertNoThrow(try call.send(Echo_EchoRequest(text: string))) - XCTAssertEqual("Swift echo update (\(string)): \(string)", try! call.receive()!.text) - } - - call.waitForSendOperationsToFinish() + func testBidirectionalStreamingBatched() throws { + XCTAssertNoThrow(try doTestBidirectionalStreaming(messages: NIOServerTests.aFewStrings)) + } - XCTAssertNoThrow(try call.closeSend()) + func testBidirectionalStreamingPingPong() throws { + XCTAssertNoThrow(try doTestBidirectionalStreaming(messages: NIOServerTests.aFewStrings, waitForEachResponse: true)) + } - XCTAssertNil(try! call.receive()) + func testBidirectionalStreamingLotsOfMessagesBatched() throws { + XCTAssertNoThrow(try doTestBidirectionalStreaming(messages: NIOServerTests.lotsOfStrings, timeout: try .seconds(15))) + } - waitForExpectations(timeout: defaultTimeout) + func testBidirectionalStreamingLotsOfMessagesPingPong() throws { + XCTAssertNoThrow(try doTestBidirectionalStreaming(messages: NIOServerTests.lotsOfStrings, waitForEachResponse: true, timeout: try .seconds(15))) } } diff --git a/Tests/SwiftGRPCNIOTests/NIOServerWebTests.swift b/Tests/SwiftGRPCNIOTests/NIOServerWebTests.swift index ce87064b5..8b4a5ed02 100644 --- a/Tests/SwiftGRPCNIOTests/NIOServerWebTests.swift +++ b/Tests/SwiftGRPCNIOTests/NIOServerWebTests.swift @@ -21,7 +21,7 @@ import XCTest // Only test Unary and ServerStreaming, as ClientStreaming is not // supported in HTTP1. // TODO: Add tests for application/grpc-web as well. -class NIOServerWebTests: NIOServerTestCase { +class NIOServerWebTests: NIOBasicEchoTestCase { static var allTests: [(String, (NIOServerWebTests) -> () throws -> Void)] { return [ ("testUnary", testUnary), @@ -31,28 +31,6 @@ class NIOServerWebTests: NIOServerTestCase { ] } - var eventLoopGroup: MultiThreadedEventLoopGroup! - var server: GRPCServer! - - override func setUp() { - super.setUp() - - // This is how a GRPC server would actually be set up. - eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) - server = try! GRPCServer.start( - hostname: "localhost", port: 5050, eventLoopGroup: eventLoopGroup, serviceProviders: [EchoProvider_NIO()]) - .wait() - } - - override func tearDown() { - XCTAssertNoThrow(try server.close().wait()) - - XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) - eventLoopGroup = nil - - super.tearDown() - } - private func gRPCEncodedEchoRequest(_ text: String) -> Data { var request = Echo_EchoRequest() request.text = text @@ -108,7 +86,7 @@ extension NIOServerWebTests { } } - waitForExpectations(timeout: defaultTimeout) + waitForExpectations(timeout: defaultTestTimeout) } func testUnaryLotsOfRequests() { @@ -166,6 +144,6 @@ extension NIOServerWebTests { } } - waitForExpectations(timeout: defaultTimeout) + waitForExpectations(timeout: defaultTestTimeout) } } diff --git a/Tests/SwiftGRPCNIOTests/TestHelpers.swift b/Tests/SwiftGRPCNIOTests/TestHelpers.swift index 73be06743..851bd77e0 100644 --- a/Tests/SwiftGRPCNIOTests/TestHelpers.swift +++ b/Tests/SwiftGRPCNIOTests/TestHelpers.swift @@ -45,7 +45,7 @@ func extractHeaders(_ response: RawGRPCServerResponsePart) throws -> HTTPHeaders } @discardableResult -func extractMessage(_ response: RawGRPCServerResponsePart) throws -> ByteBuffer { +func extractMessage(_ response: RawGRPCServerResponsePart) throws -> Data { guard case .message(let message) = response else { throw CaseExtractError(message: "\(response) did not match .message") } diff --git a/Tests/SwiftGRPCNIOTests/echo.grpc.swift b/Tests/SwiftGRPCNIOTests/echo.grpc.swift index 9fef35792..29b4d6b86 120000 --- a/Tests/SwiftGRPCNIOTests/echo.grpc.swift +++ b/Tests/SwiftGRPCNIOTests/echo.grpc.swift @@ -1 +1 @@ -../../Sources/Examples/Echo/Generated/echo.grpc.swift \ No newline at end of file +../../Sources/Examples/EchoNIO/Generated/echo.grpc.swift \ No newline at end of file diff --git a/Tests/SwiftGRPCNIOTests/echo.pb.swift b/Tests/SwiftGRPCNIOTests/echo.pb.swift index 9dbe28075..1b8ab0b6a 120000 --- a/Tests/SwiftGRPCNIOTests/echo.pb.swift +++ b/Tests/SwiftGRPCNIOTests/echo.pb.swift @@ -1 +1 @@ -../../Sources/Examples/Echo/Generated/echo.pb.swift \ No newline at end of file +../../Sources/Examples/EchoNIO/Generated/echo.pb.swift \ No newline at end of file diff --git a/Tests/SwiftGRPCNIOTests/echo_nio.grpc.swift b/Tests/SwiftGRPCNIOTests/echo_nio.grpc.swift deleted file mode 100644 index ecf86a285..000000000 --- a/Tests/SwiftGRPCNIOTests/echo_nio.grpc.swift +++ /dev/null @@ -1,73 +0,0 @@ -// -// DO NOT EDIT. -// -// Generated by the protocol buffer compiler. -// Source: echo.proto -// - -// -// Copyright 2018, gRPC Authors All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -import Foundation -import NIO -import NIOHTTP1 -import SwiftGRPCNIO -import SwiftProtobuf - - -/// To build a server, implement a class that conforms to this protocol. -internal protocol Echo_EchoProvider_NIO: CallHandlerProvider { - func get(request: Echo_EchoRequest, context: StatusOnlyCallContext) -> EventLoopFuture - func expand(request: Echo_EchoRequest, context: StreamingResponseCallContext) -> EventLoopFuture - func collect(context: UnaryResponseCallContext) -> EventLoopFuture<(StreamEvent) -> Void> - func update(context: StreamingResponseCallContext) -> EventLoopFuture<(StreamEvent) -> Void> -} - -extension Echo_EchoProvider_NIO { - internal var serviceName: String { return "echo.Echo" } - - /// Determines, calls and returns the appropriate request handler, depending on the request's method. - /// Returns nil for methods not handled by this service. - internal func handleMethod(_ methodName: String, request: HTTPRequestHead, serverHandler: GRPCChannelHandler, channel: Channel, errorDelegate: ServerErrorDelegate?) -> GRPCCallHandler? { - switch methodName { - case "Get": - return UnaryCallHandler(channel: channel, request: request, errorDelegate: errorDelegate) { context in - return { request in - self.get(request: request, context: context) - } - } - - case "Expand": - return ServerStreamingCallHandler(channel: channel, request: request, errorDelegate: errorDelegate) { context in - return { request in - self.expand(request: request, context: context) - } - } - - case "Collect": - return ClientStreamingCallHandler(channel: channel, request: request, errorDelegate: errorDelegate) { context in - return self.collect(context: context) - } - - case "Update": - return BidirectionalStreamingCallHandler(channel: channel, request: request, errorDelegate: errorDelegate) { context in - return self.update(context: context) - } - - default: return nil - } - } -} -