# Futures in Python: From Basic to Advanced

## 1. Introduction to Futures

Futures in Python represent the result of asynchronous computations. They are a powerful tool for managing concurrent operations and can significantly improve the performance of I/O-bound and CPU-bound tasks.

Analogy: Think of a future as a 'promise' or an 'IOU' (I Owe You) note. When you start a task, you get a future object immediately, which is like receiving an IOU. You can continue with other work, and when you need the result, you can 'cash in' the IOU to get the actual value.

Let's start with a simple example to illustrate the concept:

In [None]:
import concurrent.futures
import time

def slow_operation(seconds):
    time.sleep(seconds)  # Simulate a time-consuming operation
    return f"Operation completed in {seconds} seconds"

# Create a ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Submit the task and get a future
    future = executor.submit(slow_operation, 3)
    
    print("Future created. Doing other work...")
    time.sleep(1)  # Simulate other work
    
    # Get the result from the future
    result = future.result()
    print(result)


In this example:
1. We define a `slow_operation` function that simulates a time-consuming task.
2. We create a `ThreadPoolExecutor` to manage concurrent operations.
3. We submit our task to the executor, which immediately returns a future object.
4. We can do other work while the future is being processed.
5. When we need the result, we call `future.result()`, which will wait if necessary and then return the result.

This demonstrates the basic concept of futures: they allow us to start a task and continue with other work, checking for the result when we need it.

## 2. Basics of Asyncio

Before diving deeper into futures, it's important to understand the basics of asyncio, as it's closely related to how futures work in Python.

Asyncio is a library for writing concurrent code using the async/await syntax. It's particularly useful for I/O-bound and high-level structured network code.

Analogy: Think of asyncio as a juggler. A juggler can keep multiple balls in the air by quickly switching attention between them. Similarly, asyncio can manage multiple tasks by switching between them when they're waiting for I/O operations.

Let's see a basic asyncio example:

In [None]:
import asyncio

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print("started at", time.strftime("%X"))

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print("finished at", time.strftime("%X"))

asyncio.run(main())


In this example:
1. We define an asynchronous function `say_after` that waits for a specified delay and then prints a message.
2. Our `main` function calls `say_after` twice, waiting for each to complete before moving on.
3. We use `asyncio.run()` to run our main coroutine.

This is a synchronous use of asyncio. In the next section, we'll see how to use futures with asyncio for concurrent operations.

## 3. Concurrent Futures

The `concurrent.futures` module provides a high-level interface for asynchronously executing callables. It abstracts the complexities of using threads or processes, making it easier to write concurrent code.

Analogy: Think of `concurrent.futures` as a team of workers. You can assign tasks to this team, and they'll work on them concurrently. You can check on the progress of any task at any time, or wait for all tasks to complete.

Let's explore the two main types of executors in `concurrent.futures`: ThreadPoolExecutor and ProcessPoolExecutor.

### 3.1 ThreadPoolExecutor

ThreadPoolExecutor is used for I/O-bound tasks. It's suitable when your tasks spend a lot of time waiting for external operations (like network requests or file I/O).

Let's see an example where we use ThreadPoolExecutor to download multiple web pages concurrently:

In [None]:
import concurrent.futures
import requests
import time

def download_page(url):
    response = requests.get(url)
    return f"Downloaded {url}, status: {response.status_code}, length: {len(response.text)}"

urls = [
    'https://www.example.com',
    'https://www.python.org',
    'https://www.github.com',
    'https://www.stackoverflow.com'
]

start_time = time.time()

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # Submit all tasks and create a map of futures to their URLs
    future_to_url = {executor.submit(download_page, url): url for url in urls}
    
    # Iterate over the futures as they complete
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print(f"{url} generated an exception: {exc}")
        else:
            print(data)

print(f"Total time taken: {time.time() - start_time:.2f} seconds")


Let's break down this example:

1. We define a `download_page` function that downloads a webpage and returns a string with information about the download.

2. We create a list of URLs to download.

3. We use a `ThreadPoolExecutor` with a maximum of 4 workers. This means up to 4 downloads can happen concurrently.

