In [1]:
import numpy as np
from multiprocessing import Pool

def fill_missing_with_row_means(data):
    # Calculate means of rows ignoring NaNs
    row_means = np.nanmean(data, axis=1)
    # Find indices where NaN values are
    inds = np.where(np.isnan(data))
    # Replace NaNs with the mean of the respective row
    data[inds] = np.take(row_means, inds[0])
    return data

def np_pearson_cor(x, y, yv, yvss):
    # Derivation see: https://cancerdatascience.org/blog/posts/pearson-correlation/
    if yv is None or yvss is None:
        yv = y - y.mean(axis=1, keepdims=True)
        # yvss = (yv * yv).sum(axis=1)
        yvss = np.einsum('ij,ij->i', yv, yv)  # Memory-efficient sum of squares
    xv = x - x.mean(axis=1, keepdims=True)
    xvss = np.einsum('ij,ij->i', xv, xv)  # Memory-efficient sum of squares
    # Use einsum for memory-efficient matrix multiplication
    # result = np.matmul(xv, yv.T) / np.sqrt(np.outer(xvss, yvss))
    result = np.einsum('ij,kj->ik', xv, yv) / np.sqrt(xvss[:, np.newaxis] * yvss[np.newaxis, :])

    # Limit the result to the range [-1, 1]
    np.clip(result, -1.0, 1.0, out=result)
    return result

def correlation_chunk(start_row, end_row, data, yv, yvss):
    return np_pearson_cor(data[start_row:end_row], data, yv, yvss)

def correlation_matrix_by_rows(data, chunk_size, num_processes=None, return_upper_triangle=True):
    n_rows = data.shape[0]
    # here we create this matrix up-front.
    # it is also possible to not create this but to keep writing data to disk as they are generated
    correlation_matrix = np.zeros((n_rows, n_rows))

    print("Performing mean imputation for missing values ...")
    data = fill_missing_with_row_means(data)

    print("Precompute matrix quantities ...")
    yv = data - data.mean(axis=1, keepdims=True)
    yvss = np.einsum('ij,ij->i', yv, yv)  # Memory-efficient sum of squares

    if num_processes is None or num_processes <= 1:
        # Non-parallel execution
        for start_row in range(0, n_rows, chunk_size):
            end_row = min(start_row + chunk_size, n_rows)
            print("Working on rows", start_row, "to", end_row, "out of", n_rows, "rows")
            correlation_matrix[start_row:end_row, :] = correlation_chunk(start_row, end_row, data, yv, yvss)
    else:
        # Parallel execution
        row_ranges = [(start, min(start + chunk_size, n_rows)) for start in range(0, n_rows, chunk_size)]
        with Pool(processes=num_processes) as pool:
            for idx, (start, end) in enumerate(row_ranges):
                correlation_matrix[start:end, :] = pool.apply(correlation_chunk, (start, end, data, yv, yvss))

    # Mirror the upper triangle to the lower triangle
    if not return_upper_triangle:
        i_lower = np.tril_indices(n_rows, -1)
        correlation_matrix[i_lower] = correlation_matrix.T[i_lower]
    return correlation_matrix

First, test if the customized function gives same result as np default:

In [2]:
import time

def test_correlation_matrix_by_rows():
    # compare between customized iplementation and np.corr
    data_matrix = np.random.rand(1000, 200)  # Example large data matrix
    chunk_size = 200  # Define chunk size
    num_processes = 0  # Number of parallel processes
    time_custom = time.time()
    correlation_matrix_custom = correlation_matrix_by_rows(data_matrix, chunk_size, num_processes, False)
    time_custom = time.time() - time_custom
    print(time_custom)
    time_np = time.time()
    correlation_matrix_np = np.corrcoef(data_matrix, rowvar=True)
    time_np = time.time() - time_np
    print(time_np)
    accuracy = np.allclose(correlation_matrix_custom, correlation_matrix_np)
    assert accuracy == True
test_correlation_matrix_by_rows()

Performing mean imputation for missing values ...
Precompute matrix quantities ...
Working on rows 0 to 200 out of 1000 rows
Working on rows 200 to 400 out of 1000 rows
Working on rows 400 to 600 out of 1000 rows
Working on rows 600 to 800 out of 1000 rows
Working on rows 800 to 1000 out of 1000 rows
0.1574692726135254
0.021502971649169922


Benchmark the np default approach: 50K by 50K matrix is 18Gb in memory.

In [3]:
def benchmark_np_correlation():
    # 17K samples, pair-wise LD of 50,000 variants
    data_matrix = np.random.rand(50000, 17000)  # Example large data matrix
    time_np = time.time()
    correlation_matrix_np = np.corrcoef(data_matrix, rowvar=True)
    time_np = time.time() - time_np
    print(time_np)
benchmark_np_correlation()

142.4240164756775


Benchmark the customized approach:

In [None]:
def benchmark_correlation_matrix_by_rows():
    # 17K samples, pair-wise LD of 50,000 variants 
    data_matrix = np.random.rand(50000, 17000)  # Example large data matrix
    chunk_size = 1000  # Define chunk size
    num_processes = 0  # Number of parallel processes
    time_custom = time.time()
    correlation_matrix_custom = correlation_matrix_by_rows(data_matrix, chunk_size, num_processes)
    time_custom = time.time() - time_custom
    print(time_custom)
benchmark_correlation_matrix_by_rows()

Performing mean imputation for missing values ...
Precompute matrix quantities ...
Working on rows 0 to 1000 out of 50000 rows
Working on rows 1000 to 2000 out of 50000 rows
Working on rows 2000 to 3000 out of 50000 rows
Working on rows 3000 to 4000 out of 50000 rows
Working on rows 4000 to 5000 out of 50000 rows
Working on rows 5000 to 6000 out of 50000 rows
Working on rows 6000 to 7000 out of 50000 rows
Working on rows 7000 to 8000 out of 50000 rows
Working on rows 8000 to 9000 out of 50000 rows
Working on rows 9000 to 10000 out of 50000 rows
Working on rows 10000 to 11000 out of 50000 rows
Working on rows 11000 to 12000 out of 50000 rows
Working on rows 12000 to 13000 out of 50000 rows
Working on rows 13000 to 14000 out of 50000 rows
Working on rows 14000 to 15000 out of 50000 rows
Working on rows 15000 to 16000 out of 50000 rows
Working on rows 16000 to 17000 out of 50000 rows
Working on rows 17000 to 18000 out of 50000 rows
Working on rows 18000 to 19000 out of 50000 rows
Working 