In [2]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.master("local[*]").config("spark.executor.memory", "128g").config("spark.executor.memory", "32g").getOrCreate()

In [3]:
schema = StructType([
    StructField("vid", StringType(), True),
    StructField("text", StringType(), True),
    StructField("likes", IntegerType(), True),
])

df_comments_base = spark.read.schema(schema).format("csv").option("header",True).option("encoding", "utf-8").load("./comments.csv")
df_comments_base.show()

+-----------+--------------------+-----+
|        vid|                text|likes|
+-----------+--------------------+-----+
|HlEFrbLeDks|I&#39;m vaccine a...|    1|
|HlEFrbLeDks|Hi from the futur...|    1|
|HlEFrbLeDks|He will always be...|    0|
|HlEFrbLeDks|I remember watchi...|    0|
|HlEFrbLeDks|Introverts were l...|    4|
|HlEFrbLeDks|         Good times.|    0|
|HlEFrbLeDks|The beginning of ...|    3|
|HlEFrbLeDks|This must never h...|    0|
|HlEFrbLeDks|Watching this a y...|    2|
|HlEFrbLeDks|One year on and I...|    0|
|HlEFrbLeDks|I didnt mind stay...|    2|
|HlEFrbLeDks|Tomorrow marks on...|    3|
|HlEFrbLeDks|Can’t believe it’...|    7|
|HlEFrbLeDks|This was the wors...|    1|
|HlEFrbLeDks|here we are lockd...|    0|
|HlEFrbLeDks|The day it all we...|    5|
|HlEFrbLeDks|What happens if i...|    1|
|HlEFrbLeDks|  This is soo COOOOL|    0|
|HlEFrbLeDks|Interesting to re...|    2|
|HlEFrbLeDks|9 months later an...|    1|
+-----------+--------------------+-----+
only showing top

In [4]:
df_comments_base = df_comments_base.filter(col("likes") >= 0)
df_comments_base = df_comments_base.withColumn("rank", row_number().over(Window.partitionBy(col("vid")).orderBy(desc(col("likes"))))).filter(col("rank") <= 20)
df_comments_base.show()

+-----------+--------------------+-----+----+
|        vid|                text|likes|rank|
+-----------+--------------------+-----+----+
|LyiUHxD3S8s|Six brain cells i...|  156|   1|
|LyiUHxD3S8s|It&#39;s not maki...|  135|   2|
|LyiUHxD3S8s|The snitches shou...|   81|   3|
|LyiUHxD3S8s|Soo everyone arou...|   78|   4|
|LyiUHxD3S8s|If you are the so...|   76|   5|
|LyiUHxD3S8s|making it up as w...|   71|   6|
|LyiUHxD3S8s|Fake news<br>Gove...|   67|   7|
|LyiUHxD3S8s|0.1% death rate o...|   47|   8|
|LyiUHxD3S8s|It&#39;s simple, ...|   39|   9|
|LyiUHxD3S8s|And Grouse shoote...|   31|  10|
|LyiUHxD3S8s|&quot;People shou...|   26|  11|
|LyiUHxD3S8s|Anyone noticed ho...|   25|  12|
|LyiUHxD3S8s|There&#39;s six o...|   24|  13|
|LyiUHxD3S8s|Bollocks to the g...|   23|  14|
|LyiUHxD3S8s|Clearly &quot;The...|   18|  15|
|LyiUHxD3S8s|make your voice h...|   15|  16|
|LyiUHxD3S8s|Unless you&#39;re...|   13|  17|
|LyiUHxD3S8s|Another power tri...|   13|  18|
|LyiUHxD3S8s|I&#39;ve got a ch...|

