In [1]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local") \
    .config('spark.sql.autoBroadcastJoinThreshold', 0) \
    .config('spark.sql.adaptive.enabled', 'false') \
    .getOrCreate()

In [2]:
# Доп. библиотеки
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.window import Window
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [3]:
# pагрузка json справочника
path = '../datasets/US_category_id.json'
category = spark.read.json(path, multiLine=True)

In [4]:
# Обработка справочника
sdf_category_id = category.select(explode(col("items").getField("id")).alias("category_id"))
sdf_category_title = category.select(explode(col("items").getField("snippet").getField("title")))

w = Window().orderBy(lit(None))
sdf_category_id = sdf_category_id.withColumn('row_num', row_number().over(w))
sdf_category_title = sdf_category_title.withColumn('row_num', row_number().over(w))

category_res = sdf_category_id.join(sdf_category_title, 'row_num').drop('row_num')
category_res = category_res.select("category_id", col("col").alias("category_title"))
category_res.show(3, False)

+-----------+----------------+
|category_id|category_title  |
+-----------+----------------+
|1          |Film & Animation|
|2          |Autos & Vehicles|
|10         |Music           |
+-----------+----------------+
only showing top 3 rows



In [5]:
# Загрузка инфы по видео
videos = (spark.read
               .option('header', 'true')
               .option("inferSchema", "true")
               .csv('../datasets/USvideos.csv')
             )
videos.show()

+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|   video_id|               title|       channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|
+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|XpVt6Z1Gjjo|1 YEAR OF VLOGGIN...|    Logan Paul Vlogs|         24|logan paul vlog|l...|4394029|320053|    5931|        46245|https://i.ytimg.c...|13.09|
|K4wEI5zhHB0|iPhone X — Introd...|               Apple|         28|Apple|iPhone 10|i...|7860119|185853|   26679|            0|https://i.ytimg.c...|13.09|
|cLdxuaxaQwc|         My Response|           PewDiePie|         22|              [none]|5845909|576597|   39774|       170708|https://i.ytimg.c...|13.09|
|WYYvHb03Eog|Apple iPhone X fi...|           The Verge|         28|apple iph

In [6]:
comments_schema = StructType([ \
    StructField("comm_video_id", StringType(), True), \
    StructField("comment_text", StringType(), True), \
    StructField("comment_likes", IntegerType(), True), \
    StructField("comment_replies", IntegerType(), True)])
comments = (spark.read
                    .option('header', 'true')
                    .option("mode", "DROPMALFORMED")
                    .schema(comments_schema)
                    .csv('../datasets/UScomments.csv'))
comments.show()

+-------------+--------------------+-------------+---------------+
|comm_video_id|        comment_text|comment_likes|comment_replies|
+-------------+--------------------+-------------+---------------+
|  XpVt6Z1Gjjo|Logan Paul it's y...|            4|              0|
|  XpVt6Z1Gjjo|I've been followi...|            3|              0|
|  XpVt6Z1Gjjo|Say hi to Kong an...|            3|              0|
|  XpVt6Z1Gjjo| MY FAN . attendance|            3|              0|
|  XpVt6Z1Gjjo|         trending 😉|            3|              0|
|  XpVt6Z1Gjjo|#1 on trending AY...|            3|              0|
|  XpVt6Z1Gjjo|The end though 😭...|            4|              0|
|  XpVt6Z1Gjjo|#1 trending!!!!!!!!!|            3|              0|
|  XpVt6Z1Gjjo|Happy one year vl...|            3|              0|
|  XpVt6Z1Gjjo|You and your shit...|            0|              0|
|  XpVt6Z1Gjjo|There should be a...|            0|              0|
|  XpVt6Z1Gjjo|Dear Logan, I rea...|            0|              

### 1. scored_videos

In [7]:
# Датасет на основе файла USvideos.csv с добавлением колонки, содержащей скор (показатель качества) видео.
# Никто не знает, как считать скор, поэтому формулу предлагается придумать вам.
# Но она должна включать в себя просмотры, лайки, дизлайки видео, лайки и дизлайки к комментариям к этому видео.

