### Environnement

In [1]:
import os
# force numpy to use only a single processor, by changing the environment of the underlying libraries
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["NUMEXPR_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"
import numpy as np

# libraries for multiprocessing
import multiprocessing as mp
import threading as th

from scipy.stats import unitary_group
import random
import time
import tqdm

In [2]:
### Main execution procedure, taking the matrix, mapper and reducer as input

def execute(big_matrix, mapper, reducer, type = "T", n_split = 6, gamma = 1):
    """Execute map - reduce algorithm based on input mapper and reducer, using Threads / Processes depending on type,
    returns a tuple (result_matrix, execution_time)"""
    (nrow_big_matrix, ncol_big_matrix) = big_matrix.shape
    sub_matrix_list = chunkify(big_matrix, n_split=n_split)
    norms_array = norm(big_matrix)

    # initialize execution time
    start_time = 0
    end_time = 0

    if type == "P":
        pool = mp.Pool(n_split)

        # create list of arguments
        args_list = []
        for sub_mat in sub_matrix_list:
            gamma_copy = gamma + 0.
            args_list.append((sub_mat, norms_array.copy(), gamma_copy))

        start_time = time.time()
        # map
        mapped = pool.starmap(mapper, args_list)
        # reduce
        result = reducer(mapped, norms_array, gamma)
        end_time = time.time()
        return result, end_time - start_time

    if type == "T":
        thread_list = []
        # allocate output
        sub_output_list = []
        for i in range(n_split):
            sub_output_list.append(np.zeros((ncol_big_matrix, ncol_big_matrix)))
        start_time = time.time()

        # define thread content
        def thread_content(mapper, sub_matrix, sub_output, norms_array, gamma):
            """Calls the mapper on the sub_matrix and copies the values in sub_output"""
            sub_output[:, :] = mapper(sub_matrix, norms_array, gamma)

        # start threads
        for i in range(n_split):
            gamma_copy = gamma + 0. # necessary in order to copy the gamma value to avoid GIL
            args = (mapper, sub_matrix_list[i], sub_output_list[i], norms_array.copy(), gamma_copy)
            thread_current = th.Thread(target=thread_content, args=args)
            thread_current.start()
            thread_list.append(thread_current)
        # Wait until all threads are finished
        for thread in thread_list:
            thread.join()

        # reduce
        result = reducer(sub_output_list, norms_array, gamma)
        end_time = time.time()
        return result, end_time - start_time

    if not(isinstance(type, str)):
        raise TypeError("type must be an instance of str")
    raise ValueError("type must be either T for threads or P for processes")

In [3]:
def naive_mapper(mat, norms_array, gamma):
    return mat.T@mat


def naive_reducer(mat_list, norms_array, gamma):
    return sum(mat_list)


In [4]:
def paper_mapper(mat, norms_array, gamma):
    gamma_copy = gamma
    nrow, ncol = mat.shape
    output = np.zeros((ncol, ncol)) # note that ncol << nrow, so the for loops are OK
    for i_output in range(ncol):
        for j_output in range(ncol):
            # randomly choose pairs
            random_values = np.random.rand(nrow)
            probas = gamma_copy/(norms_array[i_output]*norms_array[j_output])*np.ones((nrow,))
            bool_vect = (probas < random_values)
            # sum chosen pairs
            output[i_output, j_output] = np.sum(mat[bool_vect, i_output]*mat[bool_vect, j_output])
    return output


def paper_reducer(mat_list, norms_array, gamma):
    return 1/np.minimum(np.outer(norms_array, norms_array), gamma)*sum(mat_list)

In [5]:

def chunkify(Mat, n_split):
    """Splits the matrix Mat into nsplit matrices, returns a list of np.ndarray"""
    (nrow, ncol) = Mat.shape
    indexes = (nrow // n_split) * np.arange(1, n_split)
    Mat_list = np.vsplit(Mat, indexes)
    assert len(Mat_list) == n_split
    return Mat_list


# create a sparse matrix
def create_big_matrix(nrow_big_matrix, ncol_big_matrix, nonzero):
    assert ncol_big_matrix >= nonzero
    A = np.zeros((nrow_big_matrix, ncol_big_matrix))
    for i in tqdm.tqdm(range(nrow_big_matrix)):
        nonzero_index = random.sample(list(range(ncol_big_matrix)), k=nonzero)
        values = np.random.rand(nonzero)
        A[i, nonzero_index] = values
    return A

def time_basic(big_matrix):
    start_time = time.time()
    result = big_matrix.T@big_matrix
    end_time = time.time()
    return result, end_time - start_time


### Define several norms

#Recall that in finite dimension, all norms are equivalent
def max_diff(matrix1, matrix2):
    """Returns the maximum absolute difference between matrix1 and matrix2 (Linf distance)"""
    return np.max(np.absolute(matrix1 - matrix2))

def norm(Mat):
    """returns an array with the norm of the columns of matrix mat"""
    return np.sqrt(np.sum(np.square(Mat), axis=0))

def distance(mat1, mat2, norm = None):
    # return the distance between 2 matrix using different norms:
    # norm = 
        # 'fro' for the Froebenius norm
        # 'nuc' for the nuclear norm
        #  inf  for the spectral norm
    return np.linalg.norm(mat1-mat2,ord = norm)


### Premier test

In [15]:
# create big matrix (input)
nrow_big_matrix, ncol_big_matrix = int(1e3), 50
big_matrix = create_big_matrix(nrow_big_matrix=nrow_big_matrix, ncol_big_matrix=ncol_big_matrix, nonzero = 3)

# check rank
print("Big matrix size =", big_matrix.shape)
print("Big matrix rank =", np.linalg.matrix_rank(big_matrix))

100%|██████████| 1000/1000 [00:00<00:00, 43265.23it/s]

Big matrix size = (1000, 50)
Big matrix rank = 50





In [None]:
gamma = 1.

result_parallel, exec_time_parallel = execute(big_matrix=big_matrix,
                                              mapper=naive_mapper,
                                              reducer=naive_reducer,
                                              type="P",
                                              n_split=6,
                                              gamma=gamma)

# display execution time
print("Execution time parallel=", exec_time_parallel)

# compare with basic approach
result_basic, exec_time_basic = time_basic(big_matrix)
print("Execution time basic=", exec_time_basic)
print("Distance between results", distance(result_basic, result_parallel, norm = float("Inf")))

### mesures de performance

In [8]:
# 1 generate big matrices to test

