From 898992c11700adae1247d04607cc1a1a71fb7f55 Mon Sep 17 00:00:00 2001 From: Neil Crookes Date: Wed, 14 Apr 2010 11:08:18 +0100 Subject: [PATCH] Adding task grouping support with optional -group parameter in the call to the shell. This will restrict workers to tasks with group field matching the supplied . Group param added as 4th parameter to QueuedTask::createJob(). Also adding reference field to sql and passed into the 5th parameter to QueuedTask::createJob() - intended to uniquely identify a task without having to fetch and unserialize the contents of the data field --- config/sql/queue.php | 12 ++++++++++++ models/queued_task.php | 19 +++++++++++++++---- vendors/shells/queue.php | 6 +++++- 3 files changed, 32 insertions(+), 5 deletions(-) diff --git a/config/sql/queue.php b/config/sql/queue.php index 07ac14c..4046e68 100644 --- a/config/sql/queue.php +++ b/config/sql/queue.php @@ -35,6 +35,18 @@ function after($event = array()) { 'null' => true, 'default' => NULL ), + 'group' => array( + 'type' => 'string', + 'length' => 255, + 'null' => true, + 'default' => NULL + ), + 'reference' => array( + 'type' => 'string', + 'length' => 255, + 'null' => true, + 'default' => NULL + ), 'created' => array( 'type' => 'datetime', 'null' => false diff --git a/models/queued_task.php b/models/queued_task.php index 0ae20ae..23363c2 100644 --- a/models/queued_task.php +++ b/models/queued_task.php @@ -18,13 +18,17 @@ class QueuedTask extends AppModel { * * @param string $jobName QueueTask name * @param array $data any array + * @param string $group Used to group similar QueuedTasks + * @param string $reference any array * @return bool success */ - public function createJob($jobName, $data, $notBefore = null) { + public function createJob($jobName, $data, $notBefore = null, $group = null, $reference = null) { $data = array( 'jobtype' => $jobName, - 'data' => serialize($data) + 'data' => serialize($data), + 'group' => $group, + 'reference' => $reference, ); if ($notBefore != null) { $data['notbefore'] = date('Y-m-d H:i:s', strtotime($notBefore)); @@ -33,12 +37,14 @@ public function createJob($jobName, $data, $notBefore = null) { } /** - * Look for a new job that can be processed with the current abilities. + * Look for a new job that can be processed with the current abilities and + * from the specified group (or any if null). * * @param array $capabilities Available QueueWorkerTasks. + * @param string $group Request a job from this group, (from any group if null) * @return Array Taskdata. */ - public function requestJob($capabilities) { + public function requestJob($capabilities, $group = null) { $idlist = array(); $wasFetched = array(); @@ -58,6 +64,11 @@ public function requestJob($capabilities) { ), 'limit' => 3 ); + + if (!is_null($group)) { + $findConf['conditions']['group'] = $group; + } + // generate the task specific conditions. foreach ($capabilities as $task) { $tmp = array( diff --git a/vendors/shells/queue.php b/vendors/shells/queue.php index 7cc446d..3f0d0b9 100644 --- a/vendors/shells/queue.php +++ b/vendors/shells/queue.php @@ -116,9 +116,13 @@ public function add() { public function runworker() { $exit = false; $starttime = time(); + $group = null; + if (isset($this->params['group']) && !empty($this->params['group'])) { + $group = $this->params['group']; + } while (!$exit) { $this->out('Looking for Job....'); - $data = $this->QueuedTask->requestJob($this->getTaskConf()); + $data = $this->QueuedTask->requestJob($this->getTaskConf(), $group); if ($data != false) { $this->out('Running Job of type "' . $data['jobtype'] . '"'); $taskname = 'queue_' . strtolower($data['jobtype']);