# Matrix Multiplication in Spark (pyspark)

\begin{equation*}
 \mathbf{P} = \mathbf{A} \times \mathbf{B}
\end{equation*}

\begin{equation*}
\begin{vmatrix}
\mathbf{a}_{0,0} & \mathbf{a}_{0,1} & \mathbf{a}_{0,2}\\
\mathbf{a}_{1,0} & \mathbf{a}_{1,1} & \mathbf{a}_{1,2}\\
\end{vmatrix} \
\
\times
\
\begin{vmatrix}
\mathbf{b}_{0,0} & \mathbf{b}_{0,1} \\
\mathbf{b}_{1,0} & \mathbf{b}_{1,1} \\
\mathbf{b}_{2,0} & \mathbf{b}_{2,1} \\
\end{vmatrix} 
\
=
\
\begin{vmatrix}
 \mathbf{a}_{0,0} \times \mathbf{b}_{0,0} + \
 \mathbf{a}_{0,1} \times \mathbf{b}_{1,0} + \
 \mathbf{a}_{0,2} \times \mathbf{b}_{2,0}  &\
 \
 \mathbf{a}_{0,0} \times \mathbf{b}_{0,1} + \
 \mathbf{a}_{0,1} \times \mathbf{b}_{1,1} + \
 \mathbf{a}_{0,2} \times \mathbf{b}_{2,1}  \
 \
\\
 \mathbf{a}_{1,0} \times \mathbf{b}_{0,0} + \
 \mathbf{a}_{1,1} \times \mathbf{b}_{1,0} + \
 \mathbf{a}_{1,2} \times \mathbf{b}_{2,0}  &\
 \
 \mathbf{a}_{1,0} \times \mathbf{b}_{0,1} + \
 \mathbf{a}_{1,1} \times \mathbf{b}_{1,1} +\
 \mathbf{a}_{1,2} \times \mathbf{b}_{2,1}  \
 \
\end{vmatrix}
\end{equation*}

For matrices **A** of size $m \times n$  and **B** of size $n \times o$ then their dot-product **P** is of size $m \times o$

