Skip to content
This repository was archived by the owner on Dec 24, 2022. It is now read-only.

Commit 69512cd

Browse files
Aaron BoxerAaron Boxer
authored andcommitted
added ability to re-queue items at front of queue. Useful
for when processing hits an error condition on one of the queue items, and needs to pause the processing
1 parent f9520b8 commit 69512cd

File tree

6 files changed

+105
-23
lines changed

6 files changed

+105
-23
lines changed

src/ServiceStack.Redis/Support/Queue/ISequentialWorkQueue.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,34 @@ namespace ServiceStack.Redis.Support.Queue
55
{
66
public interface ISequentialWorkQueue<T> : IDisposable where T : class
77
{
8+
/// <summary>
9+
/// Enqueue item in queue corresponding to workItemId identifier
10+
/// </summary>
11+
/// <param name="workItemId"></param>
12+
/// <param name="workItem"></param>
813
void Enqueue(string workItemId, T workItem);
914

15+
/// <summary>
16+
/// Dequeue up to maxBatchSize items from queue corresponding to workItemId identifier.
17+
/// Once this method is called, no items for workItemId queue can be dequeued until
18+
/// <see cref="PostDequeue"/> is called
19+
/// </summary>
20+
/// <param name="maxBatchSize"></param>
21+
/// <returns></returns>
1022
KeyValuePair<string, IList<T>> Dequeue(int maxBatchSize);
1123

24+
/// <summary>
25+
/// Push items to front of queue corresponding to workItemId identifier
26+
/// </summary>
27+
/// <param name="workItemId"></param>
28+
/// <param name="workItems"></param>
29+
void PushFront(string workItemId, IList<T> workItems);
30+
31+
/// <summary>
32+
/// Must be called after dequeued items have been processed, in order for other callers
33+
/// to dequeue items from workItemId queue
34+
/// </summary>
35+
/// <param name="workItemId"></param>
1236
void PostDequeue(string workItemId);
1337
}
1438
}

src/ServiceStack.Redis/Support/Queue/ISimpleWorkQueue.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,23 @@ namespace ServiceStack.Redis.Support.Queue
66
{
77
public interface ISimpleWorkQueue<T> : IDisposable where T : class
88
{
9+
/// <summary>
10+
/// Enqueue item
11+
/// </summary>
12+
/// <param name="workItem"></param>
913
void Enqueue(T workItem);
1014

11-
void UnDequeue(IList<T> workItems);
15+
/// <summary>
16+
/// Push items to front of queue
17+
/// </summary>
18+
/// <param name="workItems"></param>
19+
void PushFront(IList<T> workItems);
1220

21+
/// <summary>
22+
/// Dequeue up to maxBatchSize items from queue
23+
/// </summary>
24+
/// <param name="maxBatchSize"></param>
25+
/// <returns></returns>
1326
IList<T> Dequeue(int maxBatchSize);
1427
}
1528
}

src/ServiceStack.Redis/Support/Queue/Implementation/RedisSequentialWorkQueue.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,33 @@ public void Enqueue(string workItemId, T workItem)
5151
}
5252
}
5353
}
54+
}
55+
56+
/// <summary>
57+
/// Return dequeued items to front of queue
58+
/// </summary>
59+
/// <param name="workItems"></param>
60+
/// <param name="workItemId"></param>
61+
public void PushFront(string workItemId, IList<T> workItems)
62+
{
63+
if (workItems == null || workItems.Count == 0)
64+
return;
65+
using (var disposableClient = clientManager.GetDisposableClient<SerializingRedisClient>())
66+
{
67+
var client = disposableClient.Client;
68+
var key = queueNamespace.GlobalCacheKey(workItemId);
69+
using (var pipe = client.CreatePipeline())
70+
{
71+
for (int i = workItems.Count - 1; i >= 0; i--)
72+
{
73+
int index = i;
74+
pipe.QueueCommand(r => ((RedisNativeClient)r).LPush(key, client.Serialize(workItems[index])));
75+
}
76+
pipe.Flush();
5477

78+
}
79+
PostDequeue(workItemId);
80+
}
5581
}
5682

