In [17]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local") \
    .config('spark.sql.autoBroadcastJoinThreshold', 0) \
    .config('spark.sql.adaptive.enabled', 'false') \
    .getOrCreate()

videos = spark.read.option('header', 'true').option("inferSchema", "true").csv('../datasets1/USvideos.csv')
videos.show()

+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|   video_id|               title|       channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|
+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|XpVt6Z1Gjjo|1 YEAR OF VLOGGIN...|    Logan Paul Vlogs|         24|logan paul vlog|l...|4394029|320053|    5931|        46245|https://i.ytimg.c...|13.09|
|K4wEI5zhHB0|iPhone X — Introd...|               Apple|         28|Apple|iPhone 10|i...|7860119|185853|   26679|            0|https://i.ytimg.c...|13.09|
|cLdxuaxaQwc|         My Response|           PewDiePie|         22|              [none]|5845909|576597|   39774|       170708|https://i.ytimg.c...|13.09|
|WYYvHb03Eog|Apple iPhone X fi...|           The Verge|         28|apple iph

In [18]:
comments_schema = StructType([ \
    StructField("video_id", StringType(), True), \
    StructField("comment_text", StringType(), True), \
    StructField("likes", IntegerType(), True), \
    StructField("replies", IntegerType(), True)])
comments = spark.read.option('header', 'true').option("mode", "DROPMALFORMED").schema(comments_schema).csv('../datasets1/UScomments.csv')
comments.show()

+-----------+--------------------+-----+-------+
|   video_id|        comment_text|likes|replies|
+-----------+--------------------+-----+-------+
|XpVt6Z1Gjjo|Logan Paul it's y...|    4|      0|
|XpVt6Z1Gjjo|I've been followi...|    3|      0|
|XpVt6Z1Gjjo|Say hi to Kong an...|    3|      0|
|XpVt6Z1Gjjo| MY FAN . attendance|    3|      0|
|XpVt6Z1Gjjo|         trending 😉|    3|      0|
|XpVt6Z1Gjjo|#1 on trending AY...|    3|      0|
|XpVt6Z1Gjjo|The end though 😭...|    4|      0|
|XpVt6Z1Gjjo|#1 trending!!!!!!!!!|    3|      0|
|XpVt6Z1Gjjo|Happy one year vl...|    3|      0|
|XpVt6Z1Gjjo|You and your shit...|    0|      0|
|XpVt6Z1Gjjo|There should be a...|    0|      0|
|XpVt6Z1Gjjo|Dear Logan, I rea...|    0|      0|
|XpVt6Z1Gjjo|Honestly Evan is ...|    0|      0|
|XpVt6Z1Gjjo|Casey is still be...|    0|      0|
|XpVt6Z1Gjjo|aw geez rick this...|    0|      0|
|XpVt6Z1Gjjo|He happy cause he...|    0|      0|
|XpVt6Z1Gjjo|Ayyyyoooo Logang ...|    1|      0|
|XpVt6Z1Gjjo|Bro y did

In [43]:
import pandas as pd
from pyspark.sql.functions import pandas_udf

# смотрим есть ли перекос
videos.groupBy('video_id') \
    .agg(count('video_id').alias('partition_size')) \
    .orderBy(desc('partition_size')).show()
# т.к. перекоса нет, можно обойтись без соли

# используем оптимизацию - бакетинг 
videos.write \
    .bucketBy(32, 'video_id') \
    .saveAsTable('bucket_videos', format = 'csv', mode = 'overwrite')

comments.write \
    .bucketBy(32, 'video_id') \
    .saveAsTable('bucket_comments', format = 'csv', mode = 'overwrite')

+-----------+--------------+
|   video_id|partition_size|
+-----------+--------------+
|Oo0NJsr5m4I|             8|
|LunHybOKIjU|             8|
|Hlt3rA-oDao|             8|
|8ndhidEmUbI|             7|
|CYoRmfI0LUc|             7|
|odhMmAPDc54|             7|
|t8sgy0faXyg|             7|
|5ggZ9jIHnr8|             7|
|iALfvFpcItE|             7|
|3QWQ4gN3j4E|             7|
|5Xe0Qd6bUFo|             7|
|udnGW3E1vxY|             7|
|1QWLyi03twg|             7|
|SHq2qrFUlGY|             7|
|M16CGK1T9MM|             7|
|mlxdnyfkWKQ|             7|
|jUrpOg4fBs0|             7|
|j5YSOabmFgw|             7|
|WYYvHb03Eog|             7|
|UXdbCReBTR8|             7|
+-----------+--------------+
only showing top 20 rows



