Skip to content

Commit

Permalink
Merge pull request #145 from fredden/track-cron-hostname
Browse files Browse the repository at this point in the history
Track hostname of running jobs
  • Loading branch information
Ethan3600 committed Jun 6, 2020
2 parents 2e574c2 + 6f1f309 commit 0346ebb
Show file tree
Hide file tree
Showing 16 changed files with 197 additions and 64 deletions.
5 changes: 5 additions & 0 deletions Api/Data/ScheduleInterface.php
Expand Up @@ -33,6 +33,11 @@ public function getJobCode(): string;
*/
public function getStatus(): string;

/**
* @return string|null
*/
public function getHostname();

/**
* @return int|null
*/
Expand Down
18 changes: 9 additions & 9 deletions Console/Command/KillJob.php
Expand Up @@ -38,27 +38,27 @@ class KillJob extends Command
private $scheduleRepository;

/**
* @var ScheduleManagementInterface
* @var ScheduleManagementInterface
*/
private $scheduleManagement;

/**
* @var SearchCriteriaBuilder
* @var SearchCriteriaBuilder
*/
private $searchCriteriaBuilder;

/**
* @var FilterBuilder
* @var FilterBuilder
*/
private $filterBuilder;

/**
* @var FilterGroupBuilder
* @var FilterGroupBuilder
*/
private $filterGroupBuilder;

/**
* @var ProcessManagement
* @var ProcessManagement
*/
private $processManagement;

Expand Down Expand Up @@ -137,7 +137,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
/** @var bool $killed */
$killed = false;
if ($optionProcKill) {
$killed = $this->processManagement->killPid($pid);
$killed = $this->processManagement->killPid($pid, $job->getHostname());
} else {
$killed = $this->scheduleManagement->kill($id, \time());
}
Expand Down Expand Up @@ -190,7 +190,7 @@ private function loadRunningJobsByCode(string $jobCode): array
$searchCriteria = $this->searchCriteriaBuilder->setFilterGroups(
[$jobCodeFilterGroup, $statusFilterGroup]
)->create();

/** @var \Magento\Framework\Api\SearchResultsInterface $result */
$result = $this->scheduleRepository->getList($searchCriteria);
return $result->getItems();
Expand Down
4 changes: 4 additions & 0 deletions Model/CleanRunningJobs.php
Expand Up @@ -53,6 +53,10 @@ public function execute()
$runningJobs = $this->scheduleRepository->getByStatus(ScheduleInterface::STATUS_RUNNING);

foreach ($runningJobs as $schedule) {
if ($schedule->getHostname() !== \gethostname()) {
continue;
}

if ($this->processManagement->isPidAlive($schedule->getPid())) {
continue;
}
Expand Down
6 changes: 6 additions & 0 deletions Model/Data/Schedule.php
Expand Up @@ -13,6 +13,7 @@ class Schedule extends DataObject implements \EthanYehuda\CronjobManager\Api\Dat
const KEY_SCHEDULE_ID = 'schedule_id';
const KEY_JOB_CODE = 'job_code';
const KEY_STATUS = 'status';
const KEY_HOSTNAME = 'hostname';
const KEY_PID = 'pid';
const KEY_MESSAGES = 'messages';
const KEY_CREATED_AT = 'created_at';
Expand Down Expand Up @@ -41,6 +42,11 @@ public function getStatus(): string
return $this->getData(self::KEY_STATUS);
}

public function getHostname(): string
{
return (string) $this->getData(self::KEY_HOSTNAME);
}

