-
Notifications
You must be signed in to change notification settings - Fork 36
/
RecurringJobInfoStorage.cs
142 lines (116 loc) · 4.95 KB
/
RecurringJobInfoStorage.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
using System;
using System.Collections.Generic;
using Hangfire.Common;
using Hangfire.Storage;
namespace Hangfire.RecurringJobExtensions
{
/// <summary>
/// The storage APIs for <see cref="RecurringJobInfo"/>.
/// Reads and writes the <c>recurring-job:{id}</c> hashes through an <see cref="IStorageConnection"/>.
/// </summary>
public class RecurringJobInfoStorage : IRecurringJobInfoStorage
{
    // Maximum time to wait for the distributed lock taken while writing job data.
    private static readonly TimeSpan LockTimeout = TimeSpan.FromMinutes(1);

    private readonly IStorageConnection _connection;

    /// <summary>
    /// Initializes a new <see cref="RecurringJobInfoStorage"/>
    /// using a connection obtained from <see cref="JobStorage.Current"/>.
    /// </summary>
    public RecurringJobInfoStorage() : this(JobStorage.Current.GetConnection()) { }

    /// <summary>
    /// Initializes a new <see cref="RecurringJobInfoStorage"/>
    /// </summary>
    /// <param name="connection"><see cref="IStorageConnection"/> used for all reads and writes; disposed by <see cref="Dispose"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="connection"/> is null.</exception>
    public RecurringJobInfoStorage(IStorageConnection connection)
    {
        if (connection == null) throw new ArgumentNullException(nameof(connection));

        _connection = connection;
    }

    /// <summary>
    /// Finds all <see cref="RecurringJobInfo"/> from storage.
    /// </summary>
    /// <remarks>
    /// This is an iterator: the storage is queried lazily while the result is enumerated,
    /// so the sequence must be consumed before this instance is disposed.
    /// </remarks>
    /// <returns>The collection of <see cref="RecurringJobInfo"/></returns>
    public IEnumerable<RecurringJobInfo> FindAll()
    {
        foreach (var recurringJobId in _connection.GetAllItemsFromSet("recurring-jobs"))
        {
            var recurringJob = _connection.GetAllEntriesFromHash($"recurring-job:{recurringJobId}");

            // The id is listed in the set but its hash is gone: skip it.
            if (recurringJob == null) continue;

            yield return InternalFind(recurringJobId, recurringJob);
        }
    }

    /// <summary>
    /// Finds <see cref="RecurringJobInfo"/> by jobId.
    /// The job id is associated with <seealso cref="BackgroundJob.Id"/>
    /// </summary>
    /// <param name="jobId">The specified <see cref="BackgroundJob.Id"/></param>
    /// <returns>
    /// The <see cref="RecurringJobInfo"/> that created the background job,
    /// or null when the recurring job hash no longer exists.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="jobId"/> is null or empty.</exception>
    /// <exception cref="InvalidOperationException">The background job has no "RecurringJobId" parameter.</exception>
    public RecurringJobInfo FindByJobId(string jobId)
    {
        if (string.IsNullOrEmpty(jobId)) throw new ArgumentNullException(nameof(jobId));

        var paramValue = _connection.GetJobParameter(jobId, "RecurringJobId");

        // InvalidOperationException derives from Exception, so existing catch blocks keep working.
        if (string.IsNullOrEmpty(paramValue))
            throw new InvalidOperationException($"There is not RecurringJobId with associated BackgroundJob Id:{jobId}");

        // The parameter value is stored as JSON, so it has to be deserialized before use.
        var recurringJobId = JobHelper.FromJson<string>(paramValue);

        return FindByRecurringJobId(recurringJobId);
    }

    /// <summary>
    /// Finds <see cref="RecurringJobInfo"/> by recurringJobId.
    /// </summary>
    /// <param name="recurringJobId">The specified identifier of the RecurringJob.</param>
    /// <returns><see cref="RecurringJobInfo"/>, or null when the storage returns no hash for the id.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="recurringJobId"/> is null or empty.</exception>
    public RecurringJobInfo FindByRecurringJobId(string recurringJobId)
    {
        if (string.IsNullOrEmpty(recurringJobId)) throw new ArgumentNullException(nameof(recurringJobId));

        var recurringJob = _connection.GetAllEntriesFromHash($"recurring-job:{recurringJobId}");

        return recurringJob == null ? null : InternalFind(recurringJobId, recurringJob);
    }

