[Feature]: Add simple saga/compensation pattern helper (decorator + Saga wrapper) #239

@embano1

What would you like?

Summary

It would be helpful to have a first-class way to define compensating (undo) actions for steps when building workflows with DurableContext. The idea is to support a simple, Saga-style pattern using:

  • A decorator to declare a compensation function for a step.
  • A lightweight Saga helper that wraps context.step(...), tracks compensations, and runs them in reverse order on failure.

This would let users build multi-step workflows with clear, structured rollback logic without reinventing this pattern in every project.

Motivation

When orchestrating multiple side-effecting steps (e.g., create order → charge payment → ship), users often want sagas: if step N fails, we undo steps N-1, N-2, … in reverse order.

Right now, users can implement this manually with DurableContext, but:

  • Compensation logic ends up scattered or ad-hoc.
  • There’s no simple, recommended pattern to associate a step with its “undo” logic.
  • It’s easy to forget to register compensations consistently.

A small helper + decorator would provide a consistent, ergonomic pattern and encourage best practices (idempotent compensations, durable compensating steps, etc.).

Possible Implementation

Proposed API / Approach

1. @compensable decorator

Allow users to define a step and its compensation in one place:

from typing import Any, Callable
from functools import wraps

def compensable(compensation: Callable[..., Any]):
    """
    Attach a compensation function to a step function.

    Compensation will be called like:
        compensation(step_ctx, *args, result, **kwargs)
    """
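    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(fn)
        def wrapper(*args, **kwargs):
            return fn(*args, **kwargs)

        # Set the marker on the wrapper itself: that is the object callers
        # (and Saga) receive, so the lookup should not rely on @wraps copying it.
        wrapper._compensation = compensation
        return wrapper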

    return decorator

Usage:

def cancel_order(step_ctx, order_id: str, result):
    # undo create_order
    ...

@compensable(cancel_order)
def create_order(step_ctx, order_id: str):
    ...
    return {"order_id": order_id}
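To make the marker mechanism concrete, here is a minimal, self-contained sketch of how a caller such as Saga could discover the compensation through the `_compensation` attribute. The decorator is repeated so the snippet runs on its own. The function bodies and return values are illustrative assumptions, and `None` stands in for the real step context.

```python
from functools import wraps
from typing import Any, Callable


def compensable(compensation: Callable[..., Any]):
    """Same idea as the proposed decorator, repeated so this snippet is standalone."""
    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(fn)
        def wrapper(*args, **kwargs):
            return fn(*args, **kwargs)

        wrapper._compensation = compensation  # marker looked up by the caller
        return wrapper

    return decorator


def cancel_order(step_ctx, order_id: str, result):
    # Hypothetical undo logic: just report what would be cancelled.
    return f"cancelled {result['order_id']}"


@compensable(cancel_order)
def create_order(step_ctx, order_id: str):
    return {"order_id": order_id}


def plain_step(step_ctx):
    # A step without a compensation carries no marker.
    return None


# The step context is unused in this sketch, so None stands in for it.
result = create_order(None, "o-1")
undo = getattr(create_order, "_compensation", None)
undo_result = undo(None, "o-1", result)
has_no_undo = getattr(plain_step, "_compensation", None) is None
```

Steps without the decorator simply return `None` from the `getattr` lookup, so they can be mixed freely with compensable steps.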

2. Saga helper built on top of DurableContext

Saga wraps context.step(...), automatically registers compensations, and runs them on failure. Internally, each compensation is also executed as its own durable step for observability and retries.

from typing import Any, Callable, List, Tuple

class Saga:
    def __init__(self, ctx, step_config=None, comp_step_config=None):
        self._ctx = ctx  # DurableContext
        self._step_config = step_config
        self._comp_config = comp_step_config
        self._comp_stack: List[Tuple[str, Callable[..., Any]]] = []

    def __enter__(self) -> "Saga":
        return self

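    def __exit__(self, exc_type, exc, tb) -> bool:
        if exc_type is not None:
            self.compensate()
        # Returning False lets the original exception propagate. If a
        # compensation itself raises, that error propagates instead, with
        # the original attached as its __context__.
        return False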

    def step(
        self,
        func: Callable[..., Any],
        *args,
        name: str | None = None,
        **kwargs,
    ) -> Any:
        step_name = name or func.__name__

        def step_runner(step_ctx):
            return func(step_ctx, *args, **kwargs)

        result = self._ctx.step(step_runner, step_name, config=self._step_config)

        compensation = getattr(func, "_compensation", None)
        if compensation is not None:
            def comp_runner(step_ctx, _result=result):
                return compensation(step_ctx, *args, _result, **kwargs)

            comp_name = f"{step_name}_compensate"
            self._comp_stack.append((comp_name, comp_runner))

        return result

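    def compensate(self):
        # Pop from the end so compensations run in reverse registration order.
        # A compensation that raises stops the loop; entries still on the
        # stack are left unrun.
        while self._comp_stack:
            comp_name, comp_runner = self._comp_stack.pop()
            self._ctx.step(comp_runner, comp_name, config=self._comp_config)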

Example usage in a durable function order workflow

def order_workflow(context, input):
    with Saga(context) as saga:
        order = saga.step(create_order, input["order_id"])
        payment = saga.step(charge_payment, order["order_id"])
        shipment = saga.step(ship_items, order["order_id"])

        # If all steps succeed, no compensations run.
        return {
            "order_id": order["order_id"],
            "payment_id": payment["id"],
            "shipment_id": shipment["id"],
        }

If any step raises an exception, the Saga context manager will:

  • Execute registered compensations in reverse order, each as its own step.
  • Re-raise the original exception so the workflow’s failure behavior remains consistent.
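The failure path can be exercised without the SDK. The sketch below is an illustration under stated assumptions: `FakeContext` is a hypothetical stand-in for DurableContext that runs steps inline and records their names, `MiniSaga` is a trimmed-down copy of the proposed Saga, and the marker is set by hand instead of through the decorator. None of these names are part of the SDK.

```python
from typing import Any, Callable, List, Tuple


class FakeContext:
    """Hypothetical stand-in for DurableContext: runs steps inline, logs names."""

    def __init__(self) -> None:
        self.executed: List[str] = []

    def step(self, func: Callable[[Any], Any], name: str, config: Any = None) -> Any:
        self.executed.append(name)
        return func(None)  # no real step context in this sketch


class MiniSaga:
    """Trimmed-down version of the proposed Saga, enough to show control flow."""

    def __init__(self, ctx: FakeContext) -> None:
        self._ctx = ctx
        self._comp_stack: List[Tuple[str, Callable[[Any], Any]]] = []

    def __enter__(self) -> "MiniSaga":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        if exc_type is not None:
            self.compensate()
        return False  # let the original exception propagate

    def step(self, func: Callable[..., Any], *args: Any) -> Any:
        name = func.__name__
        result = self._ctx.step(lambda step_ctx: func(step_ctx, *args), name)
        compensation = getattr(func, "_compensation", None)
        if compensation is not None:
            runner = lambda step_ctx: compensation(step_ctx, *args, result)
            self._comp_stack.append((f"{name}_compensate", runner))
        return result

    def compensate(self) -> None:
        while self._comp_stack:
            name, runner = self._comp_stack.pop()  # LIFO gives reverse order
            self._ctx.step(runner, name)


def cancel_order(step_ctx, order_id, result):
    return None


def refund_payment(step_ctx, order_id, result):
    return None


def create_order(step_ctx, order_id):
    return {"order_id": order_id}


def charge_payment(step_ctx, order_id):
    return {"id": "pay-1"}


def ship_items(step_ctx, order_id):
    raise RuntimeError("carrier unavailable")


# Equivalent to decorating with @compensable(...) in the proposal.
create_order._compensation = cancel_order
charge_payment._compensation = refund_payment

ctx = FakeContext()
caught = None
try:
    with MiniSaga(ctx) as saga:
        saga.step(create_order, "o-1")
        saga.step(charge_payment, "o-1")
        saga.step(ship_items, "o-1")  # fails, triggering compensation
except RuntimeError as err:
    caught = err
```

After the run, `ctx.executed` lists the three forward steps followed by `charge_payment_compensate` and then `create_order_compensate`, and `caught` holds the original `RuntimeError`.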

Notes / Considerations

  • Compensations should be idempotent and retry-safe, since they are steps with their own retry semantics.
  • API could be added as a small “saga” utility module in the SDK or as an official pattern in documentation.
  • Optionally, Saga could support:
    • Per-step custom configs.
    • Nested sagas (for sub-workflows).
    • Integration with any existing logging/metrics hooks.
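As an illustration of the idempotency point above, a compensation can check current state before acting so that a retried step becomes a no-op. This is only a sketch: the in-memory `ORDERS` dict and the status strings are hypothetical, and a real compensation would call the order service or database instead.

```python
from typing import Any, Dict

# Hypothetical in-memory store standing in for a real order service.
ORDERS: Dict[str, str] = {"o-1": "CREATED"}


def cancel_order(step_ctx: Any, order_id: str, result: Any) -> str:
    """Cancel the order; repeating the call after success changes nothing."""
    status = ORDERS.get(order_id)
    if status is None or status == "CANCELLED":
        # Nothing to undo: never created, or already cancelled by an earlier attempt.
        return "noop"
    ORDERS[order_id] = "CANCELLED"
    return "cancelled"


# The step context is unused here, so None stands in for it.
first = cancel_order(None, "o-1", {"order_id": "o-1"})
retry = cancel_order(None, "o-1", {"order_id": "o-1"})
```

The second call models a retry of the compensating step: it observes the cancelled state and returns without side effects.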

Is this a breaking change?

No

Does this require an RFC?

No

Additional Context

I am the PM for this feature. Several users have pinged me directly asking for it.

Labels

enhancement (New feature or request)
