In [90]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local") \
    .config('spark.sql.autoBroadcastJoinThreshold', 0) \
    .config('spark.sql.adaptive.enabled', 'false') \
    .getOrCreate()
spark

#### Spark UI:
- http://127.0.0.1:4040/stages/

In [91]:
!du datasets/*

12	datasets/US_category_id.json
71108	datasets/UScomments.csv
2916	datasets/USvideos.csv


In [4]:
import traceback
def get_videos_df():
    videos_df = spark.read.parquet('spark-warehouse/videos')
    videos_df.cache().count(), \
    videos_df.show(5, 55)
    return videos_df

try:
    videos = get_videos_df()
except: traceback.print_exc()

+-----------+-----------+-----+-----+--------+------+-------+---------------------+-------------+
|category_id|   video_id|views|likes|dislikes|  tags|likes_c|                score|category_name|
+-----------+-----------+-----+-----+--------+------+-------+---------------------+-------------+
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      5| 0.004770164787510842|Entertainment|
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      3|0.0039028620988725065|Entertainment|
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      6| 0.005203816131830009|Entertainment|
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      0|0.0026019080659150044|Entertainment|
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      0|0.0026019080659150044|Entertainment|
+-----------+-----------+-----+-----+--------+------+-------+---------------------+-------------+
only showing top 5 rows



In [93]:
videos = spark.read.option('header', 'true').option("inferSchema", "true").csv('datasets/USvideos.csv')
# videos.show(3, False, True)
videos.limit(3).toPandas().T

Unnamed: 0,0,1,2
video_id,XpVt6Z1Gjjo,K4wEI5zhHB0,cLdxuaxaQwc
title,1 YEAR OF VLOGGING -- HOW LOGAN PAUL CHANGED Y...,iPhone X — Introducing iPhone X — Apple,My Response
channel_title,Logan Paul Vlogs,Apple,PewDiePie
category_id,24,28,22
tags,logan paul vlog|logan paul|logan|paul|olympics...,Apple|iPhone 10|iPhone Ten|iPhone|Portrait Lig...,[none]
views,4394029,7860119,5845909
likes,320053,185853,576597
dislikes,5931,26679,39774
comment_total,46245,0,170708
thumbnail_link,https://i.ytimg.com/vi/XpVt6Z1Gjjo/default.jpg,https://i.ytimg.com/vi/K4wEI5zhHB0/default.jpg,https://i.ytimg.com/vi/cLdxuaxaQwc/default.jpg


In [94]:
comments_schema = StructType([ \
    StructField("video_id", StringType(), True), \
    StructField("comment_text", StringType(), True), \
    StructField("likes", IntegerType(), True), \
    StructField("replies", IntegerType(), True)])
comments = spark.read.option('header', 'true').option("mode", "DROPMALFORMED").schema(comments_schema).csv('datasets/UScomments.csv')
(comments
 # .where("likes < 0")
 .limit(3).toPandas()
)

Unnamed: 0,video_id,comment_text,likes,replies
0,XpVt6Z1Gjjo,Logan Paul it's yo big day ‼️‼️‼️,4,0
1,XpVt6Z1Gjjo,I've been following you from the start of your...,3,0
2,XpVt6Z1Gjjo,Say hi to Kong and maverick for me,3,0


1 scored_videos - датасет на USvideos.csv
 - с добавлением колонки, содержащей скор (показатель качества) видео: ФОРМУЛА должна включать в себя
     - просмотры,
     - лайки,дизлайки видео,
     - лайки и дизлайки к комментариям к видео
       
2 categories_score - по категориям, в котором поля: 
 - Название категории (не id) -  в US_category_id.json
 - Медиана показателя score из scored_videos по каждой категории

3 popular_tags - по самым популярным тэгам
  - название тэга + количество видео с этим тэгом
    ! тэги лежат строкой в поле tags
      - Scala-функцию для разбиения тегов: 
        - Но напишите свою UDF-функцию разбиения строки на тэги
        - и сравните время работы с её Scala-версией.
          - Можно замерять своими силами,
            - а можно воспользоваться библиотекой timeit
            - функции Spark из пакета pyspark.sq.functions использовать нельзя,
              - нужно написать свою функцию.

In [95]:
videos.limit(3).toPandas().T

Unnamed: 0,0,1,2
video_id,XpVt6Z1Gjjo,K4wEI5zhHB0,cLdxuaxaQwc
title,1 YEAR OF VLOGGING -- HOW LOGAN PAUL CHANGED Y...,iPhone X — Introducing iPhone X — Apple,My Response
channel_title,Logan Paul Vlogs,Apple,PewDiePie
category_id,24,28,22
tags,logan paul vlog|logan paul|logan|paul|olympics...,Apple|iPhone 10|iPhone Ten|iPhone|Portrait Lig...,[none]
views,4394029,7860119,5845909
likes,320053,185853,576597
dislikes,5931,26679,39774
comment_total,46245,0,170708
thumbnail_link,https://i.ytimg.com/vi/XpVt6Z1Gjjo/default.jpg,https://i.ytimg.com/vi/K4wEI5zhHB0/default.jpg,https://i.ytimg.com/vi/cLdxuaxaQwc/default.jpg


In [96]:
videos.printSchema()
comments.printSchema()
comments.limit(3).toPandas()

root
 |-- video_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_total: integer (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- date: string (nullable = true)

root
 |-- video_id: string (nullable = true)
 |-- comment_text: string (nullable = true)
 |-- likes: integer (nullable = true)
 |-- replies: integer (nullable = true)



Unnamed: 0,video_id,comment_text,likes,replies
0,XpVt6Z1Gjjo,Logan Paul it's yo big day ‼️‼️‼️,4,0
1,XpVt6Z1Gjjo,I've been following you from the start of your...,3,0
2,XpVt6Z1Gjjo,Say hi to Kong and maverick for me,3,0


In [97]:
coalesce??

[0;31mSignature:[0m [0mcoalesce[0m[0;34m([0m[0;34m*[0m[0mcols[0m[0;34m:[0m [0;34m'ColumnOrName'[0m[0;34m)[0m [0;34m->[0m [0mpyspark[0m[0;34m.[0m[0msql[0m[0;34m.[0m[0mcolumn[0m[0;34m.[0m[0mColumn[0m[0;34m[0m[0;34m[0m[0m
[0;31mSource:[0m   
[0;34m@[0m[0mtry_remote_functions[0m[0;34m[0m
[0;34m[0m[0;32mdef[0m [0mcoalesce[0m[0;34m([0m[0;34m*[0m[0mcols[0m[0;34m:[0m [0;34m"ColumnOrName"[0m[0;34m)[0m [0;34m->[0m [0mColumn[0m[0;34m:[0m[0;34m[0m
[0;34m[0m    [0;34m"""Returns the first column that is not null.[0m
[0;34m[0m
[0;34m    .. versionadded:: 1.4.0[0m
[0;34m[0m
[0;34m    .. versionchanged:: 3.4.0[0m
[0;34m        Supports Spark Connect.[0m
[0;34m[0m
[0;34m    Parameters[0m
[0;34m    ----------[0m
[0;34m    cols : :class:`~pyspark.sql.Column` or str[0m
[0;34m        list of columns to work on.[0m
[0;34m[0m
[0;34m    Returns[0m
[0;34m    -------[0m
[0;34m    :class:`~pyspark.sql.Column`[0m

In [98]:
# 1 scored_videos - датасет на USvideos.csv
#  - с добавлением колонки, содержащей скор (показатель качества) видео:
#    - ФОРМУЛА должна включать в себя
#      - просмотры,
#      - лайки, дизлайки видео,
#      - лайки и дизлайки к комментариям к видео
from pyspark.sql.functions import *
videos_s = (
    videos.select('video_id', 'views', 'likes', 'dislikes', 'category_id', 'tags')
          .join(comments
                .select('video_id', col('likes').alias('likes_c'),  # col('dislikes').alias('dislikes_c')  - не нашёл 🤔
                ),
                on='video_id', how='left'
            )
          .select('video_id', 'views', 'likes', 'dislikes', 'category_id', 'tags', coalesce(col('likes_c'), lit(0)).alias('likes_c'))
)
videos_s.cache().show(3, False)

+-----------+-----+-----+--------+-----------+------+-------+
|video_id   |views|likes|dislikes|category_id|tags  |likes_c|
+-----------+-----+-----+--------+-----------+------+-------+
|4yCkkOvIkUI|2306 |7    |1       |24         |[none]|5      |
|4yCkkOvIkUI|2306 |7    |1       |24         |[none]|3      |
|4yCkkOvIkUI|2306 |7    |1       |24         |[none]|6      |
+-----------+-----+-----+--------+-----------+------+-------+
only showing top 3 rows



In [99]:
# 1) score = (likes - dislikes + likes-comments) / views 
scored_videos = videos_s.withColumn('score', (col('likes') - col('dislikes') + col('likes_c')) / col('views'))
scored_videos.orderBy(desc('score')).cache().show(5, False)
scored_videos.where('score > 0').orderBy(asc('score')).cache().show(5, False)
scored_videos.cache().count()

+-----------+------+------+--------+-----------+--------------------------------+-------+------------------+
|video_id   |views |likes |dislikes|category_id|tags                            |likes_c|score             |
+-----------+------+------+--------+-----------+--------------------------------+-------+------------------+
|EUoe7cf0HYw|497846|160690|5323    |10         |Taylor Swift|Gorgeous|reputation|26     |0.3121306588784484|
|EUoe7cf0HYw|497846|160690|5323    |10         |Taylor Swift|Gorgeous|reputation|7      |0.3120924944661602|
|EUoe7cf0HYw|497846|160690|5323    |10         |Taylor Swift|Gorgeous|reputation|7      |0.3120924944661602|
|EUoe7cf0HYw|497846|160690|5323    |10         |Taylor Swift|Gorgeous|reputation|6      |0.3120904858128819|
|EUoe7cf0HYw|497846|160690|5323    |10         |Taylor Swift|Gorgeous|reputation|5      |0.3120884771596036|
+-----------+------+------+--------+-----------+--------------------------------+-------+------------------+
only showing top 5 

2945294

In [100]:
# Research file with categories:..
!head -n 22 datasets/US_category_id.json
!echo ...

{
 "kind": "youtube#videoCategoryListResponse",
 "etag": "\"m2yskBQFythfE4irbTIeOgYYfBU/S730Ilt-Fi-emsQJvJAAShlR6hM\"",
 "items": [
  {
   "kind": "youtube#videoCategory",
   "etag": "\"m2yskBQFythfE4irbTIeOgYYfBU/Xy1mB4_yLrHy_BmKmPBggty2mZQ\"",
   "id": "1",
   "snippet": {
    "channelId": "UCBR8-60-B28hp2BmDPdntcQ",
    "title": "Film & Animation",
    "assignable": true
   }
  },
  {
   "kind": "youtube#videoCategory",
   "etag": "\"m2yskBQFythfE4irbTIeOgYYfBU/UZ1oLIIz2dxIhO45ZTFR3a3NyTA\"",
   "id": "2",
   "snippet": {
    "channelId": "UCBR8-60-B28hp2BmDPdntcQ",
    "title": "Autos & Vehicles",
    "assignable": true
...


### мда... тут без помощи Друзя" никак 😅🤓

#### https://chat.openai.com/c/fac3b2e7-81c4-4ef5-9cdd-120a459b0fcf
how to exctract to spark dataframe with 2 cols:
- category_id – from path: "items"."id" where "kind" = "youtube#videoCategory"
- category_name – from path: "items"."snippet"."title" for category_id

from this file:
{ "kind": "youtube#videoCategoryListResponse",
 "etag": "\"m2yskBQFythfE4irbTIeOgYYfBU/S730Ilt-Fi-emsQJvJAAShlR6hM\"",..

In [111]:
# 2) read categories from json
df = spark.read.option("multiline", "true").json("datasets/US_category_id.json")

df_exploded = df.select(explode(df.items).alias("item"))

categories_df = df_exploded.select(
    df_exploded["item.id"].alias("category_id"),
    df_exploded["item.snippet.title"].alias("category_name")
)

categories_df = categories_df.filter(df_exploded["item.kind"] == "youtube#videoCategory")

categories_df.cache().show(5, False)

+-----------+----------------+
|category_id|category_name   |
+-----------+----------------+
|1          |Film & Animation|
|2          |Autos & Vehicles|
|10         |Music           |
|15         |Pets & Animals  |
|17         |Sports          |
+-----------+----------------+
only showing top 5 rows



In [112]:
for df in (scored_videos, categories_df):
    df.show(3, 33), df.count()

+-----------+-----+-----+--------+-----------+------+-------+---------------------+
|   video_id|views|likes|dislikes|category_id|  tags|likes_c|                score|
+-----------+-----+-----+--------+-----------+------+-------+---------------------+
|4yCkkOvIkUI| 2306|    7|       1|         24|[none]|      5| 0.004770164787510842|
|4yCkkOvIkUI| 2306|    7|       1|         24|[none]|      3|0.0039028620988725065|
|4yCkkOvIkUI| 2306|    7|       1|         24|[none]|      6| 0.005203816131830009|
+-----------+-----+-----+--------+-----------+------+-------+---------------------+
only showing top 3 rows

+-----------+----------------+
|category_id|   category_name|
+-----------+----------------+
|          1|Film & Animation|
|          2|Autos & Vehicles|
|         10|           Music|
+-----------+----------------+
only showing top 3 rows



In [None]:
videos_with_categories

In [114]:
# 2) categories_score - по категориям, в котором поля: 
#  - Название категории (не id) -  в US_category_id.json
#  - Медиана показателя score из scored_videos по каждой категории

# Join scored_videos and categories_df on category_id
##videos_with_categories = scored_videos.join(categories_df, scored_videos.category_id == categories_df.category_id)
videos_with_categories = scored_videos.join(categories_df, on='category_id', how='left')

# Compute median
median_scores = videos_with_categories.groupby("category_name").agg(
    expr('percentile_approx(score, 0.5)').alias('median_score')
)

median_scores.orderBy(desc('median_score')).cache().show(11, False)

+--------------------+--------------------+
|category_name       |median_score        |
+--------------------+--------------------+
|Music               |0.04884415551818479 |
|Howto & Style       |0.048267083261007926|
|Comedy              |0.03698591280865014 |
|People & Blogs      |0.03503300560376566 |
|Education           |0.03368260385297636 |
|Pets & Animals      |0.03165374907625246 |
|Science & Technology|0.02789074214986546 |
|Travel & Events     |0.02688961421740934 |
|Entertainment       |0.024058260052228886|
|Film & Animation    |0.022790679555816682|
|Autos & Vehicles    |0.02108892065686186 |
+--------------------+--------------------+
only showing top 11 rows



In [120]:
category_score_median.join(median_scores, on='category_name'
    ).select(category_score_median['*'], median_scores['median_score'].alias('median_score_pd')
    ).orderBy(desc('median_score')).cache().show(11, False)

+--------------------+--------------------+--------------------+
|category_name       |median_score        |median_score_pd     |
+--------------------+--------------------+--------------------+
|Music               |0.11608667440748582 |0.04884415551818479 |
|Howto & Style       |0.048267083261007926|0.048267083261007926|
|Comedy              |0.03698591280865014 |0.03698591280865014 |
|People & Blogs      |0.03503300560376566 |0.03503300560376566 |
|Education           |0.03368260385297636 |0.03368260385297636 |
|Pets & Animals      |0.03165374907625246 |0.03165374907625246 |
|Science & Technology|0.02789074214986546 |0.02789074214986546 |
|Travel & Events     |0.02688961421740934 |0.02688961421740934 |
|Entertainment       |0.024058260052228886|0.024058260052228886|
|Film & Animation    |0.022790679555816682|0.022790679555816682|
|Autos & Vehicles    |0.02108892065686186 |0.02108892065686186 |
+--------------------+--------------------+--------------------+
only showing top 11 rows


In [117]:
from pyspark.sql.functions import * #pandas_udf, PandasUDFType, groupBy as groupby
import pandas as pd, statistics

@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def median_score(score: pd.Series) -> float: return statistics.median(score)

scored_videos_with_category_name = scored_videos.join(categories_df, on='category_id')

category_score_median = scored_videos_with_category_name.groupby('category_name').agg(median_score(scored_videos_with_category_name['score']).alias('median_score'))

category_score_median.orderBy(desc('median_score')).cache().show(11, False)



+--------------------+--------------------+
|category_name       |median_score        |
+--------------------+--------------------+
|Music               |0.11608667440748582 |
|Howto & Style       |0.048267083261007926|
|Comedy              |0.03698591280865014 |
|People & Blogs      |0.03503300560376566 |
|Education           |0.03368260385297636 |
|Pets & Animals      |0.03165374907625246 |
|Science & Technology|0.02789074214986546 |
|Travel & Events     |0.02688961421740934 |
|Entertainment       |0.024058260052228886|
|Film & Animation    |0.022790679555816682|
|Autos & Vehicles    |0.02108892065686186 |
+--------------------+--------------------+
only showing top 11 rows



In [81]:
videos_df = videos_with_categories
videos_df.show(5, 44)

+-----------+-----------+-----+-----+--------+--------------------------------------------+-------+--------------------+--------------------+
|category_id|   video_id|views|likes|dislikes|                                        tags|likes_c|               score|       category_name|
+-----------+-----------+-----+-----+--------+--------------------------------------------+-------+--------------------+--------------------+
|         28|CAQ2wWVlOuc|25541|  510|      94|Tech Insider|TI|Tech|Science|Innovation|D...|    278| 0.02717199796405779|Science & Technology|
|         28|CAQ2wWVlOuc|25541|  510|      94|Tech Insider|TI|Tech|Science|Innovation|D...|    144|0.021925531498375162|Science & Technology|
|         28|CAQ2wWVlOuc|25541|  510|      94|Tech Insider|TI|Tech|Science|Innovation|D...|     83| 0.01953721467444501|Science & Technology|
|         28|CAQ2wWVlOuc|25541|  510|      94|Tech Insider|TI|Tech|Science|Innovation|D...|    133|0.021494851415371365|Science & Technology|
|     

In [82]:
# spark.sql('drop table scored_videos')
videos_df.printSchema()

root
 |-- category_id: integer (nullable = true)
 |-- video_id: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- tags: string (nullable = true)
 |-- likes_c: integer (nullable = false)
 |-- score: double (nullable = true)
 |-- category_name: string (nullable = true)



In [83]:
get_videos_df??

[0;31mSignature:[0m [0mget_videos_df[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m <no docstring>
[0;31mSource:[0m   
[0;32mdef[0m [0mget_videos_df[0m[0;34m([0m[0;34m)[0m[0;34m:[0m[0;34m[0m
[0;34m[0m    [0mvideos_df[0m [0;34m=[0m [0mspark[0m[0;34m.[0m[0mread[0m[0;34m.[0m[0mparquet[0m[0;34m([0m[0;34m'spark-warehouse/videos'[0m[0;34m)[0m[0;34m[0m
[0;34m[0m    [0mvideos_df[0m[0;34m.[0m[0mcache[0m[0;34m([0m[0;34m)[0m[0;34m.[0m[0mcount[0m[0;34m([0m[0;34m)[0m[0;34m,[0m \
    [0mvideos_df[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;36m5[0m[0;34m,[0m [0;36m55[0m[0;34m)[0m[0;34m[0m
[0;34m[0m    [0;32mreturn[0m [0mvideos_df[0m[0;34m[0m[0;34m[0m[0m
[0;31mFile:[0m      /tmp/ipykernel_24031/4189858458.py
[0;31mType:[0m      function

In [84]:
## videos_df.write.saveAsTable('videos', partitionBy='category_name', mode='overwrite')

##
# videos_df = get_videos_df()
# spark.sql('drop table videos')

!rm -r spark-warehouse/videos

videos_df.write.saveAsTable('videos', partitionBy='category_name') #, mode='overwrite')
videos_df = spark.table('videos')
videos_df.cache().count(), \
videos_df.show(5, 55)

+-----------+-----------+-----+-----+--------+------+-------+---------------------+-------------+
|category_id|   video_id|views|likes|dislikes|  tags|likes_c|                score|category_name|
+-----------+-----------+-----+-----+--------+------+-------+---------------------+-------------+
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      5| 0.004770164787510842|Entertainment|
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      3|0.0039028620988725065|Entertainment|
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      6| 0.005203816131830009|Entertainment|
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      0|0.0026019080659150044|Entertainment|
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      0|0.0026019080659150044|Entertainment|
+-----------+-----------+-----+-----+--------+------+-------+---------------------+-------------+
only showing top 5 rows



(2945294, None)

In [85]:
# 3) popular_tags - по самым популярным тэгам
  # - название тэга + количество видео с этим тэгом
  #   ! тэги лежат строкой в поле tags
  #     - Scala-функцию для разбиения тегов: 
  #       - Но напишите свою UDF-функцию разбиения строки на тэги
  #       - и сравните время работы с её Scala-версией.
  #         - Можно замерять своими силами,
  #           - а можно воспользоваться библиотекой timeit
  #           - функции Spark из пакета pyspark.sq.functions использовать нельзя,
  #             - нужно написать свою функцию.

In [88]:
try:
    scored_videos.registerTempTable('scored_videos')
    df = spark.sql(f"""
        select splitTags(tags) as tags
          from scored_videos
    """)
    df.cache()
    df.limit(11).toPandas()
except: traceback.print_exc()

Traceback (most recent call last):
  File "/tmp/ipykernel_24031/3166177611.py", line 3, in <module>
    df = spark.sql(f"""
         ^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/sql/session.py", line 1440, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/errors/exceptions/captured.py", line 175, in deco
    raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: [UNRESOLVED_ROUTINE] Cannot resolve function `splitTags` on search path [`system`.`builtin`, `system`.`session`, `spark_catalog`.`default`].; line 2 pos 15


In [2]:
spark.sparkContext.addJar("super_udf_lib.jar")

AttributeError: 'SparkContext' object has no attribute 'addJar'

In [22]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
# spark.stop(); del spark
spark = SparkSession.builder.master("local") \
    .config('spark.sql.autoBroadcastJoinThreshold', 0) \
    .config('spark.sql.adaptive.enabled', 'false') \
    .config("spark.jars", "super_udf_lib.jar") \
    .getOrCreate()
display(spark)

def get_videos_df():
    videos_df = spark.read.parquet('spark-warehouse/videos')
    videos_df.cache().count(), \
    videos_df.show(5, 55)
    return videos_df
##videos = spark.table('videos')
videos = get_videos_df()
# videos.show(3, 33)

[(g, v) for g, v in dict(spark.sparkContext.getConf().getAll()).items()
          if 'jar' in g]

+-----------+-----------+-----+-----+--------+------+-------+---------------------+-------------+
|category_id|   video_id|views|likes|dislikes|  tags|likes_c|                score|category_name|
+-----------+-----------+-----+-----+--------+------+-------+---------------------+-------------+
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      5| 0.004770164787510842|Entertainment|
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      3|0.0039028620988725065|Entertainment|
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      6| 0.005203816131830009|Entertainment|
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      0|0.0026019080659150044|Entertainment|
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      0|0.0026019080659150044|Entertainment|
+-----------+-----------+-----+-----+--------+------+-------+---------------------+-------------+
only showing top 5 rows



[('spark.app.initial.jar.urls',
  'spark://76c09a2dee74:33239/jars/super_udf_lib.jar'),
 ('spark.jars', 'super_udf_lib.jar'),
 ('spark.repl.local.jars', 'file:///home/jovyan/notebooks/super_udf_lib.jar')]

In [24]:
df = spark.sql("SELECT splitTagsUDF(tags) as tags_array FROM table")

AnalysisException: [UNRESOLVED_ROUTINE] Cannot resolve function `splitTagsUDF` on search path [`system`.`builtin`, `system`.`session`, `spark_catalog`.`default`].; line 1 pos 7

In [23]:
# spark.udf.registerJavaFunction("splitTags", "CustomUDFs.splitTags", ArrayType(StringType()))
spark.udf.registerJavaFunction("splitTagsUDF", "super_udf_lib.CustomUDFs.splitTagsUDF", ArrayType(StringType()))

AnalysisException: Can not load class super_udf_lib.CustomUDFs.splitTagsUDF, please make sure it is on the classpath.

In [25]:
' '.join([g for g in globals().key() if 'vid' in g])

'get_videos_df videos'

In [27]:
# %%timeit
videos.registerTempTable('scored_videos')
df = spark.sql(f"""
    select splitTagsUDF(tags) as tags
      from scored_videos
""")
df.count()

AnalysisException: [UNRESOLVED_ROUTINE] Cannot resolve function `splitTagsUDF` on search path [`system`.`builtin`, `system`.`session`, `spark_catalog`.`default`].; line 2 pos 11

#### Spark UI:
- http://127.0.0.1:4040/stages/

In [None]:
!which java

In [None]:
%%bash
echo $PATH
export PATH=/usr/bin/java:$PATH
echo $PATH

In [None]:
!ls super*
!jar tf super_udf_lib.jar

In [None]:
from pyspark.sql.types import ArrayType, StringType

spark.udf.registerJavaFunction(
    "splitTags", 
    "com.example.super_udf_lib.CustomUDFs.splitTagsUDF",
    # "super_udf_lib.CustomUDFs.splitTagsUDF",
    ArrayType(StringType())
)

### чот со Scala-UDF'кой пока никак... 🥹
ну да ладно!.. 😅

In [None]:
# def get_videos_df():
#     videos_df = spark.read.parquet('spark-warehouse/videos')
#     videos_df.cache().count(), \
#     videos_df.show(5, 55)
#     return videos_df

# videos = get_videos_df()

In [None]:
from pyspark.sql.functions import udf

# Define a Python function
def split_tags(tags_str): return tags_str.split('|')

# Create a UDF from the Python function
split_tags_udf = udf(split_tags)

# Now you can use this UDF in your DataFrame transformations
videos = videos.withColumn('tags_array', split_tags_udf(videos['tags']))

In [None]:
videos.printSchema()
videos.where(f"tags is not null and tags != '[none]'").show(5, 33)

In [26]:
# так, пришла помощь из зала..
from pyspark.sql.functions import pandas_udf, PandasUDFType, explode
from pyspark.sql import DataFrame
import pandas as pd

@pandas_udf('array<string>', PandasUDFType.SCALAR)
def split_tags(tags_series: pd.Series) -> pd.Series: return tags_series.str.split('|')
#...



In [None]:
#...
def add_tags_array(df: DataFrame, tags_col: str) -> DataFrame: return df.withColumn('tags_array', split_tags(df[tags_col]))

# A function to count the number of videos for each tag
def count_videos_per_tag(df: DataFrame, tags_array_col: str) -> DataFrame: return (
    df.select(
        explode(
                df[tags_array_col]
                   ).alias('tag')
     ).groupBy('tag').count()
)

# Load your DataFrame here
# videos = spark.read...

# Example data
# video_id | title | tags
# 1        | video1| tag1|tag2|tag3
# 2        | video2| tag1|tag3
# 3        | video3| tag2|tag3|tag4

videos = add_tags_array(videos, 'tags')
videos.show(3, 33)


popular_tags = count_videos_per_tag(videos, 'tags_array')

# popular_tags will be:
# tag  | count
# tag1 | 2
# tag2 | 2
# tag3 | 3
# tag4 | 1
popular_tags.show(3, 33)

In [108]:
# сократим код:..
df = videos
df = df.withColumn('tags_array', split_tags(df['tags']))
df.cache().show(3, 33)
df = df.select('*',
            explode(
                df['tags_array']
                   ).alias('tag')
      )
df.show(33, 33)
videos_tagged = df
# df = df.groupBy('tag').count()
# df = df.orderBy(desc('count'))
# df.cache().show(33, 33)

+-----------+-----------+-----+-----+--------+------+-------+---------------------+-------------+----------+
|category_id|   video_id|views|likes|dislikes|  tags|likes_c|                score|category_name|tags_array|
+-----------+-----------+-----+-----+--------+------+-------+---------------------+-------------+----------+
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      5| 0.004770164787510842|Entertainment|  [[none]]|
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      3|0.0039028620988725065|Entertainment|  [[none]]|
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      6| 0.005203816131830009|Entertainment|  [[none]]|
+-----------+-----------+-----+-----+--------+------+-------+---------------------+-------------+----------+
only showing top 3 rows

+-----------+-----------+-----+-----+--------+------+-------+----------------------+-------------+----------+------+
|category_id|   video_id|views|likes|dislikes|  tags|likes_c|                 score|category_na

39.4 ms ± 4.12 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


## 4) И личная просьба от Марка: он любит котов (а кто не их не любит!) и хочет найти самые интересные комментарии (топ-5) к видео про котов. “Видео про котов” - видео, у которого есть тэг “cat”.


In [31]:
df = videos
df = df.cache().where(f"tag like '%cat%'")
df.show(33, 33)

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `tag` cannot be resolved. Did you mean one of the following? [`tags`, `likes`, `score`, `views`, `likes_c`].; line 1 pos 0;
'Filter 'tag LIKE %cat%
+- Relation [category_id#1429,video_id#1430,views#1431,likes#1432,dislikes#1433,tags#1434,likes_c#1435,score#1436,category_name#1437] parquet


### ладно, с котами потом разберёмся – надо 2-ю лабу поделать... 😅

In [47]:
videos_tagged.where(f"tag like '%cat%'").show(3, 33)

+-----------+-----------+-----+-----+--------+---------------------------------+-------+--------------------+-------------+---------------------------------+-----------------+
|category_id|   video_id|views|likes|dislikes|                             tags|likes_c|               score|category_name|                       tags_array|              tag|
+-----------+-----------+-----+-----+--------+---------------------------------+-------+--------------------+-------------+---------------------------------+-----------------+
|         24|kbvS-aeEi-8|68806| 3248|      68|slow motion|SLOW MO|make|exper...|    127| 0.04806266895328896|Entertainment|[slow motion, SLOW MO, make, e...|science education|
|         24|kbvS-aeEi-8|68806| 3248|      68|slow motion|SLOW MO|make|exper...|      3|0.046260500537743804|Entertainment|[slow motion, SLOW MO, make, e...|science education|
|         24|kbvS-aeEi-8|68806| 3248|      68|slow motion|SLOW MO|make|exper...|      0|0.046216899688980614|Entertainme

In [67]:
(videos_tagged['tag', 'score']
    .where(f"tag rlike '.*[^a-z]*cat.*'")
    .groupBy('tag')
    .agg(
        count(col('*')).alias('cnt'),
        avg('score').alias('avg_score'),
    )
    .orderBy(desc('cnt'))
    .cache()
    .show(111, 33)
)

+---------------------------------+-----+---------------------+
|                              tag|  cnt|            avg_score|
+---------------------------------+-----+---------------------+
|                        education|40916| 0.041109722218122514|
|                              cat|19505|  0.04787155963837726|
|                        cute cats|16100| 0.046893144295952593|
|                             cats|14301| 0.047816222558131616|
|                        funny cat|12500|  0.04065602435588546|
|                            catch|12319| 0.010961173701992502|
|                      educational|11620|  0.04402938622946798|
|                        cat fails| 8700|  0.04295206130011518|
|                      simons cats| 8700|  0.04295206130011518|
|                    simon the cat| 8700|  0.04295206130011518|
|                       simons cat| 8700|  0.04295206130011518|
|                        simonscat| 8700|  0.04295206130011518|
|                    animated cats| 8700

In [72]:
from pyspark.sql.window import Window
# window = Window().spec()
window = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df = (videos_tagged['tag', 'score']
    .where("tag RLIKE '.*[ ]*(cat|kitten)[ sty]*'")
#   .where("tag RLIKE '\\b(cat|kitten)\\b'")
    .groupBy('tag')
    .agg(
        count(col('*')).alias('cnt'),
        avg('score').alias('avg_score'),
    )
    .withColumn('cnt_total', count('*').over(window))
    .orderBy(desc('cnt'))
    .cache()
    .show(11, 33)
)

# Критерии приёмки
! Оказывается не все ещё закрыты! 🤯��
-
- ...
- 6 Для каждого join должна быть использована какая-либо из оптимизаций, предложенных на лекции. Выбор типа оптимизации нужно обосновать, описав его в комментариях в ноутбуке.
- 7 Для расчета медианы нельзя использовать встроенную Spark-функцию median из пакета pyspark.sql.functions .
- 8 Для функции разделения тегов должны быть представлены выводы по сравнению времени работы обычной и scala-версии.

In [107]:
videos_tagged.show(11, 11)

+-----------+-----------+-----+-----+--------+------+-------+-----------+-------------+
|category_id|   video_id|views|likes|dislikes|  tags|likes_c|      score|category_name|
+-----------+-----------+-----+-----+--------+------+-------+-----------+-------------+
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      5|0.004770...|  Entertai...|
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      3|0.003902...|  Entertai...|
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      6|0.005203...|  Entertai...|
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      0|0.002601...|  Entertai...|
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      0|0.002601...|  Entertai...|
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      0|0.002601...|  Entertai...|
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      2|0.003469...|  Entertai...|
|         24|4yCkkOvIkUI| 2306|    7|       1|[none]|      0|0.002601...|  Entertai...|
|         24|4yCkkOvIkUI| 2306| 

In [109]:
# videos_tagged = videos
from pyspark.sql.window import Window
# window = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
top_cats_tags = (videos_tagged['tag', 'score']
        .where("regexp_like(tag, '.*[ ](cat|kitten)[ sty].*')")
        .groupBy('tag')
        .agg( count(col('*')).alias('cnt'),
              avg('score').alias('avg_score'),
        )
        # .withColumn('cnt_total', count('*').over(window))
        .orderBy(desc('cnt'))
)
top_cats_tags.cache().show(11, 33)

+----------------------+-----+--------------------+
|                   tag|  cnt|           avg_score|
+----------------------+-----+--------------------+
|             cute cats|16100|0.046893144295952593|
|           simons cats| 8700| 0.04295206130011518|
|         animated cats| 8700| 0.04295206130011518|
|            funny cats| 8700| 0.04295206130011518|
|      every cat at 3am| 4900|    0.03736468181558|
|            crazy cats| 4900|    0.03736468181558|
|      funny cat videos| 2500| 0.03134983025391582|
|        crazy cat lady| 2500| 0.07928389958303095|
| simplynailogical cats| 2500| 0.07928389958303095|
|halloween cat costumes| 2500| 0.07928389958303095|
|biggest cat that purrs| 2500| 0.03134983025391582|
+----------------------+-----+--------------------+
only showing top 11 rows



In [29]:
%%timeit
# - 8 Для функции разделения тегов должны быть представлены выводы по сравнению времени работы обычной и scala-версии.
## spark.sql("SELECT split_tags(tags) FROM t
videos.select(split_tags('tags')).count()

33.4 ms ± 976 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
