Skip to content

Commit

Permalink
Rework StaticConnectionPool pinging logic
Browse files Browse the repository at this point in the history
The StaticConnectionPool now only pings nodes when they are first
used (instead of on startup), and has better logic to handle
the pinging/failure process.

Also implements helper functions in AbstractConnectionPool for
things like ping timeout and ping failures, which are used to
exponentially backoff ping timeouts.

Fixes #20
  • Loading branch information
polyfractal committed Nov 4, 2013
1 parent a0670b9 commit b4e71ce
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 17 deletions.
48 changes: 40 additions & 8 deletions src/Elasticsearch/ConnectionPool/StaticConnectionPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

class StaticConnectionPool extends AbstractConnectionPool
{
private $pingTimeout = 60;
private $maxPingTimeout = 3600;

public function __construct($connections, SelectorInterface $selector, ConnectionFactory $factory, $connectionPoolParams)
{
parent::__construct($connections, $selector, $factory, $connectionPoolParams);
Expand All @@ -30,29 +33,58 @@ public function __construct($connections, SelectorInterface $selector, Connectio
*/
public function nextConnection($force = false)
{
$skipped = array();

$total = count($this->connections);
while ($total--) {
/** @var AbstractConnection $connection */
$connection = $this->selector->select($this->connections);
if ($connection->isAlive() === true || $connection->ping() === true) {
if ($connection->isAlive() === true) {
return $connection;
}
}

if ($force === true) {
throw new NoNodesAvailableException("No alive nodes found in your cluster");
if ($this->readyToRevive($connection) === true) {
if ($connection->ping() === true) {
return $connection;
}
} else {
$skipped[] = $connection;
}
}

return $this->nextConnection(true);

// All "alive" nodes failed, force pings on "dead" nodes
foreach ($skipped as $connection) {
if ($connection->ping() === true) {
return $connection;
}
}

throw new NoNodesAvailableException("No alive nodes found in your cluster");
}

public function scheduleCheck()
{
foreach ($this->connections as $connection) {
/** @var AbstractConnection $connection */
$connection->ping();
$connection->markDead();
}
}

/**
* @param AbstractConnection $connection
*
* @return bool
*/
private function readyToRevive(AbstractConnection $connection)
{
$timeout = min(
$this->pingTimeout * pow(2, $connection->getPingFailures()),
$this->maxPingTimeout
);

if ($connection->getLastPing() + $timeout < time()) {
return true;
} else {
return false;
}
}
}
32 changes: 29 additions & 3 deletions src/Elasticsearch/Connections/AbstractConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ abstract class AbstractConnection implements ConnectionInterface
/** @var float */
private $pingTimeout = 1; //TODO expose this

/** @var int */
private $lastPing = 0;

/** @var int */
private $failedPings = 0;

/**
* @param $method
Expand Down Expand Up @@ -194,21 +199,22 @@ public function logRequestFail(
*/
public function ping()
{
$this->lastPing = time();
$options = array('timeout' => $this->pingTimeout);
try {
$response = $this->performRequest('HEAD', '', null, null, $options);

} catch (TransportException $exception) {
$this->isAlive = false;
$this->markDead();
return false;
}


if ($response['status'] === 200) {
$this->isAlive = true;
$this->markAlive();
return true;
} else {
$this->isAlive = false;
$this->markDead();
return false;
}
}
Expand All @@ -235,12 +241,32 @@ public function isAlive()

public function markAlive()
{
$this->failedPings = 0;
$this->isAlive = true;
}

public function markDead()
{
$this->isAlive = false;
$this->failedPings += 1;
}


/**
* @return int
*/
public function getLastPing()
{
return $this->lastPing;
}


/**
* @return int
*/
public function getPingFailures()
{
return $this->failedPings;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public function testAddOneHostThenGetConnection()
->getMock()
->shouldReceive('isAlive')
->andReturn(true)
->getMock();
->getMock()
->shouldReceive('markDead')->once()->getMock();

$connections = array($mockConnection);

Expand Down Expand Up @@ -63,7 +64,8 @@ public function testAddMultipleHostsThenGetFirst()
->getMock()
->shouldReceive('isAlive')
->andReturn(true)
->getMock();
->getMock()
->shouldReceive('markDead')->once()->getMock();

$connections[] = $mockConnection;
}
Expand Down Expand Up @@ -99,7 +101,10 @@ public function testAllHostsFailPing()
->getMock()
->shouldReceive('isAlive')
->andReturn(false)
->getMock();
->getMock()
->shouldReceive('markDead')->once()->getMock()
->shouldReceive('getPingFailures')->andReturn(0)->once()->getMock()
->shouldReceive('getLastPing')->andReturn(time())->once()->getMock();

$connections[] = $mockConnection;
}
Expand All @@ -119,7 +124,7 @@ public function testAllHostsFailPing()
}


public function testAllExceptLastHostFailPing()
public function testAllExceptLastHostFailPingRevivesInSkip()
{

$connections = array();
Expand All @@ -131,7 +136,10 @@ public function testAllExceptLastHostFailPing()
->getMock()
->shouldReceive('isAlive')
->andReturn(false)
->getMock();
->getMock()
->shouldReceive('markDead')->once()->getMock()
->shouldReceive('getPingFailures')->andReturn(0)->once()->getMock()
->shouldReceive('getLastPing')->andReturn(time())->once()->getMock();

$connections[] = $mockConnection;
}
Expand All @@ -141,8 +149,59 @@ public function testAllExceptLastHostFailPing()
->andReturn(true)
->getMock()
->shouldReceive('isAlive')->once()
->andReturn(false)
->getMock()
->shouldReceive('markDead')->once()->getMock()
->shouldReceive('getPingFailures')->andReturn(0)->once()->getMock()
->shouldReceive('getLastPing')->andReturn(time())->once()->getMock();

$connections[] = $goodConnection;

$selector = m::mock('\Elasticsearch\ConnectionPool\Selectors\RoundRobinSelector')
->shouldReceive('select')
->andReturnValues($connections)
->getMock();

$connectionFactory = m::mock('\Elasticsearch\Connections\ConnectionFactory');

$randomizeHosts = false;
$connectionPool = new \Elasticsearch\ConnectionPool\StaticConnectionPool($connections, $selector, $connectionFactory, $randomizeHosts);

$ret = $connectionPool->nextConnection();
$this->assertEquals($goodConnection, $ret);

}

public function testAllExceptLastHostFailPingRevivesPreSkip()
{

$connections = array();

foreach (range(1,9) as $index) {
$mockConnection = m::mock('\Elasticsearch\Connections\GuzzleConnection')
->shouldReceive('ping')
->andReturn(false)
->getMock()
->shouldReceive('isAlive')
->andReturn(false)
->getMock()
->shouldReceive('markDead')->once()->getMock()
->shouldReceive('getPingFailures')->andReturn(0)->once()->getMock()
->shouldReceive('getLastPing')->andReturn(time())->once()->getMock();

$connections[] = $mockConnection;
}

$goodConnection = m::mock('\Elasticsearch\Connections\GuzzleConnection')
->shouldReceive('ping')->once()
->andReturn(true)
->getMock();
->getMock()
->shouldReceive('isAlive')->once()
->andReturn(false)
->getMock()
->shouldReceive('markDead')->once()->getMock()
->shouldReceive('getPingFailures')->andReturn(0)->once()->getMock()
->shouldReceive('getLastPing')->andReturn(time()-10000)->once()->getMock();

$connections[] = $goodConnection;

Expand All @@ -161,4 +220,5 @@ public function testAllExceptLastHostFailPing()

}


}

0 comments on commit b4e71ce

Please sign in to comment.