In [1]:
import sys

In [2]:
sys.version

'3.13.5 (main, Jun 11 2025, 15:36:57) [Clang 17.0.0 (clang-1700.0.13.3)]'

In [3]:
sys._is_gil_enabled()

True

In [19]:
import concurrent.futures
import multiprocessing
from functools import partial, wraps
from random import gauss
from time import sleep, time

## Set up some utilities for timing function calls and submitting them to a thread or process pool

In [5]:
def timed(f):
    """
    Decorator to measure the execution time of a function.

    The elapsed time is measured with `time.perf_counter`, a monotonic,
    high-resolution clock, so the measurement cannot go negative or jump if
    the system clock is adjusted while `f` is running.

    Parameters
    ----------
    f : callable
        The function whose execution time is to be measured.

    Returns
    -------
    callable
        A wrapped function that returns a tuple (result, elapsed_time), where
        `result` is the output of the original function and `elapsed_time` is
        the time taken in seconds (a float). The wrapper keeps the name and
        docstring of `f` (via `functools.wraps`).
    """
    # Imported here so this definition is self-contained and does not require
    # any change to the notebook's import cell.
    from time import perf_counter

    @wraps(f)
    def f_timed(*args, **kwargs):
        start = perf_counter()
        result = f(*args, **kwargs)
        elapsed = perf_counter() - start
        return result, elapsed

    return f_timed

In [6]:
@timed
def submit_get_results(timed_fun, n, pool_type, as_completed=False, max_workers=None, verbose=False):
    """
    Run a function across a pool of workers using submit, collecting results and execution times.

    Parameters
    ----------
    timed_fun : callable
        A function that accepts an integer (plus a `verbose` keyword) and returns a tuple (result, exec_time).
    n : int
        Number of tasks to submit to the pool. Task `i` is called with the argument `i`, for i in 0 .. n - 1.
    pool_type : type
        The executor class to use (e.g., ThreadPoolExecutor or ProcessPoolExecutor).
    as_completed : bool, optional
        If True, results are collected as tasks complete. If False, results are collected in submission order.
        Default is False.
    max_workers : int or None, optional
        The maximum number of workers for the pool. If None, the default for the executor is used.
    verbose : bool, optional
        Passed through to `timed_fun` as its `verbose` keyword argument. Default is False.

    Returns
    -------
    results : tuple
        The results returned by `timed_fun`, one per task. Empty if `n` is 0.
    times : tuple of float
        The execution time of each task, in the same order as `results`. Empty if `n` is 0.

    Notes
    -----
    This function is itself decorated with `timed`, so callers actually receive
    ``((results, times), total_seconds)``.
    """
    pfun = partial(timed_fun, verbose=verbose)
    with pool_type(max_workers) as pool:
        tasks = [pool.submit(pfun, i) for i in range(n)]
        # Either iterate the futures in completion order, or in the order they were submitted.
        ordered_tasks = concurrent.futures.as_completed(tasks) if as_completed else tasks
        results_times = [task.result() for task in ordered_tasks]

    # zip(*[]) yields nothing, so unpacking it into two names would raise ValueError when n == 0.
    if not results_times:
        return (), ()
    results, times = zip(*results_times)
    return results, times

In [7]:
@timed
def map_results(timed_fun, n, pool_type, max_workers=None, verbose=False):
    """Map a simple timed function across a range of argument values with a process Pool.

    Parameters
    ----------
    timed_fun: callable
        A function that accepts a single value argument (plus a `verbose` keyword), and returns a
        (result, seconds) tuple.
    n: int
        The range of values (from 0 to n - 1) to submit to `timed_fun`.
    pool_type: type
        The Pool class (e.g., multiprocessing.Pool). It is called with `max_workers` as its only positional
        argument and must support the context-manager protocol and a `map` method.
    max_workers: int, Optional
        Maximum number of workers in the pool. Default: None.
    verbose: bool, Optional
        Verbose message printing; passed through to `timed_fun`. Default: False.

    Returns
    -------
    tuple[tuple, tuple]
        The results and run time for each task, in argument order. Both tuples are empty if `n` is 0.

    Notes
    -----
    This function is itself decorated with `timed`, so callers actually receive
    ``((results, times), total_seconds)``.
    """
    pfun = partial(timed_fun, verbose=verbose)
    with pool_type(max_workers) as pool:
        # Materialize inside the `with` block so a lazily evaluated `map` is fully consumed
        # while the pool is still open.
        results_times = list(pool.map(pfun, range(n)))

    # zip(*[]) yields nothing, so unpacking it into two names would raise ValueError when n == 0.
    if not results_times:
        return (), ()
    results, times = zip(*results_times)
    return results, times

In [8]:
def display_results(timed_fun, n, pool_type, submit_type, as_completed=False, max_workers=None, verbose=False):
    """Run `timed_fun` over a pool and print the results, per-task times, and total elapsed time.

    Parameters
    ----------
    timed_fun: callable
        A function that accepts a single value argument, and returns a (result, seconds) tuple.
    n: int
        The range of values (from 0 to n - 1) to submit to `timed_fun`.
    pool_type: type
        The pool or executor class to run the tasks on (e.g., multiprocessing.Pool).
    submit_type: str
        How tasks are dispatched: 'submit' (via `submit_get_results`) or 'map' (via `map_results`).
    as_completed: bool, Optional
        Return results as they are completed, rather than in the original order of submission. Default: False.
        (Ignored when submit_type == 'map')
    max_workers: int, Optional
        Maximum number of workers in the pool. Default: None.
    verbose: bool, Optional
        Verbose message printing. Default: False.

    Raises
    ------
    ValueError
        If `submit_type` is neither 'submit' nor 'map'.
    """
    print(f"Running: {timed_fun.__name__}")

    # Both helpers are wrapped by `timed`, so each returns ((results, times), total_seconds).
    if submit_type == 'map':
        outcome = map_results(timed_fun, n, pool_type, max_workers=max_workers, verbose=verbose)
    elif submit_type == 'submit':
        outcome = submit_get_results(
            timed_fun, n, pool_type, as_completed=as_completed, max_workers=max_workers, verbose=verbose
        )
    else:
        raise ValueError(f"Unrecognized value for submit_type: {submit_type}")

    (results, times), total_time = outcome
    print(f"Results: {results}")
    print(f"Task times: {' + '.join(f'{t:.3g}' for t in times)} = {sum(times):.3g}")
    print(f"Actual time: {total_time}")

## Create some simple timed functions that have delays or take a significant amount of compute time

In [9]:
@timed
def delayed_return(value, seconds=None, seconds_mean=1, seconds_sigma=0.25, verbose=False):
    """
    Pause for a fixed or randomly drawn duration, then hand back `value` unchanged.

    Parameters
    ----------
    value : any
        The value to return once the pause is over.
    seconds : float or None, optional
        How long to sleep, in seconds. When None, the duration is drawn from a normal
        distribution with mean `seconds_mean` and standard deviation `seconds_sigma`.
        Default is None.
    seconds_mean : float, optional
        Mean of the normal distribution used when `seconds` is None. Default is 1.
    seconds_sigma : float, optional
        Standard deviation of the normal distribution used when `seconds` is None. Default is 0.25.
    verbose : bool, optional
        If True, prints a message before and after the pause. Default is False.

    Returns
    -------
    value : any
        The input value, returned after the delay. Because this function is decorated
        with `timed`, callers actually receive ``(value, elapsed_seconds)``.
    """
    if verbose:
        print(f"Starting with value: {value}")

    delay = gauss(seconds_mean, seconds_sigma) if seconds is None else seconds
    # A non-positive duration (possible with a random draw) means no sleep at all.
    if delay > 0:
        sleep(delay)

    if verbose:
        print(f"Returning: {value}")
    return value

In [10]:
@timed
def long_factorize(offset, base=100000001, verbose=False):
    """
    Find the greatest non-trivial factor pair of value = base + offset, eventually returning (1, value) if value
    is a prime number. This is done with a deliberately inefficient linear search, in order to take up CPU time.

    The search starts at value // 2 and counts down until it finds a divisor of `value`, or reaches 1.
    For value < 2 there is nothing to search, and the trivial pair (1, value) is returned.

    Parameters
    ----------
    offset : int
        The offset to add to the base value to form the target integer.
    base : int, optional
        The base value to which the offset is added. Default is 100000001.
    verbose : bool, optional
        If True, prints status messages. Default is False.

    Returns
    -------
    value : int
        The integer being factorized (base + offset).
    f1 : int
        The first factor found (largest factor less than value), or 1 if there is none.
    f2 : int
        The second factor found (value divided by f1).

    Notes
    -----
    This function is decorated with `timed`, so callers actually receive
    ``((value, f1, f2), elapsed_seconds)``.
    """
    def vprint(*args, **kwargs):
        if verbose:
            print(*args, **kwargs)

    value = base + offset
    vprint(f"Finding factors of {base} + {offset} = {value}")
    # Floor division stays exact for arbitrarily large ints, whereas int(value / 2) goes through a float.
    # Clamping to 1 avoids dividing by zero below when value < 2.
    next_guess = max(value // 2, 1)
    while next_guess > 1 and (value % next_guess):
        next_guess -= 1
    f1 = next_guess
    f2 = value // f1
    vprint(f"{f1 * f2} = {f1} * {f2}")
    return value, f1, f2

In [11]:
%%time
long_factorize(0, verbose=True)

Finding factors of 100000001 + 0 = 100000001
100000001 = 5882353 * 17
CPU times: user 1.73 s, sys: 6.64 ms, total: 1.74 s
Wall time: 1.74 s


((100000001, 5882353, 17), 1.736630916595459)

## Run a function that has a delay, but does not utilize CPU time. This is similar to waiting for a server response.

First, using only a single worker, run the task several times with variable delays.

In [12]:
display_results(delayed_return, 10, concurrent.futures.ThreadPoolExecutor, 'submit', max_workers=1)

Running: delayed_return
Results: (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
Task times: 0.812 + 0.527 + 1.09 + 0.955 + 0.763 + 1.38 + 1.11 + 1.16 + 0.878 + 0.835 = 9.52
Actual time: 9.523026943206787


The time to complete all tasks equals the sum of the times to complete each individual task.

Next, run the delayed tasks across a thread pool with multiple workers.

In [13]:
display_results(delayed_return, 10, concurrent.futures.ThreadPoolExecutor, 'submit')

Running: delayed_return
Results: (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
Task times: 0.87 + 0.789 + 1.22 + 1.21 + 1.19 + 1.45 + 0.948 + 0.47 + 0.604 + 1.07 = 9.82
Actual time: 1.452613115310669


The total time is only a bit longer than the time of the longest task, because the independent threads were waiting for the task responses in parallel, with minimal processing once the responses were received.

If we return results as completed, the ordering of the results is not preserved.

In [14]:
display_results(delayed_return, 10, concurrent.futures.ThreadPoolExecutor, 'submit', as_completed=True)

Running: delayed_return
Results: (3, 7, 5, 4, 8, 9, 1, 0, 6, 2)
Task times: 0.444 + 0.7 + 0.783 + 0.949 + 0.95 + 1.06 + 1.1 + 1.13 + 1.17 + 1.28 = 9.56
Actual time: 1.286355972290039


### Thread pools share a single CPU

Next, submit tasks which take up CPU time to a thread pool.

In [15]:
display_results(long_factorize, 10, concurrent.futures.ThreadPoolExecutor, 'submit', as_completed=True)

Running: long_factorize
Results: ((100000008, 50000004, 2), (100000002, 50000001, 2), (100000010, 50000005, 2), (100000004, 50000002, 2), (100000006, 50000003, 2), (100000005, 33333335, 3), (100000001, 5882353, 17), (100000003, 155521, 643), (100000009, 671141, 149), (100000007, 1, 100000007))
Task times: 9.3e-06 + 5.25e-06 + 5.25e-06 + 3.1e-06 + 4.05e-06 + 3.18 + 7.49 + 8.01 + 7.98 + 8.06 = 34.7
Actual time: 8.206502199172974


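The ThreadPoolExecutor does not run across multiple independent processes. Its threads all live in one process and, with the GIL enabled (see `sys._is_gil_enabled()` above), only one thread can execute Python bytecode at a time, so they effectively share a single CPU. Some tasks therefore stall while waiting for CPU time. The sum of the task times therefore counts the same wall-clock time more than once.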

In [16]:
# Baseline: run the same CPU-bound tasks on a single worker thread, to compare the total time
# against the multi-threaded run above.
display_results(long_factorize, 10, concurrent.futures.ThreadPoolExecutor, 'submit', max_workers=1, as_completed=True)

Running: long_factorize
Results: ((100000001, 5882353, 17), (100000002, 50000001, 2), (100000003, 155521, 643), (100000004, 50000002, 2), (100000005, 33333335, 3), (100000006, 50000003, 2), (100000007, 1, 100000007), (100000008, 50000004, 2), (100000009, 671141, 149), (100000010, 50000005, 2))
Task times: 1.8 + 7.15e-06 + 2.05 + 3.81e-06 + 0.655 + 3.1e-06 + 1.98 + 4.05e-06 + 1.99 + 2.86e-06 = 8.48
Actual time: 8.4774808883667


### Try using a process Pool

In [17]:
# This will fail: the spawned worker process cannot unpickle the task, because `long_factorize` is defined
# in the notebook's `__main__` module and cannot be found in the worker's `__main__` (see the AttributeError
# in the traceback below). The cell then hangs until it is interrupted.
display_results(long_factorize, 1, multiprocessing.Pool, 'map')

Running: long_factorize


Process SpawnPoolWorker-1:
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/python@3.13/3.13.5/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/process.py", line 313, in _bootstrap
    self.run()
    ~~~~~~~~^^
  File "/opt/homebrew/Cellar/python@3.13/3.13.5/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.13/3.13.5/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/opt/homebrew/Cellar/python@3.13/3.13.5/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/queues.py", line 387, in get
    return _ForkingPickler.loads(res)
           ~~~~~~~~~~~~~~~~~~~~~^^^^^
AttributeError: Can't get attribute 'long_factorize' on <module '__main__' (<class '_frozen_importlib.BuiltinImpo

KeyboardInterrupt: 

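The `multiprocess` library (a fork of `multiprocessing` that serializes with `dill`, which can pickle functions defined in a notebook) fixes the problem. A second workaround will be shown in `demo_1b.ipynb`.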

In [18]:
import multiprocess

In [21]:
display_results(long_factorize, 10, multiprocess.Pool, 'map')

Running: long_factorize
Results: ((100000001, 5882353, 17), (100000002, 50000001, 2), (100000003, 155521, 643), (100000004, 50000002, 2), (100000005, 33333335, 3), (100000006, 50000003, 2), (100000007, 1, 100000007), (100000008, 50000004, 2), (100000009, 671141, 149), (100000010, 50000005, 2))
Task times: 2.03 + 8.34e-06 + 2.29 + 6.91e-06 + 0.764 + 8.11e-06 + 2.29 + 1.19e-05 + 2.26 + 8.11e-06 = 9.64
Actual time: 2.3283560276031494
