Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 250 additions & 15 deletions src/Adoy/FastCGI/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
*/
namespace Adoy\FastCGI;

class TimedOutException extends \Exception {}

/**
* Handles communication with a FastCGI application
*
Expand Down Expand Up @@ -47,6 +49,11 @@ class Client

const HEADER_LEN = 8;

const REQ_STATE_WRITTEN = 1;
const REQ_STATE_OK = 2;
const REQ_STATE_ERR = 3;
const REQ_STATE_TIMED_OUT = 4;

/**
* Socket
* @var Resource
Expand All @@ -71,6 +78,38 @@ class Client
*/
private $_keepAlive = false;

/**
* Outstanding request statuses keyed by request id
*
* Each request is an array with following form:
*
* array(
* 'state' => REQ_STATE_*
* 'response' => null | string
* )
*
* @var array
*/
private $_requests = array();

/**
* Use persistent sockets to connect to backend
* @var Boolean
*/
private $_persistentSocket = false;

/**
* Connect timeout in milliseconds
* @var Integer
*/
private $_connectTimeout = 5000;

/**
* Read/Write timeout in milliseconds
* @var Integer
*/
private $_readWriteTimeout = 5000;

/**
* Constructor
*
Expand Down Expand Up @@ -107,15 +146,105 @@ public function getKeepAlive()
return $this->_keepAlive;
}

/**
* Define whether or not PHP should attempt to re-use sockets opened by previous
* request for efficiency
*
* @param Boolean $b true if persistent socket should be used, false otherwise
*/
public function setPersistentSocket($b)
{
$was_persistent = ($this->_sock && $this->_persistentSocket);
$this->_persistentSocket = (boolean)$b;
if (!$this->_persistentSocket && $was_persistent) {
fclose($this->_sock);
}
}

/**
* Get the pesistent socket status
*
* @return Boolean true if the socket should be persistent, false otherwise
*/
public function getPersistentSocket()
{
return $this->_persistentSocket;
}


/**
* Set the connect timeout
*
* @param Integer number of milliseconds before connect will timeout
*/
public function setConnectTimeout($timeoutMs)
{
$this->_connectTimeout = $timeoutMs;
}

/**
* Get the connect timeout
*
* @return Integer number of milliseconds before connect will timeout
*/
public function getConnectTimeout()
{
return $this->_connectTimeout;
}

/**
* Set the read/write timeout
*
* @param Integer number of milliseconds before read or write call will timeout
*/
public function setReadWriteTimeout($timeoutMs)
{
$this->_readWriteTimeout = $timeoutMs;
$this->set_ms_timeout($this->_readWriteTimeout);
}

/**
* Get the read timeout
*
* @return Integer number of milliseconds before read will timeout
*/
public function getReadWriteTimeout()
{
return $this->_readWriteTimeout;
}

/**
* Helper to avoid duplicating milliseconds to secs/usecs in a few places
*
* @param Integer millisecond timeout
* @return Boolean
*/
private function set_ms_timeout($timeoutMs) {
if (!$this->_sock) {
return false;
}
return stream_set_timeout($this->_sock, floor($timeoutMs / 1000), ($timeoutMs % 1000) * 1000);
}


