Skip to content

Commit

Permalink
Add PooledStatement
Browse files Browse the repository at this point in the history
Removed Operation from Statement.
  • Loading branch information
trowski committed Jan 26, 2018
1 parent b8e9d37 commit 33c056b
Show file tree
Hide file tree
Showing 11 changed files with 226 additions and 109 deletions.
2 changes: 2 additions & 0 deletions lib/Connection.php
Expand Up @@ -106,6 +106,8 @@ public function execute(string $sql, array $params = []): Promise {

/**
* {@inheritdoc}
*
* Statement instances returned by this method must also implement Operation.
*/
public function prepare(string $sql): Promise {
return $this->send("prepare", $sql);
Expand Down
45 changes: 27 additions & 18 deletions lib/PgSqlHandle.php
Expand Up @@ -40,12 +40,6 @@ class PgSqlHandle implements Handle {
/** @var string */
private $await;

/** @var callable */
private $executeCallback;

/** @var callable */
private $deallocateCallback;

/** @var \Amp\Emitter[] */
private $listeners = [];

Expand Down Expand Up @@ -148,8 +142,6 @@ public function __construct($handle, $socket) {
Loop::disable($this->poll);
Loop::disable($this->await);

$this->executeCallback = $this->callableFromInstanceMethod("sendExecute");
$this->deallocateCallback = $this->callableFromInstanceMethod("sendDeallocate");
$this->unlisten = $this->callableFromInstanceMethod("unlisten");
}

Expand Down Expand Up @@ -280,24 +272,41 @@ private function createResult($result) {
}
}

private function sendExecute(string $name, array $params): Promise {
/**
* @param string $name
* @param array $params
*
* @return \Amp\Promise
*/
public function statementExecute(string $name, array $params): Promise {
return call(function () use ($name, $params) {
return $this->createResult(yield from $this->send("pg_send_execute", $name, $params));
});
}

private function sendDeallocate(string $name) {
/**
* @param string $name
*
* @return \Amp\Promise
*
* @throws \Error
*/
public function statementDeallocate(string $name): Promise {
if (!\is_resource($this->handle)) {
return new Success; // Connection closed, no need to deallocate.
}

\assert(isset($this->statements[$name]), "Named statement not found when deallocating");

$storage = $this->statements[$name];

if (--$storage->count) {
return;
return new Success;
}

unset($this->statements[$name]);

Promise\rethrow($this->query(\sprintf("DEALLOCATE %s", $name)));
return $this->query(\sprintf("DEALLOCATE %s", $name));
}

/**
Expand Down Expand Up @@ -337,9 +346,9 @@ public function prepare(string $sql): Promise {
throw new \Error("The connection to the database has been closed");
}

$sql = Internal\parseNamedParams($sql, $names);
$modifiedSql = Internal\parseNamedParams($sql, $names);

$name = self::STATEMENT_NAME_PREFIX . \sha1($sql);
$name = self::STATEMENT_NAME_PREFIX . \sha1($modifiedSql);

if (isset($this->statements[$name])) {
$storage = $this->statements[$name];
Expand All @@ -349,18 +358,18 @@ public function prepare(string $sql): Promise {
return $storage->promise;
}

return new Success(new PgSqlStatement($name, $sql, $names, $this->executeCallback, $this->deallocateCallback));
return new Success(new PgSqlStatement($this, $name, $sql, $names));
}

$this->statements[$name] = $storage = new Internal\StatementStorage;

$promise = $storage->promise = call(function () use ($name, $names, $sql) {
$promise = $storage->promise = call(function () use ($name, $names, $sql, $modifiedSql) {
/** @var resource $result PostgreSQL result resource. */
$result = yield from $this->send("pg_send_prepare", $name, $sql);
$result = yield from $this->send("pg_send_prepare", $name, $modifiedSql);

switch (\pg_result_status($result, \PGSQL_STATUS_LONG)) {
case \PGSQL_COMMAND_OK:
return new PgSqlStatement($name, $sql, $names, $this->executeCallback, $this->deallocateCallback);
return new PgSqlStatement($this, $name, $sql, $names);

case \PGSQL_NONFATAL_ERROR:
case \PGSQL_FATAL_ERROR:
Expand Down
51 changes: 20 additions & 31 deletions lib/PgSqlStatement.php
Expand Up @@ -4,68 +4,57 @@

use Amp\Promise;

class PgSqlStatement implements Statement {
class PgSqlStatement implements Statement, Operation {
/** @var \Amp\Postgres\PgSqlHandle */
private $handle;

/** @var string */
private $name;

/** @var string */
private $sql;

/** @var callable */
private $execute;

/** @var callable */
private $deallocate;

/** @var \Amp\Postgres\Internal\ReferenceQueue */
private $queue;

/** @var string[] */
private $names;
private $params;

/**
* @internal
*
* @param \Amp\Postgres\PgSqlHandle $handle
* @param string $name
* @param string $sql
* @param callable $execute
* @param callable $deallocate
* @param string[] $params
*/
public function __construct(string $name, string $sql, array $names, callable $execute, callable $deallocate) {
public function __construct(PgSqlHandle $handle, string $name, string $sql, array $params) {
$this->handle = $handle;
$this->name = $name;
$this->sql = $sql;
$this->names = $names;
$this->execute = $execute;
$this->deallocate = $deallocate;
$this->params = $params;
$this->queue = new Internal\ReferenceQueue;
}

public function __destruct() {
($this->deallocate)($this->name);
$this->handle->statementDeallocate($this->name);
$this->queue->unreference();
}

/**
* @return string
*/
/** {@inheritdoc} */
public function isAlive(): bool {
return $this->handle->isAlive();
}

/** {@inheritdoc} */
public function getQuery(): string {
return $this->sql;
}

/**
* @param mixed ...$params
*
* @return \Amp\Promise<\Amp\Postgres\Result>
*
* @throws \Amp\Postgres\FailureException If executing the statement fails.
*/
/** {@inheritdoc} */
public function execute(array $params = []): Promise {
return ($this->execute)($this->name, Internal\replaceNamedParams($params, $this->names));
return $this->handle->statementExecute($this->name, Internal\replaceNamedParams($params, $this->params));
}

/**
* @param callable $onDestruct
*/
/** {@inheritdoc} */
public function onDestruct(callable $onDestruct) {
$this->queue->onDestruct($onDestruct);
}
Expand Down
17 changes: 16 additions & 1 deletion lib/Pool.php
Expand Up @@ -113,6 +113,16 @@ public function setIdleTimeout(int $timeout) {
}
}

/**
* @return bool
*/
public function isAlive(): bool {
return !$this->closed;
}

/**
* Close all connections in the pool. No further queries may be made after a pool is closed.
*/
public function close() {
$this->closed = true;
foreach ($this->connections as $connection) {
Expand Down Expand Up @@ -325,11 +335,16 @@ public function prepare(string $sql): Promise {
throw $exception;
}

\assert(
$statement instanceof Operation,
Statement::class . " instances returned from connections must implement " . Operation::class
);

$statement->onDestruct(function () use ($connection) {
$this->push($connection);
});

return $statement;
return new PooledStatement($this, $statement);
});
}

Expand Down
49 changes: 49 additions & 0 deletions lib/PooledStatement.php
@@ -0,0 +1,49 @@
<?php

namespace Amp\Postgres;

use Amp\Promise;
use function Amp\call;

class PooledStatement implements Statement {
/** @var \Amp\Postgres\Pool */
private $pool;

/** @var \Amp\Postgres\Statement */
private $statement;

/**
* @param \Amp\Postgres\Pool $pool Pool used to re-create the statement if the original closes.
* @param \Amp\Postgres\Statement $statement
*/
public function __construct(Pool $pool, Statement $statement) {
$this->statement = $statement;
$this->pool = $pool;
}

/**
* {@inheritdoc}
*
* Unlike regular statements, as long as the pool is open this statement will not die.
*/
public function execute(array $params = []): Promise {
if ($this->statement->isAlive()) {
return $this->statement->execute($params);
}

return call(function () use ($params) {
$this->statement = yield $this->pool->prepare($this->statement->getQuery());
return yield $this->statement->execute($params);
});
}

/** {@inheritdoc} */
public function isAlive(): bool {
return $this->pool->isAlive();
}

/** {@inheritdoc} */
public function getQuery(): string {
return $this->statement->getQuery();
}
}

0 comments on commit 33c056b

Please sign in to comment.