In [79]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import pandas as pd
import timeit
import time

In [80]:
from pyspark.sql.column import Column
from pyspark.sql.column import _to_java_column
from pyspark.sql.column import _to_seq
from pyspark.sql.functions import col

In [81]:
def timing(f):
    def wrap(*args, **kwargs):
        time1 = time.time()
        ret = f(*args, **kwargs)
        time2 = time.time()
        print('{:s} function took {:.3f} ms'.format(f.__name__, (time2-time1)*1000.0))

        return ret
    return wrap

In [82]:
@timing
def dfCollect( df ):
  list = df.collect()

@timing
def dfShow( df ):
  df.show()

@timing
def dfCount( df ):
  print( df.count() )

In [83]:
spark = SparkSession.builder.master("local") \
    .config('spark.sql.autoBroadcastJoinThreshold', 0) \
    .config('spark.sql.adaptive.enabled', 'false') \
    .getOrCreate()
spark

In [84]:
videos = spark.read.option('header', 'true').option("inferSchema", "true").csv('../datasets/USvideos.csv')
videos.show()    # 7998

+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|   video_id|               title|       channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|
+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|XpVt6Z1Gjjo|1 YEAR OF VLOGGIN...|    Logan Paul Vlogs|         24|logan paul vlog|l...|4394029|320053|    5931|        46245|https://i.ytimg.c...|13.09|
|K4wEI5zhHB0|iPhone X — Introd...|               Apple|         28|Apple|iPhone 10|i...|7860119|185853|   26679|            0|https://i.ytimg.c...|13.09|
|cLdxuaxaQwc|         My Response|           PewDiePie|         22|              [none]|5845909|576597|   39774|       170708|https://i.ytimg.c...|13.09|
|WYYvHb03Eog|Apple iPhone X fi...|           The Verge|         28|apple iph

In [85]:
categories = spark.read.option("multiline","true").json("../datasets/US_category_id.json")\
                .select( explode("items")).select('col.kind','col.etag','col.id','col.snippet.channelId','col.snippet.title','col.snippet.assignable')
categories.show()

+--------------------+--------------------+---+--------------------+--------------------+----------+
|                kind|                etag| id|           channelId|               title|assignable|
+--------------------+--------------------+---+--------------------+--------------------+----------+
|youtube#videoCate...|"m2yskBQFythfE4ir...|  1|UCBR8-60-B28hp2Bm...|    Film & Animation|      true|
|youtube#videoCate...|"m2yskBQFythfE4ir...|  2|UCBR8-60-B28hp2Bm...|    Autos & Vehicles|      true|
|youtube#videoCate...|"m2yskBQFythfE4ir...| 10|UCBR8-60-B28hp2Bm...|               Music|      true|
|youtube#videoCate...|"m2yskBQFythfE4ir...| 15|UCBR8-60-B28hp2Bm...|      Pets & Animals|      true|
|youtube#videoCate...|"m2yskBQFythfE4ir...| 17|UCBR8-60-B28hp2Bm...|              Sports|      true|
|youtube#videoCate...|"m2yskBQFythfE4ir...| 18|UCBR8-60-B28hp2Bm...|        Short Movies|     false|
|youtube#videoCate...|"m2yskBQFythfE4ir...| 19|UCBR8-60-B28hp2Bm...|     Travel & Events|  

In [86]:
comments_schema = StructType([ \
    StructField("video_id", StringType(), True), \
    StructField("comment_text", StringType(), True), \
    StructField("likes", IntegerType(), True), \
    StructField("replies", IntegerType(), True)])
comments = spark.read.option('header', 'true').option("mode", "DROPMALFORMED").schema(comments_schema).csv('../datasets/UScomments.csv')
comments.show() #691722

