# Загрузка данных

In [1]:
# !pip install kaggle

Нужен свой токен с kaggle

In [2]:
!echo '{"username":"","key":""}' > ../.kaggle/kaggle.json

In [3]:
!kaggle datasets download -d datasnaek/youtube -p ../datasets --unzip

Downloading youtube.zip to ../datasets
100%|██████████████████████████████████████| 55.9M/55.9M [00:11<00:00, 5.24MB/s]
100%|██████████████████████████████████████| 55.9M/55.9M [00:11<00:00, 5.02MB/s]


In [4]:
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local") \
    .config('spark.sql.autoBroadcastJoinThreshold', 0) \
    .config('spark.sql.adaptive.enabled', 'false') \
    .getOrCreate()

# Задание 1

scored_videos - датасет на основе файла USvideos.csv с добавлением колонки, содержащей скор (показатель качества) видео. Никто не знает, как считать скор, поэтому формулу предлагается придумать вам. Но она должна включать в себя просмотры, лайки, дизлайки видео, лайки и дизлайки к комментариям к этому видео.

In [5]:
videos = spark.read.option('header', 'true').option("inferSchema", "true").csv('../datasets/USvideos.csv')
videos.show(5)

+-----------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|   video_id|               title|   channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|
+-----------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|XpVt6Z1Gjjo|1 YEAR OF VLOGGIN...|Logan Paul Vlogs|         24|logan paul vlog|l...|4394029|320053|    5931|        46245|https://i.ytimg.c...|13.09|
|K4wEI5zhHB0|iPhone X — Introd...|           Apple|         28|Apple|iPhone 10|i...|7860119|185853|   26679|            0|https://i.ytimg.c...|13.09|
|cLdxuaxaQwc|         My Response|       PewDiePie|         22|              [none]|5845909|576597|   39774|       170708|https://i.ytimg.c...|13.09|
|WYYvHb03Eog|Apple iPhone X fi...|       The Verge|         28|apple iphone x ha...|2642103| 24975| 

In [6]:
videos.count(), videos.distinct().count()

(7998, 7997)

In [7]:
comments_schema = T.StructType([
    T.StructField("video_id", T.StringType(), True),
    T.StructField("comment_text", T.StringType(), True),
    T.StructField("likes", T.IntegerType(), True),
    T.StructField("replies", T.IntegerType(), True)
])
comments = spark.read.option('header', 'true').option("mode", "DROPMALFORMED").schema(comments_schema).csv('../datasets/UScomments.csv')
comments.show(5)

+-----------+--------------------+-----+-------+
|   video_id|        comment_text|likes|replies|
+-----------+--------------------+-----+-------+
|XpVt6Z1Gjjo|Logan Paul it's y...|    4|      0|
|XpVt6Z1Gjjo|I've been followi...|    3|      0|
|XpVt6Z1Gjjo|Say hi to Kong an...|    3|      0|
|XpVt6Z1Gjjo| MY FAN . attendance|    3|      0|
|XpVt6Z1Gjjo|         trending 😉|    3|      0|
+-----------+--------------------+-----+-------+
only showing top 5 rows



In [8]:
comments.count(), comments.distinct().count()

(691722, 471596)

Предлагаю ранжировать видео через линейную комбинацию признаков. Поскольку нет меток для регрессии, веса для признаков выберу по интуиции. Так мы получим некое значение по степени "крутости". Поскольку у одного ролиа может быть несколько комментариев, для простоты можно посчитать агрегацию по лайкам и ответам, например, сумму

### $$w_1 * views + w_2 * likes + w_3 * dislikes + w_4 * sum(comments\_likes) + w_5 * sum(comments\_replies)$$

In [9]:
comments_agg = (
    comments
    .distinct()
    .groupBy(F.col('video_id'))
    .agg(
        F.sum(F.col('likes')).alias('comments_likes'),
        F.sum(F.col('replies')).alias('comments_replies'),
    )
)

In [10]:
comments_agg.show(5)

