Skip to content
This repository has been archived by the owner on May 27, 2019. It is now read-only.

Commit

Permalink
Adding task grouping support with optional -group <group string> para…
Browse files Browse the repository at this point in the history
…meter in the call to the shell. This will restrict workers to tasks with group field matching the supplied <group string>. Group param added as 4th parameter to QueuedTask::createJob(). Also adding reference field to sql and passed into the 5th parameter to QueuedTask::createJob() - intended to uniquely identify a task without having to fetch and unserialize the contents of the data field
  • Loading branch information
Neil Crookes committed Apr 14, 2010
1 parent ebc3e97 commit 898992c
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 5 deletions.
12 changes: 12 additions & 0 deletions config/sql/queue.php
Expand Up @@ -35,6 +35,18 @@ function after($event = array()) {
'null' => true,
'default' => NULL
),
'group' => array(
'type' => 'string',
'length' => 255,
'null' => true,
'default' => NULL
),
'reference' => array(
'type' => 'string',
'length' => 255,
'null' => true,
'default' => NULL
),
'created' => array(
'type' => 'datetime',
'null' => false
Expand Down
19 changes: 15 additions & 4 deletions models/queued_task.php
Expand Up @@ -18,13 +18,17 @@ class QueuedTask extends AppModel {
*
* @param string $jobName QueueTask name
* @param array $data any array
* @param string $group Used to group similar QueuedTasks
* @param string $reference any array
* @return bool success
*/
public function createJob($jobName, $data, $notBefore = null) {
public function createJob($jobName, $data, $notBefore = null, $group = null, $reference = null) {

$data = array(
'jobtype' => $jobName,
'data' => serialize($data)
'data' => serialize($data),
'group' => $group,
'reference' => $reference,
);
if ($notBefore != null) {
$data['notbefore'] = date('Y-m-d H:i:s', strtotime($notBefore));
Expand All @@ -33,12 +37,14 @@ public function createJob($jobName, $data, $notBefore = null) {
}

/**
* Look for a new job that can be processed with the current abilities.
* Look for a new job that can be processed with the current abilities and
* from the specified group (or any if null).
*
* @param array $capabilities Available QueueWorkerTasks.
* @param string $group Request a job from this group, (from any group if null)
* @return Array Taskdata.
*/
public function requestJob($capabilities) {
public function requestJob($capabilities, $group = null) {
$idlist = array();
$wasFetched = array();

Expand All @@ -58,6 +64,11 @@ public function requestJob($capabilities) {
),
'limit' => 3
);

if (!is_null($group)) {
$findConf['conditions']['group'] = $group;
}

// generate the task specific conditions.
foreach ($capabilities as $task) {
$tmp = array(
Expand Down
6 changes: 5 additions & 1 deletion vendors/shells/queue.php
Expand Up @@ -116,9 +116,13 @@ public function add() {
public function runworker() {
$exit = false;
$starttime = time();
$group = null;
if (isset($this->params['group']) && !empty($this->params['group'])) {
$group = $this->params['group'];
}
while (!$exit) {
$this->out('Looking for Job....');
$data = $this->QueuedTask->requestJob($this->getTaskConf());
$data = $this->QueuedTask->requestJob($this->getTaskConf(), $group);
if ($data != false) {
$this->out('Running Job of type "' . $data['jobtype'] . '"');
$taskname = 'queue_' . strtolower($data['jobtype']);
Expand Down

0 comments on commit 898992c

Please sign in to comment.