# Parallel Python

### Map-Reduce

Remember that PCA works as follows:

- Center the data
- Compute the covariance matrix
- Compute the eigendecomposition of the covariance matrix
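As a reference point, the three steps can be sketched directly with NumPy before rewriting them with `map` and `reduce`. This is a minimal sketch that assumes the sample covariance (dividing by `n - 1`). The function name `pca` and its return values are our own choices, not part of the notebook.

```python
import numpy as np

def pca(X, n_components):
    """Minimal PCA sketch: center, covariance, eigendecomposition."""
    Xc = X - X.mean(axis=0)                  # 1. center each column
    C = Xc.T @ Xc / (len(X) - 1)             # 2. sample covariance matrix
    values, vectors = np.linalg.eigh(C)      # 3. eigendecomposition (ascending order)
    values, vectors = values[::-1], vectors[:, ::-1]  # largest eigenvalue first
    W = vectors[:, :n_components]            # principal directions
    return values, W, Xc @ W                 # eigenvalues, directions, projected data

rng = np.random.default_rng(0)
X = rng.normal(size=(5, 3))
values, W, scores = pca(X, n_components=2)
print(scores.shape)  # (5, 2)
```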

We can use map-reduce operations to write PCA. The helpers below are built from `map` and `reduce`; NumPy broadcasting is only used to center the data (`X - mean(X)`).

In [1]:
import numpy as np
from functools import reduce

# Define helper functions as lambdas
add = lambda x, y: x + y
subtract = lambda x, y: x - y
mean = lambda x: reduce(add, x) / len(x)  # reducing over the rows of a 2-D array gives the column means

def outer(x):
    """Computes the outer product x x^T of a vector with itself."""
    left = np.repeat(x, len(x))          # x_0, x_0, ..., x_1, x_1, ...
    right = np.array(list(x) * len(x))   # x_0, x_1, ..., x_0, x_1, ...
    return np.reshape(list(map(lambda a, b: a * b, left, right)), (len(x), len(x)))

def cov(X):
    """Computes the scatter matrix (X - mean)^T (X - mean) by summing the outer
    products of the centered rows of X. Dividing by n - 1 gives the sample
    covariance; this positive rescaling does not change the eigenvectors."""
    return np.array(reduce(add, map(outer, X - mean(X))))

def cum_sum(vector):
    """Computes the cumulative sum of a vector."""
    indices = np.arange(1, len(vector) + 1)
    return list(map(lambda i: reduce(add, vector[:i], 0), indices))
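It is worth checking the map-reduce helpers against their NumPy counterparts. The sketch below re-declares `mean`, `outer` and `cov` in the same style (so it runs on its own) and compares them on random data. Note that `cov` returns the sum of outer products of the centered rows, i.e. the scatter matrix, which is `n - 1` times the sample covariance computed by `np.cov`.

```python
import numpy as np
from functools import reduce

add = lambda x, y: x + y
mean = lambda x: reduce(add, x) / len(x)   # reduce over rows -> column means

def outer(x):
    """Same construction as above: pair every x_i with every x_j, multiply, reshape."""
    left = np.repeat(x, len(x))
    right = np.array(list(x) * len(x))
    return np.reshape(list(map(lambda a, b: a * b, left, right)), (len(x), len(x)))

def cov(X):
    """Sum of the outer products of the centered rows (the scatter matrix)."""
    return reduce(add, map(outer, X - mean(X)))

rng = np.random.default_rng(1)
X = rng.normal(size=(6, 3))
Xc = X - X.mean(axis=0)

print(np.allclose(mean(X), X.mean(axis=0)))                          # True
print(np.allclose(outer(X[0]), np.outer(X[0], X[0])))                # True
print(np.allclose(cov(X), Xc.T @ Xc))                                # True
print(np.allclose(cov(X) / (len(X) - 1), np.cov(X, rowvar=False)))   # True
```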


In [2]:
# Generate Data
np.random.seed(123)
X = np.random.normal(size=(5, 3))

In [3]:
# Compute eigendecomposition of the covariance matrix
values, vectors = np.linalg.eigh(cov(X))
# Eigenvalues are returned in ascending order! 
values, vectors = values[::-1], vectors[:, ::-1]
values_matrix = np.diag(values)
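Since `cov` returns the unnormalized scatter matrix rather than the sample covariance, one may wonder whether the eigendecomposition is affected. The sketch below (on its own random data) checks that scaling by `1 / (n - 1)` only rescales the eigenvalues: the eigenvectors agree up to sign (assuming distinct eigenvalues, which holds almost surely for random data), and the proportion of variability used in the next cell is unchanged.

```python
import numpy as np

rng = np.random.default_rng(123)
X = rng.normal(size=(5, 3))
Xc = X - X.mean(axis=0)
S = Xc.T @ Xc                     # scatter matrix, as returned by the map-reduce cov
C = np.cov(X, rowvar=False)       # sample covariance = S / (n - 1)

vals_S, vecs_S = np.linalg.eigh(S)   # eigenvalues in ascending order
vals_C, vecs_C = np.linalg.eigh(C)

# Eigenvalues scale by 1 / (n - 1)
print(np.allclose(vals_S / (len(X) - 1), vals_C))          # True
# Eigenvectors agree up to sign
print(np.allclose(np.abs(vecs_S.T @ vecs_C), np.eye(3)))   # True
# Cumulative proportion of variability (largest eigenvalue first) is unchanged
print(np.allclose(np.cumsum(vals_S[::-1]) / vals_S.sum(),
                  np.cumsum(vals_C[::-1]) / vals_C.sum()))  # True
```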

In [4]:
# Compute total variation
total_variation = reduce(add, values)
# Compute the proportion of variability
prop_variability = list(map(lambda a, b: a / b, cum_sum(values), [total_variation]*len(values)))
# Choose a threshold
threshold = 0.9
# Keep index where threshold is achieved
n_principal_components = np.nonzero(list(map(lambda x: x >= threshold, prop_variability)))[0][0] + 1
n_principal_components

2
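For comparison, the same selection rule can be written with vectorized NumPy calls: `np.cumsum` replaces `cum_sum`, and `np.argmax` on a boolean array returns the first index where the threshold is reached. The helper name `n_components_for` and the eigenvalues below are hypothetical, chosen so the arithmetic is easy to follow (cumulative proportions 0.5, 0.8, 0.95, 1.0).

```python
import numpy as np

def n_components_for(values, threshold=0.9):
    """Smallest k such that the first k eigenvalues explain at least `threshold`
    of the total variation. Assumes `values` is sorted in descending order."""
    prop = np.cumsum(values) / np.sum(values)
    return int(np.argmax(prop >= threshold)) + 1

values = np.array([5.0, 3.0, 1.5, 0.5])   # hypothetical eigenvalues
print(n_components_for(values, 0.9))   # 3
print(n_components_for(values, 0.96))  # 4
```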

The examples above show that `map` and `reduce` can be combined in several ways. The most common pattern is to apply a `map` and feed its output into a `reduce` that computes the final result. For instance, to sum the remainders mod 7 of all the elements of a list, we can write:

In [5]:
mylist = [7, 3, 14, 2, 21]
reduce(lambda x, y: x+y, map(lambda x: x % 7, mylist))

5
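One detail of this pattern: `reduce` without an initial value raises `TypeError` on an empty iterable. Passing an initializer (here `0`, the identity of addition) makes the map-reduce well-defined for empty input. The function name `sum_mod` is a hypothetical wrapper around the one-liner above.

```python
from functools import reduce
from operator import add

def sum_mod(values, m=7):
    """Map each element to its remainder mod m, then reduce with addition.
    The initializer 0 makes the result well-defined for an empty list."""
    return reduce(add, map(lambda x: x % m, values), 0)

print(sum_mod([7, 3, 14, 2, 21]))  # 5
print(sum_mod([]))                 # 0
```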

We can also use `map` and `reduce` to compute a matrix product. First, we build the list of all index pairs `(i, j)` of an `n`-by-`m` matrix.

In [6]:
prod= lambda n, m: reduce(lambda x, y: x+y, map(lambda i: list(map(lambda j: (i, j), np.arange(m))), np.arange(n)), [])
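The map-reduce construction concatenates one list of pairs per row, so the pairs come out in row-major order. The standard library's `itertools.product` produces the same sequence, as this small check (using plain `range` instead of `np.arange`) shows.

```python
from functools import reduce
from itertools import product

# The map-reduce construction from the cell above, for a 2-by-3 matrix
pairs_mr = reduce(lambda x, y: x + y,
                  map(lambda i: list(map(lambda j: (i, j), range(3))), range(2)), [])
pairs_it = list(product(range(2), range(3)))

print(pairs_mr == pairs_it)  # True
print(pairs_it)              # [(0, 0), (0, 1), (0, 2), (1, 0), (1, 1), (1, 2)]
```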

Next we generate some data.

In [7]:
np.random.seed(123)
A = np.random.randint(-10, 10, size=(5, 5))
B = np.random.randint(-10, 10, size=(5, 5))

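We can then combine `map` and `reduce` to compute the matrix product: for each index pair `(i, j)`, an inner `map` multiplies row `i` of `A` by column `j` of `B` elementwise, and a `reduce` sums these products.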

In [8]:
out = np.reshape(list(map(lambda ij_tuple: reduce(lambda x, y: x + y, map(lambda x, y: x*y, A[ij_tuple[0], :], B[:, ij_tuple[1]])), prod(5, 5))), (5, 5))
out

array([[ 112,  112,   17,  213,   62],
       [ 125,   66,   11,  232,  -20],
       [  26, -173,  113,   27,  -17],
       [ -60, -115,   35,  -81,  -97],
       [ -64,   14,   81,   57,   22]])

We can check the result against NumPy's matrix product.

In [9]:
np.array_equal(np.dot(A, B), out)

True
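The one-liner above is dense and hard-codes the `5`-by-`5` shape. Unpacked into named steps, the same map-reduce idea works for any compatible shapes. This is a sketch; the names `matmul_map_reduce` and `entry` are our own.

```python
import numpy as np
from functools import reduce
from itertools import product

def matmul_map_reduce(A, B):
    """Matrix product where each entry (i, j) is a map (elementwise products of
    row i of A and column j of B) followed by a reduce (their sum)."""
    n, k = A.shape
    k2, m = B.shape
    assert k == k2, "inner dimensions must match"

    def entry(ij):
        i, j = ij
        return reduce(lambda x, y: x + y, map(lambda a, b: a * b, A[i, :], B[:, j]))

    return np.reshape(list(map(entry, product(range(n), range(m)))), (n, m))

rng = np.random.default_rng(0)
A = rng.integers(-10, 10, size=(3, 4))
B = rng.integers(-10, 10, size=(4, 2))
print(np.array_equal(matmul_map_reduce(A, B), A @ B))  # True
```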

### Multiprocessing

Unfortunately, the matrix multiplication above relies on lambdas, which the standard `pickle` module cannot serialize, and `multiprocessing` pickles the functions it sends to the worker processes. Therefore we rewrite the helpers as standard, module-level function definitions:
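To see the restriction concretely, the sketch below tries to pickle a named function and a lambda. A module-level function is pickled by reference to its importable name; a lambda's name is `<lambda>`, which cannot be looked up. The helper `can_pickle` is hypothetical, and since the exact exception type for the lambda may vary across Python versions, it catches several.

```python
import pickle

def add(x, y):
    return x + y

add_lambda = lambda x, y: x + y

def can_pickle(obj):
    """Return True if obj survives a pickle round trip, as multiprocessing requires."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False

print(can_pickle(add))         # True when this file is run as a script
print(can_pickle(add_lambda))  # False: the lambda has no importable name
```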

In [10]:
def add(x, y):
    return x + y

def prod(x, y):
    return x * y

def dot_product(i, j):
    # Entry (i, j) of A @ B; A and B are read from module-level globals
    return reduce(add, map(prod, A[i, :], B[:, j]))

We can then parallelize the matrix multiplication, as done in the script `parallel_multiplication.py`, by passing `dot_product` and all index pairs to `pool.starmap`, which unpacks each tuple into the function's arguments: `pool.starmap(dot_product, product(np.arange(n), np.arange(n)))`, with `product` from `itertools`.
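The script itself is not reproduced here. The following is a minimal sketch of the same idea under our own assumptions: the names `parallel_matmul` and `processes` are hypothetical, and instead of reading globals `A` and `B`, each task receives its row and column explicitly. That design choice keeps the workers independent of global variables, which under the `spawn` start method (the default on Windows and macOS) exist in a worker only if the module recreates them on import. The `if __name__ == "__main__":` guard is required for the same reason.

```python
import numpy as np
from functools import reduce
from itertools import product
from multiprocessing import Pool
from operator import add, mul

def dot_product(row, col):
    """One entry of A @ B: map (elementwise products) followed by reduce (sum)."""
    return reduce(add, map(mul, row, col))

def parallel_matmul(A, B, processes=4):
    n, m = A.shape[0], B.shape[1]
    # One task per output entry (i, j); each task carries row i of A and column j of B
    tasks = [(A[i, :], B[:, j]) for i, j in product(range(n), range(m))]
    with Pool(processes) as pool:
        entries = pool.starmap(dot_product, tasks)  # results come back in task order
    return np.reshape(entries, (n, m))

if __name__ == "__main__":
    rng = np.random.default_rng(123)
    A = rng.integers(-10, 10, size=(50, 50))
    B = rng.integers(-10, 10, size=(50, 50))
    C = parallel_matmul(A, B)
    print(np.array_equal(C, A @ B))  # True
```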

In [11]:
! python parallel_multiplication.py

[[ 251 -259 -459 ...  546  793 -112]
 [-679 -336 -249 ... -112  462  119]
 [  78 -239 -499 ...  902 -210 -208]
 ...
 [  47  -12  197 ... -135 -612  695]
 [ 289   46 -353 ... -129 -272 -162]
 [ 297  189 -404 ... -169   82  338]]


### Asynchronous

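We can also submit work asynchronously, that is, without blocking until the results are ready. We will mainly consider two methods of `multiprocessing.Pool`:

- `apply_async()`, which schedules a single call of a function and passes it any number of arguments as a tuple.
- `map_async()`, which applies a one-argument function to every element of an iterable.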
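A minimal sketch contrasting the two methods; the functions `power` and `negate` are hypothetical. Both return an `AsyncResult` immediately, and `get()` blocks until the value is available. For a multi-argument function applied over an iterable, `Pool` also offers `starmap_async()`.

```python
from multiprocessing import Pool

def power(base, exponent):   # hypothetical two-argument function
    return base ** exponent

def negate(x):               # hypothetical one-argument function
    return -x

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        # apply_async schedules ONE call; the arguments go in a tuple
        single = pool.apply_async(power, (2, 10))
        # map_async schedules negate(x) for every x in the iterable
        many = pool.map_async(negate, [1, 2, 3])
        single_value = single.get(timeout=10)
        many_values = many.get(timeout=10)
    print(single_value)  # 1024
    print(many_values)   # [-1, -2, -3]
```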

The script `asynchronous.py` shows how to use these methods to run several different functions in parallel, and how to check whether the processes have finished and whether they were successful.
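The script is not shown here; the sketch below, with a hypothetical `divide` function, illustrates the status checks. `AsyncResult.ready()` reports whether a task has finished, and `AsyncResult.successful()` reports whether it finished without raising. Because `successful()` raises `ValueError` if called before the task is ready, the sketch polls `ready()` first.

```python
import time
from multiprocessing import Pool

def divide(x, y):            # hypothetical task; fails when y == 0
    return x / y

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        results = [pool.apply_async(divide, args) for args in [(12, 7), (1, 0)]]
        while not all(r.ready() for r in results):  # poll instead of blocking on get()
            time.sleep(0.01)
        outcomes = [r.get() if r.successful() else "failed" for r in results]
    print(outcomes)  # [1.7142857142857142, 'failed']
```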

In [12]:
! python asynchronous.py

Process finished. Result 3
Process finished. Result 1.7142857142857142
Process finished. Result 2.0
Process finished. Result -1
Process finished. Result 1.0
Process finished. Result 400
Process finished. Result [-98, 66, 133, 71, 65, 132, 27, -73, 335, 1, -73, 86, 39, 190, -1, 152, 5, -200, 255, 36, 120, 47, -258, -39, 37, 40, 174, 65, -104, -74, 5, -23, -55, 145, 173, 49, 223, 36, 48, 32, 94, 23, -50, 36, -1, 115, 21, 39, 45, 57, 191, -1, -50, 109, 79, -43, -124, 30, 42, -80, -215, 0, 306, -47, -131, -17, -8, -110, -10, 55, 195, -6, -172, 158, 136, -67, -41, 47, -32, 6, 32, 46, -79, 81, -116, -163, -37, -22, -22, 144, 218, -36, -218, 42, 89, -177, 20, 181, -101, -8]


### Distributed Parallel Python with Scoop

We can also distribute `map` across a cluster using the SCOOP package (`scoop`), whose `futures.map` is a parallel counterpart of the built-in `map`. We install it in the environment with `conda install -c bioconda scoop` and then use the script `Distributed.py` to perform a distributed, parallel calculation, launching it with `python -m scoop`.
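`Distributed.py` is not reproduced here. A distributed map-reduce with SCOOP might look like the following sketch, assuming `scoop.futures.map`; the `square` function and the computed quantity are hypothetical. The `try`/`except` falls back to the built-in `map` so that the sketch also runs where SCOOP is not installed; to distribute the work, launch the file with `python -m scoop`.

```python
from functools import reduce
from operator import add

try:
    from scoop import futures        # distributed map when launched with `python -m scoop`
    parallel_map = futures.map
except ImportError:                  # fallback so the sketch still runs without SCOOP
    parallel_map = map

def square(x):                       # hypothetical per-item work
    return x * x

if __name__ == "__main__":
    # The map is spread over the workers; the reduce runs on the root process
    total = reduce(add, parallel_map(square, range(100)))
    print("Result: ", total)  # Result:  328350
```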

In [13]:
! python -m scoop Distributed.py

[2020-05-30 10:27:20,845] launcher  INFO    SCOOP 0.7 1.1 on linux using Python 3.7.4 (default, Aug 13 2019, 20:35:49) [GCC 7.3.0], API: 1013
[2020-05-30 10:27:20,845] launcher  INFO    Deploying 8 worker(s) over 1 host(s).
[2020-05-30 10:27:20,845] launcher  INFO    Worker distribution: 
[2020-05-30 10:27:20,845] launcher  INFO       127.0.0.1:	7 + origin
Result:  -1583
[2020-05-30 10:27:21,678] launcher  (127.0.0.1:36883) INFO    Root process is done.
[2020-05-30 10:27:21,678] launcher  (127.0.0.1:36883) INFO    Finished cleaning spawned subprocesses.
