-
Notifications
You must be signed in to change notification settings - Fork 360
/
SessionPoolManager.cs
320 lines (282 loc) · 14.5 KB
/
SessionPoolManager.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using Google.Api.Gax;
using Google.Cloud.Spanner.Common.V1;
using Google.Cloud.Spanner.V1;
using Google.Cloud.Spanner.V1.Internal.Logging;
using Grpc.Core;
using Grpc.Gcp;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static Grpc.Gcp.AffinityConfig.Types;
namespace Google.Cloud.Spanner.Data
{
/// <summary>
/// Manages sessions used by <see cref="SpannerConnection"/>. This is rarely used
/// directly by user code; it's public to provide flexibility when custom options are
/// required.
/// </summary>
public sealed class SessionPoolManager
{
/// <summary>
/// Explicit, intentionally empty static constructor. Its presence stops the C# compiler from marking the
/// type as <c>beforefieldinit</c>, which ensures that the static initializers aren't run before the first
/// explicit reference to the class. This in turn ensures that a call to
/// <see cref="Logger.SetDefaultLogger(Logger)" /> before any other code will result in the new logger being
/// picked up by <see cref="SessionPoolManager.Default"/>.
/// Do not remove this constructor even though its body is empty.
/// </summary>
static SessionPoolManager()
{
}
/// <summary>
/// The shared default session pool manager. <see cref="SpannerConnection"/> uses this instance unless a
/// different pool is specified on construction. It is built with default <see cref="SessionPoolOptions"/>,
/// the default logger and the standard client factory.
/// </summary>
public static SessionPoolManager Default { get; } = new SessionPoolManager(
    options: new SessionPoolOptions(),
    logger: Logger.DefaultLogger,
    clientFactory: CreateClientAsync);
// Delegate used to create the SpannerClient behind each targeted pool. Supplied through the constructor:
// CreateClientAsync for managers built via Default/Create, or a custom factory (e.g. for fake clients).
private readonly Func<SpannerClientCreationOptions, SpannerSettings, Logger, Task<SpannerClient>> _clientFactory;
// The targeted pools created so far, keyed by the client creation options they were requested with.
private readonly ConcurrentDictionary<SpannerClientCreationOptions, TargetedPool> _targetedPools =
new ConcurrentDictionary<SpannerClientCreationOptions, TargetedPool>();
// Maps each created SessionPool back to the TargetedPool that owns it, so that Release can find the
// connection count to decrement. Entries are added by TargetedPool once its session pool has been created.
private readonly ConcurrentDictionary<SessionPool, TargetedPool> _poolReverseLookup =
new ConcurrentDictionary<SessionPool, TargetedPool>();
/// <summary>
/// The session pool options used for every <see cref="SessionPool"/> created by this session pool manager.
/// Assigned once in the constructor, which rejects null.
/// </summary>
public SessionPoolOptions SessionPoolOptions { get; }
/// <summary>
/// The logger used by this SessionPoolManager and the session pools it creates.
/// Assigned once in the constructor, which rejects null.
/// </summary>
internal Logger Logger { get; }
/// <summary>
/// The SpannerSettings used by this SessionPoolManager. These are expected to remain unaltered for the
/// lifetime of the manager. Currently the default settings are used in all cases, but with the "gccl"
/// version header added to specify the version of Google.Cloud.Spanner.Data being used.
/// The value is produced by <see cref="CreateSpannerSettingsWithVersionHeader"/> as part of instance
/// initialization, so each manager holds its own settings object.
/// </summary>
internal SpannerSettings SpannerSettings { get; } = CreateSpannerSettingsWithVersionHeader();
/// <summary>
/// Creates a default <see cref="SpannerSettings"/> and appends a "gccl" entry to its version header,
/// using the version of the assembly that contains <see cref="SessionPoolManager"/>.
/// </summary>
/// <returns>A newly created settings object carrying the additional version header entry.</returns>
private static SpannerSettings CreateSpannerSettingsWithVersionHeader()
{
    Type versionSource = typeof(SessionPoolManager);
    var versionedSettings = new SpannerSettings();
    versionedSettings.VersionHeaderBuilder.AppendAssemblyVersion("gccl", versionSource);
    return versionedSettings;
}
/// <summary>
/// Initializes a manager with an explicit client factory. This exists for test purposes, so that the
/// creation of the SpannerClient can be customized (e.g. to supply fake clients).
/// </summary>
/// <param name="options">The session pool options to use. Must not be null.</param>
/// <param name="logger">The logger to use. Must not be null.</param>
/// <param name="clientFactory">The client factory delegate to use. Must not be null.</param>
internal SessionPoolManager(
    SessionPoolOptions options,
    Logger logger,
    Func<SpannerClientCreationOptions, SpannerSettings, Logger, Task<SpannerClient>> clientFactory)
{
    // Validate everything up front, in parameter order, before storing any state.
    GaxPreconditions.CheckNotNull(options, nameof(options));
    GaxPreconditions.CheckNotNull(logger, nameof(logger));
    GaxPreconditions.CheckNotNull(clientFactory, nameof(clientFactory));

    SessionPoolOptions = options;
    Logger = logger;
    _clientFactory = clientFactory;
}
/// <summary>
/// Creates a <see cref="SessionPoolManager"/> that uses the specified options and the standard
/// client factory.
/// </summary>
/// <param name="options">The options to use. Must not be null.</param>
/// <param name="logger">The logger to use. May be null, in which case the default logger is used.</param>
/// <returns>A <see cref="SessionPoolManager"/> with the given options.</returns>
public static SessionPoolManager Create(SessionPoolOptions options, Logger logger = null)
{
    Logger effectiveLogger = logger ?? Logger.DefaultLogger;
    return new SessionPoolManager(options, effectiveLogger, CreateClientAsync);
}
/// <summary>
/// Obtains the session pool for the given client creation options, creating the targeted pool for those
/// options if this manager doesn't have one yet, and increments that pool's connection count.
/// </summary>
/// <param name="options">The client creation options identifying the pool. Must not be null.</param>
/// <returns>The targeted pool's task, whose result is the session pool.</returns>
internal Task<SessionPool> AcquireSessionPoolAsync(SpannerClientCreationOptions options)
{
    GaxPreconditions.CheckNotNull(options, nameof(options));

    TargetedPool poolForOptions = _targetedPools.GetOrAdd(options, CreateTargetedPool);
    poolForOptions.IncrementConnectionCount();
    return poolForOptions.SessionPoolTask;

    TargetedPool CreateTargetedPool(SpannerClientCreationOptions key) => new TargetedPool(this, key);
}
/// <summary>
/// Decrements the connection count associated with a client session pool.
/// If the session pool is not one that this manager has created, a warning is logged and no count
/// is changed.
/// </summary>
/// <param name="sessionPool">The session pool being released. Must not be null.</param>
internal void Release(SessionPool sessionPool)
{
    // Validate explicitly, as the other entry points of this class do, so that a null argument is
    // reported against this method's parameter rather than surfacing from the dictionary lookup.
    GaxPreconditions.CheckNotNull(sessionPool, nameof(sessionPool));

    if (_poolReverseLookup.TryGetValue(sessionPool, out var targetedPool))
    {
        targetedPool.DecrementConnectionCount();
    }
    else
    {
        Logger.Warn("Attempt to release a session pool to the wrong session pool manager");
    }
}
/// <summary>
/// Builds a diagnostic summary of this session pool manager.
/// </summary>
/// <returns>The string form of each entry returned by <see cref="GetStatistics"/>, separated by
/// <see cref="Environment.NewLine"/>.</returns>
internal string ToDiagnosticSummary()
{
    IReadOnlyList<Statistics> allStatistics = GetStatistics();
    return string.Join(Environment.NewLine, allStatistics);
}
/// <summary>
/// Takes a statistics snapshot of every targeted pool currently held by this manager.
/// </summary>
/// <returns>A read-only list containing one <see cref="Statistics"/> entry per targeted pool.</returns>
internal IReadOnlyList<Statistics> GetStatistics()
{
    // Copy the dictionary contents first so the loop works over a fixed set of entries.
    var entries = _targetedPools.ToArray();
    var snapshots = new List<Statistics>();
    foreach (var entry in entries)
    {
        snapshots.Add(entry.Value.GetStatisticsSnapshot());
    }
    return snapshots.AsReadOnly();
}
/// <summary>
/// Returns a statistics snapshot for a single database within the session pool associated with the
/// given client creation options.
/// </summary>
/// <param name="options">The client creation options identifying the pool. Must not be null.</param>
/// <param name="databaseName">The database to report on. Must not be null.</param>
/// <returns>The snapshot obtained from the session pool, or null if this manager has no pool for
/// <paramref name="options"/> or that pool's session pool has not been created yet.</returns>
internal SessionPool.DatabaseStatistics GetDatabaseStatistics(SpannerClientCreationOptions options, DatabaseName databaseName)
{
    GaxPreconditions.CheckNotNull(options, nameof(options));
    GaxPreconditions.CheckNotNull(databaseName, nameof(databaseName));

    if (!_targetedPools.TryGetValue(options, out var targetedPool) || targetedPool == null)
    {
        return null;
    }

    SessionPool sessionPool = targetedPool.SessionPoolOrNull;
    if (sessionPool == null)
    {
        return null;
    }
    return sessionPool.GetStatisticsSnapshot(databaseName);
}
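// TODO: We *may* want a convenience method to get the session pool statistics for a specific database,
// e.g. GetStatistics(SessionPoolOptions, DatabaseName), but we don't currently have a need for it.
// NOTE(review): GetDatabaseStatistics(SpannerClientCreationOptions, DatabaseName) above appears to cover
// this already; confirm whether this TODO is still needed.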
/// <summary>
/// A session pool manager that works with a specific target server and credentials.
/// This class is mostly an implementation detail. It allows the asynchrony of credential acquisition to be
/// handled cleanly, and implements mostly-diagnostic reference counting.
/// </summary>
internal class TargetedPool
{
    private readonly string _diagnosticName;
    private int _activeConnections;

    /// <summary>
    /// The task that creates the client and then the session pool. It is started by the constructor.
    /// </summary>
    internal Task<SessionPool> SessionPoolTask { get; }

    /// <summary>
    /// Creates a targeted pool and immediately starts creating its session pool.
    /// </summary>
    /// <param name="parent">The manager supplying the client factory, settings, logger and pool options.</param>
    /// <param name="channelOptions">The client creation options this pool is for.</param>
    internal TargetedPool(SessionPoolManager parent, SpannerClientCreationOptions channelOptions)
    {
        _diagnosticName = channelOptions.ToString();
        SessionPoolTask = CreateSessionPoolAsync(parent, channelOptions);
    }

    /// <summary>
    /// Creates the client through the parent's client factory, wraps it in a session pool and records the
    /// pool in the parent's reverse lookup.
    /// </summary>
    private async Task<SessionPool> CreateSessionPoolAsync(SessionPoolManager parent, SpannerClientCreationOptions channelOptions)
    {
        var spannerClient = await parent._clientFactory(channelOptions, parent.SpannerSettings, parent.Logger).ConfigureAwait(false);
        var sessionPool = new SessionPool(spannerClient, parent.SessionPoolOptions);
        // Record the pool before returning it, so that it is already in the reverse lookup
        // by the time the task completes.
        parent._poolReverseLookup.TryAdd(sessionPool, this);
        return sessionPool;
    }

    /// <summary>Atomically adds one to the active connection count.</summary>
    internal void IncrementConnectionCount()
    {
        Interlocked.Increment(ref _activeConnections);
    }

    /// <summary>Atomically subtracts one from the active connection count.</summary>
    internal void DecrementConnectionCount()
    {
        Interlocked.Decrement(ref _activeConnections);
    }

    /// <summary>
    /// The current active connection count, read via an interlocked operation that leaves it unchanged.
    /// </summary>
    internal int ActiveConnections
    {
        get { return Interlocked.CompareExchange(ref _activeConnections, 0, 0); }
    }

    /// <summary>
    /// The session pool if <see cref="SessionPoolTask"/> has run to completion; otherwise null.
    /// </summary>
    internal SessionPool SessionPoolOrNull
    {
        get
        {
            if (SessionPoolTask.Status == TaskStatus.RanToCompletion)
            {
                return SessionPoolTask.Result;
            }
            return null;
        }
    }

    /// <summary>
    /// Returns a snapshot of statistics for this targeted pool: its diagnostic name, its active connection
    /// count and the statistics of its session pool (null if the session pool isn't available yet).
    /// </summary>
    internal Statistics GetStatisticsSnapshot()
    {
        SessionPool sessionPool = SessionPoolOrNull;
        SessionPool.Statistics poolStatistics = sessionPool == null ? null : sessionPool.GetStatisticsSnapshot();
        int connectionCount = ActiveConnections;
        return new Statistics(_diagnosticName, connectionCount, poolStatistics);
    }
}
/// <summary>
/// A snapshot of the state of one targeted pool: its diagnostic name, its active connection count and,
/// when available, the statistics of its session pool.
/// </summary>
internal sealed class Statistics
{
    internal Statistics(string diagnosticName, int activeConnectionCount, SessionPool.Statistics sessionPoolStatistics)
    {
        DiagnosticName = diagnosticName;
        ActiveConnectionCount = activeConnectionCount;
        SessionPoolStatistics = sessionPoolStatistics;
    }

    /// <summary>
    /// The diagnostic name of the pool this snapshot describes.
    /// </summary>
    internal string DiagnosticName { get; }

    /// <summary>
    /// The active connection count captured in this snapshot.
    /// </summary>
    internal int ActiveConnectionCount { get; }

    /// <summary>
    /// Session pool statistics, or null if the session pool has not yet been created.
    /// </summary>
    internal SessionPool.Statistics SessionPoolStatistics { get; }

    /// <summary>
    /// Formats the snapshot as a header line followed by either one line per database or a note that the
    /// session pool has not been created yet.
    /// </summary>
    public override string ToString()
    {
        var text = new StringBuilder();
        text.AppendLine($"Pool for {DiagnosticName}: {ActiveConnectionCount} active SpannerConnections");

        if (SessionPoolStatistics == null)
        {
            text.AppendLine(" (SessionPool not created yet.)");
            return text.ToString();
        }

        foreach (var perDatabase in SessionPoolStatistics.PerDatabaseStatistics)
        {
            text.AppendLine($" {perDatabase}");
        }
        return text.ToString();
    }
}
/// <summary>
/// The Grpc.Gcp method configurations for pool options. These are here rather than at the top of the file
/// as they're only used in CreateClientAsync, where they are added to the ApiConfig that is passed to the
/// call invoker as a channel option. Each entry lists fully-qualified Spanner RPC method names together
/// with the affinity key and command to apply to those methods.
/// </summary>
private static readonly MethodConfig[] s_methodConfigs = new[]
{
// Note: Can't use nameof for affinity keys, as we need the original proto field name.
// Creating a session isn't bound to a channel, but binds the resulting session to that channel
new MethodConfig
{
Name = { "/google.spanner.v1.Spanner/CreateSession" },
Affinity = new AffinityConfig { AffinityKey = "name", Command = Command.Bind }
},
// Batch creating sessions isn't bound to a channel, but binds the resulting sessions to that channel
new MethodConfig
{
Name = { "/google.spanner.v1.Spanner/BatchCreateSessions" },
Affinity = new AffinityConfig { AffinityKey = "session.name", Command = Command.Bind }
},
// Most methods are bound by the session within the request
new MethodConfig
{
// We don't currently use this, but include it for completeness...
Name = { "/google.spanner.v1.Spanner/GetSession" },
Affinity = new AffinityConfig { AffinityKey = "name", Command = Command.Bound }
},
// These methods all share the same affinity key ("session") and the Bound command.
new MethodConfig
{
Name =
{
"/google.spanner.v1.Spanner/ExecuteSql",
"/google.spanner.v1.Spanner/ExecuteStreamingSql",
"/google.spanner.v1.Spanner/Read",
"/google.spanner.v1.Spanner/StreamingRead",
"/google.spanner.v1.Spanner/BeginTransaction",
"/google.spanner.v1.Spanner/Commit",
"/google.spanner.v1.Spanner/Rollback",
"/google.spanner.v1.Spanner/PartitionQuery",
"/google.spanner.v1.Spanner/PartitionRead",
},
Affinity = new AffinityConfig { AffinityKey = "session", Command = Command.Bound }
},
// DeleteSession is bound by the session within the request, and removes the key afterwards
new MethodConfig
{
Name = { "/google.spanner.v1.Spanner/DeleteSession" },
Affinity = new AffinityConfig { AffinityKey = "name", Command = Command.Unbind }
}
};
/// <summary>
/// The standard client factory. Creates a <see cref="SpannerClient"/> whose calls go through a
/// <see cref="GcpCallInvoker"/> configured with the channel pool limits from
/// <paramref name="channelOptions"/> and the method configurations in <see cref="s_methodConfigs"/>.
/// </summary>
/// <param name="channelOptions">Supplies the credentials, the endpoint and the channel pool limits.</param>
/// <param name="spannerSettings">The settings to build the client with.</param>
/// <param name="logger">Not used by this implementation; it is present so that the method matches the
/// signature of the client factory delegate.</param>
/// <returns>A task whose result is the newly built client.</returns>
private static async Task<SpannerClient> CreateClientAsync(SpannerClientCreationOptions channelOptions, SpannerSettings spannerSettings, Logger logger)
{
    var credentials = await channelOptions.GetCredentialsAsync().ConfigureAwait(false);
    var apiConfig = new ApiConfig
    {
        ChannelPool = new ChannelPoolConfig
        {
            // NOTE(review): the cast does not validate the value; this assumes MaximumGrpcChannels
            // is never negative - confirm that it is validated where it is set.
            MaxSize = (uint)channelOptions.MaximumGrpcChannels,
            MaxConcurrentStreamsLowWatermark = channelOptions.MaximumConcurrentStreamsLowWatermark
        },
        Method = { s_methodConfigs }
    };
    var grpcOptions = new List<ChannelOption>
    {
        // Keep the channel alive for streaming requests.
        new ChannelOption("grpc.keepalive_time_ms", 60_000),
        // The Grpc.Gcp configuration is handed to the call invoker as a channel option, in string form.
        new ChannelOption(GcpCallInvoker.ApiConfigChannelArg, apiConfig.ToString())
    };
    var callInvoker = new GcpCallInvoker(channelOptions.Endpoint, credentials, grpcOptions);
    return new SpannerClientBuilder
    {
        CallInvoker = callInvoker,
        Settings = spannerSettings
    }.Build();
}
}
}