Skip to content

Commit

Permalink
队列长度默认值100万
Browse files Browse the repository at this point in the history
  • Loading branch information
nnhy committed Jun 4, 2021
1 parent 1248463 commit 033b12e
Showing 1 changed file with 16 additions and 39 deletions.
55 changes: 16 additions & 39 deletions NewLife.Redis/MultipleConsumerGroupsQueue.cs
Expand Up @@ -14,30 +14,22 @@ public class MultipleConsumerGroupsQueue<T> : IDisposable
/// <summary>
/// Redis客户端
/// </summary>
FullRedis _Redis;
private FullRedis _Redis;

/// <summary>
/// 消息列队
/// </summary>
RedisStream<T> _Queue;
private RedisStream<T> _Queue;

/// <summary>
/// 读写超时(默认15000ms)
/// </summary>
public int TimeOut
{
set;
get;
} = 15_000;
public Int32 TimeOut { set; get; } = 15_000;

/// <summary>
/// 订阅者名称
/// </summary>
public string SubscribeAppName
{
private set;
get;
}
public String SubscribeAppName { private set; get; }

/// <summary>
/// 消费者组名已经存在的Redis错误消息关键词
Expand All @@ -47,7 +39,7 @@ public string SubscribeAppName
/// <summary>
/// 列队长度
/// </summary>
public int QueueLen { set; get; } = 2000;
public Int32 QueueLen { set; get; } = 1_000_000;


/// <summary>
Expand All @@ -73,7 +65,7 @@ public ILog Log
/// <param name="port">端口(默认6379)</param>
/// <param name="password">密码</param>
/// <param name="db">连接Redis数据库</param>
public void Connect(string host, string queueName, int port = 6379, string password = "", int db = 0)
public void Connect(String host, String queueName, Int32 port = 6379, String password = "", Int32 db = 0)
{
_Redis = new FullRedis($"{host}:{port}", password, db) { Timeout = TimeOut };
if (_Redis != null)
Expand All @@ -92,22 +84,19 @@ public void Connect(string host, string queueName, int port = 6379, string passw
/// 发送消息
/// </summary>
/// <param name="data"></param>
public void Publish(T data)
{
_Queue.Add(data);
}
public void Publish(T data) => _Queue.Add(data);


/// <summary>
/// 独立线程消费
/// </summary>
CancellationTokenSource _Cts;
private CancellationTokenSource _Cts;

/// <summary>
/// 订阅
/// </summary>
/// <param name="subscribeAppName">消费者名称</param>
public void Subscribe(string subscribeAppName)
public void Subscribe(String subscribeAppName)
{
SubscribeAppName = subscribeAppName;
_Cts = new CancellationTokenSource();
Expand Down Expand Up @@ -148,16 +137,13 @@ public void Subscribe(string subscribeAppName)
/// <summary>
/// 取消订阅
/// </summary>
public void UnSubscribe()
{
_Cts.Cancel();
}
public void UnSubscribe() => _Cts.Cancel();

/// <summary>
/// 获取消费消息
/// </summary>
/// <param name="subscribeAppName">订阅APP名称</param>
private async Task getSubscribe(string subscribeAppName)
private async Task getSubscribe(String subscribeAppName)
{
if (_Queue == null)
{
Expand Down Expand Up @@ -224,16 +210,13 @@ public void Dispose()
/// 通知订阅者接收到新消息
/// </summary>
/// <param name="data"></param>
protected void OnReceived(T data)
{
Received?.Invoke(data);
}
protected void OnReceived(T data) => Received?.Invoke(data);

/// <summary>
/// 通知订阅者停止订阅
/// </summary>
/// <param name="msg">停止消息</param>
public delegate void StopSubscribeHandler(string msg);
public delegate void StopSubscribeHandler(String msg);

/// <summary>
/// 通知订阅者停止订阅
Expand All @@ -243,16 +226,13 @@ protected void OnReceived(T data)

/// <summary>通知订阅者停止订阅</summary>
/// <param name="msg">停止消息</param>
protected void OnStopSubscribe(string msg)
{
StopSubscribe?.Invoke(msg);
}
protected void OnStopSubscribe(String msg) => StopSubscribe?.Invoke(msg);

/// <summary>
/// 通知订阅者断开连接
/// </summary>
/// <param name="msg">停止消息</param>
public delegate void DisconnectedHandler(string msg);
public delegate void DisconnectedHandler(String msg);

/// <summary>
/// 通知订阅者断开连接
Expand All @@ -264,10 +244,7 @@ protected void OnStopSubscribe(string msg)
/// 通知订阅者断开连接
/// </summary>
/// <param name="msg">停止消息</param>
protected void OnDisconnected(string msg)
{
Disconnected?.Invoke(msg);
}
protected void OnDisconnected(String msg) => Disconnected?.Invoke(msg);


#endregion
Expand Down

0 comments on commit 033b12e

Please sign in to comment.