Skip to content
This repository was archived by the owner on Dec 24, 2022. It is now read-only.

Commit 3f412bb

Browse files
Aaron BoxerAaron Boxer
authored andcommitted
added Peek method to sequential queue
1 parent 80bb291 commit 3f412bb

File tree

6 files changed

+98
-55
lines changed

6 files changed

+98
-55
lines changed

src/ServiceStack.Redis/ServiceStack.Redis.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@
105105
<Compile Include="Support\Locking\Factory\IDistributedLockFactory.cs" />
106106
<Compile Include="Support\Locking\IDistributedLock.cs" />
107107
<Compile Include="Support\OptimizedObjectSerializer.cs" />
108-
<Compile Include="Support\Queue\SequentialDequeueData.cs" />
108+
<Compile Include="Support\Queue\SequentialData.cs" />
109109
<Compile Include="Support\SerializedObjectWrapper.cs" />
110110
<Compile Include="Support\ISerializer.cs" />
111111
<Compile Include="Support\Locking\DisposableDistributedLock.cs" />

src/ServiceStack.Redis/Support/Locking/DistributedLock.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public class DistributedLock : IDistributedLock
1111
protected string lockKey;
1212
protected long lockExpire;
1313

14-
protected readonly IRedisClient client;
14+
private readonly IRedisClient client;
1515

1616
public DistributedLock(IRedisClient client)
1717
{
@@ -35,7 +35,7 @@ public virtual long Lock(string key, int acquisitionTimeout, int lockTimeout)
3535
var ts = (DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0));
3636
var newLockExpire = CalculateLockExpire(ts, lockTimeout);
3737

38-
var nativeClient = client as RedisNativeClient;
38+
var nativeClient = GetClient();
3939
int wasSet = nativeClient.SetNX(key, BitConverter.GetBytes(newLockExpire));
4040
int totalTime = 0;
4141
while (wasSet == 0 && totalTime < acquisitionTimeout)
@@ -54,14 +54,15 @@ public virtual long Lock(string key, int acquisitionTimeout, int lockTimeout)
5454
if (wasSet != LOCK_NOT_ACQUIRED) break;
5555

5656
// handle possibliity of crashed client still holding the lock
57-
using (var pipe = client.CreatePipeline())
57+
using (var pipe = nativeClient.CreatePipeline())
5858
{
5959
long lockValue=0;
6060
pipe.QueueCommand(r => ((RedisNativeClient)r).Watch(key));
6161
pipe.QueueCommand(r => ((RedisNativeClient)r).Get(key), x => lockValue = (x != null) ? BitConverter.ToInt64(x,0) : 0);
6262
pipe.Flush();
6363

6464
// if lock value is 0 (key is empty), or expired, then we can try to acquire it
65+
ts = (DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0));
6566
if (lockValue < ts.TotalSeconds)
6667
{
6768
ts = (DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0));
@@ -88,6 +89,7 @@ public virtual long Lock(string key, int acquisitionTimeout, int lockTimeout)
8889
return wasSet;
8990

9091
}
92+
9193
/// <summary>
9294
/// unlock key
9395
/// </summary>
@@ -131,5 +133,10 @@ private static long CalculateLockExpire(TimeSpan ts, int timeout)
131133
return (long)(ts.TotalSeconds + timeout + 1.5);
132134
}
133135

136+
protected RedisNativeClient GetClient()
137+
{
138+
return (RedisNativeClient)client;
139+
}
140+
134141
}
135142
}

src/ServiceStack.Redis/Support/Queue/ISequentialWorkQueue.cs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,23 @@ public interface ISequentialWorkQueue<T> : IDisposable where T : class
1414

