Skip to content
This repository was archived by the owner on Dec 24, 2022. It is now read-only.

Commit 5c2a3e5

Browse files
Aaron BoxerAaron Boxer
authored andcommitted
fixed a majour bug in distributed lock: when using a pool of Redis clients, user can use
one client to lock, and another to unlock. This was a problem because the client stored the distributed lock, and the lock timeout, which is used to release the lock. Solution is to move lock out of the client.
1 parent 80cf7d1 commit 5c2a3e5

File tree

8 files changed

+79
-104
lines changed

8 files changed

+79
-104
lines changed

src/ServiceStack.Redis/Support/Locking/DisposableDistributedLock.cs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ public class DisposableDistributedLock : IDisposable
99
{
1010
private readonly IDistributedLock myLock;
1111
private readonly long lockState;
12+
private readonly long lockExpire;
13+
private readonly IRedisClient myClient;
1214

1315
/// <summary>
1416
/// Lock
@@ -19,8 +21,9 @@ public class DisposableDistributedLock : IDisposable
1921
/// <param name="lockTimeout">in seconds</param>
2022
public DisposableDistributedLock(IRedisClient client, string globalLockKey, int acquisitionTimeout, int lockTimeout)
2123
{
22-
myLock = new DistributedLock(client);
23-
lockState = myLock.Lock(globalLockKey, acquisitionTimeout, lockTimeout);
24+
myLock = new DistributedLock();
25+
myClient = client;
26+
lockState = myLock.Lock(globalLockKey, acquisitionTimeout, lockTimeout, out lockExpire, myClient);
2427
}
2528

2629

@@ -29,12 +32,18 @@ public long LockState
2932
get { return lockState; }
3033
}
3134

35+
public long LockExpire
36+
{
37+
get { return lockExpire; }
38+
}
39+
40+
3241
/// <summary>
3342
/// unlock
3443
/// </summary>
3544
public void Dispose()
3645
{
37-
myLock.Unlock();
46+
myLock.Unlock(lockExpire, myClient);
3847
}
3948
}
4049
}

src/ServiceStack.Redis/Support/Locking/DistributedLock.cs

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,20 @@ public class DistributedLock : IDistributedLock
1010
public const int LOCK_RECOVERED = 2;
1111

1212
protected string lockKey;
13-
protected long lockExpire;
1413

15-
protected IRedisClient myClient;
16-
17-
public DistributedLock(IRedisClient client)
18-
{
19-
myClient = client;
20-
}
2114

2215
/// <summary>
2316
/// acquire distributed, non-reentrant lock on key
2417
/// </summary>
2518
/// <param name="key">global key for this lock</param>
2619
/// <param name="acquisitionTimeout">timeout for acquiring lock</param>
2720
/// <param name="lockTimeout">timeout for lock, in seconds (stored as value against lock key) </param>
28-
public virtual long Lock(string key, int acquisitionTimeout, int lockTimeout)
21+
/// <param name="client"></param>
22+
/// <param name="lockExpire"></param>
23+
public virtual long Lock(string key, int acquisitionTimeout, int lockTimeout, out long lockExpire, IRedisClient client)
2924
{
25+
lockExpire = 0;
26+
3027
// cannot lock on a null key
3128
if (key == null)
3229
return LOCK_NOT_ACQUIRED;
@@ -42,7 +39,7 @@ public virtual long Lock(string key, int acquisitionTimeout, int lockTimeout)
4239
var ts = (DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0));
4340
var newLockExpire = CalculateLockExpire(ts, lockTimeout);
4441

45-
var localClient = (RedisClient)myClient;
42+
var localClient = (RedisClient)client;
4643
int wasSet = localClient.SetNX(key, BitConverter.GetBytes(newLockExpire));
4744
int totalTime = 0;
4845
while (wasSet == LOCK_NOT_ACQUIRED && totalTime < acquisitionTimeout)
@@ -105,12 +102,12 @@ public virtual long Lock(string key, int acquisitionTimeout, int lockTimeout)
105102
/// <summary>
106103
/// unlock key
107104
/// </summary>
108-
public virtual bool Unlock()
105+
public virtual bool Unlock(long lockExpire, IRedisClient client)
109106
{
110107
if (lockExpire <= 0)
111108
return false;
112109
long lockVal = 0;
113-
var localClient = AcquireClient();
110+
var localClient = (RedisClient)client;
114111
using (var pipe = localClient.CreatePipeline())
115112
{
116113

@@ -153,10 +150,5 @@ private static long CalculateLockExpire(TimeSpan ts, int timeout)
153150
return (long)(ts.TotalSeconds + timeout + 1.5);
154151
}
155152

156-
protected virtual RedisClient AcquireClient()
157-
{
158-
return (RedisClient)myClient;
159-
}
160-
161153
}
162154
}

