<a href="https://colab.research.google.com/github/deedee-ke/pyspark-exploring_wikipedia_clickstreams/blob/main/Wikipedia_Clickstream_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Analyzing Wikipedia Clickstream Data


### Import Libraries

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=1bd7a5913fc927a8b3fc97f748979b4d8fd42ad3fa0a60fd03ba23a1ae3ef2cb
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession

## Task Group 1 - Introduction to Clickstream Data

### Task 1
Create a new `SparkSession` and assign it to a variable named `spark`.

In [None]:
# Create a new SparkSession
spark = SparkSession.builder \
        .appName("Wikipedia Clickstream Analysis") \
        .getOrCreate()

### Task 2

Create an RDD from a list of sample clickstream counts and save it as `clickstream_counts_rdd`.

In [None]:
# Sample clickstream counts
sample_clickstream_counts = [
    ["other-search", "Hanging_Gardens_of_Babylon", "external", 47000],
    ["other-empty", "Hanging_Gardens_of_Babylon", "external", 34600],
    ["Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14000],
    ["Babylon", "Hanging_Gardens_of_Babylon", "link", 2500]
]

# Create RDD from sample data
clickstream_counts_rdd = spark.sparkContext.parallelize(sample_clickstream_counts)

### Task 3

Using the RDD from the previous step, create a DataFrame named `clickstream_sample_df`.

In [None]:
# Create a DataFrame from the RDD of sample clickstream counts
columns = ["source_page", "target_page", "link_category", "link_count"]
clickstream_sample_df = clickstream_counts_rdd.toDF(columns)

# Display the DataFrame to the notebook
clickstream_sample_df.show()

+--------------------+--------------------+-------------+----------+
|         source_page|         target_page|link_category|link_count|
+--------------------+--------------------+-------------+----------+
|        other-search|Hanging_Gardens_o...|     external|     47000|
|         other-empty|Hanging_Gardens_o...|     external|     34600|
|Wonders_of_the_World|Hanging_Gardens_o...|         link|     14000|
|             Babylon|Hanging_Gardens_o...|         link|      2500|
+--------------------+--------------------+-------------+----------+



## Task Group 2 - Inspecting Clickstream Data

### Task 4

Read the files in `./cleaned/clickstream/` into a new Spark DataFrame named `clickstream` and display the first few rows of the DataFrame in the notebook.

In [None]:
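# Read the clickstream data into a DataFrame (`clickstream`).
# Note: in this Colab run the data is a single part file under /content/sample_data/
# rather than the `./cleaned/clickstream/` directory named in the task.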
clickstream = spark.read.option("header", True) \
                        .option("delimiter", "\t") \
                        .option("inferSchema", True) \
                        .csv("/content/sample_data/part-00000-58fb80d1-6fa0-45cd-a14d-1e6c0ce2f34c-c000.csv")

# Display the DataFrame to the notebook
clickstream.show()

+-------------------+--------------------+-------------+-------------+-----------+
|           referrer|            resource|link_category|language_code|click_count|
+-------------------+--------------------+-------------+-------------+-----------+
|   Daniel_Day-Lewis|      Phantom_Thread|         link|           en|      43190|
|     other-internal|      Phantom_Thread|     external|           en|      21683|
|        other-empty|      Phantom_Thread|     external|           en|     169532|
|90th_Academy_Awards|      Phantom_Thread|         link|           en|      40449|
|       other-search|      Phantom_Thread|     external|           en|     536940|
|       other-search|Tara_Grinstead_mu...|     external|           en|      30041|
|       other-search|      Yossi_Benayoun|     external|           en|      11045|
|        other-empty|       Parthiv_Patel|     external|           en|      11481|
|       other-search|       Parthiv_Patel|     external|           en|      34953|
|   
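The three reader options each control one aspect of parsing: `header` takes column names from the first line, `delimiter` splits fields on tabs, and `inferSchema` has Spark scan the data to guess column types (without it, every column is read as a string). The sketch below is a plain-Python analogue built on the standard `csv` module, not Spark itself; it uses two rows copied from the output above, and `sample_tsv` and `rows` are illustrative names.

```python
import csv
import io

# Two rows from the output above, laid out as tab-separated text with a header line
sample_tsv = (
    "referrer\tresource\tlink_category\tlanguage_code\tclick_count\n"
    "Daniel_Day-Lewis\tPhantom_Thread\tlink\ten\t43190\n"
    "other-internal\tPhantom_Thread\texternal\ten\t21683\n"
)

# DictReader takes field names from the first line (the role of the header option)
# and splits each line on tabs (the role of the delimiter option)
reader = csv.DictReader(io.StringIO(sample_tsv), delimiter="\t")

rows = []
for record in reader:
    # The csv module yields strings only; this cast stands in for inferSchema
    record["click_count"] = int(record["click_count"])
    rows.append(record)

print(rows[0])
```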

### Task 5

Print the schema of the DataFrame in the notebook.

In [None]:
# Display the schema of the `clickstream` DataFrame to the notebook
clickstream.printSchema()

root
 |-- referrer: string (nullable = true)
 |-- resource: string (nullable = true)
 |-- link_category: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- click_count: integer (nullable = true)



### Task 6

Drop the `language_code` column from the DataFrame and display the new schema in the notebook.

In [None]:
# Drop target columns
clickstream = clickstream.drop("language_code")

# Display the first few rows of the DataFrame
clickstream.show()
# Display the new schema in the notebook
clickstream.printSchema()

+-------------------+--------------------+-------------+-----------+
|           referrer|            resource|link_category|click_count|
+-------------------+--------------------+-------------+-----------+
|   Daniel_Day-Lewis|      Phantom_Thread|         link|      43190|
|     other-internal|      Phantom_Thread|     external|      21683|
|        other-empty|      Phantom_Thread|     external|     169532|
|90th_Academy_Awards|      Phantom_Thread|         link|      40449|
|       other-search|      Phantom_Thread|     external|     536940|
|       other-search|Tara_Grinstead_mu...|     external|      30041|
|       other-search|      Yossi_Benayoun|     external|      11045|
|        other-empty|       Parthiv_Patel|     external|      11481|
|       other-search|       Parthiv_Patel|     external|      34953|
|        other-empty|   Cosimo_de'_Medici|     external|      16418|
|       other-search|   Cosimo_de'_Medici|     external|      22190|
|       other-search|University_of

### Task 7

Rename `referrer` and `resource` to `source_page` and `target_page`, respectively.

In [None]:
# Rename `referrer` and `resource` to `source_page` and `target_page`
clickstream = clickstream.withColumnRenamed("referrer", "source_page") \
                         .withColumnRenamed("resource", "target_page")

# Display the first few rows of the DataFrame
clickstream.show()
# Display the new schema in the notebook
clickstream.printSchema()

+-------------------+--------------------+-------------+-----------+
|        source_page|         target_page|link_category|click_count|
+-------------------+--------------------+-------------+-----------+
|   Daniel_Day-Lewis|      Phantom_Thread|         link|      43190|
|     other-internal|      Phantom_Thread|     external|      21683|
|        other-empty|      Phantom_Thread|     external|     169532|
|90th_Academy_Awards|      Phantom_Thread|         link|      40449|
|       other-search|      Phantom_Thread|     external|     536940|
|       other-search|Tara_Grinstead_mu...|     external|      30041|
|       other-search|      Yossi_Benayoun|     external|      11045|
|        other-empty|       Parthiv_Patel|     external|      11481|
|       other-search|       Parthiv_Patel|     external|      34953|
|        other-empty|   Cosimo_de'_Medici|     external|      16418|
|       other-search|   Cosimo_de'_Medici|     external|      22190|
|       other-search|University_of

## Task Group 3 - Querying Clickstream Data

### Task 8

Add the `clickstream` DataFrame as a temporary view named `clickstream` to make the data queryable with `SparkSession.sql()`.

In [None]:
# Create a temporary view in the metadata for this `SparkSession`
clickstream.createOrReplaceTempView("clickstream")

### Task 9

Filter the dataset to entries with `Hanging_Gardens_of_Babylon` as the `target_page` and order the result by `click_count` in descending order using PySpark DataFrame methods.

In [None]:
# Filter and sort the DataFrame using PySpark DataFrame methods
filtered_df = clickstream.filter(clickstream.target_page == "Hanging_Gardens_of_Babylon") \
                         .orderBy(clickstream.click_count.desc())
filtered_df.show()

+--------------------+--------------------+-------------+-----------+
|         source_page|         target_page|link_category|click_count|
+--------------------+--------------------+-------------+-----------+
|        other-search|Hanging_Gardens_o...|     external|      47088|
|         other-empty|Hanging_Gardens_o...|     external|      34619|
|Wonders_of_the_World|Hanging_Gardens_o...|         link|      14668|
|Seven_Wonders_of_...|Hanging_Gardens_o...|         link|      12296|
+--------------------+--------------------+-------------+-----------+



### Task 10

Perform the same analysis as the previous exercise using a SQL query.

In [None]:
# Filter and sort the DataFrame using SQL
query = """
SELECT * FROM clickstream
WHERE target_page = 'Hanging_Gardens_of_Babylon'
ORDER BY click_count DESC
"""
result_df = spark.sql(query)
result_df.show()


+--------------------+--------------------+-------------+-----------+
|         source_page|         target_page|link_category|click_count|
+--------------------+--------------------+-------------+-----------+
|        other-search|Hanging_Gardens_o...|     external|      47088|
|         other-empty|Hanging_Gardens_o...|     external|      34619|
|Wonders_of_the_World|Hanging_Gardens_o...|         link|      14668|
|Seven_Wonders_of_...|Hanging_Gardens_o...|         link|      12296|
+--------------------+--------------------+-------------+-----------+
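The query uses only standard SQL (`WHERE` and `ORDER BY ... DESC`), so its logic can be checked outside Spark. As a rough sketch, the same statement is run here with Python's built-in `sqlite3` module, which is a stand-in and not Spark SQL. The table holds three of the rows shown above plus one unrelated `Phantom_Thread` row from Task 4; the row whose source page is truncated in the output is left out.

```python
import sqlite3

# In-memory database standing in for the Spark temporary view
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE clickstream ("
    "source_page TEXT, target_page TEXT, link_category TEXT, click_count INTEGER)"
)
conn.executemany(
    "INSERT INTO clickstream VALUES (?, ?, ?, ?)",
    [
        ("Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14668),
        ("other-search", "Hanging_Gardens_of_Babylon", "external", 47088),
        ("Daniel_Day-Lewis", "Phantom_Thread", "link", 43190),
        ("other-empty", "Hanging_Gardens_of_Babylon", "external", 34619),
    ],
)

# Same statement as in the notebook cell above
query = """
SELECT * FROM clickstream
WHERE target_page = 'Hanging_Gardens_of_Babylon'
ORDER BY click_count DESC
"""
result = conn.execute(query).fetchall()
for row in result:
    print(row)

conn.close()
```

The unrelated row is filtered out and the remaining rows come back from highest to lowest `click_count`, matching the order in the Spark output.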



### Task 11

Calculate the sum of `click_count` grouped by `link_category` using PySpark DataFrame methods.

In [None]:
# Aggregate the DataFrame using PySpark DataFrame Methods
category_counts_df = clickstream.groupBy("link_category").sum("click_count") \
                                .withColumnRenamed("sum(click_count)", "total_clicks")

category_counts_df.show()

+-------------+------------+
|link_category|total_clicks|
+-------------+------------+
|         link|    97805811|
|        other|     9338172|
|     external|  3248677856|
+-------------+------------+



### Task 12

Perform the same analysis as the previous exercise using a SQL query.

In [None]:
# Aggregate the DataFrame using SQL
query = """
SELECT link_category, SUM(click_count) AS total_clicks
FROM clickstream
GROUP BY link_category
"""

result_df = spark.sql(query)
result_df.show()

+-------------+------------+
|link_category|total_clicks|
+-------------+------------+
|         link|    97805811|
|        other|     9338172|
|     external|  3248677856|
+-------------+------------+



## Task Group 4 - Saving Results to Disk

### Task 13

Create a new DataFrame named `internal_clickstream` that contains only article pairs where `link_category` is `link`. Use `filter()` to select rows that match a specific condition and `select()` to choose which columns to return from the query.

In [None]:
# Create a new DataFrame named `internal_clickstream`
internal_clickstream = clickstream.filter(clickstream.link_category == "link") \
                                  .select("source_page", "target_page", "click_count")


# Display the first few rows of the DataFrame in the notebook
internal_clickstream.show()
internal_clickstream.printSchema()

+--------------------+--------------------+-----------+
|         source_page|         target_page|click_count|
+--------------------+--------------------+-----------+
|    Daniel_Day-Lewis|      Phantom_Thread|      43190|
| 90th_Academy_Awards|      Phantom_Thread|      40449|
|              Shinee|Kim_Jong-hyun_(si...|      24433|
|      Agnyaathavaasi|        Anu_Emmanuel|      15020|
|      Naa_Peru_Surya|        Anu_Emmanuel|      12361|
|        Mariah_Carey|         Nick_Cannon|      16214|
|               Kesha|Rainbow_(Kesha_al...|      11448|
|  David_Attenborough|   John_Attenborough|      11252|
|            Boney_M.|       Bobby_Farrell|      14095|
|The_End_of_the_F*...|      Jessica_Barden|     237279|
|   Quentin_Tarantino|   The_Hateful_Eight|      12018|
|Ready_Player_One_...|        Olivia_Cooke|      17468|
| Royal_Rumble_(2018)|Kevin_Owens_and_S...|      11503|
|     Macaulay_Culkin|         Brenda_Song|      20477|
|      Altered_Carbon|Altered_Carbon_(T...|     

### Task 14

Using `DataFrame.write.csv()`, save the `internal_clickstream` DataFrame as CSV files in a directory called `./results/article_to_article_csv/`.

In [None]:
# Save the `internal_clickstream` DataFrame to a series of CSV files
internal_clickstream.write.csv("./results/article_to_article_csv/", mode="overwrite")
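Spark writes a directory rather than a single file: one `part-*.csv` file per partition plus marker files such as `_SUCCESS`, and by default no header row. The sketch below shows one way to read such a directory back in plain Python. `read_spark_csv_dir` is a hypothetical helper, and to stay self-contained the example builds a small temporary directory that imitates the layout instead of using real Spark output.

```python
import csv
import glob
import os
import tempfile


def read_spark_csv_dir(directory, columns):
    """Read every part-*.csv file in a Spark-style output directory into a list of dicts."""
    records = []
    # Sorting keeps file order stable; marker files such as _SUCCESS do not match the pattern
    for path in sorted(glob.glob(os.path.join(directory, "part-*.csv"))):
        with open(path, newline="", encoding="utf-8") as handle:
            for row in csv.reader(handle):
                # The files carry no header, so column names come from the caller
                records.append(dict(zip(columns, row)))
    return records


# Imitate a Spark CSV output directory (file names are made up for this sketch)
out_dir = tempfile.mkdtemp()
with open(os.path.join(out_dir, "part-00000-demo.csv"), "w", newline="", encoding="utf-8") as f:
    csv.writer(f).writerow(["Daniel_Day-Lewis", "Phantom_Thread", 43190])
with open(os.path.join(out_dir, "part-00001-demo.csv"), "w", newline="", encoding="utf-8") as f:
    csv.writer(f).writerow(["90th_Academy_Awards", "Phantom_Thread", 40449])
open(os.path.join(out_dir, "_SUCCESS"), "w").close()

records = read_spark_csv_dir(out_dir, ["source_page", "target_page", "click_count"])
print(records)
```

Values come back as strings because CSV stores no type information. Passing `header=True` to `write.csv()` keeps the column names in each part file if downstream tools need them.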

### Task 15

Using `DataFrame.write.parquet()`, save the `internal_clickstream` DataFrame as parquet files in a directory called `./results/article_to_article_pq/`.

In [None]:
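# Save the `internal_clickstream` DataFrame to a series of Parquet files
# (mode="overwrite" lets the cell be re-run without failing on an existing directory)
internal_clickstream.write.parquet("./results/article_to_article_pq/", mode="overwrite")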

### Task 16

Close the `SparkSession` and underlying `SparkContext`. What happens if we call `clickstream.show()` after closing the `SparkSession`?

In [None]:
# Stop the notebook's `SparkSession` and `SparkContext`
spark.stop()

In [None]:
# The SparkSession and SparkContext are stopped, so the following line raises an error:
clickstream.show()

Py4JJavaError: An error occurred while calling o77.showString.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.base/java.lang.Thread.run(Thread.java:829)

The currently active SparkContext was created at:

(No active SparkContext.)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:122)
	at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1659)
	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1644)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:102)
	at org.apache.spark.sql.execution.datasources.FileFormat.buildReaderWithPartitionValues(FileFormat.scala:138)
	at org.apache.spark.sql.execution.datasources.FileFormat.buildReaderWithPartitionValues$(FileFormat.scala:129)
	at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:346)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:548)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:537)
	at org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:575)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
	at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:527)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:455)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:454)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:498)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:51)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:751)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:364)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:498)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3314)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3314)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3537)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
