
fix: saga orch is stateless, lifetime handled by DI #129

Merged
HardMax71 merged 3 commits into main from fix/saga-orchestrator
Feb 2, 2026

fix: saga orch is stateless, lifetime handled by DI#129
HardMax71 merged 3 commits intomainfrom
fix/saga-orchestrator

Conversation

@HardMax71 (Owner) commented Feb 2, 2026


Summary by cubic

Make the SagaOrchestrator stateless and let DI own its lifetime. The Kafka consumer and timeout loop now live in the DI provider; the orchestrator only handles events and persistence.

  • Refactors
    • Replaced LifecycleEnabled orchestrator with a plain class; removed in-memory instance tracking and the create_saga_orchestrator factory.
    • Orchestrator exposes handlers for execution_requested/completed/failed/timeout and a single-run check_timeouts; it creates and binds ExecutionSaga on demand. ExecutionSaga no longer uses BaseSaga and the queue/monitor steps were removed.
    • DI provider wires EventDispatcher to these handlers, starts UnifiedConsumer for SAGA_ORCHESTRATOR, runs a 30s timeout task, and handles clean shutdown.
    • Added SAGA_ORCHESTRATOR subscriptions for execution events in CONSUMER_GROUP_SUBSCRIPTIONS.
    • Added atomic get_or_create_saga in SagaRepository and a unique index on (execution_id, saga_name) to prevent duplicate sagas.
    • Worker starts the orchestrator via the container and waits for shutdown; removed the live-status method in SagaService; tests updated to call the new handlers directly.

Written for commit 8fd854f. Summary will update on new commits.

Summary by CodeRabbit

  • Refactor

    • Moved to an event-driven saga orchestrator with unified subscriptions for execution events, streamlined execution and timeout handling, and more graceful shutdown.
    • Simplified saga/step wiring to a dependency-injected flow for more consistent resource allocation, compensation, and cancellation behavior.
  • Database

    • Added a composite unique index to prevent duplicate saga entries for the same execution and name.
    • Repository gained an atomic get-or-create operation for sagas.
  • Tests

    • Updated unit tests to match the new orchestration and step behaviors.
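
The atomic get-or-create described in the Database bullets can be modeled in memory. The sketch below is a toy illustration of the semantics only: `get_or_create_saga` and the `(execution_id, saga_name)` key mirror the PR, but the lock-based dict store is a stand-in for the real MongoDB upsert plus unique index, not the actual implementation.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Saga:
    execution_id: str
    saga_name: str
    saga_id: str


@dataclass
class InMemorySagaRepository:
    # Keyed the same way as the unique composite index in the PR.
    _docs: dict[tuple[str, str], Saga] = field(default_factory=dict)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    async def get_or_create_saga(self, saga: Saga) -> tuple[Saga, bool]:
        """Return (saga, created). The lock makes check-then-insert atomic,
        so two concurrent callers for the same key never both insert."""
        key = (saga.execution_id, saga.saga_name)
        async with self._lock:
            existing = self._docs.get(key)
            if existing is not None:
                return existing, False
            self._docs[key] = saga
            return saga, True


async def main() -> None:
    repo = InMemorySagaRepository()
    a = Saga("exec-1", "execution_saga", "id-a")
    b = Saga("exec-1", "execution_saga", "id-b")  # same key, different id
    (s1, c1), (s2, c2) = await asyncio.gather(
        repo.get_or_create_saga(a), repo.get_or_create_saga(b)
    )
    assert {c1, c2} == {True, False}  # exactly one insert wins
    assert s1.saga_id == s2.saga_id   # both callers see the same saga


asyncio.run(main())
```

In the real repository the same guarantee comes from a `$setOnInsert` upsert racing against the unique index, so duplicate-key handling replaces the lock.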

@coderabbitai bot commented Feb 2, 2026

📝 Walkthrough

The PR refactors saga orchestration into an event-driven orchestrator exposed via an async DI provider. The provider constructs and starts a UnifiedConsumer and EventDispatcher, registers SagaOrchestrator handlers for execution events, runs a periodic timeout checker, and ensures graceful consumer and task shutdown.

Changes

| Cohort / File(s) | Summary |
|---|---|
| **Provider & Kafka enums**<br>`backend/app/core/providers.py`, `backend/app/domain/enums/kafka.py` | Added `get_saga_orchestrator()` async provider that builds EventDispatcher + UnifiedConsumer, registers handlers for EXECUTION_REQUESTED/COMPLETED/FAILED/TIMEOUT, starts the consumer and a background timeout task, and ensures cleanup. Updated CONSUMER_GROUP_EVENTS to subscribe SAGA_ORCHESTRATOR to execution events. |
| **Saga orchestrator core**<br>`backend/app/services/saga/saga_orchestrator.py`, `backend/app/services/saga/__init__.py` | Rewrote SagaOrchestrator from a lifecycle-managed component to a lean event-driven class exposing async handlers (handle_execution_requested/completed/failed/timeout) and check_timeouts; removed the legacy factory and lifecycle subclassing; adjusted public exports. |
| **Saga model & steps**<br>`backend/app/services/saga/execution_saga.py`, `backend/app/services/saga/saga_step.py`, `backend/app/services/saga/base_saga.py` | Deleted BaseSaga; refactored ExecutionSaga to explicit bind_dependencies(...); simplified the step set (removed queue/monitor steps); made step constructors require a concrete producer/alloc_repo; removed several SagaContext helpers. |
| **Repository & DB docs**<br>`backend/app/db/repositories/saga_repository.py`, `backend/app/db/docs/saga.py` | Added an atomic get_or_create_saga() upsert to SagaRepository; added a unique composite index on (execution_id, saga_name) in SagaDocument settings. |
| **Service API & worker**<br>`backend/app/services/saga/saga_service.py`, `backend/workers/run_saga_orchestrator.py` | Removed get_saga_status_from_orchestrator from SagaService. The worker now resolves the orchestrator via the DI provider and awaits a shutdown event (the provider manages the consumer lifecycle). |
| **Public exports**<br>`backend/app/services/saga/__init__.py` | Removed exports: BaseSaga, QueueExecutionStep, MonitorExecutionStep, RemoveFromQueueCompensation, and create_saga_orchestrator; retained ExecutionSaga and the remaining step/compensation classes. |
| **Tests**<br>`backend/tests/unit/services/saga/...` | Updated unit tests to instantiate the orchestrator with an injected SagaRepository, use public handlers (handle_execution_requested), expect exceptions for invalid steps, remove tests referencing deleted steps/BaseSaga, and add fake producer scaffolding for publish/compensation assertions. |
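
The provider lifecycle summarized above (start consumer, run a periodic timeout loop, clean up on shutdown) can be sketched as an async-generator provider. This is a minimal standalone model: `FakeConsumer` and `FakeOrchestrator` are hypothetical stand-ins for the real UnifiedConsumer and SagaOrchestrator, and the interval is shortened from the 30s used in the PR so the example runs quickly.

```python
import asyncio
import contextlib
from typing import AsyncIterator


class FakeConsumer:
    """Stand-in for UnifiedConsumer: tracks running state only."""
    def __init__(self) -> None:
        self.running = False

    async def start(self) -> None:
        self.running = True

    async def stop(self) -> None:
        self.running = False


class FakeOrchestrator:
    """Stand-in for SagaOrchestrator: counts timeout checks."""
    def __init__(self) -> None:
        self.timeout_checks = 0

    async def check_timeouts(self) -> None:
        self.timeout_checks += 1


async def provide_saga_orchestrator(
    consumer: FakeConsumer,
    orchestrator: FakeOrchestrator,
    interval: float = 0.01,  # 30s in the real provider
) -> AsyncIterator[FakeOrchestrator]:
    await consumer.start()

    async def timeout_loop() -> None:
        while True:
            await asyncio.sleep(interval)
            try:
                await orchestrator.check_timeouts()
            except Exception as exc:  # keep the loop alive on errors
                print(f"Error checking saga timeouts: {exc}")

    timeout_task = asyncio.create_task(timeout_loop())
    try:
        yield orchestrator
    finally:
        # Cancel AND await the task so shutdown is fully drained.
        timeout_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await timeout_task
        await consumer.stop()


async def main() -> None:
    consumer, orch = FakeConsumer(), FakeOrchestrator()
    gen = provide_saga_orchestrator(consumer, orch)
    await gen.__anext__()       # enter: consumer started, loop running
    await asyncio.sleep(0.05)   # let a few timeout checks fire
    await gen.aclose()          # exit: task cancelled, consumer stopped
    assert not consumer.running and orch.timeout_checks >= 1


asyncio.run(main())
```

The yield point is where the DI container hands the orchestrator to the worker; everything after `finally` is the graceful-shutdown path the review comments focus on.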

Sequence Diagram

sequenceDiagram
    participant Kafka as Kafka Broker
    participant Consumer as UnifiedConsumer
    participant Dispatcher as EventDispatcher
    participant Orchestrator as SagaOrchestrator
    participant Repo as SagaRepository

    rect rgba(100,150,255,0.5)
    Note over Kafka,Consumer: Provider startup builds components
    Kafka->>Consumer: subscribe to EXECUTION_* events
    Consumer->>Dispatcher: initialize/register
    Dispatcher->>Orchestrator: register handlers
    Consumer->>Consumer: start polling
    end

    rect rgba(150,200,100,0.5)
    Note over Kafka,Orchestrator: Event handling flow
    Kafka->>Consumer: publish EXECUTION_REQUESTED
    Consumer->>Dispatcher: dispatch event
    Dispatcher->>Orchestrator: handle_execution_requested()
    Orchestrator->>Repo: get_or_create_saga / persist progress
    Orchestrator->>Repo: persist terminal state
    end

    rect rgba(200,150,100,0.5)
    Note over Orchestrator,Repo: Timeout background check
    loop periodic
      Orchestrator->>Repo: query stale sagas
      Orchestrator->>Orchestrator: handle_execution_timeout()
    end
    end

    rect rgba(200,100,100,0.5)
    Note over Consumer,Orchestrator: Shutdown
    Consumer->>Consumer: stop polling
    Orchestrator->>Orchestrator: cancel timeout task
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Poem

🐰 I nudged the wires and hopped about the code,
Events now call the tune and tidy up the road.
Provider lights the stage, consumer hums the tune,
Sagas start on request and timeouts check by moon.
Hooray — graceful stops, and logs that softly glowed!

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 33.96%, below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped: CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title clearly describes the main architectural change: making SagaOrchestrator stateless and moving lifetime management to dependency injection, which aligns with the extensive refactoring across multiple files. |


@coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@backend/tests/unit/services/saga/test_saga_orchestrator_unit.py`:
- Around line 71-80: The test assertion is too weak and can pass spuriously
because saved.saga_name will match for any created saga; update the assertion in
test_existing_saga_short_circuits to explicitly verify no new sagas were saved
by checking the fake repo's saved container is empty (e.g., assert
len(fake_repo.saved) == 0 or assert fake_repo.saved == []), referencing the test
function test_existing_saga_short_circuits, the _FakeRepo instance fake_repo,
and the saved attribute used by the orchestrator when calling
orch.handle_execution_requested.
🧹 Nitpick comments (4)
backend/app/services/saga/saga_orchestrator.py (2)

48-70: Consider explicit type guards for production safety.

The assert isinstance(...) statements will be stripped when Python runs with optimization flags (-O or -OO), potentially causing AttributeError if an incorrect event type is dispatched. While the EventDispatcher routes by event_type making mismatches unlikely, consider explicit type guards for defense-in-depth:

♻️ Optional: Explicit type checking
 async def handle_execution_requested(self, event: DomainEvent) -> None:
     """Handle EXECUTION_REQUESTED — starts a new saga."""
-    assert isinstance(event, ExecutionRequestedEvent)
+    if not isinstance(event, ExecutionRequestedEvent):
+        self.logger.error(f"Unexpected event type: {type(event)}")
+        return
     await self._start_saga(event)

115-117: Fire-and-forget task may lose unhandled exceptions.

asyncio.create_task() without storing a reference means exceptions raised before the try block in _execute_saga will be silently lost (only logged by asyncio's default exception handler). The internal error handling in _execute_saga mitigates this, but consider storing the task reference for observability or adding a done callback:

♻️ Optional: Add exception logging callback
-asyncio.create_task(self._execute_saga(saga, instance, context, trigger_event))
+task = asyncio.create_task(self._execute_saga(saga, instance, context, trigger_event))
+task.add_done_callback(
+    lambda t: self.logger.error(f"Saga task failed: {t.exception()}") if t.exception() else None
+)
backend/tests/unit/services/saga/test_saga_orchestrator_unit.py (1)

65-67: Consider adding assertions to verify saga creation.

The test calls handle_execution_requested but doesn't verify the outcome. Consider asserting that a saga was saved:

♻️ Suggested enhancement
 @pytest.mark.asyncio
 async def test_handle_event_triggers_saga() -> None:
-    orch = _orch()
-    await orch.handle_execution_requested(make_execution_requested_event(execution_id="e"))
+    fake_repo = _FakeRepo()
+    orch = _orch(repo=fake_repo)
+    await orch.handle_execution_requested(make_execution_requested_event(execution_id="e"))
+    assert len(fake_repo.saved) >= 1
+    assert fake_repo.saved[0].execution_id == "e"
backend/app/core/providers.py (1)

842-908: Well-structured DI provider with proper lifecycle management.

The provider correctly:

  • Wires event handlers for all four execution lifecycle events
  • Starts the consumer with appropriate topic subscriptions
  • Runs a background timeout checker loop
  • Properly cleans up on shutdown (cancels task, stops consumer)

One minor note: the 30-second timeout check interval (line 890) is hardcoded. Consider making it configurable via settings if different environments need different frequencies.

♻️ Optional: Configurable timeout interval
         async def timeout_loop() -> None:
             while True:
-                await asyncio.sleep(30)
+                await asyncio.sleep(settings.SAGA_TIMEOUT_CHECK_INTERVAL_SECONDS or 30)
                 try:
                     await orchestrator.check_timeouts()
                 except Exception as exc:
                     logger.error(f"Error checking saga timeouts: {exc}")


@cubic-dev-ai bot left a comment

2 issues found across 6 files

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/tests/unit/services/saga/test_saga_orchestrator_unit.py">

<violation number="1" location="backend/tests/unit/services/saga/test_saga_orchestrator_unit.py:67">
P2: This test performs no assertions, so it can pass even if the saga is never started. Add an assertion that the repository recorded a saga start (or similar observable behavior).</violation>

<violation number="2" location="backend/tests/unit/services/saga/test_saga_orchestrator_unit.py:80">
P2: The assertion is too weak: it passes even if a new saga with the same name but a new ID is saved. Assert that no new sagas were saved (or that the only saved saga is the existing one).</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

@coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@backend/app/services/saga/saga_orchestrator.py`:
- Around line 91-118: The current _start_saga function performs a
read-then-create and can produce duplicate Saga records under concurrent events;
update the implementation to use an atomic get-or-create/upsert or enforce and
handle a DB unique constraint on (execution_id, saga_name): modify the
repository (replace calls to _repo.get_saga_by_execution_and_name and
_save_saga) to provide an atomic method such as get_or_create_saga(execution_id,
saga_name) that returns the existing Saga or creates and returns a new one, or
ensure the DB has a UNIQUE index on (execution_id, saga_name) and update
_start_saga to catch the duplicate-key error from _save_saga (or the repo) and
then re-query via _repo.get_saga_by_execution_and_name to return the existing
saga_id; reference symbols: _start_saga, _repo.get_saga_by_execution_and_name,
_save_saga, get_or_create_saga (new), Saga, _SAGA_NAME, saga_id.
🧹 Nitpick comments (1)
backend/app/services/saga/saga_orchestrator.py (1)

48-70: Replace assert-based event type checks with explicit guards.
assert can be stripped with -O, so misrouted events could bypass validation and fail later with less clear errors. Consider explicit isinstance checks (log + raise/return).

♻️ Suggested guard pattern (apply to all handlers)
-        assert isinstance(event, ExecutionRequestedEvent)
-        await self._start_saga(event)
+        if not isinstance(event, ExecutionRequestedEvent):
+            raise TypeError(f"Expected ExecutionRequestedEvent, got {type(event).__name__}")
+        await self._start_saga(event)

@sonarqubecloud bot commented Feb 2, 2026

@coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@backend/app/core/providers.py`:
- Around line 888-904: The timeout_task created from timeout_loop is cancelled
but not awaited, risking a pending task on shutdown; after calling
timeout_task.cancel() in the finally block await timeout_task and handle
asyncio.CancelledError (or wrap await timeout_task in try/except
asyncio.CancelledError: pass) so the task is properly awaited and cleaned up
before awaiting consumer.stop() and logging "Saga orchestrator stopped";
reference timeout_task, timeout_loop, and orchestrator.check_timeouts when
locating the code to update.
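
The fix requested above - cancel the task, then await it while suppressing `asyncio.CancelledError` - can be sketched standalone. `timeout_loop` here is a hypothetical stand-in for the provider's real loop around `orchestrator.check_timeouts()`.

```python
import asyncio


async def timeout_loop() -> None:
    # Stand-in for the provider's periodic check_timeouts() loop.
    while True:
        await asyncio.sleep(30)


async def shutdown_demo() -> None:
    timeout_task = asyncio.create_task(timeout_loop())
    await asyncio.sleep(0)  # let the task start running
    timeout_task.cancel()
    try:
        await timeout_task  # ensure the task actually finishes
    except asyncio.CancelledError:
        pass
    # Without the await, the task could still be pending at shutdown.
    assert timeout_task.done() and timeout_task.cancelled()


asyncio.run(shutdown_demo())
```

Awaiting the cancelled task guarantees its `finally` blocks have run before `consumer.stop()` is called and the "Saga orchestrator stopped" log is emitted.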
🧹 Nitpick comments (3)
backend/app/services/saga/saga_orchestrator.py (2)

117-117: Fire-and-forget task pattern may lose tasks on shutdown.

asyncio.create_task without storing a reference means the task is untracked. While _execute_saga handles its own exceptions internally, if the process shuts down abruptly, running saga executions won't complete gracefully.

Consider tracking active tasks for graceful shutdown if saga completion guarantees are important:

💡 Optional: track active tasks
# In __init__:
self._active_tasks: set[asyncio.Task] = set()

# In _start_saga:
task = asyncio.create_task(self._execute_saga(saga, instance, context, trigger_event))
self._active_tasks.add(task)
task.add_done_callback(self._active_tasks.discard)

# Add shutdown method:
async def wait_for_active_sagas(self, timeout: float = 30.0) -> None:
    if self._active_tasks:
        await asyncio.wait(self._active_tasks, timeout=timeout)

282-283: Redundant self._producer checks.

self._producer is a required constructor parameter and will always be set. The truthiness checks on lines 282 and 334 are redundant but harmless.

Also applies to: 334-334

backend/app/db/repositories/saga_repository.py (1)

45-67: Atomic get-or-create implementation is correct.

The $setOnInsert pattern with response_type=UpdateResponse.NEW_DOCUMENT and upsert=True correctly implements atomic get-or-create semantics. The created flag derivation (comparing saga_id) is valid: newly inserted documents receive the input saga.saga_id via on_insert=insert_doc, while existing documents retain their original saga_id from creation, making the comparison reliable for detecting whether an insert occurred.

One optional improvement: the assert doc is not None could be replaced with explicit error handling for defensive programming, though Beanie's response_type=NEW_DOCUMENT with upsert=True guarantees a non-null return:

💡 Optional: explicit error handling
-        assert doc is not None
+        if doc is None:
+            raise RuntimeError(f"Unexpected: upsert returned None for saga {saga.execution_id}/{saga.saga_name}")

@HardMax71 HardMax71 merged commit c2eceb9 into main Feb 2, 2026
15 checks passed
@HardMax71 HardMax71 deleted the fix/saga-orchestrator branch February 2, 2026 16:35
This was referenced Feb 12, 2026