-
Notifications
You must be signed in to change notification settings - Fork 51
/
BidirectionalStreamingBase.cs
142 lines (130 loc) · 7.26 KB
/
BidirectionalStreamingBase.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/*
* Copyright 2016 Google Inc. All Rights Reserved.
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file or at
* https://developers.google.com/open-source/licenses/bsd
*/
using Grpc.Core;
using System;
using System.Threading.Tasks;
namespace Google.Api.Gax.Grpc
{
/// <summary>
/// Common base type for bidirectional (duplex) streaming RPC methods. It wraps the call object returned
/// by gRPC so that the asynchronous response stream can be consumed with <c>await foreach</c>
/// (available from C# 8 onwards), and it places a buffer in front of the request stream so that
/// several requests can be written without waiting for each one to be transmitted.
/// </summary>
/// <remarks>
/// gRPC streams must be disposed by the user in order to avoid memory leaks.
/// It is also strongly recommended to read the response stream through to its end, even when the
/// data is not needed, because not doing so effectively cancels the call.
/// </remarks>
/// <typeparam name="TRequest">RPC request type</typeparam>
/// <typeparam name="TResponse">RPC response type</typeparam>
public abstract class BidirectionalStreamingBase<TRequest, TResponse> : IDisposable
{
    /// <summary>
    /// Gets the underlying gRPC duplex streaming call.
    /// Warning: never touch <c>GrpcCall.RequestStream</c> when any of
    /// <see cref="TryWriteAsync(TRequest)"/>, <see cref="WriteAsync(TRequest)"/>,
    /// <see cref="TryWriteAsync(TRequest, WriteOptions)"/>, or <see cref="WriteAsync(TRequest, WriteOptions)"/>
    /// is in use; direct access conflicts with the write-buffer used by the <c>[Try]WriteAsync</c> methods.
    /// </summary>
    /// <remarks>The implementation in this base class always throws <see cref="NotImplementedException"/>.</remarks>
    public virtual AsyncDuplexStreamingCall<TRequest, TResponse> GrpcCall =>
        throw new NotImplementedException();

    // Streaming requests

    /// <summary>
    /// Attempts to write a message to the stream. The write only happens if the buffer has enough space
    /// and <see cref="WriteCompleteAsync"/> has not been called yet. The write options of the previous
    /// message are reused.
    /// </summary>
    /// <remarks>The implementation in this base class always throws <see cref="NotImplementedException"/>.</remarks>
    /// <param name="message">The message to write.</param>
    /// <returns><c>null</c> if the message queue is full or the stream has already been completed;
    /// otherwise, a <see cref="Task"/> which will complete when the message has been written to the stream.</returns>
    public virtual Task TryWriteAsync(TRequest message) =>
        throw new NotImplementedException();

    /// <summary>
    /// Writes a message to the stream, provided the buffer has enough space and
    /// <see cref="WriteCompleteAsync"/> has not been called yet. The write options of the previous
    /// message are reused.
    /// </summary>
    /// <remarks>The implementation in this base class always throws <see cref="NotImplementedException"/>.</remarks>
    /// <param name="message">The message to write.</param>
    /// <exception cref="InvalidOperationException">There isn't enough space left in the buffer,
    /// or <see cref="WriteCompleteAsync"/> has already been called.</exception>
    /// <returns>A <see cref="Task"/> which will complete when the message has been written to the stream.</returns>
    public virtual Task WriteAsync(TRequest message) =>
        throw new NotImplementedException();

    /// <summary>
    /// Attempts to write a message to the stream using the given write options. The write only happens
    /// if the buffer has enough space and <see cref="WriteCompleteAsync"/> has not been called yet.
    /// </summary>
    /// <remarks>The implementation in this base class always throws <see cref="NotImplementedException"/>.</remarks>
    /// <param name="message">The message to write.</param>
    /// <param name="options">The write options to use for this message.</param>
    /// <returns><c>null</c> if the message queue is full or the stream has already been completed;
    /// otherwise, a <see cref="Task"/> which will complete when the message has been written to the stream.</returns>
    public virtual Task TryWriteAsync(TRequest message, WriteOptions options) =>
        throw new NotImplementedException();

    /// <summary>
    /// Writes a message to the stream using the given write options, provided the buffer has enough
    /// space and <see cref="WriteCompleteAsync"/> has not been called yet.
    /// </summary>
    /// <remarks>The implementation in this base class always throws <see cref="NotImplementedException"/>.</remarks>
    /// <param name="message">The message to write.</param>
    /// <param name="options">The write options to use for this message.</param>
    /// <exception cref="InvalidOperationException">There isn't enough space left in the buffer,
    /// or <see cref="WriteCompleteAsync"/> has already been called.</exception>
    /// <returns>A <see cref="Task"/> which will complete when the message has been written to the stream.</returns>
    public virtual Task WriteAsync(TRequest message, WriteOptions options) =>
        throw new NotImplementedException();

    /// <summary>
    /// Completes the stream once every buffered message has been sent.
    /// Only the first call on a given instance has any effect; later calls return <c>null</c>.
    /// </summary>
    /// <remarks>The implementation in this base class always throws <see cref="NotImplementedException"/>.</remarks>
    /// <returns>A <see cref="Task"/> which will complete when the stream has finished being completed;
    /// or <c>null</c> if this method has already been called.</returns>
    public virtual Task TryWriteCompleteAsync() =>
        throw new NotImplementedException();

    /// <summary>
    /// Completes the stream once every buffered message has been sent. This method may be called only
    /// once, and no further messages can be written after it has been called.
    /// </summary>
    /// <remarks>The implementation in this base class always throws <see cref="NotImplementedException"/>.</remarks>
    /// <exception cref="InvalidOperationException">This method has already been called.</exception>
    /// <returns>A <see cref="Task"/> which will complete when the stream has finished being completed.</returns>
    public virtual Task WriteCompleteAsync() =>
        throw new NotImplementedException();

    // Streaming responses

    /// <summary>
    /// Returns the streaming responses as an async sequence.
    /// The default implementation takes the response stream from <see cref="GrpcCall"/>
    /// and wraps it in an <see cref="AsyncResponseStream{T}"/>.
    /// </summary>
    /// <remarks>
    /// Calling this method more than once yields enumerators that all read from the same underlying
    /// response stream, which may cause confusion. In addition, the returned sequence can be iterated
    /// only a single time; a second iteration attempt results in an
    /// <see cref="InvalidOperationException"/>.
    /// </remarks>
    public virtual AsyncResponseStream<TResponse> GetResponseStream()
    {
        var responses = GrpcCall.ResponseStream;
        return new AsyncResponseStream<TResponse>(responses);
    }

    /// <summary>
    /// Disposes of the underlying gRPC call. The wrapper and the underlying call do not both need to
    /// be disposed; because client libraries return the wrapper, disposing the wrapper with a
    /// <c>using</c> statement is typically the simpler option.
    /// </summary>
    /// <remarks>The default implementation just calls Dispose on the result of <see cref="GrpcCall"/>.</remarks>
    public virtual void Dispose()
    {
        GrpcCall.Dispose();
    }
}
}