In [1]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local") \
    .config('spark.sql.autoBroadcastJoinThreshold', 0) \
    .config('spark.sql.adaptive.enabled', 'false') \
    .getOrCreate()

In [2]:
videos = spark.read.option('header', 'true').option("inferSchema", "true").csv('../datasets/USvideos.csv')
videos.show()

+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|   video_id|               title|       channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|
+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|XpVt6Z1Gjjo|1 YEAR OF VLOGGIN...|    Logan Paul Vlogs|         24|logan paul vlog|l...|4394029|320053|    5931|        46245|https://i.ytimg.c...|13.09|
|K4wEI5zhHB0|iPhone X ‚Äî Introd...|               Apple|         28|Apple|iPhone 10|i...|7860119|185853|   26679|            0|https://i.ytimg.c...|13.09|
|cLdxuaxaQwc|         My Response|           PewDiePie|         22|              [none]|5845909|576597|   39774|       170708|https://i.ytimg.c...|13.09|
|WYYvHb03Eog|Apple iPhone X fi...|           The Verge|         28|apple i

In [3]:
comments_schema = StructType([ \
    StructField("video_id", StringType(), True), \
    StructField("comment_text", StringType(), True), \
    StructField("likes", IntegerType(), True), \
    StructField("replies", IntegerType(), True), \
])
comments = spark.read.option('header', 'true').option("mode", "DROPMALFORMED").schema(comments_schema).csv('../datasets/UScomments.csv')
comments.show()

+-----------+--------------------+-----+-------+
|   video_id|        comment_text|likes|replies|
+-----------+--------------------+-----+-------+
|XpVt6Z1Gjjo|Logan Paul it's y...|    4|      0|
|XpVt6Z1Gjjo|I've been followi...|    3|      0|
|XpVt6Z1Gjjo|Say hi to Kong an...|    3|      0|
|XpVt6Z1Gjjo| MY FAN . attendance|    3|      0|
|XpVt6Z1Gjjo|         trending üòâ|    3|      0|
|XpVt6Z1Gjjo|#1 on trending AY...|    3|      0|
|XpVt6Z1Gjjo|The end though üò≠...|    4|      0|
|XpVt6Z1Gjjo|#1 trending!!!!!!!!!|    3|      0|
|XpVt6Z1Gjjo|Happy one year vl...|    3|      0|
|XpVt6Z1Gjjo|You and your shit...|    0|      0|
|XpVt6Z1Gjjo|There should be a...|    0|      0|
|XpVt6Z1Gjjo|Dear Logan, I rea...|    0|      0|
|XpVt6Z1Gjjo|Honestly Evan is ...|    0|      0|
|XpVt6Z1Gjjo|Casey is still be...|    0|      0|
|XpVt6Z1Gjjo|aw geez rick this...|    0|      0|
|XpVt6Z1Gjjo|He happy cause he...|    0|      0|
|XpVt6Z1Gjjo|Ayyyyoooo Logang ...|    1|      0|
|XpVt6Z1Gjjo|Bro

In [4]:
import pandas as pd
from pyspark.sql.functions import pandas_udf

videos.groupby("video_id") \
    .agg(count("video_id").alias("part_size")) \
    .orderBy(desc("part_size")).show()


videos.write.bucketBy(16,"video_id") \
  .saveAsTable("bucket_videos", format = "parquet", mode = "overwrite")

comments.write.bucketBy(16,"video_id") \
  .saveAsTable("bucket_comments", format = "parquet", mode = "overwrite")




+-----------+---------+
|   video_id|part_size|
+-----------+---------+
|LunHybOKIjU|        8|
|Hlt3rA-oDao|        8|
|Oo0NJsr5m4I|        8|
|jUrpOg4fBs0|        7|
|j5YSOabmFgw|        7|
|CYoRmfI0LUc|        7|
|mlxdnyfkWKQ|        7|
|udnGW3E1vxY|        7|
|3QWQ4gN3j4E|        7|
|M16CGK1T9MM|        7|
|XpVt6Z1Gjjo|        7|
|74zJ4scJzNs|        7|
|APHgDFRpCi0|        7|
|4X6a3G_0HjY|        7|
|SHq2qrFUlGY|        7|
|rgbnZG85IRo|        7|
|oKzFGhlFqqE|        7|
|5ggZ9jIHnr8|        7|
|DeTu8xSGpEM|        7|
|OlI8r3nNUVw|        7|
+-----------+---------+
only showing top 20 rows



In [5]:
bucket_videos   = spark.table("bucket_videos")
bucket_comments = spark.table("bucket_comments")


In [6]:
comments_stat = bucket_comments.distinct().select("video_id", "likes", "replies").groupBy("video_id").agg(
    sum("likes").alias("comment_likes"),
    sum("replies").alias("comment_replies"))

comments_stat.show()

