Skip to content

Commit

Permalink
生产者公开负载均衡接口,同时支持重载选择队列
Browse files Browse the repository at this point in the history
  • Loading branch information
nnhy committed Apr 3, 2021
1 parent c68d9ac commit 08f89c8
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 24 deletions.
20 changes: 20 additions & 0 deletions NewLife.RocketMQ/Common/ILoadBalance.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;

namespace NewLife.RocketMQ.Common
{
/// <summary>负载均衡接口</summary>
public interface ILoadBalance
{
/// <summary>已就绪</summary>
Boolean Ready { get; set; }

/// <summary>设置每个选项的权重数据</summary>
/// <param name="weights"></param>
void Set(Int32[] weights);

/// <summary>根据权重选择,并返回该项是第几次选中</summary>
/// <param name="times">第几次选中</param>
/// <returns></returns>
Int32 Get(out Int32 times);
}
}
28 changes: 18 additions & 10 deletions NewLife.RocketMQ/Common/WeightRoundRobin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,45 @@
namespace NewLife.RocketMQ.Common
{
/// <summary>带权重负载均衡算法</summary>
public class WeightRoundRobin
public class WeightRoundRobin : ILoadBalance
{
#region 属性
/// <summary>已就绪</summary>
public Boolean Ready { get; set; }

/// <summary>权重集合</summary>
public Int32[] Weights { get; set; }
public Int32[] Weights { get; private set; }

/// <summary>最小权重</summary>
private readonly Int32 minWeight;
private Int32 minWeight;

/// <summary>状态值</summary>
private readonly Int32[] _states;
private Int32[] _states;

/// <summary>次数</summary>
private readonly Int32[] _times;
private Int32[] _times;
#endregion

#region 构造
/// <summary>实例化</summary>
public WeightRoundRobin(Int32[] weights)
#region 方法
/// <summary>设置每个选项的权重数据</summary>
/// <param name="weights"></param>
public void Set(Int32[] weights)
{
if (weights == null) throw new ArgumentNullException(nameof(weights));
if (Weights != null && Weights.SequenceEqual(weights)) return;

Weights = weights;

minWeight = weights.Min();

_states = new Int32[weights.Length];
_times = new Int32[weights.Length];

Ready = true;
}
#endregion

#region 方法
/// <summary>根据权重选择,并返回该项是第几次选中</summary>
/// <param name="times">第几次选中</param>
/// <returns></returns>
public Int32 Get(out Int32 times)
{
Expand Down
2 changes: 1 addition & 1 deletion NewLife.RocketMQ/MqBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public virtual Boolean Start()
#endregion

#region 收发信息
private readonly ConcurrentDictionary<String, BrokerClient> _Brokers = new ConcurrentDictionary<String, BrokerClient>();
private readonly ConcurrentDictionary<String, BrokerClient> _Brokers = new();
/// <summary>获取代理客户端</summary>
/// <param name="name"></param>
/// <returns></returns>
Expand Down
8 changes: 4 additions & 4 deletions NewLife.RocketMQ/NewLife.RocketMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
<Description>纯托管轻量级RocketMQ客户端,支持发布消息、消费消息、负载均衡等核心功能!</Description>
<Company>新生命开发团队</Company>
<Copyright>©2002-2021 NewLife</Copyright>
<Version>1.5.2021.0304</Version>
<FileVersion>1.5.2021.0304</FileVersion>
<Version>1.5.2021.0403</Version>
<FileVersion>1.5.2021.0403</FileVersion>
<AssemblyVersion>1.5.*</AssemblyVersion>
<Deterministic>false</Deterministic>
<OutputPath>..\Bin</OutputPath>
Expand All @@ -24,7 +24,7 @@
<RepositoryUrl>https://github.com/NewLifeX/NewLife.RocketMQ</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<PackageTags>新生命团队;X组件;NewLife;$(AssemblyName)</PackageTags>
<PackageReleaseNotes>每次消费完成后,实时提交消费进度</PackageReleaseNotes>
<PackageReleaseNotes>生产者公开负载均衡接口,同时支持重载选择队列;消费者支持解压缩超过4k的大消息</PackageReleaseNotes>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<AllowedOutputExtensionsInPackageBuildOutputFolder>$(AllowedOutputExtensionsInPackageBuildOutputFolder);.pdb</AllowedOutputExtensionsInPackageBuildOutputFolder>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
Expand Down Expand Up @@ -65,7 +65,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="NewLife.Core" Version="8.10.2021.316-beta2" />
<PackageReference Include="NewLife.Core" Version="8.10.2021.403-rc3" />
</ItemGroup>

</Project>
22 changes: 15 additions & 7 deletions NewLife.RocketMQ/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ namespace NewLife.RocketMQ
public class Producer : MqBase
{
#region 属性
/// <summary>负载均衡。发布消息时,分发到各个队列的负载均衡算法,默认使用带权重的轮询</summary>
public ILoadBalance LoadBalance { get; set; }

//public Int32 DefaultTopicQueueNums { get; set; } = 4;

//public Int32 SendMsgTimeout { get; set; } = 3_000;
Expand All @@ -35,12 +38,15 @@ public override Boolean Start()
{
if (!base.Start()) return false;

if (LoadBalance == null) LoadBalance = new WeightRoundRobin();

if (_NameServer != null)
{
_NameServer.OnBrokerChange += (s, e) =>
{
_brokers = null;
_robin = null;
//_robin = null;
LoadBalance.Ready = false;
};
}

Expand All @@ -49,7 +55,7 @@ public override Boolean Start()
#endregion

#region 发送消息
private static readonly DateTime _dt1970 = new DateTime(1970, 1, 1);
private static readonly DateTime _dt1970 = new(1970, 1, 1);
/// <summary>发送消息</summary>
/// <param name="msg"></param>
/// <param name="timeout"></param>
Expand Down Expand Up @@ -137,12 +143,13 @@ public virtual SendResult Publish(Object body, String tags, String keys, Int32 t

#region 选择Broker队列
private IList<BrokerInfo> _brokers;
private WeightRoundRobin _robin;
//private WeightRoundRobin _robin;
/// <summary>选择队列</summary>
/// <returns></returns>
public MessageQueue SelectQueue()
public virtual MessageQueue SelectQueue()
{
if (_robin == null)
var lb = LoadBalance;
if (!lb.Ready)
{
var list = Brokers.Where(e => e.Permission.HasFlag(Permissions.Write) && e.WriteQueueNums > 0).ToList();
if (list.Count == 0) return null;
Expand All @@ -151,11 +158,12 @@ public MessageQueue SelectQueue()
if (total <= 0) return null;

_brokers = list;
_robin = new WeightRoundRobin(list.Select(e => e.WriteQueueNums).ToArray());
//lb = new WeightRoundRobin();
lb.Set(list.Select(e => e.WriteQueueNums).ToArray());
}

// 构造排序列表。希望能够均摊到各Broker
var idx = _robin.Get(out var times);
var idx = lb.Get(out var times);
var bk = _brokers[idx];
return new MessageQueue { BrokerName = bk.Name, QueueId = (times - 1) % bk.WriteQueueNums };
}
Expand Down
3 changes: 2 additions & 1 deletion Test/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ static void Test3()
Console.WriteLine(list[1].Equals(list2[1]));
Console.WriteLine(list2.SequenceEqual(list));

var robin = new WeightRoundRobin(list.Select(e => e.WriteQueueNums).ToArray());
var robin = new WeightRoundRobin();
robin.Set(list.Select(e => e.WriteQueueNums).ToArray());
var count = list.Sum(e => e.WriteQueueNums);
for (var i = 0; i < count; i++)
{
Expand Down
2 changes: 1 addition & 1 deletion Test/Test.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="NewLife.Core" Version="8.10.2021.316-beta2" />
<PackageReference Include="NewLife.Core" Version="8.10.2021.403-rc3" />
</ItemGroup>

<ItemGroup>
Expand Down

0 comments on commit 08f89c8

Please sign in to comment.