diff --git a/tests/unit/test_load_control.py b/tests/unit/test_load_control.py index 5593708..c089350 100644 --- a/tests/unit/test_load_control.py +++ b/tests/unit/test_load_control.py @@ -144,41 +144,48 @@ def test_context_manager_exception_handling(self): assert controller.queue_depth == 0 def test_concurrent_operations(self): - """Test controller under high concurrency.""" - controller = BackpressureController(max_concurrent=5, queue_size=10, timeout=0.1) + """Test controller under high concurrency. + + Uses a Barrier to force all threads to compete simultaneously, + and work duration >> timeout so permits can't be recycled. + Capacity = max_concurrent(2) + queue_size(3) = 5, so 15 of 20 + threads must be rejected — no timing sensitivity. + """ + controller = BackpressureController(max_concurrent=2, queue_size=3, timeout=0.01) results = [] exceptions = [] + barrier = threading.Barrier(20, timeout=5) def worker(worker_id): from cachekit.backends.errors import BackendError try: + barrier.wait() # All threads launch simultaneously with controller.acquire(): - time.sleep(0.05) # Simulate work + time.sleep(0.2) # Hold permit far longer than timeout results.append(worker_id) except BackendError as e: exceptions.append((worker_id, str(e))) + except threading.BrokenBarrierError: + exceptions.append((worker_id, "barrier broken")) - # Launch many concurrent workers threads = [] - for i in range(20): # More than max_concurrent + queue_size + for i in range(20): t = threading.Thread(target=worker, args=(i,)) threads.append(t) t.start() - # Wait for all threads for t in threads: t.join() - # Some should succeed, some should be rejected assert len(results) > 0, "Some operations should succeed" assert len(exceptions) > 0, "Some operations should be rejected" assert len(results) + len(exceptions) == 20 # Verify final state is clean assert controller.queue_depth == 0 - assert controller._semaphore._value == 5 # Back to max_concurrent + assert controller._semaphore._value == 2 # Back to max_concurrent def test_metrics_tracking(self): """Test that metrics are properly tracked."""