In [5]:
vschema = StructType([
    StructField("index_label", StringType(), True),
    StructField("channelId", StringType(), True),
    StructField("channelTitle", StringType(), True),
    StructField("videoId", StringType(), True),
    StructField("videoTitle", StringType(), True),
    StructField("description", StringType(), True),
    StructField("link", StringType(), True),
    StructField("time", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("month", IntegerType(), True),
    StructField("day", IntegerType(), True)
])

df_bbc = spark.read.schema(vschema).format("csv").option("header",True).option("encoding", "utf-8").load("./final/BBC_videos_final.csv")
df_bbc = df_bbc.select(col("videoId"), col("year"), col("month"), col("day"))
df_bbc = df_bbc.withColumn("Date", col("year") * 10000 + col("month") * 100 + col("day")).select(col("videoId"), col("Date")).withColumn("from", lit("BBC"))

df_guardian = spark.read.schema(vschema).format("csv").option("header",True).option("encoding", "utf-8").load("./final/Guardian_videos_final.csv")
df_guardian = df_guardian.filter(col("channelId") != "na").select(col("videoId"), col("year"), col("month"), col("day")).filter(col("videoId") != "#NAME?")
df_guardian = df_guardian.withColumn("Date", col("year") * 10000 + col("month") * 100 + col("day")).select(col("videoId"), col("Date")).withColumn("from", lit("Guardian"))
df_guardian.show()

+-----------+--------+--------+
|    videoId|    Date|    from|
+-----------+--------+--------+
|8bUCwmYcvzY|20200323|Guardian|
|Ml-1hUISPj8|20200323|Guardian|
|ZD4PYP6eayQ|20200324|Guardian|
|x5XT8H5PLzQ|20200329|Guardian|
|x2axKYqLXso|20200416|Guardian|
|RVHKA3BKpBo|20200416|Guardian|
|CCOnQAmbDNc|20200430|Guardian|
|Y7hYnZnSliI|20200430|Guardian|
|WRs-dC-gmVk|20200510|Guardian|
|PPhf8LfrS_c|20200510|Guardian|
|6jBur0Bu7L8|20200529|Guardian|
|df41adre86A|20200623|Guardian|
|0dBS5xEWlsI|20200623|Guardian|
|RDY8W11ROgM|20200623|Guardian|
|uqzggg3nooo|20200629|Guardian|
|GS4nRFfM_Fs|20200922|Guardian|
|u63SBuGqN6Y|20200922|Guardian|
|weANMwwcLA8|20200930|Guardian|
|yWAniPfrBDo|20200930|Guardian|
|NCy7Q6Nr4_4|20201013|Guardian|
+-----------+--------+--------+
only showing top 20 rows



In [6]:
special_schema = StructType([
    StructField("index_label", StringType(), True),
    StructField("channelId", StringType(), True),
    StructField("channelTitle", StringType(), True),
    StructField("videoId", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("month", IntegerType(), True),
    StructField("day", IntegerType(), True),
    StructField("videoTitle", StringType(), True),
    StructField("description", StringType(), True),
    StructField("link", StringType(), True),
    StructField("time", StringType(), True),
])
df_dailymail = spark.read.schema(special_schema).format("csv").option("header",True).option("encoding", "utf-8").load("./final/DailyMail_videos_final.csv")
df_dailymail = df_dailymail.filter(col("channelId") != "na").select(col("videoId"), col("year"), col("month"), col("day")).filter(col("videoId") != "#NAME?")
df_dailymail = df_dailymail.withColumn("Date", col("year") * 10000 + col("month") * 100 + col("day")).select(col("videoId"), col("Date")).withColumn("from", lit("DailyMail"))

df_independent = spark.read.schema(special_schema).format("csv").option("header",True).option("encoding", "utf-8").load("./final/Independent_videos_final.csv")
df_independent = df_independent.filter(col("channelId") != "na").select(col("videoId"), col("year"), col("month"), col("day")).filter(col("videoId") != "#NAME?")
df_independent = df_independent.withColumn("Date", col("year") * 10000 + col("month") * 100 + col("day")).select(col("videoId"), col("Date")).withColumn("from", lit("Independent"))

df_sun = spark.read.schema(special_schema).format("csv").option("header",True).option("encoding", "utf-8").load("./final/SUN_videos_final.csv")
df_sun = df_sun.filter(col("channelId") != "na").select(col("videoId"), col("year"), col("month"), col("day")).filter(col("videoId") != "#NAME?")
df_sun = df_sun.withColumn("Date", col("year") * 10000 + col("month") * 100 + col("day")).select(col("videoId"), col("Date")).withColumn("from", lit("SUN"))
df_sun.show()

+-----------+--------+----+
|    videoId|    Date|from|
+-----------+--------+----+
|-Lfy5GcJkNk|20200320| SUN|
|FqDo_QWoqtE|20200325| SUN|
|Pu4GWu3cMec|20200325| SUN|
|iM6onBpOPUE|20200326| SUN|
|bYA3iTt0Shw|20200416| SUN|
|Cr5A7Q_MruU|20200416| SUN|
|8zGQMzs_ows|20200430| SUN|
|DmeQSrTAXac|20200430| SUN|
|TPAHsgwEvsU|20200510| SUN|
|AMRYSm54SKk|20200510| SUN|
|V1jqlU6pTE0|20200510| SUN|
|IlLrVsnLkzc|20200528| SUN|
|PMjTS-4WFLA|20200615| SUN|
|wOgc9g9iNlY|20200623| SUN|
|6LUxrhF1W38|20200629| SUN|
|NNNznC7hdho|20200629| SUN|
|TnHDev5qT-A|20200914| SUN|
|PNDC-Llff9c|20200922| SUN|
|Vufy881Y4kc|20200922| SUN|
|SW5XYszDTPE|20200930| SUN|
+-----------+--------+----+
only showing top 20 rows



In [7]:
df_media = df_bbc.unionAll(df_guardian).unionAll(df_dailymail).unionAll(df_independent).unionAll(df_sun)
df_comments = df_comments_base.join(df_media, on = (col("videoId") == col("vid"))).select(col("text"), col("Date").alias("date"), col("from"))
df_comments.show()

+--------------------+--------+-----------+
|                text|    date|       from|
+--------------------+--------+-----------+
|Six brain cells i...|20200914|Independent|
|It&#39;s not maki...|20200914|Independent|
|The snitches shou...|20200914|Independent|
|Soo everyone arou...|20200914|Independent|
|If you are the so...|20200914|Independent|
|making it up as w...|20200914|Independent|
|Fake news<br>Gove...|20200914|Independent|
|0.1% death rate o...|20200914|Independent|
|It&#39;s simple, ...|20200914|Independent|
|And Grouse shoote...|20200914|Independent|
|&quot;People shou...|20200914|Independent|
|Anyone noticed ho...|20200914|Independent|
|There&#39;s six o...|20200914|Independent|
|Bollocks to the g...|20200914|Independent|
|Clearly &quot;The...|20200914|Independent|
|make your voice h...|20200914|Independent|
|Unless you&#39;re...|20200914|Independent|
|Another power tri...|20200914|Independent|
|I&#39;ve got a ch...|20200914|Independent|
|Blackened tongues...|20200914|I