## Distance matrix computation in Spark (High feature/sample ratio)##

###Notations###
* Let $X$ be a matrix with $n$ features (rows) and $N$ observations (columns), $n\gg N$. 
* Let $x_i$, $1 \le i \le n$ be the $i$-th row of X (of size $N$). 
* Let D be the $n \times n$ matrix of distances between all pairs of features of $X$. 
* Let $d_{ij}$ be the distance between features $i$ and $j$, $d_{ij}=d(x_i,x_j)$. 

###Approach: Block partitioned matrix###

Let $X$ be partitioned into $p$ row blocks of size $n_p \times N$
$$X = \left[\begin{array}
{r}
X_1  \\
X_2  \\
...\\
X_p
\end{array}\right]
$$

and the distance matrix be partitioned in $p^2$ row and column blocks

$$D = \left[\begin{array}
{rrrr}
D_{11} & D_{11} & ... & D_{1p} \\
D_{21} & D_{22} & ... & D_{2p} \\
... & ... & ... & ... \\
D_{p1} & D_{p2} & ... & D_{pp} 
\end{array}\right]
$$

**The distributed implementation will consist in computing $D_{ij}$ in parallel**.
* **Stage 1**: Mapping. Partition $X$ in $p$ blocks. Key is the partition id, value is $X_k$, $1 \le k \le p$.
* **Stage 2**: Mapping. For each block $X_k$, replicate the block $p$ times. Key is the pair $(k,l)$, $1 \le l \le p$, value is $X_k$.
* **Stage 3**: Group by. Groups $X_k$ and $X_l$ for each pair $(k,l)$. Key is the pair $(k,l)$, value is the pair $(X_k,X_l)$ if $k \ne l$, or the singleton ($X_k$) if $k=l$.
* **Stage 4**: Reduce. For each pair $(k,l)$, compute block distance matrix $D_{kj}$.

###Spark implementation (Python)###

In the following the distance function is the Pearson correlation.

####Context initialization####

In [None]:
from pyspark import SparkContext
sc = SparkContext("local[2]", "correlationExample")

In [1]:
#For random number generation and matrix operation
import random
import numpy as np

In [2]:
#Create a random matrix
N=3 #Number of columns
n=6 #Number of rows
matrix=np.random.random((n,N))
matrix

array([[ 0.29712212,  0.95591243,  0.37373404],
       [ 0.23117962,  0.96107945,  0.8316498 ],
       [ 0.06633115,  0.15409498,  0.52527106],
       [ 0.38767595,  0.69526431,  0.09503964],
       [ 0.58534828,  0.13666517,  0.04192201],
       [ 0.22321103,  0.29025997,  0.56436033]])

####Stage 1####

In [3]:
#Let us define 3 partitions
p=3
stage0=sc.parallelize(matrix,p)

In [4]:
def f(splitIndex ,v): 
    return [(splitIndex,list(v))]

stage1=stage0.mapPartitionsWithIndex(lambda splitIndex,v: f(splitIndex,v))

In [5]:
stage1.collect()

[(0,
  [array([ 0.29712212,  0.95591243,  0.37373404]),
   array([ 0.23117962,  0.96107945,  0.8316498 ])]),
 (1,
  [array([ 0.06633115,  0.15409498,  0.52527106]),
   array([ 0.38767595,  0.69526431,  0.09503964])]),
 (2,
  [array([ 0.58534828,  0.13666517,  0.04192201]),
   array([ 0.22321103,  0.29025997,  0.56436033])])]

####Stage 2####

In [6]:
def makePairParts(k,v,p):
    return [(str(sorted([k,l])),(k,v)) for l in range(0,p)]

stage2=stage1.flatMap(lambda (k,v):makePairParts(k,v,p))

In [7]:
stage2.collect()