4. We submit all tasks to the executor using a dictionary comprehension. This creates a mapping of Future objects to their corresponding URLs.

5. We use `concurrent.futures.as_completed()` to iterate over the futures as they complete. This allows us to process results as soon as they're available, rather than waiting for all tasks to finish.

6. For each completed future, we retrieve its result (or catch any exceptions), and print the output.

7. Finally, we print the total time taken for all downloads.

This example demonstrates how ThreadPoolExecutor can significantly speed up I/O-bound operations by performing them concurrently.

### 3.2 ProcessPoolExecutor

While ThreadPoolExecutor is great for I/O-bound tasks, ProcessPoolExecutor is better suited for CPU-bound tasks. It uses separate processes instead of threads, which allows it to bypass the Global Interpreter Lock (GIL) and achieve true parallelism on multi-core systems.

Let's see an example where we use ProcessPoolExecutor to perform a CPU-intensive task:

In [None]:
import concurrent.futures
import math
import time

def is_prime(n):
    if n < 2:
        return False
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            return False
    return True

def count_primes(start, end):
    return sum(1 for n in range(start, end) if is_prime(n))

def main():
    start_time = time.time()
    
    # Define the range to search for primes
    start, end = 1, 10**7
    
    # Split the range into chunks for each process
    chunk_size = (end - start) // 4
    ranges = [(i, i + chunk_size) for i in range(start, end, chunk_size)]
    
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = executor.map(lambda r: count_primes(*r), ranges)
    
    total_primes = sum(results)
    
    print(f"Found {total_primes} prime numbers in range {start} to {end}")
    print(f"Time taken: {time.time() - start_time:.2f} seconds")

if __name__ == "__main__":
    main()


Let's break down this example:

1. We define an `is_prime` function to check if a number is prime.

2. We define a `count_primes` function that counts the number of primes in a given range.

3. In the `main` function:
   - We set the range to search for primes (1 to 10^7).
   - We split this range into 4 chunks, one for each process.

4. We use a `ProcessPoolExecutor` to distribute the work across multiple processes.

5. We use `executor.map()` to apply our `count_primes` function to each range. This returns an iterator of results.

6. We sum up the results to get the total number of primes found.

7. Finally, we print the results and the time taken.

This example shows how ProcessPoolExecutor can be used to parallelize CPU-intensive tasks across multiple cores, potentially providing significant speedup on multi-core systems.

## 4. Understanding Futures: Internal Flow and Analogies

To understand how futures work internally, let's use an analogy of a coffee shop. In this analogy:
- The main program is the cashier
- The ThreadPoolExecutor is the team of baristas
- Each future is an order ticket
- The tasks are different coffee orders

Here's a diagram to visualize this:

```
                 +-------------+
                 |   Cashier   |
                 | (Main Thread) |
                 +-------------+
                        |
                        | Submit orders
                        v
            +------------------------+
            |    ThreadPoolExecutor  |
            | (Team of Baristas)     |
            +------------------------+
                 |       |       |
            +----+   +---+   +---+
            |        |       |
         Future1  Future2  Future3
         (Order1) (Order2) (Order3)
```

Let's implement this analogy in code:

In [None]:
import concurrent.futures
import time
import random

def make_coffee(order):
    # Simulate coffee making process
    print(f"Starting to make {order}")
    time.sleep(random.uniform(1, 3))  # Random time to make coffee
    return f"{order} is ready!"

def coffee_shop():
    orders = ['Espresso', 'Latte', 'Cappuccino', 'Americano']
    
    print("Coffee shop is open!")
    
    # Create a ThreadPoolExecutor with 2 workers (baristas)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as barista_team:
        # Submit all orders and create a map of futures to their orders
        future_to_order = {barista_team.submit(make_coffee, order): order for order in orders}
        
        print("All orders submitted to baristas. Cashier is free now.")
        
        # Simulate cashier doing other work
        time.sleep(1)
        print("Cashier is checking the status of orders...")
        
        # Check and collect finished orders
        for future in concurrent.futures.as_completed(future_to_order):
            order = future_to_order[future]
            try:
                result = future.result()
            except Exception as e:
                print(f"{order} order failed: {e}")
            else:
                print(f"Cashier: {result}")
    
    print("Coffee shop is closed!")

