Skip to content
This repository was archived by the owner on Dec 24, 2022. It is now read-only.

Commit 37dfe6e

Browse files
Aaron BoxerAaron Boxer
authored andcommitted
fixed potential race condition when preparing an item for dequeue
1 parent 66adc41 commit 37dfe6e

File tree

3 files changed

+44
-21
lines changed

3 files changed

+44
-21
lines changed

src/ServiceStack.Redis/Support/Queue/ISequentialWorkQueue.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public interface ISequentialWorkQueue<T> : IDisposable where T : class
1717
/// <summary>
1818
/// Preprare next work item id for dequeueing
1919
/// </summary>
20-
void PrepareNextWorkItem();
20+
bool PrepareNextWorkItem();
2121

2222

2323
/// <summary>

src/ServiceStack.Redis/Support/Queue/Implementation/RedisSequentialWorkQueue.cs

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public void Enqueue(string workItemId, T workItem)
6868
/// <summary>
6969
/// Must call this periodically to move work items from priority queue to pending queue
7070
/// </summary>
71-
public void PrepareNextWorkItem()
71+
public bool PrepareNextWorkItem()
7272
{
7373
//harvest zombies every 5 minutes
7474
var now = DateTime.UtcNow;
@@ -86,24 +86,39 @@ public void PrepareNextWorkItem()
8686
//1. get next workItemId, or return if there isn't one
8787
var smallest = client.ZRangeWithScores(workItemIdPriorityQueue, 0, 0);
8888
if (smallest == null || smallest.Length <= 1 ||
89-
RedisNativeClient.ParseDouble(smallest[1]) == CONVENIENTLY_SIZED_FLOAT) return;
89+
RedisNativeClient.ParseDouble(smallest[1]) == CONVENIENTLY_SIZED_FLOAT) return false;
9090
var workItemId = client.Deserialize(smallest[0]) as string;
91-
using (var pipe = client.CreatePipeline())
91+
92+
// lock on work item id
93+
var lockKey = queueNamespace.GlobalLockKey(workItemId);
94+
using (var disposableLock = new DisposableDistributedLock(client, lockKey, lockAcquisitionTimeout, lockTimeout))
9295
{
93-
var rawWorkItemId = client.Serialize(workItemId);
96+
// if another client has queued this work item id, then the work item id score
97+
// will be set to CONVENIENTLY_SIZED_FLOAT
98+
var score = client.ZScore(workItemIdPriorityQueue, smallest[0]);
99+
if (score == CONVENIENTLY_SIZED_FLOAT) return false;
94100

95-
// lock work item id
96-
pipe.QueueCommand(r => ((RedisNativeClient)r).ZAdd(workItemIdPriorityQueue, CONVENIENTLY_SIZED_FLOAT, smallest[0]));
101+
using (var pipe = client.CreatePipeline())
102+
{
103+
var rawWorkItemId = client.Serialize(workItemId);
97104

98-
// track dequeue lock id
99-
pipe.QueueCommand(r => ((RedisNativeClient)r).SAdd(dequeueIdSet, rawWorkItemId));
105+
// lock work item id
106+
pipe.QueueCommand(
107+
r =>
108+
((RedisNativeClient) r).ZAdd(workItemIdPriorityQueue, CONVENIENTLY_SIZED_FLOAT, smallest[0]));
100109

101-
// push into pending set
102-
pipe.QueueCommand(r => ((RedisNativeClient)r).SAdd(pendingWorkItemIdQueue, rawWorkItemId));
110+
// track dequeue lock id
111+
pipe.QueueCommand(r => ((RedisNativeClient) r).SAdd(dequeueIdSet, rawWorkItemId));
112+
113+
// push into pending set
114+
pipe.QueueCommand(r => ((RedisNativeClient) r).SAdd(pendingWorkItemIdQueue, rawWorkItemId));
115+
116+
pipe.Flush();
117+
}
103118

104-
pipe.Flush();
105119
}
106120
}
121+
return true;
107122
}
108123

109124
public ISequentialData<T> Dequeue(int maxBatchSize)

tests/ServiceStack.Redis.Tests/QueueTests.cs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,32 @@ namespace ServiceStack.Redis.Tests
99
[TestFixture]
1010
public class QueueTests : RedisClientTestsBase
1111
{
12+
const int numMessages = 6;
13+
private IList<string> messages0 = new List<string>();
14+
private IList<string> messages1 = new List<string>();
15+
private string[] patients = new[] {"patient0", "patient1"};
16+
17+
[SetUp]
18+
public override void OnBeforeEachTest()
19+
{
20+
base.OnBeforeEachTest();
21+
for (int i = 0; i < numMessages; ++i)
22+
{
23+
messages0.Add(String.Format("{0}_message{1}", patients[0], i));
24+
messages1.Add(String.Format("{0}_message{1}", patients[1], i));
25+
}
26+
27+
}
1228

1329
[Test]
1430
public void TestSequentialWorkQueue()
1531
{
1632
using (var queue = new RedisSequentialWorkQueue<string>(10,10,"127.0.0.1",6379,1))
1733
{
18-
const int numMessages = 6;
19-
var messages0 = new List<string>();
20-
var messages1 = new List<string>();
21-
var patients = new string[2];
22-
patients[0] = "patient0";
23-
patients[1] = "patient1";
24-
34+
2535
for (int i = 0; i < numMessages; ++i)
2636
{
27-
messages0.Add(String.Format("{0}_message{1}", patients[0], i));
2837
queue.Enqueue(patients[0], messages0[i]);
29-
messages1.Add(String.Format("{0}_message{1}", patients[1], i));
3038
queue.Enqueue(patients[1], messages1[i]);
3139
}
3240

0 commit comments

Comments
 (0)