1515
/// <summary>
1616
/// Dequeue up to maxBatchSize items from queue corresponding to workItemId identifier.
17-
/// Once this method is called, no items for workItemId queue can be dequeued until
18-
/// <see cref="PostDequeue"/> is called
17+
/// Once this method is called, <see cref="Dequeue"/> or <see cref="Peek"/> will not
18+
/// return any items for workItemId until the dequeue lock returned is unlocked.
1919
/// </summary>
2020
/// <param name="maxBatchSize"></param>
2121
/// <returns></returns>
22-
SequentialDeueueData<T> Dequeue(int maxBatchSize);
22+
SequentialData<T> Dequeue(int maxBatchSize);
23+
24+
/// <summary>
25+
/// Get up to maxBatchSize items from queue corresponding to workItemId identifier.
26+
/// Items are not removed form the queue.
27+
/// Once this method is called, <see cref="Dequeue"/> or <see cref="Peek"/> will not
28+
/// return any items for workItemId until the dequeue lock returned is unlocked.
29+
/// </summary>
30+
/// <param name="maxBatchSize"></param>
31+
/// <returns></returns>
32+
SequentialData<T> Peek(int maxBatchSize);
33+
2334

2435
}
2536
}

src/ServiceStack.Redis/Support/Queue/Implementation/RedisSequentialWorkQueue.cs

Lines changed: 62 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Collections.Generic;
1+
using System;
2+
using System.Collections.Generic;
23
using ServiceStack.Redis.Support.Locking;
34

45

@@ -13,18 +14,18 @@ namespace ServiceStack.Redis.Support.Queue.Implementation
1314
/// </summary>
1415
public class RedisSequentialWorkQueue<T> : RedisWorkQueue<T>, ISequentialWorkQueue<T> where T : class
1516
{
16-
public class DequeueLock : DistributedLock
17+
public class WorkItemIdLock : DistributedLock
1718
{
18-
private RedisSequentialWorkQueue<T> workQueue;
19-
private string workItemId;
20-
public DequeueLock(IRedisClient client, RedisSequentialWorkQueue<T> workQueue, string workItemId) : base(client)
19+
private readonly RedisSequentialWorkQueue<T> workQueue;
20+
private readonly string workItemId;
21+
public WorkItemIdLock(IRedisClient client, RedisSequentialWorkQueue<T> workQueue, string workItemId) : base(client)
2122
{
2223
this.workQueue = workQueue;
2324
this.workItemId = workItemId;
2425
}
2526
public override bool Unlock()
2627
{
27-
workQueue.PostDequeue(workItemId);
28+
workQueue.Unlock(workItemId);
2829
return base.Unlock();
2930
}
3031
}
@@ -34,7 +35,7 @@ public override bool Unlock()
3435
private int lockTimeout = 2;
3536
protected const double CONVENIENTLY_SIZED_FLOAT = 18014398509481984.0;
3637

37-
private string dequeueLockIds = "DequeueLockIds";
38+
private string dequeueIds = "DequeueIds";
3839
private const int numTagsForDequeueLock = RedisNamespace.NumTagsForLockKey + 1;
3940

4041
public RedisSequentialWorkQueue(int maxReadPoolSize, int maxWritePoolSize, string host, int port)
@@ -70,14 +71,18 @@ public void Enqueue(string workItemId, T workItem)
7071
}
7172
}
7273