In [74]:
bucket_videos = spark.table('bucket_videos')
bucket_videos.show()
bucket_comments = spark.table('bucket_comments')
bucket_comments.show()

bucket_videos.join(bucket_comments, bucket_comments.video_id == bucket_videos.video_id) \
        .groupBy("bucket_videos.video_id") \
        .agg(sum('bucket_videos.likes').alias('video_likes'),
             first('bucket_videos.category_id').alias('category_id'),
             sum('bucket_videos.dislikes').alias('video_dislikes'),
             sum('bucket_comments.likes').alias('comments_likes'),
             count('bucket_comments.video_id').alias('comments_count'),
             sum('views').alias('video_views')
        ) \
        .write.bucketBy(32, 'video_id') \
        .saveAsTable('agg_video', format='parquet', mode='overwrite')

agg_video = spark.table("agg_video")

agg_video.show()

+-----------+--------------------+--------------------+-----------+--------------------+------+-----+--------+-------------+--------------------+-----+
|   video_id|               title|       channel_title|category_id|                tags| views|likes|dislikes|comment_total|      thumbnail_link| date|
+-----------+--------------------+--------------------+-----------+--------------------+------+-----+--------+-------------+--------------------+-----+
|JO7X9ZPoAp8|Blind People Desc...|      WatchCut Video|         24|blind people|buzz...|136643| 6218|     127|          524|https://i.ytimg.c...|13.09|
|qJJHhVf3_ZM|Gwyneth Paltrow C...|The Late Late Sho...|         24|James Corden|The ...|220661| 2070|     449|          352|https://i.ytimg.c...|13.09|
|ah8SQNOXgrQ|Obsessed With Net...|         Gus Johnson|         23|obsessed with net...| 62884| 6003|      72|          364|https://i.ytimg.c...|13.09|
|5kZi3J2S52E|The Final Steps o...|            NurdRage|         27|Science|experimen...|

In [77]:
# 1. Scoring

@pandas_udf('double')
def score(views: pd.Series, likes: pd.Series, dislikes: pd.Series, comments_likes: pd.Series, comments_count: pd.Series) -> pd.Series:
    return (1 - 1 / (views + likes - dislikes)) * 0.7 + (1 - 1 / (comments_likes + comments_count)) * 0.3

scored_videos = agg_video.withColumn('score', score('video_views', 'video_likes', 'video_dislikes', 'comments_likes', 'comments_count'))

scored_videos.show(50)


+-----------+-----------+-----------+--------------+--------------+--------------+-----------+------------------+
|   video_id|video_likes|category_id|video_dislikes|comments_likes|comments_count|video_views|             score|
+-----------+-----------+-----------+--------------+--------------+--------------+-----------+------------------+
|-B9z3az6Axc|   37006000|         24|        889000|          9120|          2500|  443328000|0.9999741809840403|
|-Jdc7FXupWQ|    4234000|         10|         91200|          5820|          1600|   68369200|0.9999595590795789|
|-uYWFqTPHd0|    9804000|         23|        291200|           864|          1600|  186038000|0.9998782431736142|
|0bcHEsmmpzU|   35771500|         24|       3031500|           435|          2500| 1654968500|0.9998977849344698|
|1Sm8A-u1vIA|    6264063|         24|        919269|           398|           234| 1683411210|0.9995253160411899|
|1u5jO57eD-U|  175871400|         24|       3457800|           785|          3000| 78072

In [93]:
# 2 Categories median 

categories = spark.read.option('header', 'true').option("inferSchema", "true").option('multiline', 'true')\
    .option("mode", "DROPMALFORMED")\
    .json('../datasets1/US_category_id.json') \
    .select(explode(col('items').alias('category'))).select('col.id', 'col.snippet.title')
categories.show()

@pandas_udf("double")
def median(v: pd.Series) -> float:
  return v.median()
    
# используем оптимизацию - broadcast, на небольшой таблице categories
categories_score = scored_videos.groupBy('category_id') \
  .agg(median('score').alias('score')) \
  .join(broadcast(categories), scored_videos['category_id'] == categories['id']) \
  .select(col('title'), col('score'))

categories_score.show()

+---+--------------------+
| id|               title|
+---+--------------------+
|  1|    Film & Animation|
|  2|    Autos & Vehicles|
| 10|               Music|
| 15|      Pets & Animals|
| 17|              Sports|
| 18|        Short Movies|
| 19|     Travel & Events|
| 20|              Gaming|
| 21|       Videoblogging|
| 22|      People & Blogs|
| 23|              Comedy|
| 24|       Entertainment|
| 25|     News & Politics|
| 26|       Howto & Style|
| 27|           Education|
| 28|Science & Technology|
| 29|Nonprofits & Acti...|
| 30|              Movies|
| 31|     Anime/Animation|
| 32|    Action/Adventure|
+---+--------------------+
only showing top 20 rows

