<a href="https://colab.research.google.com/github/Sankytanky100/Data-Engineering/blob/main/Analyzing_Wikipedia_Clickstreams_with_PySpark_in_Google_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Install Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install PySpark
!pip install pyspark

# Install findspark to locate Spark in the system
!pip install --quiet findspark




In [2]:
# Check Java version
!java -version

# Check PySpark version
!pyspark --version


openjdk version "11.0.25" 2024-10-15
OpenJDK Runtime Environment (build 11.0.25+9-post-Ubuntu-1ubuntu122.04)
OpenJDK 64-Bit Server VM (build 11.0.25+9-post-Ubuntu-1ubuntu122.04, mixed mode, sharing)
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.3
      /_/
                        
Using Scala version 2.12.18, OpenJDK 64-Bit Server VM, 11.0.25
Branch HEAD
Compiled by user haejoon.lee on 2024-09-09T05:20:05Z
Revision 32232e9ed33bb16b93ad58cfde8b82e0f07c0970
Url https://github.com/apache/spark
Type --help for more information.


In [3]:
from pyspark.sql import SparkSession
import findspark
findspark.init()

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Wikipedia Clickstream Analysis") \
    .getOrCreate()

# Verify the SparkSession
print("SparkSession created successfully.")



SparkSession created successfully.


