# Analyzing Wikipedia Clickstreams with PySpark

https://www.codecademy.com/courses/big-data-pyspark/projects/analyzing-wikipedia-pyspark

To read a CSV file from the local filesystem when running Spark on Windows:
1. Download the Windows binaries for your Hadoop version (winutils) from https://github.com/steveloughran/winutils
2. Extract the files to a new folder
3. Add that folder to the `PATH` environment variable


In [71]:
from pyspark.sql import SparkSession

## Introduction to Clickstream Data

### Create a new SparkSession

In [72]:
# obtain the SparkSession that every later cell uses
spark = (
    SparkSession.builder
    .getOrCreate()
)

### Create an RDD from a list of sample clickstream counts

In [73]:
# hand-written sample rows; every row points at the same target page
sample_clickstream_counts = [
    [source, "Hanging_Gardens_of_Babylon", category, clicks]
    for source, category, clicks in (
        ("other-search", "external", 47000),
        ("other-empty", "external", 34600),
        ("Wonders_of_the_World", "link", 14000),
        ("Babylon", "link", 2500),
    )
]

# distribute the sample rows as an RDD split into 5 slices
clickstream_counts_rdd = spark.sparkContext.parallelize(
    sample_clickstream_counts,
    numSlices=5,
)

# pull the rows back to the driver to confirm the contents
clickstream_counts_rdd.collect()

[['other-search', 'Hanging_Gardens_of_Babylon', 'external', 47000],
 ['other-empty', 'Hanging_Gardens_of_Babylon', 'external', 34600],
 ['Wonders_of_the_World', 'Hanging_Gardens_of_Babylon', 'link', 14000],
 ['Babylon', 'Hanging_Gardens_of_Babylon', 'link', 2500]]

### Create a DataFrame from the RDD

In [74]:
# turn the sample RDD into a DataFrame; no schema is supplied here,
# so the columns come out with generated names (_1, _2, ...)
clickstream_sample_df = spark.createDataFrame(clickstream_counts_rdd)

# print the rows without truncating long cell values
clickstream_sample_df.show(truncate=False)

+--------------------+--------------------------+--------+-----+
|_1                  |_2                        |_3      |_4   |
+--------------------+--------------------------+--------+-----+
|other-search        |Hanging_Gardens_of_Babylon|external|47000|
|other-empty         |Hanging_Gardens_of_Babylon|external|34600|
|Wonders_of_the_World|Hanging_Gardens_of_Babylon|link    |14000|
|Babylon             |Hanging_Gardens_of_Babylon|link    |2500 |
+--------------------+--------------------------+--------+-----+



## Inspecting Clickstream Data

### Read the csv file and save as a new DataFrame

In [75]:
# load the tab-delimited clickstream files: the first row supplies the
# column names and Spark infers each column's type from the data
clickstream = (
    spark.read
    .options(header=True, delimiter='\t', inferSchema=True)
    .csv('./cleaned/clickstream/')
)

clickstream.show()

+-------------------+--------------------+-------------+-------------+-----------+
|           referrer|            resource|link_category|language_code|click_count|
+-------------------+--------------------+-------------+-------------+-----------+
|   Daniel_Day-Lewis|      Phantom_Thread|         link|           en|      43190|
|     other-internal|      Phantom_Thread|     external|           en|      21683|
|        other-empty|      Phantom_Thread|     external|           en|     169532|
|90th_Academy_Awards|      Phantom_Thread|         link|           en|      40449|
|       other-search|      Phantom_Thread|     external|           en|     536940|
|       other-search|Tara_Grinstead_mu...|     external|           en|      30041|
|       other-search|      Yossi_Benayoun|     external|           en|      11045|
|        other-empty|       Parthiv_Patel|     external|           en|      11481|
|       other-search|       Parthiv_Patel|     external|           en|      34953|
|   

### Display the schema

In [76]:
# display the schema of the freshly loaded DataFrame (column names and inferred types)
clickstream.schema

StructType([StructField('referrer', StringType(), True), StructField('resource', StringType(), True), StructField('link_category', StringType(), True), StructField('language_code', StringType(), True), StructField('click_count', IntegerType(), True)])

### Data modifications

#### Drop columns

In [77]:
# remove the language_code column and rebind `clickstream`, so every later
# cell works with the frame that no longer has it
clickstream = clickstream.drop('language_code')
# preview the rows to confirm the column is gone
clickstream.show()

+-------------------+--------------------+-------------+-----------+
|           referrer|            resource|link_category|click_count|
+-------------------+--------------------+-------------+-----------+
|   Daniel_Day-Lewis|      Phantom_Thread|         link|      43190|
|     other-internal|      Phantom_Thread|     external|      21683|
|        other-empty|      Phantom_Thread|     external|     169532|
|90th_Academy_Awards|      Phantom_Thread|         link|      40449|
|       other-search|      Phantom_Thread|     external|     536940|
|       other-search|Tara_Grinstead_mu...|     external|      30041|
|       other-search|      Yossi_Benayoun|     external|      11045|
|        other-empty|       Parthiv_Patel|     external|      11481|
|       other-search|       Parthiv_Patel|     external|      34953|
|        other-empty|   Cosimo_de'_Medici|     external|      16418|
|       other-search|   Cosimo_de'_Medici|     external|      22190|
|       other-search|University_of

