Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 0 additions & 158 deletions Sources/GRPC/ClientCalls/BaseClientCall.swift

This file was deleted.

160 changes: 124 additions & 36 deletions Sources/GRPC/ClientCalls/BidirectionalStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,126 @@
* limitations under the License.
*/
import NIO
import NIOHPACK
import NIOHTTP2
import Logging

/// A bidirectional-streaming gRPC call. Each response is passed to the provided observer block.
///
/// Messages should be sent via the `send` method; an `.end` message should be sent
/// to indicate the final message has been sent.
///
/// The following futures are available to the caller:
/// - `initialMetadata`: the initial metadata returned from the server,
/// - `status`: the status of the gRPC call after it has ended,
/// - `trailingMetadata`: any metadata returned from the server alongside the `status`.
public final class BidirectionalStreamingCall<RequestPayload: GRPCPayload, ResponsePayload: GRPCPayload>
: BaseClientCall<RequestPayload, ResponsePayload>,
StreamingRequestClientCall {
private var messageQueue: EventLoopFuture<Void>

init(
/// Messages should be sent via the `sendMessage` and `sendMessages` methods; the stream of messages
/// must be terminated by calling `sendEnd` to indicate the final message has been sent.
public final class BidirectionalStreamingCall<
RequestPayload: GRPCPayload,
ResponsePayload: GRPCPayload
>: StreamingRequestClientCall {
private let transport: ChannelTransport<RequestPayload, ResponsePayload>

/// The options used to make the RPC.
public let options: CallOptions

/// The `Channel` used to transport messages for this RPC.
public var subchannel: EventLoopFuture<Channel> {
return self.transport.streamChannel()
}

/// The `EventLoop` this call is running on.
public var eventLoop: EventLoop {
return self.transport.eventLoop
}

/// Cancel this RPC if it hasn't already completed.
public func cancel(promise: EventLoopPromise<Void>?) {
self.transport.cancel(promise: promise)
}

// MARK: - Response Parts

/// The initial metadata returned from the server.
public var initialMetadata: EventLoopFuture<HPACKHeaders> {
if self.eventLoop.inEventLoop {
return self.transport.responseContainer.lazyInitialMetadataPromise.getFutureResult()
} else {
return self.eventLoop.flatSubmit {
return self.transport.responseContainer.lazyInitialMetadataPromise.getFutureResult()
}
}
}

/// The trailing metadata returned from the server.
public var trailingMetadata: EventLoopFuture<HPACKHeaders> {
if self.eventLoop.inEventLoop {
return self.transport.responseContainer.lazyTrailingMetadataPromise.getFutureResult()
} else {
return self.eventLoop.flatSubmit {
return self.transport.responseContainer.lazyTrailingMetadataPromise.getFutureResult()
}
}
}

/// The final status of the the RPC.
public var status: EventLoopFuture<GRPCStatus> {
if self.eventLoop.inEventLoop {
return self.transport.responseContainer.lazyStatusPromise.getFutureResult()
} else {
return self.eventLoop.flatSubmit {
return self.transport.responseContainer.lazyStatusPromise.getFutureResult()
}
}
}

// MARK: - Requests

/// Sends a message to the service.
///
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()` or
/// `sendEnd(promise:)`.
///
/// - Parameters:
/// - message: The message to send.
/// - compression: Whether compression should be used for this message. Ignored if compression
/// was not enabled for the RPC.
/// - promise: A promise to fulfill with the outcome of the send operation.
public func sendMessage(
_ message: RequestPayload,
compression: Compression = .deferToCallDefault,
promise: EventLoopPromise<Void>?
) {
let compressed = compression.isEnabled(enabledOnCall: self.options.messageEncoding.enabledForRequests)
let messageContext = _MessageContext(message, compressed: compressed)
self.transport.sendRequest(.message(messageContext), promise: promise)
}

/// Sends a sequence of messages to the service.
///
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()` or
/// `sendEnd(promise:)`.
///
/// - Parameters:
/// - messages: The sequence of messages to send.
/// - compression: Whether compression should be used for this message. Ignored if compression
/// was not enabled for the RPC.
/// - promise: A promise to fulfill with the outcome of the send operation. It will only succeed
/// if all messages were written successfully.
public func sendMessages<S>(
_ messages: S,
compression: Compression = .deferToCallDefault,
promise: EventLoopPromise<Void>?
) where S: Sequence, S.Element == RequestPayload {
let compressed = compression.isEnabled(enabledOnCall: self.options.messageEncoding.enabledForRequests)
self.transport.sendRequests(messages.map {
.message(_MessageContext($0, compressed: compressed))
}, promise: promise)
}

/// Terminates a stream of messages sent to the service.
///
/// - Important: This should only ever be called once.
/// - Parameter promise: A promise to be fulfilled when the end has been sent.
public func sendEnd(promise: EventLoopPromise<Void>?) {
self.transport.sendRequest(.end, promise: promise)
}

internal init(
path: String,
scheme: String,
authority: String,
Expand All @@ -42,22 +144,22 @@ public final class BidirectionalStreamingCall<RequestPayload: GRPCPayload, Respo
logger: Logger,
handler: @escaping (ResponsePayload) -> Void
) {
self.messageQueue = eventLoop.makeSucceededFuture(())
let requestID = callOptions.requestIDProvider.requestID()
var logger = logger
logger[metadataKey: MetadataKey.requestID] = "\(requestID)"
logger.debug("starting rpc", metadata: ["path": "\(path)"])

let responseHandler = GRPCClientStreamingResponseChannelHandler(
initialMetadataPromise: eventLoop.makePromise(),
trailingMetadataPromise: eventLoop.makePromise(),
statusPromise: eventLoop.makePromise(),
errorDelegate: errorDelegate,
self.transport = ChannelTransport(
multiplexer: multiplexer,
responseContainer: .init(eventLoop: eventLoop, streamingResponseHandler: handler),
callType: .bidirectionalStreaming,
timeout: callOptions.timeout,
logger: logger,
responseHandler: handler
errorDelegate: errorDelegate,
logger: logger
)

self.options = callOptions

let requestHead = _GRPCRequestHead(
scheme: scheme,
path: path,
Expand All @@ -66,20 +168,6 @@ public final class BidirectionalStreamingCall<RequestPayload: GRPCPayload, Respo
options: callOptions
)

let requestHandler = _StreamingRequestChannelHandler<RequestPayload>(requestHead: requestHead)

super.init(
eventLoop: eventLoop,
multiplexer: multiplexer,
callType: .bidirectionalStreaming,
callOptions: callOptions,
responseHandler: responseHandler,
requestHandler: requestHandler,
logger: logger
)
}

public func newMessageQueue() -> EventLoopFuture<Void> {
return self.messageQueue
self.transport.sendRequest(.head(requestHead), promise: nil)
}
}
Loading