+-----------+-------------+---------------+
|   video_id|comment_likes|comment_replies|
+-----------+-------------+---------------+
|zgLtEob6X-Q|           43|              2|
|B7YaMkCl3XA|           54|             96|
|6vGg-jJl30A|            9|              4|
|bp6uJJJMaLs|          629|             35|
|Pp19TkIU_fw|         1966|            181|
|u6iVspBWzZU|           11|              0|
|wGQtrwey-TI|          833|            401|
|ykvX-E1nuag|            1|              0|
|AR4UgRJOUQY|           28|              1|
|Zy6vBxqlapw|           58|              0|
|Lv5DFKceFac|           37|              8|
|9YyB6sQ4iwA|          191|             43|
|IYvEhgYy35I|           88|              7|
|JZDM1bLn7sM|           10|              3|
|tBN9kLaS-uw|           61|              3|
|bvim4rsNHkQ|           65|            101|
|zKriLekFPwg|          288|             41|
|4F2KWDQQMhY|           51|             22|
|z5eG8fD-hQw|            6|              6|
|FfRGxN2zeWU|          338|     

# 1) Scored_videos. Scoring videos.

In [7]:
def scoring_videos(views: pd.Series,
                   likes: pd.Series,
                   dislikes: pd.Series,
                   comment_likes: pd.Series,
                   comment_replies: pd.Series) -> pd.Series:
                       return views * 0.03 \
                            + likes * 0.07 \
                            + dislikes * 0.05 \
                            + comment_likes * 0.01 \
                            + comment_replies * 0.02
scoring_videos = pandas_udf(scoring_videos, returnType=LongType())  

In [8]:
scored_videos = bucket_videos.join(comments_stat, "video_id", "left")\
                          .withColumn("score", scoring_videos("views", 
                                                              "likes",
                                                              "dislikes",
                                                              "comment_likes", 
                                                              "comment_replies"))

In [9]:
scored_videos.select("video_id", "score").show(50)

+-----------+------+
|   video_id| score|
+-----------+------+
|3TDk34hnSXc|  2756|
|4F2KWDQQMhY|  7235|
|4F2KWDQQMhY|  7988|
|5eSSL8hRU_E|   940|
|5eSSL8hRU_E|  1448|
|6vGg-jJl30A|  6424|
|6vGg-jJl30A|  7344|
|9YyB6sQ4iwA| 25214|
|9YyB6sQ4iwA| 40984|
|AR4UgRJOUQY| 39563|
|B7YaMkCl3XA| 11598|
|B7YaMkCl3XA| 12019|
|B7YaMkCl3XA| 12240|
|FfRGxN2zeWU| 53452|
|FfRGxN2zeWU| 63714|
|IYvEhgYy35I|  3697|
|IYvEhgYy35I|  5894|
|IYvEhgYy35I|  7293|
|JZDM1bLn7sM|  7315|
|JZDM1bLn7sM|  8437|
|JZDM1bLn7sM|  9027|
|Lv5DFKceFac| 11924|
|Pp19TkIU_fw|  5820|
|Pp19TkIU_fw|  6208|
|Q1eQw4ycgQM|   477|
|Q1eQw4ycgQM|  3238|
|SHq2qrFUlGY|137052|
|Zy6vBxqlapw| 96337|
|bp6uJJJMaLs|  1358|
|bp6uJJJMaLs|  1566|
|bp6uJJJMaLs|  1677|
|bvim4rsNHkQ| 47532|
|bvim4rsNHkQ| 70881|
|lno7AN8hLvQ|  4280|
|tBN9kLaS-uw| 64290|
|tBN9kLaS-uw| 80963|
|u6iVspBWzZU|  6117|
|u6iVspBWzZU|  6506|
|uWFAD84I66I|  3327|
|wGQtrwey-TI| 19657|
|wGQtrwey-TI| 20352|
|ykvX-E1nuag|  5864|
|ykvX-E1nuag|  5973|
|z5eG8fD-hQw|  4058|
|z5eG8fD-hQw|

# 2) Categories_score. Median

In [10]:
import json
with open("../datasets/US_category_id.json") as f:
    data_json = json.load(f)
    categories = spark.createDataFrame(data_json['items'])


–∫–∞—Ç–µ–≥–æ—Ä–∏–∏ - –º–µ–ª–∫–∏–π —Å–ø—Ä–∞–≤–æ—á–Ω–∏–∫, –∫–æ—Ç–æ—Ä—ã–π –ª—É—á—à–µ –∑–∞–±—Ä–æ–¥–∫–∞—Å—Ç–∏—Ç—å

In [11]:
video_categories = scored_videos.join(broadcast(categories.selectExpr("id", "snippet.title as category")),
                                          scored_videos.category_id == categories.id,'left'
                                    ).drop("id","thumbnail_link" )
video_categories.show(10)

+-----------+--------------------+------------------+-----------+--------------------+-------+-----+--------+-------------+-----+-------------+---------------+-----+--------------------+
|   video_id|               title|     channel_title|category_id|                tags|  views|likes|dislikes|comment_total| date|comment_likes|comment_replies|score|            category|
+-----------+--------------------+------------------+-----------+--------------------+-------+-----+--------+-------------+-----+-------------+---------------+-----+--------------------+
|6vGg-jJl30A|THIS MADE MY DAD ...|       Nile Wilson|         17|nile wilson|nile ...| 185541|12179|     115|          827|13.09|            9|              4| 6424|              Sports|
|AR4UgRJOUQY|What Does Your Se...|       AsapSCIENCE|         28|Search History|De...|1244953|29748|    2639|         4112|13.09|           28|              1|39563|Science & Technology|
|B7YaMkCl3XA|Hurricane Irma de...|Al Jazeera English|         25|