src/ServiceStack.Redis/Support/Locking/IDistributedLock.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@
55
/// </summary>
66
public interface IDistributedLock
77
{
8-
9-
long Lock(string key, int acquisitionTimeout, int lockTimeout);
10-
11-
bool Unlock();
8+
long Lock(string key, int acquisitionTimeout, int lockTimeout, out long lockExpire, IRedisClient client);
9+
bool Unlock(long lockExpire, IRedisClient client);
1210
}
1311
}

src/ServiceStack.Redis/Support/Queue/Implementation/RedisSequentialWorkQueue.Locks.cs

Lines changed: 34 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -11,37 +11,39 @@ namespace ServiceStack.Redis.Support.Queue.Implementation
1111
/// </summary>
1212
public partial class RedisSequentialWorkQueue<T>
1313
{
14-
public class DequeueManager : DistributedLock
14+
public class DequeueManager
1515
{
16-
17-
private bool ownsClient;
1816
protected readonly RedisSequentialWorkQueue<T> workQueue;
1917
protected readonly string workItemId;
2018
protected readonly PooledRedisClientManager clientManager;
2119
protected readonly int numberOfDequeuedItems;
2220
protected int numberOfProcessedItems;
23-
public DequeueManager(IRedisClient client, PooledRedisClientManager clientManager, RedisSequentialWorkQueue<T> workQueue, string workItemId, int numberOfDequeuedItems) : base(client)
21+
private readonly DistributedLock myLock;
22+
private readonly string dequeueLockKey;
23+
private int dequeueLockTimeout = 300;
24+
private long lockExpire;
25+
26+
public DequeueManager(PooledRedisClientManager clientManager, RedisSequentialWorkQueue<T> workQueue, string workItemId, string dequeueLockKey, int numberOfDequeuedItems, int dequeueLockTimeout)
2427
{
2528
this.workQueue = workQueue;
2629
this.workItemId = workItemId;
2730
this.clientManager = clientManager;
28-
ownsClient = false;
2931
this.numberOfDequeuedItems = numberOfDequeuedItems;
30-
}
31-
32-
public override long Lock(string key, int acquisitionTimeout, int lockTimeout)
33-
{
34-
long rc = base.Lock(key, acquisitionTimeout, lockTimeout);
35-
// do not hang on to the client reference. This lock may be held for a long time.
36-
ReleaseClient();
37-
return rc;
32+
myLock = new DistributedLock();
33+
this.dequeueLockKey = dequeueLockKey;
34+
this.dequeueLockTimeout = dequeueLockTimeout;
3835
}
3936

4037
public void DoneProcessedWorkItem()
4138
{
4239
numberOfProcessedItems++;
4340
if (numberOfProcessedItems == numberOfDequeuedItems)
44-
Unlock();
41+
{
42+
using (var disposable = new PooledRedisClientManager.DisposablePooledClient<SerializingRedisClient>(clientManager))
43+
{
44+
Unlock(disposable.Client);
45+
}
46+
}
4547
}
4648
/// <summary>
4749
///
@@ -52,17 +54,23 @@ public void UpdateNextUnprocessed(T newWorkItem)
5254
workQueue.Update(workItemId, numberOfProcessedItems, newWorkItem);
5355
}
5456

55-
public override bool Unlock()
57+
public long Lock(int acquisitionTimeout, IRedisClient client)
5658
{
57-
return PopAndUnlock(numberOfDequeuedItems);
59+
return myLock.Lock(dequeueLockKey, acquisitionTimeout, dequeueLockTimeout, out lockExpire, client);
60+
}
61+
62+
public bool Unlock(IRedisClient client)
63+
{
64+
return PopAndUnlock(numberOfDequeuedItems, client);
5865
}
5966

6067
/// <summary>
6168
///
6269
/// </summary>
6370
/// <param name="numProcessed"></param>
71+
/// <param name="client"></param>
6472
/// <returns></returns>
65-
public bool PopAndUnlock(int numProcessed)
73+
public bool PopAndUnlock(int numProcessed, IRedisClient client)
6674
{
6775
if (numProcessed < 0)
6876
numProcessed = 0;
@@ -74,27 +82,20 @@ public bool PopAndUnlock(int numProcessed)
7482

7583
// unlock work queue id
7684
workQueue.Unlock(workItemId);
77-
bool rc = base.Unlock();
78-
79-
ReleaseClient();
80-
return rc;
81-
}
82-
protected void ReleaseClient()
83-
{
84-
if (ownsClient && myClient != null)
85-
clientManager.DisposeClient((RedisNativeClient)myClient);
86-
myClient = null;
87-
ownsClient = false;
85+
return myLock.Unlock(lockExpire, client);
8886
}
8987

90-
protected override RedisClient AcquireClient()
88+
/// <summary>
89+
///
90+
/// </summary>
91+
/// <param name="numProcessed"></param>
92+
/// <returns></returns>
93+
public bool PopAndUnlock(int numProcessed)
9194
{
92-
if (myClient == null)
95+
using (var disposable = new PooledRedisClientManager.DisposablePooledClient<SerializingRedisClient>(clientManager))
9396
{
94-
myClient = clientManager.GetClient();
95-
ownsClient = true;
97+
return PopAndUnlock(numProcessed, disposable.Client);
9698
}
97-
return (RedisClient) myClient;
9899
}
99100
}
100101
}

