# FINAL WORK 

## Authors:
**Full name: DIEGO HERNÁNDEZ SUÁREZ AND ÁNGELA DURÁN PINTO**
    
**NIA : 100472809 AND 100472766**

First, we import the required libraries. The spark session and the sc SparkContext used below are predefined in Databricks notebooks.

In [0]:
import pandas as pd
import re
from pyspark.sql.functions import udf, explode, collect_list
from pyspark.sql.types import ArrayType, StringType, LongType


Now, we read the Wikipedia dataset, stored in Parquet format, from the specified path into a Spark DataFrame named wiki_data. Each row of this dataset corresponds to an English Wikipedia article. <br>

Then, we take a small random sample of the entire Wikipedia dataset. The fraction parameter sets the proportion of rows to sample (fraction=0.001, about 0.1% of the articles); Spark does not guarantee that exactly this fraction is returned, so the sample size is approximate. The seed=123 argument fixes the random seed, which makes the sample reproducible. Finally, the sampled DataFrame is cached for efficient reuse in subsequent operations. <br> 

In [0]:
wiki_data = spark.read.parquet("dbfs:/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")

# Obtain a sample from the whole data set.
sample_data = wiki_data.sample(fraction=0.001, seed=123).cache()


Now, we define the function **"document_links_extractor"** to extract links from the content of a document. It uses re.findall to find every substring enclosed in double square brackets ([[...]]), which is how Wikipedia markup writes internal links. The captured links are converted to lowercase, so that they can later be matched against the lowercased titles, and returned as a list; if the document contains no links, the function returns an empty list. Note that the captured text is the full link markup: a link written as [[Target|label]] or [[Target#Section]] keeps its "|label" or "#Section" part. <br>

Then, we create a User-Defined Function (UDF) named **"extract_links_udf"** that wraps document_links_extractor and returns an array of strings (ArrayType(StringType())). Wrapping the function in a UDF lets Spark apply it to every row of a DataFrame in parallel across the cluster, so extract_links_udf can be used inside DataFrame operations to extract link information in a distributed way.

In [0]:
# Define a function to parse links from a document body
def document_links_extractor(content):
    # Documents without a body have no links
    if content is None:
        return []

    # Capture the text between [[ and ]] (non-greedy) and convert it to lowercase
    return [link.lower() for link in re.findall(r'\[\[(.+?)\]\]', content)]

# Create a UDF for link extraction
extract_links_udf = udf(document_links_extractor, ArrayType(StringType()))
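Because the pattern captures the full link markup, a link written with a display label or a section anchor will not match a plain article title in the later title-to-ID step. The plain-Python sketch below shows what the notebook's pattern captures, together with a hypothetical variant, extract_link_targets, that keeps only the target title (the text before "|" and "#") and treats underscores as spaces, as MediaWiki titles do. This normalisation is an assumption about what suits the dataset; it is not part of the original notebook.

```python
import re

# The same pattern used by document_links_extractor: the text between [[ and ]]
LINK_PATTERN = re.compile(r"\[\[(.+?)\]\]")


def extract_link_targets(content):
    """Hypothetical variant: return lowercase link targets without the
    display label (after '|') or the section anchor (after '#')."""
    if content is None:
        return []
    targets = []
    for raw_link in LINK_PATTERN.findall(content):
        target = raw_link.split("|", 1)[0].split("#", 1)[0]
        # MediaWiki treats underscores in titles as spaces
        target = target.replace("_", " ").strip().lower()
        if target:  # skip same-page anchors such as [[#History]]
            targets.append(target)
    return targets


text = "See [[Apache Spark|Spark]], [[PageRank#Algorithm]] and [[#History]]."
print(LINK_PATTERN.findall(text))  # ['Apache Spark|Spark', 'PageRank#Algorithm', '#History']
print(extract_link_targets(text))  # ['apache spark', 'pagerank']
```

If this variant were adopted, it could be wrapped exactly like the original, e.g. udf(extract_link_targets, ArrayType(StringType())).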


We define a UDF, based on a lambda function, that converts strings to lowercase.

In [0]:
to_lower_udf = udf(lambda x: x.lower() if x is not None else None, StringType())

Next, we apply the previously defined UDFs to the sample: the titles are converted to lowercase and the extracted links are stored in a new column, "text_links".

In [0]:
# Apply UDFs to select and transform columns
parsed_sample_data = sample_data.select(
    to_lower_udf("title").alias("title"),
    "id",
    extract_links_udf("text").alias("text_links")
).cache()

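We want to store all the titles (in lowercase) with their corresponding IDs from the entire Wikipedia DataFrame, not only from the sample, so that links from sampled pages to any article can be resolved. <br>

We save this table as a broadcast variable, Spark's mechanism for sharing read-only data across tasks in a distributed computation. Spark sends one copy of a broadcast variable to each executor instead of serializing it with every task that uses it, which reduces the overhead of repeatedly shipping a large lookup table. Note that toPandas() first collects the whole title-ID table on the driver, so it must fit in the driver's memory.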

In [0]:

# Collect every (lowercase title, id) pair of the full dataset into a pandas DataFrame on the driver
titles_id_pandas_df = wiki_data.select(to_lower_udf("title").alias("title"), "id").toPandas()

# Broadcast the pandas DataFrame so that each executor receives a single read-only copy
titles_id_BC = sc.broadcast(titles_id_pandas_df)


**convert_titles_to_id** maps a list of titles to their corresponding IDs. It takes two parameters: titles, the input list of (lowercase) titles, and titles_data_broadcast, a broadcast variable wrapping the pandas DataFrame of titles and their IDs. Inside the function, title_id_list collects the resulting IDs and titles_data holds the broadcast DataFrame, obtained through .value. For each input title, the function selects the rows of titles_data whose title matches; if there is at least one match, it takes the ID of the last matching row and appends it to title_id_list. Titles without a match are skipped, so the function returns a list of integer IDs for the titles that exist in the dataset. Because the lookup table is broadcast, each executor receives it once rather than with every task. Note, however, that each lookup scans the whole DataFrame, so its cost grows with the number of titles in the dataset.

Again, we define the corresponding UDF, which passes the broadcast variable titles_id_BC to the function and returns an array of long integers.

In [0]:
def convert_titles_to_id(titles, titles_data_broadcast):
   
    title_id_list, titles_data = [], titles_data_broadcast.value

    # Iterate over input titles
    for title in titles:
        # Find the row in titles_data corresponding to the title
        matching_rows = titles_data[titles_data["title"] == title]

        # Check if there are matching rows
        if not matching_rows.empty:
            # Extract the ID value from the last occurrence in the row
            id_value = int(matching_rows["id"].iloc[-1])
            title_id_list.append(id_value)

    return title_id_list

convert_titles_to_id_udf = udf(lambda titles: convert_titles_to_id(titles, titles_id_BC), ArrayType(LongType()))
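Since convert_titles_to_id filters the whole broadcast DataFrame once per title, a dictionary keyed by title would give constant-time lookups with the same behaviour (unknown titles skipped, last matching row wins). The plain-Python sketch below illustrates the idea; build_title_index and titles_to_ids are hypothetical helpers, not part of the notebook.

```python
def build_title_index(title_id_pairs):
    """Map each title to its id; later pairs overwrite earlier ones,
    mirroring convert_titles_to_id, which keeps the last matching row."""
    index = {}
    for title, page_id in title_id_pairs:
        index[title] = int(page_id)  # plain Python int, as LongType expects
    return index


def titles_to_ids(titles, index):
    """Return the ids of the titles present in the index, skipping unknown ones."""
    return [index[title] for title in titles if title in index]


index = build_title_index([("apache spark", 10), ("pagerank", 20), ("apache spark", 30)])
print(titles_to_ids(["pagerank", "unknown page", "apache spark"], index))  # [20, 30]
```

In the notebook, such an index could be built with build_title_index(zip(titles_id_pandas_df["title"], titles_id_pandas_df["id"])) and broadcast with sc.broadcast instead of the DataFrame; this adaptation is an untested suggestion.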

### Forward Link Matrix
We create a new DataFrame, auxForward_df, by applying the convert_titles_to_id_udf User-Defined Function (UDF) to the "text_links" column of parsed_sample_data. This transformation maps each linked title to the corresponding page ID (links whose title is not found in the dataset are dropped), and the resulting DataFrame contains two columns: "id" and "links".

In [0]:
auxForward_df = parsed_sample_data.select(
    "id",
    convert_titles_to_id_udf("text_links").alias("links")
).cache()

Now, we define a simple Python function that takes a list of links as input and returns the number of elements in the list using len. We then wrap this function in a User-Defined Function (UDF) that returns a long integer (LongType()). This UDF is used to calculate the number of outgoing links of each record.

In [0]:
def count_links(links):
    return len(links)

count_links_udf = udf(count_links, LongType())

Finally, we create the final version of the **Forward Link Matrix** by selecting the "id" and "links" columns from auxForward_df. In addition, we apply the previously defined count_links_udf to the "links" column, creating a new column named "outgoing" that holds the number of outgoing links of each record.

In [0]:
forward_df = auxForward_df.select(
    "id",
    "links",
    count_links_udf("links").alias("outgoing")
).cache()
forward_df.toPandas()

### Backward Link Matrix
The backward_df DataFrame is computed from forward_df in two steps. First, forward_df is exploded so that each row holds a single link: "id" is the source document's ID, "target" is the target document's ID, and "outgoing" is the number of outgoing links of the source document. Duplicate rows are then removed with distinct(). <br>

Next, the rows are grouped by the "target" column to build backward_df. Its columns are "id", the target document's ID; "links", the list of IDs of all documents pointing to the target document; and "outgoing", the list with the number of outgoing links of each document in "links". backward_df is cached because it is reused in every PageRank iteration. Last nodes (pages without outgoing links) are handled separately later on, because a page only appears in backward_df when some page of the sample links to it.

In [0]:
aux_backward_df = forward_df.select(
    "id",
    explode("links").alias("target"),
    "outgoing"
).distinct()


backward_df = aux_backward_df.groupBy("target").agg(
    collect_list("id").alias("links"),
    collect_list("outgoing").alias("outgoing")
).withColumnRenamed("target", "id").cache()
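The explode, distinct and groupBy steps above can be mirrored in plain Python to see what backward_df contains for a toy forward matrix. This is an illustrative sketch with a hypothetical helper, build_backward_links, not Spark code; note that Spark's collect_list does not guarantee the order of the collected elements, whereas the lists here follow input order.

```python
def build_backward_links(forward_rows):
    """forward_rows: (page_id, linked_ids, outgoing) tuples, like forward_df.
    Returns {target_id: (source_ids, source_outgoing_counts)}, like backward_df."""
    seen = set()
    backward = {}
    for page_id, linked_ids, outgoing in forward_rows:
        for target in linked_ids:                      # explode("links")
            row = (page_id, target, outgoing)
            if row in seen:                            # .distinct()
                continue
            seen.add(row)
            sources, counts = backward.setdefault(target, ([], []))  # groupBy("target")
            sources.append(page_id)                    # collect_list("id")
            counts.append(outgoing)                    # collect_list("outgoing")
    return backward


forward_rows = [(1, [2, 3], 2), (2, [3, 3], 2), (3, [], 0)]
print(build_backward_links(forward_rows))  # {2: ([1], [2]), 3: ([1, 2], [2, 2])}
```

Page 1 is not a key because no page links to it, while page 3, a last node, appears only because pages 1 and 2 link to it. Page 2 links to page 3 twice: "outgoing" counts both links, but distinct() keeps only one row, so in the PageRank update page 2 passes on only one of its two shares.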

## Page Rank Calculation
 

### Initialization
In this code segment, a Pandas DataFrame named pagerank_pandas_df is created by selecting the "id" column from the Spark DataFrame backward_df and converting it to Pandas using the toPandas() method. Subsequently, two new columns are added to this Pandas DataFrame: "PGRK" and "UpdatedPGRK." The "PGRK" column is initialized with a calculated value, where each row is assigned 0.85 divided by the total count of records in the backward_df. The "UpdatedPGRK" column is set equal to the "PGRK" column initially.

In [0]:
pagerank_pandas_df = backward_df.select("id").toPandas()

pagerank_pandas_df["PGRK"] = (0.85 / backward_df.count())
pagerank_pandas_df["UpdatedPGRK"] = pagerank_pandas_df["PGRK"]


### Calculation

**calculate_page_rank_iteration** is used in an iterative process to compute the PageRank value of each document based on its incoming links. <br> 

The function takes five parameters:
- links: the IDs of the pages linking to the document (incoming links).
- counters: the number of outgoing links of each of those pages.
- current_pgrk: the document's current PageRank value.
- pagerank_data: the pandas DataFrame with the current PageRank values of all documents.
- dp: the damping factor.

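For reference, the update rule from Brin and Page's original paper is shown below, where $d$ plays the role of dp, $T_1, \dots, T_n$ are the pages linking to page $A$ (the links list) and $C(T_i)$ is the number of outgoing links of $T_i$ (the counters list):

$$
PR(A) = (1 - d) + d \sum_{i=1}^{n} \frac{PR(T_i)}{C(T_i)}
$$

For example, with $d = 0.85$, a page linked only by a page $T$ with $PR(T) = 1$ and $C(T) = 2$ gets $PR(A) = 0.15 + 0.85 \times 0.5 = 0.575$.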
Within the function, the contributions of the incoming links are accumulated. For each incoming link and its counter, the function looks up the linking page in pagerank_data by its ID. If the page is found and its counter is positive (to avoid division by zero), the page's current PageRank divided by its number of outgoing links is added to the sum. The new PageRank is then (1 - dp) plus dp times that sum, following the formulation in Brin and Page's original paper. The current_pgrk argument is not needed by this update, since the new value depends only on the pages that link to the document.

The damping factor dp (0.85 in this notebook) models a random surfer who follows one of the current page's links with probability dp and otherwise jumps to a random page; the (1 - dp) term is the contribution of those random jumps.

In [0]:
def calculate_page_rank_iteration(links, counters, current_pgrk, pagerank_data, dp):
    # current_pgrk is not used by the update: the new value depends only on the incoming links

    # Sum of PR(T) / C(T) over the pages T that link to this document
    contributions = 0.0

    # Iterate over incoming links and their outgoing-link counters
    for link_id, counter in zip(links, counters):
        # Find the row in pagerank_data corresponding to the incoming link
        matching_row = pagerank_data[pagerank_data['id'] == link_id]

        # Skip links missing from pagerank_data and zero counters (to avoid division by zero)
        if not matching_row.empty and counter > 0:
            link_pgrk_value = matching_row.iloc[0]["PGRK"]
            contributions += link_pgrk_value / counter

    # Brin-Page update: PR = (1 - d) + d * sum(PR(T) / C(T))
    return (1 - dp) + dp * contributions

Then, we define an iterative process that computes the PageRank values with a maximum of 20 iterations and a relative tolerance of 0.0001. In each iteration, a loop over the nodes of backward_df computes their new PageRank values with the calculate_page_rank_iteration function and collects them in new_pgrk_values. The DataFrame columns holding the previous and new PageRank values ('PGRK' and 'UpdatedPGRK') are then updated with the calculated values.

A convergence check then compares the absolute difference between the new and previous PageRank values, relative to the previous values. If this relative difference is less than or equal to the tolerance, the loop stops and a message indicating convergence is printed. This way, the algorithm stops as soon as the PageRank values stabilize instead of always running the maximum number of iterations.

In [0]:

max_iterations = 20
tolerance = 0.0001
damping_factor = 0.85

# Collect the backward link matrix once: it does not change between iterations
backward_rows = backward_df.collect()

for iteration in range(max_iterations):
    print(f"Iteration {iteration + 1}")
    new_pgrk_values = {}

    # Iterate over nodes to calculate their new PageRank values
    for row_data in backward_rows:
        current_pgrk = pagerank_pandas_df.loc[pagerank_pandas_df['id'] == row_data['id'], 'PGRK'].iloc[0]
        new_pgrk_values[row_data['id']] = calculate_page_rank_iteration(
            row_data['links'], row_data['outgoing'], current_pgrk, pagerank_pandas_df, damping_factor)

    # Keep a copy of the previous values for the convergence check, then store the new ones by id
    previous_pgrk = pagerank_pandas_df['PGRK'].copy()
    pagerank_pandas_df['UpdatedPGRK'] = pagerank_pandas_df['id'].map(new_pgrk_values)
    pagerank_pandas_df['PGRK'] = pagerank_pandas_df['UpdatedPGRK']

    # Stop when the relative change of every node is lower than or equal to the tolerance
    relative_change = ((pagerank_pandas_df['PGRK'] - previous_pgrk) / previous_pgrk).abs()
    if (relative_change <= tolerance).all():
        print("Convergence condition satisfied. Stopping loop")
        break

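The same iteration can be sketched on a single machine with plain Python dictionaries, which may help to check the update rule and the convergence test on a toy graph. This is a minimal sketch, not the notebook's Spark/pandas code: the function name pagerank_sketch and the dict-of-lists graph format are hypothetical. It applies Brin and Page's update PR(A) = (1 - d) + d * sum(PR(T) / C(T)), starts every node at d / N as the notebook does, and stops when the relative change of every node is within the tolerance.

```python
def pagerank_sketch(forward_links, damping=0.85, max_iterations=20, tolerance=1e-4):
    """Hypothetical single-machine PageRank sketch.

    forward_links maps each page id to the list of page ids it links to.
    Returns (ranks, iterations_run).
    """
    # Every page that appears as a source or as a target is a node
    nodes = set(forward_links)
    for targets in forward_links.values():
        nodes.update(targets)
    if not nodes:
        return {}, 0

    # C(T): number of outgoing links of each page (like the "outgoing" column)
    out_degree = {node: len(forward_links.get(node, [])) for node in nodes}

    # Backward links: target -> pages linking to it (like backward_df)
    incoming = {node: [] for node in nodes}
    for source, targets in forward_links.items():
        for target in targets:
            incoming[target].append(source)

    # Same starting value as the notebook: d / N
    ranks = {node: damping / len(nodes) for node in nodes}

    for iteration in range(max_iterations):
        # Brin-Page update, computed only from the previous iteration's ranks
        new_ranks = {
            node: (1 - damping)
            + damping * sum(ranks[src] / out_degree[src] for src in incoming[node])
            for node in nodes
        }
        # Converged when every node's relative change is within the tolerance
        converged = all(abs(new_ranks[n] - ranks[n]) / ranks[n] <= tolerance for n in nodes)
        ranks = new_ranks
        if converged:
            return ranks, iteration + 1
    return ranks, max_iterations


ranks, iterations = pagerank_sketch(
    {"a": ["b", "c"], "b": ["c"], "c": ["a"]}, max_iterations=500, tolerance=1e-10
)
print(iterations, {node: round(value, 4) for node, value in sorted(ranks.items())})
```

For this toy graph the values settle around a ≈ 1.163, b ≈ 0.644 and c ≈ 1.192. They sum to the number of pages (3), as expected for this form of the formula when every page has at least one outgoing link.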


Now, we create a pandas DataFrame named lastnode by filtering forward_df for the records whose "outgoing" column equals 0, i.e., final nodes with no outgoing links, and keeping their "id" column. The "PGRK" column of lastnode is then set to 0.85 divided by the total number of records in forward_df, and the resulting DataFrame is displayed. This step makes sure that final nodes are included in the final output even when no sampled page links to them, in which case they are missing from backward_df.

In [0]:
lastnode = forward_df.filter(forward_df["outgoing"] == 0).select("id").toPandas()

# Assign a PageRank value to the final nodes
lastnode["PGRK"] = 0.85 / forward_df.count()

# Display the resulting DataFrame
display(lastnode)

We select only the "id" and "PGRK" columns from pagerank_pandas_df and concatenate the result with lastnode, which holds the final nodes and their PageRank values. The pd.concat function combines the two DataFrames into pagerank_pdf with a continuous index. Finally, the combined DataFrame is displayed, showing the PageRank values of all nodes, including the last nodes.

In [0]:
# Select the 'id' and 'PGRK' columns from pagerank_pandas_df
pagerank_pdf = pagerank_pandas_df[['id', 'PGRK']]

# Concatenate the computed PageRank values with the final nodes, renumbering the index
pagerank_pdf = pd.concat([pagerank_pdf, lastnode], ignore_index=True)

# Display the resulting DataFrame
display(pagerank_pdf)

## Final Results

We construct the final results by merging pagerank_pdf, which holds the PageRank values of all nodes including the last nodes, with the "id" and "title" columns of titles_id_pandas_df. The merge is performed on the "id" column, linking each PageRank value with its title. A final node that also receives links appears both in backward_df and in lastnode, so the merged DataFrame can contain duplicate IDs; drop_duplicates on the "id" column keeps only the first occurrence, which is the value computed by the iterations, since those rows come first in pagerank_pdf. Finally, the merged DataFrame is displayed, showing the PageRank value and title of each document in the sample.

In [0]:
# Merge pagerank_pdf with titles_id_pandas_df to get the title of each ID
final_results = pd.merge(pagerank_pdf, titles_id_pandas_df[["id", "title"]], on="id")

# Remove duplicates based on 'id' column
final_results = final_results.drop_duplicates(subset=['id'])

# Display the resulting DataFrame
display(final_results)
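The concat, merge and drop_duplicates steps can be summarised in a few lines of plain Python, which makes explicit which value survives for a node present in both sources. combine_results is a hypothetical helper written for illustration, and the input values are made up: it keeps the first score seen for each ID (as drop_duplicates does with its default keep="first") and keeps only IDs that have a title (as the default inner join of pd.merge does).

```python
def combine_results(computed, last_nodes, titles):
    """computed, last_nodes: lists of (page_id, pagerank) pairs, in that order;
    titles: {page_id: title}. Returns (page_id, title, pagerank) tuples."""
    scores = {}
    for page_id, score in computed + last_nodes:
        scores.setdefault(page_id, score)  # first occurrence wins
    # Inner join with the titles: ids without a title are dropped
    return [(page_id, titles[page_id], score)
            for page_id, score in scores.items() if page_id in titles]


computed = [(1, 0.9), (2, 1.4)]
last_nodes = [(2, 0.2), (5, 0.2), (7, 0.2)]
titles = {1: "apache spark", 2: "pagerank", 5: "mapreduce"}
print(combine_results(computed, last_nodes, titles))
# [(1, 'apache spark', 0.9), (2, 'pagerank', 1.4), (5, 'mapreduce', 0.2)]
```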

We can see that the results are appropriate: each page receives the contributions of the sampled pages that link to it. With a very small sample, however, many pages obtain the same PageRank value, because few of their links point to other pages in the sample.

# CONCLUSIONS
The project implemented Brin and Page's PageRank algorithm, a cornerstone of the original Google search engine. The algorithm assigns a weight to each web page based on the links pointing to it, where a link counts more when the linking page has a high PageRank and few outgoing links, thereby quantifying the page's significance within the web. Leveraging Apache Spark, a distributed computing framework, the analysis centered on a subset of the Wikipedia data available as a Databricks dataset. The first step was creating a Spark DataFrame from the dataset, which offers a scalable, distributed structure for efficient manipulation. Working first on a small sample of the data made it easier to understand the dataset's structure and its seven columns: 'title', 'id', 'revisionId', 'revisionTimestamp', 'revisionUsername', 'revisionUsernameId', and 'text'.

To initiate the PageRank algorithm, the project introduced a forward link matrix by parsing the outgoing links from the 'text' column using a User-Defined Function (UDF) for link extraction. The resultant matrix was then transformed into a backward link matrix. Key optimizations included the use of broadcast variables to efficiently map titles to IDs, enhancing the computational efficiency of the algorithm. The iterative PageRank computation, limited to a maximum of 20 iterations, involved calculating new PageRank values based on incoming links, with a convergence check implemented to terminate the loop when the PageRank values stabilized. The algorithm's final output was a Pandas DataFrame containing webpage titles, IDs, and their corresponding PageRank values. Despite the complexities of handling web data and the intricacies of link structures, the project successfully demonstrated the power of Apache Spark in distributed data processing for web analytics.

In extracting conclusions from this project, the strategic utilization of Apache Spark stands out as a cornerstone for handling extensive datasets with efficiency. Apache Spark's distributed computing paradigm excels in parallelizing computations across a cluster of machines, enabling seamless scalability to process large-scale datasets. The use of Spark DataFrames for crucial operations, including the creation of forward and backward link matrices, not only streamlined the implementation but also highlighted the adaptability of Spark to diverse data processing tasks. The DataFrame abstraction, providing a high-level and structured interface for data manipulation, contributed to code readability and ease of implementation. The distributed nature of Spark becomes particularly advantageous when dealing with web analytics, where the sheer volume of data demands parallel processing for timely and efficient insights.

The use of broadcast variables further illustrates the efficiency gains available in distributed computing. Broadcasting the title-to-ID mapping sends it once to each executor instead of with every task, which reduces redundant data transfers. This optimization matters whenever the same data, such as titles and IDs, is used repeatedly by many tasks in a distributed environment. Moreover, moving between Spark DataFrames and pandas DataFrames showed the flexibility of the approach: Spark handles the distributed processing, while pandas is convenient for local computations on data that fits on the driver. Combining both frameworks offers a versatile solution for a wide range of data processing requirements.

Despite the successful implementation of the PageRank algorithm, some challenges arose during the project. The iterative PageRank computation led to long processing times, mainly because in this implementation each iteration runs on the driver: it loops over the collected rows of backward_df and performs a pandas lookup for every incoming link. The number of iterations needed to converge and the number of web pages further increase this cost. Future work could keep the iterations inside Spark (for example, joining the PageRank table with the link table in each iteration), use more efficient lookup structures, or rely on dedicated graph-processing libraries. More generally, the complexity of web data and of link structures calls for continued work on efficient distributed approaches to web analytics.
