Skip to content

Commit

Permalink
Remove deprecations for new major.
Browse files Browse the repository at this point in the history
  • Loading branch information
dereuromark committed Apr 11, 2019
1 parent 6dbc801 commit 1c4f914
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 206 deletions.
3 changes: 0 additions & 3 deletions config/app_queue.php
Expand Up @@ -41,9 +41,6 @@
// seconds of running time after which the PHP process will terminate, null uses workermaxruntime * 100
'workertimeout' => null,

// false for DB, or deprecated string pid file path directory (by default goes to the app/tmp/queue folder)
'pidfilepath' => false, // Deprecated: TMP . 'queue' . DS,

// determine whether logging is enabled
'log' => true,

Expand Down
4 changes: 0 additions & 4 deletions docs/README.md
Expand Up @@ -112,10 +112,6 @@ return [
You can also drop the configuration into an existing config file (recommended) that is already been loaded.
The values above are the default settings which apply, when no configuration is found.

Finally, make sure you allow the configured `pidfilepath` to be creatable and writable.
Especially on deployment some `mkdir` command might be necessary.
Set it to false to use the DB here instead, as well.

#### Backend configuration

- isSearchEnabled: Set to false if you do not want search/filtering capability.
Expand Down
20 changes: 3 additions & 17 deletions src/Controller/Admin/QueueController.php
Expand Up @@ -3,7 +3,6 @@
namespace Queue\Controller\Admin;

use App\Controller\AppController;
use Cake\Core\Configure;
use Cake\Http\Exception\NotFoundException;
use Queue\Queue\TaskFinder;

Expand All @@ -18,15 +17,6 @@ class QueueController extends AppController {
*/
public $modelClass = 'Queue.QueuedJobs';

/**
* @return void
*/
public function initialize() {
parent::initialize();

$this->QueuedJobs->initConfig();
}

/**
* Admin center.
* Manage queues from admin backend (without the need to open ssh console window).
Expand Down Expand Up @@ -134,14 +124,10 @@ public function processes() {
return $this->redirect(['action' => 'processes']);
}

$pidFilePath = Configure::read('Queue.pidfilepath');
if (!$pidFilePath) {
$this->loadModel('Queue.QueueProcesses');
$terminated = $this->QueueProcesses->find()->where(['terminate' => true])->all()->toArray();
$this->set(compact('terminated'));
}
$this->loadModel('Queue.QueueProcesses');
$terminated = $this->QueueProcesses->find()->where(['terminate' => true])->all()->toArray();

$this->set(compact('processes'));
$this->set(compact('terminated', 'processes'));
$this->helpers[] = 'Shim.Configure';
}

Expand Down
2 changes: 0 additions & 2 deletions src/Controller/Admin/QueuedJobsController.php
Expand Up @@ -31,8 +31,6 @@ class QueuedJobsController extends AppController {
public function initialize() {
parent::initialize();

$this->QueuedJobs->initConfig();

$this->loadComponent('RequestHandler');

if (Configure::read('Queue.isSearchEnabled') === false || !Plugin::isLoaded('Search')) {
Expand Down
53 changes: 14 additions & 39 deletions src/Model/Table/QueueProcessesTable.php
Expand Up @@ -177,7 +177,7 @@ public function cleanEndedProcesses() {
}

/**
* If pid loggin is enabled, will return an array with
* If pid logging is enabled, will return an array with
* - time: Timestamp as FrozenTime object
* - workers: int Count of currently running workers
*
Expand All @@ -187,51 +187,26 @@ public function status() {
$timeout = (int)Configure::readOrFail('Queue.defaultworkertimeout');
$thresholdTime = (new FrozenTime())->subSeconds($timeout);

$pidFilePath = Configure::read('Queue.pidfilepath');
if (!$pidFilePath) {
$results = $this->find()
->where(['modified >' => $thresholdTime])
->orderDesc('modified')
->enableHydration(false)
->all()
->toArray();

if (!$results) {
return [];
}

$count = count($results);
$record = array_shift($results);
/** @var \Cake\I18n\FrozenTime $time */
$time = $record['modified'];

return [
'time' => $time,
'workers' => $count,
];
}
$results = $this->find()
->where(['modified >' => $thresholdTime])
->orderDesc('modified')
->enableHydration(false)
->all()
->toArray();

// Deprecated: Will be removed, use DB here
$file = $pidFilePath . 'queue.pid';
if (!file_exists($file)) {
if (!$results) {
return [];
}

$count = 0;
foreach (glob($pidFilePath . 'queue_*.pid') as $filename) {
$time = filemtime($filename);
if ($time >= $thresholdTime->timestamp) {
$count++;
}
}

$time = filemtime($file);
$count = count($results);
$record = array_shift($results);
/** @var \Cake\I18n\FrozenTime $time */
$time = $record['modified'];

$res = [
'time' => $time ? new FrozenTime($time) : null,
return [
'time' => $time,
'workers' => $count,
];
return $res;
}

/**
Expand Down
99 changes: 13 additions & 86 deletions src/Model/Table/QueuedJobsTable.php
Expand Up @@ -13,9 +13,6 @@
use Cake\ORM\TableRegistry;
use InvalidArgumentException;
use Queue\Model\Entity\QueuedJob;
use RecursiveDirectoryIterator;
use RecursiveIteratorIterator;
use RegexIterator;
use RuntimeException;

// PHP 7.1+ has this defined
Expand Down Expand Up @@ -98,8 +95,6 @@ public function initialize(array $config) {
'WorkerProcesses.workerkey = QueuedJobs.workerkey',
],
]);

$this->initConfig();
}

/**
Expand Down Expand Up @@ -143,28 +138,6 @@ public function searchManager() {
return $searchManager;
}

/**
* @deprecated Manually assert config via bootstrapping.
*
* @return void
*/
public function initConfig() {
// Local config without extra config file
$conf = (array)Configure::read('Queue');
if (!empty($conf['configLoaded'])) {
return;
}

// Fallback to Plugin config which can be overwritten via local app config.
Configure::load('Queue.app_queue');
$defaultConf = (array)Configure::read('Queue');

$conf += $defaultConf;
$conf['configLoaded'] = true;

Configure::write('Queue', $conf);
}

/**
* Adds a new job to the queue.
*
Expand Down Expand Up @@ -628,28 +601,6 @@ public function cleanOldJobs() {
$this->deleteAll([
'completed <' => time() - (int)Configure::read('Queue.cleanuptimeout'),
]);
$pidFilePath = Configure::read('Queue.pidfilepath');
if (!$pidFilePath) {
return;
}

// Deprecated: Will be removed, use DB here
// Remove all old pid files left over
$timeout = time() - 2 * (int)Configure::read('Queue.cleanuptimeout');
$Iterator = new RegexIterator(
new RecursiveIteratorIterator(new RecursiveDirectoryIterator($pidFilePath)),
'/^.+\_.+\.(pid)$/i',
RegexIterator::MATCH
);
foreach ($Iterator as $file) {
if ($file->isFile()) {
$file = $file->getPathname();
$lastModified = filemtime($file);
if ($timeout > $lastModified) {
unlink($file);
}
}
}
}

/**
Expand Down Expand Up @@ -792,32 +743,19 @@ public function truncate() {
* @return array
*/
public function getProcesses($forThisServer = false) {
$pidFilePath = Configure::read('Queue.pidfilepath');
if (!$pidFilePath) {
/** @var \Queue\Model\Table\QueueProcessesTable $QueueProcesses */
$QueueProcesses = TableRegistry::getTableLocator()->get('Queue.QueueProcesses');
$query = $QueueProcesses->findActive()
->where(['terminate' => false]);
if ($forThisServer) {
$query = $query->where(['server' => $QueueProcesses->buildServerString()]);
}

$processes = $query
->enableHydration(false)
->find('list', ['keyField' => 'pid', 'valueField' => 'modified'])
->all()
->toArray();

return $processes;
/** @var \Queue\Model\Table\QueueProcessesTable $QueueProcesses */
$QueueProcesses = TableRegistry::getTableLocator()->get('Queue.QueueProcesses');
$query = $QueueProcesses->findActive()
->where(['terminate' => false]);
if ($forThisServer) {
$query = $query->where(['server' => $QueueProcesses->buildServerString()]);
}

// Deprecated: Will be removed, use DB here
$processes = [];
foreach (glob($pidFilePath . 'queue_*.pid') as $filename) {
$time = filemtime($filename);
preg_match('/\bqueue_([0-9a-z]+)\.pid$/', $filename, $matches);
$processes[$matches[1]] = $time;
}
$processes = $query
->enableHydration(false)
->find('list', ['keyField' => 'pid', 'valueField' => 'modified'])
->all()
->toArray();

return $processes;
}
Expand Down Expand Up @@ -853,8 +791,6 @@ public function terminateProcess($pid, $sig = SIGTERM) {
return;
}

$pidFilePath = Configure::read('Queue.pidfilepath');

$killed = false;
if (function_exists('posix_kill')) {
$killed = posix_kill($pid, $sig);
Expand All @@ -864,17 +800,8 @@ public function terminateProcess($pid, $sig = SIGTERM) {
}
sleep(1);

if (!$pidFilePath) {
$QueueProcesses = TableRegistry::get('Queue.QueueProcesses');
$QueueProcesses->deleteAll(['pid' => $pid]);
return;
}

// Deprecated: Will be removed, use DB here
$file = $pidFilePath . 'queue_' . $pid . '.pid';
if (file_exists($file)) {
unlink($file);
}
$QueueProcesses = TableRegistry::get('Queue.QueueProcesses');
$QueueProcesses->deleteAll(['pid' => $pid]);
}

/**
Expand Down
55 changes: 5 additions & 50 deletions src/Shell/QueueShell.php
Expand Up @@ -68,7 +68,6 @@ public function initialize() {

parent::initialize();

$this->QueuedJobs->initConfig();
$this->loadModel('Queue.QueueProcesses');
}

Expand Down Expand Up @@ -272,7 +271,7 @@ protected function runJob(QueuedJob $queuedJob, $pid) {
$this->QueuedJobs->markJobFailed($queuedJob, $failureMessage);
$failedStatus = $this->QueuedJobs->getFailedStatus($queuedJob, $this->_getTaskConf());
$this->_log('job ' . $queuedJob->job_type . ', id ' . $queuedJob->id . ' failed and ' . $failedStatus, $pid);
$this->err('Job did not finish, ' . $failedStatus . ' after try ' . $queuedJob->failed . '.');
$this->out('Job did not finish, ' . $failedStatus . ' after try ' . $queuedJob->failed . '.');
return;
}

Expand Down Expand Up @@ -702,31 +701,9 @@ protected function _displayAvailableTasks() {
* @return string
*/
protected function _initPid() {
$pidFilePath = Configure::read('Queue.pidfilepath');
if (!$pidFilePath) {
$pid = $this->_retrievePid();
$key = $this->QueuedJobs->key();
$this->QueueProcesses->add($pid, $key);

$this->_pid = $pid;

return $pid;
}

// Deprecated: Will be removed, use DB here
if (!file_exists($pidFilePath)) {
mkdir($pidFilePath, 0755, true);
}
$pid = $this->_retrievePid();
# global file
$fp = fopen($pidFilePath . 'queue.pid', 'w');
fwrite($fp, $pid);
fclose($fp);
# specific pid file
$pidFileName = 'queue_' . $pid . '.pid';
$fp = fopen($pidFilePath . $pidFileName, 'w');
fwrite($fp, $pid);
fclose($fp);
$key = $this->QueuedJobs->key();
$this->QueueProcesses->add($pid, $key);

$this->_pid = $pid;

Expand All @@ -752,20 +729,7 @@ protected function _retrievePid() {
* @return void
*/
protected function _updatePid($pid) {
$pidFilePath = Configure::read('Queue.pidfilepath');
if (!$pidFilePath) {
$this->QueueProcesses->update($pid);
return;
}

// Deprecated: Will be removed, use DB here
$pidFileName = 'queue_' . $pid . '.pid';
if (!empty($pidFilePath)) {
touch($pidFilePath . 'queue.pid');
}
if (!empty($pidFileName)) {
touch($pidFilePath . $pidFileName);
}
$this->QueueProcesses->update($pid);
}

/**
Expand Down Expand Up @@ -795,16 +759,7 @@ protected function _deletePid($pid) {
return;
}

$pidFilePath = Configure::read('Queue.pidfilepath');
if (!$pidFilePath) {
$this->QueueProcesses->remove($pid);
return;
}

// Deprecated: Will be removed, use DB here
if (file_exists($pidFilePath . 'queue_' . $pid . '.pid')) {
unlink($pidFilePath . 'queue_' . $pid . '.pid');
}
$this->QueueProcesses->remove($pid);
}

/**
Expand Down
5 changes: 0 additions & 5 deletions src/Shell/Task/QueueEmailTask.php
Expand Up @@ -119,11 +119,6 @@ public function run(array $data, $jobId) {
$message = $data['content'];
}
if (!empty($data['vars'])) {
// @deprecated BC only, use $data['content'] instead.
if ($message === null && isset($data['vars']['content'])) {
$message = $data['vars']['content'];
}

$this->Email->setViewVars($data['vars']);
}
if (!empty($data['headers'])) {
Expand Down

0 comments on commit 1c4f914

Please sign in to comment.