+-----------+--------------------+-----+-------+
|   video_id|        comment_text|likes|replies|
+-----------+--------------------+-----+-------+
|XpVt6Z1Gjjo|Logan Paul it's y...|    4|      0|
|XpVt6Z1Gjjo|I've been followi...|    3|      0|
|XpVt6Z1Gjjo|Say hi to Kong an...|    3|      0|
|XpVt6Z1Gjjo| MY FAN . attendance|    3|      0|
|XpVt6Z1Gjjo|         trending 😉|    3|      0|
|XpVt6Z1Gjjo|#1 on trending AY...|    3|      0|
|XpVt6Z1Gjjo|The end though 😭...|    4|      0|
|XpVt6Z1Gjjo|#1 trending!!!!!!!!!|    3|      0|
|XpVt6Z1Gjjo|Happy one year vl...|    3|      0|
|XpVt6Z1Gjjo|You and your shit...|    0|      0|
|XpVt6Z1Gjjo|There should be a...|    0|      0|
|XpVt6Z1Gjjo|Dear Logan, I rea...|    0|      0|
|XpVt6Z1Gjjo|Honestly Evan is ...|    0|      0|
|XpVt6Z1Gjjo|Casey is still be...|    0|      0|
|XpVt6Z1Gjjo|aw geez rick this...|    0|      0|
|XpVt6Z1Gjjo|He happy cause he...|    0|      0|
|XpVt6Z1Gjjo|Ayyyyoooo Logang ...|    1|      0|
|XpVt6Z1Gjjo|Bro y did

In [87]:
#
# 1. scored_videos
#
# Выбор джойна:
#    * Broadcast не подходит из-за больших рекордсетов.
#    * (Пред)разбиение на партиции с вменяемым их количеством по осмысленному ключу не очень подходит из-за отсутсвия такого ключа.
#      Ключ video_id высокоселективный. Соление тоже не походит, т.к. оно работает для разрезания больших партиций.
#    * Ключ высокоселективный, но Блум не походит, т.к. отсечь лишнего из датасетов нечего - любой video_id есть и там, и там.
#    * Остается бакетирование, что по сути - партиционирование по хэшу, т.е. похоже на партиции.
#
# В изначальном рекордсете videos нет уникальности по video_id. Туда как бы время от времени записываются обновленные статистики по просмотрам.
#   Я не удержался и сделал предварительную очистку video - беру строку с максимальным числом просмотров, что очевидно - самая последняя версия статистики.
#   В рекордсете comments похожий артефакт в данных, но его я уже не стал очищать, т.к. цель курса не в этом.
#
# Кстати, в условии задачи: "Но она должна включать в себя просмотры, лайки, дизлайки видео, лайки и дизлайки к комментариям к этому видео."
#   К сожалению, дизлайков к комментариям в рекордсете нет, только количество ответов на коммент. Но на смысл это особо не влияет.
#

In [88]:
vid = Window.partitionBy("video_id").orderBy(col("views").desc())
videos_prep = ( 
    videos.select("video_id","category_id","tags",
      row_number().over( vid ).alias("rn_views"),
      col("views").alias("v_views"),         col("likes").alias("v_likes"),
      col("dislikes").alias("v_dislikes"),   col("comment_total").alias("v_comm_tot"),
    )
    .where( (col("rn_views") == lit(1))  &  (col("v_views") > lit(0)) )
)

In [92]:
# rm -r work/spark-warehouse/videos_bucket
# rm -r work/spark-warehouse/comments_bucket

videos_prep.repartition(1).write \
    .bucketBy( 16, 'video_id') \
    .saveAsTable('videos_bucket', format='csv', mode='overwrite')

comments.write \
    .bucketBy( 16, 'video_id') \
    .saveAsTable('comments_bucket', format='csv', mode='overwrite')

In [93]:
videos_bucketed = spark.table('videos_bucket')
comments_bucketed = spark.table('comments_bucket')

In [94]:
# Бакетированный вариант (с предварительной очисткой).

