-
-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathPostgreSqlStore.php
290 lines (241 loc) · 9.4 KB
/
PostgreSqlStore.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Lock\Store;
use Symfony\Component\Lock\BlockingSharedLockStoreInterface;
use Symfony\Component\Lock\BlockingStoreInterface;
use Symfony\Component\Lock\Exception\InvalidArgumentException;
use Symfony\Component\Lock\Exception\LockConflictedException;
use Symfony\Component\Lock\Key;
use Symfony\Component\Lock\SharedLockStoreInterface;
/**
* PostgreSqlStore is a PersistingStoreInterface implementation using
* PostgreSql advisory locks.
*
* @author Jérémy Derussé <jeremy@derusse.com>
*/
class PostgreSqlStore implements BlockingSharedLockStoreInterface, BlockingStoreInterface
{
private \PDO $conn;
private string $dsn;
private ?string $username = null;
private ?string $password = null;
private array $connectionOptions = [];
private static array $storeRegistry = [];
/**
* You can either pass an existing database connection as PDO instance or
* a DSN string that will be used to lazy-connect to the database when the
* lock is actually used.
*
* List of available options:
* * db_username: The username when lazy-connect [default: '']
* * db_password: The password when lazy-connect [default: '']
* * db_connection_options: An array of driver-specific connection options [default: []]
*
* @param array $options An associative array of options
*
* @throws InvalidArgumentException When first argument is not PDO nor Connection nor string
* @throws InvalidArgumentException When PDO error mode is not PDO::ERRMODE_EXCEPTION
* @throws InvalidArgumentException When namespace contains invalid characters
*/
public function __construct(#[\SensitiveParameter] \PDO|string $connOrDsn, #[\SensitiveParameter] array $options = [])
{
if ($connOrDsn instanceof \PDO) {
if (\PDO::ERRMODE_EXCEPTION !== $connOrDsn->getAttribute(\PDO::ATTR_ERRMODE)) {
throw new InvalidArgumentException(\sprintf('"%s" requires PDO error mode attribute be set to throw Exceptions (i.e. $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION)).', __METHOD__));
}
$this->conn = $connOrDsn;
$this->checkDriver();
} else {
$this->dsn = $connOrDsn;
}
$this->username = $options['db_username'] ?? $this->username;
$this->password = $options['db_password'] ?? $this->password;
$this->connectionOptions = $options['db_connection_options'] ?? $this->connectionOptions;
}
public function save(Key $key): void
{
// prevent concurrency within the same connection
$this->getInternalStore()->save($key);
$lockAcquired = false;
try {
$sql = 'SELECT pg_try_advisory_lock(:key)';
$stmt = $this->getConnection()->prepare($sql);
$stmt->bindValue(':key', $this->getHashedKey($key));
$result = $stmt->execute();
// Check if lock is acquired
if (true === $stmt->fetchColumn()) {
$key->markUnserializable();
// release sharedLock in case of promotion
$this->unlockShared($key);
$lockAcquired = true;
return;
}
} finally {
if (!$lockAcquired) {
$this->getInternalStore()->delete($key);
}
}
throw new LockConflictedException();
}
public function saveRead(Key $key): void
{
// prevent concurrency within the same connection
$this->getInternalStore()->saveRead($key);
$lockAcquired = false;
try {
$sql = 'SELECT pg_try_advisory_lock_shared(:key)';
$stmt = $this->getConnection()->prepare($sql);
$stmt->bindValue(':key', $this->getHashedKey($key));
$result = $stmt->execute();
// Check if lock is acquired
if (true === $stmt->fetchColumn()) {
$key->markUnserializable();
// release lock in case of demotion
$this->unlock($key);
$lockAcquired = true;
return;
}
} finally {
if (!$lockAcquired) {
$this->getInternalStore()->delete($key);
}
}
throw new LockConflictedException();
}
public function putOffExpiration(Key $key, float $ttl): void
{
// postgresql locks forever.
// check if lock still exists
if (!$this->exists($key)) {
throw new LockConflictedException();
}
}
public function delete(Key $key): void
{
// Prevent deleting locks own by an other key in the same connection
if (!$this->exists($key)) {
return;
}
$this->unlock($key);
// Prevent deleting Readlocks own by current key AND an other key in the same connection
$store = $this->getInternalStore();
try {
// If lock acquired = there is no other ReadLock
$store->save($key);
$this->unlockShared($key);
} catch (LockConflictedException) {
// an other key exists in this ReadLock
}
$store->delete($key);
}
public function exists(Key $key): bool
{
$sql = "SELECT count(*) FROM pg_locks WHERE locktype='advisory' AND objid=:key AND pid=pg_backend_pid()";
$stmt = $this->getConnection()->prepare($sql);
$stmt->bindValue(':key', $this->getHashedKey($key));
$stmt->execute();
if ($stmt->fetchColumn() > 0) {
// connection is locked, check for lock in internal store
return $this->getInternalStore()->exists($key);
}
return false;
}
public function waitAndSave(Key $key): void
{
// prevent concurrency within the same connection
// Internal store does not allow blocking mode, because there is no way to acquire one in a single process
$this->getInternalStore()->save($key);
$lockAcquired = false;
$sql = 'SELECT pg_advisory_lock(:key)';
try {
$stmt = $this->getConnection()->prepare($sql);
$stmt->bindValue(':key', $this->getHashedKey($key));
$stmt->execute();
$lockAcquired = true;
} finally {
if (!$lockAcquired) {
$this->getInternalStore()->delete($key);
}
}
// release lock in case of promotion
$this->unlockShared($key);
}
public function waitAndSaveRead(Key $key): void
{
// prevent concurrency within the same connection
// Internal store does not allow blocking mode, because there is no way to acquire one in a single process
$this->getInternalStore()->saveRead($key);
$lockAcquired = false;
$sql = 'SELECT pg_advisory_lock_shared(:key)';
try {
$stmt = $this->getConnection()->prepare($sql);
$stmt->bindValue(':key', $this->getHashedKey($key));
$stmt->execute();
$lockAcquired = true;
} finally {
if (!$lockAcquired) {
$this->getInternalStore()->delete($key);
}
}
// release lock in case of demotion
$this->unlock($key);
}
/**
* Returns a hashed version of the key.
*/
private function getHashedKey(Key $key): int
{
return crc32((string) $key);
}
private function unlock(Key $key): void
{
while (true) {
$sql = "SELECT pg_advisory_unlock(objid::bigint) FROM pg_locks WHERE locktype='advisory' AND mode='ExclusiveLock' AND objid=:key AND pid=pg_backend_pid()";
$stmt = $this->getConnection()->prepare($sql);
$stmt->bindValue(':key', $this->getHashedKey($key));
$result = $stmt->execute();
if (0 === $stmt->rowCount()) {
break;
}
}
}
private function unlockShared(Key $key): void
{
while (true) {
$sql = "SELECT pg_advisory_unlock_shared(objid::bigint) FROM pg_locks WHERE locktype='advisory' AND mode='ShareLock' AND objid=:key AND pid=pg_backend_pid()";
$stmt = $this->getConnection()->prepare($sql);
$stmt->bindValue(':key', $this->getHashedKey($key));
$result = $stmt->execute();
if (0 === $stmt->rowCount()) {
break;
}
}
}
private function getConnection(): \PDO
{
if (!isset($this->conn)) {
$this->conn = new \PDO($this->dsn, $this->username, $this->password, $this->connectionOptions);
$this->conn->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION);
$this->checkDriver();
}
return $this->conn;
}
private function checkDriver(): void
{
if ('pgsql' !== $driver = $this->conn->getAttribute(\PDO::ATTR_DRIVER_NAME)) {
throw new InvalidArgumentException(\sprintf('The adapter "%s" does not support the "%s" driver.', __CLASS__, $driver));
}
}
private function getInternalStore(): SharedLockStoreInterface
{
$namespace = spl_object_hash($this->getConnection());
return self::$storeRegistry[$namespace] ??= new InMemoryStore();
}
}