In [8]:
# Выбираем бакетирование потому что у нас достаточно бальшие датасеты(оба) а партиционирование по ID не выполняется
# Создаем бакеты. Только я не понял, как правильно выбрать кол-во бакетов ?
# Папки с данными не удаляются из spark-warehouse, проще в терминале сносить rm -r 'spark-warehouse'
(comments.write
     .bucketBy(25, "comm_video_id")
     .saveAsTable("comments_bucketed2", format = 'csv', mode = 'overwrite') )

(videos.write
     .bucketBy(25, "video_id")
     .saveAsTable("videos_bucketed2", format = 'csv', mode = 'overwrite') )

In [9]:
comments_bucketed = spark.table("comments_bucketed2")
videos_bucketed = spark.table("videos_bucketed2")

video_and_comm = comments_bucketed.join(videos_bucketed, comments_bucketed.comm_video_id == videos_bucketed.video_id)

In [10]:
# Функция для расчета скора
def score_func(likes: pd.Series
                , dislikes: pd.Series
                , views: pd.Series
                , comment_likes: pd.Series
                , comment_replies: pd.Series
                , comment_total: pd.Series) -> pd.Series:
    # (видео:(лайки - дизлайки)/просмотры + комментарии: (лайки + ответы на комментарии)/общее кол-во комментариев ) / 2
    # (likes - dislikes)/views) + (comment_likes + comment_replies  / comment_total) / 2
    return ((likes + comment_likes + comment_replies + comment_total) - dislikes)/views
    # ( ((likes - dislikes)/views) + (comment_likes + (comment_replies  / comment_total)) ) / 2
multiply = pandas_udf(score_func, returnType=DoubleType())

In [11]:
# Агрегируем
video_and_comm_gr = (video_and_comm.groupBy("video_id", "category_id", "tags")
                                    .agg(
                                        sum("likes").alias("likes")
                                      , sum("dislikes").alias("dislikes")
                                      , sum("views").alias("views")
                                      , sum("comment_likes").alias("comment_likes")
                                      , sum("comment_replies").alias("comment_replies")
                                      , sum("comment_total").alias("comment_total")
                                    )
                                )

In [12]:
video_and_comm_gr = video_and_comm_gr.withColumn("score", multiply(col("likes"),col("dislikes"),col("views")
                                                                   ,col("comment_likes"),col("comment_replies"),col("comment_total")))
video_and_comm_gr.show(5)

+-----------+-----------+--------------------+---------+--------+----------+-------------+---------------+-------------+--------------------+
|   video_id|category_id|                tags|    likes|dislikes|     views|comment_likes|comment_replies|comment_total|               score|
+-----------+-----------+--------------------+---------+--------+----------+-------------+---------------+-------------+--------------------+
|-QL9dvmddYs|         23|Jolly|JOLLY|jolly...| 59100400|  317200|1161168400|          168|             24|      8225200| 0.05770790179960116|
|-UAdFerZMWc|         24|lego|lego star wa...|   404085|   13035|  10344180|         1936|            198|        49335| 0.04277951466428465|
|0R7MQwmbiQc|          1|jaiden|animations...|225604575|  927276|3219444018|       726220|           8064|     43480227|  0.0835211944971301|
|0qfgZJNCCJQ|         22|ranz|ranz kyle|ni...| 40423600|  406000| 550939600|          236|              4|      3565200| 0.07910674781772811|
|1u5jO

### 2. categories_score

In [13]:
# Датасет по категориям, в котором присутствуют следующие поля: Название категории (не id, он непонятный для аналитиков!) 
# - можно найти в файле US_category_id.json . Медиана показателя score из датасета scored_videos по каждой категории.

In [18]:
# тут фрейм по категория маленький и по условиям его можно поместить в broadcast
category_res_br = broadcast(category_res)

sdf_categ_med = video_and_comm_gr.join(category_res_br, video_and_comm_gr.category_id == category_res_br.category_id)
sdf_categ_med.show()

