-
Notifications
You must be signed in to change notification settings - Fork 79
/
Producer.cs
290 lines (248 loc) · 10.7 KB
/
Producer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NewLife.RocketMQ.Client;
using NewLife.RocketMQ.Common;
using NewLife.RocketMQ.Protocol;
using NewLife.Serialization;
#if !NET4
using TaskEx = System.Threading.Tasks.Task;
#endif
namespace NewLife.RocketMQ
{
/// <summary>生产者</summary>
public class Producer : MqBase
{
#region 属性
/// <summary>Load balancer used to spread published messages across the broker queues. When left unset, <see cref="Start"/> assigns a <c>WeightRoundRobin</c> instance.</summary>
public ILoadBalance LoadBalance { get; set; }
//public Int32 DefaultTopicQueueNums { get; set; } = 4;
//public Int32 SendMsgTimeout { get; set; } = 3_000;
//public Int32 CompressMsgBodyOverHowmuch { get; set; } = 4096;
/// <summary>Number of additional attempts made after a send fails. Defaults to 3.</summary>
public Int32 RetryTimesWhenSendFailed { get; set; } = 3;
//public Int32 RetryTimesWhenSendAsyncFailed { get; set; } = 2;
//public Boolean RetryAnotherBrokerWhenNotStoreOK { get; set; }
/// <summary>Maximum allowed message body length in bytes. Defaults to 4*1024*1024; the publish methods skip the size check when the value is not positive.</summary>
public Int32 MaxMessageSize { get; set; } = 4 * 1024 * 1024;
#endregion
#region 基础方法
/// <summary>Starts the producer: runs the base start-up, makes sure a load balancer is assigned and subscribes to broker changes.</summary>
/// <returns><c>true</c> when the base start-up succeeded; otherwise <c>false</c>.</returns>
public override Boolean Start()
{
    var started = base.Start();
    if (!started) return false;

    // Fall back to the weighted round-robin balancer when none was configured
    LoadBalance ??= new WeightRoundRobin();

    if (_NameServer != null)
    {
        // A broker change invalidates the cached broker list; SelectQueue rebuilds it
        // the next time it sees the balancer flagged as not ready
        _NameServer.OnBrokerChange += (sender, args) =>
        {
            _brokers = null;
            LoadBalance.Ready = false;
        };
    }

    return true;
}
#endregion
#region 发送消息
/// <summary>
/// Epoch (1970-01-01) used to compute Unix time by subtraction, because DateTimeOffset.Now.ToUnixTimeMilliseconds() does not exist on .NET 4.5 and earlier.
/// </summary>
private static readonly DateTime _dt1970 = new(1970, 1, 1);
/// <summary>Sends a message to the broker queue chosen by <see cref="SelectQueue"/>.</summary>
/// <remarks>
/// A failed attempt is retried up to <see cref="RetryTimesWhenSendFailed"/> times, with a one second pause
/// between attempts, unless the failure is a <c>ResponseException</c>, which is rethrown immediately.
/// Every attempt targets the same queue. The born timestamp is sent as Unix milliseconds based on UTC.
/// </remarks>
/// <param name="msg">Message to send. Its body must not be longer than <see cref="MaxMessageSize"/> when that limit is positive.</param>
/// <param name="timeout">Not used by this implementation; kept so existing callers keep compiling.</param>
/// <returns>The send result, carrying the selected queue and the status mapped from the broker response code.</returns>
/// <exception cref="ArgumentNullException"><paramref name="msg"/> is null.</exception>
/// <exception cref="InvalidOperationException">The body exceeds <see cref="MaxMessageSize"/>, or no writable queue is available.</exception>
public virtual SendResult Publish(Message msg, Int32 timeout = -1)
{
    if (msg == null) throw new ArgumentNullException(nameof(msg));

    var max = MaxMessageSize;
    if (max > 0 && msg.Body.Length > max) throw new InvalidOperationException($"主题[{Topic}]的数据包大小[{msg.Body.Length}]超过最大限制[{max}],大key会拖累整个队列,可通过MaxMessageSize调节。");

    // Pick the target queue; SelectQueue returns null when no broker is writable
    var mq = SelectQueue();
    if (mq == null) throw new InvalidOperationException($"主题[{Topic}]没有可写的队列,无法发送消息。");
    mq.Topic = Topic;

    // Build the request header. Unix time is defined relative to UTC, so the current
    // instant must be taken from UtcNow; local time would shift it by the UTC offset
    var ts = DateTime.UtcNow - _dt1970;
    var smrh = new SendMessageRequestHeader
    {
        ProducerGroup = Group,
        Topic = Topic,
        QueueId = mq.QueueId,
        SysFlag = 0,
        BornTimestamp = (Int64)ts.TotalMilliseconds,
        Flag = msg.Flag,
        Properties = msg.GetProperties(),
        ReconsumeTimes = 0,
        UnitMode = UnitMode,
    };

    // A negative retry setting must still allow the first attempt
    var retries = Math.Max(0, RetryTimesWhenSendFailed);
    for (var i = 0; i <= retries; i++)
    {
        // Tracing span for this attempt
        using var span = Tracer?.NewSpan($"mq:{Topic}:Publish");
        try
        {
            // Resolve the broker client that owns the selected queue
            var bk = GetBroker(mq.BrokerName);
            var rs = bk.Invoke(RequestCode.SEND_MESSAGE_V2, msg.Body, smrh.GetProperties(), true);

            // Wrap the response
            var sr = new SendResult
            {
                Queue = mq
            };
            sr.Status = (ResponseCode)rs.Header.Code switch
            {
                ResponseCode.SUCCESS => SendStatus.SendOK,
                ResponseCode.FLUSH_DISK_TIMEOUT => SendStatus.FlushDiskTimeout,
                ResponseCode.FLUSH_SLAVE_TIMEOUT => SendStatus.FlushSlaveTimeout,
                ResponseCode.SLAVE_NOT_AVAILABLE => SendStatus.SlaveNotAvailable,
                // Any other code is surfaced as the exception built from the response header
                _ => throw rs.Header.CreateException(),
            };
            sr.Read(rs.Header?.ExtFields);

            return sr;
        }
        catch (Exception ex)
        {
            span?.SetError(ex, msg);

            // Anything other than a ResponseException is retried after a pause while attempts remain
            if (ex is not ResponseException && i < retries)
            {
                Thread.Sleep(1000);
                continue;
            }

            throw;
        }
    }

    // Not reachable: each iteration returns, retries or throws. Required by the compiler.
    return null;
}
/// <summary>Publishes a message without keys by forwarding to the overload that accepts tags and keys.</summary>
/// <param name="body">Payload to publish.</param>
/// <param name="tags">Message tags; may be null.</param>
/// <param name="timeout">Passed through unchanged to the forwarded overload.</param>
/// <returns>The send result produced by the forwarded overload.</returns>
public virtual SendResult Publish(Object body, String tags = null, Int32 timeout = -1)
{
    return Publish(body, tags, keys: null, timeout: timeout);
}
/// <summary>Publishes a payload, converting it to bytes first when necessary.</summary>
/// <remarks>A byte array is sent as is, a string is converted with <c>GetBytes</c>, and any other value is first serialized with <c>ToJson</c>.</remarks>
/// <param name="body">Payload to publish.</param>
/// <param name="tags">Message tags; may be null.</param>
/// <param name="keys">Message keys; may be null.</param>
/// <param name="timeout">Passed through unchanged to the message overload.</param>
/// <returns>The send result produced by the message overload.</returns>
public virtual SendResult Publish(Object body, String tags, String keys, Int32 timeout = -1)
{
    Byte[] payload;
    if (body is Byte[] raw)
    {
        payload = raw;
    }
    else
    {
        // Strings are used directly; everything else goes through JSON first
        var text = body is String s ? s : body.ToJson();
        payload = text.GetBytes();
    }

    var message = new Message { Body = payload, Tags = tags, Keys = keys };
    return Publish(message, timeout);
}
/// <summary>Asynchronously sends a message to the broker queue chosen by <see cref="SelectQueue"/>.</summary>
/// <remarks>
/// A failed attempt is retried up to <see cref="RetryTimesWhenSendFailed"/> times, with a one second delay
/// between attempts, unless the failure is a <c>ResponseException</c>, which is rethrown immediately.
/// Every attempt targets the same queue. The born timestamp is sent as Unix milliseconds based on UTC.
/// </remarks>
/// <param name="message">Message to send. Its body must not be longer than <see cref="MaxMessageSize"/> when that limit is positive.</param>
/// <returns>A task producing the send result, which carries the selected queue and the status mapped from the broker response code.</returns>
/// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
/// <exception cref="InvalidOperationException">The body exceeds <see cref="MaxMessageSize"/>, or no writable queue is available.</exception>
public virtual async Task<SendResult> PublishAsync(Message message)
{
    if (message == null) throw new ArgumentNullException(nameof(message));

    var max = MaxMessageSize;
    if (max > 0 && message.Body.Length > max) throw new InvalidOperationException($"主题[{Topic}]的数据包大小[{message.Body.Length}]超过最大限制[{max}],大key会拖累整个队列,可通过{nameof(MaxMessageSize)}调节最大允许发送数据包大小。");

    // Pick the target queue; SelectQueue returns null when no broker is writable
    var mq = SelectQueue();
    if (mq == null) throw new InvalidOperationException($"主题[{Topic}]没有可写的队列,无法发送消息。");
    mq.Topic = Topic;

    // Build the request header. Unix time is defined relative to UTC, so the current
    // instant must be taken from UtcNow; local time would shift it by the UTC offset
    var ts = DateTime.UtcNow - _dt1970;
    var sendMessageRequestHeader = new SendMessageRequestHeader
    {
        ProducerGroup = Group,
        Topic = Topic,
        QueueId = mq.QueueId,
        SysFlag = 0,
        BornTimestamp = (Int64)ts.TotalMilliseconds,
        Flag = message.Flag,
        Properties = message.GetProperties(),
        ReconsumeTimes = 0,
        UnitMode = UnitMode,
    };

    // A negative retry setting must still allow the first attempt
    var retries = Math.Max(0, RetryTimesWhenSendFailed);
    for (var i = 0; i <= retries; i++)
    {
        // Tracing span for this attempt
        using var span = Tracer?.NewSpan($"mq:{Topic}:Publish");
        try
        {
            // Resolve the broker client that owns the selected queue
            var bk = GetBroker(mq.BrokerName);
            var rs = await bk.InvokeAsync(RequestCode.SEND_MESSAGE_V2, message.Body, sendMessageRequestHeader.GetProperties(), ignoreError: true);

            // Wrap the response
            var sendResult = new SendResult
            {
                Queue = mq,
                Status = (ResponseCode) rs.Header.Code switch
                {
                    ResponseCode.SUCCESS => SendStatus.SendOK,
                    ResponseCode.FLUSH_DISK_TIMEOUT => SendStatus.FlushDiskTimeout,
                    ResponseCode.FLUSH_SLAVE_TIMEOUT => SendStatus.FlushSlaveTimeout,
                    ResponseCode.SLAVE_NOT_AVAILABLE => SendStatus.SlaveNotAvailable,
                    // Any other code is surfaced as the exception built from the response header
                    _ => throw rs.Header.CreateException(),
                }
            };
            sendResult.Read(rs.Header?.ExtFields);

            return sendResult;
        }
        catch (Exception ex)
        {
            span?.SetError(ex, message);

            // Anything other than a ResponseException is retried after a delay while attempts remain
            if (ex is not ResponseException && i < retries)
            {
                await TaskEx.Delay(TimeSpan.FromSeconds(1));
                continue;
            }

            throw;
        }
    }

    // Not reachable: each iteration returns, retries or throws. Required by the compiler.
    return null;
}
/// <summary>Asynchronously publishes a message without keys by forwarding to the overload that accepts tags and keys.</summary>
/// <param name="body">Payload to publish.</param>
/// <param name="tags">Message tags; may be null.</param>
/// <returns>The task returned by the forwarded overload.</returns>
public virtual Task<SendResult> PublishAsync(Object body, String tags = null)
{
    return PublishAsync(body, tags, keys: null);
}
/// <summary>Asynchronously publishes a payload, converting it to bytes first when necessary.</summary>
/// <remarks>A byte array is sent as is, a string is converted with <c>GetBytes</c>, and any other value is first serialized with <c>ToJson</c>.</remarks>
/// <param name="body">Payload to publish.</param>
/// <param name="tags">Message tags; may be null.</param>
/// <param name="keys">Message keys; may be null.</param>
/// <returns>The task returned by the message overload.</returns>
public virtual Task<SendResult> PublishAsync(Object body, String tags, String keys)
{
    Byte[] payload;
    if (body is Byte[] raw)
    {
        payload = raw;
    }
    else
    {
        // Strings are used directly; everything else goes through JSON first
        var text = body is String s ? s : body.ToJson();
        payload = text.GetBytes();
    }

    return PublishAsync(new Message { Body = payload, Tags = tags, Keys = keys });
}
#endregion
#region 选择Broker队列
// Writable brokers captured the last time the load balancer was initialised; reset to null on broker change
private IList<BrokerInfo> _brokers;

/// <summary>Chooses the queue the next message is sent to.</summary>
/// <remarks>
/// When the load balancer is not ready, it is initialised from the brokers that have write permission and
/// at least one write queue, using each broker's write-queue count as its weight.
/// </remarks>
/// <returns>The selected queue, or null when no writable broker is available.</returns>
public virtual MessageQueue SelectQueue()
{
    var balancer = LoadBalance;
    if (!balancer.Ready)
    {
        var writable = Brokers
            .Where(b => b.Permission.HasFlag(Permissions.Write) && b.WriteQueueNums > 0)
            .ToList();
        if (writable.Count == 0) return null;

        var weights = writable.Select(b => b.WriteQueueNums).ToArray();
        if (weights.Sum() <= 0) return null;

        _brokers = writable;
        balancer.Set(weights);
    }

    // Let the balancer pick a broker, then derive the queue id from the pick counter it reports
    var index = balancer.Get(out var times);
    var broker = _brokers[index];
    var queueId = (times - 1) % broker.WriteQueueNums;

    return new MessageQueue { BrokerName = broker.Name, QueueId = queueId };
}
#endregion
}
}