Skip to content

Pipeline

Ray Fung edited this page Feb 26, 2026 · 3 revisions

Pipeline

Razy's Pipeline system provides a plugin-based action pipeline with tree-structured actions, shared storage, broadcast relays, and conditional/tap operations. It is used for orchestrating complex multi-step workflows.


Table of Contents


Quick Start

use Razy\Pipeline;

$pipeline = new Pipeline();

// Add actions via plugin methods
$action = $pipeline->pipe('Validate', $formData);
$action?->then('Sanitize');
$action?->then('Store', $database);

// Execute all actions
$success = $pipeline->execute($request);

Creating a Pipeline

The Pipeline uses the PluginTrait to create actions from registered plugins. Actions are executed sequentially:

use Razy\Pipeline;
use Razy\Pipeline\Action;

$pipeline = new Pipeline();

// Create actions via pipe() — delegates to the plugin system
$validate = $pipeline->pipe('ValidateForm', $schema);
$process  = $pipeline->pipe('ProcessOrder');
$notify   = $pipeline->pipe('SendNotification');

// Or add pre-built actions directly
$customAction = Pipeline::createAction('MyPlugin', $arg1, $arg2);
if ($customAction) {
    $pipeline->add($customAction);
}

// Execute — runs all actions in order
$success = $pipeline->execute($context);
// Returns false immediately if any action's execute() returns false

// Inspect actions
$actions = $pipeline->getActions(); // Action[]
$map = $pipeline->getMap();         // introspection data

Actions

Custom Actions

Create custom actions by extending the abstract Action class:

use Razy\Pipeline\Action;

class ValidateAction extends Action
{
    private array $schema;

    public function __construct(array $schema)
    {
        $this->schema = $schema;
    }

    public function accept(string $actionType = ''): bool
    {
        // Return true to accept this action type
        return $actionType === 'validate';
    }

    public function execute(...$args): bool
    {
        $data = $args[0] ?? [];

        foreach ($this->schema as $field => $rules) {
            if (!isset($data[$field])) {
                $this->reject("Missing field: {$field}");
                return false;
            }
        }

        return true; // passes, continue pipeline
    }
}

Action Chaining

Actions form a tree structure. Use then() to create child actions:

$root = $pipeline->pipe('FormWorker');

// Chain child actions
$validate = $root?->then('Validate', $rules);
$sanitize = $root?->then('Sanitize');
$store    = $root?->then('Store', $db);

// Nested chains
$validate?->then('CheckRequired');
$validate?->then('CheckFormat');

// Result: tree structure
// FormWorker
//   ├── Validate
//   │    ├── CheckRequired
//   │    └── CheckFormat
//   ├── Sanitize
//   └── Store

Conditional & Tap

$action = $pipeline->pipe('Process');

// When — execute callback only if condition is true
$action?->when($isAdmin, function (Action $action) {
    $action->then('AdminAudit');
});

// Tap — execute callback for side-effects without affecting the chain
$action?->tap(function (Action $action) {
    error_log("Processing action: " . $action->getActionType());
});

Pipeline Storage

Pipelines provide shared storage accessible by all actions. Storage supports scoping via identifiers:

$pipeline = new Pipeline();

// Set global storage
$pipeline->setStorage('total', 0);
$pipeline->setStorage('items', []);

// Set scoped storage
$pipeline->setStorage('count', 0, 'module-a');
$pipeline->setStorage('count', 0, 'module-b');

// Get storage
$total = $pipeline->getStorage('total');            // global
$countA = $pipeline->getStorage('count', 'module-a'); // scoped

Relay (Broadcasting)

The Relay broadcasts method calls to all actions in the pipeline:

$pipeline = new Pipeline();
$pipeline->pipe('HandlerA');
$pipeline->pipe('HandlerB');

// Get relay — lazy-initialized
$relay = $pipeline->getRelay();

// Broadcast to all actions via __call magic
$relay->onDataReceived($data);
$relay->onError($exception);

// Each action that has an onDataReceived() or onError() method will receive the call

Action Tree Navigation

Actions support rich tree navigation:

// Get parent
$parent = $action->getOwner();         // Action|Pipeline|null

// Find parent by type (walks up the chain)
$form = $action->findOwner('FormWorker');
$email = $action->findOwner(':email');               // match suffix
$specific = $action->findOwner('Validate:email');    // match exact

// Get the root Pipeline
$pipeline = $action->getManager();

// Check relationships
$action->isAttached();                 // attached to anything?
$action->isAttached($parent);          // attached to specific parent?
$action->hasChild($otherAction);       // is $otherAction a direct child?
$action->isReachable();                // connected to a Pipeline root?

// Get children
$children = $action->getChildren();    // HashMap of child actions

// Detach / Terminate
$action->detach();                     // detach from parent
$action->terminate();                  // recursive detach of entire subtree
$parent->remove($child);              // remove specific child

Recursive Mode

In recursive mode, then() calls on descendants create siblings under the recursive parent instead of nesting deeper:

class FlatProcessAction extends Action
{
    public function __construct()
    {
        $this->setRecursive(true);
    }

    public function execute(...$args): bool
    {
        // All then() calls below create siblings of this action
        return true;
    }
}

// Without recursive: A -> B -> C (nested)
// With recursive:    A -> B, A -> C (flat siblings under A)

API Reference

Pipeline

Method Signature Description
__construct () Create pipeline
pipe (string $method, ...$arguments): ?Action Create action from plugin
add (Action $action): static Add existing action
execute (...$args): bool Run all actions sequentially
getRelay (): Relay Get broadcast relay
getActions (): array All root actions
setStorage (string $name, mixed $value, string $identifier = ''): static Shared storage
getStorage (string $name, string $identifier = ''): mixed Read storage
createAction (static) (string $actionType, ...$arguments): ?Action Factory method
isAction (static) (mixed $action): bool Type check
getMap (): array Introspection data

Action (Abstract)

Method Signature Description
init (string $actionType, string $identifier): static Initialize (called once)
accept (string $actionType): bool Override: accept action type?
execute (...$args): bool Override: execute logic
isExecuted (): bool Has been executed?
then (string $method, ...$arguments): ?Action Create child action
when (bool $condition, callable $callback): static Conditional
tap (callable $callback): static Side-effect
reject (mixed $message): ?Action Propagate rejection up
broadcast (...$args): static Broadcast to children
attachTo (Action|Pipeline $parent): static Attach to parent
adopt (Action $child): static Adopt a child
remove (Action $child): static Remove child
detach (): static Detach from parent
terminate (): static Recursive detach
getOwner (): Action|Pipeline|null Get parent
findOwner (string $actionType): ?Action Walk up chain
getManager (): ?Pipeline Get root pipeline
isReachable (): bool Connected to pipeline?
isAttached (Action|Pipeline|null $entity): bool Attached check
hasChild (Action $action): bool Direct child check
getChildren (): HashMap All children
getIdentifier (): string Action identifier
getActionType (): string Action type string
getMap (): array Introspection

Relay

Method Signature Description
__construct (Pipeline $pipeline) Create relay
__call (string $method, array $arguments): static Broadcast to all actions

Notification Env

Clone this wiki locally