In [1]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

import pandas as pd
import random


In [2]:
spark = SparkSession.builder.master("local") \
    .config('spark.sql.autoBroadcastJoinThreshold', 0) \
    .config('spark.sql.adaptive.enabled', 'false') \
    .getOrCreate()

In [3]:
videos = spark.read.option('header', 'true').option("inferSchema", "true").csv('../datasets/USvideos.csv')
videos.show()

+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|   video_id|               title|       channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|
+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|XpVt6Z1Gjjo|1 YEAR OF VLOGGIN...|    Logan Paul Vlogs|         24|logan paul vlog|l...|4394029|320053|    5931|        46245|https://i.ytimg.c...|13.09|
|K4wEI5zhHB0|iPhone X — Introd...|               Apple|         28|Apple|iPhone 10|i...|7860119|185853|   26679|            0|https://i.ytimg.c...|13.09|
|cLdxuaxaQwc|         My Response|           PewDiePie|         22|              [none]|5845909|576597|   39774|       170708|https://i.ytimg.c...|13.09|
|WYYvHb03Eog|Apple iPhone X fi...|           The Verge|         28|apple iph

In [4]:
videos.count()

7998

In [5]:
videos.drop_duplicates().count()

7997

In [6]:
videos = videos.drop_duplicates()

In [7]:
videos.select("video_id").drop_duplicates().count()

2364

In [8]:
comments_schema = StructType([ \
    StructField("video_id", StringType(), True), \
    StructField("comment_text", StringType(), True), \
    StructField("likes", IntegerType(), True), \
    StructField("replies", IntegerType(), True)])
comments = spark.read.option('header', 'true').option("mode", "DROPMALFORMED").schema(comments_schema).csv('../datasets/UScomments.csv')
comments.show()

+-----------+--------------------+-----+-------+
|   video_id|        comment_text|likes|replies|
+-----------+--------------------+-----+-------+
|XpVt6Z1Gjjo|Logan Paul it's y...|    4|      0|
|XpVt6Z1Gjjo|I've been followi...|    3|      0|
|XpVt6Z1Gjjo|Say hi to Kong an...|    3|      0|
|XpVt6Z1Gjjo| MY FAN . attendance|    3|      0|
|XpVt6Z1Gjjo|         trending 😉|    3|      0|
|XpVt6Z1Gjjo|#1 on trending AY...|    3|      0|
|XpVt6Z1Gjjo|The end though 😭...|    4|      0|
|XpVt6Z1Gjjo|#1 trending!!!!!!!!!|    3|      0|
|XpVt6Z1Gjjo|Happy one year vl...|    3|      0|
|XpVt6Z1Gjjo|You and your shit...|    0|      0|
|XpVt6Z1Gjjo|There should be a...|    0|      0|
|XpVt6Z1Gjjo|Dear Logan, I rea...|    0|      0|
|XpVt6Z1Gjjo|Honestly Evan is ...|    0|      0|
|XpVt6Z1Gjjo|Casey is still be...|    0|      0|
|XpVt6Z1Gjjo|aw geez rick this...|    0|      0|
|XpVt6Z1Gjjo|He happy cause he...|    0|      0|
|XpVt6Z1Gjjo|Ayyyyoooo Logang ...|    1|      0|
|XpVt6Z1Gjjo|Bro y did

In [9]:
DATA_PATH = "../datasets"

In [10]:
def rand(): 
    return random.randint(0, 3) 
    
rand_udf = udf(rand)
salt_df = spark.range(0, 4)


In [11]:
comments_aggregated = comments.groupBy('video_id').sum()


In [12]:
comments_aggregated.columns

['video_id', 'sum(likes)', 'sum(replies)']

In [13]:
videos_with_comments = videos.join(broadcast(comments_aggregated), "video_id", "left")

In [14]:
# бродкаст меньшего дф

# task 1

score = (1/100) * views + 10 * likes - 15 * dislikes + sqrt(comment_total) + 0.5 * sum(likes) + 3 * sum(replies)

In [15]:
scored_videos = videos_with_comments.fillna(0).withColumn('score', 0.01 * col('views') + 10 * col('likes') 
                           - 15 * col('dislikes') + sqrt(col('comment_total')) + 0.5 * col('sum(likes)')
                                 + 3 * col('sum(replies)'))

