Some of the stuff in here is heavily inspired by [this tutorial](https://sebastianraschka.com/Articles/2014_multiprocessing.html). Cheers!

In [13]:
import numpy as np
import multiprocessing as mp

Let's play with a very minimal example:

In [14]:
a = np.ones([5,6], dtype=int)
a

array([[1, 1, 1, 1, 1, 1],
       [1, 1, 1, 1, 1, 1],
       [1, 1, 1, 1, 1, 1],
       [1, 1, 1, 1, 1, 1],
       [1, 1, 1, 1, 1, 1]])

Here's our function from the [parallel processes notebook](Parallel%20processes.ipynb). However, here, we have added a second argument called *shift*, which allows us to "shift" a cell back to its correct column after splitting up an array for parallel processing. The *expansion* factor, allowing us to control how the cells are randomly distributed within the cell (or beyond), is also a parameter of the function now:

⚡️**TODO**: remember to re-activate the randomization part!

TODO: Also try out the np.empty/np.append approach from toPointsParallel, instead of using the insertrow (benchmark!)

In [15]:
def toPoints(raster, shift=0):
    numpoints = np.sum(raster)
    # make an empty 2D "target array"
    points = np.zeros((numpoints,2), dtype=int)

    # keep track of where in the target array we have to insert
    insertrow = 0

    # iterate over the input raster
    it = np.nditer(raster, flags=['multi_index'])
    while not it.finished:
        entry = np.array([np.array(it.multi_index)])
        entry = entry + np.array([shift,0])  # apply the shift
        block = np.repeat(entry, it[0], axis=0)
        points[insertrow:insertrow+block.shape[0]] = block
        insertrow = insertrow + block.shape[0]
        it.iternext()

    return points

Let's define a function that splits up the input array based on the number of CPUs, runs the toPoints function in parallel on each part individually, and puts the processed parts back together:

In [16]:
def toPointsParallel(raster):
    # how many CPUs do we have?
    cpus = mp.cpu_count()
    cpus = 2
    
    # How many columns does the raster have?
    numcols = raster.shape[0]
    
    # If we have more CPUs than the raster is wide, just treat each column of the 
    # raster separately (the remaining CPUs will not be used then)
    if cpus > numcols:
        cpus = numcols
        
    numpoints = np.sum(raster)
    # make an empty 2D "target array"
    points = np.empty((0,2), dtype=int)
    
    # determine how to split up the array
    splitpoints = np.linspace(start = 1, stop = numcols, num = cpus-1, endpoint = False).astype(int)
   
    # we need to add a 0 add the bedinning of the list of shift values, 
    # because the first array won't get shifted
    shifters = np.append(np.array([0]),splitpoints)
    
    # Get ready for multiprocessing...
    pool = mp.Pool(processes=cpus)
    results = [pool.apply(toPoints, args=(subarray, shift)) for shift, subarray in zip(shifters, np.array_split(raster,splitpoints))]
    
    for r in results:
        points = np.append(points, r, axis=0)
        
    pool.close()
            
    return points

The same function, but just looping through the parts sequentially (effectively using just one CPU):

In [17]:
def toPointsSequential(raster):
    # how many CPUs do we have?
    cpus = mp.cpu_count()
    
    # How many columns does the raster have?
    numcols = raster.shape[0]
    
    # If we have more CPUs than the raster is wide, just treat each column of the 
    # raster separately (the remaining CPUs will not be used then)
    if cpus > numcols:
        cpus = numcols
        
    numpoints = np.sum(raster)
    # make an empty 2D "target array"
    points = np.empty((0,2), dtype=int)
    
    # determine how to split up the array
    splitpoints = np.linspace(start = 1, stop = numcols, num = cpus-1, endpoint = False).astype(int)
   
    # we need to add a 0 add the bedinning of the list of shift values, 
    # because the first array won't get shifted
    shifters = np.append(np.array([0]),splitpoints)
    
    for shift, subarray in zip(shifters, np.array_split(raster,splitpoints)):
        points = np.append(points, toPoints(subarray, shift), axis=0)
    
    return points

The output for a given array should be the same, whether we are using the simple "toPoints" function, the sequential one, or the parallel one:

In [22]:
# create a 40x50 array of random values between 0 and 500:
test = np.random.rand(40,50)
test = (test * 500).astype(int)
print(test)

print(np.array_equal(toPoints(test), toPointsParallel(test)))
print(np.array_equal(toPoints(test), toPointsSequential(test)))
print(np.array_equal(toPointsParallel(test), toPointsSequential(test)))

[[ 98 201  39 ... 263  44 346]
 [246  23 414 ... 469 399  21]
 [102 478 326 ...  25  29 241]
 ...
 [322 363 175 ... 497  84 365]
 [373 340 453 ...   2 349 280]
 [222 496  24 ... 263 423 280]]
True
True
True


Looks good! Let's do some benchmarking:

In [23]:
array_sizes = [10, 100, 500, 750, 1000]
block_time = []
seq_time = []
parallel_time = []

for i in array_sizes:
    print(i)
    # make a random array for the given size (e.g. 10x10)
    test = np.random.rand(i,i)
    test = (test * 500).astype(int)
    print("block...")    
    t = %timeit -o toPoints(test)
    block_time.append(t)
    print("sequential...")    
    t = %timeit -o toPointsSequential(test)
    seq_time.append(t)
    print("parallel...")    
    t = %timeit -o toPointsParallel(test)
    parallel_time.append(t)

10
block...
656 µs ± 1.89 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
sequential...
913 µs ± 4.02 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
parallel...
13.9 ms ± 22.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
100
block...
82 ms ± 2.79 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
sequential...
150 ms ± 896 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
parallel...
300 ms ± 18.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
500
block...
2.31 s ± 3.44 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
sequential...
4.26 s ± 31 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
parallel...
8.19 s ± 567 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
750
block...
5.29 s ± 52.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
sequential...
9.35 s ± 147 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
parallel...


MaybeEncodingError: Error sending result: 'array([[  1,   0],
       [  1,   0],
       [  1,   0],
       ...,
       [749, 749],
       [749, 749],
       [749, 749]])'. Reason: 'error("'i' format requires -2147483648 <= number <= 2147483647",)'

In [24]:
block_time

[<TimeitResult : 656 µs ± 1.89 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)>,
 <TimeitResult : 82 ms ± 2.79 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)>,
 <TimeitResult : 2.31 s ± 3.44 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)>,
 <TimeitResult : 5.29 s ± 52.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)>]

Randomization function:

In [None]:
def randomize(points, expansion = 1.0):
    seed = np.random.rand(numpoints,2)
    seed = seed * expansion
    return points + seed