
python concurrent.futures

MilesBai edited this page Aug 15, 2025 · 2 revisions

as_completed

https://github.com/python/cpython/blob/53da1e8c8ccbe3161ebc42e8b8b7ebd1ab70e05b/Lib/concurrent/futures/_base.py#L193

as_completed() is a blocking generator that yields futures as they finish, or raises TimeoutError if the timeout expires first.

Key Insights:
Lazy evaluation: Only yields when something is actually ready
Efficient blocking: No CPU cycles wasted on polling
Order independence: Yields in completion order, not submission order
Exception propagation: future.result() can still raise exceptions from the original task
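That last point is easy to verify: calling future.result() inside the loop re-raises whatever exception the task raised. A minimal sketch (flaky is a made-up task, not from the original code):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def flaky(n):
    # Hypothetical task: fails for odd inputs.
    if n % 2:
        raise ValueError(f"bad input: {n}")
    return n * 10

results, errors = [], []
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(flaky, n) for n in range(4)]
    for future in as_completed(futures):
        try:
            results.append(future.result())  # re-raises the task's exception here
        except ValueError as exc:
            errors.append(str(exc))

print(sorted(results))  # [0, 20]
print(len(errors))      # 2
```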


# You have 3 scan files, submitted simultaneously
future_to_arg = {
    future1: "file1.zip",  # Takes 10 seconds
    future2: "file2.zip",  # Takes 5 seconds  
    future3: "file3.zip",  # Takes 15 seconds
}

for future in as_completed(future_to_arg):
    # t=0s:   Generator created, starts waiting... 🔒 BLOCKED
    # t=5s:   future2 completes → YIELDS future2  ⚡
    # t=5s:   Your loop processes future2.result()
    # t=5s:   Generator continues waiting... 🔒 BLOCKED
    # t=10s:  future1 completes → YIELDS future1  ⚡  
    # t=10s:  Your loop processes future1.result()
    # t=10s:  Generator continues waiting... 🔒 BLOCKED
    # t=15s:  future3 completes → YIELDS future3  ⚡
    # t=15s:  Your loop processes future3.result()
    # t=15s:  No more futures → StopIteration (loop ends)
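The timeline above can be reproduced with a runnable sketch. Sleep durations are scaled down to milliseconds so it finishes quickly, and scan is a stand-in for real scan work:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def scan(name, seconds):
    # Stand-in for a real scan: just sleep for the given duration.
    time.sleep(seconds)
    return name

completion_order = []
with ThreadPoolExecutor(max_workers=3) as executor:
    future_to_arg = {
        executor.submit(scan, "file1.zip", 0.10): "file1.zip",
        executor.submit(scan, "file2.zip", 0.05): "file2.zip",
        executor.submit(scan, "file3.zip", 0.15): "file3.zip",
    }
    for future in as_completed(future_to_arg):
        completion_order.append(future.result())

print(completion_order)  # ['file2.zip', 'file1.zip', 'file3.zip']
```

Note the result order reflects completion time, not the order of submission.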

Here is a breakdown of how as_completed() works internally.

High-Level Summary

as_completed() is an iterator that yields futures as they finish, regardless of the order they were submitted. It uses an event-driven waiting mechanism to efficiently monitor multiple futures simultaneously.

Step-by-Step Breakdown

1. Setup Phase

fs = set(fs)  # Convert to set (removes duplicates, fast lookups)
total_futures = len(fs)
  • Converts the input futures to a set for efficient operations
  • Stores total count for timeout error messages

2. Initial State Check

finished = set(f for f in fs if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
pending = fs - finished
  • Immediately identifies any futures that are already done
  • Separates them into finished and pending sets

3. Waiter Installation

waiter = _create_and_install_waiters(fs, _AS_COMPLETED)

This is the key mechanism:

  • Creates a single waiter object that monitors ALL futures
  • Installs this waiter on each future's _waiters list
  • When any future completes, it will notify this waiter

4. Immediate Yield of Finished Futures

yield from _yield_finished_futures(finished, waiter, ref_collect=(fs,))
  • Immediately yields any futures that were already completed
  • No waiting required for these

5. Main Event Loop

while pending:
    waiter.event.wait(wait_timeout)  # BLOCKS until ANY future completes
    
    with waiter.lock:
        finished = waiter.finished_futures  # Get newly completed futures
        waiter.finished_futures = []       # Reset the list
        waiter.event.clear()               # Reset the event
    
    finished.reverse()  # Keep finishing order
    yield from _yield_finished_futures(finished, waiter, ...)

The Magic: Event-Driven Waiting

Here's the brilliant part:

Traditional Polling Approach (inefficient):

# Bad approach - constantly checking
while pending:
    for future in pending:
        if future.done():
            yield future
    time.sleep(0.01)  # Waste CPU cycles

Event-Driven Approach (efficient):

# Good approach - wait for notifications
waiter.event.wait()  # Thread sleeps until ANY future completes
# When a future finishes, it wakes up this thread
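A minimal two-thread sketch of this handoff, using a plain threading.Event in place of the internal waiter:

```python
import threading
import time

event = threading.Event()
result = {}

def worker():
    # Simulate a task finishing, then signal the waiting thread.
    time.sleep(0.05)
    result["value"] = 42
    event.set()  # wakes up every thread blocked in event.wait()

t = threading.Thread(target=worker)
t.start()
event.wait()            # sleeps (no CPU use) until worker calls set()
print(result["value"])  # 42
t.join()
```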

Key Components

Waiter Object:

  • event: A threading event that gets signaled when futures complete
  • finished_futures: List of futures that just completed
  • lock: Protects the finished_futures list from race conditions

Future State Management:

  • Each future has a _waiters list
  • When a future completes, it notifies ALL waiters in its list
  • The waiter collects these notifications efficiently
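The waiter pattern can be sketched in a few lines. SimpleWaiter is a made-up name for illustration; the real class is _AsCompletedWaiter in _base.py:

```python
import threading

class SimpleWaiter:
    # Simplified sketch of the waiter used by as_completed():
    # one event + one lock, shared by all futures being watched.
    def __init__(self):
        self.event = threading.Event()
        self.lock = threading.Lock()
        self.finished_futures = []

    def add_result(self, future):
        # Called from whichever thread completed the future.
        with self.lock:
            self.finished_futures.append(future)
        self.event.set()

waiter = SimpleWaiter()
waiter.add_result("future-A")  # strings stand in for real Future objects
waiter.add_result("future-B")

waiter.event.wait()  # returns immediately: the event is already set
with waiter.lock:
    done = waiter.finished_futures[:]
    waiter.finished_futures.clear()
    waiter.event.clear()

print(done)  # ['future-A', 'future-B']
```

The drain-under-lock step mirrors the main loop shown above: grab the batch, reset the list, clear the event, then yield.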

Why This Design is Brilliant

1. Efficient Resource Usage

  • No polling: Thread sleeps until work is actually done
  • Single waiter: One object monitors all futures, not one per future

2. Thread Safety

  • Uses locks to protect shared data structures
  • Atomic operations for state changes

3. Timeout Support

  • Can wait with a timeout and raise TimeoutError appropriately
  • Calculates remaining time dynamically
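Usage sketch: as_completed() accepts a timeout in seconds and raises TimeoutError if any futures are still unfinished when it expires:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError

def slow():
    time.sleep(0.5)
    return "done"

timed_out = False
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow)
    try:
        for f in as_completed([future], timeout=0.05):
            f.result()
    except TimeoutError:
        timed_out = True  # 1 future still unfinished after 0.05 seconds

print(timed_out)  # True
```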

4. Order Preservation

finished.reverse()  # Keep finishing order
  • Maintains the order in which futures actually completed
  • Important for debugging and logging

In Your Code Context

When you use:

for future in as_completed(future_to_arg):
    brief_log = future.result()

What happens:

  1. Setup: Waiter installed on all your futures
  2. First yield: Any already-completed futures (usually none)
  3. Wait: Thread sleeps until any scan processing completes
  4. Wake up: When a scan finishes, thread wakes up
  5. Yield: That completed future is yielded to your loop
  6. Repeat: Back to waiting for the next completion

This is why you see results as soon as each scan finishes, not in the order you submitted them!

Performance Benefits

  • Zero CPU waste: No busy waiting or polling
  • Immediate response: Results available the moment tasks complete
  • Scalable: Works efficiently with 1 future or 1000 futures

This design makes as_completed() perfect for your use case where you want immediate feedback as scan files are processed!

Next, the CPU/OS mechanics behind how waiter.event.wait() actually works.

The Foundation: Operating System Primitives

What threading.Event Really Is:

# Python's threading.Event wraps OS primitives
waiter.event.wait()  # Becomes a system call to the OS kernel

The Event object is built on top of condition variables and mutexes, which are OS-level synchronization primitives.
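That layering can be sketched in pure Python, mirroring the structure of CPython's threading.Event (simplified; MiniEvent is an illustrative name, not the real class):

```python
import threading

class MiniEvent:
    # Sketch of threading.Event in terms of a condition variable.
    def __init__(self):
        self._cond = threading.Condition(threading.Lock())
        self._flag = False

    def set(self):
        with self._cond:
            self._flag = True
            self._cond.notify_all()  # wake every waiting thread

    def wait(self, timeout=None):
        with self._cond:
            if not self._flag:
                self._cond.wait(timeout)  # releases the lock and blocks
            return self._flag

ev = MiniEvent()
threading.Timer(0.05, ev.set).start()  # set the event from another thread
print(ev.wait())  # True
```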

CPU/OS Mechanics Breakdown

1. Thread State Transitions

When waiter.event.wait() is called:

🔄 RUNNING → 😴 BLOCKED (WAITING)

At the CPU level:

  • Thread is removed from CPU scheduler
  • CPU registers are saved (context)
  • Thread moves to OS wait queue
  • CPU is freed for other threads

2. OS Kernel Wait Queue

OS Kernel Wait Queue for Event Object:
┌─────────────────────────────────────┐
│  Event Object ID: 0x7f8b1c004e70    │
├─────────────────────────────────────┤
│  Waiting Threads:                   │
│  ├─ Thread-1 (your as_completed)    │  😴
│  ├─ Thread-2 (another waiter)       │  😴
│  └─ Thread-3 (yet another waiter)   │  😴
└─────────────────────────────────────┘

3. The Notification Mechanism

When a future completes, it calls event.set():

# Inside the completing future
self._condition.notify_all()  # Wakes up ALL waiters

OS-level sequence:

  1. System call to kernel: "Wake up threads waiting on Event X"
  2. Kernel scheduler moves threads from wait queue → ready queue
  3. CPU scheduler eventually runs the awakened threads
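The broadcast behavior is observable from Python: several threads block on one Event, and a single set() call wakes them all:

```python
import threading

event = threading.Event()
woken = []
lock = threading.Lock()

def waiter(name):
    event.wait()  # each thread parks in the kernel's wait queue
    with lock:
        woken.append(name)

threads = [threading.Thread(target=waiter, args=(f"Thread-{i}",))
           for i in range(3)]
for t in threads:
    t.start()

event.set()  # one call wakes all three waiters
for t in threads:
    t.join()

print(sorted(woken))  # ['Thread-0', 'Thread-1', 'Thread-2']
```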

Deep Dive: Condition Variables

POSIX pthread_cond Implementation (Linux/macOS):

// Simplified C code that Python's threading.Event uses
pthread_mutex_t mutex;
pthread_cond_t condition;

// wait() equivalent
pthread_mutex_lock(&mutex);
while (!event_is_set) {
    pthread_cond_wait(&condition, &mutex);  // 🔒 BLOCKS HERE
}
pthread_mutex_unlock(&mutex);

// set() equivalent  
pthread_mutex_lock(&mutex);
event_is_set = true;
pthread_cond_broadcast(&condition);  // 📢 WAKE ALL WAITERS
pthread_mutex_unlock(&mutex);

Windows Implementation:

Windows exposes analogous kernel primitives through Win32 APIs such as SetEvent() and WaitForSingleObject().

CPU Architecture Details

Modern CPU Features Used:

1. Hardware Support for Synchronization

  • Atomic instructions: CMPXCHG, LOCK prefix
  • Memory barriers: Ensure correct ordering
  • Cache coherency: MESI protocol for multi-core consistency

2. Futex (Fast Userspace Mutex) on Linux:

User Space:     if (uncontended) { quick_lock(); return; }
                ↓ (only if contention)
Kernel Space:   futex_wait() → scheduler → wait_queue

Optimization: Most locks never go to kernel if uncontended!

3. CPU Context Switching:

When thread blocks/wakes:

1. Save CPU state:    Registers → Thread Control Block (TCB)
2. Update scheduler:  Thread state = WAITING
3. Load new thread:   TCB → Registers  
4. Jump to new code:  CPU continues different thread

Cost: ~1000-5000 CPU cycles for context switch

Real Example: Your Code's Flow

# Thread-1: Your main thread
for future in as_completed(future_to_arg):  # 👈 You are here
    
# Thread-2: Processing file1.zip
def process_scan(arg):
    # ... processing zip file ...
    return result  # 👈 When this returns, magic happens!

The Notification Chain:

1. Thread-2 finishes process_scan()
   ↓
2. ThreadPoolExecutor sets future._state = FINISHED
   ↓  
3. future._condition.notify_all() called
   ↓
4. OS kernel: pthread_cond_broadcast()
   ↓
5. Kernel moves Thread-1 from wait_queue → ready_queue
   ↓
6. CPU scheduler: Thread-1 gets CPU time
   ↓
7. Thread-1 wakes up in as_completed(), yields the future
   ↓
8. Your loop continues: brief_log = future.result()

Performance Implications

Why This is Efficient:

1. No CPU Waste:

# BAD: Polling (wastes CPU)
while not future.done():
    time.sleep(0.001)  # 1000 wake-ups per second!

# GOOD: Event-driven (zero CPU when waiting)
waiter.event.wait()  # Sleeps until notified

2. Scalable:

  • Single notification: one notify_all() call wakes every waiter
  • No polling overhead: CPU completely free while waiting
  • Efficient for 1 or 1000 futures

3. Hardware Optimization:

  • Modern CPUs: Hardware support for atomic operations
  • OS optimization: Futex avoids syscalls for uncontended cases
  • Memory hierarchy: Cache-efficient data structures

Alternative Implementations

Busy Waiting (Bad):

# Wastes CPU cycles
while not condition:
    pass  # Spin lock - BAD!

Polling with Sleep (Better but Inefficient):

while not condition:
    time.sleep(0.01)  # Still wasteful

Event-Driven (Optimal):

event.wait()  # Blocks efficiently until notified

Summary

The waiter.event.wait() mechanism is built on decades of OS and CPU optimization:

  1. Hardware: Atomic instructions, cache coherency
  2. OS Kernel: Condition variables, efficient schedulers
  3. Runtime: Optimized threading libraries
  4. Result: Zero CPU usage while waiting, instant notification when ready

This is why as_completed() can efficiently monitor hundreds of futures simultaneously with negligible overhead! 🚀