In [16]:
scored_videos.select('video_id', 'title', 'views', 'likes', 'dislikes', 'comment_total', 'score').show(10)

+-----------+--------------------+-------+-----+--------+-------------+------------------+
|   video_id|               title|  views|likes|dislikes|comment_total|             score|
+-----------+--------------------+-------+-----+--------+-------------+------------------+
|vu_9muoxT50|Colin Cloud: Mind...| 990274|22897|     639|         2430|229364.53503017547|
|uBGECC5U09Q|Everything Wrong ...|1596548|37556|    1144|         6731|374468.52267182386|
|lno7AN8hLvQ|My Butt Has Some ...| 240137|12298|     554|         2044|117140.58061822182|
|Eat7nKj_30o|Massive earthquak...|1028351| 4606|    1846|         4068| 29121.79087487641|
|cWfTBkTJOH8|Diving with Sea T...| 433887|16784|     206|         2858|169162.33026561848|
|HjkmacUCJw8|2017 Emmy Fashion...|  39113|  140|      74|          130|1775.5317542509915|
|8F_eHiMYQjg|Saquon Barkley 85...|  28694|  143|       2|           82|1849.9953851381374|
|yIq-N7m0upU|Taking Everything...|1008102|71323|    1003|         4886| 735638.4199284692|

In [17]:
scored_videos_agg = scored_videos.groupBy('video_id', 'title').sum('score')

In [18]:
scored_videos_agg.show(10)

+-----------+--------------------+-------------------+
|   video_id|               title|         sum(score)|
+-----------+--------------------+-------------------+
|sNSzJiF4sB8|Chance The Rapper...|  538.6774468787578|
|Uw3eeaFT5Rw|My Top 3 Drugstor...| 115074.33281475658|
|3y5A4paFOb4|Logan Paul - Outt...|5.239298140833002E7|
|cPZ3u1z8kU0|Emmalyn - Self Ca...|  8023.514402419595|
|HsiWOp-ImoE|HURRY UP! by SUPE...| 1229760.1635473885|
|NheUm_izr6A|Tim McGraw, Faith...| 61249.469918559145|
|QJVlhvyLmPw|The Ford GT drive...| 3676.3889794855663|
|xGzppQWQvq0|Darius Rucker - T...| 51859.525789009895|
|IvFw9zhIYkQ|TOMB RAIDER - The...|  81812.17811212421|
|tbDr_zAcM5g|David Lynch Comme...|  2559.391570966068|
+-----------+--------------------+-------------------+
only showing top 10 rows



In [19]:
scored_videos_agg.toPandas().to_csv(f'{DATA_PATH}/scored_videos.csv')

# task 2

In [20]:
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def median_udf(v):
    return v.median()



In [21]:
category_id = pd.read_json(f'{DATA_PATH}/US_category_id.json')


In [22]:
categories = [(i['id'], i['snippet']['title']) for i in category_id['items'].to_list()]

In [23]:
columns = ["category_id","title"]
category_df = spark.createDataFrame(data=categories, schema = columns)

In [26]:
categories_score = scored_videos.groupBy('category_id').agg(median_udf(scored_videos['score']))

In [25]:
categories_score = categories_score.join(broadcast(category_df), "category_id", "left")

In [27]:
# по логике тут вообще оптимизировать смысла не имеет - оба дф маленькие, но предположим что один из них меньше

In [28]:
categories_score.show()

+-----------+------------------+
|category_id| median_udf(score)|
+-----------+------------------+
|         28|102903.86358260454|
|         26|126200.46018588994|
|         27| 79998.44713047001|
|         22|101496.21589471283|
|          1| 66313.44580448601|
|         20| 40213.25209422822|
|         19| 82802.32745776343|
|         15| 76319.85694380733|
|         43|1195.0077625302981|
|         17|16948.421900349353|
|         23|252882.44692637963|
|         10|107185.15181449405|
|         25| 6684.407448713916|
|         24| 85696.83696125855|
|         29| 5696.682886552931|
|          2| 13876.75154961165|
+-----------+------------------+



In [29]:
categories_score.toPandas().to_csv(f'{DATA_PATH}/categories_score.csv')

# task 3

