-
-
Notifications
You must be signed in to change notification settings - Fork 84
/
MongoDistributedLock.cs
277 lines (239 loc) · 10.6 KB
/
MongoDistributedLock.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
using System;
using System.Collections.Generic;
using System.Threading;
using Hangfire.Logging;
using Hangfire.Mongo.Database;
using Hangfire.Mongo.Dto;
using Hangfire.Storage;
using MongoDB.Driver;
namespace Hangfire.Mongo.DistributedLock
{
/// <summary>
/// Represents distibuted lock implementation for MongoDB
/// </summary>
internal sealed class MongoDistributedLock : IDisposable
{
// EventWaitHandle is not supported on UNIX systems
// https://github.com/dotnet/coreclr/pull/1387
// Instead of using a compiler directive, we catch the
// exception and handles it. This way, when EventWaitHandle
// becomes available on UNIX, we will start working.
private static bool _isEventWaitHandleSupported = true;
private static readonly ILog Logger = LogProvider.For<MongoDistributedLock>();
private static readonly ThreadLocal<Dictionary<string, int>> AcquiredLocks
= new ThreadLocal<Dictionary<string, int>>(() => new Dictionary<string, int>());
private readonly string _resource;
private readonly HangfireDbContext _database;
private readonly MongoStorageOptions _storageOptions;
private Timer _heartbeatTimer;
private bool _completed;
private readonly object _lockObject = new object();
private string EventWaitHandleName => $@"{GetType().FullName}.{_resource}";
/// <summary>
/// Creates MongoDB distributed lock
/// </summary>
/// <param name="resource">Lock resource</param>
/// <param name="timeout">Lock timeout</param>
/// <param name="database">Lock database</param>
/// <param name="storageOptions">Database options</param>
/// <exception cref="DistributedLockTimeoutException">Thrown if lock is not acuired within the timeout</exception>
/// <exception cref="MongoDistributedLockException">Thrown if other mongo specific issue prevented the lock to be acquired</exception>
public MongoDistributedLock(string resource, TimeSpan timeout, HangfireDbContext database, MongoStorageOptions storageOptions)
{
_resource = resource ?? throw new ArgumentNullException(nameof(resource));
_database = database ?? throw new ArgumentNullException(nameof(database));
_storageOptions = storageOptions ?? throw new ArgumentNullException(nameof(storageOptions));
if (string.IsNullOrEmpty(resource))
{
throw new ArgumentException($@"The {nameof(resource)} cannot be empty", nameof(resource));
}
if (timeout.TotalSeconds > int.MaxValue)
{
throw new ArgumentException($"The timeout specified is too large. Please supply a timeout equal to or less than {int.MaxValue} seconds", nameof(timeout));
}
if (!AcquiredLocks.Value.ContainsKey(_resource) || AcquiredLocks.Value[_resource] == 0)
{
Cleanup();
Acquire(timeout);
AcquiredLocks.Value[_resource] = 1;
StartHeartBeat();
}
else
{
AcquiredLocks.Value[_resource]++;
}
}
/// <summary>
/// Disposes the object
/// </summary>
/// <exception cref="MongoDistributedLockException"></exception>
public void Dispose()
{
if (_completed)
{
return;
}
_completed = true;
if (!AcquiredLocks.Value.ContainsKey(_resource))
{
return;
}
AcquiredLocks.Value[_resource]--;
if (AcquiredLocks.Value[_resource] > 0)
{
return;
}
// Timer callback may be invoked after the Dispose method call,
// so we are using lock to avoid unsynchronized calls.
lock (_lockObject)
{
AcquiredLocks.Value.Remove(_resource);
if (_heartbeatTimer != null)
{
_heartbeatTimer.Dispose();
_heartbeatTimer = null;
}
Release();
Cleanup();
}
}
private void Acquire(TimeSpan timeout)
{
try
{
// If result is null, then it means we acquired the lock
var isLockAcquired = false;
var now = DateTime.Now;
var lockTimeoutTime = now.Add(timeout);
while (!isLockAcquired && (lockTimeoutTime >= now))
{
// Acquire the lock if it does not exist - Notice: ReturnDocument.Before
var filter = Builders<DistributedLockDto>.Filter.Eq(_ => _.Resource, _resource);
var update = Builders<DistributedLockDto>.Update.SetOnInsert(_ => _.ExpireAt, DateTime.UtcNow.Add(_storageOptions.DistributedLockLifetime));
var options = new FindOneAndUpdateOptions<DistributedLockDto>
{
IsUpsert = true,
ReturnDocument = ReturnDocument.Before
};
var result = _database.DistributedLock.FindOneAndUpdate(filter, update, options);
// If result is null, it means we acquired the lock
if (result == null)
{
isLockAcquired = true;
}
else
{
EventWaitHandle eventWaitHandle = null;
var waitTime = (int)timeout.TotalMilliseconds / 10;
if (_isEventWaitHandleSupported)
{
try
{
// Wait on the event. This allows us to be "woken" up sooner rather than later.
// We wait in chunks as we need to "wake-up" from time to time and poll mongo,
// in case that the lock was acquired on another machine or instance.
eventWaitHandle = new EventWaitHandle(false, EventResetMode.AutoReset, EventWaitHandleName);
eventWaitHandle.WaitOne(waitTime);
}
catch (PlatformNotSupportedException)
{
// See _isEventWaitHandleSupported definition for more info.
_isEventWaitHandleSupported = false;
eventWaitHandle = null;
}
}
if (eventWaitHandle == null)
{
// Sleep for a while and then check if the lock has been released.
Thread.Sleep(waitTime);
}
now = DateTime.Now;
}
}
if (!isLockAcquired)
{
throw new DistributedLockTimeoutException($"Could not place a lock on the resource \'{_resource}\': The lock request timed out.");
}
}
catch (DistributedLockTimeoutException)
{
throw;
}
catch (Exception ex)
{
throw new MongoDistributedLockException($"Could not place a lock on the resource \'{_resource}\': Check inner exception for details.", ex);
}
}
/// <summary>
/// Release the lock
/// </summary>
/// <exception cref="MongoDistributedLockException"></exception>
private void Release()
{
try
{
// Remove resource lock
_database.DistributedLock.DeleteOne(
Builders<DistributedLockDto>.Filter.Eq(_ => _.Resource, _resource));
if (_isEventWaitHandleSupported)
{
try
{
if (EventWaitHandle.TryOpenExisting(EventWaitHandleName, out EventWaitHandle eventWaitHandler))
{
eventWaitHandler.Set();
}
}
catch (PlatformNotSupportedException)
{
// See _isEventWaitHandleSupported definition for more info.
_isEventWaitHandleSupported = false;
}
}
}
catch (Exception ex)
{
throw new MongoDistributedLockException($"Could not release a lock on the resource \'{_resource}\': Check inner exception for details.", ex);
}
}
private void Cleanup()
{
try
{
// Delete expired locks
_database.DistributedLock.DeleteOne(
Builders<DistributedLockDto>.Filter.Eq(_ => _.Resource, _resource) &
Builders<DistributedLockDto>.Filter.Lt(_ => _.ExpireAt, DateTime.UtcNow));
}
catch (Exception ex)
{
Logger.ErrorFormat("Unable to clean up locks on the resource '{0}'. {1}", _resource, ex);
}
}
/// <summary>
/// Starts database heartbeat
/// </summary>
private void StartHeartBeat()
{
TimeSpan timerInterval = TimeSpan.FromMilliseconds(_storageOptions.DistributedLockLifetime.TotalMilliseconds / 5);
_heartbeatTimer = new Timer(state =>
{
// Timer callback may be invoked after the Dispose method call,
// so we are using lock to avoid unsynchronized calls.
lock (_lockObject)
{
try
{
var filter = Builders<DistributedLockDto>.Filter.Eq(_ => _.Resource, _resource);
var update = Builders<DistributedLockDto>.Update.Set(_ => _.ExpireAt, DateTime.UtcNow.Add(_storageOptions.DistributedLockLifetime));
_database.DistributedLock.FindOneAndUpdate(filter, update);
}
catch (Exception ex)
{
Logger.ErrorFormat("Unable to update heartbeat on the resource '{0}'. {1}", _resource, ex);
}
}
}, null, timerInterval, timerInterval);
}
}
}