[('[0, 0]',
  (0,
   [array([ 0.29712212,  0.95591243,  0.37373404]),
    array([ 0.23117962,  0.96107945,  0.8316498 ])])),
 ('[0, 1]',
  (0,
   [array([ 0.29712212,  0.95591243,  0.37373404]),
    array([ 0.23117962,  0.96107945,  0.8316498 ])])),
 ('[0, 2]',
  (0,
   [array([ 0.29712212,  0.95591243,  0.37373404]),
    array([ 0.23117962,  0.96107945,  0.8316498 ])])),
 ('[0, 1]',
  (1,
   [array([ 0.06633115,  0.15409498,  0.52527106]),
    array([ 0.38767595,  0.69526431,  0.09503964])])),
 ('[1, 1]',
  (1,
   [array([ 0.06633115,  0.15409498,  0.52527106]),
    array([ 0.38767595,  0.69526431,  0.09503964])])),
 ('[1, 2]',
  (1,
   [array([ 0.06633115,  0.15409498,  0.52527106]),
    array([ 0.38767595,  0.69526431,  0.09503964])])),
 ('[0, 2]',
  (2,
   [array([ 0.58534828,  0.13666517,  0.04192201]),
    array([ 0.22321103,  0.29025997,  0.56436033])])),
 ('[1, 2]',
  (2,
   [array([ 0.58534828,  0.13666517,  0.04192201]),
    array([ 0.22321103,  0.29025997,  0.56436033])])),


####Stage 3####

In [8]:
stage3=stage2.groupByKey()

In [9]:
list(stage3.collect()[2][1]) #Example of the content for pair (0,2)

[(0,
  [array([ 0.29712212,  0.95591243,  0.37373404]),
   array([ 0.23117962,  0.96107945,  0.8316498 ])]),
 (2,
  [array([ 0.58534828,  0.13666517,  0.04192201]),
   array([ 0.22321103,  0.29025997,  0.56436033])])]

####Stage 4####

In [10]:
#Faster implementation of matrix correlation in Python
#http://stackoverflow.com/questions/30143417/computing-the-correlation-coefficient-between-two-multi-dimensional-arrays
def corr2_coeff(A,B):
    A_mA = A - A.mean(1)[:,None]
    B_mB = B - B.mean(1)[:,None]
    ssA = (A_mA**2).sum(1);
    ssB = (B_mB**2).sum(1);
    return np.dot(A_mA,B_mB.T)/np.sqrt(np.dot(ssA[:,None],ssB[None]))

In [11]:
def getCorrelation(k,v):
    pairBlock=list(v)
    pairBlock1=pairBlock[0][0]
    
    blockMatrix1=pairBlock[0][1]
    if (len(pairBlock)==1):
        blockMatrix2=pairBlock[0][1]
        k=(pairBlock[0][0],pairBlock[0][0])
    else:
        blockMatrix2=pairBlock[1][1]
        k=(pairBlock[0][0],pairBlock[1][0])
    
    corrB1B2=corr2_coeff(np.array(blockMatrix1),np.array(blockMatrix2))
    
    return (k,corrB1B2)

stage4=stage3.map(lambda (k,v):getCorrelation(k,v))

In [12]:
stage4.collect()

[((0, 1), array([[-0.23377681,  0.81634516],
         [ 0.51222424,  0.18032957]])),
 ((1, 1), array([[ 1.        , -0.75240277],
         [-0.75240277,  1.        ]])),
 ((0, 2), array([[-0.44949702, -0.22847719],
         [-0.94576292,  0.51689488]])),
 ((1, 2), array([[-0.76344727,  0.99998516],
         [ 0.14898302, -0.7488036 ]])),
 ((2, 2), array([[ 1.        , -0.76695405],
         [-0.76695405,  1.        ]])),
 ((0, 0), array([[ 1.        ,  0.71530707],
         [ 0.71530707,  1.        ]]))]

###All in one line###
Note that using Spark pipelining, you may express the execution of all stages in one line.

In [13]:
result=sc.parallelize(matrix,p).mapPartitionsWithIndex(f).flatMap(lambda (k,v):makePairParts(k,v,p)).groupByKey().map(lambda (k,v):getCorrelation(k,v))

In [14]:
result.collect()

[((0, 1), array([[-0.23377681,  0.81634516],
         [ 0.51222424,  0.18032957]])),
 ((1, 1), array([[ 1.        , -0.75240277],
         [-0.75240277,  1.        ]])),
 ((0, 2), array([[-0.44949702, -0.22847719],
         [-0.94576292,  0.51689488]])),
 ((1, 2), array([[-0.76344727,  0.99998516],
         [ 0.14898302, -0.7488036 ]])),
 ((2, 2), array([[ 1.        , -0.76695405],
         [-0.76695405,  1.        ]])),
 ((0, 0), array([[ 1.        ,  0.71530707],
         [ 0.71530707,  1.        ]]))]

In [15]:
sc.stop()

##Resources##
* [Wang S, Pandis I, Johnson D, et al. Optimising parallel R correlation matrix calculations on gene expression data using MapReduce. BMC Bioinformatics. 2014;15(1):351. doi:10.1186/s12859-014-0351-9.](http://www.ncbi.nlm.nih.gov/pmc/articles/PMC4246436/)