<center><img src='./figs/cs-logo.png' width=200></center>

# 1. Introduction


<p align="justify">
<font size="3">
In this lab assignment you'll deepen your Spark programming skills by implementing algorithms that apply computations on matrices.
One interesting application of matrix computation is **PageRank**, the algorithm that Google uses to assign an importance score to each Web page; based on this score, the Google search engine ranks by importance the Web pages matching a particular search.
</font>
</p>

<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Execute the following cell in order to initialize the _SparkContext_.**</font>
<hr style=" border:none; height:2px;">
</p>

In [1]:
!pip install findspark
!pip install pyspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=ab0cf2a71e3d717b44f794e5879b0a06cbcc8af4b538899e663129881869e248
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [63]:
import findspark
findspark.init()

import pyspark
import random
sc = pyspark.SparkContext(appName="lab2")
print("Initialization successful")

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=lab2, master=local[*]) created by __init__ at <ipython-input-2-e8484fb12100>:6 

# 2. Matrix Representation

<p align="justify">
<font size="3">
**Please read carefully this section that presents how matrices will be represented in this assignment.**
</font>
</p>

<p align="justify">
<font size="3">
Without loss of generality, we consider that our input matrices are stored
in textual files (see folder _./data_).
As an example, the file _./data/matrix-a.txt_ looks like as follows:
<p>
0 1 2 4<br>
1 2 3 10<br>
2 12 15 150<br>
</p>
</font>
</p>
<p>
<font size="3">
Each line is a row in a matrix $A$. The first number of the line is the
row identifier (starting from 0), the subsequent values (separated by a whitespace)
are the elements in each column of the row. The matrix represented in this file is the
following:
<p>
<center>
  $A= \begin{bmatrix}
    1 & 2 & 4   \\
    2 & 3 & 10  \\
    12 & 15 & 150
\end{bmatrix}$
</center>
</font>
</p>


<p>
<font size="3">
We provide the implementation of  basic functions to load a matrix from file, visualize it
and get attributes.
</font>
</p>

## 2.1 Function $loadMatrix$

<p>
<font size="3">
The function $loadMatrix()$ loads a matrix from a file.
It takes in the name of the file and returns an RDD containing the matrix.

Each element of an RDD matrix is a key-value pair, where the key is the coordinate (row identifier, column identifier) of an element, and the value is the element itself.
For instance, the RDD corresponding to the matrix $A$ is the following:
<p>
$( (0, 0), 1 ), ( (0, 1), 2 ), ( (0, 2), 4 ), ( (1, 0), 2 ), ( (1, 1), 3 ), ( (1, 2), 10 ), ( (2, 0), 12 ), ( (2, 1), 15 ), ( (2, 2), 150 ) $
</p>
</font>
</p>

## 2.2 Function $shape$

<p>
<font size="3">
The function $shape()$ takes in an RDD matrix and returns the size of the matrix as a pair $(nbRows, nbCols)$, where $nbRows$ (resp., $nbCols$) denotes the number of rows (resp., columns) of the matrix.
</font>
</p>

## 2.3 Function $collect$

<p>
<font size="3">
The function $collect()$ takes in an RDD matrix and returns a representation of the matrix as a Python list $L$. Each element of $L$ is itself a list that corresponds to a row in the matrix.
For instance, the output of the function $collect$ for the matrix $A$ is as follows:   

<p>

$[ [1, 2, 4], [2, 3, 10], [12, 15, 150] ]$

</p>

</font>
</p>


## 2.3 Function $nice$

<p>
<font size="3">
The function $nice()$ prints the matrix in a nice and readable way.
</font>
</p>

<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Execute the following cell in order to initialize the definition of the functions**</font>
<hr style=" border:none; height:2px;">
</p>

In [64]:
'''
Loads a matrix from a file.
Takes in: the name of the input file
Returns: an RDD containing the matrix
'''
def loadMatrix(filename):
    # Load the file into an RDD matrix
    matrix = sc.textFile(filename)
    # Splits each line. Each element is a list [nbRow, e1, e2, ..., ej]
    matrix = matrix.map(lambda line : line.split(' '))
    # Convert each element to a number (the first is an integer, the others are float)
    matrix = matrix.map(lambda row: [int(row[0])] + [float(row[i]) for i in range(1, len(row))])
    # Get an RDD where each element is a key-value pair ((row, col), element)
    matrix = matrix.flatMap(lambda row: [((row[0], j-1), row[j]) for j in range(1, len(row))])
    return matrix

'''
Returns the number of rows and colums of the matrix
Takes in: An RDD representing a matrix
Returns: the size of the matrix as (nbRows, nbCols)
'''
def shape(matrix):
    M = collect(matrix)
    if len(M) == 0:
        return (0, 0)
    else:
        return (len(M), len(M[0]))

