
refactor: extract distributed attempt ownership #2183

Merged
yohamta0 merged 4 commits into main from refactor/distributed-attempt-ownership
May 20, 2026

Conversation

@yohamta0
Collaborator

@yohamta0 yohamta0 commented May 19, 2026

Summary

Refactor distributed attempt ownership logic into a focused coordinator module while preserving distributed run behavior.

Changes

  • Extract distributed attempt status decisions, lease sync, active-run tracking, task-claim recording, and cleanup into distributedAttemptOwnership.
  • Route coordinator status reporting, heartbeat repair, stale reconciliation, orphan cleanup, and indexed-run repair through the new module.
  • Preserve queued attempt ownership tracking and fall back to the claimed worker ID when ACK requests omit WorkerId.
  • Add regression coverage for ownership decisions, status synchronization, task-claim tracking, and ACK worker fallback.
  • Isolate CloudClient test transports so parallel httptest.Server cleanup cannot close another test client connection.
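The ACK worker fallback in the third bullet can be sketched as below. This is a minimal illustration, not the PR's code: `ackRequest`, `claimedTask`, and `resolveAckWorkerID` are hypothetical names, and only the `WorkerId` / `WorkerID` field names come from the review discussion.

```go
package main

import "fmt"

// ackRequest and claimedTask are hypothetical stand-ins for the coordinator's
// ACK request and claimed-task record.
type ackRequest struct{ WorkerId string }
type claimedTask struct{ WorkerID string }

// resolveAckWorkerID prefers the worker ID sent with the ACK and falls back to
// the worker that originally claimed the task when the ACK omits it.
func resolveAckWorkerID(req ackRequest, claimed claimedTask) string {
	if req.WorkerId != "" {
		return req.WorkerId
	}
	return claimed.WorkerID
}

func main() {
	claimed := claimedTask{WorkerID: "worker-1"}
	fmt.Println(resolveAckWorkerID(ackRequest{}, claimed))                     // worker-1
	fmt.Println(resolveAckWorkerID(ackRequest{WorkerId: "worker-2"}, claimed)) // worker-2
}
```

Resolving the ID once, before any tracking is written, keeps the lease and the active-run record from ever being stored with an empty owner.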

Related Issues

N/A

Checklist

  • Code follows the project style guidelines
  • Self-review of the code has been performed
  • Tests have been added or updated as needed
  • Documentation has been updated as needed
  • Changes have been tested locally

Testing

  • go test ./internal/service/coordinator -run TestDistributedAttemptOwnership -count=1
  • go test ./internal/service/coordinator -run 'TestDistributedAttemptOwnership|TestHandler_AckTaskClaim' -count=1
  • go test ./internal/license -run TestCloudClient_PullGitHubDispatch -count=50
  • go test ./internal/license -count=100
  • go test ./internal/license -cover -count=20
  • go test ./internal/service/coordinator ./internal/service/worker ./internal/service/scheduler ./internal/license -count=1
  • go test ./internal/service/coordinator ./internal/service/worker ./internal/service/scheduler -count=1
  • git diff --check

Summary by CodeRabbit

Release Notes

  • Refactor

    • Consolidated distributed task attempt ownership coordination for enhanced reliability
    • Improved stale attempt reconciliation and orphaned-status detection logic
  • Tests

    • Added comprehensive test coverage for distributed attempt ownership decision-making, status synchronization, and task claim tracking
    • Enhanced HTTP client isolation in test utilities

@coderabbitai

coderabbitai Bot commented May 19, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: db357809-5816-4e99-a2cf-118bf9a72f9a

📥 Commits

Reviewing files that changed from the base of the PR and between 2a7bdbc and e554c42.

📒 Files selected for processing (2)
  • internal/service/coordinator/distributed_attempt_ownership.go
  • internal/service/coordinator/distributed_attempt_ownership_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/service/coordinator/distributed_attempt_ownership.go

📝 Walkthrough

Walkthrough

This PR extracts distributed-execution bookkeeping into a new attemptOwnership helper and rewires the coordinator to use it for status validation, lease/active-run synchronization, task-claim recording, and tracking cleanup.

Changes

Distributed Attempt Ownership Encapsulation

| Layer | File(s) | Summary |
| --- | --- | --- |
| Ownership structure and initialization | internal/service/coordinator/distributed_attempt_ownership.go | New attemptOwnership type wraps owner, lease, and active-run stores and is constructed via Handler.distributedAttempts() to centralize coordination. |
| Remote status validation and decision logic | internal/service/coordinator/distributed_attempt_ownership.go | statusDecision() accepts/rejects incoming DAGRunStatus updates using attempt identity, terminal/active rules, and lease freshness with explicit rejection reasons. |
| Lease synchronization from remote status | internal/service/coordinator/distributed_attempt_ownership.go | syncLeaseFromStatus() / upsertLeaseFromStatus() upsert or delete distributed run leases for active vs terminal states, normalizing worker IDs and preserving claimed/queue metadata. |
| Restore confirmed tracking from status | internal/service/coordinator/distributed_attempt_ownership.go | restoreConfirmedFromStatus() recreates lease and active-run tracking for non-terminal or queued confirmed remote statuses. |
| Active-run synchronization and indexed matching | internal/service/coordinator/distributed_attempt_ownership.go | syncActiveRunFromStatus() / upsertActiveFromStatus() maintain active-run records (timestamps, worker/owner, status); indexedRunMatchesStatus() compares incoming statuses to stored active-run entries. |
| Task claim recording and tracking deletion | internal/service/coordinator/distributed_attempt_ownership.go, internal/service/coordinator/handler.go | recordTaskClaim() persists task claims as leases and active-run records with queue/root fallbacks; deleteTracking() removes lease and active-run records while ignoring not-found errors. AckTaskClaim now delegates to ownership.recordTaskClaim(). |
| Status predicates, worker/queue resolution, and logging | internal/service/coordinator/distributed_attempt_ownership.go | Adds isTerminalRunStatus, isCancellableTerminalRunStatus, sameAttemptStatus, distributedWorkerIDForStatus, queueNameForStatus, and logRejectedRemoteStatusUpdate. |
| Handler integration and reconciliation rewiring | internal/service/coordinator/handler.go | Coordinator paths (ReportStatus, RunHeartbeat, reconciliation/orphan/indexed flows, AckTaskClaim) now call ownership methods (statusDecision, syncFromStatus, upsertActiveFromStatus, deleteTracking, restoreConfirmedFromStatus) and remove legacy helpers. |
| Unit tests and small test fixes | internal/service/coordinator/distributed_attempt_ownership_test.go, internal/service/coordinator/handler_test.go, internal/license/client_test.go | Adds tests for statusDecision, syncFromStatus, and recordTaskClaim; adds an AckTaskClaim test for omitted WorkerId; clones default HTTP transport in test client and closes idle connections. |

Sequence Diagram(s)

sequenceDiagram
  participant Handler
  participant attemptOwnership
  participant LeaseStore
  participant ActiveRunStore
  Handler->>attemptOwnership: statusDecision(incomingStatus, latestStatus)
  attemptOwnership->>LeaseStore: Get lease / validate freshness
  LeaseStore-->>attemptOwnership: lease (or not found)
  attemptOwnership->>LeaseStore: Upsert or Delete lease
  attemptOwnership->>ActiveRunStore: Upsert or Delete active-run
  ActiveRunStore-->>attemptOwnership: ack
  attemptOwnership-->>Handler: syncFromStatus result

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • dagucloud/dagu#2041: Both PRs modify distributed-run tracking in the coordinator; this PR centralizes ownership/status sync while the related PR adds worker-heartbeat corroboration and repair.
  • dagucloud/dagu#2143: Overlaps coordinator reconciliation and distributed attempt handling; that PR updates stale repair to use attempt-keyed CompareAndSwapAttemptStatus/DAGRunAttemptRef.
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 7.14% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
✅ Passed checks (4 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title 'refactor: extract distributed attempt ownership' directly reflects the main change: extracting distributed attempt ownership logic into a new module. |
| Description check | ✅ Passed | The description covers all required template sections: Summary, Changes with detailed bullet points, Related Issues, completed Checklist, and additional Testing details. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
internal/service/coordinator/distributed_attempt_ownership_test.go (1)

134-141: ⚡ Quick win

Consider verifying the active record's UpdatedAt timestamp.

The test verifies the active record's DAGRun, Root, AttemptID, WorkerID, and Status but omits the UpdatedAt field. TestDistributedAttemptOwnershipTaskClaimTracking explicitly checks UpdatedAt at line 198, suggesting this timestamp is important to verify for consistency.

📋 Suggested addition
 record, err := activeStore.Get(ctx, "attempt-key-1")
 require.NoError(t, err)
 assert.Equal(t, run, record.DAGRun)
 assert.Equal(t, run, record.Root)
 assert.Equal(t, "attempt-1", record.AttemptID)
 assert.Equal(t, "worker-1", record.WorkerID)
 assert.Equal(t, core.Running, record.Status)
+assert.Equal(t, now.UnixMilli(), record.UpdatedAt)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/service/coordinator/distributed_attempt_ownership_test.go` around
lines 134 - 141, The test block reading the active record via activeStore.Get
does not assert the UpdatedAt timestamp; add an assertion on record.UpdatedAt
(e.g., assert.False(t, record.UpdatedAt.IsZero()) or assert.Equal(t,
expectedUpdatedAt, record.UpdatedAt) depending on the existing test pattern) so
the UpdatedAt field is validated alongside DAGRun, Root, AttemptID, WorkerID and
Status; reference the activeStore.Get call and the record variable in
distributed_attempt_ownership_test.go and mirror how UpdatedAt is asserted in
TestDistributedAttemptOwnershipTaskClaimTracking.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@internal/service/coordinator/distributed_attempt_ownership.go`:
- Around line 120-124: The switch in distributed_attempt_ownership.go is
incorrectly treating core.Queued as a terminal status and removing ownership;
update the handling so core.Queued is treated as a live attempt like
core.Running and core.NotStarted by calling upsertLeaseFromStatus(ctx, workerID,
status, fallbackAttemptID) instead of dropping the lease/active-run record, and
make the same change in the other switch (around the block that mirrored lines
232-236). Ensure this aligns with the other helpers that consider Queued live
(restoreConfirmedFromStatus, indexedRunMatchesStatus, lease reconciliation) so
queued updates do not delete the durable lease before a worker reaches Running.

In `@internal/service/coordinator/handler.go`:
- Around line 927-929: In AckTaskClaim, avoid recording an empty worker ID: when
calling h.distributedAttempts().recordTaskClaim(ctx, claimed.Task, req.WorkerId)
ensure you use a non-empty worker ID by falling back to claimed.WorkerID if
req.WorkerId is empty (e.g., compute workerID := req.WorkerId; if workerID == ""
{ workerID = claimed.WorkerID } and pass workerID), or alternatively reject the
ACK when req.WorkerId is empty; update the call site that invokes
recordTaskClaim and keep references to claimed.Task, req.WorkerId and
claimed.WorkerID consistent so upsertActiveFromTask receives the correct owner.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 22d0ae54-4b8f-4132-b55a-266d18b4bacf

📥 Commits

Reviewing files that changed from the base of the PR and between 6404d0b and 84f3382.

📒 Files selected for processing (3)
  • internal/service/coordinator/distributed_attempt_ownership.go
  • internal/service/coordinator/distributed_attempt_ownership_test.go
  • internal/service/coordinator/handler.go

Comment thread internal/service/coordinator/distributed_attempt_ownership.go
Comment thread internal/service/coordinator/handler.go Outdated
@yohamta0
Collaborator Author

@coderabbitai review

@coderabbitai

coderabbitai Bot commented May 19, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@yohamta0
Collaborator Author

@coderabbitai review

@coderabbitai

coderabbitai Bot commented May 19, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@yohamta0
Collaborator Author

@coderabbitai review

@coderabbitai

coderabbitai Bot commented May 20, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@yohamta0 yohamta0 force-pushed the main branch 2 times, most recently from 409d4ed to 4a435e2 May 20, 2026 10:41
@yohamta0 yohamta0 merged commit 37f4555 into main May 20, 2026
10 checks passed
@yohamta0 yohamta0 deleted the refactor/distributed-attempt-ownership branch May 20, 2026 10:41