## INFOH515 Pyspark code
## Author: Gianluca Bontempi
## PySpark implementation of the matrix-vector multiplication in the INFOH515 slides "Map-reduce analytics"

In [196]:
import numpy as np
import pwd
import getpass
import os
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, sum

# Start (or reuse) a SparkSession, then keep a handle on its SparkContext,
# which is the entry point for the RDD API used in the rest of the notebook.
spark = (
    SparkSession.builder
    .appName('s.com')
    .getOrCreate()
)
sc = spark.sparkContext

In [178]:

# Fixed seed so that every run draws the same X and Y
np.random.seed(1225)

n = 5     # number of features (columns of X, length of Y)
N = 1000  # number of samples (rows of X)

# Draw order matters for reproducibility: Y is sampled first, then X.
Y = np.random.normal(loc=0, scale=1, size=(n, 1))   # column vector, shape (n, 1)
X = np.random.normal(loc=0, scale=1, size=(N, n))   # data matrix, shape (N, n)
I = np.arange(N)[:, np.newaxis]                     # row indices as a column, shape (N, 1)

# Reference result computed locally with NumPy; the Spark versions below
# should reproduce these values.
P = np.matmul(X, Y)
P[:5]

array([[-3.28537915],
       [ 0.76688449],
       [ 1.07425344],
       [ 6.03693438],
       [-0.29982747]])

We consider three ways of encoding the matrix X:

* rddX: [0, X[0,:]]
* rddX2: (0, X[0,:])
* rddX3: ((0,0), X[0,0]), ((0,1), X[0,1]), ...

In [193]:
# Ship the vector Y once to every executor instead of with every task
broadcastY = sc.broadcast(Y)

# Each RDD record is one row of [row_index | X[row_index, :]]
indexed_rows = np.concatenate((I, X), axis=1)
rddX = sc.parallelize(indexed_rows)
rddX.take(5)

[array([ 0.        ,  0.34615021, -1.58390158, -0.47663391,  0.67023779,
        -0.6931159 ]),
 array([ 1.        , -1.2784718 ,  1.22773916,  0.6629516 ,  0.26926281,
         0.30634199]),
 array([ 2.        , -0.15450905,  0.77881069, -0.09776944,  0.39046338,
        -0.55891276]),
 array([ 3.        ,  0.89400835,  0.77571089,  2.91200029, -0.94748154,
         1.89004016]),
 array([ 4.        ,  0.68091192, -0.80111741,  1.82520984,  0.57087397,
         1.87065612])]

## Map with rddX

In [194]:

# Map-only version: skip the leading row index (x[0]) and take the dot
# product of the remaining features with the broadcast vector.
rddProduct = rddX.map(lambda x: x[1:] @ broadcastY.value)
rddProduct.take(5)


[array([-3.28537915]),
 array([0.76688449]),
 array([1.07425344]),
 array([6.03693438]),
 array([-0.29982747])]

## flatMap + reduceByKey with rddX

In [183]:
def mapf(x):
    """Emit one (row_index, partial_product) pair per feature of a row.

    Args:
        x: one record of ``rddX``: a 1-D array whose first entry is the row
           index and whose remaining entries are the feature values.

    Returns:
        A list of ``(int row_index, float x[i+1] * Y[i])`` tuples, one per
        feature, ready to be summed per key with ``reduceByKey``.
    """
    row_key = int(x[0])
    # The broadcast vector is stored as an (n, 1) column, so indexing it with
    # a single integer yields a 1-element array; calling float() on such an
    # array is deprecated in recent NumPy. Flattening first makes every
    # weight a true scalar.
    weights = np.ravel(broadcastY.value)
    return [
        (row_key, float(x[i + 1] * weights[i]))
        for i in range(len(x) - 1)
    ]

# flatMap: every row explodes into n keyed partial products
rddProduct = rddX.flatMap(mapf)
# reduceByKey: add up the partial products belonging to the same row
rddProduct.reduceByKey(lambda x, y: x + y).sortByKey().take(5)

                                                                                

[(0, -3.285379145932364),
 (1, 0.7668844932434471),
 (2, 1.0742534434653983),
 (3, 6.036934381989275),
 (4, -0.2998274672400232)]

## Use of mapValues with rddX2 

In [184]:
# Key-value encoding of the matrix: one (row_index, row_values) pair per row
keyed_rows = list(enumerate(X))
rddX2 = sc.parallelize(keyed_rows)

# Make the vector available on every worker node through a broadcast variable
broadcast_vector = sc.broadcast(Y)

rddX2.take(10)

