diff --git a/Net/Gearman/Client.php b/Net/Gearman/Client.php index 4a1585b8..651c7155 100644 --- a/Net/Gearman/Client.php +++ b/Net/Gearman/Client.php @@ -138,6 +138,15 @@ public function __call($func, array $args = array()) protected function submitTask(Net_Gearman_Task $task) { switch ($task->type) { + case Net_Gearman_Task::JOB_LOW: + $type = 'submit_job_low'; + break; + case Net_Gearman_Task::JOB_LOW_BACKGROUND: + $type = 'submit_job_low_bg'; + break; + case Net_Gearman_Task::JOB_HIGH_BACKGROUND: + $type = 'submit_job_high_bg'; + break; case Net_Gearman_Task::JOB_BACKGROUND: $type = 'submit_job_bg'; break; @@ -177,23 +186,56 @@ protected function submitTask(Net_Gearman_Task $task) * Run a set of tasks * * @param object $set A set of tasks to run + * @param int $timeout Time in seconds for the socket timeout. Max is 10 seconds * * @return void * @see Net_Gearman_Set, Net_Gearman_Task */ - public function runSet(Net_Gearman_Set $set) + public function runSet(Net_Gearman_Set $set, $timeout = null) { $totalTasks = $set->tasksCount; $taskKeys = array_keys($set->tasks); $t = 0; + if ($timeout !== null){ + $socket_timeout = min(10, (int)$timeout); + } else { + $socket_timeout = 10; + } + while (!$set->finished()) { + + if ($timeout !== null) { + + if (empty($start)) { + + $start = microtime(true); + + } else { + + $now = microtime(true); + + if ($now - $start >= $timeout) { + break; + } + } + + } + + if ($t < $totalTasks) { $k = $taskKeys[$t]; $this->submitTask($set->tasks[$k]); - if ($set->tasks[$k]->type == Net_Gearman_Task::JOB_BACKGROUND) { + if ($set->tasks[$k]->type == Net_Gearman_Task::JOB_BACKGROUND || + $set->tasks[$k]->type == Net_Gearman_Task::JOB_HIGH_BACKGROUND || + $set->tasks[$k]->type == Net_Gearman_Task::JOB_LOW_BACKGROUND) { + $set->tasks[$k]->finished = true; $set->tasksCount--; + } else { + if($set->tasks[$k]->timeout !== null) { + $started = microtime(true); + } } $t++; @@ -202,7 +244,7 @@ public function runSet(Net_Gearman_Set $set) $write = null; $except = null; $read = $this->conn; - socket_select($read, $write, $except, 10); + socket_select($read, $write, $except, $socket_timeout); foreach ($read as $socket) { $resp = Net_Gearman_Connection::read($socket); if (count($resp)) { diff --git a/Net/Gearman/Connection.php b/Net/Gearman/Connection.php index 1361c441..396eaf26 100644 --- a/Net/Gearman/Connection.php +++ b/Net/Gearman/Connection.php @@ -57,6 +57,9 @@ class Net_Gearman_Connection 'submit_job' => array(7, array('func', 'uniq', 'arg')), 'submit_job_high' => array(21, array('func', 'uniq', 'arg')), 'submit_job_bg' => array(18, array('func', 'uniq', 'arg')), + 'submit_job_high_bg' => array(32, array('func', 'uniq', 'arg')), + 'submit_job_low' => array(33, array('func', 'uniq', 'arg')), + 'submit_job_low_bg' => array(34, array('func', 'uniq', 'arg')), 'job_created' => array(8, array('handle')), 'grab_job' => array(9, array()), 'no_job' => array(10, array()), @@ -148,16 +151,20 @@ static public function connect($host, $timeout = 2000) $start = microtime(true); do { $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); - @socket_connect($socket, $host, $port); - $errorCode = socket_last_error($socket); - + $socket_connected = @socket_connect($socket, $host, $port); + if($socket_connected){ socket_set_nonblock($socket); socket_set_option($socket, SOL_TCP, 1, 1); $timeLeft = ((microtime(true) - $start) * 1000); - } while (!is_resource($socket) && $timeLeft < $timeout); + } + } while (!$socket_connected && $timeLeft < $timeout); - if ($errorCode == 111) { - throw new Net_Gearman_Exception("Can't connect to server"); + if (!$socket_connected) { + $errno = socket_last_error($socket); + $errstr = socket_strerror($errno); + throw new Net_Gearman_Exception( + "Can't connect to server ($errno: $errstr)" + ); } self::$waiting[(int)$socket] = array(); @@ -225,8 +232,10 @@ static public function send($socket, $command, array $params = array()) } while ($written < $cmdLength); if ($error === true) { + $errno = socket_last_error($socket); + $errstr = socket_strerror($errno); throw new Net_Gearman_Exception( - 'Could not write command to socket' + "Could not write command to socket ($errno: $errstr)" ); } } diff --git a/Net/Gearman/Task.php b/Net/Gearman/Task.php index 95f8d4d1..5f687b26 100644 --- a/Net/Gearman/Task.php +++ b/Net/Gearman/Task.php @@ -59,6 +59,9 @@ class Net_Gearman_Task * @see Net_Gearman_Task::JOB_NORMAL * @see Net_Gearman_Task::JOB_BACKGROUND * @see Net_Gearman_Task::JOB_HIGH + * @see Net_Gearman_Task::JOB_HIGH_BACKGROUND + * @see Net_Gearman_Task::JOB_LOW + * @see Net_Gearman_Task::JOB_LOW_BACKGROUND */ public $type = self::JOB_NORMAL; @@ -144,7 +147,28 @@ class Net_Gearman_Task * * @var integer JOB_HIGH */ - const JOB_HIGH = 2; + const JOB_HIGH = 3; + + /** + * High priority, background job + * + * @var integer JOB_HIGH + */ + const JOB_HIGH_BACKGROUND = 4; + + /** + * LOW priority job + * + * @var integer JOB_LOW + */ + const JOB_LOW = 5; + + /** + * Low priority, background job + * + * @var integer JOB_LOW_BACKGROUND + */ + const JOB_LOW_BACKGROUND = 6; /** * Callback of type complete @@ -184,7 +208,7 @@ class Net_Gearman_Task * Constructor * * @param string $func Name of job to run - * @param array $arg List of arguments for job + * @param mixed $arg List of arguments for job * @param string $uniq The unique id of the job * @param integer $type Type of job to run task as *