In [1]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local") \
    .config('spark.sql.autoBroadcastJoinThreshold', 0) \
    .config('spark.sql.adaptive.enabled', 'false') \
    .getOrCreate()

spark

In [2]:
videos = spark.read.option('header', 'true').option("inferSchema", "true").csv('../datasets/USvideos.csv')
videos.orderBy('video_id').show()

+-----------+--------------------+-------------------+-----------+--------------------+-------+-----+--------+-------------+--------------------+-----+
|   video_id|               title|      channel_title|category_id|                tags|  views|likes|dislikes|comment_total|      thumbnail_link| date|
+-----------+--------------------+-------------------+-----------+--------------------+-------+-----+--------+-------------+--------------------+-----+
|--JinobXWPk|DANGEROUS Jungle ...|   Brave Wilderness|         15|adventure|adventu...|1319945|38949|     533|         6768|https://i.ytimg.c...|20.10|
|-1fzGnFwz9M|9 Things You Need...|        Simon's Cat|         15|cartoon|simons ca...| 189414| 7070|     112|          288|https://i.ytimg.c...|13.09|
|-3AGlBYyLjo|Best Tom Petty In...|   CrazyLaughAction|         24|tom|petty|tom pet...|   2143|   16|       2|            4|https://i.ytimg.c...|06.10|
|-3lMEZ6k5NA|170912 BTS singin...|        Kpop Plus01|         10|          170912 BTS| 

In [3]:
comments_schema = StructType([ \
    StructField("video_id", StringType(), True), \
    StructField("comment_text", StringType(), True), \
    StructField("likes", IntegerType(), True), \
    StructField("replies", IntegerType(), True)])
comments = spark.read.option('header', 'true').option("mode", "DROPMALFORMED").schema(comments_schema).csv('../datasets/UScomments.csv')
comments.show()

+-----------+--------------------+-----+-------+
|   video_id|        comment_text|likes|replies|
+-----------+--------------------+-----+-------+
|XpVt6Z1Gjjo|Logan Paul it's y...|    4|      0|
|XpVt6Z1Gjjo|I've been followi...|    3|      0|
|XpVt6Z1Gjjo|Say hi to Kong an...|    3|      0|
|XpVt6Z1Gjjo| MY FAN . attendance|    3|      0|
|XpVt6Z1Gjjo|         trending 😉|    3|      0|
|XpVt6Z1Gjjo|#1 on trending AY...|    3|      0|
|XpVt6Z1Gjjo|The end though 😭...|    4|      0|
|XpVt6Z1Gjjo|#1 trending!!!!!!!!!|    3|      0|
|XpVt6Z1Gjjo|Happy one year vl...|    3|      0|
|XpVt6Z1Gjjo|You and your shit...|    0|      0|
|XpVt6Z1Gjjo|There should be a...|    0|      0|
|XpVt6Z1Gjjo|Dear Logan, I rea...|    0|      0|
|XpVt6Z1Gjjo|Honestly Evan is ...|    0|      0|
|XpVt6Z1Gjjo|Casey is still be...|    0|      0|
|XpVt6Z1Gjjo|aw geez rick this...|    0|      0|
|XpVt6Z1Gjjo|He happy cause he...|    0|      0|
|XpVt6Z1Gjjo|Ayyyyoooo Logang ...|    1|      0|
|XpVt6Z1Gjjo|Bro y did

## Задание 1

In [15]:
# Перезапишем исходный датасет с видео с применением бакетирования по video_id

videos.write.bucketBy(16, 'video_id').saveAsTable('USvideos_bkt', format = 'parquet', mode = 'overwrite')
videos_bkt = spark.table('USvideos_bkt')

In [16]:
# Считаем в датасете с видео количество просмотров, лайков, дизлайков и количества комментариев для каждого видео. 
# Сразу считаем значение скора по просмотрам как общее кол-во лайков минус общее кол-во дизлайков делить на общее кол-во просмотров (за все даты)

videos_gr = videos_bkt.groupBy('video_id', 'title', 'channel_title', 'category_id', 'tags', 'thumbnail_link')\
                      .agg(sum('views').alias('views'),
                           sum('likes').alias('likes'),
                           sum('dislikes').alias('dislikes'),
                           sum('comment_total').alias('comment_total'),
                           round(((sum('likes') - sum('dislikes')) / sum('views')), 5).alias('score_view')
                          )

In [17]:
videos_gr.show()

+-----------+--------------------+--------------------+-----------+--------------------+--------------------+--------+------+--------+-------------+----------+
|   video_id|               title|       channel_title|category_id|                tags|      thumbnail_link|   views| likes|dislikes|comment_total|score_view|
+-----------+--------------------+--------------------+-----------+--------------------+--------------------+--------+------+--------+-------------+----------+
|zgLtEob6X-Q|Honest Trailers -...|      Screen Junkies|          1|screenjunkies|scr...|https://i.ytimg.c...| 8814577|211090|    6702|        27203|   0.02319|
|B7YaMkCl3XA|Hurricane Irma de...|  Al Jazeera English|         25|5573051142001|ame...|https://i.ytimg.c...|  778973|  3078|     562|         2536|   0.00323|
|6vGg-jJl30A|THIS MADE MY DAD ...|         Nile Wilson|         17|nile wilson|nile ...|https://i.ytimg.c...|  400025| 25087|     240|         1764|   0.06211|
|bp6uJJJMaLs|Things you need t...|     J