+-----------+-----------+--------------------+----------+--------+-----------+-------------+---------------+-------------+--------------------+-----------+----------------+
|   video_id|category_id|                tags|     likes|dislikes|      views|comment_likes|comment_replies|comment_total|               score|category_id|  category_title|
+-----------+-----------+--------------------+----------+--------+-----------+-------------+---------------+-------------+--------------------+-----------+----------------+
|-QL9dvmddYs|         23|Jolly|JOLLY|jolly...|  59100400|  317200| 1161168400|          168|             24|      8225200| 0.05770790179960116|         23|          Comedy|
|-UAdFerZMWc|         24|lego|lego star wa...|    404085|   13035|   10344180|         1936|            198|        49335| 0.04277951466428465|         24|   Entertainment|
|0R7MQwmbiQc|          1|jaiden|animations...| 225604575|  927276| 3219444018|       726220|           8064|     43480227|  0.083521194

In [19]:
# UDF для медианы
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def median_udf(v):
    return v.median()

In [20]:
# Считаем медиану
sdf_categ_med.groupby("category_title").agg(median_udf("score")).show(5, False)

+---------------+--------------------+
|category_title |median_udf(score)   |
+---------------+--------------------+
|Shows          |0.016979687959964674|
|Education      |0.038062898359452195|
|Gaming         |0.023096633509649425|
|Entertainment  |0.022277975638122213|
|Travel & Events|0.02704722125536605 |
+---------------+--------------------+
only showing top 5 rows



### 3. popular_tags 

In [21]:
# Датасет по самым популярным тэгам (название тэга + количество видео с этим тэгом). В исходном датасете тэги лежат строкой в поле tags.
# Другие разработчики уже сталкивались с подобной задачей, поэтому написали Scala-функцию для разбиения
# тегов (код функции - в разделе Примечания). Но не доверяйте им вслепую! Обязательно напишите свою UDF-функцию разбиения
# строки на тэги и сравните время работы с её Scala-версией. Можно замерять своими силами, а можно воспользоваться библиотекой timeit. 
# Стандартные функции Spark из пакета pyspark.sq.functions использовать нельзя, нужно написать свою функцию.

In [22]:
# функция для разбиения тегов
@pandas_udf('array<string>', PandasUDFType.SCALAR) 
def split_msg(string): 
    msg_ = string.str.split("\\|") 
    return msg_

In [23]:
sdf_tags = videos.select("video_id",explode(split_msg("tags")).alias("tag_nm"))
sdf_tags_gr = (sdf_tags.groupBy("tag_nm")
                            .agg(count("video_id")
                                 .alias("video_qty")))
sdf_tags_gr.orderBy(col("video_qty").desc()).show(5)

+------+---------+
|tag_nm|video_qty|
+------+---------+
| funny|      722|
|comedy|      572|
|[none]|      491|
|  2017|      309|
|how to|      284|
+------+---------+
only showing top 5 rows



In [26]:
# UDF Scala не понял как загружать и создавать. 
# Не уверен что мне это вообще когда-то понадобится в роли аналитика, да и Scala я не знаю))
# Егор, если есть возможность, можешь пояснить порядок действий

### 4. Личная просьба от Марка

In [27]:
# он любит котов (а кто не их не любит!) и хочет найти самые интересные комментарии (топ-5) к видео про котов.
# “Видео про котов” - видео, у которого есть тэг “cat”.

In [24]:
# Находим видео с тегом про котов и сохраняем в отдельный сет
# sdf_cat_tag = sdf_tags.where(col("tag_nm").like('% cat %'))
sdf_cat_tag = (sdf_tags.where(col("tag_nm") == 'cat')
                       .select("video_id").distinct()
              )
sdf_cat_tag.count()

13

In [25]:
# Фрейм с данными по нужному видео получился маленьким, поэтому снова broadcast
sdf_cat_tag_br = broadcast(sdf_cat_tag)
(comments.join(sdf_cat_tag_br, comments.comm_video_id == sdf_cat_tag.video_id)
         .orderBy(col("comment_likes").desc())
         .select("comment_text")
         .show(5, False)
)

+----------------------------------------------------------------------------------+
|comment_text                                                                      |
+----------------------------------------------------------------------------------+
|The second I read this title in my notification, I started to giggle.             |
|talk about the ocean sunfish build                                                |
|talk about the ocean sunfish build                                                |
|talk about the ocean sunfish build                                                |
|I make interesting cartoons and I need your help! Go to the channel, rate my work!|
+----------------------------------------------------------------------------------+
only showing top 5 rows