The Element of P on line $i$ and row $j$ is defined by the following formula (where $n$ refers to the number of the shared dimention between A and B - the # of columnts of A or the # of rows of B)

\begin{equation*}
\mathbf{p}_{i, j} = \sum_{k=0}^{n} \mathbf{a}_{i,k} \times  \mathbf{b}_{k,j} 
\end{equation*} 

That is, each element $\mathbf{p}_{i,j}$ depends on the (whole) i-th line of $A$  ( $\mathbf{a}_{i,*}$ )  and the (whole) $j$-th column of $B$  ( $\mathbf{b}_{*,j}$ )

### Import pySpark libs 

In [1]:
import numpy as np
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('MatMul').setMaster('local[*]')
sc = SparkContext.getOrCreate(conf=conf)

### Create Tables $A$ and $B$

In [2]:
# The `m`, `n` and `o`  dimensions of the tables
m = 2
n = 3
o = 2

a_shape = (m,n)
b_shape = (n,o)


A = np.random.randint(100, size = a_shape)
B = np.random.randint(100, size = b_shape)
A

array([[ 7,  2, 99],
       [58, 96, 42]])

### Helper functions for sparse matrices

In a real life scenario, we would use one of the sparse matrix libraries of [scipy](https://docs.scipy.org/doc/scipy/reference/sparse.html)
or [Spark](https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/mllib/linalg/SparseMatrix.html) / [pySpark](http://spark.apache.org/docs/2.3.0/api/python/pyspark.mllib.html#pyspark.mllib.linalg.SparseMatrix).  
But for this notebook, let's use a custom sparse representation, where a matrix `M` is represented by a list of its elements, and each element is represented as a tuple: `((x, y), val)`.   
`x`,`y` being its coordinates and `val` its value.

In [3]:
"""Helper fn for creating a sparse matrix (in our custom format),
from a NumPy array"""
def sparsify(a):
    rv = []
    for i, line in enumerate(a):
        for (j, elem) in enumerate(line):
            if elem!=0: rv.append(((i,j),elem))
    return rv
        
sparse_A = sparsify(A)
sparse_B = sparsify(B)
sparse_A

[((0, 0), 7),
 ((0, 1), 2),
 ((0, 2), 99),
 ((1, 0), 58),
 ((1, 1), 96),
 ((1, 2), 42)]

In [4]:
"""Helper fn for creating a NumPy array from our
custom sparse matrix format"""
def densify(sparse, shape):
    a = np.zeros(shape)
    for (i, j), e in sparse:
        a[i][j] = e
    return a

# Let's do a sanity check, testing whether our 
# densified version of sparse_A equals A.
np.array_equal(A, densify(sparse_A, a_shape))

True

### Find out dependencies for each element ${p}_{i,j}$

As previously stated for $P = A \times B$, where 
 * $A$ is of shape $m \times n$
 * $B$ is of shape $n \times o$
 * $P$ is of shape $m \times o$

Then, we have:
  >each element $\mathbf{p}_{i,j}$ depends on the $i$-th line of A and the $j$-th column of B

This can be restated as:
 * All of the $o$ elements of every line, $\mathbf{p}_{i,*}$, depend on all of the elements of the corresponding line of $A$
 * All of the $m$ elements of every column,  $\mathbf{p}_{*,j}$   depend on all of the elements of each corresponding column of $B$
 
Taking it one more step:
 * Each element (of each line) of $A$ appears $o$ times in the corresponding line of $P$
 * Each element (of each column) of $B$ appears $m$ times in the corresponding column of $P$
 

And, in conclusion:
 * Each element $\mathbf{a}_{i,k}$ of $A$ is repeated $o$ times on the $i$-th line of $P$ (one for each of the $o$ columns of $P$)
 * Each element $\mathbf{b}_{k,j}$ of $A$ is repeated $m$ times on the $j$-th column of $P$ (one for each of the $m$ lines of $P$)
 


## Repeating Elements of $A$ and $b$:

In order to calculate $P$, in the general case where we can't make the assumption that either $A$ or $B$
can fully fit in memory, we need to seperately calculate each element $p_{i,j}$.  
Rembember, each element of a table is represented as a tuple of `((i,k), Aik)` where `i` and `k` are the line/column indices and `Aik` is the value of the element.

**Intermediate Data:**

We'll create an intermediate set of data, after repeating the elements of $A$ and $B$, before calculating the elemens of $P$.  
For the intermediate data we will introduce a new representation: `((i,j),(k, Aik))` and equivantly `((i,j),(k, Bkj))`, where `(i,j)` are the coordinates of the element of $P$ for which the intermediate element will be used. 

This representation will later help us compute  \begin{equation*}  \sum_{k=0}^{n} \mathbf{a}_{i,k} \times  \mathbf{b}_{k,j} \end{equation*}  for each element $\mathbf{p}_{i,j}$ of $P$

**Note:** We are using `k` to signify the *column* index of $A$, as well as the *line* index of $B$. We will later use this (common) index for computing the product $\mathbf{a}_{i,k} \times  \mathbf{b}_{k,j}$, but, for now, we are just keeping track of it in the intermediate results.


### $A$'s intermediate data
Each element of $A$, $\mathbf{a}_{i,k}$, needs to be repeated accross the $o$ columns of P.  
(Note that $o$ is also the column dimension of B, and that's how we know it)

In [5]:
a_rdd = sc.parallelize(sparse_A)

def a_mapper(elem):
    # decompose the element to its coordinates
    # and value
    (i, k), Aik = elem
    
    # retrive the value of `o` from B's shape
    o = b_shape[1]

    acc = [] # an accumulator for all of the yielded elements
    for j in range(o):
        interm_elem = ((i,j),(k, Aik))
        acc.append(interm_elem)
    return acc

a_interm = a_rdd.flatMap(a_mapper)

### $B$'s intermediate data
Each element of B , $\mathbf{b}_{k,j}$ needs to be repeated accross the $m$ lines of P.  
(Note that $m$ is also the line dimension of A, and that's how we know it)
We are following the symmetric process that we did for $A$, only this time alternating $k$ to
be the line-number of $B$'s element, instead of the column number.

In [6]:
b_rdd = sc.parallelize(sparse_B)

def b_mapper(elem):
    (k, j), Bkj = elem
    m = a_shape[0]
    acc = []
    for i in range(m):
        interm_elem = ((i,j),(k, Bkj))
        acc.append(interm_elem)
    return acc

b_interm = b_rdd.flatMap(b_mapper)

Merge $A$'s and $B$'s intermedate data

In [7]:
p_interm = a_interm.union(b_interm)

## Group the intermediate data
In this step we will put the intermediate data we need in order to compute each element of $P$ together.  
Since our intermedate representation is structured as, `((i,j),(k, value))`, we just need to group together the itnermediate elements with the same `(i,j)`, and thus, we use RDD's built-in fn: `groupByKey`.

In [8]:
p_grouped = p_interm.groupByKey()

 Note that `groupByKey` creates yet another intermediate representation for the  `p_grouped` RDD, that has the following format:
```
((i,j), ((k1, value1), (k2, value2)...))
```

In [9]:
index, elements = p_grouped.first()
print(index)
print(list(elements))

(0, 1)
[(0, 7), (1, 2), (2, 99), (0, 21), (1, 94), (2, 9)]


## Calculate each element of P

For element $\mathbf{p}_{i,j}$ of $P$ we need to compute: \begin{equation*}  \sum_{k=0}^{n} \mathbf{a}_{i,k} \times  \mathbf{b}_{k,j} \end{equation*}

So, we must allign the elements of $A$ and $B$ according to $k$

Eg. for element $\mathbf{p}_{0,1}$ we need to compute: 
($\mathbf{a}_{0,0} \times \mathbf{b}_{0,1} + \mathbf{a}_{0,1} \times \mathbf{b}_{1,1} + \mathbf{a}_{0,2} \times \mathbf{b}_{2,1}$)

Note that if for some $k$ we have an odd intermediate element (aka. only from one of $A$ or $B$) then we need to discard that element.   
(Our sparse representation implies that if a number in the original matrices has a 0 value, then it's ommited in the resulting list of tuples. This means that when performing this here grouping, the will end up with only one of the tables contributing a value for one of the multiplications of,eg: $\mathbf{a}_{0,0} \times \mathbf{b}_{0,1} + \mathbf{a}_{0,1} \times \mathbf{b}_{1,1} + ...$)

In [10]:
def calculate_element(tup):
    idx, vals = tup
    acc ={}
    for k,v in vals:
        if k in acc:
            acc[k].append(v)
        else:
            acc[k] = [v]

    # discard odd elements 
    even_elems = { k: v for k,v in acc.items() if len(v) != 1 }
    
    Pij = 0
    for Aik, Bkj in even_elems.values():
        Pij += Aik * Bkj 
            
    return (idx, Pij)

Lets use `calculate_element` to calculate a single element of $P$

In [11]:
calculate_element(p_grouped.first())

((0, 1), 1226)

And now lets ccalculate all elements of $P$:

In [12]:
sparse_P = p_grouped.map(calculate_element).collect()

Finally, let's create a dense version of $P$ and validate that it's indeed equal to the dot-product of $A \times B$ as caclulated by NumPy

In [13]:
p_shape = (m, o)
P = densify(sparse_P, p_shape)

np.array_equal(P, A.dot(B))

True

That's it! We have calculated $P$ element-by-element and verified its value.