<h1><center>Cloud Computing and Distributed Computing</center></h1>
<h2>
<hr style=" border:none; height:3px;">
<center>Exercises: Lab 2 on Spark - Part 1</center>
<hr style=" border:none; height:3px;">
</h2>

In [None]:
import pyspark
import random
sc = pyspark.SparkContext(appName="tp1")
print("Initialization successful")

# 1. PageRank

<p>
<font size="3">
**PageRank** is the algorithm used by the Google search engine to assign each Web page a numeric score (in short, _PR-score_) representing its importance. Google uses the PR-score to rank the Web pages returned in response to a search; the most important pages come first.

<p>    
The PR-score of a Web page $p$ depends on the number 
of Web pages linking to $p$ and on their PR-scores. 
In other words, a Web page that is linked to by many important Web pages is itself considered important.
</p>
</font>
</p>

<p>
<font size="3">
PageRank is an iterative algorithm. At the beginning, all the $n$ Web pages have the same PR-score ($1/n$). 
At each step, the PR-score of each Web page $p$ is modified based on the PR-score of the pages linking to $p$. 
The algorithm stops when the PR-scores of all pages converge (i.e., they don't change anymore from one iteration to the next).

<p>
We will now implement a simplified version of PageRank and see why it does not always work.
</p>
</font>
</p>   
   
<p>
<font size="3">
PageRank simulates the way a random surfer would browse the Web by 
randomly choosing the links to follow at any given page.
Referring to Figure 1, 
a random web surfer at page $A$ would choose to go 
to page $B$, $C$ or $D$, each with probability $1/3$, because 
there are three links leaving $A$. 
</font>
</p>

<p>
<font size="3">
There is no way a random surfer will visit $D$ from $C$ because the latter 
does not link to the former.
Similarly, a random surfer at $B$ would go to 
$A$ or $D$, each with probability $1/2$, 
because $B$ has two outgoing links.
</font>
</p>

<p>
<font size="3">
By repeating these _random walks_ many times
across the graph, PageRank simulates the behavior of multiple random 
surfers; the pages that receive a larger number of visits are considered
more important than the ones that are visited only rarely.
</font>
</p>
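The random-surfer idea can be tried out directly with a small simulation. The sketch below is only an illustration in plain Python (no Spark): the `links` dictionary is an assumed link structure, chosen to be consistent with the transition matrix $M$ used in this notebook, and `simulate_surfer` is a hypothetical helper name.

```python
import random
from collections import Counter

# Outgoing links of each page. ASSUMPTION: this link structure is
# reconstructed from the transition matrix M used in this notebook.
links = {
    "A": ["B", "C", "D"],
    "B": ["A", "D"],
    "C": ["A"],
    "D": ["B", "C"],
}

def simulate_surfer(links, steps, seed=0):
    """Follow random links for `steps` moves and return the visit frequencies."""
    rng = random.Random(seed)           # fixed seed, so runs are repeatable
    page = rng.choice(sorted(links))    # start anywhere, uniformly at random
    visits = Counter()
    for _ in range(steps):
        page = rng.choice(links[page])  # pick one outgoing link uniformly
        visits[page] += 1
    return {p: visits[p] / steps for p in sorted(links)}

freq = simulate_surfer(links, steps=200_000)
for page, f in freq.items():
    print(page, round(f, 2))
```

With enough steps, the visit frequencies should settle near the PR-scores that the iterative algorithm computes, with page $A$ visited most often.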

<p>
<font size="3">
These transition probabilities can be collected 
in an $n\times n$ _transition matrix_ $M$, 
where $n$ is the number of nodes
in the graph and the element $m_{i, j}$ is the 
probability that a random surfer moves from node $j$ to node $i$.
</font>
</p>

<p>
<font size="3">
Assuming that the nodes are in alphabetical 
order (the first row/column corresponds to $A$, the second row/column to $B$, and so on), 
the transition matrix corresponding to the graph represented in 
Figure 1 is as follows:

<center>    
$M = 
\begin{bmatrix}
    0 & 1/2 & 1 & 0  \\
    1/3 & 0 & 0 & 1/2  \\
    1/3 & 0 & 0 & 1/2  \\
    1/3 & 1/2 & 0 & 0  
\end{bmatrix}$
</center>
</font>
</p>
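As a sanity check, the matrix above can be rebuilt from the link structure of the graph. The following is a minimal sketch in plain Python; the `links` dictionary is an assumption read off the columns of $M$ (column $j$ lists where a surfer at node $j$ can go), and exact fractions are used only to make the entries easy to compare with the matrix.

```python
from fractions import Fraction

# ASSUMPTION: outgoing links read off the columns of the matrix M above.
links = {
    "A": ["B", "C", "D"],
    "B": ["A", "D"],
    "C": ["A"],
    "D": ["B", "C"],
}

pages = sorted(links)                        # alphabetical order: A, B, C, D
index = {p: i for i, p in enumerate(pages)}  # page name -> row/column number
n = len(pages)

# M[i][j] = probability of moving from node j to node i.
M = [[Fraction(0)] * n for _ in range(n)]
for src, targets in links.items():
    for dst in targets:
        M[index[dst]][index[src]] = Fraction(1, len(targets))

for row in M:
    print("  ".join(f"{str(x):>4}" for x in row))

# Each column sums to 1: a surfer at node j always moves somewhere.
column_sums = [sum(M[i][j] for i in range(n)) for j in range(n)]
print(column_sums)
```

Note that the probabilities leaving a node sit in a column, not a row, which is why every column of $M$ sums to 1.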

<p>
<font size="3" color='#91053d'>**Execute the following code to define the data manipulation functions that we explored and used in the previous tutorial session.**</font>
</p>
<hr style="border:2px solid;">





In [None]:
'''
Loads a matrix from a file.
INPUT: 
     the name of the input file
OUTPUT:
     an RDD containing the matrix
'''
def loadMatrix(filename):
    # Load the file into an RDD matrix
    matrix = sc.textFile(filename)
    # Splits each line. Each element is a list [nbRow, e1, e2, ..., ej]
    matrix = matrix.map(lambda line : line.split(' '))
    # Convert each element to a number (the first is an integer, the others are float)
    matrix = matrix.map(lambda row: [int(row[0])] + [float(row[i]) for i in range(1, len(row))])
    # Get an RDD where each element is a key-value pair ((row, col), element)
    matrix = matrix.flatMap(lambda row: [((row[0], j-1), row[j]) for j in range(1, len(row))])
    return matrix

'''
Returns the number of rows and columns of the matrix
INPUT: 
    An RDD representing a matrix
OUTPUT: 
    the size of the matrix as (nbRows, nbCols)
'''
def shape(matrix):
    M = collect(matrix)
    if len(M) == 0:
        return (0, 0)
    else:
        return (len(M), len(M[0]))

'''
Returns a matrix represented as a list of lists.
INPUT: 
    an RDD representing a matrix
OUTPUT: 
    the matrix represented as a list of lists.
'''
def collect(matrix):
    # Obtain an RDD, where the key is the row identifier and the value is (colId, element)
    matrix = matrix.map(lambda x: (x[0][0], (x[0][1], x[1])))
    # Groups all the values in a row.
    matrix = matrix.groupByKey()
    # Sorts the element by row identifier.
    matrix = matrix.sortByKey()
    # Sort the elements by column identifier.
    matrix = matrix.map(lambda x: sorted(list(x[1])))
    # Now obtain an RDD, where each element is a list containing the elements of a row.
    matrix = matrix.map(lambda row: [x[1] for x in row])
    # Finally, return the RDD as a Python list.
    return matrix.collect()
    
'''
Prints the matrix in a nice way.
INPUT: 
    the name of the matrix (var) and the matrix in the form of an RDD.
OUTPUT:
    none; the function simply prints the matrix
'''
def nice(var, matrix):
    # Obtain a representation of the matrix as a Python list.
    M = collect(matrix)
    # Print the name of the matrix
    print("Matrix ", var)
    # Print the matrix and format the output nicely
    print('\n'.join([''.join(['{:12.2f}'.format(item) for item in row]) 
      for row in M]))
    
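'''
Multiplies two matrices.
INPUT:
    two RDDs A and B representing matrices, where the number of columns
    of A equals the number of rows of B
OUTPUT:
    an RDD representing the product A . B
'''
def multiply(A, B):
  # Key each element of A by its column index: ((i, j), v) -> (j, (i, v))
  left = A.map(lambda e: (e[0][1], (e[0][0], e[1])))
  # Key each element of B by its row index: ((j, k), w) -> (j, (k, w))
  right = B.map(lambda e: (e[0][0], (e[0][1], e[1])))
  # Pair every element of column j of A with every element of row j of B
  productEntries = left.join(right)
  # Multiply each pair, (j, ((i, v), (k, w))) -> ((i, k), v * w), then sum per (i, k)
  productEntries = productEntries.map(lambda e: ( (e[1][0][0], e[1][1][0]), (e[1][0][1] * e[1][1][1]) ) )\
                  .reduceByKey(lambda x,y: x+y)
  return productEntries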

### <strong> Exercise 1.</strong> Matrix visualization (2 points)

<p align="justify">
Complete the code below to load the matrix $M$ from the file _matrix-m.txt_ and display it by calling the function $nice()$.
</p>
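Judging from the code of $loadMatrix()$, each line of the input file holds a row index followed by the elements of that row, separated by single spaces. The sketch below reproduces the same parsing in plain Python, without Spark, to show the resulting $((row, col), element)$ pairs. The two sample lines are hypothetical, since the actual content of the file is not shown here, and `parse_line` is an illustrative helper name.

```python
# HYPOTHETICAL sample lines in the format expected by loadMatrix():
# "<row index> <element 0> <element 1> ..."
sample_lines = [
    "0 0 0.5 1 0",
    "1 0.33 0 0 0.5",
]

def parse_line(line):
    """Turn one text line into a list of ((row, col), element) pairs."""
    fields = line.split(' ')
    row = int(fields[0])                 # the first field is the row index
    # The remaining fields are the elements; field j is column j - 1.
    return [((row, j - 1), float(fields[j])) for j in range(1, len(fields))]

entries = [pair for line in sample_lines for pair in parse_line(line)]
for pair in entries:
    print(pair)
```

Every element of the matrix becomes one key-value pair, which is the representation that the other helper functions expect.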

In [None]:
################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################

# Get the matrix from file.
M = 
nice("M", M)
################## END MODIFICATIONS ##################





<p>
<font size="3">    
When a random surfer starts her walk, she can be anywhere in the graph. 
If we have no reason to believe that she is more likely to start from one 
page than from another, we can say that the initial 
probability that the surfer is at any given page is $1/n$. 
</font>
</p>

<p>
<font size="3">    
The probability distribution of the position of the surfer can be described 
by a column vector $\pmb{v^{(0)}}$ with $n$ elements, which in our example looks as 
follows:
<center>
$
\pmb{v^{(0)}} = 
\begin{bmatrix}
    1/4  \\
    1/4 \\
    1/4 \\
    1/4 
\end{bmatrix}
$
</center>
</font>
</p>



### <strong> Exercise 2.</strong> Initialization (2 points)

<p align="justify">
<font size="3">Implement the function $initialize()$ that:
    <ul>
     <li> Takes in a matrix $M$.
     <li> Returns the vector $v_0$. $v_0$ must be an RDD, represented in the same way as the matrix $M$.
    </ul>
</font>
</p>

In [None]:
'''
Initialization of the vector v0
'''
def initialize(M):
    ################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################
    
    # Get the number n of rows of the input matrix

    
    # Create the column vector having n elements equal to 1/n
    # The vector must be represented in the same way as the matrix M.
    # First, create a list L in Python where each element is ((i, 0), 1/n) for i=0, ..., n-1
    L =
    
    # Then transform this list into an RDD (remember the method parallelize...)
    L = 
    
    return L
    
    ################## END MODIFICATIONS ##################

v0 = initialize(M)
nice("v0", v0)

<p>
<font size="3">
We now want to compute the vector $\pmb{v^{(1)}}$
that gives 
the probability distribution of the position of the surfer after 
one iteration.
The probability $\pmb{v^{(1)}}_i$
that the surfer will be at node $i$ after the
first iteration is expressed as follows:
<p>
<center>
$\pmb{v^{(1)}}_i = \sum \limits_{j=1}^n m_{i, j} \pmb{v^{(0)}}_j$
</center>
</p>
where $m_{i,j}$ is the probability that the surfer 
moves from node $j$ to node $i$ and 
$\pmb{v^{(0)}}_j$ is the 
probability that the surfer is at node $j$ at iteration 0.
</font>
</p>

<p>
<font size="3">
Therefore, $\pmb{v^{(1)}}$ can be obtained by multiplying $M$ with $\pmb{v^{(0)}}$ :
<p>
<center>
$\pmb{v^{(1)}} = M \cdot \pmb{v^{(0)}}$
</center>
</p>
</font>
</p>
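To make the product concrete, here is a small worked example in plain Python (no Spark). It stores $M$ and $\pmb{v^{(0)}}$ as dictionaries keyed by $(row, col)$, mirroring the key-value representation used for the RDDs; `multiply_local` is a hypothetical stand-in written for this illustration, not the notebook's $multiply()$ function.

```python
from collections import defaultdict

def multiply_local(A, B):
    """Multiply two matrices stored as {(row, col): value} dictionaries."""
    # Group the elements of B by row, so they can be matched with A's columns.
    b_by_row = defaultdict(list)
    for (j, k), w in B.items():
        b_by_row[j].append((k, w))
    # Element (i, k) of the product is the sum over j of A[i, j] * B[j, k].
    product = defaultdict(float)
    for (i, j), v in A.items():
        for k, w in b_by_row[j]:
            product[(i, k)] += v * w
    return dict(product)

rows = [
    [0,   1/2, 1, 0],
    [1/3, 0,   0, 1/2],
    [1/3, 0,   0, 1/2],
    [1/3, 1/2, 0, 0],
]
M = {(i, j): x for i, row in enumerate(rows) for j, x in enumerate(row)}
v0 = {(i, 0): 1/4 for i in range(4)}

v1 = multiply_local(M, v0)
for i in range(4):
    print(f"{v1[(i, 0)]:.4f}")
```

The first element is $(1/2 + 1) \cdot 1/4 = 0.375$, and each of the other three is $(1/3 + 1/2) \cdot 1/4 \approx 0.2083$.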





### <strong> Exercise 3.</strong> $performStep()$ function (8 points)
<p align="justify">

 <font  size="3">Implement the function $performStep()$ that:
    <ul>
     <li> Takes in a matrix $M$ and a vector $v$
     <li> Returns the vector $M \cdot v$. 
    </ul>
</font>
</p>

In [None]:
'''
Performs a step of the PageRank algorithm
This function returns a vector obtained by multiplying 
matrix M with vector v.
'''
def performStep(M, v):
    ################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################
    
    # Multiply M and v. We already defined the function multiply() above to multiply two matrices.
    # Use it :)
    R = 
    
    return R
    ################## END MODIFICATIONS ##################


v1= performStep(M, v0)
nice("v1", v1)

##############################################################
#
# YOU SHOULD OBTAIN THE FOLLOWING VALUES:
#  0.38
#  0.21
#  0.21
#  0.21
#
##############################################################

<p>
<font size="3">
If we want to obtain the 
probability distribution $\pmb{v^{(i)}}$ after
$i$ iterations, we compute:
<p>
<center>
$\pmb{v^{(i)}} = M \cdot \pmb{v^{(i-1)}}$
</center>
</p>
</font>
</p>

<p>
<font size="3">
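The algorithm stops at the iteration $k$ where $\lvert \pmb{v^{(k-1)}} - \pmb{v^{(k)}} \rvert < \epsilon$, that is, where no element of the vector changes by more than $\epsilon$ between two consecutive iterations, $\epsilon$ being a small constant chosen in advance.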
</font>
</p>
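The stopping criterion can be illustrated on plain Python lists. This is a minimal sketch that assumes the element-wise reading of the criterion (no element may change by more than $\epsilon$); `has_converged` is a hypothetical helper name and the vectors are example values.

```python
def has_converged(v_prev, v_next, eps=0.001):
    """True when no element changes by more than eps between two iterations."""
    return all(abs(a - b) <= eps for a, b in zip(v_prev, v_next))

# After the first step, the first element moves from 0.25 to 0.375,
# which is far more than eps, so the iteration must go on.
v0 = [0.25, 0.25, 0.25, 0.25]
v1 = [0.375, 0.2083, 0.2083, 0.2083]
early = has_converged(v0, v1)

# Two vectors that differ by 0.0001 in every element are close enough.
va = [0.3333, 0.2222, 0.2222, 0.2222]
vb = [0.3334, 0.2223, 0.2221, 0.2222]
late = has_converged(va, vb)

print(early, late)
```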

<p>
<font size="3">
We now wrap up all the functions that we defined to obtain an implementation of PageRank.
The code below defines a function $SimplifiedPageRank()$ that takes in a matrix, initializes the vector $v_0$
and calls a function $SimplifiedPageRankIteration()$ that recursively performs a step until convergence.
</font>
</p>
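The overall procedure can also be sketched without Spark, on plain Python lists. The code below is an illustration under stated assumptions, not the notebook's implementation: `step` and `simplified_pagerank` are hypothetical names, the matrix is the example matrix $M$, and a loop with an iteration cap (`max_iter`, an added safeguard) replaces the recursion.

```python
def step(M, v):
    """One iteration: return the matrix-vector product M . v."""
    return [sum(m_ij * v_j for m_ij, v_j in zip(row, v)) for row in M]

def simplified_pagerank(M, eps=0.001, max_iter=1000):
    """Repeat v <- M . v until no element changes by more than eps."""
    n = len(M)
    v = [1.0 / n] * n                  # uniform starting distribution
    for _ in range(max_iter):
        v_next = step(M, v)
        if all(abs(a - b) <= eps for a, b in zip(v, v_next)):
            return v_next
        v = v_next
    raise RuntimeError("no convergence within max_iter iterations")

M = [
    [0,   1/2, 1, 0],
    [1/3, 0,   0, 1/2],
    [1/3, 0,   0, 1/2],
    [1/3, 1/2, 0, 0],
]

pr = simplified_pagerank(M)
print([round(x, 2) for x in pr])   # [0.33, 0.22, 0.22, 0.22]
```

A loop is used here as a design choice: it avoids hitting Python's recursion limit when many iterations are needed, and the iteration cap makes a non-converging input fail loudly instead of running forever.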





### <strong> Exercise 4.</strong> $converge()$ function (8 points)

<p align="justify">
 <font  size="3">Implement the function $converge()$ that:
    <ul>
     <li> Takes in two vectors $v_{prev}$ and $v_{next}$.
     <li> Returns whether $\lvert v_{prev} - v_{next} \rvert < \epsilon$
    </ul>
    Follow the comments in the code for the implementation.</font>
</p>

In [None]:
# Variable eps
eps = 0.001

'''
Returns true if |v_prev - v_next| < eps
'''
def converge(v_prev, v_next):
    ################## COMPLETE HERE FOLLOWING THE INSTRUCTIONS ##################
    
    # Put together v_prev and v_next
    v = 
    
    # Each element of the RDD v is a key-value pair ((i, 0), e), where e is an element of 
    # either the vector v_prev or v_next, for i = 0, .., n-1
    # If you don't believe it, print it :)
    # We now want to obtain a new RDD from v, such that for any pair ((i, 0), e1), ((i, 0), e2) 
    # of elements of v, we obtain an element ((i, 0), |e1 - e2|).
    # In practice, the new RDD represents the vector |v_prev-v_next|.
    # Which transformation are you going to apply on v?
    v = 
    
    # Now we want to obtain a new RDD by keeping only the elements from v that are greater than eps.
    # Which transformation are you going to apply on v?
    v = 
    
    # Finally, count the number of elements left in v. Which action are you going to apply on v?
    c = 

    # If no element is greater than eps, then the algorithm converged.
    return c == 0
    
    ################## END MODIFICATIONS ##################
    
    
'''
One iteration of PageRank.
Calls itself recursively until convergence.
'''
def SimplifiedPageRankIteration(M, v):
    v_next = performStep(M, v)
    if converge(v, v_next):
        return v_next
    else:
        return SimplifiedPageRankIteration(M, v_next)

'''
Initializes the vector v0 and starts the iterations.
'''
def SimplifiedPageRank(M):
    (n, m) = shape(M)
    v0 = sc.parallelize([((i, 0), 1./n) for i in range(m)])
    return SimplifiedPageRankIteration(M, v0)

M = loadMatrix("matrix-m.txt")
nice("M", M)
pr = SimplifiedPageRank(M)
nice("PageRank for M", pr)
############################################################## 
# YOU SHOULD OBTAIN THE FOLLOWING VECTOR FOR M AS RESULT
# 0.33
# 0.22
# 0.22
# 0.22
##############################################################


M1 = loadMatrix("matrix-m1.txt")
nice("M1", M1)
pr = SimplifiedPageRank(M1)
nice("Page Rank for M1", pr)


############################################################## 
#YOU SHOULD OBTAIN THE FOLLOWING VECTOR PAGE_RANK AS RESULT
# 0.22
# 0.44
# 0.33
##############################################################
    