'''
Returns a matrix represented as a list of lists.
Takes in: an RDD representing a matrix
Returns: the matrix represented as a list of lists.
'''
def collect(matrix):
    # Obtain an RDD, where the key is the row identifier and the value is (colId, element)
    matrix = matrix.map(lambda x: (x[0][0], (x[0][1], x[1])))
    # Groups all the values in a row.
    matrix = matrix.groupByKey()
    # Sorts the element by row identifier.
    matrix = matrix.sortByKey()
    # Sort the elements by column identifier.
    matrix = matrix.map(lambda x: sorted(list(x[1])))
    # Now obtain an RDD, where each element is a list containing the elements of a row.
    matrix = matrix.map(lambda row: [x[1] for x in row])
    # Finally, return the RDD as a Python list.
    return matrix.collect()

'''
Prints the matrix in a nice way.
Takes in: the name of the matrix (var) and the matrix in the form of an RDD.
'''
def nice(var, matrix):
    # Obtain a representation of the matrix as a Python list.
    M = collect(matrix)
    # Print the name of the matrix
    print("Matrix ", var)
    # Print the matrix and format the output nicely
    print('\n'.join([''.join(['{:12.2f}'.format(item) for item in row])
      for row in M]))


# 3. Matrix Addition

<p align="justify">
<font size="3">
The code below loads two matrices $A$ and $B$ from file and calls the function $sum()$ to compute $A+B$.
</font>
</p>

<p>
<font size="3">
The function $sum()$ takes in:
<ul>
<li> $A$: an RDD containing the first matrix.
<li> $B$: an RDD containing the second matrix.
</ul>
The function $sum()$ returns an RDD containing the matrix obtained by summing $A$ and $B$.
</font>
</p>

<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Complete the definition of the function $sum()$ and execute the code**</font>
<hr style=" border:none; height:2px;">
</p>



In [65]:
'''
Computes the sum of two matrices.
Takes in: two RDDs containing the input matrices
Returns: the RDD containing the sum of the two input matrices
'''
def sum(A, B):

    ################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################

    # Each element of the RDD A and B is ((r,c), e), where e is an element of the matrix and (r, c) is the
    # coordinate of the element in terms of row and column.

    # 1. Put the two RDDs A and B together. Use the transformation union. Remember that a transformation
    # always returns a new RDD with the result of the transformation.
    C = A.union(B)

    #2. Transforms the RDD C into one where the values having the same key (i.e., same row and column)
    # are summed together. Which transformation are you going to use on C?
    C = C.reduceByKey(lambda x, y: x + y)
    # We return the RDD containing the sum of the two input matrices
    return C

    ################## END OF MODIFICATIONS ##################

# Load matrix A from file and print it.
A = loadMatrix("./data/matrix-a.txt")
nice("A", A)

# Load matrix B from file and print it.
B = loadMatrix("./data/matrix-b.txt")
nice("B", B)

# Compute A+B and print it
C = sum(A, B)
nice("C", C)

##############################################################
#YOU SHOULD OBTAIN THE FOLLOWING MATRIX C AS RESULT
# 5.00        4.00        6.00      324.00       23.00
# 3.00        6.00       13.00      333.00      423.00
# 35.00       49.00      162.00       12.00        0.00
##############################################################


Matrix  A
        1.00        2.00        4.00
        2.00        3.00       10.00
       12.00       15.00      150.00
Matrix  B
        4.00        2.00        2.00      324.00       23.00
        1.00        3.00        3.00      333.00      423.00
       23.00       34.00       12.00       12.00        0.00
Matrix  C
        5.00        4.00        6.00      324.00       23.00
        3.00        6.00       13.00      333.00      423.00
       35.00       49.00      162.00       12.00        0.00


# 4. Scalar Multiplication

<p>
<font size="3">
The code below calls the function $scalarMultiply()$ to obtain the matrix $c\times A$, where $c$ is a scalar value.    
</font>
</p>

<p>
<font size="3">
The function $scalarMultiply()$ takes in:
<ul>
<li> $c$: a scalar value.
<li> $M$: an RDD containing a matrix.
</ul>
The function $scalarMultiply()$ returns an RDD containing the matrix obtained by multiplying $c$ with the input matrix.
</font>
</p>


<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Complete the definition of the function $scalarMultiply()$ and execute the code**</font>
<hr style=" border:none; height:2px;">
</p>



