Skip to content
This repository was archived by the owner on Dec 24, 2022. It is now read-only.

Commit 8b7842a

Browse files
Aaron BoxerAaron Boxer
authored andcommitted
small tweaks to work queues
1 parent 69512cd commit 8b7842a

File tree

5 files changed

+34
-76
lines changed

5 files changed

+34
-76
lines changed

src/ServiceStack.Redis/Support/Queue/ISimpleWorkQueue.cs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,6 @@ public interface ISimpleWorkQueue<T> : IDisposable where T : class
1212
/// <param name="workItem"></param>
1313
void Enqueue(T workItem);
1414

15-
/// <summary>
16-
/// Push items to front of queue
17-
/// </summary>
18-
/// <param name="workItems"></param>
19-
void PushFront(IList<T> workItems);
20-
2115
/// <summary>
2216
/// Dequeue up to maxBatchSize items from queue
2317
/// </summary>

src/ServiceStack.Redis/Support/Queue/Implementation/RedisSequentialWorkQueue.cs

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public class RedisSequentialWorkQueue<T> : RedisWorkQueue<T>, ISequentialWorkQue
1515
{
1616
private int lockAcquisitionTimeout = 2;
1717
private int lockTimeout = 2;
18-
private double CONVENIENTLY_SIZED_FLOAT = 18014398509481984.0;
18+
protected const double CONVENIENTLY_SIZED_FLOAT = 18014398509481984.0;
1919

2020

2121

@@ -24,7 +24,7 @@ public RedisSequentialWorkQueue(int maxReadPoolSize, int maxWritePoolSize, strin
2424
{
2525
}
2626

27-
public RedisSequentialWorkQueue(int maxReadPoolSize, int maxWritePoolSize, string host, int port, string queueName )
27+
public RedisSequentialWorkQueue(int maxReadPoolSize, int maxWritePoolSize, string host, int port, string queueName)
2828
: base(maxReadPoolSize, maxWritePoolSize, host, port, queueName)
2929
{
3030
}
@@ -53,33 +53,6 @@ public void Enqueue(string workItemId, T workItem)
5353
}
5454
}
5555

56-
/// <summary>
57-
/// Return dequeued items to front of queue
58-
/// </summary>
59-
/// <param name="workItems"></param>
60-
/// <param name="workItemId"></param>
61-
public void PushFront(string workItemId, IList<T> workItems)
62-
{
63-
if (workItems == null || workItems.Count == 0)
64-
return;
65-
using (var disposableClient = clientManager.GetDisposableClient<SerializingRedisClient>())
66-
{
67-
var client = disposableClient.Client;
68-
var key = queueNamespace.GlobalCacheKey(workItemId);
69-
using (var pipe = client.CreatePipeline())
70-
{
71-
for (int i = workItems.Count - 1; i >= 0; i--)
72-
{
73-
int index = i;
74-
pipe.QueueCommand(r => ((RedisNativeClient)r).LPush(key, client.Serialize(workItems[index])));
75-
}
76-
pipe.Flush();
77-
78-
}
79-
PostDequeue(workItemId);
80-
}
81-
}
82-
8356
/// <summary>
8457
/// Dequeue next batch of messages for processing. After this method is called,
8558
/// no other messages with same work item id will be available for
@@ -123,6 +96,37 @@ public KeyValuePair<string, IList<T>> Dequeue(int maxBatchSize)
12396
}
12497
}
12598

99+
/// <summary>
100+
/// Return dequeued items to front of queue
101+
/// </summary>
102+
/// <param name="workItems"></param>
103+
/// <param name="workItemId"></param>
104+
public void PushFront(string workItemId, IList<T> workItems)
105+
{
106+
if (workItems == null || workItems.Count == 0)
107+
return;
108+
using (var disposableClient = clientManager.GetDisposableClient<SerializingRedisClient>())
109+
{
110+
var client = disposableClient.Client;
111+
var key = queueNamespace.GlobalCacheKey(workItemId);
112+
var lockKey = queueNamespace.GlobalKey(workItemId, RedisNamespace.NumTagsForLockKey);
113+
using (var disposableLock = new DisposableDistributedLock(client, lockKey, lockAcquisitionTimeout, lockTimeout))
114+
{
115+
using (var pipe = client.CreatePipeline())
116+
{
117+
for (int i = workItems.Count - 1; i >= 0; i--)
118+
{
119+
int index = i;
120+
pipe.QueueCommand(
121+
r => ((RedisNativeClient) r).LPush(key, client.Serialize(workItems[index])));
122+
}
123+
pipe.QueueCommand(r => ((RedisNativeClient)r).ZIncrBy(pendingWorkItemIdQueue, -1, client.Serialize(workItemId)));
124+
pipe.Flush();
125+
}
126+
}
127+
}
128+
}
129+
126130
/// <summary>
127131
/// Unlock message id, so other servers can process messages for this message id
128132
/// </summary>

src/ServiceStack.Redis/Support/Queue/Implementation/RedisSimpleWorkQueue.cs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,6 @@ public void Enqueue(T msg)
3535
client.RPush(key, client.Serialize(msg));
3636
}
3737
}
38-
/// <summary>
39-
/// Return dequeued items to front of queue
40-
/// </summary>
41-
/// <param name="workItems"></param>
42-
public void PushFront(IList<T> workItems)
43-
{
44-
UnDequeueImpl(workItems, pendingWorkItemIdQueue);
45-
}
4638

4739

4840
/// <summary>

src/ServiceStack.Redis/Support/Queue/Implementation/RedisWorkQueue.cs

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -35,34 +35,6 @@ public RedisWorkQueue(int maxReadPoolSize, int maxWritePoolSize, string host, in
3535
};
3636
}
3737

38-
39-
/// <summary>
40-
/// Return dequeued items to front of queue
41-
/// </summary>
42-
/// <param name="workItems"></param>
43-
/// <param name="listId"></param>
44-
protected void UnDequeueImpl(IList<T> workItems, string listId)
45-
{
46-
if (workItems == null || workItems.Count == 0)
47-
return;
48-
using (var disposableClient = clientManager.GetDisposableClient<SerializingRedisClient>())
49-
{
50-
var client = disposableClient.Client;
51-
var key = queueNamespace.GlobalCacheKey(listId);
52-
using (var pipe = client.CreatePipeline())
53-
{
54-
for (int i = workItems.Count - 1; i >= 0; i--)
55-
{
56-
int index = i;
57-
pipe.QueueCommand(r => ((RedisNativeClient)r).LPush(key, client.Serialize(workItems[index])));
58-
}
59-
pipe.Flush();
60-
61-
}
62-
}
63-
}
64-
65-
6638
public void Dispose()
6739
{
6840
clientManager.Dispose();

tests/ServiceStack.Redis.Tests/QueueTests.cs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ namespace ServiceStack.Redis.Tests
88
[TestFixture]
99
public class QueueTests
1010
{
11+
1112
[Test]
1213
public void TestSequentialWorkQueue()
1314
{
@@ -127,11 +128,6 @@ public void TestSimpleWorkQueue()
127128
batch = queue.Dequeue(numMessages * 2);
128129
Assert.AreEqual(batch.Count, 0);
129130

130-
// test that UnDequeue works
131-
queue.PushFront(batch);
132-
var undequeuedBatch = queue.Dequeue(numMessages*2);
133-
Assert.AreEqual(undequeuedBatch, batch);
134-
135131
}
136132
}
137133

0 commit comments

Comments
 (0)