# Analyzing Wikipedia Clickstream Data
I will be working with one of Wikipedia’s open datasets. Wikipedia maintains a dataset called “Clickstream” that records clicks between pairs of linked Wikipedia articles. Because not all readers arrive at an article by clicking a link from another article, the dataset also includes categories for clicks that originate outside of Wikipedia (e.g., `other-search` tracks all clicks coming from major search engines).

* [Project Page Link](https://www.codecademy.com/courses/big-data-pyspark/projects/analyzing-wikipedia-pyspark)

## Acquiring and Loading Data

In [1]:
from pyspark.sql import SparkSession

## Introduction to Clickstream Data

In [2]:
# Create a new SparkSession
spark = SparkSession.builder.getOrCreate()

Create an RDD from a list of sample clickstream counts and save it as `clickstream_counts_rdd`.

In [4]:
# Sample clickstream counts
sample_clickstream_counts = [
    ["other-search", "Hanging_Gardens_of_Babylon", "external", 47000],
    ["other-empty", "Hanging_Gardens_of_Babylon", "external", 34600],
    ["Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14000],
    ["Babylon", "Hanging_Gardens_of_Babylon", "link", 2500]
]

# Create RDD from sample data
clickstream_counts_rdd = spark.sparkContext.parallelize(
    sample_clickstream_counts
)

Now, create a DataFrame named `clickstream_sample_df` from the RDD.

In [5]:
# Create a DataFrame from the RDD of sample clickstream counts
clickstream_sample_df = clickstream_counts_rdd.toDF(
    ["source_page", "target_page", "link_category", "link_count"]
)

# Display the DataFrame to the notebook
clickstream_sample_df.show(5)

+--------------------+--------------------+-------------+----------+
|         source_page|         target_page|link_category|link_count|
+--------------------+--------------------+-------------+----------+
|        other-search|Hanging_Gardens_o...|     external|     47000|
|         other-empty|Hanging_Gardens_o...|     external|     34600|
|Wonders_of_the_World|Hanging_Gardens_o...|         link|     14000|
|             Babylon|Hanging_Gardens_o...|         link|      2500|
+--------------------+--------------------+-------------+----------+



## Inspecting Clickstream Data

Read the files in `./cleaned/clickstream/` into a new Spark DataFrame named `clickstream` and display the first few rows of the DataFrame in the notebook.

In [6]:
clickstream = spark.read.option('header', True) \
    .option('delimiter', '\t') \
    .option('inferSchema', True) \
    .csv("./cleaned/clickstream/")

clickstream.show(5)

+-------------------+--------------+-------------+-------------+-----------+
|           referrer|      resource|link_category|language_code|click_count|
+-------------------+--------------+-------------+-------------+-----------+
|   Daniel_Day-Lewis|Phantom_Thread|         link|           en|      43190|
|     other-internal|Phantom_Thread|     external|           en|      21683|
|        other-empty|Phantom_Thread|     external|           en|     169532|
|90th_Academy_Awards|Phantom_Thread|         link|           en|      40449|
|       other-search|Phantom_Thread|     external|           en|     536940|
+-------------------+--------------+-------------+-------------+-----------+
only showing top 5 rows
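
To make the reader options above concrete, here is a minimal plain-Python sketch of what `header`, `delimiter`, and `inferSchema` amount to for a tab-separated file. It is not how Spark reads the data; it only mirrors the idea with the standard `csv` module. The `sample_tsv` string and the `read_clickstream_rows` helper are hypothetical, and the file layout is an assumption based on the column names and two of the rows shown in the output above.

```python
import csv
import io

# Hypothetical tab-separated sample. The layout is an assumption based on
# the column names printed by clickstream.show() above.
sample_tsv = (
    "referrer\tresource\tlink_category\tlanguage_code\tclick_count\n"
    "Daniel_Day-Lewis\tPhantom_Thread\tlink\ten\t43190\n"
    "other-search\tPhantom_Thread\texternal\ten\t536940\n"
)


def read_clickstream_rows(text):
    """Parse tab-delimited text with a header row into a list of dicts."""
    # DictReader takes the first line as column names (like header=True)
    # and splits on tabs (like delimiter='\t').
    reader = csv.DictReader(io.StringIO(text), delimiter="\t")
    rows = []
    for row in reader:
        # The csv module yields strings only, so the count is cast by hand.
        # Spark's inferSchema option performs this kind of type detection.
        row["click_count"] = int(row["click_count"])
        rows.append(row)
    return rows


rows = read_clickstream_rows(sample_tsv)
print(rows[0]["referrer"], rows[0]["click_count"])
```

Without the manual cast, `click_count` would stay a string, which is roughly what a Spark read without `inferSchema` would give for every column.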



Print the schema of the DataFrame in the notebook.

In [8]:
clickstream.printSchema()

root
 |-- referrer: string (nullable = true)
 |-- resource: string (nullable = true)
 |-- link_category: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- click_count: integer (nullable = true)



Drop the `language_code` column from clickstream.

In [10]:
clickstream = clickstream.drop("language_code")

clickstream.show(5)
clickstream.printSchema()

+-------------------+--------------+-------------+-----------+
|           referrer|      resource|link_category|click_count|
+-------------------+--------------+-------------+-----------+
|   Daniel_Day-Lewis|Phantom_Thread|         link|      43190|
|     other-internal|Phantom_Thread|     external|      21683|
|        other-empty|Phantom_Thread|     external|     169532|
|90th_Academy_Awards|Phantom_Thread|         link|      40449|
|       other-search|Phantom_Thread|     external|     536940|
+-------------------+--------------+-------------+-----------+
only showing top 5 rows

root
 |-- referrer: string (nullable = true)
 |-- resource: string (nullable = true)
 |-- link_category: string (nullable = true)
 |-- click_count: integer (nullable = true)



Rename `referrer` and `resource` to `source_page` and `target_page`, respectively.

In [11]:
# Rename `referrer` and `resource` to `source_page` and `target_page`
clickstream = clickstream.withColumnRenamed("referrer", "source_page")\
        .withColumnRenamed("resource", "target_page")
  
clickstream.show(5)
clickstream.printSchema()

+-------------------+--------------+-------------+-----------+
|        source_page|   target_page|link_category|click_count|
+-------------------+--------------+-------------+-----------+
|   Daniel_Day-Lewis|Phantom_Thread|         link|      43190|
|     other-internal|Phantom_Thread|     external|      21683|
|        other-empty|Phantom_Thread|     external|     169532|
|90th_Academy_Awards|Phantom_Thread|         link|      40449|
|       other-search|Phantom_Thread|     external|     536940|
+-------------------+--------------+-------------+-----------+
only showing top 5 rows

root
 |-- source_page: string (nullable = true)
 |-- target_page: string (nullable = true)
 |-- link_category: string (nullable = true)
 |-- click_count: integer (nullable = true)



## Querying Clickstream Data

Add the `clickstream` DataFrame as a temporary view named `clickstream` so that the data can be queried with `spark.sql()` as well as with PySpark DataFrame methods.

In [14]:
clickstream.createOrReplaceTempView("clickstream")

Filter the dataset to entries with `Hanging_Gardens_of_Babylon` as the `target_page` and order the result by `click_count` in descending order using PySpark DataFrame methods.

In [15]:
clickstream.filter(clickstream.target_page == 'Hanging_Gardens_of_Babylon')\
    .orderBy('click_count', ascending=False)\
    .show(10)

+--------------------+--------------------+-------------+-----------+
|         source_page|         target_page|link_category|click_count|
+--------------------+--------------------+-------------+-----------+
|        other-search|Hanging_Gardens_o...|     external|      47088|
|         other-empty|Hanging_Gardens_o...|     external|      34619|
|Wonders_of_the_World|Hanging_Gardens_o...|         link|      14668|
|Seven_Wonders_of_...|Hanging_Gardens_o...|         link|      12296|
+--------------------+--------------------+-------------+-----------+



Now run the same query using SQL.

In [16]:
spark.sql(
    """
    SELECT * FROM clickstream
    WHERE target_page = "Hanging_Gardens_of_Babylon"
    ORDER BY click_count desc
    """
).show(10)

+--------------------+--------------------+-------------+-----------+
|         source_page|         target_page|link_category|click_count|
+--------------------+--------------------+-------------+-----------+
|        other-search|Hanging_Gardens_o...|     external|      47088|
|         other-empty|Hanging_Gardens_o...|     external|      34619|
|Wonders_of_the_World|Hanging_Gardens_o...|         link|      14668|
|Seven_Wonders_of_...|Hanging_Gardens_o...|         link|      12296|
+--------------------+--------------------+-------------+-----------+
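
As a sanity check on what the filter-and-order query computes, the same logic can be sketched in plain Python. This is an illustration only, not a replacement for the Spark query. It uses the four sample rows from `sample_clickstream_counts` earlier in the notebook plus one `Phantom_Thread` row taken from the output above, so that the filter has something to exclude. The `top_referrers` helper is a hypothetical name.

```python
# Rows copied from sample_clickstream_counts, deliberately out of order,
# plus one Phantom_Thread row from the clickstream.show() output.
sample_rows = [
    ("Babylon", "Hanging_Gardens_of_Babylon", "link", 2500),
    ("other-empty", "Hanging_Gardens_of_Babylon", "external", 34600),
    ("Daniel_Day-Lewis", "Phantom_Thread", "link", 43190),
    ("other-search", "Hanging_Gardens_of_Babylon", "external", 47000),
    ("Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14000),
]


def top_referrers(rows, target_page):
    """Keep rows for one target page, sorted by click count, largest first."""
    # Equivalent in spirit to WHERE target_page = ... in the SQL version
    matching = [row for row in rows if row[1] == target_page]
    # Equivalent in spirit to ORDER BY click_count DESC
    return sorted(matching, key=lambda row: row[3], reverse=True)


for source, _, category, count in top_referrers(
    sample_rows, "Hanging_Gardens_of_Babylon"
):
    print(source, category, count)
```

On the sample rows this puts `other-search` first and `Babylon` last, the same ordering by count that the Spark queries apply to the full dataset.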



Calculate the sum of `click_count` grouped by `link_category` using PySpark DataFrame methods.

In [18]:
# Name the column explicitly so only click_count is summed
clickstream.groupBy('link_category')\
    .sum('click_count')\
    .show(5)

+-------------+----------------+
|link_category|sum(click_count)|
+-------------+----------------+
|         link|        97805811|
|        other|         9338172|
|     external|      3248677856|
+-------------+----------------+



Now, using a SQL query:

In [19]:
spark.sql(
    """
    SELECT link_category, SUM(click_count) FROM clickstream
    GROUP BY link_category
    """
).show(5)

+-------------+----------------+
|link_category|sum(click_count)|
+-------------+----------------+
|         link|        97805811|
|        other|         9338172|
|     external|      3248677856|
+-------------+----------------+
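
The grouped sum can also be checked by hand on a small input. The sketch below is a plain-Python illustration of the group-and-sum logic, applied to the four sample rows from `sample_clickstream_counts` earlier in the notebook. It is not the Spark implementation, and `sum_clicks_by_category` is a hypothetical helper name.

```python
from collections import defaultdict

# Rows copied from sample_clickstream_counts earlier in the notebook
sample_rows = [
    ("other-search", "Hanging_Gardens_of_Babylon", "external", 47000),
    ("other-empty", "Hanging_Gardens_of_Babylon", "external", 34600),
    ("Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14000),
    ("Babylon", "Hanging_Gardens_of_Babylon", "link", 2500),
]


def sum_clicks_by_category(rows):
    """Total the click counts for each link category."""
    totals = defaultdict(int)
    for _source, _target, category, count in rows:
        # One running total per category, like GROUP BY link_category
        totals[category] += count
    return dict(totals)


totals = sum_clicks_by_category(sample_rows)
print(totals)  # {'external': 81600, 'link': 16500}
```

Even in this four-row sample, external traffic outweighs internal links, which matches the direction of the totals in the full dataset above.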



## Saving Results to Disk

Let's create a new DataFrame named `internal_clickstream` that only contains article pairs where `link_category` is `link`. Use `filter()` to keep the rows that match a condition and `select()` to choose which columns to return.

In [20]:
# Filter on link_category first, then keep only the columns of interest
internal_clickstream = clickstream\
    .filter(clickstream.link_category == 'link')\
    .select(["source_page", "target_page", "click_count"])

# Display the first few rows of the DataFrame in the notebook
internal_clickstream.show(truncate=False)

+----------------------------+----------------------------+-----------+
|source_page                 |target_page                 |click_count|
+----------------------------+----------------------------+-----------+
|Daniel_Day-Lewis            |Phantom_Thread              |43190      |
|90th_Academy_Awards         |Phantom_Thread              |40449      |
|Shinee                      |Kim_Jong-hyun_(singer)      |24433      |
|Agnyaathavaasi              |Anu_Emmanuel                |15020      |
|Naa_Peru_Surya              |Anu_Emmanuel                |12361      |
|Mariah_Carey                |Nick_Cannon                 |16214      |
|Kesha                       |Rainbow_(Kesha_album)       |11448      |
|David_Attenborough          |John_Attenborough           |11252      |
|Boney_M.                    |Bobby_Farrell               |14095      |
|The_End_of_the_F***ing_World|Jessica_Barden              |237279     |
|Quentin_Tarantino           |The_Hateful_Eight           |12018

Save the `internal_clickstream` DataFrame as CSV files in a directory called `./results/article_links_csv/`.

In [21]:
internal_clickstream.write.csv('./results/article_links_csv/', mode="overwrite")

Save the `internal_clickstream` DataFrame as Parquet files in a directory called `./results/article_links_parquet/`.

In [22]:
internal_clickstream.write.parquet('./results/article_links_parquet/', mode="overwrite")

In [23]:
spark.stop()