Pipeline

Ray Fung edited this page Feb 26, 2026 · 3 revisions

Razy's Pipeline system provides a plugin-based action pipeline with tree-structured actions, shared storage, broadcast relays, and conditional (when) and tap operations. It orchestrates multi-step workflows: actions run in order, and execution stops as soon as one of them returns false.


Quick Start

use Razy\Pipeline;



$pipeline = new Pipeline();



// Add actions via plugin methods

$action = $pipeline->pipe('Validate', $formData);

$action?->then('Sanitize');

$action?->then('Store', $database);



// Execute all actions

$success = $pipeline->execute($request);

Creating a Pipeline

The Pipeline uses the PluginTrait to create actions from registered plugins. Actions are executed sequentially:

use Razy\Pipeline;

use Razy\Pipeline\Action;



$pipeline = new Pipeline();



// Create actions via pipe(), which delegates to the plugin system

$validate = $pipeline->pipe('ValidateForm', $schema);

$process  = $pipeline->pipe('ProcessOrder');

$notify   = $pipeline->pipe('SendNotification');



// Or add pre-built actions directly

$customAction = Pipeline::createAction('MyPlugin', $arg1, $arg2);

if ($customAction) {

    $pipeline->add($customAction);

}



// Execute: runs all actions in order

$success = $pipeline->execute($context);

// Returns false immediately if any action's execute() returns false



// Inspect actions

$actions = $pipeline->getActions(); // Action[]

$map = $pipeline->getMap();         // introspection data
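
The short-circuit behaviour noted in the comments above can be illustrated without Razy installed. This is a minimal sketch, not Razy's implementation: `MiniPipeline` and its closure-based steps are hypothetical stand-ins for Pipeline and Action, and the only behaviour assumed is that execution stops at the first step returning false.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for Razy\Pipeline; illustrates short-circuiting only.
final class MiniPipeline
{
    /** @var list<callable> */
    private array $steps = [];

    public function add(callable $step): static
    {
        $this->steps[] = $step;
        return $this;
    }

    // Run steps in insertion order; stop at the first one that returns false.
    public function execute(mixed ...$args): bool
    {
        foreach ($this->steps as $step) {
            if ($step(...$args) === false) {
                return false;
            }
        }
        return true;
    }
}

$log = [];
$pipeline = (new MiniPipeline())
    ->add(function (array $order) use (&$log): bool {
        $log[] = 'validate';
        return isset($order['id']);
    })
    ->add(function (array $order) use (&$log): bool {
        $log[] = 'process';
        return true;
    });

$ok = $pipeline->execute(['id' => 7]); // both steps run
$failed = $pipeline->execute([]);      // 'process' is skipped
```

In the second call the `process` step never runs, which is why callers should treat a false return as "the workflow was aborted partway".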

Actions

Custom Actions

Create custom actions by extending the abstract Action class:

use Razy\Pipeline\Action;



class ValidateAction extends Action

{

    private array $schema;



    public function __construct(array $schema)

    {

        $this->schema = $schema;

    }



    public function accept(string $actionType = ''): bool

    {

        // Return true to accept this action type

        return $actionType === 'validate';

    }



    public function execute(...$args): bool

    {

        $data = $args[0] ?? [];



        foreach ($this->schema as $field => $rules) {

            if (!isset($data[$field])) {

                $this->reject("Missing field: {$field}");

                return false;

            }

        }



        return true; // passes, continue pipeline

    }

}

Action Chaining

Actions form a tree structure. Use then() to create child actions:

$root = $pipeline->pipe('FormWorker');



// Chain child actions

$validate = $root?->then('Validate', $rules);

$sanitize = $root?->then('Sanitize');

$store    = $root?->then('Store', $db);



// Nested chains

$validate?->then('CheckRequired');

$validate?->then('CheckFormat');



// Result: tree structure

// FormWorker

//   ├── Validate

//   │   ├── CheckRequired

//   │   └── CheckFormat

//   ├── Sanitize

//   └── Store

Conditional & Tap

use Razy\Pipeline\Action;



$action = $pipeline->pipe('Process');



// When: execute callback only if condition is true

$action?->when($isAdmin, function (Action $action) {

    $action->then('AdminAudit');

});



// Tap: execute callback for side-effects without affecting the chain

$action?->tap(function (Action $action) {

    error_log("Processing action: " . $action->getActionType());

});
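
To make the difference between the two helpers concrete, here is a small sketch of the semantics described in the comments above. `Node` is a hypothetical class, not `Razy\Pipeline\Action`; the sketch assumes only what the API reference states, namely that both methods hand the current object to the callback and return it so the chain can continue.

```php
<?php
declare(strict_types=1);

// Hypothetical node; not Razy\Pipeline\Action.
final class Node
{
    /** @var list<string> */
    public array $children = [];

    // Invoke the callback with this node only when the condition holds.
    public function when(bool $condition, callable $callback): static
    {
        if ($condition) {
            $callback($this);
        }
        return $this;
    }

    // Always invoke the callback, ignore its return value, keep chaining.
    public function tap(callable $callback): static
    {
        $callback($this);
        return $this;
    }
}

$seen = [];
$node = (new Node())
    ->when(true, function (Node $n): void { $n->children[] = 'AdminAudit'; })
    ->when(false, function (Node $n): void { $n->children[] = 'NeverAdded'; })
    ->tap(function (Node $n) use (&$seen): void { $seen = $n->children; });
```

Only the first `when()` callback fires, and `tap()` observes the node without changing what the chain returns.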

Pipeline Storage

Pipelines provide shared storage accessible by all actions. Storage supports scoping via identifiers:

$pipeline = new Pipeline();



// Set global storage

$pipeline->setStorage('total', 0);

$pipeline->setStorage('items', []);



// Set scoped storage

$pipeline->setStorage('count', 0, 'module-a');

$pipeline->setStorage('count', 0, 'module-b');



// Get storage

$total = $pipeline->getStorage('total');            // global

$countA = $pipeline->getStorage('count', 'module-a'); // scoped
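
One way to picture scoped storage is a two-level map keyed first by identifier and then by name, with the empty string acting as the global scope. The sketch below is an assumption about the idea, not Razy's internals: `ScopedStorage` is hypothetical, and returning null for a missing key is a choice made for the example, since the page does not say what `getStorage()` returns in that case.

```php
<?php
declare(strict_types=1);

// Hypothetical storage; the two-level array layout is an assumption.
final class ScopedStorage
{
    /** @var array<string, array<string, mixed>> */
    private array $data = [];

    // The empty identifier '' plays the role of the global scope.
    public function set(string $name, mixed $value, string $identifier = ''): static
    {
        $this->data[$identifier][$name] = $value;
        return $this;
    }

    // Missing keys yield null here; Razy's actual behaviour is not documented above.
    public function get(string $name, string $identifier = ''): mixed
    {
        return $this->data[$identifier][$name] ?? null;
    }
}

$storage = (new ScopedStorage())
    ->set('count', 1)
    ->set('count', 10, 'module-a')
    ->set('count', 20, 'module-b');

$global = $storage->get('count');
$scopedA = $storage->get('count', 'module-a');
$scopedB = $storage->get('count', 'module-b');
```

The same name can hold a different value in each scope, so modules can share one pipeline without overwriting each other's counters.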

Relay (Broadcasting)

The Relay broadcasts method calls to all actions in the pipeline:

$pipeline = new Pipeline();

$pipeline->pipe('HandlerA');

$pipeline->pipe('HandlerB');



// Get relay (lazy-initialized)

$relay = $pipeline->getRelay();



// Broadcast to all actions via __call magic

$relay->onDataReceived($data);

$relay->onError($exception);



// Each action that has an onDataReceived() or onError() method will receive the call
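
The broadcast pattern relies on PHP's `__call` magic method, which receives any call to an undefined method. The following sketch shows the general technique under stated assumptions: `MiniRelay`, `HandlerA`, and `HandlerB` are hypothetical, targets without the method are skipped as the comment above describes, and whether Razy's relay also reaches nested child actions is not covered here.

```php
<?php
declare(strict_types=1);

// Hypothetical relay; not Razy\Pipeline\Relay.
final class MiniRelay
{
    /** @param list<object> $targets */
    public function __construct(private array $targets) {}

    // Forward any undefined method call to every target that implements it.
    public function __call(string $method, array $arguments): mixed
    {
        foreach ($this->targets as $target) {
            if (method_exists($target, $method)) {
                $target->$method(...$arguments);
            }
        }
        return $this; // returning the relay allows chained broadcasts
    }
}

final class HandlerA
{
    public array $received = [];

    public function onDataReceived(string $data): void
    {
        $this->received[] = $data;
    }
}

final class HandlerB
{
    public array $errors = [];

    public function onError(string $message): void
    {
        $this->errors[] = $message;
    }
}

$a = new HandlerA();
$b = new HandlerB();
$relay = new MiniRelay([$a, $b]);
$relay->onDataReceived('payload')->onError('boom');
```

Each handler only reacts to the events it defines a method for, so adding a new event does not require touching handlers that ignore it.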

Action Tree Navigation

Actions expose methods for walking up the tree, checking relationships, and detaching nodes:

// Get parent

$parent = $action->getOwner();         // Action|Pipeline|null



// Find parent by type (walks up the chain)

$form = $action->findOwner('FormWorker');

$email = $action->findOwner(':email');               // match suffix

$specific = $action->findOwner('Validate:email');    // match exact



// Get the root Pipeline

$pipeline = $action->getManager();



// Check relationships

$action->isAttached();                 // attached to anything?

$action->isAttached($parent);          // attached to specific parent?

$action->hasChild($otherAction);       // is $otherAction a direct child?

$action->isReachable();                // connected to a Pipeline root?



// Get children

$children = $action->getChildren();    // HashMap of child actions



// Detach / Terminate

$action->detach();                     // detach from parent

$action->terminate();                  // recursive detach of entire subtree

$parent->remove($child);              // remove specific child
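
The owner lookup can be sketched as a loop over the parent chain. Everything here is hypothetical and only approximates the comments above: `OwnedNode` is not a Razy class, the `Type:identifier` label format is an assumption, and so is the rule that a bare type name matches regardless of identifier.

```php
<?php
declare(strict_types=1);

// Hypothetical node; the "Type:identifier" label format is an assumption.
final class OwnedNode
{
    public function __construct(
        public string $type,
        public string $identifier = '',
        public ?OwnedNode $owner = null,
    ) {}

    private function label(): string
    {
        return $this->identifier === '' ? $this->type : "{$this->type}:{$this->identifier}";
    }

    // Walk up the owner chain and return the first ancestor matching the pattern.
    public function findOwner(string $pattern): ?OwnedNode
    {
        for ($node = $this->owner; $node !== null; $node = $node->owner) {
            $matches = str_starts_with($pattern, ':')
                ? str_ends_with($node->label(), $pattern) // ":email" matches any type
                : $node->label() === $pattern || $node->type === $pattern;
            if ($matches) {
                return $node;
            }
        }
        return null;
    }
}

$form = new OwnedNode('FormWorker');
$validate = new OwnedNode('Validate', 'email', $form);
$check = new OwnedNode('CheckFormat', '', $validate);
```

With this tree, `$check->findOwner('FormWorker')` skips past `Validate` to reach the form, while `':email'` and `'Validate:email'` both stop at the validate node.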

Recursive Mode

In recursive mode, then() calls on descendants create siblings under the recursive parent instead of nesting deeper:

class FlatProcessAction extends Action

{

    public function __construct()

    {

        $this->setRecursive(true);

    }



    public function execute(...$args): bool

    {

        // With recursive mode on, then() calls made on descendants attach new actions directly under this action

        return true;

    }

}



// Without recursive: A -> B -> C (nested)

// With recursive:    A -> B, A -> C (flat siblings under A)
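
The nested and flat shapes can be reproduced with a small tree model. This sketch is one assumed reading of recursive mode, not Razy's code: `TreeNode` is hypothetical, and the rule it implements is that `then()` attaches the new node under the nearest recursive ancestor, falling back to the caller when there is none.

```php
<?php
declare(strict_types=1);

// Hypothetical tree node; the ancestor lookup is an assumed reading of recursive mode.
final class TreeNode
{
    public ?TreeNode $parent = null;

    /** @var list<TreeNode> */
    public array $children = [];

    public function __construct(public string $name, public bool $recursive = false) {}

    // Attach a new node under the nearest recursive ancestor, or under $this if none exists.
    public function then(string $name): TreeNode
    {
        $owner = $this;
        for ($node = $this->parent; $node !== null; $node = $node->parent) {
            if ($node->recursive) {
                $owner = $node;
                break;
            }
        }
        $child = new TreeNode($name);
        $child->parent = $owner;
        $owner->children[] = $child;
        return $child;
    }

    /** @return list<string> */
    public function childNames(): array
    {
        return array_map(fn (TreeNode $n): string => $n->name, $this->children);
    }
}

// Without recursive mode: A -> B -> C
$nested = new TreeNode('A');
$nested->then('B')->then('C');

// With recursive mode on A: B and C both end up directly under A
$flat = new TreeNode('A', recursive: true);
$flat->then('B')->then('C');
```

The call chain is identical in both cases; only the flag on the root decides whether `C` lands under `B` or next to it.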

API Reference

Pipeline

| Method | Signature | Description |
| --- | --- | --- |
| __construct | () | Create pipeline |
| pipe | (string $method, ...$arguments): ?Action | Create action from plugin |
| add | (Action $action): static | Add existing action |
| execute | (...$args): bool | Run all actions sequentially |
| getRelay | (): Relay | Get broadcast relay |
| getActions | (): array | All root actions |
| setStorage | (string $name, mixed $value, string $identifier = ''): static | Shared storage |
| getStorage | (string $name, string $identifier = ''): mixed | Read storage |
| createAction (static) | (string $actionType, ...$arguments): ?Action | Factory method |
| isAction (static) | (mixed $action): bool | Type check |
| getMap | (): array | Introspection data |

Action (Abstract)

| Method | Signature | Description |
| --- | --- | --- |
| init | (string $actionType, string $identifier): static | Initialize (called once) |
| accept | (string $actionType): bool | Override: accept action type? |
| execute | (...$args): bool | Override: execute logic |
| isExecuted | (): bool | Has been executed? |
| then | (string $method, ...$arguments): ?Action | Create child action |
| when | (bool $condition, callable $callback): static | Conditional |
| tap | (callable $callback): static | Side-effect |
| reject | (mixed $message): ?Action | Propagate rejection up |
| broadcast | (...$args): static | Broadcast to children |
| attachTo | (Action\|Pipeline $parent): static | Attach to parent |
| adopt | (Action $child): static | Adopt a child |
| remove | (Action $child): static | Remove child |
| detach | (): static | Detach from parent |
| terminate | (): static | Recursive detach |
| getOwner | (): Action\|Pipeline\|null | Get parent |
| findOwner | (string $actionType): ?Action | Walk up chain |
| getManager | (): ?Pipeline | Get root pipeline |
| isReachable | (): bool | Connected to pipeline? |
| isAttached | (Action\|Pipeline\|null $entity): bool | Attached check |
| hasChild | (Action $action): bool | Direct child check |
| getChildren | (): HashMap | All children |
| getIdentifier | (): string | Action identifier |
| getActionType | (): string | Action type string |
| getMap | (): array | Introspection |

Relay

| Method | Signature | Description |
| --- | --- | --- |
| __construct | (Pipeline $pipeline) | Create relay |
| __call | (string $method, array $arguments): static | Broadcast to all actions |
