# Spark notebook for the DIMSUM algorithm (Dimension Independent Matrix Square using MapReduce)

In [None]:
# Install the Python dependencies listed in requirements.txt (IPython "!" shell escape).
!pip install -r ./requirements.txt

In [None]:
# When True, the parameters cell switches to small problem sizes for quick runs.
TEST: bool = True

## Imports, helper functions, and Spark initialization

In [None]:
import numpy as np
import scipy as sc
import scipy.sparse as sp
import random
from collections import defaultdict
from pyspark.sql import SparkSession
from pyspark.mllib.linalg.distributed import MatrixEntry, CoordinateMatrix
from numpy import linalg as LA
from collections import Counter

# Local Spark session: "local[5]" runs Spark in-process with 5 worker threads.
# NOTE(review): `sc` below rebinds the name bound by `import scipy as sc`; scipy is
# not used through that alias in the visible cells, but the shadowing is confusing.
session_builder = (
    SparkSession.builder
    .appName("Cloud computing ENSAE project")
    .master("local[5]")
)
spark = session_builder.getOrCreate()
sc = spark.sparkContext

In [None]:
def generate_sparse_matrix_dict_repr(m, n, num_nonzero):
    """Draw a random m x n 0/1 sparse matrix in ``{(row, col): 1}`` dict form.

    ``num_nonzero`` row indices and ``num_nonzero`` column indices are drawn
    uniformly with replacement from the global ``np.random`` state (rows
    first, then columns) and paired up. Repeated positions collapse, so the
    returned dict may hold fewer than ``num_nonzero`` entries.
    """
    rows = np.random.choice(m, num_nonzero, replace=True)
    cols = np.random.choice(n, num_nonzero, replace=True)
    return {position: 1 for position in zip(rows, cols)}

In [None]:
def dot_product_with_dict_repr(dict_repr, m, n):
    """Compute A^T A for a sparse matrix A given as a ``{(row, col): value}`` dict.

    Parameters
    ----------
    dict_repr : dict
        Mapping ``(row, col) -> value`` of the non-zero entries of A.
    m : int
        Number of rows of A. Not needed for the computation; kept so existing
        callers keep working.
    n : int
        Number of columns of A. The result is an n x n matrix.

    Returns
    -------
    dict
        Mapping ``(i, j) -> (A^T A)[i, j]`` for every column pair whose dot
        product is non-zero.
    """
    # Index the entries by column once, instead of rescanning the whole dict
    # for every (i, j) pair.
    columns = defaultdict(dict)
    for (row, col), value in dict_repr.items():
        columns[col][row] = value

    multiplied_dict_repr = {}
    for i in range(n):
        col_i = columns.get(i)
        if not col_i:
            continue
        # Both indices of A^T A range over the n columns of A.
        for j in range(n):
            col_j = columns.get(j)
            if not col_j:
                continue
            result = sum(v * col_j.get(row, 0) for row, v in col_i.items())
            if result != 0:
                multiplied_dict_repr[(i, j)] = result
    return multiplied_dict_repr

In [None]:
def dict_repr_to_sp_csc(dict_rep, m, n):
    """Convert a ``{(row, col): value}`` dict into an m x n ``scipy.sparse.csc_matrix``.

    Parameters
    ----------
    dict_rep : dict
        Mapping ``(row, col) -> value`` of the non-zero entries.
    m, n : int
        Shape of the resulting matrix.

    Returns
    -------
    scipy.sparse.csc_matrix
        Matrix with float64 data and int64 index arrays.
    """
    count = len(dict_rep)
    # Build each array in a single pass. Calling np.append per entry copies the
    # whole array every time (quadratic). Storing indices as float64 also cannot
    # represent row indices above 2**53 exactly, and M can be 1e18 here.
    row = np.fromiter((x for x, _ in dict_rep), dtype=np.int64, count=count)
    col = np.fromiter((y for _, y in dict_rep), dtype=np.int64, count=count)
    data = np.fromiter(dict_rep.values(), dtype=np.float64, count=count)
    return sp.csc_matrix((data, (row, col)), shape=(m, n))

## Parameters and initial data

In [None]:
# Full-scale problem: an M x N matrix with L randomly drawn non-zero positions.
M = 10**18
N = 10**4
L = 10**4
# DIMSUM sampling parameter passed to the mapper as `gamma`.
GAMMA = 1.0

In [None]:
# Small sizes for quick local runs; they override the full-scale values.
if TEST:
    M, N, L = 10000, 100, 1000
    GAMMA = 0.5

In [None]:
# Random M x N 0/1 matrix in {(row, col): 1} form with at most L entries.
a = generate_sparse_matrix_dict_repr(m=M, n=N, num_nonzero=L)

In [None]:
# sparse_matrix=dict_repr_to_sp_csc(a,M,N)

In [None]:
# sparse_matrix.T@sparse_matrix.toarray()

