diff --git a/.php_cs b/.php_cs new file mode 100644 index 0000000..233c4e9 --- /dev/null +++ b/.php_cs @@ -0,0 +1,10 @@ +getFinder()->in(__DIR__); + +$cacheDir = getenv('TRAVIS') ? getenv('HOME') . '/.php-cs-fixer' : __DIR__; + +$config->setCacheFile($cacheDir . '/.php_cs.cache'); + +return $config; diff --git a/.php_cs.dist b/.php_cs.dist deleted file mode 100644 index 9c4a372..0000000 --- a/.php_cs.dist +++ /dev/null @@ -1,40 +0,0 @@ -setRiskyAllowed(true) - ->setRules([ - "@PSR1" => true, - "@PSR2" => true, - "braces" => [ - "allow_single_line_closure" => true, - "position_after_functions_and_oop_constructs" => "same", - ], - "array_syntax" => ["syntax" => "short"], - "cast_spaces" => true, - "combine_consecutive_unsets" => true, - "function_to_constant" => true, - "no_multiline_whitespace_before_semicolons" => true, - "no_unused_imports" => true, - "no_useless_else" => true, - "no_useless_return" => true, - "no_whitespace_before_comma_in_array" => true, - "no_whitespace_in_blank_line" => true, - "non_printable_character" => true, - "normalize_index_brace" => true, - "ordered_imports" => true, - "php_unit_construct" => true, - "php_unit_dedicate_assert" => true, - "php_unit_fqcn_annotation" => true, - "phpdoc_summary" => true, - "phpdoc_types" => true, - "psr4" => true, - "return_type_declaration" => ["space_before" => "none"], - "short_scalar_cast" => true, - "single_blank_line_before_namespace" => true, - ]) - ->setFinder( - PhpCsFixer\Finder::create() - ->in(__DIR__ . "/examples") - ->in(__DIR__ . "/src") - ->in(__DIR__ . "/test") - ); diff --git a/.travis.yml b/.travis.yml index 8ab7c33..6661027 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,6 +13,12 @@ matrix: - php: nightly fast_finish: true +cache: + directories: + - $HOME/.composer/cache + - $HOME/.php-cs-fixer + - $HOME/.local + env: - AMP_DEBUG=true diff --git a/composer.json b/composer.json index 130dba2..f8d3f27 100644 --- a/composer.json +++ b/composer.json @@ -19,12 +19,13 @@ } ], "require": { - "amphp/amp": "^2" + "amphp/amp": "^2", + "amphp/sql": "dev-master" }, "require-dev": { "amphp/phpunit-util": "^1", "phpunit/phpunit": "^6", - "friendsofphp/php-cs-fixer": "^2.3", + "amphp/php-cs-fixer-config": "dev-master", "phpstan/phpstan": "^0.9" }, "autoload": { @@ -47,7 +48,12 @@ } }, "scripts": { - "test": "@php -dzend.assertions=1 -dassert.exception=1 ./vendor/bin/phpunit", - "code-style": "@php ./vendor/bin/php-cs-fixer fix" + "check": [ + "@cs", + "@test" + ], + "cs": "php-cs-fixer fix -v --diff --dry-run", + "cs-fix": "php-cs-fixer fix -v --diff", + "test": "@php -dzend.assertions=1 -dassert.exception=1 ./vendor/bin/phpunit" } } diff --git a/examples/basic.php b/examples/basic.php index 53feea0..d342e74 100644 --- a/examples/basic.php +++ b/examples/basic.php @@ -1,13 +1,13 @@ #!/usr/bin/env php query('SHOW ALL'); diff --git a/examples/listen.php b/examples/listen.php index 73cc5d8..56ef6f6 100644 --- a/examples/listen.php +++ b/examples/listen.php @@ -1,23 +1,23 @@ #!/usr/bin/env php listen($channel); - printf("Listening on channel '%s'\n", $listener->getChannel()); + \printf("Listening on channel '%s'\n", $listener->getChannel()); Loop::delay(3000, function () use ($listener) { // Unlisten in 3 seconds. - printf("Unlistening from channel '%s'\n", $listener->getChannel()); + \printf("Unlistening from channel '%s'\n", $listener->getChannel()); return $listener->unlisten(); }); @@ -31,7 +31,7 @@ while (yield $listener->advance()) { $notification = $listener->getCurrent(); - printf( + \printf( "Received notification from PID %d on channel '%s' with payload: %s\n", $notification->pid, $notification->channel, diff --git a/examples/multi-listen.php b/examples/multi-listen.php index 956a957..5851987 100644 --- a/examples/multi-listen.php +++ b/examples/multi-listen.php @@ -1,14 +1,14 @@ #!/usr/bin/env php listen($channel1); - printf("Listening on channel '%s'\n", $listener1->getChannel()); + \printf("Listening on channel '%s'\n", $listener1->getChannel()); /** @var \Amp\Postgres\Listener $listener2 */ $listener2 = yield $pool->listen($channel2); - printf("Listening on channel '%s'\n", $listener2->getChannel()); + \printf("Listening on channel '%s'\n", $listener2->getChannel()); Loop::delay(6000, function () use ($listener1) { // Unlisten in 6 seconds. - printf("Unlistening from channel '%s'\n", $listener1->getChannel()); + \printf("Unlistening from channel '%s'\n", $listener1->getChannel()); return $listener1->unlisten(); }); Loop::delay(4000, function () use ($listener2) { // Unlisten in 4 seconds. - printf("Unlistening from channel '%s'\n", $listener2->getChannel()); + \printf("Unlistening from channel '%s'\n", $listener2->getChannel()); return $listener2->unlisten(); }); @@ -53,7 +53,7 @@ while (yield $iterator->advance()) { $notification = $iterator->getCurrent(); - printf( + \printf( "Received notification from PID %d on channel '%s' with payload: %s\n", $notification->pid, $notification->channel, diff --git a/examples/transaction.php b/examples/transaction.php index 13c7da7..129bd47 100644 --- a/examples/transaction.php +++ b/examples/transaction.php @@ -1,12 +1,12 @@ #!/usr/bin/env php query('DROP TABLE IF EXISTS test'); @@ -15,7 +15,7 @@ yield $transaction->query('CREATE TABLE test (domain VARCHAR(63), tld VARCHAR(63), PRIMARY KEY (domain, tld))'); - /** @var \Amp\Postgres\Statement $statement */ + /** @var \Amp\Sql\Statement $statement */ $statement = yield $transaction->prepare('INSERT INTO test VALUES (?, ?)'); yield $statement->execute(['amphp', 'org']); @@ -26,10 +26,10 @@ $result = yield $transaction->execute('SELECT * FROM test WHERE tld = :tld', ['tld' => 'com']); $format = "%-20s | %-10s\n"; - printf($format, 'TLD', 'Domain'); + \printf($format, 'TLD', 'Domain'); while (yield $result->advance()) { $row = $result->getCurrent(); - printf($format, $row['domain'], $row['tld']); + \printf($format, $row['domain'], $row['tld']); } yield $transaction->rollback(); diff --git a/src/CommandResult.php b/src/CommandResult.php deleted file mode 100644 index 93b124f..0000000 --- a/src/CommandResult.php +++ /dev/null @@ -1,12 +0,0 @@ - + * @return Promise */ - abstract public static function connect(string $connectionString, CancellationToken $token = null): Promise; + abstract public static function connect(ConnectionConfig $connectionConfig, CancellationToken $token = null): Promise; /** - * @param \Amp\Postgres\Handle $handle + * @param Handle $handle */ - public function __construct(Handle $handle) { + public function __construct(Handle $handle) + { $this->handle = $handle; $this->release = $this->callableFromInstanceMethod("release"); } @@ -39,33 +44,49 @@ public function __construct(Handle $handle) { /** * {@inheritdoc} */ - final public function isAlive(): bool { + final public function isAlive(): bool + { return $this->handle->isAlive(); } /** * {@inheritdoc} + * + * @throws FailureException */ - final public function lastUsedAt(): int { + final public function lastUsedAt(): int + { + if (! $this->handle) { + throw new FailureException('Not connected'); + } + return $this->handle->lastUsedAt(); } /** * {@inheritdoc} */ - final public function close() { - $this->handle->close(); + final public function close() + { + if ($this->handle) { + $this->handle->close(); + } } /** * @param string $methodName Method to execute. * @param mixed ...$args Arguments to pass to function. * - * @return \Amp\Promise + * @return Promise * - * @throws \Amp\Postgres\FailureException + * @throws FailureException */ - private function send(string $methodName, ...$args): Promise { + private function send(string $methodName, ...$args): Promise + { + if (! $this->handle) { + throw new FailureException('Not connected'); + } + if ($this->busy) { return call(function () use ($methodName, $args) { while ($this->busy) { @@ -82,7 +103,8 @@ private function send(string $methodName, ...$args): Promise { /** * Releases the transaction lock. */ - private function release() { + private function release() + { \assert($this->busy !== null); $deferred = $this->busy; @@ -93,14 +115,24 @@ private function release() { /** * {@inheritdoc} */ - final public function query(string $sql): Promise { + final public function query(string $sql): Promise + { + if (! $this->handle) { + throw new FailureException('Not connected'); + } + return $this->send("query", $sql); } /** * {@inheritdoc} */ - final public function execute(string $sql, array $params = []): Promise { + final public function execute(string $sql, array $params = []): Promise + { + if (! $this->handle) { + throw new FailureException('Not connected'); + } + return $this->send("execute", $sql, $params); } @@ -109,7 +141,12 @@ final public function execute(string $sql, array $params = []): Promise { * * Statement instances returned by this method must also implement Operation. */ - final public function prepare(string $sql): Promise { + final public function prepare(string $sql): Promise + { + if (! $this->handle) { + throw new FailureException('Not connected'); + } + return $this->send("prepare", $sql); } @@ -117,36 +154,55 @@ final public function prepare(string $sql): Promise { /** * {@inheritdoc} */ - final public function notify(string $channel, string $payload = ""): Promise { + final public function notify(string $channel, string $payload = ""): Promise + { + if (! $this->handle) { + throw new FailureException('Not connected'); + } + return $this->send("notify", $channel, $payload); } /** * {@inheritdoc} + * + * @throws FailureException */ - final public function listen(string $channel): Promise { + final public function listen(string $channel): Promise + { + if (! $this->handle) { + throw new FailureException('Not connected'); + } + return $this->send("listen", $channel); } /** * {@inheritdoc} + * + * @throws FailureException */ - final public function transaction(int $isolation = Transaction::COMMITTED): Promise { + final public function transaction(int $isolation = Transaction::ISOLATION_COMMITTED): Promise + { + if (! $this->handle) { + throw new FailureException('Not connected'); + } + return call(function () use ($isolation) { switch ($isolation) { - case Transaction::UNCOMMITTED: + case Transaction::ISOLATION_UNCOMMITTED: yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL READ UNCOMMITTED"); break; - case Transaction::COMMITTED: + case Transaction::ISOLATION_COMMITTED: yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED"); break; - case Transaction::REPEATABLE: + case Transaction::ISOLATION_REPEATABLE: yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ"); break; - case Transaction::SERIALIZABLE: + case Transaction::ISOLATION_SERIALIZABLE: yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE"); break; @@ -164,15 +220,29 @@ final public function transaction(int $isolation = Transaction::COMMITTED): Prom /** * {@inheritdoc} + * + * @throws FailureException */ - final public function quoteString(string $data): string { + final public function quoteString(string $data): string + { + if (! $this->handle) { + throw new FailureException('Not connected'); + } + return $this->handle->quoteString($data); } /** * {@inheritdoc} + * + * @throws FailureException */ - final public function quoteName(string $name): string { + final public function quoteName(string $name): string + { + if (! $this->handle) { + throw new FailureException('Not connected'); + } + return $this->handle->quoteName($name); } } diff --git a/src/ConnectionConfig.php b/src/ConnectionConfig.php new file mode 100644 index 0000000..47da661 --- /dev/null +++ b/src/ConnectionConfig.php @@ -0,0 +1,21 @@ +connectionString = $connectionString; + } + + public function connectionString(): string + { + return $this->connectionString; + } +} diff --git a/src/ConnectionException.php b/src/ConnectionException.php deleted file mode 100644 index bcc1b5d..0000000 --- a/src/ConnectionException.php +++ /dev/null @@ -1,6 +0,0 @@ - - */ - public function connect(string $connectionString): Promise; -} diff --git a/src/DefaultPool.php b/src/DefaultPool.php deleted file mode 100644 index 2f4df69..0000000 --- a/src/DefaultPool.php +++ /dev/null @@ -1,435 +0,0 @@ -connector = $connector ?? connector(); - - $this->connectionString = $connectionString; - - $this->maxConnections = $maxConnections; - if ($this->maxConnections < 1) { - throw new \Error("Pool must contain at least one connection"); - } - - $this->connections = $connections = new \SplObjectStorage; - $this->idle = $idle = new \SplQueue; - $this->prepare = coroutine($this->callableFromInstanceMethod("doPrepare")); - - $idleTimeout = &$this->idleTimeout; - - $this->timeoutWatcher = Loop::repeat(1000, static function () use (&$idleTimeout, $connections, $idle) { - $now = \time(); - while (!$idle->isEmpty()) { - /** @var \Amp\Postgres\Connection $connection */ - $connection = $idle->bottom(); - - if ($connection->lastUsedAt() + $idleTimeout > $now) { - return; - } - - // Close connection and remove it from the pool. - $idle->shift(); - $connections->detach($connection); - $connection->close(); - } - }); - - Loop::unreference($this->timeoutWatcher); - } - - public function __destruct() { - Loop::cancel($this->timeoutWatcher); - } - - public function resetConnections(bool $reset) { - $this->resetConnections = $reset; - } - - public function getIdleTimeout(): int { - return $this->idleTimeout; - } - - public function setIdleTimeout(int $timeout) { - if ($timeout < 1) { - throw new \Error("Timeout must be greater than or equal to 1"); - } - - $this->idleTimeout = $timeout; - } - - /** - * @return bool - */ - public function isAlive(): bool { - return !$this->closed; - } - - /** - * Close all connections in the pool. No further queries may be made after a pool is closed. - */ - public function close() { - $this->closed = true; - foreach ($this->connections as $connection) { - $connection->close(); - } - $this->idle = new \SplQueue; - $this->connections = new \SplObjectStorage; - $this->listeningConnection = null; - $this->prepare = null; - } - - /** - * {@inheritdoc} - */ - public function extractConnection(): Promise { - return call(function () { - $connection = yield from $this->pop(); - $this->connections->detach($connection); - return $connection; - }); - } - - /** - * {@inheritdoc} - */ - public function getConnectionCount(): int { - return $this->connections->count(); - } - - /** - * {@inheritdoc} - */ - public function getIdleConnectionCount(): int { - return $this->idle->count(); - } - - /** - * {@inheritdoc} - */ - public function getMaxConnections(): int { - return $this->maxConnections; - } - - /** - * @return \Generator - * - * @resolve \Amp\Postgres\Connection - * - * @throws \Amp\Postgres\FailureException If creating a new connection fails. - * @throws \Error If the pool has been closed. - */ - private function pop(): \Generator { - if ($this->closed) { - throw new \Error("The pool has been closed"); - } - - while ($this->promise !== null && $this->connections->count() + $this->pending >= $this->getMaxConnections()) { - yield $this->promise; // Prevent simultaneous connection creation when connection count is at maximum - 1. - } - - do { - // While loop to ensure an idle connection is available after promises below are resolved. - while ($this->idle->isEmpty()) { - if ($this->connections->count() + $this->pending < $this->getMaxConnections()) { - // Max connection count has not been reached, so open another connection. - ++$this->pending; - try { - $connection = yield $this->connector->connect($this->connectionString); - if (!$connection instanceof Connection) { - throw new \Error(\sprintf( - "%s::createConnection() must resolve to an instance of %s", - static::class, - Connection::class - )); - } - } finally { - --$this->pending; - } - - $this->connections->attach($connection); - return $connection; - } - - // All possible connections busy, so wait until one becomes available. - try { - $this->deferred = new Deferred; - // May be resolved with defunct connection, but that connection will not be added to $this->idle. - yield $this->promise = $this->deferred->promise(); - } finally { - $this->deferred = null; - $this->promise = null; - } - } - - /** @var \Amp\Postgres\Connection $connection */ - $connection = $this->idle->shift(); - - if ($connection->isAlive()) { - try { - if ($this->resetConnections) { - yield $connection->query("RESET ALL"); - } - - return $connection; - } catch (FailureException $exception) { - // Fall-through to remove connection below. - } - } - - $this->connections->detach($connection); - } while (!$this->closed); - - throw new FailureException("Pool closed before an active connection could be obtained"); - } - - /** - * @param \Amp\Postgres\Connection $connection - * - * @throws \Error If the connection is not part of this pool. - */ - private function push(Connection $connection) { - \assert(isset($this->connections[$connection]), 'Connection is not part of this pool'); - - if ($connection->isAlive()) { - $this->idle->push($connection); - } else { - $this->connections->detach($connection); - } - - if ($this->deferred instanceof Deferred) { - $this->deferred->resolve($connection); - } - } - - /** - * {@inheritdoc} - */ - public function query(string $sql): Promise { - return call(function () use ($sql) { - /** @var \Amp\Postgres\Connection $connection */ - $connection = yield from $this->pop(); - - try { - $result = yield $connection->query($sql); - } catch (\Throwable $exception) { - $this->push($connection); - throw $exception; - } - - if ($result instanceof Operation) { - $result->onDestruct(function () use ($connection) { - $this->push($connection); - }); - } else { - $this->push($connection); - } - - return $result; - }); - } - - /** - * {@inheritdoc} - */ - public function execute(string $sql, array $params = []): Promise { - return call(function () use ($sql, $params) { - /** @var \Amp\Postgres\Connection $connection */ - $connection = yield from $this->pop(); - - try { - $result = yield $connection->execute($sql, $params); - } catch (\Throwable $exception) { - $this->push($connection); - throw $exception; - } - - if ($result instanceof Operation) { - $result->onDestruct(function () use ($connection) { - $this->push($connection); - }); - } else { - $this->push($connection); - } - - return $result; - }); - } - - /** - * {@inheritdoc} - * - * Prepared statements returned by this method will stay alive as long as the pool remains open. - */ - public function prepare(string $sql): Promise { - return call(function () use ($sql) { - $statement = yield from $this->doPrepare($sql); - return new Internal\PooledStatement($this, $statement, $this->prepare); - }); - } - - private function doPrepare(string $sql): \Generator { - /** @var \Amp\Postgres\Connection $connection */ - $connection = yield from $this->pop(); - - try { - /** @var \Amp\Postgres\Statement $statement */ - $statement = yield $connection->prepare($sql); - } catch (\Throwable $exception) { - $this->push($connection); - throw $exception; - } - - \assert( - $statement instanceof Operation, - Statement::class . " instances returned from connections must implement " . Operation::class - ); - - $statement->onDestruct(function () use ($connection) { - $this->push($connection); - }); - - return $statement; - } - - /** - * {@inheritdoc} - */ - public function notify(string $channel, string $payload = ""): Promise { - return call(function () use ($channel, $payload) { - /** @var \Amp\Postgres\Connection $connection */ - $connection = yield from $this->pop(); - - try { - $result = yield $connection->notify($channel, $payload); - } finally { - $this->push($connection); - } - - return $result; - }); - } - - /** - * {@inheritdoc} - */ - public function listen(string $channel): Promise { - return call(function () use ($channel) { - ++$this->listenerCount; - - if ($this->listeningConnection === null) { - $this->listeningConnection = new Coroutine($this->pop()); - } - - if ($this->listeningConnection instanceof Promise) { - $this->listeningConnection = yield $this->listeningConnection; - } - - try { - /** @var \Amp\Postgres\Listener $listener */ - $listener = yield $this->listeningConnection->listen($channel); - } catch (\Throwable $exception) { - if (--$this->listenerCount === 0) { - $connection = $this->listeningConnection; - $this->listeningConnection = null; - $this->push($connection); - } - throw $exception; - } - - $listener->onDestruct(function () { - if (--$this->listenerCount === 0) { - $connection = $this->listeningConnection; - $this->listeningConnection = null; - $this->push($connection); - } - }); - - return $listener; - }); - } - - /** - * {@inheritdoc} - */ - public function transaction(int $isolation = Transaction::COMMITTED): Promise { - return call(function () use ($isolation) { - /** @var \Amp\Postgres\Connection $connection */ - $connection = yield from $this->pop(); - - try { - /** @var \Amp\Postgres\Transaction $transaction */ - $transaction = yield $connection->transaction($isolation); - } catch (\Throwable $exception) { - $this->push($connection); - throw $exception; - } - - $transaction->onDestruct(function () use ($connection) { - $this->push($connection); - }); - - return $transaction; - }); - } -} diff --git a/src/Executor.php b/src/Executor.php index 4585dc5..b11bd76 100644 --- a/src/Executor.php +++ b/src/Executor.php @@ -3,64 +3,20 @@ namespace Amp\Postgres; use Amp\Promise; +use Amp\Sql\Executor as SqlExecutor; -interface Executor { +interface Executor extends SqlExecutor +{ const STATEMENT_NAME_PREFIX = "amp_"; - /** - * @param string $sql - * - * @return \Amp\Promise<\Amp\Postgres\CommandResult|\Amp\Postgres\TupleResult> - * - * @throws \Amp\Postgres\FailureException If the operation fails due to unexpected condition. - * @throws \Amp\Postgres\ConnectionException If the connection to the database is lost. - * @throws \Amp\Postgres\QueryError If the operation fails due to an error in the query (such as a syntax error). - */ - public function query(string $sql): Promise; - - /** - * @param string $sql - * @param mixed[] $params - * - * @return \Amp\Promise<\Amp\Postgres\CommandResult|\Amp\Postgres\TupleResult> - * - * @throws \Amp\Postgres\FailureException If the operation fails due to unexpected condition. - * @throws \Amp\Postgres\ConnectionException If the connection to the database is lost. - * @throws \Amp\Postgres\QueryError If the operation fails due to an error in the query (such as a syntax error). - */ - public function execute(string $sql, array $params = []): Promise; - - /** - * @param string $sql - * - * @return \Amp\Promise<\Amp\Postgres\Statement> - * - * @throws \Amp\Postgres\FailureException If the operation fails due to unexpected condition. - * @throws \Amp\Postgres\ConnectionException If the connection to the database is lost. - * @throws \Amp\Postgres\QueryError If the operation fails due to an error in the query (such as a syntax error). - */ - public function prepare(string $sql): Promise; - /** * @param string $channel Channel name. * @param string $payload Notification payload. * - * @return \Amp\Promise<\Amp\Postgres\CommandResult> + * @return Promise<\Amp\Sql\CommandResult> * - * @throws \Amp\Postgres\FailureException If the operation fails due to unexpected condition. - * @throws \Amp\Postgres\ConnectionException If the connection to the database is lost. + * @throws \Amp\Sql\FailureException If the operation fails due to unexpected condition. + * @throws \Amp\Sql\ConnectionException If the connection to the database is lost. */ public function notify(string $channel, string $payload = ""): Promise; - - /** - * Indicates if the connection to the database is still alive. - * - * @return bool - */ - public function isAlive(): bool; - - /** - * Closes the executor. No further queries may be performed. - */ - public function close(); } diff --git a/src/FailureException.php b/src/FailureException.php deleted file mode 100644 index 8250530..0000000 --- a/src/FailureException.php +++ /dev/null @@ -1,6 +0,0 @@ -lastUsedAt = \time(); - $this->statements = $statements = new \SplQueue; - $this->pool = $pool; - $this->prepare = $prepare; - $this->sql = $statement->getQuery(); - - $this->statements->push($statement); - - $this->timeoutWatcher = Loop::repeat(1000, static function () use ($pool, $statements) { - $now = \time(); - $idleTimeout = ((int) ($pool->getIdleTimeout() / 10)) ?: 1; - - while (!$statements->isEmpty()) { - /** @var \Amp\Postgres\Statement $statement */ - $statement = $statements->bottom(); - - if ($statement->lastUsedAt() + $idleTimeout > $now) { - return; - } - - $statements->shift(); - } - }); - - Loop::unreference($this->timeoutWatcher); - } - - public function __destruct() { - Loop::cancel($this->timeoutWatcher); - } - - /** - * {@inheritdoc} - * - * Unlike regular statements, as long as the pool is open this statement will not die. - */ - public function execute(array $params = []): Promise { - $this->lastUsedAt = \time(); - - return call(function () use ($params) { - if (!$this->statements->isEmpty()) { - do { - /** @var \Amp\Postgres\Statement $statement */ - $statement = $this->statements->shift(); - } while (!$statement->isAlive() && !$this->statements->isEmpty()); - } else { - $statement = yield ($this->prepare)($this->sql); - } - - try { - $result = yield $statement->execute($params); - } catch (\Throwable $exception) { - $this->push($statement); - throw $exception; - } - - if ($result instanceof Operation) { - $result->onDestruct(function () use ($statement) { - $this->push($statement); - }); - } else { - $this->push($statement); - } - - return $result; - }); - } - - /** - * Only retains statements if less than 10% of the pool is consumed by this statement and the pool has - * available connections. - * - * @param Statement $statement - */ - private function push(Statement $statement) { - $maxConnections = $this->pool->getMaxConnections(); - - if ($this->statements->count() > ($maxConnections / 10)) { - return; - } - - if ($maxConnections === $this->pool->getConnectionCount() && $this->pool->getIdleConnectionCount() === 0) { - return; - } - - $this->statements->push($statement); - } - - - /** {@inheritdoc} */ - public function isAlive(): bool { - return $this->pool->isAlive(); - } - - /** {@inheritdoc} */ - public function getQuery(): string { - return $this->sql; - } - - /** {@inheritdoc} */ - public function lastUsedAt(): int { - return $this->lastUsedAt; - } -} diff --git a/src/Internal/PqStatementStorage.php b/src/Internal/PqStatementStorage.php index b2deffc..d0ea108 100644 --- a/src/Internal/PqStatementStorage.php +++ b/src/Internal/PqStatementStorage.php @@ -2,7 +2,8 @@ namespace Amp\Postgres\Internal; -class PqStatementStorage extends StatementStorage { +class PqStatementStorage extends StatementStorage +{ /** @var \pq\Statement */ public $statement; } diff --git a/src/Internal/ReferenceQueue.php b/src/Internal/ReferenceQueue.php index 2323447..bfde708 100644 --- a/src/Internal/ReferenceQueue.php +++ b/src/Internal/ReferenceQueue.php @@ -4,14 +4,16 @@ use Amp\Loop; -final class ReferenceQueue { +final class ReferenceQueue +{ /** @var callable[]|null */ private $onDestruct = []; /** @var int */ private $refCount = 1; - public function onDestruct(callable $onDestruct) { + public function onDestruct(callable $onDestruct) + { if (!$this->refCount) { try { $onDestruct(); @@ -26,12 +28,14 @@ public function onDestruct(callable $onDestruct) { $this->onDestruct[] = $onDestruct; } - public function reference() { + public function reference() + { \assert($this->refCount, "The reference queue has already been fully unreferenced and destroyed"); ++$this->refCount; } - public function unreference() { + public function unreference() + { \assert($this->refCount, "The reference queue has already been fully unreferenced and destroyed"); if (--$this->refCount) { @@ -50,7 +54,8 @@ public function unreference() { $this->onDestruct = null; } - public function isReferenced(): bool { + public function isReferenced(): bool + { return (bool) $this->refCount; } } diff --git a/src/Internal/StatementStorage.php b/src/Internal/StatementStorage.php index b98a259..7d81ed2 100644 --- a/src/Internal/StatementStorage.php +++ b/src/Internal/StatementStorage.php @@ -4,10 +4,11 @@ use Amp\Struct; -class StatementStorage { +class StatementStorage +{ use Struct; - /** @var \Amp\Promise|null */ + /** @var |null */ public $promise; /** @var int */ diff --git a/src/Internal/functions.php b/src/Internal/functions.php index d53ae64..86a9b38 100644 --- a/src/Internal/functions.php +++ b/src/Internal/functions.php @@ -14,7 +14,8 @@ * * @return string SQL statement with Postgres-style placeholders */ -function parseNamedParams(string $sql, array &$names = null): string { +function parseNamedParams(string $sql, array &$names = null): string +{ $names = []; return \preg_replace_callback(STATEMENT_PARAM_REGEX, function (array $matches) use (&$names) { static $index = 0, $unnamed = 0, $numbered = 1; @@ -44,7 +45,8 @@ function parseNamedParams(string $sql, array &$names = null): string { * * @throws \Error If the $param array does not contain a key corresponding to a named parameter. */ -function replaceNamedParams(array $params, array $names): array { +function replaceNamedParams(array $params, array $names): array +{ $values = []; foreach ($names as $index => $name) { if (!\array_key_exists($name, $params)) { diff --git a/src/Link.php b/src/Link.php index 665d46c..98d4df2 100644 --- a/src/Link.php +++ b/src/Link.php @@ -3,27 +3,18 @@ namespace Amp\Postgres; use Amp\Promise; +use Amp\Sql\Link as SqlLink; -interface Link extends Executor { - /** - * @param int $isolation - * - * @return \Amp\Promise<\Amp\Postgres\Transaction> - * - * @throws \Amp\Postgres\FailureException If the operation fails due to unexpected condition. - * @throws \Amp\Postgres\ConnectionException If the connection to the database is lost. - * @throws \Amp\Postgres\QueryError If the operation fails due to an error in the query (such as a syntax error). - */ - public function transaction(int $isolation = Transaction::COMMITTED): Promise; - +interface Link extends Executor, SqlLink +{ /** * @param string $channel Channel name. * - * @return \Amp\Promise<\Amp\Postgres\Listener> + * @return Promise * - * @throws \Amp\Postgres\FailureException If the operation fails due to unexpected condition. - * @throws \Amp\Postgres\ConnectionException If the connection to the database is lost. - * @throws \Amp\Postgres\QueryError If the operation fails due to an error in the query (such as a syntax error). + * @throws \Amp\Sql\FailureException If the operation fails due to unexpected condition. + * @throws \Amp\Sql\ConnectionException If the connection to the database is lost. + * @throws \Amp\Sql\QueryError If the operation fails due to an error in the query (such as a syntax error). */ public function listen(string $channel): Promise; } diff --git a/src/Listener.php b/src/Listener.php index 71ee9eb..d78edec 100644 --- a/src/Listener.php +++ b/src/Listener.php @@ -4,8 +4,10 @@ use Amp\Iterator; use Amp\Promise; +use Amp\Sql\Operation; -final class Listener implements Iterator, Operation { +final class Listener implements Iterator, Operation +{ /** @var \Amp\Iterator */ private $iterator; @@ -15,22 +17,24 @@ final class Listener implements Iterator, Operation { /** @var callable|null */ private $unlisten; - /** @var \Amp\Postgres\Internal\ReferenceQueue */ + /** @var Internal\ReferenceQueue */ private $queue; /** * @param \Amp\Iterator $iterator Iterator emitting notificatons on the channel. * @param string $channel Channel name. - * @param callable(string $channel): \Amp\Promise $unlisten Function invoked to unlisten from the channel. + * @param callable(string $channel): $unlisten Function invoked to unlisten from the channel. */ - public function __construct(Iterator $iterator, string $channel, callable $unlisten) { + public function __construct(Iterator $iterator, string $channel, callable $unlisten) + { $this->iterator = $iterator; $this->channel = $channel; $this->unlisten = $unlisten; $this->queue = new Internal\ReferenceQueue; } - public function __destruct() { + public function __destruct() + { if ($this->unlisten) { $this->unlisten(); // Invokes $this->queue->complete(). } @@ -39,46 +43,51 @@ public function __destruct() { /** * {@inheritdoc} */ - public function onDestruct(callable $onComplete) { + public function onDestruct(callable $onComplete) + { $this->queue->onDestruct($onComplete); } /** * {@inheritdoc} */ - public function advance(): Promise { + public function advance(): Promise + { return $this->iterator->advance(); } /** * {@inheritdoc} * - * @return \Amp\Postgres\Notification + * @return Notification */ - public function getCurrent(): Notification { + public function getCurrent(): Notification + { return $this->iterator->getCurrent(); } /** * @return string Channel name. */ - public function getChannel(): string { + public function getChannel(): string + { return $this->channel; } /** * Unlistens from the channel. No more values will be emitted from this listener. * - * @return \Amp\Promise<\Amp\Postgres\CommandResult> + * @return Promise<\Amp\Sql\CommandResult> * * @throws \Error If this method was previously invoked. */ - public function unlisten(): Promise { + public function unlisten(): Promise + { if (!$this->unlisten) { throw new \Error("Already unlistened on this channel"); } - /** @var \Amp\Promise $promise */ + /** @var $promise */ $promise = ($this->unlisten)($this->channel); $this->unlisten = null; $promise->onResolve([$this->queue, "unreference"]); diff --git a/src/Notification.php b/src/Notification.php index 7d714c5..39d31e0 100644 --- a/src/Notification.php +++ b/src/Notification.php @@ -4,7 +4,8 @@ use Amp\Struct; -final class Notification { +final class Notification +{ use Struct; /** @var string Channel name. */ diff --git a/src/Operation.php b/src/Operation.php deleted file mode 100644 index 73e1ba3..0000000 --- a/src/Operation.php +++ /dev/null @@ -1,10 +0,0 @@ -handle = $handle; } /** * Frees the result resource. */ - public function __destruct() { + public function __destruct() + { \pg_free_result($this->handle); } /** * @return int Number of rows affected by the INSERT, UPDATE, or DELETE query. */ - public function affectedRows(): int { + public function affectedRows(): int + { return \pg_affected_rows($this->handle); } /** * @return string */ - public function lastOid(): string { + public function lastOid(): string + { return (string) \pg_last_oid($this->handle); } } diff --git a/src/PgSqlConnection.php b/src/PgSqlConnection.php index 458cd76..b45cfae 100644 --- a/src/PgSqlConnection.php +++ b/src/PgSqlConnection.php @@ -2,29 +2,36 @@ namespace Amp\Postgres; +use Amp\CallableMaker; use Amp\CancellationToken; use Amp\Deferred; use Amp\Failure; use Amp\Loop; use Amp\NullCancellationToken; use Amp\Promise; +use Amp\Sql\ConnectionConfig; +use Amp\Sql\ConnectionException; + +final class PgSqlConnection extends Connection implements Link +{ + use CallableMaker; -final class PgSqlConnection extends Connection { /** * @param string $connectionString - * @param \Amp\CancellationToken $token + * @param CancellationToken $token * - * @return \Amp\Promise<\Amp\Postgres\PgSqlConnection> + * @return Promise * * @throws \Error If pecl-ev is used as a loop extension. */ - public static function connect(string $connectionString, CancellationToken $token = null): Promise { + public static function connect(ConnectionConfig $connectionConfig, CancellationToken $token = null): Promise + { // @codeCoverageIgnoreStart if (Loop::get()->getHandle() instanceof \EvLoop) { throw new \Error('ext-pgsql is not compatible with pecl-ev; use pecl-pq or a different loop extension'); } // @codeCoverageIgnoreEnd - $connectionString = \str_replace(";", " ", $connectionString); + $connectionString = \str_replace(";", " ", $connectionConfig->connectionString()); if (!$connection = @\pg_connect($connectionString, \PGSQL_CONNECT_ASYNC | \PGSQL_CONNECT_FORCE_NEW)) { return new Failure(new ConnectionException("Failed to create connection resource")); @@ -63,7 +70,7 @@ public static function connect(string $connectionString, CancellationToken $toke $promise = $deferred->promise(); - $token = $token ?? new NullCancellationToken; + $token = $token ?? new NullCancellationToken(); $id = $token->subscribe([$deferred, "fail"]); $promise->onResolve(function ($exception) use ($connection, $poll, $await, $id, $token) { @@ -83,7 +90,8 @@ public static function connect(string $connectionString, CancellationToken $toke * @param resource $handle PostgreSQL connection handle. * @param resource $socket PostgreSQL connection stream socket. */ - public function __construct($handle, $socket) { + public function __construct($handle, $socket) + { parent::__construct(new PgSqlHandle($handle, $socket)); } } diff --git a/src/PgSqlHandle.php b/src/PgSqlHandle.php index 6a4845b..061c4b5 100644 --- a/src/PgSqlHandle.php +++ b/src/PgSqlHandle.php @@ -7,10 +7,14 @@ use Amp\Emitter; use Amp\Loop; use Amp\Promise; +use Amp\Sql\ConnectionException; +use Amp\Sql\FailureException; +use Amp\Sql\QueryError; use Amp\Success; use function Amp\call; -final class PgSqlHandle implements Handle { +final class PgSqlHandle implements Handle +{ use CallableMaker; const DIAGNOSTIC_CODES = [ @@ -46,7 +50,7 @@ final class PgSqlHandle implements Handle { /** @var callable */ private $unlisten; - /** @var \Amp\Postgres\Internal\StatementStorage[] */ + /** @var Internal\StatementStorage[] */ private $statements = []; /** @var int */ @@ -58,7 +62,8 @@ final class PgSqlHandle implements Handle { * @param resource $handle PostgreSQL connection handle. * @param resource $socket PostgreSQL connection stream socket. */ - public function __construct($handle, $socket) { + public function __construct($handle, $socket) + { $this->handle = $handle; $this->lastUsedAt = \time(); @@ -148,14 +153,16 @@ public function __construct($handle, $socket) { /** * Frees Io watchers from loop. */ - public function __destruct() { + public function __destruct() + { $this->free(); } /** * {@inheritdoc} */ - public function close() { + public function close() + { if ($this->deferred) { $deferred = $this->deferred; $this->deferred = null; @@ -167,7 +174,8 @@ public function close() { $this->handle = null; } - private function free() { + private function free() + { if (\is_resource($this->handle)) { \pg_close($this->handle); } @@ -179,14 +187,16 @@ private function free() { /** * {@inheritdoc} */ - public function isAlive(): bool { + public function isAlive(): bool + { return $this->handle !== null; } /** * {@inheritdoc} */ - public function lastUsedAt(): int { + public function lastUsedAt(): int + { return $this->lastUsedAt; } @@ -198,9 +208,10 @@ public function lastUsedAt(): int { * * @resolve resource * - * @throws \Amp\Postgres\FailureException + * @throws FailureException */ - private function send(callable $function, ...$args): \Generator { + private function send(callable $function, ...$args): \Generator + { while ($this->deferred) { try { yield $this->deferred->promise(); @@ -238,12 +249,13 @@ private function send(callable $function, ...$args): \Generator { /** * @param resource $result PostgreSQL result resource. * - * @return \Amp\Postgres\CommandResult|\Amp\Postgres\ResultSet + * @return \Amp\Sql\CommandResult|ResultSet * - * @throws \Amp\Postgres\FailureException - * @throws \Amp\Postgres\QueryError + * @throws FailureException + * @throws QueryError */ - private function createResult($result) { + private function createResult($result) + { switch (\pg_result_status($result, \PGSQL_STATUS_LONG)) { case \PGSQL_EMPTY_QUERY: throw new QueryError("Empty query string"); @@ -276,9 +288,10 @@ private function createResult($result) { * @param string $name * @param array $params * - * @return \Amp\Promise + * @return Promise */ - public function statementExecute(string $name, array $params): Promise { + public function statementExecute(string $name, array $params): Promise + { return call(function () use ($name, $params) { return $this->createResult(yield from $this->send("pg_send_execute", $name, $params)); }); @@ -287,11 +300,12 @@ public function statementExecute(string $name, array $params): Promise { /** * @param string $name * - * @return \Amp\Promise + * @return Promise * * @throws \Error */ - public function statementDeallocate(string $name): Promise { + public function statementDeallocate(string $name): Promise + { if (!\is_resource($this->handle)) { return new Success; // Connection closed, no need to deallocate. } @@ -312,7 +326,8 @@ public function statementDeallocate(string $name): Promise { /** * {@inheritdoc} */ - public function query(string $sql): Promise { + public function query(string $sql): Promise + { if (!\is_resource($this->handle)) { throw new \Error("The connection to the database has been closed"); } @@ -325,7 +340,8 @@ public function query(string $sql): Promise { /** * {@inheritdoc} */ - public function execute(string $sql, array $params = []): Promise { + public function execute(string $sql, array $params = []): Promise + { if (!\is_resource($this->handle)) { throw new \Error("The connection to the database has been closed"); } @@ -341,7 +357,8 @@ public function execute(string $sql, array $params = []): Promise { /** * {@inheritdoc} */ - public function prepare(string $sql): Promise { + public function prepare(string $sql): Promise + { if (!\is_resource($this->handle)) { throw new \Error("The connection to the database has been closed"); } @@ -402,7 +419,8 @@ public function prepare(string $sql): Promise { /** * {@inheritdoc} */ - public function notify(string $channel, string $payload = ""): Promise { + public function notify(string $channel, string $payload = ""): Promise + { if ($payload === "") { return $this->query(\sprintf("NOTIFY %s", $this->quoteName($channel))); } @@ -413,7 +431,8 @@ public function notify(string $channel, string $payload = ""): Promise { /** * {@inheritdoc} */ - public function listen(string $channel): Promise { + public function listen(string $channel): Promise + { return call(function () use ($channel) { if (isset($this->listeners[$channel])) { throw new QueryError(\sprintf("Already listening on channel '%s'", $channel)); @@ -436,11 +455,12 @@ public function listen(string $channel): Promise { /** * @param string $channel * - * @return \Amp\Promise + * @return Promise * * @throws \Error */ - private function unlisten(string $channel): Promise { + private function unlisten(string $channel): Promise + { \assert(isset($this->listeners[$channel]), "Not listening on that channel"); $emitter = $this->listeners[$channel]; @@ -459,7 +479,8 @@ private function unlisten(string $channel): Promise { /** * {@inheritdoc} */ - public function quoteString(string $data): string { + public function quoteString(string $data): string + { if (!\is_resource($this->handle)) { throw new \Error("The connection to the database has been closed"); } @@ -470,7 +491,8 @@ public function quoteString(string $data): string { /** * {@inheritdoc} */ - public function quoteName(string $name): string { + public function quoteName(string $name): string + { if (!\is_resource($this->handle)) { throw new \Error("The connection to the database has been closed"); } diff --git a/src/PgSqlResultSet.php b/src/PgSqlResultSet.php index e2476f9..594a398 100644 --- a/src/PgSqlResultSet.php +++ b/src/PgSqlResultSet.php @@ -3,9 +3,11 @@ namespace Amp\Postgres; use Amp\Promise; +use Amp\Sql\FailureException; use Amp\Success; -final class PgSqlResultSet implements ResultSet { +final class PgSqlResultSet implements ResultSet +{ /** @var resource PostgreSQL result resource. */ private $handle; @@ -24,13 +26,14 @@ final class PgSqlResultSet implements ResultSet { /** @var string[] */ private $fieldNames = []; - /** @var \Amp\Postgres\Internal\ArrayParser */ + /** @var Internal\ArrayParser */ private $parser; /** * @param resource $handle PostgreSQL result resource. */ - public function __construct($handle) { + public function __construct($handle) + { $this->handle = $handle; $numFields = \pg_num_fields($this->handle); @@ -45,14 +48,16 @@ public function __construct($handle) { /** * Frees the result resource. */ - public function __destruct() { + public function __destruct() + { \pg_free_result($this->handle); } /** * {@inheritdoc} */ - public function advance(int $type = self::FETCH_ASSOC): Promise { + public function advance(int $type = self::FETCH_ASSOC): Promise + { $this->currentRow = null; $this->type = $type; @@ -66,7 +71,8 @@ public function advance(int $type = self::FETCH_ASSOC): Promise { /** * {@inheritdoc} */ - public function getCurrent() { + public function getCurrent() + { if ($this->currentRow !== null) { return $this->currentRow; } @@ -120,9 +126,10 @@ public function getCurrent() { * * @return array|bool|float|int Cast value. * - * @throws \Amp\Postgres\ParseException + * @throws ParseException */ - private function cast(int $column, string $value) { + private function cast(int $column, string $value) + { switch ($this->fieldTypes[$column]) { case 16: // bool return $value === 't'; @@ -228,14 +235,16 @@ private function cast(int $column, string $value) { /** * @return int Number of rows in the result set. */ - public function numRows(): int { + public function numRows(): int + { return \pg_num_rows($this->handle); } /** * @return int Number of fields in each row. */ - public function numFields(): int { + public function numFields(): int + { return \pg_num_fields($this->handle); } @@ -246,7 +255,8 @@ public function numFields(): int { * * @throws \Error If the field number does not exist in the result. */ - public function fieldName(int $fieldNum): string { + public function fieldName(int $fieldNum): string + { if (0 > $fieldNum || $this->numFields() <= $fieldNum) { throw new \Error(\sprintf('No field with index %d in result', $fieldNum)); } @@ -261,7 +271,8 @@ public function fieldName(int $fieldNum): string { * * @throws \Error If the field name does not exist in the result. */ - public function fieldNum(string $fieldName): int { + public function fieldNum(string $fieldName): int + { $result = \pg_field_num($this->handle, $fieldName); if (-1 === $result) { diff --git a/src/PgSqlStatement.php b/src/PgSqlStatement.php index 5b31281..aa9fdf3 100644 --- a/src/PgSqlStatement.php +++ b/src/PgSqlStatement.php @@ -3,9 +3,12 @@ namespace Amp\Postgres; use Amp\Promise; +use Amp\Sql\Operation; +use Amp\Sql\Statement; -final class PgSqlStatement implements Statement, Operation { - /** @var \Amp\Postgres\PgSqlHandle */ +final class PgSqlStatement implements Statement, Operation +{ + /** @var PgSqlHandle */ private $handle; /** @var string */ @@ -14,7 +17,7 @@ final class PgSqlStatement implements Statement, Operation { /** @var string */ private $sql; - /** @var \Amp\Postgres\Internal\ReferenceQueue */ + /** @var Internal\ReferenceQueue */ private $queue; /** @var string[] */ @@ -24,12 +27,13 @@ final class PgSqlStatement implements Statement, Operation { private $lastUsedAt; /** - * @param \Amp\Postgres\PgSqlHandle $handle + * @param PgSqlHandle $handle * @param string $name * @param string $sql * @param string[] $params */ - public function __construct(PgSqlHandle $handle, string $name, string $sql, array $params) { + public function __construct(PgSqlHandle $handle, string $name, string $sql, array $params) + { $this->handle = $handle; $this->name = $name; $this->sql = $sql; @@ -38,33 +42,39 @@ public function __construct(PgSqlHandle $handle, string $name, string $sql, arra $this->lastUsedAt = \time(); } - public function __destruct() { + public function __destruct() + { $this->handle->statementDeallocate($this->name); $this->queue->unreference(); } /** {@inheritdoc} */ - public function isAlive(): bool { + public function isAlive(): bool + { return $this->handle->isAlive(); } /** {@inheritdoc} */ - public function getQuery(): string { + public function getQuery(): string + { return $this->sql; } /** {@inheritdoc} */ - public function lastUsedAt(): int { + public function lastUsedAt(): int + { return $this->lastUsedAt; } /** {@inheritdoc} */ - public function execute(array $params = []): Promise { + public function execute(array $params = []): Promise + { return $this->handle->statementExecute($this->name, Internal\replaceNamedParams($params, $this->params)); } /** {@inheritdoc} */ - public function onDestruct(callable $onDestruct) { + public function onDestruct(callable $onDestruct) + { $this->queue->onDestruct($onDestruct); } } diff --git a/src/Pool.php b/src/Pool.php index a902689..ae721a0 100644 --- a/src/Pool.php +++ b/src/Pool.php @@ -2,44 +2,80 @@ namespace Amp\Postgres; +use Amp\Coroutine; use Amp\Promise; +use Amp\Sql\AbstractPool; +use Amp\Sql\Connector; +use function Amp\call; -interface Pool extends Link { - const DEFAULT_MAX_CONNECTIONS = 100; - const DEFAULT_IDLE_TIMEOUT = 60; +final class Pool extends AbstractPool implements Link +{ + /** @var Connection|Promise|null Connection used for notification listening. */ + private $listeningConnection; + /** @var int Number of listeners on listening connection. */ + private $listenerCount = 0; - /** - * @return Promise - */ - public function extractConnection(): Promise; + protected function createDefaultConnector(): Connector + { + return connector(); + } /** - * @return int Total number of active connections in the pool. + * {@inheritdoc} */ - public function getConnectionCount(): int; + public function notify(string $channel, string $payload = ""): Promise + { + return call(function () use ($channel, $payload) { + /** @var Connection $connection */ + $connection = yield from $this->pop(); - /** - * @return int Total number of idle connections in the pool. - */ - public function getIdleConnectionCount(): int; + try { + $result = yield $connection->notify($channel, $payload); + } finally { + $this->push($connection); + } - /** - * @return int Maximum number of connections this pool will create. - */ - public function getMaxConnections(): int; + return $result; + }); + } /** - * @param bool $reset True to automatically reset a connection in the pool before using it for an operation. + * {@inheritdoc} */ - public function resetConnections(bool $reset); + public function listen(string $channel): Promise + { + return call(function () use ($channel) { + ++$this->listenerCount; - /** - * @return int Number of seconds a connection may remain idle before it is automatically closed. - */ - public function getIdleTimeout(): int; + if ($this->listeningConnection === null) { + $this->listeningConnection = new Coroutine($this->pop()); + } - /** - * @param int $timeout Number of seconds a connection may remain idle before it is automatically closed. - */ - public function setIdleTimeout(int $timeout); + if ($this->listeningConnection instanceof Promise) { + $this->listeningConnection = yield $this->listeningConnection; + } + + try { + /** @var Listener $listener */ + $listener = yield $this->listeningConnection->listen($channel); + } catch (\Throwable $exception) { + if (--$this->listenerCount === 0) { + $connection = $this->listeningConnection; + $this->listeningConnection = null; + $this->push($connection); + } + throw $exception; + } + + $listener->onDestruct(function () { + if (--$this->listenerCount === 0) { + $connection = $this->listeningConnection; + $this->listeningConnection = null; + $this->push($connection); + } + }); + + return $listener; + }); + } } diff --git a/src/PoolError.php b/src/PoolError.php index 9119703..926ab2e 100644 --- a/src/PoolError.php +++ b/src/PoolError.php @@ -2,5 +2,6 @@ namespace Amp\Postgres; -final class PoolError extends \Error { +final class PoolError extends \Error +{ } diff --git a/src/PqBufferedResultSet.php b/src/PqBufferedResultSet.php index cf54c74..968b879 100644 --- a/src/PqBufferedResultSet.php +++ b/src/PqBufferedResultSet.php @@ -6,7 +6,8 @@ use Amp\Success; use pq; -final class PqBufferedResultSet implements ResultSet { +final class PqBufferedResultSet implements ResultSet +{ /** @var \pq\Result */ private $result; @@ -22,7 +23,8 @@ final class PqBufferedResultSet implements ResultSet { /** * @param pq\Result $result PostgreSQL result object. */ - public function __construct(pq\Result $result) { + public function __construct(pq\Result $result) + { $this->result = $result; $this->result->autoConvert = pq\Result::CONV_SCALAR | pq\Result::CONV_ARRAY; } @@ -30,7 +32,8 @@ public function __construct(pq\Result $result) { /** * {@inheritdoc} */ - public function advance(int $type = self::FETCH_ASSOC): Promise { + public function advance(int $type = self::FETCH_ASSOC): Promise + { $this->currentRow = null; $this->type = $type; @@ -44,7 +47,8 @@ public function advance(int $type = self::FETCH_ASSOC): Promise { /** * {@inheritdoc} */ - public function getCurrent() { + public function getCurrent() + { if ($this->currentRow !== null) { return $this->currentRow; } @@ -65,11 +69,13 @@ public function getCurrent() { } } - public function numRows(): int { + public function numRows(): int + { return $this->result->numRows; } - public function numFields(): int { + public function numFields(): int + { return $this->result->numCols; } } diff --git a/src/PqCommandResult.php b/src/PqCommandResult.php index 5fac65e..43af02d 100644 --- a/src/PqCommandResult.php +++ b/src/PqCommandResult.php @@ -2,23 +2,27 @@ namespace Amp\Postgres; +use Amp\Sql\CommandResult; use pq; -final class PqCommandResult implements CommandResult { +final class PqCommandResult implements CommandResult +{ /** @var \pq\Result PostgreSQL result object. */ private $result; /** * @param \pq\Result $result PostgreSQL result object. */ - public function __construct(pq\Result $result) { + public function __construct(pq\Result $result) + { $this->result = $result; } /** * @return int Number of rows affected by the INSERT, UPDATE, or DELETE query. */ - public function affectedRows(): int { + public function affectedRows(): int + { return $this->result->affectedRows; } } diff --git a/src/PqConnection.php b/src/PqConnection.php index af58224..1ebae9f 100644 --- a/src/PqConnection.php +++ b/src/PqConnection.php @@ -8,17 +8,21 @@ use Amp\Loop; use Amp\NullCancellationToken; use Amp\Promise; +use Amp\Sql\ConnectionConfig; +use Amp\Sql\ConnectionException; use pq; -final class PqConnection extends Connection { +final class PqConnection extends Connection implements Link +{ /** - * @param string $connectionString - * @param \Amp\CancellationToken $token + * @param ConnectionConfig $connectionConfig + * @param CancellationToken $token * - * @return \Amp\Promise<\Amp\Postgres\PgSqlConnection> + * @return Promise */ - public static function connect(string $connectionString, CancellationToken $token = null): Promise { - $connectionString = \str_replace(";", " ", $connectionString); + public static function connect(ConnectionConfig $connectionConfig, CancellationToken $token = null): Promise + { + $connectionString = \str_replace(";", " ", $connectionConfig->connectionString()); try { $connection = new pq\Connection($connectionString, pq\Connection::ASYNC); @@ -54,7 +58,7 @@ public static function connect(string $connectionString, CancellationToken $toke $promise = $deferred->promise(); - $token = $token ?? new NullCancellationToken; + $token = $token ?? new NullCancellationToken(); $id = $token->subscribe([$deferred, "fail"]); $promise->onResolve(function () use ($poll, $await, $id, $token) { @@ -69,7 +73,8 @@ public static function connect(string $connectionString, CancellationToken $toke /** * @param \pq\Connection $handle */ - public function __construct(pq\Connection $handle) { + public function __construct(pq\Connection $handle) + { parent::__construct(new PqHandle($handle)); } } diff --git a/src/PqHandle.php b/src/PqHandle.php index f919f9f..74cc27b 100644 --- a/src/PqHandle.php +++ b/src/PqHandle.php @@ -8,12 +8,16 @@ use Amp\Emitter; use Amp\Loop; use Amp\Promise; +use Amp\Sql\ConnectionException; +use Amp\Sql\FailureException; +use Amp\Sql\QueryError; use Amp\Success; use pq; use function Amp\call; use function Amp\coroutine; -final class PqHandle implements Handle { +final class PqHandle implements Handle +{ use CallableMaker; /** @var \pq\Connection PostgreSQL connection object. */ @@ -34,7 +38,7 @@ final class PqHandle implements Handle { /** @var \Amp\Emitter[] */ private $listeners; - /** @var \Amp\Postgres\Internal\PqStatementStorage[] */ + /** @var @return PromiseInternal\PqStatementStorage[] */ private $statements = []; /** @var callable */ @@ -54,7 +58,8 @@ final class PqHandle implements Handle { * * @param \pq\Connection $handle */ - public function __construct(pq\Connection $handle) { + public function __construct(pq\Connection $handle) + { $this->handle = $handle; $this->lastUsedAt = \time(); @@ -129,28 +134,32 @@ public function __construct(pq\Connection $handle) { /** * Frees Io watchers from loop. */ - public function __destruct() { + public function __destruct() + { $this->free(); } /** * {@inheritdoc} */ - public function isAlive(): bool { + public function isAlive(): bool + { return $this->handle !== null; } /** * {@inheritdoc} */ - public function lastUsedAt(): int { + public function lastUsedAt(): int + { return $this->lastUsedAt; } /** * {@inheritdoc} */ - public function close() { + public function close() + { if ($this->deferred) { $deferred = $this->deferred; $this->deferred = null; @@ -162,7 +171,8 @@ public function close() { $this->free(); } - private function free() { + private function free() + { Loop::cancel($this->poll); Loop::cancel($this->await); } @@ -173,11 +183,12 @@ private function free() { * * @return \Generator * - * @resolve \Amp\Postgres\CommandResult|\Amp\Postgres\TupleResult|\pq\Statement + * @resolve \Amp\Sql\CommandResult|\pq\Statement * - * @throws \Amp\Postgres\FailureException + * @throws FailureException */ - private function send(callable $method, ...$args): \Generator { + private function send(callable $method, ...$args): \Generator + { while ($this->busy) { try { yield $this->busy->promise(); @@ -243,7 +254,8 @@ private function send(callable $method, ...$args): \Generator { } } - private function fetch(): \Generator { + private function fetch(): \Generator + { if (!$this->handle->busy) { // Results buffered. $result = $this->handle->getResult(); } else { @@ -277,7 +289,8 @@ private function fetch(): \Generator { } } - private function release() { + private function release() + { \assert( $this->busy instanceof Deferred && $this->busy !== $this->deferred, "Connection in invalid state when releasing" @@ -294,10 +307,11 @@ private function release() { * @param string $name * @param array $params * - * @return \Amp\Promise - * @throws \Amp\Postgres\FailureException + * @return Promise + * @throws FailureException */ - public function statementExecute(string $name, array $params): Promise { + public function statementExecute(string $name, array $params): Promise + { \assert(isset($this->statements[$name]), "Named statement not found when executing"); $statement = $this->statements[$name]->statement; @@ -308,11 +322,12 @@ public function statementExecute(string $name, array $params): Promise { /** * @param string $name * - * @return \Amp\Promise + * @return Promise * - * @throws \Amp\Postgres\FailureException + * @throws FailureException */ - public function statementDeallocate(string $name): Promise { + public function statementDeallocate(string $name): Promise + { if (!$this->handle) { return new Success; // Connection dead. } @@ -333,7 +348,8 @@ public function statementDeallocate(string $name): Promise { /** * {@inheritdoc} */ - public function query(string $sql): Promise { + public function query(string $sql): Promise + { if (!$this->handle) { throw new \Error("The connection to the database has been closed"); } @@ -344,7 +360,8 @@ public function query(string $sql): Promise { /** * {@inheritdoc} */ - public function execute(string $sql, array $params = []): Promise { + public function execute(string $sql, array $params = []): Promise + { if (!$this->handle) { throw new \Error("The connection to the database has been closed"); } @@ -358,7 +375,8 @@ public function execute(string $sql, array $params = []): Promise { /** * {@inheritdoc} */ - public function prepare(string $sql): Promise { + public function prepare(string $sql): Promise + { if (!$this->handle) { throw new \Error("The connection to the database has been closed"); } @@ -399,14 +417,16 @@ public function prepare(string $sql): Promise { /** * {@inheritdoc} */ - public function notify(string $channel, string $payload = ""): Promise { + public function notify(string $channel, string $payload = ""): Promise + { return new Coroutine($this->send([$this->handle, "notifyAsync"], $channel, $payload)); } /** * {@inheritdoc} */ - public function listen(string $channel): Promise { + public function listen(string $channel): Promise + { return call(function () use ($channel) { if (isset($this->listeners[$channel])) { throw new QueryError(\sprintf("Already listening on channel '%s'", $channel)); @@ -439,11 +459,12 @@ static function (string $channel, string $message, int $pid) use ($emitter) { /** * @param string $channel * - * @return \Amp\Promise + * @return Promise * * @throws \Error */ - private function unlisten(string $channel): Promise { + private function unlisten(string $channel): Promise + { \assert(isset($this->listeners[$channel]), "Not listening on that channel"); $emitter = $this->listeners[$channel]; @@ -462,7 +483,8 @@ private function unlisten(string $channel): Promise { /** * {@inheritdoc} */ - public function quoteString(string $data): string { + public function quoteString(string $data): string + { if (!$this->handle) { throw new \Error("The connection to the database has been closed"); } @@ -473,7 +495,8 @@ public function quoteString(string $data): string { /** * {@inheritdoc} */ - public function quoteName(string $name): string { + public function quoteName(string $name): string + { if (!$this->handle) { throw new \Error("The connection to the database has been closed"); } diff --git a/src/PqStatement.php b/src/PqStatement.php index a8e4c89..1788e44 100644 --- a/src/PqStatement.php +++ b/src/PqStatement.php @@ -3,9 +3,12 @@ namespace Amp\Postgres; use Amp\Promise; +use Amp\Sql\Operation; +use Amp\Sql\Statement; -final class PqStatement implements Statement, Operation { - /** @var \Amp\Postgres\PqHandle */ +final class PqStatement implements Statement, Operation +{ + /** @var @return PromisePqHandle */ private $handle; /** @var string */ @@ -14,7 +17,7 @@ final class PqStatement implements Statement, Operation { /** @var string */ private $sql; - /** @var \Amp\Postgres\Internal\ReferenceQueue */ + /** @var @return PromiseInternal\ReferenceQueue */ private $queue; /** @var array */ @@ -24,12 +27,13 @@ final class PqStatement implements Statement, Operation { private $lastUsedAt; /** - * @param \Amp\Postgres\PqHandle $handle + * @param @return PromisePqHandle $handle * @param string $name Statement name. * @param string $sql Original prepared SQL query. * @param string[] $params Parameter indices to parameter names. */ - public function __construct(PqHandle $handle, string $name, string $sql, array $params) { + public function __construct(PqHandle $handle, string $name, string $sql, array $params) + { $this->handle = $handle; $this->name = $name; $this->params = $params; @@ -38,34 +42,40 @@ public function __construct(PqHandle $handle, string $name, string $sql, array $ $this->lastUsedAt = \time(); } - public function __destruct() { + public function __destruct() + { $this->handle->statementDeallocate($this->name); $this->queue->unreference(); } /** {@inheritdoc} */ - public function isAlive(): bool { + public function isAlive(): bool + { return $this->handle->isAlive(); } /** {@inheritdoc} */ - public function getQuery(): string { + public function getQuery(): string + { return $this->sql; } /** {@inheritdoc} */ - public function lastUsedAt(): int { + public function lastUsedAt(): int + { return $this->lastUsedAt; } /** {@inheritdoc} */ - public function execute(array $params = []): Promise { + public function execute(array $params = []): Promise + { $this->lastUsedAt = \time(); return $this->handle->statementExecute($this->name, Internal\replaceNamedParams($params, $this->params)); } /** {@inheritdoc} */ - public function onDestruct(callable $onDestruct) { + public function onDestruct(callable $onDestruct) + { $this->queue->onDestruct($onDestruct); } } diff --git a/src/PqUnbufferedResultSet.php b/src/PqUnbufferedResultSet.php index 9ffc4ce..0794ab0 100644 --- a/src/PqUnbufferedResultSet.php +++ b/src/PqUnbufferedResultSet.php @@ -6,7 +6,8 @@ use Amp\Promise; use pq; -final class PqUnbufferedResultSet implements ResultSet, Operation { +final class PqUnbufferedResultSet implements ResultSet +{ /** @var int */ private $numCols; @@ -19,14 +20,15 @@ final class PqUnbufferedResultSet implements ResultSet, Operation { /** @var int Next row fetch type. */ private $type = self::FETCH_ASSOC; - /** @var \Amp\Postgres\Internal\ReferenceQueue */ + /** @var Internal\ReferenceQueue */ private $queue; /** - * @param callable(): \Amp\Promise $fetch Function to fetch next result row. + * @param callable(): $fetch Function to fetch next result row. * @param \pq\Result $result PostgreSQL result object. */ - public function __construct(callable $fetch, pq\Result $result) { + public function __construct(callable $fetch, pq\Result $result) + { $this->numCols = $result->numCols; $this->queue = $queue = new Internal\ReferenceQueue; @@ -46,7 +48,8 @@ public function __construct(callable $fetch, pq\Result $result) { /** * {@inheritdoc} */ - public function advance(int $type = self::FETCH_ASSOC): Promise { + public function advance(int $type = self::FETCH_ASSOC): Promise + { $this->currentRow = null; $this->type = $type; @@ -56,7 +59,8 @@ public function advance(int $type = self::FETCH_ASSOC): Promise { /** * {@inheritdoc} */ - public function getCurrent() { + public function getCurrent() + { if ($this->currentRow !== null) { return $this->currentRow; } @@ -79,14 +83,16 @@ public function getCurrent() { /** * @return int Number of fields (columns) in each result set. */ - public function numFields(): int { + public function numFields(): int + { return $this->numCols; } /** * {@inheritdoc} */ - public function onDestruct(callable $onComplete) { + public function onDestruct(callable $onComplete) + { $this->queue->onDestruct($onComplete); } } diff --git a/src/QueryError.php b/src/QueryError.php deleted file mode 100644 index 5fea331..0000000 --- a/src/QueryError.php +++ /dev/null @@ -1,6 +0,0 @@ -diagnostics = $diagnostics; } - public function getDiagnostics(): array { + public function getDiagnostics(): array + { return $this->diagnostics; } } diff --git a/src/ResultSet.php b/src/ResultSet.php index ce548a4..a7ac24c 100644 --- a/src/ResultSet.php +++ b/src/ResultSet.php @@ -2,21 +2,10 @@ namespace Amp\Postgres; -use Amp\Iterator; -use Amp\Promise; - -interface ResultSet extends Iterator { - const FETCH_ARRAY = 0; - const FETCH_ASSOC = 1; - const FETCH_OBJECT = 2; - - /** - * {@inheritdoc} - * - * @param int $type Next row fetch type. Use the FETCH_* constants provided by this interface. - */ - public function advance(int $type = self::FETCH_ASSOC): Promise; +use Amp\Sql\ResultSet as SqlResultSet; +interface ResultSet extends SqlResultSet +{ /** * Returns the number of fields (columns) in each row. * diff --git a/src/Statement.php b/src/Statement.php deleted file mode 100644 index 691155a..0000000 --- a/src/Statement.php +++ /dev/null @@ -1,29 +0,0 @@ - - */ - public function execute(array $params = []): Promise; - - /** - * @return bool True if the statement can still be executed, false if the connection has died. - */ - public function isAlive(): bool; - - /** - * @return string The SQL string used to prepare the statement. - */ - public function getQuery(): string; - - /** - * @return int Timestamp of when the statement was last used. - */ - public function lastUsedAt(): int; -} diff --git a/src/TimeoutConnector.php b/src/TimeoutConnector.php index 14c6c2c..8aa98e1 100644 --- a/src/TimeoutConnector.php +++ b/src/TimeoutConnector.php @@ -3,9 +3,13 @@ namespace Amp\Postgres; use Amp\Promise; +use Amp\Sql\ConnectionConfig; +use Amp\Sql\Connector; +use Amp\Sql\FailureException; use Amp\TimeoutCancellationToken; -final class TimeoutConnector implements Connector { +final class TimeoutConnector implements Connector +{ const DEFAULT_TIMEOUT = 5000; /** @var int */ @@ -14,26 +18,28 @@ final class TimeoutConnector implements Connector { /** * @param int $timeout Milliseconds until connections attempts are cancelled. */ - public function __construct(int $timeout = self::DEFAULT_TIMEOUT) { + public function __construct(int $timeout = self::DEFAULT_TIMEOUT) + { $this->timeout = $timeout; } /** * {@inheritdoc} * - * @throws \Amp\Postgres\FailureException If connecting fails. + * @throws FailureException If connecting fails. * * @throws \Error If neither ext-pgsql or pecl-pq is loaded. */ - public function connect(string $connectionString): Promise { + public function connect(ConnectionConfig $connectionConfig): Promise + { $token = new TimeoutCancellationToken($this->timeout); if (\extension_loaded("pq")) { - return PqConnection::connect($connectionString, $token); + return PqConnection::connect($connectionConfig, $token); } if (\extension_loaded("pgsql")) { - return PgSqlConnection::connect($connectionString, $token); + return PgSqlConnection::connect($connectionConfig, $token); } throw new \Error("amphp/postgres requires either pecl-pq or ext-pgsql"); diff --git a/src/Transaction.php b/src/Transaction.php index 4656028..1e8e713 100644 --- a/src/Transaction.php +++ b/src/Transaction.php @@ -3,34 +3,33 @@ namespace Amp\Postgres; use Amp\Promise; +use Amp\Sql\Operation; +use Amp\Sql\Transaction as SqlTransaction; -final class Transaction implements Handle, Operation { - const UNCOMMITTED = 0; - const COMMITTED = 1; - const REPEATABLE = 2; - const SERIALIZABLE = 4; - - /** @var \Amp\Postgres\Handle|null */ +final class Transaction implements Handle, SqlTransaction +{ + /** @var Handle|null */ private $handle; /** @var int */ private $isolation; - /** @var \Amp\Postgres\Internal\ReferenceQueue */ + /** @var Internal\ReferenceQueue */ private $queue; /** - * @param \Amp\Postgres\Handle $handle + * @param Handle $handle * @param int $isolation * * @throws \Error If the isolation level is invalid. */ - public function __construct(Handle $handle, int $isolation = self::COMMITTED) { + public function __construct(Handle $handle, int $isolation = SqlTransaction::ISOLATION_COMMITTED) + { switch ($isolation) { - case self::UNCOMMITTED: - case self::COMMITTED: - case self::REPEATABLE: - case self::SERIALIZABLE: + case SqlTransaction::ISOLATION_UNCOMMITTED: + case SqlTransaction::ISOLATION_COMMITTED: + case SqlTransaction::ISOLATION_REPEATABLE: + case SqlTransaction::ISOLATION_SERIALIZABLE: $this->isolation = $isolation; break; @@ -42,7 +41,8 @@ public function __construct(Handle $handle, int $isolation = self::COMMITTED) { $this->queue = new Internal\ReferenceQueue; } - public function __destruct() { + public function __destruct() + { if ($this->handle) { $this->rollback(); // Invokes $this->queue->complete(). } @@ -51,7 +51,8 @@ public function __destruct() { /** * {@inheritdoc} */ - public function lastUsedAt(): int { + public function lastUsedAt(): int + { return $this->handle->lastUsedAt(); } @@ -60,7 +61,8 @@ public function lastUsedAt(): int { * * Closes and commits all changes in the transaction. */ - public function close() { + public function close() + { if ($this->handle) { $this->commit(); // Invokes $this->queue->unreference(). } @@ -69,37 +71,42 @@ public function close() { /** * {@inheritdoc} */ - public function onDestruct(callable $onComplete) { + public function onDestruct(callable $onComplete) + { $this->queue->onDestruct($onComplete); } /** * {@inheritdoc} */ - public function isAlive(): bool { - return $this->handle !== null && $this->handle->isAlive(); + public function isAlive(): bool + { + return $this->handle && $this->handle->isAlive(); } /** * @return bool True if the transaction is active, false if it has been committed or rolled back. */ - public function isActive(): bool { + public function isActive(): bool + { return $this->handle !== null; } /** * @return int */ - public function getIsolationLevel(): int { + public function getIsolationLevel(): int + { return $this->isolation; } /** * {@inheritdoc} * - * @throws \Amp\Postgres\TransactionError If the transaction has been committed or rolled back. + * @throws TransactionError If the transaction has been committed or rolled back. */ - public function query(string $sql): Promise { + public function query(string $sql): Promise + { if ($this->handle === null) { throw new TransactionError("The transaction has been committed or rolled back"); } @@ -123,9 +130,10 @@ public function query(string $sql): Promise { /** * {@inheritdoc} * - * @throws \Amp\Postgres\TransactionError If the transaction has been committed or rolled back. + * @throws TransactionError If the transaction has been committed or rolled back. */ - public function prepare(string $sql): Promise { + public function prepare(string $sql): Promise + { if ($this->handle === null) { throw new TransactionError("The transaction has been committed or rolled back"); } @@ -149,9 +157,10 @@ public function prepare(string $sql): Promise { /** * {@inheritdoc} * - * @throws \Amp\Postgres\TransactionError If the transaction has been committed or rolled back. + * @throws TransactionError If the transaction has been committed or rolled back. */ - public function execute(string $sql, array $params = []): Promise { + public function execute(string $sql, array $params = []): Promise + { if ($this->handle === null) { throw new TransactionError("The transaction has been committed or rolled back"); } @@ -176,9 +185,10 @@ public function execute(string $sql, array $params = []): Promise { /** * {@inheritdoc} * - * @throws \Amp\Postgres\TransactionError If the transaction has been committed or rolled back. + * @throws TransactionError If the transaction has been committed or rolled back. */ - public function notify(string $channel, string $payload = ""): Promise { + public function notify(string $channel, string $payload = ""): Promise + { if ($this->handle === null) { throw new TransactionError("The transaction has been committed or rolled back"); } @@ -189,11 +199,12 @@ public function notify(string $channel, string $payload = ""): Promise { /** * Commits the transaction and makes it inactive. * - * @return \Amp\Promise<\Amp\Postgres\CommandResult> + * @return Promise<\Amp\Sql\CommandResult> * - * @throws \Amp\Postgres\TransactionError If the transaction has been committed or rolled back. + * @throws TransactionError If the transaction has been committed or rolled back. */ - public function commit(): Promise { + public function commit(): Promise + { if ($this->handle === null) { throw new TransactionError("The transaction has been committed or rolled back"); } @@ -208,11 +219,12 @@ public function commit(): Promise { /** * Rolls back the transaction and makes it inactive. * - * @return \Amp\Promise<\Amp\Postgres\CommandResult> + * @return Promise<\Amp\Sql\CommandResult> * - * @throws \Amp\Postgres\TransactionError If the transaction has been committed or rolled back. + * @throws TransactionError If the transaction has been committed or rolled back. */ - public function rollback(): Promise { + public function rollback(): Promise + { if ($this->handle === null) { throw new TransactionError("The transaction has been committed or rolled back"); } @@ -229,11 +241,12 @@ public function rollback(): Promise { * * @param string $identifier Savepoint identifier. * - * @return \Amp\Promise<\Amp\Postgres\CommandResult> + * @return Promise<\Amp\Sql\CommandResult> * - * @throws \Amp\Postgres\TransactionError If the transaction has been committed or rolled back. + * @throws TransactionError If the transaction has been committed or rolled back. */ - public function savepoint(string $identifier): Promise { + public function createSavepoint(string $identifier): Promise + { return $this->query("SAVEPOINT " . $this->quoteName($identifier)); } @@ -242,11 +255,12 @@ public function savepoint(string $identifier): Promise { * * @param string $identifier Savepoint identifier. * - * @return \Amp\Promise<\Amp\Postgres\CommandResult> + * @return Promise<\Amp\Sql\CommandResult> * - * @throws \Amp\Postgres\TransactionError If the transaction has been committed or rolled back. + * @throws TransactionError If the transaction has been committed or rolled back. */ - public function rollbackTo(string $identifier): Promise { + public function rollbackTo(string $identifier): Promise + { return $this->query("ROLLBACK TO " . $this->quoteName($identifier)); } @@ -255,20 +269,22 @@ public function rollbackTo(string $identifier): Promise { * * @param string $identifier Savepoint identifier. * - * @return \Amp\Promise<\Amp\Postgres\CommandResult> + * @return Promise<\Amp\Sql\CommandResult> * - * @throws \Amp\Postgres\TransactionError If the transaction has been committed or rolled back. + * @throws TransactionError If the transaction has been committed or rolled back. */ - public function release(string $identifier): Promise { + public function releaseSavepoint(string $identifier): Promise + { return $this->query("RELEASE SAVEPOINT " . $this->quoteName($identifier)); } /** * {@inheritdoc} * - * @throws \Amp\Postgres\TransactionError If the transaction has been committed or rolled back. + * @throws TransactionError If the transaction has been committed or rolled back. */ - public function quoteString(string $data): string { + public function quoteString(string $data): string + { if ($this->handle === null) { throw new TransactionError("The transaction has been committed or rolled back"); } @@ -279,9 +295,10 @@ public function quoteString(string $data): string { /** * {@inheritdoc} * - * @throws \Amp\Postgres\TransactionError If the transaction has been committed or rolled back. + * @throws TransactionError If the transaction has been committed or rolled back. */ - public function quoteName(string $name): string { + public function quoteName(string $name): string + { if ($this->handle === null) { throw new TransactionError("The transaction has been committed or rolled back"); } diff --git a/src/TransactionError.php b/src/TransactionError.php index 7f9c8ec..b497f7a 100644 --- a/src/TransactionError.php +++ b/src/TransactionError.php @@ -2,5 +2,6 @@ namespace Amp\Postgres; -class TransactionError extends \Error { +class TransactionError extends \Error +{ } diff --git a/src/functions.php b/src/functions.php index cf052a5..41c8c31 100644 --- a/src/functions.php +++ b/src/functions.php @@ -4,10 +4,14 @@ use Amp\Loop; use Amp\Promise; +use Amp\Sql\ConnectionConfig as SqlConnectionConfig; +use Amp\Sql\Connector; +use Amp\Sql\Pool as SqlPool; const LOOP_CONNECTOR_IDENTIFIER = Connector::class; -function connector(Connector $connector = null): Connector { +function connector(Connector $connector = null): Connector +{ if ($connector === null) { $connector = Loop::getState(LOOP_CONNECTOR_IDENTIFIER); if ($connector) { @@ -24,30 +28,32 @@ function connector(Connector $connector = null): Connector { /** * Create a connection using the global Connector instance. * - * @param string $connectionString + * @param SqlConnectionConfig $config * - * @return \Amp\Promise<\Amp\Postgres\Connection> + * @return Promise * - * @throws \Amp\Postgres\FailureException If connecting fails. + * @throws \Amp\Sql\FailureException If connecting fails. * * @throws \Error If neither ext-pgsql or pecl-pq is loaded. * * @codeCoverageIgnore */ -function connect(string $connectionString): Promise { - return connector()->connect($connectionString); +function connect(SqlConnectionConfig $config): Promise +{ + return connector()->connect($config); } /** * Create a pool using the global Connector instance. * - * @param string $connectionString + * @param SqlConnectionConfig $config * @param int $maxConnections * - * @return \Amp\Postgres\Pool + * @return Pool */ -function pool(string $connectionString, int $maxConnections = Pool::DEFAULT_MAX_CONNECTIONS): Pool { - return new DefaultPool($connectionString, $maxConnections, connector()); +function pool(SqlConnectionConfig $config, int $maxConnections = SqlPool::DEFAULT_MAX_CONNECTIONS): Pool +{ + return new Pool($config, $maxConnections, connector()); } /** @@ -59,7 +65,8 @@ function pool(string $connectionString, int $maxConnections = Pool::DEFAULT_MAX_ * * @throws \Error If $value is an object without a __toString() method, a resource, or an unknown type. */ -function cast($value) { +function cast($value) +{ switch ($type = \gettype($value)) { case "NULL": case "integer": @@ -94,7 +101,8 @@ function cast($value) { * * @throws \Error If $array contains an object without a __toString() method, a resource, or an unknown type. */ -function encode(array $array): string { +function encode(array $array): string +{ $array = \array_map(function ($value) { switch (\gettype($value)) { case "NULL": diff --git a/test/AbstractConnectTest.php b/test/AbstractConnectTest.php index a80acd1..4a0a612 100644 --- a/test/AbstractConnectTest.php +++ b/test/AbstractConnectTest.php @@ -6,22 +6,29 @@ use Amp\CancellationTokenSource; use Amp\Loop; use Amp\Postgres\Connection; +use Amp\Postgres\ConnectionConfig as PostgresConnectionConfig; use Amp\Promise; +use Amp\Sql\ConnectionConfig; use Amp\TimeoutCancellationToken; use PHPUnit\Framework\TestCase; -abstract class AbstractConnectTest extends TestCase { +abstract class AbstractConnectTest extends TestCase +{ /** - * @param string $connectionString - * @param \Amp\CancellationToken|null $token + * @param ConnectionConfig $connectionConfig + * @param CancellationToken|null $token * - * @return \Amp\Promise + * @return Promise */ - abstract public function connect(string $connectionString, CancellationToken $token = null): Promise; + abstract public function connect(ConnectionConfig $connectionConfig, CancellationToken $token = null): Promise; - public function testConnect() { + public function testConnect() + { Loop::run(function () { - $connection = yield $this->connect('host=localhost user=postgres', new TimeoutCancellationToken(100)); + $connection = yield $this->connect( + new PostgresConnectionConfig('host=localhost user=postgres'), + new TimeoutCancellationToken(100) + ); $this->assertInstanceOf(Connection::class, $connection); }); } @@ -30,23 +37,25 @@ public function testConnect() { * @depends testConnect * @expectedException \Amp\CancelledException */ - public function testConnectCancellationBeforeConnect() { + public function testConnectCancellationBeforeConnect() + { Loop::run(function () { $source = new CancellationTokenSource; $token = $source->getToken(); $source->cancel(); - $connection = yield $this->connect('host=localhost user=postgres', $token); + $connection = yield $this->connect(new PostgresConnectionConfig('host=localhost user=postgres'), $token); }); } /** * @depends testConnectCancellationBeforeConnect */ - public function testConnectCancellationAfterConnect() { + public function testConnectCancellationAfterConnect() + { Loop::run(function () { $source = new CancellationTokenSource; $token = $source->getToken(); - $connection = yield $this->connect('host=localhost user=postgres', $token); + $connection = yield $this->connect(new PostgresConnectionConfig('host=localhost user=postgres'), $token); $this->assertInstanceOf(Connection::class, $connection); $source->cancel(); }); @@ -54,31 +63,34 @@ public function testConnectCancellationAfterConnect() { /** * @depends testConnectCancellationBeforeConnect - * @expectedException \Amp\Postgres\FailureException + * @expectedException \Amp\Sql\FailureException */ - public function testConnectInvalidUser() { + public function testConnectInvalidUser() + { Loop::run(function () { - $connection = yield $this->connect('host=localhost user=invalid', new TimeoutCancellationToken(100)); + $connection = yield $this->connect(new PostgresConnectionConfig('host=localhost user=invalid'), new TimeoutCancellationToken(100)); }); } /** * @depends testConnectCancellationBeforeConnect - * @expectedException \Amp\Postgres\FailureException + * @expectedException \Amp\Sql\FailureException */ - public function testConnectInvalidConnectionString() { + public function testConnectInvalidConnectionString() + { Loop::run(function () { - $connection = yield $this->connect('invalid connection string', new TimeoutCancellationToken(100)); + $connection = yield $this->connect(new PostgresConnectionConfig('invalid connection string'), new TimeoutCancellationToken(100)); }); } /** * @depends testConnectCancellationBeforeConnect - * @expectedException \Amp\Postgres\FailureException + * @expectedException \Amp\Sql\FailureException */ - public function testConnectInvalidHost() { + public function testConnectInvalidHost() + { Loop::run(function () { - $connection = yield $this->connect('hostaddr=invalid.host user=postgres', new TimeoutCancellationToken(100)); + $connection = yield $this->connect(new PostgresConnectionConfig('hostaddr=invalid.host user=postgres'), new TimeoutCancellationToken(100)); }); } } diff --git a/test/AbstractConnectionTest.php b/test/AbstractConnectionTest.php index 6b9d9d7..53032f5 100644 --- a/test/AbstractConnectionTest.php +++ b/test/AbstractConnectionTest.php @@ -2,8 +2,10 @@ namespace Amp\Postgres\Test; -abstract class AbstractConnectionTest extends AbstractLinkTest { - public function testIsAlive() { +abstract class AbstractConnectionTest extends AbstractLinkTest +{ + public function testIsAlive() + { $this->assertTrue($this->connection->isAlive()); } } diff --git a/test/AbstractLinkTest.php b/test/AbstractLinkTest.php index f05c7f6..712cd69 100644 --- a/test/AbstractLinkTest.php +++ b/test/AbstractLinkTest.php @@ -5,25 +5,29 @@ use Amp\Coroutine; use Amp\Delayed; use Amp\Loop; -use Amp\Postgres\CommandResult; use Amp\Postgres\Link; use Amp\Postgres\Listener; -use Amp\Postgres\QueryError; use Amp\Postgres\QueryExecutionError; use Amp\Postgres\ResultSet; -use Amp\Postgres\Statement; use Amp\Postgres\Transaction; use Amp\Postgres\TransactionError; +use Amp\Sql\CommandResult; +use Amp\Sql\QueryError; +use Amp\Sql\ResultSet as SqlResultSet; +use Amp\Sql\Statement; +use Amp\Sql\Transaction as SqlTransaction; use PHPUnit\Framework\TestCase; -abstract class AbstractLinkTest extends TestCase { +abstract class AbstractLinkTest extends TestCase +{ /** @var \Amp\Postgres\Connection */ protected $connection; /** * @return array Start test data for database. */ - public function getData() { + public function getData() + { return [ ['amphp', 'org'], ['github', 'com'], @@ -39,11 +43,13 @@ public function getData() { */ abstract public function createLink(string $connectionString): Link; - public function setUp() { + public function setUp() + { $this->connection = $this->createLink('host=localhost user=postgres'); } - public function testQueryWithTupleResult() { + public function testQueryWithTupleResult() + { Loop::run(function () { /** @var \Amp\Postgres\ResultSet $result */ $result = yield $this->connection->query("SELECT * FROM test"); @@ -62,7 +68,8 @@ public function testQueryWithTupleResult() { }); } - public function testQueryWithUnconsumedTupleResult() { + public function testQueryWithUnconsumedTupleResult() + { Loop::run(function () { /** @var \Amp\Postgres\ResultSet $result */ $result = yield $this->connection->query("SELECT * FROM test"); @@ -76,7 +83,7 @@ public function testQueryWithUnconsumedTupleResult() { $data = $this->getData(); - for ($i = 0; yield $result->advance(ResultSet::FETCH_OBJECT); ++$i) { + for ($i = 0; yield $result->advance(SqlResultSet::FETCH_OBJECT); ++$i) { $row = $result->getCurrent(); $this->assertSame($data[$i][0], $row->domain); $this->assertSame($data[$i][1], $row->tld); @@ -84,9 +91,10 @@ public function testQueryWithUnconsumedTupleResult() { }); } - public function testQueryWithCommandResult() { + public function testQueryWithCommandResult() + { Loop::run(function () { - /** @var \Amp\Postgres\CommandResult $result */ + /** @var CommandResult $result */ $result = yield $this->connection->query("INSERT INTO test VALUES ('canon', 'jp')"); $this->assertInstanceOf(CommandResult::class, $result); @@ -95,18 +103,20 @@ public function testQueryWithCommandResult() { } /** - * @expectedException \Amp\Postgres\QueryError + * @expectedException \Amp\Sql\QueryError */ - public function testQueryWithEmptyQuery() { + public function testQueryWithEmptyQuery() + { Loop::run(function () { - /** @var \Amp\Postgres\CommandResult $result */ + /** @var \Amp\Sql\CommandResult $result */ $result = yield $this->connection->query(''); }); } - public function testQueryWithSyntaxError() { + public function testQueryWithSyntaxError() + { Loop::run(function () { - /** @var \Amp\Postgres\CommandResult $result */ + /** @var \Amp\Sql\CommandResult $result */ try { $result = yield $this->connection->query("SELECT & FROM test"); $this->fail(\sprintf("An instance of %s was expected to be thrown", QueryExecutionError::class)); @@ -117,11 +127,12 @@ public function testQueryWithSyntaxError() { }); } - public function testPrepare() { + public function testPrepare() + { Loop::run(function () { $query = "SELECT * FROM test WHERE domain=\$1"; - /** @var \Amp\Postgres\Statement $statement */ + /** @var Statement $statement */ $statement = yield $this->connection->prepare($query); $this->assertSame($query, $statement->getQuery()); @@ -135,7 +146,7 @@ public function testPrepare() { $this->assertSame(2, $result->numFields()); - while (yield $result->advance(ResultSet::FETCH_ARRAY)) { + while (yield $result->advance(SqlResultSet::FETCH_ARRAY)) { $row = $result->getCurrent(); $this->assertSame($data[0], $row[0]); $this->assertSame($data[1], $row[1]); @@ -146,11 +157,12 @@ public function testPrepare() { /** * @depends testPrepare */ - public function testPrepareWithNamedParams() { + public function testPrepareWithNamedParams() + { Loop::run(function () { $query = "SELECT * FROM test WHERE domain=:domain AND tld=:tld"; - /** @var \Amp\Postgres\Statement $statement */ + /** @var Statement $statement */ $statement = yield $this->connection->prepare($query); $data = $this->getData()[0]; @@ -164,7 +176,7 @@ public function testPrepareWithNamedParams() { $this->assertSame(2, $result->numFields()); - while (yield $result->advance(ResultSet::FETCH_ARRAY)) { + while (yield $result->advance(SqlResultSet::FETCH_ARRAY)) { $row = $result->getCurrent(); $this->assertSame($data[0], $row[0]); $this->assertSame($data[1], $row[1]); @@ -175,11 +187,12 @@ public function testPrepareWithNamedParams() { /** * @depends testPrepare */ - public function testPrepareWithUnnamedParams() { + public function testPrepareWithUnnamedParams() + { Loop::run(function () { $query = "SELECT * FROM test WHERE domain=? AND tld=?"; - /** @var \Amp\Postgres\Statement $statement */ + /** @var Statement $statement */ $statement = yield $this->connection->prepare($query); $data = $this->getData()[0]; @@ -193,7 +206,7 @@ public function testPrepareWithUnnamedParams() { $this->assertSame(2, $result->numFields()); - while (yield $result->advance(ResultSet::FETCH_ARRAY)) { + while (yield $result->advance(SqlResultSet::FETCH_ARRAY)) { $row = $result->getCurrent(); $this->assertSame($data[0], $row[0]); $this->assertSame($data[1], $row[1]); @@ -204,11 +217,12 @@ public function testPrepareWithUnnamedParams() { /** * @depends testPrepare */ - public function testPrepareWithNamedParamsWithDataAppearingAsNamedParam() { + public function testPrepareWithNamedParamsWithDataAppearingAsNamedParam() + { Loop::run(function () { $query = "SELECT * FROM test WHERE domain=:domain OR domain=':domain'"; - /** @var \Amp\Postgres\Statement $statement */ + /** @var Statement $statement */ $statement = yield $this->connection->prepare($query); $data = $this->getData()[0]; @@ -222,7 +236,7 @@ public function testPrepareWithNamedParamsWithDataAppearingAsNamedParam() { $this->assertSame(2, $result->numFields()); - while (yield $result->advance(ResultSet::FETCH_ARRAY)) { + while (yield $result->advance(SqlResultSet::FETCH_ARRAY)) { $row = $result->getCurrent(); $this->assertSame($data[0], $row[0]); $this->assertSame($data[1], $row[1]); @@ -235,11 +249,12 @@ public function testPrepareWithNamedParamsWithDataAppearingAsNamedParam() { * @expectedException \Amp\Postgres\QueryExecutionError * @expectedExceptionMessage column "invalid" does not exist */ - public function testPrepareInvalidQuery() { + public function testPrepareInvalidQuery() + { Loop::run(function () { $query = "SELECT * FROM test WHERE invalid=\$1"; - /** @var \Amp\Postgres\Statement $statement */ + /** @var Statement $statement */ $statement = yield $this->connection->prepare($query); }); } @@ -247,14 +262,15 @@ public function testPrepareInvalidQuery() { /** * @depends testPrepare */ - public function testPrepareSameQuery() { + public function testPrepareSameQuery() + { Loop::run(function () { $sql = "SELECT * FROM test WHERE domain=\$1"; - /** @var \Amp\Postgres\Statement $statement1 */ + /** @var Statement $statement1 */ $statement1 = yield $this->connection->prepare($sql); - /** @var \Amp\Postgres\Statement $statement2 */ + /** @var Statement $statement2 */ $statement2 = yield $this->connection->prepare($sql); $this->assertInstanceOf(Statement::class, $statement1); @@ -282,7 +298,8 @@ public function testPrepareSameQuery() { /** * @depends testPrepareSameQuery */ - public function testSimultaneousPrepareSameQuery() { + public function testSimultaneousPrepareSameQuery() + { Loop::run(function () { $sql = "SELECT * FROM test WHERE domain=\$1"; @@ -290,8 +307,8 @@ public function testSimultaneousPrepareSameQuery() { $statement2 = $this->connection->prepare($sql); /** - * @var \Amp\Postgres\Statement $statement1 - * @var \Amp\Postgres\Statement $statement2 + * @var Statement $statement1 + * @var Statement $statement2 */ list($statement1, $statement2) = yield [$statement1, $statement2]; @@ -330,9 +347,10 @@ public function testSimultaneousPrepareSameQuery() { }); } - public function testPrepareThenExecuteWithUnconsumedTupleResult() { + public function testPrepareThenExecuteWithUnconsumedTupleResult() + { Loop::run(function () { - /** @var \Amp\Postgres\Statement $statement */ + /** @var Statement $statement */ $statement = yield $this->connection->prepare("SELECT * FROM test"); /** @var \Amp\Postgres\ResultSet $result */ @@ -347,7 +365,7 @@ public function testPrepareThenExecuteWithUnconsumedTupleResult() { $data = $this->getData(); - for ($i = 0; yield $result->advance(ResultSet::FETCH_OBJECT); ++$i) { + for ($i = 0; yield $result->advance(SqlResultSet::FETCH_OBJECT); ++$i) { $row = $result->getCurrent(); $this->assertSame($data[$i][0], $row->domain); $this->assertSame($data[$i][1], $row->tld); @@ -355,7 +373,8 @@ public function testPrepareThenExecuteWithUnconsumedTupleResult() { }); } - public function testExecute() { + public function testExecute() + { Loop::run(function () { $data = $this->getData()[0]; @@ -377,7 +396,8 @@ public function testExecute() { /** * @depends testExecute */ - public function testExecuteWithNamedParams() { + public function testExecuteWithNamedParams() + { Loop::run(function () { $data = $this->getData()[0]; @@ -403,7 +423,8 @@ public function testExecuteWithNamedParams() { * @expectedException \Error * @expectedExceptionMessage Value for unnamed parameter at position 0 missing */ - public function testExecuteWithInvalidParams() { + public function testExecuteWithInvalidParams() + { Loop::run(function () { $result = yield $this->connection->execute("SELECT * FROM test WHERE domain=\$1"); }); @@ -414,7 +435,8 @@ public function testExecuteWithInvalidParams() { * @expectedException \Error * @expectedExceptionMessage Value for named parameter 'domain' missing */ - public function testExecuteWithInvalidNamedParams() { + public function testExecuteWithInvalidNamedParams() + { Loop::run(function () { $result = yield $this->connection->execute("SELECT * FROM test WHERE domain=:domain", ['tld' => 'com']); }); @@ -423,7 +445,8 @@ public function testExecuteWithInvalidNamedParams() { /** * @depends testQueryWithTupleResult */ - public function testSimultaneousQuery() { + public function testSimultaneousQuery() + { $callback = \Amp\coroutine(function ($value) { /** @var \Amp\Postgres\ResultSet $result */ $result = yield $this->connection->query("SELECT {$value} as value"); @@ -446,7 +469,8 @@ public function testSimultaneousQuery() { /** * @depends testSimultaneousQuery */ - public function testSimultaneousQueryWithOneFailing() { + public function testSimultaneousQueryWithOneFailing() + { $callback = \Amp\coroutine(function ($query) { /** @var \Amp\Postgres\ResultSet $result */ $result = yield $this->connection->query($query); @@ -478,7 +502,8 @@ public function testSimultaneousQueryWithOneFailing() { $this->fail(\sprintf("Test did not throw an instance of %s", QueryError::class)); } - public function testSimultaneousQueryAndPrepare() { + public function testSimultaneousQueryAndPrepare() + { $promises = []; $promises[] = new Coroutine((function () { /** @var \Amp\Postgres\ResultSet $result */ @@ -494,7 +519,7 @@ public function testSimultaneousQueryAndPrepare() { })()); $promises[] = new Coroutine((function () { - /** @var \Amp\Postgres\Statement $statement */ + /** @var Statement $statement */ $statement = (yield $this->connection->prepare("SELECT * FROM test")); /** @var \Amp\Postgres\ResultSet $result */ @@ -514,9 +539,10 @@ public function testSimultaneousQueryAndPrepare() { }); } - public function testSimultaneousPrepareAndExecute() { + public function testSimultaneousPrepareAndExecute() + { $promises[] = new Coroutine((function () { - /** @var \Amp\Postgres\Statement $statement */ + /** @var Statement $statement */ $statement = yield $this->connection->prepare("SELECT * FROM test"); /** @var \Amp\Postgres\ResultSet $result */ @@ -549,9 +575,10 @@ public function testSimultaneousPrepareAndExecute() { }); } - public function testTransaction() { + public function testTransaction() + { Loop::run(function () { - $isolation = Transaction::COMMITTED; + $isolation = SqlTransaction::ISOLATION_COMMITTED; /** @var \Amp\Postgres\Transaction $transaction */ $transaction = yield $this->connection->transaction($isolation); @@ -564,7 +591,7 @@ public function testTransaction() { $this->assertTrue($transaction->isActive()); $this->assertSame($isolation, $transaction->getIsolationLevel()); - yield $transaction->savepoint('test'); + yield $transaction->createSavepoint('test'); $result = yield $transaction->execute("SELECT * FROM test WHERE domain=\$1 FOR UPDATE", [$data[0]]); @@ -584,7 +611,8 @@ public function testTransaction() { }); } - public function testListen() { + public function testListen() + { Loop::run(function () { $channel = "test"; /** @var \Amp\Postgres\Listener $listener */ @@ -614,7 +642,8 @@ public function testListen() { /** * @depends testListen */ - public function testNotify() { + public function testNotify() + { Loop::run(function () { $channel = "test"; /** @var \Amp\Postgres\Listener $listener */ @@ -640,10 +669,11 @@ public function testNotify() { /** * @depends testListen - * @expectedException \Amp\Postgres\QueryError + * @expectedException \Amp\Sql\QueryError * @expectedExceptionMessage Already listening on channel */ - public function testListenOnSameChannel() { + public function testListenOnSameChannel() + { Loop::run(function () { $channel = "test"; $listener = yield $this->connection->listen($channel); diff --git a/test/ArrayParserTest.php b/test/ArrayParserTest.php index 722e471..3850b38 100644 --- a/test/ArrayParserTest.php +++ b/test/ArrayParserTest.php @@ -5,64 +5,74 @@ use Amp\PHPUnit\TestCase; use Amp\Postgres\Internal\ArrayParser; -class ArrayParserTest extends TestCase { +class ArrayParserTest extends TestCase +{ /** @var \Amp\Postgres\Internal\ArrayParser */ private $parser; - public function setUp() { + public function setUp() + { $this->parser = new ArrayParser; } - public function testSingleDimensionalArray() { + public function testSingleDimensionalArray() + { $array = ["one", "two", "three"]; $string = '{' . \implode(',', $array) . '}'; $this->assertSame($array, $this->parser->parse($string)); } - public function testMultiDimensionalArray() { + public function testMultiDimensionalArray() + { $array = ["one", "two", ["three", "four"], "five"]; $string = '{one, two, {three, four}, five}'; $this->assertSame($array, $this->parser->parse($string)); } - public function testQuotedStrings() { + public function testQuotedStrings() + { $array = ["one", "two", ["three", "four"], "five"]; $string = '{"one", "two", {"three", "four"}, "five"}'; $this->assertSame($array, $this->parser->parse($string)); } - public function testAlternateDelimiter() { + public function testAlternateDelimiter() + { $array = ["1,2,3", "3,4,5"]; $string = '{1,2,3;3,4,5}'; $this->assertSame($array, $this->parser->parse($string, null, ';')); } - public function testEscapedQuoteDelimiter() { + public function testEscapedQuoteDelimiter() + { $array = ['va"lue1', 'value"2']; $string = '{"va\\"lue1", "value\\"2"}'; $this->assertSame($array, $this->parser->parse($string, null, ',')); } - public function testNullValue() { + public function testNullValue() + { $array = ["one", null, "three"]; $string = '{one, NULL, three}'; $this->assertSame($array, $this->parser->parse($string)); } - public function testQuotedNullValue() { + public function testQuotedNullValue() + { $array = ["one", "NULL", "three"]; $string = '{one, "NULL", three}'; $this->assertSame($array, $this->parser->parse($string)); } - public function testCast() { + public function testCast() + { $array = [1, 2, 3]; $string = '{' . \implode(',', $array) . '}'; @@ -73,7 +83,8 @@ public function testCast() { $this->assertSame($array, $this->parser->parse($string, $cast)); } - public function testCastWithNull() { + public function testCastWithNull() + { $array = [1, 2, null, 3]; $string = '{1,2,NULL,3}'; @@ -84,7 +95,8 @@ public function testCastWithNull() { $this->assertSame($array, $this->parser->parse($string, $cast)); } - public function testCastWithMultidimensionalArray() { + public function testCastWithMultidimensionalArray() + { $array = [1, 2, [3, 4], [5], 6, 7, [[8, 9], 10]]; $string = '{1,2,{3,4},{5},6,7,{{8,9},10}}'; @@ -95,7 +107,8 @@ public function testCastWithMultidimensionalArray() { $this->assertSame($array, $this->parser->parse($string, $cast)); } - public function testRandomWhitespace() { + public function testRandomWhitespace() + { $array = [1, 2, [3, 4], [5], 6, 7, [[8, 9], 10]]; $string = " {1, 2, { 3 ,\r 4 },{ 5} \n\t ,6 , 7, { {8,\t 9}, 10} } \n"; @@ -106,7 +119,8 @@ public function testRandomWhitespace() { $this->assertSame($array, $this->parser->parse($string, $cast)); } - public function testEscapedBackslashesInQuotedValue() { + public function testEscapedBackslashesInQuotedValue() + { $array = ["test\\ing", "esca\\ped\\"]; $string = '{"test\\\\ing", "esca\\\\ped\\\\"}'; @@ -117,7 +131,8 @@ public function testEscapedBackslashesInQuotedValue() { * @expectedException \Amp\Postgres\ParseException * @expectedExceptionMessage Missing opening or closing brackets */ - public function testNoClosingBracket() { + public function testNoClosingBracket() + { $string = '{"one", "two"'; $this->parser->parse($string); } @@ -126,7 +141,8 @@ public function testNoClosingBracket() { * @expectedException \Amp\Postgres\ParseException * @expectedExceptionMessage Data left in buffer after parsing */ - public function testTrailingData() { + public function testTrailingData() + { $string = '{"one", "two"} data}'; $this->parser->parse($string); } @@ -135,7 +151,8 @@ public function testTrailingData() { * @expectedException \Amp\Postgres\ParseException * @expectedExceptionMessage Could not find matching quote in quoted value */ - public function testMissingQuote() { + public function testMissingQuote() + { $string = '{"one", "two}'; $this->parser->parse($string); } @@ -144,7 +161,8 @@ public function testMissingQuote() { * @expectedException \Amp\Postgres\ParseException * @expectedExceptionMessage Invalid delimiter */ - public function testInvalidDelimiter() { + public function testInvalidDelimiter() + { $string = '{"one"; "two"}'; $this->parser->parse($string); } diff --git a/test/PoolTest.php b/test/DefaultPoolTest.php similarity index 75% rename from test/PoolTest.php rename to test/DefaultPoolTest.php index 3c8cd74..f295d96 100644 --- a/test/PoolTest.php +++ b/test/DefaultPoolTest.php @@ -4,21 +4,25 @@ use Amp\Delayed; use Amp\Loop; -use Amp\Postgres\DefaultPool; +use Amp\Postgres\ConnectionConfig; +use Amp\Postgres\Pool; use PHPUnit\Framework\TestCase; -class PoolTest extends TestCase { +class DefaultPoolTest extends TestCase +{ /** * @expectedException \Error * @expectedExceptionMessage Pool must contain at least one connection */ - public function testInvalidMaxConnections() { - $pool = new DefaultPool('connection string', 0); + public function testInvalidMaxConnections() + { + new Pool(new ConnectionConfig('connection string'), 0); } - public function testIdleConnectionsRemovedAfterTimeout() { + public function testIdleConnectionsRemovedAfterTimeout() + { Loop::run(function () { - $pool = new DefaultPool('host=localhost user=postgres'); + $pool = new Pool(new ConnectionConfig('host=localhost user=postgres')); $pool->setIdleTimeout(2); $count = 3; diff --git a/test/EncodeTest.php b/test/EncodeTest.php index 9303e2b..1c303a6 100644 --- a/test/EncodeTest.php +++ b/test/EncodeTest.php @@ -5,64 +5,74 @@ use Amp\PHPUnit\TestCase; use function Amp\Postgres\encode; -class EncodeTest extends TestCase { - public function testSingleDimensionalStringArray() { +class EncodeTest extends TestCase +{ + public function testSingleDimensionalStringArray() + { $array = ["one", "two", "three"]; $string = '{"one","two","three"}'; $this->assertSame($string, encode($array)); } - public function testMultiDimensionalStringArray() { + public function testMultiDimensionalStringArray() + { $array = ["one", "two", ["three", "four"], "five"]; $string = '{"one","two",{"three","four"},"five"}'; $this->assertSame($string, encode($array)); } - public function testQuotedStrings() { + public function testQuotedStrings() + { $array = ["one", "two", ["three", "four"], "five"]; $string = '{"one","two",{"three","four"},"five"}'; $this->assertSame($string, encode($array)); } - public function testEscapedQuoteDelimiter() { + public function testEscapedQuoteDelimiter() + { $array = ['va"lue1', 'value"2']; $string = '{"va\\"lue1","value\\"2"}'; $this->assertSame($string, encode($array)); } - public function testNullValue() { + public function testNullValue() + { $array = ["one", null, "three"]; $string = '{"one",NULL,"three"}'; $this->assertSame($string, encode($array)); } - public function testSingleDimensionalIntegerArray() { + public function testSingleDimensionalIntegerArray() + { $array = [1, 2, 3]; $string = '{' . \implode(',', $array) . '}'; $this->assertSame($string, encode($array)); } - public function testIntegerArrayWithNull() { + public function testIntegerArrayWithNull() + { $array = [1, 2, null, 3]; $string = '{1,2,NULL,3}'; $this->assertSame($string, encode($array)); } - public function testMultidimensionalIntegerArray() { + public function testMultidimensionalIntegerArray() + { $array = [1, 2, [3, 4], [5], 6, 7, [[8, 9], 10]]; $string = '{1,2,{3,4},{5},6,7,{{8,9},10}}'; $this->assertSame($string, encode($array)); } - public function testEscapedBackslashesInQuotedValue() { + public function testEscapedBackslashesInQuotedValue() + { $array = ["test\\ing", "esca\\ped\\"]; $string = '{"test\\\\ing","esca\\\\ped\\\\"}'; @@ -73,7 +83,8 @@ public function testEscapedBackslashesInQuotedValue() { * @expectedException \Error * @expectedExceptionMessage Object without a __toString() method in array */ - public function testObjectWithoutToStringMethod() { + public function testObjectWithoutToStringMethod() + { encode([new \stdClass]); } } diff --git a/test/FunctionsTest.php b/test/FunctionsTest.php index 494e776..7a91dfa 100644 --- a/test/FunctionsTest.php +++ b/test/FunctionsTest.php @@ -4,47 +4,54 @@ use Amp\Loop; use Amp\Postgres\Connection; +use Amp\Postgres\ConnectionConfig; use PHPUnit\Framework\TestCase; use function Amp\Postgres\connect; -class FunctionsTest extends TestCase { - public function setUp() { +class FunctionsTest extends TestCase +{ + public function setUp() + { if (!\extension_loaded('pgsql') && !\extension_loaded('pq')) { $this->markTestSkipped('This test requires either ext/pgsql or pecl/pq'); } } - public function testConnect() { + public function testConnect() + { Loop::run(function () { - $connection = yield connect('host=localhost user=postgres'); + $connection = yield connect(new ConnectionConfig('host=localhost user=postgres')); $this->assertInstanceOf(Connection::class, $connection); }); } /** - * @expectedException \Amp\Postgres\FailureException + * @expectedException \Amp\Sql\FailureException */ - public function testConnectInvalidUser() { + public function testConnectInvalidUser() + { Loop::run(function () { - $connection = yield connect('host=localhost user=invalid'); + $connection = yield connect(new ConnectionConfig('host=localhost user=invalid')); }); } /** - * @expectedException \Amp\Postgres\FailureException + * @expectedException \Amp\Sql\FailureException */ - public function testConnectInvalidConnectionString() { + public function testConnectInvalidConnectionString() + { Loop::run(function () { - $connection = yield connect('invalid connection string'); + $connection = yield connect(new ConnectionConfig('invalid connection string')); }); } /** - * @expectedException \Amp\Postgres\FailureException + * @expectedException \Amp\Sql\FailureException */ - public function testConnectInvalidHost() { + public function testConnectInvalidHost() + { Loop::run(function () { - $connection = yield connect('hostaddr=invalid.host user=postgres'); + $connection = yield connect(new ConnectionConfig('hostaddr=invalid.host user=postgres')); }); } } diff --git a/test/PgSqlConnectTest.php b/test/PgSqlConnectTest.php index d21e215..96a09f0 100644 --- a/test/PgSqlConnectTest.php +++ b/test/PgSqlConnectTest.php @@ -5,12 +5,15 @@ use Amp\CancellationToken; use Amp\Postgres\PgSqlConnection; use Amp\Promise; +use Amp\Sql\ConnectionConfig; /** * @requires extension pgsql */ -class PgSqlConnectTest extends AbstractConnectTest { - public function connect(string $connectionString, CancellationToken $token = null): Promise { - return PgSqlConnection::connect($connectionString, $token); +class PgSqlConnectTest extends AbstractConnectTest +{ + public function connect(ConnectionConfig $connectionConfig, CancellationToken $token = null): Promise + { + return PgSqlConnection::connect($connectionConfig, $token); } } diff --git a/test/PgSqlConnectionTest.php b/test/PgSqlConnectionTest.php index 70b4b95..eaf90a4 100644 --- a/test/PgSqlConnectionTest.php +++ b/test/PgSqlConnectionTest.php @@ -8,11 +8,13 @@ /** * @requires extension pgsql */ -class PgSqlConnectionTest extends AbstractConnectionTest { +class PgSqlConnectionTest extends AbstractConnectionTest +{ /** @var resource PostgreSQL connection resource. */ protected $handle; - public function createLink(string $connectionString): Link { + public function createLink(string $connectionString): Link + { $this->handle = \pg_connect($connectionString); $socket = \pg_socket($this->handle); @@ -35,7 +37,8 @@ public function createLink(string $connectionString): Link { return new PgSqlConnection($this->handle, $socket); } - public function tearDown() { + public function tearDown() + { \pg_get_result($this->handle); // Consume any leftover results from test. \pg_query($this->handle, "ROLLBACK"); \pg_query($this->handle, "DROP TABLE test"); diff --git a/test/PgSqlPoolTest.php b/test/PgSqlPoolTest.php index ff32cef..ada0903 100644 --- a/test/PgSqlPoolTest.php +++ b/test/PgSqlPoolTest.php @@ -2,23 +2,26 @@ namespace Amp\Postgres\Test; -use Amp\Postgres\Connector; +use Amp\Postgres\ConnectionConfig; use Amp\Postgres\Link; use Amp\Postgres\PgSqlConnection; -use Amp\Postgres\DefaultPool; +use Amp\Postgres\Pool; use Amp\Promise; +use Amp\Sql\Connector; use Amp\Success; /** * @requires extension pgsql */ -class PgSqlPoolTest extends AbstractLinkTest { +class PgSqlPoolTest extends AbstractLinkTest +{ const POOL_SIZE = 3; /** @var resource[] PostgreSQL connection resources. */ protected $handles = []; - public function createLink(string $connectionString): Link { + public function createLink(string $connectionString): Link + { for ($i = 0; $i < self::POOL_SIZE; ++$i) { $this->handles[] = \pg_connect($connectionString, \PGSQL_CONNECT_FORCE_NEW); } @@ -35,7 +38,7 @@ public function createLink(string $connectionString): Link { return new Success(new PgSqlConnection($handle, \pg_socket($handle))); })); - $pool = new DefaultPool('connection string', \count($this->handles), $connector); + $pool = new Pool(new ConnectionConfig('connection string'), \count($this->handles), $connector); $handle = \reset($this->handles); @@ -58,7 +61,8 @@ public function createLink(string $connectionString): Link { return $pool; } - public function tearDown() { + public function tearDown() + { foreach ($this->handles as $handle) { \pg_get_result($handle); // Consume any leftover results from test. } diff --git a/test/PooledStatementTest.php b/test/PooledStatementTest.php index e2a52b2..0a81ceb 100644 --- a/test/PooledStatementTest.php +++ b/test/PooledStatementTest.php @@ -5,16 +5,19 @@ use Amp\Delayed; use Amp\Loop; use Amp\PHPUnit\TestCase; -use Amp\Postgres\Internal\PooledStatement; -use Amp\Postgres\DefaultPool; +use Amp\Postgres\ConnectionConfig; +use Amp\Postgres\Pool; use Amp\Postgres\ResultSet; -use Amp\Postgres\Statement; +use Amp\Sql\PooledStatement; +use Amp\Sql\Statement; use Amp\Success; -class PooledStatementTest extends TestCase { - public function testActiveStatementsRemainAfterTimeout() { +class PooledStatementTest extends TestCase +{ + public function testActiveStatementsRemainAfterTimeout() + { Loop::run(function () { - $pool = new DefaultPool('host=localhost user=postgres'); + $pool = new Pool(new ConnectionConfig('host=localhost user=postgres')); $statement = $this->createMock(Statement::class); $statement->method('getQuery') @@ -38,9 +41,10 @@ public function testActiveStatementsRemainAfterTimeout() { }); } - public function testIdleStatementsRemovedAfterTimeout() { + public function testIdleStatementsRemovedAfterTimeout() + { Loop::run(function () { - $pool = new DefaultPool('host=localhost user=postgres'); + $pool = new Pool(new ConnectionConfig('host=localhost user=postgres')); $statement = $this->createMock(Statement::class); $statement->method('getQuery') diff --git a/test/PqConnectTest.php b/test/PqConnectTest.php index 03760df..5349284 100644 --- a/test/PqConnectTest.php +++ b/test/PqConnectTest.php @@ -5,12 +5,15 @@ use Amp\CancellationToken; use Amp\Postgres\PqConnection; use Amp\Promise; +use Amp\Sql\ConnectionConfig; /** * @requires extension pq */ -class PqConnectTest extends AbstractConnectTest { - public function connect(string $connectionString, CancellationToken $token = null): Promise { - return PqConnection::connect($connectionString, $token); +class PqConnectTest extends AbstractConnectTest +{ + public function connect(ConnectionConfig $connectionConfig, CancellationToken $token = null): Promise + { + return PqConnection::connect($connectionConfig, $token); } } diff --git a/test/PqConnectionTest.php b/test/PqConnectionTest.php index b86a616..8c5273c 100644 --- a/test/PqConnectionTest.php +++ b/test/PqConnectionTest.php @@ -8,11 +8,13 @@ /** * @requires extension pq */ -class PqConnectionTest extends AbstractConnectionTest { +class PqConnectionTest extends AbstractConnectionTest +{ /** @var resource PostgreSQL connection resource. */ protected $handle; - public function createLink(string $connectionString): Link { + public function createLink(string $connectionString): Link + { $this->handle = new \pq\Connection($connectionString); $this->handle->nonblocking = true; $this->handle->unbuffered = true; @@ -36,7 +38,8 @@ public function createLink(string $connectionString): Link { return new PqConnection($this->handle); } - public function tearDown() { + public function tearDown() + { $this->handle->exec("ROLLBACK"); $this->handle->exec("DROP TABLE test"); } diff --git a/test/PqPoolTest.php b/test/PqPoolTest.php index 4c600c8..ffd853b 100644 --- a/test/PqPoolTest.php +++ b/test/PqPoolTest.php @@ -2,23 +2,26 @@ namespace Amp\Postgres\Test; -use Amp\Postgres\Connector; +use Amp\Postgres\ConnectionConfig; use Amp\Postgres\Link; -use Amp\Postgres\DefaultPool; +use Amp\Postgres\Pool; use Amp\Postgres\PqConnection; use Amp\Promise; +use Amp\Sql\Connector; use Amp\Success; /** * @requires extension pq */ -class PqPoolTest extends AbstractLinkTest { +class PqPoolTest extends AbstractLinkTest +{ const POOL_SIZE = 3; /** @var \pq\Connection[] */ protected $handles = []; - public function createLink(string $connectionString): Link { + public function createLink(string $connectionString): Link + { for ($i = 0; $i < self::POOL_SIZE; ++$i) { $this->handles[] = $handle = new \pq\Connection($connectionString); $handle->nonblocking = true; @@ -37,7 +40,7 @@ public function createLink(string $connectionString): Link { return new Success(new PqConnection($handle)); })); - $pool = new DefaultPool('connection string', \count($this->handles), $connector); + $pool = new Pool(new ConnectionConfig('connection string'), \count($this->handles), $connector); $handle = \reset($this->handles); @@ -60,7 +63,8 @@ public function createLink(string $connectionString): Link { return $pool; } - public function tearDown() { + public function tearDown() + { $this->handles[0]->exec("ROLLBACK"); $this->handles[0]->exec("DROP TABLE test"); } diff --git a/travis/install-pq.sh b/travis/install-pq.sh index 152d319..54e0a49 100755 --- a/travis/install-pq.sh +++ b/travis/install-pq.sh @@ -8,3 +8,4 @@ make; make install; popd; echo "extension=pq.so" >> "$(php -r 'echo php_ini_loaded_file();')"; +rm -rf ext-pq diff --git a/travis/install-raphf.sh b/travis/install-raphf.sh index 8347f04..222f34c 100755 --- a/travis/install-raphf.sh +++ b/travis/install-raphf.sh @@ -8,3 +8,4 @@ make; make install; popd; echo "extension=raphf.so" >> "$(php -r 'echo php_ini_loaded_file();')"; +rm -rf ext-raphf