In [66]:
'''
Computes the scalar multiplication.
Takes in a scalar value c and an RDD matrix M
Returns the RDD containing the matrix resulting from the scalar multiplication c * M.
'''
def scalarMultiply(c, M):

    ################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################

    # Apply a transformation on M, so each element of the matrix M is multiplied by c
    # Which transformation are you going to use?
    R = M.map(lambda x: (x[0], c * x[1]))

    return R
    ################## END MODIFICATION ##################


# Prints
nice("A", A)
nice("2*A", scalarMultiply(2, A))

##############################################################
# THE RESULT SHOULD BE
#2.00        4.00        8.00
#4.00        6.00       20.00
#24.00       30.00      300.00
##############################################################

Matrix  A
        1.00        2.00        4.00
        2.00        3.00       10.00
       12.00       15.00      150.00
Matrix  2*A
        2.00        4.00        8.00
        4.00        6.00       20.00
       24.00       30.00      300.00


# 5. Matrix Multiplication


<p>
<font size="3">
We want to implement a function $multiply()$ to obtain the matrix $A \times B$.
</font>
</p>

<p>
<font size="3">
The function $multiply()$ takes in:
<ul>
<li> $A$: an RDD containing the first matrix.
<li> $B$: an RDD containing the second matrix.
</ul>
The function $multiply$ returns an RDD containing the matrix obtained by multiplying the first and the second matrix.
The multiplication can only be computed if the number of columns of $A$ equals the number of rows of $B$.
</font>
</p>


<p>
<font size="3">
Let $A$ be an $n \times m$ matrix and $B$ an $m \times p$ matrix.
The matrix $C = A \times B$ is a $n \times p$ matrix, where each element $c_{i, k}$ is computed as
follows:
<center>
  $c_{i, k} = \sum\limits_{j=0}^{m-1} a_{i, j} \cdot b_{j, k} \quad\quad (1)$
</center>
</font>
</p>


<p>
<font size="3">
One possible implementation of this function is based on a MapReduce schema.
Remember that in a MapReduce schema, the idea is to group elements by a key and apply a function to the elements
that share the same key.
As you can see, for a given $(i, k)$, the element of $A$ that is in the j-th column is multiplied by the value of $B$ that is in the j-th row, for any column $j$.
Therefore, we can change the representation of $A$ and $B$ so that their elements are indexed by using $j$ as the key.
</font>
</p>

<p>
<font size="3">
More specifically, we can represent the matrix $A$ as follows:
<center>    
    $(j, (0, i, a_{i, j})) \quad 0 \leq i \leq n-1 \quad 0 \leq j \leq m-1  \quad\quad (2)$
</center>    
where the value $0$ in the triple $(0, i, a_{i, j})$ means that the element $a_{i, j}$ comes from the matrix $A$.
</font>
</p>


<p>
<font size="3">
Similarly, we can represent the matrix $B$ as follows:
<center>    
    $(j, (1, k, b_{j, k})) \quad 0 \leq j \leq m-1 \quad 0 \leq k \leq p-1 \quad\quad (3) $
</center>    
where the value $1$ in the triple $(1, k, b_{j, k})$ means that the element $b_{j, k}$ comes from the matrix $B$.
</font>
</p>

<p>
<font size="3">
As a first step, we want to code two functions $transformA()$ and $transformB()$ to obtain the two representations of $A$ and $B$ respectively, as described in Equation (2) and (3).
The two representations returned by both functions **must be RDDs**.
</font>
</p>


<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Complete the definition of the functions $transformA()$ and $transformB()$ and execute the code**</font>
<hr style=" border:none; height:2px;">
</p>



In [67]:
################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################

'''
Transforms the RDD matrix A into an RDD as described in Equation (2)
'''
def transformA(A):
    # Transform A as indicated in the text above
    A = A.map(lambda x: (x[0][1], (0, x[0][0], x[1])))
    return A

'''
Transforms the RDD matrix B into an RDD as described in Equation (3)
'''
def transformB(B):
    # Transform B as indicated in the text above
    B = B.map(lambda x: (x[0][0], (1, x[0][1], x[1])))
    return B

################## END MODIFICATIONS ##################

# Displayes the two matrices
nice("A", A)
nice("B", B)

# Transforms them
Atransformed = transformA(A)
Btransformed = transformB(B)

# Display the result.
print("\n********** Representation for A ************\n")
print(Atransformed.collect())
print("\n********** Representation for B ************\n")
print(Btransformed.collect())


Matrix  A
        1.00        2.00        4.00
        2.00        3.00       10.00
       12.00       15.00      150.00
Matrix  B
        4.00        2.00        2.00      324.00       23.00
        1.00        3.00        3.00      333.00      423.00
       23.00       34.00       12.00       12.00        0.00

********** Representation for A ************

