|
| 1 | +/* |
| 2 | + * Copyright 2020, gRPC Authors All rights reserved. |
| 3 | + * |
| 4 | + * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | + * you may not use this file except in compliance with the License. |
| 6 | + * You may obtain a copy of the License at |
| 7 | + * |
| 8 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | + * |
| 10 | + * Unless required by applicable law or agreed to in writing, software |
| 11 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | + * See the License for the specific language governing permissions and |
| 14 | + * limitations under the License. |
| 15 | + */ |
| 16 | +import NIO |
| 17 | +import NIOHPACK |
| 18 | + |
| 19 | +/// Sending on a fake response stream would have resulted in a protocol violation (such as |
| 20 | +/// sending initial metadata multiple times or sending messages after the stream has closed). |
| 21 | +public struct FakeResponseProtocolViolation: Error, Hashable { |
| 22 | + /// The reason that sending the message would have resulted in a protocol violation. |
| 23 | + public var reason: String |
| 24 | + |
| 25 | + init(_ reason: String) { |
| 26 | + self.reason = reason |
| 27 | + } |
| 28 | +} |
| 29 | + |
| 30 | +/// A fake response stream into which users may inject response parts for use in unit tests. |
| 31 | +/// |
| 32 | +/// Users may not interact with this class directly but may do so via one of its subclasses |
| 33 | +/// `FakeUnaryResponse` and `FakeStreamingResponse`. |
| 34 | +public class _FakeResponseStream<Request: GRPCPayload, Response: GRPCPayload> { |
| 35 | + private enum StreamEvent { |
| 36 | + case responsePart(_GRPCClientResponsePart<Response>) |
| 37 | + case error(Error) |
| 38 | + } |
| 39 | + |
| 40 | + /// The channel to use for communication. |
| 41 | + internal let channel: EmbeddedChannel |
| 42 | + |
| 43 | + /// A buffer to hold responses in before the proxy is activated. |
| 44 | + private var responseBuffer: CircularBuffer<StreamEvent> |
| 45 | + |
| 46 | + /// The current state of the proxy. |
| 47 | + private var activeState: ActiveState |
| 48 | + |
| 49 | + /// The state of sending response parts. |
| 50 | + private var sendState: SendState |
| 51 | + |
| 52 | + private enum ActiveState { |
| 53 | + case inactive |
| 54 | + case active |
| 55 | + } |
| 56 | + |
| 57 | + private enum SendState { |
| 58 | + // Nothing has been sent; we can send initial metadata to become 'sending' or trailing metadata |
| 59 | + // to start 'closing'. |
| 60 | + case idle |
| 61 | + |
| 62 | + // We're sending messages. We can send more messages in this state or trailing metadata to |
| 63 | + // transition to 'closing'. |
| 64 | + case sending |
| 65 | + |
| 66 | + // We're closing: we've sent trailing metadata, we may only send a status now to close. |
| 67 | + case closing |
| 68 | + |
| 69 | + // Closed, nothing more can be sent. |
| 70 | + case closed |
| 71 | + } |
| 72 | + |
| 73 | + internal init() { |
| 74 | + self.activeState = .inactive |
| 75 | + self.sendState = .idle |
| 76 | + self.responseBuffer = CircularBuffer() |
| 77 | + self.channel = EmbeddedChannel() |
| 78 | + } |
| 79 | + |
| 80 | + /// Activate the test proxy; this should be called |
| 81 | + internal func activate() { |
| 82 | + switch self.activeState { |
| 83 | + case .inactive: |
| 84 | + // Activate the channel. This will allow any request parts to be sent. |
| 85 | + self.channel.pipeline.fireChannelActive() |
| 86 | + |
| 87 | + // Unbuffer any response parts. |
| 88 | + while !self.responseBuffer.isEmpty { |
| 89 | + self.write(self.responseBuffer.removeFirst()) |
| 90 | + } |
| 91 | + |
| 92 | + // Now we're active. |
| 93 | + self.activeState = .active |
| 94 | + |
| 95 | + case .active: |
| 96 | + () |
| 97 | + } |
| 98 | + } |
| 99 | + |
| 100 | + /// Write or buffer the response part, depending on the our current state. |
| 101 | + internal func _sendResponsePart(_ part: _GRPCClientResponsePart<Response>) throws { |
| 102 | + try self.send(.responsePart(part)) |
| 103 | + } |
| 104 | + |
| 105 | + internal func _sendError(_ error: Error) throws { |
| 106 | + try self.send(.error(error)) |
| 107 | + } |
| 108 | + |
| 109 | + private func send(_ event: StreamEvent) throws { |
| 110 | + switch self.validate(event) { |
| 111 | + case .valid: |
| 112 | + self.writeOrBuffer(event) |
| 113 | + |
| 114 | + case .validIfSentAfter(let extraPart): |
| 115 | + self.writeOrBuffer(extraPart) |
| 116 | + self.writeOrBuffer(event) |
| 117 | + |
| 118 | + case .invalid(let reason): |
| 119 | + throw FakeResponseProtocolViolation(reason) |
| 120 | + } |
| 121 | + } |
| 122 | + |
| 123 | + /// Validate events the user wants to send on the stream. |
| 124 | + private func validate(_ event: StreamEvent) -> Validation { |
| 125 | + switch (event, self.sendState) { |
| 126 | + case (.responsePart(.initialMetadata), .idle): |
| 127 | + self.sendState = .sending |
| 128 | + return .valid |
| 129 | + |
| 130 | + case (.responsePart(.initialMetadata), .sending), |
| 131 | + (.responsePart(.initialMetadata), .closing), |
| 132 | + (.responsePart(.initialMetadata), .closed): |
| 133 | + // We can only send initial metadata from '.idle'. |
| 134 | + return .invalid(reason: "Initial metadata has already been sent") |
| 135 | + |
| 136 | + case (.responsePart(.message), .idle): |
| 137 | + // This is fine: we don't force the user to specify initial metadata so we send some on their |
| 138 | + // behalf. |
| 139 | + self.sendState = .sending |
| 140 | + return .validIfSentAfter(.responsePart(.initialMetadata([:]))) |
| 141 | + |
| 142 | + case (.responsePart(.message), .sending): |
| 143 | + return .valid |
| 144 | + |
| 145 | + case (.responsePart(.message), .closing), |
| 146 | + (.responsePart(.message), .closed): |
| 147 | + // We can't send messages once we're closing or closed. |
| 148 | + return .invalid(reason: "Messages can't be sent after the stream has been closed") |
| 149 | + |
| 150 | + case (.responsePart(.trailingMetadata), .idle), |
| 151 | + (.responsePart(.trailingMetadata), .sending): |
| 152 | + self.sendState = .closing |
| 153 | + return .valid |
| 154 | + |
| 155 | + case (.responsePart(.trailingMetadata), .closing), |
| 156 | + (.responsePart(.trailingMetadata), .closed): |
| 157 | + // We're already closing or closed. |
| 158 | + return .invalid(reason: "Trailing metadata can't be sent after the stream has been closed") |
| 159 | + |
| 160 | + case (.responsePart(.status), .idle), |
| 161 | + (.error, .idle), |
| 162 | + (.responsePart(.status), .sending), |
| 163 | + (.error, .sending), |
| 164 | + (.responsePart(.status), .closed), |
| 165 | + (.error, .closed): |
| 166 | + // We can only error/close if we're closing (i.e. have already sent trailers which we enforce |
| 167 | + // from the API in the subclasses). |
| 168 | + return .invalid(reason: "Status/error can only be sent after trailing metadata has been sent") |
| 169 | + |
| 170 | + case (.responsePart(.status), .closing), |
| 171 | + (.error, .closing): |
| 172 | + self.sendState = .closed |
| 173 | + return .valid |
| 174 | + } |
| 175 | + } |
| 176 | + |
| 177 | + private enum Validation { |
| 178 | + /// Sending the part is valid. |
| 179 | + case valid |
| 180 | + |
| 181 | + /// Sending the part, if it is sent after the given part. |
| 182 | + case validIfSentAfter(_ part: StreamEvent) |
| 183 | + |
| 184 | + /// Sending the part would be a protocol violation. |
| 185 | + case invalid(reason: String) |
| 186 | + } |
| 187 | + |
| 188 | + private func writeOrBuffer(_ event: StreamEvent) { |
| 189 | + switch self.activeState { |
| 190 | + case .inactive: |
| 191 | + self.responseBuffer.append(event) |
| 192 | + |
| 193 | + case .active: |
| 194 | + self.write(event) |
| 195 | + } |
| 196 | + } |
| 197 | + |
| 198 | + private func write(_ part: StreamEvent) { |
| 199 | + switch part { |
| 200 | + case .error(let error): |
| 201 | + self.channel.pipeline.fireErrorCaught(error) |
| 202 | + |
| 203 | + case .responsePart(let responsePart): |
| 204 | + // We tolerate errors here: an error will be thrown if the write results in an error which |
| 205 | + // isn't caught in the channel. Errors in the channel get funnelled into the transport held |
| 206 | + // by the actual call object and handled there. |
| 207 | + _ = try? self.channel.writeInbound(responsePart) |
| 208 | + } |
| 209 | + } |
| 210 | +} |
| 211 | + |
| 212 | +// MARK: - Unary Response |
| 213 | + |
| 214 | +/// A fake unary response to be used with a generated test client. |
| 215 | +/// |
| 216 | +/// Users typically create fake responses via helper methods on their generated test clients |
| 217 | +/// corresponding to the RPC which they intend to test. |
| 218 | +/// |
| 219 | +/// For unary responses users may call one of two functions for each RPC: |
| 220 | +/// - `sendMessage(_:initialMetadata:trailingMetadata:status)`, or |
| 221 | +/// - `sendError(status:trailingMetadata)` |
| 222 | +/// |
| 223 | +/// `sendMessage` sends a normal unary response with the provided message and allows the caller to |
| 224 | +/// also specify initial metadata, trailing metadata and the status. Both metadata arguments are |
| 225 | +/// empty by default and the status defaults to one with an 'ok' status code. |
| 226 | +/// |
| 227 | +/// `sendError` may be used to terminate an RPC without providing a response. As for `sendMessage`, |
| 228 | +/// the `trailingMetadata` defaults to being empty. |
| 229 | +public class FakeUnaryResponse<Request: GRPCPayload, Response: GRPCPayload>: _FakeResponseStream<Request, Response> { |
| 230 | + public override init() { |
| 231 | + super.init() |
| 232 | + } |
| 233 | + |
| 234 | + /// Send a response message to the client. |
| 235 | + /// |
| 236 | + /// - Parameters: |
| 237 | + /// - response: The message to send. |
| 238 | + /// - initialMetadata: The initial metadata to send. By default the metadata will be empty. |
| 239 | + /// - trailingMetadata: The trailing metadata to send. By default the metadata will be empty. |
| 240 | + /// - status: The status to send. By default this has an '.ok' status code. |
| 241 | + /// - Throws: FakeResponseProtocolViolation if sending the message would violate the gRPC |
| 242 | + /// protocol, e.g. sending messages after the RPC has ended. |
| 243 | + public func sendMessage( |
| 244 | + _ response: Response, |
| 245 | + initialMetadata: HPACKHeaders = [:], |
| 246 | + trailingMetadata: HPACKHeaders = [:], |
| 247 | + status: GRPCStatus = .ok |
| 248 | + ) throws { |
| 249 | + try self._sendResponsePart(.initialMetadata(initialMetadata)) |
| 250 | + try self._sendResponsePart(.message(.init(response, compressed: false))) |
| 251 | + try self._sendResponsePart(.trailingMetadata(trailingMetadata)) |
| 252 | + try self._sendResponsePart(.status(status)) |
| 253 | + } |
| 254 | + |
| 255 | + /// Send an error to the client. |
| 256 | + /// |
| 257 | + /// - Parameters: |
| 258 | + /// - error: The error to send. |
| 259 | + /// - trailingMetadata: The trailing metadata to send. By default the metadata will be empty. |
| 260 | + public func sendError(_ error: Error, trailingMetadata: HPACKHeaders = [:]) throws { |
| 261 | + try self._sendResponsePart(.trailingMetadata(trailingMetadata)) |
| 262 | + try self._sendError(error) |
| 263 | + } |
| 264 | +} |
| 265 | + |
| 266 | +// MARK: - Streaming Response |
| 267 | + |
| 268 | +/// A fake streaming response to be used with a generated test client. |
| 269 | +/// |
| 270 | +/// Users typically create fake responses via helper methods on their generated test clients |
| 271 | +/// corresponding to the RPC which they intend to test. |
| 272 | +/// |
| 273 | +/// For streaming responses users have a number of methods available to them: |
| 274 | +/// - `sendInitialMetadata(_:)` |
| 275 | +/// - `sendMessage(_:)` |
| 276 | +/// - `sendEnd(status:trailingMetadata:)` |
| 277 | +/// - `sendError(_:trailingMetadata)` |
| 278 | +/// |
| 279 | +/// `sendInitialMetadata` may be called to send initial metadata to the client, however, it |
| 280 | +/// must be called first in order for the metadata to be sent. If it is not called, empty |
| 281 | +/// metadata will be sent automatically if necessary. |
| 282 | +/// |
| 283 | +/// `sendMessage` may be called to send a response message on the stream. This may be called |
| 284 | +/// multiple times. Messages will be ignored if this is called after `sendEnd` or `sendError`. |
| 285 | +/// |
| 286 | +/// `sendEnd` indicates that the response stream has closed. It – or `sendError` - must be called |
| 287 | +/// once. The `status` defaults to a value with the `ok` code and `trailingMetadata` is empty by |
| 288 | +/// default. |
| 289 | +/// |
| 290 | +/// `sendError` may be called at any time to indicate an error on the response stream. |
| 291 | +/// Like `sendEnd`, `trailingMetadata` is empty by default. |
| 292 | +public class FakeStreamingResponse<Request: GRPCPayload, Response: GRPCPayload>: _FakeResponseStream<Request, Response> { |
| 293 | + public override init() { |
| 294 | + super.init() |
| 295 | + } |
| 296 | + |
| 297 | + /// Send initial metadata to the client. |
| 298 | + /// |
| 299 | + /// Note that calling this function is not required; empty initial metadata will be sent |
| 300 | + /// automatically if necessary. |
| 301 | + /// |
| 302 | + /// - Parameter metadata: The metadata to send |
| 303 | + /// - Throws: FakeResponseProtocolViolation if sending initial metadata would violate the gRPC |
| 304 | + /// protocol, e.g. sending metadata too many times, or out of order. |
| 305 | + public func sendInitialMetadata(_ metadata: HPACKHeaders) throws { |
| 306 | + try self._sendResponsePart(.initialMetadata(metadata)) |
| 307 | + } |
| 308 | + |
| 309 | + /// Send a response message to the client. |
| 310 | + /// |
| 311 | + /// - Parameter response: The response to send. |
| 312 | + /// - Throws: FakeResponseProtocolViolation if sending the message would violate the gRPC |
| 313 | + /// protocol, e.g. sending messages after the RPC has ended. |
| 314 | + public func sendMessage(_ response: Response) throws { |
| 315 | + try self._sendResponsePart(.message(.init(response, compressed: false))) |
| 316 | + } |
| 317 | + |
| 318 | + /// Send the RPC status and trailing metadata to the client. |
| 319 | + /// |
| 320 | + /// - Parameters: |
| 321 | + /// - status: The status to send. By default the status code will be '.ok'. |
| 322 | + /// - trailingMetadata: The trailing metadata to send. Empty by default. |
| 323 | + /// - Throws: FakeResponseProtocolViolation if ending the RPC would violate the gRPC |
| 324 | + /// protocol, e.g. sending end after the RPC has already completed. |
| 325 | + public func sendEnd(status: GRPCStatus = .ok, trailingMetadata: HPACKHeaders = [:]) throws { |
| 326 | + try self._sendResponsePart(.trailingMetadata(trailingMetadata)) |
| 327 | + try self._sendResponsePart(.status(status)) |
| 328 | + } |
| 329 | + |
| 330 | + /// Send an error to the client. |
| 331 | + /// |
| 332 | + /// - Parameters: |
| 333 | + /// - error: The error to send. |
| 334 | + /// - trailingMetadata: The trailing metadata to send. By default the metadata will be empty. |
| 335 | + /// - Throws: FakeResponseProtocolViolation if sending the error would violate the gRPC |
| 336 | + /// protocol, e.g. erroring after the RPC has already completed. |
| 337 | + public func sendError(_ error: Error, trailingMetadata: HPACKHeaders = [:]) throws { |
| 338 | + try self._sendResponsePart(.trailingMetadata(trailingMetadata)) |
| 339 | + try self._sendError(error) |
| 340 | + } |
| 341 | +} |
0 commit comments