scored_videos = ( 
  videos_bucketed
  .join( comments_bucketed,'video_id','left') 
  .groupBy("video_id","category_id","v_likes","v_views","v_dislikes","v_comm_tot")
    .agg(
      count( lit(1) ).alias("com_count"),
      sum("likes").alias("com_likes"),
      sum("replies").alias("com_replies")
    )
  .withColumn("views_all", col("v_views") + col("v_comm_tot") + col("com_likes") + col("com_replies") )
  .withColumn("likes_vws", (col("v_likes") / (col("views_all")) * 100)) 
  .withColumn("disl_vws", (col("v_dislikes") / col("views_all") * 100)) 
  .withColumn("score", (col("likes_vws") - col("disl_vws")) * (sqrt( col("views_all") + lit(10000) ) / 100) )
)

In [95]:
# Небакетированный вариант с очисткой вместе.
vid = Window.partitionBy("video_id").orderBy(col("views").desc())

scored_videos_nonbucketed = ( 
    videos.select("video_id","category_id",
      row_number().over( vid ).alias("rn_views"),
      col("views").alias("v_views"),         col("likes").alias("v_likes"),
      col("dislikes").alias("v_dislikes"),   col("comment_total").alias("v_comm_tot")
    )
    .where( (col("rn_views") == lit(1))  &  (col("v_views") > lit(0)) )
  .join( comments,'video_id','left') 
  .groupBy("video_id","category_id","v_likes","v_views","v_dislikes","v_comm_tot")
    .agg(
      count( lit(1) ).alias("com_count"),
      sum("likes").alias("com_likes"),
      sum("replies").alias("com_replies")
    )
  .withColumn("views_all", col("v_views") + col("v_comm_tot") + col("com_likes") + col("com_replies") )
  .withColumn("likes_vws", (col("v_likes") / (col("views_all")) * 100)) 
  .withColumn("disl_vws", (col("v_dislikes") / col("views_all") * 100)) 
  .withColumn("score", (col("likes_vws") - col("disl_vws")) * (sqrt( col("views_all") + lit(10000) ) / 100) )
)

In [96]:
print("Bucketed:\n")
dfShow( scored_videos )
print("\n\nNon-bucketed:\n")
dfShow( scored_videos_nonbucketed )

Bucketed:

+-----------+-----------+-------+-------+----------+----------+---------+---------+-----------+---------+------------------+--------------------+------------------+
|   video_id|category_id|v_likes|v_views|v_dislikes|v_comm_tot|com_count|com_likes|com_replies|views_all|         likes_vws|            disl_vws|             score|
+-----------+-----------+-------+-------+----------+----------+---------+---------+-----------+---------+------------------+--------------------+------------------+
|--JinobXWPk|         15|  38949|1319945|       533|      6768|      100|       19|          1|  1326733|2.9357074859824848| 0.04017387070344975| 33.47734022533458|
|-LoSw4o2zDQ|         26|   4715|  62600|        29|       483|      300|      567|        161|    63811| 7.389008164736488|0.045446709814922194| 19.95111988385199|
|0lDRz8qmXpE|         23|  24509| 467127|      2302|      3058|      300|      102|         31|   470318| 5.211155005762059|  0.4894560701482827| 32.72372416871783|

In [97]:
#
# 2. categories_score
#
# Выбор джойна: Broadcast из-за маленького categories

In [98]:
@pandas_udf(DoubleType())
def pandas_median_udf( agg_df: pd.Series) -> float:
    return agg_df.mean()

In [99]:
categories_bcs = broadcast( categories ).select("id","title")

categories_score = (
    scored_videos.select("video_id","category_id","score")
    .join( categories, categories_bcs['id'] == scored_videos['category_id'])
    .groupby("category_id","title")
      .agg( pandas_median_udf( col("score")).alias("category_score"))
    .drop("category_id")
    .sort( col("category_score").desc() )
)

categories_score.show()