In [18]:
# Перезапишем исходный датасет с комментариями с применением бакетирования по video_id

comments.write.bucketBy(16, 'video_id').saveAsTable('USComments_bkt', format = 'parquet', mode = 'overwrite')
comments_bkt = spark.table('USComments_bkt')

In [19]:
# Считаем в датасете с комментариямидля каждого видео количество лайков и ответов на комментарии. 

comments_gr = comments_bkt.groupBy('video_id').agg(sum('likes').alias('comment_likes'),
                                                   sum('replies').alias('comment_replies')
                                                   )

In [20]:
# Объединяем сгруппированные датасеты (оптимизация на основе предварительной записи с бакетированием), 
# считаем значение скора по комментариям и итоговый скор как произведение скоров по просмотрам и по комментариям,
# итоговое значение умножаем на 100000 для наглядности

videos_all = videos_gr.join(comments_gr,'video_id', 'left').fillna(0)\
                      .withColumn('score_comment', round(col('comment_likes')/(col('comment_total') + col('comment_replies')), 5))\
                      .withColumn('score', (round(col('score_view')*col('score_comment'), 5) * 100000).cast('int'))

In [21]:
scored_videos = videos_all.select('video_id', 'title', 'channel_title', 'category_id', 'tags', 'thumbnail_link', 'score')

scored_videos.show()

+-----------+--------------------+--------------------+-----------+--------------------+--------------------+-----+
|   video_id|               title|       channel_title|category_id|                tags|      thumbnail_link|score|
+-----------+--------------------+--------------------+-----------+--------------------+--------------------+-----+
|4F2KWDQQMhY|Riverdale: Betwee...|    Madelaine Petsch|         22|madelaine|madelai...|https://i.ytimg.c...|   90|
|5eSSL8hRU_E|Kelly Clarkson Ta...|    Z100 is New York|         10|kelly clarkson|lo...|https://i.ytimg.c...| 2635|
|6vGg-jJl30A|THIS MADE MY DAD ...|         Nile Wilson|         17|nile wilson|nile ...|https://i.ytimg.c...|   46|
|9YyB6sQ4iwA|iPhone X and iPho...|            iJustine|         28|ijustine|iphone x...|https://i.ytimg.c...|   82|
|AR4UgRJOUQY|What Does Your Se...|         AsapSCIENCE|         28|Search History|De...|https://i.ytimg.c...|   14|
|B7YaMkCl3XA|Hurricane Irma de...|  Al Jazeera English|         25|55730

## Задание 2

In [102]:
# Читаем json

categories = spark.read.format('json').option('multiline', 'true').load('../datasets/US_category_id.json').select(inline_outer('items'))

In [103]:
# Вытаскиваем id категории и название

categories = categories.select(categories.id.alias('category_id'), categories.snippet.title.alias('title'))

In [11]:
categories.show()

+-----------+--------------------+
|category_id|               title|
+-----------+--------------------+
|          1|    Film & Animation|
|          2|    Autos & Vehicles|
|         10|               Music|
|         15|      Pets & Animals|
|         17|              Sports|
|         18|        Short Movies|
|         19|     Travel & Events|
|         20|              Gaming|
|         21|       Videoblogging|
|         22|      People & Blogs|
|         23|              Comedy|
|         24|       Entertainment|
|         25|     News & Politics|
|         26|       Howto & Style|
|         27|           Education|
|         28|Science & Technology|
|         29|Nonprofits & Acti...|
|         30|              Movies|
|         31|     Anime/Animation|
|         32|    Action/Adventure|
+-----------+--------------------+
only showing top 20 rows



In [114]:
# UDF-функция для вычисления медианы. На вход получает сгруппированный датафрейм

@pandas_udf('category_id int, score int', PandasUDFType.GROUPED_MAP)
def compute_median(pdf):
    return pdf.assign(score = pdf.score.median())

In [118]:
# Применяем функцию к датафрейму

scored_categories = scored_videos.select('category_id', 'score').groupBy('category_id').apply(compute_median).dropDuplicates().withColumnRenamed('score', 'median_score')

In [116]:
scored_categories.show()

+-----------+------------+
|category_id|median_score|
+-----------+------------+
|         17|          89|
|          1|         227|
|         25|          27|
|         23|          38|
|         26|         300|
|         22|         101|
|         19|         172|
|          2|         150|
|         20|          43|
|         28|          91|
|         29|        1246|
|         27|         219|
|         43|        1494|
|         15|         304|
|         24|          93|
|         10|         183|
+-----------+------------+



In [120]:
# Приджойниваем названия категорий. Т.к. датасет с названиями маленький, то присоединяем его броадкастом.

