// SqlServerQueueBackgroundService.cs
using BackgroundServiceExtensions.Extensions;
using BackgroundServiceExtensions.Interfaces;
using Dapper;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Data;
using System.Text.Json;
namespace BackgroundServiceExtensions;
/// <summary>
/// Base class for a hosted background service that uses a SQL Server table as a work queue.
/// Payloads of type <typeparamref name="TData"/> are stored as JSON in the queue row's [Data] column
/// and rows are filtered by [Type] = <c>typeof(TData).Name</c>.
/// Queue-table technique adapted from http://rusanu.com/2010/03/26/using-tables-as-queues/ (heap queues).
/// </summary>
/// <typeparam name="TMessage">Queue row type; populated on enqueue and handed to <see cref="DoWorkAsync"/> on dequeue.</typeparam>
/// <typeparam name="TData">Payload type that is JSON-serialized into the queue row.</typeparam>
public abstract class SqlServerQueueBackgroundService<TMessage, TData> : BackgroundService
    where TMessage : IQueueMessage, new()
    where TData : notnull
{
    protected readonly ILogger<SqlServerQueueBackgroundService<TMessage, TData>> Logger;

    public SqlServerQueueBackgroundService(ILogger<SqlServerQueueBackgroundService<TMessage, TData>> logger)
    {
        Logger = logger;
    }

    /// <summary>Creates the connection used for a single queue operation; it is disposed by the caller.</summary>
    protected abstract IDbConnection GetConnection();

    /// <summary>Name of the queue table (see <see cref="QueueTableSql"/> for the expected shape).</summary>
    protected abstract string QueueTableName { get; }

    /// <summary>Name of the error table (see <see cref="ErrorTableSql"/> for the expected shape).</summary>
    protected abstract string ErrorTableName { get; }

    /// <summary>
    /// How long <see cref="ExecuteAsync"/> waits before polling again when the queue was empty
    /// or the last poll failed. Override to tune; <see cref="TimeSpan.Zero"/> polls continuously.
    /// </summary>
    protected virtual TimeSpan IdleDelay => TimeSpan.FromSeconds(1);

    /// <summary>Processes one dequeued message.</summary>
    /// <param name="stoppingToken">Token signalled when the host is shutting down.</param>
    /// <param name="started">UTC time at which processing of this message began.</param>
    /// <param name="message">The dequeued queue row.</param>
    /// <param name="data">The deserialized payload of <paramref name="message"/>.</param>
    protected abstract Task DoWorkAsync(CancellationToken stoppingToken, DateTime started, TMessage message, TData? data);

    /// <summary>Adds a payload to the queue table.</summary>
    /// <param name="userName">User on whose behalf the work is queued.</param>
    /// <param name="data">Payload; stored as JSON.</param>
    /// <returns>The identity value of the inserted queue row.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="userName"/> or <paramref name="data"/> is null.</exception>
    public async Task<long> EnqueueAsync(string userName, TData data)
    {
        ArgumentNullException.ThrowIfNull(userName, nameof(userName));
        ArgumentNullException.ThrowIfNull(data, nameof(data));

        var message = new TMessage();
        message.UserName = userName;
        message.Queued = DateTime.UtcNow;
        message.Data = JsonSerializer.Serialize(data);
        message.Type = typeof(TData).Name;

        using var cn = GetConnection();
        // Store the UTC timestamp assigned above instead of the server-local getdate(), so the
        // persisted [Queued] value is on the same clock as the UTC 'started' passed to DoWorkAsync.
        return await cn.QuerySingleAsync<long>(
            $@"INSERT INTO {QueueTableName} ([Queued], [Type], [UserName], [Data]) VALUES (@queued, @type, @userName, @data);
            SELECT SCOPE_IDENTITY()",
            message);
    }

    /// <summary>
    /// normally you would not call this yourself, but rather let it be called by the background service.
    /// This is public for testing purposes only.
    /// </summary>
    /// <param name="stoppingToken">Token forwarded to <see cref="DoWorkAsync"/>.</param>
    public async Task DequeueAsync(CancellationToken stoppingToken)
    {
        await TryDequeueAsync(stoppingToken);
    }

    /// <summary>
    /// Attempts to dequeue and process a single message.
    /// Failures while deserializing or processing are logged and recorded in the error table.
    /// </summary>
    /// <returns><c>true</c> when a message was dequeued (whether or not processing succeeded); <c>false</c> when none was available.</returns>
    private async Task<bool> TryDequeueAsync(CancellationToken stoppingToken)
    {
        using var cn = GetConnection();
        var result = await cn.DequeueAsync<TMessage>(QueueTableName, "[Type]=@type", new { type = typeof(TData).Name });
        if (!result.Success)
        {
            return false;
        }

        var message = result.Message;
        // Capture the payload now: LogErrorAsync blanks message.Data before serializing the message.
        var rawPayload = message.Data;
        TData? data = default;
        var deserialized = false;

        try
        {
            // Deserialization is inside the try so that a malformed payload is recorded in the
            // error table instead of escaping after the message has already been dequeued.
            data = JsonSerializer.Deserialize<TData>(rawPayload);
            deserialized = true;
            await DoWorkAsync(stoppingToken, DateTime.UtcNow, message, data);
        }
        catch (Exception exc)
        {
            Logger.LogError(exc, "Error in SqlServerQueueBackgroundService.DequeueAsync");
            // When the payload could not be deserialized, keep the original text for diagnosis.
            await LogErrorAsync(exc, message, data, deserialized ? null : rawPayload);
        }

        return true;
    }

    /// <summary>Returns the CREATE TABLE statement for a queue table named <paramref name="tableName"/>.</summary>
    public static string QueueTableSql(string tableName) =>
        $@"CREATE TABLE {tableName} (
            [Id] bigint identity(1,1) PRIMARY KEY,
            [UserName] nvarchar(50) NOT NULL,
            [Queued] datetime NOT NULL,
            [Type] nvarchar(50) NOT NULL,
            [Data] nvarchar(max) NOT NULL
        );";

    /// <summary>Returns the CREATE TABLE statement for an error table named <paramref name="tableName"/>.</summary>
    public static string ErrorTableSql(string tableName) =>
        $@"CREATE TABLE {tableName} (
            [Id] bigint identity(1,1) PRIMARY KEY,
            [Timestamp] datetime NOT NULL DEFAULT (getdate()),
            [ErrorMessage] nvarchar(max) NOT NULL,
            [QueueMessage] nvarchar(max) NOT NULL,
            [Data] nvarchar(max) NOT NULL
        );";

    /// <summary>
    /// Records a processing failure in the error table.
    /// </summary>
    /// <param name="exception">The failure; only its message is stored.</param>
    /// <param name="item">The queue message; its Data is cleared before it is serialized.</param>
    /// <param name="data">The deserialized payload, re-serialized into the [Data] column.</param>
    /// <param name="rawPayload">When not null, stored in [Data] verbatim instead of serializing <paramref name="data"/>.</param>
    /// <exception cref="InvalidOperationException">The error row could not be written.</exception>
    private async Task LogErrorAsync(Exception exception, TMessage item, TData? data, string? rawPayload = null)
    {
        using var cn = GetConnection();
        item.Data = string.Empty; // because it will be redundant to the Data value

        try
        {
            await cn.ExecuteAsync($"INSERT INTO {ErrorTableName} ([ErrorMessage], [QueueMessage], [Data]) VALUES (@errorMessage, @queueMessage, @data)", new
            {
                errorMessage = exception.Message,
                queueMessage = JsonSerializer.Serialize(item),
                data = rawPayload ?? JsonSerializer.Serialize(data)
            });
        }
        catch (Exception exc)
        {
            throw new InvalidOperationException($"Error logging queue process failure: {exc.Message}", exc);
        }
    }

    /// <summary>
    /// Polls the queue until the host requests shutdown. Waits <see cref="IdleDelay"/> between polls
    /// when the queue is empty or a poll failed, so an idle or unreachable database is not hit in a tight loop.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var processed = false;
            try
            {
                processed = await TryDequeueAsync(stoppingToken);
            }
            catch (Exception exc)
            {
                // A failed poll (e.g. connection error, or the error row could not be written)
                // must not end the service; log it and retry after the idle delay.
                Logger.LogError(exc, "Error in SqlServerQueueBackgroundService.ExecuteAsync");
            }

            if (!processed)
            {
                try
                {
                    await Task.Delay(IdleDelay, stoppingToken);
                }
                catch (OperationCanceledException)
                {
                    break;
                }
            }
        }
    }
}