+--------------------+------------------+
|               title|    category_score|
+--------------------+------------------+
|              Comedy|40.181263578596386|
|               Music|39.092901033993016|
|       Howto & Style|30.475138427851846|
|      People & Blogs|25.805464187614252|
|       Entertainment| 20.65546215537229|
|           Education|19.839263917424503|
|    Film & Animation|18.761611241549065|
|Science & Technology| 18.05447175762292|
|      Pets & Animals|17.616609649290506|
|              Gaming|17.072251620557175|
|     Travel & Events|14.206129734242946|
|    Autos & Vehicles|10.841859102784374|
|              Sports|  8.37007224239672|
|     News & Politics| 4.207622322354988|
|Nonprofits & Acti...| 2.695436482782739|
|               Shows|1.6231780562460565|
+--------------------+------------------+



In [100]:
#
# 3. popular_tags
#
# К моему удивлению, кардинальной разницы в скорости между разными вариантами UDF мной не замечено.
# Вероятно, во всех случаях рекордсеты обрабатываются в одном массиве без переключения контекстов.

In [101]:
def udfSplitScalaWrapper(field):
  _scala_split = spark._jvm.CustomUDFs.splitTagsUDF()
  return Column( _scala_split.apply( _to_seq( spark, [field], _to_java_column)))

In [102]:
def split_func(a: pd.Series) -> pd.Series:
  return a.str.split("|")
udfSplitPandasWrapper = pandas_udf( split_func, returnType = ArrayType( StringType()))

In [103]:
def split_str(s):
  return s.split("|")
udfSplitPythonWrapper = udf( split_str, ArrayType( StringType()))

In [104]:
popular_tags_scala = videos.select( explode( udfSplitScalaWrapper( col('tags'))).alias("tag") ).groupby( col('tag') ).count().sort(col("count").desc())
popular_tags_pandas = videos.select( explode( udfSplitPandasWrapper( col('tags'))).alias("tag") ).groupby( col('tag') ).count().sort(col("count").desc())
popular_tags_python = videos.select( explode( udfSplitPythonWrapper( col('tags'))).alias("tag") ).groupby( col('tag') ).count().sort(col("count").desc())

print("Scala:\n")
dfCollect( popular_tags_scala )
dfShow( popular_tags_scala )
print("\n\nPandas:\n")
dfCollect( popular_tags_pandas )
dfShow( popular_tags_pandas )
print("\n\nPython:\n")
dfCollect( popular_tags_python )
dfShow( popular_tags_python )

Scala:

dfCollect function took 3175.726 ms
+---------+-----+
|      tag|count|
+---------+-----+
|    funny|  722|
|   comedy|  572|
|   [none]|  491|
|     2017|  309|
|   how to|  284|
|     vlog|  273|
|    humor|  258|
|   makeup|  254|
|    music|  250|
| tutorial|  235|
|     food|  224|
|    video|  219|
|   review|  218|
|celebrity|  211|
|     news|  211|
|   beauty|  210|
|interview|  209|
|  science|  197|
|      Pop|  190|
|  trailer|  180|
+---------+-----+
only showing top 20 rows

dfShow function took 1074.809 ms


Pandas:

dfCollect function took 3081.661 ms
+---------+-----+
|      tag|count|
+---------+-----+
|    funny|  722|
|   comedy|  572|
|   [none]|  491|
|     2017|  309|
|   how to|  284|
|     vlog|  273|
|    humor|  258|
|   makeup|  254|
|    music|  250|
| tutorial|  235|
|     food|  224|
|    video|  219|
|   review|  218|
|celebrity|  211|
|     news|  211|
|   beauty|  210|
|interview|  209|
|  science|  197|
|      Pop|  190|
|  trailer|  180|
+---