#### Rename columns

In [78]:
# rename the page columns: referrer -> source_page, resource -> target_page
clickstream = (
    clickstream
    .withColumnRenamed('referrer', 'source_page')
    .withColumnRenamed('resource', 'target_page')
)

In [79]:
# display the schema again to confirm the renamed columns
clickstream.schema

StructType([StructField('source_page', StringType(), True), StructField('target_page', StringType(), True), StructField('link_category', StringType(), True), StructField('click_count', IntegerType(), True)])

## Querying Clickstream Data

### Create a temp view and filter data

In [80]:
# register the DataFrame as a temp view named 'clickstream' so it can be
# queried by that name with spark.sql()
clickstream.createOrReplaceTempView('clickstream')

In [81]:
# filter using SQL method:
# select every row of the 'clickstream' view whose target_page is
# Hanging_Gardens_of_Babylon, sorted by click_count from highest to lowest
hanging_gardens_of_babylon_qry = """
                                 SELECT * FROM clickstream WHERE target_page = 'Hanging_Gardens_of_Babylon'
                                 ORDER BY click_count DESC
                                 """

In [82]:
# run the query against the temp view and print the result without truncating cell values
spark.sql(hanging_gardens_of_babylon_qry).show(truncate=False)

+----------------------------------+--------------------------+-------------+-----------+
|source_page                       |target_page               |link_category|click_count|
+----------------------------------+--------------------------+-------------+-----------+
|other-search                      |Hanging_Gardens_of_Babylon|external     |47088      |
|other-empty                       |Hanging_Gardens_of_Babylon|external     |34619      |
|Wonders_of_the_World              |Hanging_Gardens_of_Babylon|link         |14668      |
|Seven_Wonders_of_the_Ancient_World|Hanging_Gardens_of_Babylon|link         |12296      |
+----------------------------------+--------------------------+-------------+-----------+



In [83]:
# filter using PySpark methods: keep the rows whose target_page is
# Hanging_Gardens_of_Babylon, highest click_count first
clickstream_filtered = (
    clickstream
    .where(clickstream['target_page'] == 'Hanging_Gardens_of_Babylon')
    .orderBy('click_count', ascending=False)
)
clickstream_filtered.show(n=5)

+--------------------+--------------------+-------------+-----------+
|         source_page|         target_page|link_category|click_count|
+--------------------+--------------------+-------------+-----------+
|        other-search|Hanging_Gardens_o...|     external|      47088|
|         other-empty|Hanging_Gardens_o...|     external|      34619|
|Wonders_of_the_World|Hanging_Gardens_o...|         link|      14668|
|Seven_Wonders_of_...|Hanging_Gardens_o...|         link|      12296|
+--------------------+--------------------+-------------+-----------+



In [84]:
# sum of click_count using a SQL query:
# one output row per link_category, holding the total of click_count for that category
sum_click_count_qry = """
                      SELECT SUM(click_count), link_category FROM clickstream
                      GROUP BY link_category
                      """

In [85]:
# run the aggregation query against the temp view and print the totals without truncation
spark.sql(sum_click_count_qry).show(truncate=False)

+----------------+-------------+
|sum(click_count)|link_category|
+----------------+-------------+
|97805811        |link         |
|9338172         |other        |
|3248677856      |external     |
+----------------+-------------+



In [86]:
# total click_count per link_category, computed with DataFrame methods
clickstream_click_count = (
    clickstream
    .groupBy('link_category')
    .sum('click_count')
)
clickstream_click_count.show()

+-------------+----------------+
|link_category|sum(click_count)|
+-------------+----------------+
|         link|        97805811|
|        other|         9338172|
|     external|      3248677856|
+-------------+----------------+



## Save to Disk

In [88]:
# build `internal_clickstream`: only the rows whose link_category is 'link',
# reduced to the source page, category and click count columns
internal_clickstream = (
    clickstream
    .where(clickstream['link_category'] == 'link')
    .select('source_page', 'link_category', 'click_count')
)
internal_clickstream.show(n=5)

+-------------------+-------------+-----------+
|        source_page|link_category|click_count|
+-------------------+-------------+-----------+
|   Daniel_Day-Lewis|         link|      43190|
|90th_Academy_Awards|         link|      40449|
|             Shinee|         link|      24433|
|     Agnyaathavaasi|         link|      15020|
|     Naa_Peru_Surya|         link|      12361|
+-------------------+-------------+-----------+
only showing top 5 rows



In [91]:
# save as csv
# mode('overwrite') makes this cell safe to re-run: with the default save mode
# the write raises an error once the output directory already exists.
# header=True writes the column names as the first line of each file so the
# CSV output stays self-describing.
internal_clickstream.write.mode('overwrite').csv('./results/article_to_article_csv/', header=True)

In [92]:
# save as parquet
# mode('overwrite') makes this cell safe to re-run: with the default save mode
# the write raises an error once the output directory already exists.
internal_clickstream.write.mode('overwrite').parquet('./results/article_to_article_pq/')

## Close SparkSession

In [98]:
# stop the SparkSession used throughout the notebook; run this last
spark.stop()