Permalink
Browse files

Merge branch 'next'

  • Loading branch information...
2 parents 5ecaada + e92c368 commit 5db6a3a3cdf998a65dc3da3ba135919ffd99eb6c @BraveSirRobin committed Apr 22, 2012
Showing with 103 additions and 50 deletions.
  1. +1 −1 build/cpf/amqphp/Connection.php
  2. +1 −1 build/cpf/amqphp/EventLoop.php
  3. +1 −1 build/cpf/amqphp/Socket.php
  4. +3 −3 build/nspf/amqphp.php
  5. +1 −1 demos/class-loader.php
  6. +4 −4 demos/configs/rabbit1-host-config.xml
  7. +1 −1 demos/consumer.php
  8. +1 −1 demos/get.php
  9. +1 −1 src/amqphp/CallbackExitStrategy.php
  10. +1 −1 src/amqphp/Channel.php
  11. +1 −1 src/amqphp/ChannelEventHandler.php
  12. +1 −1 src/amqphp/ConditionalExitStrategy.php
  13. +5 −1 src/amqphp/Connection.php
  14. +1 −1 src/amqphp/Consumer.php
  15. +51 −2 src/amqphp/EventLoop.php
  16. +1 −1 src/amqphp/ExitStrategy.php
  17. +1 −1 src/amqphp/Factory.php
  18. +1 −1 src/amqphp/MaxloopExitStrategy.php
  19. +1 −1 src/amqphp/SimpleConsumer.php
  20. +2 −2 src/amqphp/Socket.php
  21. +1 −1 src/amqphp/StreamSocket.php
  22. +1 −1 src/amqphp/TimeoutExitStrategy.php
  23. +1 −1 src/amqphp/persistent/APCPersistenceHelper.php
  24. +1 −1 src/amqphp/persistent/FilePersistenceHelper.php
  25. +1 −1 src/amqphp/persistent/PChannel.php
  26. +1 −1 src/amqphp/persistent/PConnection.php
  27. +1 −1 src/amqphp/persistent/PersistenceHelper.php
  28. +1 −1 src/amqphp/protocol/abstrakt/ClassFactory.php
  29. +1 −1 src/amqphp/protocol/abstrakt/DomainFactory.php
  30. +1 −1 src/amqphp/protocol/abstrakt/FieldFactory.php
  31. +1 −1 src/amqphp/protocol/abstrakt/MethodFactory.php
  32. +1 −1 src/amqphp/protocol/abstrakt/XmlSpecClass.php
  33. +1 −1 src/amqphp/protocol/abstrakt/XmlSpecDomain.php
  34. +1 −1 src/amqphp/protocol/abstrakt/XmlSpecField.php
  35. +1 −1 src/amqphp/protocol/abstrakt/XmlSpecMethod.php
  36. +1 −1 src/amqphp/wire/Decimal.php
  37. +1 −1 src/amqphp/wire/Hexdump.php
  38. +1 −1 src/amqphp/wire/Method.php
  39. +1 −1 src/amqphp/wire/Protocol.php
  40. +1 −1 src/amqphp/wire/Reader.php
  41. +1 −1 src/amqphp/wire/Table.php
  42. +1 −1 src/amqphp/wire/TableField.php
  43. +1 −1 src/amqphp/wire/Writer.php
  44. 0 {tests → tests.old}/ForkerConsumer.php
  45. 0 {tests → tests.old}/ForkerProducer.php
  46. 0 {tests → tests.old}/check-forker-results.php
  47. 0 {tests → tests.old}/forker.php
  48. 0 {tests → tests.old}/forker.xml

Large diffs are not rendered by default.

