# Wikipedia Analysis

#### Daniel Toribio (100454242) 

This last practical work puts all our skills together to develop a PageRank algorithm on the Wikipedia dataset. We will follow these steps:

* Import the data 
* Filter the data to keep only essential variables (title, id, text).
* Create a pipeline to extract the forward links of each document (as document IDs). This is used to create the *Forward Table*
* Use *Forward Table* to calculate number of outgoing links per document
* Use *Forward Table* to create a *Reverse Table*
* Initialize *Page Rank Table*
* Loop through the PageRank update until a stopping condition is met, that is, when either:
  * The maximum number of iterations is reached (20)
  * The magnitude of the PageRank update falls below the threshold we have defined

# 1. Reading the data

The dataset comprises 5,823,210 entries, organized in a set of Parquet files. For the initial functional version, we assess our algorithm on a 1% sample of the records (`fraction=0.01`), roughly 58,000 entries.

In [None]:
import pandas as pd
import re
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.types import ArrayType, StringType,LongType

spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [None]:
WikipediaDF = spark.read.parquet("dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet") #Here we create a Spark DataFrame with all the Wikipedia entries

PartialWikipediaDF = WikipediaDF.sample(fraction=0.01,seed=0).cache() #Here we take a fraction of the Wikipedia Spark Dataframe to avoid using a big number of entries and have lower execution time


In [None]:
PartialWikipediaDF.display()

As can be seen above, the original dataset has many variables that will not be used in our analysis. We are only interested in 3 of them, which are the **title** of the document, its **id**, and its **text**. It is from the latter that we will extract the forward links.

# 2. Parsing the Data

At this point, all the link information is stored in the variable **text**, where outgoing links are enclosed in double square brackets ("[[ ]]"). Inside the brackets we find the title of the document being referred to. Therefore, in order to create a dataframe with the desired structure, there are 2 main steps to execute:
* Parse the text to find all linked document titles
* Map the document titles to their id

### 2.1 Extracting linked document titles

This is done with a new user-defined function that, given the text, uses regular expressions to find everything enclosed in double square brackets ('[[ ]]'). We add some extra conditions to filter out undesired results, such as image or file attachments.
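As a minimal sketch of this idea on a short, made-up wikitext string: the helper name `extract_titles` and the sample text are illustrative assumptions, not part of the notebook. The regular expression captures each `[[...]]` span, the title before any `|` is kept, and file, image, and category entries are skipped. Nested links inside attachments are not handled in this simplified version.

```python
import re

# Non-greedy match of everything between "[[" and the first "]]"
LINK_PATTERN = re.compile(r"\[\[(.+?)\]\]")
# Prefixes that mark entries which are not links to articles
SKIPPED_PREFIXES = ("file:", "image:", "category:", ":")

def extract_titles(wikitext):
    """Return the lowercased article titles linked from a wikitext string."""
    titles = []
    for raw in LINK_PATTERN.findall(wikitext):
        # "[[Title|display text]]" links to "Title"
        title = raw.split("|")[0].strip().lower()
        if title and not title.startswith(SKIPPED_PREFIXES):
            titles.append(title)
    return titles

# Hypothetical sample text
sample = ("He is an [[India]]n [[actor]] seen in "
          "[[Crime Patrol (TV series)|Crime Patrol]]. [[Category:Living people]]")
print(extract_titles(sample))  # ['india', 'actor', 'crime patrol (tv series)']
```

Lowercasing the titles here mirrors the lowercasing applied to the lookup table later on, so that both sides of the match use the same form.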

In [None]:
def parse_links(document_body):
    '''
    Receives the text field and parses it to keep only references to other articles (enclosed between [[]]).
    Attached files and images also use [[]], and may contain nested references.
    E.g.:  [[File:Carl XCH-4.jpg|thumb|The [[United States Navy|U.S. Navy's]] ''XCH-4'', with hydrofoils clearly lifting the hull out of the water]] 
    For these cases, we search for the reference inside the substring ([[United States Navy|U.S. Navy's]] in this case).
    If none is found, we drop the entry from our list of references.
    For piped links ([[title|displayed text]]), only the title before the first '|' is kept.
    
    INPUT: 
        - document_body: string to be parsed
        
    OUTPUT:
        - references: a list of lowercased titles of the referenced entries
        
    '''
    if document_body is None:
        return []

    # Find every [[...]] span (non-greedy, so a nested link is cut at the first ']]')
    raw_refs = re.findall(r"\[\[(.+?)\]\]", document_body)

    references = []
    for ref in raw_refs:
        ref = ref.lower()
        if '[[' in ref:
            # File/image attachment with a nested reference: keep what follows the inner '[['
            ref = re.search(r"\[\[(.*)", ref).group(1)
        # Keep only the target title of piped links, so it can match a document title
        ref = ref.split('|')[0].strip()
        # Skip plain file/image/category entries and empty titles
        if ref and not ref.startswith(('file:', 'image:', 'category:', ':')):
            references.append(ref)

    return references

parse_links_udf = udf(parse_links,ArrayType(StringType())) #Here we create the UDF which parses the text field from each record, and extracts the outgoing links

In [None]:
## This is a test to see the correct functionality of the code

test="{{Use Indian English|date=April 2015}} {{Infobox person | name = Shavez Khan | image = | caption = | birth_date = | birth_place = India | nationality = India | residence = [[Mumbai]], India | occupation = [[Actor]] | years_active = present | height = }} '''Shavez Khan''' is an [[India]]n television [[actor]]. He has done his roles in various Indian television shows like Shaitaan,<ref>{{cite web|url=http://www.tellychakkar.com/tv/tv-news/shavez-khan-feature-episodic-of-colors-shaitaan|title=Shavez Khan to feature in an episodic of Colors' Shaitaan|work=Tellychakkar|date=11 April 2013|accessdate=24 April 2015}}</ref> [[Encounter (Indian TV series)|Encounter]], [[Ek Hasina Thi (TV series)|Ek Hasina Thi]], [[Savdhaan India]],<ref>{{cite web|url=http://www.tellychakkar.com/tv/tv-news/shavez-khan-anshul-singh-and-damini-joshi-episodic-of-savdhan-india-140915|title=Shavez Khan, Anshul Singh and Damini Joshi in an episodic of Savdhan India|work=Tellychakkar|date=15 September 2014|accessdate=24 April 2015}}</ref> [[SuperCops vs Supervillains]],<ref>{{cite web|url=http://www.tellychakkar.com/tv/tv-news/rituraj-singh-and-shavez-khan-life-oks-shapath-141009|title=Rituraj Singh and Shavez Khan in Life OK's Shapath|work=Tellychakkar|date=9 October 2014|accessdate=24 April 2015}}</ref> Pyaar Ka The End,<ref>{{cite web|url=http://www.tellychakkar.com/tv/tv-news/shavez-khan-bindass-pyaar-ka-the-end-141029|title=Shavez Khan in Bindass' Pyaar Ka The End|work=Tellychakkar|date=29 October 2014|accessdate=24 April 2015}}</ref> [[Pyaar Kii Ye Ek Kahaani]], [[MTV Fanaah]], [[Crime Patrol (TV series)|Crime Patrol]]. He has played his recent role in [[Sony Entertainment Television (India)|Sony TV]]'s [[C.I.D. (Indian TV series)|CID]].<ref>{{cite web|url=http://www.tellychakkar.com/tv/tv-news/shavez-khan-sony-tvs-cid-150417|title=Shavez Khan in Sony TV's CID|work=Tellychakkar|date=17 April 2015|accessdate=24 April 2015}}</ref> ==Television== *[[Colors (TV channel)|Colors]]'s Shaitaan *[[Sony Entertainment Television (India)|Sony TV]]'s [[Encounter (Indian TV series)|Encounter]], [[Crime Patrol (TV series)|Crime Patrol]] & [[C.I.D. (Indian TV series)|CID]] *[[Star Plus]]'s [[Ek Hasina Thi (TV series)|Ek Hasina Thi]] *[[Life OK]]'s [[Savdhaan India]] & [[SuperCops vs Supervillains]] *[[Bindass]]' Pyaar Ka The End *[[Star One]]'s [[Pyaar Kii Ye Ek Kahaani]] *[[MTV]]'s [[MTV Fanaah]] ==References== {{Reflist}} ==External links== {{Persondata | NAME = Khan, Shavez | ALTERNATIVE NAMES = | SHORT DESCRIPTION = Indian model and television actor | DATE OF BIRTH = <!--Birth date has been contested. Do not add without providing a reliably published source with a reputation for editorial oversight--> | PLACE OF BIRTH = India | DATE OF DEATH = | PLACE OF DEATH = }} {{DEFAULTSORT:Khan, Shavez}} [[Category:Living people]] [[Category:Indian male television actors]] [[Category:Actors in Hindi television]] [[Category:Indian television personalities]]"

parse_links(test)

In [None]:
#Now we use the above function to create a first forward DF and change the text column with only the links
TempForwardDF=PartialWikipediaDF.select("title","id",parse_links_udf("text").alias("links"))
display(TempForwardDF)


### 2.2 Mapping titles to IDs

The last step is to map the document titles to their IDs, so that we deal with numbers instead of strings. To do so, we create a pandas DataFrame with 2 columns: document title and document ID. With this, we can write a new function that, given a document title, retrieves its ID. Putting it all together, we can create a new *Forward table* containing the document ID and **links**, a list of IDs of the linked documents.

Since the title-id table will be used on many worker machines, having it as a read-only variable on each of them helps speed up the processing. For this reason, we broadcast this variable.
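The mapping step can be sketched in plain Python as follows. This is only an illustration under stated assumptions: the function name `titles_to_ids` and the small lookup table are hypothetical, and a dictionary stands in for the pandas table because it gives constant-time lookups per title.

```python
def titles_to_ids(links, title_to_id):
    """Map linked titles to document ids, dropping titles outside the sample."""
    # A set removes repeated links to the same document
    ids = {title_to_id[title] for title in links if title in title_to_id}
    return sorted(ids)

# Hypothetical lookup table: lowercased title -> document id
title_to_id = {"india": 10, "actor": 20, "mumbai": 30}

# "star plus" is not in the table, so it is silently dropped
print(titles_to_ids(["india", "actor", "india", "star plus"], title_to_id))  # [10, 20]
```

In the Spark setting, a dictionary like this could be shared with the workers through `sc.broadcast` in the same way as the pandas table.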

In [None]:
# Create the title-id dataframe
title_idDF=PartialWikipediaDF.select(lower("title").alias("title"),"id")
title_idPDF=title_idDF.toPandas()

print(len(title_idPDF))

In [None]:
broadcast_title_idPDF = sc.broadcast(title_idPDF)

In [None]:
# Create the mapping
def titles2id(links):
    # Read the lookup table from the broadcast variable, so it is shipped once per executor
    # instead of being serialized inside the UDF closure
    data_titles=broadcast_title_idPDF.value
    if links:
        ids=data_titles[data_titles.title.isin(links)].id.to_list()
    else:
        ids=[]
    return list(set(ids))

titles2id_UDF=udf(titles2id,ArrayType(LongType(),False))

In [None]:
#Modify the forward DF to include linked IDs rather than titles
ForwardDF=TempForwardDF.select("id",titles2id_UDF("links").alias("links")).cache()
display(ForwardDF)

Because we are using only a portion of the Wikipedia dataset, some of the **links** entries are empty lists. This happens when none of the outgoing links of a document match the title of a document in our sample, so no ID can be assigned to them. This problem will be addressed later.

# 3. Creating Other Tables

In order to be able to execute the page rank algorithm, we need 2 other tables. Namely, an *outgoing links table*, in which for each document ID we count how many external references we have, and a *reverse table*, in which for each document ID we take all the document ids that are referencing back to it.

### 3.1 Outgoing Links

We simply use the *forward table*, and compute the size of the variable links which is a list.

In [None]:
OutgoingsLinksCountersDF=ForwardDF.select("id",size("links").alias("counter"))
OutgoingsLinksCountersPDF=OutgoingsLinksCountersDF.toPandas()

display(OutgoingsLinksCountersPDF)

### 3.2 Reverse Links

We make use of the Forward table. We explode the links column to obtain one row per pair of linked documents (id, t_link), where **id** is the linking document and **t_link** is the document it links to. We then group by **t_link**, collecting all the ids associated with it into a list. Finally, we rename **t_link** to **id** and the collected list to **links**.
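The same explode and group-by logic can be sketched on a toy graph in plain Python. The function name `build_reverse_table` and the four-document graph are assumptions made for illustration only.

```python
from collections import defaultdict

def build_reverse_table(forward):
    """forward: {doc_id: [ids it links to]} -> {doc_id: [ids linking to it]}."""
    reverse = defaultdict(list)
    for source, targets in forward.items():
        for target in targets:
            # One (source, target) pair per link, grouped by target
            reverse[target].append(source)
    return dict(reverse)

# Hypothetical forward table: document 4 has no links and nobody links to it
forward = {1: [2, 3], 2: [3], 3: [1], 4: []}
print(build_reverse_table(forward))  # {2: [1], 3: [1, 2], 1: [3]}
```

Note that document 4 does not appear in the result at all, since it is never the target of a link.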

In [None]:
TemporalReverseLinks=ForwardDF.select("id",explode("links").alias("t_link"))
ReverseDF=TemporalReverseLinks.groupBy("t_link").agg(collect_list ("id").alias("counters")).cache()
ReverseDF = ReverseDF.withColumnRenamed("t_link","id").withColumnRenamed("counters","links")

ReverseDF.display()

Lastly, we want to unify these tables for convenience. That is, we add a new column to *ReverseDF* holding, for each document listed in the column **links**, its number of **outgoing** links. Documents with no outgoing links will be skipped.
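For the later update step it matters that the list of counts stays aligned with the list of linking documents, with repeated values kept. A minimal sketch of that alignment, where the name `outgoing_counts` and the toy forward table are illustrative assumptions:

```python
def outgoing_counts(incoming_links, forward):
    """Return, for each linking document, how many outgoing links it has.

    The result is aligned with `incoming_links`: position k describes incoming_links[k].
    """
    return [len(forward[source]) for source in incoming_links]

# Hypothetical forward table; documents 1 and 2 both link to document 3
forward = {1: [2, 3], 2: [3], 3: [1], 4: []}
incoming_of_3 = [1, 2]
print(outgoing_counts(incoming_of_3, forward))  # [2, 1]
```

Here document 1 passes half of its rank to document 3, while document 2 passes all of it, which is only recoverable if the two lists keep the same order.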

In [None]:
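# Lookup built once on the driver: document id -> number of outgoing links
outgoing_by_id = dict(zip(OutgoingsLinksCountersPDF["id"], OutgoingsLinksCountersPDF["counter"]))

def find_outgoing(links):
    # One counter per linking document, in the same order as `links`,
    # so that outgoing[k] is the number of outgoing links of links[k].
    # Counters are not de-duplicated: two documents may have the same count.
    if not links:
        return []
    return [int(outgoing_by_id[l]) for l in links]

find_outgoing_UDF=udf(find_outgoing,ArrayType(LongType(),False))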

In [None]:
ReverseDF=ReverseDF.select("id","links", find_outgoing_UDF("links").alias("outgoing")).cache()
ReverseDF.display()

As a safeguard, we remove any zero values from **outgoing**, since dividing by zero would produce infinite values in the final result.

In [None]:
def remove_zeros(arr):
    return [x for x in arr if x != 0]

remove_zeros_udf = udf(remove_zeros, ArrayType(LongType(),False))

ReverseDF = ReverseDF.withColumn("outgoing", remove_zeros_udf("outgoing"))


We have noticed that some of the values in the **ForwardDF** do not have any documents referencing them in our data scope, and therefore they do not appear in the **ReverseDF** table. This can be a possible source of problems moving on, so we decide to fix this by making sure all ids in the forward table are included in the reverse one. For the values of links and outgoing, we will use '[]' (empty list). In this way we are able to solve the problem.
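The completion step can be sketched as follows in plain Python. The helper `add_missing_ids` and the sample tables are hypothetical, and a dictionary stands in for the pandas table.

```python
def add_missing_ids(reverse, forward_ids):
    """Ensure every id from the forward table has a (possibly empty) reverse entry."""
    completed = dict(reverse)
    for doc_id in forward_ids:
        # Ids that nobody links to get an empty list of incoming links
        completed.setdefault(doc_id, [])
    return completed

# Hypothetical reverse table where document 4 is missing
reverse = {1: [3], 2: [1], 3: [1, 2]}
print(add_missing_ids(reverse, [1, 2, 3, 4]))  # {1: [3], 2: [1], 3: [1, 2], 4: []}
```

After this step, the number of rows in the reverse table equals the number of documents in the sample, which is the value of N used in the PageRank initialization.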

In [None]:
#Lastly, we create a pandas DF for both forward and reverse tables. Add needed entries to reverse table too
ReversePDF = ReverseDF.toPandas()
ForwardPDF = ForwardDF.toPandas()

# Build the set of known ids once, instead of rebuilding a list on every iteration
known_ids = set(ReversePDF['id'])
for i in ForwardPDF['id']:
    if i not in known_ids:
        ReversePDF.loc[len(ReversePDF)]= [i, [], []]

In [None]:
display(ReversePDF)

# 4. PageRank 

The first step is to create the structure for our dataframe. For this, we assign to each document in **ReversePDF** an initial PageRank of 0.85/N, where N is the number of documents considered (the length of the reverse table). The value 0.85 is the damping factor, which represents the probability that a user keeps clicking on links rather than jumping to a new page.

In [None]:
pageRankPDF=ReversePDF.copy()

N  =  len(ReversePDF)
pageRankPDF["pagerank"]= 0.85/N 

Now that we have the basic structure for our results, we have to create an algorithm that iteratively updates the values until convergence or until the maximum number of iterations is reached. In order to make the process straightforward, we create 2 auxiliary functions:

* The first looks for all nodes with an empty **links** list in the reverse table, that is, nodes with no incoming links, which this notebook treats as dangling nodes (in the standard PageRank formulation, dangling nodes are those with no outgoing links). The PR of these nodes is distributed among all documents in the network. The function returns each node's share, and the sum of these shares is the total re-distribution added to every new PR.

* The second computes the new PR of all nodes in each iteration. It starts from the re-distributed value above, adds the contributions of the documents that link to the node, and applies a damping factor of 0.85.
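Written out, the update combining both functions takes the following form. The symbols are introduced here for illustration and follow the standard damped formulation, which is assumed rather than taken from the notebook: $d$ is the damping factor, $N$ the number of documents, $\mathrm{In}(p)$ the set of documents linking to $p$, $L(q)$ the number of outgoing links of $q$, and $D$ the set of nodes whose rank is re-distributed.

$$
PR_{t+1}(p) = \frac{1-d}{N} + d\left(R_t + \sum_{q \in \mathrm{In}(p)} \frac{PR_t(q)}{L(q)}\right),
\qquad
R_t = \sum_{q \in D} \frac{PR_t(q)}{N}
$$

With $d = 0.85$, a user follows a link with probability 0.85 and jumps to a random page with probability 0.15.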

Once we have this, the loop is straightforward. For at most max_iterations iterations, it:
* Creates a Spark DF with the updated information (same structure as ReverseDF)
* Calculates the total weight to re-distribute
* Uses the above to calculate new PR
* Computes difference in PR for all nodes. Takes the maximum. If above threshold, we continue. Else, we break the loop.
* We update the initial Spark DF (**resultDF**) with the new PRs and repeat
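The loop described above can be sketched without Spark on a toy graph. This is a simplified illustration, not the notebook's implementation: the function `pagerank`, the four-document graph, and the uniform 1/N initialization are assumptions, and dangling nodes are taken here in the standard sense (no outgoing links) so that the total rank is preserved. It also assumes a non-empty graph in which every linked id is itself a key.

```python
def pagerank(forward, d=0.85, tol=1e-10, max_iterations=20):
    """Iterate PageRank on {doc_id: [linked ids]} until the largest change is below tol."""
    nodes = list(forward)
    n = len(nodes)
    rank = {node: 1.0 / n for node in nodes}

    for iteration in range(1, max_iterations + 1):
        # Rank held by nodes without outgoing links, shared equally among all nodes
        redistribute = sum(rank[node] for node in nodes if not forward[node]) / n

        # Every node starts from the redistributed share, then receives
        # rank(source) / outgoing_links(source) from each document linking to it
        incoming = {node: redistribute for node in nodes}
        for source, targets in forward.items():
            for target in targets:
                incoming[target] += rank[source] / len(targets)

        new_rank = {node: (1 - d) / n + d * incoming[node] for node in nodes}
        max_diff = max(abs(new_rank[node] - rank[node]) for node in nodes)
        rank = new_rank
        if max_diff < tol:
            break

    return rank, iteration

# Hypothetical graph: document 4 has no links in either direction
forward = {1: [2, 3], 2: [3], 3: [1], 4: []}
ranks, iterations = pagerank(forward)
print(ranks, iterations)
print(sum(ranks.values()))  # stays at 1 up to floating-point error
```

The function returns the number of iterations alongside the ranks, so the caller can tell whether the loop stopped because of the threshold or because the iteration cap was hit.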

In [None]:
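def aux_find_dangling(links, pr, N):
    # Share of this node's PR given to every document; 0.0 if the node is not redistributed.
    # Always return a float: a Python int would not match the declared return type of the UDF.
    if len(links) == 0:
        return float(pr/N)
    else:
        return 0.0

udf_aux_find_dangling = udf(lambda l,  pr: aux_find_dangling(l,pr,N), DoubleType())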

In [None]:
def aux_PR(links, outgoing, pageRankPDF, N, redistribute, d=0.85):
    # We start each new PR with the value we need to re-distribute because of the solitary nodes in the network.
    new_pr = redistribute
    
    # Add PR(source) / outgoing_links(source) for every document that links to this one
    for link_id, link_outgoing in zip(links, outgoing):
        link_pr = pageRankPDF.loc[pageRankPDF["id"] == link_id, 'pagerank'].values[0]
        new_pr += link_pr/link_outgoing
        
    # Damped update: follow a link with probability d, jump to a random page with probability 1 - d
    new_pr = ((1 - d)/N) + (d*new_pr)
    return float(new_pr)
    

In [None]:
def new_pagerank(pageRankPDF, tsh, max_iterations = 20):
        
    stop = False
    iter_ = 0

    while not stop:
        resultDF = sqlContext.createDataFrame(pageRankPDF)
        
        # Calculate weight to redistribute across network (cast to a plain Python float)
        r = float(resultDF.select(udf_aux_find_dangling("links", "pagerank").alias('dangling')).toPandas()['dangling'].sum())
        udf_aux_PR = udf(lambda l, c: aux_PR(l,c, pageRankPDF, N, r), DoubleType())
        
        #Computing new rank
        NewPageRankDF = resultDF.withColumn("NewPR", udf_aux_PR("links", "outgoing")) 

        #New column with difference between PR and NewPR
        #(abs and max are the pyspark.sql.functions versions imported above, not the Python builtins)
        NewPageRankDF = NewPageRankDF.withColumn('Tsh', abs(col("pagerank") - col('NewPR')))

        max_diff = NewPageRankDF.agg(max("Tsh")).collect()[0][0]        
        iter_ +=1
        
        #Stop when the iteration cap is reached or the largest update is below the threshold
        if iter_ == max_iterations or max_diff < tsh:
            stop = True
        
        pageRankPDF = NewPageRankDF.select("id", "links", "outgoing", "NewPR").withColumnRenamed("NewPR","pagerank").toPandas()

    return (pageRankPDF, iter_)
    

In [None]:
res, iters = new_pagerank(pageRankPDF, 1e-10) # The second argument is the threshold (tsh) we are going to use
display(res)
print('The loop stopped after', iters, 'iterations')

# 5. Final Results

The last step is to return the results in the desired format, that is, a pandas DataFrame containing the document title, its ID, and its PageRank. We will use some of the previously defined variables.

In [None]:
# Create the mapping
def id2titles(id):
    data_titles=broadcast_title_idPDF.value
    
    title = data_titles[data_titles['id'] == id].title.values[0]
    return title

In [None]:
res['title'] = res['id'].map(id2titles)
res_final = res[['title', 'id', 'pagerank']]
display(res_final)

In [None]:
suma_total = res['pagerank'].sum()
print(suma_total)

We can see that the sum of the PageRank values is almost 1. This is a sanity check rather than a proof of correctness: it is what we expect if the values form a probability distribution over the documents.

## CONCLUSIONS - Daniel Toribio (100454242)

The PageRank algorithm played a crucial role in the initial deployment of the Google Search Engine, and it is the cornerstone of this practical work. The fundamental idea is to assign an importance to each webpage, where a page is more important if it is frequently referenced by other important pages. PageRank estimates the likelihood of a random internet user arriving at a specific webpage, and this probability rises with the number of pages linking to it. However, we encountered challenges that had to be resolved, including the presence of dangling nodes (pages lacking links) and the need to model how a user browses (the damping factor is the probability that the user keeps following links). To handle dangling nodes, and to reflect that a user can keep browsing after reaching a dead end, the PageRank of these nodes is distributed among the remaining documents. This models the user starting a new search rather than clicking on a link.

Concerning the implementation process, we initiated by employing regular expressions to extract the titles of the linked documents. Subsequently, we transformed these titles into IDs through a supplementary function, optimizing the process with the utilization of sc.broadcast for enhanced speed. This led to the creation of ForwardDF. The next step involved determining the count of outgoing links for each document, consolidating all the data into a unified dataframe named ReverseDF.

Following this consolidation, for each ID we gathered the documents pointing to it together with their number of outgoing links. The PageRank was then initialized and iterated until convergence. To update the PageRank, we wrote two auxiliary functions. The first computes the total weight that has to be redistributed because of dangling nodes. The second applies the fundamental formula outlined in the original paper, starting the new PageRank from the previously calculated redistribution value. The damping factor was set to 0.85.

Once the largest page rank change across all nodes was computed using the new page rank, we assessed whether it fell below our predetermined threshold. In the event that this criterion was met or the maximum number of iterations had been reached, the loop concluded. This iterative process allowed for the efficient computation of the page rank until the desired convergence was achieved.

Regarding Spark's capabilities, two notable examples are the broadcast of the title-to-id table and the use of user-defined functions, particularly in the generation of **ReverseDF**, where they helped speed up the process. Despite these advantages, there were instances where the computation time was long, which led us to use pandas dataframes instead.

In summary, this practical work has served as a valuable opportunity to enhance our understanding of Spark's capabilities, especially when dealing with substantial volumes of data. The utilization of user-defined functions within Spark has proven instrumental in optimizing performance. While Spark excelled in many aspects, the integration of pandas' dataframes in specific scenarios underscored the need for flexibility in handling diverse computational demands.

This experience has reinforced our confidence in Spark as a potent tool for managing extensive datasets, and we anticipate leveraging its capabilities in future projects with similar requirements. Furthermore, delving deeper into the PageRank algorithm has been an enriching experience, providing insights into its significance and practical applications. The knowledge gained from this endeavor positions us well for addressing similar challenges in the future.