In [105]:
#
# 4. cat_top5_comments
#
#  *  Выбор джойна: соединяются два больших рекордсета, наподобие, как в п.1 (scored_videos), так что все почти рассуждения справедливы и здесь. За исключением
#      фильтра Блума, т.к. в итоге надо соединить большУю, но все же подчасть videos - те, которые с котиками.
#  *  "хочет найти самые интересные комментарии (топ-5)" - интересность я определяю, как сумму лайков и реплаев.
#
#  * В случае реализации Блум-фильтра возникает ошибка, с которой, к сожалению, я не смоог соаладать (см. комментарий ниже)
#

In [106]:
# Без Блюма
cat_top5_comments_bucketed = (
    videos_bucketed.select("video_id", explode( udfSplitScalaWrapper( col('tags'))).alias("tag") )
    .groupBy("video_id")
    .agg(
      sum( expr("case when tag like '%cat%' then 1 else 0 end")).alias("cat_tag")
    )
    .where( col("cat_tag") > lit(0))
    .join( comments_bucketed,'video_id','left')
    .sort( expr("likes + replies").desc())
    .limit(5)
)

print("Без Блума:\n")
dfShow( cat_top5_comments_bucketed )

Без Блума:

+-----------+-------+--------------------+-----+-------+
|   video_id|cat_tag|        comment_text|likes|replies|
+-----------+-------+--------------------+-----+-------+
|eJB1gcydsbU|      1|thnks s mch fr wt...| 9197|    351|
|C25qzDhGLx8|      1|Whenever Kurzgesa...| 4541|    243|
|C25qzDhGLx8|      1|Whenever Kurzgesa...| 3648|    190|
|rl4ofGm2aNI|      2|Who do you think ...| 2592|    382|
|rl4ofGm2aNI|      2|Who do you think ...| 2529|    371|
+-----------+-------+--------------------+-----+-------+

dfShow function took 1979.245 ms


In [107]:
# !pip install --no-cache-dir mmh3 bitarray

In [108]:
import math
import mmh3
from bitarray import bitarray
  
class BloomFilter(object):
  
    '''
    Class for Bloom filter, using murmur3 hash function
    '''
  
    def __init__(self, items_count, fp_prob):
        '''
        items_count : int
            Number of items expected to be stored in bloom filter
        fp_prob : float
            False Positive probability in decimal
        '''
        self.items_count = items_count
        
        # False possible probability in decimal
        self.fp_prob = fp_prob
  
        # Size of bit array to use
        self.size = self.get_size(items_count, fp_prob)
  
        # number of hash functions to use
        self.hash_count = self.get_hash_count(self.size, items_count)
  
        # Bit array of given size
        self.bit_array = bitarray(self.size)
  
        # initialize all bits as 0
        self.bit_array.setall(0)
  
    def add(self, item):
        '''
        Add an item in the filter
        '''
        digests = []
        for i in range(self.hash_count):
  
            # create digest for given item.
            # i work as seed to mmh3.hash() function
            # With different seed, digest created is different
            digest = mmh3.hash(item, i) % self.size
            digests.append(digest)
  
            # set the bit True in bit_array
            self.bit_array[digest] = True
        
    def union(self, other):
        """ Calculates the union of the two underlying bitarrays and returns
        a new bloom filter object."""
        new_bloom = self.copy()
        new_bloom.bit_array = new_bloom.bit_array | other.bit_array
        return new_bloom
  
    def check(self, item):
        '''
        Check for existence of an item in filter
        '''
        for i in range(self.hash_count):
            digest = mmh3.hash(item, i) % self.size
            if self.bit_array[digest] == False:
  
                # if any of bit is False then,its not present
                # in filter
                # else there is probability that it exist
                return False
        return True
    
    def copy(self):
        """Return a copy of this bloom filter.
        """
        new_filter = BloomFilter(self.items_count, self.fp_prob)
        new_filter.bit_array = self.bit_array.copy()
        return new_filter
    
    def set_bit_array(self, bit_array):
        self.bit_array = bit_array
  
    @classmethod
    def get_size(self, n, p):
        '''
        Return the size of bit array(m) to used using
        following formula
        m = -(n * lg(p)) / (lg(2)^2)
        n : int
            number of items expected to be stored in filter
        p : float
            False Positive probability in decimal
        '''
        m = -(n * math.log(p))/(math.log(2)**2)
        return int(m)
  
    @classmethod
    def get_hash_count(self, m, n):
        '''
        Return the hash function(k) to be used using
        following formula
        k = (m/n) * lg(2)
  
        m : int
            size of bit array
        n : int
            number of items expected to be stored in filter
        '''
        k = (m/n) * math.log(2)
        return int(k)