73-
/// <summary>
74-
/// Dequeue next batch of messages for processing. After this method is called,
75-
/// no other messages with same work item id will be returned by subsequent calles
76-
/// to this method until <see cref="PostDequeue"/> is called
77-
/// </summary>
78-
/// <returns>KeyValuePair: key is work item id, and value is list of dequeued items.
79-
/// </returns>
80-
public SequentialDeueueData<T> Dequeue(int maxBatchSize)
74+
public SequentialData<T> Dequeue(int maxBatchSize)
75+
{
76+
return Peek(maxBatchSize, true);
77+
}
78+
79+
public SequentialData<T> Peek(int maxBatchSize)
80+
{
81+
return Peek(maxBatchSize, false);
82+
}
83+
84+
85+
private SequentialData<T> Peek(int maxBatchSize, bool dequeue)
8186
{
8287
using (var disposableClient = clientManager.GetDisposableClient<SerializingRedisClient>())
8388
{
@@ -87,7 +92,7 @@ public SequentialDeueueData<T> Dequeue(int maxBatchSize)
8792
string workItemId = null;
8893
var dequeueItems = new List<T>();
8994
var smallest = client.ZRangeWithScores(pendingWorkItemIdQueue, 0, 0);
90-
IDistributedLock dequeueLock = null;
95+
IDistributedLock workItemIdLock = null;
9196
try
9297
{
9398
if (smallest != null && smallest.Length > 1 && RedisNativeClient.ParseDouble(smallest[1]) != CONVENIENTLY_SIZED_FLOAT)
@@ -96,45 +101,56 @@ public SequentialDeueueData<T> Dequeue(int maxBatchSize)
96101

97102
// acquire dequeue lock
98103
var dequeueLockKey = queueNamespace.GlobalKey(workItemId, numTagsForDequeueLock);
99-
dequeueLock = new DequeueLock(client, this, workItemId);
100-
dequeueLock.Lock(dequeueLockKey, 2, 300);
104+
workItemIdLock = new WorkItemIdLock(client, this, workItemId);
105+
workItemIdLock.Lock(dequeueLockKey, 2, 300);
101106

102107
using (var pipe = client.CreatePipeline())
103108
{
104109
// track dequeue lock id
105-
pipe.QueueCommand(r => ((RedisNativeClient)r).SAdd(dequeueLockIds, client.Serialize(dequeueLockKey)));
110+
pipe.QueueCommand(r => ((RedisNativeClient)r).SAdd(dequeueIds, client.Serialize(workItemId)));
106111

107112
// lock work item id
108113
pipe.QueueCommand(r => ((RedisNativeClient)r).ZAdd(pendingWorkItemIdQueue, CONVENIENTLY_SIZED_FLOAT, smallest[0]));
109114

110-
var key = queueNamespace.GlobalCacheKey(workItemId);
111115
// dequeue items
116+
var key = queueNamespace.GlobalCacheKey(workItemId);
117+
Action<byte[]> dequeueCallback = x =>
118+
{
119+
if (x != null)
120+
dequeueItems.Add((T) client.Deserialize(x));
121+
};
122+
112123
for (var i = 0; i < maxBatchSize; ++i)
113124
{
114-
pipe.QueueCommand(
115-
r => ((RedisNativeClient)r).LPop(key),
116-
x =>
117-
{
118-
if (x != null)
119-
dequeueItems.Add((T)client.Deserialize(x));
120-
});
125+
if (dequeue)
126+
{
127+
pipe.QueueCommand(
128+
r => ((RedisNativeClient) r).LPop(key),
129+
dequeueCallback);
130+
}
131+
else
132+
{
133+
int index = i;
134+
pipe.QueueCommand(
135+
r => ((RedisNativeClient)r).LIndex(key, index),
136+
dequeueCallback);
137+
}
121138
}
122139
pipe.Flush();
123140
}
124141
}
125-
return new SequentialDeueueData<T>
142+
return new SequentialData<T>
126143
{
127-
DequeueItems = dequeueItems,
144+
WorkItems = dequeueItems,
128145
WorkItemId = workItemId,
129-
DequeueLock = dequeueLock
146+
WorkItemIdLock = workItemIdLock
130147
};
131148
}
132149
catch (System.Exception)
133150
{
134151
//release resources
135-
PostDequeue(workItemId);
136-
if (dequeueLock != null)
137-
dequeueLock.Unlock();
152+
if (workItemIdLock != null)
153+
workItemIdLock.Unlock();
138154

139155
throw;
140156
}
@@ -147,13 +163,23 @@ private void HarvestZombies()
147163
// dequeue acquires dequeue lock on these ids
148164
// each dequeue will MGet on list of all dequeued workItemIds: then MGet on all lock keys.
149165
// if expired, then reacquire and PostDequeue
166+
using (var disposableClient = clientManager.GetDisposableClient<SerializingRedisClient>())
167+
{
168+
var client = disposableClient.Client;
169+
var dequeueWorkItemIds = client.SMembers(dequeueIds);
170+
foreach (var workItemId in dequeueWorkItemIds)
171+
{
172+
var lockId = queueNamespace.GlobalKey(client.Deserialize(workItemId), RedisNamespace.NumTagsForLockKey);
173+
174+
}
175+
}
150176
}
151177