src/ServiceStack.Redis/Support/Queue/Implementation/RedisSequentialWorkQueue.cs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Diagnostics;
34
using ServiceStack.Redis.Support.Locking;
45

56

@@ -17,13 +18,12 @@ public partial class RedisSequentialWorkQueue<T> : RedisWorkQueue<T>, ISequentia
1718
private DateTime harvestTime = DateTime.UtcNow;
1819
private int lockAcquisitionTimeout = 2;
1920
private int lockTimeout = 2;
20-
private int dequeueLockTimeout = 300;
2121
protected const double CONVENIENTLY_SIZED_FLOAT = 18014398509481984.0;
2222

2323
// store list of work item ids that have been dequeued
2424
// this list is checked regularly in harvest zombies method
2525
private string dequeueIdSet;
26-
26+
private int dequeueLockTimeout = 300;
2727

2828
private string workItemIdPriorityQueue;
2929

@@ -37,9 +37,9 @@ public RedisSequentialWorkQueue(int maxReadPoolSize, int maxWritePoolSize, strin
3737
public RedisSequentialWorkQueue(int maxReadPoolSize, int maxWritePoolSize, string host, int port, string queueName, int dequeueLockTimeout)
3838
: base(maxReadPoolSize, maxWritePoolSize, host, port, queueName)
3939
{
40-
this.dequeueLockTimeout = dequeueLockTimeout;
4140
dequeueIdSet = queueNamespace.GlobalCacheKey("DequeueIdSet");
4241
workItemIdPriorityQueue = queueNamespace.GlobalCacheKey("WorkItemIdPriorityQueue");
42+
this.dequeueLockTimeout = dequeueLockTimeout;
4343
}
4444

4545
/// <summary>
@@ -158,12 +158,10 @@ public ISequentialData<T> Dequeue(int maxBatchSize)
158158
pipe.Flush();
159159
}
160160

161-
workItemDequeueManager = new DequeueManager(client, clientManager, this, workItemId, workItems.Count);
162-
string dequeueLockKey = null;
161+
workItemDequeueManager = new DequeueManager(clientManager, this, workItemId, GlobalDequeueLockKey(workItemId), workItems.Count, dequeueLockTimeout);
163162
// don't lock if there are no work items to be processed (can't lock on null lock key)
164163
if (workItems.Count > 0)
165-
dequeueLockKey = GlobalDequeueLockKey(workItemId);
166-
workItemDequeueManager.Lock(dequeueLockKey, lockAcquisitionTimeout, dequeueLockTimeout);
164+
workItemDequeueManager.Lock(lockAcquisitionTimeout, client);
167165

