Skip to content

Commit

Permalink
Fix simultaneous requests to prepare the same query
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Oct 30, 2018
1 parent 01f9b5c commit b7fa576
Showing 1 changed file with 32 additions and 13 deletions.
45 changes: 32 additions & 13 deletions src/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
use Amp\Sql\ResultSet as SqlResultSet;
use Amp\Sql\Statement as SqlStatement;
use Amp\Sql\Transaction as SqlTransaction;
use Amp\Success;
use cash\LRUCache;
use function Amp\call;

Expand Down Expand Up @@ -64,14 +63,18 @@ public function getIterator(): \Iterator
$this->statementWatcher = Loop::repeat(1000, static function () use (&$idleTimeout, $statements) {
$now = \time();

foreach ($statements as $hash => $statement) {
foreach ($statements as $sql => $statement) {
if ($statement instanceof Promise) {
continue;
}

\assert($statement instanceof StatementPool);

if ($statement->getLastUsedAt() + $idleTimeout > $now) {
return;
}

$statements->remove($hash);
$statements->remove($sql);
}
});

Expand Down Expand Up @@ -143,18 +146,34 @@ public function prepare(string $sql): Promise
throw new \Error("The pool has been closed");
}

if ($this->statements->containsKey($sql)) {
$statement = $this->statements->get($sql);
\assert($statement instanceof SqlStatement);
if ($statement->isAlive()) {
return new Success($statement);
return call(function () use ($sql) {
if ($this->statements->containsKey($sql)) {
$statement = $this->statements->get($sql);

if ($statement instanceof Promise) {
$statement = yield $statement; // Wait for prior request to resolve.
}

\assert($statement instanceof StatementPool);

if ($statement->isAlive()) {
return $statement;
}
}
}

return call(function () use ($sql) {
$statement = yield parent::prepare($sql);
\assert($statement instanceof SqlStatement);
$this->statements->put($sql, $statement);
$promise = parent::prepare($sql);
$this->statements->put($sql, $promise); // Insert promise into queue so subsequent requests get promise.

try {
$statement = yield $promise;
\assert($statement instanceof StatementPool);
} catch (\Throwable $exception) {
$this->statements->remove($sql);
throw $exception;
}

$this->statements->put($sql, $statement); // Replace promise in queue with statement object.

return $statement;
});
}
Expand Down

0 comments on commit b7fa576

Please sign in to comment.