Stateful, graph-based workflow engine for Laravel.
Build multi-step agent pipelines, human-in-the-loop processes, and parallel fan-out/fan-in tasks — all backed by your database and queue.
Inspired by LangGraph
- Installation
- Core Concepts
- Building a Workflow
- Running a Workflow
- State
- Human-in-the-Loop
- Node Contracts
- Built-in Node Types
- Prism Integration
- Laravel AI Integration
- Sub-graph Workflows
- Events
- Configuration
- Testing
composer require cainy/laragraphPublish and run the migration:
php artisan vendor:publish --tag="laragraph-migrations"
php artisan migratePublish the config file:
php artisan vendor:publish --tag="laragraph-config"LaraGraph models a workflow as a directed graph of nodes connected by edges. Each run of that graph is a WorkflowRun — a database record that tracks the current state, status, and active node pointers.
| Term | Meaning |
|---|---|
| Node | A unit of work. Receives the current state, returns a mutation. |
| Edge | A directed connection between two nodes, optionally conditional. |
| State | A plain PHP array that accumulates mutations as nodes execute. |
| Pointer | Tracks which nodes are currently in-flight for a run. |
| WorkflowRun | The persisted record for a single execution of a workflow. |
Execution is fully queue-driven. Each node runs as an independent ExecuteNode job, so parallel branches execute concurrently across your worker pool.
A node is any class implementing Cainy\Laragraph\Contracts\Node:
use Cainy\Laragraph\Contracts\Node;
use Cainy\Laragraph\Engine\NodeExecutionContext;
class SummarizeNode implements Node
{
public function handle(NodeExecutionContext $context, array $state): array
{
$text = implode("\n", $state['paragraphs'] ?? []);
return ['summary' => substr($text, 0, 200)];
}
}handle() receives a typed NodeExecutionContext and the current full state. It returns an array of mutations — only the keys you want to change.
The context object carries everything the node needs to know about its execution environment:
$context->runId // int — ID of the WorkflowRun
$context->workflowKey // string — registered name or key of the workflow
$context->nodeName // string — name of this node in the graph
$context->attempt // int — current queue attempt (1-based)
$context->maxAttempts // int — maximum attempts configured
$context->createdAt // DateTimeImmutable
$context->isolatedPayload // ?array — payload from a Send fan-out (see below)Build a workflow with the fluent Workflow builder:
use Cainy\Laragraph\Builder\Workflow;
$workflow = Workflow::create()
->addNode('fetch', FetchNode::class)
->addNode('transform', TransformNode::class)
->addNode('store', StoreNode::class)
->transition(Workflow::START, 'fetch')
->transition('fetch', 'transform')
->transition('transform', 'store')
->transition('store', Workflow::END);Workflow::START and Workflow::END are the reserved entry and exit pseudo-nodes.
Nodes can be registered as class strings (resolved via the container) or as pre-built instances.
Pass a condition as the third argument to ->transition(). It can be a Closure or a Symfony Expression Language string:
// Closure
->transition('classify', 'approve', fn(array $state) => $state['score'] > 50)
->transition('classify', 'reject', fn(array $state) => $state['score'] <= 50)
// Expression string (serializable — required for snapshot workflows)
->transition('classify', 'approve', "state['score'] > 50")
->transition('classify', 'reject', "state['score'] <= 50")The expression receives the full state under the state variable.
Built-in expression functions:
| Function | Description |
|---|---|
has_tool_calls(messages) |
Returns true if the last message in messages contains tool_calls. |
send_all(nodeName, items, key) |
Returns a Send[] array for dynamic fan-out (see below). |
A branch edge uses a resolver to return one or more target node names dynamically at runtime:
->branch('router', function(array $state): string {
return $state['approved'] ? 'publish' : 'revise';
}, targets: ['publish', 'revise'])The targets array is optional but recommended — it enables graph visualization without executing the resolver.
For serializable workflows, pass an expression string as the resolver:
->branch('router', "state['approved'] ? 'publish' : 'revise'", targets: ['publish', 'revise'])To execute multiple nodes in parallel from a single node, add multiple transitions from the same source:
Workflow::create()
->addNode('split', SplitNode::class)
->addNode('branch-a', BranchANode::class)
->addNode('branch-b', BranchBNode::class)
->addNode('merge', MergeNode::class)
->transition(Workflow::START, 'split')
->transition('split', 'branch-a') // dispatched in parallel
->transition('split', 'branch-b') // dispatched in parallel
->transition('branch-a', 'merge')
->transition('branch-b', 'merge')
->transition('merge', Workflow::END);branch-a and branch-b run as independent queue jobs. The merge node is dispatched once each branch completes. Fan-in barrier logic (waiting for all branches) is the node's responsibility — check that all expected state keys are present before proceeding.
To fan out over a dynamic list, return Send objects from a branch edge:
use Cainy\Laragraph\Routing\Send;
// In a branch edge resolver:
->branch('planner', function(array $state): array {
return array_map(
fn(string $query) => new Send('worker', ['query' => $query]),
$state['queries']
);
}, targets: ['worker'])Each Send dispatches an independent ExecuteNode job with an isolatedPayload available via $context->isolatedPayload.
Register workflows in your AppServiceProvider or a dedicated provider:
use Cainy\Laragraph\Facades\Laragraph;
public function boot(): void
{
Laragraph::register('my-pipeline', fn() => MyPipelineWorkflow::build());
}Or register them via the config file:
// config/laragraph.php
'workflows' => [
'my-pipeline' => MyPipelineWorkflow::class, // must return a Workflow or CompiledWorkflow
],use Cainy\Laragraph\Facades\Laragraph;
$run = Laragraph::start('my-pipeline', initialState: [
'input' => 'Hello, world!',
]);
echo $run->id; // WorkflowRun ID
echo $run->status; // RunStatus::RunningThe run is created synchronously. Node jobs are dispatched to your queue immediately after.
For ad-hoc workflows that aren't pre-registered, pass a Workflow builder directly. The graph is serialized as a snapshot so workers can reconstruct it without a registry:
$run = Laragraph::startFromBlueprint(
blueprint: Workflow::create()
->withName('ad-hoc-pipeline')
->addNode('step', MyNode::class)
->transition(Workflow::START, 'step')
->transition('step', Workflow::END),
initialState: ['input' => 'data'],
);Snapshot workflows require all edges to use expression strings (not Closures), since Closures cannot be serialized.
// Pause a running workflow
Laragraph::pause($run->id);
// Resume a paused workflow, optionally merging additional state
Laragraph::resume($run->id, ['approved' => true]);
// Abort a workflow (sets status to Failed, clears pointers)
Laragraph::abort($run->id);These are also available as model methods:
$run->pause();
$run->resume(['approved' => true]);
$run->abort();State is a plain PHP array that persists in the workflow_runs.state column. Every node receives the full current state and returns a mutation — a partial array of keys to update.
The reducer determines how mutations are merged into the existing state.
LaraGraph ships with three reducers:
| Class | Behaviour |
|---|---|
SmartReducer (default) |
List arrays are appended. Scalars and associative arrays are overwritten. |
MergeReducer |
Deep recursive merge for all keys. |
OverwriteReducer |
Shallow array_merge — always overwrites. |
SmartReducer is the right default for most agent workflows: message histories accumulate naturally, while scalar values like status or score simply overwrite.
Implement StateReducerInterface and bind it in your service provider, or attach it to a specific workflow:
// Globally
$this->app->bind(StateReducerInterface::class, MyReducer::class);
// Per workflow
Workflow::create()
->withReducer(MyReducer::class)
// ...LaraGraph has first-class support for pausing workflows and waiting for human input before continuing.
Pause the run before a node executes. On resume, the node runs normally.
Workflow::create()
->addNode('review', ReviewNode::class)
// ...
->interruptBefore('review');Pause the run after a node executes but before its outgoing edges are evaluated. Use this when you want a human to inspect the node's output before the workflow continues.
Workflow::create()
->addNode('drafter', DrafterNode::class)
->addNode('publish', PublishNode::class)
->transition(Workflow::START, 'drafter')
->transition('drafter', 'publish')
->transition('publish', Workflow::END)
->interruptAfter('drafter'); // pauses after draft is writtenCall Laragraph::resume() with any additional state to merge before the run continues. This is how you pass human decisions back into the workflow:
// Human approves — merge the decision into state, edges evaluate against it
Laragraph::resume($run->id, [
'meta' => ['approved' => true],
]);A branch edge on the next node can then route based on state['meta']['approved'].
Any node can pause the run at runtime by throwing NodePausedException. Unlike interrupt_before/after, this lets the node itself decide whether to pause based on runtime state:
use Cainy\Laragraph\Exceptions\NodePausedException;
class ConfidenceCheckNode implements Node
{
public function handle(NodeExecutionContext $context, array $state): array
{
if ($state['confidence'] < 0.7) {
throw new NodePausedException($context->nodeName);
}
return ['status' => 'confident'];
}
}The engine keeps the node's active pointer alive so resume() re-dispatches it from the same position.
Nodes can implement optional contracts to declare capabilities to the engine.
Give a node a human-readable identifier for graph visualization and edge routing:
use Cainy\Laragraph\Contracts\HasName;
class ResearchAgentNode implements Node, HasName
{
public function name(): string
{
return 'research-agent';
}
// ...
}Emit telemetry metadata alongside the NodeCompleted event. Useful for tracking token usage, model names, cost centers, or tenant IDs:
use Cainy\Laragraph\Contracts\HasTags;
class LLMNode implements Node, HasTags
{
public function tags(): array
{
return [
'model' => 'claude-sonnet-4-6',
'cost_center' => 'marketing',
];
}
// ...
}Tags are included in the NodeCompleted event payload and broadcast to the workflow channel.
Declare a maximum execution time in seconds. If the node exceeds this limit, the queue worker terminates the job and the retry policy kicks in:
use Cainy\Laragraph\Contracts\HasTimeout;
class SlowAPINode implements Node, HasTimeout
{
public function timeout(): int
{
return 120; // 2 minutes
}
// ...
}Define per-node retry behaviour with exponential backoff and optional jitter:
use Cainy\Laragraph\Contracts\HasRetryPolicy;
use Cainy\Laragraph\Engine\RetryPolicy;
class FlakyAPINode implements Node, HasRetryPolicy
{
public function retryPolicy(): RetryPolicy
{
return new RetryPolicy(
initialInterval: 1.0, // seconds before first retry
backoffFactor: 2.0, // doubles each attempt
maxInterval: 30.0, // cap at 30 seconds
maxAttempts: 5,
jitter: true, // add ±25% randomness
);
}
// ...
}You can also restrict retries to specific exception types:
new RetryPolicy(
maxAttempts: 3,
retryOn: [RateLimitException::class, TimeoutException::class],
)
// Or with a Closure for full control:
new RetryPolicy(
maxAttempts: 3,
retryOn: fn(Throwable $e) => $e->getCode() === 429,
)The current attempt number is always available in the node via $context->attempt and $context->maxAttempts.
A lightweight inline node for pure state transforms — no class required:
use Cainy\Laragraph\Nodes\FormatNode;
Workflow::create()
->addNode('format', new FormatNode(
fn(array $state) => ['summary' => implode("\n", $state['lines'])]
))The closure receives ($state, ?array $isolatedPayload) and returns a mutation array.
A convenience node that always pauses the run and fires a HumanInterventionRequired event. Prefer ->interruptBefore() / ->interruptAfter() for static pause points — use this when the pause decision is dynamic:
Workflow::create()
->addNode('check', new HumanInterruptNode())LaraGraph ships with first-class support for Prism via the Cainy\Laragraph\Integrations\Prism namespace. Install Prism to use these:
composer require prism-php/prismA concrete, configurable LLM node. No subclass needed for common use cases:
use Cainy\Laragraph\Integrations\Prism\PrismNode;
use Prism\Prism\Enums\Provider;
use Prism\Prism\Tool;
$workflow = Workflow::create()
->addNode('agent', new PrismNode(
provider: Provider::Anthropic,
model: 'claude-sonnet-4-20250514',
systemPrompt: 'You are a helpful assistant.',
maxTokens: 1024,
tools: [
(new Tool)
->as('get_weather')
->for('Get weather for a city')
->withStringParameter('city', 'City name')
->using(fn (string $city): string => "Sunny, 22°C in {$city}"),
],
))
->transition(Workflow::START, 'agent')
->transition('agent', Workflow::END);PrismNode properly serializes Prism Message objects to/from plain arrays for state storage using MessageSerializer. It returns the assistant's response (including any tool calls) as a single message in state['messages'].
For dynamic behaviour, extend and override getPrompt() or tools():
class ResearchAgent extends PrismNode
{
protected function getPrompt(array $state): string
{
return 'Research: ' . $state['topic'];
}
public function tools(): array
{
return [/* dynamic tools based on config */];
}
}Abstract base for nodes that manually execute tool calls from state['messages']. Implement toolMap() to return a map of tool names to callables:
use Cainy\Laragraph\Integrations\Prism\ToolNode;
class WeatherToolNode extends ToolNode
{
protected function toolMap(): array
{
return [
'get_weather' => fn(array $args): string =>
"Sunny, 22°C in " . ($args['city'] ?? 'unknown'),
];
}
}Tool results are appended to state['messages'] in Prism's tool_result format, ready for the next LLM call.
Note: You typically don't need
ToolNodewhen using automatic tool loops (see below). It exists as an escape hatch for custom tool routing.
When a node has tools (detected via a public tools() method), the compiler automatically injects a tool execution loop at compile time. You don't need to wire up tool routing manually.
$workflow = Workflow::create()
->addNode('agent', new PrismNode(
tools: [$weatherTool, $searchTool],
))
->transition(Workflow::START, 'agent')
->transition('agent', Workflow::END)
->compile();
// The compiled graph looks like:
// START → agent ──(has_tool_calls)──→ agent.__tools__ → agent
// ──(no tool_calls)──→ ENDThe compiler:
- Detects any node with a public
tools()method returning non-empty tools - Injects a synthetic
{name}.__tools__node that executes the tool calls - Guards existing outgoing edges with
!has_tool_callsso they only fire when the LLM is done calling tools - Adds loop edges:
agent → agent.__tools__(when tool calls present) andagent.__tools__ → agent(unconditional)
This works with PrismNode, custom nodes with a tools() method, and Laravel AI agents using AsGraphNode.
PrismNode resets an iteration counter each time it runs. The ToolExecutorNode increments it and enforces maxIterations (default 25). When the limit is hit, it returns a text message instead of tool results, breaking the loop naturally.
Use Workflow::toolNode() to reference the synthetic tools node in interrupt points:
$workflow = Workflow::create()
->addNode('agent', new PrismNode(tools: [...]))
->transition(Workflow::START, 'agent')
->transition('agent', Workflow::END)
->interruptBefore(Workflow::toolNode('agent')); // pause before tool executionIf you need full control over how tools are routed, skip the auto-injection by keeping tools() empty on your LLM node and wiring edges explicitly:
$workflow = Workflow::create()
->addNode('agent', MyCustomAgentNode::class)
->addNode('tools', WeatherToolNode::class)
->transition(Workflow::START, 'agent')
->transition('agent', 'tools', 'has_tool_calls(state["messages"])')
->transition('agent', Workflow::END, '!has_tool_calls(state["messages"])')
->transition('tools', 'agent');LaraGraph integrates with Laravel AI via the AsGraphNode trait. Any Laravel AI agent can be dropped into a workflow graph without adapter classes.
composer require laravel/aiAdd the AsGraphNode trait to a standard Laravel AI agent to make it a Laragraph node. The agent keeps its native Agent contract and Promptable trait — AsGraphNode bridges the two systems:
use Cainy\Laragraph\Contracts\Node;
use Cainy\Laragraph\Integrations\LaravelAi\AsGraphNode;
use Laravel\Ai\Contracts\Agent;
use Laravel\Ai\Promptable;
use Stringable;
class ResearchAgent implements Agent, Node
{
use AsGraphNode, Promptable;
public function instructions(): Stringable|string
{
return 'You are a research assistant. Analyze the given topic thoroughly.';
}
protected function getAgentPrompt(): string
{
// $this->state is hydrated from the workflow before prompt() is called
return 'Research: ' . ($this->state['topic'] ?? 'general');
}
}The trait:
- Hydrates
$this->stateand$this->ctxbefore execution so your agent methods (instructions(),messages(),tools()) can read the current graph state - Calls Laravel AI's native
prompt()method - Converts the response into a state mutation automatically — text responses are appended to
state['messages']
Register it like any other node:
Workflow::create()
->addNode('researcher', ResearchAgent::class)
->transition(Workflow::START, 'researcher')
->transition('researcher', Workflow::END);If your agent implements HasStructuredOutput, the trait automatically maps the structured response keys to state mutation keys:
use Illuminate\Contracts\JsonSchema\JsonSchema;
use Laravel\Ai\Contracts\Agent;
use Laravel\Ai\Contracts\HasStructuredOutput;
class ClassifierAgent implements Agent, Node, HasStructuredOutput
{
use AsGraphNode, Promptable;
public function instructions(): string
{
return 'Classify the input text into a category and confidence score.';
}
public function schema(JsonSchema $schema): array
{
return [
'category' => $schema->string()->required(),
'confidence' => $schema->number()->min(0)->max(1)->required(),
];
}
}After prompting, $response['category'] and $response['confidence'] are merged directly into graph state. Override mutateStateWithStructuredOutput() if you need to remap keys.
Laravel AI agents that implement HasTools are automatically detected by the compiler. The tool loop injection works exactly as it does with PrismNode — no manual wiring needed:
use App\Ai\Tools\GetWeather;
use Laravel\Ai\Contracts\Agent;
use Laravel\Ai\Contracts\HasTools;
class WeatherAgent implements Agent, Node, HasTools
{
use AsGraphNode, Promptable;
public function instructions(): string
{
return 'You are a weather assistant.';
}
public function tools(): array
{
return [
new GetWeather,
];
}
}
// compile() auto-injects: weather → weather.__tools__ → weather loop
Workflow::create()
->addNode('weather', WeatherAgent::class)
->transition(Workflow::START, 'weather')
->transition('weather', Workflow::END)
->compile();Any CompiledWorkflow implements Node and can be embedded directly inside another workflow. This lets you compose complex pipelines from smaller, independently testable pieces.
$researchSubgraph = Workflow::create()
->withName('research')
->addNode('search', SearchNode::class)
->addNode('extract', ExtractNode::class)
->transition(Workflow::START, 'search')
->transition('search', 'extract')
->transition('extract', Workflow::END)
->compile();
$parentWorkflow = Workflow::create()
->addNode('research', $researchSubgraph) // compiled workflow as a node
->addNode('write', WriteNode::class)
->transition(Workflow::START, 'research')
->transition('research', 'write')
->transition('write', Workflow::END);When the engine executes a CompiledWorkflow node:
- A child
WorkflowRunis created and linked to the parent viaparent_run_idandparent_node_name. - The child workflow starts normally — its nodes run as independent queue jobs.
- The parent run pauses at the sub-graph node, keeping its pointer alive.
- When the child completes, the engine resumes the parent automatically.
- The parent node re-executes, reads the child's final state, computes the delta, and returns it as a mutation.
The parent and child appear as separate entries in workflow_runs, making it easy to inspect each level of a complex pipeline independently. Each node in the sub-graph gets its own retries, timeout, and telemetry.
$run = WorkflowRun::find($id);
$run->parent; // ?WorkflowRun — the parent run, if this is a child
$run->children; // Collection<WorkflowRun> — all child runs spawned by this runLaraGraph fires events throughout the workflow lifecycle. All events implement ShouldBroadcast and are broadcast on the workflow channel when broadcasting is enabled.
| Event | Payload |
|---|---|
WorkflowStarted |
runId, workflowKey |
NodeExecuting |
runId, nodeName |
NodeCompleted |
runId, nodeName, mutation, tags |
NodeFailed |
runId, nodeName, exception |
WorkflowCompleted |
runId |
WorkflowFailed |
runId, exception |
WorkflowResumed |
runId |
HumanInterventionRequired |
runId |
Enable broadcasting in your .env:
LARAGRAPH_BROADCASTING_ENABLED=true
LARAGRAPH_CHANNEL_TYPE=private # public | private | presence
LARAGRAPH_CHANNEL_PREFIX=workflow.Each run broadcasts on channel {prefix}{runId} (e.g. workflow.42). Authorize the channel in your routes/channels.php as needed.
// config/laragraph.php
return [
// Queue name for ExecuteNode jobs
'queue' => env('LARAGRAPH_QUEUE', 'default'),
// Queue connection (null = default connection)
'connection' => env('LARAGRAPH_QUEUE_CONNECTION'),
// Default max attempts per node (overridden per-node via HasRetryPolicy)
'max_node_attempts' => 3,
// Default node timeout in seconds (overridden per-node via HasTimeout)
'node_timeout' => 60,
// DB lock timeout in seconds
'lock_timeout' => 30,
// Soft-delete workflow runs older than this many days
'prunable_after_days' => 30,
// Pre-registered workflows (name => class or callable)
'workflows' => [],
'broadcasting' => [
'enabled' => env('LARAGRAPH_BROADCASTING_ENABLED', false),
'channel_type' => env('LARAGRAPH_CHANNEL_TYPE', 'private'),
'channel_prefix' => env('LARAGRAPH_CHANNEL_PREFIX', 'workflow.'),
],
];composer testLaraGraph is designed to work with the sync queue driver in tests — set QUEUE_CONNECTION=sync in your phpunit.xml or test environment and runs will execute synchronously, making assertions straightforward:
use Cainy\Laragraph\Facades\Laragraph;
use Cainy\Laragraph\Enums\RunStatus;
it('completes the pipeline', function () {
$run = Laragraph::start('my-pipeline', ['input' => 'hello']);
expect($run->fresh())
->status->toBe(RunStatus::Completed)
->state->toHaveKey('output');
});For unit-testing individual nodes, use the makeContext() test helper:
use function Cainy\Laragraph\Tests\makeContext;
it('returns a summary mutation', function () {
$node = new SummarizeNode();
$mutation = $node->handle(
makeContext(nodeName: 'summarize'),
['text' => 'Long article...'],
);
expect($mutation)->toHaveKey('summary');
});The MIT License (MIT). Please see License File for more information.