forked from mysql-net/MySqlConnector
/
ConnectionPool.cs
307 lines (268 loc) · 8.57 KB
/
ConnectionPool.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MySql.Data.MySqlClient;
using Xunit;
namespace SideBySide
{
public class ConnectionPool : IClassFixture<DatabaseFixture>
{
// Checks whether session state changed on one pooled connection (autocommit switched off) is
// still visible on the next connection opened with the same connection string.
//   connectionReset: value assigned to ConnectionReset, or null to leave the option untouched
//   poolSize:        maximum pool size; each case uses a different one
//   expected:        the @@autocommit value the final connection should report
[Theory]
[InlineData(false, 11, 0L)]
[InlineData(true, 12, 1L)]
#if BASELINE
// the baseline connector does not reset the connection by default (speed over correctness);
// see bug report at http://bugs.mysql.com/bug.php?id=77421
[InlineData(null, 13, 0L)]
#else
[InlineData(null, 13, 1L)]
#endif
public void ResetConnection(object connectionReset, int poolSize, long expected)
{
    var builder = AppConfig.CreateConnectionStringBuilder();
    builder.Pooling = true;
    // a distinct pool size gives a distinct connection string, which forces a dedicated pool
    builder.MaximumPoolSize = (uint) poolSize;
    if (connectionReset != null)
        builder.ConnectionReset = (bool) connectionReset;
    var connectionString = builder.ConnectionString;

    // first connection: autocommit is expected to be on
    using (var first = new MySqlConnection(connectionString))
    {
        first.Open();
        using (var query = first.CreateCommand())
        {
            query.CommandText = "select @@autocommit;";
            var initialValue = query.ExecuteScalar();
            Assert.Equal(1L, initialValue);
        }
    }

    // second connection: switch autocommit off for the session, then hand it back to the pool
    using (var second = new MySqlConnection(connectionString))
    {
        second.Open();
        using (var statement = second.CreateCommand())
        {
            statement.CommandText = "SET autocommit=0;";
            statement.ExecuteNonQuery();
        }
    }

    // third connection: read the setting back and compare with the expectation for this case
    using (var third = new MySqlConnection(connectionString))
    {
        third.Open();
        using (var query = third.CreateCommand())
        {
            query.CommandText = "select @@autocommit;";
            var finalValue = query.ExecuteScalar();
            Assert.Equal(expected, finalValue);
        }
    }
}
// Opens as many connections as the pool allows, then verifies that one more OpenAsync call
// waits until a connection is released (about five seconds later) instead of failing.
[Fact]
public async Task ExhaustConnectionPool()
{
    var csb = AppConfig.CreateConnectionStringBuilder();
    csb.Pooling = true;
    csb.MinimumPoolSize = 0;
    csb.MaximumPoolSize = 3;
    // well above the five-second delay below, so the extra open is expected to wait, not time out
    csb.ConnectionTimeout = 60;

    var connections = new List<MySqlConnection>();
    Task closeTask = null;
    try
    {
        for (int i = 0; i < csb.MaximumPoolSize; i++)
        {
            var connection = new MySqlConnection(csb.ConnectionString);
            // track the connection before opening it so that it is disposed even if the open fails
            connections.Add(connection);
            await connection.OpenAsync().ConfigureAwait(false);
        }

        // release one connection after five seconds; the list itself is never touched from the
        // background task, so there is no cross-thread mutation of 'connections'
        var firstConnection = connections[0];
        closeTask = Task.Run(async () =>
        {
            await Task.Delay(5000).ConfigureAwait(false);
            firstConnection.Dispose();
        });

        using (var extraConnection = new MySqlConnection(csb.ConnectionString))
        {
            var stopwatch = Stopwatch.StartNew();
            await extraConnection.OpenAsync().ConfigureAwait(false);
            stopwatch.Stop();
            Assert.InRange(stopwatch.ElapsedMilliseconds, 4500, 7500);
        }
    }
    finally
    {
        // wait for the background release asynchronously (no blocking Wait), then clean up every
        // connection even when an assertion above failed; disposing the first one again is harmless
        if (closeTask != null)
            await closeTask.ConfigureAwait(false);
        foreach (var connection in connections)
            connection.Dispose();
    }
}
// Opens as many connections as the pool allows, then verifies that one more Open call fails
// with a MySqlException after roughly the configured five-second connection timeout.
[Fact]
public async Task ExhaustConnectionPoolWithTimeout()
{
    var csb = AppConfig.CreateConnectionStringBuilder();
    csb.Pooling = true;
    csb.MinimumPoolSize = 0;
    csb.MaximumPoolSize = 3;
    csb.ConnectionTimeout = 5;

    var connections = new List<MySqlConnection>();
    try
    {
        for (int i = 0; i < csb.MaximumPoolSize; i++)
        {
            var connection = new MySqlConnection(csb.ConnectionString);
            // track the connection before opening it so that it is disposed even if the open fails
            connections.Add(connection);
            await connection.OpenAsync().ConfigureAwait(false);
        }

        using (var extraConnection = new MySqlConnection(csb.ConnectionString))
        {
            var stopwatch = Stopwatch.StartNew();
            // the synchronous Open is kept on purpose: this is the call whose timeout is measured
            Assert.Throws<MySqlException>(() => extraConnection.Open());
            stopwatch.Stop();
            Assert.InRange(stopwatch.ElapsedMilliseconds, 4500, 5500);
        }
    }
    finally
    {
        // release the held connections even when an assertion above failed
        foreach (var connection in connections)
            connection.Dispose();
    }
}
// Sets a three-second session wait_timeout on the only pooled connection, stays idle for five
// seconds, and then expects the next open to report a different server thread id.
[Fact]
public async Task WaitTimeout()
{
    var builder = AppConfig.CreateConnectionStringBuilder();
    builder.Pooling = true;
    builder.MinimumPoolSize = 0;
    builder.MaximumPoolSize = 1;
    var connectionString = builder.ConnectionString;

    int originalThread;
    using (var first = new MySqlConnection(connectionString))
    {
        await first.OpenAsync();
        using (var setTimeout = first.CreateCommand())
        {
            setTimeout.CommandText = "SET @@session.wait_timeout=3";
            await setTimeout.ExecuteNonQueryAsync();
        }
        originalThread = first.ServerThread;
    }

    // stay idle for longer than the wait_timeout configured above
    var idleTime = TimeSpan.FromSeconds(5);
    await Task.Delay(idleTime);

    using (var second = new MySqlConnection(connectionString))
    {
        await second.OpenAsync();
        var currentThread = second.ServerThread;
        Assert.NotEqual(originalThread, currentThread);
    }
}
// Keeps the only pooled connection open for delaySeconds, returns it to the pool, and checks
// whether the next open reports the same server thread id.
//   lifeTime:      value assigned to ConnectionLifeTime
//   delaySeconds:  how long the first connection is held open
//   shouldTimeout: true when a different server thread id is expected afterwards
[Theory]
[InlineData(2u, 3u, true)]
[InlineData(180u, 3u, false)]
public async Task ConnectionLifeTime(uint lifeTime, uint delaySeconds, bool shouldTimeout)
{
    var builder = AppConfig.CreateConnectionStringBuilder();
    builder.Pooling = true;
    builder.MinimumPoolSize = 0;
    builder.MaximumPoolSize = 1;
    builder.ConnectionLifeTime = lifeTime;
    var connectionString = builder.ConnectionString;

    int originalThread;
    using (var first = new MySqlConnection(connectionString))
    {
        await first.OpenAsync();
        originalThread = first.ServerThread;
        var holdTime = TimeSpan.FromSeconds(delaySeconds);
        await Task.Delay(holdTime);
    }

    using (var second = new MySqlConnection(connectionString))
    {
        await second.OpenAsync();
        var currentThread = second.ServerThread;
        if (!shouldTimeout)
            Assert.Equal(originalThread, currentThread);
        else
            Assert.NotEqual(originalThread, currentThread);
    }
}
// Runs the character-set check three times against the same connection string, so that it is
// exercised again after the connection has been returned to the pool.
[Fact]
public async Task CharacterSet()
{
    var builder = AppConfig.CreateConnectionStringBuilder();
    builder.Pooling = true;
    builder.MinimumPoolSize = 0;
    // a unique pool size gives a unique connection string, which forces a dedicated pool
    builder.MaximumPoolSize = 21;
#if BASELINE
    builder.CharacterSet = "utf8mb4";
#endif

    // the connection charset must be the same each time a connection is retrieved from the pool
    const int repetitions = 3;
    for (var attempt = 0; attempt < repetitions; attempt++)
        await CheckCharacterSetAsync(builder.ConnectionString).ConfigureAwait(false);
}
// Opens a connection with the given connection string and asserts that the query returns
// exactly one row in which both the client and the connection character set are "utf8mb4".
private async Task CheckCharacterSetAsync(string connectionString)
{
    using (var db = new MySqlConnection(connectionString))
    {
        await db.OpenAsync().ConfigureAwait(false);
        using (var query = db.CreateCommand())
        {
            query.CommandText = @"select @@character_set_client, @@character_set_connection";
            using (var rows = await query.ExecuteReaderAsync().ConfigureAwait(false))
            {
                var hasFirstRow = await rows.ReadAsync().ConfigureAwait(false);
                Assert.True(hasFirstRow);

                // column 0 is the client charset, column 1 the connection charset
                for (var column = 0; column < 2; column++)
                    Assert.Equal("utf8mb4", rows.GetString(column));

                var hasSecondRow = await rows.ReadAsync().ConfigureAwait(false);
                Assert.False(hasSecondRow);
            }
        }
    }
}
// Verifies that clearing the pool causes later opens to report different server thread ids.
// Three scenarios are exercised in order: clearing while every connection is open, clearing
// while every connection is closed, and clearing concurrently with reopening.
[Fact]
public async Task ClearConnectionPool()
{
var csb = AppConfig.CreateConnectionStringBuilder();
csb.Pooling = true;
csb.MinimumPoolSize = 0;
csb.MaximumPoolSize = 3;
// create (but do not open yet) as many connection objects as the pool allows
var connections = new List<MySqlConnection>();
for (int i = 0; i < csb.MaximumPoolSize; i++)
{
var connection = new MySqlConnection(csb.ConnectionString);
connections.Add(connection);
}
// collects the ServerThread values of all connections and asserts that there are as many
// distinct ids as there are connections
Func<HashSet<long>> getConnectionIds = () =>
{
var cids = GetConnectionIds(connections);
Assert.Equal(connections.Count, cids.Count);
return cids;
};
// opens the connections one after another
Func<Task> openConnections = async () =>
{
foreach (var connection in connections)
{
await connection.OpenAsync();
}
};
// closes every connection (the objects are reused by the next openConnections call)
Action closeConnections = () =>
{
foreach (var connection in connections)
{
connection.Close();
}
};
// connections should all be disposed when returned to pool
// (the pool is cleared while all of them are still open)
await openConnections();
var connectionIds = getConnectionIds();
await ClearPoolAsync(connections[0]);
closeConnections();
await openConnections();
var connectionIds2 = getConnectionIds();
// no id from before the clear may show up again
Assert.Empty(connectionIds.Intersect(connectionIds2));
closeConnections();
// connections should all be disposed in ClearPoolAsync
// (the pool is cleared while all of them are closed)
await ClearPoolAsync(connections[0]);
await openConnections();
var connectionIds3 = getConnectionIds();
Assert.Empty(connectionIds2.Intersect(connectionIds3));
closeConnections();
// some connections may be disposed in ClearPoolAsync, others in OpenAsync
// (the clear is started but only awaited after the connections have been reopened)
var clearTask = ClearPoolAsync(connections[0]);
await openConnections();
// NOTE(review): this calls GetConnectionIds directly, so the count assertion made by
// getConnectionIds() is skipped here - confirm whether that is intentional
var connectionIds4 = GetConnectionIds(connections);
Assert.Empty(connectionIds3.Intersect(connectionIds4));
await clearTask;
closeConnections();
foreach (var connection in connections)
connection.Dispose();
}
// Clears the pool that the given connection belongs to. The baseline connector exposes
// ClearPoolAsync as an instance method, otherwise the static method on MySqlConnection is used.
private Task ClearPoolAsync(MySqlConnection connection)
{
#if !BASELINE
    return MySqlConnection.ClearPoolAsync(connection);
#else
    return connection.ClearPoolAsync(connection);
#endif
}
// Returns the set of distinct ServerThread values (widened to long) of the given connections.
private static HashSet<long> GetConnectionIds(IEnumerable<MySqlConnection> connections)
{
    var threadIds = connections.Select(connection => (long) connection.ServerThread);
    return new HashSet<long>(threadIds);
}
}
}