coffee_shop()


Let's break down this code and explain how it relates to our coffee shop analogy:

1. `make_coffee(order)`: This function represents a barista making a coffee. It simulates the time taken to make a coffee using `time.sleep()`.

2. `coffee_shop()`: This is our main function, representing the entire coffee shop operation.

3. `orders = ['Espresso', 'Latte', 'Cappuccino', 'Americano']`: These are the coffee orders we need to process.

4. `with concurrent.futures.ThreadPoolExecutor(max_workers=2) as barista_team:`: This creates our team of baristas. We have 2 baristas who can work concurrently.

5. `future_to_order = {barista_team.submit(make_coffee, order): order for order in orders}`: This is where we submit all our orders to the barista team. Each submission returns a Future object, which is like an order ticket.

6. `for future in concurrent.futures.as_completed(future_to_order):`: This is how we check for completed orders. As each order is completed, we process it.

7. `result = future.result()`: This is where we 'collect' the finished coffee. If there was a problem making the coffee, an exception would be raised here.

The internal flow works like this:

1. The cashier (main thread) takes all orders and submits them to the barista team (ThreadPoolExecutor).
2. Each order becomes a Future object, which is like a ticket for that order.
3. The baristas (worker threads) start working on the orders concurrently.
4. The cashier can do other work while waiting for the orders to be completed.
5. As each order is completed, the cashier checks the ticket (Future) and announces that the coffee is ready.

## 5. Asynchronous Programming with Futures

Futures can be combined with asyncio to create powerful asynchronous programs. This is particularly useful for I/O-bound tasks that involve waiting for external resources.

Let's create an example that simulates fetching data from multiple APIs concurrently using asyncio and futures.

In [None]:
import asyncio
import aiohttp
import time

async def fetch_data(session, url):
    async with session.get(url) as response:
        return await response.json()

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            task = asyncio.create_task(fetch_data(session, url))
            tasks.append(task)
        results = await asyncio.gather(*tasks)
        return results

async def main():
    urls = [
        'https://api.github.com/events',
        'https://api.github.com/emojis',
        'https://api.github.com/meta'
    ]
    
    start_time = time.time()
    results = await fetch_all(urls)
    end_time = time.time()
    
    for i, result in enumerate(results):
        print(f"API {i+1} returned {len(result)} items")
    
    print(f"Total time: {end_time - start_time:.2f} seconds")

asyncio.run(main())


Let's break down this code and explain each part:

1. `async def fetch_data(session, url):` 
   This asynchronous function fetches data from a given URL using an aiohttp session. It returns the JSON response.

2. `async def fetch_all(urls):`
   This function creates a single aiohttp session and then creates tasks for fetching data from all URLs concurrently.
   - `async with aiohttp.ClientSession() as session:` creates a session that will be used for all requests.
   - `task = asyncio.create_task(fetch_data(session, url))` creates a task (which is a wrapper around a coroutine) for each URL.
   - `results = await asyncio.gather(*tasks)` waits for all tasks to complete and collects their results.

3. `async def main():`
   This is our main function that orchestrates the entire process.
   - We define a list of URLs to fetch data from.
   - We call `fetch_all(urls)` to fetch data from all URLs concurrently.
   - We measure the total time taken and print the results.

4. `asyncio.run(main())`
   This runs our main coroutine, which is the entry point of our asynchronous program.

This example demonstrates how futures (in the form of Tasks in asyncio) can be used to perform multiple I/O operations concurrently, potentially saving a significant amount of time compared to sequential execution.

Now, let's extend this example to show how we can handle timeouts and cancellations with futures.

In [None]:
import asyncio
import aiohttp
import time

async def fetch_data(session, url, timeout):
    try:
        async with session.get(url, timeout=timeout) as response:
            return await response.json()
    except asyncio.TimeoutError:
        print(f"Timeout occurred for {url}")
        return None

async def fetch_all(urls, timeout):
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(fetch_data(session, url, timeout)) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

