-
Notifications
You must be signed in to change notification settings - Fork 81
/
redis_semaphore.py
120 lines (93 loc) · 3.32 KB
/
redis_semaphore.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""Redis Semaphore lock."""
import os
import time
from typing import Optional, Tuple
from redis import Redis
# ZSET member name used for the semaphore-wide "system lock" entry that
# Semaphore.get_system_lock() reads and Semaphore.set_system_lock() writes.
SYSTEM_LOCK_ID = "SYSTEM_LOCK"
class Semaphore:
    """Semaphore lock using Redis ZSET."""

    # Cached text of lua/semaphore.lua. The script source is the same for
    # every instance, so it is read from disk once per process instead of on
    # every construction; only the script registration is per client.
    _script_source: Optional[str] = None

    def __init__(
        self,
        redis: "Redis",
        name: str,
        lock_id: str,
        timeout: float,
        max_locks: int = 1,
    ) -> None:
        """
        Semaphore lock.

        Semaphore logic is implemented in the lua/semaphore.lua script.
        Individual locks within the semaphore are managed inside a ZSET
        using scores to track when they expire.

        Arguments:
            redis: Redis client
            name: Name of lock. Used as ZSET key.
            lock_id: Lock ID
            timeout: Timeout in seconds
            max_locks: Maximum number of locks allowed for this semaphore

        Raises:
            OSError: If lua/semaphore.lua cannot be opened or read.
        """
        self.redis = redis
        self.name = name
        self.lock_id = lock_id
        self.max_locks = max_locks
        self.timeout = timeout

        self._semaphore = self.redis.register_script(
            self._load_script_source()
        )

    @classmethod
    def _load_script_source(cls) -> str:
        """Return the text of lua/semaphore.lua, reading it on first use."""
        if Semaphore._script_source is None:
            script_path = os.path.join(
                os.path.dirname(os.path.realpath(__file__)),
                "lua/semaphore.lua",
            )
            # Explicit encoding so the script text does not depend on the
            # process locale.
            with open(script_path, encoding="utf-8") as f:
                Semaphore._script_source = f.read()
        return Semaphore._script_source

    @classmethod
    def get_system_lock(cls, redis: "Redis", name: str) -> Optional[float]:
        """
        Get system lock timeout for the semaphore.

        Arguments:
            redis: Redis client
            name: Name of lock. Used as ZSET key.

        Returns: Time system lock expires or None if lock does not exist
        """
        return redis.zscore(name, SYSTEM_LOCK_ID)

    @classmethod
    def set_system_lock(cls, redis: "Redis", name: str, timeout: int) -> None:
        """
        Set system lock for the semaphore.

        Sets a system lock that will expire in timeout seconds. This
        overrides all other locks. Existing locks cannot be renewed
        and no new locks will be permitted until the system lock
        expires.

        Arguments:
            redis: Redis client
            name: Name of lock. Used as ZSET key.
            timeout: Timeout in seconds for system lock
        """
        # Queue both commands on one pipeline so they are sent together.
        pipeline = redis.pipeline()
        # The score is the absolute time at which the system lock expires.
        pipeline.zadd(name, {SYSTEM_LOCK_ID: time.time() + timeout})
        pipeline.expire(
            name, timeout + 10
        )  # timeout plus buffer for troubleshooting
        pipeline.execute()

    def release(self) -> None:
        """Release semaphore by removing this lock ID from the ZSET."""
        self.redis.zrem(self.name, self.lock_id)

    def acquire(self) -> Tuple[bool, int]:
        """
        Obtain a semaphore lock.

        Runs the registered Lua script with the ZSET key and this lock's
        ID, the maximum number of locks, the timeout and the current time.

        Returns: Tuple that contains True/False if the lock was acquired and
            number of locks in semaphore.
        """
        acquired, locks = self._semaphore(
            keys=[self.name],
            args=[self.lock_id, self.max_locks, self.timeout, time.time()],
        )

        # The script reports success as 1; anything else means not acquired.
        return acquired == 1, locks

    def renew(self) -> Tuple[bool, int]:
        """
        Attempt to renew semaphore.

        Technically this doesn't know the difference between losing the lock
        but then successfully getting a new lock versus renewing your lock
        before the timeout. Both will return True.

        Returns: Same tuple as acquire(): True/False if the lock is held
            after the call and number of locks in semaphore.
        """
        return self.acquire()