A comprehensive Laravel package for managing pipelined processes and their tasks with state management, logging, and async execution capabilities.
- Pipeline-based Process Execution: Execute complex workflows using Laravel's Pipeline pattern
- State Management: Track process and task states (PENDING, PROCESSING, COMPLETED, ABORTED, WAITING)
- Task Orchestration: Chain multiple tasks within processes with automatic state transitions
- Pause & Resume: Pause processes and resume them via signed URLs
- Nested Processes: Execute processes within other processes for complex workflows
- Async Execution: Dispatch processes to queues for background processing
- Comprehensive Logging: Built-in activity logging with batch support
- Polymorphic Relations: Associate processes with any Eloquent model
- Payload Management: Type-safe payload handling with DTOs
- Exception Handling: Graceful error handling with task-specific exceptions
Install the package via Composer:
composer require ibrostudio/laravel-tasks
Run the installation command:
php artisan tasks:install
This command will:
- Publish the configuration file
- Publish and run the migrations
The package publishes a configuration file to config/tasks.php:
<?php

return [
    'log_processes' => true, // Enable/disable process logging
    'queue' => 'processes', // Queue name for async process execution
];
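If the values need to differ per environment, one option is to read them through Laravel's env() helper in the published file. This is only a sketch: the variable names TASKS_LOG_PROCESSES and TASKS_QUEUE are hypothetical and are not defined by the package.

```php
<?php

// config/tasks.php (sketch; the environment variable names are assumptions)
return [
    'log_processes' => env('TASKS_LOG_PROCESSES', true),
    'queue' => env('TASKS_QUEUE', 'processes'),
];
```

Whatever queue name is configured, a worker has to listen on it for dispatched processes to run, for example with php artisan queue:work --queue=processes.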
The package creates two main tables.
Processes table:
- id: Primary key
- type: Process class name
- payload: JSON payload data
- state: Current process state
- processable_id/processable_type: Polymorphic relation
- processable_dto: JSON DTO data
- parent_process_id: For nested processes
- log_batch_uuid: Activity log batch UUID
- created_at/updated_at: Timestamps
Tasks table:
- id: Primary key
- process_id: Foreign key to processes
- type: Task class name
- as_process_id: For tasks that run as sub-processes
- state: Current task state
- processable_id/processable_type: Polymorphic relation
- processable_dto: JSON DTO data
Create a process class by extending the base Process model:
<?php

namespace App\Processes;

use App\Tasks\ProcessDataTask;
use App\Tasks\SendNotificationTask;
use App\Tasks\ValidateDataTask;
use IBroStudio\Tasks\Dto\ProcessConfigDto;
use IBroStudio\Tasks\Models\Process;

class DataProcessingProcess extends Process
{
    protected function getConfig(array $properties = []): ProcessConfigDto
    {
        return ProcessConfigDto::from([
            // Tasks run in the order listed here
            'tasks' => [
                ValidateDataTask::class,
                ProcessDataTask::class,
                SendNotificationTask::class,
            ],
            'use_logs' => true,
            'log_name' => 'data-processing',
            ...$properties,
        ]);
    }
}
Create task classes by extending the base Task model:
<?php

namespace App\Tasks;

use IBroStudio\Tasks\Contracts\PayloadContract;
use IBroStudio\Tasks\Models\Task;

class ValidateDataTask extends Task
{
    /**
     * @param MyCustomPayloadDto $payload
     */
    protected function execute(PayloadContract $payload): PayloadContract|array
    {
        // Perform task logic
        // Return updated payload
        return $payload->updateDto(['validated' => true]);
    }
}
Create type-safe payload DTOs:
<?php

namespace App\Dto;

use IBroStudio\Tasks\Dto\DefaultProcessPayloadDto;

class DataProcessingPayloadDto extends DefaultProcessPayloadDto
{
    public function __construct(
        public array $data,
        public bool $validated = false,
        public bool $processed = false,
        public ?string $notification_sent = null,
    ) {}
}
Payload DTOs are built on Spatie's Laravel Data package.
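The idea of a task handing back an updated payload can be illustrated in plain PHP. The sketch below is standalone and hypothetical: ExamplePayload and its with() method are invented for illustration and are not the package's DefaultProcessPayloadDto or its updateDto() method.

```php
<?php

declare(strict_types=1);

// Hypothetical standalone DTO; it does not extend the package's base class.
final class ExamplePayload
{
    public function __construct(
        public readonly array $data,
        public readonly bool $validated = false,
        public readonly bool $processed = false,
    ) {}

    /** Return a copy with the given properties replaced. */
    public function with(array $changes): self
    {
        // get_object_vars() yields name => value pairs, which are spread
        // back into the constructor as named arguments (PHP 8.1+).
        return new self(...[...get_object_vars($this), ...$changes]);
    }
}

$payload = new ExamplePayload(data: ['user_id' => 123]);
$updated = $payload->with(['validated' => true]);
```

Because the properties are readonly, the original object is left untouched and each step works with its own copy.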
use App\Dto\DataProcessingPayloadDto;
use App\Processes\DataProcessingProcess;

// Create and execute a process
$process = DataProcessingProcess::create([
    'payload' => DataProcessingPayloadDto::from([
        'data' => ['user_id' => 123, 'required_field' => 'value'],
    ]),
]);
$result = $process->handle();

// Check the result
if ($result->state === ProcessStatesEnum::COMPLETED) {
    echo "Process completed successfully!";
    echo "Final payload: " . json_encode($result->payload);
}

// Dispatch to queue
$process->dispatch();

// The process will be executed in the background
// You can check the status later
$process->refresh();
echo "Current state: " . $process->state->value;
Associate processes with Eloquent models:
use IBroStudio\Tasks\Concerns\IsProcessableModel;

class User extends Model
{
    use IsProcessableModel;

    // optional
    public function processUserData(): Process
    {
        return $this->processes()->create([
            'type' => DataProcessingProcess::class,
            'payload' => DataProcessingPayloadDto::from([
                'data' => $this->toArray(),
            ]),
        ]);
    }
}

// Usage
use App\Models\User;

$user = User::find(1);
$payload = DataProcessingPayloadDto::from([
    'data' => ['user_id' => 123, 'required_field' => 'value'],
]);
$process = $user->process(DataProcessingProcess::class, $payload);
// or
$user->dispatch(DataProcessingProcess::class, $payload);
// or
$process = $user->processUserData()->handle();
Tasks can pause processes and generate signed URLs for resumption:
use IBroStudio\Tasks\Contracts\PayloadContract;
use IBroStudio\Tasks\Exceptions\PauseProcessException;
use IBroStudio\Tasks\Models\Task;
use Illuminate\Support\Facades\Mail;

class RequireApprovalTask extends Task
{
    protected function execute(PayloadContract $payload): PayloadContract|array
    {
        if ($this->requiresApproval($payload)) {
            // This will pause the process and generate a resume URL
            throw new PauseProcessException(
                task: $this,
                message: 'Approval required'
            );
        }

        return $payload;
    }

    private function requiresApproval(PayloadContract $payload): bool
    {
        return $payload->amount > 10000;
    }
}

// Generate resume URL
$resumeUrl = $process->resumeUrl();

// Send URL to approver via email/notification
Mail::to($approver)->send(new ApprovalRequired($resumeUrl));
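To show why a signed URL is safe to send by email, here is a minimal plain-PHP sketch of the general technique: an HMAC of the URL is appended as a query parameter and recomputed on the way back in. It is not the package's or Laravel's implementation, and the signUrl() and hasValidSignature() helpers, the key, and the route are all hypothetical.

```php
<?php

declare(strict_types=1);

// Append an HMAC-SHA256 signature of the URL as a query parameter.
function signUrl(string $url, string $key): string
{
    $signature = hash_hmac('sha256', $url, $key);
    $separator = str_contains($url, '?') ? '&' : '?';

    return $url . $separator . 'signature=' . $signature;
}

// Recompute the signature over the URL without its signature parameter
// and compare the two values in constant time.
function hasValidSignature(string $signedUrl, string $key): bool
{
    if (preg_match('/^(.*)[?&]signature=([0-9a-f]{64})$/', $signedUrl, $m) !== 1) {
        return false;
    }

    return hash_equals(hash_hmac('sha256', $m[1], $key), $m[2]);
}

$key = 'example-secret'; // hypothetical secret key
$url = signUrl('https://example.test/processes/42/resume', $key); // hypothetical route
```

Anyone who edits the URL, for instance to point at a different process, invalidates the signature unless they also know the key.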
Execute processes within other processes:
use IBroStudio\Tasks\Dto\ProcessConfigDto;
use IBroStudio\Tasks\Models\Process;

class ParentProcess extends Process
{
    protected function getConfig(array $properties = []): ProcessConfigDto
    {
        return ProcessConfigDto::from([
            'tasks' => [
                PrepareDataTask::class,
                ExecuteChildProcess::class, // This task will run a sub-process
                FinalizeTask::class,
            ],
            ...$properties,
        ]);
    }
}
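Conceptually, a nested process is a pipeline used as a single step of another pipeline. The plain-PHP sketch below illustrates that composition only; pipeline() and the $log helper are hypothetical and say nothing about how the package wires ExecuteChildProcess internally.

```php
<?php

declare(strict_types=1);

/** Build one callable that runs the given steps in order. */
function pipeline(callable ...$steps): callable
{
    return fn (array $payload): array => array_reduce(
        $steps,
        fn (array $carry, callable $step): array => $step($carry),
        $payload,
    );
}

// Hypothetical step factory: each step appends its name to a trace list.
$log = fn (string $name): callable =>
    fn (array $p): array => [...$p, 'trace' => [...($p['trace'] ?? []), $name]];

// The child pipeline is itself a callable, so it fits in as one parent step.
$child  = pipeline($log('child-1'), $log('child-2'));
$parent = pipeline($log('prepare'), $child, $log('finalize'));

$result = $parent([]);
```

The child's steps run in full between the parent's prepare and finalize steps, and the payload flows through both levels.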
Handle different types of exceptions:
use IBroStudio\Tasks\Contracts\PayloadContract;
use IBroStudio\Tasks\Exceptions\AbortProcessException;
use IBroStudio\Tasks\Exceptions\SkipTaskException;
use IBroStudio\Tasks\Models\Task;

class ConditionalTask extends Task
{
    protected function execute(PayloadContract $payload): PayloadContract|array
    {
        if ($payload->should_abort) {
            throw new AbortProcessException(
                task: $this,
                message: 'Process aborted due to condition'
            );
        }

        if ($payload->should_skip) {
            throw new SkipTaskException('Skipping this task');
        }

        // Normal execution
        return $payload->updateDto(['processed' => true]);
    }
}
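How the three exception types affect the rest of the run can be sketched with a small standalone runner. Everything here is hypothetical: the exception classes only mirror the names used above, and the package's real runner may behave differently in detail.

```php
<?php

declare(strict_types=1);

// Hypothetical exception classes; they are not the package's own classes.
final class AbortExample extends RuntimeException {}
final class SkipExample extends RuntimeException {}
final class PauseExample extends RuntimeException {}

/**
 * Run steps in order and report the resulting state plus the names of
 * the steps that completed.
 *
 * @param array<string, callable(array): array> $steps
 * @return array{state: string, ran: list<string>, payload: array}
 */
function runWithExceptions(array $steps, array $payload): array
{
    $ran = [];

    foreach ($steps as $name => $step) {
        try {
            $payload = $step($payload);
            $ran[] = $name;
        } catch (SkipExample) {
            continue; // skip this step, carry on with the next one
        } catch (PauseExample) {
            return ['state' => 'WAITING', 'ran' => $ran, 'payload' => $payload];
        } catch (AbortExample) {
            return ['state' => 'ABORTED', 'ran' => $ran, 'payload' => $payload];
        }
    }

    return ['state' => 'COMPLETED', 'ran' => $ran, 'payload' => $payload];
}

$outcome = runWithExceptions([
    'first'  => fn (array $p): array => [...$p, 'first' => true],
    'second' => fn (array $p): array => throw new SkipExample('not needed'),
    'third'  => fn (array $p): array => [...$p, 'third' => true],
], []);
```

In this run the second step is skipped, the third still executes, and the overall state ends as COMPLETED.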
Override configuration for specific needs:
use IBroStudio\Tasks\Dto\ProcessConfigDto;
use IBroStudio\Tasks\Models\Process;

class CustomProcess extends Process
{
    protected function getConfig(array $properties = []): ProcessConfigDto
    {
        return ProcessConfigDto::from([
            'tasks' => [
                CustomTask::class,
            ],
            'use_logs' => false, // Disable logging
            'payload' => CustomPayload::class, // Custom payload class
            'queue' => 'high-priority', // Custom queue
            ...$properties,
        ]);
    }
}
The package is thoroughly tested with Pest. Run the tests:
composer test
use Illuminate\Support\Facades\Queue;

use function Pest\Laravel\assertModelExists;
use function Pest\Laravel\get;

it('can execute a data processing process', function () {
    $process = DataProcessingProcess::factory()->create([
        'payload' => DataProcessingPayloadDto::from([
            'data' => ['required_field' => 'test_value'],
        ]),
    ]);

    $result = $process->handle();

    // each() passes a Pest expectation wrapping every task, not the task itself
    expect($result->state)->toBe(ProcessStatesEnum::COMPLETED)
        ->and($result->tasks)->each(fn ($task) => $task->state->toBe(TaskStatesEnum::COMPLETED))
        ->and($result->payload->validated)->toBeTrue()
        ->and($result->payload->processed)->toBeTrue();

    assertModelExists($result);
});

it('can pause and resume a process', function () {
    Queue::fake();

    $process = ApprovalProcess::factory()->create([
        'payload' => ApprovalPayloadDto::from([
            'amount' => 15000, // This will trigger pause
        ]),
    ]);

    $result = $process->handle();
    expect($result->state)->toBe(ProcessStatesEnum::WAITING);

    // Simulate approval via resume URL
    get($result->resumeUrl())->assertSuccessful();

    $result->refresh();
    expect($result->state)->toBe(ProcessStatesEnum::COMPLETED);
});
Process methods:
- handle(): Execute the process synchronously
- dispatch(): Queue the process for async execution
- transitionTo(ProcessStatesEnum $state, ?string $message = null): Change process state
- updatePayload(PayloadContract|array $data): Update the process payload
- resumeUrl(): Generate signed URL for resuming paused processes
Task methods:
- handle(PayloadContract $payload): Execute the task
- execute(PayloadContract $payload): Override this method for custom task logic
- transitionTo(TaskStatesEnum $state, ?string $message = null): Change task state
Process states:
- PENDING: Process created but not started
- PROCESSING: Process is currently running
- COMPLETED: Process finished successfully
- ABORTED: Process was aborted due to error
- WAITING: Process is paused and waiting for external action
Task states:
- PENDING: Task not yet started
- STARTED: Task is currently executing
- COMPLETED: Task finished successfully
- ABORTED: Task was aborted
- WAITING: Task is paused
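Since the usage examples read $process->state->value, the states behave like backed enums. The sketch below shows how such an enum could look in plain PHP. ExampleProcessState, its lowercase backing values, and the isFinal() helper are all assumptions made for illustration; the package's ProcessStatesEnum may be defined differently.

```php
<?php

declare(strict_types=1);

// Hypothetical enum reusing the process state names listed above.
enum ExampleProcessState: string
{
    case PENDING = 'pending';
    case PROCESSING = 'processing';
    case COMPLETED = 'completed';
    case ABORTED = 'aborted';
    case WAITING = 'waiting';

    // Assumption: COMPLETED and ABORTED end a run, while WAITING can resume.
    public function isFinal(): bool
    {
        return match ($this) {
            self::COMPLETED, self::ABORTED => true,
            default => false,
        };
    }
}

$state = ExampleProcessState::from('waiting');
```

A helper like isFinal() is handy when polling a queued process, because it separates "still in progress or paused" from "nothing more will happen".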
Exceptions:
- AbortProcessException: Aborts the entire process
- PauseProcessException: Pauses the process for external action
- SkipTaskException: Skips the current task and continues
Please see CONTRIBUTING for details.
- Clone the repository
- Install dependencies: composer install
- Run tests: composer test
- Check code style: composer pint
Please review our security policy on how to report security vulnerabilities.
The MIT License (MIT). Please see License File for more information.
Please see CHANGELOG for more information on what has changed recently.