async def main():
    urls = [
        'https://api.github.com/events',
        'https://api.github.com/emojis',
        'https://api.github.com/meta',
        'https://api.github.com/non_existent'  # This URL doesn't exist
    ]
    
    start_time = time.time()
    results = await fetch_all(urls, timeout=2)  # 2 second timeout
    end_time = time.time()
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"API {i+1} encountered an error: {result}")
        elif result is None:
            print(f"API {i+1} timed out")
        else:
            print(f"API {i+1} returned {len(result)} items")
    
    print(f"Total time: {end_time - start_time:.2f} seconds")

asyncio.run(main())


In this extended example:

1. We've added a `timeout` parameter to `fetch_data` and `fetch_all` functions.

2. In `fetch_data`, we now catch `asyncio.TimeoutError` and return `None` if a timeout occurs.

3. In `fetch_all`, we use `return_exceptions=True` in `asyncio.gather()`. This means that if any task raises an exception, it will be returned in the results instead of being raised immediately.

4. In `main`, we've added a non-existent URL to demonstrate error handling.

5. When processing results, we now check for exceptions and timeouts:
   - If the result is an instance of `Exception`, we print an error message.
   - If the result is `None`, we know it timed out.
   - Otherwise, we print the number of items returned.

This example shows how futures can be used to handle concurrent operations with timeouts and error handling, which is crucial in real-world applications where network operations can fail or take too long.

The use of futures (via asyncio tasks) allows us to:
- Perform multiple I/O operations concurrently
- Set timeouts for operations
- Handle errors gracefully
- Collect results as they become available

These capabilities make futures a powerful tool for building efficient and robust asynchronous applications.

## 6. Advanced Use Cases and Best Practices

### 6.1 Combining Futures with Context Managers

Futures can be combined with context managers to ensure proper resource management. This is particularly useful when working with resources that need to be explicitly closed or released.

In [None]:
import concurrent.futures
import contextlib
import time

class DatabaseConnection:
    def __init__(self, db_name):
        self.db_name = db_name
    
    def __enter__(self):
        print(f"Connecting to database {self.db_name}")
        time.sleep(1)  # Simulate connection time
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        print(f"Closing connection to database {self.db_name}")
        time.sleep(0.5)  # Simulate disconnection time
    
    def query(self, sql):
        print(f"Executing query on {self.db_name}: {sql}")
        time.sleep(2)  # Simulate query execution time
        return f"Result from {self.db_name}"

@contextlib.contextmanager
def get_db_connection(db_name):
    with DatabaseConnection(db_name) as conn:
        yield conn

def run_query(db_name, sql):
    with get_db_connection(db_name) as conn:
        return conn.query(sql)

def main():
    databases = ['DB1', 'DB2', 'DB3']
    sql = "SELECT * FROM users"
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(run_query, db, sql) for db in databases]
        
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            print(f"Got result: {result}")

if __name__ == "__main__":
    main()


In this example:

1. We define a `DatabaseConnection` class that simulates a database connection. It uses the context manager protocol (`__enter__` and `__exit__` methods) to manage the connection lifecycle.

2. We create a `get_db_connection` context manager function using `@contextlib.contextmanager`. This allows us to use the `with` statement with our `DatabaseConnection`.

3. The `run_query` function uses the `get_db_connection` context manager to ensure that the database connection is properly closed after the query is executed.

4. In the `main` function, we use a `ThreadPoolExecutor` to run queries on multiple databases concurrently.

This pattern ensures that resources (in this case, database connections) are properly managed even when used with futures and concurrent execution.

### 6.2 Chaining Futures

Sometimes, you may want to perform a series of asynchronous operations where each operation depends on the result of the previous one. Futures can be chained to achieve this.

In [None]:
import concurrent.futures
import time

def fetch_data(data):
    print(f"Fetching data: {data}")
    time.sleep(2)  # Simulate API call
    return f"Fetched: {data}"

def process_data(data):
    print(f"Processing data: {data}")
    time.sleep(1)  # Simulate processing
    return f"Processed: {data}"