[(0, (0, 0, 1.0)), (1, (0, 0, 2.0)), (2, (0, 0, 4.0)), (0, (0, 1, 2.0)), (1, (0, 1, 3.0)), (2, (0, 1, 10.0)), (0, (0, 2, 12.0)), (1, (0, 2, 15.0)), (2, (0, 2, 150.0))]

********** Representation for B ************

[(0, (1, 0, 4.0)), (0, (1, 1, 2.0)), (0, (1, 2, 2.0)), (0, (1, 3, 324.0)), (0, (1, 4, 23.0)), (1, (1, 0, 1.0)), (1, (1, 1, 3.0)), (1, (1, 2, 3.0)), (1, (1, 3, 333.0)), (1, (1, 4, 423.0)), (2, (1, 0, 23.0)), (2, (1, 1, 34.0)), (2, (1, 2, 12.0)), (2, (1, 3, 12.0)), (2, (1, 4, 0.0))]


<p>
<font size="3">
In order to group all the elements of both matrices by the key $j$, we need to merge the two RDDs $Atransformed$ and $Btransformed$.
The function $merge()$ declared below takes in $Atransformed$ and $Btransformed$ and returns an RDD that results from the union of the two input RDDs.
</font>
</p>

<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Complete the definition of the functions $merge()$ and execute the code**</font>
<hr style=" border:none; height:2px;">
</p>



In [68]:
'''
Returns the union of Atransformed and Btransformed.
'''
def merge(Atransformed, Btransformed):

    ################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################

    # Put together the two input RDDs
    R = Atransformed.union(Btransformed)
    return R

    ################## END MODIFICATIONS ##################

nice("A", A)
nice("B", B)

merged = merge(Atransformed, Btransformed)

print("\n********** Representation for merged ************\n")
print(merged.collect())



Matrix  A
        1.00        2.00        4.00
        2.00        3.00       10.00
       12.00       15.00      150.00
Matrix  B
        4.00        2.00        2.00      324.00       23.00
        1.00        3.00        3.00      333.00      423.00
       23.00       34.00       12.00       12.00        0.00

********** Representation for merged ************

[(0, (0, 0, 1.0)), (1, (0, 0, 2.0)), (2, (0, 0, 4.0)), (0, (0, 1, 2.0)), (1, (0, 1, 3.0)), (2, (0, 1, 10.0)), (0, (0, 2, 12.0)), (1, (0, 2, 15.0)), (2, (0, 2, 150.0)), (0, (1, 0, 4.0)), (0, (1, 1, 2.0)), (0, (1, 2, 2.0)), (0, (1, 3, 324.0)), (0, (1, 4, 23.0)), (1, (1, 0, 1.0)), (1, (1, 1, 3.0)), (1, (1, 2, 3.0)), (1, (1, 3, 333.0)), (1, (1, 4, 423.0)), (2, (1, 0, 23.0)), (2, (1, 1, 34.0)), (2, (1, 2, 12.0)), (2, (1, 3, 12.0)), (2, (1, 4, 0.0))]


<p>
<font size="3">
Now we can group the values of the RDD $merged$ obtained above by their key $j$.
We define a function $group()$ that returns an RDD obtained by grouping the values of the input RDD by their key.
</font>
</p>


<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Complete the definition of the functions $group()$ and execute the code**</font>
<hr style=" border:none; height:2px;">
</p>



In [69]:
'''
Returns an RDD where the values of the input RDD are grouped by their key.
'''
def group(merged):

    ################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################

    # Groups the element of the input RDD by key.
    R = merged.groupByKey()

    # Convert the result to a list for each key for easier processing
    R = R.mapValues(list)
    return R

    ################## END MODIFICATIONS ##################

nice("A", A)
nice("B", B)

grouped = group(merged)

print("\n********** Representation for grouped ************\n")
L = grouped.collect()
print('[')
for l in L:
    print("(",l[0], ",", end='', sep='')
    print("[", end='')
    for el in l[1]:
        print(el, end="")
    print("],")
print(']')

######################################################################
# Note that in the output, each element is (j, L), where
# L is a list that contains all the elements in the j-th column of A
# and all the elements in j-th row of B
######################################################################

Matrix  A
        1.00        2.00        4.00
        2.00        3.00       10.00
       12.00       15.00      150.00
Matrix  B
        4.00        2.00        2.00      324.00       23.00
        1.00        3.00        3.00      333.00      423.00
       23.00       34.00       12.00       12.00        0.00

********** Representation for grouped ************