In [12]:
from numpy import median
@pandas_udf(DoubleType(),PandasUDFType.GROUPED_AGG)
def mean(score: pd.Series) -> float:
    return median(score)

categories_scored = video_categories.fillna({"score": 0}).groupBy("category").agg(mean("score").alias("score"))
categories_scored.show(100)



+--------------------+-------+
|            category|  score|
+--------------------+-------+
|               Shows|  263.0|
|           Education| 7528.5|
|              Gaming| 8423.5|
|       Entertainment|11966.0|
|     Travel & Events| 7692.0|
|Science & Technology|12217.0|
|              Sports| 4850.5|
|       Howto & Style| 9070.0|
|Nonprofits & Acti...|    0.0|
|    Film & Animation|13676.0|
|      People & Blogs| 8930.0|
|     News & Politics| 4436.0|
|      Pets & Animals| 7487.0|
|    Autos & Vehicles| 6833.5|
|               Music| 7324.0|
|              Comedy|29389.0|
+--------------------+-------+



# 3) Popular tags

In [13]:

from pyspark.sql.column import Column, _to_java_column, _to_seq
sc = spark.sparkContext
def udfSplitTagsScala(tags):
    udf = sc._jvm.CustomUDFs.splitTagsUDF()
    return Column(udf.apply(_to_seq(sc,[tags], _to_java_column)))

In [14]:
%%timeit -r 50
videos.select(udfSplitTagsScala(col("tags")).alias("tags_arr")).count()


66.8 ms ¬± 10.1 ms per loop (mean ¬± std. dev. of 50 runs, 1 loop each)


In [15]:
@pandas_udf(ArrayType(StringType()), functionType=PandasUDFType.SCALAR)
def udfPandasSplit(tags: pd.Series) -> pd.Series:
    return tags.str.split('|')

In [16]:
%%timeit -r 50
videos.select(udfPandasSplit(col("tags")).alias("tags_arr")).count()

51.3 ms ¬± 3.71 ms per loop (mean ¬± std. dev. of 50 runs, 10 loops each)


In [17]:
popular_tags = videos.select("video_id",explode(udfPandasSplit(col("tags")))
                             .alias("tag")).groupBy("tag").count()

In [18]:
popular_tags.show(10)

+--------------------+-----+
|                 tag|count|
+--------------------+-----+
|apple iphone x event|    1|
|                 hud|    3|
|                 NBC|  118|
|      hannahstocking|   77|
|              online|   14|
|           trailer 1|    5|
|              travel|   90|
|           traveling|   23|
|  darkest foundation|    9|
|              outfit|   25|
+--------------------+-----+
only showing top 10 rows



# 4) Top 5 comment CAT

In [19]:
tags = videos.withColumn("tag_split", udfPandasSplit(col("tags"))).drop("likes")
videos_cat = tags.filter(array_contains("tag_split","cat"))
videos_cat.count()
videos_cat.show()

+-----------+------------------------------------+-------------+-----------+--------------------+------+--------+-------------+--------------------+-----+--------------------+
|   video_id|                               title|channel_title|category_id|                tags| views|dislikes|comment_total|      thumbnail_link| date|           tag_split|
+-----------+------------------------------------+-------------+-----------+--------------------+------+--------+-------------+--------------------+-----+--------------------+
|Vjc459T6wX8|Êõ¥„Å™„Çã„Éï„Ç£„ÉÉ„ÉàÊÑü„ÇíËøΩÊ±Ç„Åô„Çã„Å≠„Åì„ÄÇ-...|     mugumogu|         15|Maru|cat|kitty|pe...| 43199|       7|          294|https://i.ytimg.c...|13.09|[Maru, cat, kitty...|
|0Yhaei1S5oQ|                Japan's Ominous D...|      SciShow|         27|SciShow|science|H...|295156|     149|          781|https://i.ytimg.c...|13.09|[SciShow, science...|
|-1fzGnFwz9M|                9 Things You Need...|  Simon's Cat|         15|cartoon|simons ca...|189414|

In [20]:
videos_cat_comments = videos_cat.join(comments, "video_id")

In [21]:
videos_cat = videos_cat_comments.groupBy("comment_text") \
                .agg(sum("likes").alias("likes_total")) \
                .orderBy(desc("likes_total")) \
                .show(5,False,True)
        

-RECORD 0--------------------------------------------------------------------------------------------------------------
 comment_text | talk about the ocean sunfish build                                                                     
 likes_total  | 19840                                                                                                  
-RECORD 1--------------------------------------------------------------------------------------------------------------
 comment_text | The second I read this title in my notification, I started to giggle.                                  
 likes_total  | 11775                                                                                                  
-RECORD 2--------------------------------------------------------------------------------------------------------------
 comment_text | Make sure to check back next Friday as we are launching our brand new animated HALLOWEEN special! üê±üï∑ 
 likes_total  | 6645                