Skip to content

Commit

Permalink
Fix circular reference in RemoteExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Feb 18, 2023
1 parent 298d5e2 commit ed5555e
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 17 deletions.
52 changes: 35 additions & 17 deletions src/RemoteExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ final class RemoteExecutor implements QueryExecutor
/** @var Promise|null */
private $connect;

/** @var RespSocket|null */
private $socket;

/** @var Socket\Connector */
private $connector;

Expand All @@ -34,6 +37,13 @@ public function __construct(Config $config, ?Socket\Connector $connector = null)
$this->connector = $connector ?? Socket\connector();
}

public function __destruct()
{
if ($this->socket) {
$this->socket->close();
}
}

/**
* @param string[] $args
* @param callable $transform
Expand Down Expand Up @@ -87,24 +97,29 @@ private function connect(): Promise
return $this->connect;
}

return $this->connect = call(function () {
$config = $this->config->withDatabase($this->database);
$connect = &$this->connect;
$socket = &$this->socket;
$connector = $this->connector;
$queue = &$this->queue;
return $this->connect = call(static function () use (&$connect, &$socket, &$queue, $config, $connector) {
try {
/** @var RespSocket $resp */
$resp = yield connect($this->config->withDatabase($this->database), $this->connector);
/** @var RespSocket $socket */
$socket = yield connect($config, $connector);
} catch (\Throwable $connectException) {
yield delay(0); // ensure $this->connect is already assigned above in case of immediate failure
yield delay(0); // ensure $connect is already assigned above in case of immediate failure

$this->connect = null;
$connect = null;

throw $connectException;
}

asyncCall(function () use ($resp) {
asyncCall(static function () use ($socket, &$queue, &$connect, $config) {
try {
while ([$response] = yield $resp->read()) {
$deferred = \array_shift($this->queue);
if (!$this->queue) {
$resp->unreference();
while ([$response] = yield $socket->read()) {
$deferred = \array_shift($queue);
if (!$queue) {
$socket->unreference();
}

if ($response instanceof \Throwable) {
Expand All @@ -114,20 +129,23 @@ private function connect(): Promise
}
}

throw new SocketException('Socket to redis instance (' . $this->config->getConnectUri() . ') closed unexpectedly');
throw new SocketException('Socket to redis instance (' . $config->getConnectUri() . ') closed unexpectedly');
} catch (\Throwable $error) {
$queue = $this->queue;
$this->queue = [];
$this->connect = null;

while ($queue) {
// Ignore, the connection will be reset in the finally block.
} finally {
$temp = $queue;
$queue = [];
$connect = null;
$socket->close();

while ($temp) {
$deferred = \array_shift($queue);
$deferred->fail($error);
}
}
});

return $resp;
return $socket;
});
}
}
13 changes: 13 additions & 0 deletions test/AuthTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace Amp\Redis;

use Amp\Deferred;
use Amp\Delayed;
use Amp\PHPUnit\AsyncTestCase;

class AuthTest extends AsyncTestCase
Expand All @@ -23,6 +25,17 @@ public static function tearDownAfterClass(): void
}
}

public function testGarbageCollection(): \Generator
{
// This will hit stream select limits if garbage isn't collected as it should (e.g. due to circular references)
for ($i = 0; $i < 10000; $i++) {
$redis = new Redis(new RemoteExecutor(Config::fromUri('tcp://127.0.0.1:25325?password=secret')));
$this->assertSame('PONG', yield $redis->echo('PONG'));
}

yield $redis->quit();
}

public function testSuccess(): \Generator
{
$redis = new Redis(new RemoteExecutor(Config::fromUri('tcp://127.0.0.1:25325?password=secret')));
Expand Down

0 comments on commit ed5555e

Please sign in to comment.