/
MultipleConsumerGroupsQueue.cs
253 lines (212 loc) · 7.29 KB
/
MultipleConsumerGroupsQueue.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
using System;
using System.Threading;
using System.Threading.Tasks;
using NewLife.Log;
namespace NewLife.Caching
{
/// <summary>
/// Redis多消费组可重复消费的队列
/// </summary>
/// <typeparam name="T">消息类型</typeparam>
public class MultipleConsumerGroupsQueue<T> : IDisposable
{
/// <summary>
/// Redis客户端
/// </summary>
private FullRedis _Redis;
/// <summary>
/// 消息列队
/// </summary>
private RedisStream<T> _Queue;
/// <summary>
/// 读写超时(默认15000ms)
/// </summary>
public Int32 TimeOut { set; get; } = 15_000;
/// <summary>
/// 订阅者名称
/// </summary>
public String SubscribeAppName { private set; get; }
/// <summary>
/// 消费者组名已经存在的Redis错误消息关键词
/// </summary>
public String ConsumeGroupExistErrMsgKeyWord { set; get; } = "exists";
/// <summary>
/// 列队长度
/// </summary>
public Int32 QueueLen { set; get; } = 1_000_000;
/// <summary>
/// 日志对像
/// </summary>
public ILog Log
{
set { if (_Redis != null) _Redis.Log = value; }
get
{
if (_Redis != null)
return _Redis.Log;
else
return null;
}
}
/// <summary>
/// 连接Redis服务器
/// </summary>
/// <param name="host">Redis地址</param>
/// <param name="queueName">列队名称</param>
/// <param name="port">端口(默认6379)</param>
/// <param name="password">密码</param>
/// <param name="db">连接Redis数据库</param>
public void Connect(String host, String queueName, Int32 port = 6379, String password = "", Int32 db = 0)
{
_Redis = new FullRedis($"{host}:{port}", password, db) { Timeout = TimeOut };
if (_Redis != null)
{
_Queue = _Redis.GetStream<T>(queueName);
_Queue.MaxLenngth = QueueLen;
}
else
{
throw new NullReferenceException("连接Redis服务器失败。");
}
}
/// <summary>
/// 发送消息
/// </summary>
/// <param name="data"></param>
public void Publish(T data) => _Queue.Add(data);
/// <summary>
/// 独立线程消费
/// </summary>
private CancellationTokenSource _Cts;
/// <summary>
/// 订阅
/// </summary>
/// <param name="subscribeAppName">消费者名称</param>
public void Subscribe(String subscribeAppName)
{
SubscribeAppName = subscribeAppName;
_Cts = new CancellationTokenSource();
if (_Redis == null || _Queue == null)
{
OnDisconnected("订阅时列队对像为Null。");
}
//尝试创建消费组
try
{
_Queue.Group = subscribeAppName;
_Queue.GroupCreate(subscribeAppName);
}
catch (Exception err)
{
//遇到其它非消费组名已经存在的错误消息时,停止消费并提示消息
if (err.Message.IndexOf(ConsumeGroupExistErrMsgKeyWord) < 0)
{
if (XTrace.Debug) XTrace.WriteException(err);
OnStopSubscribe(err.Message);
return;
}
}
#if NET40
var thread = new Thread(s => getSubscribe(subscribeAppName));
thread.Start();
#else
Task.Run(() => getSubscribe(subscribeAppName), _Cts.Token);
#endif
}
/// <summary>
/// 取消订阅
/// </summary>
public void UnSubscribe() => _Cts.Cancel();
/// <summary>
/// 获取消费消息
/// </summary>
/// <param name="subscribeAppName">订阅APP名称</param>
private async Task getSubscribe(String subscribeAppName)
{
if (_Queue == null)
{
_Cts.Cancel();
OnDisconnected("消息列队对像为Null");
return;
}
while (!_Cts.IsCancellationRequested)
{
if (_Redis == null || _Queue == null)
{
OnDisconnected("获取订阅消息时列队对像为Null。");
}
try
{
var msg = await _Queue.TakeMessageAsync(10);
if (msg != null)
{
var data = msg.GetBody<T>();
_Queue.Acknowledge(msg.Id);
//通知订阅者
OnReceived(data);
}
}
catch (Exception err)
{
if (XTrace.Debug) XTrace.WriteException(err);
_Cts.Cancel();
OnStopSubscribe(err.Message);
return;
}
}
}
/// <summary>
/// 销毁对像
/// </summary>
public void Dispose()
{
if (_Cts != null) _Cts.Cancel();
if (_Queue != null) _Queue = null;
if (_Redis != null) _Redis.Dispose();
}
#region 事件
/// <summary>
/// 通知订阅者接收到新消息
/// </summary>
/// <param name="data">命令</param>
public delegate void ReceivedHandler(T data);
/// <summary>
/// 通知订阅者接收到新消息
/// </summary>
public event ReceivedHandler Received;
/// <summary>
/// 通知订阅者接收到新消息
/// </summary>
/// <param name="data"></param>
protected void OnReceived(T data) => Received?.Invoke(data);
/// <summary>
/// 通知订阅者停止订阅
/// </summary>
/// <param name="msg">停止消息</param>
public delegate void StopSubscribeHandler(String msg);
/// <summary>
/// 通知订阅者停止订阅
/// </summary>
/// <remarks>可以在这里处理重新订阅的相关业务逻辑</remarks>
public event StopSubscribeHandler StopSubscribe;
/// <summary>通知订阅者停止订阅</summary>
/// <param name="msg">停止消息</param>
protected void OnStopSubscribe(String msg) => StopSubscribe?.Invoke(msg);
/// <summary>
/// 通知订阅者断开连接
/// </summary>
/// <param name="msg">停止消息</param>
public delegate void DisconnectedHandler(String msg);
/// <summary>
/// 通知订阅者断开连接
/// </summary>
/// <remarks>可以在这里处理重新连接的相关业务逻辑</remarks>
public event DisconnectedHandler Disconnected;
/// <summary>
/// 通知订阅者断开连接
/// </summary>
/// <param name="msg">停止消息</param>
protected void OnDisconnected(String msg) => Disconnected?.Invoke(msg);
#endregion
}
}