In [4]:
# Sample clickstream data
sample_clickstream_counts = [
    ["other-search", "Hanging_Gardens_of_Babylon", "external", 47000],
    ["other-empty", "Hanging_Gardens_of_Babylon", "external", 34600],
    ["Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14600],
    ["Babylon", "Hanging_Gardens_of_Babylon", "link", 2500]
]

# Create an RDD
rdd = spark.sparkContext.parallelize(sample_clickstream_counts)


In [5]:
# Define column names
columns = ["source_page", "target_page", "link_category", "link_count"]

# Create DataFrame from RDD
clickstream_sample_df = rdd.toDF(columns)

# Display the DataFrame
clickstream_sample_df.show()


+--------------------+--------------------+-------------+----------+
|         source_page|         target_page|link_category|link_count|
+--------------------+--------------------+-------------+----------+
|        other-search|Hanging_Gardens_o...|     external|     47000|
|         other-empty|Hanging_Gardens_o...|     external|     34600|
|Wonders_of_the_World|Hanging_Gardens_o...|         link|     14600|
|             Babylon|Hanging_Gardens_o...|         link|      2500|
+--------------------+--------------------+-------------+----------+



In [6]:
# Download the clickstream data
!wget -q -P ./clickstream_data/ https://dumps.wikimedia.org/other/clickstream/2018-01/clickstream-enwiki-2018-01.tsv.gz

# Unzip the data
!gunzip ./clickstream_data/clickstream-enwiki-2018-01.tsv.gz

# Path to the data file
data_file = "./clickstream_data/clickstream-enwiki-2018-01.tsv"

# Read the data into a DataFrame
clickstream = spark.read.csv(data_file, sep='\t', header=False)

# Assign column names
clickstream = clickstream.toDF("prev", "curr", "type", "n")

# Display the first few rows
clickstream.show(5)


+--------------------+--------------------+--------+---+
|                prev|                curr|    type|  n|
+--------------------+--------------------+--------+---+
|         other-empty|2013–14_Croatian_...|external| 30|
|                TACV|   TACV_destinations|    link|501|
|Femme_Fatales_(ma...|      Brinke_Stevens|    link| 18|
|        other-search|   TACV_destinations|external| 18|
|      Linnea_Quigley|      Brinke_Stevens|    link| 46|
+--------------------+--------------------+--------+---+
only showing top 5 rows



In [7]:

# Print the schema
clickstream.printSchema()


root
 |-- prev: string (nullable = true)
 |-- curr: string (nullable = true)
 |-- type: string (nullable = true)
 |-- n: string (nullable = true)



In [8]:
# Rename columns
clickstream = clickstream.withColumnRenamed("prev", "source_page") \
                         .withColumnRenamed("curr", "target_page") \
                         .withColumnRenamed("type", "link_category") \
                         .withColumnRenamed("n", "click_count")

# Display the DataFrame
clickstream.show(5)

# Print the schema
clickstream.printSchema()


+--------------------+--------------------+-------------+-----------+
|         source_page|         target_page|link_category|click_count|
+--------------------+--------------------+-------------+-----------+
|         other-empty|2013–14_Croatian_...|     external|         30|
|                TACV|   TACV_destinations|         link|        501|
|Femme_Fatales_(ma...|      Brinke_Stevens|         link|         18|
|        other-search|   TACV_destinations|     external|         18|
|      Linnea_Quigley|      Brinke_Stevens|         link|         46|
+--------------------+--------------------+-------------+-----------+
only showing top 5 rows

root
 |-- source_page: string (nullable = true)
 |-- target_page: string (nullable = true)
 |-- link_category: string (nullable = true)
 |-- click_count: string (nullable = true)



In [9]:
# Register the DataFrame as a temporary view
clickstream.createOrReplaceTempView("clickstream")


In [10]:
# Filter and order using DataFrame methods
hg_df = clickstream.filter(clickstream.target_page == "Hanging_Gardens_of_Babylon") \
                   .orderBy(clickstream.click_count.desc())

# Display the results
hg_df.show()


+--------------------+--------------------+-------------+-----------+
|         source_page|         target_page|link_category|click_count|
+--------------------+--------------------+-------------+-----------+
|Great_Pyramid_of_...|Hanging_Gardens_o...|         link|         81|
|         Mesopotamia|Hanging_Gardens_o...|         link|         75|
|         Engineering|Hanging_Gardens_o...|         link|         64|
|     Amytis_of_Media|Hanging_Gardens_o...|         link|         60|
|List_of_destroyed...|Hanging_Gardens_o...|         link|         59|
|    Botanical_garden|Hanging_Gardens_o...|         link|         55|
|Lighthouse_of_Ale...|Hanging_Gardens_o...|         link|         53|
|    Stephanie_Dalley|Hanging_Gardens_o...|         link|         52|
|Mausoleum_at_Hali...|Hanging_Gardens_o...|         link|         50|
|      Tower_of_Babel|Hanging_Gardens_o...|         link|        476|
|        other-search|Hanging_Gardens_o...|     external|      47088|
|   Nebuchadnezzar_I

In [13]:
# SQL query
hg_sql = spark.sql("""
    SELECT source_page, target_page, link_category, click_count
    FROM clickstream
    WHERE target_page = 'Hanging_Gardens_of_Babylon'
    ORDER BY click_count DESC
""")

# Display the results
hg_sql.show()

# Import necessary libraries
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col

# Cast 'click_count' to IntegerType
clickstream = clickstream.withColumn("click_count", col("click_count").cast(IntegerType()))

+--------------------+--------------------+-------------+-----------+
|         source_page|         target_page|link_category|click_count|
+--------------------+--------------------+-------------+-----------+
|Great_Pyramid_of_...|Hanging_Gardens_o...|         link|         81|
|         Mesopotamia|Hanging_Gardens_o...|         link|         75|
|         Engineering|Hanging_Gardens_o...|         link|         64|
|     Amytis_of_Media|Hanging_Gardens_o...|         link|         60|
|List_of_destroyed...|Hanging_Gardens_o...|         link|         59|
|    Botanical_garden|Hanging_Gardens_o...|         link|         55|
|Lighthouse_of_Ale...|Hanging_Gardens_o...|         link|         53|
|    Stephanie_Dalley|Hanging_Gardens_o...|         link|         52|
|Mausoleum_at_Hali...|Hanging_Gardens_o...|         link|         50|
|      Tower_of_Babel|Hanging_Gardens_o...|         link|        476|
|        other-search|Hanging_Gardens_o...|     external|      47088|
|   Nebuchadnezzar_I

In [14]:

# Group by link_category and sum click_count
link_category_counts_df = clickstream.groupBy("link_category") \
                                     .sum("click_count") \
                                     .orderBy("sum(click_count)", ascending=False)

# Display the results
link_category_counts_df.show()


+-------------+----------------+
|link_category|sum(click_count)|
+-------------+----------------+
|     external|      6203240927|
|         link|      1368426718|
|        other|        51398287|
|         NULL|            NULL|
+-------------+----------------+



In [15]:
# SQL query
link_category_counts_sql = spark.sql("""
    SELECT link_category, SUM(click_count) AS total_clicks
    FROM clickstream
    GROUP BY link_category
    ORDER BY total_clicks DESC
""")

# Display the results
link_category_counts_sql.show()


+-------------+-------------+
|link_category| total_clicks|
+-------------+-------------+
|     external|6.203240927E9|
|         link|1.368426718E9|
|        other|  5.1398287E7|
|         NULL|         NULL|
+-------------+-------------+



In [16]:
# Filter and select required columns
internal_clickstream = clickstream.filter(clickstream.link_category == "link") \
                                  .select("source_page", "target_page", "click_count")

# Display the DataFrame
internal_clickstream.show(5)

# Print the schema
internal_clickstream.printSchema()


+--------------------+-----------------+-----------+
|         source_page|      target_page|click_count|
+--------------------+-----------------+-----------+
|                TACV|TACV_destinations|        501|
|Femme_Fatales_(ma...|   Brinke_Stevens|         18|
|      Linnea_Quigley|   Brinke_Stevens|         46|
|  List_of_cosplayers|   Brinke_Stevens|         10|
|        Emmanuelle_4|   Brinke_Stevens|         27|
+--------------------+-----------------+-----------+
only showing top 5 rows

root
 |-- source_page: string (nullable = true)
 |-- target_page: string (nullable = true)
 |-- click_count: integer (nullable = true)



In [17]:
# Save as CSV
internal_clickstream.write.csv("./results/article_to_article_csv/", header=True)

print("DataFrame saved as CSV in './results/article_to_article_csv/'.")


DataFrame saved as CSV in './results/article_to_article_csv/'.


In [18]:
# Save as Parquet
internal_clickstream.write.parquet("./results/article_to_article_pq/")

print("DataFrame saved as Parquet in './results/article_to_article_pq/'.")


DataFrame saved as Parquet in './results/article_to_article_pq/'.


In [19]:
# Stop the SparkSession
spark.stop()

print("SparkSession stopped.")


SparkSession stopped.
