Permalink
Browse files

Altered the event loop so as to detect missed heartbeats from the

broker.  This means  that if there is unexpected  network loss between
the  client and  the  broker, the  client  can detect  this using  the
standard Amqp heartbeat.
  • Loading branch information...
1 parent 6623010 commit e92c3685c6efde44efcd95b8c97eee67778d4c83 @BraveSirRobin committed Apr 22, 2012

Large diffs are not rendered by default.

Oops, something went wrong.
@@ -1,2 +1,2 @@
<?php
- namespace amqphp; use amqphp\protocol; use amqphp\wire; class EventLoop { private $cons = array(); private static $In = false; private $forceExit = false; function addConnection (Connection $conn) { $this->cons[$conn->getSocketId()] = $conn; } function removeConnection (Connection $conn) { if (array_key_exists($conn->getSocketId(), $this->cons)) { unset($this->cons[$conn->getSocketId()]); } } function forceLoopExit () { $this->forceExit = true; } function select () { $sockImpl = false; foreach ($this->cons as $c) { if ($c->isBlocking()) { throw new \Exception("Event loop cannot start - connection is already blocking", 3267); } if ($sockImpl === false) { $sockImpl = $c->getSocketImplClass(); } else if ($sockImpl != $c->getSocketImplClass()) { throw new \Exception("Event loop doesn't support mixed socket implementations", 2678); } if (! $c->isConnected()) { throw new \Exception("Connection is not connected", 2174); } } foreach ($this->cons as $c) { $c->setBlocking(true); $c->notifySelectInit(); } $started = false; while (true) { $tv = array(); foreach ($this->cons as $cid => $c) { $c->deliverAll(); $tv[] = array($cid, $c->notifyPreSelect()); } $psr = $this->processPreSelects($tv); if (is_array($psr)) { list($tvSecs, $tvUsecs) = $psr; } else if ($psr === true) { $tvSecs = null; $tvUsecs = 0; } else if (is_null($psr) && empty($this->cons)) { if (! $started) { trigger_error("Select loop not entered - no connections are listening", E_USER_WARNING); } break; } else { throw new \Exception("Unexpected PSR response", 2758); } $this->signal(); if ($this->forceExit) { trigger_error("Select loop forced exit over-rides connection looping state", E_USER_WARNING); $this->forceExit = false; break; } $started = true; if (is_null($tvSecs)) { list($ret, $read, $ex) = call_user_func(array($sockImpl, 'Zelekt'), array_keys($this->cons), null, 0); } else { list($ret, $read, $ex) = call_user_func(array($sockImpl, 'Zelekt'), array_keys($this->cons), $tvSecs, $tvUsecs); } if ($ret === false) { $this->signal(); $errNo = $errStr = array('(No specific socket exceptions found)'); if ($ex) { $errNo = $errStr = array(); foreach ($ex as $sock) { $errNo[] = $sock->lastError(); $errStr[] = $sock->strError(); } } $eMsg = sprintf("[2] Read block select produced an error: [%s] (%s)", implode(",", $errNo), implode("),(", $errStr)); throw new \Exception ($eMsg, 9963); } else if ($ret > 0) { foreach ($read as $sock) { $c = $this->cons[$sock->getId()]; try { $c->doSelectRead(); $c->deliverAll(); } catch (\Exception $e) { if ($sock->lastError()) { trigger_error("Exception raised on socket {$sock->getId()} during " . "event loop read (nested exception follows). Socket indicates an error, " . "close the connection immediately. Nested exception: '{$e->getMessage()}'", E_USER_WARNING); try { $c->shutdown(); } catch (\Exception $e) { trigger_error("Nested exception swallowed during emergency socket " . "shutdown: '{$e->getMessage()}'", E_USER_WARNING); } $this->removeConnection($c); } else { trigger_error("Exception raised on socket {$sock->getId()} during " . "event loop read (nested exception follows). Socket does NOT " . "indicate an error, try again. Nested exception: '{$e->getMessage()}'", E_USER_WARNING); } } } } } foreach ($this->cons as $id => $conn) { $conn->notifyComplete(); $conn->setBlocking(false); $this->removeConnection($conn); } } private function processPreSelects (array $tvs) { $wins = null; foreach ($tvs as $tv) { $sid = $tv[0]; $tv = $tv[1]; if ($tv === false) { $this->cons[$sid]->notifyComplete(); $this->cons[$sid]->setBlocking(false); $this->removeConnection($this->cons[$sid]); } else if (is_null($wins)) { $wins = $tv; } else if ($tv === true && ! is_array($wins)) { $wins = true; } else if (is_array($tv)) { if ($wins === true) { $wins = $tv; } else { switch (bccomp((string) $wins[0], (string) $tv[0])) { case 0: if (1 === bccomp((string) $wins[1], (string) $tv[1])) { $wins = $tv; } break; case 1; $wins = $tv; break; } } } } return $wins; } private function signal () { foreach ($this->cons as $c) { if ($c->getSignalDispatch()) { pcntl_signal_dispatch(); return; } } } }
+ namespace amqphp; use amqphp\protocol; use amqphp\wire; class EventLoop { const HB_TMOBUFF = 50000; private $cons = array(); private static $In = false; private $forceExit = false; private $minHb = -1; function addConnection (Connection $conn) { $this->cons[$conn->getSocketId()] = $conn; $this->setMinHb(); } private function setMinHb () { if ($this->cons) { foreach ($this->cons as $c) { if ((($n = $c->getHeartbeat()) > 0) && $n > $this->minHb) { $this->minHb = $n; } } } else { $this->minHb = -1; } } function removeConnection (Connection $conn) { if (array_key_exists($conn->getSocketId(), $this->cons)) { unset($this->cons[$conn->getSocketId()]); } $this->setMinHb(); } function forceLoopExit () { $this->forceExit = true; } function select () { $sockImpl = false; foreach ($this->cons as $c) { if ($c->isBlocking()) { throw new \Exception("Event loop cannot start - connection is already blocking", 3267); } if ($sockImpl === false) { $sockImpl = $c->getSocketImplClass(); } else if ($sockImpl != $c->getSocketImplClass()) { throw new \Exception("Event loop doesn't support mixed socket implementations", 2678); } if (! $c->isConnected()) { throw new \Exception("Connection is not connected", 2174); } } foreach ($this->cons as $c) { $c->setBlocking(true); $c->notifySelectInit(); } $started = false; $missedHb = 0; while (true) { $tv = array(); foreach ($this->cons as $cid => $c) { $c->deliverAll(); $tv[] = array($cid, $c->notifyPreSelect()); } $psr = $this->processPreSelects($tv); if (is_array($psr)) { list($tvSecs, $tvUsecs) = $psr; } else if ($psr === true) { $tvSecs = null; $tvUsecs = 0; } else if (is_null($psr) && empty($this->cons)) { if (! $started) { trigger_error("Select loop not entered - no connections are listening", E_USER_WARNING); } break; } else { throw new \Exception("Unexpected PSR response", 2758); } $this->signal(); if ($this->forceExit) { trigger_error("Select loop forced exit over-rides connection looping state", E_USER_WARNING); $this->forceExit = false; break; } $started = true; $selectCalledAt = microtime(); if (is_null($tvSecs)) { list($ret, $read, $ex) = call_user_func(array($sockImpl, 'Zelekt'), array_keys($this->cons), null, 0); } else { list($ret, $read, $ex) = call_user_func(array($sockImpl, 'Zelekt'), array_keys($this->cons), $tvSecs, $tvUsecs); } if ($ret === false) { $this->signal(); $errNo = $errStr = array('(No specific socket exceptions found)'); if ($ex) { $errNo = $errStr = array(); foreach ($ex as $sock) { $errNo[] = $sock->lastError(); $errStr[] = $sock->strError(); } } $eMsg = sprintf("[2] Read block select produced an error: [%s] (%s)", implode(",", $errNo), implode("),(", $errStr)); throw new \Exception ($eMsg, 9963); } else if ($ret > 0) { $missedHb = 0; foreach ($read as $sock) { $c = $this->cons[$sock->getId()]; try { $c->doSelectRead(); $c->deliverAll(); } catch (\Exception $e) { if ($sock->lastError()) { trigger_error("Exception raised on socket {$sock->getId()} during " . "event loop read (nested exception follows). Socket indicates an error, " . "close the connection immediately. Nested exception: '{$e->getMessage()}'", E_USER_WARNING); try { $c->shutdown(); } catch (\Exception $e) { trigger_error("Nested exception swallowed during emergency socket " . "shutdown: '{$e->getMessage()}'", E_USER_WARNING); } $this->removeConnection($c); } else { trigger_error("Exception raised on socket {$sock->getId()} during " . "event loop read (nested exception follows). Socket does NOT " . "indicate an error, try again. Nested exception: '{$e->getMessage()}'", E_USER_WARNING); } } } } else { if ($this->minHb > 0) { list($stUsecs, $stSecs) = explode(' ', $selectCalledAt); list($usecs, $secs) = explode(' ', microtime()); if (($secs + $usecs) - ($stSecs + $stUsecs) > $this->minHb) { if (++$missedHb >= 2) { throw new \Exception("Broker missed too many heartbeats", 2957); } else { trigger_error("Broker heartbeat missed from client side, one more triggers loop exit", E_USER_WARNING); } } } } } foreach ($this->cons as $id => $conn) { $conn->notifyComplete(); $conn->setBlocking(false); $this->removeConnection($conn); } } private function processPreSelects (array $tvs) { $wins = null; foreach ($tvs as $tv) { $sid = $tv[0]; $tv = $tv[1]; if ($tv === false) { $this->cons[$sid]->notifyComplete(); $this->cons[$sid]->setBlocking(false); $this->removeConnection($this->cons[$sid]); } else if (is_null($wins)) { $wins = $tv; } else if ($tv === true && ! is_array($wins)) { $wins = true; } else if (is_array($tv)) { if ($wins === true) { $wins = $tv; } else { switch (bccomp((string) $wins[0], (string) $tv[0])) { case 0: if (1 === bccomp((string) $wins[1], (string) $tv[1])) { $wins = $tv; } break; case 1; $wins = $tv; break; } } } } if ($wins && ($this->minHb > 0) && ($wins === true || $wins[0] > $this->minHb || ($wins[0] == $this->minHb && $wins[1] < self::HB_TMOBUFF)) ) { $wins = array($this->minHb, self::HB_TMOBUFF); } return $wins; } private function signal () { foreach ($this->cons as $c) { if ($c->getSignalDispatch()) { pcntl_signal_dispatch(); return; } } } }
View

