From 038f1c940fe85a2cda5cc1096ce14ce3fe012978 Mon Sep 17 00:00:00 2001 From: rom Date: Sun, 19 Oct 2025 20:11:01 +0800 Subject: [PATCH] feat: add ecs worker flag --- Console/Command/QueueShell.php | 45 +++++++++++++++++++++++++++++++--- Model/QueuedTask.php | 44 ++++++++++++++++++++++++++++++--- 2 files changed, 83 insertions(+), 6 deletions(-) mode change 100644 => 100755 Model/QueuedTask.php diff --git a/Console/Command/QueueShell.php b/Console/Command/QueueShell.php index 0e28c50b..4d87a6e3 100644 --- a/Console/Command/QueueShell.php +++ b/Console/Command/QueueShell.php @@ -297,6 +297,14 @@ public function runworkersqs() { return; } + // Check if running in ECS mode + $enableEcs = !empty($this->params['enable-ecs']); + if ($enableEcs) { + $this->out('[ECS MODE] Only processing messages with useEcsServer=true'); + } else { + $this->out('[EC2 MODE] Only processing messages with useEcsServer=false or not set'); + } + if ($pidFilePath = Configure::read('Queue.pidfilepath')) { if (!file_exists($pidFilePath)) { @@ -346,7 +354,7 @@ public function runworkersqs() { touch($pidFilePath . $pidFileName); } //$this->_log('runworker', isset($pid) ? $pid : null); - $this->out('[' . date('Y-m-d H:i:s') . '] Looking for Job ...'); + $this->out('[' . date('Y-m-d H:i:s') . '] Looking for' . ($enableEcs ? 'ECS' : 'EC2') . ' Job ...'); $data = $this->QueuedTask->requestSqsJob($queueUrl); //$data = $this->QueuedTask->requestJob($this->_getTaskConf(), $group); @@ -374,12 +382,29 @@ public function runworkersqs() { } } if ($data) { - $this->out('Running Job of type "' . $data['jobtype'] . '"'); $taskname = 'Queue' . $data['jobtype']; - if ($this->{$taskname}->autoUnserialize) { $data['data'] = unserialize($data['data']); } + + // Check if message is for this worker type + $useEcsServer = isset($data['data']['useEcsServer']) ? $data['data']['useEcsServer'] : false; + $isForThisWorker = ($enableEcs && $useEcsServer) || (!$enableEcs && !$useEcsServer); + + if (!$isForThisWorker) { + // Not for us - release for the other worker type + $targetWorker = $enableEcs ? 'EC2' : 'ECS'; + $this->out('[SKIP] Message is for ' . $targetWorker . ' worker - releasing'); + + $this->QueuedTask->releaseSqsMessage($queueUrl, $data['sqsReceiptHandle'], 30); + $this->QueuedTask->updateAll( + ['fetched' => null, 'workerkey' => null], + ['id' => $data['id']] + ); + continue; + } + + $this->out('Running Job of type "' . $data['jobtype'] . '"'); //prevent tasks that don't catch their own errors from killing this worker try { @@ -527,6 +552,16 @@ public function getOptionParser() { 'default' => '' ]; + $subcommandParserSqs = [ + 'options' => [ + 'enable-ecs' => [ + 'help' => 'Enable ECS mode - only process messages with useEcsServer=true', + 'boolean' => true, + 'default' => false + ] + ] + ]; + return parent::getOptionParser() ->description(__d('cake_console', "Simple and minimalistic job queue (or deferred-task) system.")) ->addSubcommand('clean', [ @@ -548,6 +583,10 @@ public function getOptionParser() { ->addSubcommand('runworker', [ 'help' => 'Run Worker', 'parser' => $subcommandParserFull + ]) + ->addSubcommand('runworkersqs', [ + 'help' => 'Run SQS Worker', + 'parser' => $subcommandParserSqs ]); } diff --git a/Model/QueuedTask.php b/Model/QueuedTask.php old mode 100644 new mode 100755 index 8d1dbbc5..1c9faf06 --- a/Model/QueuedTask.php +++ b/Model/QueuedTask.php @@ -87,6 +87,27 @@ public function nextPriority($priority) return $this; } + public function isCompanyForEcsQueue($bidId) { + $companyBid = ClassRegistry::init('CompanyBid')->find('first', [ + 'conditions' => [ + 'CompanyBid.id' => $bidId, + ], + 'contain' => false, + 'fields' => ['company_id'] + ]); + if (empty($companyBid)) { + return false; + } + $companySetting = ClassRegistry::init('Symphosize.CompanySetting')->find('first', [ + 'conditions' => [ + 'CompanySetting.company_id' => $companyBid['CompanyBid']['company_id'], + 'CompanySetting.slug' => 'ecs_queue_enabled', + ], + 'contain' => false, + ]); + return !empty($companySetting['CompanySetting']['value']); + } + /** * Add a new Job to the Queue. * @@ -114,12 +135,14 @@ public function createJob($jobName, $data = null, $notBefore = null, $group = nu $dupeKey = $data['company_id'] . '.' . $data['sub_service_id']; break; case 'SaveConnection': - $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; + $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; $dupeKey = $data['bidId'] . '.' . $additionalKey; + $data['useEcsServer'] = $this->isCompanyForEcsQueue($data['bidId']); break; case 'SaveSingleConnection': - $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; + $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; $dupeKey = $data['provider'] . '.' . $data['bidId'] . '.' . $additionalKey; + $data['useEcsServer'] = $this->isCompanyForEcsQueue($data['bidId']); break; case 'SyncIntercomCompany': $dupeKey = $data['company_id']; @@ -221,7 +244,7 @@ public function triggerSqsMessage($jobName, $taskId, $retryCount=0, $delaySecond 'MessageBody' => json_encode([ 'id' => $taskId, 'retryCount' => $retryCount, - 'jobtype' => $jobName + 'jobtype' => $jobName, ]) )); } catch(Exception $e) { @@ -357,6 +380,8 @@ public function requestSqsJob($queueUrl, $waitTime=20) { } $confirmRecord['QueuedTask']['sqsReceiptHandle'] = $message['ReceiptHandle']; + // Include useEcsServer flag from SQS message body for worker filtering + $confirmRecord['QueuedTask']['useEcsServer'] = isset($data['useEcsServer']) ? $data['useEcsServer'] : false; return $confirmRecord['QueuedTask']; } @@ -406,6 +431,19 @@ public function deleteSqsMessage($queueUrl, $receiptHandle) { } } + public function releaseSqsMessage($queueUrl, $receiptHandle, $visibilityTimeout = 30) { + try { + // Release message with delay to let the other worker type receive it + $this->sqsClient->changeMessageVisibility([ + 'QueueUrl' => $queueUrl, + 'ReceiptHandle' => $receiptHandle, + 'VisibilityTimeout' => $visibilityTimeout + ]); + } catch(\Exception $e) { + CakeLog::write('queue-error', 'Failed to release SQS message: ' . $e->getMessage()); + } + } + /** * Look for a new job that can be processed with the current abilities and * from the specified group (or any if null).