Skip to content
This repository was archived by the owner on Dec 24, 2022. It is now read-only.

Commit 63141ff

Browse files
Aaron BoxerAaron Boxer
authored andcommitted
add a kind of reference count for sequential queue lock: when all items have marked as processed,
call Unlock() method.
1 parent 7764171 commit 63141ff

File tree

4 files changed

+24
-29
lines changed

4 files changed

+24
-29
lines changed

src/ServiceStack.Redis/Support/Queue/Implementation/RedisSequentialWorkQueue.Locks.cs

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,38 +11,24 @@ namespace ServiceStack.Redis.Support.Queue.Implementation
1111
///
1212
///
1313
/// </summary>
14-
public partial class RedisSequentialWorkQueue<T> : RedisWorkQueue<T>, ISequentialWorkQueue<T> where T : class
14+
public partial class RedisSequentialWorkQueue<T>
1515
{
16-
public class DequeueLockFactory : DistributedLockFactory
17-
{
18-
protected readonly RedisSequentialWorkQueue<T> workQueue;
19-
protected readonly string workItemId;
20-
protected readonly PooledRedisClientManager clientManager;
21-
public DequeueLockFactory(IRedisClient client, PooledRedisClientManager clientManager, RedisSequentialWorkQueue<T> workQueue, string workItemId)
22-
: base(client)
23-
{
24-
this.clientManager = clientManager;
25-
this.workQueue = workQueue;
26-
this.workItemId = workItemId;
27-
}
28-
public override IDistributedLock CreateLock()
29-
{
30-
return new DequeueLock(client, clientManager, workQueue, workItemId);
31-
}
32-
}
3316

3417
public class DequeueLock : DistributedLock
3518
{
3619
private bool ownsClient;
3720
protected readonly RedisSequentialWorkQueue<T> workQueue;
3821
protected readonly string workItemId;
3922
protected readonly PooledRedisClientManager clientManager;
40-
public DequeueLock(IRedisClient client, PooledRedisClientManager clientManager, RedisSequentialWorkQueue<T> workQueue, string workItemId) : base(client)
23+
protected readonly int numberOfDequeuedItems;
24+
protected int numberOfProcessedItems;
25+
public DequeueLock(IRedisClient client, PooledRedisClientManager clientManager, RedisSequentialWorkQueue<T> workQueue, string workItemId, int numberOfDequeuedItems) : base(client)
4126
{
4227
this.workQueue = workQueue;
4328
this.workItemId = workItemId;
4429
this.clientManager = clientManager;
4530
ownsClient = false;
31+
this.numberOfDequeuedItems = numberOfDequeuedItems;
4632
}
4733

4834
public override long Lock(string key, int acquisitionTimeout, int lockTimeout)
@@ -53,6 +39,13 @@ public override long Lock(string key, int acquisitionTimeout, int lockTimeout)
5339
return rc;
5440
}
5541

42+
public void DoneProcessedWorkItem()
43+
{
44+
numberOfProcessedItems++;
45+
if (numberOfProcessedItems == numberOfDequeuedItems)
46+
Unlock();
47+
}
48+
5649
public override bool Unlock()
5750
{
5851
workQueue.Unlock(workItemId);
@@ -80,16 +73,15 @@ protected override RedisClient AcquireClient()
8073
}
8174
public class DeferredDequeueLock : DequeueLock
8275
{
83-
private readonly int numberofPeekedItems;
84-
public DeferredDequeueLock(IRedisClient client, PooledRedisClientManager clientManager, RedisSequentialWorkQueue<T> workQueue, string workItemId, int numberofPeekedItems)
85-
:base(client, clientManager, workQueue, workItemId)
76+
public DeferredDequeueLock(IRedisClient client, PooledRedisClientManager clientManager, RedisSequentialWorkQueue<T> workQueue, string workItemId, int numberOfDequeuedItems)
77+
: base(client, clientManager, workQueue, workItemId, numberOfDequeuedItems)
8678
{
87-
this.numberofPeekedItems = numberofPeekedItems;
8879
}
80+
8981
public override bool Unlock()
9082
{
9183
//remove items from queue
92-
workQueue.Pop(workItemId, numberofPeekedItems);
84+
workQueue.Pop(workItemId, numberOfDequeuedItems);
9385

9486
// unlock work queue id
9587
workQueue.Unlock(workItemId);

src/ServiceStack.Redis/Support/Queue/Implementation/RedisSequentialWorkQueue.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ private SequentialData<T> DequeueImpl(int maxBatchSize, bool defer)
7474
string workItemId = null;
7575
var dequeueItems = new List<T>();
7676
var smallest = client.ZRangeWithScores(pendingWorkItemIdQueue, 0, 0);
77-
IDistributedLock workItemIdLock = null;
77+
DequeueLock workItemIdLock = null;
7878
try
7979
{
8080
if (smallest != null && smallest.Length > 1 && RedisNativeClient.ParseDouble(smallest[1]) != CONVENIENTLY_SIZED_FLOAT)
@@ -118,7 +118,7 @@ private SequentialData<T> DequeueImpl(int maxBatchSize, bool defer)
118118
}
119119
workItemIdLock = defer ?
120120
new DeferredDequeueLock(client, clientManager, this, workItemId, dequeueItems.Count) :
121-
new DequeueLock(client, clientManager, this, workItemId);
121+
new DequeueLock(client, clientManager, this, workItemId, dequeueItems.Count);
122122
var dequeueLockKey = queueNamespace.GlobalKey(workItemId, numTagsForDequeueLock);
123123
workItemIdLock.Lock(dequeueLockKey, lockAcquisitionTimeout, dequeueLockTimeout);
124124

src/ServiceStack.Redis/Support/Queue/SequentialData.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System.Collections.Generic;
22
using ServiceStack.Redis.Support.Locking;
3+
using ServiceStack.Redis.Support.Queue.Implementation;
34

45
namespace ServiceStack.Redis.Support.Queue
56
{
@@ -13,7 +14,7 @@ public IList<T> WorkItems
1314
{
1415
get; set;
1516
}
16-
public IDistributedLock WorkItemIdLock
17+
public RedisSequentialWorkQueue<T>.DequeueLock WorkItemIdLock
1718
{
1819
get;set;
1920
}

tests/ServiceStack.Redis.Tests/QueueTests.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,11 @@ public void TestSequentialWorkQueue()
4848
// check that batch size is respected
4949
Assert.AreEqual(batch.WorkItems.Count, numMessages);
5050
for (int i = 0; i < numMessages; ++i)
51+
{
5152
Assert.AreEqual(batch.WorkItems[i], messages1[i]);
52-
batch.WorkItemIdLock.Unlock();
53-
53+
batch.WorkItemIdLock.DoneProcessedWorkItem();
54+
}
55+
5456

5557
// check that there are numMessages/2 messages in the queue
5658
batch = queue.Dequeue(numMessages, defer);

0 commit comments

Comments
 (0)