-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
ExpirationManager.cs
237 lines (210 loc) · 9.5 KB
/
ExpirationManager.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
// This file is part of Hangfire. Copyright © 2013-2014 Hangfire OÜ.
//
// Hangfire is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3
// of the License, or any later version.
//
// Hangfire is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with Hangfire. If not, see <http://www.gnu.org/licenses/>.
using System;
using System.Data;
using System.Data.Common;
using System.Globalization;
using System.Threading;
using Hangfire.Common;
using Hangfire.Logging;
using Hangfire.Server;
using Hangfire.Storage;
namespace Hangfire.SqlServer
{
#pragma warning disable 618
internal sealed class ExpirationManager : IServerComponent
#pragma warning restore 618
{
private const string DistributedLockKey = "locks:expirationmanager";
private static readonly TimeSpan DefaultLockTimeout = TimeSpan.FromMinutes(5);
// This value should be high enough to optimize the deletion as much, as possible,
// reducing the number of queries. But low enough to cause lock escalations (it
// appears, when ~5000 locks were taken, but this number is a subject of version).
// Note, that lock escalation may also happen during the cascade deletions for
// State (3-5 rows/job usually) and JobParameters (2-3 rows/job usually) tables.
private const int DefaultNumberOfRecordsInSinglePass = 1000;
private static readonly string[] ProcessedTables =
{
"AggregatedCounter",
"Job",
"List",
"Set",
"Hash",
};
private readonly ILog _logger = LogProvider.For<ExpirationManager>();
private readonly SqlServerStorage _storage;
private readonly TimeSpan _stateExpirationTimeout;
private readonly TimeSpan _checkInterval;
public ExpirationManager(SqlServerStorage storage, TimeSpan stateExpirationTimeout, TimeSpan checkInterval)
{
if (storage == null) throw new ArgumentNullException(nameof(storage));
if (stateExpirationTimeout < TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(stateExpirationTimeout), "Timeout value should be equal to or greater than zero.");
if (checkInterval <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(checkInterval), "Timeout value should be greater than zero.");
_storage = storage;
_stateExpirationTimeout = stateExpirationTimeout;
_checkInterval = checkInterval;
}
public void Execute(CancellationToken cancellationToken)
{
var numberOfRecordsInSinglePass = _storage.Options.DeleteExpiredBatchSize;
if (numberOfRecordsInSinglePass <= 0 || numberOfRecordsInSinglePass > 100_000)
{
numberOfRecordsInSinglePass = DefaultNumberOfRecordsInSinglePass;
}
foreach (var table in ProcessedTables)
{
try
{
CleanupTable(GetExpireQuery(_storage, table), table, numberOfRecordsInSinglePass, cancellationToken);
}
catch (DbException ex)
{
_logger.ErrorException($"Error occurred while cleaning up the '{table}' table: {ex.Message}", ex);
}
}
if (_stateExpirationTimeout > TimeSpan.Zero)
{
try
{
CleanupTable(GetStateCleanupQuery(_storage), "State", numberOfRecordsInSinglePass,
cancellationToken,
command => command.AddParameter("@expireMin", (long)_stateExpirationTimeout.Negate().TotalMinutes, DbType.Int64));
}
catch (DbException ex)
{
_logger.ErrorException($"Error occurred while cleaning up the 'State' table: {ex.Message}", ex);
}
}
cancellationToken.Wait(_checkInterval);
}
public override string ToString()
{
return GetType().ToString();
}
private void CleanupTable(string query, string table, int numberOfRecordsInSinglePass, CancellationToken cancellationToken, Action<DbCommand> additionalActions = null)
{
_logger.Debug($"Removing outdated records from the '{table}' table...");
UseConnectionDistributedLock(_storage, connection =>
{
int affected;
do
{
affected = ExecuteNonQuery(
connection,
query,
numberOfRecordsInSinglePass,
additionalActions,
cancellationToken);
} while (affected == numberOfRecordsInSinglePass);
return affected;
});
_logger.Trace($"Outdated records removed from the '{table}' table.");
}
private T UseConnectionDistributedLock<T>(SqlServerStorage storage, Func<DbConnection, T> action)
{
try
{
return storage.UseConnection(null, static (_, connection, ctx) =>
{
SqlServerDistributedLock.Acquire(connection, DistributedLockKey, DefaultLockTimeout);
try
{
return ctx(connection);
}
finally
{
SqlServerDistributedLock.Release(connection, DistributedLockKey);
}
}, action);
}
catch (DistributedLockTimeoutException e) when (e.Resource == DistributedLockKey)
{
// DistributedLockTimeoutException here doesn't mean that outdated records weren't removed.
// It just means another Hangfire server did this work.
_logger.Log(
LogLevel.Debug,
() => $@"An exception was thrown during acquiring distributed lock on the {DistributedLockKey} resource within {DefaultLockTimeout.TotalSeconds} seconds. Outdated records were not removed.
It will be retried in {_checkInterval.TotalSeconds} seconds.",
e);
return default;
}
}
private static string GetExpireQuery(SqlServerStorage storage, string table)
{
if (table.Equals("AggregatedCounter", StringComparison.OrdinalIgnoreCase))
{
// Schema 5, which still should be supported by Hangfire doesn't have an index that covers
// the `ExpireAt` column, making it impossible to run the query.
return String.Format(CultureInfo.InvariantCulture, storage.GetQueryFromTemplate(static schemaName => $@"
set deadlock_priority low;
set transaction isolation level read committed;
set xact_abort on;
set lock_timeout 1000;
delete top (@count) T from [{schemaName}].[{{0}}] T
where ExpireAt < @now
option (loop join, optimize for (@count = 20000));"), table);
}
return String.Format(CultureInfo.InvariantCulture, storage.GetQueryFromTemplate(static schemaName => $@"
set deadlock_priority low;
set transaction isolation level read committed;
set xact_abort on;
set lock_timeout 1000;
delete top (@count) T from [{schemaName}].[{{0}}] T with (forceseek)
where ExpireAt < @now
option (loop join, optimize for (@count = 20000));"), table);
}
private static string GetStateCleanupQuery(SqlServerStorage storage)
{
// TODO: Make expiration condition configurable
return storage.GetQueryFromTemplate(static schemaName => $@"
set deadlock_priority low;
set transaction isolation level read committed;
set xact_abort on;
set lock_timeout 1000;
;with cte as (
select s.[JobId], s.[Id]
from [{schemaName}].[State] s with (forceseek)
where s.[CreatedAt] < dateadd(minute, @expireMin, @now)
and exists (
select * from [{schemaName}].[Job] j with (forceseek)
where j.[Id] = s.[JobId] and j.[StateId] != s.[Id]))
delete top(@count) from cte option (maxdop 1);");
}
private static int ExecuteNonQuery(
DbConnection connection,
string commandText,
int numberOfRecordsInSinglePass,
Action<DbCommand> additionalActions,
CancellationToken cancellationToken)
{
using var command = connection.Create(commandText, timeout: 0)
.AddParameter("@count", numberOfRecordsInSinglePass, DbType.Int32)
.AddParameter("@now", DateTime.UtcNow, DbType.DateTime);
additionalActions?.Invoke(command);
using (cancellationToken.Register(static state => ((DbCommand)state).Cancel(), command))
{
try
{
return command.ExecuteNonQuery();
}
catch (DbException ex) when (cancellationToken.IsCancellationRequested || ex.Message.Contains("Lock request time out period exceeded"))
{
// Exception was triggered due to the Cancel method call, ignoring
return 0;
}
}
}
}
}