+-----------+--------------+----------------+
|   video_id|comments_likes|comments_replies|
+-----------+--------------+----------------+
|xPS7bqBePSs|           990|              27|
|dInwVhRtN4E|            63|              13|
|rn5Xgak1zzA|            14|               7|
|eHq6ZA6uKOg|           688|             100|
|7TN09IP5JuI|            34|               0|
+-----------+--------------+----------------+
only showing top 5 rows



In [11]:
comments_agg.count(), comments_agg.distinct().count()

(2266, 2266)

In [12]:
(
    videos
    .groupBy('video_id')
    .agg(
        F.count('title').alias('partition_size')
    )
    .orderBy(F.desc('partition_size'))
).show()

+-----------+--------------+
|   video_id|partition_size|
+-----------+--------------+
|LunHybOKIjU|             8|
|Hlt3rA-oDao|             8|
|Oo0NJsr5m4I|             8|
|jUrpOg4fBs0|             7|
|j5YSOabmFgw|             7|
|CYoRmfI0LUc|             7|
|mlxdnyfkWKQ|             7|
|udnGW3E1vxY|             7|
|3QWQ4gN3j4E|             7|
|M16CGK1T9MM|             7|
|XpVt6Z1Gjjo|             7|
|74zJ4scJzNs|             7|
|APHgDFRpCi0|             7|
|4X6a3G_0HjY|             7|
|SHq2qrFUlGY|             7|
|rgbnZG85IRo|             7|
|oKzFGhlFqqE|             7|
|5ggZ9jIHnr8|             7|
|DeTu8xSGpEM|             7|
|OlI8r3nNUVw|             7|
+-----------+--------------+
only showing top 20 rows



В датасете videos нет сильной скошенности по ключу, значит от добавления соли выигрыша нет. В датасете comments_agg ключи уже уникальны и их меньше, чем в videos. Можно попробовать партиционирование по ключу video_id, чтобы равномерно распределить записи

In [13]:
videos = (
    videos.repartition(100, 'video_id')
    .join(comments_agg, on='video_id', how='left')
)

In [14]:
videos.select('video_id', 'title', 'views', 'likes', 'dislikes', 'comments_likes', 'comments_replies').show(5)

+-----------+--------------------+------+-----+--------+--------------+----------------+
|   video_id|               title| views|likes|dislikes|comments_likes|comments_replies|
+-----------+--------------------+------+-----+--------+--------------+----------------+
|xPS7bqBePSs|LÉON - I Believe ...| 14598| 1432|       6|           990|              27|
|a7Sf_H2cFdM|Drowning for Powe...|917643|25456|     731|            17|               5|
|a7Sf_H2cFdM|Drowning for Powe...|947706|25968|     741|            17|               5|
|mGqR9sgMIyA|Orlando braces fo...| 25652|   73|      11|            79|              40|
|H8t5M9_Tvzk|Depeche Mode - Co...| 93030| 6162|      81|           186|              25|
+-----------+--------------------+------+-----+--------+--------------+----------------+
only showing top 5 rows



In [15]:
@F.udf(returnType=T.FloatType())
def model_score(
    views,
    likes,
    dislikes,
    comments_likes,
    comments_replies,
    w_1: float = 0.1, 
    w_2: float = 10.0,
    w_3: float = -100.0,
    w_4: float = 2.0,
    w_5: float = 1.0,
):
    score = w_1 * views + w_2 * likes + w_3 * dislikes + w_4 * comments_likes + w_5 * comments_replies
    return score

подстелим соломку и обработаем пропуски

In [16]:
scored_videos = (
    videos
    .na.fill({'views': 0, 'likes': 0, 'dislikes': 0, 'comments_likes': 0, 'comments_replies': 0})
    .withColumn('model_score',
                model_score(F.col('views'), F.col('likes'), F.col('dislikes'), F.col('comments_likes'), F.col('comments_replies'))
               )
)

In [17]:
scored_videos.select('video_id', 'title', 'views', 'likes', 'dislikes', 'comments_likes', 'comments_replies', 'model_score').show(10)