    /// <summary>
    /// Builds a <see cref="RecurringJobInfo"/> from the raw hash entries of a recurring job.
    /// </summary>
    /// <param name="recurringJobId">The identifier of the RecurringJob.</param>
    /// <param name="recurringJob">The hash entries read from <c>recurring-job:{id}</c>.</param>
    /// <returns>The populated <see cref="RecurringJobInfo"/>.</returns>
    /// <exception cref="ArgumentNullException">An argument is null (or the id is empty).</exception>
    /// <exception cref="KeyNotFoundException">The hash has no "Job" or "Cron" entry.</exception>
    private static RecurringJobInfo InternalFind(string recurringJobId, Dictionary<string, string> recurringJob)
    {
        if (string.IsNullOrEmpty(recurringJobId)) throw new ArgumentNullException(nameof(recurringJobId));
        if (recurringJob == null) throw new ArgumentNullException(nameof(recurringJob));

        // "Job" and "Cron" are treated as mandatory: a missing entry surfaces as KeyNotFoundException.
        var job = JobHelper.FromJson<InvocationData>(recurringJob["Job"]).Deserialize();

        string timeZoneId;
        string queue;
        string enable;
        string jobData;

        // "Queue" is read defensively like the other optional fields, so a hash without it
        // no longer fails the lookup (and, through FindAll, the whole enumeration).
        // NOTE(review): newer Hangfire versions appear to omit "Queue" from the hash - confirm.
        recurringJob.TryGetValue("Queue", out queue);

        return new RecurringJobInfo
        {
            RecurringJobId = recurringJobId,
            Cron = recurringJob["Cron"],
            // No stored time zone means UTC.
            TimeZone = recurringJob.TryGetValue("TimeZoneId", out timeZoneId)
                ? TimeZoneInfo.FindSystemTimeZoneById(timeZoneId)
                : TimeZoneInfo.Utc,
            Queue = queue,
            Method = job.Method,
            // Jobs that have never been written by SetJobData carry no flag and count as enabled.
            Enable = !recurringJob.TryGetValue(nameof(RecurringJobInfo.Enable), out enable)
                || JobHelper.FromJson<bool>(enable),
            JobData = recurringJob.TryGetValue(nameof(RecurringJobInfo.JobData), out jobData)
                ? JobHelper.FromJson<Dictionary<string, object>>(jobData)
                : null
        };
    }

    /// <summary>
    /// Sets <see cref="RecurringJobInfo"/> to storage which associated with <see cref="RecurringJob"/>.
    /// Only the <see cref="RecurringJobInfo.Enable"/> and <see cref="RecurringJobInfo.JobData"/> fields are written.
    /// </summary>
    /// <param name="recurringJobInfo">
    /// The recurring job whose data should be persisted. Nothing is written when its
    /// <see cref="RecurringJobInfo.JobData"/> is null or empty.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="recurringJobInfo"/> is null.</exception>
    /// <exception cref="ArgumentException">There is data to write but the RecurringJobId is null or empty.</exception>
    public void SetJobData(RecurringJobInfo recurringJobInfo)
    {
        if (recurringJobInfo == null) throw new ArgumentNullException(nameof(recurringJobInfo));

        // Without job data the call is a silent no-op.
        if (recurringJobInfo.JobData == null || recurringJobInfo.JobData.Count == 0) return;

        // Checked after the no-op return so that path behaves exactly as before.
        // Without an id the write would land in the meaningless "recurring-job:" hash.
        var recurringJobId = recurringJobInfo.RecurringJobId;
        if (string.IsNullOrEmpty(recurringJobId))
            throw new ArgumentException("The RecurringJobId must not be null or empty.", nameof(recurringJobInfo));

        // The write happens while holding a distributed lock scoped to this recurring job id.
        using (_connection.AcquireDistributedLock($"recurringjobextensions-jobdata:{recurringJobId}", LockTimeout))
        {
            var changedFields = new Dictionary<string, string>
            {
                [nameof(RecurringJobInfo.Enable)] = JobHelper.ToJson(recurringJobInfo.Enable),
                [nameof(RecurringJobInfo.JobData)] = JobHelper.ToJson(recurringJobInfo.JobData)
            };

            _connection.SetRangeInHash($"recurring-job:{recurringJobId}", changedFields);
        }
    }

    /// <summary>
    /// Disposes storage connection.
    /// </summary>
    public void Dispose()
    {
        _connection?.Dispose();
    }
}
}