public function getPid()
{
return (int) $this->getData(self::KEY_PID);
Expand Down
2 changes: 1 addition & 1 deletion Model/ProcessKillRequests.php
Expand Up @@ -51,7 +51,7 @@ public function execute()

private function killScheduleProcess(ScheduleInterface $schedule): void
{
if ($this->processManagement->killPid($schedule->getPid())) {
if ($this->processManagement->killPid($schedule->getPid(), $schedule->getHostname())) {
$messages = [];
if ($schedule->getMessages()) {
$messages[] = $schedule->getMessages();
Expand Down
7 changes: 5 additions & 2 deletions Model/ProcessManagement.php
Expand Up @@ -9,11 +9,14 @@ class ProcessManagement

public function isPidAlive(int $pid): bool
{
return \file_exists('/proc/' . $pid);
return \posix_kill($pid, 0);
}

public function killPid($pid): bool
public function killPid(int $pid, string $hostname): bool
{
if ($hostname !== \gethostname()) {
return false;
}
if (!$this->isPidAlive($pid)) {
return false;
}
Expand Down
1 change: 1 addition & 0 deletions Plugin/Cron/Model/SchedulePlugin.php
Expand Up @@ -21,6 +21,7 @@ public function __construct(ScheduleResource $scheduleResource)
public function afterTryLockJob(Schedule $subject, bool $result)
{
if ($result) {
$subject->setData('hostname', \gethostname());
$subject->setData('pid', \getmypid());
$this->scheduleResource->save($subject);
}
Expand Down
50 changes: 0 additions & 50 deletions Plugin/Cron/Model/ScheduleResourcePlugin.php
Expand Up @@ -34,54 +34,4 @@ public function afterSave(
}
return $result;
}

/**
* Replace method to update pid column together with status column
*
* @param \Magento\Cron\Model\ResourceModel\Schedule $subject
* @param callable $proceed
* @param $scheduleId
* @param $newStatus
* @param $currentStatus
* @return bool
* @throws \Zend_Db_Statement_Exception
*/
public function aroundTrySetJobUniqueStatusAtomic(
\Magento\Cron\Model\ResourceModel\Schedule $subject,
callable $proceed,
$scheduleId,
$newStatus,
$currentStatus
) {
$connection = $subject->getConnection();

// this condition added to avoid cron jobs locking after incorrect termination of running job
$match = $connection->quoteInto(
'existing.job_code = current.job_code ' .
'AND (existing.executed_at > UTC_TIMESTAMP() - INTERVAL 1 DAY OR existing.executed_at IS NULL) ' .
'AND existing.status = ?',
$newStatus
);

$selectIfUnlocked = $connection->select()
->joinLeft(
['existing' => $subject->getTable('cron_schedule')],
$match,
[
'status' => new \Zend_Db_Expr($connection->quote($newStatus)),
'pid' => new \Zend_Db_Expr($connection->quote(\getmypid()))
]
)
->where('current.schedule_id = ?', $scheduleId)
->where('current.status = ?', $currentStatus)
->where('existing.schedule_id IS NULL');

$update = $connection->updateFromSelect($selectIfUnlocked, ['current' => $subject->getTable('cron_schedule')]);
$result = $connection->query($update)->rowCount();

if ($result == 1) {
return true;
}
return false;
}
}
27 changes: 27 additions & 0 deletions Setup/UpgradeSchema.php
Expand Up @@ -46,6 +46,10 @@ public function upgrade(
$this->addKillRequestToSchedule();
}

if (version_compare($context->getVersion(), '1.9.0') < 0) {
$this->addHostnameToSchedule();
}

$this->setup->endSetup();
}

Expand Down Expand Up @@ -73,6 +77,29 @@ public function addPidToSchedule()
);
}

/**
* Add column to cron_schedule to keep track of which server is running each process
*/
public function addHostnameToSchedule()
{
if (version_compare($this->magentoMetaData->getVersion(), '2.3.0', '>=')) {
// For Magento 2.3+, db_schema.xml is used instead
return;
}
$this->setup->getConnection()->addColumn(
$this->setup->getTable('cron_schedule'),
'hostname',
[
'type' => Table::TYPE_TEXT,
'length' => 255,
'comment' => 'Hostname of the server running this job',
'nullable' => true,
'default' => null,
'after' => 'pid',
]
);
}

/**
* Add column to cron_schedule to send kill requests
*/
Expand Down
16 changes: 16 additions & 0 deletions Test/Integration/CleanRunningJobsTest.php
Expand Up @@ -19,6 +19,7 @@
class CleanRunningJobsTest extends TestCase
{
const NOW = '2019-02-09 18:33:00';
const REMOTE_HOSTNAME = 'hostname.example.net';

/**
* @var ObjectManager
Expand Down Expand Up @@ -50,11 +51,20 @@ protected function setUp(): void
public function testDeadRunningJobsAreCleaned()
{
$this->givenRunningScheduleWithInactiveProcess($schedule);
$this->givenScheduleIsRunningOnHost($schedule, \gethostname());
$this->whenEventIsDispatched('process_cron_queue_before');
$this->thenScheduleHasStatus($schedule, Schedule::STATUS_ERROR);
$this->andScheduleHasMessage($schedule, 'Process went away at ' . self::NOW);
}

public function testDeadRunningJobsOnAnotherHostAreNotCleaned()
{
$this->givenRunningScheduleWithInactiveProcess($schedule);
$this->givenScheduleIsRunningOnHost($schedule, self::REMOTE_HOSTNAME);
$this->whenEventIsDispatched('process_cron_queue_before');
$this->thenScheduleHasStatus($schedule, Schedule::STATUS_RUNNING);
}

public function testActiveRunningJobsAreNotCleaned()
{
$this->givenRunningScheduleWithActiveProcess($schedule);
Expand All @@ -71,6 +81,12 @@ private function givenRunningScheduleWithInactiveProcess(&$schedule)
$schedule->save();
}

private function givenScheduleIsRunningOnHost(Schedule &$schedule, string $hostname): void
{
$schedule->setData('hostname', $hostname);
$schedule->save();
}

private function givenRunningScheduleWithActiveProcess(&$schedule)
{
/** @var Schedule $schedule */
Expand Down
85 changes: 85 additions & 0 deletions Test/Integration/HostnameTest.php
@@ -0,0 +1,85 @@
<?php
declare(strict_types=1);
namespace EthanYehuda\CronjobManager\Test\Integration;

use Magento\Cron\Model\ResourceModel\Schedule as ScheduleResource;
use Magento\Cron\Model\Schedule;
use Magento\Framework\ObjectManager\ObjectManager;
use Magento\TestFramework\Helper\Bootstrap;
use PHPUnit\Framework\TestCase;

/**
* @magentoAppArea crontab
* @magentoDbIsolation enabled
*/
class HostnameTest extends TestCase
{
/**
* @var ObjectManager
*/
protected $objectManager;

/**
* @var ScheduleResource
*/
protected $scheduleResource;

protected function setUp(): void
{
$this->objectManager = Bootstrap::getObjectManager();
$this->scheduleResource = $this->objectManager->get(ScheduleResource::class);
}

public function testProcessIdSavedOnStart(): void
{
$this->givenHostname($hostname);
$this->givenPendingSchedule($schedule);
$this->whenTryLockJob($schedule);
$this->thenScheduleIsSavedWithHostname($schedule, $hostname);
}

public function testProcessIdMaintainedAfterSuccesfulRun(): void
{
$this->givenHostname($hostname);
$this->givenPendingSchedule($schedule);
$this->whenTryLockJob($schedule);
$this->andScheduleSavedWithSuccess($schedule);
$this->thenScheduleIsSavedWithHostname($schedule, $hostname);
}

private function givenPendingSchedule(&$schedule): void
{
/** @var Schedule $newSchedule */
$newSchedule = $this->objectManager->create(Schedule::class);
$newSchedule->setStatus(Schedule::STATUS_PENDING);
$newSchedule->setJobCode('test_job_code');
$newSchedule->save();
/** @var Schedule $schedule */
$schedule = $this->objectManager->create(Schedule::class);
$schedule->load($newSchedule->getId());
}

private function whenTryLockJob(Schedule $schedule): void
{
$lock = $schedule->tryLockJob();
$this->assertTrue($lock, 'Precondition: tryLockJob() should be successful');
}

private function andScheduleSavedWithSuccess(Schedule $schedule): void
{
$schedule->setStatus(Schedule::STATUS_SUCCESS);
$this->scheduleResource->save($schedule);
}

private function thenScheduleIsSavedWithHostname(Schedule $schedule, string $hostname): void
{
$this->scheduleResource->load($schedule, $schedule->getId());
$this->assertEquals($hostname, $schedule->getData('hostname'), 'Hostname should be saved in schedule');
}

private function givenHostname(&$hostname): void
{
$hostname = \gethostname();
$this->assertNotFalse($hostname, 'Precondition: gethostname() should not return false');
}
}
16 changes: 16 additions & 0 deletions Test/Integration/ProcessKillRequestsTest.php
Expand Up @@ -21,6 +21,7 @@
class ProcessKillRequestsTest extends TestCase
{
const NOW = '2019-02-09 18:33:00';
const REMOTE_HOSTNAME = 'hostname.example.net';

/**
* @var int
Expand Down Expand Up @@ -76,12 +77,21 @@ protected function tearDown(): void
public function testDeadRunningJobsAreCleaned()
{
$this->givenRunningScheduleWithKillRequest($schedule, $this->timeStampInThePast());
$this->givenScheduleIsRunningOnHost($schedule, \gethostname());
$this->whenEventIsDispatched('process_cron_queue_before');
$this->thenScheduleHasStatus($schedule, ScheduleInterface::STATUS_KILLED);
$this->andScheduleHasMessage($schedule, 'Process was killed at ' . self::NOW);
$this->andProcessIsKilled($schedule);
}

public function testDeadRunningJobsOnAnotherHostAreNotCleaned()
{
$this->givenRunningScheduleWithKillRequest($schedule, $this->timeStampInThePast());
$this->givenScheduleIsRunningOnHost($schedule, self::REMOTE_HOSTNAME);
$this->whenEventIsDispatched('process_cron_queue_before');
$this->thenScheduleHasStatus($schedule, Schedule::STATUS_RUNNING);
}

private function givenRunningScheduleWithKillRequest(&$schedule, int $timestamp)
{
/** @var Schedule $schedule */
Expand All @@ -92,6 +102,12 @@ private function givenRunningScheduleWithKillRequest(&$schedule, int $timestamp)
$this->scheduleManagement->kill((int)$schedule->getId(), $timestamp);
}

private function givenScheduleIsRunningOnHost(Schedule &$schedule, string $hostname): void
{
$schedule->setData('hostname', $hostname);
$schedule->save();
}

private function whenEventIsDispatched($eventName)
{
$this->eventManager->dispatch($eventName);
Expand Down

0 comments on commit 0346ebb

Please sign in to comment.