[
(0,[(0, 0, 1.0)(0, 1, 2.0)(0, 2, 12.0)(1, 0, 4.0)(1, 1, 2.0)(1, 2, 2.0)(1, 3, 324.0)(1, 4, 23.0)],
(1,[(0, 0, 2.0)(0, 1, 3.0)(0, 2, 15.0)(1, 0, 1.0)(1, 1, 3.0)(1, 2, 3.0)(1, 3, 333.0)(1, 4, 423.0)],
(2,[(0, 0, 4.0)(0, 1, 10.0)(0, 2, 150.0)(1, 0, 23.0)(1, 1, 34.0)(1, 2, 12.0)(1, 3, 12.0)(1, 4, 0.0)],
]


<p>
<font size="3">
Each element of the RDD $grouped$ obtained above is a key-value pair, where the key is the index $j$ and the value is a list $L$ containing all the triples corresponding to the elements of matrix $A$ in the $j-$th column  and the elements of matrix $B$  in the $j-$row, as follows:
<p>
<center>
$(0, i, a_{i, j})\ 0 \leq i \leq n-1 \quad (1, k, b_{j, k})\ 0 \leq k \leq p-1$
</center>
</p>
<p>
Remember that all triples corresponding to matrix $A$ have 0 as their first value, while those corresponding to matrix $B$ have 1.
</p>
<p>
From Equation (1), you can see that the product $a_{i, j} \cdot b_{j, k}$ contributes to the value $c_{i, k}$, for $0 \leq i \leq n-1$ and $0 \leq k \leq p-1$.
Therefore, given the list $L$, we associate each value $a_{i, j} \cdot b_{j, k}$  to the pair $(i, k)$.
</p>
<p>
In other words, we now transform the RDD $grouped$ into an RDD where
each element is a key-value pair, where the key is $(i, k)$ and the value is $a_{i, j} \cdot b_{j, k}$.

</p>    
<p>
In the code below, the function $multiplyElements()$ is given that takes in a value $(j, L)$ of the RDD $grouped$
and returns a list, where each element is a pair $((i, k), a_{i, j} \cdot b_{j, k})$, $0 \leq i \leq n-1$ and $0 \leq k \leq p-1$.  
</p>
</font>
</p>


<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Complete the definition of the function $groupProducts()$ that:
    <ul>
      <li> Takes in the RDD $grouped$.
      <li> Applies the function $multiplyElements()$ to each element of $grouped$.
      <li> Returns an RDD where each element is $((i, k), a_{i, j} \cdot b_{j, k})$, $0 \leq i \leq n-1$ and $0 \leq k \leq p-1$.
    </ul>
    Execute the code.**</font>
<hr style=" border:none; height:2px;">
</p>




In [70]:
import sys

'''
Multiplies each element from matrix A with each element from
matrix B in the list L (see description above).
Takes in: a value (j, L) of the RDD grouped.
Returns: a list where each element is ((i, k), a_ij * b_jk)
'''
def multiplyElements(value):
    j = value[0]
    L = value[1]

    '''
    The output key-value pairs.
    '''
    kv = []
    '''
    Maybe not necessary, we make sure that all triples with
    the first element 0 (those from matrix A)
    comes before any triple from matrix B.
    '''
    L = sorted(list(L))

    '''
    For convenience, we store the triples from matrix A
    and those from matrix B in two separate lists
    LA and LB.
    '''
    sep = 0
    while L[sep][0] == 0:
        sep += 1
    LA = [L[i] for i in range(0, sep)]
    LB = [L[i] for i in range(sep, len(L))]
    '''
    For each element (0, i, a_ij) in LA
    and each element (1, k, b_jk) in LB,
    we add the pair ((i, k), a_ij * b_jk) to kv.
    '''
    for a in LA:
        for b in LB:
            i = a[1]
            k = b[1]
            kv.append(((i, k), a[2]*b[2]))
    return kv

'''
Returns an RDD where each value is a pair ((i, k), a_ij * b_jk)
'''
def groupProducts(grouped):
    ################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################

    # Apply a transformation to the input RDD to get an RDD as described in the text.
    # Which tranformations are you going to use? map or flatMap?

    # The flatMap will transform each element of the grouped RDD
    # using the multiplyElements function, which returns a list of key-value pairs
    R = grouped.flatMap(multiplyElements)
    return R

    ################## END MODIFICATIONS ##################

nice("A", A)
nice("B", B)

print("\n********** Representation for multipliedElements ************\n")
multipliedElements = groupProducts(grouped)
print(multipliedElements.collect())

Matrix  A
        1.00        2.00        4.00
        2.00        3.00       10.00
       12.00       15.00      150.00
Matrix  B
        4.00        2.00        2.00      324.00       23.00
        1.00        3.00        3.00      333.00      423.00
       23.00       34.00       12.00       12.00        0.00

********** Representation for multipliedElements ************

[((0, 0), 4.0), ((0, 1), 2.0), ((0, 2), 2.0), ((0, 3), 324.0), ((0, 4), 23.0), ((1, 0), 8.0), ((1, 1), 4.0), ((1, 2), 4.0), ((1, 3), 648.0), ((1, 4), 46.0), ((2, 0), 48.0), ((2, 1), 24.0), ((2, 2), 24.0), ((2, 3), 3888.0), ((2, 4), 276.0), ((0, 0), 2.0), ((0, 1), 6.0), ((0, 2), 6.0), ((0, 3), 666.0), ((0, 4), 846.0), ((1, 0), 3.0), ((1, 1), 9.0), ((1, 2), 9.0), ((1, 3), 999.0), ((1, 4), 1269.0), ((2, 0), 15.0), ((2, 1), 45.0), ((2, 2), 45.0), ((2, 3), 4995.0), ((2, 4), 6345.0), ((0, 0), 92.0), ((0, 1), 136.0), ((0, 2), 48.0), ((0, 3), 48.0), ((0, 4), 0.0), ((1, 0), 230.0), ((1, 1), 340.0), ((1, 2), 120.0), ((1, 3),

<p>
<font size="3">
Each element of the RDD $multipliedElements$ obtained above is $((i, k), a_{i, j} \cdot b_{j, k})$,
$0 \leq i \leq n-1$, $0 \leq j \leq m-1$, $0 \leq k \leq p-1$.
From Equation (1), we can see that each $c_{i, k}$ is obtained by summing up the products $a_{i, j} \cdot b_{j, k}$, $0 \leq j \leq m-1$.
<p>
Therefore, in order to obtain the matrix $C = A \times B$, the only thing that we need to do is to sum all
values $a_{i, j} \cdot b_{j, k}$ associated with the same key $(i, k)$.
</p>
</font>
</p>

<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Complete the definition of the function $getResult()$ that:
    <ul>
      <li> Takes in the RDD $multipliedElements$.
      <li> Transforms the input RDD into one where each element is $((i, k), \sum\limits_{j=0}^{m-1} a_{i, j} \cdot b_{j, k})$
      <li> Returns the resulting RDD.
    </ul>
    Execute the code.We finally obtain the matrix $C$.**</font>
<hr style=" border:none; height:2px;">
</p>



In [71]:
def getResult(multipliedElements):
    ################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################

    # Apply a transformation to the input RDD so that all values with the same key are summed.
    R = multipliedElements.reduceByKey(lambda x, y: x + y)
    return R

    ################## END MODIFICATIONS ##################

nice("A", A)
nice("B", B)
C = getResult(multipliedElements)
nice("C", C)

##############################################################
#YOU SHOULD OBTAIN THE FOLLOWING MATRIX C AS RESULT
# 98.00      144.00       56.00     1038.00      869.00
# 241.00      353.00      133.00     1767.00     1315.00
# 3513.00     5169.00     1869.00    10683.00     6621.00
##############################################################


Matrix  A
        1.00        2.00        4.00
        2.00        3.00       10.00
       12.00       15.00      150.00
Matrix  B
        4.00        2.00        2.00      324.00       23.00
        1.00        3.00        3.00      333.00      423.00
       23.00       34.00       12.00       12.00        0.00
Matrix  C
       98.00      144.00       56.00     1038.00      869.00
      241.00      353.00      133.00     1767.00     1315.00
     3513.00     5169.00     1869.00    10683.00     6621.00


<p>
<font size="3">
We now wrap every function that we implemented above in only one function $multiply()$ that we can use any time we need to multiply two matrices.
</font>
</p>

<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Execute the code below to define the function $multiply()$**</font>
<hr style=" border:none; height:2px;">
</p>

In [72]:
def multiply(A, B):
    if shape(A)[1] != shape(B)[0]:
        sys.exit("The number of colums of A must equal the number of rows of B")
    A = transformA(A)
    B = transformB(B)
    C = merge(A, B)
    C = group(C)
    C = groupProducts(C)
    C = getResult(C)
    return C


# DATASET: **Deezer Ego Nets**

The dataset that we are going to use to test our implementation represents the ego-nets of Eastern European users collected from the music streaming service Deezer in February 2020. Nodes are users and edges are mutual follower relationships. The related task is the prediction of gender for the ego node in the graph.
<br><br>
Source: https://snap.stanford.edu/data/deezer_ego_nets.html
<br>
Paper: https://arxiv.org/abs/2003.04819
<br>
Github page: https://github.com/benedekrozemberczki/karateclub

In order to use the dataset, we first need to load the json file that contains the graphs. The nodes of each graph represent the users and the edges are mutual follower relationships.

In [None]:
import json
import networkx as nx
import numpy as np

# Load the data from the JSON file
with open('./deezer_ego_nets/deezer_edges.json') as f:
    data = json.load(f)

# Initialize an empty directed graph
graph = nx.DiGraph()

# Add edges to the graph
for edges in data.values():
    graph.add_edges_from(edges)

# Convert the first graph to an adjacency matrix
deezer_matrix = nx.to_numpy_array(graph)[0]

# Print the adjacency matrix
print(deezer_matrix)

[0. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
 1. 1. 1. 1. 1. 0. 1. 1. 1. 1. 0. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 0. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 0. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 0. 1. 0. 1. 1.
 1. 0. 1. 1. 1. 1. 1. 0. 1. 0. 1. 1. 1. 0. 0. 0. 1. 1. 1. 0. 1. 1. 1. 0.
 0. 0. 0. 1. 1. 1. 0. 0. 1. 1. 0. 0. 0. 0. 1. 0. 0. 0. 1. 0. 1. 0. 0. 0.
 0. 0. 1. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 1. 0. 1. 1.
 0. 1. 1. 0. 0. 1. 1. 0. 1. 0. 0. 0. 0. 1. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 1. 0. 0. 0. 1. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 1.
 0. 0. 0. 0. 0. 1. 0. 0. 1. 1. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 1. 1.
 1. 1. 1. 1. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.

Now, we can run the operations we implemented on the generated matrix.

In [60]:
'''
nice("Deezer Matrix", deezer_matrix)

S = sum(deezer_matrix, deezer_matrix)
nice("Deezer Matrix + Deezer Matrix", S)

nice("3*Deezer Matrix", scalarMultiply(3, deezer_matrix))

M = multiply(deezer_matrix, deezer_matrix)
nice("Deezer Matrix ^ 2", scalarMultiply(3, deezer_matrix))
'''

'\nnice("Deezer Matrix", deezer_matrix)\n\nS = sum(deezer_matrix, deezer_matrix)\nnice("Deezer Matrix + Deezer Matrix", S)\n\nnice("3*Deezer Matrix", scalarMultiply(3, deezer_matrix))\n\nM = multiply(deezer_matrix, deezer_matrix)\nnice("Deezer Matrix ^ 2", scalarMultiply(3, deezer_matrix))\n'

# 6. PageRank
<p>
<font size="3">
One iteration of PageRank is described by the following equation:
<p>
<center>
$\pmb{v^{(k+1)}} = \beta M \pmb{v^{(k)}} + (1-\beta)\frac{\pmb{e}}{n}\quad\quad (4)$
</center>
</p>
where:
<ul>
<li> $0 \leq \beta \leq 1$ is the probability that a random surfer follows a link to go from one page to another.
<li> $1-\beta$ is the  probability that a random surfer jumps to a random page without following a link.
<li> $\frac{\pmb{e}}{n}$ is a vector with $n$ elements $1/n$.
</ul>
The value of $\beta$ is usually set to $0.8$, meaning that the random surfer is much more likely to visit pages by following links. Note that the simplified version of PageRank can be obtained with $\beta=1.0$.
</font>
</p>


# Function to test matrix is row stochastic

In [73]:
def makeRowStochastic(matrix):
    # Sum the elements of each row
    row_sums = matrix.map(lambda x: (x[0][0], x[1])).reduceByKey(lambda x, y: x + y)

    # Modify the matrix RDD to have the same key structure as row_sums for the join
    # Changing ((row, col), value) to (row, (col, value))
    matrix_modified = matrix.map(lambda x: (x[0][0], (x[0][1], x[1])))

    # Join the sums back with the modified matrix and normalize
    return matrix_modified.join(row_sums).map(lambda x: ((x[0], x[1][0][0]), x[1][0][1] / x[1][1]))


## Implementation of Page Rank

In [85]:
def calculate_pagerank(M, iterations=2, BETA = 0.85):

  # Initialize vector c (e/n)
  n = M.count()
  c = sc.parallelize([(i, 1.0 / n) for i in range(n)])
  v = sc.parallelize([(i, 1.0 / n) for i in range(n)])

  # PageRank iteration
  for i in range(iterations):  # Number of iterations
      v = scalarMultiply(BETA, M).union(scalarMultiply(1 - BETA, c))
      v = v.reduceByKey(lambda x, y: x + y)

  # Collect and print the final ranks
  for (link, rank) in v.collect():
      print(f"Page {link} has rank: {rank:.2f}")

In [86]:
M = loadMatrix('./data/matrix-m.txt')
nice('M', M)
# Joining the original matrix with the norms and normalizing
normalized_matrix = makeRowStochastic(M)
nice('Normalized M', normalized_matrix)
calculate_pagerank(normalized_matrix, iterations=2)

Matrix  M
        0.00        0.50        1.00        0.00
        0.33        0.00        0.00        0.50
        0.33        0.00        0.00        0.50
        0.33        0.50        0.00        0.00
Matrix  Normalized M
        0.00        0.33        0.67        0.00
        0.40        0.00        0.00        0.60
        0.40        0.00        0.00        0.60
        0.40        0.60        0.00        0.00
Page (0, 2) has rank: 0.57
Page (1, 3) has rank: 0.51
Page (2, 0) has rank: 0.34
Page 0 has rank: 0.01
Page 6 has rank: 0.01
Page 12 has rank: 0.01
Page (3, 0) has rank: 0.34
Page (3, 2) has rank: 0.00
Page 1 has rank: 0.01
Page 7 has rank: 0.01
Page 13 has rank: 0.01
Page (0, 0) has rank: 0.00
Page (2, 2) has rank: 0.00
Page (3, 3) has rank: 0.00
Page 2 has rank: 0.01
Page 8 has rank: 0.01
Page 14 has rank: 0.01
Page (1, 0) has rank: 0.34
Page (1, 2) has rank: 0.00
Page 3 has rank: 0.01
Page 9 has rank: 0.01
Page 15 has rank: 0.01
Page (1, 1) has rank: 0.00
Page (3, 1) 

In [87]:
M = loadMatrix('./data/spider-trap.txt')
nice('M', M)
# Joining the original matrix with the norms and normalizing
normalized_matrix = makeRowStochastic(M)
nice('Normalized M', normalized_matrix)
calculate_pagerank(normalized_matrix, iterations=10)

Matrix  M
        0.00        0.50        0.00        0.00
        0.33        0.00        0.00        0.50
        0.33        0.00        1.00        0.50
        0.33        0.50        0.00        0.00
Matrix  Normalized M
        0.00        1.00        0.00        0.00
        0.40        0.00        0.00        0.60
        0.18        0.00        0.55        0.27
        0.40        0.60        0.00        0.00
Page (0, 2) has rank: 0.00
Page (1, 3) has rank: 0.51
Page (2, 0) has rank: 0.15
Page 0 has rank: 0.01
Page 6 has rank: 0.01
Page 12 has rank: 0.01
Page (3, 0) has rank: 0.34
Page (3, 2) has rank: 0.00
Page 1 has rank: 0.01
Page 7 has rank: 0.01
Page 13 has rank: 0.01
Page (0, 0) has rank: 0.00
Page (2, 2) has rank: 0.46
Page (3, 3) has rank: 0.00
Page 2 has rank: 0.01
Page 8 has rank: 0.01
Page 14 has rank: 0.01
Page (1, 0) has rank: 0.34
Page (1, 2) has rank: 0.00
Page 3 has rank: 0.01
Page 9 has rank: 0.01
Page 15 has rank: 0.01
Page (1, 1) has rank: 0.00
Page (3, 1) 

In [88]:
M = loadMatrix('./data/matrix-dead.txt')
nice('M', M)
# Joining the original matrix with the norms and normalizing
normalized_matrix = makeRowStochastic(M)
nice('Normalized M', normalized_matrix)
calculate_pagerank(normalized_matrix, iterations=10)

Matrix  M
        0.00        0.50        0.00        0.00
        0.33        0.00        0.00        0.50
        0.33        0.00        0.00        0.50
        0.33        0.50        0.00        0.00
Matrix  Normalized M
        0.00        1.00        0.00        0.00
        0.40        0.00        0.00        0.60
        0.40        0.00        0.00        0.60
        0.40        0.60        0.00        0.00
Page (0, 2) has rank: 0.00
Page (1, 3) has rank: 0.51
Page (2, 0) has rank: 0.34
Page 0 has rank: 0.01
Page 6 has rank: 0.01
Page 12 has rank: 0.01
Page (3, 0) has rank: 0.34
Page (3, 2) has rank: 0.00
Page 1 has rank: 0.01
Page 7 has rank: 0.01
Page 13 has rank: 0.01
Page (0, 0) has rank: 0.00
Page (2, 2) has rank: 0.00
Page (3, 3) has rank: 0.00
Page 2 has rank: 0.01
Page 8 has rank: 0.01
Page 14 has rank: 0.01
Page (1, 0) has rank: 0.34
Page (1, 2) has rank: 0.00
Page 3 has rank: 0.01
Page 9 has rank: 0.01
Page 15 has rank: 0.01
Page (1, 1) has rank: 0.00
Page (3, 1) 