ref(service): Add task spawning with panic isolation to StorageService #322
Public methods now take owned values and spawn each operation in an isolated task. This ensures run-to-completion on caller cancellation (critical for tombstone consistency) and prevents backend panics from propagating.
* main: meta(ai): Improve AGENTS.md development workflow (#321)
Integration tests (real backends) now use the public StorageService API. Unit tests use StorageServiceInner directly.
jan-auer left a comment
I am considering splitting the public and inner types into two modules eventually. For now, they can remain in one.
```rust
/// isolated from panics in backend code — a failure in one operation does not
/// bring down other in-flight work. See [`Error::Panic`].
#[derive(Clone, Debug)]
pub struct StorageService(Arc<StorageServiceInner>);
```
This was merely moved so that the struct definition and impl block aren't separated, and another section on task isolation was added.
```rust
{
    let (tx, rx) = tokio::sync::oneshot::channel();
    tokio::spawn(async move {
        let result = std::panic::AssertUnwindSafe(f)
```
We have a panic handler in requests, too. This one allows the request handler to catch panics individually, such as in the batch endpoint.
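The isolation pattern in the excerpt above can be illustrated with a std-only analogue. This is a rough sketch, not the PR's code: it uses OS threads and `std::sync::mpsc` in place of `tokio::spawn` and the oneshot channel, and the `Error` enum, `spawn_isolated`, and `wait` are hypothetical names introduced for illustration.

```rust
use std::any::Any;
use std::panic::{self, AssertUnwindSafe};
use std::sync::mpsc;
use std::thread;

/// Hypothetical error type; the real `Error::Panic` may carry different data.
#[derive(Debug, PartialEq)]
pub enum Error {
    /// The operation panicked; holds the panic message when one is available.
    Panic(String),
    /// The worker went away without sending a result.
    Dropped,
}

/// Extracts a readable message from a panic payload.
fn panic_message(payload: Box<dyn Any + Send>) -> String {
    if let Some(s) = payload.downcast_ref::<&str>() {
        s.to_string()
    } else if let Some(s) = payload.downcast_ref::<String>() {
        s.clone()
    } else {
        "unknown panic".to_string()
    }
}

/// Runs `f` on its own thread and reports a panic as `Error::Panic`
/// instead of letting it propagate to the caller.
pub fn spawn_isolated<T, F>(f: F) -> mpsc::Receiver<Result<T, Error>>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let result = panic::catch_unwind(AssertUnwindSafe(f))
            .map_err(|payload| Error::Panic(panic_message(payload)));
        // The caller may have dropped the receiver. The operation has
        // already run to completion, so a failed send is ignored.
        let _ = tx.send(result);
    });
    rx
}

/// Waits for the result of an isolated operation.
pub fn wait<T>(rx: mpsc::Receiver<Result<T, Error>>) -> Result<T, Error> {
    rx.recv().unwrap_or(Err(Error::Dropped))
}
```

Because the result travels over a channel, dropping the receiver does not stop the spawned work, which is the run-to-completion property the PR relies on. A caller such as a batch handler could call `wait` per operation and turn a single `Error::Panic` into a per-item failure.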
```rust
    long_term_backend: BoxedBackend,
}

impl StorageServiceInner {
```
All actual business logic has been moved to `StorageServiceInner` now. This avoids an indirection via `self.0` in the implementation.
```diff
     .unwrap();

-    let (_metadata, stream) = service.get_object(&key).await.unwrap().unwrap();
+    let (_metadata, stream) = service.get_object(key).await.unwrap().unwrap();
```
Integration tests use the public API while all lower-level logic tests use the inner service.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Builds on [#322](#322) (task spawning with panic isolation). Adds a semaphore-based concurrency limit to `StorageService` that caps the total number of in-flight backend operations. When the limit is reached, new operations are rejected immediately with HTTP 429 rather than queueing, preventing backend overload during traffic bursts. The limit is acquired before spawning each operation task and the permit is held until the task completes (including after panics). Configured via `service.max_concurrency` (default: 500, effectively unlimited without configuration). Includes tests for rejection at capacity and permit release after panics.
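The reject-instead-of-queue behavior described above can be sketched with a counter and an RAII permit. This is a minimal std-only illustration under stated assumptions, not the PR's implementation: `Limiter` and `Permit` are hypothetical names, and an atomic counter stands in for the semaphore.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical limiter that rejects new work instead of queueing it.
#[derive(Debug)]
pub struct Limiter {
    in_flight: AtomicUsize,
    max: usize,
}

/// RAII permit: the slot is returned when this value is dropped,
/// including during unwinding after a panic.
#[derive(Debug)]
pub struct Permit(Arc<Limiter>);

impl Limiter {
    pub fn new(max: usize) -> Arc<Self> {
        Arc::new(Limiter { in_flight: AtomicUsize::new(0), max })
    }

    /// Returns a permit, or `None` when the limit is reached.
    /// A request handler would map `None` to an HTTP 429 response.
    pub fn try_acquire(this: &Arc<Self>) -> Option<Permit> {
        this.in_flight
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
                if n < this.max { Some(n + 1) } else { None }
            })
            .ok()
            .map(|_| Permit(Arc::clone(this)))
    }

    /// Number of permits currently held.
    pub fn in_flight(&self) -> usize {
        self.in_flight.load(Ordering::Acquire)
    }
}

impl Drop for Permit {
    fn drop(&mut self) {
        self.0.in_flight.fetch_sub(1, Ordering::AcqRel);
    }
}
```

Acquiring the permit before spawning and moving it into the task ties the slot's lifetime to the task itself, so the count drops whether the task finishes normally or unwinds.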
Each `StorageService` operation now runs in a spawned tokio task, communicating results via a oneshot channel. This gives two guarantees:

- […]
- […] `Error::Panic` instead of propagating).

Business logic moved from `StorageService` to a private `StorageServiceInner` struct. `StorageService` is now a thin public API that takes owned values (`ObjectId`, `Metadata`), spawns each operation in a separate task, and delegates to the inner type. Existing tests target `StorageServiceInner` methods directly (same-module access), so they required no changes. New tests exercise the public spawn-based API including panic recovery and receiver-drop completion.

Next steps

- […] `ServiceRunner`), replacing the current `InFlightRequestsLayer` with purpose-specific controls.
- […] `tokio::spawn` with a bounded channel dispatching to a fixed worker pool, enabling backpressure, fire-and-forget operations, and priority scheduling.
- […] `CancellationToken` to actively cancel backend operations on client disconnect, with safety invariants protecting tombstone writes from cancellation.

Ref FS-171
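The bounded-channel worker pool mentioned in the next steps might look roughly like the following. This is a speculative std-only sketch using threads and `std::sync::mpsc::sync_channel`, not a proposed design for this codebase; `WorkerPool`, `try_submit`, and `shutdown` are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical fixed-size pool fed by a bounded queue.
pub struct WorkerPool {
    sender: Option<SyncSender<Job>>,
    workers: Vec<JoinHandle<()>>,
}

impl WorkerPool {
    pub fn new(workers: usize, queue_capacity: usize) -> Self {
        let (sender, receiver) = sync_channel::<Job>(queue_capacity);
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..workers)
            .map(|_| {
                let receiver = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    // The lock is held only while receiving, not while
                    // the job itself runs.
                    let job = receiver.lock().unwrap().recv();
                    match job {
                        Ok(job) => job(),
                        Err(_) => break, // all senders dropped: shut down
                    }
                })
            })
            .collect();
        WorkerPool { sender: Some(sender), workers }
    }

    /// Queues a job without blocking. Returns `false` when the queue is
    /// full, which is the backpressure signal for the caller.
    pub fn try_submit(&self, job: impl FnOnce() + Send + 'static) -> bool {
        match &self.sender {
            Some(sender) => sender.try_send(Box::new(job)).is_ok(),
            None => false,
        }
    }

    /// Closes the queue and waits for workers to drain it.
    pub fn shutdown(mut self) {
        drop(self.sender.take());
        for worker in self.workers.drain(..) {
            let _ = worker.join();
        }
    }
}
```

With a bounded queue, a full channel surfaces at submission time rather than as unbounded task growth, and fire-and-forget work is simply a job whose result nobody waits for.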