In [None]:

!pip install pyspark==3.3.0




In [None]:
!pip install --no-cache-dir mmh3 bitarray



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql import functions as f
from pyspark.sql import types as t
from pyspark.sql.window import Window
from pyspark.sql.column import _to_java_column
from pyspark.sql.column import _to_seq
# from pyspark.sql.functions import PandasUDFType, pandas_udf, UserDefinedFunction

import timeit
import pandas as pd
from typing import Any, Dict, List, Optional



In [None]:
# Spark session shared by every cell below (UDF comparison and join experiments).
spark = (
    # local[*]: run Spark inside this process, using all available cores.
    SparkSession.builder.master("local[*]")
    .appName("TEST-R")
   # .config("spark.yarn.queue", "data_science")#  data_science -> user
    .config("spark.driver.cores", "4")
    .config("spark.driver.memory", "6g")
    # NOTE(review): executor, dynamic-allocation and shuffle-service settings
    # presumably have no effect with a local master — confirm before relying on them.
    .config("spark.executor.cores", "4")
    .config("spark.executor.memory", "10g")
    .config("spark.default.parallelism", "10")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "1")
    .config("spark.dynamicAllocation.maxExecutors", "10")
    .config("spark.shuffle.service.enabled", "true")
    # -1 turns automatic broadcast joins off, so a broadcast join only happens
    # when a cell asks for it explicitly with f.broadcast(...).
    .config('spark.sql.autoBroadcastJoinThreshold', -1)
    # Adaptive query execution is switched off so the printed plans are the
    # statically planned ones.
    .config('spark.sql.adaptive.enabled', 'false')
    # Presumably the JAR that provides CustomUDFs.splitTagsUDF used further down — confirm.
    .config("spark.jars", "/content/super_udf_lib.jar")
    .config("spark.port.maxRetries", "100000")
    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
spark

In [None]:
sc = spark.sparkContext

In [None]:
_scala_func = sc._jvm.CustomUDFs.splitTagsUDF()

# Новый раздел

In [None]:
def scl_split(col: f.Column):
    """Apply the JVM-side ``CustomUDFs.splitTagsUDF`` to ``col``.

    The UDF object is fetched from the JVM through the module-level
    SparkContext ``sc`` on every call, and ``col`` is converted to a Java
    column sequence before being passed to it.

    Judging by the notebook outputs, the UDF turns a ``|``-delimited tag
    string into an array of strings.

    Returns:
        A ``Column`` wrapping the result of the JVM UDF call.
    """
    jvm_udf = sc._jvm.CustomUDFs.splitTagsUDF()
    java_columns = _to_seq(sc, [col], _to_java_column)
    return f.Column(jvm_udf.apply(java_columns))

In [None]:
# Video snapshots: header row present, column types inferred from the data.
videos = spark.read.csv('./USvideos.csv', header=True, inferSchema=True)

In [None]:
%%time
videos.withColumn("new_tags", scl_split(f.col("tags"))).show(2)

+-----------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+--------------------+
|   video_id|               title|   channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|            new_tags|
+-----------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+--------------------+
|XpVt6Z1Gjjo|1 YEAR OF VLOGGIN...|Logan Paul Vlogs|         24|logan paul vlog|l...|4394029|320053|    5931|        46245|https://i.ytimg.c...|13.09|[logan paul vlog,...|
|K4wEI5zhHB0|iPhone X — Introd...|           Apple|         28|Apple|iPhone 10|i...|7860119|185853|   26679|            0|https://i.ytimg.c...|13.09|[Apple, iPhone 10...|
+-----------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+--

In [None]:
def py_split(tag: pd.Series) -> pd.Series:
    """Split every ``|``-delimited tag string into a list of tags.

    Args:
        tag: Series of tag strings such as ``"a|b|c"``.

    Returns:
        Series of the same length whose elements are lists of tag strings.
    """
    tag_lists = tag.str.split("|")
    return tag_lists
split = f.pandas_udf(py_split, returnType=t.ArrayType(t.StringType()))

In [None]:
%%time
videos.withColumn("new_tags", split(f.col("tags"))).show(2)

+-----------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+--------------------+
|   video_id|               title|   channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|            new_tags|
+-----------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+--------------------+
|XpVt6Z1Gjjo|1 YEAR OF VLOGGIN...|Logan Paul Vlogs|         24|logan paul vlog|l...|4394029|320053|    5931|        46245|https://i.ytimg.c...|13.09|[logan paul vlog,...|
|K4wEI5zhHB0|iPhone X — Introd...|           Apple|         28|Apple|iPhone 10|i...|7860119|185853|   26679|            0|https://i.ytimg.c...|13.09|[Apple, iPhone 10...|
+-----------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+--

Из двух вариантов побеждает функция, написанная на Scala.

Производительность выше примерно в три раза: `Wall time: 1.4 s` (pandas UDF) -> `Wall time: 457 ms` (Scala UDF)

In [None]:
videos_tg = videos.withColumn("tags", scl_split(f.col("tags")))
#.cache() -> Cached Partitions 1 Size in Memory 3.8 MiB

In [None]:
videos_tg.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_total: integer (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- date: string (nullable = true)



In [None]:
videos_tg.count()

7998

In [None]:
# Explicit schema for the comments file (all fields nullable).
comments_schema = (
    t.StructType()
    .add("video_id", t.StringType(), True)
    .add("comment_text", t.StringType(), True)
    .add("likes", t.IntegerType(), True)
    .add("replies", t.IntegerType(), True)
)
# DROPMALFORMED: rows that cannot be parsed against the schema are dropped
# instead of failing the read.
comments = spark.read.csv(
    './UScomments.csv',
    schema=comments_schema,
    header=True,
    mode="DROPMALFORMED",
)
#.cache() -> Cached Partitions 10 Size in Memory 60.0 MiB

In [None]:
comments.sort(f.desc("likes"), f.desc("replies")).show(10, False) # 691323

+-----------+----------------------------------------------------------------------------------------------------------------------------+-----+-------+
|video_id   |comment_text                                                                                                                |likes|replies|
+-----------+----------------------------------------------------------------------------------------------------------------------------+-----+-------+
|0R7MQwmbiQc|What if Mable was the duck??                                                                                                |48772|501    |
|0R7MQwmbiQc|What if Mable was the duck??                                                                                                |47785|501    |
|0R7MQwmbiQc|What if Mable was the duck??                                                                                                |45316|500    |
|0R7MQwmbiQc|What if Mable was the duck??                                         

In [None]:
comments.count()

494543

Будем считать, что таблица `comments` у нас гигантская -> `691722 rows`, а таблица `videos` среднего размера -> `7998 rows`

Сравним, как будут работать обычный join и 'соленый'

In [None]:
%%time
# Plain join, no hints. Keep the DataFrame in `test_join`: explain() only
# prints the plan and returns None.
test_join = (
    videos_tg
    .join(comments, on=["video_id"], how="left")
)
test_join.explain()

== Physical Plan ==
*(5) Project [video_id#21, title#22, channel_title#23, category_id#24, tags#195, views#26, likes#27, dislikes#28, comment_total#29, thumbnail_link#30, date#31, comment_text#227, likes#228, replies#229]
+- *(5) SortMergeJoin [video_id#21], [video_id#226], LeftOuter
   :- *(2) Sort [video_id#21 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(video_id#21, 200), ENSURE_REQUIREMENTS, [id=#136]
   :     +- *(1) Project [video_id#21, title#22, channel_title#23, category_id#24, UDF(tags#25) AS tags#195, views#26, likes#27, dislikes#28, comment_total#29, thumbnail_link#30, date#31]
   :        +- FileScan csv [video_id#21,title#22,channel_title#23,category_id#24,tags#25,views#26,likes#27,dislikes#28,comment_total#29,thumbnail_link#30,date#31] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/USvideos.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<video_id:string,title:string,channel_title:string,

In [None]:
%%time
# Plain join, no hints. Keep the DataFrame in `test_join`: show() only prints
# rows and returns None.
test_join = (
    videos_tg
    .join(comments, on=["video_id"], how="left")
)
test_join.show(2)

+-----------+--------------------+-------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+--------------------+-----+-------+
|   video_id|               title|channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|        comment_text|likes|replies|
+-----------+--------------------+-------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+--------------------+-----+-------+
|K4wEI5zhHB0|iPhone X — Introd...|        Apple|         28|[Apple, iPhone 10...|7860119|185853|   26679|            0|https://i.ytimg.c...|13.09|                null| null|   null|
|cLdxuaxaQwc|         My Response|    PewDiePie|         22|            [[none]]|5845909|576597|   39774|       170708|https://i.ytimg.c...|13.09|Love you Pewdiepi...|    0|      0|
+-----------+--------------------+-------------+-----------+--------------------+-------+-

In [None]:
%%time
# Broadcast hint on the right-hand table of a RIGHT join. Keep the DataFrame in
# `test_join`: explain() only prints the plan and returns None.
test_join = (
    comments
    .join(f.broadcast(videos_tg), on=["video_id"], how="right")
)
test_join.explain()

== Physical Plan ==
*(5) Project [video_id#21, comment_text#227, likes#228, replies#229, title#22, channel_title#23, category_id#24, tags#195, views#26, likes#27, dislikes#28, comment_total#29, thumbnail_link#30, date#31]
+- *(5) SortMergeJoin [video_id#226], [video_id#21], RightOuter
   :- *(2) Sort [video_id#226 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(video_id#226, 200), ENSURE_REQUIREMENTS, [id=#274]
   :     +- *(1) Filter isnotnull(video_id#226)
   :        +- FileScan csv [video_id#226,comment_text#227,likes#228,replies#229] Batched: false, DataFilters: [isnotnull(video_id#226)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/UScomments.csv], PartitionFilters: [], PushedFilters: [IsNotNull(video_id)], ReadSchema: struct<video_id:string,comment_text:string,likes:int,replies:int>
   +- *(4) Sort [video_id#21 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(video_id#21, 200), ENSURE_REQUIREMENTS, [id=#282]
         +- *(3) Project 

In [None]:
%%time
# Broadcast hint on the right-hand table of a RIGHT join. Keep the DataFrame in
# `test_join`: show() only prints rows and returns None.
test_join = (
    comments
    .join(f.broadcast(videos_tg), on=["video_id"], how="right")
)
test_join.show(2)

+-----------+--------------------+-----+-------+--------------------+-------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|   video_id|        comment_text|likes|replies|               title|channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|
+-----------+--------------------+-----+-------+--------------------+-------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|K4wEI5zhHB0|                null| null|   null|iPhone X — Introd...|        Apple|         28|[Apple, iPhone 10...|7860119|185853|   26679|            0|https://i.ytimg.c...|13.09|
|cLdxuaxaQwc|Love you Pewdiepi...|    0|      0|         My Response|    PewDiePie|         22|            [[none]]|5845909|576597|   39774|       170708|https://i.ytimg.c...|13.09|
+-----------+--------------------+-----+-------+--------------------+-------------+-------

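Из плана запроса видно, что broadcast не сработал и снова выполнился SortMergeJoin: при `how="right"` Spark может транслировать (broadcast) только левую таблицу, а хинт стоит на правой (`videos_tg`), поэтому он игнорируется.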

In [None]:
%%time
# Broadcast hint on the right-hand table of a LEFT join. Keep the DataFrame in
# `test_join`: explain() only prints the plan and returns None.
test_join = (
    videos_tg
    .join(f.broadcast(comments), on=["video_id"], how="left")
)
test_join.explain()

== Physical Plan ==
*(2) Project [video_id#21, title#22, channel_title#23, category_id#24, tags#195, views#26, likes#27, dislikes#28, comment_total#29, thumbnail_link#30, date#31, comment_text#227, likes#228, replies#229]
+- *(2) BroadcastHashJoin [video_id#21], [video_id#226], LeftOuter, BuildRight, false
   :- *(2) Project [video_id#21, title#22, channel_title#23, category_id#24, UDF(tags#25) AS tags#195, views#26, likes#27, dislikes#28, comment_total#29, thumbnail_link#30, date#31]
   :  +- FileScan csv [video_id#21,title#22,channel_title#23,category_id#24,tags#25,views#26,likes#27,dislikes#28,comment_total#29,thumbnail_link#30,date#31] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/USvideos.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<video_id:string,title:string,channel_title:string,category_id:int,tags:string,views:int,li...
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),fals

In [None]:
%%time
# Broadcast hint on the right-hand table of a LEFT join. Keep the DataFrame in
# `test_join`: show() only prints rows and returns None.
test_join = (
    videos_tg
    .join(f.broadcast(comments), on=["video_id"], how="left")
)
test_join.show(2)

+-----------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+--------------------+-----+-------+
|   video_id|               title|   channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|        comment_text|likes|replies|
+-----------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+--------------------+-----+-------+
|XpVt6Z1Gjjo|1 YEAR OF VLOGGIN...|Logan Paul Vlogs|         24|[logan paul vlog,...|4394029|320053|    5931|        46245|https://i.ytimg.c...|13.09|at10 30 on the vi...|    0|      1|
|XpVt6Z1Gjjo|1 YEAR OF VLOGGIN...|Logan Paul Vlogs|         24|[logan paul vlog,...|4394029|320053|    5931|        46245|https://i.ytimg.c...|13.09|O M G !😂💚💚💚🔥...|    3|      0|
+-----------+--------------------+----------------+-----------+-----------------

В этом случае broadcast сработал

После применения broadcast время выполнения сократилось примерно в два раза: с `7.9 s` до `4.6 s`

In [None]:
# Number of rows per join key (video_id) in `videos`, largest first.
(
    videos
    .groupBy(f.col('video_id').alias('partition'))
    .agg(f.count("*").alias('partition_size'))
    .orderBy(f.col("partition_size").desc())
    .show()
)

+-----------+--------------+
|  partition|partition_size|
+-----------+--------------+
|LunHybOKIjU|             8|
|Oo0NJsr5m4I|             8|
|Hlt3rA-oDao|             8|
|odhMmAPDc54|             7|
|WYYvHb03Eog|             7|
|oKzFGhlFqqE|             7|
|UXdbCReBTR8|             7|
|t8sgy0faXyg|             7|
|KCNvREKTnQc|             7|
|DeTu8xSGpEM|             7|
|dInwVhRtN4E|             7|
|8ndhidEmUbI|             7|
|5Xe0Qd6bUFo|             7|
|sjlHnJvXdQs|             7|
|CwLGro-dFWg|             7|
|1QWLyi03twg|             7|
|cLdxuaxaQwc|             7|
|OlI8r3nNUVw|             7|
|iALfvFpcItE|             7|
|ZczwzVhai9E|             7|
+-----------+--------------+
only showing top 20 rows



In [None]:
# Salt the comments: every row gets a pseudo-random bucket 0, 1 or 2, so rows
# sharing one video_id are spread over three (video_id, salt) join keys.
# The bucket count (3) must match the salt values replicated onto the other
# side of the join. The seed makes the bucket assignment repeatable across
# re-runs (for an unchanged input partitioning).
salted_df = comments.withColumn('salt', (f.rand(seed=42) * 3).cast('int'))
salted_df.where("video_id == 'XpVt6Z1Gjjo'").show()

+-----------+--------------------+-----+-------+----+
|   video_id|        comment_text|likes|replies|salt|
+-----------+--------------------+-----+-------+----+
|XpVt6Z1Gjjo|Logan Paul it's y...|    4|      0|   0|
|XpVt6Z1Gjjo|I've been followi...|    3|      0|   0|
|XpVt6Z1Gjjo|Say hi to Kong an...|    3|      0|   0|
|XpVt6Z1Gjjo| MY FAN . attendance|    3|      0|   0|
|XpVt6Z1Gjjo|         trending 😉|    3|      0|   2|
|XpVt6Z1Gjjo|#1 on trending AY...|    3|      0|   1|
|XpVt6Z1Gjjo|The end though 😭...|    4|      0|   2|
|XpVt6Z1Gjjo|#1 trending!!!!!!!!!|    3|      0|   2|
|XpVt6Z1Gjjo|Happy one year vl...|    3|      0|   1|
|XpVt6Z1Gjjo|You and your shit...|    0|      0|   2|
|XpVt6Z1Gjjo|There should be a...|    0|      0|   0|
|XpVt6Z1Gjjo|Dear Logan, I rea...|    0|      0|   0|
|XpVt6Z1Gjjo|Honestly Evan is ...|    0|      0|   0|
|XpVt6Z1Gjjo|Casey is still be...|    0|      0|   2|
|XpVt6Z1Gjjo|aw geez rick this...|    0|      0|   2|
|XpVt6Z1Gjjo|He happy cause he

In [None]:
# Replicate every video row once per salt value (0, 1, 2) so that a salted
# comment row can find its video under the same (video_id, salt) key.
enriched_df = videos_tg.select(
    "*",
    f.explode(f.array(*[f.lit(salt) for salt in range(3)])).alias("salt"),
)
enriched_df.show()

+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+----+
|   video_id|               title|       channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|salt|
+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+----+
|XpVt6Z1Gjjo|1 YEAR OF VLOGGIN...|    Logan Paul Vlogs|         24|[logan paul vlog,...|4394029|320053|    5931|        46245|https://i.ytimg.c...|13.09|   0|
|XpVt6Z1Gjjo|1 YEAR OF VLOGGIN...|    Logan Paul Vlogs|         24|[logan paul vlog,...|4394029|320053|    5931|        46245|https://i.ytimg.c...|13.09|   1|
|XpVt6Z1Gjjo|1 YEAR OF VLOGGIN...|    Logan Paul Vlogs|         24|[logan paul vlog,...|4394029|320053|    5931|        46245|https://i.ytimg.c...|13.09|   2|
|K4wEI5zhHB0|iPhone X — Introd...|            

In [None]:
%%time
# Salted join without a broadcast hint. Keep the DataFrame in `joined_df`:
# show() only prints rows and returns None.
joined_df = (
    salted_df
    .join(enriched_df, on=['video_id', 'salt'], how="left")
    .drop('salt')
)
joined_df.show(2)

+-----------+--------------------+-----+-------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|   video_id|        comment_text|likes|replies|               title|   channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|
+-----------+--------------------+-----+-------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|XpVt6Z1Gjjo|Logan Paul it's y...|    4|      0|1 YEAR OF VLOGGIN...|Logan Paul Vlogs|         24|[logan paul vlog,...|4394029|320053|    5931|        46245|https://i.ytimg.c...|13.09|
|XpVt6Z1Gjjo|Logan Paul it's y...|    4|      0|1 YEAR OF VLOGGIN...|Logan Paul Vlogs|         24|[logan paul vlog,...|5457497|349857|   17479|        52483|https://i.ytimg.c...|14.09|
+-----------+--------------------+-----+-------+--------------------+------

In [None]:
%%time
# Salted join with the replicated videos table broadcast. Keep the DataFrame in
# `joined_df`: show() only prints rows and returns None.
joined_df = (
    salted_df
    .join(f.broadcast(enriched_df), on=['video_id', 'salt'], how="left")
    .drop('salt')
)
joined_df.show(2)


+-----------+--------------------+-----+-------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|   video_id|        comment_text|likes|replies|               title|   channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|
+-----------+--------------------+-----+-------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|XpVt6Z1Gjjo|Logan Paul it's y...|    4|      0|1 YEAR OF VLOGGIN...|Logan Paul Vlogs|         24|[logan paul vlog,...|6232518|373824|   23701|        39144|https://i.ytimg.c...|19.09|
|XpVt6Z1Gjjo|Logan Paul it's y...|    4|      0|1 YEAR OF VLOGGIN...|Logan Paul Vlogs|         24|[logan paul vlog,...|6195810|372700|   23509|        39002|https://i.ytimg.c...|18.09|
+-----------+--------------------+-----+-------+--------------------+------

‘Соленый’ join отработал примерно в 3 раза быстрее: `Wall time: 7.68 s` -> `Wall time: 2.78 s`

In [None]:
%%time
# Salted join, no broadcast. The comments' `likes` column is renamed first so
# it does not clash with the videos' `likes` column after the join.
joined_df = (
    salted_df
    .withColumnRenamed('likes', 'comment_likes')
    .join(enriched_df, ['video_id', 'salt'], "left")
    .drop('salt')
)

joined_df.show(2)

+-----------+--------------------+-------------+-------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|   video_id|        comment_text|comment_likes|replies|               title|   channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|
+-----------+--------------------+-------------+-------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|XpVt6Z1Gjjo|Logan Paul it's y...|            4|      0|1 YEAR OF VLOGGIN...|Logan Paul Vlogs|         24|[logan paul vlog,...|4394029|320053|    5931|        46245|https://i.ytimg.c...|13.09|
|XpVt6Z1Gjjo|Logan Paul it's y...|            4|      0|1 YEAR OF VLOGGIN...|Logan Paul Vlogs|         24|[logan paul vlog,...|5457497|349857|   17479|        52483|https://i.ytimg.c...|14.09|
+-----------+--------------------+-

In [None]:
%%time
# Salted join with the replicated videos table broadcast. The comments' `likes`
# column is renamed first so it does not clash with the videos' `likes` column.
joined_df = (
    salted_df
    .withColumnRenamed('likes', 'comment_likes')
    .join(f.broadcast(enriched_df), ['video_id', 'salt'], "left")
    .drop('salt')
)

joined_df.show(2)

+-----------+--------------------+-------------+-------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|   video_id|        comment_text|comment_likes|replies|               title|   channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|
+-----------+--------------------+-------------+-------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|XpVt6Z1Gjjo|Logan Paul it's y...|            4|      0|1 YEAR OF VLOGGIN...|Logan Paul Vlogs|         24|[logan paul vlog,...|6232518|373824|   23701|        39144|https://i.ytimg.c...|19.09|
|XpVt6Z1Gjjo|Logan Paul it's y...|            4|      0|1 YEAR OF VLOGGIN...|Logan Paul Vlogs|         24|[logan paul vlog,...|6195810|372700|   23509|        39002|https://i.ytimg.c...|18.09|
+-----------+--------------------+-

'Соленый' join + broadcast отработал ещё быстрее, примерно в 2 раза: `Wall time: 2.19 s` -> `Wall time: 1.14 s`

In [None]:
def score_count(views: pd.Series, likes: pd.Series, dislikes: pd.Series, comment_likes: pd.Series) -> pd.Series:
    """Compute the element-wise score ``views + likes - dislikes + comment_likes``.

    Args:
        views: View counts.
        likes: Like counts of the video.
        dislikes: Dislike counts of the video.
        comment_likes: Like counts of the comment.

    Returns:
        Series with one score per input row.
    """
    positive = views + likes
    net = positive - dislikes
    return net + comment_likes
score = f.pandas_udf(score_count, returnType=t.LongType())

In [None]:
# Add the per-row score and drop the four columns it was computed from.
scored_videos = (
    joined_df
    .withColumn(
        "score",
        score(*[f.col(name) for name in ("views", "likes", "dislikes", "comment_likes")]),
    )
    .drop("views", "likes", "dislikes", "comment_likes")
)

In [None]:
# Best score per (video_id, category_id), highest first.
# first() takes an arbitrary row of the group for title / channel_title.
(
    scored_videos
    .select("video_id", "category_id", "title", "channel_title", "score")
    .groupBy("video_id", "category_id")
    .agg(
        f.max("score").alias('score'),
        *[f.first(name).alias(name) for name in ("title", "channel_title")],
    )
    .orderBy(f.col("score").desc())
    .show()
)

+-----------+-----------+--------+-------------------------+--------------------+
|   video_id|category_id|   score|                    title|       channel_title|
+-----------+-----------+--------+-------------------------+--------------------+
|MBdVXkSdhwU|         10|43432970|BTS (방탄소년단) 'DNA'...|             ibighit|
|tt2k8PGm-TI|         10|37727136|     ZAYN - Dusk Till ...|            ZaynVEVO|
|LunHybOKIjU|         24|33990127|     Eminem Rips Donal...|         BETNetworks|
|SHq2qrFUlGY|         10|32671746|     Shakira - Perro F...|         shakiraVEVO|
|Q0CbN8sfihY|         24|28518501|     Star Wars: The La...|           Star Wars|
|r9-DM9uBtVI|         24|19477394|     JUSTICE LEAGUE - ...|Warner Bros. Pict...|
|FSOxXsYVJMQ|         10|16079719|BTS (방탄소년단) 'DNA'...|             ibighit|
|GO6qs83CHpc|         25|16007153|     Sneaky toddler st...|The Royal Family ...|
|D59v74k5flU|         22|15919288|     Primitive Technol...|Primitive Technology|
|J_ub7Etch2U|         10|1

In [None]:
category = spark.read.option("multiline","true") .json('./US_category_id.json')

In [None]:
# Explode `items` into one row per category.
# Guarded so that re-running this cell is a no-op: once exploded, `items` is a
# struct and a second explode would fail.
if isinstance(category.schema["items"].dataType, t.ArrayType):
    category = category.withColumn("items", f.explode(f.col("items")))

In [None]:
category.printSchema()

root
 |-- etag: string (nullable = true)
 |-- items: struct (nullable = true)
 |    |-- etag: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- kind: string (nullable = true)
 |    |-- snippet: struct (nullable = true)
 |    |    |-- assignable: boolean (nullable = true)
 |    |    |-- channelId: string (nullable = true)
 |    |    |-- title: string (nullable = true)
 |-- kind: string (nullable = true)



In [None]:
category.count()

32

In [None]:
# Flatten the category structs into a two-column lookup table.
# `items.id` is a string in the JSON while `category_id` in the videos data is
# an integer, so cast it here: the join key then has the same type on both
# sides instead of relying on implicit coercion.
category_only = (
    category
    .select(
        f.col("items.id").cast("int").alias("category_id"),
        f.col("items.snippet.title").alias("category_title"),
    )
)

category_only.show(40, False)

+-----------+---------------------+
|category_id|category_title       |
+-----------+---------------------+
|1          |Film & Animation     |
|2          |Autos & Vehicles     |
|10         |Music                |
|15         |Pets & Animals       |
|17         |Sports               |
|18         |Short Movies         |
|19         |Travel & Events      |
|20         |Gaming               |
|21         |Videoblogging        |
|22         |People & Blogs       |
|23         |Comedy               |
|24         |Entertainment        |
|25         |News & Politics      |
|26         |Howto & Style        |
|27         |Education            |
|28         |Science & Technology |
|29         |Nonprofits & Activism|
|30         |Movies               |
|31         |Anime/Animation      |
|32         |Action/Adventure     |
|33         |Classics             |
|34         |Comedy               |
|35         |Documentary          |
|36         |Drama                |
|37         |Family         

In [None]:
%%time
# Attach the category title to every scored row (no broadcast hint).
pre_categories_score = scored_videos.join(category_only, "category_id", "left")
pre_categories_score.show()

+-----------+-----------+--------------------+-------+--------------------+----------------+--------------------+-------------+--------------------+-----+-----+--------------------+
|category_id|   video_id|        comment_text|replies|               title|   channel_title|                tags|comment_total|      thumbnail_link| date|score|      category_title|
+-----------+-----------+--------------------+-------+--------------------+----------------+--------------------+-------------+--------------------+-----+-----+--------------------+
|         28|7DV7TS3XB94|Better than sex e...|      0|Melipona Bee Defi...|explorationfilms|[Melipona, Bee, V...|          457|https://i.ytimg.c...|05.10|50965|Science & Technology|
|         28|7DV7TS3XB94|Better than sex e...|      0|Melipona Bee Defi...|explorationfilms|[Melipona, Bee, V...|          456|https://i.ytimg.c...|04.10|50905|Science & Technology|
|         28|7DV7TS3XB94|They didn't have ...|      0|Melipona Bee Defi...|explorationfilm

In [None]:
%%time
# Attach the category title to every scored row, broadcasting the small
# category lookup table.
pre_categories_score = scored_videos.join(f.broadcast(category_only), "category_id", "left")
pre_categories_score.show()

+-----------+-----------+--------------------+-------+--------------------+----------------+--------------------+-------------+--------------------+-----+-------+--------------+
|category_id|   video_id|        comment_text|replies|               title|   channel_title|                tags|comment_total|      thumbnail_link| date|  score|category_title|
+-----------+-----------+--------------------+-------+--------------------+----------------+--------------------+-------------+--------------------+-----+-------+--------------+
|         24|XpVt6Z1Gjjo|Logan Paul it's y...|      0|1 YEAR OF VLOGGIN...|Logan Paul Vlogs|[logan paul vlog,...|        39144|https://i.ytimg.c...|19.09|6582645| Entertainment|
|         24|XpVt6Z1Gjjo|Logan Paul it's y...|      0|1 YEAR OF VLOGGIN...|Logan Paul Vlogs|[logan paul vlog,...|        39002|https://i.ytimg.c...|18.09|6545005| Entertainment|
|         24|XpVt6Z1Gjjo|Logan Paul it's y...|      0|1 YEAR OF VLOGGIN...|Logan Paul Vlogs|[logan paul vlog,.

После применения broadcast время выполнения сократилось более чем в два раза: с `6.61 s` до `2.89 s`

In [None]:
import statistics


def _median_of_values(values):
    """Return the median of a list of numbers as a float.

    Args:
        values: List of grouped values (e.g. the result of ``collect_list``).

    Returns:
        The median as ``float``, or ``None`` when ``values`` is ``None`` or
        empty (``statistics.median`` would raise on an empty list).
    """
    if not values:
        return None
    return float(statistics.median(values))


# Row-at-a-time Python UDF computing the median of a list of grouped values.
median_udf = f.udf(_median_of_values, t.DoubleType())

In [None]:
%%time
pre_categories_score.groupBy(f.col("category_id")).agg(median_udf(f.collect_list(f.col('score'))).alias('median')).show()

+-----------+---------+
|category_id|   median|
+-----------+---------+
|         28| 765496.0|
|         26| 350734.0|
|         27| 285455.0|
|         22| 460454.0|
|          1| 933881.0|
|         20| 307766.0|
|         19| 273036.0|
|         15| 374138.0|
|         43|   8606.0|
|         17| 315190.0|
|         23|1127601.0|
|         10| 308850.0|
|         25| 273801.0|
|         24| 497289.0|
|         29|  66828.0|
|          2| 754508.0|
+-----------+---------+

CPU times: user 205 ms, sys: 37.6 ms, total: 242 ms
Wall time: 35.9 s


In [None]:
def pd_median(score: pd.Series) -> float:
    """Return the median of ``score``.

    Args:
        score: Series holding the score values of one group.

    Returns:
        The median of the series.
    """
    group_median = score.median()
    return group_median
median = f.pandas_udf(pd_median, returnType=t.DoubleType())

In [None]:
%%time
pre_categories_score.groupBy(f.col("category_id")).agg(median(f.col('score')).alias('median')).show()

+-----------+---------+
|category_id|   median|
+-----------+---------+
|         28| 765496.0|
|         26| 350734.0|
|         27| 285455.0|
|         22| 460454.0|
|          1| 933881.0|
|         20| 307766.0|
|         19| 273036.0|
|         15| 374138.0|
|         43|   8606.0|
|         17| 315190.0|
|         23|1127601.0|
|         10| 308850.0|
|         25| 273801.0|
|         24| 497289.0|
|         29|  66828.0|
|          2| 754508.0|
+-----------+---------+

CPU times: user 78.6 ms, sys: 16.2 ms, total: 94.8 ms
Wall time: 10.4 s


После подсказки функция заработала. После применения pandas_udf время выполнения сократилось примерно в 3,5 раза: с `35.9 s` до `10.4 s`

In [None]:
# Median score per category plus one representative video.
# NOTE(review): median_udf (plain UDF over collect_list) is used here rather
# than the pandas UDF `median` — presumably because Spark does not allow a
# grouped-aggregate pandas UDF and built-in aggregates such as first() in the
# same agg(); confirm before swapping.
# first() takes an arbitrary row of the group, so the representative video is
# not guaranteed to be the same on every run.
categories_score = (
    pre_categories_score
    .groupBy(f.col("category_id"))
    .agg(
        median_udf(f.collect_list(f.col('score'))).alias('median'),
        *[
            f.first(name).alias(name)
            for name in ("video_id", "title", "channel_title", "thumbnail_link")
        ],
    )
)
categories_score.orderBy(f.col("median").desc()).show()

+-----------+---------+-----------+------------------------------------+--------------------+--------------------+
|category_id|   median|   video_id|                               title|       channel_title|      thumbnail_link|
+-----------+---------+-----------+------------------------------------+--------------------+--------------------+
|         23|1127601.0|sjlHnJvXdQs|                   iPhone X (parody)|          jacksfilms|https://i.ytimg.c...|
|          1| 933881.0|cMKX2tE5Luk|                The Disaster Arti...|                 A24|https://i.ytimg.c...|
|         28| 765496.0|WYYvHb03Eog|                Apple iPhone X fi...|           The Verge|https://i.ytimg.c...|
|          2| 754508.0|NzRuDD0iYC0|                Last Week Tonight...|        DeathByPixel|https://i.ytimg.c...|
|         24| 497289.0|XpVt6Z1Gjjo|                1 YEAR OF VLOGGIN...|    Logan Paul Vlogs|https://i.ytimg.c...|
|         22| 460454.0|cLdxuaxaQwc|                         My Response|        

In [None]:
scored_videos.select("video_id", "tags").distinct().show()

+-----------+--------------------+
|   video_id|                tags|
+-----------+--------------------+
|Cz3Coxi9D3E|[Jacob, Banks, Un...|
|JvkLPE8efME|[DESI PERKINS, TH...|
|LTsSGSi9mqo|[Gadgets, Kitchen...|
|MNst7laHZGg|[Morning Joe, Joe...|
|j9UGpBZz7WU|[catalonia, catal...|
|thD6TNUoyIk|[andrew, huang, a...|
|VH1RwkrHPvA|[Canelés, cannele...|
|ql0Op1VcELw|[plane, of, the, ...|
|3hq2c9vdfls|[cal, berkeley, g...|
|q9o_VjdugHw|            [[none]]|
|I_JtJHJ4384|       [madison.com]|
|Q3Rq-irFn1w|[beauty, how to, ...|
|an5vV5aw74I|[screen junkies, ...|
|aVsOXRgjeeU|[Pitch Perfect, m...|
|zWxo4uZW-1I|[makeup, tutorial...|
|HUUsEgAe8x4|[venomous, toxic,...|
|GWKI-CoCCTU|[nile wilson, nil...|
|Q0CbN8sfihY|[star wars, the l...|
|A-fofQ9VpPQ|[Netflix, Trailer...|
|MMEgnbIVWXc|[Frankie Muniz, W...|
+-----------+--------------------+
only showing top 20 rows



In [None]:
# Tag popularity = number of distinct videos carrying the tag.
# `scored_videos` has one row per (comment, video snapshot) pair, so counting
# exploded tags directly multiplies every tag by the number of comments and
# snapshots of its video. De-duplicate (video_id, tag) pairs first, then count
# per tag.
popular_tags = (
    scored_videos
    .select("video_id", f.explode("tags").alias("tag"))
    .distinct()
    .groupBy("tag")
    .count()
)

In [None]:
popular_tags.sort(f.desc("count")).show()

+-----------+--------------------+-----+
|   video_id|                 tag|count|
+-----------+--------------------+-----+
|CsdzflTXBVQ|     Awkward Puppets| 8400|
|yIq-N7m0upU|        Anwar Jibawi| 7200|
|4X6a3G_0HjY|  Entertainment News| 5600|
|XpVt6Z1Gjjo|              logang| 5600|
|sjlHnJvXdQs|            iphone x| 5600|
|XpVt6Z1Gjjo|     youtube history| 5600|
|XpVt6Z1Gjjo|     logan paul vlog| 5600|
|XpVt6Z1Gjjo|logan paul 1 year...| 5600|
|WYYvHb03Eog|      iPhone X specs| 5600|
|XpVt6Z1Gjjo|       logang 4 life| 5600|
|4X6a3G_0HjY| bella hadid fashion| 5600|
|4X6a3G_0HjY|     bella hadid hot| 5600|
|4X6a3G_0HjY|       Entertainment| 5600|
|WYYvHb03Eog|iPhone X release ...| 5600|
|WYYvHb03Eog|         iphone 2017| 5600|
|XpVt6Z1Gjjo|          10M plaque| 5600|
|4X6a3G_0HjY|bella hadid insta...| 5600|
|XpVt6Z1Gjjo|  logan paul youtube| 5600|
|4X6a3G_0HjY|                 TMZ| 5600|
|XpVt6Z1Gjjo|              comedy| 5600|
+-----------+--------------------+-----+
only showing top

In [None]:
# Comments on videos tagged "cat", with the highest like count seen for each
# (video_id, comment_text) pair, most liked first.
cat_df = (
    joined_df
    .filter(f.array_contains("tags", "cat"))
    .select("video_id", "comment_text", "comment_likes")
    .groupBy("video_id", "comment_text")
    .agg(f.max("comment_likes").alias("max_likes"))
    .orderBy(f.col("max_likes").desc())
)

In [None]:
cat_df.show(5, False)

+-----------+------------------------------------------------------------------------------------------------------+---------+
|video_id   |comment_text                                                                                          |max_likes|
+-----------+------------------------------------------------------------------------------------------------------+---------+
|-1fzGnFwz9M|I make interesting cartoons and I need your help! Go to the channel, rate my work!                    |839      |
|tp9aQXDFHbY|Make sure to check back next Friday as we are launching our brand new animated HALLOWEEN special! 🐱🕷|304      |
|tp9aQXDFHbY|1:51 so your nuts are your most prized possession?                                                    |100      |
|tp9aQXDFHbY|If Simon will be make animation movie of Simons Cat adventures, I’ll go cinemas to watch it 😻        |37       |
|Vjc459T6wX8|How does Mugumogu not collapse in a heap of laughter?!! Maru's liquified form is hilarious!          

In [None]:
cat_df.count()

1737

In [None]:
import math
import mmh3
from bitarray import bitarray


class BloomFilter(object):

    '''
    Bloom filter backed by a bitarray, using seeded murmur3 (mmh3) hashes.

    The bit array size and the number of hash functions are derived from
    the expected item count and the target false-positive probability.
    '''

    def __init__(self, items_count, fp_prob):
        '''
        items_count : int
            Number of items expected to be stored in bloom filter
        fp_prob : float
            False Positive probability in decimal
        '''
        self.items_count = items_count

        # Target false-positive probability in decimal
        self.fp_prob = fp_prob

        # Size of bit array to use (always >= 1)
        self.size = self.get_size(items_count, fp_prob)

        # Number of hash functions to use (always >= 1)
        self.hash_count = self.get_hash_count(self.size, items_count)

        # Bit array of given size, with every bit cleared
        self.bit_array = bitarray(self.size)
        self.bit_array.setall(0)

    def _positions(self, item):
        '''
        Yield the bit index chosen by each of the hash_count hash functions.

        The loop counter is used as the mmh3 seed, so each iteration acts as
        a different hash function. Python's % with a positive modulus is
        never negative, so the index is always inside the bit array.
        '''
        for seed in range(self.hash_count):
            yield mmh3.hash(item, seed) % self.size

    def add(self, item):
        '''
        Add an item in the filter
        '''
        for position in self._positions(item):
            self.bit_array[position] = True

    def union(self, other):
        """ Calculates the union of the two underlying bitarrays and returns
        a new bloom filter object.

        Raises ValueError when the two filters do not share the same size
        and hash count: OR-ing such filters would not give a usable filter.
        """
        if self.size != other.size or self.hash_count != other.hash_count:
            raise ValueError(
                "cannot union bloom filters with different size or hash count"
            )
        new_bloom = self.copy()
        new_bloom.bit_array = new_bloom.bit_array | other.bit_array
        return new_bloom

    def check(self, item):
        '''
        Check for existence of an item in filter

        Returns False when at least one of the item's bits is unset (the
        item was never added), True when all of them are set (the item is
        probably present).
        '''
        # all() stops at the first unset bit, like the explicit early return.
        return all(self.bit_array[position] for position in self._positions(item))

    def copy(self):
        """Return a copy of this bloom filter.
        """
        new_filter = BloomFilter(self.items_count, self.fp_prob)
        new_filter.bit_array = self.bit_array.copy()
        return new_filter

    def set_bit_array(self, bit_array):
        '''
        Replace the underlying bit array.

        The caller is responsible for passing an array built with the same
        size and hash count as this filter; no validation is done here.
        '''
        self.bit_array = bit_array

    @classmethod
    def get_size(cls, n, p):
        '''
        Return the size of bit array(m) to used using
        following formula
        m = -(n * lg(p)) / (lg(2)^2)
        n : int
            number of items expected to be stored in filter
        p : float
            False Positive probability in decimal

        The result is clamped to at least 1 so that the modulo in
        add()/check() can never divide by zero.
        '''
        m = -(n * math.log(p)) / (math.log(2) ** 2)
        return max(1, int(m))

    @classmethod
    def get_hash_count(cls, m, n):
        '''
        Return the hash function(k) to be used using
        following formula
        k = (m/n) * lg(2)

        m : int
            size of bit array
        n : int
            number of items expected to be stored in filter

        The result is clamped to at least 1: with zero hash functions
        add() would set nothing and check() would always return True.
        '''
        k = (m / n) * math.log(2)
        return max(1, int(k))

In [None]:
from random import shuffle

n = 20    # expected number of items per filter
p = 0.05  # target false-positive probability

bloomf1 = BloomFilter(n, p)
bloomf2 = BloomFilter(n, p)

# Words inserted into the first filter.
word_present1 = ['abound', 'abounds', 'abundance', 'abundant', 'accessible',
                 'bloom', 'blossom', 'bolster', 'bonny', 'bonus', 'bonuses']

# Words inserted into the second filter.
word_present2 = ['coherent', 'cohesive', 'colorful', 'comely', 'comfort',
                 'gems', 'generosity', 'generous', 'generously', 'genial']

# Words that are never inserted into either filter.
word_absent = ['bluff', 'cheater', 'hate', 'war', 'humanity',
               'racism', 'hurt', 'nuke', 'gloomy', 'facebook',
               'geeksforgeeks', 'twitter']

for item in word_present1:
    bloomf1.add(item)

for item in word_present2:
    bloomf2.add(item)

# Merge the two filters; the membership checks below run on the merged one.
bloomf = bloomf1.union(bloomf2)

shuffle(word_present1)
shuffle(word_present2)
shuffle(word_absent)

test_words = word_present1 + word_present2 + word_absent
shuffle(test_words)
for word in test_words:
    if not bloomf.check(word):
        # At least one of the word's bits is unset.
        print("'{}' is definitely not present!".format(word))
    elif word in word_absent:
        # The filter says "present" for a word that was never added.
        print("'{}' is a false positive!".format(word))
    else:
        print("'{}' is probably present!".format(word))

'bonuses' is probably present!
'war' is definitely not present!
'geeksforgeeks' is definitely not present!
'bonny' is probably present!
'abundance' is probably present!
'blossom' is probably present!
'hurt' is definitely not present!
'bloom' is probably present!
'humanity' is a false positive!
'comfort' is probably present!
'bonus' is probably present!
'abound' is probably present!
'bluff' is definitely not present!
'genial' is probably present!
'racism' is definitely not present!
'generosity' is probably present!
'comely' is probably present!
'colorful' is probably present!
'bolster' is probably present!
'abundant' is probably present!
'generous' is probably present!
'cohesive' is probably present!
'nuke' is definitely not present!
'cheater' is definitely not present!
'coherent' is probably present!
'hate' is definitely not present!
'facebook' is definitely not present!
'generously' is probably present!
'gems' is probably present!
'twitter' is a false positive!
'accessible' is probabl

In [None]:
# Small working subset: de-duplicated (video_id, comment_text, comment_likes, tags)
# rows of joined_df, restricted to four hard-coded video ids.
sample = joined_df.select("video_id", "comment_text", "comment_likes", "tags").distinct().where(f.col("video_id").isin(["XpVt6Z1Gjjo","WYYvHb03Eog","cMKX2tE5Luk","_ANP3HR1jsM"]))

In [None]:
# Count the rows of videos_tg whose tags array contains the exact tag "cat".
videos_tg.select("video_id", "tags").where(f.array_contains(f.col("tags"), "cat")).count()

48

In [None]:
# Keep only the (video_id, tags) rows of videos_tg whose tags array contains "cat".
cat_only_df = videos_tg.select("video_id", "tags").where(f.array_contains(f.col("tags"), "cat"))

In [None]:
# Build the filter

filterSize = 2945276
prob = 0.05

def fill_bloom_filter(bf, items):
    """Add str(row[0]) of every row in ``items`` to ``bf`` and return ``bf``.

    bf : BloomFilter
        Filter to populate (mutated in place).
    items : iterable
        Rows of one partition; only the first field of each row is used.
    """
    for i in items:
        bf.add(str(i[0]))
    return bf

bloom_filter = BloomFilter(filterSize, prob)

# Every partition builds its own filter and emits only the raw bitarray.
# The reducer therefore receives bitarrays (not BloomFilter objects) and must
# OR them directly; accessing `.bit_array` on them raises AttributeError.
general_bit_array = cat_only_df.select(f.col('video_id')).rdd \
    .mapPartitions(lambda p: [fill_bloom_filter(BloomFilter(filterSize, prob), p).bit_array]) \
    .reduce(lambda a, b: a | b)
bloom_filter.set_bit_array(general_bit_array)

# Declare the boolean return type explicitly; f.udf defaults to StringType.
maybe_in_bf = f.udf(lambda video_id: bloom_filter.check(str(video_id)), t.BooleanType())

In [None]:
# Use the filter
blum_cat_df = (
    comments
    # Probabilistic pre-filter: drops comments whose video_id is definitely
    # not among the "cat" videos before the join.
    .filter(maybe_in_bf(f.col('video_id')) == True)
    .join(videos_tg, on=['video_id'], how="left")
    # A Bloom filter can report false positives, and the left join keeps
    # comments with no matching video, so re-apply the exact tag predicate.
    .where(f.array_contains(videos_tg["tags"], "cat"))
    .select("video_id", "comment_text", comments.likes)
    .groupBy("video_id", "comment_text")
    .agg(f.max("likes").alias("max_likes"))
    .sort(f.desc("max_likes"))
)

In [None]:
# Number of rows in the Bloom-filter-based result (one per (video_id, comment_text) group).
blum_cat_df.count()

1737

In [None]:
# Print the first 5 rows; the second argument (truncate=False) keeps long
# comment texts from being cut off.
blum_cat_df.show(5, False)

+-----------+------------------------------------------------------------------------------------------------------+---------+
|video_id   |comment_text                                                                                          |max_likes|
+-----------+------------------------------------------------------------------------------------------------------+---------+
|-1fzGnFwz9M|I make interesting cartoons and I need your help! Go to the channel, rate my work!                    |839      |
|tp9aQXDFHbY|Make sure to check back next Friday as we are launching our brand new animated HALLOWEEN special! 🐱🕷|304      |
|tp9aQXDFHbY|1:51 so your nuts are your most prized possession?                                                    |100      |
|tp9aQXDFHbY|If Simon will be make animation movie of Simons Cat adventures, I’ll go cinemas to watch it 😻        |37       |
|Vjc459T6wX8|How does Mugumogu not collapse in a heap of laughter?!! Maru's liquified form is hilarious!          