In [1]:
from pyspark.sql import SparkSession

In [3]:
# Obtain the Spark session used by every later cell (an existing session is reused
# if one is already active, otherwise a new one is created).
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [16]:
# Hand-written sample of clickstream counts. Each row holds four values:
# a source page, a target page, a link category, and a count.
sample_clickstream_counts = [
    ["other-search", "Hanging_Gardens_of_Babylon", "external", 47000],
    ["other-empty", "Hanging_Gardens_of_Babylon", "external", 34600],
    ["Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14000],
    ["Babylon", "Hanging_Gardens_of_Babylon", "link", 2500],
]

# Turn the in-memory sample rows into an RDD through the session's SparkContext.
clickstream_counts_rdd = spark.sparkContext.parallelize(sample_clickstream_counts)

In [17]:
# Convert the sample RDD into a DataFrame, naming its four columns.
clickstream_sample_df = (
    clickstream_counts_rdd
    .toDF(["source_page", "target_page", "link_category", "link_count"])
)

# Print up to 5 rows in the notebook without truncating long cell values.
clickstream_sample_df.show(n=5, truncate=False)

+--------------------+--------------------------+-------------+----------+
|source_page         |target_page               |link_category|link_count|
+--------------------+--------------------------+-------------+----------+
|other-search        |Hanging_Gardens_of_Babylon|external     |47000     |
|other-empty         |Hanging_Gardens_of_Babylon|external     |34600     |
|Wonders_of_the_World|Hanging_Gardens_of_Babylon|link         |14000     |
|Babylon             |Hanging_Gardens_of_Babylon|link         |2500      |
+--------------------+--------------------------+-------------+----------+



In [39]:
# Load the tab-delimited clickstream file: the first line is read as the header
# and column types are inferred from the data.
clickstream = (
    spark.read
    .options(header=True, delimiter="\t", inferSchema=True)
    .csv("part-00000-58fb80d1-6fa0-45cd-a14d-1e6c0ce2f34c-c000.csv")
)

In [40]:
# Preview the first 5 rows of the loaded data without truncating cell values.
clickstream.show(n=5, truncate=False)

+-------------------+--------------+-------------+-------------+-----------+
|referrer           |resource      |link_category|language_code|click_count|
+-------------------+--------------+-------------+-------------+-----------+
|Daniel_Day-Lewis   |Phantom_Thread|link         |en           |43190      |
|other-internal     |Phantom_Thread|external     |en           |21683      |
|other-empty        |Phantom_Thread|external     |en           |169532     |
|90th_Academy_Awards|Phantom_Thread|link         |en           |40449      |
|other-search       |Phantom_Thread|external     |en           |536940     |
+-------------------+--------------+-------------+-------------+-----------+
only showing top 5 rows



In [38]:
# Print the column names, types and nullability of the `clickstream` DataFrame.
clickstream.printSchema()

root
 |-- referrer: string (nullable = true)
 |-- resource: string (nullable = true)
 |-- link_category: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- click_count: integer (nullable = true)



In [42]:
# Remove the `language_code` column, then re-check the rows and the schema.
clickstream = clickstream.drop("language_code")
clickstream.show(n=5, truncate=False)
clickstream.printSchema()

+-------------------+--------------+-------------+-----------+
|referrer           |resource      |link_category|click_count|
+-------------------+--------------+-------------+-----------+
|Daniel_Day-Lewis   |Phantom_Thread|link         |43190      |
|other-internal     |Phantom_Thread|external     |21683      |
|other-empty        |Phantom_Thread|external     |169532     |
|90th_Academy_Awards|Phantom_Thread|link         |40449      |
|other-search       |Phantom_Thread|external     |536940     |
+-------------------+--------------+-------------+-----------+
only showing top 5 rows

root
 |-- referrer: string (nullable = true)
 |-- resource: string (nullable = true)
 |-- link_category: string (nullable = true)
 |-- click_count: integer (nullable = true)



In [51]:
# Give the page columns the names used in the rest of the notebook:
# `referrer` -> `source_page`, `resource` -> `target_page`.
clickstream = (
    clickstream
    .withColumnRenamed("referrer", "source_page")
    .withColumnRenamed("resource", "target_page")
)

# Confirm the new column names in both the data preview and the schema.
clickstream.show(n=5, truncate=False)
clickstream.printSchema()

+-------------------+--------------+-------------+-----------+
|source_page        |target_page   |link_category|click_count|
+-------------------+--------------+-------------+-----------+
|Daniel_Day-Lewis   |Phantom_Thread|link         |43190      |
|other-internal     |Phantom_Thread|external     |21683      |
|other-empty        |Phantom_Thread|external     |169532     |
|90th_Academy_Awards|Phantom_Thread|link         |40449      |
|other-search       |Phantom_Thread|external     |536940     |
+-------------------+--------------+-------------+-----------+
only showing top 5 rows

root
 |-- source_page: string (nullable = true)
 |-- target_page: string (nullable = true)
 |-- link_category: string (nullable = true)
 |-- click_count: integer (nullable = true)



In [55]:
# Register the DataFrame as a temporary view so it can be queried by name via spark.sql.
clickstream.createOrReplaceTempView("clickstream")

In [56]:
# DataFrame API version: rows whose target page is Hanging_Gardens_of_Babylon,
# ordered from the highest click count to the lowest (at most 10 rows shown).
(
    clickstream
    .filter(clickstream["target_page"] == "Hanging_Gardens_of_Babylon")
    .orderBy("click_count", ascending=False)
    .show(n=10, truncate=False)
)


+----------------------------------+--------------------------+-------------+-----------+
|source_page                       |target_page               |link_category|click_count|
+----------------------------------+--------------------------+-------------+-----------+
|other-search                      |Hanging_Gardens_of_Babylon|external     |47088      |
|other-empty                       |Hanging_Gardens_of_Babylon|external     |34619      |
|Wonders_of_the_World              |Hanging_Gardens_of_Babylon|link         |14668      |
|Seven_Wonders_of_the_Ancient_World|Hanging_Gardens_of_Babylon|link         |12296      |
+----------------------------------+--------------------------+-------------+-----------+



In [57]:
# SQL version of the same lookup, run against the `clickstream` temp view
# (at most 5 rows shown).
spark.sql(
    """
    select * from clickstream
    where target_page = 'Hanging_Gardens_of_Babylon'
    order by click_count desc
    """
).show(n=5, truncate=False)

+----------------------------------+--------------------------+-------------+-----------+
|source_page                       |target_page               |link_category|click_count|
+----------------------------------+--------------------------+-------------+-----------+
|other-search                      |Hanging_Gardens_of_Babylon|external     |47088      |
|other-empty                       |Hanging_Gardens_of_Babylon|external     |34619      |
|Wonders_of_the_World              |Hanging_Gardens_of_Babylon|link         |14668      |
|Seven_Wonders_of_the_Ancient_World|Hanging_Gardens_of_Babylon|link         |12296      |
+----------------------------------+--------------------------+-------------+-----------+



### Total `click_count` per `link_category`

In [60]:
# DataFrame API version: total click_count for each link_category.
(
    clickstream
    .groupBy("link_category")
    .sum("click_count")
    .show(n=10, truncate=False)
)

+-------------+----------------+
|link_category|sum(click_count)|
+-------------+----------------+
|link         |97805811        |
|other        |9338172         |
|external     |3248677856      |
+-------------+----------------+



In [61]:
# SQL version of the per-category totals, run against the `clickstream` temp view.
spark.sql(
    """
    select link_category,sum(click_count)
    from clickstream
    group by link_category
    """
).show(n=10, truncate=False)

+-------------+----------------+
|link_category|sum(click_count)|
+-------------+----------------+
|link         |97805811        |
|other        |9338172         |
|external     |3248677856      |
+-------------+----------------+



Create a new DataFrame named `internal_clickstream` that contains only the article pairs whose `link_category` is `link`.

In [63]:
# DataFrame API: keep only article-to-article pairs (link_category == "link").
# The filter runs BEFORE the projection so that it references a column that is
# still present, instead of relying on Spark to resolve `link_category` after
# `select` has already dropped it.
internal_clickstream = (
    clickstream
    .filter(clickstream.link_category == "link")
    .select("source_page", "target_page", "click_count")
)

# Backward-compatible alias: the cells that write the output refer to `result`.
result = internal_clickstream

# Show the first rows (default row limit) without truncating page names.
internal_clickstream.show(truncate=False)

+----------------------------+----------------------------+-----------+
|source_page                 |target_page                 |click_count|
+----------------------------+----------------------------+-----------+
|Daniel_Day-Lewis            |Phantom_Thread              |43190      |
|90th_Academy_Awards         |Phantom_Thread              |40449      |
|Shinee                      |Kim_Jong-hyun_(singer)      |24433      |
|Agnyaathavaasi              |Anu_Emmanuel                |15020      |
|Naa_Peru_Surya              |Anu_Emmanuel                |12361      |
|Mariah_Carey                |Nick_Cannon                 |16214      |
|Kesha                       |Rainbow_(Kesha_album)       |11448      |
|David_Attenborough          |John_Attenborough           |11252      |
|Boney_M.                    |Bobby_Farrell               |14095      |
|The_End_of_the_F***ing_World|Jessica_Barden              |237279     |
|Quentin_Tarantino           |The_Hateful_Eight           |12018

In [65]:
# Save the link-only pairs as CSV, overwriting any previous output at this path.
result.write.mode("overwrite").csv("./results/article_links_csv")


In [67]:
# Save the same link-only pairs as Parquet, overwriting any previous output at this path.
result.write.mode("overwrite").parquet("./results/article_links_parquet")

                                                                                