Skip to content
This repository was archived by the owner on Dec 24, 2022. It is now read-only.

Commit 34ca2f6

Browse files
Aaron BoxerAaron Boxer
authored andcommitted
fixed design flaw in sequential work queue
1 parent 2510256 commit 34ca2f6

File tree

4 files changed

+79
-35
lines changed

4 files changed

+79
-35
lines changed

src/ServiceStack.Redis/Support/Locking/DistributedLock.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,10 @@ public virtual bool Unlock()
122122

123123
if (lockVal != lockExpire)
124124
{
125-
Debug.WriteLine(String.Format("Unlock(): Failed to unlock key {0}; lock has been acquired by another client ", lockKey));
125+
if (lockVal != 0)
126+
Debug.WriteLine(String.Format("Unlock(): Failed to unlock key {0}; lock has been acquired by another client ", lockKey));
127+
else
128+
Debug.WriteLine(String.Format("Unlock(): Failed to unlock key {0}; lock has been identifed as a zombie and harvested ", lockKey));
126129
localClient.UnWatch();
127130
return false;
128131
}

src/ServiceStack.Redis/Support/Queue/ISequentialWorkQueue.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,20 @@ namespace ServiceStack.Redis.Support.Queue
66
{
77
public interface ISequentialWorkQueue<T> : IDisposable where T : class
88
{
9+
910
/// <summary>
10-
/// Enqueue item in queue corresponding to workItemId identifier
11+
/// Enqueue item in priority queue corresponding to workItemId identifier
1112
/// </summary>
1213
/// <param name="workItemId"></param>
1314
/// <param name="workItem"></param>
1415
void Enqueue(string workItemId, T workItem);
15-
16+
17+
/// <summary>
18+
/// Preprare next work item id for dequeueing
19+
/// </summary>
20+
void PrepareNextWorkItem();
21+
22+
1623
/// <summary>
1724
/// Dequeue up to maxBatchSize items from queue corresponding to workItemId identifier.
1825
/// Once this method is called, <see cref="Dequeue"/> or <see cref="Peek"/> will not

src/ServiceStack.Redis/Support/Queue/Implementation/RedisSequentialWorkQueue.cs

Lines changed: 60 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,13 @@ public partial class RedisSequentialWorkQueue<T> : RedisWorkQueue<T>, ISequentia
2020
private int dequeueLockTimeout = 300;
2121
protected const double CONVENIENTLY_SIZED_FLOAT = 18014398509481984.0;
2222

23-
private string dequeueIds;
23+
// store list of work item ids that have been dequeued
24+
// this list is checked regularly in harvest zombies method
25+
private string dequeueIdSet;
26+
27+
28+
private string workItemIdPriorityQueue;
29+
2430
private const int numTagsForDequeueLock = RedisNamespace.NumTagsForLockKey + 1;
2531

2632
public RedisSequentialWorkQueue(int maxReadPoolSize, int maxWritePoolSize, string host, int port, int dequeueLockTimeout)
@@ -32,7 +38,8 @@ public RedisSequentialWorkQueue(int maxReadPoolSize, int maxWritePoolSize, strin
3238
: base(maxReadPoolSize, maxWritePoolSize, host, port, queueName)
3339
{
3440
this.dequeueLockTimeout = dequeueLockTimeout;
35-
dequeueIds = queueNamespace.GlobalCacheKey("DequeueIds");
41+
dequeueIdSet = queueNamespace.GlobalCacheKey("DequeueIdSet");
42+
workItemIdPriorityQueue = queueNamespace.GlobalCacheKey("WorkItemIdPriorityQueue");
3643
}
3744

3845
/// <summary>
@@ -51,15 +58,15 @@ public void Enqueue(string workItemId, T workItem)
5158
using (var pipe = client.CreatePipeline())
5259
{
5360
pipe.QueueCommand(r => ((RedisNativeClient)r).RPush(queueNamespace.GlobalCacheKey(workItemId), client.Serialize(workItem)));
54-
pipe.QueueCommand(r => ((RedisNativeClient)r).ZIncrBy(pendingWorkItemIdQueue, -1, client.Serialize(workItemId)));
61+
pipe.QueueCommand(r => ((RedisNativeClient)r).ZIncrBy(workItemIdPriorityQueue, -1, client.Serialize(workItemId)));
5562
pipe.Flush();
5663
}
5764
}
5865
}
5966
}
6067

61-
62-
public ISequentialData<T> Dequeue(int maxBatchSize)
68+
69+
public void PrepareNextWorkItem()
6370
{
6471
//harvest zombies every 5 minutes
6572
var now = DateTime.UtcNow;
@@ -70,44 +77,65 @@ public ISequentialData<T> Dequeue(int maxBatchSize)
7077
harvestTime = now;
7178
}
7279