5783
/// <summary>

src/ServiceStack.Redis/Support/Queue/Implementation/RedisSimpleWorkQueue.cs

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -35,30 +35,13 @@ public void Enqueue(T msg)
3535
client.RPush(key, client.Serialize(msg));
3636
}
3737
}
38-
3938
/// <summary>
40-
/// Return dequeued items to front of list
39+
/// Return dequeued items to front of queue
4140
/// </summary>
4241
/// <param name="workItems"></param>
43-
public void UnDequeue(IList<T> workItems)
42+
public void PushFront(IList<T> workItems)
4443
{
45-
if (workItems == null || workItems.Count == 0)
46-
return;
47-
using (var disposableClient = clientManager.GetDisposableClient<SerializingRedisClient>())
48-
{
49-
var client = disposableClient.Client;
50-
var key = queueNamespace.GlobalCacheKey(pendingWorkItemIdQueue);
51-
using (var pipe = client.CreatePipeline())
52-
{
53-
for (int i = workItems.Count - 1; i >= 0; i-- )
54-
{
55-
int index = i;
56-
pipe.QueueCommand(r => ((RedisNativeClient)r).LPush(key, client.Serialize(workItems[index])));
57-
}
58-
pipe.Flush();
59-
60-
}
61-
}
44+
UnDequeueImpl(workItems, pendingWorkItemIdQueue);
6245
}
6346

6447

src/ServiceStack.Redis/Support/Queue/Implementation/RedisWorkQueue.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,34 @@ public RedisWorkQueue(int maxReadPoolSize, int maxWritePoolSize, string host, in
3535
};
3636
}
3737

38+
39+
/// <summary>
40+
/// Return dequeued items to front of queue
41+
/// </summary>
42+
/// <param name="workItems"></param>
43+
/// <param name="listId"></param>
44+
protected void UnDequeueImpl(IList<T> workItems, string listId)
45+
{
46+
if (workItems == null || workItems.Count == 0)
47+
return;
48+
using (var disposableClient = clientManager.GetDisposableClient<SerializingRedisClient>())
49+
{
50+
var client = disposableClient.Client;
51+
var key = queueNamespace.GlobalCacheKey(listId);
52+
using (var pipe = client.CreatePipeline())
53+
{
54+
for (int i = workItems.Count - 1; i >= 0; i--)
55+
{
56+
int index = i;
57+
pipe.QueueCommand(r => ((RedisNativeClient)r).LPush(key, client.Serialize(workItems[index])));
58+
}
59+
pipe.Flush();
60+
61+
}
62+
}
63+
}
64+
65+
3866
public void Dispose()
3967
{
4068
clientManager.Dispose();

tests/ServiceStack.Redis.Tests/QueueTests.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,19 @@ public void TestSequentialWorkQueue()
4848
Assert.AreEqual(batch.Value.Count, numMessages/2);
4949
queue.PostDequeue(batch.Key);
5050

51+
// test that PushFront works
52+
queue.PushFront(batch.Key, batch.Value);
53+
var undequeuedBatch = queue.Dequeue(numMessages * 2);
54+
Assert.AreEqual(undequeuedBatch.Value, batch.Value);
55+
Assert.AreEqual(undequeuedBatch.Key, batch.Key);
56+
queue.PostDequeue(batch.Key);
57+
58+
5159
// check that there are no more messages in the queue
5260
batch = queue.Dequeue(numMessages);
5361
Assert.IsNull(batch.Key);
5462
Assert.AreEqual(batch.Value.Count, 0);
55-
63+
5664
}
5765
}
5866

@@ -120,7 +128,7 @@ public void TestSimpleWorkQueue()
120128
Assert.AreEqual(batch.Count, 0);
121129

122130
// test that UnDequeue works
123-
queue.UnDequeue(batch);
131+
queue.PushFront(batch);
124132
var undequeuedBatch = queue.Dequeue(numMessages*2);
125133
Assert.AreEqual(undequeuedBatch, batch);
126134

0 commit comments

Comments
 (0)