From b34efbd13f7fb2cca1197c2d4314482e8a2ff991 Mon Sep 17 00:00:00 2001 From: Paul Banks Date: Fri, 31 Jan 2014 13:02:58 -0800 Subject: [PATCH] Support multiplexing multiple requests over persistent socket and making asynchronous requests. --- src/Adoy/FastCGI/Client.php | 265 ++++++++++++++++++++++++++++++++++-- 1 file changed, 250 insertions(+), 15 deletions(-) diff --git a/src/Adoy/FastCGI/Client.php b/src/Adoy/FastCGI/Client.php index d307efb..0a7d8a5 100644 --- a/src/Adoy/FastCGI/Client.php +++ b/src/Adoy/FastCGI/Client.php @@ -9,6 +9,8 @@ */ namespace Adoy\FastCGI; +class TimedOutException extends \Exception {} + /** * Handles communication with a FastCGI application * @@ -47,6 +49,11 @@ class Client const HEADER_LEN = 8; + const REQ_STATE_WRITTEN = 1; + const REQ_STATE_OK = 2; + const REQ_STATE_ERR = 3; + const REQ_STATE_TIMED_OUT = 4; + /** * Socket * @var Resource @@ -71,6 +78,38 @@ class Client */ private $_keepAlive = false; + /** + * Outstanding request statuses keyed by request id + * + * Each request is an array with following form: + * + * array( + * 'state' => REQ_STATE_* + * 'response' => null | string + * ) + * + * @var array + */ + private $_requests = array(); + + /** + * Use persistent sockets to connect to backend + * @var Boolean + */ + private $_persistentSocket = false; + + /** + * Connect timeout in milliseconds + * @var Integer + */ + private $_connectTimeout = 5000; + + /** + * Read/Write timeout in milliseconds + * @var Integer + */ + private $_readWriteTimeout = 5000; + /** * Constructor * @@ -107,15 +146,105 @@ public function getKeepAlive() return $this->_keepAlive; } + /** + * Define whether or not PHP should attempt to re-use sockets opened by previous + * request for efficiency + * + * @param Boolean $b true if persistent socket should be used, false otherwise + */ + public function setPersistentSocket($b) + { + $was_persistent = ($this->_sock && $this->_persistentSocket); + $this->_persistentSocket = (boolean)$b; + if (!$this->_persistentSocket && $was_persistent) { + fclose($this->_sock); + } + } + + /** + * Get the pesistent socket status + * + * @return Boolean true if the socket should be persistent, false otherwise + */ + public function getPersistentSocket() + { + return $this->_persistentSocket; + } + + + /** + * Set the connect timeout + * + * @param Integer number of milliseconds before connect will timeout + */ + public function setConnectTimeout($timeoutMs) + { + $this->_connectTimeout = $timeoutMs; + } + + /** + * Get the connect timeout + * + * @return Integer number of milliseconds before connect will timeout + */ + public function getConnectTimeout() + { + return $this->_connectTimeout; + } + + /** + * Set the read/write timeout + * + * @param Integer number of milliseconds before read or write call will timeout + */ + public function setReadWriteTimeout($timeoutMs) + { + $this->_readWriteTimeout = $timeoutMs; + $this->set_ms_timeout($this->_readWriteTimeout); + } + + /** + * Get the read timeout + * + * @return Integer number of milliseconds before read will timeout + */ + public function getReadWriteTimeout() + { + return $this->_readWriteTimeout; + } + + /** + * Helper to avoid duplicating milliseconds to secs/usecs in a few places + * + * @param Integer millisecond timeout + * @return Boolean + */ + private function set_ms_timeout($timeoutMs) { + if (!$this->_sock) { + return false; + } + return stream_set_timeout($this->_sock, floor($timeoutMs / 1000), ($timeoutMs % 1000) * 1000); + } + + /** * Create a connection to the FastCGI application */ private function connect() { if (!$this->_sock) { - $this->_sock = fsockopen($this->_host, $this->_port, $errno, $errstr, 5); + if ($this->_persistentSocket) { + $this->_sock = pfsockopen($this->_host, $this->_port, $errno, $errstr, $this->_connectTimeout/1000); + } else { + $this->_sock = fsockopen($this->_host, $this->_port, $errno, $errstr, $this->_connectTimeout/1000); + } + if (!$this->_sock) { - throw new \Exception('Unable to connect to FastCGI application'); + throw new \Exception('Unable to connect to FastCGI application: ' . $errstr); + } + + if (!$this->set_ms_timeout($this->_readWriteTimeout)) { + throw new \Exception('Unable to set timeout on socket'); } } } @@ -245,7 +374,7 @@ private function readPacket() } } if ($resp['paddingLength']) { - $buf=fread($this->_sock, $resp['paddingLength']); + $buf = fread($this->_sock, $resp['paddingLength']); } return $resp; } else { @@ -286,38 +415,144 @@ public function getValues(array $requestedInfo) */ public function request(array $params, $stdin) { - $response = ''; + $id = $this->async_request($params, $stdin); + return $this->wait_for_response($id); + } + + /** + * Execute a request to the FastCGI application asyncronously + * + * This sends request to application and returns the assigned ID for that request. + * + * You should keep this id for later use with wait_for_response(). Ids are chosen randomly + * rather than seqentially to guard against false-positives when using persistent sockets. + * In that case it is possible that a delayed response to a request made by a previous script + * invocation comes back on this socket and is mistaken for response to request made with same ID + * during this request. + * + * @param array $params Array of parameters + * @param String $stdin Content + * @return Integer + */ + public function async_request(array $params, $stdin) + { $this->connect(); - $request = $this->buildPacket(self::BEGIN_REQUEST, chr(0) . chr(self::RESPONDER) . chr((int) $this->_keepAlive) . str_repeat(chr(0), 5)); + // Pick random number between 1 and max 16 bit unsigned int 65535 + $id = mt_rand(1, (1 << 16) - 1); + + // Using persistent sockets implies you want them keept alive by server! + $keepAlive = intval($this->_keepAlive || $this->_persistentSocket); + + $request = $this->buildPacket(self::BEGIN_REQUEST + ,chr(0) . chr(self::RESPONDER) . chr($keepAlive) . str_repeat(chr(0), 5) + ,$id + ); $paramsRequest = ''; foreach ($params as $key => $value) { - $paramsRequest .= $this->buildNvpair($key, $value); + $paramsRequest .= $this->buildNvpair($key, $value, $id); } if ($paramsRequest) { - $request .= $this->buildPacket(self::PARAMS, $paramsRequest); + $request .= $this->buildPacket(self::PARAMS, $paramsRequest, $id); } - $request .= $this->buildPacket(self::PARAMS, ''); + $request .= $this->buildPacket(self::PARAMS, '', $id); if ($stdin) { - $request .= $this->buildPacket(self::STDIN, $stdin); + $request .= $this->buildPacket(self::STDIN, $stdin, $id); + } + $request .= $this->buildPacket(self::STDIN, '', $id); + + if (fwrite($this->_sock, $request) === false || fflush($this->_sock) === false) { + + $info = stream_get_meta_data($this->_sock); + + if ($info['timed_out']) { + throw new TimedOutException('Write timed out'); + } + + // Broken pipe, tear down so future requests might succeed + fclose($this->_sock); + throw new \Exception('Failed to write request to socket'); + } + + $this->_requests[$id] = array( + 'state' => self::REQ_STATE_WRITTEN, + 'response' => null + ); + + return $id; + } + + /** + * Blocking call that waits for response to specific request + * + * @param Integer $requestId + * @param Integer $timeoutMs [optional] the number of milliseconds to wait. Defaults to the ReadWriteTimeout value set. + * @return string response body + */ + public function wait_for_response($requestId, $timeoutMs = 0) { + + if (!isset($this->_requests[$requestId])) { + throw new \Exception('Invalid request id given'); + } + + // If we already read the response during an earlier call for different id, just return it + if ($this->_requests[$requestId]['state'] == self::REQ_STATE_OK + || $this->_requests[$requestId]['state'] == self::REQ_STATE_ERR + ) { + return $this->_requests[$requestId]['response']; + } + + if ($timeoutMs > 0) { + // Reset timeout on socket for now + $this->set_ms_timeout($timeoutMs); + } else { + $timeoutMs = $this->_readWriteTimeout; } - $request .= $this->buildPacket(self::STDIN, ''); - fwrite($this->_sock, $request); + // Need to manually check since we might do several reads none of which timeout themselves + // but still not get the response requested + $startTime = microtime(true); do { $resp = $this->readPacket(); + if ($resp['type'] == self::STDOUT || $resp['type'] == self::STDERR) { - $response .= $resp['content']; + if ($resp['type'] == self::STDERR) { + $this->_requests[$resp['requestId']]['state'] = self::REQ_STATE_ERR; + } + $this->_requests[$resp['requestId']]['response'] .= $resp['content']; + } + if ($resp['type'] == self::END_REQUEST) { + $this->_requests[$resp['requestId']]['state'] = self::REQ_STATE_OK; + if ($resp['requestId'] == $requestId) { + break; + } + } + if (microtime(true) - $startTime >= ($timeoutMs * 1000)) { + // Reset + $this->set_ms_timeout($this->_readWriteTimeout); + throw new \Exception('Timed out'); } - } while ($resp && $resp['type'] != self::END_REQUEST); + } while ($resp); if (!is_array($resp)) { - throw new \Exception('Bad request'); + $info = stream_get_meta_data($this->_sock); + + // We must reset timeout but it must be AFTER we get info + $this->set_ms_timeout($this->_readWriteTimeout); + + if ($info['timed_out']) { + throw new TimedOutException('Read timed out'); + } + + throw new \Exception('Read failed'); } + // Reset timeout + $this->set_ms_timeout($this->_readWriteTimeout); + switch (ord($resp['content']{4})) { case self::CANT_MPX_CONN: throw new \Exception('This app can\'t multiplex [CANT_MPX_CONN]'); @@ -329,7 +564,7 @@ public function request(array $params, $stdin) throw new \Exception('Role value not known [UNKNOWN_ROLE]'); break; case self::REQUEST_COMPLETE: - return $response; + return $this->_requests[$requestId]['response']; } } }