Skip to content

Commit

Permalink
Move mutex options into an object
Browse files Browse the repository at this point in the history
  • Loading branch information
kelunik committed Aug 22, 2019
1 parent 8d817dc commit c3bbf54
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 40 deletions.
60 changes: 20 additions & 40 deletions lib/Mutex/Mutex.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,51 +60,30 @@ final class Mutex

/** @var string */
private $uri;
/** @var array */
/** @var MutexOptions */
private $options;
/** @var Client */
private $std;
/** @var array */
private $busyConnectionMap;
private $busyConnectionMap = [];
/** @var Client[] */
private $busyConnections;
private $busyConnections = [];
/** @var Client[] */
private $readyConnections;
/** @var int */
private $maxConnections;
/** @var int */
private $ttl;
/** @var int */
private $timeout;
private $readyConnections = [];
/** @var string */
private $watcher;

/**
* Constructs a new Mutex instance. A single instance can be used to create as many locks as you need.
*
* @param string $uri URI of the Redis server instance, e.g. tcp://localhost:6379
* @param array $options {
* General options for this instance.
*
* @type string|null $password password for the Redis server
* @type int $max_connections maximum of concurrent Redis connections waiting for a lock with blocking
* commands
* @type int timeout timeout for blocking lock wait
* @type int $ttl key ttl for created locks and lock renews
* }
* @param string $uri URI of the Redis server instance, e.g. tcp://localhost:6379
* @param MutexOptions|null $options
*/
public function __construct(string $uri, array $options = [])
public function __construct(string $uri, ?MutexOptions $options = null)
{
$this->uri = $uri;
$this->options = $options;

$this->options = $options ?? new MutexOptions;
$this->std = new Client($uri);
$this->maxConnections = $options["max_connections"] ?? 0;
$this->ttl = $options["ttl"] ?? 1000;
$this->timeout = (int) (($options["timeout"] ?? 1000) / 1000);
$this->readyConnections = [];
$this->busyConnections = [];
$this->busyConnectionMap = [];

$readyConnections = &$this->readyConnections;
$this->watcher = Loop::repeat(5000, static function () use (&$readyConnections) {
Expand Down Expand Up @@ -146,7 +125,7 @@ public function __destruct()
public function lock(string $id, string $token): Promise
{
return call(function () use ($id, $token) {
$result = yield $this->std->eval(self::LOCK, ["lock:{$id}", "queue:{$id}"], [$token, $this->ttl]);
$result = yield $this->std->eval(self::LOCK, ["lock:{$id}", "queue:{$id}"], [$token, $this->getTtl()]);

This comment has been minimized.

Copy link
@staabm

staabm Aug 23, 2019

Member

this->options->getTtl();

This comment has been minimized.

Copy link
@kelunik

kelunik Aug 23, 2019

Author Member

Thanks, but both work 😊


if ($result) {
return true;
Expand All @@ -157,13 +136,13 @@ public function lock(string $id, string $token): Promise
$connection = $this->getReadyConnection();

try {
$result = yield $connection->brPoplPush("queue:{$id}", "lock:{$id}", $this->timeout);
$result = yield $connection->brPoplPush("queue:{$id}", "lock:{$id}", $this->options->getTimeout());

if ($result === null) {
return new Failure(new LockException);
}

return $this->std->eval(self::TOKEN, ["lock:{$id}"], [$token, $this->ttl]);
return $this->std->eval(self::TOKEN, ["lock:{$id}"], [$token, $this->options->getTtl()]);
} finally {
$hash = \spl_object_hash($connection);
$key = $this->busyConnectionMap[$hash] ?? null;
Expand All @@ -181,11 +160,12 @@ public function lock(string $id, string $token): Promise
* @param string $id specific lock ID.
* @param string $token unique token provided during {@link lock()}.
*
* @return Promise promise fails if lock couldn't be acquired, otherwise resolves normally.
* @return Promise Fails if lock couldn't be unlocked, otherwise resolves normally.
*/
public function unlock(string $id, string $token): Promise
{
return $this->std->eval(self::UNLOCK, ["lock:{$id}", "queue:{$id}"], [$token, 2 * $this->timeout]);
return $this->std->eval(self::UNLOCK, ["lock:{$id}", "queue:{$id}"],
[$token, 2 * $this->options->getTimeout()]);
}

/**
Expand All @@ -196,11 +176,11 @@ public function unlock(string $id, string $token): Promise
* @param string $id specific lock ID.
* @param string $token unique token provided during {@link lock()}.
*
* @return Promise promise fails if lock couldn't be renewed, otherwise resolves normally.
* @return Promise Fails if lock couldn't be renewed, otherwise resolves normally.
*/
public function renew(string $id, string $token): Promise
{
return $this->std->eval(self::RENEW, ["lock:{$id}"], [$token, $this->ttl]);
return $this->std->eval(self::RENEW, ["lock:{$id}"], [$token, $this->options->getTtl()]);
}

/**
Expand Down Expand Up @@ -234,7 +214,7 @@ public function shutdown(): Promise
*/
public function getTtl(): int
{
return $this->ttl;
return $this->options->getTtl();
}

/**
Expand All @@ -244,7 +224,7 @@ public function getTtl(): int
*/
public function getTimeout(): int
{
return $this->timeout;
return $this->options->getTimeout();
}

/**
Expand All @@ -265,8 +245,8 @@ protected function getReadyConnection(): Client
$connection = $connection[1] ?? null;

if (!$connection) {
if ($this->maxConnections && \count($this->busyConnections) + 1 === $this->maxConnections) {
throw new ConnectionLimitException;
if (\count($this->busyConnections) + 1 === $this->options->getConnectionLimit()) {
throw new ConnectionLimitException('The number of allowed connections has exceeded the configured limit of ' . $this->options->getConnectionLimit());
}

$connection = new Client(
Expand Down
49 changes: 49 additions & 0 deletions lib/Mutex/MutexOptions.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

namespace Amp\Redis\Mutex;

final class MutexOptions
{
private $connectionLimit = 1000;
private $ttl = 1000;
private $timeout = 3;

public function getConnectionLimit(): int
{
return $this->connectionLimit;
}

public function getTimeout(): int
{
return $this->timeout;
}

public function getTtl(): int
{
return $this->ttl;
}

public function withConnectionLimit(int $connectionLimit): self
{
$clone = clone $this;
$clone->connectionLimit = $connectionLimit;

return $clone;
}

public function withTimeout(int $timeout): self
{
$clone = clone $this;
$clone->timeout = $timeout;

return $clone;
}

public function withTtl(int $ttl): self
{
$clone = clone $this;
$clone->ttl = $ttl;

return $clone;
}
}

0 comments on commit c3bbf54

Please sign in to comment.