-
Notifications
You must be signed in to change notification settings - Fork 25
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Duplicated namespace level consts as class consts and switched the
internal API to use class level consts. Nsp level consts are now present for backwards compat purposes only.
- Loading branch information
1 parent
45dcb79
commit d99b60f
Showing
19 changed files
with
103 additions
and
72 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Original file line | Diff line number | Diff line change |
---|---|---|---|
@@ -1,2 +1,2 @@ | |||
<?php | <?php | ||
namespace amqphp; use amqphp\protocol; use amqphp\wire; class Socket { const READ_SELECT = 1; const WRITE_SELECT = 2; const READ_LENGTH = 4096; const BLOCK_TIMEOUT = 5; private static $All = array(); private static $Counter = 0; private $host; private $port; private $sock; private $id; private $vhost; private $connected = false; private static $interrupt = false; function __construct ($params, $flags, $vhost) { $this->host = $params['host']; $this->port = $params['port']; $this->id = ++self::$Counter; $this->vhost = $vhost; } function getVHost () { return $this->vhost; } function getCK () { return sprintf("%s:%s:%s", $this->host, $this->port, md5($this->vhost)); } function connect () { if (! ($this->sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP))) { throw new \Exception("Failed to create inet socket", 7895); } else if (! socket_connect($this->sock, $this->host, $this->port)) { throw new \Exception("Failed to connect inet socket ({$this->host}, {$this->port})", 7564); } else if (! socket_set_nonblock($this->sock)) { throw new \Exception("Failed to switch connection in to non-blocking mode.", 2357); } $this->connected = true; self::$All[] = $this; } function isReusedPSock () { return false; } function select ($tvSec, $tvUsec = 0, $rw = self::READ_SELECT) { $read = $write = $ex = null; if ($rw & self::READ_SELECT) { $read = $ex = array($this->sock); } if ($rw & self::WRITE_SELECT) { $write = $ex = array($this->sock); } if (! $read && ! $write) { throw new \Exception("Select must read and/or write", 9864); } self::$interrupt = false; $ret = socket_select($read, $write, $ex, $tvSec, $tvUsec); if ($ret === false && $this->lastError() == SOCKET_EINTR) { self::$interrupt = true; } return $ret; } static function Zelekt (array $incSet, $tvSec, $tvUsec) { $write = null; $read = $all = array(); foreach (self::$All as $i => $o) { if (in_array($o->id, $incSet)) { $read[$i] = $all[$i] = $o->sock; } } $ex = $read; $ret = false; if ($read) { $ret = socket_select($read, $write, $ex, $tvSec, $tvUsec); } if ($ret === false && socket_last_error() == SOCKET_EINTR) { self::$interrupt = true; return false; } $_read = $_ex = array(); foreach ($read as $sock) { if (false !== ($key = array_search($sock, $all, true))) { $_read[] = self::$All[$key]; } } foreach ($ex as $k => $sock) { if (false !== ($key = array_search($sock, $all, true))) { $_ex[] = self::$All[$key]; } } return array($ret, $_read, $_ex); } function selectInterrupted () { return self::$interrupt; } function read () { $buff = ''; $select = $this->select(self::BLOCK_TIMEOUT); if ($select === false) { return false; } else if ($select > 0) { $buff = $this->readAll(); } return $buff; } function lastError () { return socket_last_error($this->sock); } function clearErrors () { socket_clear_error($this->sock); } function strError () { return socket_strerror($this->lastError()); } function readAll ($readLen = self::READ_LENGTH) { $buff = ''; while (@socket_recv($this->sock, $tmp, $readLen, MSG_DONTWAIT)) { $buff .= $tmp; } if (DEBUG) { echo "\n<read>\n"; echo wire\Hexdump::hexdump($buff); } return $buff; } function write ($buff) { if (! $this->select(self::BLOCK_TIMEOUT, 0, self::WRITE_SELECT)) { trigger_error('Socket select failed for write (socket err: "' . $this->strError() . ')', E_USER_WARNING); return 0; } $bw = 0; $contentLength = strlen($buff); while (true) { if (DEBUG) { echo "\n<write>\n"; echo wire\Hexdump::hexdump($buff); } if (($tmp = socket_write($this->sock, $buff)) === false) { throw new \Exception(sprintf("Socket write failed: %s", $this->strError()), 7854); } $bw += $tmp; if ($bw < $contentLength) { $buff = substr($buff, $bw); } else { break; } } return $bw; } function close () { $this->connected = false; socket_close($this->sock); $this->detach(); } private function detach () { if (false !== ($k = array_search($this, self::$All))) { unset(self::$All[$k]); } } function getId () { return $this->id; } } | namespace amqphp; use amqphp\protocol; use amqphp\wire; class Socket { const READ_SELECT = 1; const WRITE_SELECT = 2; const READ_LENGTH = 4096; const BLOCK_TIMEOUT = 5; private static $All = array(); private static $Counter = 0; private $host; private $port; private $sock; private $id; private $vhost; private $connected = false; private static $interrupt = false; function __construct ($params, $flags, $vhost) { $this->host = $params['host']; $this->port = $params['port']; $this->id = ++self::$Counter; $this->vhost = $vhost; } function getVHost () { return $this->vhost; } function getCK () { return sprintf("%s:%s:%s", $this->host, $this->port, md5($this->vhost)); } function connect () { if (! ($this->sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP))) { throw new \Exception("Failed to create inet socket", 7895); } else if (! socket_connect($this->sock, $this->host, $this->port)) { throw new \Exception("Failed to connect inet socket ({$this->host}, {$this->port})", 7564); } else if (! socket_set_nonblock($this->sock)) { throw new \Exception("Failed to switch connection in to non-blocking mode.", 2357); } $this->connected = true; self::$All[] = $this; } function isReusedPSock () { return false; } function select ($tvSec, $tvUsec = 0, $rw = self::READ_SELECT) { $read = $write = $ex = null; if ($rw & self::READ_SELECT) { $read = $ex = array($this->sock); } if ($rw & self::WRITE_SELECT) { $write = $ex = array($this->sock); } if (! $read && ! $write) { throw new \Exception("Select must read and/or write", 9864); } self::$interrupt = false; $ret = socket_select($read, $write, $ex, $tvSec, $tvUsec); if ($ret === false && $this->lastError() == SOCKET_EINTR) { self::$interrupt = true; } return $ret; } static function Zelekt (array $incSet, $tvSec, $tvUsec) { $write = null; $read = $all = array(); foreach (self::$All as $i => $o) { if (in_array($o->id, $incSet)) { $read[$i] = $all[$i] = $o->sock; } } $ex = $read; $ret = false; if ($read) { $ret = socket_select($read, $write, $ex, $tvSec, $tvUsec); } if ($ret === false && socket_last_error() == SOCKET_EINTR) { self::$interrupt = true; return false; } $_read = $_ex = array(); foreach ($read as $sock) { if (false !== ($key = array_search($sock, $all, true))) { $_read[] = self::$All[$key]; } } foreach ($ex as $k => $sock) { if (false !== ($key = array_search($sock, $all, true))) { $_ex[] = self::$All[$key]; } } return array($ret, $_read, $_ex); } function selectInterrupted () { return self::$interrupt; } function read () { $buff = ''; $select = $this->select(self::BLOCK_TIMEOUT); if ($select === false) { return false; } else if ($select > 0) { $buff = $this->readAll(); } return $buff; } function lastError () { return socket_last_error($this->sock); } function clearErrors () { socket_clear_error($this->sock); } function strError () { return socket_strerror($this->lastError()); } function readAll ($readLen = self::READ_LENGTH) { $buff = ''; while (@socket_recv($this->sock, $tmp, $readLen, MSG_DONTWAIT)) { $buff .= $tmp; } if (Connection::DEBUG) { echo "\n<read>\n"; echo wire\Hexdump::hexdump($buff); } return $buff; } function write ($buff) { if (! $this->select(self::BLOCK_TIMEOUT, 0, self::WRITE_SELECT)) { trigger_error('Socket select failed for write (socket err: "' . $this->strError() . ')', E_USER_WARNING); return 0; } $bw = 0; $contentLength = strlen($buff); while (true) { if (Connection::DEBUG) { echo "\n<write>\n"; echo wire\Hexdump::hexdump($buff); } if (($tmp = socket_write($this->sock, $buff)) === false) { throw new \Exception(sprintf("Socket write failed: %s", $this->strError()), 7854); } $bw += $tmp; if ($bw < $contentLength) { $buff = substr($buff, $bw); } else { break; } } return $bw; } function close () { $this->connected = false; socket_close($this->sock); $this->detach(); } private function detach () { if (false !== ($k = array_search($this, self::$All))) { unset(self::$All[$k]); } } function getId () { return $this->id; } } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Original file line | Diff line number | Diff line change |
---|---|---|---|
@@ -1,2 +1,2 @@ | |||
<?php | <?php | ||
namespace amqphp; use amqphp\protocol; use amqphp\wire; class StreamSocket { const READ_SELECT = 1; const WRITE_SELECT = 2; const READ_LENGTH = 4096; const BLOCK_TIMEOUT = 5; private static $All = array(); private static $Counter = 0; private $host, $port, $vhost; private $id, $connected, $interrupt = false; private $flags; private $stfp; private $errFlag = 0; function __construct ($params, $flags, $vhost) { $this->url = $params['url']; $this->context = isset($params['context']) ? $params['context'] : array(); $this->flags = $flags ? $flags : array(); $this->id = ++self::$Counter; $this->vhost = $vhost; } function getVHost () { return $this->vhost; } function getCK () { return md5(sprintf("%s:%s:%s", $this->url, $this->getFlags(), $this->vhost)); } private function getFlags () { $flags = STREAM_CLIENT_CONNECT; foreach ($this->flags as $f) { $flags |= constant($f); } return $flags; } function connect () { $context = stream_context_create($this->context); $flags = $this->getFlags(); $this->sock = stream_socket_client($this->url, $errno, $errstr, ini_get("default_socket_timeout"), $flags, $context); $this->stfp = ftell($this->sock); if (! $this->sock) { throw new \Exception("Failed to connect stream socket {$this->url}, ($errno, $errstr): flags $flags", 7568); } else if (($flags & STREAM_CLIENT_PERSISTENT) && $this->stfp > 0) { foreach (self::$All as $sock) { if ($sock !== $this && $sock->getCK() == $this->getCK()) { $this->sock = null; throw new \Exception(sprintf("Stream socket connection created a new wrapper object for " . "an existing persistent connection on URL %s", $this->url), 8164); } } } if (! stream_set_blocking($this->sock, 0)) { throw new \Exception("Failed to place stream connection in non-blocking mode", 2795); } $this->connected = true; self::$All[] = $this; } function isReusedPSock () { return ($this->stfp > 0); } function getConnectionStartFP () { return $this->stfp; } function tell () { return ftell($this->sock); } function select ($tvSec, $tvUsec = 0, $rw = self::READ_SELECT) { $read = $write = $ex = null; if ($rw & self::READ_SELECT) { $read = $ex = array($this->sock); } if ($rw & self::WRITE_SELECT) { $write = array($this->sock); } if (! $read && ! $write) { throw new \Exception("Select must read and/or write", 9864); } $this->interrupt = false; $ret = stream_select($read, $write, $ex, $tvSec, $tvUsec); if ($ret === false) { $this->interrupt = true; } return $ret; } static function Zelekt (array $incSet, $tvSec, $tvUsec) { $write = null; $read = $all = array(); foreach (self::$All as $i => $o) { if (in_array($o->id, $incSet)) { $read[$i] = $all[$i] = $o->sock; } } $ex = $read; $ret = false; if ($read) { $ret = stream_select($read, $write, $ex, $tvSec, $tvUsec); } if ($ret === false) { return false; } $_read = $_ex = array(); foreach ($read as $sock) { if (false !== ($key = array_search($sock, $all, true))) { $_read[] = self::$All[$key]; } } foreach ($ex as $k => $sock) { if (false !== ($key = array_search($sock, $all, true))) { $_ex[] = self::$All[$key]; } } return array($ret, $_read, $_ex); } function selectInterrupted () { return $this->interrupt; } function lastError () { return $this->errFlag; } function clearErrors () { $this->errFlag = 0; } function strError () { switch ($this->errFlag) { case 1: return "Read error detected"; case 2: return "Write error detected"; case 3: return "Read and write errors detected"; default: return "No error detected"; } } function readAll ($readLen = self::READ_LENGTH) { $buff = ''; do { $buff .= $chk = fread($this->sock, $readLen); $smd = stream_get_meta_data($this->sock); $readLen = min($smd['unread_bytes'], $readLen); } while ($chk !== false && $smd['unread_bytes'] > 0); if (! $chk) { trigger_error("Stream fread returned false", E_USER_WARNING); $this->errFlag |= 1; } if (DEBUG) { echo "\n<read>\n"; echo wire\Hexdump::hexdump($buff); } return $buff; } function read () { $buff = ''; $select = $this->select(self::BLOCK_TIMEOUT); if ($select === false) { return false; } else if ($select > 0) { $buff = $this->readAll(); } return $buff; } function getUnreadBytes () { return ($smd = stream_get_meta_data($this->sock)) ? $smd['unread_bytes'] : false; } function eof () { return feof($this->sock); } function write ($buff) { if (! $this->select(self::BLOCK_TIMEOUT, 0, self::WRITE_SELECT)) { trigger_error('StreamSocket select failed for write (stream socket err: "' . $this->strError() . ')', E_USER_WARNING); return 0; } $bw = 0; $contentLength = strlen($buff); if ($contentLength == 0) { return 0; } while (true) { if (DEBUG) { echo "\n<write>\n"; echo wire\Hexdump::hexdump($buff); } if (($tmp = fwrite($this->sock, $buff)) === false) { $this->errFlag |= 2; throw new \Exception(sprintf("\nStream write failed (error): %s\n", $this->strError()), 7854); } else if ($tmp === 0) { throw new \Exception(sprintf("\nStream write failed (zero bytes written): %s\n", $this->strError()), 7855); } $bw += $tmp; if ($bw < $contentLength) { $buff = substr($buff, $bw); } else { break; } } fflush($this->sock); return $bw; } function close () { $this->connected = false; fclose($this->sock); $this->detach(); } private function detach () { if (false !== ($k = array_search($this, self::$All))) { unset(self::$All[$k]); } } function getId () { return $this->id; } } | namespace amqphp; use amqphp\protocol; use amqphp\wire; class StreamSocket { const READ_SELECT = 1; const WRITE_SELECT = 2; const READ_LENGTH = 4096; const BLOCK_TIMEOUT = 5; private static $All = array(); private static $Counter = 0; private $host, $port, $vhost; private $id, $connected, $interrupt = false; private $flags; private $stfp; private $errFlag = 0; function __construct ($params, $flags, $vhost) { $this->url = $params['url']; $this->context = isset($params['context']) ? $params['context'] : array(); $this->flags = $flags ? $flags : array(); $this->id = ++self::$Counter; $this->vhost = $vhost; } function getVHost () { return $this->vhost; } function getCK () { return md5(sprintf("%s:%s:%s", $this->url, $this->getFlags(), $this->vhost)); } private function getFlags () { $flags = STREAM_CLIENT_CONNECT; foreach ($this->flags as $f) { $flags |= constant($f); } return $flags; } function connect () { $context = stream_context_create($this->context); $flags = $this->getFlags(); $this->sock = stream_socket_client($this->url, $errno, $errstr, ini_get("default_socket_timeout"), $flags, $context); $this->stfp = ftell($this->sock); if (! $this->sock) { throw new \Exception("Failed to connect stream socket {$this->url}, ($errno, $errstr): flags $flags", 7568); } else if (($flags & STREAM_CLIENT_PERSISTENT) && $this->stfp > 0) { foreach (self::$All as $sock) { if ($sock !== $this && $sock->getCK() == $this->getCK()) { $this->sock = null; throw new \Exception(sprintf("Stream socket connection created a new wrapper object for " . "an existing persistent connection on URL %s", $this->url), 8164); } } } if (! stream_set_blocking($this->sock, 0)) { throw new \Exception("Failed to place stream connection in non-blocking mode", 2795); } $this->connected = true; self::$All[] = $this; } function isReusedPSock () { return ($this->stfp > 0); } function getConnectionStartFP () { return $this->stfp; } function tell () { return ftell($this->sock); } function select ($tvSec, $tvUsec = 0, $rw = self::READ_SELECT) { $read = $write = $ex = null; if ($rw & self::READ_SELECT) { $read = $ex = array($this->sock); } if ($rw & self::WRITE_SELECT) { $write = array($this->sock); } if (! $read && ! $write) { throw new \Exception("Select must read and/or write", 9864); } $this->interrupt = false; $ret = stream_select($read, $write, $ex, $tvSec, $tvUsec); if ($ret === false) { $this->interrupt = true; } return $ret; } static function Zelekt (array $incSet, $tvSec, $tvUsec) { $write = null; $read = $all = array(); foreach (self::$All as $i => $o) { if (in_array($o->id, $incSet)) { $read[$i] = $all[$i] = $o->sock; } } $ex = $read; $ret = false; if ($read) { $ret = stream_select($read, $write, $ex, $tvSec, $tvUsec); } if ($ret === false) { return false; } $_read = $_ex = array(); foreach ($read as $sock) { if (false !== ($key = array_search($sock, $all, true))) { $_read[] = self::$All[$key]; } } foreach ($ex as $k => $sock) { if (false !== ($key = array_search($sock, $all, true))) { $_ex[] = self::$All[$key]; } } return array($ret, $_read, $_ex); } function selectInterrupted () { return $this->interrupt; } function lastError () { return $this->errFlag; } function clearErrors () { $this->errFlag = 0; } function strError () { switch ($this->errFlag) { case 1: return "Read error detected"; case 2: return "Write error detected"; case 3: return "Read and write errors detected"; default: return "No error detected"; } } function readAll ($readLen = self::READ_LENGTH) { $buff = ''; do { $buff .= $chk = fread($this->sock, $readLen); $smd = stream_get_meta_data($this->sock); $readLen = min($smd['unread_bytes'], $readLen); } while ($chk !== false && $smd['unread_bytes'] > 0); if (! $chk) { trigger_error("Stream fread returned false", E_USER_WARNING); $this->errFlag |= 1; } if (Connection::DEBUG) { echo "\n<read>\n"; echo wire\Hexdump::hexdump($buff); } return $buff; } function read () { $buff = ''; $select = $this->select(self::BLOCK_TIMEOUT); if ($select === false) { return false; } else if ($select > 0) { $buff = $this->readAll(); } return $buff; } function getUnreadBytes () { return ($smd = stream_get_meta_data($this->sock)) ? $smd['unread_bytes'] : false; } function eof () { return feof($this->sock); } function write ($buff) { if (! $this->select(self::BLOCK_TIMEOUT, 0, self::WRITE_SELECT)) { trigger_error('StreamSocket select failed for write (stream socket err: "' . $this->strError() . ')', E_USER_WARNING); return 0; } $bw = 0; $contentLength = strlen($buff); if ($contentLength == 0) { return 0; } while (true) { if (Connection::DEBUG) { echo "\n<write>\n"; echo wire\Hexdump::hexdump($buff); } if (($tmp = fwrite($this->sock, $buff)) === false) { $this->errFlag |= 2; throw new \Exception(sprintf("\nStream write failed (error): %s\n", $this->strError()), 7854); } else if ($tmp === 0) { throw new \Exception(sprintf("\nStream write failed (zero bytes written): %s\n", $this->strError()), 7855); } $bw += $tmp; if ($bw < $contentLength) { $buff = substr($buff, $bw); } else { break; } } fflush($this->sock); return $bw; } function close () { $this->connected = false; fclose($this->sock); $this->detach(); } private function detach () { if (false !== ($k = array_search($this, self::$All))) { unset(self::$All[$k]); } } function getId () { return $this->id; } } |
Oops, something went wrong.