Problem
RedisSemaphore.acquire() in src/workflow/utils/concurrency.py uses a busy-wait loop with asyncio.sleep(0.25). Under contention:
- Each waiter does 4 Redis ops (INCR + EXPIRE + DECR + TTL check) every 250ms
- 100 waiters = 1600 Redis ops/second just for polling
- Adds up to 250ms unnecessary latency per acquisition
- No backoff — fixed interval regardless of contention level
Proposed Solution
Replace the poll loop with Redis pub/sub notification:
- When release() is called, publish to channel conc:notify:{key}
- In acquire(), subscribe to the channel and await a message instead of polling
- Keep the INCR/DECR counter approach for the semaphore itself
- Keep a timeout deadline for safety
This eliminates the polling Redis ops and cuts the average wait after a release from ~125ms (half the 250ms poll interval) to roughly one pub/sub delivery.
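A minimal sketch of the steps listed above, assuming the client is a redis.asyncio.Redis instance from redis-py. The class name NotifyingSemaphore, the limit, ttl and timeout parameters, and the "released" payload are hypothetical and not taken from concurrency.py; the existing RedisSemaphore may be structured differently.

```python
import asyncio
import time
from typing import Any


class NotifyingSemaphore:
    """Hypothetical sketch of a pub/sub-notified semaphore (not the real RedisSemaphore)."""

    def __init__(self, client: Any, key: str, limit: int, ttl: int = 60) -> None:
        self.client = client  # assumed: redis.asyncio.Redis (or a compatible object)
        self.key = key
        self.limit = limit
        self.ttl = ttl  # assumed safety TTL on the counter key, in seconds
        self.channel = f"conc:notify:{key}"

    async def _try_acquire(self) -> bool:
        # Same INCR/DECR counter approach as before: claim a slot, undo if over the limit.
        count = await self.client.incr(self.key)
        await self.client.expire(self.key, self.ttl)
        if count <= self.limit:
            return True
        await self.client.decr(self.key)
        return False

    async def acquire(self, timeout: float = 30.0) -> bool:
        # Fast path: no subscription needed when a slot is free.
        if await self._try_acquire():
            return True
        deadline = time.monotonic() + timeout
        async with self.client.pubsub() as pubsub:
            await pubsub.subscribe(self.channel)
            while True:
                # Retry after subscribing so a release that happened between the
                # first attempt and the subscription is not missed.
                if await self._try_acquire():
                    return True
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return False  # safety deadline reached
                # Sleep until a release is published or the deadline passes.
                # The message content is irrelevant; any wake-up triggers a retry.
                await pubsub.get_message(
                    ignore_subscribe_messages=True, timeout=remaining
                )

    async def release(self) -> None:
        await self.client.decr(self.key)
        await self.client.publish(self.channel, "released")
```

One trade-off to weigh: every waiter wakes on each publish, so a single release triggers one INCR/EXPIRE/DECR attempt per waiter. That load scales with the release rate rather than the poll interval, but it may still be worth measuring under heavy contention.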
Files
src/workflow/utils/concurrency.py