Skip to content

Commit

Permalink
Added high priority background, low priority and low priority backgro…
Browse files Browse the repository at this point in the history
…und tasks support

Adjusted connection handling and exception messages
  • Loading branch information
Brian Moon committed Dec 9, 2009
1 parent 756b08a commit 990519a
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 12 deletions.
48 changes: 45 additions & 3 deletions Net/Gearman/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,15 @@ public function __call($func, array $args = array())
protected function submitTask(Net_Gearman_Task $task)
{
switch ($task->type) {
case Net_Gearman_Task::JOB_LOW:
$type = 'submit_job_low';
break;
case Net_Gearman_Task::JOB_LOW_BACKGROUND:
$type = 'submit_job_low_bg';
break;
case Net_Gearman_Task::JOB_HIGH_BACKGROUND:
$type = 'submit_job_high_bg';
break;
case Net_Gearman_Task::JOB_BACKGROUND:
$type = 'submit_job_bg';
break;
Expand Down Expand Up @@ -177,23 +186,56 @@ protected function submitTask(Net_Gearman_Task $task)
* Run a set of tasks
*
* @param object $set A set of tasks to run
* @param int $timeout Time in seconds for the socket timeout. Max is 10 seconds
*
* @return void
* @see Net_Gearman_Set, Net_Gearman_Task
*/
public function runSet(Net_Gearman_Set $set)
public function runSet(Net_Gearman_Set $set, $timeout = null)
{
$totalTasks = $set->tasksCount;
$taskKeys = array_keys($set->tasks);
$t = 0;

if ($timeout !== null){
$socket_timeout = min(10, (int)$timeout);
} else {
$socket_timeout = 10;
}

while (!$set->finished()) {

if ($timeout !== null) {

if (empty($start)) {

$start = microtime(true);

} else {

$now = microtime(true);

if ($now - $start >= $timeout) {
break;
}
}

}


if ($t < $totalTasks) {
$k = $taskKeys[$t];
$this->submitTask($set->tasks[$k]);
if ($set->tasks[$k]->type == Net_Gearman_Task::JOB_BACKGROUND) {
if ($set->tasks[$k]->type == Net_Gearman_Task::JOB_BACKGROUND ||
$set->tasks[$k]->type == Net_Gearman_Task::JOB_HIGH_BACKGROUND ||
$set->tasks[$k]->type == Net_Gearman_Task::JOB_LOW_BACKGROUND) {

$set->tasks[$k]->finished = true;
$set->tasksCount--;
} else {
if($set->tasks[$k]->timeout !== null) {
$started = microtime(true);
}
}

$t++;
Expand All @@ -202,7 +244,7 @@ public function runSet(Net_Gearman_Set $set)
$write = null;
$except = null;
$read = $this->conn;
socket_select($read, $write, $except, 10);
socket_select($read, $write, $except, $socket_timeout);
foreach ($read as $socket) {
$resp = Net_Gearman_Connection::read($socket);
if (count($resp)) {
Expand Down
23 changes: 16 additions & 7 deletions Net/Gearman/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class Net_Gearman_Connection
'submit_job' => array(7, array('func', 'uniq', 'arg')),
'submit_job_high' => array(21, array('func', 'uniq', 'arg')),
'submit_job_bg' => array(18, array('func', 'uniq', 'arg')),
'submit_job_high_bg' => array(32, array('func', 'uniq', 'arg')),
'submit_job_low' => array(33, array('func', 'uniq', 'arg')),
'submit_job_low_bg' => array(34, array('func', 'uniq', 'arg')),
'job_created' => array(8, array('handle')),
'grab_job' => array(9, array()),
'no_job' => array(10, array()),
Expand Down Expand Up @@ -148,16 +151,20 @@ static public function connect($host, $timeout = 2000)
$start = microtime(true);
do {
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
@socket_connect($socket, $host, $port);
$errorCode = socket_last_error($socket);

$socket_connected = @socket_connect($socket, $host, $port);
if($socket_connected){
socket_set_nonblock($socket);
socket_set_option($socket, SOL_TCP, 1, 1);
$timeLeft = ((microtime(true) - $start) * 1000);
} while (!is_resource($socket) && $timeLeft < $timeout);
}
} while (!$socket_connected && $timeLeft < $timeout);

if ($errorCode == 111) {
throw new Net_Gearman_Exception("Can't connect to server");
if (!$socket_connected) {
$errno = socket_last_error($socket);
$errstr = socket_strerror($errno);
throw new Net_Gearman_Exception(
"Can't connect to server ($errno: $errstr)"
);
}

self::$waiting[(int)$socket] = array();
Expand Down Expand Up @@ -225,8 +232,10 @@ static public function send($socket, $command, array $params = array())
} while ($written < $cmdLength);

if ($error === true) {
$errno = socket_last_error($socket);
$errstr = socket_strerror($errno);
throw new Net_Gearman_Exception(
'Could not write command to socket'
"Could not write command to socket ($errno: $errstr)"
);
}
}
Expand Down
28 changes: 26 additions & 2 deletions Net/Gearman/Task.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class Net_Gearman_Task
* @see Net_Gearman_Task::JOB_NORMAL
* @see Net_Gearman_Task::JOB_BACKGROUND
* @see Net_Gearman_Task::JOB_HIGH
* @see Net_Gearman_Task::JOB_HIGH_BACKGROUND
* @see Net_Gearman_Task::JOB_LOW
* @see Net_Gearman_Task::JOB_LOW_BACKGROUND
*/
public $type = self::JOB_NORMAL;

Expand Down Expand Up @@ -144,7 +147,28 @@ class Net_Gearman_Task
*
* @var integer JOB_HIGH
*/
const JOB_HIGH = 2;
const JOB_HIGH = 3;

/**
* High priority, background job
*
* @var integer JOB_HIGH
*/
const JOB_HIGH_BACKGROUND = 4;

/**
* LOW priority job
*
* @var integer JOB_LOW
*/
const JOB_LOW = 5;

/**
* Low priority, background job
*
* @var integer JOB_LOW_BACKGROUND
*/
const JOB_LOW_BACKGROUND = 6;

/**
* Callback of type complete
Expand Down Expand Up @@ -184,7 +208,7 @@ class Net_Gearman_Task
* Constructor
*
* @param string $func Name of job to run
* @param array $arg List of arguments for job
* @param mixed $arg List of arguments for job
* @param string $uniq The unique id of the job
* @param integer $type Type of job to run task as
*
Expand Down

0 comments on commit 990519a

Please sign in to comment.