Skip to content
This repository has been archived by the owner on Dec 9, 2022. It is now read-only.

Commit

Permalink
more code
Browse files Browse the repository at this point in the history
  • Loading branch information
bupy7 committed Sep 7, 2017
1 parent 8d457bf commit b671829
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 16 deletions.
7 changes: 4 additions & 3 deletions src/Entity/TaskInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
interface TaskInterface
{
public const STATUS_WAIT = 10;
public const STATUS_ERROR = 20;
public const STATUS_OK = 30;
public const STATUS_IMPOSSIBLE = 40;
public const STATUS_IN_PROCESSING = 20;
public const STATUS_ERROR = 30;
public const STATUS_OK = 40;
public const STATUS_IMPOSSIBLE = 50;

public function setId(int $id): TaskInterface;

Expand Down
36 changes: 23 additions & 13 deletions src/Service/QueueService.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,15 @@
use Bupy7\Queue\Task\TaskInterface;
use Bupy7\Queue\Options\ModuleOptions;
use Exception;
use Zend\EventManager\EventManagerAwareInterface;
use Zend\EventManager\EventManagerAwareTrait;

class QueueService
class QueueService implements EventManagerAwareInterface
{
public const EVENT_ERROR_EXECUTE = 'errorExecute';

use EventManagerAwareTrait;

/**
* @var TaskRepositoryInterface
*/
Expand Down Expand Up @@ -46,17 +52,13 @@ public function __construct(
public function run(): void
{
$entities = $this->taskRepository->findForRun($this->config->getOneTimeLimit() ?: null);
try {
foreach ($entities as $entity) {
if (in_array($entity->getStatusId(), [
TaskEntityInterface::STATUS_WAIT,
TaskEntityInterface::STATUS_ERROR,
])) {
$this->executeTask($entity);
}
foreach ($entities as $entity) {
if (in_array($entity->getStatusId(), [
TaskEntityInterface::STATUS_WAIT,
TaskEntityInterface::STATUS_ERROR,
])) {
$this->executeTask($entity);
}
} finally {
$this->entityManager->flush();
}
}

Expand All @@ -65,9 +67,13 @@ protected function executeTask(TaskEntityInterface $entity): void
if (!$this->queueManager->has($entity->getName())) {
throw new UnknownTaskException(sprintf('"%s task is unknown."', $entity->getName()));
}

/** @var TaskInterface $task */
$task = $this->queueManager->get($entity->getName());
$entity->setRunAt(new DateTime);
$entity->setStatusId(TaskEntityInterface::STATUS_IN_PROCESSING);
$this->entityManager->flush($entity);

try {
if ($task->execute($entity->getParams())) {
$entity->setStatusId(TaskEntityInterface::STATUS_OK);
Expand All @@ -76,8 +82,10 @@ protected function executeTask(TaskEntityInterface $entity): void
}
} catch (Exception $e) {
$entity->setStatusId(TaskEntityInterface::STATUS_ERROR);

// @TODO: Logging
// event trigger
$this->getEventManager()->trigger(self::EVENT_ERROR_EXECUTE, $this, [
'exception' => $e,
]);
} finally {
if ($entity->getStatusId() === TaskEntityInterface::STATUS_ERROR) {
$entity->incNumberErrors();
Expand All @@ -90,5 +98,7 @@ protected function executeTask(TaskEntityInterface $entity): void
}
$entity->setStopAt(new DateTime);
}

$this->entityManager->flush($entity);
}
}
1 change: 1 addition & 0 deletions test/Assert/Entity/Task.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public function setStatusId(int $statusId): TaskInterface
{
if (!in_array($statusId, [
self::STATUS_WAIT,
self::STATUS_IN_PROCESSING,
self::STATUS_ERROR,
self::STATUS_OK,
self::STATUS_IMPOSSIBLE,
Expand Down

0 comments on commit b671829

Please sign in to comment.