152178
/// <summary>
153-
/// Unlock message id, so other servers can process messages for this message id
179+
/// Unlock work item id, so other servers can process items for this id
154180
/// </summary>
155181
/// <param name="workItemId"></param>
156-
private void PostDequeue(string workItemId)
182+
private void Unlock(string workItemId)
157183
{
158184
if (workItemId == null)
159185
return;
@@ -176,8 +202,7 @@ private void PostDequeue(string workItemId)
176202
pipe.QueueCommand(r => ((RedisNativeClient)r).ZAdd(pendingWorkItemIdQueue, len, client.Serialize(workItemId)) );
177203

178204
//untrack dequeue lock
179-
var dequeueLockKey = queueNamespace.GlobalKey(workItemId, numTagsForDequeueLock);
180-
pipe.QueueCommand(r => ((RedisNativeClient)r).SRem(dequeueLockIds, client.Serialize(dequeueLockKey)));
205+
pipe.QueueCommand(r => ((RedisNativeClient)r).SRem(dequeueIds, client.Serialize(workItemId)));
181206

182207
pipe.Flush();
183208
}

src/ServiceStack.Redis/Support/Queue/SequentialDequeueData.cs renamed to src/ServiceStack.Redis/Support/Queue/SequentialData.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,17 @@
33

44
namespace ServiceStack.Redis.Support.Queue
55
{
6-
public class SequentialDeueueData<T> where T : class
6+
public class SequentialData<T> where T : class
77
{
88
public string WorkItemId
99
{
1010
get; set;
1111
}
12-
public IList<T> DequeueItems
12+
public IList<T> WorkItems
1313
{
1414
get; set;
1515
}
16-
public IDistributedLock DequeueLock
16+
public IDistributedLock WorkItemIdLock
1717
{
1818
get;set;
1919
}

tests/ServiceStack.Redis.Tests/QueueTests.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,30 +32,30 @@ public void TestSequentialWorkQueue()
3232
var batch = queue.Dequeue(numMessages/2);
3333
// check that half of patient[0] messages are returned
3434
for (int i = 0; i < numMessages/2; ++i )
35-
Assert.AreEqual(batch.DequeueItems[i], messages0[i]);
36-
batch.DequeueLock.Unlock();
35+
Assert.AreEqual(batch.WorkItems[i], messages0[i]);
36+
batch.WorkItemIdLock.Unlock();
3737

3838
// check that all patient[1] messages are returned
3939
batch = queue.Dequeue(2 * numMessages);
4040
// check that batch size is respected
41-
Assert.AreEqual(batch.DequeueItems.Count, numMessages);
41+
Assert.AreEqual(batch.WorkItems.Count, numMessages);
4242
for (int i = 0; i < numMessages; ++i)
43-
Assert.AreEqual(batch.DequeueItems[i], messages1[i]);
44-
batch.DequeueLock.Unlock();
43+
Assert.AreEqual(batch.WorkItems[i], messages1[i]);
44+
batch.WorkItemIdLock.Unlock();
4545

4646

4747
// check that there are numMessages/2 messages in the queue
4848
batch = queue.Dequeue(numMessages);
4949
Assert.AreEqual(batch.WorkItemId, patients[0]);
50-
Assert.AreEqual(batch.DequeueItems.Count, numMessages / 2);
51-
batch.DequeueLock.Unlock();
50+
Assert.AreEqual(batch.WorkItems.Count, numMessages / 2);
51+
batch.WorkItemIdLock.Unlock();
5252

5353

5454

5555
// check that there are no more messages in the queue
5656
batch = queue.Dequeue(numMessages);
5757
Assert.IsNull(batch.WorkItemId);
58-
Assert.AreEqual(batch.DequeueItems.Count, 0);
58+
Assert.AreEqual(batch.WorkItems.Count, 0);
5959

6060
}
6161
}

0 commit comments

Comments
 (0)