diff --git a/module/Core/config/module.config.php b/module/Core/config/module.config.php index e8a876d89..6242ebad0 100644 --- a/module/Core/config/module.config.php +++ b/module/Core/config/module.config.php @@ -33,6 +33,7 @@ 'worker_strategies' => [ 'default' => [ Queue\Strategy\IdleSleepStrategy::class => ['duration' => 1], + Queue\Strategy\JobResultStrategy::class, ], 'queues' => [ 'default' => [ diff --git a/module/Core/src/Queue/Job/ExceptionJobResult.php b/module/Core/src/Queue/Job/ExceptionJobResult.php new file mode 100644 index 000000000..4d38a8c42 --- /dev/null +++ b/module/Core/src/Queue/Job/ExceptionJobResult.php @@ -0,0 +1,29 @@ + + */ + +/** */ +namespace Core\Queue\Job; + +/** + * ${CARET} + * + * @author Mathias Gelhausen + * @todo write test + */ +class ExceptionJobResult extends JobResult +{ + public function __construct(\Exception $e) + { + parent::__construct([ + 'message' => $e->getMessage(), + 'extra' => $e->getTrace(), + ]); + } + +} diff --git a/module/Core/src/Queue/Job/JobResult.php b/module/Core/src/Queue/Job/JobResult.php new file mode 100644 index 000000000..f9aa21974 --- /dev/null +++ b/module/Core/src/Queue/Job/JobResult.php @@ -0,0 +1,193 @@ + + */ + +declare(strict_types=1); + +/** */ +namespace Core\Queue\Job; + +use SlmQueue\Worker\Event\ProcessJobEvent; + +/** + * ${CARET} + * + * @author Mathias Gelhausen + * @todo write test + */ +class JobResult +{ + /** + * + * + * @var int + */ + protected $result; + + /** + * + * + * @var string + */ + protected $reason; + + /** + * + * + * @var array + */ + protected $extra; + + /** + * + * + * @var string|int|\DateInterval + */ + protected $delay; + + /** + * + * + * @var string|int|\DateTime + */ + protected $scheduled; + + public static function success(?string $reason = null, ?array $extra = null) : self + { + return (new static(ProcessJobEvent::JOB_STATUS_SUCCESS)) + ->withReason($reason) + ->withExtra($extra) + ; + } + + public static function failure(string $reason, ?array $extra = null) : self + { + return (new static(ProcessJobEvent::JOB_STATUS_FAILURE)) + ->withReason($reason) + ->withExtra($extra) + ; + } + + public static function recoverable(string $reason, array $options = []) : self + { + $result = (new static(ProcessJobEvent::JOB_STATUS_FAILURE_RECOVERABLE)) + ->withReason($reason); + + foreach ($options as $key => $value) { + $callback = [$result, "with$key"]; + if (is_callable($callback)) { + $callback($value); + } + } + + return $result; + } + + public function __construct(int $result) + { + $this->result = $result; + } + + public function getResult() : int + { + return $this->result; + } + + public function isSuccess() : bool + { + return ProcessJobEvent::JOB_STATUS_SUCCESS == $this->result; + } + + public function isFailure() : bool + { + return ProcessJobEvent::JOB_STATUS_FAILURE == $this->result; + } + + public function isRecoverable() : bool + { + return ProcessJobEvent::JOB_STATUS_FAILURE_RECOVERABLE == $this->result; + } + + /** + * @return string + */ + public function getReason(): ?string + { + return $this->reason; + } + + /** + * @param string $message + * + * @return self + */ + public function withReason($reason) : self + { + $this->reason = $reason; + + return $this; + } + + /** + * @return array + */ + public function getExtra(): ?array + { + return $this->extra; + } + + /** + * @param array $extra + * + * @return self + */ + public function withExtra(array $extra) : self + { + $this->extra = $extra; + + return $this; + } + + /** + * @return \DateInterval|int|string + */ + public function getDelay() + { + return $this->delay; + } + + /** + * @param \DateInterval|int|string $delay + * + * @return self + */ + public function withDelay($delay) : self + { + $this->delay = $delay; + + return $this; + } + + /** + * @return \DateTime|int|string + */ + public function getDate() + { + return $this->scheduled; + } + + /** + * @param \DateTime|int|string $scheduled + * + * @return self + */ + public function withDate($scheduled) : self + { + $this->scheduled = $scheduled; + } +} diff --git a/module/Core/src/Queue/Job/MongoJob.php b/module/Core/src/Queue/Job/MongoJob.php new file mode 100644 index 000000000..d4e53d9b9 --- /dev/null +++ b/module/Core/src/Queue/Job/MongoJob.php @@ -0,0 +1,60 @@ + + */ + +/** */ +namespace Core\Queue\Job; + +use SlmQueue\Job\AbstractJob; +use SlmQueue\Worker\Event\ProcessJobEvent; + +/** + * ${CARET} + * + * @author Mathias Gelhausen + * @todo write test + */ +abstract class MongoJob extends AbstractJob implements ResultProviderInterface +{ + protected $result; + + public function setResult(JobResult $result) : void + { + $this->result = $result; + } + + public function getResult() : JobResult + { + if (!$this->result) { + $this->setResult(new JobResult(ProcessJobEvent::JOB_STATUS_UNKNOWN)); + } + + return $this->result; + } + + protected function failure(string $message, ?array $extra = null) : int + { + $this->setResult(JobResult::failure($message, $extra)); + + return ProcessJobEvent::JOB_STATUS_FAILURE; + } + + protected function recoverable(string $message, array $options = []) : int + { + $this->setResult(JobResult::recoverable($message, $options)); + + return ProcessJobEvent::JOB_STATUS_FAILURE_RECOVERABLE; + } + + protected function success(?string $message = null, ?array $extra = null) : int + { + $this->setResult(JobResult::success($message, $extra)); + + return ProcessJobEvent::JOB_STATUS_SUCCESS; + } +} diff --git a/module/Core/src/Queue/Job/ResultProviderInterface.php b/module/Core/src/Queue/Job/ResultProviderInterface.php new file mode 100644 index 000000000..f0024fd6f --- /dev/null +++ b/module/Core/src/Queue/Job/ResultProviderInterface.php @@ -0,0 +1,25 @@ + + */ + +declare(strict_types=1); + +/** */ +namespace Core\Queue\Job; + +/** + * ${CARET} + * + * @author Mathias Gelhausen + * @todo write test + */ +interface ResultProviderInterface +{ + public function setResult(JobResult $error) : void; + public function getResult() : JobResult; +} diff --git a/module/Core/src/Queue/LoggerAwareJobTrait.php b/module/Core/src/Queue/LoggerAwareJobTrait.php new file mode 100644 index 000000000..eb29cc1af --- /dev/null +++ b/module/Core/src/Queue/LoggerAwareJobTrait.php @@ -0,0 +1,67 @@ + + */ + +declare(strict_types=1); + +/** */ +namespace Core\Queue; + +use Zend\Log\LoggerInterface; + +/** + * Trait implementing LoggerAwareInterface. + * + * @author Mathias Gelhausen + * @todo write test + */ +trait LoggerAwareJobTrait +{ + /** + * + * + * @var LoggerInterface + */ + private $logger; + + /** + * Set the logger instance + * + * @param LoggerInterface $logger + */ + public function setLogger(LoggerInterface $logger) : void + { + $this->logger = $logger; + } + + /** + * Get the logger instance. + * + * If no logger is set, it will create and return a null logger. + * + * @return LoggerInterface + */ + public function getLogger() : LoggerInterface + { + if (!$this->logger) { + $this->logger = new class implements LoggerInterface + { + public function emerg($message, $extra = []) {} + public function alert($message, $extra = []) {} + public function crit($message, $extra = []) {} + public function err($message, $extra = []) {} + public function warn($message, $extra = []) {} + public function notice($message, $extra = []) {} + public function info($message, $extra = []) {} + public function debug($message, $extra = []) {} + }; + } + + return $this->logger; + } +} diff --git a/module/Core/src/Queue/MongoQueue.php b/module/Core/src/Queue/MongoQueue.php index 1a34ae0e9..59f625c2c 100644 --- a/module/Core/src/Queue/MongoQueue.php +++ b/module/Core/src/Queue/MongoQueue.php @@ -337,50 +337,11 @@ protected function parseOptionsToDateTime($options) $time = microtime(true); $micro = sprintf("%06d", ($time - floor($time)) * 1000000); $this->now = new \DateTime(date('Y-m-d H:i:s.' . $micro, $time), new \DateTimeZone(date_default_timezone_get())); - $scheduled = clone ($this->now); - - if (isset($options['scheduled'])) { - switch (true) { - case is_numeric($options['scheduled']): - $scheduled = new \DateTime( - sprintf("@%d", (int) $options['scheduled']), - new \DateTimeZone(date_default_timezone_get()) - ); - break; - case is_string($options['scheduled']): - $scheduled = new \DateTime($options['scheduled'], new \DateTimeZone(date_default_timezone_get())); - break; - case $options['scheduled'] instanceof \DateTime: - $scheduled = $options['scheduled']; - break; - } - } + $scheduled = isset($options['scheduled']) ? Utils::createDateTime($options['scheduled']) : clone ($this->now); if (isset($options['delay'])) { - switch (true) { - case is_numeric($options['delay']): - $delay = new \DateInterval(sprintf("PT%dS", abs((int) $options['delay']))); - $delay->invert = ($options['delay'] < 0) ? 1 : 0; - break; - case is_string($options['delay']): - try { - // first try ISO 8601 duration specification - $delay = new \DateInterval($options['delay']); - } catch (\Exception $e) { - // then try normal date parser - $delay = \DateInterval::createFromDateString($options['delay']); - } - break; - case $options['delay'] instanceof \DateInterval: - $delay = $options['delay']; - break; - default: - $delay = null; - } - - if ($delay instanceof \DateInterval) { - $scheduled->add($delay); - } + $delay = Utils::createDateInterval($options['delay']); + $scheduled->add($delay); } return $scheduled; diff --git a/module/Core/src/Queue/Strategy/JobResultStrategy.php b/module/Core/src/Queue/Strategy/JobResultStrategy.php new file mode 100644 index 000000000..d85da1942 --- /dev/null +++ b/module/Core/src/Queue/Strategy/JobResultStrategy.php @@ -0,0 +1,129 @@ + + */ + +/** */ +namespace Core\Queue\Strategy; + +use Core\Queue\Job\JobResult; +use Core\Queue\Job\ResultProviderInterface; +use Core\Queue\MongoQueue; +use Core\Queue\Utils; +use SlmQueue\Strategy\AbstractStrategy; +use SlmQueue\Worker\Event\AbstractWorkerEvent; +use SlmQueue\Worker\Event\ProcessJobEvent; +use Zend\EventManager\EventManagerInterface; + +/** + * ${CARET} + * + * @author Mathias Gelhausen + * @todo write test + */ +class JobResultStrategy extends AbstractStrategy +{ + public function attach(EventManagerInterface $events, $priority = 1) + { + $this->listeners[] = $events->attach(AbstractWorkerEvent::EVENT_PROCESS_JOB, [$this, 'handleJobResult'], -999); + } + + public function handleJobResult(ProcessJobEvent $event) + { + $result = $event->getResult(); + $queue = $event->getQueue(); + $job = $event->getJob(); + $logger = $event->getParam('logger'); + + if (!$queue instanceOf MongoQueue) { + return; + } + + $result = $job instanceOf ResultProviderInterface ? $job->getResult() : new JobResult($result); + + if ($result->isSuccess()) { + $queue->delete($job); + if ($reason = $result->getReason()) { + $logger && $logger->info($reason, $result->getExtra() ?? []); + } + + return; + } + + if ($result->isFailure()) { + $reason = $result->getReason(); + $extra = $result->getExtra(); + + $queue->fail($job, ['message' => $reason, 'trace' => $extra]); + + $logger && $logger->err($reason, $extra ?? []); + + return; + } + + if ($result->isRecoverable()) { + $reason = $result->getReason(); + $extra = $result->getExtra(); + $delay = $result->getDelay(); + $date = $result->getDate(); + + $options = ['message' => $reason, 'trace' => $extra]; + + $logger && $logger->warn($reason, $extra ?? []); + + if ($delay) { + $logger && $logger->notice('Will retry in ' . $this->formatDelay($delay)); + $options['delay'] = $delay; + + } elseif ($date) { + $logger && $logger->notice('Will retry on ' . $this->formatScheduled($date)); + $options['scheduled'] = $date; + } + + $queue->retry($job, $options); + + return; + } + + $logger && $logger->warn('Unsupported job result: ' . $result->getResult() . '; Job will be deleted.'); + $queue->delete($job); + + } + + private function formatDelay($delay) + { + $delay = Utils::createDateInterval($delay); + $parts = []; + if ($delay->y) { + $parts[] = $delay->y . ' years'; + } + if ($delay->m) { + $parts[] = $delay->m . ' months'; + } + if ($delay->d) { + $parts[] = $delay->d . ' days'; + } + if ($delay->h) { + $parts[] = $delay->h . ' hours'; + } + if ($delay->i) { + $parts[] = $delay->i . ' minutes'; + } + if ($delay->s) { + $parts[] = $delay->s . ' seconds'; + } + + return join(', ', $parts); + } + + private function formatScheduled($scheduled) + { + $date = Utils::createDateTime($scheduled); + return $date->format('d.m.Y H:i:s'); + } + +} diff --git a/module/Core/src/Queue/Strategy/LogStrategy.php b/module/Core/src/Queue/Strategy/LogStrategy.php index 8b9dcecc4..370a59b17 100644 --- a/module/Core/src/Queue/Strategy/LogStrategy.php +++ b/module/Core/src/Queue/Strategy/LogStrategy.php @@ -48,7 +48,7 @@ class LogStrategy extends AbstractStrategy */ private $tmpl = [ 'queue' => '%s queue: %s', - 'job' => '{ %s } [ %s ] %s%s', + 'job' => '{ %s } [ %s ] %s', ]; /** @@ -161,10 +161,12 @@ public function injectLogger(bool $flag = null) : bool */ public function attach(EventManagerInterface $events, $priority = 1) : void { - $this->listeners[] = $events->attach(AbstractWorkerEvent::EVENT_BOOTSTRAP, [$this, 'logBootstrap']); - $this->listeners[] = $events->attach(AbstractWorkerEvent::EVENT_FINISH, [$this, 'logFinish']); + $this->listeners[] = $events->attach(AbstractWorkerEvent::EVENT_BOOTSTRAP, [$this, 'logBootstrap'], 1000); + $this->listeners[] = $events->attach(AbstractWorkerEvent::EVENT_FINISH, [$this, 'logFinish'], 1000); $this->listeners[] = $events->attach(AbstractWorkerEvent::EVENT_PROCESS_JOB, [$this, 'logJobStart'], 1000); $this->listeners[] = $events->attach(AbstractWorkerEvent::EVENT_PROCESS_JOB, [$this, 'logJobEnd'], -1000); + $this->listeners[] = $events->attach(AbstractWorkerEvent::EVENT_PROCESS_IDLE, [$this, 'injectLoggerInEvent'], 1000); + $this->listeners[] = $events->attach(AbstractWorkerEvent::EVENT_PROCESS_STATE, [$this, 'injectLoggerInEvent'], 1000); } /** @@ -179,6 +181,9 @@ public function logBootstrap(BootstrapEvent $event) : void 'Start', $event->getQueue()->getName() )); + + $this->injectLoggerInObject($event->getWorker()); + $this->injectLoggerInEvent($event); } /** @@ -193,6 +198,8 @@ public function logFinish(FinishEvent $event) : void 'Stop', $event->getQueue()->getName() )); + + $this->injectLoggerInEvent($event); } /** @@ -216,9 +223,8 @@ public function logJobStart(ProcessJobEvent $event) : void '' )); - if ($job instanceOf LoggerAwareInterface && $this->injectLogger()) { - $job->setLogger($logger); - } + $this->injectLoggerInObject($job); + $this->injectLoggerInEvent($event); } /** @@ -242,37 +248,46 @@ public function logJobEnd(ProcessJobEvent $event) : void $this->tmpl['job'], $queue, 'SUCCESS', - $this->formatJob($job), - '' + $this->formatJob($job) )); break; case ProcessJobEvent::JOB_STATUS_FAILURE_RECOVERABLE: - $reason = $job->getMetadata('log.reason'); $logger->warn(sprintf( $this->tmpl['job'], $queue, 'RECOVERABLE', - $this->formatJob($job), - ": $reason" + $this->formatJob($job) )); break; case ProcessJobEvent::JOB_STATUS_FAILURE: - $reason = $job->getMetadata('log.reason'); $logger->err(sprintf( $this->tmpl['job'], $queue, 'FAILURE', - $this->formatJob($job), - ": $reason" + $this->formatJob($job) )); break; } } + public function injectLoggerInEvent(AbstractWorkerEvent $event) + { + if ($this->injectLogger()) { + $event->setParam('logger', $this->getLogger()); + } + } + + private function injectLoggerInObject(object $object) : void + { + if ($this->injectLogger() && $object instanceOf LoggerAwareInterface) { + $object->setLogger($this->getLogger()); + } + } + /** * Get a string representation of the processed job instance * diff --git a/module/Core/src/Queue/Utils.php b/module/Core/src/Queue/Utils.php new file mode 100644 index 000000000..b2db06e61 --- /dev/null +++ b/module/Core/src/Queue/Utils.php @@ -0,0 +1,67 @@ + + */ + +/** */ +namespace Core\Queue; + +/** + * ${CARET} + * + * @author Mathias Gelhausen + * @todo write test + */ +final class Utils +{ + public static function createDateInterval($value) + { + if ($value instanceOf \DateInterval) { + return $value; + } + + if (is_numeric($value)) { + $delay = new \DateInterval(sprintf("PT%dS", abs((int) $value))); + $delay->invert = ($value < 0) ? 1 : 0; + + return $delay; + } + + if (is_string($value)) { + try { + // first try ISO 8601 duration specification + $delay = new \DateInterval($value); + } catch (\Exception $e) { + // then try normal date parser + $delay = \DateInterval::createFromDateString($value); + } + return $delay; + } + + return new \DateInterval('PT0S'); + } + + public static function createDateTime($value) + { + if ($value instanceOf \DateTime) { + return $value; + } + + if (is_numeric($value)) { + return new \DateTime( + sprintf("@%d", (int) $value), + new \DateTimeZone(date_default_timezone_get()) + ); + } + + if (is_string($value)) { + return new \DateTime($value, new \DateTimeZone(date_default_timezone_get())); + } + + return new \DateTime(); + } +} diff --git a/module/Core/src/Queue/Worker/MongoWorker.php b/module/Core/src/Queue/Worker/MongoWorker.php index d5c5645d7..e15340eb3 100644 --- a/module/Core/src/Queue/Worker/MongoWorker.php +++ b/module/Core/src/Queue/Worker/MongoWorker.php @@ -12,19 +12,27 @@ use Core\Queue\Exception\FatalJobException; use Core\Queue\Exception\RecoverableJobException; +use Core\Queue\Job\ExceptionJobResult; +use Core\Queue\Job\JobResult; +use Core\Queue\Job\ResultProviderInterface; +use Core\Queue\LoggerAwareJobTrait; use Core\Queue\MongoQueue; use SlmQueue\Job\JobInterface; +use SlmQueue\Queue\QueueAwareInterface; use SlmQueue\Queue\QueueInterface; use SlmQueue\Worker\AbstractWorker; use SlmQueue\Worker\Event\ProcessJobEvent; +use Zend\Log\LoggerAwareInterface; /** * Queue worker for the mongo queue. * * @author Mathias Gelhausen */ -class MongoWorker extends AbstractWorker +class MongoWorker extends AbstractWorker implements LoggerAwareInterface { + use LoggerAwareJobTrait; + /** * Process job handler. * @@ -39,19 +47,20 @@ public function processJob(JobInterface $job, QueueInterface $queue) return; } - try { - $job->execute(); - $queue->delete($job); - - return ProcessJobEvent::JOB_STATUS_SUCCESS; - - } catch (RecoverableJobException $exception) { - $queue->retry($job, $exception->getOptions()); + if ($job instanceOf QueueAwareInterface) { + $job->setQueue($queue); + } - return ProcessJobEvent::JOB_STATUS_FAILURE_RECOVERABLE; + try { + return $job->execute(); + } catch (\Exception $exception) { + $this->getLogger()->err('Job execution thrown exception: ' . get_class($exception)); - } catch (FatalJobException $exception) { - $queue->fail($job, $exception->getOptions()); + if ($job instanceOf ResultProviderInterface) { + $job->setResult(JobResult::failure($exception->getMessage(), [$exception->getTraceAsString()])); + } else { + $this->getLogger()->err($exception->getMessage(), [$exception->getTraceAsString()]); + } return ProcessJobEvent::JOB_STATUS_FAILURE; }