Oops, something went wrong.
@@ -1,2 +1,2 @@
<?php
- namespace amqphp; use amqphp\protocol; use amqphp\wire; class EventLoop { private $cons = array(); private static $In = false; private $forceExit = false; function addConnection (Connection $conn) { $this->cons[$conn->getSocketId()] = $conn; } function removeConnection (Connection $conn) { if (array_key_exists($conn->getSocketId(), $this->cons)) { unset($this->cons[$conn->getSocketId()]); } } function forceLoopExit () { $this->forceExit = true; } function select () { $sockImpl = false; foreach ($this->cons as $c) { if ($c->isBlocking()) { throw new \Exception("Event loop cannot start - connection is already blocking", 3267); } if ($sockImpl === false) { $sockImpl = $c->getSocketImplClass(); } else if ($sockImpl != $c->getSocketImplClass()) { throw new \Exception("Event loop doesn't support mixed socket implementations", 2678); } if (! $c->isConnected()) { throw new \Exception("Connection is not connected", 2174); } } foreach ($this->cons as $c) { $c->setBlocking(true); $c->notifySelectInit(); } $started = false; while (true) { $tv = array(); foreach ($this->cons as $cid => $c) { $c->deliverAll(); $tv[] = array($cid, $c->notifyPreSelect()); } $psr = $this->processPreSelects($tv); if (is_array($psr)) { list($tvSecs, $tvUsecs) = $psr; } else if ($psr === true) { $tvSecs = null; $tvUsecs = 0; } else if (is_null($psr) && empty($this->cons)) { if (! $started) { trigger_error("Select loop not entered - no connections are listening", E_USER_WARNING); } break; } else { throw new \Exception("Unexpected PSR response", 2758); } $this->signal(); if ($this->forceExit) { trigger_error("Select loop forced exit over-rides connection looping state", E_USER_WARNING); $this->forceExit = false; break; } $started = true; if (is_null($tvSecs)) { list($ret, $read, $ex) = call_user_func(array($sockImpl, 'Zelekt'), array_keys($this->cons), null, 0); } else { list($ret, $read, $ex) = call_user_func(array($sockImpl, 'Zelekt'), array_keys($this->cons), $tvSecs, $tvUsecs); } if ($ret === false) { $this->signal(); $errNo = $errStr = array('(No specific socket exceptions found)'); if ($ex) { $errNo = $errStr = array(); foreach ($ex as $sock) { $errNo[] = $sock->lastError(); $errStr[] = $sock->strError(); } } $eMsg = sprintf("[2] Read block select produced an error: [%s] (%s)", implode(",", $errNo), implode("),(", $errStr)); throw new \Exception ($eMsg, 9963); } else if ($ret > 0) { foreach ($read as $sock) { $c = $this->cons[$sock->getId()]; try { $c->doSelectRead(); $c->deliverAll(); } catch (\Exception $e) { if ($sock->lastError()) { trigger_error("Exception raised on socket {$sock->getId()} during " . "event loop read (nested exception follows). Socket indicates an error, " . "close the connection immediately. Nested exception: '{$e->getMessage()}'", E_USER_WARNING); try { $c->shutdown(); } catch (\Exception $e) { trigger_error("Nested exception swallowed during emergency socket " . "shutdown: '{$e->getMessage()}'", E_USER_WARNING); } $this->removeConnection($c); } else { trigger_error("Exception raised on socket {$sock->getId()} during " . "event loop read (nested exception follows). Socket does NOT " . "indicate an error, try again. Nested exception: '{$e->getMessage()}'", E_USER_WARNING); } } } } } foreach ($this->cons as $id => $conn) { $conn->notifyComplete(); $conn->setBlocking(false); $this->removeConnection($conn); } } private function processPreSelects (array $tvs) { $wins = null; foreach ($tvs as $tv) { $sid = $tv[0]; $tv = $tv[1]; if ($tv === false) { $this->cons[$sid]->notifyComplete(); $this->cons[$sid]->setBlocking(false); $this->removeConnection($this->cons[$sid]); } else if (is_null($wins)) { $wins = $tv; } else if ($tv === true && ! is_array($wins)) { $wins = true; } else if (is_array($tv)) { if ($wins === true) { $wins = $tv; } else { switch (bccomp((string) $wins[0], (string) $tv[0])) { case 0: if (1 === bccomp((string) $wins[1], (string) $tv[1])) { $wins = $tv; } break; case 1; $wins = $tv; break; } } } } return $wins; } private function signal () { foreach ($this->cons as $c) { if ($c->getSignalDispatch()) { pcntl_signal_dispatch(); return; } } } }
+ namespace amqphp; use amqphp\protocol; use amqphp\wire; class EventLoop { const HB_TMOBUFF = 50000; private $cons = array(); private static $In = false; private $forceExit = false; private $minHb = -1; function addConnection (Connection $conn) { $this->cons[$conn->getSocketId()] = $conn; $this->setMinHb(); } private function setMinHb () { if ($this->cons) { foreach ($this->cons as $c) { if ((($n = $c->getHeartbeat()) > 0) && $n > $this->minHb) { $this->minHb = $n; } } } else { $this->minHb = -1; } } function removeConnection (Connection $conn) { if (array_key_exists($conn->getSocketId(), $this->cons)) { unset($this->cons[$conn->getSocketId()]); } $this->setMinHb(); } function forceLoopExit () { $this->forceExit = true; } function select () { $sockImpl = false; foreach ($this->cons as $c) { if ($c->isBlocking()) { throw new \Exception("Event loop cannot start - connection is already blocking", 3267); } if ($sockImpl === false) { $sockImpl = $c->getSocketImplClass(); } else if ($sockImpl != $c->getSocketImplClass()) { throw new \Exception("Event loop doesn't support mixed socket implementations", 2678); } if (! $c->isConnected()) { throw new \Exception("Connection is not connected", 2174); } } foreach ($this->cons as $c) { $c->setBlocking(true); $c->notifySelectInit(); } $started = false; $missedHb = 0; while (true) { $tv = array(); foreach ($this->cons as $cid => $c) { $c->deliverAll(); $tv[] = array($cid, $c->notifyPreSelect()); } $psr = $this->processPreSelects($tv); if (is_array($psr)) { list($tvSecs, $tvUsecs) = $psr; } else if ($psr === true) { $tvSecs = null; $tvUsecs = 0; } else if (is_null($psr) && empty($this->cons)) { if (! $started) { trigger_error("Select loop not entered - no connections are listening", E_USER_WARNING); } break; } else { throw new \Exception("Unexpected PSR response", 2758); } $this->signal(); if ($this->forceExit) { trigger_error("Select loop forced exit over-rides connection looping state", E_USER_WARNING); $this->forceExit = false; break; } $started = true; $selectCalledAt = microtime(); if (is_null($tvSecs)) { list($ret, $read, $ex) = call_user_func(array($sockImpl, 'Zelekt'), array_keys($this->cons), null, 0); } else { list($ret, $read, $ex) = call_user_func(array($sockImpl, 'Zelekt'), array_keys($this->cons), $tvSecs, $tvUsecs); } if ($ret === false) { $this->signal(); $errNo = $errStr = array('(No specific socket exceptions found)'); if ($ex) { $errNo = $errStr = array(); foreach ($ex as $sock) { $errNo[] = $sock->lastError(); $errStr[] = $sock->strError(); } } $eMsg = sprintf("[2] Read block select produced an error: [%s] (%s)", implode(",", $errNo), implode("),(", $errStr)); throw new \Exception ($eMsg, 9963); } else if ($ret > 0) { $missedHb = 0; foreach ($read as $sock) { $c = $this->cons[$sock->getId()]; try { $c->doSelectRead(); $c->deliverAll(); } catch (\Exception $e) { if ($sock->lastError()) { trigger_error("Exception raised on socket {$sock->getId()} during " . "event loop read (nested exception follows). Socket indicates an error, " . "close the connection immediately. Nested exception: '{$e->getMessage()}'", E_USER_WARNING); try { $c->shutdown(); } catch (\Exception $e) { trigger_error("Nested exception swallowed during emergency socket " . "shutdown: '{$e->getMessage()}'", E_USER_WARNING); } $this->removeConnection($c); } else { trigger_error("Exception raised on socket {$sock->getId()} during " . "event loop read (nested exception follows). Socket does NOT " . "indicate an error, try again. Nested exception: '{$e->getMessage()}'", E_USER_WARNING); } } } } else { if ($this->minHb > 0) { list($stUsecs, $stSecs) = explode(' ', $selectCalledAt); list($usecs, $secs) = explode(' ', microtime()); if (($secs + $usecs) - ($stSecs + $stUsecs) > $this->minHb) { if (++$missedHb >= 2) { throw new \Exception("Broker missed too many heartbeats", 2957); } else { trigger_error("Broker heartbeat missed from client side, one more triggers loop exit", E_USER_WARNING); } } } } } foreach ($this->cons as $id => $conn) { $conn->notifyComplete(); $conn->setBlocking(false); $this->removeConnection($conn); } } private function processPreSelects (array $tvs) { $wins = null; foreach ($tvs as $tv) { $sid = $tv[0]; $tv = $tv[1]; if ($tv === false) { $this->cons[$sid]->notifyComplete(); $this->cons[$sid]->setBlocking(false); $this->removeConnection($this->cons[$sid]); } else if (is_null($wins)) { $wins = $tv; } else if ($tv === true && ! is_array($wins)) { $wins = true; } else if (is_array($tv)) { if ($wins === true) { $wins = $tv; } else { switch (bccomp((string) $wins[0], (string) $tv[0])) { case 0: if (1 === bccomp((string) $wins[1], (string) $tv[1])) { $wins = $tv; } break; case 1; $wins = $tv; break; } } } } if ($wins && ($this->minHb > 0) && ($wins === true || $wins[0] > $this->minHb || ($wins[0] == $this->minHb && $wins[1] < self::HB_TMOBUFF)) ) { $wins = array($this->minHb, self::HB_TMOBUFF); } return $wins; } private function signal () { foreach ($this->cons as $c) { if ($c->getSignalDispatch()) { pcntl_signal_dispatch(); return; } } } }
@@ -1,2 +1,2 @@
<?php
- namespace amqphp; use amqphp\protocol; use amqphp\wire; class Socket { const READ_SELECT = 1; const WRITE_SELECT = 2; const READ_LENGTH = 4096; const BLOCK_TIMEOUT = 5; private static $All = array(); private static $Counter = 0; private $host; private $port; private $sock; private $id; private $vhost; private $connected = false; private static $interrupt = false; function __construct ($params, $flags, $vhost) { $this->host = $params['host']; $this->port = $params['port']; $this->id = ++self::$Counter; $this->vhost = $vhost; } function getVHost () { return $this->vhost; } function getCK () { return sprintf("%s:%s:%s", $this->host, $this->port, md5($this->vhost)); } function connect () { if (! ($this->sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP))) { throw new \Exception("Failed to create inet socket", 7895); } else if (! socket_connect($this->sock, $this->host, $this->port)) { throw new \Exception("Failed to connect inet socket ({$this->host}, {$this->port})", 7564); } else if (! socket_set_nonblock($this->sock)) { throw new \Exception("Failed to switch connection in to non-blocking mode.", 2357); } $this->connected = true; self::$All[] = $this; } function isReusedPSock () { return false; } function select ($tvSec, $tvUsec = 0, $rw = self::READ_SELECT) { $read = $write = $ex = null; if ($rw & self::READ_SELECT) { $read = $ex = array($this->sock); } if ($rw & self::WRITE_SELECT) { $write = $ex = array($this->sock); } if (! $read && ! $write) { throw new \Exception("Select must read and/or write", 9864); } self::$interrupt = false; $ret = socket_select($read, $write, $ex, $tvSec, $tvUsec); if ($ret === false && $this->lastError() == SOCKET_EINTR) { self::$interrupt = true; } return $ret; } static function Zelekt (array $incSet, $tvSec, $tvUsec) { $write = null; $read = $all = array(); foreach (self::$All as $i => $o) { if (in_array($o->id, $incSet)) { $read[$i] = $all[$i] = $o->sock; } } $ex = $read; $ret = false; if ($read) { $ret = socket_select($read, $write, $ex, $tvSec, $tvUsec); } if ($ret === false && socket_last_error() == SOCKET_EINTR) { self::$interrupt = true; return false; } $_read = $_ex = array(); foreach ($read as $sock) { if (false !== ($key = array_search($sock, $all, true))) { $_read[] = self::$All[$key]; } } foreach ($ex as $k => $sock) { if (false !== ($key = array_search($sock, $all, true))) { $_ex[] = self::$All[$key]; } } return array($ret, $_read, $_ex); } function selectInterrupted () { return self::$interrupt; } function read () { $buff = ''; $select = $this->select(self::BLOCK_TIMEOUT); if ($select === false) { return false; } else if ($select > 0) { $buff = $this->readAll(); } return $buff; } function lastError () { return socket_last_error($this->sock); } function clearErrors () { socket_clear_error($this->sock); } function strError () { return socket_strerror($this->lastError()); } function readAll ($readLen = self::READ_LENGTH) { $buff = ''; while (@socket_recv($this->sock, $tmp, $readLen, MSG_DONTWAIT)) { $buff .= $tmp; } if (DEBUG) { echo "\n<read>\n"; echo wire\Hexdump::hexdump($buff); } return $buff; } function write ($buff) { if (! $this->select(self::BLOCK_TIMEOUT, 0, self::WRITE_SELECT)) { trigger_error('Socket select failed for write (socket err: "' . $this->strError() . ')', E_USER_WARNING); return 0; } $bw = 0; $contentLength = strlen($buff); while (true) { if (DEBUG) { echo "\n<write>\n"; echo wire\Hexdump::hexdump($buff); } if (($tmp = socket_write($this->sock, $buff)) === false) { throw new \Exception(sprintf("\nSocket write failed: %s\n", $this->strError()), 7854); } $bw += $tmp; if ($bw < $contentLength) { $buff = substr($buff, $bw); } else { break; } } return $bw; } function close () { $this->connected = false; socket_close($this->sock); $this->detach(); } private function detach () { if (false !== ($k = array_search($this, self::$All))) { unset(self::$All[$k]); } } function getId () { return $this->id; } }
+ namespace amqphp; use amqphp\protocol; use amqphp\wire; class Socket { const READ_SELECT = 1; const WRITE_SELECT = 2; const READ_LENGTH = 4096; const BLOCK_TIMEOUT = 5; private static $All = array(); private static $Counter = 0; private $host; private $port; private $sock; private $id; private $vhost; private $connected = false; private static $interrupt = false; function __construct ($params, $flags, $vhost) { $this->host = $params['host']; $this->port = $params['port']; $this->id = ++self::$Counter; $this->vhost = $vhost; } function getVHost () { return $this->vhost; } function getCK () { return sprintf("%s:%s:%s", $this->host, $this->port, md5($this->vhost)); } function connect () { if (! ($this->sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP))) { throw new \Exception("Failed to create inet socket", 7895); } else if (! socket_connect($this->sock, $this->host, $this->port)) { throw new \Exception("Failed to connect inet socket ({$this->host}, {$this->port})", 7564); } else if (! socket_set_nonblock($this->sock)) { throw new \Exception("Failed to switch connection in to non-blocking mode.", 2357); } $this->connected = true; self::$All[] = $this; } function isReusedPSock () { return false; } function select ($tvSec, $tvUsec = 0, $rw = self::READ_SELECT) { $read = $write = $ex = null; if ($rw & self::READ_SELECT) { $read = $ex = array($this->sock); } if ($rw & self::WRITE_SELECT) { $write = $ex = array($this->sock); } if (! $read && ! $write) { throw new \Exception("Select must read and/or write", 9864); } self::$interrupt = false; $ret = socket_select($read, $write, $ex, $tvSec, $tvUsec); if ($ret === false && $this->lastError() == SOCKET_EINTR) { self::$interrupt = true; } return $ret; } static function Zelekt (array $incSet, $tvSec, $tvUsec) { $write = null; $read = $all = array(); foreach (self::$All as $i => $o) { if (in_array($o->id, $incSet)) { $read[$i] = $all[$i] = $o->sock; } } $ex = $read; $ret = false; if ($read) { $ret = socket_select($read, $write, $ex, $tvSec, $tvUsec); } if ($ret === false && socket_last_error() == SOCKET_EINTR) { self::$interrupt = true; return false; } $_read = $_ex = array(); foreach ($read as $sock) { if (false !== ($key = array_search($sock, $all, true))) { $_read[] = self::$All[$key]; } } foreach ($ex as $k => $sock) { if (false !== ($key = array_search($sock, $all, true))) { $_ex[] = self::$All[$key]; } } return array($ret, $_read, $_ex); } function selectInterrupted () { return self::$interrupt; } function read () { $buff = ''; $select = $this->select(self::BLOCK_TIMEOUT); if ($select === false) { return false; } else if ($select > 0) { $buff = $this->readAll(); } return $buff; } function lastError () { return socket_last_error($this->sock); } function clearErrors () { socket_clear_error($this->sock); } function strError () { return socket_strerror($this->lastError()); } function readAll ($readLen = self::READ_LENGTH) { $buff = ''; while (@socket_recv($this->sock, $tmp, $readLen, MSG_DONTWAIT)) { $buff .= $tmp; } if (DEBUG) { echo "\n<read>\n"; echo wire\Hexdump::hexdump($buff); } return $buff; } function write ($buff) { if (! $this->select(self::BLOCK_TIMEOUT, 0, self::WRITE_SELECT)) { trigger_error('Socket select failed for write (socket err: "' . $this->strError() . ')', E_USER_WARNING); return 0; } $bw = 0; $contentLength = strlen($buff); while (true) { if (DEBUG) { echo "\n<write>\n"; echo wire\Hexdump::hexdump($buff); } if (($tmp = socket_write($this->sock, $buff)) === false) { throw new \Exception(sprintf("Socket write failed: %s", $this->strError()), 7854); } $bw += $tmp; if ($bw < $contentLength) { $buff = substr($buff, $bw); } else { break; } } return $bw; } function close () { $this->connected = false; socket_close($this->sock); $this->detach(); } private function detach () { if (false !== ($k = array_search($this, self::$All))) { unset(self::$All[$k]); } } function getId () { return $this->id; } }
Oops, something went wrong.

0 comments on commit 5db6a3a

Please sign in to comment.