Skip to content

Commit

Permalink
对Redis多消费组可重复消费的队列业务整理了下代码,更新了示例
Browse files Browse the repository at this point in the history
  • Loading branch information
e4ky committed Feb 25, 2021
1 parent 521601f commit 7f5da82
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 26 deletions.
139 changes: 114 additions & 25 deletions NewLife.Redis/MultipleConsumerGroupsQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,30 @@ public class MultipleConsumerGroupsQueue<T> : IDisposable
/// <summary>
/// Redis客户端
/// </summary>
private FullRedis _Redis;
FullRedis _Redis;

/// <summary>
/// 消息列队
/// </summary>
private RedisStream<T> _Queue;
RedisStream<T> _Queue;

/// <summary>
/// 读写超时(默认15000ms)
/// </summary>
public Int32 TimeOut { set; get; } = 15_000;
public int TimeOut
{
set;
get;
} = 15_000;

/// <summary>
/// 订阅者名称
/// </summary>
public string SubscribeAppName
{
private set;
get;
}

/// <summary>
/// 消费者组名已经存在的Redis错误消息关键词
Expand All @@ -34,7 +47,23 @@ public class MultipleConsumerGroupsQueue<T> : IDisposable
/// <summary>
/// 列队长度
/// </summary>
public Int32 QueueLen { set; get; } = 2000;
public int QueueLen { set; get; } = 2000;


/// <summary>
/// 日志对像
/// </summary>
public ILog Log
{
set { if (_Redis != null) _Redis.Log = value; }
get
{
if (_Redis != null)
return _Redis.Log;
else
return null;
}
}

/// <summary>
/// 连接Redis服务器
Expand All @@ -44,50 +73,67 @@ public class MultipleConsumerGroupsQueue<T> : IDisposable
/// <param name="port">端口(默认6379)</param>
/// <param name="password">密码</param>
/// <param name="db">连接Redis数据库</param>
public void Connect(String host, String queueName, Int32 port = 6379, String password = "", Int32 db = 0)
public void Connect(string host, string queueName, int port = 6379, string password = "", int db = 0)
{
_Redis = new FullRedis($"{host}:{port}", password, db) { Timeout = TimeOut };
if (_Redis != null)
{
_Queue = _Redis.GetStream<T>(queueName);
_Queue.MaxLenngth = QueueLen;
}
else
{
throw new NullReferenceException("连接Redis服务器失败。");
}

}

/// <summary>
/// 发送消息
/// </summary>
/// <param name="data"></param>
public void Publish(T data) => _Queue.Add(data);
public void Publish(T data)
{
_Queue.Add(data);
}


/// <summary>
/// 独立线程消费
/// </summary>
private CancellationTokenSource _Cts;
CancellationTokenSource _Cts;