+--------------------+------------------+
|               title|             score|
+--------------------+------------------+
|Science & Technology|0.9998348000793098|
|       Howto & Style|0.9998342512383545|
|           Education|0.9998576821747818|
|      People & Blogs|0.9998351561933709|
|    Film & Animation|0.99988163

In [88]:
# 3 popular_tags

from timeit import Timer
from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql.functions import pandas_udf

# Scala
def split_tags_scala(col):
    sc = spark.sparkContext
    _split_tags_udf = sc._jvm.CustomUDFs.splitTagsUDF()
    return Column(_split_tags_udf.apply(_to_seq(sc, [col], _to_java_column)))

tags_scala = bucket_videos\
                          .withColumn('tag', explode(split_tags_scala('tags'))) \
                        .groupBy('tags').agg(count('video_id').alias('count'))

scala_udf_timer = Timer(lambda: tags_scala.show())
print('split_tags_scala ' + str(scala_udf_timer.timeit(number = 1)))

# Python udf
@udf('array<string>')
def split_tags_udf(tags):
    return tags.split('|')

tags_udf = bucket_videos\
                          .withColumn('tag', explode(split_tags_udf('tags'))) \
                        .groupBy('tags').agg(count('video_id').alias('count'))

udf_timer = Timer(lambda: tags_udf.show())
print('split_tags_udf ' + str(udf_timer.timeit(number = 1)))

# Panda udf
@pandas_udf('array<string>', PandasUDFType.SCALAR)
def split_tags_pandas_udf(tags):
    return tags.str.split('|')

tags_pandas = bucket_videos\
                          .withColumn('tag', explode(split_tags_pandas_udf('tags'))) \
                        .groupBy('tags').agg(count('video_id').alias('count'))

pandas_udf_timer = Timer(lambda: tags_pandas.show())
print('split_tags_pandas_udf ' + str(pandas_udf_timer.timeit(numbe r=1)))

# Результат по времени scala < pandas_udf < udf

+--------------------+-----+
|                tags|count|
+--------------------+-----+
|Exclusive Content...|  128|
|kid rock|greatest...|   66|
|What What Happens...|   88|
|taylor swift|look...|   22|
|Google|YouTube|Yo...|   60|
|nbc news|us news|...|  100|
|ESPN|first take|s...|   96|
|sean spicer|steph...|   30|
|hannah|hart|hanna...|   57|
|high note challen...|   64|
+--------------------+-----+
only showing top 10 rows

split_tags_scala 0.7145087399985641
+--------------------+-----+
|                tags|count|
+--------------------+-----+
|Exclusive Content...|  128|
|kid rock|greatest...|   66|
|What What Happens...|   88|
|taylor swift|look...|   22|
|Google|YouTube|Yo...|   60|
|nbc news|us news|...|  100|
|ESPN|first take|s...|   96|
|sean spicer|steph...|   30|
|hannah|hart|hanna...|   57|
|high note challen...|   64|
+--------------------+-----+
only showing top 10 rows

split_tags_udf 2.458179980982095
+--------------------+-----+
|                tags|count|
+--------

In [92]:
# 4 Cats

cats = bucket_videos.withColumn('tags', explode(split_tags_pandas_udf('tags'))) \
                    .filter(col('tags') == 'cat')

bucket_comments.join(broadcast(cats), bucket_comments['video_id'] == cats['video_id']) \
  .select(col('comment_text'), col('bucket_comments.likes')).show()

+--------------------+-----+
|        comment_text|likes|
+--------------------+-----+
|How does Mugumogu...|   17|
|How does Mugumogu...|   17|
|How does Mugumogu...|   17|
|How does Mugumogu...|   17|
|                 Wtf|    0|
|                 Wtf|    0|
|                 Wtf|    0|
|                 Wtf|    0|
|Another Maru mast...|    0|
|Another Maru mast...|    0|
|Another Maru mast...|    0|
|Another Maru mast...|    0|
|         Very funny!|    0|
|         Very funny!|    0|
|         Very funny!|    0|
|         Very funny!|    0|
|Cube Maru Swish Tail|    0|
|Cube Maru Swish Tail|    0|
|Cube Maru Swish Tail|    0|
|Cube Maru Swish Tail|    0|
+--------------------+-----+
only showing top 20 rows

