-
Notifications
You must be signed in to change notification settings - Fork 2
/
InternalMarkAsCancelledCommand.php
198 lines (164 loc) · 7.47 KB
/
InternalMarkAsCancelledCommand.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
<?php
declare(strict_types=1);
/*
* This file is part of the SHQCommandsQueuesBundle.
*
* Copyright Adamo Aerendir Crespi 2017.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*
* @author Adamo Aerendir Crespi <hello@aerendir.me>
* @copyright Copyright (C) 2017 Aerendir. All rights reserved.
* @license MIT License.
*/
namespace SerendipityHQ\Bundle\CommandsQueuesBundle\Command;
use Doctrine\ORM\EntityManagerInterface;
use Exception;
use Safe\Exceptions\StringsException;
use function Safe\sprintf;
use SerendipityHQ\Bundle\CommandsQueuesBundle\Entity\Job;
use SerendipityHQ\Bundle\CommandsQueuesBundle\Repository\JobRepository;
use SerendipityHQ\Bundle\CommandsQueuesBundle\Util\JobsMarker;
use SerendipityHQ\Bundle\CommandsQueuesBundle\Util\JobsUtil;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
/**
 * Marks the child Jobs of a given Job (and, recursively, their own child Jobs) as cancelled.
 *
 * The given Job itself is not touched: only its descendants are marked.
 *
 * We use a dedicated command to do the marking to not stop the daemon from processing the queue.
 * On very deep trees of Jobs the marking may require a lot of time. Using a dedicated command allows the Daemon to
 * continue running while this command, in the background, marks the child Jobs as cancelled.
 */
class InternalMarkAsCancelledCommand extends AbstractQueuesCommand
{
    /** @var string */
    public static $defaultName = 'queues:internal:mark-as-cancelled';

    /** @var JobRepository $jobsRepo Repository used to load the Jobs referenced by the command options. */
    private $jobsRepo;

    /**
     * @param EntityManagerInterface $entityManager
     * @param JobsMarker             $doNotUseJobsMarker
     */
    public function __construct(EntityManagerInterface $entityManager, JobsMarker $doNotUseJobsMarker)
    {
        parent::__construct($entityManager, $doNotUseJobsMarker);

        /** @var JobRepository $jobsRepo */
        $jobsRepo       = $this->getEntityManager()->getRepository(Job::class);
        $this->jobsRepo = $jobsRepo;
    }

    /**
     * {@inheritdoc}
     */
    protected function configure(): void
    {
        $this
            ->setDescription('[INTERNAL] Marks the given Job and its childs as CANCELLED.')
            ->addOption('id', 'id', InputOption::VALUE_REQUIRED)
            ->addOption('cancelling-job-id', 'cancelling-job-id', InputOption::VALUE_REQUIRED);

        // Only available since Symfony 3.2
        if (method_exists($this, 'setHidden')) {
            $this->setHidden(true);
        }
    }

