
fix(core): fix deadlock for concurrent read and concurrent limit layer #7346

Open
dentiny wants to merge 1 commit into apache:main from dentiny:hjiang/fix-concurrent-limit-deadlock

Conversation


@dentiny dentiny commented Mar 31, 2026

Which issue does this PR close?

Closes #7338

Rationale for this change

I hit a stuck process in a production environment, which turned out to be a deadlock between the concurrent limit layer and concurrent read.
This PR fixes that potential deadlock.

Before this PR

HTTP permits = 2, concurrent = 4

ChunkedReader::read():
│
│  while has_remaining() && !done:          # caller's serial loop
│  │
│  │  ┌─ generator.next_reader()
│  │  │    └─ acc.read("file", range=0..256)
│  │  │         └─ http_client.fetch()
│  │  │              ├─ permit = semaphore.acquire()   ← ACQUIRE permit #1
│  │  │              └─ return HttpBody { stream: ConcurrentLimitStream { _permit: permit#1 } }
│  │  │
│  │  └─ reader₁ holds permit#1
│  │
│  │  tasks.execute(reader₁)                # reader₁ (with permit#1) goes INTO the queue
│  │  │  └─ spawn background: reader₁.read_all()    # task running, permit#1 still held
│  │  │
│  │  has_result()? → no (task not done yet)
│  │  ─── loop ───
│  │
│  │  ┌─ generator.next_reader()
│  │  │    └─ acc.read("file", range=256..512)
│  │  │         └─ http_client.fetch()
│  │  │              ├─ permit = semaphore.acquire()   ← ACQUIRE permit #2
│  │  │              └─ return HttpBody { _permit: permit#2 }
│  │  └─ reader₂ holds permit#2
│  │
│  │  tasks.execute(reader₂)                # reader₂ (with permit#2) goes INTO the queue
│  │  ─── loop ───
│  │
│  │  ┌─ generator.next_reader()
│  │  │    └─ acc.read("file", range=512..768)
│  │  │         └─ http_client.fetch()
│  │  │              └─ semaphore.acquire()            ← BLOCKED (0 permits left)
│  │  │                                                   permit#1 stuck in queue (reader₁)
│  │  │                                                   permit#2 stuck in queue (reader₂)
│  │  │                                                   can never reach tasks.next()
│  │  │                                                   to drain the queue
│  │  │                                                   ═══ DEADLOCK ═══
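The permit-exhaustion pattern above can be reproduced in a few lines. This is a std-only toy sketch, not OpenDAL's actual code: `Semaphore`, `Permit`, and `Reader` are illustrative stand-ins, and `try_acquire` is used so the demo fails fast at exactly the point where the real blocking `acquire` would hang forever.

```rust
use std::sync::{Arc, Mutex};

// Toy counting semaphore; the real code uses an async semaphore whose
// blocking acquire is where the deadlock manifests.
struct Semaphore {
    permits: Mutex<usize>,
}

impl Semaphore {
    fn new(n: usize) -> Arc<Self> {
        Arc::new(Self { permits: Mutex::new(n) })
    }

    // Fails fast instead of blocking, so this demo terminates.
    fn try_acquire(sem: &Arc<Semaphore>) -> Option<Permit> {
        let mut p = sem.permits.lock().unwrap();
        if *p == 0 {
            return None;
        }
        *p -= 1;
        Some(Permit { sem: Arc::clone(sem) })
    }
}

struct Permit {
    sem: Arc<Semaphore>,
}

impl Drop for Permit {
    fn drop(&mut self) {
        *self.sem.permits.lock().unwrap() += 1;
    }
}

// A reader keeps its HTTP permit for its whole lifetime, mirroring
// `ConcurrentLimitStream { _permit }` in the diagram above.
struct Reader {
    _permit: Permit,
}

fn main() {
    let sem = Semaphore::new(2); // HTTP permits = 2
    let mut queue = Vec::new();  // queued readers, not yet drained

    // The serial loop eagerly builds readers; each keeps a permit.
    queue.push(Reader { _permit: Semaphore::try_acquire(&sem).expect("permit #1") });
    queue.push(Reader { _permit: Semaphore::try_acquire(&sem).expect("permit #2") });

    // Third acquisition: 0 permits left, and the serial loop can never
    // reach `tasks.next()` to drain the queue. With a blocking acquire
    // this is the deadlock.
    assert!(Semaphore::try_acquire(&sem).is_none());

    drop(queue); // draining the queue is the only way permits return
    assert_eq!(*sem.permits.lock().unwrap(), 2);
}
```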

After this change

HTTP permits = 2, concurrent = 4

ChunkedReader::read():
│
│  while has_remaining() && !done:          # caller's serial loop
│  │
│  │  range = next_range()  → 0..256       # just compute the range, no HTTP call
│  │
│  │  tasks.execute(ChunkedReadInput { range: 0..256 })
│  │  │  └─ spawn background task T1:
│  │  │       ├─ acc.read("file", range=0..256)
│  │  │       │    └─ http_client.fetch()
│  │  │       │         ├─ semaphore.acquire()         ← ACQUIRE permit #1
│  │  │       │         └─ return HttpBody { _permit: permit#1 }
│  │  │       ├─ reader.read_all()                     # consume all data
│  │  │       └─ drop reader                           ← RELEASE permit #1 ✓
│  │  │            return (input, Ok(data))             # no permit in the return value
│  │  │
│  │  ─── loop ───
│  │  range = next_range()  → 256..512
│  │  tasks.execute(ChunkedReadInput { range: 256..512 })   # spawn T2
│  │  ─── loop ───
│  │  range = next_range()  → 512..768
│  │  tasks.execute(ChunkedReadInput { range: 512..768 })   # spawn T3
│  │  ─── loop ───
│  │  range = next_range()  → 768..1024
│  │  tasks.execute(ChunkedReadInput { range: 768..1024 })
│  │  │  └─ !has_remaining() (4 tasks == concurrent limit)
│  │  │     await front task T1                        # T1 already completed and released permit
│  │  │     pop T1, push T4                            ← progress ✓
│  │  │
│  │  ... continues until all chunks are read ...
│
│  tasks.next() → return data

# T1–T4 each independently:  acquire permit → fetch → read_all → drop reader → release permit
# No permit ever crosses the task-queue boundary.
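The fixed flow above can be sketched with std threads only, with a bounded channel of tokens standing in for the HTTP permit semaphore (`recv` = acquire, `send` = release). All names here are illustrative, not OpenDAL's actual API; the point is that each background task acquires and releases its permit internally, so only plain data crosses the task-queue boundary.

```rust
use std::ops::Range;
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Two HTTP permits, modeled as tokens in a bounded channel.
    let (release, acquire) = sync_channel::<()>(2);
    release.send(()).unwrap();
    release.send(()).unwrap();
    let acquire = Arc::new(Mutex::new(acquire));

    // concurrent = 4 chunk tasks, each owning its permit end to end.
    let handles: Vec<_> = (0..4u64)
        .map(|i| {
            let acquire = Arc::clone(&acquire);
            let release = release.clone();
            let range: Range<u64> = i * 256..(i + 1) * 256;
            thread::spawn(move || {
                acquire.lock().unwrap().recv().unwrap(); // acquire permit
                let data = vec![0u8; (range.end - range.start) as usize]; // "read_all"
                release.send(()).unwrap(); // drop reader => release permit
                (range, data) // no permit in the return value
            })
        })
        .collect();

    // The foreground only drains completed results, so permits can
    // never get stranded in the task queue.
    for h in handles {
        let (range, data) = h.join().unwrap();
        assert_eq!(data.len() as u64, range.end - range.start);
    }
}
```

With 4 tasks competing for 2 permits, the later tasks simply wait for a token released by an earlier task, which always happens because releasing never depends on the foreground loop.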

What changes are included in this PR?

During the investigation I found that only concurrent read suffers from this potential deadlock, not concurrent write.
This PR mimics what we do in concurrent write: requests are issued in background tasks, so the foreground loop does not block while ConcurrentTasks issues new read requests.
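The structural change boils down to what enters the task queue. The type shape below is a sketch of the idea, not OpenDAL's exact definition: before, the queued item was a reader already holding an HTTP permit; after, it is just the byte range, and the permit-holding reader is created and dropped entirely inside the background task.

```rust
use std::ops::Range;

// The queued item is just the range to fetch (field name taken from
// the diagram above; OpenDAL's actual struct may differ).
struct ChunkedReadInput {
    range: Range<u64>,
}

fn main() {
    // Before: tasks.execute(reader_with_permit)        // permit crosses the queue
    // After:  tasks.execute(ChunkedReadInput { range }) // no permit involved
    let input = ChunkedReadInput { range: 0..256 };
    assert_eq!(input.range.end - input.range.start, 256);
}
```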

Are there any user-facing changes?

No.

AI Usage Statement

Opus 4.6 made the code change.

@dentiny dentiny requested a review from Xuanwo as a code owner March 31, 2026 20:53
@dosubot dosubot bot added size:L This PR changes 100-499 lines, ignoring generated files. releases-note/fix The PR fixes a bug or has a title that begins with "fix" labels Mar 31, 2026
@dentiny dentiny force-pushed the hjiang/fix-concurrent-limit-deadlock branch from f38c359 to f670349 on March 31, 2026 20:59


Development

Successfully merging this pull request may close these issues.

new feature (or bug?): concurrent limit layer and concurrent read could lead to deadlock
