Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 102 additions & 1 deletion ProcessMaker/Nayra/Managers/WorkflowManagerRabbitMq.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,27 @@
use Illuminate\Support\Facades\Auth;
use Illuminate\Support\Facades\Log;
use ProcessMaker\Contracts\WorkflowManagerInterface;
use ProcessMaker\Exception\ScriptException;
use ProcessMaker\Facades\MessageBrokerService;
use ProcessMaker\Facades\WorkflowManager;
use ProcessMaker\GenerateAccessToken;
use ProcessMaker\Managers\DataManager;
use ProcessMaker\Models\EnvironmentVariable;
use ProcessMaker\Models\Process as Definitions;
use ProcessMaker\Models\ProcessRequest;
use ProcessMaker\Models\Script;
use ProcessMaker\Models\User;
use ProcessMaker\Nayra\Contracts\Bpmn\BoundaryEventInterface;
use ProcessMaker\Nayra\Contracts\Bpmn\ScriptTaskInterface;
use ProcessMaker\Nayra\Contracts\Bpmn\ServiceTaskInterface;
use ProcessMaker\Nayra\Contracts\Bpmn\StartEventInterface;
use ProcessMaker\Nayra\Contracts\Bpmn\TokenInterface;
use ProcessMaker\Nayra\Contracts\Engine\ExecutionInstanceInterface;
use Throwable;

class WorkflowManagerRabbitMq extends WorkflowManagerDefault implements WorkflowManagerInterface
{
const ACTION_START_PROCESS = 'START_PROCESS';

const ACTION_COMPLETE_TASK = 'COMPLETE_TASK';
const ACTION_TRIGGER_INTERMEDIATE_EVENT = 'TRIGGER_INTERMEDIATE_EVENT';
const ACTION_RUN_SCRIPT = 'RUN_SCRIPT';
Expand Down Expand Up @@ -197,6 +202,102 @@ public function runScripTask(ScriptTaskInterface $scriptTask, TokenInterface $to
]);
}

/**
* Run a service task.
*
* @param ServiceTaskInterface $serviceTask
* @param TokenInterface $token
*/
public function runServiceTask(ServiceTaskInterface $serviceTask, TokenInterface $token)
{
// Log execution
Log::info('Dispatch a service task: ' . $serviceTask->getId());

// Get complementary information
$element = $token->getDefinition(true);
$instance = $token->processRequest;
$processModel = $instance->process;
$version = $instance->process_version_id;
$userId = $this->getCurrentUserId();
$state = $this->serializeState($instance);

// Exit if the task was completed or closed
if (!$token || !$element) {
return;
}

// Get service task configuration
$implementation = $element->getImplementation();
$configuration = json_decode($element->getProperty('config'), true);
$errorHandling = json_decode($element->getProperty('errorHandling'), true);

// Check to see if we've failed parsing. If so, let's convert to empty array.
if ($configuration === null) {
$configuration = [];
}

try {
if (empty($implementation)) {
throw new ScriptException('Service task implementation not defined');
} else {
$script = Script::where('key', $implementation)->first();
}

// Check if service task implementation exists
$existsImpl = WorkflowManager::existsServiceImplementation($implementation);
if (!$existsImpl && empty($script)) {
throw new ScriptException('Service task not implemented: ' . $implementation);
}

// Get data
$dataManager = new DataManager();
$data = $dataManager->getData($token);

// Run implementation/script
if ($existsImpl) {
$response = [
'output' => WorkflowManager::runServiceImplementation($implementation, $data, $configuration, $token->getId(), $errorHandling),
];
} else {
$response = $script->runScript($data, $configuration, $token->getId(), $errorHandling);
}

// Update data
if (is_array($response['output'])) {
// Validate data
$this->validateData($response['output'], $processModel, $element);
$dataManager = new DataManager();
$dataManager->updateData($token, $response['output']);
}

// Dispatch complete task action
$this->dispatchAction([
'bpmn' => $version,
'action' => self::ACTION_COMPLETE_TASK,
'params' => [
'request_id' => $token->process_request_id,
'token_id' => $token->uuid,
'element_id' => $token->element_id,
'data' => $response['output'],
],
'state' => $state,
'session' => [
'user_id' => $userId,
],
]);
} catch (Throwable $exception) {
// Change to error status
$token->setStatus(ServiceTaskInterface::TOKEN_STATE_FAILING);
$error = $element->getRepository()->createError();
$error->setName($exception->getMessage());
$token->setProperty('error', $error);

// Log message errors
Log::info('Service task failed: ' . $implementation . ' - ' . $exception->getMessage());
Log::error($exception->getTraceAsString());
}
}

/**
* Trigger a boundary event
*
Expand Down
11 changes: 6 additions & 5 deletions ProcessMaker/Nayra/MessageBrokers/ServiceRabbitMq.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,15 @@ public function disconnect(): void
public function sendMessage(string $subject, string $collaborationId, mixed $body): void
{
// Connect to RabbitMQ
$this->connect();
if (!($this->connection instanceof AMQPStreamConnection) || !$this->connection->isConnected()) {
$this->connect();
}

// Prepare the message to send
$message = new AMQPMessage(json_encode(['data' => $body, 'collaboration_id' => $collaborationId]));

// Publish the message
$this->channel->basic_publish($message, '', self::QUEUE_NAME_PUBLISH);

// Close connection to RabbitMQ
$this->disconnect();
}

/**
Expand Down Expand Up @@ -114,7 +113,9 @@ public function worker(): void
}

// Disconnect from service
$this->disconnect();
if ($this->connection->isConnected()) {
$this->disconnect();
}
}

/**
Expand Down
10 changes: 10 additions & 0 deletions ProcessMaker/Nayra/Repositories/PersistenceHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Exception;
use Illuminate\Support\Facades\Auth;
use ProcessMaker\Listeners\BpmnSubscriber;
use ProcessMaker\Managers\TaskSchedulerManager;
use ProcessMaker\Repositories\ExecutionInstanceRepository;
use ProcessMaker\Repositories\TokenRepository;
Expand Down Expand Up @@ -118,6 +119,15 @@ public function save(array $transaction)
case 'schedule_duration':
$this->persistScheduleDuration($transaction);
break;
case 'service_task_activated':
// Get object instances
$token = $this->deserializer->unserializeToken($transaction['token']);
$serviceTask = $this->deserializer->unserializeEntity($transaction['activity']);

// Trigger service task listener
$subscriber = new BpmnSubscriber();
$subscriber->onServiceTaskActivated($serviceTask, $token);
break;
default:
throw new Exception('Unknown transaction type ' . $transaction['type']);
}
Expand Down