From 4e5a2aa039f9a3edb33e18411750b74a39b9c454 Mon Sep 17 00:00:00 2001 From: harry Date: Tue, 15 Dec 2015 18:10:50 +0800 Subject: [PATCH 1/6] update TSocket.php add conntimeout support accurate send timeout and write timeout --- lib/php/lib/Thrift/Transport/TSocket.php | 206 ++++++++++++++++------- 1 file changed, 146 insertions(+), 60 deletions(-) diff --git a/lib/php/lib/Thrift/Transport/TSocket.php b/lib/php/lib/Thrift/Transport/TSocket.php index 3ad3bf70fdf..3961c878b78 100644 --- a/lib/php/lib/Thrift/Transport/TSocket.php +++ b/lib/php/lib/Thrift/Transport/TSocket.php @@ -22,7 +22,6 @@ namespace Thrift\Transport; -use Thrift\Transport\TTransport; use Thrift\Exception\TException; use Thrift\Exception\TTransportException; use Thrift\Factory\TStringFuncFactory; @@ -32,8 +31,8 @@ * * @package thrift.transport */ -class TSocket extends TTransport { - +class TSocket extends TTransport +{ /** * Handle to PHP socket * @@ -55,6 +54,24 @@ class TSocket extends TTransport { */ protected $port_ = '9090'; + /** + * Connect timeout in seconds. + * + * Combined with connTimeoutUsec this is used for connect timeouts. + * + * @var int + */ + private $connTimeoutSec_ = 0; + + /** + * Connect timeout in useconds. + * + * Combined with connTimeoutUsec this is used for connect timeouts. + * + * @var int + */ + private $connTimeoutUsec_ = 100000; + /** * Send timeout in seconds. * @@ -96,14 +113,14 @@ class TSocket extends TTransport { * * @var bool */ - protected $persist_ = FALSE; + protected $persist_ = false; /** * Debugging on? * * @var bool */ - protected $debug_ = FALSE; + protected $debug_ = false; /** * Debug handler @@ -122,19 +139,20 @@ class TSocket extends TTransport { */ public function __construct($host='localhost', $port=9090, - $persist=FALSE, + $persist=false, $debugHandler=null) { - $this->host_ = $host; - $this->port_ = $port; - $this->persist_ = $persist; - $this->debugHandler_ = $debugHandler ? $debugHandler : 'error_log'; - } + $this->host_ = $host; + $this->port_ = $port; + $this->persist_ = $persist; + $this->debugHandler_ = $debugHandler ? $debugHandler : 'error_log'; + } /** * @param resource $handle * @return void */ - public function setHandle($handle) { + public function setHandle($handle) + { $this->handle_ = $handle; } @@ -143,10 +161,23 @@ public function setHandle($handle) { * * @param int $timeout Timeout in milliseconds. */ - public function setSendTimeout($timeout) { + public function setConnTimeout($timeout) + { + $this->connTimeoutSec_ = floor($timeout / 1000); + $this->connTimeoutUsec_ = + ($timeout - ($this->connTimeoutSec_ * 1000)) * 1000; + } + + /** + * Sets the send timeout. + * + * @param int $timeout Timeout in milliseconds. + */ + public function setSendTimeout($timeout) + { $this->sendTimeoutSec_ = floor($timeout / 1000); $this->sendTimeoutUsec_ = - ($timeout - ($this->sendTimeoutSec_ * 1000)) * 1000; + ($timeout - ($this->sendTimeoutSec_ * 1000)) * 1000; } /** @@ -154,10 +185,11 @@ public function setSendTimeout($timeout) { * * @param int $timeout Timeout in milliseconds. */ - public function setRecvTimeout($timeout) { + public function setRecvTimeout($timeout) + { $this->recvTimeoutSec_ = floor($timeout / 1000); $this->recvTimeoutUsec_ = - ($timeout - ($this->recvTimeoutSec_ * 1000)) * 1000; + ($timeout - ($this->recvTimeoutSec_ * 1000)) * 1000; } /** @@ -165,7 +197,8 @@ public function setRecvTimeout($timeout) { * * @param bool $debug */ - public function setDebug($debug) { + public function setDebug($debug) + { $this->debug_ = $debug; } @@ -174,7 +207,8 @@ public function setDebug($debug) { * * @return string host */ - public function getHost() { + public function getHost() + { return $this->host_; } @@ -183,7 +217,8 @@ public function getHost() { * * @return int port */ - public function getPort() { + public function getPort() + { return $this->port_; } @@ -192,14 +227,16 @@ public function getPort() { * * @return bool true if the socket is open */ - public function isOpen() { + public function isOpen() + { return is_resource($this->handle_); } /** * Connects the socket. */ - public function open() { + public function open() + { if ($this->isOpen()) { throw new TTransportException('Socket already connected', TTransportException::ALREADY_OPEN); } @@ -213,17 +250,17 @@ public function open() { } if ($this->persist_) { - $this->handle_ = @pfsockopen($this->host_, - $this->port_, - $errno, - $errstr, - $this->sendTimeoutSec_ + ($this->sendTimeoutUsec_ / 1000000)); + $this->handle_ = pfsockopen($this->host_, + $this->port_, + $errno, + $errstr, + $this->connTimeoutSec_ + ($this->connTimeoutUsec_ / 1000000)); } else { - $this->handle_ = @fsockopen($this->host_, - $this->port_, - $errno, - $errstr, - $this->sendTimeoutSec_ + ($this->sendTimeoutUsec_ / 1000000)); + $this->handle_ = fsockopen($this->host_, + $this->port_, + $errno, + $errstr, + $this->connTimeoutSec_ + ($this->connTimeoutUsec_ / 1000000)); } // Connect failed? @@ -239,9 +276,10 @@ public function open() { /** * Closes the socket. */ - public function close() { + public function close() + { if (!$this->persist_) { - @fclose($this->handle_); + fclose($this->handle_); $this->handle_ = null; } } @@ -255,61 +293,80 @@ public function close() { * @param int $len Maximum number of bytes to read. * @return string Binary data */ - public function read($len) { + public function read($len) + { + if($len <= 0) { + throw new TTransportException("TSocket: read len $len invalid"); + } $null = null; $read = array($this->handle_); - $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec_, $this->recvTimeoutUsec_); - if ($readable > 0) { - $data = @stream_socket_recvfrom($this->handle_, $len); - if ($data === false) { + $recvTimeoutSec_ = $this->recvTimeoutSec_; + $recvTimeoutUsec_ = $this->recvTimeoutUsec_; + $dataTotal = ""; + do { + $readable = self::stream_select_ex($read, $null, $null, $recvTimeoutSec_, $recvTimeoutUsec_); + + if ($readable > 0) { + $data = stream_socket_recvfrom($this->handle_, $len); + if ($data === false) { throw new TTransportException('TSocket: Could not read '.$len.' bytes from '. - $this->host_.':'.$this->port_); - } elseif($data == '' && feof($this->handle_)) { - throw new TTransportException('TSocket read 0 bytes'); + $this->host_.':'.$this->port_); + } elseif ($data == '' && feof($this->handle_)) { + throw new TTransportException("TSocket read 0 bytes,want $len"); } - - return $data; - } else if ($readable === 0) { + $dataTotal .= $data; + $len -= strlen($data); + } elseif ($readable === 0) { throw new TTransportException('TSocket: timed out reading '.$len.' bytes from '. - $this->host_.':'.$this->port_); + $this->host_.':'.$this->port_); } else { throw new TTransportException('TSocket: Could not read '.$len.' bytes from '. - $this->host_.':'.$this->port_); + $this->host_.':'.$this->port_." ".self::sockErr_()); } + } while( $len > 0 && ($recvTimeoutSec_ != 0 || $recvTimeoutUsec_ !=0) ); + + if($len > 0) { + throw new TTransportException('TSocket: timed out reading '.$len.' bytes from '. + $this->host_.':'.$this->port_.' '.self::sockErr()); } + return $dataTotal; + } /** * Write to the socket. * * @param string $buf The data to write */ - public function write($buf) { + public function write($buf) + { $null = null; $write = array($this->handle_); + $sendTimeoutSec = $this->sendTimeoutSec_; + $sendTimeoutUsec = $this->sendTimeoutUsec_; // keep writing until all the data has been written while (TStringFuncFactory::create()->strlen($buf) > 0) { // wait for stream to become available for writing - $writable = @stream_select($null, $write, $null, $this->sendTimeoutSec_, $this->sendTimeoutUsec_); + $writable = self::stream_select_ex($null,$write,$null,$sendTimeoutSec,$sendTimeoutUsec); if ($writable > 0) { // write buffer to stream - $written = @stream_socket_sendto($this->handle_, $buf); + $written = stream_socket_sendto($this->handle_, $buf); if ($written === -1 || $written === false) { - throw new TTransportException('TSocket: Could not write '.TStringFuncFactory::create()->strlen($buf).' bytes '. - $this->host_.':'.$this->port_); + throw new TTransportException('TSocket: Could not write '.TStringFuncFactory::create()->strlen($buf). + ' bytes '.$this->host_.':'.$this->port_." ".self::sockErr_()); } // determine how much of the buffer is left to write $buf = TStringFuncFactory::create()->substr($buf, $written); - } else if ($writable === 0) { - throw new TTransportException('TSocket: timed out writing '.TStringFuncFactory::create()->strlen($buf).' bytes from '. - $this->host_.':'.$this->port_); - } else { - throw new TTransportException('TSocket: Could not write '.TStringFuncFactory::create()->strlen($buf).' bytes '. - $this->host_.':'.$this->port_); - } + } elseif ($writable === 0) { + throw new TTransportException('TSocket: timed out writing '.TStringFuncFactory::create()->strlen($buf). + ' bytes from '.$this->host_.':'.$this->port_); + } else { + throw new TTransportException('TSocket: Could not write '.TStringFuncFactory::create()->strlen($buf). + ' bytes '.$this->host_.':'.$this->port_); } } + } /** * Flush output to the socket. @@ -320,7 +377,36 @@ public function write($buf) { * If you wish to have flushable buffering behaviour, wrap this TSocket * in a TBufferedTransport. */ - public function flush() { + public function flush() + { // no-op - } } + + /** + * stream_select_ex + * + * Improvement of stream_select. The tv_sec and tv_usec will be updated after the function + * + */ + public static function stream_select_ex(&$read,&$write,&$except,&$tv_sec,&$tv_usec) + { + $timeout = $tv_sec * 1000000 + $tv_usec; + $begin = gettimeofday(); + $ret = stream_select($read,$write,$except,$tv_sec,$tv_usec); + $end = gettimeofday(); + + $dt = $end['usec'] + $end['sec'] * 1000000 - $begin['usec'] - $begin['sec'] * 1000000; + $rest_time = max($timeout - $dt,0); + $tv_sec = floor($rest_time / 1000000); + $tv_usec = $rest_time % 1000000; + + return $ret; + } + + private static function sockErr_() + { + $errno = socket_last_error(); + return "errno:".$errno." msg:".socket_strerror($errno)." "; + } +} + From 93affda4d6aefcb88929cd53e2f29cd6287b4d3e Mon Sep 17 00:00:00 2001 From: harry Date: Tue, 15 Dec 2015 18:28:23 +0800 Subject: [PATCH 2/6] modify code style --- lib/php/lib/Thrift/Transport/TSocket.php | 48 ++++++++---------------- 1 file changed, 16 insertions(+), 32 deletions(-) diff --git a/lib/php/lib/Thrift/Transport/TSocket.php b/lib/php/lib/Thrift/Transport/TSocket.php index 3961c878b78..914c9cbcd25 100644 --- a/lib/php/lib/Thrift/Transport/TSocket.php +++ b/lib/php/lib/Thrift/Transport/TSocket.php @@ -31,8 +31,7 @@ * * @package thrift.transport */ -class TSocket extends TTransport -{ +class TSocket extends TTransport { /** * Handle to PHP socket * @@ -151,8 +150,7 @@ public function __construct($host='localhost', * @param resource $handle * @return void */ - public function setHandle($handle) - { + public function setHandle($handle) { $this->handle_ = $handle; } @@ -161,8 +159,7 @@ public function setHandle($handle) * * @param int $timeout Timeout in milliseconds. */ - public function setConnTimeout($timeout) - { + public function setConnTimeout($timeout) { $this->connTimeoutSec_ = floor($timeout / 1000); $this->connTimeoutUsec_ = ($timeout - ($this->connTimeoutSec_ * 1000)) * 1000; @@ -173,8 +170,7 @@ public function setConnTimeout($timeout) * * @param int $timeout Timeout in milliseconds. */ - public function setSendTimeout($timeout) - { + public function setSendTimeout($timeout) { $this->sendTimeoutSec_ = floor($timeout / 1000); $this->sendTimeoutUsec_ = ($timeout - ($this->sendTimeoutSec_ * 1000)) * 1000; @@ -185,8 +181,7 @@ public function setSendTimeout($timeout) * * @param int $timeout Timeout in milliseconds. */ - public function setRecvTimeout($timeout) - { + public function setRecvTimeout($timeout) { $this->recvTimeoutSec_ = floor($timeout / 1000); $this->recvTimeoutUsec_ = ($timeout - ($this->recvTimeoutSec_ * 1000)) * 1000; @@ -197,8 +192,7 @@ public function setRecvTimeout($timeout) * * @param bool $debug */ - public function setDebug($debug) - { + public function setDebug($debug) { $this->debug_ = $debug; } @@ -207,8 +201,7 @@ public function setDebug($debug) * * @return string host */ - public function getHost() - { + public function getHost() { return $this->host_; } @@ -217,8 +210,7 @@ public function getHost() * * @return int port */ - public function getPort() - { + public function getPort() { return $this->port_; } @@ -227,16 +219,14 @@ public function getPort() * * @return bool true if the socket is open */ - public function isOpen() - { + public function isOpen() { return is_resource($this->handle_); } /** * Connects the socket. */ - public function open() - { + public function open() { if ($this->isOpen()) { throw new TTransportException('Socket already connected', TTransportException::ALREADY_OPEN); } @@ -276,8 +266,7 @@ public function open() /** * Closes the socket. */ - public function close() - { + public function close() { if (!$this->persist_) { fclose($this->handle_); $this->handle_ = null; @@ -293,8 +282,7 @@ public function close() * @param int $len Maximum number of bytes to read. * @return string Binary data */ - public function read($len) - { + public function read($len) { if($len <= 0) { throw new TTransportException("TSocket: read len $len invalid"); } @@ -338,8 +326,7 @@ public function read($len) * * @param string $buf The data to write */ - public function write($buf) - { + public function write($buf) { $null = null; $write = array($this->handle_); @@ -377,8 +364,7 @@ public function write($buf) * If you wish to have flushable buffering behaviour, wrap this TSocket * in a TBufferedTransport. */ - public function flush() - { + public function flush() { // no-op } @@ -388,8 +374,7 @@ public function flush() * Improvement of stream_select. The tv_sec and tv_usec will be updated after the function * */ - public static function stream_select_ex(&$read,&$write,&$except,&$tv_sec,&$tv_usec) - { + public static function stream_select_ex(&$read,&$write,&$except,&$tv_sec,&$tv_usec) { $timeout = $tv_sec * 1000000 + $tv_usec; $begin = gettimeofday(); $ret = stream_select($read,$write,$except,$tv_sec,$tv_usec); @@ -403,8 +388,7 @@ public static function stream_select_ex(&$read,&$write,&$except,&$tv_sec,&$tv_us return $ret; } - private static function sockErr_() - { + private static function sockErr_() { $errno = socket_last_error(); return "errno:".$errno." msg:".socket_strerror($errno)." "; } From 588200508ac14951cc472ea6de34651ae7a202aa Mon Sep 17 00:00:00 2001 From: harry Date: Tue, 15 Dec 2015 19:04:11 +0800 Subject: [PATCH 3/6] modify code style --- lib/php/lib/Thrift/Transport/TSocket.php | 26 ++++++++++++------------ 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/lib/php/lib/Thrift/Transport/TSocket.php b/lib/php/lib/Thrift/Transport/TSocket.php index 914c9cbcd25..1c31c2435c1 100644 --- a/lib/php/lib/Thrift/Transport/TSocket.php +++ b/lib/php/lib/Thrift/Transport/TSocket.php @@ -140,11 +140,11 @@ public function __construct($host='localhost', $port=9090, $persist=false, $debugHandler=null) { - $this->host_ = $host; - $this->port_ = $port; - $this->persist_ = $persist; - $this->debugHandler_ = $debugHandler ? $debugHandler : 'error_log'; - } + $this->host_ = $host; + $this->port_ = $port; + $this->persist_ = $persist; + $this->debugHandler_ = $debugHandler ? $debugHandler : 'error_log'; + } /** * @param resource $handle @@ -241,16 +241,16 @@ public function open() { if ($this->persist_) { $this->handle_ = pfsockopen($this->host_, - $this->port_, - $errno, - $errstr, - $this->connTimeoutSec_ + ($this->connTimeoutUsec_ / 1000000)); + $this->port_, + $errno, + $errstr, + $this->connTimeoutSec_ + ($this->connTimeoutUsec_ / 1000000)); } else { $this->handle_ = fsockopen($this->host_, - $this->port_, - $errno, - $errstr, - $this->connTimeoutSec_ + ($this->connTimeoutUsec_ / 1000000)); + $this->port_, + $errno, + $errstr, + $this->connTimeoutSec_ + ($this->connTimeoutUsec_ / 1000000)); } // Connect failed? From 644d7314204baad808220ffffc98ec054032ac48 Mon Sep 17 00:00:00 2001 From: harry Date: Tue, 15 Dec 2015 19:06:37 +0800 Subject: [PATCH 4/6] modify code style --- lib/php/lib/Thrift/Transport/TSocket.php | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/lib/php/lib/Thrift/Transport/TSocket.php b/lib/php/lib/Thrift/Transport/TSocket.php index 1c31c2435c1..1269ba91c76 100644 --- a/lib/php/lib/Thrift/Transport/TSocket.php +++ b/lib/php/lib/Thrift/Transport/TSocket.php @@ -241,16 +241,16 @@ public function open() { if ($this->persist_) { $this->handle_ = pfsockopen($this->host_, - $this->port_, - $errno, - $errstr, - $this->connTimeoutSec_ + ($this->connTimeoutUsec_ / 1000000)); + $this->port_, + $errno, + $errstr, + $this->connTimeoutSec_ + ($this->connTimeoutUsec_ / 1000000)); } else { $this->handle_ = fsockopen($this->host_, - $this->port_, - $errno, - $errstr, - $this->connTimeoutSec_ + ($this->connTimeoutUsec_ / 1000000)); + $this->port_, + $errno, + $errstr, + $this->connTimeoutSec_ + ($this->connTimeoutUsec_ / 1000000)); } // Connect failed? From 565e2982635c03bb33139330e815598927bbc983 Mon Sep 17 00:00:00 2001 From: netbit Date: Thu, 22 Mar 2018 01:58:34 +0800 Subject: [PATCH 5/6] add conntimeout support accurate send timeout and write timeout Please enter the commit message for your changes. Lines starting --- lib/php/lib/Transport/TSocket.php | 387 +++++++++++++++++------------- 1 file changed, 217 insertions(+), 170 deletions(-) diff --git a/lib/php/lib/Transport/TSocket.php b/lib/php/lib/Transport/TSocket.php index 5147efa6301..fe58c0919d4 100644 --- a/lib/php/lib/Transport/TSocket.php +++ b/lib/php/lib/Transport/TSocket.php @@ -33,97 +33,115 @@ */ class TSocket extends TTransport { - /** - * Handle to PHP socket - * - * @var resource - */ - protected $handle_ = null; - - /** - * Remote hostname - * - * @var string - */ - protected $host_ = 'localhost'; - - /** - * Remote port - * - * @var int - */ - protected $port_ = '9090'; - - /** - * Send timeout in seconds. - * - * Combined with sendTimeoutUsec this is used for send timeouts. - * - * @var int - */ - protected $sendTimeoutSec_ = 0; - - /** - * Send timeout in microseconds. - * - * Combined with sendTimeoutSec this is used for send timeouts. - * - * @var int - */ - protected $sendTimeoutUsec_ = 100000; - - /** - * Recv timeout in seconds - * - * Combined with recvTimeoutUsec this is used for recv timeouts. - * - * @var int - */ - protected $recvTimeoutSec_ = 0; - - /** - * Recv timeout in microseconds - * - * Combined with recvTimeoutSec this is used for recv timeouts. - * - * @var int - */ - protected $recvTimeoutUsec_ = 750000; - - /** - * Persistent socket or plain? - * - * @var bool - */ - protected $persist_ = false; - - /** - * Debugging on? - * - * @var bool - */ - protected $debug_ = false; - - /** - * Debug handler - * - * @var mixed - */ - protected $debugHandler_ = null; - - /** - * Socket constructor - * - * @param string $host Remote hostname - * @param int $port Remote port - * @param bool $persist Whether to use a persistent socket - * @param string $debugHandler Function to call for error logging - */ + /** + * Handle to PHP socket + * + * @var resource + */ + private $handle_ = null; + + /** + * Remote hostname + * + * @var string + */ + protected $host_ = 'localhost'; + + /** + * Remote port + * + * @var int + */ + protected $port_ = '9090'; + + /** + * Connect timeout in seconds. + * + * Combined with connTimeoutUsec this is used for connect timeouts. + * + * @var int + */ + private $connTimeoutSec_ = 0; + + /** + * Connect timeout in microseconds. + * + * Combined with connTimeoutUsec this is used for connect timeouts. + * + * @var int + */ + private $connTimeoutUsec_ = 100000; + + /** + * Send timeout in seconds. + * + * Combined with sendTimeoutUsec this is used for send timeouts. + * + * @var int + */ + private $sendTimeoutSec_ = 0; + + /** + * Send timeout in microseconds. + * + * Combined with sendTimeoutSec this is used for send timeouts. + * + * @var int + */ + private $sendTimeoutUsec_ = 100000; + + /** + * Recv timeout in seconds + * + * Combined with recvTimeoutUsec this is used for recv timeouts. + * + * @var int + */ + private $recvTimeoutSec_ = 0; + + /** + * Recv timeout in microseconds + * + * Combined with recvTimeoutSec this is used for recv timeouts. + * + * @var int + */ + private $recvTimeoutUsec_ = 750000; + + /** + * Persistent socket or plain? + * + * @var bool + */ + protected $persist_ = false; + + /** + * Debugging on? + * + * @var bool + */ + protected $debug_ = false; + + /** + * Debug handler + * + * @var mixed + */ + protected $debugHandler_ = null; + + /** + * Socket constructor + * + * @param string $host Remote hostname + * @param int $port Remote port + * @param bool $persist Whether to use a persistent socket + * @param string $debugHandler Function to call for error logging + */ public function __construct( - $host = 'localhost', - $port = 9090, - $persist = false, - $debugHandler = null + $host='localhost', + $port=9090, + $persist=false, + $debugHandler=null ) { $this->host_ = $host; $this->port_ = $port; @@ -134,35 +152,45 @@ public function __construct( /** * @param resource $handle * @return void - */ + */ public function setHandle($handle) { $this->handle_ = $handle; - stream_set_blocking($this->handle_, false); + } + + /** + * Sets the conn timeout. + * + * @param int $timeout Timeout in milliseconds. + */ + public function setConnTimeout($timeout) + { + $this->connTimeoutSec_ = floor($timeout / 1000); + $this->connTimeoutUsec_ = + ($timeout - ($this->connTimeoutSec_ * 1000)) * 1000; } /** * Sets the send timeout. * - * @param int $timeout Timeout in milliseconds. + * @param int $timeout Timeout in milliseconds. */ public function setSendTimeout($timeout) { - $this->sendTimeoutSec_ = floor($timeout / 1000); - $this->sendTimeoutUsec_ = - ($timeout - ($this->sendTimeoutSec_ * 1000)) * 1000; + $this->sendTimeoutSec_ = floor($timeout / 1000); + $this->sendTimeoutUsec_ = + ($timeout - ($this->sendTimeoutSec_ * 1000)) * 1000; } /** * Sets the receive timeout. * - * @param int $timeout Timeout in milliseconds. + * @param int $timeout Timeout in milliseconds. */ - public function setRecvTimeout($timeout) - { - $this->recvTimeoutSec_ = floor($timeout / 1000); - $this->recvTimeoutUsec_ = - ($timeout - ($this->recvTimeoutSec_ * 1000)) * 1000; + public function setRecvTimeout($timeout) { + $this->recvTimeoutSec_ = floor($timeout / 1000); + $this->recvTimeoutUsec_ = + ($timeout - ($this->recvTimeoutSec_ * 1000)) * 1000; } /** @@ -170,7 +198,7 @@ public function setRecvTimeout($timeout) * * @param bool $debug */ - public function setDebug($debug) + public function setDebug($debug) { $this->debug_ = $debug; } @@ -202,58 +230,51 @@ public function getPort() */ public function isOpen() { - return is_resource($this->handle_); + return is_resource($this->handle_); } /** * Connects the socket. */ - public function open() + public function open() { if ($this->isOpen()) { - throw new TTransportException('Socket already connected', TTransportException::ALREADY_OPEN); + throw new TTransportException('Socket already connected', + TTransportException::ALREADY_OPEN); } if (empty($this->host_)) { - throw new TTransportException('Cannot open null host', TTransportException::NOT_OPEN); + throw new TTransportException('Cannot open null host', + TTransportException::NOT_OPEN); } if ($this->port_ <= 0) { - throw new TTransportException('Cannot open without port', TTransportException::NOT_OPEN); + throw new TTransportException('Cannot open without port', + TTransportException::NOT_OPEN); } if ($this->persist_) { - $this->handle_ = @pfsockopen( - $this->host_, + $this->handle_ = pfsockopen($this->host_, $this->port_, $errno, $errstr, - $this->sendTimeoutSec_ + ($this->sendTimeoutUsec_ / 1000000) - ); + $this->connTimeoutSec_ + ($this->connTimeoutUsec_ / 1000000)); } else { - $this->handle_ = @fsockopen( - $this->host_, + $this->handle_ = fsockopen($this->host_, $this->port_, $errno, $errstr, - $this->sendTimeoutSec_ + ($this->sendTimeoutUsec_ / 1000000) - ); + $this->connTimeoutSec_ + ($this->connTimeoutUsec_ / 1000000)); } // Connect failed? - if ($this->handle_ === false) { - $error = 'TSocket: Could not connect to ' . - $this->host_ . ':' . $this->port_ . ' (' . $errstr . ' [' . $errno . '])'; + if ($this->handle_ === FALSE) { + $error = 'TSocket: Could not connect to '.$this->host_.':'.$this->port_.' ('.$errstr.' ['.$errno.'])'; if ($this->debug_) { call_user_func($this->debugHandler_, $error); } throw new TException($error); } - - if (function_exists('socket_import_stream') && function_exists('socket_set_option')) { - $socket = socket_import_stream($this->handle_); - socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1); - } } /** @@ -261,8 +282,10 @@ public function open() */ public function close() { - @fclose($this->handle_); - $this->handle_ = null; + if (!$this->persist_) { + fclose($this->handle_); + $this->handle_ = null; + } } /** @@ -276,33 +299,42 @@ public function close() */ public function read($len) { + if($len <= 0) { + throw new TTransportException("TSocket: read len $len invalid"); + } $null = null; $read = array($this->handle_); - $readable = @stream_select( - $read, - $null, - $null, - $this->recvTimeoutSec_, - $this->recvTimeoutUsec_ - ); - - if ($readable > 0) { - $data = fread($this->handle_, $len); - if ($data === false) { - throw new TTransportException('TSocket: Could not read ' . $len . ' bytes from ' . - $this->host_ . ':' . $this->port_); - } elseif ($data == '' && feof($this->handle_)) { - throw new TTransportException('TSocket read 0 bytes'); + + $recvTimeoutSec_ = $this->recvTimeoutSec_; + $recvTimeoutUsec_ = $this->recvTimeoutUsec_; + $dataTotal = ""; + do { + $readable = self::stream_select_ex($read, $null, $null, $recvTimeoutSec_, $recvTimeoutUsec_); + + if ($readable > 0) { + $data = stream_socket_recvfrom($this->handle_, $len); + if ($data === false) { + throw new TTransportException('TSocket: Could not read '.$len.' bytes from '. + $this->host_.':'.$this->port_); + } elseif ($data == '' && feof($this->handle_)) { + throw new TTransportException("TSocket read 0 bytes,want $len"); + } + $dataTotal .= $data; + $len -= strlen($data); + } elseif ($readable === 0) { + throw new TTransportException('TSocket: timed out reading '.$len.' bytes from '. + $this->host_.':'.$this->port_); + } else { + throw new TTransportException('TSocket: Could not read '.$len.' bytes from '. + $this->host_.':'.$this->port_." ".self::sockErr_()); } + } while( $len > 0 && ($recvTimeoutSec_ != 0 || $recvTimeoutUsec_ !=0) ); - return $data; - } elseif ($readable === 0) { - throw new TTransportException('TSocket: timed out reading ' . $len . ' bytes from ' . - $this->host_ . ':' . $this->port_); - } else { - throw new TTransportException('TSocket: Could not read ' . $len . ' bytes from ' . - $this->host_ . ':' . $this->port_); + if($len > 0) { + throw new TTransportException('TSocket: timed out reading '.$len.' bytes from '. + $this->host_.':'.$this->port_.' '.self::sockErr()); } + return $dataTotal; } /** @@ -310,42 +342,30 @@ public function read($len) * * @param string $buf The data to write */ - public function write($buf) + public function write($buf) { $null = null; $write = array($this->handle_); + $sendTimeoutSec = $this->sendTimeoutSec_; + $sendTimeoutUsec = $this->sendTimeoutUsec_; // keep writing until all the data has been written while (TStringFuncFactory::create()->strlen($buf) > 0) { // wait for stream to become available for writing - $writable = @stream_select( - $null, - $write, - $null, - $this->sendTimeoutSec_, - $this->sendTimeoutUsec_ - ); + $writable = self::stream_select_ex($null,$write,$null,$sendTimeoutSec,$sendTimeoutUsec); if ($writable > 0) { // write buffer to stream - $written = fwrite($this->handle_, $buf); + $written = stream_socket_sendto($this->handle_, $buf); if ($written === -1 || $written === false) { - throw new TTransportException( - 'TSocket: Could not write ' . TStringFuncFactory::create()->strlen($buf) . ' bytes ' . - $this->host_ . ':' . $this->port_ - ); + throw new TTransportException('TSocket: Could not write '.TStringFuncFactory::create()->strlen($buf). + ' bytes '.$this->host_.':'.$this->port_." ".self::sockErr_()); } // determine how much of the buffer is left to write $buf = TStringFuncFactory::create()->substr($buf, $written); } elseif ($writable === 0) { - throw new TTransportException( - 'TSocket: timed out writing ' . TStringFuncFactory::create()->strlen($buf) . ' bytes from ' . - $this->host_ . ':' . $this->port_ - ); + throw new TTransportException('TSocket: timed out writing '.TStringFuncFactory::create()->strlen($buf).' bytes from '.$this->host_.':'.$this->port_); } else { - throw new TTransportException( - 'TSocket: Could not write ' . TStringFuncFactory::create()->strlen($buf) . ' bytes ' . - $this->host_ . ':' . $this->port_ - ); + throw new TTransportException('TSocket: Could not write '.TStringFuncFactory::create()->strlen($buf).' bytes '.$this->host_.':'.$this->port_); } } } @@ -359,8 +379,35 @@ public function write($buf) * If you wish to have flushable buffering behaviour, wrap this TSocket * in a TBufferedTransport. */ - public function flush() + public function flush() { // no-op } + + /** + * stream_select_ex + * + * Improvement of stream_select. The tv_sec and tv_usec will be updated after the function + * + */ + public static function stream_select_ex(&$read,&$write,&$except,&$tv_sec,&$tv_usec) + { + $timeout = $tv_sec * 1000000 + $tv_usec; + $begin = gettimeofday(); + $ret = stream_select($read,$write,$except,$tv_sec,$tv_usec); + $end = gettimeofday(); + + $dt = $end['usec'] + $end['sec'] * 1000000 - $begin['usec'] - $begin['sec'] * 1000000; + $rest_time = max($timeout - $dt,0); + $tv_sec = floor($rest_time / 1000000); + $tv_usec = $rest_time % 1000000; + + return $ret; + } + + private static function sockErr_() + { + $errno = socket_last_error(); + return "errno:".$errno." msg:".socket_strerror($errno)." "; + } } From 82faf6e23caf5477e7cdd8b419f9437598189121 Mon Sep 17 00:00:00 2001 From: netbit Date: Thu, 22 Mar 2018 02:10:46 +0800 Subject: [PATCH 6/6] modify codestyle --- lib/php/lib/Transport/TSocket.php | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/php/lib/Transport/TSocket.php b/lib/php/lib/Transport/TSocket.php index fe58c0919d4..be495544e2d 100644 --- a/lib/php/lib/Transport/TSocket.php +++ b/lib/php/lib/Transport/TSocket.php @@ -139,9 +139,9 @@ class TSocket extends TTransport */ public function __construct( $host='localhost', - $port=9090, - $persist=false, - $debugHandler=null + $port = 9090, + $persist = false, + $debugHandler = null ) { $this->host_ = $host; $this->port_ = $port; @@ -177,9 +177,9 @@ public function setConnTimeout($timeout) */ public function setSendTimeout($timeout) { - $this->sendTimeoutSec_ = floor($timeout / 1000); - $this->sendTimeoutUsec_ = - ($timeout - ($this->sendTimeoutSec_ * 1000)) * 1000; + $this->sendTimeoutSec_ = floor($timeout / 1000); + $this->sendTimeoutUsec_ = + ($timeout - ($this->sendTimeoutSec_ * 1000)) * 1000; } /** @@ -188,9 +188,9 @@ public function setSendTimeout($timeout) * @param int $timeout Timeout in milliseconds. */ public function setRecvTimeout($timeout) { - $this->recvTimeoutSec_ = floor($timeout / 1000); - $this->recvTimeoutUsec_ = - ($timeout - ($this->recvTimeoutSec_ * 1000)) * 1000; + $this->recvTimeoutSec_ = floor($timeout / 1000); + $this->recvTimeoutUsec_ = + ($timeout - ($this->recvTimeoutSec_ * 1000)) * 1000; } /** @@ -230,7 +230,7 @@ public function getPort() */ public function isOpen() { - return is_resource($this->handle_); + return is_resource($this->handle_); } /**