168166
}
169167
return new SequentialData<T>(workItemId, workItems, workItemDequeueManager);
@@ -173,7 +171,7 @@ public ISequentialData<T> Dequeue(int maxBatchSize)
173171
{
174172
//release resources
175173
if (workItemDequeueManager != null)
176-
workItemDequeueManager.Unlock();
174+
workItemDequeueManager.Unlock(client);
177175

178176
throw;
179177
}
@@ -228,7 +226,7 @@ public bool HarvestZombies()
228226
bool rc = false;
229227
using (var disposableClient = clientManager.GetDisposableClient<SerializingRedisClient>())
230228
{
231-
var client = disposableClient.Client;
229+
var client = disposableClient.Client;
232230
var dequeueWorkItemIds = client.SMembers(dequeueIdSet);
233231
if (dequeueWorkItemIds.Length == 0) return false;
234232

src/ServiceStack.Redis/Support/Queue/Implementation/SerializingRedisClient.cs

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,34 +6,13 @@ namespace ServiceStack.Redis.Support.Queue.Implementation
66
{
77
public class SerializingRedisClient : RedisClient
88
{
9-
private readonly DistributedLock myLock;
109
private ISerializer serializer = new ObjectSerializer();
1110

1211
public SerializingRedisClient(string host, int port)
1312
: base(host, port)
1413
{
15-
myLock = new DistributedLock(this);
1614
}
1715

18-
/// <summary>
19-
/// acquire distributed, non-reentrant lock on key
20-
/// </summary>
21-
/// <param name="key">global key for this lock</param>
22-
/// <param name="acquisitionTimeout">timeout for acquiring lock</param>
23-
/// <param name="lockTimeout">timeout for lock, in seconds (stored as value against lock key) </param>
24-
public long Lock(string key, int acquisitionTimeout, int lockTimeout)
25-
{
26-
return myLock.Lock(key, acquisitionTimeout, lockTimeout);
27-
28-
}
29-
/// <summary>
30-
/// unlock key
31-
/// </summary>
32-
public bool Unlock()
33-
{
34-
return myLock.Unlock();
35-
}
36-
3716
/// <summary>
3817
/// customize the client serializer
3918
/// </summary>

tests/ServiceStack.Redis.Tests/QueueTests.cs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public override void OnBeforeEachTest()
2929
[Test]
3030
public void TestSequentialWorkQueueUpdate()
3131
{
32-
using (var queue = new RedisSequentialWorkQueue<string>(10, 10, "127.0.0.1", 6379, 1))
32+
using (var queue = new RedisSequentialWorkQueue<string>(10, 10, "127.0.0.1", 6379,1))
3333
{
3434

3535
for (int i = 0; i < numMessages; ++i)
@@ -70,6 +70,7 @@ public void TestSequentialWorkQueue()
7070
// check that half of patient[0] messages are returned
7171
for (int i = 0; i < numMessages/2; ++i)
7272
Assert.AreEqual(batch.DequeueItems[i], messages0[i]);
73+
Assert.AreEqual(numMessages/2, batch.DequeueItems.Count);
7374
Thread.Sleep(5000);
7475
Assert.IsTrue(queue.HarvestZombies());
7576
for (int i = 0; i < batch.DequeueItems.Count; ++i)
@@ -98,18 +99,14 @@ public void TestSequentialWorkQueue()
9899
int remaining = batch.DequeueItems.Count-1;
99100
batch.PopAndUnlock();
100101

101-
queue.PrepareNextWorkItem();
102-
batch = queue.Dequeue(numMessages);
103-
Assert.AreEqual(batch.DequeueItems.Count, remaining);
104-
105102
//process remaining items
106-
queue.PrepareNextWorkItem();
103+
Assert.IsTrue(queue.PrepareNextWorkItem());
107104
batch = queue.Dequeue(remaining);
108105
Assert.AreEqual(batch.DequeueItems.Count, remaining);
109-
for (int i = 0; i < numMessages; ++i)
106+
for (int i = 0; i < batch.DequeueItems.Count; ++i)
110107
batch.DoneProcessedWorkItem();
111108

112-
queue.PrepareNextWorkItem();
109+
Assert.IsFalse(queue.PrepareNextWorkItem());
113110
batch = queue.Dequeue(remaining);
114111
Assert.AreEqual(batch.DequeueItems.Count, 0);
115112

0 commit comments

Comments
 (0)