+-----------+--------------------+------+-----+--------+--------------+----------------+-----------+
|   video_id|               title| views|likes|dislikes|comments_likes|comments_replies|model_score|
+-----------+--------------------+------+-----+--------+--------------+----------------+-----------+
|xPS7bqBePSs|LÉON - I Believe ...| 14598| 1432|       6|           990|              27|    17186.8|
|xPS7bqBePSs|LÉON - I Believe ...| 26232| 1968|      14|           990|              27|    22910.2|
|a7Sf_H2cFdM|Drowning for Powe...|917643|25456|     731|            17|               5|   273263.3|
|a7Sf_H2cFdM|Drowning for Powe...|947706|25968|     741|            17|               5|   280389.6|
|a7Sf_H2cFdM|Drowning for Powe...|971242|26435|     748|            17|               5|   286713.2|
|a7Sf_H2cFdM|Drowning for Powe...|986576|26725|     759|            17|               5|   290046.6|
|mGqR9sgMIyA|Orlando braces fo...| 25652|   73|      11|            79|              40|   

In [18]:
scored_videos.count(), scored_videos.distinct().count()

(7998, 7997)

## Задание 2

categories_score - датасет по категориям, в котором присутствуют следующие поля: Название категории (не id, он непонятный для аналитиков!). Медиана показателя score из датасета scored_videos по каждой категории.

Для расчета медианы нельзя использовать встроенную Spark-функцию median из пакета pyspark.sql.functions

Сначала прочитаем категории, извлечем их названия, а потом подклеим по category_id к видео

In [19]:
category_schema = T.StructType([
    T.StructField("etag", T.StringType(), True),
    T.StructField("kind", T.StringType(), True),
    T.StructField("items", T.ArrayType(T.MapType(T.StringType(), T.StringType(), True), True)),
])

In [20]:
category = spark.read.option("multiline","true").json('../datasets/US_category_id.json', schema=category_schema)
category = category.withColumn('items', F.explode(F.col('items')))
category.show()

+--------------------+--------------------+--------------------+
|                etag|                kind|               items|
+--------------------+--------------------+--------------------+
|"m2yskBQFythfE4ir...|youtube#videoCate...|{kind -> youtube#...|
|"m2yskBQFythfE4ir...|youtube#videoCate...|{kind -> youtube#...|
|"m2yskBQFythfE4ir...|youtube#videoCate...|{kind -> youtube#...|
|"m2yskBQFythfE4ir...|youtube#videoCate...|{kind -> youtube#...|
|"m2yskBQFythfE4ir...|youtube#videoCate...|{kind -> youtube#...|
|"m2yskBQFythfE4ir...|youtube#videoCate...|{kind -> youtube#...|
|"m2yskBQFythfE4ir...|youtube#videoCate...|{kind -> youtube#...|
|"m2yskBQFythfE4ir...|youtube#videoCate...|{kind -> youtube#...|
|"m2yskBQFythfE4ir...|youtube#videoCate...|{kind -> youtube#...|
|"m2yskBQFythfE4ir...|youtube#videoCate...|{kind -> youtube#...|
|"m2yskBQFythfE4ir...|youtube#videoCate...|{kind -> youtube#...|
|"m2yskBQFythfE4ir...|youtube#videoCate...|{kind -> youtube#...|
|"m2yskBQFythfE4ir...|you

Достанем заголовок из вложения

In [21]:
snippet_schema = T.StructType([
    T.StructField("channelId", T.StringType(), True),
    T.StructField("title", T.StringType(), True),
    T.StructField("assignable", T.BooleanType(), True),
])

In [22]:
category = (
    category
    .withColumn('snippet_json',
                F.from_json(F.col('items.snippet'), schema=snippet_schema)
               )
    .select(
        F.col('items.id').alias('category_id'),
        F.col('snippet_json.title').alias('category_title')
    )
)

In [23]:
category.show()