/// <summary>
/// 订阅
/// </summary>
/// <param name="subscribeAppName">消费者名称</param>
public void Subscribe(String subscribeAppName)
public void Subscribe(string subscribeAppName)
{
SubscribeAppName = subscribeAppName;
_Cts = new CancellationTokenSource();
_Queue.Group = subscribeAppName;

if (_Redis == null || _Queue == null)
{
OnDisconnected("订阅时列队对像为Null。");
}

//尝试创建消费组
try
{
_Queue.Group = subscribeAppName;
_Queue.GroupCreate(subscribeAppName);
}
catch (Exception err)
{
//遇到其它非消费组名已经存在的错误消息时,停止消费并提示消息
if (err.Message.IndexOf(ConsumeGroupExistErrMsgKeyWord) < 0)
{
if (XTrace.Debug) XTrace.WriteException(err); //TODO:要优化处理日志的记录(此处日志调试用,实际生产环境不记录)
if (XTrace.Debug) XTrace.WriteException(err);
OnStopSubscribe(err.Message);
return;
}


}

#if NET40
Expand All @@ -102,36 +148,46 @@ public void Subscribe(String subscribeAppName)
/// <summary>
/// 取消订阅
/// </summary>
public void UnSubscribe() => _Cts.Cancel();
public void UnSubscribe()
{
_Cts.Cancel();
}

/// <summary>
/// 获取消费消息
/// </summary>
/// <param name="subscribeAppName">订阅APP名称</param>
private async Task getSubscribe(String subscribeAppName)
private async Task getSubscribe(string subscribeAppName)
{
if (_Queue == null)
{
_Cts.Cancel();
OnStopSubscribe("消息列队对像为Null");
OnDisconnected("消息列队对像为Null");
return;
}
while (!_Cts.IsCancellationRequested)
{
if (_Redis == null || _Queue == null)
{
OnDisconnected("获取订阅消息时列队对像为Null。");
}

try
{
var msg = await _Queue.TakeMessageAsync(10);
if (msg != null)
{

var data = msg.GetBody<T>();
_Queue.Acknowledge(msg.Id);
//通知订阅者
OnReceived(data);
_Queue.Acknowledge(msg.Id);

}
}
catch (Exception err)
{
if (XTrace.Debug) XTrace.WriteException(err); //TODO:要优化处理日志的记录(此处日志调试用,实际生产环境不记录)
if (XTrace.Debug) XTrace.WriteException(err);

_Cts.Cancel();
OnStopSubscribe(err.Message);
Expand All @@ -145,33 +201,39 @@ private async Task getSubscribe(String subscribeAppName)
/// </summary>
public void Dispose()
{
_Queue = null;
_Redis.Dispose();
if (_Cts != null) _Cts.Cancel();
if (_Queue != null) _Queue = null;
if (_Redis != null) _Redis.Dispose();
}


#region 事件

/// <summary>
/// 通知订阅者接收到新命令
/// 通知订阅者接收到新消息
/// </summary>
/// <param name="cmd">命令</param>
/// <param name="data">命令</param>
public delegate void ReceivedHandler(T data);

/// <summary>
/// 通知订阅者接收到新命令
/// 通知订阅者接收到新消息
/// </summary>
public event ReceivedHandler Received;

/// <summary>
/// 通知订阅者接收到新命令
/// 通知订阅者接收到新消息
/// </summary>
/// <param name="cmd"></param>
protected void OnReceived(T data) => Received?.Invoke(data);
/// <param name="data"></param>
protected void OnReceived(T data)
{
Received?.Invoke(data);
}

/// <summary>
/// 通知订阅者停止订阅
/// </summary>
/// <param name="msg">停止消息</param>
public delegate void StopSubscribeHandler(String msg);
public delegate void StopSubscribeHandler(string msg);

/// <summary>
/// 通知订阅者停止订阅
Expand All @@ -183,7 +245,34 @@ public void Dispose()
/// 通知订阅者停止订阅
/// </summary>
/// <param name="msg">停止消息</param>
protected void OnStopSubscribe(String msg) => StopSubscribe?.Invoke(msg);
protected void OnStopSubscribe(string msg)
{
StopSubscribe?.Invoke(msg);
}

/// <summary>
/// 通知订阅者断开连接
/// </summary>
/// <param name="msg">停止消息</param>
public delegate void DisconnectedHandler(string msg);

/// <summary>
/// 通知订阅者断开连接
/// </summary>
/// <remarks>可以在这里处理重新连接的相关业务逻辑</remarks>
public event DisconnectedHandler Disconnected;

/// <summary>
/// 通知订阅者断开连接
/// </summary>
/// <param name="msg">停止消息</param>
protected void OnDisconnected(string msg)
{
Disconnected?.Invoke(msg);
}


#endregion

}
}
8 changes: 7 additions & 1 deletion QueueDemo/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ static void Main(string[] args)
var consumerName = "Consumer1";
var mq = new MultipleConsumerGroupsQueue<string>();
mq.ConsumeGroupExistErrMsgKeyWord = "exist"; //不同版本的redis错误消息关键词可能不一样,这里注意设置合适的关键词
mq.Connect("centos.newlifex.com", "MultipleConsumerGroupsQueue", 6000, "Pass@word", 1);
mq.Connect("centos.newlifex.com", "MultipleConsumerGroupsQueue", 6000, "Pass@word", 7);
mq.Received += (data) => { XTrace.WriteLine($"[Redis多消费组可重复消费的队列]收到列队消息:{data}"); };
mq.StopSubscribe += (msg) =>
{
Expand All @@ -57,6 +57,12 @@ static void Main(string[] args)
Thread.Sleep(5000);
mq.Subscribe(consumerName);
};
mq.Disconnected += (msg) =>
{
//一般不会进入这里。(可能这个事件还可以再优化一下)
XTrace.WriteLine($"因“{msg}”断开连接,进入重连模式。");
mq.Connect("centos.newlifex.com", "MultipleConsumerGroupsQueue", 6000, "Pass@word", 7);
};
mq.Subscribe(consumerName); //开始订阅消息

//多消费组可重复消费消息发布
Expand Down

0 comments on commit 7f5da82

Please sign in to comment.