[(0, array([ 0.34615021, -1.58390158, -0.47663391,  0.67023779, -0.6931159 ])),
 (1, array([-1.2784718 ,  1.22773916,  0.6629516 ,  0.26926281,  0.30634199])),
 (2, array([-0.15450905,  0.77881069, -0.09776944,  0.39046338, -0.55891276])),
 (3, array([ 0.89400835,  0.77571089,  2.91200029, -0.94748154,  1.89004016])),
 (4, array([ 0.68091192, -0.80111741,  1.82520984,  0.57087397,  1.87065612])),
 (5, array([ 0.60524862,  0.02694414, -0.35732297, -0.32862628, -1.84350644])),
 (6, array([ 0.2139407 , -2.03349078, -0.38741379,  1.16814892, -0.50037546])),
 (7, array([-2.67524353, -0.71102341, -0.85106656, -0.59740483, -0.63819803])),
 (8, array([-0.8178201 ,  0.74289242, -0.05032601, -0.24798778, -2.00452167])),
 (9, array([ 1.9011385 , -0.64014282,  0.94837751, -1.08275313,  1.38603835]))]

In [186]:
# MAP STEP
# Turn every (row_index, row) pair into (row_index, [per-column products]).
def multiply_row_with_vector(row_tuple):
    """Multiply each entry of a keyed row by the matching vector entry.

    Args:
        row_tuple: a ``(row_idx, row)`` pair as stored in ``rddX2``.

    Returns:
        ``(row_idx, products)`` where ``products[k]`` equals
        ``row[k] * broadcast_vector.value[k]``.
    """
    row_idx, row = row_tuple
    weights = broadcast_vector.value

    products = []
    for k, entry in enumerate(row):
        products.append(entry * weights[k])
    return (row_idx, products)

# Run the map step on every row of the matrix
element_wise_products = rddX2.map(multiply_row_with_vector)

# Collapse each list of products into the dot product of that row
dot_products = element_wise_products.mapValues(np.sum)

dot_products.take(5)

[(0, -3.285379145932364),
 (1, 0.7668844932434471),
 (2, 1.0742534434653983),
 (3, 6.036934381989275),
 (4, -0.2998274672400232)]

## ReduceByKey with rddX3

In [187]:
# Third encoding: one record per matrix entry, so that the row sums can be
# computed with reduceByKey.
# Step 1: flatten the matrix into (i, j, value) triples, row by row.
matrix_elements = [
    (i, j, value)
    for i, row in enumerate(X)
    for j, value in enumerate(row)
]

# Step 2: re-key every triple as ((row_idx, col_idx), matrix_value)
rddX3 = sc.parallelize(
    [((r, c), entry) for r, c, entry in matrix_elements]
)

rddX3.take(10)

[((0, 0), 0.3461502146150345),
 ((0, 1), -1.5839015772522416),
 ((0, 2), -0.4766339104797224),
 ((0, 3), 0.6702377922137547),
 ((0, 4), -0.6931158962159752),
 ((1, 0), -1.2784717988438972),
 ((1, 1), 1.2277391634174657),
 ((1, 2), 0.6629515977263967),
 ((1, 3), 0.2692628077253596),
 ((1, 4), 0.30634198649624383)]

In [188]:
# Map each ((row_idx, col_idx), matrix_value) record to
# (row_idx, matrix_value * vector[col_idx]). The column index is dropped from
# the key so that all products of one row share the same key.
def multiply_with_vector_element(element):
    """Multiply a single matrix entry by the vector entry of its column.

    Args:
        element: a ``((row_idx, col_idx), value)`` record as stored in
            ``rddX3``.

    Returns:
        ``(row_idx, product)``. With the (n, 1) column vector broadcast in
        this notebook, ``broadcast_vector.value[col_idx]`` is a 1-element
        array, so ``product`` is a 1-element array as well.
    """
    key, value = element
    row_idx, col_idx = key
    weight = broadcast_vector.value[col_idx]
    return (row_idx, value * weight)

# MAP STEP: one keyed product per matrix entry
product_elements = rddX3.map(multiply_with_vector_element)

# REDUCE STEP
# Add up the products that share a row index to obtain each row's dot product
result_by_key = product_elements.reduceByKey(lambda acc, term: acc + term)

In [191]:
# Order the per-row sums by row index and display the first five
result_by_key.sortByKey(ascending=True).take(5)

                                                                                

[(0, array([-3.28537915])),
 (1, array([0.76688449])),
 (2, array([1.07425344])),
 (3, array([6.03693438])),
 (4, array([-0.29982747]))]