    /**
     * Validates the two ID options, loads the corresponding Jobs and cancels the descendants of the first one.
     *
     * @param InputInterface  $input
     * @param OutputInterface $output
     *
     * @throws StringsException
     * @throws Exception
     *
     * @return int 0 on success (also when the Job to process doesn't exist anymore), 1 on invalid or unknown IDs
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        // NOTE(review): presumably the parent prepares the IO writer used below; confirm in AbstractQueuesCommand.
        parent::execute($input, $output);

        $jobId           = $input->getOption('id');
        $cancellingJobId = $input->getOption('cancelling-job-id');

        if (false === is_numeric($jobId)) {
            $this->getIoWriter()->error("The Job ID is not valid: maybe you mispelled it or it doesn't exist at all.");

            return 1;
        }

        if (false === is_numeric($cancellingJobId)) {
            $this->getIoWriter()->error("The Cancelling Job ID is not valid: maybe you mispelled it or it doesn't exist at all.");

            return 1;
        }

        $failedJob     = $this->jobsRepo->findOneById((int) $jobId);
        $cancellingJob = $this->jobsRepo->findOneById((int) $cancellingJobId);

        if (null === $failedJob) {
            // The job may not exist anymore if it expired and so was deleted.
            // Report the requested ID: $failedJob is null in this branch and would print as an empty string.
            $this->getIoWriter()->infoLineNoBg(sprintf("The job <success-nobg>%s</success-nobg> doesn't exist anymore.", $jobId));

            return 0;
        }

        if (null === $cancellingJob) {
            // This is the lookup by "cancelling-job-id" that failed, so name the right Job in the message.
            $this->getIoWriter()->error('Impossible to find the cancelling Job.');

            return 1;
        }

        $failedJobId = $failedJob->getId();

        // Shared across the whole recursion so that a Job reachable through several parents is processed only once.
        $alreadyCancelledJobs = [];

        // We only cancel childs and not the failed Job as the failed Job is marked as "failed" and we don't want to change its status
        $this->cancelChildJobs($failedJob, $cancellingJob, sprintf('Parent Job %s failed.', $failedJobId), $alreadyCancelledJobs);

        $this->getIoWriter()->successLineNoBg(sprintf('All child jobs of Job %s and their respective child Jobs were marked as cancelled.', $failedJobId));

        return 0;
    }

    /**
     * Recursively marks as cancelled the child dependencies of the given Job.
     *
     * Children whose status is already cancelled are not marked again, but their own children are still visited.
     *
     * @param Job    $markedJob            The Job whose child dependencies have to be cancelled
     * @param Job    $cancellingJob        The Job recorded as "cancelled_by" on each cancelled child
     * @param string $cancellationReason   Stored in the debug info of each cancelled child
     * @param array  $alreadyCancelledJobs Map (ID => ID) of the Jobs already processed. It is passed by reference
     *                                     so that what is processed in deeper levels is known to the callers too,
     *                                     and shared sub-trees are not traversed again
     *
     * @throws StringsException
     * @throws Exception
     *
     * @return int Always 0
     */
    private function cancelChildJobs(Job $markedJob, Job $cancellingJob, string $cancellationReason, array &$alreadyCancelledJobs = []): int
    {
        $markedJobId    = $markedJob->getId();
        $markedJobQueue = $markedJob->getQueue();

        $this->getIoWriter()->infoLineNoBg(sprintf('Start cancelling child Jobs of Job #%s@%s.', $markedJobId, $markedJobQueue));

        $childDependencies = $markedJob->getChildDependencies();
        $childrenCount     = $childDependencies->count();

        // "Security check", no child jobs: nothing to do
        if ($childrenCount <= 0) {
            return 0;
        }

        // Information attached to each child that gets marked as cancelled
        $childInfo = [
            'cancelled_by' => $cancellingJob,
            'debug'        => [
                'cancellation_reason' => $cancellationReason,
            ],
        ];

        $this->getIoWriter()->noteLineNoBg(sprintf(
            '[%s] Job #%s@%s: Found %s child dependencies. Start marking them.',
            JobsUtil::getFormattedTime($markedJob, 'getClosedAt'),
            $markedJobId,
            $markedJobQueue,
            $childrenCount
        ));

        $cancelledChildIds = [];

        /** @var Job $childDependency */
        foreach ($childDependencies as $childDependency) {
            $childId = $childDependency->getId();

            // Already processed (in this level or in any other branch of the tree): skip it
            if (array_key_exists($childId, $alreadyCancelledJobs)) {
                continue;
            }

            // Add the child dependency to the list of cancelled childs reported at the end
            $cancelledChildIds[$childId] = $childId;

            // Mark the child only if it is not already cancelled...
            if (Job::STATUS_CANCELLED !== $childDependency->getStatus()) {
                $this->getJobsMarker()->markJobAsCancelled($childDependency, $childInfo);
            }

            // ... but in both cases remember it, so it is never processed twice
            $alreadyCancelledJobs[$childId] = $childId;

            // If this child has other childs on its own, mark them as cancelled too
            if ($childDependency->getChildDependencies()->count() > 0) {
                $this->cancelChildJobs($childDependency, $cancellingJob, sprintf('Child Job "#%s" were cancelled.', $childId), $alreadyCancelledJobs);
            }
        }

        $this->getIoWriter()->noteLineNoBg(sprintf(
            '[%s] Job #%s@%s: Cancelled childs are: %s',
            JobsUtil::getFormattedTime($markedJob, 'getClosedAt'),
            $markedJobId,
            $markedJobQueue,
            implode(', ', $cancelledChildIds)
        ));

        return 0;
    }
}