In [30]:
videos.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_total: integer (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- date: string (nullable = true)



In [31]:
import timeit

@pandas_udf("array<string>", PandasUDFType.SCALAR)
def split_udf(v):
    return v.str.split("|")



In [32]:
%timeit for x in range(100): split_udf(col("tags"))

425 ms ± 19.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [33]:
videos.select(split_udf(col("tags"))).show()

+--------------------+
|     split_udf(tags)|
+--------------------+
|[America's Got Ta...|
|[alien, alien cov...|
|            [[none]]|
|[mexico, earthqua...|
|[adventure, adven...|
|[Access Hollywood...|
|[penn state nitta...|
|[taking everythin...|
|[administracja, l...|
|[fox, fox sports,...|
|[What What Happen...|
|[kungs, more mess...|
|[iphone x by pine...|
|[jeffree star, ve...|
|[iphone 8, iphone...|
|[fleurdeforce, fl...|
|[cars, climbkhana...|
|[Shay Mitchell, S...|
|[a24, a24 films, ...|
|[Shania Twain, Sh...|
+--------------------+
only showing top 20 rows



In [34]:
from pyspark.sql.column import _to_java_column
from pyspark.sql.column import _to_seq

sc = spark.sparkContext

def splitTagsUDFWrapper(row):
    _ipToIntUDF = sc._jvm.CustomUDFs.splitTagsUDF()
    return Column(_ipToIntUDF.apply(_to_seq(sc, [row], _to_java_column)))

In [35]:
%timeit for x in range(100): splitTagsUDFWrapper(col("tags"))

513 ms ± 37.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [36]:
map = videos.select(split_udf(col("tags"))).rdd.flatMap(lambda x: 
                                                        x).flatMap(lambda x: x).countByValue().items()

In [37]:
popular_tags = spark.createDataFrame(data=map, schema = ["tag", "count"])
popular_tags.printSchema()
popular_tags.show()

root
 |-- tag: string (nullable = true)
 |-- count: long (nullable = true)

+--------------------+-----+
|                 tag|count|
+--------------------+-----+
|America's Got Tal...|   13|
|america's got talent|   16|
|america's got tal...|   13|
|america's got tal...|   13|
|america's got tal...|   13|
|                 AGT|   13|
|  AGT 2017 auditions|   13|
|  AGT best auditions|   13|
|                 NBC|  118|
|                  TV|   49|
|            TV Shows|   13|
|          Highlights|   21|
|            Previews|   13|
|        Simon Cowell|   13|
|        Howie Mandel|   13|
|          Tyra Banks|   13|
|          Heidi Klum|   14|
|               Mel B|   13|
|           season 12|   50|
|           America's|   13|
+--------------------+-----+
only showing top 20 rows



In [38]:
popular_tags.toPandas().to_csv(f'{DATA_PATH}/popular_tags.csv')

# task 4

In [39]:
cat_tags = (lower(col('tags')).startswith('cat|') | lower(col('tags')).contains('|cat|') 
           | lower(col('tags')).endswith('|cat'))


has_comments = (col('comment_total')>0)

cat_video = videos.filter(cat_tags & has_comments)

In [40]:
cat_video.select('tags').show(10)

+--------------------+
|                tags|
+--------------------+
|cats|cat|kittens|...|
|cat|dog|cute|gami...|
|cartoon|simons ca...|
|cartoon|simons ca...|
|colleen ballinger...|
|Husky's First How...|
|cat|dog|cute|gami...|
|cartoon|simons ca...|
|cartoon|simons ca...|
|cat|dog|cute|gami...|
+--------------------+
only showing top 10 rows



In [41]:
salted_comments = comments.withColumn("salted_video_id", concat("video_id", lit("_"), lit(rand_udf())))

In [42]:
salted_comments.show(10)

+-----------+--------------------+-----+-------+---------------+
|   video_id|        comment_text|likes|replies|salted_video_id|
+-----------+--------------------+-----+-------+---------------+
|XpVt6Z1Gjjo|Logan Paul it's y...|    4|      0|  XpVt6Z1Gjjo_0|
|XpVt6Z1Gjjo|I've been followi...|    3|      0|  XpVt6Z1Gjjo_1|
|XpVt6Z1Gjjo|Say hi to Kong an...|    3|      0|  XpVt6Z1Gjjo_2|
|XpVt6Z1Gjjo| MY FAN . attendance|    3|      0|  XpVt6Z1Gjjo_3|
|XpVt6Z1Gjjo|         trending 😉|    3|      0|  XpVt6Z1Gjjo_1|
|XpVt6Z1Gjjo|#1 on trending AY...|    3|      0|  XpVt6Z1Gjjo_1|
|XpVt6Z1Gjjo|The end though 😭...|    4|      0|  XpVt6Z1Gjjo_2|
|XpVt6Z1Gjjo|#1 trending!!!!!!!!!|    3|      0|  XpVt6Z1Gjjo_1|
|XpVt6Z1Gjjo|Happy one year vl...|    3|      0|  XpVt6Z1Gjjo_2|
|XpVt6Z1Gjjo|You and your shit...|    0|      0|  XpVt6Z1Gjjo_1|
+-----------+--------------------+-----+-------+---------------+
only showing top 10 rows



In [43]:
# ниже солю, допуская кейс, что у каких-то видео может быть слишком много комментов а у каких-то мало

In [44]:
salted_cat_video = cat_video.join(salt_df, 
                            how="cross").withColumn("salted_video_id", concat("video_id", lit("_"), "id")).drop("id")

salted_joined_df = salted_cat_video.join(salted_comments.withColumnRenamed( 'likes', 'comment_likes').drop('video_id'), on="salted_video_id", how="inner")


In [45]:
salted_joined_df.orderBy(col('comment_likes').desc()).show(20)

+---------------+-----------+--------------------+-------------+-----------+--------------------+------+-----+--------+-------------+--------------------+-----+--------------------+-------------+-------+
|salted_video_id|   video_id|               title|channel_title|category_id|                tags| views|likes|dislikes|comment_total|      thumbnail_link| date|        comment_text|comment_likes|replies|
+---------------+-----------+--------------------+-------------+-----------+--------------------+------+-----+--------+-------------+--------------------+-----+--------------------+-------------+-------+
|  xbBMVa2A68s_2|xbBMVa2A68s|Cat vs Dog - Best...|      TierZoo|         20|cat|dog|cute|gami...|320175|19593|     608|         4904|https://i.ytimg.c...|17.10|The second I read...|         2355|     15|
|  xbBMVa2A68s_2|xbBMVa2A68s|Cat vs Dog - Best...|      TierZoo|         20|cat|dog|cute|gami...|370320|21325|     694|         5356|https://i.ytimg.c...|20.10|The second I read...|   

In [46]:
salted_joined_df.orderBy(col('comment_likes').desc()).select('comment_total', 'thumbnail_link').show(20)

+-------------+--------------------+
|comment_total|      thumbnail_link|
+-------------+--------------------+
|         4904|https://i.ytimg.c...|
|         5356|https://i.ytimg.c...|
|         5202|https://i.ytimg.c...|
|         5412|https://i.ytimg.c...|
|         5293|https://i.ytimg.c...|
|         4904|https://i.ytimg.c...|
|         5293|https://i.ytimg.c...|
|         5356|https://i.ytimg.c...|
|         5202|https://i.ytimg.c...|
|         5412|https://i.ytimg.c...|
|         4904|https://i.ytimg.c...|
|         5356|https://i.ytimg.c...|
|         5202|https://i.ytimg.c...|
|         5412|https://i.ytimg.c...|
|         5293|https://i.ytimg.c...|
|         5356|https://i.ytimg.c...|
|         5202|https://i.ytimg.c...|
|         4904|https://i.ytimg.c...|
|         5412|https://i.ytimg.c...|
|         5293|https://i.ytimg.c...|
+-------------+--------------------+
only showing top 20 rows



В данных есть полудубликаты, которые все же различаются по полям thumbnail_link и comment_total, так что финального результата дропаю их

In [47]:
salted_joined_df.drop_duplicates(['comment_text']).orderBy(col('comment_likes').desc()).select('video_id', 'comment_text', 'comment_likes', 'replies').show(5)

+-----------+--------------------+-------------+-------+
|   video_id|        comment_text|comment_likes|replies|
+-----------+--------------------+-------------+-------+
|xbBMVa2A68s|The second I read...|         2355|     15|
|-1fzGnFwz9M|I make interestin...|          839|      5|
|xbBMVa2A68s|talk about the oc...|          802|     27|
|tp9aQXDFHbY|Make sure to chec...|          194|     22|
|tp9aQXDFHbY|If Simon will be ...|           37|      1|
+-----------+--------------------+-------------+-------+
only showing top 5 rows