+-----------+--------------------+
|category_id|      category_title|
+-----------+--------------------+
|          1|    Film & Animation|
|          2|    Autos & Vehicles|
|         10|               Music|
|         15|      Pets & Animals|
|         17|              Sports|
|         18|        Short Movies|
|         19|     Travel & Events|
|         20|              Gaming|
|         21|       Videoblogging|
|         22|      People & Blogs|
|         23|              Comedy|
|         24|       Entertainment|
|         25|     News & Politics|
|         26|       Howto & Style|
|         27|           Education|
|         28|Science & Technology|
|         29|Nonprofits & Acti...|
|         30|              Movies|
|         31|     Anime/Animation|
|         32|    Action/Adventure|
+-----------+--------------------+
only showing top 20 rows



In [24]:
category.count(), category.distinct().count()

(32, 32)

Возможно, тут лучше broadcast сделать, так как категорий мало

In [25]:
categories_score = (
    scored_videos
    .join(F.broadcast(category), on='category_id', how='left')
)

In [26]:
categories_score.explain()

== Physical Plan ==
*(7) Project [category_id#20, video_id#17, title#18, channel_title#19, tags#21, views#345, likes#346, dislikes#347, comment_total#25, thumbnail_link#26, date#27, comments_likes#348L, comments_replies#349L, model_score#364, category_title#528]
+- *(7) BroadcastHashJoin [category_id#20], [cast(category_id#527 as int)], LeftOuter, BuildRight, false
   :- *(7) Project [video_id#17, title#18, channel_title#19, category_id#20, tags#21, views#345, likes#346, dislikes#347, comment_total#25, thumbnail_link#26, date#27, comments_likes#348L, comments_replies#349L, pythonUDF0#574 AS model_score#364]
   :  +- BatchEvalPython [model_score(views#345, likes#346, dislikes#347, comments_likes#348L, comments_replies#349L)#363], [pythonUDF0#574]
   :     +- *(5) Project [video_id#17, title#18, channel_title#19, category_id#20, tags#21, coalesce(views#22, 0) AS views#345, coalesce(likes#23, 0) AS likes#346, coalesce(dislikes#24, 0) AS dislikes#347, comment_total#25, thumbnail_link#26, d

In [27]:
categories_score.select('video_id', 'category_id', 'category_title', 'model_score').show(10)

+-----------+-----------+--------------------+-----------+
|   video_id|category_id|      category_title|model_score|
+-----------+-----------+--------------------+-----------+
|xPS7bqBePSs|         10|               Music|    17186.8|
|xPS7bqBePSs|         10|               Music|    22910.2|
|a7Sf_H2cFdM|          1|    Film & Animation|   273263.3|
|a7Sf_H2cFdM|          1|    Film & Animation|   280389.6|
|a7Sf_H2cFdM|          1|    Film & Animation|   286713.2|
|a7Sf_H2cFdM|          1|    Film & Animation|   290046.6|
|mGqR9sgMIyA|         25|     News & Politics|     2393.2|
|H8t5M9_Tvzk|         10|               Music|    63220.0|
|H8t5M9_Tvzk|         10|               Music|    84585.3|
|9W0WPPpCFaM|         28|Science & Technology|    75531.8|
+-----------+-----------+--------------------+-----------+
only showing top 10 rows



In [28]:
categories_score.count()

7998

Теперь сгруппируем и посчитаем медиану по скорингу

In [29]:
import numpy as np
import pandas as pd

In [30]:
@F.pandas_udf(T.FloatType(), F.PandasUDFType.GROUPED_AGG)
def median(scores) -> float:
    return np.median(scores)



In [31]:
categories_score = (
    categories_score
    .groupBy('category_title')
    .agg(
        median(F.col('model_score')).alias('median_model_score')
    )
)

In [32]:
categories_score.orderBy('median_model_score').show()

+--------------------+------------------+
|      category_title|median_model_score|
+--------------------+------------------+
|               Shows|           1252.25|
|     News & Politics|            4357.2|
|Nonprofits & Acti...|         4697.8496|
|              Sports|           20062.2|
|    Autos & Vehicles|          21725.05|
|              Gaming|           43896.5|
|    Film & Animation|           73514.5|
|       Entertainment|           81638.0|
|           Education|           81849.4|
|      Pets & Animals|           85859.1|
|Science & Technology|          89013.55|
|     Travel & Events|          95189.91|
|      People & Blogs|           95631.2|
|               Music|         107169.35|
|       Howto & Style|         125801.91|
|              Comedy|          215351.9|
+--------------------+------------------+



# Задание 3

In [33]:
import time
from tqdm import tqdm

popular_tags - датасет по самым популярным тэгам (название тэга + количество видео с этим тэгом). В исходном датасете тэги лежат строкой в поле tags. Другие разработчики уже сталкивались с подобной задачей, поэтому написали Scala-функцию для разбиения тегов. Но не доверяйте им вслепую! Обязательно напишите свою функцию разбиения строки на тэги и сравните время работы с её Scala-версией. Можно замерять своими силами, а можно воспользоваться библиотекой timeit.

In [34]:
from pyspark.sql.column import Column
from pyspark.sql.column import _to_java_column
from pyspark.sql.column import _to_seq

In [35]:
sc = spark.sparkContext

Посмотрим на Scala UDF

In [36]:
def scala_splitTags_udf_wrapper(tags):
    splitTags_idf = sc._jvm.CustomUDFs.splitTagsUDF()
    return Column(splitTags_idf.apply(_to_seq(sc, [tags], _to_java_column)))

In [37]:
scala_times = []
for _ in tqdm(range(1_000)):
    start_time = time.time()
    videos.withColumn('splitted_tags', scala_splitTags_udf_wrapper(F.col('tags'))).select('video_id', 'tags', 'splitted_tags').count()
    work_time = time.time() - start_time
    scala_times.append(work_time)

100%|██████████| 1000/1000 [06:28<00:00,  2.57it/s]


In [38]:
np.mean(scala_times), np.std(scala_times)

(0.38743825316429137, 0.0319139355599288)

Это было быстро...

Теперь напишем свою UDF

In [39]:
@F.pandas_udf(T.ArrayType(T.StringType(), True), F.PandasUDFType.SCALAR)
def split_tags_custom_udf(tags):
    return tags.split('|')

In [40]:
python_times = []
for _ in tqdm(range(1_000)):
    start_time = time.time()
    videos.withColumn('splitted_tags', split_tags_custom_udf(F.col('tags'))).select('video_id', 'tags', 'splitted_tags').count()
    work_time = time.time() - start_time
    python_times.append(work_time)

100%|██████████| 1000/1000 [06:14<00:00,  2.67it/s]


In [41]:
np.mean(python_times), np.std(python_times)

(0.3732949829101562, 0.03392586365832979)

По наблюдениям сложно сказать, какой вариант здесь значительно лучше, поскольку и тот и другой от итерации к итерации были быстрее. Возможно, при значительном объеме данных разница будет заметнее, но сейчас она мало на что влияет. Обычно, стандартное отклонение у pandas_udf повыше, нет уверенности, что данная реализация будет стабильна

Выберем вариант со Scala UDF, потому что так интереснее

In [42]:
tags = videos.withColumn('splitted_tags', scala_splitTags_udf_wrapper(F.col('tags')))

In [43]:
tags.select('video_id', 'tags', 'likes', 'splitted_tags').show()

+-----------+--------------------+-----+--------------------+
|   video_id|                tags|likes|       splitted_tags|
+-----------+--------------------+-----+--------------------+
|1u5jO57eD-U|James Corden|The ...|34089|[James Corden, Th...|
|1u5jO57eD-U|James Corden|The ...|50615|[James Corden, Th...|
|fcubmjoGH7I|online fashion|be...|23238|[online fashion, ...|
|1u5jO57eD-U|James Corden|The ...|62776|[James Corden, Th...|
|fcubmjoGH7I|online fashion|be...|26026|[online fashion, ...|
|xPS7bqBePSs|Columbia|I Believ...| 1432|[Columbia, I Beli...|
|1u5jO57eD-U|James Corden|The ...|71169|[James Corden, Th...|
|fcubmjoGH7I|online fashion|be...|27889|[online fashion, ...|
|xPS7bqBePSs|Columbia|I Believ...| 1968|[Columbia, I Beli...|
|1u5jO57eD-U|James Corden|The ...|74470|[James Corden, Th...|
|fcubmjoGH7I|online fashion|be...|28630|[online fashion, ...|
|xPS7bqBePSs|Columbia|I Believ...| 2282|[Columbia, I Beli...|
|dInwVhRtN4E|LEGO|LEGO Batman|...|43066|[LEGO, LEGO Batma...|
|fcubmjo

In [44]:
tags.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_total: integer (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- date: string (nullable = true)
 |-- comments_likes: long (nullable = true)
 |-- comments_replies: long (nullable = true)
 |-- splitted_tags: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [45]:
popular_tags = (
    tags
    .withColumn('tag', 
                F.explode(F.col('splitted_tags'))
               )
    .groupBy('tag')
    .agg(
        F.count(F.col('video_id')).alias('count_video_id')
    )
    .orderBy(F.desc('count_video_id'))
)

In [46]:
popular_tags.show()

+---------+--------------+
|      tag|count_video_id|
+---------+--------------+
|    funny|           722|
|   comedy|           572|
|   [none]|           491|
|     2017|           309|
|   how to|           284|
|     vlog|           273|
|    humor|           258|
|   makeup|           254|
|    music|           250|
| tutorial|           235|
|     food|           224|
|    video|           219|
|   review|           218|
|celebrity|           211|
|     news|           211|
|   beauty|           210|
|interview|           209|
|  science|           197|
|      Pop|           190|
|  trailer|           180|
+---------+--------------+
only showing top 20 rows



# Задание 4

И личная просьба от Марка: он любит котов (а кто не их не любит!) и хочет найти самые интересные комментарии (топ-5) к видео про котов. “Видео про котов” - видео, у которого есть тэг “cat”.

In [47]:
cat_videos = tags.filter(
    F.array_contains(F.col('splitted_tags'), 'cat')
)

In [48]:
cat_videos.count()

48

Видно, что достаточно немного видео имеют тег cat. Поскольку мы предполагаем, что датасеты огромные, то попробуем применить бакетирование по video_id

In [50]:
cat_videos.write \
    .bucketBy(16, 'video_id') \
    .saveAsTable('cat_videos_bucketed', format='parquet', mode='overwrite')
    
comments.distinct().write \
    .bucketBy(16, 'video_id') \
    .saveAsTable('comments_bucketed', format='parquet', mode='overwrite')

In [51]:
cat_videos_bucketed = spark.table('cat_videos_bucketed')
comments_bucketed = spark.table('comments_bucketed')

In [52]:
cat_comments = (
    cat_videos_bucketed
    .join(comments_bucketed, on='video_id', how='inner')
    .select('video_id', 'tags', 'comment_text', 'comments_bucketed.likes')
    .distinct()
    .orderBy(F.desc('comments_bucketed.likes'))
)

In [53]:
cat_comments.show(5, truncate=False, vertical=True)

-RECORD 0---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 video_id     | xbBMVa2A68s                                                                                                                                                                                                                                                                                                                                                                                                                                                   
 tags         | cat|dog|cute|gaming|overwatch|runescape|osrs              

Код без оптимизаций

In [55]:
(
    tags
    .filter(
        F.array_contains(F.col('splitted_tags'), 'cat')
    )
    .join(comments.withColumnRenamed('likes', 'comment_likes'), on='video_id', how='inner')
    .select('video_id', 'tags', 'comment_text', 'comment_likes')
    .distinct()
    .orderBy(F.desc('comment_likes'))
).show(5, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 video_id      | xbBMVa2A68s                                                                                                                                                                                                                                                                                                                                                                                                                                                   
 tags          | cat|dog|cute|gaming|overwatch|runescape|osrs           

In [56]:
spark.stop()