def save_data(data):
    print(f"Saving data: {data}")
    time.sleep(1)  # Simulate saving to database
    return f"Saved: {data}"

def chain_operations(executor, initial_data):
    fetch_future = executor.submit(fetch_data, initial_data)
    
    process_future = executor.submit(process_data, fetch_future.result())
    
    save_future = executor.submit(save_data, process_future.result())
    
    return save_future

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        future1 = chain_operations(executor, "Data1")
        future2 = chain_operations(executor, "Data2")
        
        for future in concurrent.futures.as_completed([future1, future2]):
            result = future.result()
            print(f"Final result: {result}")

if __name__ == "__main__":
    main()


In this example:

1. We define three functions that simulate different stages of data processing: `fetch_data`, `process_data`, and `save_data`.

2. The `chain_operations` function demonstrates how to chain futures:
   - It submits the `fetch_data` task and gets a future.
   - It then submits the `process_data` task, using `fetch_future.result()` to wait for and get the result of the fetch operation.
   - Finally, it submits the `save_data` task, using `process_future.result()` to wait for and get the result of the process operation.

3. In the `main` function, we start two chains of operations concurrently and wait for both to complete.

This pattern allows you to create pipelines of asynchronous operations while still leveraging the concurrency provided by futures.

### Best Practices

1. **Use context managers**: Always use futures and executors with context managers (`with` statements) to ensure proper cleanup.

2. **Set appropriate timeouts**: When waiting for futures to complete, consider setting timeouts to avoid indefinite waits.

3. **Handle exceptions**: Always handle potential exceptions when calling `future.result()` to prevent unhandled exceptions from crashing your program.

4. **Choose the right executor**: Use `ThreadPoolExecutor` for I/O-bound tasks and `ProcessPoolExecutor` for CPU-bound tasks.

5. **Limit the number of workers**: Set an appropriate `max_workers` value to avoid overwhelming system resources.

6. **Use `as_completed` for dynamic results**: When you want to process results as soon as they're available, use `concurrent.futures.as_completed()`.

7. **Consider using `asyncio` for complex asynchronous logic**: For more complex asynchronous patterns, consider using `asyncio` in combination with futures.

By following these practices and understanding these patterns, you can effectively use futures to create efficient and robust concurrent programs in Python.

## 7. Advanced Error Handling and Cancellation

### 7.1 Detailed Error Handling

When working with futures, it's crucial to handle errors properly. Let's look at a more comprehensive example of error handling:

In [None]:
import concurrent.futures
import random
import time

class DataFetchError(Exception):
    pass

class DataProcessError(Exception):
    pass

def fetch_data(id):
    print(f"Fetching data for id {id}")
    time.sleep(random.uniform(0.5, 2))  # Simulate network delay
    if random.random() < 0.2:  # 20% chance of failure
        raise DataFetchError(f"Failed to fetch data for id {id}")
    return f"Data for id {id}"

def process_data(data):
    print(f"Processing {data}")
    time.sleep(random.uniform(0.5, 1.5))  # Simulate processing time
    if random.random() < 0.1:  # 10% chance of failure
        raise DataProcessError(f"Failed to process {data}")
    return f"Processed {data}"

def fetch_and_process(id):
    try:
        data = fetch_data(id)
        return process_data(data)
    except DataFetchError as e:
        print(f"Fetch error: {e}")
        return None
    except DataProcessError as e:
        print(f"Process error: {e}")
        return None

def main():
    ids = range(1, 11)  # Process 10 items
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        future_to_id = {executor.submit(fetch_and_process, id): id for id in ids}
        
        for future in concurrent.futures.as_completed(future_to_id):
            id = future_to_id[future]
            try:
                result = future.result()
                if result:
                    print(f"Successfully processed id {id}: {result}")
                else:
                    print(f"Failed to process id {id}")
            except Exception as e:
                print(f"Unexpected error for id {id}: {e}")

if __name__ == "__main__":
    main()


In this example:

1. We define custom exceptions `DataFetchError` and `DataProcessError` for specific error cases.

2. The `fetch_data` and `process_data` functions simulate network and processing operations that can fail.