scored_categories = scored_categories.join(broadcast(categories), 'category_id', 'inner').select('title', 'median_score')

In [121]:
scored_categories.orderBy(desc('median_score')).show(truncate=False)

+---------------------+------------+
|title                |median_score|
+---------------------+------------+
|Shows                |1494        |
|Nonprofits & Activism|1246        |
|Pets & Animals       |304         |
|Howto & Style        |300         |
|Film & Animation     |227         |
|Education            |219         |
|Music                |183         |
|Travel & Events      |172         |
|Autos & Vehicles     |150         |
|People & Blogs       |101         |
|Entertainment        |93          |
|Science & Technology |91          |
|Sports               |89          |
|Gaming               |43          |
|Comedy               |38          |
|News & Politics      |27          |
+---------------------+------------+



## Задание 3

In [78]:
import pandas as pd
import timeit

In [128]:
# UDF-функция для разбиения колонки с тэгами на Scala

from pyspark.sql.column import Column, _to_java_column, _to_seq

sc = spark.sparkContext

def udfSplitTagsScalaWraper(tags):
    _udf = sc._jvm.CustomUDFs.splitTagsUDF()
    return Column(_udf.apply(_to_seq(sc, [tags], _to_java_column)))

splited_tags = videos.select('tags', udfSplitTagsScalaWraper(col('tags')).alias('tags'))

print(f"Время выполнения UDF на Scala: {timeit.timeit('splited_tags.count()', number=1, globals=globals())} сек")

Время выполнения UDF на Scala: 0.08486641698982567 сек


In [127]:
# UDF-функция для разбиения колонки с тэгами на Python

@pandas_udf(ArrayType(StringType()), PandasUDFType.SCALAR)
def split_tags(tags):
    return tags.str.split('|')

splited_tags = videos.select(split_tags('tags').alias('tags'))

print(f"Время выполнения UDF на Python: {timeit.timeit('splited_tags.count()', number=1, globals=globals())}  сек")

Время выполнения UDF на Python: 0.05377666593994945  сек


In [129]:
# Применияем UDF-функцию для разбиения колонки с тэгами, для каждого тэга создаём отдельную запись

videos_tags = videos.select('video_id', explode_outer(split_tags('tags')).alias('tags'))

In [130]:
# Считаем количество уникальных видео для каждого тэга, выводим результат в порядке убывания популярности тэгов

popular_tags = videos_tags.distinct().groupBy('tags').agg(count('video_id').alias('sum_videos')).orderBy(col('sum_videos').desc()).show()

+-----------+----------+
|       tags|sum_videos|
+-----------+----------+
|      funny|       217|
|     comedy|       163|
|     [none]|       146|
|       2017|        93|
|      humor|        92|
|     how to|        84|
|     makeup|        77|
|      music|        74|
|       vlog|        73|
|      video|        72|
|  interview|        70|
|   tutorial|        69|
|  celebrity|        64|
|     review|        61|
|       news|        61|
|celebrities|        59|
|     beauty|        58|
|       food|        57|
|    science|        56|
|   comedian|        55|
+-----------+----------+
only showing top 20 rows



## Задание 4

In [25]:
# Фильтруем бакетированный датасет с видео по тэгу cat через регулярное выражение (выбираем в данном случае только отдельно стоящий регистрозависимый тэг "cat"), убираем дубликаты

video_cats = videos_bkt.filter(regexp_extract('tags', r'\|cat\|', 0) != '').select('video_id').distinct()
video_cats.count()

11

In [26]:
# К отфильтрованному датасету с видео джойним датасет с комментариями, используем броадкаст для video_cats,
# группируем по видео и комментариям, считаем количество лайков, сортируем и выводим 5 комменариев с самым большим количеством лайков

video_cats = broadcast(video_cats)
top_5_comments = comments.join(video_cats, 'video_id', 'inner')\
                         .groupBy('video_id', 'comment_text')\
                         .agg(sum('likes').alias('likes'),
                              sum('replies').alias('replies'))\
                         .orderBy(desc('likes'))\
                         .limit(5)

In [27]:
top_5_comments.show(truncate=False)

+-----------+------------------------------------------------------------------------------------------------------+-----+-------+
|video_id   |comment_text                                                                                          |likes|replies|
+-----------+------------------------------------------------------------------------------------------------------+-----+-------+
|tp9aQXDFHbY|Make sure to check back next Friday as we are launching our brand new animated HALLOWEEN special! 🐱🕷|1329 |162    |
|-1fzGnFwz9M|I make interesting cartoons and I need your help! Go to the channel, rate my work!                    |839  |5      |
|tp9aQXDFHbY|1:51 so your nuts are your most prized possession?                                                    |121  |8      |
|Vjc459T6wX8|How does Mugumogu not collapse in a heap of laughter?!! Maru's liquified form is hilarious!           |50   |7      |
|tp9aQXDFHbY|If Simon will be make animation movie of Simons Cat adventures, I’ll go 

In [28]:
spark.stop()