Skip to content

Commit

Permalink
阿里云目前需要在Topic前面带上实例ID并用【%】连接,组成路由Topic[用来路由到实例Topic]。 fix #24 fix#35 …
Browse files Browse the repository at this point in the history
…fix#37 fix#41
  • Loading branch information
nnhy committed May 15, 2022
1 parent fea342d commit 33e58a6
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 26 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/publish-beta.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ name: publish-beta

on:
push:
branches:
- master
branches: [ master ]
paths:
- 'NewLife.RocketMQ/**'
workflow_dispatch:

jobs:
build-publish:
Expand Down
5 changes: 4 additions & 1 deletion .github/workflows/publish.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
name: publish

on: workflow_dispatch
on:
push:
tags: [ v* ]
workflow_dispatch:

jobs:
build-publish:
Expand Down
28 changes: 15 additions & 13 deletions NewLife.RocketMQ/ClusterClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,38 +156,40 @@ private void SetSignature(Command cmd)
String onsChannel;

// 根据配置判断是阿里版本还是Apache开源版本
if (Config.Aliyun == null || Config.Aliyun.AccessKey.IsNullOrEmpty())
var aliyun = Config.Aliyun;
if (aliyun == null || aliyun.AccessKey.IsNullOrEmpty())
{
// Apache RocketMQ:如果未配置签名AccessKey信息直接返回,不加密
if (Config.AclOptions == null || Config.AclOptions.AccessKey.IsNullOrEmpty()) return;

accessKey = Config.AclOptions.AccessKey;
secretKey = Config.AclOptions.SecretKey;
onsChannel = Config.AclOptions.OnsChannel;
var acl = Config.AclOptions;
if (acl == null || acl.AccessKey.IsNullOrEmpty()) return;

accessKey = acl.AccessKey;
secretKey = acl.SecretKey;
onsChannel = acl.OnsChannel;
}
else
{
// 阿里版本RocketMQ
accessKey = Config.Aliyun.AccessKey;
secretKey = Config.Aliyun.SecretKey;
onsChannel = Config.Aliyun.OnsChannel;
accessKey = aliyun.AccessKey;
secretKey = aliyun.SecretKey;
onsChannel = aliyun.OnsChannel;
}

var sha = new HMACSHA1(secretKey.GetBytes());
var ms = new MemoryStream();

// AccessKey + OnsChannel
ms.Write(accessKey.GetBytes());
ms.Write(onsChannel.GetBytes());

// ExtFields
var dic = cmd.Header.GetExtFields();
//var extFieldsDic = dic.OrderBy(e => e.Key).ToDictionary(e => e.Key, e => e.Value);
foreach (var extFields in dic)
{
if (extFields.Value != null) ms.Write(extFields.Value.GetBytes());
}

// Body
cmd.Payload?.CopyTo(ms);

Expand Down
26 changes: 16 additions & 10 deletions NewLife.RocketMQ/MqBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ public abstract class MqBase : DisposeBase

private String _group = "DEFAULT_PRODUCER";
/// <summary>消费组</summary>
/// <remarks>阿里云目前需要在Group前面带上实例ID并用【%】连接,组成路由Group[用来路由到实例Group]</remarks>
public String Group
{
get
{
// 阿里云目前需要在Group前面带上实例ID并用【%】连接,组成路由Group[用来路由到实例Group]
if (Aliyun == null || String.IsNullOrWhiteSpace(Aliyun.InstanceId)) return _group;
return String.Join('%', Aliyun.InstanceId, _group);
var ins = Aliyun?.InstanceId;
return ins.IsNullOrEmpty() ? _group : $"{ins}%{_group}";
}
set
{
Expand All @@ -31,13 +32,14 @@ public String Group

private String _topic = "TBW102";
/// <summary>主题</summary>
/// <remarks>阿里云目前需要在Topic前面带上实例ID并用【%】连接,组成路由Topic[用来路由到实例Topic]</remarks>
public String Topic
{
get
{
// 阿里云目前需要在Topic前面带上实例ID并用【%】连接,组成路由Topic[用来路由到实例Topic]
if (Aliyun == null || String.IsNullOrWhiteSpace(Aliyun.InstanceId)) return _topic;
return String.Join('%', Aliyun.InstanceId, _topic);
var ins = Aliyun?.InstanceId;
return ins.IsNullOrEmpty() ? _topic : $"{ins}%{_topic}";
}
set
{
Expand Down Expand Up @@ -77,7 +79,7 @@ public String Topic
/// <summary>代理集合</summary>
public IList<BrokerInfo> Brokers => _NameServer?.Brokers.OrderBy(t => t.Name).ToList();

/// <summary>阿里云选项</summary>
/// <summary>阿里云选项。使用阿里云RocketMQ的参数有些不一样</summary>
public AliyunOptions Aliyun { get; set; }

/// <summary> Apache RocketMQ ACL 客户端配置。在Borker服务器配置设置为AclEnable = true 时配置生效。</summary>
Expand Down Expand Up @@ -142,12 +144,16 @@ public virtual void Configure(MqSetting setting)
Topic = setting.Topic;
Group = setting.Group;

Aliyun = new AliyunOptions
if (!setting.Server.IsNullOrEmpty() &&
!setting.AccessKey.IsNullOrEmpty())
{
Server = setting.Server,
AccessKey = setting.AccessKey,
SecretKey = setting.SecretKey,
};
Aliyun = new AliyunOptions
{
Server = setting.Server,
AccessKey = setting.AccessKey,
SecretKey = setting.SecretKey,
};
}
}

/// <summary>开始</summary>
Expand Down

0 comments on commit 33e58a6

Please sign in to comment.