73-
80+
using (var disposableClient = clientManager.GetDisposableClient<SerializingRedisClient>())
81+
{
82+
var client = disposableClient.Client;
83+
84+
//1. get next workItemId, or return if there isn't one
85+
var smallest = client.ZRangeWithScores(workItemIdPriorityQueue, 0, 0);
86+
if (smallest == null || smallest.Length <= 1 ||
87+
RedisNativeClient.ParseDouble(smallest[1]) == CONVENIENTLY_SIZED_FLOAT) return;
88+
var workItemId = client.Deserialize(smallest[0]) as string;
89+
using (var pipe = client.CreatePipeline())
90+
{
91+
var rawWorkItemId = client.Serialize(workItemId);
92+
93+
// lock work item id
94+
pipe.QueueCommand(r => ((RedisNativeClient)r).ZAdd(workItemIdPriorityQueue, CONVENIENTLY_SIZED_FLOAT, smallest[0]));
95+
96+
// track dequeue lock id
97+
pipe.QueueCommand(r => ((RedisNativeClient)r).SAdd(dequeueIdSet, rawWorkItemId));
98+
99+
// push into pending set
100+
pipe.QueueCommand(r => ((RedisNativeClient)r).SAdd(pendingWorkItemIdQueue, rawWorkItemId));
101+
102+
pipe.Flush();
103+
}
104+
}
105+
}
106+
107+
public ISequentialData<T> Dequeue(int maxBatchSize)
108+
{
109+
74110
using (var disposableClient = clientManager.GetDisposableClient<SerializingRedisClient>())
75111
{
76112
var client = disposableClient.Client;
77113

78114
//1. get next workItemId
79-
string workItemId = null;
80115
var workItems = new List<T>();
81-
var smallest = client.ZRangeWithScores(pendingWorkItemIdQueue, 0, 0);
82116
DequeueLock workItemIdLock = null;
83117
try
84118
{
85-
if (smallest != null && smallest.Length > 1 && RedisNativeClient.ParseDouble(smallest[1]) != CONVENIENTLY_SIZED_FLOAT)
119+
var rawWorkItemId = client.SPop(pendingWorkItemIdQueue);
120+
var workItemId = client.Deserialize(rawWorkItemId) as string;;
121+
if (rawWorkItemId != null)
86122
{
87-
workItemId = client.Deserialize(smallest[0]) as string;
88-
89123
using (var pipe = client.CreatePipeline())
90124
{
91-
// track dequeue lock id
92-
pipe.QueueCommand(r => ((RedisNativeClient)r).SAdd(dequeueIds, client.Serialize(workItemId)));
93-
94-
// lock work item id
95-
pipe.QueueCommand(r => ((RedisNativeClient)r).ZAdd(pendingWorkItemIdQueue, CONVENIENTLY_SIZED_FLOAT, smallest[0]));
96-
97125
// dequeue items
98126
var key = queueNamespace.GlobalCacheKey(workItemId);
99-
Action<byte[]> dequeueCallback = x =>
100-
{
101-
if (x != null)
102-
workItems.Add((T) client.Deserialize(x));
103-
};
127+
Action<byte[]> dequeueCallback = x =>
128+
{
129+
if (x != null)
130+
workItems.Add((T) client.Deserialize(x));
131+
};
104132

105133
for (var i = 0; i < maxBatchSize; ++i)
106134
{
107135
int index = i;
108136
pipe.QueueCommand(
109-
r => ((RedisNativeClient)r).LIndex(key, index),
110-
dequeueCallback);
137+
r => ((RedisNativeClient) r).LIndex(key, index),
138+
dequeueCallback);
111139

112140
}
113141
pipe.Flush();
@@ -117,7 +145,7 @@ public ISequentialData<T> Dequeue(int maxBatchSize)
117145
string dequeueLockKey = null;
118146
// don't lock if there are no work items to be processed (can't lock on null lock key)
119147
if (workItems.Count > 0)
120-
dequeueLockKey = GlobalDequeueLockKey(workItemId);
148+
dequeueLockKey = GlobalDequeueLockKey(workItemId);
121149
workItemIdLock.Lock(dequeueLockKey, lockAcquisitionTimeout, dequeueLockTimeout);
122150

123151
}
@@ -165,7 +193,7 @@ public bool HarvestZombies()
165193
using (var disposableClient = clientManager.GetDisposableClient<SerializingRedisClient>())
166194
{
167195
var client = disposableClient.Client;
168-
var dequeueWorkItemIds = client.SMembers(dequeueIds);
196+
var dequeueWorkItemIds = client.SMembers(dequeueIdSet);
169197
if (dequeueWorkItemIds.Length == 0) return false;
170198

171199
var keys = new string[dequeueWorkItemIds.Length];
@@ -225,16 +253,16 @@ public bool TryForceReleaseLock(SerializingRedisClient client, string workItemId
225253
using (var trans = client.CreateTransaction())
226254
{
227255
//untrack dequeue lock
228-
trans.QueueCommand(r => ((RedisNativeClient)r).SRem(dequeueIds, client.Serialize(workItemId)));
256+
trans.QueueCommand(r => ((RedisNativeClient)r).SRem(dequeueIdSet, client.Serialize(workItemId)));
229257

230258
//delete dequeue lock
231259
trans.QueueCommand(r => ((RedisNativeClient)r).Del(dequeueLockKey));
232260

233261
// update priority queue : this will allow other clients to access this workItemId
234262
if (len == 0)
235-
trans.QueueCommand(r => ((RedisNativeClient)r).ZRem(pendingWorkItemIdQueue, client.Serialize(workItemId)));
263+
trans.QueueCommand(r => ((RedisNativeClient)r).ZRem(workItemIdPriorityQueue, client.Serialize(workItemId)));
236264
else
237-
trans.QueueCommand(r => ((RedisNativeClient)r).ZAdd(pendingWorkItemIdQueue, len, client.Serialize(workItemId)));
265+
trans.QueueCommand(r => ((RedisNativeClient)r).ZAdd(workItemIdPriorityQueue, len, client.Serialize(workItemId)));
238266

239267
rc = trans.Commit();
240268
}
@@ -264,13 +292,13 @@ private void Unlock(string workItemId)
264292
using (var pipe = client.CreatePipeline())
265293
{
266294
//untrack dequeue lock
267-
pipe.QueueCommand(r => ((RedisNativeClient)r).SRem(dequeueIds, client.Serialize(workItemId)));
295+
pipe.QueueCommand(r => ((RedisNativeClient)r).SRem(dequeueIdSet, client.Serialize(workItemId)));
268296

269297
// update priority queue
270298
if (len == 0)
271-
pipe.QueueCommand(r => ((RedisNativeClient)r).ZRem(pendingWorkItemIdQueue, client.Serialize(workItemId)));
299+
pipe.QueueCommand(r => ((RedisNativeClient)r).ZRem(workItemIdPriorityQueue, client.Serialize(workItemId)));
272300
else
273-
pipe.QueueCommand(r => ((RedisNativeClient)r).ZAdd(pendingWorkItemIdQueue, len, client.Serialize(workItemId)));
301+
pipe.QueueCommand(r => ((RedisNativeClient)r).ZAdd(workItemIdPriorityQueue, len, client.Serialize(workItemId)));
274302

275303

276304
pipe.Flush();

tests/ServiceStack.Redis.Tests/QueueTests.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public void TestSequentialWorkQueue()
3030
queue.Enqueue(patients[1], messages1[i]);
3131
}
3232

33+
queue.PrepareNextWorkItem();
3334
var batch = queue.Dequeue(numMessages/2);
3435
// check that half of patient[0] messages are returned
3536
for (int i = 0; i < numMessages/2; ++i)
@@ -41,6 +42,7 @@ public void TestSequentialWorkQueue()
4142

4243

4344
// check that all patient[1] messages are returned
45+
queue.PrepareNextWorkItem();
4446
batch = queue.Dequeue(2*numMessages);
4547
// check that batch size is respected
4648
Assert.AreEqual(batch.DequeueItems.Count, numMessages);
@@ -52,6 +54,7 @@ public void TestSequentialWorkQueue()
5254

5355

5456
// check that there are numMessages/2 messages in the queue
57+
queue.PrepareNextWorkItem();
5558
batch = queue.Dequeue(numMessages);
5659
Assert.AreEqual(batch.DequeueItems.Count, numMessages/2);
5760

@@ -60,15 +63,18 @@ public void TestSequentialWorkQueue()
6063
int remaining = batch.DequeueItems.Count-1;
6164
batch.PopAndUnlock();
6265

66+
queue.PrepareNextWorkItem();
6367
batch = queue.Dequeue(numMessages);
6468
Assert.AreEqual(batch.DequeueItems.Count, remaining);
6569

6670
//process remaining items
71+
queue.PrepareNextWorkItem();
6772
batch = queue.Dequeue(remaining);
6873
Assert.AreEqual(batch.DequeueItems.Count, remaining);
6974
for (int i = 0; i < numMessages; ++i)
7075
batch.DoneProcessedWorkItem();
7176

77+
queue.PrepareNextWorkItem();
7278
batch = queue.Dequeue(remaining);
7379
Assert.AreEqual(batch.DequeueItems.Count, 0);
7480

0 commit comments

Comments
 (0)