In [None]:
# non computable
# dict_repr_to_sp_csc(dot_product_with_dict_repr(a,M,N),N,N).toarray()

In [None]:
# One MatrixEntry(row, col, value) per non-zero position of `a`.
listMatrixEntry = [MatrixEntry(*position, value) for position, value in a.items()]

In [None]:
# Distribute the entries as an RDD via the session's SparkContext.
entries = spark.sparkContext.parallelize(listMatrixEntry)

In [None]:
# Distributed M x N matrix in coordinate (COO) form.
mat = CoordinateMatrix(entries, numRows=M, numCols=N)

In [None]:
# Fraction of the matrix cells that hold no stored entry.
total_cells = mat.numRows() * mat.numCols()
sparsity_index = 1 - mat.entries.count() / total_cells
sparsity_index

In [None]:
# https://stackoverflow.com/questions/45881580/pyspark-rdd-sparse-matrix-multiplication-from-scala-to-python
def coordinateMatrixMultiply(leftmatrix, rightmatrix):
    """Multiply two CoordinateMatrix objects with an RDD join on the inner index.

    Returns an RDD of ``(row, col, value)`` tuples. It holds one tuple for every
    (row, col) pair that receives at least one partial product; pairs with no
    shared inner index are absent.
    """

    def by_inner_left(entry):
        # Left entry (i, k, v) keyed by its column k.
        return entry.j, (entry.i, entry.value)

    def by_inner_right(entry):
        # Right entry (k, j, v) keyed by its row k.
        return entry.i, (entry.j, entry.value)

    def to_partial_product(record):
        _, ((row, left_value), (col, right_value)) = record
        return (row, col), left_value * right_value

    def flatten(item):
        (row, col), total = item
        return row, col, total

    keyed_left = leftmatrix.entries.map(by_inner_left)
    keyed_right = rightmatrix.entries.map(by_inner_right)
    summed = keyed_left.join(keyed_right).map(to_partial_product).reduceByKey(
        lambda x, y: x + y
    )
    return summed.map(flatten)

In [None]:
def to_list(a):
    """combineByKey ``createCombiner``: start a new list holding ``a``."""
    return list((a,))


def append(a, b):
    """combineByKey ``mergeValue``: add ``b`` to list ``a`` in place and return ``a``."""
    a += [b]
    return a


def extend(a, b):
    """combineByKey ``mergeCombiners``: concatenate list ``b`` onto ``a`` in place and return ``a``."""
    a += b
    return a

In [None]:
def mapper(aij, aik, cj_norm, ck_norm, gamma=1.0):
    """DIMSUM mapper: randomly keep the product ``aij * aik`` for a column pair (j, k).

    The product is kept with probability
    ``min(1, gamma / (cj_norm * ck_norm))``, so pairs of heavy columns are
    down-sampled.

    Parameters
    ----------
    aij, aik : float
        Entries of the same row in columns j and k.
    cj_norm, ck_norm : float
        Euclidean norms of columns j and k.
    gamma : float
        Sampling parameter; larger values keep more products.

    Returns
    -------
    float or None
        ``aij * aik`` when sampled, otherwise ``None`` (callers filter these out).
    """
    norm_product = cj_norm * ck_norm
    if norm_product == 0:
        # A zero-norm column contains only zeros, so the (zero) product is always kept
        # rather than dividing by zero.
        probability = 1.0
    else:
        probability = min(1.0, gamma / norm_product)
    # random.random() is uniform on [0, 1), so this holds with exactly `probability`.
    # (random.randint(0, 1) returns the integer 0 or 1 and cannot express a probability.)
    if random.random() < probability:
        return aij * aik
    return None


# def reducer(result_list, cj_norm, ck_norm, gamma=1):
#    return sum(result_list) / min(ck_norm * cj_norm, gamma)

In [None]:
# dict_repr_to_sp_csc(a, M, N).toarray()

## DIMSUM Algorithm

In [None]:
# Euclidean norm ||c_j|| of every column j of `mat`, as an RDD of (j, norm) pairs.
# Summing squared values per column with reduceByKey avoids the transpose and never
# materialises a whole column as a Python list (as combineByKey into lists did), so
# memory per key stays constant however many rows a column has.
norm_cols = (
    mat.entries.map(lambda e: (e.j, e.value * e.value))
    .reduceByKey(lambda x, y: x + y)
    .mapValues(lambda squared_sum: np.sqrt(squared_sum))
)

In [None]:
def _left_by_row(record):
    """(col, ((row, value), col_norm)) -> (row, (col, value, col_norm))."""
    col, ((row, value), col_norm) = record
    return row, (col, value, col_norm)


# Key each entry of A by its column, attach that column's norm, then re-key by row.
# Mapping mat.entries to (j, (i, value)) yields the same pairs as transposing first
# and mapping to (i, (j, value)).
left = (
    mat.entries.map(lambda entry: (entry.j, (entry.i, entry.value)))
    .leftOuterJoin(norm_cols)
    .map(_left_by_row)
)