Large diffs are not rendered by default.

Oops, something went wrong.
@@ -5,21 +5,21 @@
<vhost k="string">/</vhost>
<username k="string">testing</username>
<userpass k="string">letmein</userpass>
- <heartbeat k="int">5</heartbeat>
+ <heartbeat k="int">2</heartbeat>
<socketImpl k="string">\amqphp\Socket</socketImpl>
<socketParams>
<host k="string">rabbit1</host>
<port k="integer">5672</port>
</socketParams>
-</conn_params>
--->
+</conn_params>-->
+
<!-- Stream based config -->
<conn_params>
<vhost k="string">/</vhost>
<username k="string">testing</username>
<userpass k="string">letmein</userpass>
- <heartbeat k="int">5</heartbeat>
+ <heartbeat k="int">2</heartbeat>
<socketImpl k="string">\amqphp\StreamSocket</socketImpl>
<socketParams>
<url k="string">tcp://rabbit1:5672</url>
@@ -320,6 +320,10 @@ function getSignalDispatch () {
return $this->signalDispatch;
}
+ function getHeartbeat () {
+ return $this->heartbeat;
+ }
+
function removeChannel (Channel $chan) {
if (false !== ($k = array_search($chan, $this->chans))) {
unset($this->chans[$k]);
View
@@ -30,19 +30,42 @@
*/
class EventLoop
{
+ /**
+ * When the forced heatbeat loop exit monitoring is active, this
+ * number is used as a milliseconds buffer such that the loop
+ * timeout is set to <hearbeat> + HB_TMOBUFF. If the loop breaks
+ * on this timeout, we consider the heartbeat to be missed
+ */
+ const HB_TMOBUFF = 50000;
+
private $cons = array();
private static $In = false;
private $forceExit = false;
+ private $minHb = -1;
function addConnection (Connection $conn) {
$this->cons[$conn->getSocketId()] = $conn;
+ $this->setMinHb();
+ }
+
+ private function setMinHb () {
+ if ($this->cons) {
+ foreach ($this->cons as $c) {
+ if ((($n = $c->getHeartbeat()) > 0) && $n > $this->minHb) {
+ $this->minHb = $n;
+ }
+ }
+ } else {
+ $this->minHb = -1;
+ }
}
function removeConnection (Connection $conn) {
if (array_key_exists($conn->getSocketId(), $this->cons)) {
unset($this->cons[$conn->getSocketId()]);
}
+ $this->setMinHb();
}
/** Flips an internal flag that forces the loop to exit
@@ -80,6 +103,7 @@ function select () {
// The loop
$started = false;
+ $missedHb = 0;
while (true) {
// Deliver all buffered messages and collect pre-select signals.
$tv = array();
@@ -115,6 +139,7 @@ function select () {
}
$started = true;
+ $selectCalledAt = microtime();
if (is_null($tvSecs)) {
list($ret, $read, $ex) = call_user_func(array($sockImpl, 'Zelekt'),
array_keys($this->cons), null, 0);
@@ -138,6 +163,7 @@ function select () {
throw new \Exception ($eMsg, 9963);
} else if ($ret > 0) {
+ $missedHb = 0;
foreach ($read as $sock) {
$c = $this->cons[$sock->getId()];
try {
@@ -164,6 +190,19 @@ function select () {
}
}
}
+ } else {
+ if ($this->minHb > 0) {
+ // Check to see if the empty read is due to a missed heartbeat.
+ list($stUsecs, $stSecs) = explode(' ', $selectCalledAt);
+ list($usecs, $secs) = explode(' ', microtime());
+ if (($secs + $usecs) - ($stSecs + $stUsecs) > $this->minHb) {
+ if (++$missedHb >= 2) {
+ throw new \Exception("Broker missed too many heartbeats", 2957);
+ } else {
+ trigger_error("Broker heartbeat missed from client side, one more triggers loop exit", E_USER_WARNING);
+ }
+ }
+ }
}
} // End - the loop
@@ -180,7 +219,9 @@ function select () {
* complete and filter out the "soonest" timeout. Call the
* 'complete' callback for connections that get removed
*
- * @return mixed True=Loop without timeout, False=exit loop, array(int, int)=specific timeout
+ * @return mixed True=Loop without timeout,
+ * False=exit loop,
+ * array(int, int)=specific timeout
*/
private function processPreSelects (array $tvs) {
$wins = null;
@@ -215,6 +256,14 @@ private function processPreSelects (array $tvs) {
}
}
}
+ // Check to see if we need to alter the timeout to match a heartbeat
+ if ($wins &&
+ ($this->minHb > 0) &&
+ ($wins === true || $wins[0] > $this->minHb ||
+ ($wins[0] == $this->minHb && $wins[1] < self::HB_TMOBUFF))
+ ) {
+ $wins = array($this->minHb, self::HB_TMOBUFF);
+ }
return $wins;
}

0 comments on commit e92c368

Please sign in to comment.