In [1]:
import os
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, size, col, countDistinct, array_contains, count, sum
from pyspark.sql.window import Window
os.environ["HADOOP_HOME"] = r"C:\hadoop"
os.environ["PATH"] += os.pathsep + r"C:\hadoop\bin"
os.environ['JAVA_HOME'] = r"D:\Applns\Amazon Corretto\jdk17.0.15_6"
os.environ['PATH'] = r"D:\Applns\Amazon Corretto\jdk17.0.15_6\bin" + os.pathsep + os.environ['PATH']


In [2]:
# Create (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("RumbleVideoETL_gldlayer")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)


In [3]:
# Load the silver dataset, then add a `video_tag` column holding one element
# of `video_tags` per output row.
silver_data = spark.read.parquet("../data/silver")
exploaded_video_tag = silver_data.withColumn(
    "video_tag",
    explode(col("video_tags")),
)
exploaded_video_tag.show(50, vertical=True)

-RECORD 0---------------------------------
 video_id          | v5c48et              
 video_url         | https://rumble.co... 
 video_title       | Is Trump Winning ... 
 video_host        | Tim Pool             
 video_duration_s  | 7823                 
 video_date        | 2024-08-23 00:00:00  
 video_views       | 3970                 
 upvotes_count     | 61                   
 downvotes_count   | 4                    
 video_description | Host:\nTim Pool      
 video_tags        | [Entertainment, t... 
 comments          | [{0, You're not g... 
 video_tag         | Entertainment        
-RECORD 1---------------------------------
 video_id          | v5c48et              
 video_url         | https://rumble.co... 
 video_title       | Is Trump Winning ... 
 video_host        | Tim Pool             
 video_duration_s  | 7823                 
 video_date        | 2024-08-23 00:00:00  
 video_views       | 3970                 
 upvotes_count     | 61                   
 downvotes_

In [4]:
# Attach the length of the `comments` array to every row.
number_of_comments = exploaded_video_tag.withColumn(
    "number_of_comments",
    size(col("comments")),
)
number_of_comments.show(30)

+--------+--------------------+--------------------+-----------------+----------------+--------------------+-----------+-------------+---------------+--------------------+--------------------+--------------------+--------------------+------------------+
|video_id|           video_url|         video_title|       video_host|video_duration_s|          video_date|video_views|upvotes_count|downvotes_count|   video_description|          video_tags|            comments|           video_tag|number_of_comments|
+--------+--------------------+--------------------+-----------------+----------------+--------------------+-----------+-------------+---------------+--------------------+--------------------+--------------------+--------------------+------------------+
| v5c48et|https://rumble.co...|Is Trump Winning ...|         Tim Pool|            7823| 2024-08-23 00:00:00|       3970|           61|              4|     Host:\nTim Pool|[Entertainment, t...|[{0, You're not g...|       Entertainment|    

In [5]:
# Replace the `comments` array with one comment per row.
exploade_comments = number_of_comments.withColumn(
    "comments",
    explode(col("comments")),
)
exploade_comments.show()

+--------+--------------------+--------------------+----------+----------------+-------------------+-----------+-------------+---------------+-----------------+--------------------+--------------------+-------------+------------------+
|video_id|           video_url|         video_title|video_host|video_duration_s|         video_date|video_views|upvotes_count|downvotes_count|video_description|          video_tags|            comments|    video_tag|number_of_comments|
+--------+--------------------+--------------------+----------+----------------+-------------------+-----------+-------------+---------------+-----------------+--------------------+--------------------+-------------+------------------+
| v5c48et|https://rumble.co...|Is Trump Winning ...|  Tim Pool|            7823|2024-08-23 00:00:00|       3970|           61|              4|  Host:\nTim Pool|[Entertainment, t...|{0, You're not go...|Entertainment|                14|
| v5c48et|https://rumble.co...|Is Trump Winning ...|  Ti

In [6]:
# Pair each video with the hash of every user who commented on it,
# discarding rows where either value is missing.
commented_users = (
    exploade_comments
    .select(col("video_id"), col("comments.username_hash"))
    .dropna()
)

# Count the distinct commenters per video.
users_count = (
    commented_users
    .groupBy("video_id")
    .agg(countDistinct(col("username_hash")).alias("number_of_commented_users"))
)
users_count.show()

+--------+-------------------------+
|video_id|number_of_commented_users|
+--------+-------------------------+
| v5cqqvl|                        1|
| v5c4ee4|                       11|
| v5d0f90|                        4|
| v5c40ii|                        4|
| v5codsz|                       22|
| v5d2f3t|                        2|
| v5dmwhx|                       18|
| v5cmmn9|                        7|
| v5cwrdf|                       10|
| v5c5ydx|                       12|
| v5c48et|                        8|
| v5ci9np|                       10|
| v5c4c3v|                       18|
| v5dcy2a|                        6|
| v5d06le|                       16|
| v5d2heh|                       18|
| v5crt31|                        5|
| v5cvnlf|                        6|
| v5bwjq5|                        3|
+--------+-------------------------+



# Gold Layer 1

In [7]:
# Gold layer 1: the per-comment rows enriched with the distinct-commenter
# count of their video, previewed and then persisted as parquet.
gold_layer1 = exploade_comments.join(users_count, "video_id", "left")
gold_layer1.show()
gold_layer1.write.parquet("../data/gold/gold_layer1", mode="overwrite")

+--------+--------------------+--------------------+----------+----------------+-------------------+-----------+-------------+---------------+-----------------+--------------------+--------------------+-------------+------------------+-------------------------+
|video_id|           video_url|         video_title|video_host|video_duration_s|         video_date|video_views|upvotes_count|downvotes_count|video_description|          video_tags|            comments|    video_tag|number_of_comments|number_of_commented_users|
+--------+--------------------+--------------------+----------+----------------+-------------------+-----------+-------------+---------------+-----------------+--------------------+--------------------+-------------+------------------+-------------------------+
| v5c48et|https://rumble.co...|Is Trump Winning ...|  Tim Pool|            7823|2024-08-23 00:00:00|       3970|           61|              4|  Host:\nTim Pool|[Entertainment, t...|{0, You're not go...|Entertainmen

# Gold Layer 2

In [8]:
# Start from the persisted gold layer 1 rather than the in-memory frame.
gold_layer1 = spark.read.parquet("../data/gold/gold_layer1")

# Gold layer 2 keeps only rows hosted by "Benny Johnson" whose tag list
# contains "News".
is_benny_johnson = col("video_host") == "Benny Johnson"
has_news_tag = array_contains(col("video_tags"), "News")
gold_layer2 = gold_layer1.where(is_benny_johnson & has_news_tag)

gold_layer2.show()
gold_layer2.write.parquet("../data/gold/gold_layer2", mode="overwrite")

+--------+--------------------+--------------------+-------------+----------------+--------------------+-----------+-------------+---------------+--------------------+----------+--------------------+---------+------------------+-------------------------+
|video_id|           video_url|         video_title|   video_host|video_duration_s|          video_date|video_views|upvotes_count|downvotes_count|   video_description|video_tags|            comments|video_tag|number_of_comments|number_of_commented_users|
+--------+--------------------+--------------------+-------------+----------------+--------------------+-----------+-------------+---------------+--------------------+----------+--------------------+---------+------------------+-------------------------+
| v5dmwhx|https://rumble.co...|Kamala's EMPTY Wo...|Benny Johnson|             817|2024-09-04T19:00:...|       8400|           80|              3|Benny Johnson and...|    [News]|{0, I heard TENET...|     News|                22|       

# Gold Layer 3

In [9]:
# Start from the persisted gold layer 2 rather than the in-memory frame.
gold_layer2 = spark.read.parquet("../data/gold/gold_layer2")

# The upstream cells explode `video_tags` and then `comments` on the same
# frame, so every comment is repeated once per tag of its video. Keep only the
# rows belonging to the first tag so each comment is counted once downstream.
# eqNullSafe keeps the rows even if that first tag is null.
# NOTE(review): assumes a tag is not repeated inside `video_tags` - confirm.
one_row_per_comment = gold_layer2.filter(
    col("video_tag").eqNullSafe(col("video_tags")[0])
)

# Deliberately NOT named `explode`: that would rebind the `explode` function
# imported from pyspark.sql.functions and break any re-run of the cells that
# call it.
video_comments = one_row_per_comment.select("video_id", "comments")

# Flatten the comment struct into the two fields the aggregation needs.
user_comments = video_comments.select(
    "video_id",
    col("comments.username_hash").alias("username_hash"),
    col("comments.num_likes").alias("likes"),
)
user_comments.show()

+--------+-----------------+-----+
|video_id|    username_hash|likes|
+--------+-----------------+-----+
| v5cwrdf|  768a13ee19f869f|    4|
| v5cwrdf|  768a13ee19f869f|    4|
| v5cwrdf|x7302e0aea80c6e7f|    2|
| v5cwrdf|x7302e0aea80c6e7f|    2|
| v5cwrdf| 3b49236d844754a9|    1|
| v5cwrdf|x75767465b0479afa|    0|
| v5cwrdf|x54c9c226e00a85b0|    0|
| v5cwrdf|x548a925eb2b191d0|    0|
| v5cwrdf| 7a92ab03edd1cb9d|    0|
| v5cwrdf|x621a53ff9a2f37ca|    0|
| v5cwrdf| 64aa69972841fd06|    0|
| v5cwrdf| 26a520314654c7ab|    0|
| v5cwrdf| 26a520314654c7ab|    0|
| v5cwrdf| 26a520314654c7ab|    0|
| v5dmwhx|x555878aa98066637|   13|
| v5dmwhx| 47522f6251808e98|    4|
| v5dmwhx|x555878aa98066637|    3|
| v5dmwhx| 1af378e7a8f87c24|    3|
| v5dmwhx| 52d69853c2dfef64|    3|
| v5dmwhx|x26028a9fd839421f|    2|
+--------+-----------------+-----+
only showing top 20 rows



In [10]:
# Gold layer 3: per (video, user) comment count and total likes.
# `count` and `sum` here are the pyspark.sql.functions aggregates imported at
# the top of the notebook, not the Python builtins.
per_user_video = user_comments.groupBy("video_id", "username_hash")
agg = per_user_video.agg(
    count("*").alias("number_of_comments"),
    sum("likes").alias("number_of_comments_like"),
)
agg.show()
agg.write.parquet("../data/gold/gold_layer3", mode="overwrite")

+--------+-----------------+------------------+-----------------------+
|video_id|    username_hash|number_of_comments|number_of_comments_like|
+--------+-----------------+------------------+-----------------------+
| v5cwrdf| 64aa69972841fd06|                 1|                      0|
| v5cwrdf| 26a520314654c7ab|                 3|                      0|
| v5cwrdf| 3b49236d844754a9|                 1|                      1|
| v5cwrdf|x75767465b0479afa|                 1|                      0|
| v5cwrdf| 7a92ab03edd1cb9d|                 1|                      0|
| v5cwrdf|x548a925eb2b191d0|                 1|                      0|
| v5cwrdf|  768a13ee19f869f|                 2|                      8|
| v5cwrdf|x7302e0aea80c6e7f|                 2|                      4|
| v5cwrdf|x54c9c226e00a85b0|                 1|                      0|
| v5cwrdf|x621a53ff9a2f37ca|                 1|                      0|
| v5dmwhx| 4974a64c1652952b|                 1|                 

In [11]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()