<center><h1>PySpark - Analyzing Wikipedia Clickstream Data</h1></center>

This project showcases how to use PySpark to analyze Wikipedia clickstream data. The notebook begins by starting a new PySpark session and creating a Resilient Distributed Dataset (RDD) from sample clickstream counts. It then loads the full clickstream dataset into a DataFrame, cleans it, and uses both DataFrame methods and SQL queries to analyze traffic patterns on Wikipedia pages before saving the results to disk.

## Starting PySpark with Clickstream Data

Let's create a new `SparkSession` and assign it to a variable named `spark`.

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Create a new SparkSession
spark = SparkSession.builder.getOrCreate()

Now let's create an RDD from a list of sample clickstream counts and save it as `clickstream_counts_rdd`.

In [4]:
# Sample clickstream counts
sample_clickstream_counts = [
    ["other-search", "Hanging_Gardens_of_Babylon", "external", 47000],
    ["other-empty", "Hanging_Gardens_of_Babylon", "external", 34600],
    ["Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14000],
    ["Babylon", "Hanging_Gardens_of_Babylon", "link", 2500]
]

# Create RDD from sample data
clickstream_counts_rdd = spark.sparkContext\
    .parallelize(sample_clickstream_counts)
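Once the list is parallelized, RDD transformations such as `map` and actions such as `reduce` can run on it. For example, `clickstream_counts_rdd.map(lambda row: row[3]).reduce(operator.add)` would total the click counts. The sketch below is plain Python, not Spark code: it mirrors that map/reduce on the same sample list so the expected total is easy to check by hand.

```python
from functools import reduce
import operator

# Same sample rows as above: [source_page, target_page, link_category, count]
sample_clickstream_counts = [
    ["other-search", "Hanging_Gardens_of_Babylon", "external", 47000],
    ["other-empty", "Hanging_Gardens_of_Babylon", "external", 34600],
    ["Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14000],
    ["Babylon", "Hanging_Gardens_of_Babylon", "link", 2500],
]

# "map" step: keep only the count (index 3) of each row
counts = map(lambda row: row[3], sample_clickstream_counts)

# "reduce" step: combine the counts pairwise with addition
total_clicks = reduce(operator.add, counts)
print(total_clicks)  # 98100
```

In Spark the function passed to `reduce` should be commutative and associative, because partial results from different partitions can be combined in any order; addition satisfies both.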

Using the RDD from the previous step, let's create a DataFrame named `clickstream_sample_df`.

In [5]:
# Create a DataFrame from the RDD of sample clickstream counts
clickstream_sample_df = clickstream_counts_rdd\
    .toDF(['source_page', 'target_page', 'link_category', 'link_count'])

# Display the DataFrame in the notebook
clickstream_sample_df.show(truncate=False)

+--------------------+--------------------------+-------------+----------+
|source_page         |target_page               |link_category|link_count|
+--------------------+--------------------------+-------------+----------+
|other-search        |Hanging_Gardens_of_Babylon|external     |47000     |
|other-empty         |Hanging_Gardens_of_Babylon|external     |34600     |
|Wonders_of_the_World|Hanging_Gardens_of_Babylon|link         |14000     |
|Babylon             |Hanging_Gardens_of_Babylon|link         |2500      |
+--------------------+--------------------------+-------------+----------+
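`toDF()` pairs each positional element of a row with the column name at the same index, and Spark infers column types from the Python values (Python `int` values become `bigint`). The plain-Python sketch below mimics only the naming step by zipping the same column list with each row; it is an illustration, not how Spark builds rows internally, and the length check is this sketch's own safeguard.

```python
columns = ["source_page", "target_page", "link_category", "link_count"]

sample_clickstream_counts = [
    ["other-search", "Hanging_Gardens_of_Babylon", "external", 47000],
    ["other-empty", "Hanging_Gardens_of_Babylon", "external", 34600],
    ["Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14000],
    ["Babylon", "Hanging_Gardens_of_Babylon", "link", 2500],
]

records = []
for row in sample_clickstream_counts:
    # zip() would silently drop extra values, so check lengths explicitly
    if len(row) != len(columns):
        raise ValueError(f"expected {len(columns)} values, got {len(row)}")
    # Pair each positional value with the column name at the same index
    records.append(dict(zip(columns, row)))

for record in records:
    print(record["source_page"], record["link_count"])
```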



## Inspecting Clickstream Data

Let's read the files in `./cleaned/clickstream/` into a new Spark DataFrame named `clickstream` and display the first few rows to get a sense of the data.

In [6]:
# Read the target directory (`./cleaned/clickstream/`) into a DataFrame (`clickstream`)
clickstream = spark.read\
    .option('header', True)\
    .option('delimiter', '\t')\
    .option('inferSchema', True)\
    .csv('./cleaned/clickstream/')

# Display the DataFrame to the notebook
clickstream.show(10, truncate=False)

+-------------------+--------------------------+-------------+-------------+-----------+
|referrer           |resource                  |link_category|language_code|click_count|
+-------------------+--------------------------+-------------+-------------+-----------+
|Daniel_Day-Lewis   |Phantom_Thread            |link         |en           |43190      |
|other-internal     |Phantom_Thread            |external     |en           |21683      |
|other-empty        |Phantom_Thread            |external     |en           |169532     |
|90th_Academy_Awards|Phantom_Thread            |link         |en           |40449      |
|other-search       |Phantom_Thread            |external     |en           |536940     |
|other-search       |Tara_Grinstead_murder_case|external     |en           |30041      |
|other-search       |Yossi_Benayoun            |external     |en           |11045      |
|other-empty        |Parthiv_Patel             |external     |en           |11481      |
|other-search       |
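The three reader options map onto ordinary delimited-file parsing: `header` takes column names from the first line, `delimiter` (`'\t'`) splits fields on tabs, and `inferSchema` makes one extra pass over the data to choose column types, which is why `click_count` is reported as `integer` in the printed schema. The sketch below reproduces that with Python's `csv` module on two rows copied from the output above. The digit-only type check is a simplified stand-in for Spark's inference and is an assumption, not Spark's algorithm.

```python
import csv
import io

# A header line plus two tab-separated rows copied from the output above
tsv_text = (
    "referrer\tresource\tlink_category\tlanguage_code\tclick_count\n"
    "Daniel_Day-Lewis\tPhantom_Thread\tlink\ten\t43190\n"
    "other-search\tPhantom_Thread\texternal\ten\t536940\n"
)

# header=True   -> DictReader uses the first line as column names
# delimiter='\t' -> fields are split on tabs
reader = csv.DictReader(io.StringIO(tsv_text), delimiter="\t")
rows = list(reader)

def infer_value(text):
    """Simplified stand-in for inferSchema: digit-only strings become int."""
    return int(text) if text.isdigit() else text

typed_rows = [{key: infer_value(value) for key, value in row.items()} for row in rows]
print(typed_rows[0])
```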

Now let's print the schema of the DataFrame.

In [7]:
# Display the schema of the `clickstream` DataFrame to the notebook
clickstream.printSchema()

root
 |-- referrer: string (nullable = true)
 |-- resource: string (nullable = true)
 |-- link_category: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- click_count: integer (nullable = true)



Let's drop the `language_code` column from the DataFrame and see the new schema.

In [8]:
# Drop the `language_code` column
clickstream = clickstream.drop('language_code')

# Display the first few rows of the DataFrame
clickstream.show(5, truncate=False)
# Display the new schema in the notebook
clickstream.printSchema()

+-------------------+--------------+-------------+-----------+
|referrer           |resource      |link_category|click_count|
+-------------------+--------------+-------------+-----------+
|Daniel_Day-Lewis   |Phantom_Thread|link         |43190      |
|other-internal     |Phantom_Thread|external     |21683      |
|other-empty        |Phantom_Thread|external     |169532     |
|90th_Academy_Awards|Phantom_Thread|link         |40449      |
|other-search       |Phantom_Thread|external     |536940     |
+-------------------+--------------+-------------+-----------+
only showing top 5 rows

root
 |-- referrer: string (nullable = true)
 |-- resource: string (nullable = true)
 |-- link_category: string (nullable = true)
 |-- click_count: integer (nullable = true)
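`DataFrame.drop()` returns a new DataFrame instead of modifying the existing one, which is why the result is reassigned to `clickstream`; it is also a no-op for column names that are not in the schema. The plain-Python sketch below mirrors both behaviors on a single row using a hypothetical `drop_columns` helper (not a Spark API).

```python
def drop_columns(row, *names):
    """Hypothetical helper: return a new dict without the given keys.

    Like DataFrame.drop(), unknown names are silently ignored and the
    input row is left unchanged.
    """
    return {key: value for key, value in row.items() if key not in names}

row = {"referrer": "Daniel_Day-Lewis", "resource": "Phantom_Thread",
       "link_category": "link", "language_code": "en", "click_count": 43190}

trimmed = drop_columns(row, "language_code")
unchanged = drop_columns(trimmed, "not_a_column")  # no-op, like Spark
print(list(trimmed))
```

Because a misspelled name is ignored rather than rejected, it is worth checking `printSchema()` after a drop, as the cell above does.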



Next, let's rename `referrer` and `resource` to `source_page` and `target_page`, respectively.

In [9]:
# Rename `referrer` and `resource` to `source_page` and `target_page`
clickstream = clickstream\
    .withColumnRenamed('referrer', 'source_page')\
    .withColumnRenamed('resource', 'target_page') 
  
# Display the first few rows of the DataFrame
clickstream.show(5, truncate=False)
# Display the new schema in the notebook
clickstream.printSchema()

+-------------------+--------------+-------------+-----------+
|source_page        |target_page   |link_category|click_count|
+-------------------+--------------+-------------+-----------+
|Daniel_Day-Lewis   |Phantom_Thread|link         |43190      |
|other-internal     |Phantom_Thread|external     |21683      |
|other-empty        |Phantom_Thread|external     |169532     |
|90th_Academy_Awards|Phantom_Thread|link         |40449      |
|other-search       |Phantom_Thread|external     |536940     |
+-------------------+--------------+-------------+-----------+
only showing top 5 rows

root
 |-- source_page: string (nullable = true)
 |-- target_page: string (nullable = true)
 |-- link_category: string (nullable = true)
 |-- click_count: integer (nullable = true)
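Each `withColumnRenamed()` call is a no-op when the old name is missing, so a typo fails silently instead of raising an error. On Spark 3.4 and later, `withColumnsRenamed({'referrer': 'source_page', 'resource': 'target_page'})` performs both renames in one call. The plain-Python sketch below applies the same mapping to a row dict with a hypothetical `rename_columns` helper, keeping the original column order.

```python
def rename_columns(row, mapping):
    """Hypothetical helper: rename keys per `mapping`, keeping column order.

    Keys not in `mapping` keep their name; mapping entries whose old name
    is absent have no effect, similar to withColumnRenamed().
    """
    return {mapping.get(key, key): value for key, value in row.items()}

row = {"referrer": "Daniel_Day-Lewis", "resource": "Phantom_Thread",
       "link_category": "link", "click_count": 43190}

renamed = rename_columns(row, {"referrer": "source_page", "resource": "target_page"})
print(list(renamed))
# ['source_page', 'target_page', 'link_category', 'click_count']
```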



## Querying Clickstream Data


Let's register the `clickstream` DataFrame as a temporary view named `clickstream` so that the data can be queried with `spark.sql()`.

In [10]:
# Register a temporary view, scoped to this `SparkSession`, so the data can be queried with `spark.sql()`
clickstream.createOrReplaceTempView("clickstream")

Now let's filter the dataset to entries with `Hanging_Gardens_of_Babylon` as the `target_page` and order the result by `click_count` in descending order, using PySpark DataFrame methods.

In [11]:
# Filter and sort the DataFrame using PySpark DataFrame methods
clickstream\
    .filter(clickstream.target_page == 'Hanging_Gardens_of_Babylon')\
    .orderBy('click_count', ascending=False)\
    .show(5, truncate=False)

+----------------------------------+--------------------------+-------------+-----------+
|source_page                       |target_page               |link_category|click_count|
+----------------------------------+--------------------------+-------------+-----------+
|other-search                      |Hanging_Gardens_of_Babylon|external     |47088      |
|other-empty                       |Hanging_Gardens_of_Babylon|external     |34619      |
|Wonders_of_the_World              |Hanging_Gardens_of_Babylon|link         |14668      |
|Seven_Wonders_of_the_Ancient_World|Hanging_Gardens_of_Babylon|link         |12296      |
+----------------------------------+--------------------------+-------------+-----------+
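`filter()` keeps the rows whose condition is true, `orderBy(..., ascending=False)` sorts them from largest to smallest, and `show(5)` prints at most five rows, so all four matches appear. In plain Python the same steps are a list comprehension followed by `sorted(..., reverse=True)`. The rows below are copied from the output above, shuffled, with one non-matching `Phantom_Thread` row added; this is an illustration, not Spark code.

```python
rows = [
    ("Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14668),
    ("other-search", "Phantom_Thread", "external", 536940),
    ("other-empty", "Hanging_Gardens_of_Babylon", "external", 34619),
    ("Seven_Wonders_of_the_Ancient_World", "Hanging_Gardens_of_Babylon", "link", 12296),
    ("other-search", "Hanging_Gardens_of_Babylon", "external", 47088),
]

# filter: keep rows whose target_page (index 1) matches
matches = [row for row in rows if row[1] == "Hanging_Gardens_of_Babylon"]

# orderBy('click_count', ascending=False): sort on index 3, largest first,
# then keep at most five rows, like show(5)
top = sorted(matches, key=lambda row: row[3], reverse=True)[:5]

for source_page, _, _, click_count in top:
    print(source_page, click_count)
```

In PySpark the descending sort can also be written as `.orderBy(F.desc('click_count'))` after `from pyspark.sql import functions as F`.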



Let's perform the same analysis as the previous step using a SQL query.

In [12]:
# Filter and sort the DataFrame using SQL
query = """
SELECT *
FROM clickstream
WHERE target_page = 'Hanging_Gardens_of_Babylon'
ORDER BY click_count DESC
"""

spark.sql(query).show(truncate=False)

+----------------------------------+--------------------------+-------------+-----------+
|source_page                       |target_page               |link_category|click_count|
+----------------------------------+--------------------------+-------------+-----------+
|other-search                      |Hanging_Gardens_of_Babylon|external     |47088      |
|other-empty                       |Hanging_Gardens_of_Babylon|external     |34619      |
|Wonders_of_the_World              |Hanging_Gardens_of_Babylon|link         |14668      |
|Seven_Wonders_of_the_Ancient_World|Hanging_Gardens_of_Babylon|link         |12296      |
+----------------------------------+--------------------------+-------------+-----------+
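To experiment with this query without a Spark installation, the sketch below runs it against Python's built-in `sqlite3` in-memory database, loaded with the four rows from the output above in shuffled order. SQLite is only a stand-in: its dialect differs from Spark SQL in places, but a simple `SELECT ... WHERE ... ORDER BY` like this one behaves the same. The sketch passes the page name as a bound parameter (`?`), which is a `sqlite3` feature.

```python
import sqlite3

rows = [
    ("Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14668),
    ("other-empty", "Hanging_Gardens_of_Babylon", "external", 34619),
    ("Seven_Wonders_of_the_Ancient_World", "Hanging_Gardens_of_Babylon", "link", 12296),
    ("other-search", "Hanging_Gardens_of_Babylon", "external", 47088),
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE clickstream ("
    "source_page TEXT, target_page TEXT, link_category TEXT, click_count INTEGER)"
)
conn.executemany("INSERT INTO clickstream VALUES (?, ?, ?, ?)", rows)

query = """
SELECT *
FROM clickstream
WHERE target_page = ?
ORDER BY click_count DESC
"""
result = conn.execute(query, ("Hanging_Gardens_of_Babylon",)).fetchall()
conn.close()

for row in result:
    print(row)
```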



Now let's calculate the sum of `click_count` grouped by `link_category` using PySpark DataFrame methods.

In [13]:
# Aggregate the DataFrame using PySpark DataFrame methods
clickstream\
    .select(['link_category', 'click_count'])\
    .groupBy('link_category')\
    .sum() \
    .orderBy('sum(click_count)', ascending=False)\
    .show(5, truncate=False)

+-------------+----------------+
|link_category|sum(click_count)|
+-------------+----------------+
|external     |3248677856      |
|link         |97805811        |
|other        |9338172         |
+-------------+----------------+
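`groupBy('link_category').sum()` sums every numeric column within each group, which is why the result column is named `sum(click_count)`; selecting only `link_category` and `click_count` first makes sure only `click_count` is summed. To choose the output name directly, `.agg(F.sum('click_count').alias('total_click_count'))` can be used instead (with `from pyspark.sql import functions as F`). The plain-Python sketch below performs the same group-and-sum with a `defaultdict` on the sample rows from the start of the notebook; it is an illustration, not Spark code.

```python
from collections import defaultdict

rows = [
    ("other-search", "Hanging_Gardens_of_Babylon", "external", 47000),
    ("other-empty", "Hanging_Gardens_of_Babylon", "external", 34600),
    ("Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14000),
    ("Babylon", "Hanging_Gardens_of_Babylon", "link", 2500),
]

# groupBy('link_category').sum(): one running total per category
totals = defaultdict(int)
for _, _, link_category, click_count in rows:
    totals[link_category] += click_count

# orderBy('sum(click_count)', ascending=False)
ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)
print(ranked)  # [('external', 81600), ('link', 16500)]
```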



Let's perform the same aggregation using a SQL query, aliasing the sum as `total_click_count`.

In [14]:
# Aggregate the DataFrame using SQL
query = """
SELECT 
    link_category,
    SUM(click_count) AS total_click_count
FROM clickstream
GROUP BY 1
ORDER BY 2 DESC
"""

spark.sql(query).show(truncate=False)

+-------------+-----------------+
|link_category|total_click_count|
+-------------+-----------------+
|external     |3248677856       |
|link         |97805811         |
|other        |9338172          |
+-------------+-----------------+
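`GROUP BY 1` and `ORDER BY 2` refer to the first and second items of the `SELECT` list by position. Spark SQL supports this by default (controlled by the `spark.sql.groupByOrdinal` and `spark.sql.orderByOrdinal` settings), but writing `GROUP BY link_category ORDER BY total_click_count DESC` is more robust if the select list changes later. The sketch below uses `sqlite3` as a stand-in for Spark SQL to show that both spellings return the same result on the sample rows.

```python
import sqlite3

rows = [
    ("other-search", "Hanging_Gardens_of_Babylon", "external", 47000),
    ("other-empty", "Hanging_Gardens_of_Babylon", "external", 34600),
    ("Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14000),
    ("Babylon", "Hanging_Gardens_of_Babylon", "link", 2500),
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE clickstream ("
    "source_page TEXT, target_page TEXT, link_category TEXT, click_count INTEGER)"
)
conn.executemany("INSERT INTO clickstream VALUES (?, ?, ?, ?)", rows)

# Positional references: 1 = link_category, 2 = total_click_count
by_position = conn.execute("""
SELECT link_category, SUM(click_count) AS total_click_count
FROM clickstream
GROUP BY 1
ORDER BY 2 DESC
""").fetchall()

# Same query with explicit column names
by_name = conn.execute("""
SELECT link_category, SUM(click_count) AS total_click_count
FROM clickstream
GROUP BY link_category
ORDER BY total_click_count DESC
""").fetchall()
conn.close()

print(by_position)  # [('external', 81600), ('link', 16500)]
```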



## Saving Results to Disk

Let's create a new DataFrame named `internal_clickstream` that contains only article pairs where `link_category` is `link`. We'll use `filter()` to keep the rows that match this condition and `select()` to choose which columns to return.

In [20]:
# Create a new DataFrame (`internal_clickstream`), using `filter` to keep rows that match
# a condition and `select` to choose which columns to return. Filtering first means the
# condition only uses columns that are still present in the DataFrame.
internal_clickstream = clickstream\
    .filter(clickstream.link_category == 'link')\
    .select(['source_page', 'target_page', 'click_count'])

# Display the first few rows of the DataFrame in the notebook
internal_clickstream.show(10, truncate=False)

+----------------------------+----------------------+-----------+
|source_page                 |target_page           |click_count|
+----------------------------+----------------------+-----------+
|Daniel_Day-Lewis            |Phantom_Thread        |43190      |
|90th_Academy_Awards         |Phantom_Thread        |40449      |
|Shinee                      |Kim_Jong-hyun_(singer)|24433      |
|Agnyaathavaasi              |Anu_Emmanuel          |15020      |
|Naa_Peru_Surya              |Anu_Emmanuel          |12361      |
|Mariah_Carey                |Nick_Cannon           |16214      |
|Kesha                       |Rainbow_(Kesha_album) |11448      |
|David_Attenborough          |John_Attenborough     |11252      |
|Boney_M.                    |Bobby_Farrell         |14095      |
|The_End_of_the_F***ing_World|Jessica_Barden        |237279     |
+----------------------------+----------------------+-----------+
only showing top 10 rows
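The filter-and-project step can be illustrated in plain Python (not Spark code) on a few rows copied from the earlier outputs: the condition tests `link_category` while it is still part of each row, and only the three selected columns are kept in the result.

```python
rows = [
    {"source_page": "Daniel_Day-Lewis", "target_page": "Phantom_Thread",
     "link_category": "link", "click_count": 43190},
    {"source_page": "other-search", "target_page": "Phantom_Thread",
     "link_category": "external", "click_count": 536940},
    {"source_page": "90th_Academy_Awards", "target_page": "Phantom_Thread",
     "link_category": "link", "click_count": 40449},
]

keep = ("source_page", "target_page", "click_count")

internal = [
    {col: row[col] for col in keep}     # select: keep three columns
    for row in rows
    if row["link_category"] == "link"   # filter: internal links only
]

for row in internal:
    print(row)
```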



Now using `DataFrame.write.csv()`, let's save the `internal_clickstream` DataFrame as CSV files in a directory called `./results/article_to_article_csv/`.

In [21]:
# Save the `internal_clickstream` DataFrame to a series of CSV files
internal_clickstream\
    .write.csv('./results/article_to_article_csv/', mode="overwrite")
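`write.csv()` produces a directory of part files (one per partition) rather than a single file, and by default it writes no header row, so reading the files back yields unnamed columns whose values are plain strings unless a schema or `inferSchema` is supplied. The sketch below imitates one such part file with Python's `csv` module in a temporary directory. The file name `part-00000.csv` is a hypothetical placeholder, since Spark generates its own part-file names.

```python
import csv
import os
import tempfile

rows = [
    ("Daniel_Day-Lewis", "Phantom_Thread", 43190),
    ("90th_Academy_Awards", "Phantom_Thread", 40449),
]

with tempfile.TemporaryDirectory() as out_dir:
    # Hypothetical part-file name; Spark chooses its own names
    part_path = os.path.join(out_dir, "part-00000.csv")
    with open(part_path, "w", newline="") as f:
        csv.writer(f).writerows(rows)  # no header row, like Spark's default

    with open(part_path, newline="") as f:
        read_back = list(csv.reader(f))

print(read_back[0])  # ['Daniel_Day-Lewis', 'Phantom_Thread', '43190']
```

Passing `header=True` to `write.csv()` writes the column names, and reading the directory back with `header=True` and `inferSchema=True` restores names and numeric types. Parquet stores the schema alongside the data, so it needs neither option.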

Using `DataFrame.write.parquet()`, let's save the `internal_clickstream` DataFrame as Parquet files in a directory called `./results/article_to_article_pq/`.

In [23]:
# Save the `internal_clickstream` DataFrame to a series of Parquet files
internal_clickstream\
    .write.parquet('./results/article_to_article_pq/', mode="overwrite")

Great! Let's close the `SparkSession` and underlying `SparkContext` to wrap this up.

In [26]:
# Stop the notebook's `SparkSession` and `SparkContext`
spark.stop()