***Analyzing Wikipedia Clickstream Data***
Introduction to Clickstream Data

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#create a sample dataset to test whether the apsrk work
# sample_clickstream_counts = [
#     ["other-search", "Hanging_Gardens_of Babalon", "external", 47000],
#     ["other-empty", "Hanging_Gardens_of_Babylon", "external", 34600],
#     ["Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14000],
#     ["Babylon", "Hanging_Gardens_of_Babylon", "link", 2500]
# ]
# clickstream_counts_rdd = spark.sparkContext.parallelize(sample_clickstream_counts)
# clickstream_sample_df = clickstream_counts_rdd.toDF(["source_page", "target_page", "link_category", "link_count"])
# clickstream_sample_df.show(5, truncate=True)

+--------------------+--------------------+-------------+----------+
|         source_page|         target_page|link_category|link_count|
+--------------------+--------------------+-------------+----------+
|        other-search|Hanging_Gardens_o...|     external|     47000|
|         other-empty|Hanging_Gardens_o...|     external|     34600|
|Wonders_of_the_World|Hanging_Gardens_o...|         link|     14000|
|             Babylon|Hanging_Gardens_o...|         link|      2500|
+--------------------+--------------------+-------------+----------+



In [15]:
# download the dataset and unzip 
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Wikipedia Clickstream").getOrCreate()
clickstream = spark.read.csv(
    "clickstream-enwiki-2024-09.tsv", 
    sep="\t", 
    header=False, 
    inferSchema=True
).toDF("source_page", "target_page", "link_category", "link_count")# add the column name, no need to transfer to df when iport from csv/tsv
clickstream.show(10, truncate=False)

# download the dataset without unzip 
# clickstream = spark.read.csv(
#     "clickstream-enwiki-2024-09.tsv.gz", 
#     sep="\t", 
#     header=False, 
#     inferSchema=True
# ).toDF("source_page", "target_page", "link_category", "link_count")

# clickstream.show(10, truncate=False)



+---------------------+----------------------+-------------+----------+
|source_page          |target_page           |link_category|link_count|
+---------------------+----------------------+-------------+----------+
|other-empty          |Main_Page             |external     |119417306 |
|Charles_Tilly        |Joseph_L._Buttenwieser|link         |12        |
|other-search         |Lyle_and_Erik_Menendez|external     |11111073  |
|Charles_Tilly        |Talcott_Parsons       |link         |12        |
|other-empty          |Hyphen-minus          |external     |10101208  |
|Charles_Tiu          |Mighty_Sports         |link         |12        |
|other-internal       |Main_Page             |external     |7254647   |
|Charles_Tolliver     |Action_Action_Action  |link         |12        |
|other-empty          |Cleopatra             |external     |4087571   |
|Charles_Town,_Jamaica|Jamaican_Maroons      |link         |12        |
+---------------------+----------------------+-------------+----

                                                                                

Querying Clickstream Data

In [None]:
# Create a temporary view in the metadata for this `SparkSession` to make the data. createOrReplaceTempView() for tempory view,createGlobalTempView() for global view
clickstream.createOrReplaceTempView("clickstream")

# Filter and sort the DataFrame using PySpark DataFrame methods
clickstream\
    .filter(clickstream.target_page == 'Hanging_Gardens_of_Babylon')\
    .orderBy('click_count', ascending=False)\
    .show(10, truncate=False)

# queryable with `sparkSession.sql()`
spark.sql(
    """
    SELECT *
    FROM clickstream
    WHERE target_page = 'Hanging_Gardens_of_Babylon'
    ORDER BY click_count DESC
    """
).show(10, truncate=False)



+----------------------------------+--------------------------+-------------+-----------+
|source_page                       |target_page               |link_category|click_count|
+----------------------------------+--------------------------+-------------+-----------+
|other-search                      |Hanging_Gardens_of_Babylon|external     |42642      |
|Seven_Wonders_of_the_Ancient_World|Hanging_Gardens_of_Babylon|link         |19759      |
|other-empty                       |Hanging_Gardens_of_Babylon|external     |9154       |
|Wonders_of_the_World              |Hanging_Gardens_of_Babylon|link         |8593       |
|Babylon                           |Hanging_Gardens_of_Babylon|link         |3064       |
|Nebuchadnezzar_II                 |Hanging_Gardens_of_Babylon|link         |2414       |
|other-internal                    |Hanging_Gardens_of_Babylon|external     |1593       |
|other-external                    |Hanging_Gardens_of_Babylon|external     |434        |
|Tower_of_

                                                                                

In [None]:
# Aggregate the DataFrame using PySpark DataFrame Methods 
clickstream\
    .groupBy('link_category')\
    .sum()\
    .show(truncate=False)

# Aggregate the DataFrame using SQL
spark.sql(
    """
    SELECT link_category, SUM(click_count) FROM clickstream
    GROUP BY link_category
    """
).show(truncate=False)



CodeCache: size=131072Kb used=38972Kb max_used=39889Kb free=92099Kb
 bounds [0x00000001079e8000, 0x000000010a128000, 0x000000010f9e8000]
 total_blobs=14299 nmethods=13343 adapters=865
 compilation: disabled (not enough contiguous free space left)




+-------------+----------------+
|link_category|sum(click_count)|
+-------------+----------------+
|link         |2216084832      |
|other        |48024351        |
|external     |4261629291      |
+-------------+----------------+



                                                                                

In [None]:
# Save the `internal_clickstream` DataFrame to a series of CSV files in `./results/article_links_csv/`
internal_clickstream = clickstream\
    .select(["source_page", "target_page", "click_count"])\
    .filter(clickstream.link_category == 'link')

internal_clickstream\
    .write\
    .csv('./results/article_links_csv/', mode="overwrite")