In [1]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

# Local SparkSession used by every cell below.
# A parenthesized chain is used instead of backslash continuations: a comment
# after a trailing backslash is a SyntaxError in Python.
spark = (
    SparkSession.builder.master("local")
    # Disable automatic broadcast joins so broadcasting only happens where it
    # is requested explicitly with a hint.
    .config('spark.sql.autoBroadcastJoinThreshold', -1)
    # Enable the adaptive query execution optimizer.
    .config('spark.sql.adaptive.enabled', 'true')
    .getOrCreate()
)

In [2]:
spark  # show the session object via its notebook repr

In [12]:
# Load the trending-videos CSV: first row is the header, column types inferred.
videos = spark.read.csv('../datasets/USvideos.csv', header=True, inferSchema=True)
videos.show(5)

+-----------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|   video_id|               title|   channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|
+-----------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|XpVt6Z1Gjjo|1 YEAR OF VLOGGIN...|Logan Paul Vlogs|         24|logan paul vlog|l...|4394029|320053|    5931|        46245|https://i.ytimg.c...|13.09|
|K4wEI5zhHB0|iPhone X — Introd...|           Apple|         28|Apple|iPhone 10|i...|7860119|185853|   26679|            0|https://i.ytimg.c...|13.09|
|cLdxuaxaQwc|         My Response|       PewDiePie|         22|              [none]|5845909|576597|   39774|       170708|https://i.ytimg.c...|13.09|
|WYYvHb03Eog|Apple iPhone X fi...|       The Verge|         28|apple iphone x ha...|2642103| 24975| 

In [13]:
# Explicit schema for the comments CSV; rows that do not fit it are dropped
# (DROPMALFORMED) instead of failing the read.
comments_schema = StructType([
    StructField("video_id", StringType(), True),
    StructField("comment_text", StringType(), True),
    StructField("likes", IntegerType(), True),
    StructField("replies", IntegerType(), True),
])
comments = spark.read.csv(
    '../datasets/UScomments.csv',
    schema=comments_schema,
    header=True,
    mode="DROPMALFORMED",
)
comments.show(5)

+-----------+--------------------+-----+-------+
|   video_id|        comment_text|likes|replies|
+-----------+--------------------+-----+-------+
|XpVt6Z1Gjjo|Logan Paul it's y...|    4|      0|
|XpVt6Z1Gjjo|I've been followi...|    3|      0|
|XpVt6Z1Gjjo|Say hi to Kong an...|    3|      0|
|XpVt6Z1Gjjo| MY FAN . attendance|    3|      0|
|XpVt6Z1Gjjo|         trending 😉|    3|      0|
+-----------+--------------------+-----+-------+
only showing top 5 rows



## Task 1

In [14]:
# Popularity score: views + likes + comments - dislikes, scaled down by 100000.
scored_videos = videos.withColumn(
    'score',
    expr('(views + likes + comment_total - dislikes) / 100000'),
)

In [15]:
# Preview a single scored row as a pandas frame.
(
    scored_videos
    .limit(1)
    .toPandas()
)

Unnamed: 0,video_id,title,channel_title,category_id,tags,views,likes,dislikes,comment_total,thumbnail_link,date,score
0,XpVt6Z1Gjjo,1 YEAR OF VLOGGING -- HOW LOGAN PAUL CHANGED Y...,Logan Paul Vlogs,24,logan paul vlog|logan paul|logan|paul|olympics...,4394029,320053,5931,46245,https://i.ytimg.com/vi/XpVt6Z1Gjjo/default.jpg,13.09,47.54396


## Task 2

In [3]:
import json
import pandas as pd
from pyspark.sql.types import StringType

In [4]:
# Load the YouTube category catalogue and build (category_id, title) pairs.
# Explicit UTF-8 so the read does not depend on the platform default encoding.
with open("../datasets/US_category_id.json", "r", encoding="utf-8") as category_file:
    cat = json.load(category_file)

items = cat.get('items')

# category id -> human-readable title. Ids are normalised to int so the join
# key is numeric like the `category_id` column inferred from the videos CSV.
# NOTE(review): the JSON presumably stores ids as strings — confirm.
category_dict = {int(item['id']): item['snippet']['title'] for item in items}

data_list = list(category_dict.items())

In [8]:
# Two-column Spark DataFrame built from the (category_id, title) pairs.
category_df = spark.createDataFrame(data_list, schema=['category_id', 'title_name'])

In [9]:
# Peek at the first few categories.
(
    category_df
    .limit(5)
    .toPandas()
)

Unnamed: 0,category_id,title_name
0,1,Film & Animation
1,2,Autos & Vehicles
2,10,Music
3,15,Pets & Animals
4,17,Sports


In [16]:
from pyspark.sql.functions import pandas_udf, udf, PandasUDFType
import numpy as np

In [17]:
# The category lookup table is small, so broadcast it explicitly
# (automatic broadcast joins are switched off in the session config).
categories_score = scored_videos.join(broadcast(category_df), on='category_id')

In [18]:
@pandas_udf(FloatType(), PandasUDFType.GROUPED_AGG)
def median__pandas_udf(scores):
    """Grouped-aggregate pandas UDF: median of one group's values.

    `scores` holds the values of a single group; the median is handed back
    to Spark as a FloatType value.
    NOTE(review): PandasUDFType is deprecated in Spark 3.x in favour of
    type-hinted pandas UDFs — confirm the target Spark version.
    """
    return np.median(scores.to_numpy())



In [19]:
%time
medians_df = (
    categories_score
    .groupBy(col('title_name'))
    .agg(median__pandas_udf(col('score')))
)

CPU times: user 4 µs, sys: 2 µs, total: 6 µs
Wall time: 47.4 µs


In [20]:
# Show a few of the per-category medians.
(
    medians_df
    .limit(3)
    .toPandas()
)

Unnamed: 0,title_name,median__pandas_udf(score)
0,Shows,0.086335
1,Education,2.526625
2,Gaming,2.779255


## Task 3

### Тест обычной udf

In [24]:
@udf('array<string>')
def split_tags_udf(tags):
    """Split a pipe-delimited tag string into a list of tags (plain Python UDF).

    A null `tags` value yields a null array instead of raising
    AttributeError on `None.split`; explode() then simply drops that row.
    """
    if tags is None:
        return None
    return tags.split('|')

In [25]:
%time
popular_tags = (
    scored_videos
    .withColumn('tags', explode(split_tags_udf('tags')))
    .groupBy('tags').agg(count('video_id').alias('cnt_tags'))
)

CPU times: user 8 µs, sys: 0 ns, total: 8 µs
Wall time: 43.9 µs


In [26]:
# Three most frequent tags.
popular_tags.sort(col('cnt_tags').desc()).limit(3).toPandas()

Unnamed: 0,tags,cnt_tags
0,funny,722
1,comedy,572
2,[none],491


### Тест pandas_udf

In [44]:
@pandas_udf('array<string>', PandasUDFType.SCALAR)
def split_tags_pandas_udf(tags):
    """Vectorised tag splitter: split every pipe-delimited string in the
    pandas Series `tags` into a list of tags."""
    tag_lists = tags.str.split(pat='|')
    return tag_lists



In [28]:
%time
popular_tags = (
    scored_videos
    .withColumn('tags', explode(split_tags_pandas_udf('tags')))
    .groupBy('tags').agg(count('video_id').alias('cnt_tags'))
)

CPU times: user 18 µs, sys: 4 µs, total: 22 µs
Wall time: 38.4 µs


### Тест scala udf

In [29]:
from pyspark.sql.column import Column
from pyspark.sql.column import _to_java_column 
from pyspark.sql.column import _to_seq

sc = spark.sparkContext

def udf_split_tags_scala_wrapper(tags):
    """Apply the JVM-side `CustomUDFs.splitTagsUDF()` to the column `tags`.

    The Python column is converted to a Java column, wrapped into a Scala
    Seq, passed to the UDF's `apply`, and the result is wrapped back into a
    PySpark Column.
    NOTE(review): `CustomUDFs` must be on the JVM classpath (e.g. via --jars);
    nothing visible here adds it — confirm.
    """
    jvm_udf = sc._jvm.CustomUDFs.splitTagsUDF()
    java_args = _to_seq(sc, [tags], _to_java_column)
    return Column(jvm_udf.apply(java_args))

In [30]:
%time
popular_tags = (
    scored_videos
    .withColumn('tags', explode(udf_split_tags_scala_wrapper('tags')))
    .groupBy('tags').agg(count('video_id').alias('cnt_tags'))
)

CPU times: user 7 µs, sys: 0 ns, total: 7 µs
Wall time: 13.6 µs


In [32]:
# Three most frequent tags.
popular_tags.sort(col('cnt_tags').desc()).limit(3).toPandas()

Unnamed: 0,tags,cnt_tags
0,funny,722
1,comedy,572
2,[none],491


### Вывод
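Ожидаемо самая быстрая реализация UDF — на Scala (выполняется прямо в JVM, без передачи данных в Python-процесс), а самая медленная — обычная питоновская UDF (построчная сериализация между JVM и Python). Однако приведённые выше замеры в микросекундах этого не подтверждают: они отражают лишь построение ленивого плана Spark, без выполнения UDF. Для честного сравнения нужно замерять действие (например, `count()`) через `%%time`.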

## Task 4

In [84]:
# Video rows carrying the exact (case-sensitive) tag 'cat': tags are exploded
# to one row per tag, then filtered. A video may appear in several rows
# (presumably one per trending date) — duplicates are removed after the join.
cats_df = (
    videos
    .withColumn('tags', explode(split_tags_pandas_udf('tags')))
    .where(col('tags') == 'cat')
)

Так как после фильтрации по тегу `cat` выборка видео стала маленькой, можно попробовать broadcast join — разослать маленькую сторону соединения на все узлы.

In [110]:
%time
cats_comments = (
    cats_df
    .join(comments.hint('broadcast').alias('c'), ['video_id'])
    .select(col('title'), col('comment_text'), col('c.likes'), )
    .distinct()
    .orderBy(desc('likes'))
)

CPU times: user 6 µs, sys: 2 µs, total: 8 µs
Wall time: 15 µs


И проверим без бродкаста 

In [112]:
%time
cats_comments = (
    cats_df
    .join(comments.alias('c'), ['video_id'])
    .select(col('title'), col('comment_text'), col('c.likes'), )
    .distinct()
    .orderBy(desc('likes'))
)

CPU times: user 10 µs, sys: 0 ns, total: 10 µs
Wall time: 15.7 µs


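Разница на уровне погрешности. Но и эти микросекундные замеры отражают только построение ленивого плана, а не выполнение join, поэтому судить по ним о выигрыше от broadcast нельзя.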

In [113]:
# Top cat-video comments by likes.
(
    cats_comments
    .limit(5)
    .toPandas()
)

Unnamed: 0,title,comment_text,likes
0,Cat vs Dog - Best Support Class,The second I read this title in my notificatio...,2355
1,Cat vs Dog - Best Support Class,talk about the ocean sunfish build,1070
2,Cat vs Dog - Best Support Class,talk about the ocean sunfish build,1021
3,Cat vs Dog - Best Support Class,talk about the ocean sunfish build,957
4,9 Things You Need To Know About Kittens - Simo...,I make interesting cartoons and I need your he...,839
