-
-
Notifications
You must be signed in to change notification settings - Fork 242
/
ThrottlingLockProvider.cs
138 lines (114 loc) · 5.42 KB
/
ThrottlingLockProvider.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Foundatio.Caching;
using Foundatio.Utility;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
namespace Foundatio.Lock;
public class ThrottlingLockProvider : ILockProvider, IHaveLogger
{
private readonly ICacheClient _cacheClient;
private readonly TimeSpan _throttlingPeriod = TimeSpan.FromMinutes(15);
private readonly int _maxHitsPerPeriod;
private readonly ILogger _logger;
public ThrottlingLockProvider(ICacheClient cacheClient, int maxHitsPerPeriod = 100, TimeSpan? throttlingPeriod = null, ILoggerFactory loggerFactory = null)
{
_logger = loggerFactory?.CreateLogger<ThrottlingLockProvider>() ?? NullLogger<ThrottlingLockProvider>.Instance;
_cacheClient = new ScopedCacheClient(cacheClient, "lock:throttled");
_maxHitsPerPeriod = maxHitsPerPeriod;
if (maxHitsPerPeriod <= 0)
throw new ArgumentException("Must be a positive number.", nameof(maxHitsPerPeriod));
if (throttlingPeriod.HasValue)
_throttlingPeriod = throttlingPeriod.Value;
}
ILogger IHaveLogger.Logger => _logger;
public async Task<ILock> AcquireAsync(string resource, TimeSpan? timeUntilExpires = null, bool releaseOnDispose = true, CancellationToken cancellationToken = default)
{
bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace);
if (isTraceLogLevelEnabled) _logger.LogTrace("AcquireLockAsync: {Resource}", resource);
bool allowLock = false;
byte errors = 0;
string lockId = Guid.NewGuid().ToString("N");
var sw = Stopwatch.StartNew();
do
{
string cacheKey = GetCacheKey(resource, SystemClock.UtcNow);
try
{
if (isTraceLogLevelEnabled)
_logger.LogTrace("Current time: {CurrentTime} throttle: {ThrottlingPeriod} key: {Key}", SystemClock.UtcNow.ToString("mm:ss.fff"), SystemClock.UtcNow.Floor(_throttlingPeriod).ToString("mm:ss.fff"), cacheKey);
var hitCount = await _cacheClient.GetAsync<long?>(cacheKey, 0).AnyContext();
if (isTraceLogLevelEnabled)
_logger.LogTrace("Current hit count: {HitCount} max: {MaxHitsPerPeriod}", hitCount, _maxHitsPerPeriod);
if (hitCount <= _maxHitsPerPeriod - 1)
{
hitCount = await _cacheClient.IncrementAsync(cacheKey, 1, SystemClock.UtcNow.Ceiling(_throttlingPeriod)).AnyContext();
// make sure someone didn't beat us to it.
if (hitCount <= _maxHitsPerPeriod)
{
allowLock = true;
break;
}
if (isTraceLogLevelEnabled) _logger.LogTrace("Max hits exceeded after increment for {Resource}.", resource);
}
else if (isTraceLogLevelEnabled)
{
_logger.LogTrace("Max hits exceeded for {Resource}.", resource);
}
if (cancellationToken.IsCancellationRequested)
break;
var sleepUntil = SystemClock.UtcNow.Ceiling(_throttlingPeriod).AddMilliseconds(1);
if (sleepUntil > SystemClock.UtcNow)
{
if (isTraceLogLevelEnabled) _logger.LogTrace("Sleeping until key expires: {SleepUntil}", sleepUntil - SystemClock.UtcNow);
await SystemClock.SleepAsync(sleepUntil - SystemClock.UtcNow, cancellationToken).AnyContext();
}
else
{
if (isTraceLogLevelEnabled) _logger.LogTrace("Default sleep");
await SystemClock.SleepAsync(50, cancellationToken).AnyContext();
}
}
catch (OperationCanceledException)
{
return null;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error acquiring throttled lock: name={Resource} message={Message}", resource, ex.Message);
errors++;
if (errors >= 3)
break;
await SystemClock.SleepSafeAsync(50, cancellationToken).AnyContext();
}
} while (!cancellationToken.IsCancellationRequested);
if (cancellationToken.IsCancellationRequested && isTraceLogLevelEnabled)
_logger.LogTrace("Cancellation requested");
if (!allowLock)
return null;
if (isTraceLogLevelEnabled)
_logger.LogTrace("Allowing lock: {Resource}", resource);
sw.Stop();
return new DisposableLock(resource, lockId, sw.Elapsed, this, _logger, releaseOnDispose);
}
public async Task<bool> IsLockedAsync(string resource)
{
string cacheKey = GetCacheKey(resource, SystemClock.UtcNow);
long hitCount = await _cacheClient.GetAsync<long>(cacheKey, 0).AnyContext();
return hitCount >= _maxHitsPerPeriod;
}
public Task ReleaseAsync(string resource, string lockId)
{
return Task.CompletedTask;
}
public Task RenewAsync(string resource, string lockId, TimeSpan? timeUntilExpires = null)
{
return Task.CompletedTask;
}
private string GetCacheKey(string resource, DateTime now)
{
return String.Concat(resource, ":", now.Floor(_throttlingPeriod).Ticks);
}
}