From caf829a48c0a4ce1adb5eb8215eb2b63929093a6 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Wed, 14 Sep 2016 09:27:39 -0500 Subject: [PATCH] Initial commit --- .gitattributes | 6 + .gitignore | 4 + .travis.yml | 39 ++++ LICENSE | 21 +++ README.md | 61 ++++++ composer.json | 43 +++++ example/test.php | 22 +++ lib/AbstractConnection.php | 113 +++++++++++ lib/AbstractPool.php | 217 +++++++++++++++++++++ lib/AggregatePool.php | 34 ++++ lib/CommandResult.php | 12 ++ lib/Connection.php | 16 ++ lib/ConnectionPool.php | 54 ++++++ lib/Executor.php | 35 ++++ lib/FailureException.php | 5 + lib/Internal/Operation.php | 36 ++++ lib/Operation.php | 10 + lib/PgSqlCommandResult.php | 43 +++++ lib/PgSqlConnection.php | 96 ++++++++++ lib/PgSqlExecutor.php | 190 +++++++++++++++++++ lib/PgSqlStatement.php | 40 ++++ lib/PgSqlTupleResult.php | 129 +++++++++++++ lib/Pool.php | 20 ++ lib/PoolError.php | 5 + lib/PqBufferedResult.php | 36 ++++ lib/PqCommandResult.php | 30 +++ lib/PqConnection.php | 91 +++++++++ lib/PqExecutor.php | 216 +++++++++++++++++++++ lib/PqStatement.php | 46 +++++ lib/PqUnbufferedResult.php | 39 ++++ lib/QueryError.php | 5 + lib/Result.php | 5 + lib/Statement.php | 14 ++ lib/Transaction.php | 185 ++++++++++++++++++ lib/TransactionError.php | 5 + lib/TupleResult.php | 14 ++ lib/functions.php | 41 ++++ phpdoc.dist.xml | 23 +++ phpunit.xml.dist | 29 +++ test/AbstractConnectionTest.php | 323 ++++++++++++++++++++++++++++++++ test/AbstractPoolTest.php | 182 ++++++++++++++++++ test/AggregatePoolTest.php | 31 +++ test/ConnectionPoolTest.php | 30 +++ test/FunctionsTest.php | 42 +++++ test/PgSqlConnectionTest.php | 40 ++++ test/PqConnectionTest.php | 39 ++++ 46 files changed, 2717 insertions(+) create mode 100644 .gitattributes create mode 100644 .gitignore create mode 100644 .travis.yml create mode 100644 LICENSE create mode 100644 README.md create mode 100644 composer.json create mode 100644 example/test.php create mode 100644 lib/AbstractConnection.php create mode 100644 lib/AbstractPool.php create mode 100644 lib/AggregatePool.php create mode 100644 lib/CommandResult.php create mode 100644 lib/Connection.php create mode 100644 lib/ConnectionPool.php create mode 100644 lib/Executor.php create mode 100644 lib/FailureException.php create mode 100644 lib/Internal/Operation.php create mode 100644 lib/Operation.php create mode 100644 lib/PgSqlCommandResult.php create mode 100644 lib/PgSqlConnection.php create mode 100644 lib/PgSqlExecutor.php create mode 100644 lib/PgSqlStatement.php create mode 100644 lib/PgSqlTupleResult.php create mode 100644 lib/Pool.php create mode 100644 lib/PoolError.php create mode 100644 lib/PqBufferedResult.php create mode 100644 lib/PqCommandResult.php create mode 100644 lib/PqConnection.php create mode 100644 lib/PqExecutor.php create mode 100644 lib/PqStatement.php create mode 100644 lib/PqUnbufferedResult.php create mode 100644 lib/QueryError.php create mode 100644 lib/Result.php create mode 100644 lib/Statement.php create mode 100644 lib/Transaction.php create mode 100644 lib/TransactionError.php create mode 100644 lib/TupleResult.php create mode 100644 lib/functions.php create mode 100644 phpdoc.dist.xml create mode 100644 phpunit.xml.dist create mode 100644 test/AbstractConnectionTest.php create mode 100644 test/AbstractPoolTest.php create mode 100644 test/AggregatePoolTest.php create mode 100644 test/ConnectionPoolTest.php create mode 100644 test/FunctionsTest.php create mode 100644 test/PgSqlConnectionTest.php create mode 100644 test/PqConnectionTest.php diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..29a850a --- /dev/null +++ b/.gitattributes @@ -0,0 +1,6 @@ +example export-ignore +test export-ignore +.gitattributes export-ignore +.gitignore export-ignore +.travis.yml export-ignore +phpunit.xml.dist export-ignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d92a4ec --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +build +composer.lock +phpunit.xml +vendor diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..e8ced04 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,39 @@ +sudo: false + +language: php + +php: + - 7.0 + - 7.1 + - nightly + +matrix: + allow_failures: + - php: 7.1 + - php: nightly + fast_finish: true + +services: + - postgresql + +install: + - git clone https://github.com/m6w6/ext-pq; + pushd ext-pq; + phpize; + ./configure; + make; + make install; + popd; + echo "extension=pq.so" >> "$(php -r 'echo php_ini_loaded_file();')"; + - composer self-update + - composer install --no-interaction --prefer-source + +before_script: + - psql -c 'CREATE DATABASE test;' -U postgres + +script: + - vendor/bin/phpunit --coverage-text --coverage-clover build/logs/clover.xml + +after_script: + - composer require satooshi/php-coveralls dev-master + - vendor/bin/coveralls -v --exclude-no-stmt diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..ad98d62 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2016 amphp + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..18bb300 --- /dev/null +++ b/README.md @@ -0,0 +1,61 @@ +# PostgreSQL Client for Amp + +This library is a component for [Amp](https://github.com/amphp/amp) that provides an asynchronous client for PostgreSQL. + +[![Build Status](https://img.shields.io/travis/amphp/postgres/master.svg?style=flat-square)](https://travis-ci.org/amphp/postgres) +[![Coverage Status](https://img.shields.io/coveralls/amphp/postgres/master.svg?style=flat-square)](https://coveralls.io/r/amphp/postgres) +[![Semantic Version](https://img.shields.io/github/release/amphp/postgres.svg?style=flat-square)](http://semver.org) +[![MIT License](https://img.shields.io/packagist/l/amphp/postgres.svg?style=flat-square)](LICENSE) +[![@amphp on Twitter](https://img.shields.io/badge/twitter-%40asyncphp-5189c7.svg?style=flat-square)](https://twitter.com/asyncphp) + +##### Requirements + +- PHP 7 + +##### Installation + +The recommended way to install is with the [Composer](http://getcomposer.org/) package manager. (See the [Composer installation guide](https://getcomposer.org/doc/00-intro.md) for information on installing and using Composer.) + +Run the following command to use this library in your project: + +```bash +composer require amphp/postgres +``` + +You can also manually edit `composer.json` to add this library as a project requirement. + +```js +// composer.json +{ + "require": { + "amphp/postgres": "^0.1" + } +} +``` + +#### Example + +```php +#!/usr/bin/env php +prepare('SELECT * FROM test WHERE id=$1'); + + /** @var \Amp\Postgres\TupleResult $result */ + $result = yield $statement->execute(1337); + + while (yield $result->next()) { + $row = $result->getCurrent(); + // $row is an array (map) of column values. e.g.: $row['column_name'] + } +}); +``` \ No newline at end of file diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..0aafd9f --- /dev/null +++ b/composer.json @@ -0,0 +1,43 @@ +{ + "name": "amphp/postgres", + "description": "Asynchronous PostgreSQL client for Amp.", + "keywords": [ + "database", + "db", + "postgresql", + "postgre", + "pgsql", + "asynchronous", + "async" + ], + "homepage": "http://amphp.org", + "license": "MIT", + "authors": [ + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + } + ], + "require": { + "amphp/amp": "dev-master as 2.0", + "async-interop/event-loop-implementation": "^0.3" + }, + "require-dev": { + "amphp/loop": "dev-master", + "phpunit/phpunit": "^5.0" + }, + "minimum-stability": "dev", + "autoload": { + "psr-4": { + "Amp\\Postgres\\": "lib" + }, + "files": [ + "lib/functions.php" + ] + }, + "autoload-dev": { + "psr-4": { + "Amp\\Postgres\\Test\\": "test" + } + } +} diff --git a/example/test.php b/example/test.php new file mode 100644 index 0000000..ff0264b --- /dev/null +++ b/example/test.php @@ -0,0 +1,22 @@ +#!/usr/bin/env php +prepare('SHOW ALL'); + + /** @var \Amp\Postgres\TupleResult $result */ + $result = yield $statement->execute(); + + while (yield $result->next()) { + $row = $result->getCurrent(); + \printf("%-35s = %s (%s)\n", $row['name'], $row['setting'], $row['description']); + } +}); diff --git a/lib/AbstractConnection.php b/lib/AbstractConnection.php new file mode 100644 index 0000000..42501a0 --- /dev/null +++ b/lib/AbstractConnection.php @@ -0,0 +1,113 @@ + + */ + abstract public static function connect(string $connectionString, int $timeout = null): Awaitable; + + /** + * @param $executor; + */ + public function __construct(Executor $executor) { + $this->executor = $executor; + $this->release = $this->callableFromInstanceMethod("release"); + } + + /** + * @param callable $method Method to execute. + * @param mixed ...$args Arguments to pass to function. + * + * @return \Generator + * + * @resolve resource + * + * @throws \Amp\Postgres\FailureException + */ + private function send(callable $method, ...$args): \Generator { + while ($this->busy !== null) { + yield $this->busy->getAwaitable(); + } + + return $method(...$args); + } + + private function release() { + $busy = $this->busy; + $this->busy = null; + $busy->resolve(); + } + + /** + * {@inheritdoc} + */ + public function query(string $sql): Awaitable { + return new Coroutine($this->send([$this->executor, "query"], $sql)); + } + + /** + * {@inheritdoc} + */ + public function execute(string $sql, ...$params): Awaitable { + return new Coroutine($this->send([$this->executor, "execute"], $sql, ...$params)); + } + + /** + * {@inheritdoc} + */ + public function prepare(string $sql): Awaitable { + return new Coroutine($this->send([$this->executor, "prepare"], $sql, $sql)); + } + + /** + * {@inheritdoc} + */ + public function transaction(int $isolation = Transaction::COMMITTED): Awaitable { + switch ($isolation) { + case Transaction::UNCOMMITTED: + $awaitable = $this->query("BEGIN TRANSACTION ISOLATION LEVEL READ UNCOMMITTED"); + break; + + case Transaction::COMMITTED: + $awaitable = $this->query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED"); + break; + + case Transaction::REPEATABLE: + $awaitable = $this->query("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ"); + break; + + case Transaction::SERIALIZABLE: + $awaitable = $this->query("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE"); + break; + + default: + throw new \Error("Invalid transaction type"); + } + + return pipe($awaitable, function (CommandResult $result) use ($isolation) { + $this->busy = new Deferred; + $transaction = new Transaction($this->executor, $isolation); + $transaction->onComplete($this->release); + return $transaction; + }); + } +} diff --git a/lib/AbstractPool.php b/lib/AbstractPool.php new file mode 100644 index 0000000..59dc4ec --- /dev/null +++ b/lib/AbstractPool.php @@ -0,0 +1,217 @@ + + * + * @throws \Amp\Postgres\FailureException + */ + abstract protected function createConnection(): Awaitable; + + public function __construct() { + $this->connections = new \SplObjectStorage(); + $this->idle = new \SplQueue(); + $this->busy = new \SplQueue(); + } + + /** + * {@inheritdoc} + */ + public function getConnectionCount(): int { + return $this->connections->count(); + } + + /** + * {@inheritdoc} + */ + public function getIdleConnectionCount(): int { + return $this->idle->count(); + } + + /** + * @param \Amp\Postgres\Connection $connection + */ + protected function addConnection(Connection $connection) { + if (isset($this->connections[$connection])) { + return; + } + + $this->connections->attach($connection); + $this->idle->push($connection); + } + + /** + * @coroutine + * + * @return \Generator + * + * @resolve \Amp\Postgres\Connection + */ + private function pop(): \Generator { + while (null !== $this->awaitable) { + try { + yield $this->awaitable; // Prevent simultaneous connection creation. + } catch (\Throwable $exception) { + // Ignore failure or cancellation of other operations. + } + } + + if ($this->idle->isEmpty()) { + try { + if ($this->connections->count() >= $this->getMaxConnections()) { + // All possible connections busy, so wait until one becomes available. + $this->awaitable = new Deferred; + yield $this->awaitable; + } else { + // Max connection count has not been reached, so open another connection. + $this->awaitable = $this->createConnection(); + $this->addConnection(yield $this->awaitable); + } + } finally { + $this->awaitable = null; + } + } + + // Shift a connection off the idle queue. + return $this->idle->shift(); + } + + /** + * @param \Amp\Postgres\Connection $connection + * + * @throws \Error If the connection is not part of this pool. + */ + private function push(Connection $connection) { + if (!isset($this->connections[$connection])) { + throw new \Error('Connection is not part of this pool'); + } + + $this->idle->push($connection); + + if ($this->awaitable instanceof Deferred) { + $this->awaitable->resolve($connection); + } + } + + /** + * {@inheritdoc} + */ + public function query(string $sql): Awaitable { + return new Coroutine($this->doQuery($sql)); + } + + private function doQuery(string $sql): \Generator { + /** @var \Amp\Postgres\Connection $connection */ + $connection = yield from $this->pop(); + + try { + $result = yield $connection->query($sql); + } catch (\Throwable $exception) { + $this->push($connection); + throw $exception; + } + + if ($result instanceof Operation) { + $result->onComplete(function () use ($connection) { + $this->push($connection); + }); + } else { + $this->push($connection); + } + + return $result; + } + + /** + * {@inheritdoc} + */ + public function execute(string $sql, ...$params): Awaitable { + return new Coroutine($this->doExecute($sql, $params)); + } + + private function doExecute(string $sql, array $params): \Generator { + /** @var \Amp\Postgres\Connection $connection */ + $connection = yield from $this->pop(); + + try { + $result = yield $connection->execute($sql, ...$params); + } catch (\Throwable $exception) { + $this->push($connection); + throw $exception; + } + + if ($result instanceof Operation) { + $result->onComplete(function () use ($connection) { + $this->push($connection); + }); + } else { + $this->push($connection); + } + + return $result; + } + + /** + * {@inheritdoc} + */ + public function prepare(string $sql): Awaitable { + return new Coroutine($this->doPrepare($sql)); + } + + private function doPrepare(string $sql): \Generator { + /** @var \Amp\Postgres\Connection $connection */ + $connection = yield from $this->pop(); + + try { + /** @var \Amp\Postgres\Statement $statement */ + $statement = yield $connection->prepare($sql); + } finally { + $this->push($connection); + } + + return $statement; + } + + /** + * {@inheritdoc} + */ + public function transaction(int $isolation = Transaction::COMMITTED): Awaitable { + return new Coroutine($this->doTransaction($isolation)); + } + + private function doTransaction(int $isolation = Transaction::COMMITTED): \Generator { + /** @var \Amp\Postgres\Connection $connection */ + $connection = yield from $this->pop(); + + try { + /** @var \Amp\Postgres\Transaction $transaction */ + $transaction = yield $connection->transaction($isolation); + } catch (\Throwable $exception) { + $this->push($connection); + throw $exception; + } + + $transaction->onComplete(function () use ($connection) { + $this->push($connection); + }); + + return $transaction; + } +} diff --git a/lib/AggregatePool.php b/lib/AggregatePool.php new file mode 100644 index 0000000..1359631 --- /dev/null +++ b/lib/AggregatePool.php @@ -0,0 +1,34 @@ +getConnectionCount(); + + if (!$count) { + throw new PoolError("No connections in aggregate pool"); + } + + return $count; + } +} diff --git a/lib/CommandResult.php b/lib/CommandResult.php new file mode 100644 index 0000000..b90e4cb --- /dev/null +++ b/lib/CommandResult.php @@ -0,0 +1,12 @@ + + * + * @throws \Amp\Postgres\FailureException + */ + public function transaction(int $isolation = Transaction::COMMITTED): Awaitable; +} diff --git a/lib/ConnectionPool.php b/lib/ConnectionPool.php new file mode 100644 index 0000000..9a89c23 --- /dev/null +++ b/lib/ConnectionPool.php @@ -0,0 +1,54 @@ +connectionString = $connectionString; + $this->connectTimeout = $connectTimeout; + + $this->maxConnections = $maxConnections; + if (1 > $this->maxConnections) { + $this->maxConnections = 1; + } + } + + /** + * {@inheritdoc} + */ + protected function createConnection(): Awaitable { + return connect($this->connectionString, $this->connectTimeout); + } + + /** + * {@inheritdoc} + */ + public function getMaxConnections(): int { + return $this->maxConnections; + } +} diff --git a/lib/Executor.php b/lib/Executor.php new file mode 100644 index 0000000..0346207 --- /dev/null +++ b/lib/Executor.php @@ -0,0 +1,35 @@ + + * + * @throws \Amp\Postgres\FailureException + */ + public function query(string $sql): Awaitable; + + /** + * @param string $sql + * @param mixed ...$params + * + * @return \Interop\Async\Awaitable<\Amp\Postgres\Result> + * + * @throws \Amp\Postgres\FailureException + */ + public function execute(string $sql, ...$params): Awaitable; + + /** + * @param string $sql + * + * @return \Interop\Async\Awaitable<\Amp\Postgres\Statement> + * + * @throws \Amp\Postgres\FailureException + */ + public function prepare(string $sql): Awaitable; +} diff --git a/lib/FailureException.php b/lib/FailureException.php new file mode 100644 index 0000000..00cda39 --- /dev/null +++ b/lib/FailureException.php @@ -0,0 +1,5 @@ +complete(); + } + + public function onComplete(callable $onComplete) { + if ($this->complete) { + $onComplete(); + return; + } + + $this->onComplete[] = $onComplete; + } + + private function complete() { + if ($this->complete) { + return; + } + + $this->complete = true; + foreach ($this->onComplete as $callback) { + $callback(); + } + $this->onComplete = null; + } +} diff --git a/lib/Operation.php b/lib/Operation.php new file mode 100644 index 0000000..31323db --- /dev/null +++ b/lib/Operation.php @@ -0,0 +1,10 @@ +handle = $handle; + } + + /** + * Frees the result resource. + */ + public function __destruct() { + \pg_free_result($this->handle); + } + + /** + * @return int Number of rows affected by the INSERT, UPDATE, or DELETE query. + */ + public function affectedRows(): int { + return \pg_affected_rows($this->handle); + } + + /** + * @return string + */ + public function lastOid(): string { + return (string) \pg_last_oid($this->handle); + } + + /** + * @return int + */ + public function count(): int { + return $this->affectedRows(); + } +} \ No newline at end of file diff --git a/lib/PgSqlConnection.php b/lib/PgSqlConnection.php new file mode 100644 index 0000000..2dadcbc --- /dev/null +++ b/lib/PgSqlConnection.php @@ -0,0 +1,96 @@ + + * + * @throws \Amp\Postgres\FailureException + */ + public static function connect(string $connectionString, int $timeout = null): Awaitable { + if (!$connection = @\pg_connect($connectionString, \PGSQL_CONNECT_ASYNC | \PGSQL_CONNECT_FORCE_NEW)) { + throw new FailureException("Failed to create connection resource"); + } + + if (\pg_connection_status($connection) === \PGSQL_CONNECTION_BAD) { + throw new FailureException(\pg_last_error($connection)); + } + + if (!$socket = \pg_socket($connection)) { + throw new FailureException("Failed to access connection socket"); + } + + $deferred = new Deferred; + + $callback = function ($watcher, $resource) use (&$poll, &$await, $connection, $deferred) { + try { + switch (\pg_connect_poll($connection)) { + case \PGSQL_POLLING_READING: + return; // Connection not ready, poll again. + + case \PGSQL_POLLING_WRITING: + return; // Still writing... + + case \PGSQL_POLLING_FAILED: + throw new FailureException("Could not connect to PostgreSQL server"); + + case \PGSQL_POLLING_OK: + Loop::cancel($poll); + Loop::cancel($await); + $deferred->resolve(new self($connection, $resource)); + return; + } + } catch (\Throwable $exception) { + Loop::cancel($poll); + Loop::cancel($await); + \pg_close($connection); + $deferred->fail($exception); + } + }; + + $poll = Loop::onReadable($socket, $callback); + $await = Loop::onWritable($socket, $callback); + + if ($timeout !== null) { + return \Amp\capture( + $deferred->getAwaitable(), + TimeoutException::class, + function (\Throwable $exception) use ($connection, $poll, $await) { + Loop::cancel($poll); + Loop::cancel($await); + \pg_close($connection); + throw $exception; + } + ); + } + + return $deferred->getAwaitable(); + } + + /** + * Connection constructor. + * + * @param resource $handle PostgreSQL connection handle. + * @param resource $socket PostgreSQL connection stream socket. + */ + public function __construct($handle, $socket) { + parent::__construct(new PgSqlExecutor($handle, $socket)); + } +} diff --git a/lib/PgSqlExecutor.php b/lib/PgSqlExecutor.php new file mode 100644 index 0000000..635b013 --- /dev/null +++ b/lib/PgSqlExecutor.php @@ -0,0 +1,190 @@ +handle = $handle; + + $deferred = &$this->delayed; + + $this->poll = Loop::onReadable($socket, static function ($watcher) use (&$deferred, $handle) { + if (!\pg_consume_input($handle)) { + Loop::disable($watcher); + $deferred->fail(new FailureException(\pg_last_error($handle))); + return; + } + + if (!\pg_connection_busy($handle)) { + Loop::disable($watcher); + $deferred->resolve(\pg_get_result($handle)); + return; + } + + // Reading not done, listen again. + }); + + $this->await = Loop::onWritable($socket, static function ($watcher) use (&$deferred, $handle) { + $flush = \pg_flush($handle); + if (0 === $flush) { + return; // Not finished sending data, listen again. + } + + Loop::disable($watcher); + + if ($flush === false) { + $deferred->fail(new FailureException(\pg_last_error($handle))); + } + }); + + Loop::disable($this->poll); + Loop::disable($this->await); + + $this->createResult = $this->callableFromInstanceMethod("createResult"); + $this->executeCallback = $this->callableFromInstanceMethod("sendExecute"); + } + + /** + * Frees Io watchers from loop. + */ + public function __destruct() { + if (\is_resource($this->handle)) { + \pg_close($this->handle); + } + + Loop::cancel($this->poll); + Loop::cancel($this->await); + } + + /** + * @coroutine + * + * @param callable $function Function name to execute. + * @param mixed ...$args Arguments to pass to function. + * + * @return \Generator + * + * @resolve resource + * + * @throws \Amp\Postgres\FailureException + */ + private function send(callable $function, ...$args): \Generator { + while ($this->delayed !== null) { + try { + yield $this->delayed->getAwaitable(); + } catch (\Throwable $exception) { + // Ignore failure from another operation. + } + } + + $result = $function($this->handle, ...$args); + + if ($result === false) { + throw new FailureException(\pg_last_error($this->handle)); + } + + $this->delayed = new Deferred; + + Loop::enable($this->poll); + if (0 === $result) { + Loop::enable($this->await); + } + + try { + $result = yield $this->delayed->getAwaitable(); + } finally { + $this->delayed = null; + Loop::disable($this->poll); + Loop::disable($this->await); + } + + return $result; + } + + /** + * @param resource $result PostgreSQL result resource. + * + * @return \Amp\Postgres\Result + * + * @throws \Amp\Postgres\FailureException + * @throws \Amp\Postgres\QueryError + */ + private function createResult($result): Result { + switch (\pg_result_status($result, \PGSQL_STATUS_LONG)) { + case \PGSQL_EMPTY_QUERY: + throw new QueryError("Empty query string"); + + case \PGSQL_COMMAND_OK: + return new PgSqlCommandResult($result); + + case \PGSQL_TUPLES_OK: + return new PgSqlTupleResult($result); + + case \PGSQL_NONFATAL_ERROR: + case \PGSQL_FATAL_ERROR: + throw new QueryError(\pg_result_error($result)); + + case \PGSQL_BAD_RESPONSE: + throw new FailureException(\pg_result_error($result)); + + default: + throw new FailureException("Unknown result status"); + } + } + + private function sendExecute(string $name, array $params): Awaitable { + return pipe(new Coroutine($this->send("pg_send_execute", $name, $params)), $this->createResult); + } + + /** + * {@inheritdoc} + */ + public function query(string $sql): Awaitable { + return pipe(new Coroutine($this->send("pg_send_query", $sql)), $this->createResult); + } + + /** + * {@inheritdoc} + */ + public function execute(string $sql, ...$params): Awaitable { + return pipe(new Coroutine($this->send("pg_send_query_params", $sql, $params)), $this->createResult); + } + + /** + * {@inheritdoc} + */ + public function prepare(string $sql): Awaitable { + return pipe(new Coroutine($this->send("pg_send_prepare", $sql, $sql)), function () use ($sql) { + return new PgSqlStatement($sql, $this->executeCallback); + }); + } +} diff --git a/lib/PgSqlStatement.php b/lib/PgSqlStatement.php new file mode 100644 index 0000000..0455ca8 --- /dev/null +++ b/lib/PgSqlStatement.php @@ -0,0 +1,40 @@ +sql = $sql; + $this->execute = $execute; + } + + /** + * @return string + */ + public function getQuery(): string { + return $this->sql; + } + + /** + * @param mixed ...$params + * + * @return \Interop\Async\Awaitable<\Amp\Postgres\Result> + * + * @throws \Amp\Postgres\FailureException If executing the statement fails. + */ + public function execute(...$params): Awaitable { + return ($this->execute)($this->sql, $params); + } +} \ No newline at end of file diff --git a/lib/PgSqlTupleResult.php b/lib/PgSqlTupleResult.php new file mode 100644 index 0000000..0fd933d --- /dev/null +++ b/lib/PgSqlTupleResult.php @@ -0,0 +1,129 @@ +handle = $handle; + parent::__construct(new Emitter(static function (callable $emit) use ($handle) { + $count = \pg_num_rows($handle); + for ($i = 0; $i < $count; ++$i) { + $result = \pg_fetch_assoc($handle); + if ($result === false) { + throw new FailureException(\pg_result_error($handle)); + } + yield $emit($result); + } + return $count; + })); + } + + /** + * Frees the result resource. + */ + public function __destruct() { + \pg_free_result($this->handle); + } + + /** + * @return int Number of rows in the result set. + */ + public function numRows(): int { + return \pg_num_rows($this->handle); + } + + /** + * @return int Number of fields in each row. + */ + public function numFields(): int { + return \pg_num_fields($this->handle); + } + + /** + * @param int $fieldNum + * + * @return string Column name at index $fieldNum + * + * @throws \Error If the field number does not exist in the result. + */ + public function fieldName(int $fieldNum): string { + return \pg_field_name($this->handle, $this->filterNameOrNum($fieldNum)); + } + + /** + * @param string $fieldName + * + * @return int Index of field with given name. + * + * @throws \Error If the field name does not exist in the result. + */ + public function fieldNum(string $fieldName): int { + $result = \pg_field_num($this->handle, $fieldName); + + if (-1 === $result) { + throw new \Error(\sprintf('No field with name "%s" in result', $fieldName)); + } + + return $result; + } + + /** + * @param int|string $fieldNameOrNum Field name or index. + * + * @return string Name of the field type. + * + * @throws \Error If the field number does not exist in the result. + */ + public function fieldType($fieldNameOrNum): string { + return \pg_field_type($this->handle, $this->filterNameOrNum($fieldNameOrNum)); + } + + /** + * @param int|string $fieldNameOrNum Field name or index. + * + * @return int Storage required for field. -1 indicates a variable length field. + * + * @throws \Error If the field number does not exist in the result. + */ + public function fieldSize($fieldNameOrNum): int { + return \pg_field_size($this->handle, $this->filterNameOrNum($fieldNameOrNum)); + } + + /** + * @return int Number of rows in the result set. + */ + public function count(): int { + return $this->numRows(); + } + + /** + * @param int|string $fieldNameOrNum Field name or index. + * + * @return int Field index. + * + * @throws \Error + */ + private function filterNameOrNum($fieldNameOrNum): int { + if (\is_string($fieldNameOrNum)) { + return $this->fieldNum($fieldNameOrNum); + } + + if (!\is_int($fieldNameOrNum)) { + throw new \Error('Must provide a string name or integer field number'); + } + + if (0 > $fieldNameOrNum || $this->numFields() <= $fieldNameOrNum) { + throw new \Error(\sprintf('No field with index %d in result', $fieldNameOrNum)); + } + + return $fieldNameOrNum; + } +} diff --git a/lib/Pool.php b/lib/Pool.php new file mode 100644 index 0000000..f06a701 --- /dev/null +++ b/lib/Pool.php @@ -0,0 +1,20 @@ +result = $result; + parent::__construct(new Emitter(static function (callable $emit) use ($result) { + for ($count = 0; $row = $result->fetchRow(pq\Result::FETCH_ASSOC); ++$count) { + yield $emit($row); + } + return $count; + })); + } + + public function numRows(): int { + return $this->result->numRows; + } + + public function numFields(): int { + return $this->result->numCols; + } + + public function count(): int { + return $this->numRows(); + } +} diff --git a/lib/PqCommandResult.php b/lib/PqCommandResult.php new file mode 100644 index 0000000..609c6e4 --- /dev/null +++ b/lib/PqCommandResult.php @@ -0,0 +1,30 @@ +result = $result; + } + + /** + * @return int Number of rows affected by the INSERT, UPDATE, or DELETE query. + */ + public function affectedRows(): int { + return $this->result->affectedRows; + } + + /** + * @return int + */ + public function count() { + return $this->affectedRows(); + } +} \ No newline at end of file diff --git a/lib/PqConnection.php b/lib/PqConnection.php new file mode 100644 index 0000000..f5d4b2c --- /dev/null +++ b/lib/PqConnection.php @@ -0,0 +1,91 @@ + + * + * @throws \Amp\Postgres\FailureException + */ + public static function connect(string $connectionString, int $timeout = null): Awaitable { + try { + $connection = new pq\Connection($connectionString, pq\Connection::ASYNC); + } catch (pq\Exception $exception) { + throw new FailureException("Could not connect to PostgresSQL server", 0, $exception); + } + $connection->resetAsync(); + $connection->nonblocking = true; + $connection->unbuffered = true; + + $deferred = new Deferred; + + $callback = function ($watcher, $resource) use (&$poll, &$await, $connection, $deferred) { + try { + switch ($connection->poll()) { + case pq\Connection::POLLING_READING: + return; // Connection not ready, poll again. + + case pq\Connection::POLLING_WRITING: + return; // Still writing... + + case pq\Connection::POLLING_FAILED: + throw new FailureException("Could not connect to PostgreSQL server"); + + case pq\Connection::POLLING_OK: + case \PGSQL_POLLING_OK: + Loop::cancel($poll); + Loop::cancel($await); + $deferred->resolve(new self($connection)); + return; + } + } catch (\Throwable $exception) { + Loop::cancel($poll); + Loop::cancel($await); + $deferred->fail($exception); + } + }; + + $poll = Loop::onReadable($connection->socket, $callback); + $await = Loop::onWritable($connection->socket, $callback); + + if ($timeout !== null) { + return \Amp\capture( + $deferred->getAwaitable(), + TimeoutException::class, + function (\Throwable $exception) use ($connection, $poll, $await) { + Loop::cancel($poll); + Loop::cancel($await); + throw $exception; + } + ); + } + + return $deferred->getAwaitable(); + } + + /** + * Connection constructor. + * + * @param \pq\Connection $handle + */ + public function __construct(pq\Connection $handle) { + parent::__construct(new PqExecutor($handle)); + } +} diff --git a/lib/PqExecutor.php b/lib/PqExecutor.php new file mode 100644 index 0000000..23b3f89 --- /dev/null +++ b/lib/PqExecutor.php @@ -0,0 +1,216 @@ +handle = $handle; + + $deferred = &$this->delayed; + + $this->poll = Loop::onReadable($this->handle->socket, static function ($watcher) use (&$deferred, $handle) { + if ($handle->poll() === pq\Connection::POLLING_FAILED) { + $deferred->fail(new FailureException($handle->errorMessage)); + return; + } + + if (!$handle->busy) { + $deferred->resolve($handle->getResult()); + return; + } + + // Reading not done, listen again. + }); + + $this->await = Loop::onWritable($this->handle->socket, static function ($watcher) use (&$deferred, $handle) { + if (!$handle->flush()) { + return; // Not finished sending data, listen again. + } + + Loop::disable($watcher); + }); + + Loop::disable($this->poll); + Loop::disable($this->await); + + $this->send = $this->callableFromInstanceMethod("send"); + $this->fetch = $this->callableFromInstanceMethod("fetch"); + $this->release = $this->callableFromInstanceMethod("release"); + } + + /** + * Frees Io watchers from loop. + */ + public function __destruct() { + Loop::cancel($this->poll); + Loop::cancel($this->await); + } + + /** + * @param callable $method Method to execute. + * @param mixed ...$args Arguments to pass to function. + * + * @return \Generator + * + * @resolve resource + * + * @throws \Amp\Postgres\FailureException + */ + private function send(callable $method, ...$args): \Generator { + while ($this->busy !== null) { + yield $this->busy->getAwaitable(); + } + + $this->busy = new Deferred; + + try { + try { + $handle = $method(...$args); + } catch (pg\Exception $exception) { + throw new FailureException($this->handle->errorMessage, 0, $exception); + } + + $this->delayed = new Deferred; + + Loop::enable($this->poll); + if (!$this->handle->flush()) { + Loop::enable($this->await); + } + + try { + $result = yield $this->delayed->getAwaitable(); + } finally { + $this->delayed = null; + Loop::disable($this->poll); + Loop::disable($this->await); + } + + if ($handle instanceof pq\Statement) { + return new PqStatement($handle, $this->send); + } + + if (!$result instanceof pq\Result) { + throw new FailureException("Unknown query result"); + } + } finally { + $this->release(); + } + + switch ($result->status) { + case pq\Result::EMPTY_QUERY: + throw new QueryError("Empty query string"); + + case pq\Result::COMMAND_OK: + return new PqCommandResult($result); + + case pq\Result::TUPLES_OK: + return new PqBufferedResult($result); + + CASE pq\Result::SINGLE_TUPLE: + $result = new PqUnbufferedResult($this->fetch, $result); + $result->onComplete($this->release); + $this->busy = new Deferred; + return $result; + + case pq\Result::NONFATAL_ERROR: + case pq\Result::FATAL_ERROR: + throw new QueryError($result->errorMessage); + + case pq\Result::BAD_RESPONSE: + throw new FailureException($result->errorMessage); + + default: + throw new FailureException("Unknown result status"); + } + } + + private function fetch(): \Generator { + if (!$this->handle->busy) { // Results buffered. + $result = $this->handle->getResult(); + } else { + $this->delayed = new Deferred; + + Loop::enable($this->poll); + + try { + $result = yield $this->delayed->getAwaitable(); + } finally { + $this->delayed = null; + Loop::disable($this->poll); + } + } + + switch ($result->status) { + case pq\Result::TUPLES_OK: // No more rows in result set. + return null; + + case pq\Result::SINGLE_TUPLE: + return $result; + + default: + throw new FailureException($result->errorMessage); + } + } + + private function release() { + $busy = $this->busy; + $this->busy = null; + $busy->resolve(); + } + + /** + * {@inheritdoc} + */ + public function query(string $sql): Awaitable { + return new Coroutine($this->send([$this->handle, "execAsync"], $sql)); + } + + /** + * {@inheritdoc} + */ + public function execute(string $sql, ...$params): Awaitable { + return new Coroutine($this->send([$this->handle, "execParamsAsync"], $sql, $params)); + } + + /** + * {@inheritdoc} + */ + public function prepare(string $sql): Awaitable { + return new Coroutine($this->send([$this->handle, "prepareAsync"], $sql, $sql)); + } +} diff --git a/lib/PqStatement.php b/lib/PqStatement.php new file mode 100644 index 0000000..eb74bae --- /dev/null +++ b/lib/PqStatement.php @@ -0,0 +1,46 @@ +statement = $statement; + $this->execute = $execute; + } + + public function __destruct() { + rethrow(new Coroutine(($this->execute)([$this->statement, "deallocateAsync"]))); + } + + /** + * @return string + */ + public function getQuery(): string { + return $this->statement->query; + } + + /** + * @param mixed ...$params + * + * @return \Interop\Async\Awaitable<\Amp\Postgres\Result> + * + * @throws \Amp\Postgres\FailureException If executing the statement fails. + */ + public function execute(...$params): Awaitable { + return new Coroutine(($this->execute)([$this->statement, "execAsync"], $params)); + } +} \ No newline at end of file diff --git a/lib/PqUnbufferedResult.php b/lib/PqUnbufferedResult.php new file mode 100644 index 0000000..5b57af1 --- /dev/null +++ b/lib/PqUnbufferedResult.php @@ -0,0 +1,39 @@ +numCols = $result->numCols; + parent::__construct(new Emitter(function (callable $emit) use ($result, $fetch) { + $count = 0; + try { + do { + $next = new Coroutine($fetch()); // Request next result before current is consumed. + ++$count; + yield $emit($result->fetchRow(pq\Result::FETCH_ASSOC)); + $result = yield $next; + } while ($result instanceof pq\Result); + } finally { + $this->complete(); + } + return $count; + })); + } + + public function numFields(): int { + return $this->numCols; + } +} \ No newline at end of file diff --git a/lib/QueryError.php b/lib/QueryError.php new file mode 100644 index 0000000..fff6366 --- /dev/null +++ b/lib/QueryError.php @@ -0,0 +1,5 @@ + + */ + public function execute(...$params): Awaitable; +} diff --git a/lib/Transaction.php b/lib/Transaction.php new file mode 100644 index 0000000..d9284ce --- /dev/null +++ b/lib/Transaction.php @@ -0,0 +1,185 @@ +isolation = $isolation; + break; + + default: + throw new \Error("Isolation must be a valid transaction isolation level"); + } + + $this->executor = $executor; + } + + /** + * @return bool + */ + public function isActive(): bool { + return $this->executor !== null; + } + + /** + * @return int + */ + public function getIsolationLevel(): int { + return $this->isolation; + } + + /** + * {@inheritdoc} + */ + public function query(string $sql): Awaitable { + if ($this->executor === null) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->executor->query($sql); + } + + /** + * {@inheritdoc} + */ + public function prepare(string $sql): Awaitable { + if ($this->executor === null) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->executor->prepare($sql); + } + + /** + * {@inheritdoc} + */ + public function execute(string $sql, ...$params): Awaitable { + if ($this->executor === null) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->executor->execute($sql, ...$params); + } + + /** + * Commits the transaction and makes it inactive. + * + * @return \Interop\Async\Awaitable<\Amp\Postgres\CommandResult> + * + * @throws \Amp\Postgres\TransactionError + */ + public function commit(): Awaitable { + return new Coroutine($this->doCommit()); + } + + private function doCommit(): \Generator { + if ($this->executor === null) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + $executor = $this->executor; + $this->executor = null; + + try { + $result = yield $executor->query("COMMIT"); + } finally { + $this->complete(); + } + + return $result; + } + + /** + * Rolls back the transaction and makes it inactive. + * + * @return \Interop\Async\Awaitable<\Amp\Postgres\CommandResult> + * + * @throws \Amp\Postgres\TransactionError + */ + public function rollback(): Awaitable { + return new Coroutine($this->doRollback()); + } + + public function doRollback(): \Generator { + if ($this->executor === null) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + $executor = $this->executor; + $this->executor = null; + + try { + $result = yield $executor->query("ROLLBACK"); + } finally { + $this->complete(); + } + + return $result; + } + + /** + * Creates a savepoint with the given identifier. WARNING: Identifier is not sanitized, do not pass untrusted data. + * + * @return \Interop\Async\Awaitable<\Amp\Postgres\CommandResult> + * + * @throws \Amp\Postgres\TransactionError + */ + public function savepoint(string $identifier): Awaitable { + return $this->query("SAVEPOINT " . $identifier); + } + + /** + * @coroutine + * + * Rolls back to the savepoint with the given identifier. WARNING: Identifier is not sanitized, do not pass + * untrusted data. + * + * @return \Interop\Async\Awaitable<\Amp\Postgres\CommandResult> + * + * @throws \Amp\Postgres\TransactionError + */ + public function rollbackTo(string $identifier): Awaitable { + return $this->query("ROLLBACK TO " . $identifier); + } + + /** + * @coroutine + * + * Releases the savepoint with the given identifier. WARNING: Identifier is not sanitized, do not pass untrusted + * data. + * + * @return \Interop\Async\Awaitable<\Amp\Postgres\CommandResult> + * + * @throws \Amp\Postgres\TransactionError + */ + public function release(string $identifier): Awaitable { + return $this->query("RELEASE SAVEPOINT " . $identifier); + } +} diff --git a/lib/TransactionError.php b/lib/TransactionError.php new file mode 100644 index 0000000..2580800 --- /dev/null +++ b/lib/TransactionError.php @@ -0,0 +1,5 @@ + + * + * @throws \Amp\Postgres\FailureException If connecting fails. + * @throws \Error If neither ext-pgsql or pecl-pq is loaded. + */ +function connect(string $connectionString, int $timeout = null): Awaitable { + if (\extension_loaded("pq")) { + return PqConnection::connect($connectionString, $timeout); + } + + if (\extension_loaded("pgsql")) { + return PgSqlConnection::connect($connectionString, $timeout); + } + + throw new \Error("This lib requires either pecl-pq or ext-pgsql"); +} + +/** + * @param string $connectionString + * @param int $maxConnections + * @param int $connectTimeout + * + * @return \Amp\Postgres\Pool + */ +function pool( + string $connectionString, + int $maxConnections = ConnectionPool::DEFAULT_MAX_CONNECTIONS, + int $connectTimeout = ConnectionPool::DEFAULT_CONNECT_TIMEOUT +): Pool { + return new ConnectionPool($connectionString, $maxConnections, $connectTimeout); +} diff --git a/phpdoc.dist.xml b/phpdoc.dist.xml new file mode 100644 index 0000000..b774432 --- /dev/null +++ b/phpdoc.dist.xml @@ -0,0 +1,23 @@ + + + Amp Postgres + + build/docs + utf8 + + + build/docs + + + warn + + build/log/docs/{DATE}.log + + + +