Skip to content

Commit

Permalink
Fix CS
Browse files Browse the repository at this point in the history
  • Loading branch information
dereuromark committed May 26, 2020
1 parent 1a84434 commit 59a667d
Show file tree
Hide file tree
Showing 18 changed files with 71 additions and 36 deletions.
1 change: 1 addition & 0 deletions config/bootstrap.php
@@ -1,4 +1,5 @@
<?php

use Cake\Core\Configure;
use Queue\Generator\Task\QueuedJobTask;

Expand Down
8 changes: 2 additions & 6 deletions src/Controller/Admin/QueueController.php
Expand Up @@ -58,10 +58,8 @@ public function index() {

/**
* @param string|null $job
*
* @return \Cake\Http\Response
*
* @throws \Cake\Http\Exception\NotFoundException
* @return \Cake\Http\Response
*/
public function addJob($job = null) {
$this->request->allowMethod('post');
Expand All @@ -87,10 +85,8 @@ public function addJob($job = null) {

/**
* @param int|null $id
*
* @return \Cake\Http\Response
*
* @throws \Cake\Http\Exception\NotFoundException
* @return \Cake\Http\Response
*/
public function resetJob($id = null) {
$this->request->allowMethod('post');
Expand Down
3 changes: 3 additions & 0 deletions src/Controller/Admin/QueueProcessesController.php
Expand Up @@ -71,6 +71,7 @@ public function edit($id = null) {
$queueProcess = $this->QueueProcesses->patchEntity($queueProcess, $this->request->getData());
if ($this->QueueProcesses->save($queueProcess)) {
$this->Flash->success(__d('queue', 'The queue process has been saved.'));

return $this->redirect(['action' => 'index']);
}

Expand All @@ -95,6 +96,7 @@ public function terminate($id = null) {
} catch (Exception $exception) {
$this->Flash->error(__d('queue', 'The queue process could not be deleted. Please, try again.'));
}

return $this->redirect(['action' => 'index']);
}

Expand All @@ -116,6 +118,7 @@ public function delete($id = null) {
} else {
$this->Flash->error(__d('queue', 'The queue process could not be deleted. Please, try again.'));
}

return $this->redirect(['action' => 'index']);
}

Expand Down
10 changes: 8 additions & 2 deletions src/Controller/Admin/QueuedJobsController.php
Expand Up @@ -83,8 +83,8 @@ public function index() {
* Index method
*
* @param string|null $jobType
* @return void
* @throws \Cake\Http\Exception\NotFoundException
* @return void
*/
public function stats($jobType = null) {
if (!Configure::read('Queue.isStatisticEnabled')) {
Expand Down Expand Up @@ -159,12 +159,14 @@ public function import() {
$queuedJob = $this->QueuedJobs->newEntity($data);
if ($queuedJob->getErrors()) {
$this->Flash->error('Validation failed: ' . print_r($queuedJob->getErrors(), true));

return $this->redirect($this->referer(['action' => 'index']));
}

$this->QueuedJobs->saveOrFail($queuedJob);

$this->Flash->success('Imported');

return $this->redirect(['action' => 'view', $queuedJob->id]);
}

Expand All @@ -184,13 +186,15 @@ public function edit($id = null) {
]);
if ($queuedJob->completed) {
$this->Flash->error(__d('queue', 'The queued job is already completed.'));

return $this->redirect(['action' => 'view', $id]);
}

if ($this->request->is(['patch', 'post', 'put'])) {
$queuedJob = $this->QueuedJobs->patchEntity($queuedJob, $this->request->getData());
if ($this->QueuedJobs->save($queuedJob)) {
$this->Flash->success(__d('queue', 'The queued job has been saved.'));

return $this->redirect(['action' => 'view', $id]);
}

Expand Down Expand Up @@ -222,12 +226,13 @@ public function delete($id = null) {
} else {
$this->Flash->error(__d('queue', 'The queued job could not be deleted. Please try again.'));
}

return $this->redirect(['action' => 'index']);
}

/**
* @return \Cake\Http\Response|null|void
* @throws \Cake\Http\Exception\NotFoundException
* @return \Cake\Http\Response|null|void
*/
public function execute() {
if (!Configure::read('debug')) {
Expand All @@ -238,6 +243,7 @@ public function execute() {
$data = (array)$this->request->getData();
if (empty($data['command'])) {
$this->Flash->error('Command is required');

return null;
}

Expand Down
4 changes: 2 additions & 2 deletions src/Model/Table/QueueProcessesTable.php
Expand Up @@ -39,7 +39,7 @@ public static function defaultConnectionName(): string {
$connection = Configure::read('Queue.connection');
if (!empty($connection)) {
return $connection;
};
}

return parent::defaultConnectionName();
}
Expand Down Expand Up @@ -140,8 +140,8 @@ public function add($pid, $key) {

/**
* @param string $pid
* @return void
* @throws \Queue\Model\ProcessEndingException
* @return void
*/
public function update($pid) {
$conditions = [
Expand Down
32 changes: 23 additions & 9 deletions src/Model/Table/QueuedJobsTable.php
Expand Up @@ -44,11 +44,11 @@
*/
class QueuedJobsTable extends Table {

const DRIVER_MYSQL = 'Mysql';
const DRIVER_POSTGRES = 'Postgres';
const DRIVER_SQLSERVER = 'Sqlserver';
public const DRIVER_MYSQL = 'Mysql';
public const DRIVER_POSTGRES = 'Postgres';
public const DRIVER_SQLSERVER = 'Sqlserver';

const STATS_LIMIT = 100000;
public const STATS_LIMIT = 100000;

/**
* @var array
Expand All @@ -69,7 +69,7 @@ public static function defaultConnectionName(): string {
$connection = Configure::read('Queue.connection');
if (!empty($connection)) {
return $connection;
};
}

return parent::defaultConnectionName();
}
Expand Down Expand Up @@ -170,7 +170,7 @@ public function validationDefault(Validator $validator): Validator {
* @param array $config Config to save along with the job
* @return \Queue\Model\Entity\QueuedJob Saved job entity
*/
public function createJob($jobType, array $data = null, array $config = []) {
public function createJob($jobType, ?array $data = null, array $config = []) {
$queuedJob = [
'job_type' => $jobType,
'data' => is_array($data) ? serialize($data) : null,
Expand All @@ -187,9 +187,8 @@ public function createJob($jobType, array $data = null, array $config = []) {
* @param string $reference
* @param string|null $jobType
*
* @return bool
*
* @throws \InvalidArgumentException
* @return bool
*/
public function isQueued($reference, $jobType = null) {
if (!$reference) {
Expand Down Expand Up @@ -243,6 +242,7 @@ public function getTypes() {
'keyField' => 'job_type',
'valueField' => 'job_type',
];

return $this->find('list', $findCond);
}

Expand All @@ -264,11 +264,13 @@ public function getStats() {
$alltime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', created)");
$runtime = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");
$fetchdelay = $query->func()->avg("DATEDIFF(s, '1970-01-01 00:00:00', fetched) - (CASE WHEN notbefore IS NULL THEN DATEDIFF(s, '1970-01-01 00:00:00', created) ELSE DATEDIFF(s, '1970-01-01 00:00:00', notbefore) END)");

break;
case static::DRIVER_POSTGRES:
$alltime = $query->func()->avg('EXTRACT(EPOCH FROM completed) - EXTRACT(EPOCH FROM created)');
$runtime = $query->func()->avg('EXTRACT(EPOCH FROM completed) - EXTRACT(EPOCH FROM fetched)');
$fetchdelay = $query->func()->avg('EXTRACT(EPOCH FROM fetched) - CASE WHEN notbefore IS NULL then EXTRACT(EPOCH FROM created) ELSE EXTRACT(EPOCH FROM notbefore) END');

break;
}

Expand All @@ -287,6 +289,7 @@ public function getStats() {
'job_type',
],
];

return $this->find('all', $options);
}

Expand All @@ -308,9 +311,11 @@ public function getFullStats($jobType = null) {
switch ($driverName) {
case static::DRIVER_SQLSERVER:
$runtime = $query->newExpr("DATEDIFF(s, '1970-01-01 00:00:00', completed) - DATEDIFF(s, '1970-01-01 00:00:00', fetched)");

break;
case static::DRIVER_POSTGRES:
$runtime = $query->newExpr('EXTRACT(EPOCH FROM completed) - EXTRACT(EPOCH FROM fetched)');

break;
}

Expand Down Expand Up @@ -389,10 +394,12 @@ public function requestJob(array $capabilities, array $groups = [], array $types
switch ($driverName) {
case static::DRIVER_SQLSERVER:
$age = $query->newExpr()->add('ISNULL(DATEDIFF(SECOND, GETDATE(), notbefore), 0)');

break;
case static::DRIVER_POSTGRES:
$age = $query->newExpr()
->add('COALESCE(EXTRACT(EPOCH FROM notbefore) - (EXTRACT(EPOCH FROM now())), 0)');

break;
}
$options = [
Expand Down Expand Up @@ -444,6 +451,7 @@ public function requestJob(array $capabilities, array $groups = [], array $types
foreach ($runningJobs as $runningJob) {
if (isset($uniqueConstraints[$runningJob->job_type])) {
$types[] = '-' . $runningJob->job_type;

continue;
}

Expand Down Expand Up @@ -472,7 +480,7 @@ public function requestJob(array $capabilities, array $groups = [], array $types

// Generate the task specific conditions.
foreach ($capabilities as $task) {
list($plugin, $name) = pluginSplit($task['name']);
[$plugin, $name] = pluginSplit($task['name']);
$timeoutAt = $now->copy();
$tmp = [
'job_type' => $name,
Expand All @@ -496,12 +504,15 @@ public function requestJob(array $capabilities, array $groups = [], array $types
switch ($driverName) {
case static::DRIVER_POSTGRES:
$tmp['EXTRACT(EPOCH FROM NOW()) >='] = $this->rateHistory[$tmp['job_type']] + $task['rate'];

break;
case static::DRIVER_MYSQL:
$tmp['UNIX_TIMESTAMP() >='] = $this->rateHistory[$tmp['job_type']] + $task['rate'];

break;
case static::DRIVER_SQLSERVER:
$tmp["DATEDIFF(s, '1970-01-01 00:00:00', GETDATE()) >="] = $this->rateHistory[$tmp['job_type']] + $task['rate'];

break;
}
}
Expand Down Expand Up @@ -673,6 +684,7 @@ public function getPendingStats() {
'completed IS' => null,
],
];

return $this->find('all', $findCond);
}

Expand Down Expand Up @@ -755,6 +767,7 @@ protected function _findProgress($state, $query = [], $results = []) {
$query['conditions'][]['job_group'] = $query['conditions']['job_group'];
unset($query['conditions']['job_group']);
}

return $query;
}
// state === after
Expand All @@ -770,6 +783,7 @@ protected function _findProgress($state, $query = [], $results = []) {
$results[$k]['failure_message'] = $result['failure_message'];
}
}

return $results;
}

Expand Down
1 change: 1 addition & 0 deletions src/Queue/TaskFinder.php
Expand Up @@ -71,6 +71,7 @@ protected function getPluginPaths(Folder $Folder, $plugin) {
$name = basename($r, 'Task.php');
if (in_array($name, $this->tasks)) {
unset($res[$key]);

continue;
}
$res[$key] = $plugin . '.' . $name;
Expand Down
5 changes: 5 additions & 0 deletions src/Shell/QueueShell.php
Expand Up @@ -104,6 +104,7 @@ public function _getDescription() {
Available Tasks:
$tasks
TEXT;

return $text;
}

Expand Down Expand Up @@ -147,6 +148,7 @@ protected function _taskName($task) {
if (strpos($task, 'Queue') === 0) {
return substr($task, 5);
}

return $task;
}

Expand Down Expand Up @@ -196,10 +198,12 @@ public function runworker() {
} catch (RecordNotFoundException $exception) {
// Manually killed
$this->_exit = true;

continue;
} catch (ProcessEndingException $exception) {
// Soft killed, e.g. during deploy update
$this->_exit = true;

continue;
}

Expand Down Expand Up @@ -281,6 +285,7 @@ protected function runJob(QueuedJob $queuedJob, $pid) {
$failedStatus = $this->QueuedJobs->getFailedStatus($queuedJob, $this->_getTaskConf());
$this->_log('job ' . $queuedJob->job_type . ', id ' . $queuedJob->id . ' failed and ' . $failedStatus, $pid);
$this->out('Job did not finish, ' . $failedStatus . ' after try ' . $queuedJob->failed . '.');

return;
}

Expand Down
10 changes: 6 additions & 4 deletions src/Shell/Task/QueueEmailTask.php
Expand Up @@ -6,6 +6,7 @@
use Cake\Core\Configure;
use Cake\Log\Log;
use Cake\Mailer\Mailer;
use Cake\ORM\Locator\LocatorInterface;
use Queue\Model\QueueException;
use Throwable;

Expand Down Expand Up @@ -39,9 +40,10 @@ class QueueEmailTask extends QueueTask implements AddInterface {

/**
* @param \Cake\Console\ConsoleIo|null $io IO
* @param \Cake\ORM\Locator\LocatorInterface|null $locator
*/
public function __construct(ConsoleIo $io = null) {
parent::__construct($io);
public function __construct(?ConsoleIo $io = null, ?LocatorInterface $locator = null) {
parent::__construct($io, $locator);

$adminEmail = Configure::read('Config.adminEmail');
if ($adminEmail) {
Expand Down Expand Up @@ -176,8 +178,8 @@ public function run(array $data, int $jobId): void {
/**
* Check if Mail class exists and create instance
*
* @return \Cake\Mailer\Mailer
* @throws \Queue\Model\QueueException
* @return \Cake\Mailer\Mailer
*/
protected function _getMailer() {
$class = Configure::read('Queue.mailerClass');
Expand All @@ -188,7 +190,7 @@ protected function _getMailer() {
}
}
if (!class_exists($class)) {
throw new QueueException(sprintf('Configured mailer class `%s` in `%s` not found.', $class, get_class($this)));
throw new QueueException(sprintf('Configured mailer class `%s` in `%s` not found.', $class, static::class));
}

return new $class();
Expand Down
2 changes: 1 addition & 1 deletion src/Shell/Task/QueueExceptionExampleTask.php
Expand Up @@ -50,8 +50,8 @@ public function add() {
*
* @param array $data The array passed to QueuedJobsTable::createJob()
* @param int $jobId The id of the QueuedJob entity
* @return void
* @throws \Queue\Model\QueueException
* @return void
*/
public function run(array $data, int $jobId): void {
$this->hr();
Expand Down
2 changes: 1 addition & 1 deletion src/Shell/Task/QueueExecuteTask.php
Expand Up @@ -51,8 +51,8 @@ public function add() {
*
* @param array $data The array passed to QueuedJobsTable::createJob()
* @param int $jobId The id of the QueuedJob entity
* @return void
* @throws \Queue\Model\QueueException
* @return void
*/
public function run(array $data, int $jobId): void {
$data += [
Expand Down

0 comments on commit 59a667d

Please sign in to comment.