diff --git a/NewLife.RocketMQ/Common/ILoadBalance.cs b/NewLife.RocketMQ/Common/ILoadBalance.cs new file mode 100644 index 0000000..7ec91ce --- /dev/null +++ b/NewLife.RocketMQ/Common/ILoadBalance.cs @@ -0,0 +1,20 @@ +using System; + +namespace NewLife.RocketMQ.Common +{ + /// 负载均衡接口 + public interface ILoadBalance + { + /// 已就绪 + Boolean Ready { get; set; } + + /// 设置每个选项的权重数据 + /// + void Set(Int32[] weights); + + /// 根据权重选择,并返回该项是第几次选中 + /// 第几次选中 + /// + Int32 Get(out Int32 times); + } +} \ No newline at end of file diff --git a/NewLife.RocketMQ/Common/WeightRoundRobin.cs b/NewLife.RocketMQ/Common/WeightRoundRobin.cs index 6623544..4e35684 100644 --- a/NewLife.RocketMQ/Common/WeightRoundRobin.cs +++ b/NewLife.RocketMQ/Common/WeightRoundRobin.cs @@ -4,37 +4,45 @@ namespace NewLife.RocketMQ.Common { /// 带权重负载均衡算法 - public class WeightRoundRobin + public class WeightRoundRobin : ILoadBalance { #region 属性 + /// 已就绪 + public Boolean Ready { get; set; } + /// 权重集合 - public Int32[] Weights { get; set; } + public Int32[] Weights { get; private set; } /// 最小权重 - private readonly Int32 minWeight; + private Int32 minWeight; /// 状态值 - private readonly Int32[] _states; + private Int32[] _states; /// 次数 - private readonly Int32[] _times; + private Int32[] _times; #endregion - #region 构造 - /// 实例化 - public WeightRoundRobin(Int32[] weights) + #region 方法 + /// 设置每个选项的权重数据 + /// + public void Set(Int32[] weights) { + if (weights == null) throw new ArgumentNullException(nameof(weights)); + if (Weights != null && Weights.SequenceEqual(weights)) return; + Weights = weights; minWeight = weights.Min(); _states = new Int32[weights.Length]; _times = new Int32[weights.Length]; + + Ready = true; } - #endregion - #region 方法 /// 根据权重选择,并返回该项是第几次选中 + /// 第几次选中 /// public Int32 Get(out Int32 times) { diff --git a/NewLife.RocketMQ/MqBase.cs b/NewLife.RocketMQ/MqBase.cs index 8c63322..294b52e 100644 --- a/NewLife.RocketMQ/MqBase.cs +++ b/NewLife.RocketMQ/MqBase.cs @@ -166,7 +166,7 @@ public virtual Boolean Start() #endregion #region 收发信息 - private readonly ConcurrentDictionary _Brokers = new ConcurrentDictionary(); + private readonly ConcurrentDictionary _Brokers = new(); /// 获取代理客户端 /// /// diff --git a/NewLife.RocketMQ/NewLife.RocketMQ.csproj b/NewLife.RocketMQ/NewLife.RocketMQ.csproj index 49195fb..46f4b03 100644 --- a/NewLife.RocketMQ/NewLife.RocketMQ.csproj +++ b/NewLife.RocketMQ/NewLife.RocketMQ.csproj @@ -7,8 +7,8 @@ 纯托管轻量级RocketMQ客户端,支持发布消息、消费消息、负载均衡等核心功能! 新生命开发团队 ©2002-2021 NewLife - 1.5.2021.0304 - 1.5.2021.0304 + 1.5.2021.0403 + 1.5.2021.0403 1.5.* false ..\Bin @@ -24,7 +24,7 @@ https://github.com/NewLifeX/NewLife.RocketMQ git 新生命团队;X组件;NewLife;$(AssemblyName) - 每次消费完成后,实时提交消费进度 + 生产者公开负载均衡接口,同时支持重载选择队列;消费者支持解压缩超过4k的大消息 MIT $(AllowedOutputExtensionsInPackageBuildOutputFolder);.pdb true @@ -65,7 +65,7 @@ - + diff --git a/NewLife.RocketMQ/Producer.cs b/NewLife.RocketMQ/Producer.cs index eeb671d..b585ac8 100644 --- a/NewLife.RocketMQ/Producer.cs +++ b/NewLife.RocketMQ/Producer.cs @@ -13,6 +13,9 @@ namespace NewLife.RocketMQ public class Producer : MqBase { #region 属性 + /// 负载均衡。发布消息时,分发到各个队列的负载均衡算法,默认使用带权重的轮询 + public ILoadBalance LoadBalance { get; set; } + //public Int32 DefaultTopicQueueNums { get; set; } = 4; //public Int32 SendMsgTimeout { get; set; } = 3_000; @@ -35,12 +38,15 @@ public override Boolean Start() { if (!base.Start()) return false; + if (LoadBalance == null) LoadBalance = new WeightRoundRobin(); + if (_NameServer != null) { _NameServer.OnBrokerChange += (s, e) => { _brokers = null; - _robin = null; + //_robin = null; + LoadBalance.Ready = false; }; } @@ -49,7 +55,7 @@ public override Boolean Start() #endregion #region 发送消息 - private static readonly DateTime _dt1970 = new DateTime(1970, 1, 1); + private static readonly DateTime _dt1970 = new(1970, 1, 1); /// 发送消息 /// /// @@ -137,12 +143,13 @@ public virtual SendResult Publish(Object body, String tags, String keys, Int32 t #region 选择Broker队列 private IList _brokers; - private WeightRoundRobin _robin; + //private WeightRoundRobin _robin; /// 选择队列 /// - public MessageQueue SelectQueue() + public virtual MessageQueue SelectQueue() { - if (_robin == null) + var lb = LoadBalance; + if (!lb.Ready) { var list = Brokers.Where(e => e.Permission.HasFlag(Permissions.Write) && e.WriteQueueNums > 0).ToList(); if (list.Count == 0) return null; @@ -151,11 +158,12 @@ public MessageQueue SelectQueue() if (total <= 0) return null; _brokers = list; - _robin = new WeightRoundRobin(list.Select(e => e.WriteQueueNums).ToArray()); + //lb = new WeightRoundRobin(); + lb.Set(list.Select(e => e.WriteQueueNums).ToArray()); } // 构造排序列表。希望能够均摊到各Broker - var idx = _robin.Get(out var times); + var idx = lb.Get(out var times); var bk = _brokers[idx]; return new MessageQueue { BrokerName = bk.Name, QueueId = (times - 1) % bk.WriteQueueNums }; } diff --git a/Test/Program.cs b/Test/Program.cs index df220be..a6e0ea7 100644 --- a/Test/Program.cs +++ b/Test/Program.cs @@ -143,7 +143,8 @@ static void Test3() Console.WriteLine(list[1].Equals(list2[1])); Console.WriteLine(list2.SequenceEqual(list)); - var robin = new WeightRoundRobin(list.Select(e => e.WriteQueueNums).ToArray()); + var robin = new WeightRoundRobin(); + robin.Set(list.Select(e => e.WriteQueueNums).ToArray()); var count = list.Sum(e => e.WriteQueueNums); for (var i = 0; i < count; i++) { diff --git a/Test/Test.csproj b/Test/Test.csproj index b3326f3..2100aaf 100644 --- a/Test/Test.csproj +++ b/Test/Test.csproj @@ -7,7 +7,7 @@ - +