feat(java): worker resource dependency injection#340
Conversation
Register a resource once, resolve it in handlers via Resources.use(name). Two scopes: WORKER (built once, shared, disposed at worker close) and TASK (built + disposed per invocation). Injected at the single dispatch point via a ThreadLocal ScopeContext seam; zero overhead when unused.
Worker-scoped sharing, task-scoped per-invocation build + dispose via metrics, and use-outside-task rejection.
📝 WalkthroughWalkthroughAdds named worker resources with WORKER and TASK scopes, runtime metrics, task-scoped access helpers, worker lifecycle wiring, and public registration APIs. It also adds new exception and scope types plus tests for lifecycle, duplication, circular dependencies, and access rules. ChangesWorker-side Resource DI System
Sequence Diagram(s)sequenceDiagram
participant Resources
participant TaskScope
participant ResourceRuntime
participant Worker
Resources->>TaskScope: enter(scope)
TaskScope->>ResourceRuntime: resolveForTask(scope, name)
alt WORKER scope
ResourceRuntime->>ResourceRuntime: resolveWorker(name)
else TASK scope
ResourceRuntime->>TaskScope: cache + teardown registration
end
Worker->>ResourceRuntime: acquireWorker / teardownWorker
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
🧹 Nitpick comments (1)
sdks/java/src/test/java/org/byteveda/taskito/ResourceTest.java (1)
33-35: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winAlso assert worker-scoped teardown.
This test proves
"shared"is created once, but it never verifies that the worker-scoped resource is disposed when the worker closes. A regression in worker teardown would still pass here.Proposed test tightening
void workerResourceSharedTaskResourcePerInvocation(`@TempDir` Path dir) throws Exception { int jobs = 3; AtomicInteger taskDisposed = new AtomicInteger(); + AtomicInteger sharedDisposed = new AtomicInteger(); try (Taskito queue = Taskito.builder().url(dir.resolve("r.db").toString()).open()) { - queue.resource("shared", ctx -> new Object()); // WORKER scope (default): built once + queue.resource("shared", ResourceScope.WORKER, ctx -> new Object(), value -> sharedDisposed.incrementAndGet()); queue.resource("perTask", ResourceScope.TASK, ctx -> new Object(), value -> taskDisposed.incrementAndGet()); @@ Map<String, ResourceStat> metrics = queue.resourceMetrics(); assertEquals(1, metrics.get("shared").created(), "worker resource built once"); + assertEquals(1, metrics.get("shared").disposed(), "worker resource disposed on worker close"); assertEquals(jobs, metrics.get("perTask").created(), "task resource built per invocation"); assertEquals(jobs, metrics.get("perTask").disposed(), "task resource disposed per invocation"); + assertEquals(1, sharedDisposed.get()); assertEquals(jobs, taskDisposed.get()); } }Also applies to: 51-55
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@sdks/java/src/test/java/org/byteveda/taskito/ResourceTest.java` around lines 33 - 35, The ResourceTest currently verifies creation behavior for the worker-scoped "shared" resource but does not assert its worker teardown; update the test around queue.resource and the worker lifecycle so it also checks that the WORKER-scoped resource is disposed when the worker closes. Use the existing teardown callback pattern shown on queue.resource("perTask", ...) and the worker/resource setup in this test to add an assertion tied to the worker-close path, so a regression in worker disposal will fail the test.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@sdks/java/src/main/java/org/byteveda/taskito/DefaultTaskito.java`:
- Line 65: The worker-scoped ResourceRuntime is being shared by all workers
created from a single DefaultTaskito instance, so WORKER resources are
effectively client-scoped. Update DefaultTaskito and its worker() flow so each
Worker gets its own live runtime state instead of reusing the same resources
field, while keeping any shared registration metadata separate or cloned per
worker. Make sure worker creation, resource lookup, and worker close/dispose
paths all use the per-worker runtime and not the DefaultTaskito-level instance.
In `@sdks/java/src/main/java/org/byteveda/taskito/resources/ResourceRuntime.java`:
- Around line 48-52: The register method in ResourceRuntime does not fully
replace an already resolved WORKER resource because workerCache and
workerTeardown still retain the old instance and disposer. Update
register(String, ResourceDefinition) so it either rejects re-registration after
first resolution or, under the same per-name lock used by
resolveWorker/teardownWorker, evicts the cached entry and disposes the old
instance before storing the new definition.
- Around line 90-139: Add an explicit in-progress guard to ResourceRuntime’s
resolveWorker and resolveForTask paths so circular dependencies fail with
ResourceException instead of recursing or deadlocking. Before calling build(...)
in resolveWorker and resolveForTask, mark the resource name as currently being
resolved in the relevant cache/context, and detect re-entry on the same name
(for A -> A) or nested cycles (for A -> B -> A) to throw a clear cycle error.
Keep the existing caching and teardown behavior, but ensure the guard is set
before build(...) and cleared in a finally block, using the unique symbols
resolveWorker, resolveForTask, workerCache, and TaskScope.cache()/pushTeardown()
to locate the changes.
In `@sdks/java/src/main/java/org/byteveda/taskito/worker/Worker.java`:
- Around line 55-57: The Worker.builder factory has replaced the older overload,
which breaks existing callers. In Worker.java, keep the original
Worker.builder(QueueBackend, Serializer, List<Middleware>) overload for
compatibility and have it delegate to the new resource-aware Worker.builder(...)
path by supplying the appropriate ResourceRuntime default. Preserve the existing
Builder constructor usage so external code depending on the 3-arg signature
continues to compile.
---
Nitpick comments:
In `@sdks/java/src/test/java/org/byteveda/taskito/ResourceTest.java`:
- Around line 33-35: The ResourceTest currently verifies creation behavior for
the worker-scoped "shared" resource but does not assert its worker teardown;
update the test around queue.resource and the worker lifecycle so it also checks
that the WORKER-scoped resource is disposed when the worker closes. Use the
existing teardown callback pattern shown on queue.resource("perTask", ...) and
the worker/resource setup in this test to add an assertion tied to the
worker-close path, so a regression in worker disposal will fail the test.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: ed93b01d-2fbd-4a01-91cc-d298d2c71838
📒 Files selected for processing (15)
sdks/java/src/main/java/org/byteveda/taskito/DefaultTaskito.javasdks/java/src/main/java/org/byteveda/taskito/Taskito.javasdks/java/src/main/java/org/byteveda/taskito/errors/ResourceException.javasdks/java/src/main/java/org/byteveda/taskito/internal/ScopeContext.javasdks/java/src/main/java/org/byteveda/taskito/resources/ResourceContext.javasdks/java/src/main/java/org/byteveda/taskito/resources/ResourceDefinition.javasdks/java/src/main/java/org/byteveda/taskito/resources/ResourceRuntime.javasdks/java/src/main/java/org/byteveda/taskito/resources/ResourceScope.javasdks/java/src/main/java/org/byteveda/taskito/resources/ResourceStat.javasdks/java/src/main/java/org/byteveda/taskito/resources/Resources.javasdks/java/src/main/java/org/byteveda/taskito/resources/TaskScope.javasdks/java/src/main/java/org/byteveda/taskito/resources/package-info.javasdks/java/src/main/java/org/byteveda/taskito/worker/Worker.javasdks/java/src/main/java/org/byteveda/taskito/worker/WorkerDispatchBridge.javasdks/java/src/test/java/org/byteveda/taskito/ResourceTest.java
There was a problem hiding this comment.
♻️ Duplicate comments (1)
sdks/java/src/main/java/org/byteveda/taskito/resources/ResourceRuntime.java (1)
131-138: 🩺 Stability & Availability | 🟠 Major | 🏗️ Heavy liftCross-thread WORKER cycles can still deadlock.
The
ThreadLocalchain only detects cycles on one thread. Two worker threads resolvingA -> BandB -> Acan each hold one per-nameReentrantLockand block forever on the other. Consider shared in-progress cycle tracking or lock acquisition with rollback/timeout before running factories under nested locks.This overlaps with the earlier circular-dependency review note, but the cross-thread WORKER case remains visible in this implementation.
Also applies to: 172-175
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@sdks/java/src/main/java/org/byteveda/taskito/resources/ResourceRuntime.java` around lines 131 - 138, Cross-thread WORKER resolution can still deadlock because `ResourceRuntime` only tracks cycles with a per-thread `ThreadLocal` while `workerLocks.computeIfAbsent(...)/lock()` allows two threads to block each other on `ReentrantLock`s. Update the WORKER path in `ResourceRuntime` (including the `build(...)` flow and the related code around the later lines noted in the review) to add shared in-progress cycle detection or use a lock acquisition strategy with rollback/timeout so nested factory calls cannot wait forever across threads.
🧹 Nitpick comments (1)
sdks/java/src/test/java/org/byteveda/taskito/ResourceTest.java (1)
29-58: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winAdd coverage for worker-scoped teardown.
Current tests verify task-scoped disposal and worker-scoped creation, but not that a WORKER-scoped disposer runs when the worker closes or that multiple worker disposers run LIFO. A small disposer-order assertion would lock down the worker teardown path separately from
TaskScope.Also applies to: 106-129
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@sdks/java/src/test/java/org/byteveda/taskito/ResourceTest.java` around lines 29 - 58, Add a test that covers WORKER-scoped teardown in `ResourceTest.workerResourceSharedTaskResourcePerInvocation` (or a nearby dedicated test) by registering one or more `Taskito.builder().resource(...)` entries with `ResourceScope.WORKER` and a disposer that records execution. Assert that the worker-scoped disposer runs when the `Worker` is closed, and if multiple worker resources are registered, verify their disposers run in LIFO order using the existing `Worker`, `Taskito`, and `resourceMetrics()` patterns for location.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Duplicate comments:
In `@sdks/java/src/main/java/org/byteveda/taskito/resources/ResourceRuntime.java`:
- Around line 131-138: Cross-thread WORKER resolution can still deadlock because
`ResourceRuntime` only tracks cycles with a per-thread `ThreadLocal` while
`workerLocks.computeIfAbsent(...)/lock()` allows two threads to block each other
on `ReentrantLock`s. Update the WORKER path in `ResourceRuntime` (including the
`build(...)` flow and the related code around the later lines noted in the
review) to add shared in-progress cycle detection or use a lock acquisition
strategy with rollback/timeout so nested factory calls cannot wait forever
across threads.
---
Nitpick comments:
In `@sdks/java/src/test/java/org/byteveda/taskito/ResourceTest.java`:
- Around line 29-58: Add a test that covers WORKER-scoped teardown in
`ResourceTest.workerResourceSharedTaskResourcePerInvocation` (or a nearby
dedicated test) by registering one or more `Taskito.builder().resource(...)`
entries with `ResourceScope.WORKER` and a disposer that records execution.
Assert that the worker-scoped disposer runs when the `Worker` is closed, and if
multiple worker resources are registered, verify their disposers run in LIFO
order using the existing `Worker`, `Taskito`, and `resourceMetrics()` patterns
for location.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d199c0b7-053f-4562-8cbc-d71c2ad5b48e
📒 Files selected for processing (4)
sdks/java/src/main/java/org/byteveda/taskito/DefaultTaskito.javasdks/java/src/main/java/org/byteveda/taskito/resources/ResourceRuntime.javasdks/java/src/main/java/org/byteveda/taskito/worker/Worker.javasdks/java/src/test/java/org/byteveda/taskito/ResourceTest.java
🚧 Files skipped from review as they are similar to previous changes (2)
- sdks/java/src/main/java/org/byteveda/taskito/worker/Worker.java
- sdks/java/src/main/java/org/byteveda/taskito/DefaultTaskito.java
Adds worker-side dependency injection: register a resource once, resolve it
inside a task without threading it through payloads. Node-style two-scope model
(no Python-style 4-scope/pool/health).
API
ResourceScope{WORKER, TASK};ResourceContext(scope +use(name)).Taskito.resource(name, factory)(WORKER), an overload with scope + adisposecallback, andresourceMetrics().Resources.use("name")resolves the resource (task cache → worker).Mechanics
internal.ScopeContext<T>— a thinThreadLocalseam (single swap point if thebaseline later rises to
ScopedValue); set/cleared per task intry/finallysopooled worker threads never leak context.
ResourceRuntimememoizes WORKER resources (build-once via a per-name lock,evict on factory failure → retryable), creates per-task scopes, and disposes
LIFO. Lease-counted
acquireWorker/teardownWorker.WorkerDispatchBridge.runJobwraps the handler in a task scope only whenresources are registered (
isEmpty()short-circuit → zero overhead unused).ResourceException; a disposal failure neverfails a settled job (best-effort, logged via
System.Logger).Tests
WorkerResourceTestcovers WORKER vs TASK scope, injection atrunJob, disposalorder, and metrics.
Pure Java, no native change. Rebased onto master.
Summary by CodeRabbit