-
Notifications
You must be signed in to change notification settings - Fork 51
/
GcpCallInvoker.cs
456 lines (411 loc) · 20.7 KB
/
GcpCallInvoker.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
/*
* Copyright 2018 Google LLC
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file or at
* https://developers.google.com/open-source/licenses/bsd
*/
using Google.Protobuf;
using Google.Protobuf.Collections;
using Grpc.Core;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Google.Api.Gax.Grpc.Gcp
{
/// <summary>
/// Call invoker which can fan calls out to multiple underlying channels
/// based on request properties.
/// </summary>
public sealed class GcpCallInvoker : CallInvoker
{
// Process-wide counter; GetChannelRef increments it atomically each time it creates a channel,
// and passes the new value to that channel as the ClientChannelId custom option.
private static int s_clientChannelIdCounter;
// Name of the custom channel option that carries the per-channel id.
private const string ClientChannelId = "grpc_gcp.client_channel.id";
// Lock to protect the channel reference collections, as they're not thread-safe.
private readonly object _thisLock = new object();
// Maps an affinity key to the channel it is currently bound to (maintained by Bind and Unbind).
private readonly IDictionary<string, ChannelRef> _channelRefByAffinityKey = new Dictionary<string, ChannelRef>();
// Every channel created so far; GetChannelRef only adds to it while it is smaller than ChannelPool.MaxSize.
private readonly IList<ChannelRef> _channelRefs = new List<ChannelRef>();
// Access to these fields does not need to be protected by the lock: the objects are never modified.
private readonly string _target;
// Private clone of the config passed to the constructor.
private readonly ApiConfig _apiConfig;
// Affinity configuration indexed by method name, built once in the constructor.
private readonly IDictionary<string, AffinityConfig> _affinityByMethod;
private readonly ChannelCredentials _credentials;
private readonly GrpcChannelOptions _channelOptions;
// Used to create the underlying channels.
private readonly GrpcAdapter _adapter;
private readonly ServiceMetadata _serviceMetadata;
/// <summary>
/// Creates a call invoker backed by a pool of channels that is configured by <paramref name="apiConfig"/>.
/// </summary>
/// <param name="serviceMetadata">Metadata of the service this call invoker is used with. Must not be null.</param>
/// <param name="target">Target that every underlying gRPC channel connects to. Must not be null.</param>
/// <param name="credentials">Credentials used to secure the underlying gRPC channels. Must not be null.</param>
/// <param name="options">Channel options applied to the underlying gRPC channels. Must not be null.</param>
/// <param name="apiConfig">API config to apply; it is cloned and must contain channel pool settings. Must not be null.</param>
/// <param name="adapter">Adapter used to create the channels. Must not be null.</param>
public GcpCallInvoker(ServiceMetadata serviceMetadata, string target, ChannelCredentials credentials, GrpcChannelOptions options, ApiConfig apiConfig, GrpcAdapter adapter)
{
    // The checks run in parameter order so the first offending argument is the one reported.
    _serviceMetadata = GaxPreconditions.CheckNotNull(serviceMetadata, nameof(serviceMetadata));
    _target = GaxPreconditions.CheckNotNull(target, nameof(target));
    _credentials = GaxPreconditions.CheckNotNull(credentials, nameof(credentials));
    _channelOptions = GaxPreconditions.CheckNotNull(options, nameof(options));

    // Keep a private copy so that later changes to the caller's config do not affect this instance.
    GaxPreconditions.CheckNotNull(apiConfig, nameof(apiConfig));
    _apiConfig = apiConfig.Clone();

    _adapter = GaxPreconditions.CheckNotNull(adapter, nameof(adapter));

    bool hasChannelPool = !(_apiConfig.ChannelPool is null);
    GaxPreconditions.CheckArgument(hasChannelPool, nameof(apiConfig), "Invalid API config: no channel pool settings");

    _affinityByMethod = InitAffinityByMethodIndex(_apiConfig);
}
/// <summary>
/// Builds a lookup from method name to the affinity settings of the method config that lists it.
/// </summary>
/// <param name="config">The API config whose method configs are indexed.</param>
/// <returns>A new dictionary with one entry per method name found in <paramref name="config"/>.</returns>
/// <exception cref="ArgumentException">The same method name is listed more than once.</exception>
private static IDictionary<string, AffinityConfig> InitAffinityByMethodIndex(ApiConfig config)
{
    var affinityByMethodName = new Dictionary<string, AffinityConfig>();

    // TODO(fengli): supports wildcard in method selector.
    var entries =
        from methodConfig in config.Method
        from methodName in methodConfig.Name
        select new { methodName, methodConfig.Affinity };

    foreach (var entry in entries)
    {
        // Add (rather than the indexer) so that a duplicated method name is rejected.
        affinityByMethodName.Add(entry.methodName, entry.Affinity);
    }

    return affinityByMethodName;
}
/// <summary>
/// Selects the channel that an outgoing call should use, creating a new channel when appropriate.
/// </summary>
/// <remarks>
/// Selection order:
/// 1. the channel already bound to <paramref name="affinityKey"/>, if there is one;
/// 2. the channel with the fewest active streams, if that count is below the low watermark;
/// 3. a newly created channel, if the pool has not reached its maximum size;
/// 4. otherwise the channel with the fewest active streams, even though it is busy.
/// </remarks>
/// <param name="affinityKey">Affinity key of the request; null or empty when the call has none.</param>
/// <returns>The selected channel reference. Its active stream count is not modified here.</returns>
/// <exception cref="InvalidOperationException">
/// The pool is empty and the configured maximum size does not allow a channel to be created.
/// </exception>
private ChannelRef GetChannelRef(string affinityKey = null)
{
    // TODO(fengli): Supports load reporting.
    lock (_thisLock)
    {
        // Finds the gRPC channel according to the affinity key.
        if (!string.IsNullOrEmpty(affinityKey)
            && _channelRefByAffinityKey.TryGetValue(affinityKey, out ChannelRef boundChannelRef))
        {
            return boundChannelRef;
        }
        // TODO(fengli): Affinity key not found, log an error.

        // TODO(fengli): Creates new gRPC channels on demand, depends on the load reporting.
        // Find the least busy channel once. OrderBy is a stable sort, so ties go to the channel
        // that was added to the pool first. The result is null when the pool is still empty.
        ChannelRef leastBusy = _channelRefs
            .OrderBy(candidate => candidate.ActiveStreamCount)
            .FirstOrDefault();

        // If the least busy channel is below the low watermark, it is free: use it.
        // If it is not, every other channel is at least as busy.
        if (leastBusy is object
            && leastBusy.ActiveStreamCount < _apiConfig.ChannelPool.MaxConcurrentStreamsLowWatermark)
        {
            return leastBusy;
        }

        int count = _channelRefs.Count;
        if (count < _apiConfig.ChannelPool.MaxSize)
        {
            // Creates a new gRPC channel.
            // TODO: Logging?
            // GrpcEnvironment.Logger.Info("Grpc.Gcp creating new channel");
            ChannelBase channel = _adapter.CreateChannel(_serviceMetadata, _target, _credentials, _channelOptions.WithCustomOption(ClientChannelId, Interlocked.Increment(ref s_clientChannelIdCounter)));
            ChannelRef newChannelRef = new ChannelRef(channel, count);
            _channelRefs.Add(newChannelRef);
            return newChannelRef;
        }

        // If all channels are overloaded and the channel pool is full already,
        // return the channel with least active streams.
        if (leastBusy is null)
        {
            // Same exception type as before, but with a message that points at the configuration.
            throw new InvalidOperationException(
                "The channel pool is empty and its configured maximum size does not allow a channel to be created.");
        }
        return leastBusy;
    }
}
/// <summary>
/// Extracts the affinity key values that a dot-separated field path selects from a message.
/// </summary>
/// <param name="affinityKey">Dot-separated path of field names; may be null or empty.</param>
/// <param name="message">The message to read the values from.</param>
/// <returns>The values found; an empty list when <paramref name="affinityKey"/> is null or empty.</returns>
private List<string> GetAffinityKeysFromProto(string affinityKey, IMessage message)
{
    var collected = new List<string>();
    if (string.IsNullOrEmpty(affinityKey))
    {
        return collected;
    }

    // Walk the path one field name at a time, starting at the first segment.
    string[] pathSegments = affinityKey.Split('.');
    GetAffinityKeysFromProto(pathSegments, 0, message, collected);
    return collected;
}
/// <summary>
/// Resolves the field path segment at <paramref name="namesIndex"/> against <paramref name="message"/>,
/// recursing into nested messages and appending every string found at the end of the path.
/// </summary>
/// <param name="names">All field names of the affinity key path.</param>
/// <param name="namesIndex">Index into <paramref name="names"/> of the field to resolve in this call.</param>
/// <param name="message">The message that should contain the field.</param>
/// <param name="affinityKeyValues">Receives the values that are found.</param>
/// <exception cref="InvalidOperationException">
/// The path runs out of names, names a field that does not exist or has no accessor,
/// or resolves to a field of an unsupported kind.
/// </exception>
private void GetAffinityKeysFromProto(string[] names, int namesIndex, IMessage message, List<string> affinityKeyValues)
{
    if (namesIndex >= names.Length)
    {
        throw new InvalidOperationException($"Affinity key {string.Join(".", names)} missing field name for message {message.Descriptor.Name}.");
    }

    string name = names[namesIndex];
    var fieldDescriptor = message.Descriptor.FindFieldByName(name);
    if (fieldDescriptor == null)
    {
        throw new InvalidOperationException($"Field {name} not present in message {message.Descriptor.Name}");
    }

    var fieldAccessor = fieldDescriptor.Accessor;
    if (fieldAccessor == null)
    {
        throw new InvalidOperationException($"Field {name} in message {message.Descriptor.Name} has no accessor");
    }

    object fieldValue = fieldAccessor.GetValue(message);
    bool hasMoreNames = namesIndex < names.Length - 1;

    // String fields are only acceptable as the last segment of the path.
    if (fieldValue is string || fieldValue is RepeatedField<string>)
    {
        if (hasMoreNames)
        {
            throw new InvalidOperationException($"Field {name} in message {message.Descriptor.Name} is neither a message or repeated message field.");
        }

        if (fieldValue is string singleText)
        {
            affinityKeyValues.Add(singleText);
        }
        else
        {
            affinityKeyValues.AddRange((RepeatedField<string>)fieldValue);
        }
        return;
    }

    if (fieldValue is IMessage singleNested)
    {
        GetAffinityKeysFromProto(names, namesIndex + 1, singleNested, affinityKeyValues);
        return;
    }

    // We can't test for RepeatedField<IMessage> because RepeatedField<T> is not
    // covariant on T, whereas IEnumerable<T> is. We can safely assume that any
    // IEnumerable<IMessage> is really a RepeatedField<T> where T is IMessage.
    if (fieldValue is IEnumerable<IMessage> repeatedNested)
    {
        foreach (IMessage element in repeatedNested)
        {
            GetAffinityKeysFromProto(names, namesIndex + 1, element, affinityKeyValues);
        }
        return;
    }

    if (fieldValue == null)
    {
        // Probably a nested message, but with no value. Just don't use an affinity key.
        return;
    }

    throw new InvalidOperationException($"Field {name} in message {message.Descriptor.Name} is neither a string or repeated string field nor another message or repeated message field.");
}
/// <summary>
/// Binds an affinity key to a channel and increments the affinity count of the bound channel.
/// </summary>
/// <remarks>
/// If the key is already bound, the existing binding is kept and it is the already-bound
/// channel whose affinity count is incremented, even if it differs from <paramref name="channelRef"/>.
/// </remarks>
/// <param name="channelRef">The channel to bind the key to when the key is not bound yet.</param>
/// <param name="affinityKey">The affinity key; nothing happens when it is null or empty.</param>
private void Bind(ChannelRef channelRef, string affinityKey)
{
    if (string.IsNullOrEmpty(affinityKey))
    {
        return;
    }

    lock (_thisLock)
    {
        // TODO: What should we do if the dictionary already contains this key, but for a different channel ref?
        // A single TryGetValue replaces the previous Keys.Contains + Add + indexer sequence.
        if (!_channelRefByAffinityKey.TryGetValue(affinityKey, out ChannelRef boundChannelRef))
        {
            boundChannelRef = channelRef;
            _channelRefByAffinityKey.Add(affinityKey, boundChannelRef);
        }
        boundChannelRef.AffinityCountIncr();
    }
}
/// <summary>
/// Decrements the affinity count of the channel bound to a key, and removes the binding
/// once that count is no longer positive.
/// </summary>
/// <param name="affinityKey">The affinity key; nothing happens when it is null, empty or not bound.</param>
private void Unbind(string affinityKey)
{
    if (string.IsNullOrEmpty(affinityKey))
    {
        return;
    }

    lock (_thisLock)
    {
        if (!_channelRefByAffinityKey.TryGetValue(affinityKey, out ChannelRef boundChannelRef))
        {
            return;
        }

        int remaining = boundChannelRef.AffinityCountDecr();
        // We would expect it to be exactly 0, but it doesn't hurt to be cautious.
        if (remaining <= 0)
        {
            _channelRefByAffinityKey.Remove(affinityKey);
        }
    }
}
/// <summary>
/// Chooses the channel for a call and counts the call as an active stream on that channel.
/// </summary>
/// <param name="affinityConfig">Affinity settings of the method being called; null when it has none.</param>
/// <param name="request">The request; it is cast to IMessage when the method uses the Bound command.</param>
/// <returns>The channel to send the call on, with its active stream count already incremented.</returns>
private ChannelRef PreProcess<TRequest>(AffinityConfig affinityConfig, TRequest request)
{
    // Only methods with the Bound command look up an affinity key in the request.
    bool usesBoundKey = affinityConfig?.Command == AffinityConfig.Types.Command.Bound;
    string boundKey = usesBoundKey
        ? GetAffinityKeysFromProto(affinityConfig.AffinityKey, (IMessage)request).SingleOrDefault()
        : null;

    ChannelRef selected = GetChannelRef(boundKey);
    selected.ActiveStreamCountIncr();
    return selected;
}
/// <summary>
/// Releases the active stream slot taken for a call and, for successful calls,
/// applies the BIND or UNBIND affinity command of the method.
/// </summary>
/// <remarks>
/// <paramref name="response"/> may be default(TResponse) when the call failed. We only expect
/// protobuf-based calls, so the response is always a class type and is never null on success;
/// a null check therefore stands in for a separate "success" parameter.
/// </remarks>
/// <param name="affinityConfig">Affinity settings of the method that was called; null when it has none.</param>
/// <param name="channelRef">The channel the call was sent on.</param>
/// <param name="request">The request; the keys to unbind are read from it.</param>
/// <param name="response">The response; the keys to bind are read from it.</param>
private void PostProcess<TRequest, TResponse>(AffinityConfig affinityConfig, ChannelRef channelRef, TRequest request, TResponse response)
{
    channelRef.ActiveStreamCountDecr();

    // Affinity commands only apply to methods that have them configured, and only on success.
    if (affinityConfig == null || response == null)
    {
        return;
    }

    switch (affinityConfig.Command)
    {
        case AffinityConfig.Types.Command.Bind:
            foreach (string keyToBind in GetAffinityKeysFromProto(affinityConfig.AffinityKey, (IMessage)response))
            {
                Bind(channelRef, keyToBind);
            }
            break;

        case AffinityConfig.Types.Command.Unbind:
            foreach (string keyToUnbind in GetAffinityKeysFromProto(affinityConfig.AffinityKey, (IMessage)request))
            {
                Unbind(keyToUnbind);
            }
            break;
    }
}
/// <summary>
/// Invokes a client streaming call asynchronously.
/// In client streaming scenario, client sends a stream of requests and server responds with a single response.
/// </summary>
/// <remarks>
/// The call counts as an active stream on the selected channel from the moment it is started
/// until its response task completes, whether successfully or not.
/// </remarks>
public override AsyncClientStreamingCall<TRequest, TResponse>
    AsyncClientStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options)
{
    // No channel affinity feature for client streaming call.
    ChannelRef channelRef = GetChannelRef();

    // Balance the decrement performed when the response completes; without this increment
    // the channel's active stream count would drift below its real value.
    channelRef.ActiveStreamCountIncr();

    AsyncClientStreamingCall<TRequest, TResponse> originalCall;
    try
    {
        originalCall = channelRef.CallInvoker.AsyncClientStreamingCall(method, host, options);
    }
    catch
    {
        // The call never started, so nothing else will release the slot.
        channelRef.ActiveStreamCountDecr();
        throw;
    }

    // Decrease the active streams count once async response finishes.
    var gcpResponseAsync = DecrementCountAndPropagateResult(originalCall.ResponseAsync);

    // Create a wrapper of the original AsyncClientStreamingCall.
    return new AsyncClientStreamingCall<TRequest, TResponse>(
        originalCall.RequestStream,
        gcpResponseAsync,
        originalCall.ResponseHeadersAsync,
        () => originalCall.GetStatus(),
        () => originalCall.GetTrailers(),
        () => originalCall.Dispose());

    async Task<TResponse> DecrementCountAndPropagateResult(Task<TResponse> task)
    {
        try
        {
            return await task.ConfigureAwait(false);
        }
        finally
        {
            channelRef.ActiveStreamCountDecr();
        }
    }
}
/// <summary>
/// Invokes a duplex streaming call asynchronously.
/// In duplex streaming scenario, client sends a stream of requests and server responds with a stream of responses.
/// The response stream is completely independent and both side can be sending messages at the same time.
/// </summary>
/// <remarks>
/// The call is counted as an active stream on the selected channel when it is started; the count
/// is released by the callback handed to the response stream wrapper.
/// </remarks>
public override AsyncDuplexStreamingCall<TRequest, TResponse>
    AsyncDuplexStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options)
{
    // No channel affinity feature for duplex streaming call.
    ChannelRef channelRef = GetChannelRef();

    // Balance the decrement performed by the response stream callback; without this increment
    // the channel's active stream count would drift below its real value.
    channelRef.ActiveStreamCountIncr();

    AsyncDuplexStreamingCall<TRequest, TResponse> originalCall;
    try
    {
        originalCall = channelRef.CallInvoker.AsyncDuplexStreamingCall(method, host, options);
    }
    catch
    {
        // The call never started, so nothing else will release the slot.
        channelRef.ActiveStreamCountDecr();
        throw;
    }

    // Decrease the active streams count once the streaming response finishes its final batch.
    var gcpResponseStream = new GcpClientResponseStream<TRequest, TResponse>(
        originalCall.ResponseStream,
        (resp) => channelRef.ActiveStreamCountDecr());

    // Create a wrapper of the original AsyncDuplexStreamingCall.
    return new AsyncDuplexStreamingCall<TRequest, TResponse>(
        originalCall.RequestStream,
        gcpResponseStream,
        originalCall.ResponseHeadersAsync,
        () => originalCall.GetStatus(),
        () => originalCall.GetTrailers(),
        () => originalCall.Dispose());
}
/// <summary>
/// Invokes a server streaming call asynchronously.
/// In server streaming scenario, client sends one request and server responds with a stream of responses.
/// </summary>
/// <remarks>
/// The channel is chosen from the affinity settings configured for the method, if any.
/// </remarks>
public override AsyncServerStreamingCall<TResponse>
    AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
{
    _affinityByMethod.TryGetValue(method.FullName, out AffinityConfig affinityConfig);
    ChannelRef channelRef = PreProcess(affinityConfig, request);

    AsyncServerStreamingCall<TResponse> originalCall;
    try
    {
        originalCall = channelRef.CallInvoker.AsyncServerStreamingCall(method, host, options, request);
    }
    catch
    {
        // The call never started: release the slot taken by PreProcess. A default response
        // makes PostProcess skip the affinity commands, as for any failed call.
        PostProcess(affinityConfig, channelRef, request, default(TResponse));
        throw;
    }

    // Executes affinity postprocess once the streaming response finishes its final batch.
    var gcpResponseStream = new GcpClientResponseStream<TRequest, TResponse>(
        originalCall.ResponseStream,
        (resp) => PostProcess(affinityConfig, channelRef, request, resp));

    // Create a wrapper of the original AsyncServerStreamingCall.
    return new AsyncServerStreamingCall<TResponse>(
        gcpResponseStream,
        originalCall.ResponseHeadersAsync,
        () => originalCall.GetStatus(),
        () => originalCall.GetTrailers(),
        () => originalCall.Dispose());
}
/// <summary>
/// Invokes a simple remote call asynchronously.
/// </summary>
/// <remarks>
/// The channel is chosen from the affinity settings configured for the method, if any, and the
/// affinity post-processing runs when the response task completes.
/// </remarks>
public override AsyncUnaryCall<TResponse>
    AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
{
    _affinityByMethod.TryGetValue(method.FullName, out AffinityConfig affinityConfig);
    ChannelRef channelRef = PreProcess(affinityConfig, request);

    AsyncUnaryCall<TResponse> originalCall;
    try
    {
        originalCall = channelRef.CallInvoker.AsyncUnaryCall<TRequest, TResponse>(method, host, options, request);
    }
    catch
    {
        // The call never started: release the slot taken by PreProcess. A default response
        // makes PostProcess skip the affinity commands, as for any failed call.
        PostProcess(affinityConfig, channelRef, request, default(TResponse));
        throw;
    }

    // Executes affinity postprocess once the async response finishes.
    var gcpResponseAsync = PostProcessPropagateResult(originalCall.ResponseAsync);

    // Create a wrapper of the original AsyncUnaryCall.
    return new AsyncUnaryCall<TResponse>(
        gcpResponseAsync,
        originalCall.ResponseHeadersAsync,
        () => originalCall.GetStatus(),
        () => originalCall.GetTrailers(),
        () => originalCall.Dispose());

    async Task<TResponse> PostProcessPropagateResult(Task<TResponse> task)
    {
        // Stays at default(TResponse) when the task fails, which PostProcess treats as "no success".
        TResponse response = default(TResponse);
        try
        {
            response = await task.ConfigureAwait(false);
            return response;
        }
        finally
        {
            PostProcess(affinityConfig, channelRef, request, response);
        }
    }
}
/// <summary>
/// Invokes a simple remote call and blocks until it completes.
/// </summary>
/// <remarks>
/// The channel is chosen from the affinity settings configured for the method, if any, and the
/// affinity post-processing runs whether the call succeeds or throws.
/// </remarks>
public override TResponse
    BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
{
    _affinityByMethod.TryGetValue(method.FullName, out AffinityConfig methodAffinity);
    ChannelRef selectedChannel = PreProcess(methodAffinity, request);

    // Stays at default(TResponse) when the call throws, which PostProcess treats as "no success".
    TResponse result = default(TResponse);
    try
    {
        return result = selectedChannel.CallInvoker.BlockingUnaryCall(method, host, options, request);
    }
    finally
    {
        PostProcess(methodAffinity, selectedChannel, request, result);
    }
}
/// <summary>
/// Shuts down the all channels in the underlying channel pool cleanly. It is strongly
/// recommended to shutdown all previously created channels before exiting from the process.
/// </summary>
/// <remarks>
/// The channels that exist when this method is called are shut down one after another, in the
/// order in which they were created. A failure to shut down one channel propagates immediately.
/// </remarks>
/// <returns>A task that completes once every channel has been shut down.</returns>
public async Task ShutdownAsync()
{
    // The list is not thread-safe and may be appended to by concurrent calls, so take a
    // snapshot under the lock. The lock cannot be held across the awaits below.
    ChannelRef[] channelRefs;
    lock (_thisLock)
    {
        channelRefs = _channelRefs.ToArray();
    }

    foreach (ChannelRef channelRef in channelRefs)
    {
        await channelRef.Channel.ShutdownAsync().ConfigureAwait(false);
    }
}
// Test helper methods
/// <summary>
/// Produces an independent copy of the channel pool, in which every channel reference is a clone.
/// This method should only be used in tests.
/// </summary>
/// <returns>A new list holding a clone of each channel reference, in pool order.</returns>
internal IList<ChannelRef> GetChannelRefsForTest()
{
    lock (_thisLock)
    {
        // Clone element by element so callers cannot observe or alter the live pool.
        var clones = new List<ChannelRef>();
        foreach (ChannelRef original in _channelRefs)
        {
            clones.Add(original.Clone());
        }
        return clones;
    }
}
/// <summary>
/// Produces an independent copy of the affinity key bindings, in which every channel reference is a clone.
/// This method should only be used in tests.
/// </summary>
/// <returns>A new dictionary mapping each bound affinity key to a clone of its channel reference.</returns>
internal IDictionary<string, ChannelRef> GetChannelRefsByAffinityKeyForTest()
{
    lock (_thisLock)
    {
        // Clone entry by entry so callers cannot observe or alter the live bindings.
        var clonesByKey = new Dictionary<string, ChannelRef>();
        foreach (KeyValuePair<string, ChannelRef> binding in _channelRefByAffinityKey)
        {
            clonesByKey.Add(binding.Key, binding.Value.Clone());
        }
        return clonesByKey;
    }
}
}
}