/**
* Create a connection to the FastCGI application
*/
private function connect()
{
if (!$this->_sock) {
$this->_sock = fsockopen($this->_host, $this->_port, $errno, $errstr, 5);
if ($this->_persistentSocket) {
$this->_sock = pfsockopen($this->_host, $this->_port, $errno, $errstr, $this->_connectTimeout/1000);
} else {
$this->_sock = fsockopen($this->_host, $this->_port, $errno, $errstr, $this->_connectTimeout/1000);
}

if (!$this->_sock) {
throw new \Exception('Unable to connect to FastCGI application');
throw new \Exception('Unable to connect to FastCGI application: ' . $errstr);
}

if (!$this->set_ms_timeout($this->_readWriteTimeout)) {
throw new \Exception('Unable to set timeout on socket');
}
}
}
Expand Down Expand Up @@ -245,7 +374,7 @@ private function readPacket()
}
}
if ($resp['paddingLength']) {
$buf=fread($this->_sock, $resp['paddingLength']);
$buf = fread($this->_sock, $resp['paddingLength']);
}
return $resp;
} else {
Expand Down Expand Up @@ -286,38 +415,144 @@ public function getValues(array $requestedInfo)
*/
public function request(array $params, $stdin)
{
$response = '';
$id = $this->async_request($params, $stdin);
return $this->wait_for_response($id);
}

/**
* Execute a request to the FastCGI application asyncronously
*
* This sends request to application and returns the assigned ID for that request.
*
* You should keep this id for later use with wait_for_response(). Ids are chosen randomly
* rather than seqentially to guard against false-positives when using persistent sockets.
* In that case it is possible that a delayed response to a request made by a previous script
* invocation comes back on this socket and is mistaken for response to request made with same ID
* during this request.
*
* @param array $params Array of parameters
* @param String $stdin Content
* @return Integer
*/
public function async_request(array $params, $stdin)
{
$this->connect();

$request = $this->buildPacket(self::BEGIN_REQUEST, chr(0) . chr(self::RESPONDER) . chr((int) $this->_keepAlive) . str_repeat(chr(0), 5));
// Pick random number between 1 and max 16 bit unsigned int 65535
$id = mt_rand(1, (1 << 16) - 1);

// Using persistent sockets implies you want them keept alive by server!
$keepAlive = intval($this->_keepAlive || $this->_persistentSocket);

$request = $this->buildPacket(self::BEGIN_REQUEST
,chr(0) . chr(self::RESPONDER) . chr($keepAlive) . str_repeat(chr(0), 5)
,$id
);

$paramsRequest = '';
foreach ($params as $key => $value) {
$paramsRequest .= $this->buildNvpair($key, $value);
$paramsRequest .= $this->buildNvpair($key, $value, $id);
}
if ($paramsRequest) {
$request .= $this->buildPacket(self::PARAMS, $paramsRequest);
$request .= $this->buildPacket(self::PARAMS, $paramsRequest, $id);
}
$request .= $this->buildPacket(self::PARAMS, '');
$request .= $this->buildPacket(self::PARAMS, '', $id);

if ($stdin) {
$request .= $this->buildPacket(self::STDIN, $stdin);
$request .= $this->buildPacket(self::STDIN, $stdin, $id);
}
$request .= $this->buildPacket(self::STDIN, '', $id);

if (fwrite($this->_sock, $request) === false || fflush($this->_sock) === false) {

$info = stream_get_meta_data($this->_sock);

if ($info['timed_out']) {
throw new TimedOutException('Write timed out');
}

// Broken pipe, tear down so future requests might succeed
fclose($this->_sock);
throw new \Exception('Failed to write request to socket');
}

$this->_requests[$id] = array(
'state' => self::REQ_STATE_WRITTEN,
'response' => null
);

return $id;
}

/**
* Blocking call that waits for response to specific request
*
* @param Integer $requestId
* @param Integer $timeoutMs [optional] the number of milliseconds to wait. Defaults to the ReadWriteTimeout value set.
* @return string response body
*/
public function wait_for_response($requestId, $timeoutMs = 0) {

if (!isset($this->_requests[$requestId])) {
throw new \Exception('Invalid request id given');
}

// If we already read the response during an earlier call for different id, just return it
if ($this->_requests[$requestId]['state'] == self::REQ_STATE_OK
|| $this->_requests[$requestId]['state'] == self::REQ_STATE_ERR
) {
return $this->_requests[$requestId]['response'];
}

if ($timeoutMs > 0) {
// Reset timeout on socket for now
$this->set_ms_timeout($timeoutMs);
} else {
$timeoutMs = $this->_readWriteTimeout;
}
$request .= $this->buildPacket(self::STDIN, '');

fwrite($this->_sock, $request);
// Need to manually check since we might do several reads none of which timeout themselves
// but still not get the response requested
$startTime = microtime(true);

do {
$resp = $this->readPacket();

if ($resp['type'] == self::STDOUT || $resp['type'] == self::STDERR) {
$response .= $resp['content'];
if ($resp['type'] == self::STDERR) {
$this->_requests[$resp['requestId']]['state'] = self::REQ_STATE_ERR;
}
$this->_requests[$resp['requestId']]['response'] .= $resp['content'];
}
if ($resp['type'] == self::END_REQUEST) {
$this->_requests[$resp['requestId']]['state'] = self::REQ_STATE_OK;
if ($resp['requestId'] == $requestId) {
break;
}
}
if (microtime(true) - $startTime >= ($timeoutMs * 1000)) {
// Reset
$this->set_ms_timeout($this->_readWriteTimeout);
throw new \Exception('Timed out');
}
} while ($resp && $resp['type'] != self::END_REQUEST);
} while ($resp);

if (!is_array($resp)) {
throw new \Exception('Bad request');
$info = stream_get_meta_data($this->_sock);

// We must reset timeout but it must be AFTER we get info
$this->set_ms_timeout($this->_readWriteTimeout);

if ($info['timed_out']) {
throw new TimedOutException('Read timed out');
}

throw new \Exception('Read failed');
}

// Reset timeout
$this->set_ms_timeout($this->_readWriteTimeout);

switch (ord($resp['content']{4})) {
case self::CANT_MPX_CONN:
throw new \Exception('This app can\'t multiplex [CANT_MPX_CONN]');
Expand All @@ -329,7 +564,7 @@ public function request(array $params, $stdin)
throw new \Exception('Role value not known [UNKNOWN_ROLE]');
break;
case self::REQUEST_COMPLETE:
return $response;
return $this->_requests[$requestId]['response'];
}
}
}