# Final Practical Work - PageRank algorithm 
<p><b>Name:</b> Pablo Yuste Ramos</p>
<p><b>NIA:</b> 100406674</p>

<p><b>*</b> I have decided to include all my clarifications in markdown cells.</p>

In [0]:
import pandas as pd
import re

In [0]:
from pyspark.sql.types import *

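In the 2 cells below we import and create the PySpark context and session. <b>Once these cells have been run, they cannot be run again</b>: doing so raises an error because the session and context already exist, and there is no need to create them again. That is why some of the lines are commented out. (On Databricks, <b>spark</b>, <b>sc</b> and <b>sqlContext</b> are already created for every notebook. Outside Databricks, <b>SparkSession.builder.getOrCreate()</b> returns the existing session instead of failing.)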

In [0]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
#sc = SparkContext('local')
#spark = SparkSession(sc)


In [0]:
#from pyspark.sql import SQLContext
#sqlContext = SQLContext(sc)

Now we are going to <b>load the dataset</b> that we will use throughout the project. It is the English Wikipedia articles dataset that ships with Databricks under "/databricks-datasets", so we read it directly from DBFS with the following code:

In [0]:
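# Use Apache Arrow to speed up toPandas(); in Spark 3.x this key is deprecated in favour of "spark.sql.execution.arrow.pyspark.enabled"
spark.conf.set("spark.sql.execution.arrow.enabled", "true")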

In [0]:
%fs ls dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet

In [0]:
wikipediaDF=spark.read.parquet("dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")

In [0]:
display(wikipediaDF)

In [0]:
wikipediaDF.count()

<p>Since the dataset is huge (more than 5 million rows), <b>we need to use a sampled version</b> to develop our functions and algorithms without running into ridiculously expensive computations.</p> 
<p>It is also really important to fix the seed to this value. I will explain why in my <b>Final Conclusions</b>.</p>

In [0]:
PartialWikipediaDF=wikipediaDF.sample(fraction=0.0001,seed=122370982).cache()

As the next cell shows, our sampled dataset has just <b>602 observations</b> (rows).

In [0]:
PartialWikipediaDF.count()

In [0]:
display(PartialWikipediaDF)

Here we create <b>the function parse_links</b>. It takes the body of a document as input and returns the links present in that text. Wikipedia links are written as [[...]] in the article markup, so a regular expression from the library "re" (imported at the beginning of the notebook) is enough to extract them.

In [0]:
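def parse_links(document_body):
  # A null text value would make re.findall fail, so return no links in that case
  if document_body is None:
    return []
  # Wiki links are written as [[...]]; capture the text inside the brackets (non-greedy)
  data = re.findall(r'\[\[(.+?)\]\]', document_body)
  # Lower-case the links so they can be compared with the lower-cased titles
  return [s.lower() for s in data]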
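
Note that parse_links keeps everything between the brackets. In wiki markup a link can also carry a display label or a section anchor, e.g. "[[Paris|the capital]]" or "[[France#History]]", so parse_links returns "paris|the capital" and "france#history", which will not match any title in pages_index. The sketch below shows one possible refinement that keeps only the link target; the function parse_links_normalized is hypothetical and not part of this notebook.

```python
import re

LINK_RE = re.compile(r'\[\[(.+?)\]\]')


def parse_links_normalized(document_body):
    """Hypothetical variant of parse_links that keeps only the link target."""
    if document_body is None:
        return []
    links = []
    for raw in LINK_RE.findall(document_body):
        target = raw.split('|', 1)[0]      # drop the display label
        target = target.split('#', 1)[0]   # drop a section anchor
        target = target.strip().lower()
        if target:                         # "[[#Section]]" leaves nothing to link to
            links.append(target)
    return links


text = "See [[Paris|the capital]], [[France#History]] and [[Seine]]."
print(parse_links_normalized(text))  # ['paris', 'france', 'seine']
```

The design choice is to normalize at parsing time, so that titles_to_id only ever compares plain lower-cased titles.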

In [0]:
from pyspark.sql.types import ArrayType, StringType, LongType
from pyspark.sql.functions import udf

In [0]:
parse_links_udf = udf(parse_links,ArrayType(StringType()))

We create <b>the function tolower</b>, which we will apply to the titles of our dataset (the links are already lower-cased inside parse_links) so that titles and links can be compared without case mismatches.

In [0]:
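# Lower-case a string column; None values are passed through unchanged
tolower_udf = udf(lambda x: x.lower() if x is not None else None, StringType())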

Here we create a dataframe called <b>data</b> with the columns of the sampled dataset that we are going to use (title, id, text). We apply <b>the function tolower</b> to the title column to avoid case mismatches further on.

<p>We are also going to turn the text from our sampled data into a list containing the links present in that document body using <b>the function parse_links</b> that I described before</p>

In [0]:
data=PartialWikipediaDF.select(tolower_udf(PartialWikipediaDF["title"]).alias("title"),PartialWikipediaDF["id"],parse_links_udf(PartialWikipediaDF["text"]).alias("links")).cache()

In [0]:
display(data)

Now we create the pandas dataframe <b>pages_index</b>, which contains every title in the whole dataset (not just the sample) and its corresponding id. We need this information for our next function (<b>titles_to_id</b>).

In [0]:
intermediate_value=wikipediaDF.select(tolower_udf(wikipediaDF["title"]).alias("title"),wikipediaDF["id"])

In [0]:
pages_index=intermediate_value.toPandas()

In [0]:
pages_index

<b>The function titles_to_id</b> uses the <b>pages_index</b> pandas dataframe (broadcast to the workers) to convert a list of link titles into the list of the corresponding page ids. Links whose title does not appear in pages_index are dropped, and duplicates disappear because isin() simply filters the rows of pages_index.

In [0]:
broadcastVar = sc.broadcast(pages_index)

In [0]:
def titles_to_id(links):
  data_titles=broadcastVar.value
  if (len(links)>0):
    ids=data_titles[data_titles.title.isin(links)].id.to_list()
  else:
    ids=[]
  return ids

In [0]:
titles_to_id_udf=udf(titles_to_id,ArrayType(LongType()))
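
Scanning the whole pages_index (more than 5 million rows) with isin() for every row of data is expensive. A minimal sketch of an alternative, assuming titles are unique after lower-casing: build a plain dict once (which could then be shared with sc.broadcast) and look each link up in constant time. The helpers build_title_index and titles_to_id_dict are hypothetical, and the ids below are made up.

```python
def build_title_index(pairs):
    """Hypothetical helper: map each lower-cased title to its page id."""
    return {title.lower(): page_id for title, page_id in pairs}


def titles_to_id_dict(links, title_index):
    """Hypothetical dict-based variant of titles_to_id.

    Links without a matching title are dropped and duplicate ids are removed,
    as with isin(); here the ids follow the order of the links.
    """
    seen = set()
    ids = []
    for link in links:
        page_id = title_index.get(link)
        if page_id is not None and page_id not in seen:
            seen.add(page_id)
            ids.append(page_id)
    return ids


# Made-up ids, for illustration only
index = build_title_index([("Paris", 1), ("France", 2), ("Seine", 3)])
print(titles_to_id_dict(["paris", "atlantis", "seine", "paris"], index))  # [1, 3]
```

One difference worth knowing: isin() returns ids in the row order of pages_index, while this version keeps the order in which the links appear in the text.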

We use <b>the function titles_to_id</b> to turn <b>data</b> into <b>Partialdirect_linksDF</b>. This new dataframe has 2 columns:
<p>
<ol>
  <li><b>"id":</b> The ids of every web page in our sampled data (<b>PartialWikipediaDF</b>)</li>
  <li><b>"links_id":</b> For each web page in <b>"id"</b> we have a list of the web pages it is pointing to (its <b>forward links</b>). These web pages are also represented with their id</li>
</ol>
</p>

In [0]:
Partialdirect_linksDF=data.select(data["id"],titles_to_id_udf(data["links"]).alias("links_id")).cache()

In [0]:
display(Partialdirect_linksDF)

Using a simple <b>count_links</b> function we can create another dataframe called <b>PartialDirect_links_countingDF</b> in which we turn the <b>"links_id"</b> column above into an integer column with the number of forward links for each web page in <b>"id"</b>. This column will be called <b>"count"</b>

In [0]:
def count_links(links):
  return len(links)

In [0]:
count_links_udf=udf(count_links,LongType())

In [0]:
PartialDirect_links_countingDF=Partialdirect_linksDF.select(Partialdirect_linksDF["id"],count_links_udf(Partialdirect_linksDF["links_id"]).alias("count")).cache()

In [0]:
display(PartialDirect_links_countingDF)

<b>PartialDirect_links_countingDF</b> will be really important to calculate the final value of the PageRank. 
<br>
<br>
That is why, in order to be able to use this information further on, we are going to turn it into a pandas dataframe called <b>PartialDirect_links_countingPDF</b>

In [0]:
PartialDirect_links_countingPDF=PartialDirect_links_countingDF.toPandas()

Now we are going to turn the <b>Partialdirect_linksDF</b> into a new dataframe with the following information:
<p>
<ol>
  <li><b>"link":</b> The id of every web page that appears in the <b>forward links</b> of <b>Partialdirect_linksDF</b></li>
  <li><b>"connections":</b> A list of the web pages from which you can access the web page in the <b>"link"</b> column (its <b>backward links</b>)</li>
</ol>
</p>
<br>
Basically we have to reverse the information in <b>Partialdirect_linksDF</b>. To do so, we are going to follow the next steps:

<p>
<ol>
  <li>First we are going to turn the <b>Partialdirect_linksDF</b> dataframe into <b>RDD</b> (<b>Partialdirect_linksRDD</b>)</li>
  <li>Then we create <b>the reverse_data function</b>, which, for a given row of <b>Partialdirect_linksRDD</b>, returns a list of (forward link, web page) tuples</li>
  <li>With that information we can reverse <b>Partialdirect_linksRDD</b> into <b>Partialreverse_linksRDD</b></li>
  <li>Lastly we can turn <b>Partialreverse_linksRDD</b> into a dataframe (<b>Partialreverse_linksDF</b>)</li>
</ol>
</p>
<br>

<p><b>*</b> When we create the column <b>"connections"</b> it is very important to use <b>list(set())</b> to avoid duplicates</p>

<p><b>*</b> When we go from <b>RDD</b> to <b>DF</b>, instead of using <b>toDF()</b> we use <b>sqlContext.createDataFrame()</b> with an explicitly defined schema. Providing the schema up front means Spark does not have to infer the column types from the data, which can make this step faster.</p>

In [0]:
Partialdirect_linksRDD=Partialdirect_linksDF.rdd

In [0]:
def reverse_data(row):
  id = row.id
  links = row.links_id
  reverse = [ (tgt_id,id) for tgt_id in links ]
  return reverse

In [0]:
Partialreverse_linksRDD= ( Partialdirect_linksRDD
                          .flatMap(reverse_data)
                          .groupByKey()
                          .map(lambda d: (d[0],list(set(d[1]))))
                          )
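
The same reversal can be sketched in plain Python on a toy adjacency list, to make the flatMap / groupByKey / list(set()) chain concrete. The function reverse_links and the ids are hypothetical; a set plays the role of list(set(...)), and the lists are sorted only to make the output deterministic.

```python
from collections import defaultdict


def reverse_links(forward):
    """forward: iterable of (page_id, [forward link ids]) -> {target: sorted source ids}."""
    backward = defaultdict(set)  # a set removes duplicates, like list(set(...))
    for src, targets in forward:
        for tgt in targets:          # the same (tgt, src) pairs reverse_data emits
            backward[tgt].add(src)   # grouping by target, like groupByKey
    return {tgt: sorted(srcs) for tgt, srcs in backward.items()}


forward = [(1, [2, 3, 3]), (2, [3]), (4, [])]
print(reverse_links(forward))  # {2: [1], 3: [1, 2]}
```

Notice that page 4, which receives no links, does not appear in the result at all, just as pages without backward links do not appear in Partialreverse_linksDF.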

In [0]:
schema = StructType([StructField('link', LongType()),StructField('connections', ArrayType(LongType()))])
Partialreverse_linksDF = sqlContext.createDataFrame(Partialreverse_linksRDD,schema).cache()

In [0]:
display(Partialreverse_linksDF)

# Implementing the PageRank algorithm
<br>
<p> We now have all the information we need, so we can start implementing the PageRank algorithm.

In this cell we set up the initial value of the PageRank by following these steps:

<p>
<ol>
  <li>Assign the value 0.85 as <b>the Damping factor</b></li>
  <li>Then we create a <b>current_pr</b> dataframe with just one column (<b>"link"</b> column of <b>Partialreverse_linksDF</b>)</li>
  <li>We turn the dataframe into pandas (<b>current_pr_pd</b>) and add a new column that sets the PageRank of every web page to <b>d/N</b>, where d is the Damping factor and N is the number of web pages in <b>current_pr</b> (every page that receives at least one link from the sample)</li>
</ol>
</p>

In [0]:
import numpy as np

#Damping factor
d=0.85 

#Setting up the Current Page Rank Data Frame
current_pr=Partialreverse_linksDF.select(Partialreverse_linksDF["link"])
current_pr_pd=current_pr.toPandas()
N=current_pr.count()

#We add the initial value of the PageRank, which is d/N for every web page
current_pr_pd["pagerank"]=pd.Series(d/N*np.ones(N))

#We save the value of N in order to use it in the next function
N_dv=sc.broadcast(N)

#We also need the information about the number of outgoing links for each web page
DirectLinksCounter_BV=sc.broadcast(PartialDirect_links_countingPDF)
  
  

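Now we define the PageRank function. The value of the PageRank is updated with the following formula (note that, as written here, the damping factor d = 0.85 multiplies the uniform term d/N total, while the usual formulation applies d to the sum over the backward links and uses (1 - d)/N total as the uniform term):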

<b>$$PageRank' \ (a) \ = \left ( 1-d \right ) \ · \sum_{x \ \in \ A}\left (  {\frac{PageRank \ (x)}{N\ links \ (x)}}\right ) \ + \ \frac{d}{N \ total}$$</b>
<br>
Where:
<p>
<ul>
  <li><b>d:</b> is the Damping Factor</li>
  <li><b>PageRank':</b> is the new value of the PageRank</li>
  <li><b>PageRank:</b> is the current value of the PageRank</li>
  <li><b>a:</b> is the given web page</li>
  <li><b>A:</b> is the set of all the web pages that point to a</li>
  <li><b>N links (x):</b> is the total number of forward links from x</li>
  <li><b>N total:</b> is the total number of web pages (in the code, N, the number of rows of <b>current_pr</b>)</li>
</ul>
</p>
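
To check the update rule on something small enough to follow by hand, here is a hedged pure-Python sketch of the formula above as written (the link sum is weighted by 1 - d and the uniform term is d / N total), together with the stopping rule used in the notebook loop (at most 20 iterations, stop when the max relative variation drops below 0.1%). The names pagerank_step and run_pagerank and the toy graph are hypothetical; like new_pagerank, the step simply skips backward links that have no PageRank.

```python
def pagerank_step(backlinks, out_degree, pr, d, n_total):
    """One update of PageRank'(a) = (1 - d) * sum(PR(x) / Nlinks(x)) + d / N_total."""
    new_pr = {}
    for page, sources in backlinks.items():
        # Backward links without a PageRank cannot contribute
        link_sum = sum(pr[x] / out_degree[x] for x in sources if x in pr)
        new_pr[page] = (1 - d) * link_sum + d / n_total
    return new_pr


def run_pagerank(backlinks, out_degree, d=0.85, max_iter=20, tol=0.001):
    """Iterate until the max relative change is below tol (0.1%) or max_iter is reached."""
    n_total = len(backlinks)
    pr = {page: d / n_total for page in backlinks}  # initial value d / N
    for iteration in range(1, max_iter + 1):
        new_pr = pagerank_step(backlinks, out_degree, pr, d, n_total)
        max_variation = max(abs(pr[p] - new_pr[p]) / pr[p] for p in pr)
        if max_variation < tol:
            return pr, iteration - 1  # same counting as the notebook loop
        pr = new_pr
    return pr, max_iter


# Toy graph: 1 -> 2, 1 -> 3, 2 -> 3, 3 -> 1
backlinks = {1: [3], 2: [1], 3: [1, 2]}
out_degree = {1: 2, 2: 1, 3: 1}
pr, iterations = run_pagerank(backlinks, out_degree)
for page, value in sorted(pr.items(), key=lambda kv: kv[1], reverse=True):
    print(page, round(value, 4))
```

Swapping "(1 - d)" and "d" in pagerank_step would give the conventional weighting instead.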

In [0]:
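def new_pagerank(links, current_pr_pd):
  # links: ids of the pages that point to this page (its backward links)
  counts = DirectLinksCounter_BV.value
  # Lookup tables built once per call: id -> current PageRank, id -> number of forward links
  pr_by_id = dict(zip(current_pr_pd['link'], current_pr_pd['pagerank']))
  count_by_id = dict(zip(counts['id'], counts['count']))
  aux = 0.0
  for link in links:
    # Backward links without a PageRank in current_pr_pd cannot contribute
    if link in pr_by_id:
      aux += float(pr_by_id[link]) / int(count_by_id[link])
  # (1 - d) * sum + d / N_total, with d = 0.85; return a plain Python float for DoubleType
  return float((1 - d) * aux + d / N_dv.value)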

Now we run this function for a maximum <b>number of iterations (20)</b>, or until the maximum relative variation of the PageRank with respect to its previous value falls below <b>0.1%</b>:
<br>
<br>
To do so we use a <b>while loop</b> and a <b>counter</b>. When the maximum variation drops below the threshold, the loop is broken, and the number of completed iterations is the value of the counter minus one.
<br>
<br>
For each iteration, to make it easier to follow what is happening, I decided to print: <ul>
  <li> Whether the variation has dropped below the 0.1% threshold (only when the loop stops).</li>
  <li> The Max value of the PageRank <b>before</b> the iteration is completed.</li>
  <li> Which iteration is being completed.</li>
  <li> The Max value of the PageRank <b>after</b> the iteration is completed.</li>
  <li> The Max value of the variation.</li>
</ul>

In [0]:
count = 1
while count <= 20:
  # Re-create the UDF every iteration so that it captures the latest current_pr_pd
  new_page_rank_UDF = udf(lambda l: new_pagerank(l, current_pr_pd), DoubleType())
  new_pagerank_DF = Partialreverse_linksDF.select(Partialreverse_linksDF["link"], new_page_rank_UDF(Partialreverse_linksDF["connections"]).alias("pagerank"))
  new_pagerank_PD = new_pagerank_DF.toPandas()

  #Comparison between past and new PageRank, aligned on the page id
  merged = current_pr_pd.merge(new_pagerank_PD, on="link", suffixes=("_old", "_new"))
  variation = ((merged.pagerank_old - merged.pagerank_new) / merged.pagerank_old).abs()
  if variation.max() < 0.001:
    print("The max variation is below the 0.1% threshold")
    print("Max variation = ", variation.max()*100, "%  <  0.1%")
    print("Total iterations completed : ", count - 1)
    break

  print("Max PageRank before iteration", count, ": ", current_pr_pd.pagerank.max())

  #Update PageRank values
  current_pr_pd = new_pagerank_PD

  print("Iteration ", count, " completed")
  print("Max PageRank after iteration", count, ": ", current_pr_pd.pagerank.max())
  print("Max variation = ", variation.max()*100, "%")
  print("----------------------------------------------------------------")

  count += 1

In [0]:
display(current_pr_pd)

Finally, I sort the web pages by PageRank in descending order:

In [0]:
current_pr_pd_=current_pr_pd.rename(columns={'link': 'Web Page ID','pagerank': 'PageRank'})
FinalRanked_PR=current_pr_pd_.sort_values(by=['PageRank'], ascending=False)
display(FinalRanked_PR)

# Final Conclusions:
<br>
<br>
<h3>1.- Databricks and Pyspark (Advantages and disadvantages):</h3>
<br>
<p>The main problem I had during this project was the time it always took to restart the cluster or create a new one, plus all the other waits and delays I had to go through just to run the first line every time I wanted to start the program. Also, at the beginning I couldn't understand why lines containing .count() or display() took so long to run. Later on, I found out that this was due to <b>lazy evaluation</b> in Spark: transformations such as select() or sample() are only recorded, and nothing is executed until an action (like count(), display() or toPandas()) is triggered. Hence the cost of all the pending transformations shows up at the line that calls the action, and data is not loaded until it is needed.
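
A minimal pure-Python analogy (not Spark's actual implementation) may help: the hypothetical LazyPipeline class below only records transformations, and the work happens when an action such as count() is called.

```python
class LazyPipeline:
    """Toy stand-in for a Spark DataFrame/RDD: transformations are recorded, not run."""

    def __init__(self, source, steps=()):
        self.source = source
        self.steps = list(steps)

    def map(self, fn):
        # Transformation: returns a new pipeline with one more recorded step
        return LazyPipeline(self.source, self.steps + [fn])

    def count(self):
        # Action: only now is every recorded step applied to every element
        total = 0
        for item in self.source:
            for fn in self.steps:
                item = fn(item)
            total += 1
        return total


calls = []


def slow_lower(title):
    calls.append(title)  # record that real work happened
    return title.lower()


pipeline = LazyPipeline(["Paris", "France", "Seine"]).map(slow_lower)
print(len(calls))        # 0: map() only recorded the step
print(pipeline.count())  # 3: count() triggers the computation
print(len(calls))        # 3
```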
  
<p>On the other hand, Spark provides <b>high data processing speed</b> and faster computations, largely because it can keep intermediate data in memory (for example with <b>.cache()</b>, which we use throughout this notebook) instead of recomputing it or writing it to disk between steps.
  
  <li><h5>Pyspark UDF:</h5></li>
  
<p>PySpark UDFs (<b>User Defined Functions</b>) are used to extend the built-in functions of the framework and to re-use the same logic on multiple DataFrames. These are the steps to follow in order to use them properly:
  
<p>
<ol>
  <li>Create a function</li>
  <li>Convert this function to UDF with udf()</li>
  <li>Use the UDF on a DataFrame column like a regular built-in function, for example inside .select()</li>
</ol>


Also, if we want to use a pandas dataframe inside the function without passing it as a parameter, we can follow these steps: 
  <p>
<ol>
  <li>We call <b>sc.broadcast()</b> on the dataframe and assign the result to a new variable (its "name")</li>
  <li>Inside the function we can use name<b>.value</b> to access the information of the broadcasted pandas dataframe</li>
</ol>

  <li><h5>Pyspark RDD:</h5></li>
  
<p>RDD stands for <b>Resilient Distributed Dataset</b>. It is a fault-tolerant collection of elements that can be operated on in <b>parallel</b>, and it is the fundamental data structure of Spark. In this project we used RDD transformations (flatMap, groupByKey and map), which pass each element of the dataset through a function and return a new RDD with the results. These transformations make it easy to perform operations like turning Partialdirect_linksDF into Partialreverse_linksDF.</p>

<br>
<br>
  <h3>2.- Implementing the PageRank algorithm:</h3>
<br>
<p>It was really hard to figure out how to implement the PageRank algorithm for a dataset when the program only allowed us to run our functions on 0.01% of the whole data. To define the PageRank of a web page we need to know the forward and backward links of every web page (Partialdirect_linksDF and Partialreverse_linksDF respectively). If we take a look at the formula of the PageRank that I showed before:
  
  $$PageRank' \ (a) \ = \left ( 1-d \right ) \ · \sum_{x \ \in \ A}\left (  {\frac{PageRank \ (x)}{N\ links\ (x)}}\right ) \ + \ \frac{d}{N \ total}$$
  
<p>Since we don't have all the information, for many x in A we face problems such as not having access to PageRank(x): a page only has a PageRank in current_pr_pd if at least one page of the sample links to it.

<p>We get <b>each "a"</b> from the <b>"link" column of current_pr_pd</b>. For each web page in this column we have its <b>corresponding backward links "A"</b>. For every web page in "A" we get the value of <b>"N links(x)"</b> from the broadcast <b>PartialDirect_links_countingPDF</b>, and its PageRank from current_pr_pd. We wrap new_pagerank in a lambda function so that the latest current_pr_pd is passed as a parameter at every iteration. Of course, the PageRank also has to be updated at each iteration.
  
<p>In order to check whether our function is working properly, we need at least one web page for which we can access both its <b>"N links(x)"</b> and its <b>PageRank</b>. In other words, at least one web page has to be present in these 2 dataframes:

In [0]:
counts = DirectLinksCounter_BV.value
# Pages that have both a known number of forward links and a PageRank
for link in sorted(set(counts.id) & set(current_pr_pd.link)):
  print(link)

The only id printed by this cell is 2270964. This means that, when we run our code on the sampled data, only the PageRank of the web pages that 2270964 points to is going to change. With the code below we can see the ids of these web pages and their corresponding PageRank after running the whole algorithm.

In [0]:
Partialdirect_linksPDF = Partialdirect_linksDF.toPandas()
# Forward links of page 2270964
links = Partialdirect_linksPDF[Partialdirect_linksPDF['id'] == 2270964].links_id.to_list()
ranked_links = set(current_pr_pd.link)
for link in set(links[0]):
  if link in ranked_links:
    x1 = float(current_pr_pd.loc[current_pr_pd['link'] == link, 'pagerank'].iloc[0])
    print("ID :", f"{int(link):<10}", "PageRank :", x1)

As we can see, all these web pages have the same PageRank value. When we display FinalRanked_PR, all of them are at the top of the dataframe. They are also the only web pages in our sampled dataset with a PageRank different from:
$$\frac{d}{N \ total}$$
<br>
<br>
<br>
<h3>3.- Analysing our results</h3>
  <br>
<p> I have added several prints to the while loop that runs the PageRank algorithm to get a better idea of what is really going on in each iteration. Now I am going to briefly describe what our code does at each iteration for our sampled data, and why this means that the code is working properly. </p>

<li><h4>Iteration 0:</h4></li>

The value of the PageRank at iteration 0 for every web page is:
$$\frac{d}{N \ total}$$
<br>
<li><h4>Iteration 1:</h4></li>

As I said before, only the web pages that 2270964 points to change their PageRank, so in the first iteration these web pages take the following value:
<br>
<br>

 $$PageRank' \ (a) \ = \left ( 1-d \right ) \ · \sum_{x \ \in \ A}\left (  {\frac{PageRank \ (x)}{N\ links \ (x)}}\right ) \ + \ \frac{d}{N \ total}$$
  $$PageRank' \ (a) \ = \left ( 0.15 \right ) \ · \left (  \frac{PageRank \ (2270964)}{N\ links\ (2270964)}\right ) \ + \ \frac{0.85}{N \ total}$$
<br> 
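
Since every page starts at 0.85/N total and 2270964 is the only page in A with a known PageRank, the iteration-1 value can be simplified further (a short worked step using only the quantities defined above):

$$PageRank_1 \ (a) \ = \left ( 0.15 \right ) \ · \ \frac{0.85 / N \ total}{N\ links\ (2270964)} \ + \ \frac{0.85}{N \ total} \ = \ \frac{0.85}{N \ total} \left ( 1 + \frac{0.15}{N\ links\ (2270964)} \right )$$

The result does not depend on a, only on the number of forward links of 2270964, which is why all the web pages it points to end up with exactly the same PageRank.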

<li><h4>Iteration n:</h4></li> 

For the rest of the iterations the values of the PageRank don't change anymore, and the max variation is a flat 0%. This is because the PageRank of 2270964 does not change during the whole process (none of its backward links has a PageRank in the sample), so the PageRank of the web pages it links to only changes in the first iteration:
<br>
<br>
<table>
  <center>
  <thead>
    <tr>
      <th>Web Page</th>
      <th colspan="3">PageRank</th>
    </tr>
    <tr>
      <th></th>
      <th>Iteration 0</th>
      <th>Iteration 1</th>
      <th>Iteration n</th>
    </tr>
  </thead>
  </center>
  <tbody>
    <tr>
      <td>2270964</td>
      <td>$$\frac{0.85}{N \ total}$$</td>
      <td>$$\frac{0.85}{N \ total}$$</td>
      <td>$$\frac{0.85}{N \ total}$$</td>
    </tr>
    <tr>
      <td>Web pages 2270964 points to</td>
      <td>$$\frac{0.85}{N \ total}$$</td>
      <td>$$PageRank' \ (a) \ = \left ( 0.15 \right ) \ · \left (  \frac{PageRank_0 \ (2270964)}{N\ links\ (2270964)}\right ) \ + \ \frac{0.85}{N \ total}$$</td>
      <td>$$PageRank' \ (a) \ = \left ( 0.15 \right ) \ · \left (  \frac{PageRank_{n-1} \ (2270964)}{N\ links\ (2270964)}\right ) \ + \ \frac{0.85}{N \ total}$$</td>
    </tr>
    <tr>
      <td>Rest of the web pages</td>
      <td>$$\frac{0.85}{N \ total}$$</td>
      <td>$$\frac{0.85}{N \ total}$$</td>
      <td>$$\frac{0.85}{N \ total}$$</td>
    </tr>
  </tbody>
</table>
<br>
<br>
<li><h4>Variation:</h4></li> 

When I run my final loop, the values of the PageRank stop changing after the first iteration. This means that from the second iteration on the max variation is a flat 0%, which makes the loop stop at that point. At first I thought this meant my code wasn't working correctly, but when I looked deeper into my results I realized that the code was working properly and doing exactly what it is supposed to do. The table above shows why the values of the PageRank don't vary after the first iteration.
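
The same behaviour can be reproduced on a tiny hypothetical graph that mimics the sample: page 10 plays the role of 2270964 (its only backward link, 99, has no PageRank) and it points to pages 20 and 21. The update is the notebook's formula as written; the ids are made up.

```python
d = 0.85

# Hypothetical ids: page 10 stands in for 2270964; 99 has no PageRank
backlinks = {10: [99], 20: [10], 21: [10]}
out_degree = {10: 2, 99: 1}
n_total = len(backlinks)

pr = {page: d / n_total for page in backlinks}  # iteration 0: d / N
history = [pr]
for _ in range(3):
    # Same update as new_pagerank: backward links without a PageRank are skipped
    pr = {
        page: (1 - d) * sum(pr[x] / out_degree[x] for x in sources if x in pr) + d / n_total
        for page, sources in backlinks.items()
    }
    history.append(pr)

for i, snapshot in enumerate(history):
    print("Iteration", i, snapshot)
```

After iteration 1 the snapshots stop changing, so the max variation is exactly 0%, and pages 20 and 21 share the same value, as in the table above.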
<br>
<br>
<br>
<b>In conclusion</b>, I ran into many problems during this project, mostly due to my lack of familiarity with this new way of treating and handling data. Even so, in the end I was able to use the Spark tools properly to write well-structured code that performs the PageRank algorithm correctly on the sampled data.