Skip to content

Commit

Permalink
Ensure connections closed by the server are not used
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Jan 11, 2018
1 parent 43d3111 commit f077c40
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 20 deletions.
46 changes: 29 additions & 17 deletions lib/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ public function __construct(Internal\ConnectionConfig $config, int $maxConnectio

public function close() {
$this->closed = true;
foreach ($this->connections as $connection) {
while (!$this->idle->isEmpty()) {
$connection = $this->idle->shift();
$connection->close();
$this->connections->detach($connection);
}
$this->idle = new \SplQueue;
$this->connections = new \SplObjectStorage;
}

/**
Expand Down Expand Up @@ -174,17 +174,17 @@ private function pop(): \Generator {
yield $this->promise; // Prevent simultaneous connection creation when connection count is at maximum - 1.
}

while ($this->idle->isEmpty()) { // While loop to ensure an idle connection is available after promises below are resolved.
if ($this->connections->count() + $this->pending >= $this->getMaxConnections()) {
// All possible connections busy, so wait until one becomes available.
try {
$this->deferred = new Deferred;
yield $this->promise = $this->deferred->promise(); // May be resolved with defunct connection.
} finally {
$this->deferred = null;
$this->promise = null;
}
} else {
while (!$this->idle->isEmpty()) {
$connection = $this->idle->shift();
if ($connection->isAlive()) {
return $connection;
}

$this->connections->detach($connection);
}

do { // While loop to ensure an idle connection is available after promises below are resolved.
if ($this->connections->count() + $this->pending < $this->getMaxConnections()) {
// Max connection count has not been reached, so open another connection.
++$this->pending;
try {
Expand All @@ -203,7 +203,17 @@ private function pop(): \Generator {
$this->connections->attach($connection);
return $connection;
}
}

// All possible connections busy, so wait until one becomes available.
try {
$this->deferred = new Deferred;
// May be resolved with defunct connection, but that connection will not be added to $this->idle.
yield $this->promise = $this->deferred->promise();
} finally {
$this->deferred = null;
$this->promise = null;
}
} while ($this->idle->isEmpty());

// Shift a connection off the idle queue.
return $this->idle->shift();
Expand All @@ -215,12 +225,14 @@ private function pop(): \Generator {
* @throws \Error If the connection is not part of this pool.
*/
private function push(Connection $connection) {
\assert(isset($this->connections[$connection]), 'Connection is not part of this pool');

if ($this->closed) {
$connection->close();
$this->connections->detach($connection);
return;
}

\assert(isset($this->connections[$connection]), 'Connection is not part of this pool');

if ($connection->isAlive()) {
$this->idle->push($connection);
} else {
Expand Down
48 changes: 46 additions & 2 deletions test/AbstractPoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ protected function getLink(string $connectionString): Promise {
*/
protected function createConnection(): Connection {
$mock = $this->createMock(Connection::class);
$mock->method('isAlive')->willReturn(true);
return $mock;
}

Expand All @@ -52,7 +51,8 @@ private function makeConnectionSet(int $count) {
$connections = [];

for ($i = 0; $i < $count; ++$i) {
$connections[] = $this->createConnection();
$connections[] = $connection = $this->createConnection();
$connection->method('isAlive')->willReturn(true);
}

return $connections;
Expand Down Expand Up @@ -246,4 +246,48 @@ public function testExtractConnection(int $count) {
}
});
}

/**
* @dataProvider getConnectionCounts
*
* @param int $count
*/
public function testConnectionClosedInPool(int $count) {
$connections = $this->makeConnectionSet($count);
$query = "SELECT * FROM test";

foreach ($connections as $connection) {
$connection->expects($this->exactly(2))
->method('query')
->with($query)
->willReturn(new Delayed(10));
}

$connection = $this->createConnection();
$connection->method('isAlive')
->willReturnOnConsecutiveCalls(true, false);
$connection->expects($this->once())
->method('query')
->with($query)
->willReturn(new Delayed(10));

\array_unshift($connections, $connection);

$pool = $this->createPool($connections);
$this->assertSame($count + 1, $pool->getMaxConnections());

Loop::run(function () use ($pool, $query, $count) {
$promises = [];
for ($i = 0; $i < $count + 1; ++$i) {
$promises[] = $pool->query($query);
}
yield $promises;

$promises = [];
for ($i = 0; $i < $count; ++$i) {
$promises[] = $pool->query($query);
}
yield $promises;
});
}
}
2 changes: 1 addition & 1 deletion test/ConnectionPoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public function testIdleConnectionsRemovedAfterTimeout() {
while (yield $result->advance()) ;
} while (yield $result->nextResultSet());

yield new Delayed(1000);
yield new Delayed(1500);

$this->assertSame(1, $pool->getConnectionCount());
});
Expand Down
14 changes: 14 additions & 0 deletions test/ConnectionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Amp\Mysql\Test;

use Amp\Loop;
use Amp\Mysql\Connection;
use Amp\Mysql\Internal\ConnectionConfig;
use Amp\Promise;
Expand Down Expand Up @@ -34,4 +35,17 @@ public function testConnect() {
public function testInvalidConnectionString() {
$promise = connect("username=".DB_USER);
}

public function testDoubleClose() {
Loop::run(function () {
/** @var \Amp\Mysql\Connection $db */
$db = yield $this->getLink("host=".DB_HOST.";user=".DB_USER.";pass=".DB_PASS.";db=test");

$db->close();

$this->assertFalse($db->isAlive());

$db->close(); // Should not throw an exception.
});
}
}

0 comments on commit f077c40

Please sign in to comment.