# Wikipedia Analysis

# **Final Practical Work**

# Alejo González García (100454351)
# Andrés Navarro Pedregal (100451730)


This notebook IS NOT an skeleton. Is a sample of instruccions to analyse in classroom the Wikipedia Dataset provided by Databricks

During the class we will fill cells to implement the PageRank algorithm

In [None]:
import pandas as pd
import re



In [None]:
from pyspark.sql.types import *
from pyspark.sql.types import ArrayType, StringType,LongType
from pyspark.sql.functions import lower, size, explode, collect_list


In [None]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")


In [None]:
wikipediaDF=spark.read.parquet("dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
# As we have seen in class and in the document, the database we are using is the Wikipedia datasets with all the available links that this program, Databricks has already collected in this link. 

In [None]:
# wikipediaDF.count() # We already know that there are 5823210 entries, so we just run this command once to check as it takes a lot of time. 

In [None]:
PartialWikipediaDF=wikipediaDF.sample(fraction=0.001,seed=0).cache()
# We have been suggested to use no more than the 0.01% of the data, but we will use the 0.1% (fraction=0.001) for this last experiment to have better results as it just takes about 5 minutes. 
# Notice that we have to store the data frame in the cache memory so that it can be accessed directly during the code execution without wasting resources multiple times. 

In [None]:
# PartialWikipediaDF.count() 
# We are skipping this command as it takes a lot of time. We have just run it once, and as requested it counts 576 samples, that is the 0.01% of the full data. 

In [None]:
# display(PartialWikipediaDF) 
# Here we can have an insight of the data with the expected variables (id, revisionId, username, id, text...)
# We are also skipping this command to save resources each type we execute the code!

# We have been given this function that takes a document and returns all the links inside that document: 

In [None]:
# We have decided to change it a bit the provided function and we have gained some improvements: 
def parse_links(document_body):       
    titles = re.findall(r'\[\[(.+?)\]\]',document_body.lower())
    if len(titles) == 0:
        return []
    return list(set([title.lower() for title in titles])) # to remove duplicates and make all lowercase
# We are retrieving a full list with all the links inside!


In [None]:
# Here we are applying the UDF (User Defined Function that was requested in the statement)
parse_links_udf = udf(parse_links,ArrayType(StringType()))

In [None]:
temp_forward_df = PartialWikipediaDF.select("title","id",parse_links_udf("text").alias("links"))
# This links column represents the list of titles that contains that specificic id


In [None]:
# display(temp_forward_df) # If we uncomment this line we can see the temporal forward dataframe!

# On class, we have defined this function, Titles2ID, that is going to map the titles with the IDs of the documents:

In [None]:
# We have made some adjustments:
# The links is a list of titles with the output documents
# The data titles is the DAta Frame that maps the titles and the ids. 
def Titles2ID (links, data_titles):
    if len(links) == 0:
        return []
    return list(set(data_titles[data_titles.title.isin(links)].id.to_list())) # to remove duplicates
# This function is returning for the provided titles, a list full of the respective IDs. 

If titles is empty it returns an empty list, otherwise, it returs the id.

In [None]:
Titles_ID_PDF = temp_forward_df.select(lower("title").alias("title"),"id").toPandas() # This data frame contains ONLY the id and title. 

In [None]:
Titles2ID_udf = udf(lambda x: Titles2ID(x, Titles_ID_PDF), ArrayType(LongType(),False)) # We create again a UDF (User Defined Function) so that we can get the spark versions

In [None]:
forward_df = temp_forward_df.select("id", "title", Titles2ID_udf("links").alias("links")).cache()
# Here we are placing on the links column the output titles of the provided page

In [None]:
outgoings_links_counters_pdf = forward_df.select("id", "title", "links", size("links").alias("count_output_links")).toPandas()
# Here we are defining the counter of outgoing links so that we obtain the IDs of the input pages

# This function, input_link is going to retrieve the links of a document with the use of the ouput of the provided document: 

In [None]:
# document id is defined by it´s name, and links are the tuple of output IDS
def input_link(document_id, links):
    if len(links) == 0:
        return []
    
    return [(link, document_id) for link in links]
# As we say in the title, this retrieves the links of a document in a reversed order.

# Now we are going to reverse the ouput and obtain the input links

In [None]:
reverse_rdd=(forward_df.rdd
            .flatMap(lambda r: input_link(r.id, r.links)) # We just need these two columns of the original DF
            .groupByKey()
            .map(lambda r: (r[0], list(r[1]), [int(outgoings_links_counters_pdf.loc[outgoings_links_counters_pdf['id'] == s, 'count_output_links'].values[0]) for s in list(r[1])] )) 
            )

reverse_df = spark.createDataFrame(reverse_rdd,["id", "links", "outgoings_links_counters_pdf"]) 
# reverse_df stores the in/out links of each of the documents that we have. And we can also see the amount of output links

In [None]:
# We have to handle the fact that users stop searching. In practice, the Page Rank algorithm adds a damping factor at each stage to model it, 
# here we take into account the case in which a page has NO input links. 
reverse_pdf = reverse_df.toPandas()

for i in forward_df.toPandas()['id']:
    if i not in reverse_pdf['id'].values:
        reverse_pdf.loc[reverse_pdf.shape[0]]= [i, [], []]

In [None]:
N = len(reverse_pdf)
reverse_pdf['PageRank'] = float(0.85 / N) # Notice that 0.85 is the damping factor!
d = sc.broadcast(0.85) 
broadcast_count_total = sc.broadcast(N)
broadcast_count_links_pdf = sc.broadcast(outgoings_links_counters_pdf) 
#The broadcast term allows that variable to be used by any entity in the code.

# This convergence function is a binary one, that returns 1 if the difference does not exceed the threshold and 0 otherwise. 

In [None]:
def convergence(document_id, prev_page_rank, new_page_rank, threshold):    
    previous_value = prev_page_rank.loc[prev_page_rank['id'] == document_id, 'PageRank'].values[0]
    new_value = new_page_rank.loc[new_page_rank['id'] == document_id, 'PageRank'].values[0]
    
    return 1 if (previous_value - new_value) < threshold else 0
#The function has as inputes the id of the document, the previous and new page rank of that page and the stablished threshold. With all of this, 
# it returns a 1 or a 0 if the difference between the previous value and the new one is lower than the threshold. 

In [None]:
FullReversed = sqlContext.createDataFrame(reverse_pdf)
FullReversedPDF = FullReversed.toPandas()

iterations = 20 # Maximum amount of times this loop can be run. 
threshold = 0.00001 # Convergence threshold
new_page_rank_df = FullReversed.select("id", 'links', 'outgoings_links_counters_pdf', "PageRank")
N = broadcast_count_total.value # This is the number of total pages. 
num_links = broadcast_count_links_pdf.value

# Page Rank Algorithm Implementation
In the following for loop, we are going to compute the final Page Rank. If we number of iterations, 20, is reached, the loop will end and we have finished. And if Page Rank has converged, we will also break. 

In [None]:
for i in range(iterations):
    share = 0
    
    for index in range(len(FullReversedPDF)):
        id_link = FullReversedPDF.loc[index, 'id']

        if (num_links.loc[num_links['id'] == id_link, 'count_output_links'].values[0]) == 0: 
            # This condition is reached when the node is floating, no links associated to it, and we have to redistribute the rank: 
            page_rank = FullReversedPDF.loc[index, 'PageRank']
            share = page_rank / N 
        # Following the algorithm, we must compute the share, that it´s as we have been explained, the page rank of that web over N, the number of total pages. 
    
    new_page_rank_pdf = new_page_rank_df.toPandas() # creating the pandas df
    
    for index in range(len(FullReversedPDF)): 
        # Again, following the algorithm steps, at each iteration the rank for ALL the documents must be updated
        temp_num_links = FullReversedPDF.loc[index, 'outgoings_links_counters_pdf']
        list_of_ids = FullReversedPDF.loc[index, 'links']
        
        # Notice that obviously we have to define a new rank 
        new_rank = share
        if len(list_of_ids) != 0:
            # When there is at least one id, we proceed: 
            for l in range(len(list_of_ids)):
                new_rank += ((new_page_rank_pdf.loc[new_page_rank_pdf['id'] == list_of_ids[l], 'PageRank'].values[0]) / temp_num_links[l])
                # Here we are just applying the general formulation to get the new rank of a page. 

        new_page_rank_pdf.loc[index, 'PageRank'] = float( ((1 - d.value) / N) + (d.value * new_rank) )
        # As explained in step 9 of the document, we must apply a damping factor so that at each stage we model the fact that users stop searching

    new_page_rank_df = sqlContext.createDataFrame(new_page_rank_pdf)
    
    #Now we are going to apply the requested UDF (User Defined Function), which parses the text field from each record, and extracts the outgoing links.
    # It will check for convergence
    UDF_checking =  udf(lambda l: convergence(l, FullReversedPDF, new_page_rank_pdf, threshold), FloatType())
    ConvergenceCheckDataFrame = FullReversed.select("id", UDF_checking("id").alias("Condition")).toPandas()
    # Here we have just computed the convergence and now we are going to check if it satisties the condition: 
    
    FullReversedPDF = new_page_rank_pdf
                                                                         
    if ConvergenceCheckDataFrame['Condition'].sum() == N: 
        break
    # If we dont break means that the Page Rank is not computed yet, that we have not reached the N maximum. 


In [None]:
new_page_rank_df = sqlContext.createDataFrame(new_page_rank_pdf) # Creating the Data Fram of the Newest Page Rank

# Now we are going to check for Correctness of our results: 


In [None]:
new_page_rank_df.select('PageRank').distinct().show(10, False) # showing first 10 results: 
len(new_page_rank_pdf['PageRank'].unique()) # Checking the length of the output


+---------------------+
|PageRank             |
+---------------------+
|4.742657426317904E-5 |
|3.1418983320806394E-4|
|3.6531280175691966E-5|
|5.492433516006009E-4 |
|1.3736072465105387E-4|
|4.0671567608422055E-4|
|1.7086809609378057E-4|
|1.7086130588891258E-4|
|1.6991290964813455E-4|
|1.058048828092645E-4 |
+---------------------+
only showing top 10 rows

Out[26]: 14

In [None]:
new_page_rank_df.orderBy("PageRank").show(15) # Showing 15 samples to check the correctness: 

+--------+-----+----------------------------+--------------------+
|      id|links|outgoings_links_counters_pdf|            PageRank|
+--------+-----+----------------------------+--------------------+
|42043097|   []|                          []|2.563598608820488...|
|   55578|   []|                          []|2.563598608820488...|
|39317197|   []|                          []|2.563598608820488...|
|  193013|   []|                          []|2.563598608820488...|
|10598848|   []|                          []|2.563598608820488...|
|   60303|   []|                          []|2.563598608820488...|
|19435596|   []|                          []|2.563598608820488...|
|   14800|   []|                          []|2.563598608820488...|
| 3356661|   []|                          []|2.563598608820488...|
|   68554|   []|                          []|2.563598608820488...|
|19437501|   []|                          []|2.563598608820488...|
|   23343|   []|                          []|2.563598608820488

# Alejo´s Individual Conclusions: 
