Skip to content

Commit

Permalink
rewrite for distributed lock
Browse files Browse the repository at this point in the history
  • Loading branch information
fprochazka committed Sep 5, 2016
1 parent 3a8daa6 commit 3a77744
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 258 deletions.
3 changes: 2 additions & 1 deletion composer.json
Expand Up @@ -19,7 +19,8 @@
"require": {
"php": ">=5.4",
"ext-redis": "*",
"kdyby/strict-objects": "~1.0"
"kdyby/strict-objects": "~1.0",
"paragonie/random_compat": "^2.0"
},
"require-dev": {
"nette/tester": "~1.7"
Expand Down
220 changes: 125 additions & 95 deletions src/AdvisoryLock.php
Expand Up @@ -16,35 +16,73 @@


/**
* @see http://redis.io/topics/distlock
* @author Ondřej Nešpor
* @author Filip Procházka <filip@prochazka.su>
*/
class AdvisoryLock implements Lock
class AdvisoryLock
{

const DEFAULT_MAX_ATTEMPTS = 10;
const CLOCK_DRIFT_FACTORY = 0.01;
const DEFAULT_TIMEOUT_MILLISECONDS = 15000;

use Scream;

/**
* @var string
*/
/** @var string */
private $key;

/**
* @var \Redis
*/
private $redis;
/** @var string|NULL */
private $lockToken;

/**
* @var integer|NULL
*/
private $lockTimeout;
/** @var int|NULL */
private $lockTimeoutMilliseconds;

/** @var \Redis[] */
private $servers;

/** @var int */
private $maxAttempts;

/** @var int|null */
private $acquireTimeout;

/** @var int */
private $quorum;

public function __construct($key, \Redis $redis)


/**
* @param string $key
* @param \Redis[] $servers
* @param int|NULL $acquireTimeout in seconds
* @param int $maxAttempts
*/
public function __construct($key, $servers, $acquireTimeout = NULL, $maxAttempts = self::DEFAULT_MAX_ATTEMPTS)
{
foreach ($servers as $connection) {
if (!$connection instanceof \Redis) {
throw new InvalidArgumentException('Given connections must be array of connected \\Redis instances');
}
}
if (count($servers) <= 0) {
throw new InvalidArgumentException('At leasts one server is required');
}
if (!is_string($key)) {
throw new InvalidArgumentException('Given key is not a string');
}
if (!is_int($maxAttempts) || $maxAttempts <= 0) {
throw new InvalidArgumentException('Max attempts must be a positive whole number');
}
if ($acquireTimeout !== NULL && (!is_int($acquireTimeout) || $acquireTimeout <= 0)) {
throw new InvalidArgumentException('Acquire timeout must be a positive whole number of seconds');
}

$this->key = $key;
$this->redis = $redis;
$this->servers = $servers;
$this->maxAttempts = $maxAttempts;
$this->acquireTimeout = $acquireTimeout;
$this->quorum = min(count($servers), count($servers) / 2 + 1);
}


Expand All @@ -57,144 +95,136 @@ public function __destruct()


/**
* {@inheritdoc}
* Tries to acquire a key lock, otherwise waits until it's released and repeats.
* Returns remaining ttl in milliseconds.
*
* @param int $ttlMilliseconds
* @throws \Kdyby\RedisActiveLock\AcquireTimeoutException
* @return int
*/
public function lock($duration = self::DEFAULT_TIMEOUT, $acquireTimeout = NULL)
public function lock($ttlMilliseconds = self::DEFAULT_TIMEOUT_MILLISECONDS)
{
if ($this->lockTimeout !== NULL) {
if ($this->lockTimeoutMilliseconds !== NULL) {
throw LockException::alreadyLocked($this->key);
}

if ($duration <= 0) {
throw InvalidArgumentException::invalidDuration($duration);
}
if ($acquireTimeout !== NULL) {
if ($acquireTimeout <= 0) {
throw InvalidArgumentException::invalidAcquireTimeout($acquireTimeout);
}
if ($duration < $acquireTimeout) {
throw InvalidArgumentException::acquireTimeoutTooBig();
}
if ($ttlMilliseconds <= 0) {
throw new InvalidArgumentException('Time to live must a positive whole number');
}

$start = microtime(TRUE);

$lockKey = $this->formatLock($this->key);
$maxAttempts = 10;
do {
$sleepTime = 5000;
do {
$timeout = $this->calculateTimeout($duration);
if ($this->redis->set($lockKey, $timeout, ['NX', 'PX' => $timeout])) {
$this->lockTimeout = $timeout;
return TRUE;
}
$this->lockToken = $this->generateRandomToken();

if ($acquireTimeout !== NULL && (microtime(TRUE) - $start) >= $acquireTimeout) {
throw AcquireTimeoutException::acquireTimeout();
$lockingStartTime = microtime(TRUE);

for ($retry = 1 ; $retry <= $this->maxAttempts ; $retry++) {
if ($this->acquireTimeout !== NULL && (microtime(TRUE) - $lockingStartTime) >= $this->acquireTimeout) {
throw AcquireTimeoutException::acquireTimeout();
}

$lockedInstances = 0;
$attemptStartTime = microtime(TRUE) * 1000;
foreach ($this->servers as $server) {
if ($this->lockInstance($server, $ttlMilliseconds)) {
$lockedInstances++;
}
}
$attemptFinishTime = microtime(TRUE) * 1000;

$lockExpiration = $this->redis->get($lockKey);
$sleepTime += 2500;
// Add 2 milliseconds to the drift to account for Redis expires precision,
// which is 1 millisecond, plus 1 millisecond min drift for small TTLs.
// thx https://github.com/ronnylt/redlock-php
$drift = ($ttlMilliseconds * self::CLOCK_DRIFT_FACTORY) + 2;
$validForMilliSeconds = $ttlMilliseconds - ($attemptFinishTime - $attemptStartTime) - $drift;

} while (empty($lockExpiration) || ($lockExpiration >= time() && !usleep($sleepTime)));
if ($lockedInstances >= $this->quorum && $validForMilliSeconds > 0) {
return $this->lockTimeoutMilliseconds = $attemptFinishTime + $validForMilliSeconds;
}

$oldExpiration = $this->redis->getSet($lockKey, $timeout = $this->calculateTimeout($duration));
if ($oldExpiration === $lockExpiration) {
$this->lockTimeout = $timeout;
return TRUE;
foreach ($this->servers as $server) {
$this->unlockInstance($server);
}

} while (--$maxAttempts > 0);
if ($retry < $this->maxAttempts) {
// Wait a random delay before to retry
$minDelay = 100 * pow(2, $retry);
usleep(1000 * mt_rand($minDelay, $minDelay * 2));
}
}

throw AcquireTimeoutException::highConcurrency();
throw AcquireTimeoutException::allAttemptsUsed();
}



/**
* {@inheritdoc}
* Releases lock.
*/
public function release()
{
if ($this->lockTimeout === NULL) {
if ($this->lockTimeoutMilliseconds === NULL) {
return FALSE;
}

if ($this->lockTimeout <= time()) {
$this->lockTimeout = NULL;
return FALSE;
if ($this->lockTimeoutMilliseconds <= microtime(TRUE) * 1000) {
$this->lockTimeoutMilliseconds = NULL; // released by Redis timeout
return TRUE;
}

$script = <<<LUA
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
LUA;
$this->redis->eval($script, [$this->key, $this->lockTimeout], 1);
$this->lockTimeout = NULL;
foreach ($this->servers as $server) {
$this->unlockInstance($server);
}
$this->lockTimeoutMilliseconds = NULL;
return TRUE;
}



/**
* {@inheritdoc}
* @param string $key
* @return string
*/
public function increaseDuration($duration = self::DEFAULT_TIMEOUT)
protected function formatLock($key)
{
if ($duration <= 0) {
throw InvalidArgumentException::invalidDuration($duration);
}

if ($this->lockTimeout === NULL) {
throw LockException::notLocked($this->key);
}

if ($this->lockTimeout <= time()) {
throw LockException::durabilityTimedOut();
}

$oldTimeout = $this->redis->getSet($this->formatLock($this->key), $timeout = $this->calculateTimeout($duration));
if ((int) $oldTimeout !== (int) $this->lockTimeout) {
throw LockException::invalidDuration();
}
$this->lockTimeout = $timeout;
return TRUE;
return $key . ':lock';
}



/**
* @return int
* @return string
*/
public function calculateRemainingTimeout()
protected function generateRandomToken()
{
return $this->lockTimeout !== NULL ? $this->lockTimeout - time() : 0;
return bin2hex(random_bytes(10));
}



/**
* @param int $duration
* @return int
* @param \Redis $server
* @param int $ttlMilliseconds
* @return bool
*/
protected function calculateTimeout($duration)
private function lockInstance(\Redis $server, $ttlMilliseconds)
{
return time() + ((int) $duration) + 1;
return $server->set($this->formatLock($this->key), $this->lockToken, ['NX', 'PX' => $ttlMilliseconds]);
}



/**
* @param string $key
* @return string
* @param \Redis $server
*/
protected function formatLock($key)
private function unlockInstance(\Redis $server)
{
return $key . ':lock';
$script = <<<LUA
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
LUA;
$server->eval($script, [$this->formatLock($this->key), $this->lockToken], 1);
}

}
51 changes: 0 additions & 51 deletions src/Lock.php

This file was deleted.

0 comments on commit 3a77744

Please sign in to comment.