Skip to content

Commit

Permalink
Added connection hub
Browse files Browse the repository at this point in the history
  • Loading branch information
gregyjames committed Feb 18, 2024
1 parent b026bc2 commit 2aedb54
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 21 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -28,3 +28,5 @@ RequeueTesterProducer/obj/Debug/net7.0/
RequeueTesterProducer/obj/

ReQueue/bin/Debug/

.idea/
35 changes: 35 additions & 0 deletions ReQueue/ConnectionHub.cs
@@ -0,0 +1,35 @@
using StackExchange.Redis;

namespace ReQueue;

public class ConnectionHub
{
private readonly ConnectionMultiplexer _connectionMultiplexer;
private readonly IDatabase _db;

/// <summary>
/// Initialize a new connection hub instance.
/// </summary>
/// <param name="connectionString">The connection string of the redis database to use.</param>
/// <param name="dbnumber">The number of the database to use.</param>
/// <exception cref="ArgumentNullException"></exception>
public ConnectionHub(string connectionString, int dbnumber = 0)
{
if (string.IsNullOrEmpty(connectionString))
{
throw new ArgumentNullException(nameof(connectionString));
}
_connectionMultiplexer = ConnectionMultiplexer.Connect(connectionString);
_db = _connectionMultiplexer.GetDatabase(dbnumber);
}

/// <summary>
/// Creates a new queue of the specified type.
/// </summary>
/// <typeparam name="T">The type of the queue to create.</typeparam>
/// <returns>A message queue object of specified type.</returns>
public MessageQueue<T> GetMessageQueue<T>()
{
return new MessageQueue<T>(_db);
}
}
30 changes: 11 additions & 19 deletions ReQueue/MessageQueue.cs
@@ -1,27 +1,19 @@
using MessagePack;
using StackExchange.Redis;
using static System.Runtime.InteropServices.JavaScript.JSType;

namespace ReQueue
{
public class MessageQueue<T>
{
private ConnectionMultiplexer connectionMultiplexer = null;
private IDatabase db = null;
private readonly IDatabase _db;

/// <summary>
/// Create a new MessageQueue object.
/// </summary>
/// <param name="connectionString">The redis connection string.</param>
/// <param name="dbnumber">The number of the database to use.</param>
public MessageQueue(string connectionString, int dbnumber = 0)
/// <param name="database">The redis database to use.</param>
internal MessageQueue(IDatabase database)
{
if (string.IsNullOrEmpty(connectionString))
{
throw new ArgumentNullException("connectionString");
}
connectionMultiplexer = ConnectionMultiplexer.Connect(connectionString);
db = connectionMultiplexer.GetDatabase(dbnumber);
_db = database;
}

/// <summary>
Expand All @@ -34,25 +26,25 @@ public async Task EnqueueMessages(string queueKey, T item)
{
byte[] bytes = MessagePackSerializer.Serialize(item);
string base64String = Convert.ToBase64String(bytes);
await db.ListLeftPushAsync(queueKey, base64String);
await _db.ListLeftPushAsync(queueKey, base64String);
}

/// <summary>
/// Task to wait and recieve all messages from the queue.
/// Task to wait and receive all messages from the queue.
/// </summary>
/// <param name="queueKey">The name of the queue.</param>
/// <param name="action">The action to be run when an item is recieved.</param>
/// <param name="action">The action to be run when an item is received.</param>
/// <param name="cancellationToken">The token to be used for cancelling the task.</param>
/// <param name="filter">Optional filter for recieved messages. By default, items that fail the filter are requeued.</param>
/// <param name="filter">Optional filter for received messages. By default, items that fail the filter are re-queued.</param>
/// <returns></returns>
public async Task DequeueMessages(string queueKey, Action<T> action, CancellationToken cancellationToken, Func<T, bool> filter = null)
public async Task DequeueMessages(string queueKey, Action<T> action, CancellationToken cancellationToken, Func<T, bool> filter = null!)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
// Use BLPOP to block and wait for an item to be added to the list
var result = await db.ListRightPopAsync(queueKey);
var result = await _db.ListRightPopAsync(queueKey);

if (result != RedisValue.Null)
{
Expand Down Expand Up @@ -95,7 +87,7 @@ public async Task DequeueMessages(string queueKey, Action<T> action, Cancellatio
/// <returns></returns>
public async Task ClearQueue(string queueKey)
{
await db.KeyDeleteAsync(queueKey);
await _db.KeyDeleteAsync(queueKey);
}
}
}
3 changes: 2 additions & 1 deletion ReQueueClient/Program.cs
Expand Up @@ -6,7 +6,8 @@ internal class Program
{
static async Task Main(string[] args)
{
var queue = new MessageQueue<Data>("localhost", 0);
var manager = new ConnectionHub("localhost", 0);
var queue = manager.GetMessageQueue<Data>();
var tokenSource = new CancellationTokenSource();

tokenSource.CancelAfter(TimeSpan.FromSeconds(10));
Expand Down
5 changes: 4 additions & 1 deletion RequeueTesterProducer/Program.cs
@@ -1,11 +1,14 @@

using ReQueue;

namespace RequeueTesterProducer
{
internal class Program
{
static async Task Main(string[] args)
{
var queue = new ReQueue.MessageQueue<Data>("localhost", 0);
var manager = new ConnectionHub("localhost", 0);
var queue = manager.GetMessageQueue<Data>();

for (int i = 0; i < 1000; i++)
{
Expand Down

0 comments on commit 2aedb54

Please sign in to comment.