Skip to content
This repository has been archived by the owner on May 27, 2019. It is now read-only.

Commit

Permalink
* The queued_task model can now trigger a queueworker exit.
Browse files Browse the repository at this point in the history
* queued_task model will trigger exit on Datasource error.
closes #6
  • Loading branch information
MSeven committed Apr 21, 2010
1 parent ebc3e97 commit 2049ec6
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -1,2 +1,2 @@

.svn
*.bak
6 changes: 6 additions & 0 deletions models/queued_task.php
Expand Up @@ -13,6 +13,8 @@ class QueuedTask extends AppModel {

public $rateHistory = array();

public $exit = false;

/**
* Add a new Job to the Queue
*
Expand All @@ -32,6 +34,10 @@ public function createJob($jobName, $data, $notBefore = null) {
return ($this->save($this->create($data)));
}

public function onError() {
$this->exit = true;
}

/**
* Look for a new job that can be processed with the current abilities.
*
Expand Down
49 changes: 27 additions & 22 deletions vendors/shells/queue.php
Expand Up @@ -116,35 +116,40 @@ public function add() {
public function runworker() {
$exit = false;
$starttime = time();

while (!$exit) {
$this->out('Looking for Job....');
$data = $this->QueuedTask->requestJob($this->getTaskConf());
if ($data != false) {
$this->out('Running Job of type "' . $data['jobtype'] . '"');
$taskname = 'queue_' . strtolower($data['jobtype']);
$return = $this->{$taskname}->run(unserialize($data['data']));
if ($return == true) {
$this->QueuedTask->markJobDone($data['id']);
$this->out('Job Finished.');
if ($this->QueuedTask->exit === true) {
$exit = true;
} else {
if ($data !== false) {
$this->out('Running Job of type "' . $data['jobtype'] . '"');
$taskname = 'queue_' . strtolower($data['jobtype']);
$return = $this->{$taskname}->run(unserialize($data['data']));
if ($return == true) {
$this->QueuedTask->markJobDone($data['id']);
$this->out('Job Finished.');
} else {
$this->QueuedTask->markJobFailed($data['id']);
$this->out('Job did not finish, requeued.');
}
} else {
$this->QueuedTask->markJobFailed($data['id']);
$this->out('Job did not finish, requeued.');
$this->out('nothing to do, sleeping.');
sleep(Configure::read('queue.sleeptime'));
}
} else {
$this->out('nothing to do, sleeping.');
sleep(Configure::read('queue.sleeptime'));
}

// check if we are over the maximum runtime and end processing if so.
if (Configure::read('queue.workermaxruntime') != 0 && (time() - $starttime) >= Configure::read('queue.workermaxruntime')) {
$exit = true;
$this->out('Reached runtime of ' . (time() - $starttime) . ' Seconds (Max ' . Configure::read('queue.workermaxruntime') . '), terminating.');
}
if ($exit || rand(0, 100) > (100 - Configure::read('queue.gcprop'))) {
$this->out('Performing Old job cleanup.');
$this->QueuedTask->cleanOldJobs();
// check if we are over the maximum runtime and end processing if so.
if (Configure::read('queue.workermaxruntime') != 0 && (time() - $starttime) >= Configure::read('queue.workermaxruntime')) {
$exit = true;
$this->out('Reached runtime of ' . (time() - $starttime) . ' Seconds (Max ' . Configure::read('queue.workermaxruntime') . '), terminating.');
}
if ($exit || rand(0, 100) > (100 - Configure::read('queue.gcprop'))) {
$this->out('Performing Old job cleanup.');
$this->QueuedTask->cleanOldJobs();
}
$this->hr();
}
$this->hr();
}
}

Expand Down

0 comments on commit 2049ec6

Please sign in to comment.