From c3bbf5404b6675f7cd48e36db6b508de089d99da Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Thu, 22 Aug 2019 20:14:37 +0200 Subject: [PATCH] Move mutex options into an object --- lib/Mutex/Mutex.php | 60 +++++++++++++------------------------- lib/Mutex/MutexOptions.php | 49 +++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 40 deletions(-) create mode 100644 lib/Mutex/MutexOptions.php diff --git a/lib/Mutex/Mutex.php b/lib/Mutex/Mutex.php index 3594172..abd3b89 100644 --- a/lib/Mutex/Mutex.php +++ b/lib/Mutex/Mutex.php @@ -60,51 +60,30 @@ final class Mutex /** @var string */ private $uri; - /** @var array */ + /** @var MutexOptions */ private $options; /** @var Client */ private $std; /** @var array */ - private $busyConnectionMap; + private $busyConnectionMap = []; /** @var Client[] */ - private $busyConnections; + private $busyConnections = []; /** @var Client[] */ - private $readyConnections; - /** @var int */ - private $maxConnections; - /** @var int */ - private $ttl; - /** @var int */ - private $timeout; + private $readyConnections = []; /** @var string */ private $watcher; /** * Constructs a new Mutex instance. A single instance can be used to create as many locks as you need. * - * @param string $uri URI of the Redis server instance, e.g. tcp://localhost:6379 - * @param array $options { - * General options for this instance. - * - * @type string|null $password password for the Redis server - * @type int $max_connections maximum of concurrent Redis connections waiting for a lock with blocking - * commands - * @type int timeout timeout for blocking lock wait - * @type int $ttl key ttl for created locks and lock renews - * } + * @param string $uri URI of the Redis server instance, e.g. tcp://localhost:6379 + * @param MutexOptions|null $options */ - public function __construct(string $uri, array $options = []) + public function __construct(string $uri, ?MutexOptions $options = null) { $this->uri = $uri; - $this->options = $options; - + $this->options = $options ?? new MutexOptions; $this->std = new Client($uri); - $this->maxConnections = $options["max_connections"] ?? 0; - $this->ttl = $options["ttl"] ?? 1000; - $this->timeout = (int) (($options["timeout"] ?? 1000) / 1000); - $this->readyConnections = []; - $this->busyConnections = []; - $this->busyConnectionMap = []; $readyConnections = &$this->readyConnections; $this->watcher = Loop::repeat(5000, static function () use (&$readyConnections) { @@ -146,7 +125,7 @@ public function __destruct() public function lock(string $id, string $token): Promise { return call(function () use ($id, $token) { - $result = yield $this->std->eval(self::LOCK, ["lock:{$id}", "queue:{$id}"], [$token, $this->ttl]); + $result = yield $this->std->eval(self::LOCK, ["lock:{$id}", "queue:{$id}"], [$token, $this->getTtl()]); if ($result) { return true; @@ -157,13 +136,13 @@ public function lock(string $id, string $token): Promise $connection = $this->getReadyConnection(); try { - $result = yield $connection->brPoplPush("queue:{$id}", "lock:{$id}", $this->timeout); + $result = yield $connection->brPoplPush("queue:{$id}", "lock:{$id}", $this->options->getTimeout()); if ($result === null) { return new Failure(new LockException); } - return $this->std->eval(self::TOKEN, ["lock:{$id}"], [$token, $this->ttl]); + return $this->std->eval(self::TOKEN, ["lock:{$id}"], [$token, $this->options->getTtl()]); } finally { $hash = \spl_object_hash($connection); $key = $this->busyConnectionMap[$hash] ?? null; @@ -181,11 +160,12 @@ public function lock(string $id, string $token): Promise * @param string $id specific lock ID. * @param string $token unique token provided during {@link lock()}. * - * @return Promise promise fails if lock couldn't be acquired, otherwise resolves normally. + * @return Promise Fails if lock couldn't be unlocked, otherwise resolves normally. */ public function unlock(string $id, string $token): Promise { - return $this->std->eval(self::UNLOCK, ["lock:{$id}", "queue:{$id}"], [$token, 2 * $this->timeout]); + return $this->std->eval(self::UNLOCK, ["lock:{$id}", "queue:{$id}"], + [$token, 2 * $this->options->getTimeout()]); } /** @@ -196,11 +176,11 @@ public function unlock(string $id, string $token): Promise * @param string $id specific lock ID. * @param string $token unique token provided during {@link lock()}. * - * @return Promise promise fails if lock couldn't be renewed, otherwise resolves normally. + * @return Promise Fails if lock couldn't be renewed, otherwise resolves normally. */ public function renew(string $id, string $token): Promise { - return $this->std->eval(self::RENEW, ["lock:{$id}"], [$token, $this->ttl]); + return $this->std->eval(self::RENEW, ["lock:{$id}"], [$token, $this->options->getTtl()]); } /** @@ -234,7 +214,7 @@ public function shutdown(): Promise */ public function getTtl(): int { - return $this->ttl; + return $this->options->getTtl(); } /** @@ -244,7 +224,7 @@ public function getTtl(): int */ public function getTimeout(): int { - return $this->timeout; + return $this->options->getTimeout(); } /** @@ -265,8 +245,8 @@ protected function getReadyConnection(): Client $connection = $connection[1] ?? null; if (!$connection) { - if ($this->maxConnections && \count($this->busyConnections) + 1 === $this->maxConnections) { - throw new ConnectionLimitException; + if (\count($this->busyConnections) + 1 === $this->options->getConnectionLimit()) { + throw new ConnectionLimitException('The number of allowed connections has exceeded the configured limit of ' . $this->options->getConnectionLimit()); } $connection = new Client( diff --git a/lib/Mutex/MutexOptions.php b/lib/Mutex/MutexOptions.php new file mode 100644 index 0000000..528537c --- /dev/null +++ b/lib/Mutex/MutexOptions.php @@ -0,0 +1,49 @@ +connectionLimit; + } + + public function getTimeout(): int + { + return $this->timeout; + } + + public function getTtl(): int + { + return $this->ttl; + } + + public function withConnectionLimit(int $connectionLimit): self + { + $clone = clone $this; + $clone->connectionLimit = $connectionLimit; + + return $clone; + } + + public function withTimeout(int $timeout): self + { + $clone = clone $this; + $clone->timeout = $timeout; + + return $clone; + } + + public function withTtl(int $ttl): self + { + $clone = clone $this; + $clone->ttl = $ttl; + + return $clone; + } +}