In [None]:
def _right_by_row(record):
    """(col, ((row, col, value), col_norm)) -> (row, (col, value, col_norm))."""
    _, ((row, col, value), col_norm) = record
    return row, (col, value, col_norm)


# Same row-keyed (row, (col, value, ||c_col||)) view of A, used as the join partner.
right = (
    mat.entries.map(lambda entry: (entry.j, (entry.i, entry.j, entry.value)))
    .leftOuterJoin(norm_cols)
    .map(_right_by_row)
)

In [None]:
def _sample_column_pair(record):
    """(row, ((j, a_rj, ||c_j||), (k, a_rk, ||c_k||))) -> ((j, k), sampled product or None)."""
    _, ((col_j, a_j, norm_j), (col_k, a_k, norm_k)) = record
    return (col_j, col_k), mapper(a_j, a_k, norm_j, norm_k, gamma=GAMMA)


# Pair up entries sharing a row, run the DIMSUM mapper on each pair, and keep the
# sampled products only.
productEntriesMap = (
    left.join(right)
    .map(_sample_column_pair)
    .filter(lambda pair: pair[1] is not None)
)

In [None]:
#%%timeit -r3

# Number of independent DIMSUM runs whose estimates are averaged.
NUMBER_OF_SIMULATIONS = 10

list_multiple_results_dimsum = []
for _ in range(NUMBER_OF_SIMULATIONS):
    # productEntriesMap is not persisted, so every collect() below re-runs the random
    # mapper and yields an independent sample.
    final_product = (
        productEntriesMap.map(lambda e: (e[0][0], (e[0][1], e[1])))
        # attach ||c_i|| to ((i, j), product), then re-key by j
        .leftOuterJoin(norm_cols)
        .map(lambda e: (e[1][0][0], ((e[0], e[1][0][0]), e[1][0][1], e[1][1])))
        # attach ||c_j|| and key by (i, j, ||c_i|| * ||c_j||)
        .leftOuterJoin(norm_cols)
        .map(lambda e: ((*e[1][0][0], e[1][0][2] * e[1][1]), e[1][0][1]))
        .reduceByKey(lambda x, y: x + y)
        # DIMSUM reducer: scale each sum by 1 / min(GAMMA, ||c_i|| * ||c_j||)
        .map(lambda e: ((e[0][0], e[0][1]), e[1] / min(GAMMA, e[0][2])))
    )
    list_dimsum = sorted(final_product.collect())
    list_multiple_results_dimsum.append(list_dimsum)

# Average the runs; a pair missing from a run counts as 0 for that run.
# Counter.update keeps zero and negative totals, whereas `counter += ...` silently
# drops every non-positive value.
counter = Counter()
for result in list_multiple_results_dimsum:
    counter.update(dict(result))
for k in counter:
    counter[k] /= NUMBER_OF_SIMULATIONS

## Benchmark against the exact sparse matrix product

In [None]:
# Exact A^T A via an RDD join, collected as sorted (row, col, value) tuples.
dotproduct = coordinateMatrixMultiply(leftmatrix=mat.transpose(), rightmatrix=mat)
list_dotproduct = sorted(dotproduct.collect())

In [None]:
# L1 error of the last DIMSUM run (list_dimsum) against the exact product.
# Counter.subtract keeps zero and negative differences; the `-` operator on Counters
# drops them, which hid every pair DIMSUM under-estimated or missed entirely.
# NOTE(review): the DIMSUM reducer scales each sum by 1 / min(GAMMA, ||c_i||*||c_j||);
# with sampling probability min(1, GAMMA / (||c_i||*||c_j||)) its expectation is the
# cosine similarity, while list_dotproduct holds raw A^T A entries. Confirm which
# quantity this benchmark is meant to compare.
diff_counter = Counter(dict(list_dimsum))
diff_counter.subtract({(x, y): v for (x, y, v) in list_dotproduct})
sum(abs(v) for v in diff_counter.values())

In [None]:
# L1 error of the averaged DIMSUM estimate (counter) against the exact product.
# Copy first so `counter` is left untouched; Counter.subtract keeps zero and negative
# differences, whereas the `-` operator would silently drop them.
# NOTE(review): the DIMSUM estimate is normalised by min(GAMMA, ||c_i||*||c_j||), so
# its expectation is the cosine similarity rather than the raw A^T A in
# list_dotproduct. Confirm which quantity should be benchmarked.
diff_counter = Counter(counter)
diff_counter.subtract({(x, y): v for (x, y, v) in list_dotproduct})
sum(abs(v) for v in diff_counter.values())

In [None]:
# Shut down the SparkSession (and its underlying SparkContext) to release local resources.
spark.stop()