Skip to content

Commit

Permalink
Refactor PooledStatement
Browse files Browse the repository at this point in the history
PooledStatement now holds a collection of statements created from the pool, releasing them at the same frequency as connections are released from the pool.

Added a lastUsedAt() method to Statement.
  • Loading branch information
trowski committed Mar 30, 2018
1 parent 2340e03 commit 97022a4
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 17 deletions.
9 changes: 9 additions & 0 deletions src/PgSqlStatement.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ final class PgSqlStatement implements Statement, Operation {
/** @var string[] */
private $params;

/** @var int */
private $lastUsedAt;

/**
* @param \Amp\Postgres\PgSqlHandle $handle
* @param string $name
Expand All @@ -32,6 +35,7 @@ public function __construct(PgSqlHandle $handle, string $name, string $sql, arra
$this->sql = $sql;
$this->params = $params;
$this->queue = new Internal\ReferenceQueue;
$this->lastUsedAt = \time();
}

public function __destruct() {
Expand All @@ -49,6 +53,11 @@ public function getQuery(): string {
return $this->sql;
}

/** {@inheritdoc} */
public function lastUsedAt(): int {
return $this->lastUsedAt;
}

/** {@inheritdoc} */
public function execute(array $params = []): Promise {
return $this->handle->statementExecute($this->name, Internal\replaceNamedParams($params, $this->params));
Expand Down
14 changes: 6 additions & 8 deletions src/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -108,18 +108,16 @@ public function resetConnections(bool $reset = true) {
$this->resetConnections = $reset;
}

public function getIdleTimeout(): int {
return $this->idleTimeout;
}

public function setIdleTimeout(int $timeout) {
if ($timeout < 0) {
throw new \Error("Timeout must be greater than or equal to 0");
if ($timeout < 1) {
throw new \Error("Timeout must be greater than or equal to 1");
}

$this->idleTimeout = $timeout;

if ($this->idleTimeout > 0) {
Loop::enable($this->timeoutWatcher);
} else {
Loop::disable($this->timeoutWatcher);
}
}

/**
Expand Down
80 changes: 71 additions & 9 deletions src/PooledStatement.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,59 @@

namespace Amp\Postgres;

use Amp\Loop;
use Amp\Promise;
use function Amp\call;

final class PooledStatement implements Statement {
/** @var \Amp\Postgres\Pool */
private $pool;

/** @var \Amp\Postgres\Statement */
private $statement;
/** @var \SplQueue */
private $statements;

/** @var string */
private $sql;

/** @var int */
private $lastUsedAt;

/** @var string */
private $timeoutWatcher;

/**
* @param \Amp\Postgres\Pool $pool Pool used to re-create the statement if the original closes.
* @param \Amp\Postgres\Statement $statement
*/
public function __construct(Pool $pool, Statement $statement) {
$this->statement = $statement;
$this->lastUsedAt = \time();
$this->statements = $statements = new \SplQueue;
$this->pool = $pool;
$this->sql = $statement->getQuery();

$this->statements->push($statement);

$this->timeoutWatcher = Loop::repeat(1000, static function () use ($pool, $statements) {
$now = \time();
$idleTimeout = $pool->getIdleTimeout();

while (!$statements->isEmpty()) {
/** @var \Amp\Postgres\Statement $statement */
$statement = $statements->bottom();

if ($statement->lastUsedAt() + $idleTimeout > $now) {
return;
}

$statements->shift();
}
});

Loop::unreference($this->timeoutWatcher);
}

public function __destruct() {
Loop::cancel($this->timeoutWatcher);
}

/**
Expand All @@ -27,13 +63,34 @@ public function __construct(Pool $pool, Statement $statement) {
* Unlike regular statements, as long as the pool is open this statement will not die.
*/
public function execute(array $params = []): Promise {
if ($this->statement->isAlive()) {
return $this->statement->execute($params);
}
$this->lastUsedAt = \time();

return call(function () use ($params) {
$this->statement = yield $this->pool->prepare($this->statement->getQuery());
return yield $this->statement->execute($params);
if (!$this->statements->isEmpty()) {
do {
/** @var \Amp\Postgres\Statement $statement */
$statement = $this->statements->shift();
} while (!$statement->isAlive() && !$this->statements->isEmpty());
} else {
$statement = yield $this->pool->prepare($this->sql);
}

try {
$result = yield $statement->execute($params);
} catch (\Throwable $exception) {
$this->statements->push($statement);
throw $exception;
}

if ($result instanceof Operation) {
$result->onDestruct(function () use ($statement) {
$this->statements->push($statement);
});
} else {
$this->statements->push($statement);
}

return $result;
});
}

Expand All @@ -44,6 +101,11 @@ public function isAlive(): bool {

/** {@inheritdoc} */
public function getQuery(): string {
return $this->statement->getQuery();
return $this->sql;
}

/** {@inheritdoc} */
public function lastUsedAt(): int {
return $this->lastUsedAt;
}
}
10 changes: 10 additions & 0 deletions src/PqStatement.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ final class PqStatement implements Statement, Operation {
/** @var array */
private $params;

/** @var int */
private $lastUsedAt;

/**
* @param \Amp\Postgres\PqHandle $handle
* @param string $name Statement name.
Expand All @@ -32,6 +35,7 @@ public function __construct(PqHandle $handle, string $name, string $sql, array $
$this->params = $params;
$this->sql = $sql;
$this->queue = new Internal\ReferenceQueue;
$this->lastUsedAt = \time();
}

public function __destruct() {
Expand All @@ -49,8 +53,14 @@ public function getQuery(): string {
return $this->sql;
}

/** {@inheritdoc} */
public function lastUsedAt(): int {
return $this->lastUsedAt;
}

/** {@inheritdoc} */
public function execute(array $params = []): Promise {
$this->lastUsedAt = \time();
return $this->handle->statementExecute($this->name, Internal\replaceNamedParams($params, $this->params));
}

Expand Down
5 changes: 5 additions & 0 deletions src/Statement.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,9 @@ public function isAlive(): bool;
* @return string The SQL string used to prepare the statement.
*/
public function getQuery(): string;

/**
* @return int Timestamp of when the statement was last used.
*/
public function lastUsedAt(): int;
}
67 changes: 67 additions & 0 deletions test/PooledStatementTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
<?php

namespace Amp\Postgres\Test;

use Amp\Delayed;
use Amp\Loop;
use Amp\Postgres\Pool;
use Amp\Postgres\PooledStatement;
use Amp\Postgres\ResultSet;
use Amp\Postgres\Statement;
use PHPUnit\Framework\TestCase;

class PooledStatementTest extends TestCase {
public function testActiveStatementsRemainAfterTimeout() {
Loop::run(function () {
$pool = new Pool('host=localhost user=postgres');

$statement = $this->createMock(Statement::class);
$statement->method('getQuery')
->willReturn('SELECT 1');
$statement->method('lastUsedAt')
->willReturn(\time());
$statement->expects($this->once())
->method('execute');

$pooledStatement = new PooledStatement($pool, $statement);

$this->assertTrue($pooledStatement->isAlive());
$this->assertSame(\time(), $pooledStatement->lastUsedAt());

yield new Delayed(1500); // Give timeout watcher enough time to execute.

$pooledStatement->execute();

$this->assertTrue($pooledStatement->isAlive());
$this->assertSame(\time(), $pooledStatement->lastUsedAt());
});
}

public function testIdleStatementsRemovedAfterTimeout() {
Loop::run(function () {
$pool = new Pool('host=localhost user=postgres');

$statement = $this->createMock(Statement::class);
$statement->method('getQuery')
->willReturn('SELECT 1');
$statement->method('lastUsedAt')
->willReturn(0);
$statement->expects($this->never())
->method('execute');

$pooledStatement = new PooledStatement($pool, $statement);

$this->assertTrue($pooledStatement->isAlive());
$this->assertSame(\time(), $pooledStatement->lastUsedAt());

yield new Delayed(1500); // Give timeout watcher enough time to execute and remove mock statement object.

$result = yield $pooledStatement->execute();

$this->assertInstanceOf(ResultSet::class, $result);

$this->assertTrue($pooledStatement->isAlive());
$this->assertSame(\time(), $pooledStatement->lastUsedAt());
});
}
}

0 comments on commit 97022a4

Please sign in to comment.