diff --git a/composer.json b/composer.json index 06db900..65460cb 100644 --- a/composer.json +++ b/composer.json @@ -19,7 +19,8 @@ "require": { "php": ">=5.4", "ext-redis": "*", - "kdyby/strict-objects": "~1.0" + "kdyby/strict-objects": "~1.0", + "paragonie/random_compat": "^2.0" }, "require-dev": { "nette/tester": "~1.7" diff --git a/src/AdvisoryLock.php b/src/AdvisoryLock.php index 225a2ce..dcd59bc 100644 --- a/src/AdvisoryLock.php +++ b/src/AdvisoryLock.php @@ -16,35 +16,73 @@ /** + * @see http://redis.io/topics/distlock * @author Ondřej Nešpor * @author Filip Procházka */ -class AdvisoryLock implements Lock +class AdvisoryLock { + const DEFAULT_MAX_ATTEMPTS = 10; + const CLOCK_DRIFT_FACTORY = 0.01; + const DEFAULT_TIMEOUT_MILLISECONDS = 15000; + use Scream; - /** - * @var string - */ + /** @var string */ private $key; - /** - * @var \Redis - */ - private $redis; + /** @var string|NULL */ + private $lockToken; - /** - * @var integer|NULL - */ - private $lockTimeout; + /** @var int|NULL */ + private $lockTimeoutMilliseconds; + + /** @var \Redis[] */ + private $servers; + + /** @var int */ + private $maxAttempts; + /** @var int|null */ + private $acquireTimeout; + /** @var int */ + private $quorum; - public function __construct($key, \Redis $redis) + + + /** + * @param string $key + * @param \Redis[] $servers + * @param int|NULL $acquireTimeout in seconds + * @param int $maxAttempts + */ + public function __construct($key, $servers, $acquireTimeout = NULL, $maxAttempts = self::DEFAULT_MAX_ATTEMPTS) { + foreach ($servers as $connection) { + if (!$connection instanceof \Redis) { + throw new InvalidArgumentException('Given connections must be array of connected \\Redis instances'); + } + } + if (count($servers) <= 0) { + throw new InvalidArgumentException('At leasts one server is required'); + } + if (!is_string($key)) { + throw new InvalidArgumentException('Given key is not a string'); + } + if (!is_int($maxAttempts) || $maxAttempts <= 0) { + throw new InvalidArgumentException('Max attempts must be a positive whole number'); + } + if ($acquireTimeout !== NULL && (!is_int($acquireTimeout) || $acquireTimeout <= 0)) { + throw new InvalidArgumentException('Acquire timeout must be a positive whole number of seconds'); + } + $this->key = $key; - $this->redis = $redis; + $this->servers = $servers; + $this->maxAttempts = $maxAttempts; + $this->acquireTimeout = $acquireTimeout; + $this->quorum = min(count($servers), count($servers) / 2 + 1); } @@ -57,144 +95,136 @@ public function __destruct() /** - * {@inheritdoc} + * Tries to acquire a key lock, otherwise waits until it's released and repeats. + * Returns remaining ttl in milliseconds. + * + * @param int $ttlMilliseconds + * @throws \Kdyby\RedisActiveLock\AcquireTimeoutException + * @return int */ - public function lock($duration = self::DEFAULT_TIMEOUT, $acquireTimeout = NULL) + public function lock($ttlMilliseconds = self::DEFAULT_TIMEOUT_MILLISECONDS) { - if ($this->lockTimeout !== NULL) { + if ($this->lockTimeoutMilliseconds !== NULL) { throw LockException::alreadyLocked($this->key); } - if ($duration <= 0) { - throw InvalidArgumentException::invalidDuration($duration); - } - if ($acquireTimeout !== NULL) { - if ($acquireTimeout <= 0) { - throw InvalidArgumentException::invalidAcquireTimeout($acquireTimeout); - } - if ($duration < $acquireTimeout) { - throw InvalidArgumentException::acquireTimeoutTooBig(); - } + if ($ttlMilliseconds <= 0) { + throw new InvalidArgumentException('Time to live must a positive whole number'); } - $start = microtime(TRUE); - - $lockKey = $this->formatLock($this->key); - $maxAttempts = 10; - do { - $sleepTime = 5000; - do { - $timeout = $this->calculateTimeout($duration); - if ($this->redis->set($lockKey, $timeout, ['NX', 'PX' => $timeout])) { - $this->lockTimeout = $timeout; - return TRUE; - } + $this->lockToken = $this->generateRandomToken(); - if ($acquireTimeout !== NULL && (microtime(TRUE) - $start) >= $acquireTimeout) { - throw AcquireTimeoutException::acquireTimeout(); + $lockingStartTime = microtime(TRUE); + + for ($retry = 1 ; $retry <= $this->maxAttempts ; $retry++) { + if ($this->acquireTimeout !== NULL && (microtime(TRUE) - $lockingStartTime) >= $this->acquireTimeout) { + throw AcquireTimeoutException::acquireTimeout(); + } + + $lockedInstances = 0; + $attemptStartTime = microtime(TRUE) * 1000; + foreach ($this->servers as $server) { + if ($this->lockInstance($server, $ttlMilliseconds)) { + $lockedInstances++; } + } + $attemptFinishTime = microtime(TRUE) * 1000; - $lockExpiration = $this->redis->get($lockKey); - $sleepTime += 2500; + // Add 2 milliseconds to the drift to account for Redis expires precision, + // which is 1 millisecond, plus 1 millisecond min drift for small TTLs. + // thx https://github.com/ronnylt/redlock-php + $drift = ($ttlMilliseconds * self::CLOCK_DRIFT_FACTORY) + 2; + $validForMilliSeconds = $ttlMilliseconds - ($attemptFinishTime - $attemptStartTime) - $drift; - } while (empty($lockExpiration) || ($lockExpiration >= time() && !usleep($sleepTime))); + if ($lockedInstances >= $this->quorum && $validForMilliSeconds > 0) { + return $this->lockTimeoutMilliseconds = $attemptFinishTime + $validForMilliSeconds; + } - $oldExpiration = $this->redis->getSet($lockKey, $timeout = $this->calculateTimeout($duration)); - if ($oldExpiration === $lockExpiration) { - $this->lockTimeout = $timeout; - return TRUE; + foreach ($this->servers as $server) { + $this->unlockInstance($server); } - } while (--$maxAttempts > 0); + if ($retry < $this->maxAttempts) { + // Wait a random delay before to retry + $minDelay = 100 * pow(2, $retry); + usleep(1000 * mt_rand($minDelay, $minDelay * 2)); + } + } - throw AcquireTimeoutException::highConcurrency(); + throw AcquireTimeoutException::allAttemptsUsed(); } /** - * {@inheritdoc} + * Releases lock. */ public function release() { - if ($this->lockTimeout === NULL) { + if ($this->lockTimeoutMilliseconds === NULL) { return FALSE; } - if ($this->lockTimeout <= time()) { - $this->lockTimeout = NULL; - return FALSE; + if ($this->lockTimeoutMilliseconds <= microtime(TRUE) * 1000) { + $this->lockTimeoutMilliseconds = NULL; // released by Redis timeout + return TRUE; } - $script = <<redis->eval($script, [$this->key, $this->lockTimeout], 1); - $this->lockTimeout = NULL; + foreach ($this->servers as $server) { + $this->unlockInstance($server); + } + $this->lockTimeoutMilliseconds = NULL; return TRUE; } /** - * {@inheritdoc} + * @param string $key + * @return string */ - public function increaseDuration($duration = self::DEFAULT_TIMEOUT) + protected function formatLock($key) { - if ($duration <= 0) { - throw InvalidArgumentException::invalidDuration($duration); - } - - if ($this->lockTimeout === NULL) { - throw LockException::notLocked($this->key); - } - - if ($this->lockTimeout <= time()) { - throw LockException::durabilityTimedOut(); - } - - $oldTimeout = $this->redis->getSet($this->formatLock($this->key), $timeout = $this->calculateTimeout($duration)); - if ((int) $oldTimeout !== (int) $this->lockTimeout) { - throw LockException::invalidDuration(); - } - $this->lockTimeout = $timeout; - return TRUE; + return $key . ':lock'; } /** - * @return int + * @return string */ - public function calculateRemainingTimeout() + protected function generateRandomToken() { - return $this->lockTimeout !== NULL ? $this->lockTimeout - time() : 0; + return bin2hex(random_bytes(10)); } /** - * @param int $duration - * @return int + * @param \Redis $server + * @param int $ttlMilliseconds + * @return bool */ - protected function calculateTimeout($duration) + private function lockInstance(\Redis $server, $ttlMilliseconds) { - return time() + ((int) $duration) + 1; + return $server->set($this->formatLock($this->key), $this->lockToken, ['NX', 'PX' => $ttlMilliseconds]); } /** - * @param string $key - * @return string + * @param \Redis $server */ - protected function formatLock($key) + private function unlockInstance(\Redis $server) { - return $key . ':lock'; + $script = <<eval($script, [$this->formatLock($this->key), $this->lockToken], 1); } } diff --git a/src/Lock.php b/src/Lock.php deleted file mode 100644 index fe947b1..0000000 --- a/src/Lock.php +++ /dev/null @@ -1,51 +0,0 @@ - - */ -interface Lock -{ - - const DEFAULT_TIMEOUT = 15; - - - - /** - * Tries to acquire a key lock, otherwise waits until it's released and repeats. - * - * @param int $duration in seconds - * @param int $acquireTimeout in seconds - * @throws AcquireTimeoutException - * @return bool - */ - public function lock($duration = self::DEFAULT_TIMEOUT, $acquireTimeout = NULL); - - - - /** - * Releases lock. - */ - public function release(); - - - - /** - * Increases the duration of lock by given amount, or by default value. - * - * @param int $duration - * @throws LockException - * @return bool - */ - public function increaseDuration($duration = self::DEFAULT_TIMEOUT); - -} diff --git a/src/exceptions.php b/src/exceptions.php index ca07832..dc1bdcd 100644 --- a/src/exceptions.php +++ b/src/exceptions.php @@ -26,32 +26,6 @@ interface Exception class InvalidArgumentException extends \InvalidArgumentException implements Exception { - /** - * @param mixed $duration - * @return InvalidArgumentException - */ - public static function invalidDuration($duration) - { - return new static(sprintf('Durability must be positive whole number, but "%s" was given', $duration)); - } - - /** - * @param mixed $timeout - * @return InvalidArgumentException - */ - public static function invalidAcquireTimeout($timeout) - { - return new static(sprintf('Acquire timeout must be positive whole number or NULL, but "%s" was given', $timeout)); - } - - /** - * @return InvalidArgumentException - */ - public static function acquireTimeoutTooBig() - { - return new static('Acquire timeout should be lower than lock duration'); - } - } @@ -62,37 +36,6 @@ public static function acquireTimeoutTooBig() class LockException extends \RuntimeException implements Exception { - /** - * @return LockException - */ - public static function durabilityTimedOut() - { - return new static('Process ran too long. Increase lock duration, or extend lock regularly.'); - } - - - - /** - * @return LockException - */ - public static function invalidDuration() - { - return new static('Some rude client have messed up the lock duration.'); - } - - - - /** - * @param string $key - * @return LockException - */ - public static function notLocked($key) - { - return new static(sprintf('The key "%s" has not yet been locked', $key)); - } - - - /** * @param string $key * @return LockException @@ -112,7 +55,7 @@ public static function alreadyLocked($key) class AcquireTimeoutException extends LockException { - const PROCESS_TIMEOUT = 1; + const ALL_ATTEMPTS_USED = 1; const ACQUIRE_TIMEOUT = 2; @@ -120,12 +63,9 @@ class AcquireTimeoutException extends LockException /** * @return AcquireTimeoutException */ - public static function highConcurrency() + public static function allAttemptsUsed() { - return new static( - 'Lock couldn\'t be acquired. Concurrency is way too high. I died of old age.', - self::PROCESS_TIMEOUT - ); + return new static('Lock couldn\'t be acquired. all attempts were used.', self::ALL_ATTEMPTS_USED); } @@ -135,10 +75,7 @@ public static function highConcurrency() */ public static function acquireTimeout() { - return new static( - 'Lock couldn\'t be acquired in reasonable time. The locking mechanism is giving up. You should kill the request.', - self::ACQUIRE_TIMEOUT - ); + return new static('Lock couldn\'t be acquired in reasonable time.', self::ACQUIRE_TIMEOUT); } } diff --git a/tests/KdybyTests/AdvisoryLock.phpt b/tests/KdybyTests/AdvisoryLock.phpt index ed694d7..2285974 100644 --- a/tests/KdybyTests/AdvisoryLock.phpt +++ b/tests/KdybyTests/AdvisoryLock.phpt @@ -6,6 +6,7 @@ namespace KdybyTests\RedisActiveLock; +use Kdyby\RedisActiveLock\AcquireTimeoutException; use Kdyby\RedisActiveLock\AdvisoryLock; use Redis; use Tester; @@ -23,85 +24,74 @@ require_once __DIR__ . '/bootstrap.php'; class AdvisoryLockTest extends Tester\TestCase { - public function testIncreaseDuration() + public function testHoldsLock() { - $lock = new AdvisoryLock('foo:bar', $this->createClient()); + $client = $this->createClient(); + $first = new AdvisoryLock('foo:bar', [$client]); + $second = new AdvisoryLock('foo:bar', [$client], null); - $lock->lock(2); - $remainingTimeout = $lock->calculateRemainingTimeout(); - Assert::true($remainingTimeout >= 1 && $remainingTimeout <= 3); + $startTime = microtime(TRUE); + $first->lock(3000); + $second->lock(1000); + Assert::true((microtime(TRUE) - $startTime) >= 3); - $lock->increaseDuration(5); - $remainingTimeout = $lock->calculateRemainingTimeout(); - Assert::true($remainingTimeout >= 4 && $remainingTimeout <= 6); - - $lock->release(); - } - - - - public function testIncreaseDurationLockExpiredException() - { - $first = new AdvisoryLock('foo:bar', $this->createClient()); - $first->lock(1); - sleep(3); - - Assert::exception(function () use ($first) { - $first->increaseDuration(); - }, 'Kdyby\RedisActiveLock\LockException', 'Process ran too long. Increase lock duration, or extend lock regularly.'); + Assert::true($first->release()); + Assert::true($second->release()); } - - public function testDeadlockHandling() + public function testAcquireExpiredLock() { - $first = new AdvisoryLock('foo:bar', $this->createClient()); - $second = new AdvisoryLock('foo:bar', $this->createClient()); + $client = $this->createClient(); + $first = new AdvisoryLock('foo:bar', [$client]); + $second = new AdvisoryLock('foo:bar', [$client]); - $first->lock(1); + $first->lock(100); sleep(3); // first died? - $second->lock(1); + $second->lock(100); Assert::true($second->release()); - Assert::false($first->release()); + Assert::true($first->release()); } - - public function testInvalidDurationException() + public function testLockingLockedKeyAcquireTimeoutException() { - $lock = new AdvisoryLock('foo:bar', $this->createClient()); - - Assert::exception(function () use ($lock) { - $lock->lock(-1); - }, 'Kdyby\RedisActiveLock\InvalidArgumentException'); - - Assert::exception(function () use ($lock) { - $lock->increaseDuration(-1); - }, 'Kdyby\RedisActiveLock\InvalidArgumentException'); + $client = $this->createClient(); + $first = new AdvisoryLock('foo:bar', [$client]); + $second = new AdvisoryLock('foo:bar', [$client], 3); + + $first->lock(10000); + Assert::exception(function () use ($second) { + $second->lock(5000); + }, 'Kdyby\RedisActiveLock\AcquireTimeoutException', null, AcquireTimeoutException::ACQUIRE_TIMEOUT); + Assert::false($second->release()); + Assert::true($first->release()); } - - public function testIncreaseNotLockedKeyException() + public function testLockingLockedKeyAllAttemptsUsedException() { - $lock = new AdvisoryLock('foo:bar', $this->createClient()); - - Assert::exception(function () use ($lock) { - $lock->increaseDuration(1); - }, 'Kdyby\RedisActiveLock\LockException', 'The key "foo:bar" has not yet been locked'); + $client = $this->createClient(); + $first = new AdvisoryLock('foo:bar', [$client]); + $second = new AdvisoryLock('foo:bar', [$client], null, 1); + + $first->lock(10000); + Assert::exception(function () use ($second) { + $second->lock(5000); + }, 'Kdyby\RedisActiveLock\AcquireTimeoutException', null, AcquireTimeoutException::ALL_ATTEMPTS_USED); + Assert::false($second->release()); + Assert::true($first->release()); } - public function testReleaseNotLocked() { - $lock = new AdvisoryLock('foo:bar', $this->createClient()); + $lock = new AdvisoryLock('foo:bar', [$this->createClient()]); Assert::false($lock->release()); } - /** * @return Redis */