# Подключаем библиотеки

In [2]:
!pip install --no-cache-dir mmh3 bitarray

Collecting mmh3
  Downloading mmh3-4.0.0-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (68 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m68.3/68.3 kB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting bitarray
  Downloading bitarray-2.7.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (282 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m282.8/282.8 kB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: mmh3, bitarray
Successfully installed bitarray-2.7.4 mmh3-4.0.0


In [32]:
import json
import math
import mmh3
import numpy as np
import os
import pandas as pd
import pyspark
import pyspark.pandas as ps
import random

from bitarray import bitarray
from pyspark.sql import SparkSession
from pyspark.sql.column import Column
from pyspark.sql.column import _to_java_column
from pyspark.sql.column import _to_seq
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Читаем данные

Оптимизация

1. На видео и комментарии вешаем bucketing по video id - по этому полю происходит join чаще всего, и подобный подход позволяет избежать практически всех Exchange за исключением случаев, например, с groupby по другой колонке.
2. На категории вешаем `broadcast` - маленький справочник
3. Другие вариант оптимизации нерентабельны для соления нет значительного перекоса, фильтр Блума практически ничего не отфильтрует - джоин видео и комментариев не так много строк отбрасывает

In [4]:
spark = SparkSession.builder.master('local') \
    .config('spark.sql.autoBroadcastJoinThreshold', 0) \
    .config('spark.sql.adaptive.enabled', 'false') \
    .getOrCreate()

In [6]:
# videos = spark.read\
#     .option('header', 'true')\
#     .option('inferSchema', 'true')\
#     .csv('datasets/USvideos.csv')

videos_raw = spark.read\
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .csv('datasets/USvideos.csv')
videos_raw.write\
    .bucketBy(128, 'video_id')\
    .saveAsTable('USvideos', format = 'csv', mode = 'overwrite')
videos = spark.table('USvideos')

In [7]:
videos_sdim_df = videos\
    .withColumn('rnk', row_number()\
                .over(Window.partitionBy('video_id').orderBy(col('date').desc(), col('views').desc()))\
                )\
                .filter(col('rnk') == 1)\
                .drop(col('rnk'))

In [8]:
comments_schema = StructType([ \
    StructField('video_id', StringType(), True), \
    StructField('comment_text', StringType(), True), \
    StructField('likes', IntegerType(), True), \
    StructField('replies', IntegerType(), True)])

# comments = spark.read.option('header', 'true').option('mode', 'DROPMALFORMED').schema(comments_schema).csv('datasets/UScomments.csv')

comments_raw = spark.read.option('header', 'true').option('mode', 'DROPMALFORMED').schema(comments_schema).csv('datasets/UScomments.csv')

comments_raw.write \
    .bucketBy(128, 'video_id') \
    .saveAsTable('USComments', fprmat = 'csv', mode = 'overwrite')

comments = spark.table('USComments')

# Считаем рейтинг видео

In [9]:
@udf(returnType = DoubleType())
def calculate_video_score(views: int, likes: int, dislikes: int,comment_likes_count: int):
    return\
        int(likes or 0)\
        + random.random() * int(views or 0) * 0.2\
        - 2 * int(dislikes or 0)\
        + int(comment_likes_count or 0) * 0.01


In [10]:
@pandas_udf(DoubleType(),PandasUDFType.SCALAR)
def calculate_video_score(views, likes, dislikes,comment_likes_count):
    return\
        likes.fillna(0)\
        + np.random.rand(*views.shape) * views.fillna(0) * 0.2\
        - 2 * dislikes.fillna(0)\
        + comment_likes_count.fillna(0) * 0.01




In [11]:
video_comments_figures = comments.groupBy('video_id')\
    .agg(\
        sum('likes').alias('comment_likes_count'),\
        sum('replies').alias('comment_replies_count')\
        )

In [12]:
scored_video = videos_sdim_df.alias('videos')\
    .join(video_comments_figures, videos_sdim_df.video_id == video_comments_figures.video_id, 'left')\
    .select([
        'videos.*',
        col('comment_likes_count')
    ])\
    .withColumn(\
        'video_score',\
        calculate_video_score(\
            col('views'),\
            col('likes'),\
            col('dislikes'),\
            col('comment_likes_count')\
        )\
    )
scored_video.show()

+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+-------------------+------------------+
|   video_id|               title|       channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|comment_likes_count|       video_score|
+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+-------------------+------------------+
|-LoSw4o2zDQ|How to Make Pushe...|    kawaiisweetworld|         26|recipe|recipes|ho...|  62600|  4715|      29|          483|https://i.ytimg.c...|14.10|                567|  7331.97481412826|
|2Hz4sDjXPQY|Kingsman: The Gol...|    20th Century Fox|          1|Puppy|Spy Movies|...| 288222|  2003|      77|          208|https://i.ytimg.c...|21.09|               3692| 45818.36661693138|
|2YGrrsKs-Xg|Jimmy Kimmel’s FU...| 

# Собираем медиану рейтинга по категории

In [13]:
@pandas_udf(DoubleType(), PandasUDFType.GROUPED_AGG)
def pandas_mean(x):
    return x.mean()


In [14]:
categories_json = json.load(open('datasets/US_category_id.json'))
[x.update(x['snippet']) for x in categories_json['items']]

# categories_df = spark.read.json(\
#         spark.sparkContext.parallelize(categories_json['items']).map(lambda x: json.dumps(x))
#     ).select(['id', 'title'])\
#     .withColumnRenamed('title', 'category_title')

categories_df = broadcast(spark.read.json(\
        spark.sparkContext.parallelize(categories_json['items']).map(lambda x: json.dumps(x))
    ).select(['id', 'title'])\
    .withColumnRenamed('title', 'category_title'))

In [15]:
categories_score_df =  scored_video\
    .join(\
        categories_df,\
        scored_video.category_id == categories_df.id,\
        'left')\
    .groupBy('category_title')\
    .agg(
        pandas_mean(scored_video.video_score).alias('mean_score')
    )
categories_score_df.show()


+--------------------+------------------+
|      category_title|        mean_score|
+--------------------+------------------+
|               Shows| 781.8538583764346|
|           Education| 83200.28397116455|
|              Gaming| 91813.66738934316|
|       Entertainment|129587.28933603792|
|     Travel & Events|49384.036198117385|
|Science & Technology|105232.93772166967|
|              Sports| 77201.76227951267|
|       Howto & Style| 84627.50188521037|
|Nonprofits & Acti...| 26836.66607927107|
|    Film & Animation|117124.36388825392|
|      People & Blogs|118933.22384255001|
|     News & Politics|49529.351386162256|
|      Pets & Animals| 73817.38143531102|
|    Autos & Vehicles| 87624.90155207676|
|               Music| 236634.7083211785|
|              Comedy|185271.31446977268|
+--------------------+------------------+



In [16]:
categories_score_psdf = scored_video\
                .join(\
                    categories_df,\
                    scored_video.category_id == categories_df.id,\
                    'left')\
                .select([
                    categories_df.category_title,
                    scored_video.video_score
                ]).pandas_api()\
                .groupby(['category_title'])['video_score'].mean()
categories_score_psdf.to_frame().to_spark(index_col = 'category_title').show()

+--------------------+------------------+
|      category_title|       video_score|
+--------------------+------------------+
|               Shows|1797.9088336726456|
|           Education| 70727.74417637438|
|              Gaming|  90991.2812355959|
|       Entertainment| 131538.9753914034|
|     Travel & Events| 79585.41706166138|
|Science & Technology| 116646.9337100415|
|              Sports| 74188.48787450088|
|       Howto & Style| 82296.86617911432|
|Nonprofits & Acti...|122660.94911182692|
|    Film & Animation|123743.57479902435|
|      People & Blogs| 110720.6687553905|
|     News & Politics| 47435.46835195115|
|      Pets & Animals| 87043.92337701372|
|    Autos & Vehicles| 69452.26399906151|
|               Music| 165965.0716780983|
|              Comedy|184005.62990291786|
+--------------------+------------------+



In [17]:
categories_score_psdf_full = ps.merge(
    scored_video.pandas_api(),
    categories_df.pandas_api(),
    how = 'inner',
    left_on = 'category_id',
    right_on = 'id').groupby(['category_title'])['video_score'].mean()
categories_score_psdf_full.to_frame().to_spark(index_col = 'category_title').show()

+--------------------+------------------+
|      category_title|       video_score|
+--------------------+------------------+
|               Shows| 452.8823490433658|
|           Education| 54552.37507135073|
|              Gaming| 83466.48427084014|
|       Entertainment|138385.43657490818|
|     Travel & Events|  45424.6745538014|
|Science & Technology|105591.14975591475|
|              Sports| 84113.87940129952|
|       Howto & Style|  87215.9552810169|
|Nonprofits & Acti...|12634.302236719013|
|    Film & Animation|138075.28692698054|
|      People & Blogs|106462.21505400244|
|     News & Politics|46084.764782033744|
|      Pets & Animals| 77713.76606922636|
|    Autos & Vehicles| 64702.59248374513|
|               Music|193810.47583083526|
|              Comedy|203670.40852252278|
+--------------------+------------------+



# Смотрим на популярные тэги

In [18]:
def split_udf(value, separator):
    @udf(returnType = ArrayType(StringType()))
    def split_udf_inner(value):
        return value.split(separator)
    return split_udf_inner(value)

In [19]:
def split_pudf(value, separator):
    @pandas_udf(ArrayType(StringType()), PandasUDFType.SCALAR)
    def split_pudf_inner(value):
        return value.str.split(pat=separator)
    return split_pudf_inner(value)

In [20]:
def split_scala(tags):
    _split_scala_udf = spark._jvm.CustomUDFs.splitTagsUDF()
    return Column(_split_scala_udf.apply(_to_seq(spark, [tags], _to_java_column)))

In [21]:
tags = videos_sdim_df\
    .select([\
        videos_sdim_df.video_id.alias('tag_video_id'),\
        split(videos_sdim_df.tags, '\|').alias('tags_array'),\
        videos_sdim_df.tags]\
    )\
    .select([\
        col('tag_video_id'),\
        explode(col('tags_array')).alias('tag'),\
        col('tags')\
    ])
tags.show()

+------------+----------------+--------------------+
|tag_video_id|             tag|                tags|
+------------+----------------+--------------------+
| -LoSw4o2zDQ|          recipe|recipe|recipes|ho...|
| -LoSw4o2zDQ|         recipes|recipe|recipes|ho...|
| -LoSw4o2zDQ|     how to make|recipe|recipes|ho...|
| -LoSw4o2zDQ|     how to bake|recipe|recipes|ho...|
| -LoSw4o2zDQ|         cooking|recipe|recipes|ho...|
| -LoSw4o2zDQ|            food|recipe|recipes|ho...|
| -LoSw4o2zDQ|          sweets|recipe|recipes|ho...|
| -LoSw4o2zDQ|         dessert|recipe|recipes|ho...|
| -LoSw4o2zDQ|            cute|recipe|recipes|ho...|
| -LoSw4o2zDQ|            easy|recipe|recipes|ho...|
| -LoSw4o2zDQ|           quick|recipe|recipes|ho...|
| -LoSw4o2zDQ|          kawaii|recipe|recipes|ho...|
| -LoSw4o2zDQ|   kawaii baking|recipe|recipes|ho...|
| -LoSw4o2zDQ|kawaiisweetworld|recipe|recipes|ho...|
| -LoSw4o2zDQ|     kawaii food|recipe|recipes|ho...|
| -LoSw4o2zDQ|       cute food|recipe|recipes|

In [22]:
tags_udf = videos_sdim_df\
    .select([\
        videos_sdim_df.video_id.alias('tag_video_id'),\
        split_udf(videos_sdim_df.tags, '|').alias('tags_array'),\
        videos_sdim_df.tags]\
    )\
    .select([\
        col('tag_video_id'),\
        explode(col('tags_array')).alias('tag'),\
        col('tags')\
    ])
tags_udf.show()

+------------+----------------+--------------------+
|tag_video_id|             tag|                tags|
+------------+----------------+--------------------+
| -LoSw4o2zDQ|          recipe|recipe|recipes|ho...|
| -LoSw4o2zDQ|         recipes|recipe|recipes|ho...|
| -LoSw4o2zDQ|     how to make|recipe|recipes|ho...|
| -LoSw4o2zDQ|     how to bake|recipe|recipes|ho...|
| -LoSw4o2zDQ|         cooking|recipe|recipes|ho...|
| -LoSw4o2zDQ|            food|recipe|recipes|ho...|
| -LoSw4o2zDQ|          sweets|recipe|recipes|ho...|
| -LoSw4o2zDQ|         dessert|recipe|recipes|ho...|
| -LoSw4o2zDQ|            cute|recipe|recipes|ho...|
| -LoSw4o2zDQ|            easy|recipe|recipes|ho...|
| -LoSw4o2zDQ|           quick|recipe|recipes|ho...|
| -LoSw4o2zDQ|          kawaii|recipe|recipes|ho...|
| -LoSw4o2zDQ|   kawaii baking|recipe|recipes|ho...|
| -LoSw4o2zDQ|kawaiisweetworld|recipe|recipes|ho...|
| -LoSw4o2zDQ|     kawaii food|recipe|recipes|ho...|
| -LoSw4o2zDQ|       cute food|recipe|recipes|

In [23]:
tags_pudf = videos_sdim_df\
    .select([\
        videos_sdim_df.video_id.alias('tag_video_id'),\
        split_pudf(videos_sdim_df.tags, '|').alias('tags_array'),\
        videos_sdim_df.tags]\
    )\
    .select([\
        col('tag_video_id'),\
        explode(col('tags_array')).alias('tag'),\
        col('tags')\
    ])
tags_pudf.show()

+------------+----------------+--------------------+
|tag_video_id|             tag|                tags|
+------------+----------------+--------------------+
| -LoSw4o2zDQ|          recipe|recipe|recipes|ho...|
| -LoSw4o2zDQ|         recipes|recipe|recipes|ho...|
| -LoSw4o2zDQ|     how to make|recipe|recipes|ho...|
| -LoSw4o2zDQ|     how to bake|recipe|recipes|ho...|
| -LoSw4o2zDQ|         cooking|recipe|recipes|ho...|
| -LoSw4o2zDQ|            food|recipe|recipes|ho...|
| -LoSw4o2zDQ|          sweets|recipe|recipes|ho...|
| -LoSw4o2zDQ|         dessert|recipe|recipes|ho...|
| -LoSw4o2zDQ|            cute|recipe|recipes|ho...|
| -LoSw4o2zDQ|            easy|recipe|recipes|ho...|
| -LoSw4o2zDQ|           quick|recipe|recipes|ho...|
| -LoSw4o2zDQ|          kawaii|recipe|recipes|ho...|
| -LoSw4o2zDQ|   kawaii baking|recipe|recipes|ho...|
| -LoSw4o2zDQ|kawaiisweetworld|recipe|recipes|ho...|
| -LoSw4o2zDQ|     kawaii food|recipe|recipes|ho...|
| -LoSw4o2zDQ|       cute food|recipe|recipes|

In [24]:
tags_scala = videos_sdim_df\
    .select([\
        videos_sdim_df.video_id.alias('tag_video_id'),\
        split_scala(videos_sdim_df.tags).alias('tags_array'),\
        videos_sdim_df.tags]\
    )\
    .select([\
        col('tag_video_id'),\
        explode(col('tags_array')).alias('tag'),\
        col('tags')\
    ])
tags_scala.show()

+------------+----------------+--------------------+
|tag_video_id|             tag|                tags|
+------------+----------------+--------------------+
| -LoSw4o2zDQ|          recipe|recipe|recipes|ho...|
| -LoSw4o2zDQ|         recipes|recipe|recipes|ho...|
| -LoSw4o2zDQ|     how to make|recipe|recipes|ho...|
| -LoSw4o2zDQ|     how to bake|recipe|recipes|ho...|
| -LoSw4o2zDQ|         cooking|recipe|recipes|ho...|
| -LoSw4o2zDQ|            food|recipe|recipes|ho...|
| -LoSw4o2zDQ|          sweets|recipe|recipes|ho...|
| -LoSw4o2zDQ|         dessert|recipe|recipes|ho...|
| -LoSw4o2zDQ|            cute|recipe|recipes|ho...|
| -LoSw4o2zDQ|            easy|recipe|recipes|ho...|
| -LoSw4o2zDQ|           quick|recipe|recipes|ho...|
| -LoSw4o2zDQ|          kawaii|recipe|recipes|ho...|
| -LoSw4o2zDQ|   kawaii baking|recipe|recipes|ho...|
| -LoSw4o2zDQ|kawaiisweetworld|recipe|recipes|ho...|
| -LoSw4o2zDQ|     kawaii food|recipe|recipes|ho...|
| -LoSw4o2zDQ|       cute food|recipe|recipes|

## Время выполнения верхних блоков

Отображается в ноутбуке после отработки блока в vscode

| вариант | время в секундах | повторный перезапуск |
| --: | --:| --: |
| split spark| 0.3 | 0.2 |
| split udf| 0.4 | 0.3
| split pandas udf| 0.5 | 0.5 |
| split scala | 0.3 | 0.2 |

In [25]:
popular_tags = tags\
    .filter(col('tag') != '[none]')\
    .groupBy('tag')\
    .agg(
        countDistinct('tag_video_id').alias('video_count')
    )\
    .sort(col('video_count').desc())
popular_tags.show()

+-----------+-----------+
|        tag|video_count|
+-----------+-----------+
|      funny|        217|
|     comedy|        163|
|       2017|         93|
|      humor|         92|
|     how to|         84|
|     makeup|         77|
|      music|         74|
|       vlog|         73|
|      video|         71|
|   tutorial|         69|
|  interview|         69|
|  celebrity|         64|
|     review|         61|
|       news|         61|
|celebrities|         59|
|     beauty|         58|
|       food|         57|
|    science|         56|
|   comedian|         55|
|funny video|         54|
+-----------+-----------+
only showing top 20 rows



# Выбираем топ-5 комментариев по кошачьим видео

## Создаем обвязку для фильтра Блума

In [26]:
class BloomFilter(object):
  
    '''
    Class for Bloom filter, using murmur3 hash function
    '''
  
    def __init__(self, items_count, fp_prob):
        '''
        items_count : int
            Number of items expected to be stored in bloom filter
        fp_prob : float
            False Positive probability in decimal
        '''
        self.items_count = items_count
        
        # False possible probability in decimal
        self.fp_prob = fp_prob
  
        # Size of bit array to use
        self.size = self.get_size(items_count, fp_prob)
  
        # number of hash functions to use
        self.hash_count = self.get_hash_count(self.size, items_count)
  
        # Bit array of given size
        self.bit_array = bitarray(self.size)
  
        # initialize all bits as 0
        self.bit_array.setall(0)
  
    def add(self, item):
        '''
        Add an item in the filter
        '''
        digests = []
        for i in range(self.hash_count):
  
            # create digest for given item.
            # i work as seed to mmh3.hash() function
            # With different seed, digest created is different
            digest = mmh3.hash(item, i) % self.size
            digests.append(digest)
  
            # set the bit True in bit_array
            self.bit_array[digest] = True
        
    def union(self, other):
        """ Calculates the union of the two underlying bitarrays and returns
        a new bloom filter object."""
        new_bloom = self.copy()
        new_bloom.bit_array = new_bloom.bit_array | other.bit_array
        return new_bloom
  
    def check(self, item):
        '''
        Check for existence of an item in filter
        '''
        for i in range(self.hash_count):
            digest = mmh3.hash(item, i) % self.size
            if self.bit_array[digest] == False:
  
                # if any of bit is False then,its not present
                # in filter
                # else there is probability that it exist
                return False
        return True
    
    def copy(self):
        """Return a copy of this bloom filter.
        """
        new_filter = BloomFilter(self.items_count, self.fp_prob)
        new_filter.bit_array = self.bit_array.copy()
        return new_filter
    
    def set_bit_array(self, bit_array):
        self.bit_array = bit_array
  
    @classmethod
    def get_size(self, n, p):
        '''
        Return the size of bit array(m) to used using
        following formula
        m = -(n * lg(p)) / (lg(2)^2)
        n : int
            number of items expected to be stored in filter
        p : float
            False Positive probability in decimal
        '''
        m = -(n * math.log(p))/(math.log(2)**2)
        return int(m)
  
    @classmethod
    def get_hash_count(self, m, n):
        '''
        Return the hash function(k) to be used using
        following formula
        k = (m/n) * lg(2)
  
        m : int
            size of bit array
        n : int
            number of items expected to be stored in filter
        '''
        k = (m/n) * math.log(2)
        return int(k)

## Создаем фильтр

In [27]:
cat_tagged_df = tags.filter(col('tag') == 'cat').cache()

In [37]:
filter_size = cat_tagged_df.count()
prob = 0.05

def fill_bloom_filter(bf, items):
    for i in items: bf.add(str(i[0]))
    return bf

bloom_filter = BloomFilter(filter_size, prob)

general_bit_array = cat_tagged_df\
                    .select(col('tag_video_id')).rdd\
                    .mapPartitions(lambda p: [fill_bloom_filter(BloomFilter(filter_size, prob), p).bit_array]) \
                    .reduce(lambda a, b: a | b)

bloom_filter.set_bit_array(general_bit_array)

maybe_in_bf = udf(lambda video_id: bloom_filter.check(str(video_id)))

In [40]:
cats_top5_df = videos_sdim_df\
    .select(['video_id', col('title').alias('video_title')])\
    .filter(maybe_in_bf(col('video_id')) == True)\
    .join(cat_tagged_df, videos_sdim_df.video_id == tags.tag_video_id, 'inner')\
    .join(comments.withColumnRenamed('video_id', 'comment_video_id'), col('comment_video_id') == videos_sdim_df.video_id, 'inner')\
    .select([
        col('video_title'),\
        col('comment_text'),\
        col('likes'),\
        row_number().over(Window.partitionBy(col('video_id')).orderBy(col('likes').desc())).alias('row_number')
    ])\
    .filter(col('row_number') <= 5)
print(cats_top5_df.count())
cats_top5_df.show()

+------------------------------------+--------------------+-----+----------+
|                         video_title|        comment_text|likes|row_number|
+------------------------------------+--------------------+-----+----------+
|                  Husky's First Howl|The expression at...|    5|         1|
|                  Husky's First Howl|can I get subtitl...|    3|         2|
|                  Husky's First Howl|This shit hella c...|    2|         3|
|                  Husky's First Howl|There's still hop...|    2|         4|
|                  Husky's First Howl|      How wholesome.|    2|         5|
|                Cats Can Be A Rea...|Make sure to chec...|  304|         1|
|                Cats Can Be A Rea...|Make sure to chec...|  293|         2|
|                Cats Can Be A Rea...|Make sure to chec...|  279|         3|
|                Cats Can Be A Rea...|Make sure to chec...|  259|         4|
|                Cats Can Be A Rea...|Make sure to chec...|  194|         5|

In [None]:
cats_top5_df = videos_sdim_df\
    .select(['video_id', col('title').alias('video_title')])\
    .join(cat_tagged_df, videos_sdim_df.video_id == tags.tag_video_id, 'inner')\
    .join(comments.withColumnRenamed('video_id', 'comment_video_id'), col('comment_video_id') == videos_sdim_df.video_id, 'inner')\
    .select([
        col('video_title'),\
        col('comment_text'),\
        col('likes'),\
        row_number().over(Window.partitionBy(col('video_id')).orderBy(col('likes').desc())).alias('row_number')
    ])\
    .filter(col('row_number') <= 5)
print(cats_top5_df.count())
cats_top5_df.show()