3. `fetch_and_process` combines these operations and handles their specific exceptions.

4. In the `main` function, we use a ThreadPoolExecutor to process multiple items concurrently.

5. We use `as_completed` to handle results as they become available, and we catch any unexpected exceptions that might occur when calling `future.result()`.

This approach allows us to handle different types of errors at different levels, providing more granular control over error handling and reporting.

### 7.2 Cancellation and Timeouts

Futures also support cancellation and timeouts, which are crucial for managing long-running or potentially hanging tasks. Let's look at an example:

In [None]:
import concurrent.futures
import time

def long_running_task(n):
    print(f"Starting long running task {n}")
    try:
        time.sleep(10)  # Simulate a long-running operation
        return f"Task {n} completed"
    except concurrent.futures.CancelledError:
        print(f"Task {n} was cancelled")
        raise

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(long_running_task, i) for i in range(5)]
        
        # Wait for the first task to complete or 3 seconds to pass
        try:
            done, not_done = concurrent.futures.wait(futures, timeout=3, return_when=concurrent.futures.FIRST_COMPLETED)
            
            for future in done:
                print(future.result())
            
            # Cancel the tasks that didn't complete
            for future in not_done:
                future.cancel()
            
            # Wait for the cancelled tasks to finish
            concurrent.futures.wait(not_done, return_when=concurrent.futures.ALL_COMPLETED)
        
        except concurrent.futures.TimeoutError:
            print("Timeout occurred")

if __name__ == "__main__":
    main()


In this example:

1. We define a `long_running_task` that simulates a task taking 10 seconds to complete. It's designed to catch `CancelledError` to demonstrate proper cancellation handling.

2. In `main`, we submit 5 tasks to a ThreadPoolExecutor.

3. We use `concurrent.futures.wait` with a timeout of 3 seconds and `return_when=FIRST_COMPLETED`. This means it will return as soon as either a task completes or 3 seconds pass.

4. After the wait, we process any completed tasks and cancel the rest.

5. We then wait for the cancelled tasks to finish (which should be quick as they're cancelled).

6. We also catch `TimeoutError` in case the initial wait times out before any task completes.

This pattern is useful for scenarios where you want to limit the total time spent on a set of tasks, or when you want to cancel remaining tasks once you have a sufficient number of results.

## 8. Combining Futures with Other Python Features

### 8.1 Futures with Generators

Futures can be combined with generators to create powerful data processing pipelines. Here's an example:

In [None]:
import concurrent.futures
import random

def data_generator(n):
    for i in range(n):
        yield f"Data chunk {i}"

def process_chunk(chunk):
    # Simulate processing
    return f"Processed {chunk} (result: {random.randint(1, 100)})"

def process_data_pipeline(data, max_workers=3):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit the first batch of tasks
        futures = {executor.submit(process_chunk, next(data)) for _ in range(max_workers)}
        
        while futures:
            # Wait for the next completed future
            done, futures = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
            
            for future in done:
                yield future.result()
            
            # Submit new tasks to replace the completed ones
            for _ in done:
                try:
                    futures.add(executor.submit(process_chunk, next(data)))
                except StopIteration:
                    # No more data to process
                    pass

def main():
    data = data_generator(20)  # Generate 20 chunks of data
    for result in process_data_pipeline(data):
        print(result)

if __name__ == "__main__":
    main()


In this example:

1. We define a `data_generator` that yields chunks of data.

2. The `process_chunk` function simulates processing a single chunk of data.

3. `process_data_pipeline` is the core of our pipeline. It:
   - Maintains a pool of futures, keeping `max_workers` tasks running at all times.
   - Yields results as they become available.
   - Submits new tasks to replace completed ones until all data is processed.

4. The `main` function demonstrates how to use this pipeline, printing results as they're produced.

This pattern allows for efficient processing of streaming data, maintaining a constant level of concurrency while yielding results as soon as they're available.

These examples demonstrate more advanced uses of futures, including detailed error handling, cancellation and timeout management, and combining futures with other Python features like generators. They showcase the flexibility and power of futures in creating efficient and robust concurrent programs.