Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions tests/unit/test_load_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,41 +144,48 @@ def test_context_manager_exception_handling(self):
assert controller.queue_depth == 0

def test_concurrent_operations(self):
"""Test controller under high concurrency."""
controller = BackpressureController(max_concurrent=5, queue_size=10, timeout=0.1)
"""Test controller under high concurrency.

Uses a Barrier to force all threads to compete simultaneously,
and work duration >> timeout so permits can't be recycled.
Capacity = max_concurrent(2) + queue_size(3) = 5, so 15 of 20
threads must be rejected — no timing sensitivity.
"""
controller = BackpressureController(max_concurrent=2, queue_size=3, timeout=0.01)

results = []
exceptions = []
barrier = threading.Barrier(20, timeout=5)

def worker(worker_id):
from cachekit.backends.errors import BackendError

try:
barrier.wait() # All threads launch simultaneously
with controller.acquire():
time.sleep(0.05) # Simulate work
time.sleep(0.2) # Hold permit far longer than timeout
results.append(worker_id)
except BackendError as e:
exceptions.append((worker_id, str(e)))
except threading.BrokenBarrierError:
exceptions.append((worker_id, "barrier broken"))

# Launch many concurrent workers
threads = []
for i in range(20): # More than max_concurrent + queue_size
for i in range(20):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()

# Wait for all threads
for t in threads:
t.join()

# Some should succeed, some should be rejected
assert len(results) > 0, "Some operations should succeed"
assert len(exceptions) > 0, "Some operations should be rejected"
assert len(results) + len(exceptions) == 20

# Verify final state is clean
assert controller.queue_depth == 0
assert controller._semaphore._value == 5 # Back to max_concurrent
assert controller._semaphore._value == 2 # Back to max_concurrent

def test_metrics_tracking(self):
"""Test that metrics are properly tracked."""
Expand Down
Loading