<center><span style="color:blue; font-family:Georgia;  font-size:2em;">
    <h1>PageRank</h1></span></center>
    <p> </p>
    <p> </p>
    <center><span style="color:blue; font-family:Georgia;  font-size:1em;">
    Ramon Béjar</span></center>
    <canvas id="myCanvas" width="200" height="100" style="border:0px solid"></canvas>
    <center>Data mining - Master on Computer Science</center>
    <center><img src="https://github.com/deinok/EXPLOTACIO-DE-DADES/blob/main/ASSIGNAMENT_1/M-UdL2.png?raw=1"  width="200" alt="UdL Logo"></center>

In this notebook, we study the PageRank algorithm, which ranks (orders) web pages by the probability of reaching each page during a "random walk" through the web graph. In the web graph, nodes represent web pages and directed edges represent links: a link from page A to page B is a directed edge from A to B.

We focus on a simple version of PageRank, whose update formula omits the "damping factor". Practical implementations of PageRank include this factor to deal with the problems of "dangling nodes" and "disconnected components".

Preliminary start-up code:

In [1]:
import warnings
warnings.filterwarnings('ignore')

In [5]:
import pyspark
import os
import math
import random
import sys


spark_home = os.environ.get('SPARK_HOME', None)
print ( spark_home )
# Reuse the active SparkContext if one exists; creating a second one raises a ValueError
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setMaster('local[*]'))
sc.setLogLevel('ERROR')
sc

None

# Random Walk

Consider a **random walk** through the following simple graph:


<div>
<img style="float: center;" src="https://github.com/deinok/EXPLOTACIO-DE-DADES/blob/main/ASSIGNAMENT_1/graph1-s0.png?raw=1" width ="300px"  />
</div>

Consider that at the beginning we start a random walk at any node with the same probability:

$$ \pi^0 = [1/5,1/5,1/5,1/5,1/5] $$

What will happen after many steps of the random walk? It seems that we should end up, with high probability, at the central node $n_0$.



## Random Walk Example 1

After one step, the probability of moving to node $n_i$ is updated as:

$$ \pi^1(i) = \sum_{nodes \ j \ that \ link \ to \ i} \pi^0(j) \frac{1}{odeg(j)}   $$

where $odeg(j)$ is the number of **outgoing edges from node j**. Then:

$$  \pi^1 = [(1/5+1/5+1/5+1/5),0,0,0,0] $$

Observe that this is the probability of moving to a particular node. Node $n_0$ has no "outgoing" edges, so once we reach it we cannot move any further. At this step the probability "to move" to node $n_0$ is therefore 4/5: a walk that started at $n_0$ (probability 1/5) makes no move at all. This is the problem of "dangling nodes", that is, nodes that make the random walk stop.

But the relevant point here is that, if we repeat these steps many times, the final probability values can be used as a measure of the "relevance" of each node, because each value indicates how likely we are to be at that node after many steps (assuming the graph has no dangling nodes).
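
The one-step update can be tried out in plain Python before moving to Spark. The following is a minimal sketch, not part of the original notebook: the function name `random_walk_step` and the dictionary representation of the graph are assumptions made for illustration, and exact fractions are used so that the values match the hand computation.

```python
from fractions import Fraction

def random_walk_step(links, pi):
    """One update step: node j sends pi[j] / odeg(j) to every node it links to."""
    new_pi = [Fraction(0)] * len(pi)
    for j, targets in links.items():
        for i in targets:
            new_pi[i] += pi[j] / len(targets)
    return new_pi

# Example 1: nodes 1..4 link to node 0, and node 0 has no outgoing edges.
links = {0: [], 1: [0], 2: [0], 3: [0], 4: [0]}
pi0 = [Fraction(1, 5)] * 5

pi1 = random_walk_step(links, pi0)
pi2 = random_walk_step(links, pi1)
print("pi^1 =", pi1)
print("total probability after 1 and 2 steps:", sum(pi1), sum(pi2))
```

The total probability drops from 1 to 4/5 after the first step and to 0 after the second one, which is the effect of the dangling node: whatever reaches $n_0$ is never passed on.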

## Random Walk Example 2, no dangling nodes

We can make the problem of dangling nodes disappear by adding a self-loop to that node:

<div>
<img style="float: center;" src="https://github.com/deinok/EXPLOTACIO-DE-DADES/blob/main/ASSIGNAMENT_1/graph2-s0.png?raw=1" width ="500px"  />
</div>


Then, a random walk in this graph will never stop, and now the probability "of being" at the different nodes after 1 step is:

$$  \pi^1 = [1,0,0,0,0] $$

This holds because node $n_0$ now has a link to itself, so the formula for this node after one step is:


$$  \pi^1(0) = \sum_{nodes \ j \ that \ link \ to \ 0} \pi^0(j) \frac{1}{odeg(j)} = (1/5+1/5+1/5+1/5+1/5) = 1$$


So it is clear that, from this point on, applying the update formula will not change the probabilities any more:

$$  \pi^2(0) = \sum_{nodes \ j \ that \ link \ to \ 0} \pi^1(j) \frac{1}{odeg(j)} = (1+0/5+0/5+0/5+0/5) = 1$$

and for any other node $n_i$ different from $n_0$:

$$ i \neq 0 \rightarrow \pi^2(i) = \sum_{nodes \ j \ that \ link \ to \ i} \pi^1(j) \frac{1}{odeg(j)} = 0$$

as the other nodes do not have any incoming links.

So, we say that we have reached a stationary distribution. Using this distribution as a way to "rank" the nodes, we can conclude that the most relevant node is $n_0$.
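
A distribution is stationary when one more update step leaves it unchanged. The sketch below checks this for Example 2 in plain Python; `random_walk_step` and `is_stationary` are hypothetical helper names introduced here, not functions from the notebook.

```python
from fractions import Fraction

def random_walk_step(links, pi):
    """One update step: node j sends pi[j] / odeg(j) to every node it links to."""
    new_pi = [Fraction(0)] * len(pi)
    for j, targets in links.items():
        for i in targets:
            new_pi[i] += pi[j] / len(targets)
    return new_pi

def is_stationary(links, pi):
    """True when applying the update formula once more does not change pi."""
    return random_walk_step(links, pi) == pi

# Example 2: same graph as Example 1, plus a self-loop on node 0.
links = {0: [0], 1: [0], 2: [0], 3: [0], 4: [0]}
pi0 = [Fraction(1, 5)] * 5
pi1 = random_walk_step(links, pi0)

print("pi^0 stationary?", is_stationary(links, pi0))
print("pi^1 stationary?", is_stationary(links, pi1))
```

The uniform start is not stationary, while $\pi^1 = [1,0,0,0,0]$ is, in agreement with the equations above.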

## Random Walk Example 3, no dangling nodes

In general, however, graphs will have more links, and we will need to simulate more steps to reach a stable distribution. Consider this modified example:

<div>
<img style="center;" src="https://github.com/deinok/EXPLOTACIO-DE-DADES/blob/main/ASSIGNAMENT_1/graph3-s0.png?raw=1" width ="500px"  />
</div>

Initial distribution:
$$ \pi^{0} = [1/5,1/5,1/5,1/5,1/5] $$

After first step:

$$ \pi^1 = [1/5*(1/2)+(4/5),1/5*(1/2),0,0,0] = [0.9,0.1,0,0,0] $$



After second step:

$$ \pi^2 = [0.9*(1/2)+0.1,0.9*(1/2),0,0,0] = [0.55,0.45,0,0,0] $$

After third step:

$$ \pi^3 = [0.55*(1/2)+0.45,0.55*(1/2),0,0,0] = [0.725,0.275,0,0,0] $$

After fourth step:

$$ \pi^4 = [0.725*(1/2)+0.275,0.725*(1/2),0,0,0] = [0.6375,0.3625,0,0,0] $$

After fifth step:

$$ \pi^5 = [0.6375*(1/2)+0.3625,0.6375*(1/2),0,0,0] = [0.6812,0.3187,0,0,0] $$

After sixth step:

$$ \pi^6= [0.6812*(1/2)+0.3187,0.6812*(1/2),0,0,0] = [0.6593,0.3406,0,0,0] $$

...

After 20th step:

$$ \pi^{20} = [0.6666,0.3333,0,0,0] $$

So, in this modified example the rank is distributed among the first two vertices (but still the most relevant is the first one). But observe that we need some more iterations to reach the stable distribution!
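
The limit can also be checked by hand: a stationary distribution must satisfy $\pi(0) = \pi(0)/2 + \pi(1)$ and $\pi(1) = \pi(0)/2$ with $\pi(0)+\pi(1)=1$, which gives $\pi(0)=2/3$ and $\pi(1)=1/3$. The following plain-Python sketch repeats the 20 steps numerically; the helper `random_walk_step` and the `history` list are names chosen here for illustration only.

```python
def random_walk_step(links, pi):
    """One update step: node j sends pi[j] / odeg(j) to every node it links to."""
    new_pi = [0.0] * len(pi)
    for j, targets in links.items():
        for i in targets:
            new_pi[i] += pi[j] / len(targets)
    return new_pi

# Example 3: node 0 links to itself and to node 1; nodes 1..4 link to node 0.
links = {0: [0, 1], 1: [0], 2: [0], 3: [0], 4: [0]}

history = [[1 / 5] * 5]          # history[t] holds pi^t
for _ in range(20):
    history.append(random_walk_step(links, history[-1]))

for t in (1, 2, 3, 4, 5, 6, 20):
    print(t, [round(p, 4) for p in history[t]])
```

The printed values are rounded to four decimals, so the last digit may differ from the figures listed above, which are cut off after four decimals.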

# The PageRank Algorithm

So, the PageRank algorithm for a directed graph with no dangling nodes is simply the iterated update of the previous equation that computes the probability of being on the different nodes after a random walk of many steps:

At iteration t+1, the rank vector $\pi^{t+1}$ is updated as:
            
$$  \pi^{t+1}(i) = \sum_{nodes \ j \ that \ link \ to \ i} \pi^t(j) \frac{1}{odeg(j)}  $$

When do we stop iterating?
- The best option would be to stop when we reach a point where the solution is almost stable (no significant changes from $ \pi^t$ to $ \pi^{t+1}$).
- We can also use some maximum number of iterations.
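
Both stopping rules can be combined. The sketch below is one possible way to do it, under assumptions that are not in the original text: the change between two rank vectors is measured as the sum of absolute differences, and the names `iterate_until_stable`, `tol` and `max_iter` are hypothetical.

```python
def random_walk_step(links, pi):
    """One update step: node j sends pi[j] / odeg(j) to every node it links to."""
    new_pi = [0.0] * len(pi)
    for j, targets in links.items():
        for i in targets:
            new_pi[i] += pi[j] / len(targets)
    return new_pi

def iterate_until_stable(links, pi, tol=1e-6, max_iter=100):
    """Iterate until the rank vector changes by less than tol, or max_iter is hit.

    Returns the last rank vector and the number of steps performed.
    """
    for t in range(1, max_iter + 1):
        new_pi = random_walk_step(links, pi)
        # Sum of absolute differences between consecutive rank vectors
        change = sum(abs(a - b) for a, b in zip(new_pi, pi))
        pi = new_pi
        if change < tol:
            return pi, t
    return pi, max_iter

# Example 3 graph
links = {0: [0, 1], 1: [0], 2: [0], 3: [0], 4: [0]}
pi, steps = iterate_until_stable(links, [1 / 5] * 5)
print("steps:", steps, "ranks:", [round(p, 4) for p in pi])
```

The tolerance trades accuracy for running time, while the iteration cap protects against graphs on which the ranks oscillate and never settle.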

## High-level pseudo-code of the PageRank algorithm

>LinksRDD = { (j, $[i_1,i_2,...,i_k]$)  | there is a link from j to $i_{l}$ }  
>$\pi^0$ := [1/n,1/n,...,1/n]  
>RankRDD = { $(j,\pi^0[j])$  |  for  all  the  nodes  j }  
>t=0  
>While ( *rank vector $\pi^t$ not stable* ) do:
>>// Join LinksRDD with RankRDD to get pairs of the kind:  
>>// { ( j , ( [i1,i2,...,ik], $\pi^t[j]$ )) | for any node j }  
>>LinksRankInfoRDD = LinksRDD.join( RankRDD )  
>>// compute all the contributions from  node j to target node i  
>>flatContRDD = $\{ (i, \pi^t(j) \frac{1}{odeg(j)}  ) \ | \ for \ any \ edge \ j\rightarrow i \} $  
>>// Reduce all the elements with the same key i  
>>// to get $ \pi^{t+1}[i] = \sum_{j \ edge \ to \ i}  \pi^t(j) \frac{1}{odeg(j)} $  
>>RankRDD = flatContRDD.ReduceByKey( x,y : x+y )  
>>t=t+1
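
To see what each stage of the loop body produces, the three operations can be imitated with plain Python lists of (key, value) pairs. This is only an illustrative sketch: it does not use Spark, and the function name `pagerank_iteration` is an assumption of this example.

```python
from collections import defaultdict

def pagerank_iteration(links_records, rank_records):
    """Imitate join -> flatMap -> reduceByKey on lists of (key, value) pairs."""
    ranks = dict(rank_records)

    # "join": keep the keys present on both sides -> (j, ([i1,...,ik], rank of j))
    joined = [(j, (targets, ranks[j]))
              for j, targets in links_records if j in ranks]

    # "flatMap": one record (i, rank(j)/odeg(j)) per edge j -> i
    contributions = [(i, rank_j / len(targets))
                     for j, (targets, rank_j) in joined
                     for i in targets]

    # "reduceByKey": add up all the contributions that share the same key i
    totals = defaultdict(float)
    for i, value in contributions:
        totals[i] += value
    return sorted(totals.items())

# Example 3 graph with the uniform initial ranks
links_records = [(0, [0, 1]), (1, [0]), (2, [0]), (3, [0]), (4, [0])]
rank_records = [(j, 1 / 5) for j in range(5)]

new_ranks = pagerank_iteration(links_records, rank_records)
print(new_ranks)
```

Only nodes 0 and 1 appear in the result: a node that receives no contribution produces no key in the reduce stage, so its rank is implicitly 0.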

# PageRank Implementation in map-reduce

To implement the algorithm with map-reduce, we need to maintain two main RDDs:
1. The main one, **that will not change** (LinksRDD), represents the set of all the links $ j \rightarrow i $ for any node j, that is, a suitable representation of the graph. We will use (key, value) records of this kind:
    $$ (j, [i_1,i_2,...,i_k]) $$
   which indicate that there is a link from node j to every node in the list.
2. The second one is the rank vector (RankRDD), which is replaced by a new one in each iteration (from the point of view of Spark, the RDD for $ \pi^{t+1} $ is created and the one for $ \pi^{t} $ is discarded).
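
If the graph is given as a list of edges, the two record sets can be derived from it. The sketch below does this in plain Python for the Example 3 graph; the helper names `edges_to_link_records` and `initial_rank_records` are hypothetical, and the resulting lists are the kind of data one would then pass to `sc.parallelize`.

```python
def edges_to_link_records(edges, nodes):
    """Group the edges j -> i by source node: (j, [i_1, ..., i_k]) for every node j."""
    adjacency = {j: [] for j in nodes}   # nodes without outgoing links keep an empty list
    for j, i in edges:
        adjacency[j].append(i)
    return sorted(adjacency.items())

def initial_rank_records(nodes):
    """Uniform initial rank vector as (j, 1/n) records."""
    nodes = list(nodes)
    return [(j, 1 / len(nodes)) for j in nodes]

# Edge list of the Example 3 graph
edges = [(0, 0), (0, 1), (1, 0), (2, 0), (3, 0), (4, 0)]
link_records = edges_to_link_records(edges, range(5))
rank_records = initial_rank_records(range(5))

print(link_records)
print(rank_records)
```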

In [None]:
# Example 1, with a dangling node
LinksRDD = sc.parallelize([(0, []), (1, [0]), (2, [0]), (3, [0]), (4, [0])])
RankRDD = sc.parallelize([(0, 1/5), (1, 1/5), (2, 1/5), (3, 1/5), (4, 1/5)])

In [None]:
# Example 2
LinksRDD = sc.parallelize([(0, [0]), (1, [0]), (2, [0]), (3, [0]), (4, [0])])
RankRDD = sc.parallelize([(0, 1/5), (1, 1/5), (2, 1/5), (3, 1/5), (4, 1/5)])

In [None]:
# Example 3
LinksRDD = sc.parallelize([(0, [0, 1]), (1, [0]), (2, [0]), (3, [0]), (4, [0])])
RankRDD = sc.parallelize([(0, 1/5), (1, 1/5), (2, 1/5), (3, 1/5), (4, 1/5)])

In [None]:
# Example 4, two disconnected subgraphs
LinksRDD = sc.parallelize([(0, [1]), (1, [2]), (2, [0]), (3, [4]), (4, [5]), (5, [3])])
RankRDD = sc.parallelize([(0, 1/6), (1, 1/6), (2, 1/6), (3, 1/6), (4, 1/6), (5, 1/6)])

## Exercise, implement PageRank

Implement PageRank using the next function, which maps a record of the type
(j,  (list of nodes that j points to, current rank of j))

to a flat list of records of the type:

( i, rank(j)/odeg(j) )

where there is a link FROM j TO i

In [None]:
def getContributions(linksAndRank):
    j = linksAndRank[0]
    targets = linksAndRank[1][0]
    rankj = linksAndRank[1][1]
    odegj = float(len(targets))
    # Iterate over all pages i with a link from j (j -> i)
    return [ (i, rankj / odegj) for i in targets]
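
The function can be checked on hand-made records without Spark. The snippet below repeats its definition so that it runs on its own; the two sample records are made up for this check.

```python
def getContributions(linksAndRank):
    j = linksAndRank[0]
    targets = linksAndRank[1][0]
    rankj = linksAndRank[1][1]
    odegj = float(len(targets))
    # Iterate over all pages i with a link from j (j -> i)
    return [(i, rankj / odegj) for i in targets]

# Node 0 with rank 0.2 and links to nodes 0 and 1: each target receives 0.2 / 2
two_links = getContributions((0, ([0, 1], 0.2)))

# Dangling node: the list of targets is empty, so nothing is emitted
no_links = getContributions((0, ([], 0.2)))

print(two_links)
print(no_links)
```

For a dangling node the division is never evaluated, because the list comprehension has nothing to iterate over, so no division by zero occurs; the rank of that node is simply not passed on to anyone.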

In [None]:
print ("time 0:", RankRDD.collect())
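step = 1  # named `step` because `iter` would shadow a Python built-in
while (step <= 10):
    # Pair every node j with its outgoing links and its current rank
    LinksRankInfo = LinksRDD.join(RankRDD)
    # Emit one contribution (i, rank(j)/odeg(j)) per link j -> i
    flatContributions = LinksRankInfo.flatMap(getContributions)
    # Sum the contributions received by each node i
    RankRDD = flatContributions.reduceByKey(lambda x, y : x + y)
    print ( "time ", step, ":", RankRDD.collect() )
    step = step + 1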

time 0: [(0, 0.2), (1, 0.2), (2, 0.2), (3, 0.2), (4, 0.2)]
time  1 : [(0, 0.8)]
time  2 : []
time  3 : []
time  4 : []
time  5 : []



# ASSIGNMENT 1

## STEP 1
You must implement the PageRank algorithm correctly and include plenty of explanatory comments in each step of your functions. However, you must implement the version actually introduced by Google, which considers a "damping" factor d. This time, at each step, the random walk proceeds in the regular way with probability d, but with probability (1-d) it jumps to a node of the graph chosen uniformly at random. In that case, the update equation at time t+1 is:
$$ \pi^{t+1}(i) = \frac{1-d}{N} + d \cdot \left( \sum_{nodes \ j \ that \ link \ to \ i} \pi^t(j) \frac{1}{odeg(j)} \right) $$
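
As a hand-checkable illustration of this equation, the sketch below applies one damped step to Example 1 in plain Python with exact fractions. The function name `damped_step` is an assumption of this example, and dangling nodes are given no special treatment, exactly as in the equation.

```python
from fractions import Fraction

def damped_step(links, pi, d):
    """pi^{t+1}(i) = (1 - d)/N + d * sum over links j -> i of pi^t(j)/odeg(j)."""
    n = len(pi)
    sums = [Fraction(0)] * n
    for j, targets in links.items():
        for i in targets:
            sums[i] += pi[j] / len(targets)
    # Every node gets the (1 - d)/N term, even if it has no incoming links
    return [(1 - d) / n + d * s for s in sums]

# Example 1: nodes 1..4 link to node 0, and node 0 is dangling.
links = {0: [], 1: [0], 2: [0], 3: [0], 4: [0]}
d = Fraction(85, 100)
pi0 = [Fraction(1, 5)] * 5

pi1 = damped_step(links, pi0, d)
print(pi1)
print("sum of ranks:", sum(pi1))
```

Node 0 gets $0.03 + 0.85 \cdot 0.8 = 0.71$ and every other node gets $0.03$. The ranks add up to $0.83$ rather than $1$, because the share $d \cdot \pi^0(0) = 0.17$ held by the dangling node is not passed on to any node.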

In [None]:
def getContributionsWithDamping(linksAndRank):
  j = linksAndRank[0]
  targets = linksAndRank[1][0]
  rankj = linksAndRank[1][1]
  odegj = float(len(targets))
  # Iterate over all pages i with a link from j (j -> i)
  if odegj == 0: return []
  return [ (i, rankj / odegj) for i in targets]

def compute_pagerank(LinksRDD: pyspark.rdd.RDD, d=0.85, max_iter=10):
  print("LinksRDD:", LinksRDD.collect())

  # Set of all nodes (ensures nodes with no in-links still get a rank)
  NodesRDD = LinksRDD.keys() \
    .distinct()

  # N = number of nodes
  N = NodesRDD.count()

  # If no nodes exist, return an empty RDD early
  if N == 0:
    return LinksRDD.context.parallelize([])

  # Uniform initialization: sum(ranks) = 1
  RankRDD = NodesRDD \
    .map(lambda i: (i, 1.0 / N))

  # Zero contribution for every node, so that nodes without incoming links
  # keep their key and still receive the (1 - d) / N term
  zeroContributions = NodesRDD \
    .map(lambda i: (i, 0.0))

  print("time 0:", RankRDD.collect())
  step = 1
  while (step <= max_iter):
    # Join LinksRDD with RankRDD to get pairs (j, ([i1,...,ik], rank of j))
    LinksRankInfo = LinksRDD.join(RankRDD)

    # Distribute the rank of each page to its outgoing links
    flatContributions = LinksRankInfo \
      .flatMap(getContributionsWithDamping)

    # Calculate the sum of contributions for each node
    sumContributions = flatContributions \
      .union(zeroContributions) \
      .reduceByKey(lambda x, y : x + y)

    # Apply the damping factor
    RankRDD = sumContributions \
      .mapValues(lambda x: ((1 - d) / N) + (d * x))

    print("time ", step, ":", RankRDD.collect())
    step = step + 1
  return RankRDD

In [None]:
RankRDD = compute_pagerank(LinksRDD, d=0.85, max_iter=10)

## STEP 2
You must load the data for the algorithm into RDDs (or Spark dataframes if you prefer) by implementing a function that takes the jsonl files as input and creates, in a distributed way (with Spark transformations), the three distributed datasets needed by your PageRank algorithm: LinksRDD, namesRDD, and the initial RankRDD. The namesRDD should be a dataset of (key, value) pairs, where each key is a vertex integer identifier and the value is the corresponding name of the researcher.
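
The per-line parsing can be tried in plain Python first. The sketch below assumes that each line of the vertex file is a JSON object that maps a researcher name to its integer identifier (this is inferred from the way the records are unpacked in this notebook, not from a format specification); the three sample lines and the helper name `parse_vertex_line` are made up for illustration.

```python
import json

def parse_vertex_line(line):
    """Turn one JSON line {name: id} into a list of (id, name) pairs."""
    record = json.loads(line)
    return [(vertex_id, name) for name, vertex_id in record.items()]

# Hypothetical sample lines in the assumed format
sample_lines = [
    '{"ABRAMSON, G": 1}',
    '{"KUPERMAN, M": 2}',
    '{"ACEBRON, J": 3}',
]

# Equivalent of a flatMap over the lines of the file
names = [pair for line in sample_lines for pair in parse_vertex_line(line)]

# Uniform initial ranks, one record per vertex identifier
n = len(names)
initial_ranks = [(vertex_id, 1.0 / n) for vertex_id, _ in names]

print(names)
print(initial_ranks)
```

In Spark the same function could be passed to `flatMap`, and the initial ranks could be derived from the names dataset with a `map` once the number of vertices is known.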

In [20]:
import json

namesRDD = sc.textFile("netscience-v.jsonl") \
  .map(lambda x: json.loads(x)) \
  .flatMap(lambda x: [(v, k) for k, v in x.items()])
print(namesRDD.collect())

[(1, 'ABRAMSON, G'), (2, 'KUPERMAN, M'), (3, 'ACEBRON, J'), (4, 'BONILLA, L'), (5, 'PEREZVICENTE, C'), (6, 'RITORT, F'), (7, 'SPIGLER, R'), (8, 'ADAMIC, L'), (9, 'ADAR, E'), (10, 'HUBERMAN, B'), (11, 'LUKOSE, R'), (12, 'PUNIYANI, A'), (13, 'AERTSEN, A'), (14, 'GERSTEIN, G'), (15, 'HABIB, M'), (16, 'PALM, G'), (17, 'AFRAIMOVICH, V'), (18, 'VERICHEV, N'), (19, 'RABINOVICH, M'), (20, 'AGRAWAL, H'), (21, 'AHUJA, R'), (22, 'MAGNANTI, T'), (23, 'ORLIN, J'), (24, 'AIELLO, W'), (25, 'CHUNG, F'), (26, 'LU, L'), (27, 'ALBA, R'), (28, 'ALBERICH, R'), (29, 'MIROJULIA, J'), (30, 'ROSSELLO, F'), (31, 'ALBERT, R'), (32, 'ALBERT, I'), (33, 'NAKARADO, G'), (34, 'BARABASI, A'), (35, 'JEONG, H'), (36, 'ALBERTS, B'), (37, 'BRAY, D'), (38, 'LEWIS, J'), (39, 'RAFF, M'), (40, 'ROBERTS, K'), (41, 'WATSON, J'), (42, 'ALDANA, M'), (43, 'ALDOUS, D'), (44, 'PITTEL, B'), (45, 'ALEKSIEJUK, A'), (46, 'HOLYST, J'), (47, 'STAUFFER, D'), (48, 'ALLARIA, E'), (49, 'ARECCHI, F'), (50, 'DIGARBO, A'), (51, 'MEUCCI, R'), (52