Skip to content

Commit

Permalink
Update pool connection creation
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Dec 10, 2017
1 parent 2d9e97a commit ddb3b56
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 16 deletions.
4 changes: 2 additions & 2 deletions lib/AbstractConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ private function send(string $methodName, ...$args): Promise {
yield $this->busy->promise();
}

return yield $this->handle->{$methodName}(...$args);
return yield ([$this->handle, $methodName])(...$args);
});
}

return $this->handle->{$methodName}(...$args);
return ([$this->handle, $methodName])(...$args);
}

/**
Expand Down
21 changes: 13 additions & 8 deletions lib/AbstractPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ abstract class AbstractPool implements Pool {
/** @var callable */
private $push;

/** @var int */
private $pending = 0;

/**
* @return \Amp\Promise<\Amp\Postgres\Connection>
*
Expand Down Expand Up @@ -72,10 +75,16 @@ public function getIdleConnectionCount(): int {

/**
* @param \Amp\Postgres\Connection $connection
*
* @throws \Error if the connection is already part of this pool or if the connection is dead.
*/
protected function addConnection(Connection $connection) {
if (isset($this->connections[$connection])) {
return;
throw new \Error("Connection is already a part of this pool");
}

if (!$connection->isAlive()) {
throw new \Error("The connection is dead");
}

$this->connections->attach($connection);
Expand All @@ -92,17 +101,13 @@ protected function addConnection(Connection $connection) {
* @resolve \Amp\Postgres\Connection
*/
private function pop(): \Generator {
while ($this->promise !== null) {
try {
yield $this->promise; // Prevent simultaneous connection creation.
} catch (\Throwable $exception) {
// Ignore failure or cancellation of other operations.
}
while ($this->promise !== null && $this->connections->count() + $this->pending >= $this->getMaxConnections()) {
yield $this->promise; // Prevent simultaneous connection creation when connection count is at maximum - 1.
}

while ($this->idle->isEmpty()) { // While loop to ensure an idle connection is available after promises below are resolved.
try {
if ($this->connections->count() >= $this->getMaxConnections()) {
if ($this->connections->count() + $this->pending >= $this->getMaxConnections()) {
// All possible connections busy, so wait until one becomes available.
$this->deferred = new Deferred;
yield $this->promise = $this->deferred->promise(); // May be resolved with defunct connection.
Expand Down
7 changes: 1 addition & 6 deletions test/AbstractPoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,7 @@ abstract protected function createPool(array $connections): Pool;
*/
protected function createConnection(): Connection {
$mock = $this->createMock(Connection::class);
$mock->method('isAlive')
->willReturnCallback(static function () {
static $count = 0;
return $count++ < 3; // Force defunct connection after 3 operations.
});

$mock->method('isAlive')->willReturn(true);
return $mock;
}

Expand Down
23 changes: 23 additions & 0 deletions test/AggregatePoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Amp\Postgres\Test;

use Amp\Postgres\AggregatePool;
use Amp\Postgres\Connection;
use Amp\Postgres\Pool;

class AggregatePoolTest extends AbstractPoolTest {
Expand Down Expand Up @@ -47,4 +48,26 @@ public function testGetIdleConnectionCount() {
$promise = $pool->query("SELECT 1");
$this->assertSame(1, $pool->getIdleConnectionCount());
}

/**
* @expectedException \Error
* @expectedExceptionMessage Connection is already a part of this pool
*/
public function testDoubleAddConnection() {
$pool = $this->createPool([]);
$connection = $this->createConnection();
$pool->addConnection($connection);
$pool->addConnection($connection);
}

/**
* @expectedException \Error
* @expectedExceptionMessage The connection is dead
*/
public function testAddDeadConnection() {
$pool = $this->createPool([]);
$connection = $this->createMock(Connection::class);
$connection->method('isAlive')->willReturn(false);
$pool->addConnection($connection);
}
}

0 comments on commit ddb3b56

Please sign in to comment.