Spec 043: executor zombie job detection#13
Conversation
Adds envtest for ImagePullBackOff detection via real k8s informer (sigs.k8s.io/controller-runtime). Build-tag gated (//go:build envtest) in separate package task/executor/pkg/envtest/. Also adds //nolint:funlen exemption on main.go application.Run (initialization sequence; pure wiring, no business logic) which grew past the 80-line threshold when the zombie sweeper was wired in prompt 197.
Three must-fix issues from /coding:pr-review on spec 043 prompts:
1. podLister nil race (main.go ↔ job_watcher.go)
- main.go called jobWatcher.PodLister() at wiring time, before
JobWatcher.Run() assigned the lister. Sweeper captured nil; would
panic at first sweep tick (~60s after startup).
- Fix: pass JobWatcher itself to ZombieSweeper; call PodLister()
lazily inside SweepOnce; skip tick (log + return) if lister still
nil. Updated factory.CreateZombieSweeper signature accordingly.
2. ttlDedupe stale-index after eviction (result_publisher.go)
- Old impl re-sliced entries on capacity overflow but did not
reindex d.index[jobName]; once 1024-cap saturated, lookups read
wrong dedupeEntry → false-dedupes / false-misses.
- Fix: replace integer-index map with order []string + seen
map[string]time.Time. No indices to invalidate.
3. PublishFailure partial-send leaves dedupe unrecorded
- Two-message send (update + increment); on increment failure the
update had committed but dedupe was not recorded, so next call
re-sent the update → duplicate current_job: "" write.
- Fix: record dedupe between the two sends. If increment fails,
dedupe stays recorded (suppresses duplicate update); trigger_count
is under by 1 but no duplicate write. Logged at WARN.
Plus: spec 043 status auto-transitioned prompted → verifying after all
5 prompts completed.
The zombie sweeper (spec 043) reads Pods via a shared informer cache. Without get/list/watch on pods, the informer's reflector fails: Failed to watch ... "pods is forbidden: User ... cannot list resource \"pods\" in API group \"\" in the namespace ... Consequence: JobWatcher.Run blocks at WaitForCacheSync (pod informer never syncs), w.podLister is never assigned, every sweep tick logs 'sweep skipped: pod lister not yet synced' — zombie detection never fires. Verified live on dev (deploy 21:23:40Z): informer started only after RBAC grant at 21:37:21Z. Pre-spec-043 the executor never needed Pod permissions; the gap was invisible until this work landed.
There was a problem hiding this comment.
Now let me compile the consolidated report and produce the final JSON verdict:
PR Review: Spec 043: executor zombie job detection
Scope
Feature branch feature/zombie-detect → master. 41 files changed (5 prompts, CRD types, sweeper, informer, publisher doctrine fix, tests, envtest).
Must Fix (Critical)
None. No security vulnerabilities, no context.Background() in business logic, no concurrency bugs that would cause incorrect behavior.
Should Fix (Important)
-
task/executor/pkg/job_watcher.go:65,124,130— Data race onpodListerfield. ThepodListeris written inRun()(line 124) without synchronization and read concurrently byPodLister()(line 130), which is called from the sweeper goroutine on every tick.service.Runstarts all components concurrently, so the sweeper can callPodLister()beforeRun()sets it. The nil-check atzombie_sweeper.go:123avoids a nil dereference but does not prevent a torn read or race detector failure. Fix: usesync/atomic.Pointer[corev1listers.PodLister]or async.RWMutex. This will be caught bygo test -racein CI. -
task/executor/pkg/result_publisher.go:194— Dedupe entry recorded before second publish. The spec requirement 3d says "ONLY after BOTH publishes succeed, callrecordDedupe". The implementation records after the first (update) publish, before the second (increment). If the increment fails, the dedupe entry exists, so the next retry is suppressed — buttrigger_countwas never incremented. The controller'sapplyTriggerCapwill never fire for this task until the dedupe TTL expires. The "lesser evil" tradeoff in the comment is defensible, but the spec contract is violated. Either update the spec or reorderrecordDedupeto after the second publish. -
task/executor/pkg/zombie_sweeper.go:90— Sweeper interval resolved once at startup.resolveSweeperIntervalis called once inRunbefore the ticker is created. If a CRD reconciliation hot-reloadsZombieSweeperIntervalSeconds, the running sweeper ignores it until restart. Since the executor pod is short-lived relative to CRD reconciliation cycles, this is acceptable in practice, but it should be noted as a behavioral limitation.
Nice to Have (Optional)
-
task/executor/pkg/job_watcher.go:310-329—classifyPodFailuredoes not handleCrashLoopBackOff.CrashLoopBackOffis a k8sWaiting.Reason. WithBackoffLimit=0in the spawner, crash-looping pods never reachPodFailedphase, sopod_crash_no_stdoutdoes not fire via the Pod path — they rely entirely onactiveDeadlineSecondskilling them. AddingCrashLoopBackOffto the switch closes this gap. -
task/executor/pkg/result_publisher_test.go— Missing dedupe TTL expiry test. The central dedupe mechanism has no test for TTL window expiry. After onePublishFailure, advancing time past 3600s and calling again should allow a fresh publish. Without this test, an off-by-one error in the< d.ttlboundary silently blocks re-publishes after Kafka recovery. -
task/executor/pkg/result_publisher_test.go— MissingPublishFailurefirst-send error test. When the update publish (firstpublishRaw) returns an error, the increment must NOT be attempted and no dedupe entry should be recorded. This early-return path is implemented but untested. -
task/executor/pkg/zombie_sweeper_test.go— Multiple-taskSweepOncewith per-task publish error not tested. The loop mustcontinueon error and process remaining tasks. With only single-task tests, this contract is implicit. -
task/executor/pkg/zombie_sweeper.go:202— Unused parameters inclassify._ lib.Task,_ string,_ time.Timeare dead parameters —classifyonly usestaskIDandnow. Either use them for richer logging or remove them. -
task/executor/pkg/result_publisher.go:26-28— Constants not grouped.dedupeCapacityanddedupeTTLare separateconststatements. Group them:const ( dedupeCapacity = 1024; dedupeTTL = 3600 * time.Second ).
Test Coverage Assessment
The PR adds 6 new test files covering the acceptance criteria. All 11 ACs are addressed. Key coverage:
- ✅
PublishFailureatomic update+increment (AC #1) - ✅ Pod-state classifier — 5 reasons covered (AC #5)
- ✅ Deadline sweeper — 7 test cases (AC #6)
- ✅
SpawnJobstampsActiveDeadlineSeconds(AC #8) - ✅ envtest ImagePullBackOff against real apiserver (AC #10)
⚠️ Dedupe TTL expiry — not covered (AC #9 partial)⚠️ PublishTypeMismatchFailurewith empty prior assignee — not covered (AC #4)⚠️ PublishFailurefirst-send error path — not covered (AC #1)
Manual Review
Reviewed the diff end-to-end. The feature is well-scoped and the implementation follows the spec closely. Key observations:
- RBAC grant is minimal and correct: executor ServiceAccount gets
get/list/watchon pods only — sufficient for informer cache, least-privilege. - CRD validation floors are correct:
zombieSweeperIntervalSeconds >= 10,zombieJobTimeoutSeconds >= 30prevent the abuse cases described in the spec. activeDeadlineSecondsstamp is correct: every Job gets the deadline; k8s enforces it natively.- Factory pattern is clean:
CreateZombieSweeperis pure composition, no business logic. - Error wrapping is consistent: all errors use
errors.Wrapf(ctx, err, ...), no barereturn err, nofmt.Errorf. - Time injection is correct:
libtime.CurrentDateTimeGetterused throughout, no directtime.Now(). - glog gating is correct: non-error logs use
V(2)orV(3)per project conventions.
The spec's "do-nothing option" correctly identifies the operational problem this solves — parked tasks with current_job set and no failure signal.
Verdict
{
"verdict": "request-changes",
"summary": "The zombie job detection feature is well-architected and mostly correct, but has a data race on the podLister field, a dedupe ordering contract violation vs the spec, and missing test coverage for dedupe TTL expiry. These should be addressed before merge.",
"comments": [
{
"file": "task/executor/pkg/job_watcher.go",
"line": 65,
"severity": "major",
"message": "[go-quality] podLister field has a data race: written in Run() (line 124) without synchronization, read concurrently by PodLister() (line 130) called from sweeper goroutine. Use sync/atomic.Pointer or sync.RWMutex. Will fail go test -race."
},
{
"file": "task/executor/pkg/result_publisher.go",
"line": 194,
"severity": "major",
"message": "[go-quality] Dedupe entry recorded after first publish (update) but before second (increment). If increment fails, dedupe blocks retry but trigger_count was never bumped — controller applyTriggerCap never fires for this task until TTL expiry. Spec requires dedupe after BOTH publishes succeed. Either reorder recordDedupe or update spec."
},
{
"file": "task/executor/pkg/zombie_sweeper.go",
"line": 90,
"severity": "major",
"message": "[go-quality] resolveSweeperInterval called once at Run() startup; CRD changes to ZombieSweeperIntervalSeconds are ignored until restart. Document as a known limitation or re-resolve on each tick using a re-arming time.Timer."
},
{
"file": "task/executor/pkg/job_watcher.go",
"line": 310,
"severity": "nit",
"message": "[go-quality] classifyPodFailure does not handle CrashLoopBackOff Waiting.Reason. With BackoffLimit=0 in the spawner, crash-looping pods never reach PodFailed phase and rely solely on activeDeadlineSeconds. Consider adding CrashLoopBackOff to the switch."
},
{
"file": "task/executor/pkg/result_publisher_test.go",
"line": 267,
"severity": "major",
"message": "[go-test-coverage] Dedupe TTL expiry not tested. After one PublishFailure, advancing currentDateTime past 3600s and calling again should allow a fresh publish. Without this, an off-by-one in the < d.ttl boundary silently breaks Kafka-recovery dedupe."
},
{
"file": "task/executor/pkg/result_publisher_test.go",
"line": 203,
"severity": "major",
"message": "[go-test-coverage] PublishFailure first-send error path not tested. When the update publish returns an error, the increment must NOT be attempted and no dedupe entry recorded. This early-return path at line 178 is implemented but untested."
},
{
"file": "task/executor/pkg/zombie_sweeper_test.go",
"line": 95,
"severity": "major",
"message": "[go-test-coverage] Multiple-task SweepOnce with per-task publish error not tested. Loop must continue on error and process remaining tasks. All 7 test cases use exactly one task in the store."
},
{
"file": "task/executor/pkg/result_publisher_test.go",
"line": 336,
"severity": "major",
"message": "[go-test-coverage] PublishTypeMismatchFailure with empty prior assignee not tested. The source explicitly omits previous_assignee key when priorAssignee == '' but this branch has no test coverage."
},
{
"file": "task/executor/pkg/zombie_sweeper.go",
"line": 202,
"severity": "nit",
"message": "[go-quality] Unused parameters in classify: lib.Task, string, time.Time are all discarded with _. Either use them (richer logging) or remove them from signature."
},
{
"file": "task/executor/pkg/result_publisher.go",
"line": 26,
"severity": "nit",
"message": "[go-quality] Constants dedupeCapacity and dedupeTTL declared as separate const statements. Group them: const ( dedupeCapacity = 1024; dedupeTTL = 3600 * time.Second ) per codebase convention."
}
],
"concerns_addressed": [
"correctness: goroutine lifecycle — zombie_sweeper.go Run() returns nil on ctx.Done(), ticker deferred — clean shutdown. podLister nil check at line 123 avoids nil dereference but is a data race (see finding).",
"correctness: partial-send race in PublishFailure — dedupe ordering violates spec (see finding). WARN log on increment failure at line 210 is Warningf, should be Errorf or at minimum V(1) since it indicates a stuck trigger_count.",
"correctness: LRU dedupe replaced with seen+order — TTL eviction logic correct (oldest evicted when at capacity, ts refreshed on re-insert). No stale entries after eviction (delete + order shift are atomic).",
"security: RBAC grant for pods get/list/watch — minimal and correct for informer cache. No new verbs or resources.",
"correctness: CRD validation floors correct (zombieSweeperIntervalSeconds >= 10, zombieJobTimeoutSeconds >= 30).",
"performance: 60s interval sweeper using Pod informer lister — no per-tick LIST, correct. LRU capacity 1024 with 3600s TTL is reasonable.",
"correctness: ZombieReason enum covers 7 failure modes. BackoffLimitExceeded mapped to deadline_exceeded (rationale documented).",
"tests: envtest ImagePullBackOff test covers real informer wiring. Unit tests cover classifier table and sweeper predicate."
]
}Bot review on PR #13 surfaced 10 findings; this commit addresses all 7 substantive ones (4 major + 3 nits). Documentation-only nit on sweeper interval hot-reload is inlined as a code comment. Major fixes: - Data race on jobWatcher.podLister: replaced bare field with atomic.Pointer[corev1listers.PodLister]. Write site in Run() uses Store; PodLister() uses Load+nil-guard. Verified clean by go test -race ./pkg/.... - Dedupe ordering: recordDedupe was being called between the two publishes. Bot was right that this violates spec — if increment fails, dedupe blocks retry → trigger_count never bumps → cap never fires. Moved recordDedupe to after BOTH publishes succeed. If increment fails, dedupe is NOT recorded, so the next cycle re-sends both messages; the duplicate current_job: "" write is idempotent visually. - classifyPodFailure: added CrashLoopBackOff to the Waiting.Reason switch (returns pod_crash_no_stdout). With BackoffLimit=0 in the spawner, crash-looping pods never reach PodFailed phase, so this closes a gap that previously relied solely on activeDeadlineSeconds. - classify() in zombie_sweeper: removed three unused params (lib.Task, string, time.Time). Nits: - Grouped dedupeCapacity + dedupeTTL into a single const block. - Added code comment above resolveSweeperInterval documenting that the interval is resolved once at startup (CRD changes need pod restart). Test coverage additions (all bot-requested): - Dedupe TTL expiry: advance mocked time past 3600s, assert second PublishFailure publishes fresh messages. - PublishFailure first-send error path: when update publish fails, increment must NOT be attempted and no dedupe entry recorded. - Multi-task SweepOnce with one task's publish error mid-loop: assert remaining tasks still get processed. - PublishTypeMismatchFailure with empty prior assignee: assert Updates map omits previous_assignee key. - CrashLoopBackOff classification: assert new branch fires. Verified: go test -race ./pkg/... passes; make precommit passes.
# Conflicts: # CHANGELOG.md
superseded by new automated review
There was a problem hiding this comment.
{
"verdict": "approve",
"summary": "Spec 043 zombie job detection is well-implemented with comprehensive test coverage (581 lines for zombie_sweeper alone). No critical issues found — RBAC is minimal, context propagation is correct, validation floors are enforced, and the classification predicate is sound. Five should-fix items are flagged for post-merge follow-up: FIFO vs LRU dedupe semantics, context-cancellation error logging, missing CRD upper bounds, and two untested error/concurrency paths. Three SRP refactoring opportunities and a missing factory test suite are noted as nice-to-have.",
"comments": [],
"concerns_addressed": [
"security: RBAC for pods/get/list/watch is namespace-scoped and minimal per security review",
"security: MinZombieSweeperIntervalSeconds=10 and MinZombieJobTimeoutSeconds=30 floors prevent abuse per security review",
"correctness: context.Background() not used in business logic per context review",
"correctness: context propagation correct in goroutines (ticker loop, informer handlers) per context review",
"correctness: classification predicate covers all failure modes per job_watcher and zombie_sweeper reviews",
"correctness: atomic.Pointer for PodLister prevents nil-pointer races per security review",
"performance: informer cache used for sweeper (no per-cycle API LIST) per spec requirement",
"tests: comprehensive table-driven tests for classify outcomes (6 cells) per test coverage review",
"tests: envtest covers real informer wiring for ImagePullBackOff path per spec AC#9"
]
}There was a problem hiding this comment.
{
"verdict": "approve",
"summary": "Spec 043 adds a thorough zombie job detection system to the executor with Pod informer, deadline sweeper, LRU dedupe, and stable reason enums. The implementation is well-structured, uses proper error wrapping, correct time injection, and has good test coverage. No critical issues found. The main gaps are test coverage for edge cases (configProvider error paths, Run lifecycle, hasPodScheduledFalse boundaries) and a minor LRU eviction ordering inaccuracy that doesn't affect correctness.",
"comments": [
{
"file": "task/executor/pkg/result_publisher.go",
"line": 111,
"severity": "major",
"message": "[go-quality] recordDedupe refreshes timestamp but does not move the key to end of d.order, making eviction insertion-order rather than access-order. The ttlDedupe struct GoDoc claims 'minimal TTL'd LRU' but this is approximate insertion-order LRU. Either update the GoDoc to say 'approximate LRU (insertion-order eviction)' or move the refreshed key to end of order slice."
},
{
"file": "task/executor/pkg/zombie_sweeper.go",
"line": 131,
"severity": "major",
"message": "[go-test-coverage] SweepOnce returns an error when configProvider.Get fails but no unit test covers this error path. A test injecting a failing configProvider should assert the error propagates and no failures are published."
},
{
"file": "task/executor/pkg/zombie_sweeper.go",
"line": 90,
"severity": "major",
"message": "[go-test-coverage] ZombieSweeper.Run has zero unit tests. The critical invariant (returns nil on ctx.Done(), does not publish on tick errors) is untested. Add a test that calls Run with an already-cancelled context to cover the ctx.Done() branch."
},
{
"file": "task/executor/pkg/zombie_sweeper.go",
"line": 183,
"severity": "major",
"message": "[go-test-coverage] resolveSweeperInterval error path is untested. When configProvider.Get fails during Run startup, the wrapped error propagates and Run returns — not covered by any unit test."
},
{
"file": "task/executor/pkg/zombie_sweeper.go",
"line": 243,
"severity": "major",
"message": "[go-test-coverage] hasPodScheduledFalse boundary cases not tested: (a) pod with PodScheduled=True, (b) pod with PodScheduled=False but age <= grace window, (c) pod with no PodScheduled condition. Function is only exercised transitively through SweepOnce tests."
},
{
"file": "task/executor/pkg/zombie_sweeper.go",
"line": 237,
"severity": "major",
"message": "[go-test-coverage] classify fallthrough to ZombieReasonDeadlineExceeded for PodFailed past-deadline is untested via SweepOnce. zombie_sweeper_test.go covers PodFailed only through HandleJob (informer path), not through the SweepOnce/classify path."
},
{
"file": "task/executor/pkg/envtest/job_watcher_envtest_test.go",
"line": 1,
"severity": "major",
"message": "[go-test-coverage] envtest covers only ImagePullBackOff. The Evicted and CrashLoopBackOff informer wiring paths lack envtest coverage despite being classified by the same HandlePod handler."
},
{
"file": "task/executor/pkg/result_publisher.go",
"line": 54,
"severity": "nit",
"message": "[go-quality] PublishRaw GoDoc says 'for testing error paths' but the method is exported and part of the ResultPublisher interface. Consider renaming or clarifying its status as an internal testing utility."
},
{
"file": "task/executor/pkg/zombie_sweeper_test.go",
"line": 113,
"severity": "nit",
"message": "[go-test-coverage] eventHandlerConfig.OnAdd error return value is silently discarded via _ = ... OnAdd(ctx, cfg) at 7 locations. Should use Expect(eventHandlerConfig.OnAdd(ctx, cfg)).To(Succeed()) to catch setup failures."
},
{
"file": "task/executor/pkg/factory/factory.go",
"line": 1,
"severity": "nit",
"message": "[go-factory-pattern] factory_suite_test.go missing from task/executor/pkg/factory/. Pre-existing gap; new CreateZombieSweeper factory function would benefit from a Ginkgo test suite per project standard."
}
],
"concerns_addressed": [
"security: executor ServiceAccount granted get/list/watch on pods — RBAC is least-privilege (batch/jobs: create/get/list/watch/delete, core/pods: get/list/watch, agent.borbe.de/configs: get/list/watch); pods informer cache access is the correct pattern for zombie detection, not per-tick LIST calls",
"security: zombieSweeperIntervalSeconds (floor 10s) and zombieJobTimeoutSeconds (floor 30s) CRD validation enforced via validateZombieSweeperInterval and validateZombieJobTimeout rejecting values below Min constants; negative values and overflow rejected by int32 comparison",
"correctness: PublishFailure two-command atomicity — dedupe recorded ONLY after both publishes succeed (result_publisher.go:207); if first publish fails, dedupe not recorded so next-cycle retry can re-attempt; if second fails, dedupe not recorded so retry re-attempts both; confirmed by code review",
"correctness: podLister nil race fixed via lazy initialization inside SweepOnce (zombie_sweeper.go:122-126) — lister resolved on every tick, returns nil if not yet synced, skips tick; atomic.Pointer pattern used for store/load of lister reference",
"correctness: LRU dedupe uses seen map + order slice — eviction on capacity (zombie_sweeper.go:119-123) deletes oldest by slice shift; stale-index eviction handled correctly by deleting the evicted key from seen map",
"performance: sweeper reads from Pod informer cache (lister.Pods().List with label selector) with no per-tick LIST call — confirmed at zombie_sweeper.go:210 using the shared informer cache per spec DB #9",
"performance: LRU dedupe capacity 1024 entries, 3600s TTL — memory bound is acceptable; hardcoded TTL (3600s) is conservative regardless of zombieJobTimeoutSeconds setting",
"correctness: HandlePod does not delete task from TaskStore — sweeper and Job-condition path own deletion; dedupe in PublishFailure (result_publisher.go:157-159) prevents double-publish for same job name",
"tests: envtest only covers ImagePullBackOff — Evicted and CrashLoopBackOff paths lack envtest coverage as noted in Should Fix"
]
}
Summary
Closes spec 043 — executor classifies dispatched agent Jobs that never produce an
AgentResult(image-pull stuck, pod evicted, deadline exceeded, crash-no-stdout, executor watch loss) and emits doctrine-correct failure events.Five prompts shipped (194-198), plus three runtime fixes from pr-review and one RBAC grant verified live on dev.
What changed
PublishFailure— bumptrigger_count, clearcurrent_job, leave phase/status/assignee untouched. ExistingapplyTriggerCapchokepoint handles escalation at cap.PublishTypeMismatchFailure— clear assignee, set previous_assignee. Semantic failure, retry can't help.JobWatcher.HandlePod+ typedZombieReasonenum (image_pull_backoff,pod_evicted,pod_crash_no_stdout,pod_not_scheduled,deadline_exceeded,executor_watch_lost,type_mismatch).ZombieSweeper.Rungoroutine, 60s interval (Config CRD knobzombieSweeperIntervalSeconds), reads from Pod informer cache (no per-tick LIST).activeDeadlineSecondsstamped on every Job — k8s-native primary deadline; sweeper is safety net.current_job.zombieSweeperIntervalSeconds(default 60, floor 10) andzombieJobTimeoutSeconds(default 1800, floor 30); admission validates floors.get/list/watchon pods (required for the informer cache).sigs.k8s.io/controller-runtimein-process apiserver, build-tag gated.Runtime fixes from /coding:pr-review
podListernil race — passJobWatcherto sweeper; lazyPodLister()insideSweepOncewith nil-guard.seen map[string]time.Time+order []string.PublishFailurepartial-send — record dedupe between the 2 sends; WARN log if increment fails (no duplicate update on retry).Verified on dev
:devrebuilt from feature branch).zombie sweeper started interval=1m0s.job and pod informer started in namespace dev→ lister assigned, no more "sweep skipped: pod lister not yet synced" logs.Not in this PR
Test plan
cd task/executor && make precommitexits 0## Failuresections + cap escalation