
Refactor provisioning service and extract worker tasks#565

Merged
jfrench9 merged 4 commits into main from feature/worker-tasks
Mar 31, 2026

Conversation

@jfrench9
Member

Summary

Refactors the Dagster job handling and graph provisioning architecture by extracting core operations into dedicated worker tasks and consolidating the provisioning logic into a standalone service. The SSE direct monitor is restructured into a proper ProvisioningService under the operations layer, and long-running concerns (Dagster monitoring, graph materialization, subgraph creation) are broken out into discrete, independently deployable worker tasks.

Key Accomplishments

Architecture Restructuring

  • Renamed and refactored middleware/sse/direct_monitor.py → operations/graph/provisioning_service.py, significantly reducing its scope and complexity (~509 lines changed, net reduction in code)
  • Introduced dedicated worker tasks for three core concerns:
    • dagster_monitoring.py — Handles Dagster job status polling and lifecycle tracking
    • graph_materialization.py — Manages graph materialization workflows as background tasks
    • subgraph_creation.py — Encapsulates subgraph creation/provisioning logic
  • Simplified router handlers (materialize.py, subgraphs/main.py, backups/backup.py, backups/restore.py) by delegating heavy lifting to the new worker tasks and provisioning service

Infrastructure

  • Expanded CloudFormation worker template (cloudformation/worker.yaml) with additional resource definitions to support the new task-based architecture (+107 lines)
  • Updated Dagster deployment workflow (.github/workflows/deploy-dagster.yml) to align with the refactored job structure

Cleanup

  • Removed unused SSE middleware exports from robosystems/middleware/sse/__init__.py
  • Added new worker task queue constant in robosystems/worker/constants.py
  • Updated worker module initialization to register the new tasks
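The queue-constant and task-registration bullets above can be sketched roughly as follows. This is an illustrative assumption, not the actual robosystems code — the constant name, queue value, and `register_task` helper are all hypothetical:

```python
# Hypothetical sketch of robosystems/worker/constants.py plus the worker
# module initialization that registers the three new tasks against the queue.
WORKER_TASK_QUEUE = "robosystems-worker-tasks"  # assumed name/value

# Assumed registry: task name -> queue it runs on.
TASK_REGISTRY: dict[str, str] = {}


def register_task(name: str, queue: str = WORKER_TASK_QUEUE) -> None:
    """Record a task against its queue so the worker picks it up."""
    TASK_REGISTRY[name] = queue


# The three tasks extracted in this PR.
for task_name in ("dagster_monitoring", "graph_materialization", "subgraph_creation"):
    register_task(task_name)
```

Centralizing the queue name in one constant is what makes the "propagate to all environments" note in the infrastructure section tractable: only one value needs to agree with the environment configuration.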

Breaking Changes

  • middleware/sse/direct_monitor has been relocated to operations/graph/provisioning_service. Any external references to the old import path will break.
  • Router endpoint internals have changed — while the API contracts should remain the same, the underlying execution model now delegates to worker tasks rather than handling operations inline. Clients relying on synchronous completion semantics within request handlers may observe behavioral differences.
  • SSE middleware exports have been trimmed; consumers importing directly from robosystems.middleware.sse should verify their imports still resolve.

Testing Notes

  • Existing tests in tests/middleware/sse/test_direct_monitor.py have been substantially refactored (~173 lines changed) to align with the new ProvisioningService interface and location
  • Dagster billing webhook handler tests updated for minor import/reference changes
  • New worker tasks (dagster_monitoring, graph_materialization, subgraph_creation) should have integration tests added in follow-up work to validate task execution, retry behavior, and failure handling

Infrastructure Considerations

  • The CloudFormation worker stack requires redeployment with the updated template to provision resources needed by the new task definitions
  • The Dagster deployment pipeline has been updated and should be validated in staging before merging to ensure job registration and scheduling remain intact
  • Worker task queue configuration has been extended — ensure the new queue constant is propagated to all environments via configuration/environment variables

🤖 Generated with Claude Code

Branch Info:

  • Source: feature/worker-tasks
  • Target: main
  • Type: feature

Co-Authored-By: Claude noreply@anthropic.com

- Moved provisioning logic from middleware to a dedicated provisioning service, enhancing code organization and clarity.
- Updated worker tasks to handle graph provisioning and materialization, replacing previous direct monitoring patterns.
- Introduced autoscaling parameters for worker tasks in the deployment configuration, allowing for dynamic scaling based on workload.
- Cleaned up imports and adjusted related tests to reflect the new structure, ensuring comprehensive coverage and validation of the provisioning process.
@jfrench9
Member Author

@claude please review this PR

@claude

claude bot commented Mar 31, 2026

Claude finished @jfrench9's task in 2m 12s


PR Review: Refactor provisioning service and extract worker tasks

Tasks:

  • Gather context and understand the request
  • Read changed files
  • Review worker tasks (dagster_monitoring, graph_materialization, subgraph_creation)
  • Review provisioning service refactor
  • Review router changes
  • Review infrastructure changes (CloudFormation, GitHub Actions)
  • Review test changes
  • Post comprehensive review

This is a solid architectural improvement that significantly reduces coupling between middleware, routers, and long-running operations. The net -126 lines speaks for itself. A few issues below, ranging from a CLAUDE.md rule violation to a runtime bug, are worth addressing before merge.

Issues

1. os.getenv() violates CLAUDE.md rule — provisioning_service.py:20

DAGSTER_REPORT_TIMEOUT = float(os.getenv("DAGSTER_MATERIALIZATION_TIMEOUT", "15.0"))

CLAUDE.md explicitly prohibits direct os.getenv() usage. This should go through env:

from robosystems.config import env
DAGSTER_REPORT_TIMEOUT = float(env.DAGSTER_MATERIALIZATION_TIMEOUT)


2. Return type mismatch in create_subgraph fork path — subgraphs/main.py:286,365

The function is annotated -> SubgraphResponse but the fork path returns a raw dict from enqueue_task:

async def create_subgraph(...) -> SubgraphResponse:
    ...
    if request.fork_parent:
        response = await enqueue_task(...)
        ...
        return response  # ← returns dict, not SubgraphResponse

This will cause FastAPI response model validation to fail at runtime (or silently coerce incorrectly). The return type annotation should reflect that both shapes are possible, or the fork path should return a compatible model.

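One way to make both paths type-consistent is to coerce the task result into the response model before returning. A minimal sketch, assuming a dataclass stand-in for the real `SubgraphResponse` (the actual field names in robosystems may differ):

```python
from dataclasses import dataclass
from typing import Any


# Hypothetical minimal stand-in for the real SubgraphResponse model.
@dataclass
class SubgraphResponse:
    graph_id: str
    status: str


def to_subgraph_response(task_result: dict[str, Any]) -> SubgraphResponse:
    """Coerce the raw dict returned by enqueue_task into the response model,
    so the fork path satisfies the -> SubgraphResponse annotation too."""
    return SubgraphResponse(
        graph_id=task_result["graph_id"],
        status=task_result.get("status", "queued"),
    )
```

With a coercion like this at the end of the fork branch, FastAPI's response-model validation sees the same shape from both paths.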

3. fork_options from request metadata silently dropped — subgraph_creation.py:49-51

The subgraph router passes fork_data=True but discards request.metadata (which may contain explicit fork_options). The task hardcodes:

fork_options = {"tables": []}  # Fork all tables

If request.metadata.fork_options existed in the old synchronous path, callers relying on selective forking are silently broken. Worth verifying whether this was ever exposed.

4. Security audit log missing for fork path — subgraphs/main.py:334-365

The non-fork path logs a SecurityAuditLogger.log_security_event(SecurityEventType.SUBGRAPH_CREATED, ...) at line 380. The fork path dispatches to a worker task that has no equivalent audit event. This means forked subgraph creations (the more expensive/impactful operation) don't appear in the security audit log.

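Closing the gap would mean the worker task emits the same audit event the synchronous path does. A sketch with stand-ins for `SecurityAuditLogger` / `SecurityEventType` (names taken from the review comment; the real signatures and event payloads are assumptions):

```python
from enum import Enum


# Stand-ins for the project's audit types, for illustration only.
class SecurityEventType(Enum):
    SUBGRAPH_CREATED = "subgraph_created"


class SecurityAuditLogger:
    events: list[tuple[SecurityEventType, dict]] = []

    @classmethod
    def log_security_event(cls, event_type: SecurityEventType, **details) -> None:
        cls.events.append((event_type, details))


def fork_subgraph_task(graph_id: str, user_id: str) -> None:
    """Worker-task body sketch: emit the same audit event the non-fork
    path emits, so forked creations also reach the security audit log."""
    # ... actual fork/provisioning work would happen here ...
    SecurityAuditLogger.log_security_event(
        SecurityEventType.SUBGRAPH_CREATED,
        graph_id=graph_id,
        user_id=user_id,
        forked=True,
    )
```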

5. _release_lock duplicated across two task files

dagster_monitoring.py:55-68 and graph_materialization.py:58-74 contain nearly identical lock-release logic. The BaseTask class is the right home for this, or a shared mixin. Minor, but as more tasks are added this will diverge.
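Hoisting the shared logic into `BaseTask` could look roughly like this. The `FakeRedis` client is an in-memory stand-in for the sketch, and the real `BaseTask` has far more responsibilities than shown:

```python
class FakeRedis:
    """In-memory stand-in for the Redis client, just for this sketch."""

    def __init__(self) -> None:
        self.store: dict[str, str] = {}

    def delete(self, key: str) -> int:
        return 1 if self.store.pop(key, None) is not None else 0


class BaseTask:
    """Sketch: the duplicated lock-release logic lives once, on the base."""

    def __init__(self, redis_client) -> None:
        self.redis = redis_client

    def release_lock(self, lock_key: str) -> bool:
        try:
            return bool(self.redis.delete(lock_key))
        except Exception:
            # Swallow and rely on the TTL to reap the lock eventually.
            return False


class DagsterJobMonitorTask(BaseTask):
    pass


class GraphMaterializationTask(BaseTask):
    pass
```

Both task classes then inherit one tested implementation instead of carrying near-identical copies that can drift apart.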

6. Lock release without ownership verification — graph_materialization.py:67-70

The comment correctly identifies the risk:

"The lock_id ownership check isn't possible across processes, but the TTL protects against permanent deadlocks."

This means if the TTL expires mid-task and a second materialization acquires a new lock, the completing first task will silently delete the second task's lock. With the default 1-hour TTL (INGESTION_LOCK_TTL) this is unlikely in practice, but worth tracking as the worker scales horizontally.

Observations

worker/__init__.py — inconsistent # noqa: F401: Only the last import has the suppression. Either all four should have it (they're all imported for side effects) or none, with a module-level comment explaining the pattern.

DAGSTER_REPORT_TIMEOUT test will be brittle (test_direct_monitor.py:477): assert DAGSTER_REPORT_TIMEOUT == 15.0 will fail if DAGSTER_MATERIALIZATION_TIMEOUT is set in any test environment. Better to assert isinstance(DAGSTER_REPORT_TIMEOUT, float) only, or mock the env var.
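A more robust shape for that test, sketched with the env-var and default named in the review (the accessor here is illustrative, not the project's actual config layer):

```python
import os


def get_report_timeout() -> float:
    """Illustrative accessor mirroring the reviewed pattern:
    env var DAGSTER_MATERIALIZATION_TIMEOUT with a 15.0s default."""
    return float(os.getenv("DAGSTER_MATERIALIZATION_TIMEOUT", "15.0"))


def test_report_timeout_is_positive_float() -> None:
    # Don't pin the exact default: an environment override still passes.
    timeout = get_report_timeout()
    assert isinstance(timeout, float)
    assert timeout > 0
```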

CloudFormation autoscaling ResourceId hardcodes service name (worker.yaml:432):

ResourceId: !Sub service/${DagsterClusterName}/robosystems-worker-${Environment}

If the ECS service name ever diverges from this convention, the autoscaling target silently breaks. Consider deriving this from the WorkerService resource reference instead: !Sub service/${DagsterClusterName}/${WorkerService.Name}.

subgraph_creation.py:36-37 — The db_gen/next() session pattern used across the new tasks is fragile. It works because SQLAlchemy generators have a try/finally cleanup, but if the generator is ever changed or the pattern is misread, sessions leak. Not a new issue (same as graph_materialization.py and provisioning_service.py), but worth a note to standardize on async with or context managers in follow-up.
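The suggested standardization could be a thin context manager so cleanup is explicit rather than relying on generator finalization. A sketch; `session_factory` stands in for whatever produces sessions (e.g. a SQLAlchemy `sessionmaker`), and `FakeSession` exists only to demonstrate the behavior:

```python
from contextlib import contextmanager


@contextmanager
def db_session(session_factory):
    """Replacement for the db_gen = get_db(); db = next(db_gen) pattern.
    session_factory is any zero-arg callable returning an object with
    close(); the finally block guarantees cleanup even on error."""
    session = session_factory()
    try:
        yield session
    finally:
        session.close()  # always runs, even if the task body raises


class FakeSession:
    """Stand-in session that records whether close() was called."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True
```

Usage is then `with db_session(SessionLocal) as db: ...`, which reads the same in every task and cannot leak a session if the pattern is later edited.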

New worker tasks lack tests (acknowledged in PR): dagster_monitoring, graph_materialization, and subgraph_creation cover significant operational logic (lock release, DB session handling, SSE progress reporting) with no unit tests. Tagging as a follow-up is fine, but the lock-release race condition (#6 above) is a good candidate for a targeted test now.

Summary

The architectural direction is right — provisioning service is well-placed in operations/, worker tasks are clean and focused, and the router simplification is meaningful. The os.getenv() violation and the fork-path return type mismatch are the most important to fix before merge. The security audit gap is worth addressing in this PR since it's a behavioral regression. The rest can reasonably land in follow-up issues.

- Replaced direct lock release methods in `DagsterJobMonitorTask` and `GraphMaterializationTask` with a shared `release_lock` method from the `BaseTask` class, promoting code reuse and consistency.
- Updated the Dagster report timeout test to ensure it checks for a positive float value instead of a fixed default, enhancing test robustness.
- Cleaned up comments and import statements for clarity and maintainability.
- Added error handling in `graph_creation_queue_sensor` to revert graph status to QUEUED upon enqueue failure, improving resilience and logging for failed tasks.
- Updated `worker_inflight_reaper_sensor` to include detailed metadata in the dead-letter queue (DLQ) for tasks that exceed retry limits, aiding in troubleshooting and monitoring.
- Refactored `DagsterJobMonitorTask` to support cancellation during job monitoring, allowing for better control over long-running tasks and improved user experience.
- Cleaned up code and comments for clarity and maintainability.
…ponse

- Introduced a custom JSON serialization function to handle date and datetime objects, ensuring they are properly formatted in the streaming response.
- Updated the `generate_stream` function to utilize the new serialization method, improving the handling of large result sets.
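The custom serialization described above is typically a `default=` hook for `json.dumps`. A minimal sketch of the pattern (the function name and exact formatting are assumptions, not the PR's actual code):

```python
import json
from datetime import date, datetime


def json_default(obj):
    """default= hook for json.dumps: render temporal values as ISO-8601
    strings instead of raising TypeError mid-stream."""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


# Example row such as a streaming result set might contain.
row = {"id": 1, "created": datetime(2026, 3, 31, 2, 31)}
print(json.dumps(row, default=json_default))
```

Raising `TypeError` for unhandled types preserves `json.dumps`'s normal failure mode for genuinely unserializable values.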
@jfrench9 jfrench9 merged commit f198e2e into main Mar 31, 2026
7 checks passed
@jfrench9 jfrench9 deleted the feature/worker-tasks branch March 31, 2026 02:31