diff --git a/ProcessMaker/Nayra/Managers/WorkflowManagerRabbitMq.php b/ProcessMaker/Nayra/Managers/WorkflowManagerRabbitMq.php index ecab7b0813..542447172c 100644 --- a/ProcessMaker/Nayra/Managers/WorkflowManagerRabbitMq.php +++ b/ProcessMaker/Nayra/Managers/WorkflowManagerRabbitMq.php @@ -6,22 +6,27 @@ use Illuminate\Support\Facades\Auth; use Illuminate\Support\Facades\Log; use ProcessMaker\Contracts\WorkflowManagerInterface; +use ProcessMaker\Exception\ScriptException; use ProcessMaker\Facades\MessageBrokerService; +use ProcessMaker\Facades\WorkflowManager; use ProcessMaker\GenerateAccessToken; +use ProcessMaker\Managers\DataManager; use ProcessMaker\Models\EnvironmentVariable; use ProcessMaker\Models\Process as Definitions; use ProcessMaker\Models\ProcessRequest; +use ProcessMaker\Models\Script; use ProcessMaker\Models\User; use ProcessMaker\Nayra\Contracts\Bpmn\BoundaryEventInterface; use ProcessMaker\Nayra\Contracts\Bpmn\ScriptTaskInterface; +use ProcessMaker\Nayra\Contracts\Bpmn\ServiceTaskInterface; use ProcessMaker\Nayra\Contracts\Bpmn\StartEventInterface; use ProcessMaker\Nayra\Contracts\Bpmn\TokenInterface; use ProcessMaker\Nayra\Contracts\Engine\ExecutionInstanceInterface; +use Throwable; class WorkflowManagerRabbitMq extends WorkflowManagerDefault implements WorkflowManagerInterface { const ACTION_START_PROCESS = 'START_PROCESS'; - const ACTION_COMPLETE_TASK = 'COMPLETE_TASK'; const ACTION_TRIGGER_INTERMEDIATE_EVENT = 'TRIGGER_INTERMEDIATE_EVENT'; const ACTION_RUN_SCRIPT = 'RUN_SCRIPT'; @@ -197,6 +202,102 @@ public function runScripTask(ScriptTaskInterface $scriptTask, TokenInterface $to ]); } + /** + * Run a service task. + * + * @param ServiceTaskInterface $serviceTask + * @param TokenInterface $token + */ + public function runServiceTask(ServiceTaskInterface $serviceTask, TokenInterface $token) + { + // Log execution + Log::info('Dispatch a service task: ' . $serviceTask->getId()); + + // Get complementary information + $element = $token->getDefinition(true); + $instance = $token->processRequest; + $processModel = $instance->process; + $version = $instance->process_version_id; + $userId = $this->getCurrentUserId(); + $state = $this->serializeState($instance); + + // Exit if the task was completed or closed + if (!$token || !$element) { + return; + } + + // Get service task configuration + $implementation = $element->getImplementation(); + $configuration = json_decode($element->getProperty('config'), true); + $errorHandling = json_decode($element->getProperty('errorHandling'), true); + + // Check to see if we've failed parsing. If so, let's convert to empty array. + if ($configuration === null) { + $configuration = []; + } + + try { + if (empty($implementation)) { + throw new ScriptException('Service task implementation not defined'); + } else { + $script = Script::where('key', $implementation)->first(); + } + + // Check if service task implementation exists + $existsImpl = WorkflowManager::existsServiceImplementation($implementation); + if (!$existsImpl && empty($script)) { + throw new ScriptException('Service task not implemented: ' . $implementation); + } + + // Get data + $dataManager = new DataManager(); + $data = $dataManager->getData($token); + + // Run implementation/script + if ($existsImpl) { + $response = [ + 'output' => WorkflowManager::runServiceImplementation($implementation, $data, $configuration, $token->getId(), $errorHandling), + ]; + } else { + $response = $script->runScript($data, $configuration, $token->getId(), $errorHandling); + } + + // Update data + if (is_array($response['output'])) { + // Validate data + $this->validateData($response['output'], $processModel, $element); + $dataManager = new DataManager(); + $dataManager->updateData($token, $response['output']); + } + + // Dispatch complete task action + $this->dispatchAction([ + 'bpmn' => $version, + 'action' => self::ACTION_COMPLETE_TASK, + 'params' => [ + 'request_id' => $token->process_request_id, + 'token_id' => $token->uuid, + 'element_id' => $token->element_id, + 'data' => $response['output'], + ], + 'state' => $state, + 'session' => [ + 'user_id' => $userId, + ], + ]); + } catch (Throwable $exception) { + // Change to error status + $token->setStatus(ServiceTaskInterface::TOKEN_STATE_FAILING); + $error = $element->getRepository()->createError(); + $error->setName($exception->getMessage()); + $token->setProperty('error', $error); + + // Log message errors + Log::info('Service task failed: ' . $implementation . ' - ' . $exception->getMessage()); + Log::error($exception->getTraceAsString()); + } + } + /** * Trigger a boundary event * diff --git a/ProcessMaker/Nayra/MessageBrokers/ServiceRabbitMq.php b/ProcessMaker/Nayra/MessageBrokers/ServiceRabbitMq.php index b621030626..7408d9457e 100644 --- a/ProcessMaker/Nayra/MessageBrokers/ServiceRabbitMq.php +++ b/ProcessMaker/Nayra/MessageBrokers/ServiceRabbitMq.php @@ -61,16 +61,15 @@ public function disconnect(): void public function sendMessage(string $subject, string $collaborationId, mixed $body): void { // Connect to RabbitMQ - $this->connect(); + if (!($this->connection instanceof AMQPStreamConnection) || !$this->connection->isConnected()) { + $this->connect(); + } // Prepare the message to send $message = new AMQPMessage(json_encode(['data' => $body, 'collaboration_id' => $collaborationId])); // Publish the message $this->channel->basic_publish($message, '', self::QUEUE_NAME_PUBLISH); - - // Close connection to RabbitMQ - $this->disconnect(); } /** @@ -114,7 +113,9 @@ public function worker(): void } // Disconnect from service - $this->disconnect(); + if ($this->connection->isConnected()) { + $this->disconnect(); + } } /** diff --git a/ProcessMaker/Nayra/Repositories/PersistenceHandler.php b/ProcessMaker/Nayra/Repositories/PersistenceHandler.php index 1600e38155..51088a65df 100644 --- a/ProcessMaker/Nayra/Repositories/PersistenceHandler.php +++ b/ProcessMaker/Nayra/Repositories/PersistenceHandler.php @@ -4,6 +4,7 @@ use Exception; use Illuminate\Support\Facades\Auth; +use ProcessMaker\Listeners\BpmnSubscriber; use ProcessMaker\Managers\TaskSchedulerManager; use ProcessMaker\Repositories\ExecutionInstanceRepository; use ProcessMaker\Repositories\TokenRepository; @@ -118,6 +119,15 @@ public function save(array $transaction) case 'schedule_duration': $this->persistScheduleDuration($transaction); break; + case 'service_task_activated': + // Get object instances + $token = $this->deserializer->unserializeToken($transaction['token']); + $serviceTask = $this->deserializer->unserializeEntity($transaction['activity']); + + // Trigger service task listener + $subscriber = new BpmnSubscriber(); + $subscriber->onServiceTaskActivated($serviceTask, $token); + break; default: throw new Exception('Unknown transaction type ' . $transaction['type']); }