Skip to content
This repository was archived by the owner on Dec 24, 2022. It is now read-only.

Commit d404170

Browse files
Aaron BoxerAaron Boxer
authored andcommitted
1. add UnDequeue method to simple queue, to push items back to front of queue
2. pipelined dequeue 3. UnDequeue test
1 parent 2df393e commit d404170

File tree

3 files changed

+51
-9
lines changed

3 files changed

+51
-9
lines changed

src/ServiceStack.Redis/Support/Queue/ISimpleWorkQueue.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ public interface ISimpleWorkQueue<T> : IDisposable where T : class
88
{
99
void Enqueue(T workItem);
1010

11+
void UnDequeue(IList<T> workItems);
12+
1113
IList<T> Dequeue(int maxBatchSize);
1214
}
1315
}

src/ServiceStack.Redis/Support/Queue/Implementation/RedisSimpleWorkQueue.cs

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Collections.Generic;
1+
using System;
2+
using System.Collections.Generic;
23

34

45
namespace ServiceStack.Redis.Support.Queue.Implementation
@@ -34,7 +35,33 @@ public void Enqueue(T msg)
3435
client.RPush(key, client.Serialize(msg));
3536
}
3637
}
37-
38+
39+
/// <summary>
40+
/// Return dequeued items to front of list
41+
/// </summary>
42+
/// <param name="workItems"></param>
43+
public void UnDequeue(IList<T> workItems)
44+
{
45+
if (workItems == null || workItems.Count == 0)
46+
return;
47+
using (var disposableClient = clientManager.GetDisposableClient<SerializingRedisClient>())
48+
{
49+
var client = disposableClient.Client;
50+
var key = queueNamespace.GlobalCacheKey(pendingWorkItemIdQueue);
51+
using (var pipe = client.CreatePipeline())
52+
{
53+
for (int i = workItems.Count - 1; i >= 0; i-- )
54+
{
55+
int index = i;
56+
pipe.QueueCommand(r => ((RedisNativeClient)r).LPush(key, client.Serialize(workItems[index])));
57+
}
58+
pipe.Flush();
59+
60+
}
61+
}
62+
}
63+
64+
3865
/// <summary>
3966
/// Dequeue next batch of work items for processing. After this method is called,
4067
/// no other work items with same id will be available for
@@ -48,14 +75,22 @@ public IList<T> Dequeue(int maxBatchSize)
4875
{
4976
var client = disposableClient.Client;
5077
var dequeueItems = new List<T>();
51-
var key = queueNamespace.GlobalCacheKey(pendingWorkItemIdQueue);
52-
int workItemCount = 0;
53-
while (client.LLen(key) > 0 && workItemCount < maxBatchSize)
78+
using (var pipe = client.CreatePipeline())
5479
{
55-
var workItem = (T) client.Deserialize(client.LPop(key));
56-
if (workItem == null) continue;
57-
dequeueItems.Add(workItem);
58-
workItemCount++;
80+
var key = queueNamespace.GlobalCacheKey(pendingWorkItemIdQueue);
81+
for (var i = 0; i < maxBatchSize; ++i)
82+
{
83+
pipe.QueueCommand(
84+
r => ((RedisNativeClient) r).LPop(key),
85+
x =>
86+
{
87+
if (x != null)
88+
dequeueItems.Add((T) client.Deserialize(x));
89+
});
90+
91+
}
92+
pipe.Flush();
93+
5994
}
6095
return dequeueItems;
6196
}

tests/ServiceStack.Redis.Tests/QueueTests.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,11 @@ public void TestSimpleWorkQueue()
119119
batch = queue.Dequeue(numMessages * 2);
120120
Assert.AreEqual(batch.Count, 0);
121121

122+
// test that UnDequeue works
123+
queue.UnDequeue(batch);
124+
var undequeuedBatch = queue.Dequeue(numMessages*2);
125+
Assert.AreEqual(undequeuedBatch, batch);
126+
122127
}
123128
}
124129

0 commit comments

Comments
 (0)