# Analyzing Wikipedia Clickstream Data
* [View Solution Notebook](./solution.html)

* [Project Page Link](https://www.codecademy.com/courses/big-data-pyspark/projects/analyzing-wikipedia-pyspark)

### Import Libraries

In [11]:
from pyspark.sql import SparkSession

## Task Group 1 - Introduction to Clickstream Data

### Task 1
Create a new `SparkSession` and assign it to a variable named `spark`.

In [28]:
# Create a new SparkSession, or reuse the active one if it already exists
spark = SparkSession.builder.getOrCreate()

### Task 2

Create an RDD from a list of sample clickstream counts and save it as `clickstream_counts_rdd`.

In [29]:
# Sample clickstream counts
sample_clickstream_counts = [
    ["other-search", "Hanging_Gardens_of_Babylon", "external", 47000],
    ["other-empty", "Hanging_Gardens_of_Babylon", "external", 34600],
    ["Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14000],
    ["Babylon", "Hanging_Gardens_of_Babylon", "link", 2500]
]

# Create RDD from sample data
clickstream_counts_rdd = spark.sparkContext.parallelize(sample_clickstream_counts)
clickstream_counts_rdd.collect()

[['other-search', 'Hanging_Gardens_of_Babylon', 'external', 47000],
 ['other-empty', 'Hanging_Gardens_of_Babylon', 'external', 34600],
 ['Wonders_of_the_World', 'Hanging_Gardens_of_Babylon', 'link', 14000],
 ['Babylon', 'Hanging_Gardens_of_Babylon', 'link', 2500]]

### Task 3

Using the RDD from the previous step, create a DataFrame named `clickstream_sample_df`.

In [30]:
# Create a DataFrame from the RDD of sample clickstream counts
clickstream_sample_df = clickstream_counts_rdd.toDF(["source_page", "target_page", "link_category", "link_count"])

# Display the DataFrame to the notebook
clickstream_sample_df.printSchema()
clickstream_sample_df.show(truncate=False)

root
 |-- source_page: string (nullable = true)
 |-- target_page: string (nullable = true)
 |-- link_category: string (nullable = true)
 |-- link_count: long (nullable = true)

+--------------------+--------------------------+-------------+----------+
|source_page         |target_page               |link_category|link_count|
+--------------------+--------------------------+-------------+----------+
|other-search        |Hanging_Gardens_of_Babylon|external     |47000     |
|other-empty         |Hanging_Gardens_of_Babylon|external     |34600     |
|Wonders_of_the_World|Hanging_Gardens_of_Babylon|link         |14000     |
|Babylon             |Hanging_Gardens_of_Babylon|link         |2500      |
+--------------------+--------------------------+-------------+----------+
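
`toDF()` inferred `string` for the text fields and `long` for `link_count` because PySpark maps Python `int` values to `LongType` when it infers a schema. The plain-Python sketch below imitates that mapping for the sample rows. `infer_column_types` and `PY_TO_SPARK` are hypothetical names, the helper covers only four types and looks only at the first row, so treat it as an illustration rather than PySpark's actual inference code.

```python
# Hypothetical helper: a simplified imitation of how PySpark names the types
# it infers from Python values. Only a few types are covered.
PY_TO_SPARK = [
    (bool, "boolean"),  # checked before int, because bool is a subclass of int
    (int, "long"),
    (float, "double"),
    (str, "string"),
]


def infer_column_types(rows, column_names):
    """Map each column name to a Spark type name, looking only at the first row."""
    schema = {}
    for name, value in zip(column_names, rows[0]):
        for py_type, spark_type in PY_TO_SPARK:
            if isinstance(value, py_type):
                schema[name] = spark_type
                break
        else:
            raise TypeError(f"unsupported type for {name!r}: {type(value).__name__}")
    return schema


sample_clickstream_counts = [
    ["other-search", "Hanging_Gardens_of_Babylon", "external", 47000],
    ["Babylon", "Hanging_Gardens_of_Babylon", "link", 2500],
]
columns = ["source_page", "target_page", "link_category", "link_count"]
print(infer_column_types(sample_clickstream_counts, columns))
```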



## Task Group 2 - Inspecting Clickstream Data

### Task 4

Read the files in `./cleaned/clickstream/` into a new Spark DataFrame named `clickstream` (stored as `clickstream_df` in this notebook) and display the first few rows of the DataFrame in the notebook.

In [31]:
# Read the tab-separated files in `./cleaned/clickstream/` into a DataFrame (`clickstream_df`)
clickstream_df = (
    spark.read
    .option("header", True)
    .option("delimiter", "\t")
    .option("inferSchema", True)
    .csv("./cleaned/clickstream/")
)

# Display the DataFrame to the notebook
clickstream_df.printSchema()
clickstream_df.show(10, truncate=False)

root
 |-- referrer: string (nullable = true)
 |-- resource: string (nullable = true)
 |-- link_category: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- click_count: integer (nullable = true)

+-------------------+--------------------------+-------------+-------------+-----------+
|referrer           |resource                  |link_category|language_code|click_count|
+-------------------+--------------------------+-------------+-------------+-----------+
|Daniel_Day-Lewis   |Phantom_Thread            |link         |en           |43190      |
|other-internal     |Phantom_Thread            |external     |en           |21683      |
|other-empty        |Phantom_Thread            |external     |en           |169532     |
|90th_Academy_Awards|Phantom_Thread            |link         |en           |40449      |
|other-search       |Phantom_Thread            |external     |en           |536940     |
|other-search       |Tara_Grinstead_murder_case|external     |en    
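
With `header` and `inferSchema` enabled, Spark takes the column names from the first line and scans the values to pick a type for each column, which is why `click_count` came back as `integer`. The sketch below imitates that on a tiny in-memory TSV using Python's `csv` module. The two data lines are copied from the output above, and `infer_tsv_schema` is a hypothetical helper that only distinguishes integer, long and string; Spark's real inference also considers other types such as doubles, timestamps and booleans.

```python
import csv
import io

INT32_MIN, INT32_MAX = -(2**31), 2**31 - 1

# Two rows copied from the clickstream output above, tab-separated with a header line
SAMPLE_TSV = (
    "referrer\tresource\tlink_category\tlanguage_code\tclick_count\n"
    "Daniel_Day-Lewis\tPhantom_Thread\tlink\ten\t43190\n"
    "other-search\tPhantom_Thread\texternal\ten\t536940\n"
)
TYPE_ORDER = ["integer", "long", "string"]  # narrowest to widest


def infer_value_type(text):
    """Classify one field as 'integer', 'long' or 'string' (simplified)."""
    try:
        number = int(text)
    except ValueError:
        return "string"
    return "integer" if INT32_MIN <= number <= INT32_MAX else "long"


def widen(current, new):
    """Combine two inferred types by keeping the wider one."""
    return max(current, new, key=TYPE_ORDER.index)


def infer_tsv_schema(text):
    reader = csv.reader(io.StringIO(text), delimiter="\t")
    header = next(reader)  # plays the role of option("header", True)
    types = [None] * len(header)
    for row in reader:
        for i, field in enumerate(row):
            field_type = infer_value_type(field)
            types[i] = field_type if types[i] is None else widen(types[i], field_type)
    return dict(zip(header, types))


print(infer_tsv_schema(SAMPLE_TSV))
```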

### Task 5

Print the schema of the DataFrame in the notebook.

In [32]:
# Display the schema of the `clickstream_df` DataFrame
clickstream_df.printSchema()

root
 |-- referrer: string (nullable = true)
 |-- resource: string (nullable = true)
 |-- link_category: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- click_count: integer (nullable = true)



### Task 6

Drop the `language_code` column from the DataFrame and display the new schema in the notebook.

In [33]:
# Drop the `language_code` column
clickstream_df = clickstream_df.drop("language_code")
# Display the first few rows of the DataFrame
clickstream_df.show(10, truncate=False)
# Display the new schema in the notebook
clickstream_df.printSchema()

+-------------------+--------------------------+-------------+-----------+
|referrer           |resource                  |link_category|click_count|
+-------------------+--------------------------+-------------+-----------+
|Daniel_Day-Lewis   |Phantom_Thread            |link         |43190      |
|other-internal     |Phantom_Thread            |external     |21683      |
|other-empty        |Phantom_Thread            |external     |169532     |
|90th_Academy_Awards|Phantom_Thread            |link         |40449      |
|other-search       |Phantom_Thread            |external     |536940     |
|other-search       |Tara_Grinstead_murder_case|external     |30041      |
|other-search       |Yossi_Benayoun            |external     |11045      |
|other-empty        |Parthiv_Patel             |external     |11481      |
|other-search       |Parthiv_Patel             |external     |34953      |
|other-empty        |Cosimo_de'_Medici         |external     |16418      |
+-------------------+----

### Task 7

Rename `referrer` and `resource` to `source_page` and `target_page`, respectively.

In [34]:
# Rename `referrer` and `resource` to `source_page` and `target_page`
clickstream_df = (
    clickstream_df
    .withColumnRenamed("referrer", "source_page")
    .withColumnRenamed("resource", "target_page")
)
# Display the first few rows of the DataFrame
clickstream_df.show(10, truncate=False)
# Display the new schema in the notebook
clickstream_df.printSchema()

+-------------------+--------------------------+-------------+-----------+
|source_page        |target_page               |link_category|click_count|
+-------------------+--------------------------+-------------+-----------+
|Daniel_Day-Lewis   |Phantom_Thread            |link         |43190      |
|other-internal     |Phantom_Thread            |external     |21683      |
|other-empty        |Phantom_Thread            |external     |169532     |
|90th_Academy_Awards|Phantom_Thread            |link         |40449      |
|other-search       |Phantom_Thread            |external     |536940     |
|other-search       |Tara_Grinstead_murder_case|external     |30041      |
|other-search       |Yossi_Benayoun            |external     |11045      |
|other-empty        |Parthiv_Patel             |external     |11481      |
|other-search       |Parthiv_Patel             |external     |34953      |
|other-empty        |Cosimo_de'_Medici         |external     |16418      |
+-------------------+----
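
`withColumnRenamed()` returns a new DataFrame and, per the PySpark documentation, is a no-op when the schema has no column with the old name, so a misspelled `referrer` would silently leave the column unchanged. The sketch below models that behaviour on a plain list of column names; `rename_column` is a hypothetical helper, not a Spark API.

```python
def rename_column(columns, existing, new):
    """Return a new list of names with `existing` renamed to `new`.

    Models the documented no-op behaviour of DataFrame.withColumnRenamed:
    if `existing` is absent, the names come back unchanged and no error is raised.
    """
    return [new if name == existing else name for name in columns]


columns = ["referrer", "resource", "link_category", "click_count"]
renamed = rename_column(rename_column(columns, "referrer", "source_page"), "resource", "target_page")
print(renamed)  # ['source_page', 'target_page', 'link_category', 'click_count']

# A misspelled source name changes nothing and raises no error
print(rename_column(renamed, "referer", "source_page") == renamed)  # True
```

Because the mistake is silent, checking `printSchema()` after a rename, as this notebook does, is a cheap safeguard.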

## Task Group 3 - Querying Clickstream Data

### Task 8

Add the `clickstream` DataFrame as a temporary view named `clickstream` to make the data queryable with `spark.sql()`. In this notebook the DataFrame is `clickstream_df` and the view is registered as `clickstream_view`.

In [38]:
# Register `clickstream_df` as a temporary view; it exists only as long as this `SparkSession`
clickstream_df.createOrReplaceTempView("clickstream_view")

### Task 9

Filter the dataset to entries with `Hanging_Gardens_of_Babylon` as the `target_page` and order the result by `click_count` using PySpark DataFrame methods.

In [39]:
# List the tables and temporary views registered in this session's catalog
spark.catalog.listTables()

[Table(name='clickstream', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='clickstream_view', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [49]:
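# Load the temporary view as a DataFrame; despite its name, `babylon_table` holds every row, not only Babylon pages
babylon_table = spark.read.table("clickstream_view")
babylon_table.count()  # result not displayed: a cell echoes only its last expression
# Return only the first rows to the driver; collect() would pull the entire dataset into memory
babylon_table.take(10)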

[Row(source_page='Daniel_Day-Lewis', target_page='Phantom_Thread', link_category='link', click_count=43190),
 Row(source_page='other-internal', target_page='Phantom_Thread', link_category='external', click_count=21683),
 Row(source_page='other-empty', target_page='Phantom_Thread', link_category='external', click_count=169532),
 Row(source_page='90th_Academy_Awards', target_page='Phantom_Thread', link_category='link', click_count=40449),
 Row(source_page='other-search', target_page='Phantom_Thread', link_category='external', click_count=536940),
 Row(source_page='other-search', target_page='Tara_Grinstead_murder_case', link_category='external', click_count=30041),
 Row(source_page='other-search', target_page='Yossi_Benayoun', link_category='external', click_count=11045),
 Row(source_page='other-empty', target_page='Parthiv_Patel', link_category='external', click_count=11481),
 Row(source_page='other-search', target_page='Parthiv_Patel', link_category='external', click_count=34953),
 Row

In [53]:
# Keep only rows whose target is the Hanging Gardens article, largest click counts first
(
    babylon_table
    .filter(babylon_table.target_page == "Hanging_Gardens_of_Babylon")
    .orderBy(babylon_table.click_count, ascending=False)
    .show(truncate=False)
)

+----------------------------------+--------------------------+-------------+-----------+
|source_page                       |target_page               |link_category|click_count|
+----------------------------------+--------------------------+-------------+-----------+
|other-search                      |Hanging_Gardens_of_Babylon|external     |47088      |
|other-empty                       |Hanging_Gardens_of_Babylon|external     |34619      |
|Wonders_of_the_World              |Hanging_Gardens_of_Babylon|link         |14668      |
|Seven_Wonders_of_the_Ancient_World|Hanging_Gardens_of_Babylon|link         |12296      |
+----------------------------------+--------------------------+-------------+-----------+
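
`filter()` keeps the rows whose condition is true and `orderBy(..., ascending=False)` sorts them largest first. As a plain-Python analogue, run on the Task 2 sample rows plus one `Phantom_Thread` row copied from the real output (so the filter has something to drop), the same query is a list comprehension followed by a sort. This illustrates the semantics only; it does not touch the full dataset.

```python
# Task 2 sample rows (deliberately out of order) plus one non-Babylon row from the real output
rows = [
    ("other-search", "Hanging_Gardens_of_Babylon", "external", 47000),
    ("Babylon", "Hanging_Gardens_of_Babylon", "link", 2500),
    ("Daniel_Day-Lewis", "Phantom_Thread", "link", 43190),
    ("other-empty", "Hanging_Gardens_of_Babylon", "external", 34600),
    ("Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14000),
]

# filter(target_page == "Hanging_Gardens_of_Babylon")
babylon_rows = [row for row in rows if row[1] == "Hanging_Gardens_of_Babylon"]

# orderBy(click_count, ascending=False)
babylon_rows.sort(key=lambda row: row[3], reverse=True)

for source_page, _target, link_category, click_count in babylon_rows:
    print(f"{source_page:<22}{link_category:<10}{click_count}")
```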



### Task 10

Perform the same analysis as the previous exercise using a SQL query. 

In [54]:
# Filter and sort the DataFrame using SQL
result = spark.sql("""
SELECT * FROM clickstream_view
WHERE target_page = 'Hanging_Gardens_of_Babylon'
ORDER BY click_count DESC
""")
result.printSchema()
result.show(10, truncate=False)

root
 |-- source_page: string (nullable = true)
 |-- target_page: string (nullable = true)
 |-- link_category: string (nullable = true)
 |-- click_count: integer (nullable = true)

+----------------------------------+--------------------------+-------------+-----------+
|source_page                       |target_page               |link_category|click_count|
+----------------------------------+--------------------------+-------------+-----------+
|other-search                      |Hanging_Gardens_of_Babylon|external     |47088      |
|other-empty                       |Hanging_Gardens_of_Babylon|external     |34619      |
|Wonders_of_the_World              |Hanging_Gardens_of_Babylon|link         |14668      |
|Seven_Wonders_of_the_Ancient_World|Hanging_Gardens_of_Babylon|link         |12296      |
+----------------------------------+--------------------------+-------------+-----------+
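
The query in this task is plain SQL, so its meaning does not depend on Spark. As an illustration only, the sketch below runs the same `SELECT ... WHERE ... ORDER BY` with Python's built-in `sqlite3` module against the Task 2 sample rows. SQLite stands in for Spark SQL here, and the in-memory table is named `clickstream_view` just to match the notebook.

```python
import sqlite3

rows = [
    ("other-search", "Hanging_Gardens_of_Babylon", "external", 47000),
    ("Babylon", "Hanging_Gardens_of_Babylon", "link", 2500),
    ("other-empty", "Hanging_Gardens_of_Babylon", "external", 34600),
    ("Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14000),
    ("Daniel_Day-Lewis", "Phantom_Thread", "link", 43190),  # filtered out by the WHERE clause
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE clickstream_view "
    "(source_page TEXT, target_page TEXT, link_category TEXT, click_count INTEGER)"
)
conn.executemany("INSERT INTO clickstream_view VALUES (?, ?, ?, ?)", rows)

result = conn.execute(
    """
    SELECT * FROM clickstream_view
    WHERE target_page = 'Hanging_Gardens_of_Babylon'
    ORDER BY click_count DESC
    """
).fetchall()
conn.close()

for row in result:
    print(row)
```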



### Task 11

Calculate the sum of `click_count` grouped by `link_category` using PySpark DataFrame methods.

In [58]:
# Aggregate the DataFrame using PySpark DataFrame methods: total clicks per link category
(
    babylon_table
    .groupBy(babylon_table.link_category)
    .sum("click_count")
    .orderBy("sum(click_count)", ascending=False)
    .show(truncate=False)
)

+-------------+----------------+
|link_category|sum(click_count)|
+-------------+----------------+
|external     |3248677856      |
|link         |97805811        |
|other        |9338172         |
+-------------+----------------+
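
`groupBy(...).sum(...)` produces one output row per distinct `link_category`, holding the total of `click_count` for that group. A plain-Python equivalent on the Task 2 sample rows uses a `defaultdict` as the per-group accumulator and then sorts by the totals. This single loop is only an analogue; Spark typically aggregates in two phases (partial sums per partition, then a merge), which the sketch does not model.

```python
from collections import defaultdict

sample_clickstream_counts = [
    ("other-search", "Hanging_Gardens_of_Babylon", "external", 47000),
    ("other-empty", "Hanging_Gardens_of_Babylon", "external", 34600),
    ("Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14000),
    ("Babylon", "Hanging_Gardens_of_Babylon", "link", 2500),
]

# groupBy("link_category").sum("click_count")
totals = defaultdict(int)
for _source, _target, link_category, click_count in sample_clickstream_counts:
    totals[link_category] += click_count

# orderBy("sum(click_count)", ascending=False)
ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)
print(ranked)  # [('external', 81600), ('link', 16500)]
```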



### Task 12

Perform the same analysis as the previous exercise using a SQL query.

In [62]:
# Preview a few rows of the view before aggregating
spark.sql(
    """
    SELECT *
    FROM clickstream_view
    LIMIT 5
    """
).show(truncate=False)

+-------------------+--------------+-------------+-----------+
|source_page        |target_page   |link_category|click_count|
+-------------------+--------------+-------------+-----------+
|Daniel_Day-Lewis   |Phantom_Thread|link         |43190      |
|other-internal     |Phantom_Thread|external     |21683      |
|other-empty        |Phantom_Thread|external     |169532     |
|90th_Academy_Awards|Phantom_Thread|link         |40449      |
|other-search       |Phantom_Thread|external     |536940     |
+-------------------+--------------+-------------+-----------+



In [63]:
# Aggregate the DataFrame using SQL
spark.sql(
    """
    SELECT link_category, SUM(click_count) AS links_count
    FROM clickstream_view
    GROUP BY link_category
    ORDER BY links_count DESC
    """
).show(truncate=False)

+-------------+-----------+
|link_category|links_count|
+-------------+-----------+
|external     |3248677856 |
|link         |97805811   |
|other        |9338172    |
+-------------+-----------+
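
The `external` total, 3,248,677,856, is larger than the largest 32-bit signed integer (2,147,483,647), even though `click_count` was inferred as `integer`. The result is still correct because Spark SQL's `SUM` over an integer column returns a `bigint` (`long`). The plain-Python sketch below shows what that total would become if it were forced into a 32-bit signed integer; `to_int32` is a hypothetical helper that emulates two's-complement wraparound.

```python
INT32_MAX = 2**31 - 1


def to_int32(value):
    """Wrap an arbitrary Python int into the signed 32-bit range (two's complement)."""
    return (value + 2**31) % 2**32 - 2**31


external_total = 3248677856  # from the aggregation output above
print(external_total > INT32_MAX)  # True
print(to_int32(external_total))    # -1046289440
print(to_int32(97805811))          # 97805811 (fits in 32 bits, so unchanged)
```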



## Task Group 4 - Saving Results to Disk

### Task 13

Let's create a new DataFrame named `internal_clickstream` that only contains article pairs where `link_category` is `link`. Use `filter()` to select rows that match a specific condition and `select()` to choose which columns to return from the query.

In [65]:
# Create `internal_clickstream`: keep internal article-to-article links, then choose the columns
internal_clickstream = (
    clickstream_df
    .filter(clickstream_df.link_category == "link")
    .select("source_page", "target_page", "click_count")
)

# Display the first few rows of the DataFrame in the notebook
internal_clickstream.show(5, truncate=False)

+-------------------+----------------------+-----------+
|source_page        |target_page           |click_count|
+-------------------+----------------------+-----------+
|Daniel_Day-Lewis   |Phantom_Thread        |43190      |
|90th_Academy_Awards|Phantom_Thread        |40449      |
|Shinee             |Kim_Jong-hyun_(singer)|24433      |
|Agnyaathavaasi     |Anu_Emmanuel          |15020      |
|Naa_Peru_Surya     |Anu_Emmanuel          |12361      |
+-------------------+----------------------+-----------+
only showing top 5 rows
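
The task combines a row filter with a column projection. On plain Python dictionaries, using three rows copied from the outputs above, the same two steps look like this; `rows` and `KEEP` are illustrative names only.

```python
rows = [
    {"source_page": "Daniel_Day-Lewis", "target_page": "Phantom_Thread", "link_category": "link", "click_count": 43190},
    {"source_page": "other-search", "target_page": "Phantom_Thread", "link_category": "external", "click_count": 536940},
    {"source_page": "90th_Academy_Awards", "target_page": "Phantom_Thread", "link_category": "link", "click_count": 40449},
]

# Columns to keep, like select("source_page", "target_page", "click_count")
KEEP = ("source_page", "target_page", "click_count")

internal = [
    {col: row[col] for col in KEEP}     # select(): project only the kept columns
    for row in rows
    if row["link_category"] == "link"   # filter(): keep internal article-to-article links
]
for row in internal:
    print(row)
```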



### Task 14

Using `DataFrame.write.csv()`, save the `internal_clickstream` DataFrame as CSV files in a directory called `./results/article_to_article_csv/`.

In [68]:
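# Save the `internal_clickstream` DataFrame to a series of CSV files (this notebook writes under ./notebook-output/).
# No header row is written by default, and the write fails if the directory already exists (default save mode).
internal_clickstream.write.csv("./notebook-output/article_to_article_csv")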

In [72]:
# Read the CSV files back; they have no header row, so Spark names the columns _c0, _c1, _c2
(
    spark.read
    .option("header", False)
    .option("delimiter", ",")
    .option("inferSchema", True)
    .csv("./notebook-output/article_to_article_csv")
    .show(5, truncate=False)
)

+-------------------+----------------------+-----+
|_c0                |_c1                   |_c2  |
+-------------------+----------------------+-----+
|Daniel_Day-Lewis   |Phantom_Thread        |43190|
|90th_Academy_Awards|Phantom_Thread        |40449|
|Shinee             |Kim_Jong-hyun_(singer)|24433|
|Agnyaathavaasi     |Anu_Emmanuel          |15020|
|Naa_Peru_Surya     |Anu_Emmanuel          |12361|
+-------------------+----------------------+-----+
only showing top 5 rows
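
Because `write.csv()` writes no header line unless `.option("header", True)` is set, the files contain only values, and reading them back yields the positional names `_c0`, `_c1`, `_c2`. The round trip can be sketched with Python's `csv` module, using an in-memory buffer in place of the output directory (an assumption made to keep the example self-contained).

```python
import csv
import io

rows = [
    ("Daniel_Day-Lewis", "Phantom_Thread", 43190),
    ("90th_Academy_Awards", "Phantom_Thread", 40449),
]
header = ("source_page", "target_page", "click_count")


def write_csv(with_header):
    """Serialize `rows` to CSV text, optionally preceded by a header line."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    if with_header:
        writer.writerow(header)
    writer.writerows(rows)
    return buffer.getvalue()


no_header = write_csv(with_header=False)
headered = write_csv(with_header=True)
print(no_header.splitlines()[0])  # Daniel_Day-Lewis,Phantom_Thread,43190
print(headered.splitlines()[0])   # source_page,target_page,click_count

# Without a header, a reader can only invent positional names, as Spark does with _c0, _c1, ...
first_row = next(csv.reader(io.StringIO(no_header)))
print([f"_c{i}" for i in range(len(first_row))])  # ['_c0', '_c1', '_c2']
```

In the notebook itself, adding `.option("header", True)` to both the writer and the reader should keep the original column names.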



### Task 15

Using `DataFrame.write.parquet()`, save the `internal_clickstream` DataFrame as Parquet files in a directory called `./results/article_to_article_pq/`.

In [73]:
# Save the `internal_clickstream` DataFrame to a series of Parquet files (this notebook writes under ./notebook-output/)
internal_clickstream.write.parquet("./notebook-output/article_to_article_pq")

### Task 16

Close the `SparkSession` and underlying `SparkContext`. What happens if we call `clickstream.show()` after closing the `SparkSession`?

In [74]:
# Stop the notebook's `SparkSession` and `SparkContext`
spark.stop()

In [75]:
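# The SparkSession and SparkContext are now stopped, so DataFrame actions can no longer run.
# Note: the DataFrame in this notebook is `clickstream_df`; `clickstream` evaluated to None here,
# which is why the error below is about `NoneType` rather than about the stopped session.
clickstream.show()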

AttributeError: 'NoneType' object has no attribute 'show'