# IPC and Locking

François-David Collin (CNRS, IMAG, Paul-Valéry Montpellier 3
University)  
Wednesday, August 27, 2025

# Streaming (pipelining) data

Sticking to our favorite hobby, finding prime numbers, we’ll use a different strategy this time.

Instead of partitioning the data up front, we will continuously *feed* the workers of our multiprocessing pool with small chunks of numbers. The workers send back the primes they found in those chunks.

We need two queues:

-   one for the chunks of numbers: the *INPUT* queue;
-   another one for the results: the *OUTPUT* queue.

Let’s bring our old `check_prime` function back from the dead…

``` python
import math

def check_prime(n):
    # Trial division by odd divisors up to sqrt(n); intended for odd n >= 3
    if n % 2 == 0:
        return False
    for i in range(3, int(math.sqrt(n)) + 1, 2):
        if n % i == 0:
            return False
    return True
```
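As written, `check_prime` returns `False` for 2 and `True` for 1 (the loop over `range(3, 2, 2)` is empty), which is why 2 does not show up in the results below and 1 is counted when the numbers start at 1. A minimal variant that handles these edge cases might look like this; the name `is_prime` is hypothetical, and it uses `math.isqrt` (Python 3.8+) instead of `int(math.sqrt(n))` to avoid floating-point rounding on very large numbers.

```python
import math

def is_prime(n: int) -> bool:
    """Hypothetical trial-division test that also handles n < 3."""
    if n < 2:          # 0, 1 and negative numbers are not prime
        return False
    if n == 2:         # the only even prime
        return True
    if n % 2 == 0:     # every other even number is composite
        return False
    # only odd divisors up to floor(sqrt(n)) need to be tried
    for i in range(3, math.isqrt(n) + 1, 2):
        if n % i == 0:
            return False
    return True

print([k for k in range(20) if is_prime(k)])  # [2, 3, 5, 7, 11, 13, 17, 19]
```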

Let’s bring back the chunk generator, too.

``` python
def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]
```
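Since slicing a `range` returns another `range`, applying `chunks` to `range(1, N)` yields small lazy `range` objects rather than lists, so the items put on the queues stay small. A quick check:

```python
def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

parts = list(chunks(range(1, 11), 4))
print(parts)  # [range(1, 5), range(5, 9), range(9, 11)]
```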

Now we want to use a *producer/consumer* model, where each worker process has:

-   an *input* queue from which it receives chunks of numbers;
-   an *output* queue to which it sends back, for each chunk, the list of
    primes found in that chunk.

The elements of both queues are *lists* (for the input, any Python iterable works). We’ll use `None` as a sentinel that terminates a queue.
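To see why a queue needs one `None` per consumer, here is a minimal sketch of the producer/consumer pattern. It assumes `queue.Queue` and `threading` instead of the multiprocessing machinery, so it runs in a single interpreter, and the hypothetical `consumer` function just sums each chunk as a stand-in for finding primes.

```python
import queue
import threading

def consumer(inbox: queue.Queue, outbox: queue.Queue) -> None:
    """Hypothetical consumer: sum each chunk until the None sentinel."""
    while True:
        item = inbox.get()
        if item is None:      # sentinel: no more work for this consumer
            break
        outbox.put(sum(item))

inbox: queue.Queue = queue.Queue()
outbox: queue.Queue = queue.Queue()
workers = [threading.Thread(target=consumer, args=(inbox, outbox))
           for _ in range(2)]
for w in workers:
    w.start()

data = [[1, 2], [3, 4], [5, 6]]
for chunk in data:            # producer side
    inbox.put(chunk)
for _ in workers:             # one sentinel per consumer
    inbox.put(None)

for w in workers:
    w.join()
results = sorted(outbox.get() for _ in data)
print(results)  # [3, 7, 11]
```

Each consumer stops at the first `None` it reads, so two consumers need two sentinels; with a single one, the other consumer would block forever on `get()`.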

## Queue Worker Function

Create a *worker* function that takes the *input* and *output* queues as
arguments.

For each element of the *input* queue (a list of numbers), compute the
list of primes it contains, and put that list onto the *output* queue.

``` python
def find_prime_worker(input, output):
    for chunk in iter(input.get,None):
        primes_found = ...
        output.put(primes_found)
```

## Test the worker function

1.  Manually allocate the *input* and *output* queues (we use managed
    queues)
2.  Put some chunks of numbers in the *input* queue (don’t forget to
    terminate the queue with `None`)
3.  Launch the worker function on the queues and terminate the output
    queue with `None`.
4.  Collect the results in a unified list.

## Some Tools

### Iterate on a queue

To iterate over a queue terminated by `None`, use the two-argument form
of the [`iter`](https://docs.python.org/3/library/functions.html#iter)
function:

``` python
iter(queue.get,None)
```
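As a self-contained illustration, assuming a plain `queue.Queue` from the standard library in place of a managed queue (both expose the same blocking `get()`), draining a `None`-terminated queue looks like this:

```python
import queue

q: queue.Queue = queue.Queue()
for item in ([2, 3], [5], None):   # None terminates the stream
    q.put(item)

# iter(callable, sentinel) calls q.get() until it returns the sentinel
received = list(iter(q.get, None))
print(received)    # [[2, 3], [5]]
print(q.empty())   # True: the sentinel itself was consumed
```

If the sentinel is never put, the loop blocks forever, because `get()` waits for the next item.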

### Flatten a list of lists

To flatten a list of lists into a single iterable, use the
[`chain`](https://docs.python.org/3/library/itertools.html#itertools.chain)
function from `itertools`:

``` python
chain(*list_of_list)
```
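For example, flattening per-chunk results (the values here are arbitrary):

```python
from itertools import chain

list_of_lists = [[3, 5, 7], [], [1009, 1013]]

flat = list(chain(*list_of_lists))                # unpack the outer list as arguments
flat2 = list(chain.from_iterable(list_of_lists))  # same result, no unpacking

print(flat)           # [3, 5, 7, 1009, 1013]
print(flat == flat2)  # True
```

`chain.from_iterable` takes the outer iterable directly instead of unpacking it into arguments, so it also accepts a lazy outer iterable such as `iter(output.get, None)` without building an intermediate list first.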

### Reminder

Iterators in Python (such as those returned by `iter`, `filter` or
`chain`) are lazy: to actually build a list, you have to force them with
`list()`.

``` python
list(iterables)
```
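For instance, `filter` (used in the worker function) returns a one-shot iterator: nothing is computed until it is consumed, and once consumed it is empty. The predicate `is_odd` is only a stand-in.

```python
def is_odd(k):
    return k % 2 == 1

lazy = filter(is_odd, range(10))   # nothing is computed yet
print(type(lazy).__name__)         # filter: an iterator, not a list
first = list(lazy)                 # forces the evaluation
second = list(lazy)                # the iterator is now exhausted
print(first)    # [1, 3, 5, 7, 9]
print(second)   # []
```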

### Worker function

``` python
import time

def find_prime_worker(input, output):
    t0 = time.time()
    for chunk in iter(input.get, None):
        primes_found = list(filter(check_prime, chunk))
        output.put(primes_found)
    # print the time taken by the worker in seconds and two decimal places
    print(f"Worker done in {time.time()-t0:.2f}s")
    return
```

### 1. Allocations

``` python
from multiprocessing import Manager

manager = Manager()
input = manager.Queue()
output = manager.Queue()
```

### 2. Some chunks in the input queue

``` python
input.put(range(2, 100))
input.put(range(1000, 2000))
input.put(None)
```

### 3. Launch the worker and terminate the output

``` python
find_prime_worker(input, output)
output.put(None)
```

```
Worker done in 0.00s
```

### 4. Collect the results

``` python
from itertools import chain

list(chain(*list(iter(output.get, None))))
```

```
[3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73,
 79, 83, 89, 97, 1009, 1013, 1019, 1021, 1031, 1033, 1039, 1049, 1051, 1061,
 1063, 1069, 1087, 1091, 1093, 1097, 1103, 1109, 1117, 1123, 1129, 1151, 1153,
 1163, 1171, 1181, 1187, 1193, 1201, 1213, 1217, 1223, 1229, 1231, 1237, 1249,
 1259, 1277, 1279, 1283, 1289, 1291, 1297, 1301, 1303, 1307, 1319, 1321, 1327,
 1361, 1367, 1373, 1381, 1399, 1409, 1423, 1427, 1429, 1433, 1439, 1447, 1451,
 1453, 1459, 1471, 1481, 1483, 1487, 1489, 1493, 1499, 1511, 1523, 1531, 1543,
 1549, 1553, 1559, 1567, 1571, 1579, 1583, 1597, 1601, 1607, 1609, 1613, 1619,
 1621, 1627, 1637, 1657, 1663, 1667, 1669, 1693, 1697, 1699, 1709, 1721, 1723,
 1733, 1741, 1747, 1753, 1759, 1777, 1783, 1787, 1789, 1801, 1811, 1823, 1831,
 1847, 1861, 1867, 1871, 1873, 1877, 1879, 1889, 1901, 1907, 1913, 1931, ...]
```
 

# Putting the workers to… work.

Write a function that allocates the queues and uses a `Pool(ncores)` of
workers.

``` python
def calculate_primes(ncores,N,chunksize):
    ...
```

-   `ncores` is the number of workers (typically the number of cores on
    your machine, 8 for example)
-   `N` is the upper limit of the primes we want to find
-   `chunksize` is the size of the chunks of numbers we send to the
    workers.

## The main process

1.  Use
    [`starmap_async`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.starmap_async)
    to launch one `find_prime_worker` per worker (don’t use its optional
    `chunksize` argument: the numbers are chunked through the queue)
2.  Feed the *input* queue with all chunks from
    `chunks(range(1,N),chunksize)`
3.  Terminate the *input* queue with `ncores` `None` sentinels, one for
    each worker
4.  Wait for the workers to finish
5.  Collect and return the results

Test and benchmark it with a chunk size of `int(N/64)` and

``` python
N = 5000000
```
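The following sketch shows the `starmap_async` calling convention on a toy function. It assumes `multiprocessing.pool.ThreadPool`, which exposes the same interface as `Pool` but runs threads, so the snippet works without pickling or an `if __name__ == "__main__":` guard; `add` is a placeholder for `find_prime_worker`. Each tuple in the argument list becomes one call, so in the exercise `[(input, output)] * ncores` starts `ncores` workers sharing the same two queues.

```python
from multiprocessing.pool import ThreadPool

def add(a, b):
    # placeholder for the real worker function
    return a + b

with ThreadPool(2) as p:
    # one call per argument tuple: add(1, 2), add(10, 20), add(100, 200)
    async_res = p.starmap_async(add, [(1, 2), (10, 20), (100, 200)])
    async_res.wait()            # block until every call has finished
    results = async_res.get()   # results in input order; re-raises worker errors

print(results)  # [3, 30, 300]
```

`Pool.__exit__` calls `terminate()`, so the results must be waited for inside the `with` block, as the solution below does.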

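## Solution for the main process function

``` python
from multiprocessing import Pool, Manager
from itertools import chain

def calculate_primes(ncores, N, chunksize=None):
    # default chunking: one chunk per worker
    if chunksize is None:
        chunksize = int(N / ncores)
    with Manager() as manager:
        input = manager.Queue()
        output = manager.Queue()

        with Pool(ncores) as p:
            # ncores workers, all sharing the same pair of queues
            it = p.starmap_async(find_prime_worker, [(input, output)] * ncores)
            for r in chunks(range(1, N), chunksize):
                input.put(r)
            for i in range(ncores):   # one None sentinel per worker
                input.put(None)
            it.wait()                 # wait for every worker to finish
            output.put(None)          # terminate the output queue

        # flatten the per-chunk lists of primes into a single list
        res = list(chain(*list(iter(output.get, None))))
    return res
```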

### Test of the main function

``` python
N = 10000000
```

``` python
%timeit -r 1 -n 1 calculate_primes(8, N)
```

```
Worker done in 1.58s
Worker done in 2.02s
Worker done in 2.25s
Worker done in 0.92s
Worker done in 2.56s
Worker done in 2.72s
Worker done in 2.90s
3.16 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
```

## Autosizing chunks! (optional)

We know that the larger a number is, the longer it takes to check
whether it is prime.

A simple optimization of our multiprocessing/queue algorithm is
therefore to make the chunks progressively smaller as the numbers they
contain grow, so that every chunk takes about the same time to process.

1.  Modify the `chunks` function to take this into account, and test
    it.
2.  Write a variant of `calculate_primes`, `calculate_primes_chunks`,
    that uses this function.
3.  Test and benchmark it.

### Autosizing chunks example function

<span class="proof-title">*Solution*. </span>The time to check whether
a number $n$ is prime is at most proportional to $\sqrt{n}$ (trial
division stops at $\sqrt{n}$), and most composite numbers are rejected
much earlier. We can therefore make the hypothesis that the mean real
time of the check follows a “lower” power law than the square root,
something like $O(n^{p})$ with $p < \frac{1}{2}$. The time to check all
numbers up to `N` is then proportional to the integral of this power
law, which is (up to a constant) $N^{1+p}$. From this we can derive
chunk sizes that give every chunk about the same cost, and pick the
starting size so that there are enough chunks for all the workers.
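Spelled out, the balancing condition reads as follows (a sketch of the reasoning, with $x_k$ denoting chunk boundaries and $p$ the assumed exponent). If the cumulative cost of checking all numbers below $x$ is proportional to $x^{1+p}$, a chunk $[x_k, x_{k+1})$ costs about $x_{k+1}^{1+p} - x_k^{1+p}$. Requiring each chunk to cost as much as the previous one gives a recurrence for the next boundary:

$$
x_{k+1}^{1+p} - x_k^{1+p} = x_k^{1+p} - x_{k-1}^{1+p}
\quad\Longleftrightarrow\quad
x_{k+1} = \left(2\,x_k^{1+p} - x_{k-1}^{1+p}\right)^{\frac{1}{1+p}}
$$

With $p = 0.3$, this is the update `ip = (2 * (i ** rsq) - (im ** rsq)) ** (1/rsq)` in `chunks_rsquared`, where `rsq` $= 1 + p$, `im` $= x_{k-1}$, `i` $= x_k$ and `ip` $= x_{k+1}$. Since $x \mapsto x^{1+p}$ is convex, $x_{k+1} - x_k \le x_k - x_{k-1}$: the chunks get shorter as the numbers grow.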

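``` python
def chunks_rsquared(lst, n):
    """Yield successive chunks of lst, starting with size n and shrinking
    so that every chunk has roughly the same estimated cost, assuming the
    cumulative cost of checking numbers up to x grows like x ** 1.3."""
    i = 0
    rsq = 1 + 0.3  # 1 + p, with p = 0.3 an empirical exponent
    while i < len(lst):
        yield lst[i:i + n]
        im = i      # start of the chunk just yielded
        i = i + n   # start of the next chunk
        # choose ip so that ip**rsq - i**rsq == i**rsq - im**rsq
        ip = (2 * (i ** rsq) - (im ** rsq)) ** (1 / rsq)
        n = max(1, int(ip - i))
```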
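A quick way to check what `chunks_rsquared` does is to compute, under the same assumed cost model ($p = 0.3$), the estimated cost $b^{1.3} - a^{1.3}$ of each chunk $[a, b)$. The sketch below copies the generator so it runs on its own; the range up to $10^6$ and the starting size of 100 000 are arbitrary choices for illustration.

```python
def chunks_rsquared(lst, n):
    """Copy of the notebook's generator (assumed cost model: x ** 1.3)."""
    i = 0
    rsq = 1 + 0.3
    while i < len(lst):
        yield lst[i:i + n]
        im = i
        i = i + n
        ip = (2 * (i ** rsq) - (im ** rsq)) ** (1 / rsq)
        n = max(1, int(ip - i))

rsq = 1.3
parts = list(chunks_rsquared(range(1, 10**6), 100_000))
sizes = [len(r) for r in parts]
# estimated cost of each chunk [a, b) under the same power-law assumption
costs = [r.stop ** rsq - r.start ** rsq for r in parts]

print(len(parts), sizes[:3], sizes[-3:])
# ratio close to 1: all full chunks (the last is truncated) cost about the same
print(min(costs[:-1]) / max(costs[:-1]))
```

Chunk sizes shrink from one chunk to the next while the estimated costs stay almost constant; only the last chunk, truncated at the end of the range, is cheaper. Whether real timings follow depends on how well $p = 0.3$ fits your machine.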

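``` python
def find_start_chunk(lst, n):
    """Try first-chunk sizes len(lst)/1, len(lst)/2, ... and return the
    first chunking (the one with the largest first chunk) that yields at
    least n chunks, i.e. at least one chunk per worker."""
    # starting at 1 also covers n == 1 (a single chunk)
    for i in range(1, n + 1):
        res = list(chunks_rsquared(lst, int(len(lst) / i)))
        if len(res) >= n:
            return res
```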

### Test it

``` python
list(chunks_rsquared(range(1, 100), 50))
```

```
[range(1, 51), range(51, 86), range(86, 100)]
```

``` python
find_start_chunk(range(1, 100), 10)
```

```
[range(1, 20),
 range(20, 33),
 range(33, 44),
 range(44, 54),
 range(54, 63),
 range(63, 71),
 range(71, 78),
 range(78, 84),
 range(84, 89),
 range(89, 93),
 range(93, 96),
 range(96, 98),
 range(98, 99),
 range(99, 100)]
```

### Modify the main worker process function

``` python
def calculate_primes_chunks_rsquared(ncores, N):
    with Manager() as manager:
        input = manager.Queue()
        output = manager.Queue()

        with Pool(ncores) as p:
            it = p.starmap_async(find_prime_worker, [(input, output)] * ncores)
            for r in find_start_chunk(range(1, N), ncores):
                input.put(r)
            for i in range(ncores):
                input.put(None)
            it.wait()
            output.put(None)

        res = list(chain(*list(iter(output.get, None))))
    return res
```

### Test and benchmark it

``` python
%timeit -r 1 -n 1 calculate_primes_chunks_rsquared(8, N)
```

```
Worker done in 2.10s
Worker done in 2.11s
Worker done in 2.21s
Worker done in 2.26s
Worker done in 2.30s
Worker done in 2.34s
Worker done in 2.40s
2.52 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
```

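The worker times are now much closer to one another (2.10 s to 2.40 s, versus 0.92 s to 2.90 s with equal-size chunks), and the total time drops from 3.16 s to 2.52 s: the shrinking chunks balance the load between the workers better.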