# Wikipedia's Page Rank Algorithm (Coded in PySpark)
#  Author: Claudio Sotillos Peceroso         


In [None]:
# Import Libraries
import re
from operator import truediv

import numpy as np
import pandas as pd
from pyspark.sql.functions import collect_list, explode, lit, monotonically_increasing_id, udf
from pyspark.sql.types import *  # ArrayType, StringType, IntegerType, DoubleType, ...

In [None]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [None]:
%fs ls dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet

# SOLUTION APPROACH: 

**Objective of the Work**

The idea is to select a fraction of the Wikipedia database and compute the Page Rank of every page contained in that subset.<br>
Since we will work with a large data set, we will mainly use PySpark DataFrames (DFs), so that computations can be parallelized (by means of UDFs) and the results obtained in less time. <br>
We will only work with Wikipedia pages. If, for instance, a Wikipedia page points to a YouTube page, that link is not taken into account in the Page Rank computation.

### IMPORTANT REMARK <br>
A given page can be in any of these situations: 
 1. It is only pointed to by other pages.
 2. It points to other pages and is pointed to by other pages.
 3. It only points to other pages.
 4. It neither points to other pages nor is pointed to by any.

The Page Rank (abbreviated 'PR' from here on) is computed in the same way in the first two situations, because the PR computation only needs information about the pages that point to the page whose PR we want. Imagine that page A is pointed to by pages B and C; then its PR would be:

\\( PR_A = \\frac{PR_B}{L(B)} + \\frac{PR_C}{L(C)} \\), where \\(L(B)\\) is the number of pages B points to (its outgoing links) and \\(PR_B\\) is the PR of B in the previous iteration (same for C). <br>

**General Formula:** \\(PR_i = \\sum_{j} \\frac{PR_j}{L(j)}\\), where j ranges over the pages that point to i.

Initially, the PR of every page is set to \\(\\frac{1}{N}\\), where N is the total number of pages in the Wikipedia fraction we are working with.

***What happens with the pages that no other page points to? (cases 3 and 4 above)*** <br>

Since no page points to them, the sum in the formula above is empty and their PR would be 0. That is why we apply a slightly modified version of the formula that includes what is known as the **damping factor** (written 'dumping' in the code). It is a constant, typically 0.85. In the usual statement of Page Rank, d is the probability that a user follows a link to another page and the formula reads \\(\\frac{1-d}{N} + d \\sum_{j} \\frac{PR_j}{L(j)}\\); this notebook applies the two weights the other way round, so that d multiplies the constant term. With the same scenario as before, the formula used here is:

\\( PR_A = \\frac{d}{N} + (1-d)\\left(\\frac{PR_B}{L(B)} + \\frac{PR_C}{L(C)}\\right) \\), with d = 0.85 and N the total number of pages in the Wikipedia fraction.<br>

**General Formula (used for the PR computation in this notebook):** \\(PR_i = \\frac{d}{N} + (1-d)\\sum_{j} \\frac{PR_j}{L(j)}\\)

Thus, if no page points to a certain page, the term \\((1-d)\\sum_{j} \\frac{PR_j}{L(j)}\\) is 0. <br>
This means that such pages always end up with the same PR after the first iteration: \\(\\frac{d}{N}\\).
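
As a quick sanity check of the general formula above (with d multiplying the constant term, as this notebook does), here is a minimal plain-Python sketch of one iteration on a tiny graph. The graph, the page names and the helper `one_iteration` are made up for illustration and are not part of the notebook's Spark code.

```python
# Hypothetical toy graph: B -> A, B -> C, C -> A (A has no outgoing links).
d = 0.85                                           # constant used in the notebook's formula
out_links = {"A": [], "B": ["A", "C"], "C": ["A"]}
N = len(out_links)
pr_prev = {page: 1.0 / N for page in out_links}    # every PR starts at 1/N

def one_iteration(pr_prev, out_links, d):
    """Apply PR_i = d/N + (1-d) * sum_j PR_j / L(j) once to every page."""
    n = len(out_links)
    pr_new = {}
    for page in out_links:
        # Pages j that point to 'page'
        incoming = [src for src, targets in out_links.items() if page in targets]
        link_sum = sum(pr_prev[src] / len(out_links[src]) for src in incoming)
        pr_new[page] = d / n + (1 - d) * link_sum
    return pr_new

pr_next = one_iteration(pr_prev, out_links, d)
print(pr_next)
```

Page B, which no page points to, ends at exactly d/N, while A collects a contribution from both B (split over its two outgoing links) and C.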

<br>
### How will we reach the solution?

We start with a Spark DataFrame that is a fraction of the original Wikipedia database. 

<table>
  <tr>
  <th>PartialWikipediaDF</th>
  </tr>
  <tr>
  <td>title:string</td>
  <td>id:integer</td>
  <td>text:string</td>
  </tr>
</table>

We must also create a broadcast variable from a Pandas DataFrame that holds the titles of all the pages in one column and their respective IDs in another. The Pandas DataFrame is called 'pages_index' and its broadcast version 'broadcastVar'.

<table>
  <tr>
  <th>pages_index </th>
  </tr>
  <tr>
  <td>title:string</td>
  <td>id:integer</td>
  </tr>
</table>

To reach the solution we will follow the steps described in the following Markdown cells. 

***Checkpoints (.cache)*** <br>
Along the way we will set some 'checkpoints' with Spark's '.cache()'. Once the computations leading up to a cached DF have been executed, their result is kept in memory and doesn't need to be recomputed; Spark just reads the needed data from the cache. Of course, caching is applied to immutable datasets (datasets whose values won't change during the execution of the code).

In [None]:
wikipediaDF=spark.read.parquet("dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet") # Full wikipedia set

In [None]:
# Select the fraction of the total Wikipedia pages to work with
PartialWikipediaDF = (wikipediaDF.sample(fraction=0.01, seed=122370982)  # Fraction of the full Wikipedia set --> Fraction Limit: 0.000001
                      .dropDuplicates()  # Remove duplicated rows
                      .cache())          # Cache the de-duplicated subset

In [None]:
tolower_udf = udf(lambda x: x.lower() if x is not None else None, StringType())  # UDF that lower-cases every title (None-safe)

In [None]:
# Generate the mapping from every page title in the Wikipedia DB to its respective id
intermediate_value=wikipediaDF.select(tolower_udf(wikipediaDF["title"]).alias("title"),wikipediaDF["id"])
pages_index=intermediate_value.toPandas()
pages_index

In [None]:
pages_index = pages_index.drop_duplicates() # Removing Duplicates 

In [None]:
# We transform the pandas DF into a broadcast variable. A broadcast variable is a read-only variable
# that is shipped once to every worker, so tasks running in parallel can read it.
# We will use it in the UDF 'titles_to_id' below, which maps each link found in the text to its respective ID.
broadcastVar = sc.broadcast(pages_index)  # Transform it into a broadcast variable

In [None]:
broadcastVar.value

**Step 1: Finding links inside the Text** <br>
Obtain the names of the links that appear in the text of each page. To do this we create a UDF (parse_links) that searches for every `[[...]]` link structure. Applying the UDF to the 'text' column yields a column of arrays ('links'), each array containing the names of the outgoing links of that page. If the text contains no link, an empty list is returned.

<table>
  <tr>
  <th>Data</th>
  </tr>
  <tr>
  <td>title:string</td>
  <td>id:integer</td>
  <td>links:array of strings</td>
  </tr>
</table>
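
To see what the link extraction returns before wrapping it in a UDF, here is a small plain-Python sketch using the same regular expression. The helper name `parse_links_plain` and the sample sentence are assumptions made for this illustration.

```python
import re

# Same pattern as in the notebook: the shortest text between '[[' and ']]'
LINK_PATTERN = re.compile(r"\[\[(.+?)\]\]")

def parse_links_plain(document_body):
    """Return the lower-cased link names found in a page text (empty list if none)."""
    if document_body is None:
        return []
    return [s.lower() for s in LINK_PATTERN.findall(document_body)]

sample_text = "See [[History]] and [[Mathematics|maths]] for context."
sample_links = parse_links_plain(sample_text)
print(sample_links)  # ['history', 'mathematics|maths']
```

Note that entries containing a bar are returned unsplit at this stage; they are handled in Step 2.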

In [None]:
# The 'parse_links' UDF is applied to the text field of each row and returns a list with the names of all the links (to other pages) found in the text.
# We use a UDF because Spark applies it row by row in parallel across the workers.
# (a UDF can take one or more columns as input)

def parse_links(document_body):
  if document_body is None:  # Pages without text have no links
    return []
  data = re.findall(r'\[\[(.+?)\]\]', document_body)  # Everything between '[[' and ']]'
  return [s.lower() for s in data]  # Empty list when no link is found

In [None]:
parse_links_udf = udf(parse_links,ArrayType(StringType())) # Transform to UDF function 

In [None]:
# Select the columns of interest (title, id, links found in the text) from the partial dataset. We apply 'parse_links_udf' to the text column,
# obtaining the corresponding link names found on each page.
data=PartialWikipediaDF.select(tolower_udf(PartialWikipediaDF["title"]).alias("title"),PartialWikipediaDF["id"],parse_links_udf(PartialWikipediaDF["text"]).alias("links")).cache()

**Step 2: Map the links to their ID** <br>
Now we create a UDF (titles_to_id) that transforms the link names into their respective IDs. <br>
Some of the names contain a bar "|". In wiki markup, `[[target|label]]` means that the part before the bar is the title of the linked page and the part after it is the text displayed to the reader. Here we split the string at the bar and look up every part in the 'pages_index' variable. It can happen that none of the parts is in the variable (meaning that it isn't a Wikipedia page) or that more than one is; the latter may add a spurious link when a label happens to match another title, and exact duplicates are removed at the end of the function. <br>
Other links contain no "|", and we only have to look up their name in the 'pages_index' variable. The list can also be empty, in which case an empty list is returned too.

Once we have created the UDF, we apply it to the 'links' column, obtaining a column of arrays with the corresponding link IDs.

<table>
  <tr>
  <th>Partialdirect_linksDF</th>
  </tr>
  <tr>
  <td>id:integer</td>
  <td>links_id:array of integers (the IDs)</td>
  </tr>
</table>
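
The lookup described in this step can be sketched in plain Python with a dictionary instead of a broadcast Pandas DF. This is only an illustration under assumed data: `title_to_id` is a hypothetical miniature of 'pages_index', and `titles_to_id_plain` is not the notebook's UDF.

```python
# Hypothetical miniature of 'pages_index': lower-cased title -> page id
title_to_id = {"history": 1234, "mathematics": 9886, "physics": 42}

def titles_to_id_plain(titles, index):
    """Split entries at '|', look every part up, and return the distinct ids found."""
    candidates = [part for title in titles for part in title.split("|")]
    # Names that are not in the index (e.g. non-Wikipedia targets) are skipped
    return sorted({index[name] for name in candidates if name in index})

links = ["history", "mathematics|maths", "youtube"]
link_ids = titles_to_id_plain(links, title_to_id)
print(link_ids)  # [1234, 9886]
```

A dictionary lookup costs constant time per name, whereas filtering a Pandas DF with `isin` scans the whole title column for every row; on large indexes that difference may matter.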

In [None]:
# This function will be a UDF that maps each list of outgoing link names to their respective ids
# e.g.: current page id --> 345 ; links --> ['history','mathematics'] ; titles_to_id_udf(links) --> [1234,9886]  (links_id)

def titles_to_id(titles):

  data_titles = broadcastVar.value
  # Split the entries that contain a bar ("target|label") and flatten into a single list of names
  result_clean = [part for title in titles for part in title.split("|")]

  # To id
  if len(result_clean) > 0:
    ids = data_titles[data_titles.title.isin(result_clean)].id.to_list()
    ids = list(set(ids))  # Ensure that there are no duplicates in the final list
  else:  # In case there are no links in the text of the page
    ids = []
  return ids

In [None]:
titles_to_id_udf=udf(titles_to_id,ArrayType(IntegerType())) # transform to UDF

In [None]:
# Apply the 'titles_to_id_udf' over the links column of the 'data' DF for obtaining the IDs of the outgoing links.
Partialdirect_linksDF= data.select(data["id"],titles_to_id_udf(data["links"]).alias("links_id")).cache()

**Step 3: Create the Count DF** <br>
Create another UDF that returns the length of each list in the previously computed column 'links_id'. Conceptually, these counts are the number of outgoing links found in the text of each page in the 'ids' column. 
We store these results in a separate DataFrame. Once the Spark DF is created, we convert it to pandas and make it a broadcast variable (a good idea, since it is a static dataframe). 


At first I didn't perform this transformation. Instead, I created a count column in the Reverse DataFrame (holding the counts of the links that pointed to our page of interest). <br>
I decided to do it this way instead because the final 'while' loop then takes much less time, due to how Spark works: Spark is lazy and doesn't run the computations until an action is called, for instance df.toPandas() (or df.collect(), display(df)). Converting to Pandas takes time, but in the long run it turns out to be much more efficient.

Count DF:

<table>
  <tr>
  <th>Partialdirect_links_count_DF</th>
  </tr>
  <tr>
  <td>ids:integer</td>
  <td>count:integer</td>
  </tr>
</table>

In [None]:
# UDF for counting the number of links that each page contains. It just takes the length of the links_id list 
def count_links(links):
  return len(links)

In [None]:
count_links_udf = udf(count_links,IntegerType())  # transform to UDF

In [None]:
# Generate a DF which has the id of the page and the number of outgoing links it has  (the 'count' var)
Partialdirect_links_count_DF=Partialdirect_linksDF.select(Partialdirect_linksDF["id"].alias('ids'),count_links_udf(Partialdirect_linksDF["links_id"]).alias("count"))

In [None]:
count_pd = Partialdirect_links_count_DF.toPandas()  # To pandas 

In [None]:
count_broad= sc.broadcast(count_pd)    # Making it Broadcast

**Step 4: Creating the Reverse DF** <br>

Since the PR computation needs to know which pages point to the current page, we must obtain the **Reverse DF** from 'Partialdirect_linksDF'.<br> To understand what this DF looks like, imagine a row of the direct DF:

<table>
  <tr>
  <th>Direct DF</th>
  </tr>
  <tr>
  <th>id (points to links in 'link_id')</th>
  <th>link_id</th>
  </tr>
  <tr>
  <td>id_23</td>
  <td>[id_10,id_34]</td>
  </tr>
</table>

In the Reverse DF it would be transformed into two rows:

<table>
  <tr>
  <th>Reverse DF</th>
  </tr>
  <tr>
  <th>id (pointed by links in 'link_id')</th>
  <th>link_id</th>
  </tr>
  <tr>
  <td>id_10</td>
  <td>id_23</td>
  </tr>
  <tr>
  <td>id_34</td>
  <td>id_23</td>
  </tr>
</table>

<br>To make this transformation I use the 'explode' function, which splits each row into one row per list element (From: id_23 --> [id_10,id_34] ; To: id_23 --> id_10 and id_23 --> id_34). After 'explode', I swap the two columns and the reversal is done. The only thing left is to group by the new 'id' column, since other pages may also point to the pages in 'id' (imagine id_50 --> [id_10] ; Apply Reverse: id_10 --> id_50 ; Apply Groupby: id_10 --> [id_23,id_50]).



It would also be very useful to have the count attribute of the pages that point to others (in the example above, the count of id_23 would be 2).
However, for computational reasons (as explained in Step 3), I decided to leave the Reverse DF with just the two necessary columns:

<table>
  <tr>
  <th>Partialreverse_linksDF</th>
  </tr>
  <tr>
  <td>id:integer</td>
  <td>link_id: array of integers (the IDs which point to id)</td>
  </tr>
</table>

Something we must take into account is that the 'id' column doesn't contain all the pages of our Wikipedia fraction; it only contains the nodes that are pointed to by others. That is why, in the next step, we must obtain an updated 'Partialreverse_linksDF' with all the pages of our subset (called 'Complete_df').
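
The explode, swap and group-by sequence of this step can be mimicked in plain Python on the example rows used above. This is a sketch for intuition only; `reverse_links` is a hypothetical helper, and the real computation is done with Spark in the next cell.

```python
from collections import defaultdict

# Direct links from the example: id_23 --> [id_10, id_34] and id_50 --> [id_10]
direct = {23: [10, 34], 50: [10]}

def reverse_links(direct):
    """Turn 'page -> pages it points to' into 'page -> pages that point to it'."""
    reverse = defaultdict(list)
    for source, targets in direct.items():
        for target in targets:               # 'explode': one (source, target) pair per link
            reverse[target].append(source)   # swap the columns and group by the target
    return dict(reverse)

reverse = reverse_links(direct)
print(reverse)  # {10: [23, 50], 34: [23]}
```

Pages 23 and 50 do not appear as keys because nothing points to them, which is exactly the gap that the next step fills.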

In [None]:
from pyspark.sql.functions import explode

# TRANSFORMING THE DIRECT DF INTO REVERSE DF 
# 'explode' creates one row per element of 'links_id', which makes the reversal fast.
Partialreverse_links= Partialdirect_linksDF.select("id",explode("links_id").alias('link_id'))
Partialreverse_links = Partialreverse_links.select(Partialreverse_links["link_id"].alias("id"),Partialreverse_links["id"].alias("links_id"))

# GROUP BY 
# Group by the column 'id' (several ids in the 'links_id' column can point to the same page in the 'id' column)
Partialreverse_linksDF = Partialreverse_links.groupBy("id").agg(collect_list("links_id").alias('link_id'))

**Step 5: Creating a DF with all the Nodes in the Subset** <br>

To obtain a column with all the ids of the subset, we take the id column of 'Partialdirect_linksDF' (or of 'Partialdirect_links_count_DF', since it is the same) and that of 'Partialreverse_linksDF', and compute their union (without duplicate ids). This gives us a DF with a single column containing all the ids (called 'result' in the code). <br>
Then we join the 'result' DF with 'Partialreverse_linksDF' (on their id columns), obtaining a DF very similar to 'Partialreverse_linksDF' but with a longer id column. The ids that weren't in 'Partialreverse_linksDF' now have null values in the 'link_id' column.

<table>
  <tr>
  <th>Complete_df</th>
  </tr>
  <tr>
  <td>id:integer</td>
  <td>link_id: array of integers (the IDs which point to id; null for pages that no page points to)</td>
  </tr>
</table>

Same as 'Partialreverse_linksDF', but keep in mind that 'Complete_df' has more rows.
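
The union of ids followed by the join can be pictured in plain Python as follows. The ids are made up, and `None` plays the role of Spark's null; this is a sketch of the idea, not the Spark code itself.

```python
# Hypothetical ids of the pages in the direct DF, and a small reverse mapping
direct_ids = [23, 50, 7]
reverse = {10: [23, 50], 34: [23]}

# Union of both id columns without duplicates (the 'result' DF)
all_ids = sorted(set(direct_ids) | set(reverse))

# Join with the reverse mapping: ids that nothing points to get None
complete = {page_id: reverse.get(page_id) for page_id in all_ids}
print(complete)  # {7: None, 10: [23, 50], 23: None, 34: [23], 50: None}
```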

In [None]:
# OBTAINING THE IDS OF ALL THE PAGES IN OUR SUBSET
rev_id = Partialreverse_linksDF.select(Partialreverse_linksDF["id"])  # IDs from the Reverse DF 
dir_id = Partialdirect_links_count_DF.select(Partialdirect_links_count_DF["ids"]) # IDs from the Direct DF 

# Union of the ids of Partialreverse_linksDF and  Partialdirect_links_count_DF (which has the same id column as Partialdirect_linksDF)
result = rev_id.union(dir_id).distinct()  # With distinct we avoid duplicates
result = result.select(result.id.alias('ids'))

In [None]:
# OBTAIN THE FULL REVERSE DF 
# Join with the Reverse DF to get the 'link_id' column. Two situations can occur:
#  'id'        'link_id'      (columns)
# id_20 <-- [id_69, id_39]  This node is pointed to by others
# id_2  <--      null       No other page points to this node

Complete_df = result.join(Partialreverse_linksDF,result.ids == Partialreverse_linksDF.id,"full")  

# Null values are generated, which is fine: they let us tell which ids aren't pointed to by others  (null == None)
Complete_df = Complete_df.select(Complete_df['ids'].alias('id'),Complete_df['link_id']).cache()

**Step 6: Initialize the Pandas DF (containing the initial PR)** <br>

The Page Rank column computed at each iteration is needed for the next iteration, so it is useful to keep it in an easily accessible variable. That is why we create a Pandas DF with the id of every page and its respective PR.
Since we are converting to pandas anyway, we can obtain the total number of pages in our subset (N) easily and quickly. Obtaining this value from a Spark dataframe would require a separate count action. <br>
After obtaining N, we can easily initialize the PR column (all values 1/N). 

Since we need to update our Pandas DF at the end of every iteration, making it a broadcast variable isn't a good idea in this case.

<table>
  <tr>
  <th>partial_pandas</th>
  </tr>
  <tr>
  <td>id:integer</td>
  <td>PR_prev:float</td>
  </tr>
</table>

In [None]:
# Transform to pandas the column of ids (contains all the ids in the subset)
result_ = result.select(result['ids'].alias('id'))
partial_pandas = result_.toPandas()

In [None]:
len_col = len(partial_pandas.index)  # Computing the value of N 
partial_pandas['PR_prev'] = 1/len_col  # Initializing the initial PR column (all with 1/N values)

In [None]:
partial_pandas

**Step 7: UDF for computing the Page Rank of each page** <br>

Now we are ready to compute the Page Ranks. We will code a UDF called 'iterat' which, taking as input the column 'link_id' and the 'partial_pandas' DF (updated at each iteration), returns a column with the updated Page Rank. 

***As explained before, we have two situations:*** <br>
 * *Pages which are pointed to by others:* <br>First we find the PRs of the pages that point to the page of interest (that is, the Page Ranks of the ids inside the 'link_id' list) by looking them up in the Pandas DF. Then we find the count values of those same pages (using the broadcast variable 'count_broad'). <br>Once we have the PRs (saved in a list called 'link_id_pr') and the count values, we divide the two lists element by element (e.g. link_id_pr = [0.003,0.001]; count = [4,7] --> division_list = [0.003/4,0.001/7]). <br>  We then add up the quotients and apply the damping factor formula, which gives the updated PR ('PR_actual' in the code).
 
 * *Pages which aren't pointed to by others:* <br>For these pages the sum of quotients has no terms, so its value is 0. Thus their PR is simply \\(\\frac{d}{N}\\). This is why the Page Rank of these pages changes only once; in every later iteration they keep the same PR (and thus the difference with the previous PR is 0). 
 
 
We have to re-create this UDF at every iteration; otherwise it would not see the updated pandas DF of each iteration (it would keep using the initial one, with all PRs equal to 1/N). 

Resulting DF (this DF will be converted to Pandas for updating the 'partial_pandas' DF):

<table>
  <tr>
  <th>secondary_DF</th>
  </tr>
  <tr>
  <td>id:integer</td>
  <td>PR_prev:float</td>
  </tr>
</table>

Initially, I also computed the difference with the previous PR inside the UDF (returning a list with the updated PR and the difference). For computational reasons, I decided instead to use the pandas DFs to obtain the differences (explained in the next step).
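
The per-page update described in this step can be written in a few lines of plain Python when the previous PRs and the outgoing-link counts are kept in dictionaries keyed by page id. The function `page_rank_update` and the numbers are assumptions for illustration (the PRs and counts reuse the 0.003/4 and 0.001/7 example above, and `n_pages=1000` is arbitrary).

```python
def page_rank_update(incoming, pr_prev, out_count, n_pages, d=0.85):
    """New PR of one page, using the notebook's formula d/N + (1-d) * sum PR_j / L(j)."""
    if incoming is None:                      # no page points to this one
        return d / n_pages
    # Looking both values up by the same id keeps each PR paired with its own count
    link_sum = sum(pr_prev[src] / out_count[src] for src in incoming)
    return d / n_pages + (1 - d) * link_sum

pr_prev = {1: 0.003, 2: 0.001, 3: 0.002}      # hypothetical previous PRs
out_count = {1: 4, 2: 7, 3: 1}                # hypothetical outgoing-link counts

updated = page_rank_update([1, 2], pr_prev, out_count, n_pages=1000)
isolated = page_rank_update(None, pr_prev, out_count, n_pages=1000)
print(updated, isolated)
```

Keying both lookups by page id avoids relying on two filtered tables happening to come back in the same row order.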

In [None]:
dumping = 0.85   # Damping factor
def iterat(links, prt_pd):     # 'links' is one entry of the 'link_id' column; 'prt_pd' is the current 'partial_pandas'
   
    # In case it is a page that isn't pointed to by others
    if links is None:
      PR_actual = dumping/len_col  # New Page Rank 
      return float(PR_actual)
    
    # In case it is a page that is pointed to by others
    else:   
      # Page Ranks and outgoing-link counts of the ids pointing to the id of interest (the ids inside 'links').
      # Merging on the id keeps every Page Rank paired with the count of the same page.
      pointing = prt_pd[prt_pd.id.isin(links)].merge(count_broad.value, left_on='id', right_on='ids')
      link_id_pr = pointing['PR_prev'].astype(float).to_list()  # List of Page Ranks 
      count = pointing['count'].astype(int).to_list()           # List of counts, in the same row order
      
      # Compute the individual quotients
      act_PR = list(map(truediv, link_id_pr, count)) #--> link_id_pr/count, element by element
    
      PR_actual = (dumping/len_col)+ ((1-dumping)*sum(act_PR))   # New Page Rank 
      return float(PR_actual)

**Step 8: Iterative Computation of the Page Rank** <br>

At this point we have everything we need for the PR computation. We run a while loop that stops either when all the values in the column of (relative) differences are smaller than the tolerance (0.001) or when the maximum number of iterations (20) is reached. <br>
To check the tolerance condition, a boolean value (condit) is initialized to True and only turns False when the maximum of the 'Diff' column is smaller than the tolerance. If the maximum is smaller than the tolerance, all the other differences are smaller too.

***The following steps are repeated at each iteration:***

 * Step 0: Re-create the 'iteration_udf' UDF (which wraps 'iterat', explained in Step 7) with the current 'partial_pandas' DF. 

 * Step 1: Apply 'iteration_udf' as explained before. This yields the DF 'secondary_DF' (with 'id' and 'PR_prev'; the latter holds the updated Page Rank but keeps that name for coding convenience), which is then converted to a Pandas DF (next_pandas).
 
 * Step 2: With the new pandas DF holding the updated PR column, the 'Diff' column is obtained quickly by subtracting the PR_prev column of next_pandas from that of partial_pandas and dividing by the previous value. This is faster than computing the column in the Spark DF and then finding its maximum (Spark would run all the computations again when 'collect' is called). After obtaining the 'Diff' column we just find its maximum and check whether it is greater than or equal to the tolerance. 

 * Step 3: Finally, we replace the 'partial_pandas' DF with 'next_pandas', since the PRs of the next iteration are computed from the PRs in 'next_pandas'. We also increase the counter 'iteration' by one. 
  
Once the while loop finishes, we just have to display the result in a clear way:

<table>
  <tr>
  <th>Final_display</th>
  </tr>
  <tr>
  <td>title:string</td>
  <td>id:integer</td>
  <td>Page_Rank:float </td>
  </tr>
</table>
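
The loop described in this step, with its two stopping conditions, can be sketched end to end in plain Python on a toy graph. The function `page_rank`, the variable names and the graph are assumptions for illustration; the sketch uses the notebook's formula and the same relative-difference test, but none of the Spark machinery.

```python
def page_rank(out_links, d=0.85, tolerance=0.001, max_iterations=20):
    """Iterate PR_i = d/N + (1-d) * sum_j PR_j / L(j) until the largest relative
    change drops below 'tolerance' or 'max_iterations' is reached."""
    pages = list(out_links)
    n = len(pages)
    # Reverse mapping: which pages point to each page
    incoming = {p: [] for p in pages}
    for src, targets in out_links.items():
        for target in targets:
            incoming[target].append(src)

    pr = {p: 1.0 / n for p in pages}          # initial PR = 1/N
    iteration = 0
    keep_going = True
    while keep_going and iteration < max_iterations:
        new_pr = {
            p: d / n + (1 - d) * sum(pr[s] / len(out_links[s]) for s in incoming[p])
            for p in pages
        }
        max_diff = max(abs(pr[p] - new_pr[p]) / pr[p] for p in pages)
        keep_going = max_diff >= tolerance    # stop once every change is small
        pr = new_pr
        iteration += 1
    return pr, iteration

# Hypothetical toy graph: a 3-cycle A -> B -> C -> A plus D -> A
toy = {"A": ["B"], "B": ["C"], "C": ["A"], "D": ["A"]}
ranks, n_iter = page_rank(toy)
print(n_iter, ranks)
```

Page D, which nothing points to, settles at d/N after the first pass, while A ends highest because it receives rank from both C and D.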

In [None]:
# Initializing the necessary parameters 
tolerance = 0.001 # For stopping by tolerance condition
condit = True  # Boolean var for stopping by tolerance
max_iterations = 20 # For stopping by iteration condition
iteration = 0  # Counter for iterations

while (condit and iteration < max_iterations):
      # Step 0
      iteration_udf = udf( lambda links: iterat(links,partial_pandas), DoubleType())  # Update the UDF for computing the PRs
      
      # Step 1
      secondary_DF = Complete_df.select('id',iteration_udf('link_id').alias('PR_prev')) # Computing the new PRs
      next_pandas = secondary_DF.toPandas()   # Transform to Pandas the previous DF 
      
      # Step 2
      # Obtain the relative differences, aligned by page id rather than by row position
      prev = partial_pandas.set_index('id').PR_prev
      new = next_pandas.set_index('id').PR_prev
      diff = (prev - new).abs()/prev
      condit = diff.max() >= tolerance # Obtain the maximum of this column and compare with tolerance 
      
      # Step 3
      partial_pandas = next_pandas   # Update the 'partial_pandas' DF substituting it by 'next_pandas'
      iteration += 1  # Increase counter

print('Number of Iterations: ',iteration)          

**Note on the Sum of Page Ranks**

If the subset on which we run the code is very small, the sum of all the final PRs tends towards the damping factor. This makes sense, since in a small subset most pages aren't pointed to by others (remember that the PR of these pages is damping factor/N), so the final PR sum tends to 0.85 (when using a damping factor of 0.85).

As the subset grows, the final sum of PRs tends to 1.
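
The two limits mentioned above can be checked on two extreme toy graphs with a short plain-Python sketch: one where no page links to any other, and one where every page has exactly one incoming and one outgoing link. The helper `pr_sum` and both graphs are assumptions made for this illustration.

```python
def pr_sum(out_links, iterations=30, d=0.85):
    """Sum of the PRs after a fixed number of iterations of the notebook's formula."""
    pages = list(out_links)
    n = len(pages)
    pr = {p: 1.0 / n for p in pages}
    for _ in range(iterations):
        pr = {
            p: d / n + (1 - d) * sum(
                pr[s] / len(out_links[s]) for s in pages if p in out_links[s]
            )
            for p in pages
        }
    return sum(pr.values())

isolated_pages = {p: [] for p in "ABCD"}                          # nobody links to anybody
cycle = {"A": ["B"], "B": ["C"], "C": ["D"], "D": ["A"]}          # everybody is linked once

sum_isolated = pr_sum(isolated_pages)
sum_cycle = pr_sum(cycle)
print(sum_isolated, sum_cycle)
```

With no links at all every page keeps d/N, so the total is d; in the cycle no rank is lost and the total stays at 1. Real subsets fall somewhere between these two cases.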

In [None]:
val = partial_pandas["PR_prev"].to_list()    # Take the PR_prev column of the Pandas DF as a list, so that the sum is easy to compute
print('Total Sum of Page Ranks: ',sum(val))

In [None]:
# This is a simple UDF for searching in the broadcast var the respective titles of the ids in the 'id' col. 
def title_id (id_):
  elem = broadcastVar.value[broadcastVar.value.id.isin([id_])]
  return elem.title.to_list()[0]

In [None]:
title_udf = udf(title_id, StringType())

In [None]:
# DF for making the Display of the results 
Final_display = secondary_DF.select( title_udf(secondary_DF['id']).alias('title'),'id',(secondary_DF.PR_prev).alias('Page_Rank'))

In [None]:
display(Final_display)

## Conclusion

After doing this work, it has become more than clear to me that Spark is a great tool for solving Big Data problems. In my opinion, its biggest advantage is that code can be executed by more than one machine (if it is written with Spark structures in an appropriate way). <br> On a basic laptop like mine, running all the code takes a long time even for a relatively small fraction of the data. I guess that a more powerful computer (one that can handle more workers) could run this code on a bigger fraction of the data in even less time. 

However, other problems involve many more instances than this one (5823210), and in those cases it would be unfeasible to run the code on a single computer, no matter how powerful it is. 

I'm sure that in the future we will have to work with Spark or similar tools, since nowadays solving a problem in the most effective way often requires processing a huge amount of data.