Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 42 additions & 3 deletions Console/Command/QueueShell.php
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,14 @@ public function runworkersqs() {
return;
}

// Check if running in ECS mode
$enableEcs = !empty($this->params['enable-ecs']);
if ($enableEcs) {
$this->out('[ECS MODE] Only processing messages with useEcsServer=true');
} else {
$this->out('[EC2 MODE] Only processing messages with useEcsServer=false or not set');
}


if ($pidFilePath = Configure::read('Queue.pidfilepath')) {
if (!file_exists($pidFilePath)) {
Expand Down Expand Up @@ -346,7 +354,7 @@ public function runworkersqs() {
touch($pidFilePath . $pidFileName);
}
//$this->_log('runworker', isset($pid) ? $pid : null);
$this->out('[' . date('Y-m-d H:i:s') . '] Looking for Job ...');
$this->out('[' . date('Y-m-d H:i:s') . '] Looking for' . ($enableEcs ? 'ECS' : 'EC2') . ' Job ...');

$data = $this->QueuedTask->requestSqsJob($queueUrl);
//$data = $this->QueuedTask->requestJob($this->_getTaskConf(), $group);
Expand Down Expand Up @@ -374,12 +382,29 @@ public function runworkersqs() {
}
}
if ($data) {
$this->out('Running Job of type "' . $data['jobtype'] . '"');
$taskname = 'Queue' . $data['jobtype'];

if ($this->{$taskname}->autoUnserialize) {
$data['data'] = unserialize($data['data']);
}

// Check if message is for this worker type
$useEcsServer = isset($data['data']['useEcsServer']) ? $data['data']['useEcsServer'] : false;
$isForThisWorker = ($enableEcs && $useEcsServer) || (!$enableEcs && !$useEcsServer);

if (!$isForThisWorker) {
// Not for us - release for the other worker type
$targetWorker = $enableEcs ? 'EC2' : 'ECS';
$this->out('[SKIP] Message is for ' . $targetWorker . ' worker - releasing');

$this->QueuedTask->releaseSqsMessage($queueUrl, $data['sqsReceiptHandle'], 30);
$this->QueuedTask->updateAll(
['fetched' => null, 'workerkey' => null],
['id' => $data['id']]
);
continue;
}

$this->out('Running Job of type "' . $data['jobtype'] . '"');
//prevent tasks that don't catch their own errors from killing this worker

try {
Expand Down Expand Up @@ -527,6 +552,16 @@ public function getOptionParser() {
'default' => ''
];

$subcommandParserSqs = [
'options' => [
'enable-ecs' => [
'help' => 'Enable ECS mode - only process messages with useEcsServer=true',
'boolean' => true,
'default' => false
]
]
];

return parent::getOptionParser()
->description(__d('cake_console', "Simple and minimalistic job queue (or deferred-task) system."))
->addSubcommand('clean', [
Expand All @@ -548,6 +583,10 @@ public function getOptionParser() {
->addSubcommand('runworker', [
'help' => 'Run Worker',
'parser' => $subcommandParserFull
])
->addSubcommand('runworkersqs', [
'help' => 'Run SQS Worker',
'parser' => $subcommandParserSqs
]);
}

Expand Down
44 changes: 41 additions & 3 deletions Model/QueuedTask.php
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,27 @@ public function nextPriority($priority)
return $this;
}

public function isCompanyForEcsQueue($bidId) {
$companyBid = ClassRegistry::init('CompanyBid')->find('first', [
'conditions' => [
'CompanyBid.id' => $bidId,
],
'contain' => false,
'fields' => ['company_id']
]);
if (empty($companyBid)) {
return false;
}
$companySetting = ClassRegistry::init('Symphosize.CompanySetting')->find('first', [
'conditions' => [
'CompanySetting.company_id' => $companyBid['CompanyBid']['company_id'],
'CompanySetting.slug' => 'ecs_queue_enabled',
],
'contain' => false,
]);
return !empty($companySetting['CompanySetting']['value']);
}

/**
* Add a new Job to the Queue.
*
Expand Down Expand Up @@ -114,12 +135,14 @@ public function createJob($jobName, $data = null, $notBefore = null, $group = nu
$dupeKey = $data['company_id'] . '.' . $data['sub_service_id'];
break;
case 'SaveConnection':
$additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0;
$additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0;
$dupeKey = $data['bidId'] . '.' . $additionalKey;
$data['useEcsServer'] = $this->isCompanyForEcsQueue($data['bidId']);
break;
case 'SaveSingleConnection':
$additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0;
$additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0;
$dupeKey = $data['provider'] . '.' . $data['bidId'] . '.' . $additionalKey;
$data['useEcsServer'] = $this->isCompanyForEcsQueue($data['bidId']);
break;
case 'SyncIntercomCompany':
$dupeKey = $data['company_id'];
Expand Down Expand Up @@ -221,7 +244,7 @@ public function triggerSqsMessage($jobName, $taskId, $retryCount=0, $delaySecond
'MessageBody' => json_encode([
'id' => $taskId,
'retryCount' => $retryCount,
'jobtype' => $jobName
'jobtype' => $jobName,
])
));
} catch(Exception $e) {
Expand Down Expand Up @@ -357,6 +380,8 @@ public function requestSqsJob($queueUrl, $waitTime=20) {
}

$confirmRecord['QueuedTask']['sqsReceiptHandle'] = $message['ReceiptHandle'];
// Include useEcsServer flag from SQS message body for worker filtering
$confirmRecord['QueuedTask']['useEcsServer'] = isset($data['useEcsServer']) ? $data['useEcsServer'] : false;

return $confirmRecord['QueuedTask'];
}
Expand Down Expand Up @@ -406,6 +431,19 @@ public function deleteSqsMessage($queueUrl, $receiptHandle) {
}
}

public function releaseSqsMessage($queueUrl, $receiptHandle, $visibilityTimeout = 30) {
try {
// Release message with delay to let the other worker type receive it
$this->sqsClient->changeMessageVisibility([
'QueueUrl' => $queueUrl,
'ReceiptHandle' => $receiptHandle,
'VisibilityTimeout' => $visibilityTimeout
]);
} catch(\Exception $e) {
CakeLog::write('queue-error', 'Failed to release SQS message: ' . $e->getMessage());
}
}

/**
* Look for a new job that can be processed with the current abilities and
* from the specified group (or any if null).
Expand Down