In [109]:
cats_videos = (
    videos_bucketed.select("video_id", explode( udfSplitScalaWrapper( col('tags'))).alias("tag") )
    .groupBy("video_id")
    .agg(
      sum( expr("case when tag like '%cat%' then 1 else 0 end")).alias("cat_tag")
    )
    .where( col("cat_tag") > lit(0))
    .persist(pyspark.StorageLevel.DISK_ONLY)
)
#cats_videos.show()

In [110]:
# Создаем фильтр

filterSize = 3177
prob = 0.05

def fill_bloom_filter(bf, items):
    for i in items:
        bf.add( i[0] )
        #print ( i[0])
    return bf

bloom_filter = BloomFilter( filterSize, prob)

In [111]:
g = fill_bloom_filter( BloomFilter( filterSize, prob), ('GuEQtn2nm9A','xwW0VfkFljE','DUMEmGUc7yc'))
len( g.bit_array )

19809

In [112]:
#
#
# Здесь возникает ошибка, с которой, к сожалению, я не смоог соаладать. Почему-то в reduce() после map() приезжают объекты Блум-фильтров
#   без свойства bit_array, т.е. как-будто бы нулевые и не прошедшие функцию fill_bloom_filter(). Которая сама по себе работает на первый взгляд
#   нормально (см. предыдущую ячейку)   
#

In [113]:
general_bit_array = cats_videos.select(col('video_id')).rdd \
    .mapPartitions(lambda p: [fill_bloom_filter( BloomFilter( filterSize, prob), p).bit_array]) \
    .reduce(lambda a, b: a | b)

bloom_filter.set_bit_array(general_bit_array)

maybe_in_bf = udf(lambda video_id: bloom_filter.check( video_id))

In [114]:
comments_bloomed = comments_bucketed.filter( maybe_in_bf( col('video_id')) == True)

print(f"Количество комментов всего: {comments_bucketed.count()}")
print(f"\nКоличество комментов, прошедших Блум: {comments_bloomed.count()}")

Количество комментов всего: 691318

Количество комментов, прошедших Блум: 38237


In [115]:
# С Блумом
cat_top5_comments_bloomed = (
    videos_bucketed.select("video_id", explode( udfSplitScalaWrapper( col('tags'))).alias("tag") )
    .groupBy("video_id")
    .agg(
      sum( expr("case when tag like '%cat%' then 1 else 0 end")).alias("cat_tag")
    )
    .where( col("cat_tag") > lit(0))
    .join( comments_bloomed,'video_id','left')
    .sort( expr("likes + replies").desc())
    .limit(5)
)

In [116]:
print("C Блумом:\n")
dfShow( cat_top5_comments_bucketed )

C Блумом:

+-----------+-------+--------------------+-----+-------+
|   video_id|cat_tag|        comment_text|likes|replies|
+-----------+-------+--------------------+-----+-------+
|eJB1gcydsbU|      1|thnks s mch fr wt...| 9197|    351|
|C25qzDhGLx8|      1|Whenever Kurzgesa...| 4541|    243|
|C25qzDhGLx8|      1|Whenever Kurzgesa...| 3648|    190|
|rl4ofGm2aNI|      2|Who do you think ...| 2592|    382|
|rl4ofGm2aNI|      2|Who do you think ...| 2529|    371|
+-----------+-------+--------------------+-----+-------+

dfShow function took 1786.810 ms


In [78]:
spark.stop()