# Wikipedia Analysis

By Francisco Landa and Teresa Gener

We start by loading the libraries needed for the task.

In [0]:
import pandas as pd
import re
import numpy as np
pd.set_option('display.max_columns', None)



In [0]:
from pyspark.sql.types import ArrayType, StringType, LongType
from pyspark.sql.functions import udf, split, explode, collect_list, count, length

In [0]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Data analysis and preparation
In this section we prepare the dataset: we extract the links of each page, map them to document IDs and count them.
First, we read the data.

In [0]:
wikipediaDF=spark.read.parquet("dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")

We select a partial sample of the full database so that the notebook runs in a short time.
To evaluate this work, please use the entire dataset. Here we use a fraction of the data and comment on the results.

In [0]:
PartialWikipediaDF=wikipediaDF.sample(fraction=0.00001,seed=0).cache()

We omit every display but the last one to increase speed, because displaying parts of the DataFrame takes a lot of time.

In [0]:
#PartialWikipediaDF.limit(20).toPandas().head()

Unnamed: 0,title,id,revisionId,revisionTimestamp,revisionUsername,revisionUsernameId,text
0,Shenzhou 5,340927,676959742,2015-08-20 06:12:27,Welsh,310131,{{Infobox spaceflight | name ...
1,Krylov subspace,2542615,672927206,2015-07-24 20:33:42,Cuzkatzimhut,2996924,"In [[linear algebra]], the order-''r'' '''Kryl..."
2,Randleman,4254040,535859287,2013-01-31 13:49:01,Xezbeth,86247,"'''Randleman''' may refer to: * [[Randleman, ..."
3,Loango National Park,3509649,659005208,2015-04-24 15:42:18,John of Reading,11308236,{{Infobox protected area | name = Loango Nati...
4,Jan Vang Sørensen,4060126,638876953,2014-12-20 06:36:40,Zyxw,473593,{{Infobox poker player | name = Jan Vang Søren...


Analysing the dataset, we see that outgoing links are enclosed in "[[]]". We parse each document and detect the links in each Wikipedia page.

In [0]:
def parse_links(document_body):
  data=re.findall(r'\[\[(.+?)\]\]',document_body)
  if (len(data)>0):
    links=[s.lower() for s in data]
  else:
    links=[]
  return links

In [0]:
parse_links_udf = udf(parse_links,ArrayType(StringType()))
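The sample output of the parsed DataFrame contains entries such as `shenzhou (spacecraft)|shenzhou`, where the text after `|` is the display label of the link, so such an entry is unlikely to match a page title later on. A minimal sketch of a stricter parser follows, assuming we only want the link target. The name `parse_link_targets` and the handling of `#` section anchors are illustrative assumptions, not part of the notebook.

```python
import re

# Same pattern as parse_links: the shortest text between "[[" and "]]".
LINK_RE = re.compile(r"\[\[(.+?)\]\]")

def parse_link_targets(document_body):
    """Hypothetical variant of parse_links that keeps only the link target."""
    targets = []
    for raw in LINK_RE.findall(document_body or ""):
        target = raw.split("|", 1)[0]     # drop the display label after "|"
        target = target.split("#", 1)[0]  # drop a section anchor after "#"
        target = target.strip().lower()   # normalize like the title column
        if target:
            targets.append(target)
    return targets

sample = ("In [[linear algebra]], the [[linear span|span]] of "
          "[[Matrix (mathematics)#Square matrix|a square matrix]].")
print(parse_link_targets(sample))
```

Because it takes and returns the same types as `parse_links`, it could be wrapped in a UDF in the same way.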

Then we convert the strings to lowercase to normalize the text.

In [0]:
tolower_udf= udf(lambda x: x.lower())

We create a parsed Wikipedia DataFrame with the main columns and the links parsed from the text column.

In [0]:
ParsedWikipediaDF = PartialWikipediaDF.select(tolower_udf("title").alias("title"),"id",parse_links_udf("text").alias("text_links"))

In [0]:
#ParsedWikipediaDF.limit(20).toPandas().head()       #to show the dataframe

Unnamed: 0,title,id,text_links
0,shenzhou 5,340927,"[shenzhou (spacecraft)|shenzhou, long march 2f..."
1,krylov subspace,2542615,"[linear algebra, linear subspace, linear span|..."
2,randleman,4254040,"[randleman, north carolina, randleman high sch..."
3,loango national park,3509649,"[gabon, national agency for national parks, na..."
4,jan vang sørensen,4060126,"[2005 world series of poker, seven-card stud, ..."


In [0]:
ParsedWikipediaDF.cache()       # cache is a method: without the parentheses nothing is cached

Out[14]: DataFrame[title: string, id: int, text_links: array<string>]

From the full Wikipedia DataFrame we extract a table of document ID against title, which we use to convert the outgoing text links into lists of IDs. It is also used at the end of the work to match the PageRanks to IDs and page titles.

In [0]:
Titles_ID_DF=wikipediaDF.select(tolower_udf("title").alias("title"),"id")

In [0]:
#Titles_ID_DF.limit(80).toPandas().head(40)      #uncomment this line to show the ids and titles of the pages

Unnamed: 0,title,id
0,history of physics,13758
1,hydrofoil,13761
2,henri chopin,13763
3,hassium,13764
4,hydrus,13768
5,hercules,13770
6,history of poland,13772
7,hradčany,13773
8,houston,13774
9,hard disk drive,13777


We need the titles as a static, broadcast variable, so we download the Titles_ID Spark DataFrame into a Pandas DataFrame (a local variable).

In [0]:
Titles_ID_PDF = Titles_ID_DF.toPandas()

In [0]:
#display(Titles_ID_PDF)  #uncomment this line to show the titles and ids of the pages

title,id
history of physics,13758
hydrofoil,13761
henri chopin,13763
hassium,13764
hydrus,13768
hercules,13770
history of poland,13772
hradčany,13773
houston,13774
hard disk drive,13777


We broadcast the Titles_ID_PDF to all workers

In [0]:
Titles_ID_BPDF = sc.broadcast(Titles_ID_PDF)

The Titles2ID Python function converts a list of titles to a list of document IDs.

Then we define the corresponding UDF to be used in Spark.

In [0]:
def Titles2ID (titles):
  links = []
  titles_id_pdf = Titles_ID_BPDF.value # This is a PandasDF
  
  for t in titles:
    row = titles_id_pdf[titles_id_pdf["title"]==t]
    if len(row)>0:
      id = int(row["id"].iloc[-1])
      links.append(id)
  return links

In [0]:
Titles2ID_udf= udf(Titles2ID,ArrayType(LongType()))
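Titles2ID scans the whole title table once per link. A minimal sketch of an alternative follows, assuming the table fits in a plain dictionary. The names `build_title_index` and `titles_to_ids` are hypothetical and the small table only mirrors a few rows of the sample output.

```python
def build_title_index(titles, ids):
    """Map each title to its id; later duplicates win, like iloc[-1] in Titles2ID."""
    return {title: int(page_id) for title, page_id in zip(titles, ids)}

def titles_to_ids(titles, title_index):
    """Convert link titles to ids, silently skipping titles that are not in the index."""
    return [title_index[t] for t in titles if t in title_index]

# Illustrative table using rows shown in the Titles_ID output.
title_index = build_title_index(
    ["history of physics", "hydrofoil", "hassium"],
    [13758, 13761, 13764],
)
print(titles_to_ids(["hassium", "not a page", "hydrofoil"], title_index))
```

In the notebook, the index could presumably be built from the `title` and `id` columns of Titles_ID_PDF and broadcast in place of the Pandas DataFrame. That step is an assumption and has not been run here.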

Now, we create a ForwardLinksTempDF, with the information 
* "id": id of the document 
* "links": list of target documents IDs

In [0]:
ForwardLinksTempDF=ParsedWikipediaDF.select("id",Titles2ID_udf("text_links").alias("links")).cache()

Now we define a UDF, counting, which returns the number of outgoing links of a document.

In [0]:
def counting(links):
  return len(links)

In [0]:
counting_udf=udf(counting,LongType())

Now we create the final ForwardLinksDF, with the following information:
* "id": documents ids
* "links": list of outgoing documentID links
* "outgoing": number of outgoing links for this document

We will need this DataFrame several times, so it is best to cache it.

In [0]:
ForwardLinksDF=ForwardLinksTempDF.select("id","links",counting_udf("links").alias("outgoing")).cache()

In [0]:
ForwardLinksDF.toPandas()

Unnamed: 0,id,links,outgoing
0,340927,"[340917, 869878, 357847, 340934, 26779, 25391,...",23
1,2542615,"[56357, 173547, 15237, 5975550, 201611, 1134614]",6
2,4254040,"[128150, 10894321, 38383124]",3
3,3509649,"[12027, 41618052, 21818, 12027, 5334607, 81818...",13
4,4060126,"[2210466, 22512, 1439194, 23014, 22512, 33116,...",12
5,7009414,"[69080, 285314, 6143251, 63403, 325569, 116886...",9
6,7159975,"[192573, 167409, 7160158, 192573, 192578]",5
7,6667872,[6667940],1
8,10919930,"[528282, 3021828, 414082, 3021828]",4
9,6572839,"[2339717, 3097027, 30118, 27283349]",4


# Reverse Links Dataframe 
Using the ForwardLinks DataFrame, we calculate the ReverseLinks DataFrame.
* First, we explode the ForwardLinks DataFrame to get a long DataFrame with:
  * id: ID of the source document
  * target: ID of the target document
  * outgoing: number of outgoing links of the source document
* Then we group by target and create a new ReverseLinks DataFrame with:
  * id: ID of the target document
  * links: list of IDs of all documents that point to this document
  * outgoing: list with the number of outgoing links of each document in the links list


The ReverseLinksDF is cached, as this is the DataFrame we use to calculate the PageRank.

We must also save the terminal nodes for the final output, because the ReverseLinks DataFrame created below will not contain them.
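The inversion described above can be sketched in plain Python, without Spark. This is only an illustration on a toy graph: the function name `reverse_links` and the dictionary layout are assumptions. As in the notebook, the outgoing counter is taken before duplicate links are removed.

```python
from collections import defaultdict

def reverse_links(forward):
    """forward maps a page id to the list of ids it links to.

    Returns (reverse, terminal): reverse maps each target id to the pages that
    point to it and their outgoing counters; terminal lists pages with no links.
    """
    reverse = defaultdict(lambda: {"links": [], "outgoing": []})
    terminal = []
    for source, targets in forward.items():
        if not targets:
            terminal.append(source)           # terminal node: no outgoing links
            continue
        outgoing = len(targets)               # counted before de-duplication
        for target in sorted(set(targets)):   # distinct (source, target) pairs
            reverse[target]["links"].append(source)
            reverse[target]["outgoing"].append(outgoing)
    return dict(reverse), terminal

toy_forward = {1: [2, 3, 2], 2: [3], 3: []}
toy_reverse, toy_terminal = reverse_links(toy_forward)
print(toy_reverse)
print(toy_terminal)
```

Page 3 is a terminal node, yet it still appears in the reverse structure because other pages link to it. Page 1 has no incoming links, so it has no entry in the reverse structure at all.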

In [0]:
TerminalNodes = ForwardLinksDF.filter(ForwardLinksDF["outgoing"] == 0).select("id").toPandas()
TerminalNodes["PR"]=0.85/ForwardLinksDF.count()
#display(TerminalNodes)

id,PR
32367472,0.0188888888888888
45404199,0.0188888888888888


In [0]:
ReverseTempDF=ForwardLinksDF.select("id",explode("links").alias("target"),"outgoing").distinct()

In [0]:
#display(ReverseTempDF)

id,target,outgoing
340927,25391,23
340927,869878,23
340927,357847,23
340927,553623,23
340927,340934,23
340927,3434750,23
340927,2481352,23
340927,263163,23
340927,151210,23
340927,22309268,23


In [0]:
ReverseLinksDF = ReverseTempDF.groupBy("target").agg(collect_list("id").alias("links"),collect_list("outgoing").alias("outgoing")).withColumnRenamed("target","id").cache()

In [0]:
#display(ReverseLinksDF)

id,links,outgoing
15237,List(2542615),List(6)
63125,List(23255883),List(5)
34063244,List(46508985),List(11)
390975,List(22410581),List(16)
95840,List(36893675),List(20)
3414021,List(340927),List(23)
17013058,List(39014312),List(12)
47836950,List(29406590),List(48)
313645,List(22410581),List(16)
1982004,List(32281941),List(19)


# Initialize the first PageRank Pandas Dataframe
Using the ReverseLinksDF, we will initialize the PageRank Pandas Dataframe.

In [0]:
TotalDocs=ReverseLinksDF.count()

First, we select the documents IDs

In [0]:
PageRankPDF=ReverseLinksDF.select("id").toPandas()

Now we create a new column called PR with the initial PageRank of every document, set to 0.85 divided by the number of documents.

We also duplicate this column as NewPR, so that both the newly calculated PR and the previous one are available and the variance check is fast.

In [0]:
PageRankPDF["PR"] = (0.85/TotalDocs)
PageRankPDF["NewPR"] = PageRankPDF["PR"]

In [0]:
PageRankPDF

Unnamed: 0,id,PR
0,15237,0.0
1,63125,0.0
2,34063244,0.0
3,390975,0.0
4,95840,0.0
...,...,...
441,484254,0.0
442,1503080,0.0
443,8943760,0.0
444,1827332,0.0


We define a Python function that calculates the next PageRank from the previous PageRank and from the links and counters stored in ReverseLinksDF.
Input parameters:
* links: list of incoming links (column links in ReverseLinksDF)
* counters: list with the number of outgoing links of each document ID in the links list
* currentPR: current PageRank of the page; the PageRank of each linking page is looked up in the PageRank Pandas DataFrame

The loop exits after 20 iterations, or earlier if the relative PageRank variation is less than 1e-4:
$$Vary=\left|\frac{Pr_{n+1}-Pr_{n}}{Pr_n}\right|$$

Vary should be less than 1e-4 for all PageRanks.
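The exit condition can be sketched in plain Python as follows. The name `has_converged` is hypothetical, and the treatment of a previous PageRank of exactly zero (where the relative variation is undefined) is an assumption made for this sketch.

```python
def has_converged(old_ranks, new_ranks, tolerance=1e-4):
    """Return True only if every page's relative variation is below tolerance."""
    for old, new in zip(old_ranks, new_ranks):
        if old == 0:
            # Relative variation is undefined here; assume the page has
            # converged only if its rank stayed at zero.
            if new != 0:
                return False
            continue
        if abs((new - old) / old) >= tolerance:
            return False
    return True

print(has_converged([0.5, 0.25], [0.50001, 0.25]))
print(has_converged([0.5, 0.25], [0.5, 0.26]))
```

The first call changes one rank by a relative 2e-5 and the second by 4e-2, so only the first satisfies the 1e-4 threshold.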

In [0]:
# We receive two pandas columns, NewPR and PR, and a threshold t that can be changed if desired
def check_variance(new, old, t):
    return bool((abs((new-old)/old)<t).all())         # True only if every row fulfils the variance condition. If True, the main loop ends


# Function to compute the PR of one page; it receives the contents of one row of ReverseLinksDF.
# currentPR (the page's own previous PR) is not used: ranks are read from the global PageRankPDF.
def NewPagerank(links, counters, currentPR)->float:
    d = 0.85                                                            # damping factor
    newPR = 0                                                           # accumulator for the new PR value
    for i in range(len(links)):
        filtered_rows = PageRankPDF[PageRankPDF['id'] == links[i]]      # look up the PR of the linking page
        if not filtered_rows.empty:                                     # if we have a match
            current_PR = filtered_rows.iloc[0]["PR"]                    # we get its PR
            newPR += (1 - d) + d * (current_PR/ counters[i])            # add this link's contribution with the given formula
        else:
            print(f"No matching row for id {links[i]} found.")          # inform the user if there is a mismatch
    return float(newPR)
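For comparison, here is a minimal plain-Python sketch of the update in its commonly stated form, PR(p) = (1 - d) + d * sum of PR(q)/C(q) over the pages q that link to p. The name `next_pagerank` and the dictionary of ranks are assumptions for illustration. Note that this form adds the (1 - d) term once per page, whereas NewPagerank adds it once per incoming link, so the two differ for pages with more than one incoming link.

```python
def next_pagerank(links, counters, current_ranks, d=0.85):
    """One update step for a single page.

    links: ids of the pages pointing to this page
    counters: number of outgoing links of each of those pages
    current_ranks: dict mapping page id to its current PageRank (hypothetical layout)
    """
    total = 0.0
    for source, outgoing in zip(links, counters):
        rank = current_ranks.get(source)
        if rank is None or outgoing == 0:
            continue  # source page is outside the sample: no contribution
        total += rank / outgoing
    return (1 - d) + d * total

ranks = {1: 1.0, 2: 1.0, 3: 1.0}
print(next_pagerank([1], [2], ranks))        # one incoming link
print(next_pagerank([1, 2], [2, 1], ranks))  # two incoming links
print(next_pagerank([99], [3], ranks))       # linking page missing from the ranks
```

Using a dictionary for the ranks avoids filtering a DataFrame for every link, and a missing linking page simply contributes nothing.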


# Main Loop
We now enter the main loop, which ends after 20 iterations or as soon as the variation falls below the threshold.

The procedure is simple: we iterate over the rows of the DataFrame, call our NewPagerank function with the links, the counters and the previous PR of each row, and append the result to a list that we later store as a column of the dataset.
Once done, we check the variance condition and either stop or continue with the algorithm.


In [0]:
rows = ReverseLinksDF.collect()             # collect once: the rows do not change between iterations
for i in range(20):
  pr = []                                   # create an empty list
  for r in rows:                            # iterate by rows
    pr.append(NewPagerank(r[1],r[2],PageRankPDF[PageRankPDF['id'] == r[0]].iloc[0]["PR"]))    # compute the PR for the row r in the dataframe
  PageRankPDF['NewPR'] = pr                 # store it in the NewPR column
  converged = check_variance(PageRankPDF['NewPR'],PageRankPDF['PR'],0.0001)   # check the variance condition before PR is overwritten
  PageRankPDF['PR'] = PageRankPDF['NewPR']  # in both cases PR must hold the latest values
  if converged:
    print(f"The Page Rank calculation has converged in {i + 1} iterations.")
    break

No matching row for id 2542615 found.
No matching row for id 23255883 found.
No matching row for id 46508985 found.
No matching row for id 22410581 found.
No matching row for id 36893675 found.
No matching row for id 340927 found.
No matching row for id 39014312 found.
No matching row for id 29406590 found.
No matching row for id 22410581 found.
No matching row for id 32281941 found.
No matching row for id 9597841 found.
No matching row for id 2542615 found.
No matching row for id 340927 found.
No matching row for id 29406590 found.
No matching row for id 4060126 found.
No matching row for id 20895542 found.
No matching row for id 9597841 found.
No matching row for id 23736968 found.
No matching row for id 36893675 found.
No matching row for id 23736968 found.
No matching row for id 20482878 found.
No matching row for id 26036541 found.
No matching row for id 23736968 found.
No matching row for id 23736968 found.
No matching row for id 45446916 found.
No matching row for id 32281941 fo

We drop the NewPR column, as we no longer need it and it could slow down the concatenation of the dataset with the terminal nodes' PR.

In [0]:
PageRankPDF = PageRankPDF[['id','PR']]
PageRankPDF = pd.concat([PageRankPDF, TerminalNodes], ignore_index=True)
#display(PageRankPDF)

id,PR
15237,0.0
63125,0.0
34063244,0.0
390975,0.0
95840,0.0
3414021,0.0
17013058,0.0
47836950,0.0
313645,0.0
1982004,0.0


# Final Result
We display the final results by joining the titles-and-ID DataFrame with the PR-and-ID DataFrame. Last but not least, we drop duplicates and show the columns.

In [0]:
RankPDF = pd.merge(PageRankPDF, Titles_ID_PDF[["id","title"]], on='id').drop_duplicates()

display(RankPDF[["id","title","PR"]])

id,title,PR
15237,iterative method,0.0
63125,veracruz,0.0
34063244,life ok,0.0
390975,national party (south africa),0.0
95840,greenwich,0.0
3414021,george w. bush,0.0
17013058,list of new testament lectionaries,0.0
47836950,guangxi,0.0
313645,eastern cape,0.0
1982004,saucony,0.0


# Conclusions
For the program to run as expected, the entire Wikipedia dataset should be used. Otherwise a significant number of IDs will not find a match.
### Analysis of the dataset
By displaying parts of the data we got a first impression of how it was structured. We noted that the "text" column would be the most relevant one, although a lot of other information was also provided.

### Preparing and parsing the text in each page
With the re module and a fairly simple regular expression we could parse the entire dataset and extract the information we needed: links, titles and IDs.

### Preparing the dataframes
Applying a structure similar to the one in the Counting Words lab, we counted all the links of each ID. Later on we created the Forward DataFrame, which holds the ID of a page, the list of pages it links to and the number of those outgoing links.

An issue appeared when we got to the terminal nodes, those with no links: we kept them apart for a while because their PR was going to be fixed.
We then created the reverse links DataFrame, which is the one used to calculate the PageRank. Its structure is similar to the previous one, but now each ID comes with the links that point to it and the outgoing counter of each of those links. This is more than enough to calculate the PR for terminal and non-terminal nodes.

### Main loop & functions
We tried hard to create the User Defined Functions, but converting the results to pandas DataFrames was a complete mess, so we decided to tailor the computation in our own way.
We iterated over the rows of the DataFrame and, because the lists in the links and outgoing columns have the same length, we simply called our function NewPagerank with those columns and the PR of that row's ID as parameters. Inside the function, we check whether each linking ID has a match in the PageRank DataFrame. If so, we take its PR and make the needed calculation, otherwise we print a warning message and skip to the next one. Once the PR of every row was computed, we checked the variance condition with a very simple function: because we worked with pandas DataFrames, we could operate on an entire column at once, and so we did. The function returns True if the variance condition is satisfied and False otherwise.
On each iteration, and before exiting the loop, PR is updated from NewPR so that PR holds the latest computed value when we make the join with the Titles data.

Before getting the final result, we appended the terminal nodes' PR, computed earlier, to the DataFrame so that it contains all the available information.

Finally, we joined it with the Title DataFrame on the id column and reordered the columns to display the result as suggested.

### Final statements
It has been quite difficult to deal with the user defined functions and with such a "large" dataset, as we needed many instances of id, links and outgoing to properly test our algorithms and their performance. In fact, we could not test the performance as we would have liked, because for some IDs we had a PR but their incoming links were not in the subset we could get. At least we could test the mismatch case in our algorithm.

Having done and understood the previous homework and projects has indeed helped us to figure out a good algorithm and to understand the errors and why certain things happen. We also had to deal with the fact that the terminal nodes were not included in the ReverseLinks DataFrame.