In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [3]:
spark = SparkSession.builder.getOrCreate()

In [4]:
spark

## Задание 4

Загрузим все данные

In [160]:
metadata_df = spark.read.format('json').load('movie_dataset_public_final/raw/metadata.json')
ratings_df = spark.read.format('json').load('movie_dataset_public_final/raw/ratings.json')
reviews_df = spark.read.format('json').load('movie_dataset_public_final/raw/reviews.json')
survey_answers_df = spark.read.format('json').load('movie_dataset_public_final/raw/survey_answers.json')
tag_count_df = spark.read.format('json').load('movie_dataset_public_final/raw/tag_count.json')
tags_df = spark.read.format('json').load('movie_dataset_public_final/raw/tags.json')

## Задание 4.1 
Посчитайте количество текстовых отзывов на каждый фильм. <br>
Стоимость - 3 балла <br>

In [166]:
reviews_df.show(5)

+-------+--------------------+
|item_id|                 txt|
+-------+--------------------+
| 172063|one-shot record o...|
|  95541|Banging Away..; T...|
|   7065|unbelievable; I c...|
|   3739|I'm still starry-...|
|   1562|Failed on every F...|
+-------+--------------------+
only showing top 5 rows



Посчитаем количество отзывов на каждый фильм, присоеденим название и отсортируем количество отзывов по убывания

In [164]:
reviews_df \
        .groupBy('item_id').count() \
        .join(metadata_df.select('item_id', 'title'), on='item_id') \
        .orderBy(F.col('count').desc()).show()

+-------+-----+--------------------+
|item_id|count|               title|
+-------+-----+--------------------+
|   4993| 5122|Lord of the Rings...|
|  58559| 5115|Dark Knight, The ...|
|    318| 4913|Shawshank Redempt...|
| 122886| 4384|Star Wars: Episod...|
|   2571| 3765|  Matrix, The (1999)|
|   2628| 3636|Star Wars: Episod...|
|   5378| 3556|Star Wars: Episod...|
|   2710| 3443|Blair Witch Proje...|
| 136864| 3358|Batman v Superman...|
|  33493| 3331|Star Wars: Episod...|
|   7153| 3275|Lord of the Rings...|
|  72998| 3137|       Avatar (2009)|
|   2959| 3082|   Fight Club (1999)|
|  79132| 2967|    Inception (2010)|
| 109487| 2962| Interstellar (2014)|
|   7318| 2829|Passion of the Ch...|
|   6365| 2801|Matrix Reloaded, ...|
|  34048| 2762|War of the Worlds...|
|   2858| 2757|American Beauty (...|
|  91529| 2756|Dark Knight Rises...|
+-------+-----+--------------------+
only showing top 20 rows



## Задание 4.2
Для пользователей, поставивших оценки более, чем 5 фильмам, посчитать дисперсию оценок для каждого пользователя. <br>
Стоимость - 4 балла <br>

In [167]:
ratings_df.show(5)

+-------+------+-------+
|item_id|rating|user_id|
+-------+------+-------+
|      5|   3.0| 997206|
|     10|   4.0| 997206|
|     13|   4.0| 997206|
|     17|   5.0| 997206|
|     21|   4.0| 997206|
+-------+------+-------+
only showing top 5 rows



In [168]:
ratings_df.groupBy('user_id').agg(
                        # считаем количество оценок для пользователя
                        F.count('rating').alias('rating_count'), 
                        # считаем стандартное отклонение оценок пользователя 
                        # возводим в квадрат чтобы получить дисперсию и округляем до 2 знаков
                        F.round(F.pow(F.stddev('rating'), 2), 2).alias('dispersion')
# оставляем пользователей с более чем 5 оценками
).filter(F.col('rating_count') > 5). \
select('user_id', 'dispersion').show()

+-------+----------+
|user_id|dispersion|
+-------+----------+
| 498965|      0.42|
|  11190|      1.22|
| 821354|      0.96|
| 566437|      0.67|
| 844513|      0.99|
| 421791|      0.84|
| 624860|       0.9|
| 241098|      0.68|
| 161277|      0.84|
| 928453|      1.28|
| 589066|      0.85|
| 568706|      0.59|
| 223139|      0.61|
|  23116|      1.33|
| 858526|      0.99|
| 918211|      0.88|
| 325281|      0.69|
| 732747|      1.73|
| 663151|       1.7|
| 289627|      1.11|
+-------+----------+
only showing top 20 rows



## Задание 4.3
По тегу выдать тексты отзывов на фильмы, к которым применимость заданного тега оценивается пользователями в среднем выше, чем на 4. <br>
Стоимость - 5 баллов <br>

In [170]:
survey_answers_df.show(5)

+-------+-----+------+-------+
|item_id|score|tag_id|user_id|
+-------+-----+------+-------+
|   3108|    3| 50126| 978707|
|   2858|    1| 50126| 978707|
|   1269|    1| 50126| 978707|
|   1136|    1| 50126| 978707|
|   1220|    1| 50126| 978707|
+-------+-----+------+-------+
only showing top 5 rows



In [171]:
reviews_df.show(5)

+-------+--------------------+
|item_id|                 txt|
+-------+--------------------+
| 172063|one-shot record o...|
|  95541|Banging Away..; T...|
|   7065|unbelievable; I c...|
|   3739|I'm still starry-...|
|   1562|Failed on every F...|
+-------+--------------------+
only showing top 5 rows



In [172]:
tags_df = tags_df.withColumnRenamed('id', 'tag_id')
tags_df.show(5)

+------+--------------+
|tag_id|           tag|
+------+--------------+
|    22|       aardman|
|   112|secret service|
|   167|    hillarious|
|   270|     christian|
|   362|         mummy|
+------+--------------+
only showing top 5 rows



Группируем по тегу для каждого фильма, считаем среднюю оценку и оставляем оценки выше 4.

In [174]:
result = survey_answers_df \
        .groupBy('tag_id', 'item_id') \
        .agg(F.avg('score').alias('avg_score')) \
        .filter(F.col('avg_score') > 4)
result.show(5)

+------+-------+---------+
|tag_id|item_id|avg_score|
+------+-------+---------+
|101966|   1704|      4.2|
| 28134|  55820|      5.0|
| 34379|   4343|      5.0|
|  6681|   4369|      5.0|
| 37068|   4262|      5.0|
+------+-------+---------+
only showing top 5 rows



К тексту отзыва на фильм присоединяем id тегов, которые оценены выше 4, и сам тег. Для большей ясности выведено название фильма

In [179]:
reviews_df.join(result, on='item_id', how='right') \
        .join(tags_df, on='tag_id') \
        .join(metadata_df.select('item_id', 'title'), on='item_id') \
        .orderBy(F.col('txt')).select('title', 'txt', 'tag').show()

+--------------------+--------------------+--------------------+
|               title|                 txt|                 tag|
+--------------------+--------------------+--------------------+
|Freaky Friday (1977)|                 ...|              comedy|
|  Under Siege (1992)|                u...|           espionage|
|Shawshank Redempt...|           Best m...|          adaptation|
|Shawshank Redempt...|           Best m...|          redemption|
|Shawshank Redempt...|           Best m...|oscar (best direc...|
|Shawshank Redempt...|           Best m...|  exceptional acting|
|Shawshank Redempt...|           Best m...|        heartwarming|
|Shawshank Redempt...|           Best m...|         classic car|
|Shawshank Redempt...|           Best m...|              prison|
|Shawshank Redempt...|           Best m...|            downbeat|
|Shawshank Redempt...|           Best m...|       inspirational|
|Shawshank Redempt...|           Best m...|               drama|
|Shawshank Redempt...|   

## Задание 4.5
Получить средние рейтинги фильмов по годам выпуска. Год выпуска фильма указан в title. То есть надо для каждого фильма определить средний рейтинг, сгруппировать фильмы по годам выпуска и посчитать среднее. Можно и по-другому, если достигается эквивалентный результат.

Стоимость - 5 баллов

In [180]:
metadata_df.show(5)

+---------+---------+---------------+-------+-------+--------------------+--------------------+
|avgRating|dateAdded|     directedBy| imdbId|item_id|            starring|               title|
+---------+---------+---------------+-------+-------+--------------------+--------------------+
|  3.89146|     null|  John Lasseter|0114709|      1|Tim Allen, Tom Ha...|    Toy Story (1995)|
|  3.26605|     null|   Joe Johnston|0113497|      2|Jonathan Hyde, Br...|      Jumanji (1995)|
|  3.17146|     null|  Howard Deutch|0113228|      3|Jack Lemmon, Walt...|Grumpier Old Men ...|
|  2.86824|     null|Forest Whitaker|0114885|      4|Angela Bassett, L...|Waiting to Exhale...|
|   3.0762|     null|  Charles Shyer|0113041|      5|Steve Martin, Mar...|Father of the Bri...|
+---------+---------+---------------+-------+-------+--------------------+--------------------+
only showing top 5 rows



In [181]:
md_df = metadata_df.select('title', 'avgRating').toPandas()
md_df.head()

Unnamed: 0,title,avgRating
0,Toy Story (1995),3.89146
1,Jumanji (1995),3.26605
2,Grumpier Old Men (1995),3.17146
3,Waiting to Exhale (1995),2.86824
4,Father of the Bride Part II (1995),3.0762


Функция ищет год выпуска фильма внутри скобок

In [196]:
import re
def find_year(starring):
    year = re.search(r'(\(\d{4}\))', starring)
    if type(year) == type(None):
        return None
    else:
        return year[0][1:5]

In [197]:
md_df['year'] = md_df.title.map(find_year)
md_df.head()

Unnamed: 0,title,avgRating,year
0,Toy Story (1995),3.89146,1995
1,Jumanji (1995),3.26605,1995
2,Grumpier Old Men (1995),3.17146,1995
3,Waiting to Exhale (1995),2.86824,1995
4,Father of the Bride Part II (1995),3.0762,1995


Группируем данные по году и находим среднее

In [193]:
md_df.groupby('year', as_index=False).agg({'avgRating' : 'mean'})

Unnamed: 0,year,avgRating
0,1874,3.129630
1,1878,3.102940
2,1880,2.550000
3,1883,2.676470
4,1887,1.934780
...,...,...
133,2017,2.590394
134,2018,2.448351
135,2019,2.127804
136,2020,1.695137


## Задание 4.6
Для каждого фильма посчитать его средний рейтинг. Для каждой оценки в ratings.json посчитать разницу между выставленной оценкой и средней оценкой фильма. Выдать для каждого пользователя среднюю таких разниц по всем его оценкам.

Стоимость - 6 баллов

In [200]:
ratings_df.show(5)

+-------+------+-------+
|item_id|rating|user_id|
+-------+------+-------+
|      5|   3.0| 997206|
|     10|   4.0| 997206|
|     13|   4.0| 997206|
|     17|   5.0| 997206|
|     21|   4.0| 997206|
+-------+------+-------+
only showing top 5 rows



In [201]:
metadata_df.show(5)

+---------+---------+---------------+-------+-------+--------------------+--------------------+
|avgRating|dateAdded|     directedBy| imdbId|item_id|            starring|               title|
+---------+---------+---------------+-------+-------+--------------------+--------------------+
|  3.89146|     null|  John Lasseter|0114709|      1|Tim Allen, Tom Ha...|    Toy Story (1995)|
|  3.26605|     null|   Joe Johnston|0113497|      2|Jonathan Hyde, Br...|      Jumanji (1995)|
|  3.17146|     null|  Howard Deutch|0113228|      3|Jack Lemmon, Walt...|Grumpier Old Men ...|
|  2.86824|     null|Forest Whitaker|0114885|      4|Angela Bassett, L...|Waiting to Exhale...|
|   3.0762|     null|  Charles Shyer|0113041|      5|Steve Martin, Mar...|Father of the Bri...|
+---------+---------+---------------+-------+-------+--------------------+--------------------+
only showing top 5 rows



In [202]:
md_df = metadata_df.select('avgRating', 'item_id')
rt_df = ratings_df

К оценке пользователя для фильма присоединяем среднюю оценку фильма. В отдельном столбце считаем насколько оценка пользователя отличается от средней для фильма.

In [203]:
result = rt_df.join(md_df, on='item_id').withColumn('diff', F.round(F.col('rating') - F.col('avgRating'), 2))
result.show(5)

+-------+------+-------+---------+-----+
|item_id|rating|user_id|avgRating| diff|
+-------+------+-------+---------+-----+
|      5|   3.0| 997206|   3.0762|-0.08|
|     10|   4.0| 997206|   3.4334| 0.57|
|     13|   4.0| 997206|  3.34382| 0.66|
|     17|   5.0| 997206|  3.94589| 1.05|
|     21|   4.0| 997206|  3.56584| 0.43|
+-------+------+-------+---------+-----+
only showing top 5 rows



Для каждого пользователя считаем насколько в среднем он завышает или занижает оценку для фильмов.

In [204]:
result.groupBy('user_id').agg(F.round(F.avg('diff'), 2).alias('avg_diff')).show()

+-------+--------+
|user_id|avg_diff|
+-------+--------+
| 498965|    0.14|
|  11190|    0.19|
| 821354|   -0.04|
| 566437|    -0.1|
| 844513|   -0.83|
| 421791|     0.2|
| 624860|    0.16|
| 241098|   -0.03|
| 161277|     0.2|
| 928453|   -0.37|
| 589066|    0.26|
| 568706|    0.46|
| 223139|    0.35|
|  23116|    0.45|
| 858526|    0.43|
| 918211|   -0.23|
| 325281|   -0.41|
| 732747|    0.31|
| 663151|   -0.68|
| 289627|    0.18|
+-------+--------+
only showing top 20 rows



Сделать аналогично "в другую сторону". Для каждого пользователя посчитать среднюю оценку. Для каждой оценки в ratings.json посчитать разницу между выставленной оценкой и средней оценкой пользователя. Выдать для каждого фильма среднюю таких разниц по всем его оценкам.

Считаем среднюю оценку, которую дает каждый пользователь

In [206]:
mean_user_rating_df = ratings_df.groupBy('user_id').agg(F.round(F.avg('rating'), 2).alias('avg_rating'))
mean_user_rating_df.show(5)

+-------+----------+
|user_id|avg_rating|
+-------+----------+
| 498965|      3.39|
|  11190|       3.8|
| 821354|      3.36|
| 566437|      3.35|
| 844513|      2.45|
+-------+----------+
only showing top 5 rows



К оценкам фильмов каждого пользователя присоединяем его среднюю оценку. В отдельном столбце считаем насколько отличается оценка пользователя от его средней оценки.

In [207]:
rt_df = ratings_df \
                .join(mean_user_rating_df, on='user_id') \
                .withColumn('rating_diff', F.round(F.col('rating') - F.col('avg_rating'), 2))
rt_df.show(5)

+-------+-------+------+----------+-----------+
|user_id|item_id|rating|avg_rating|rating_diff|
+-------+-------+------+----------+-----------+
| 498965|      1|   4.0|      3.39|       0.61|
| 498965|      2|   3.0|      3.39|      -0.39|
| 498965|      5|   3.0|      3.39|      -0.39|
| 498965|     19|   4.0|      3.39|       0.61|
| 498965|     22|   4.0|      3.39|       0.61|
+-------+-------+------+----------+-----------+
only showing top 5 rows



Группируем данные по фильму и считаем насколько в среднем оценки пользователей для этого фильма отличаются от их обычных оценок.
Присоединяем названия фильмов. Сортируем по убыванию чтобы выделить фильмы, которые понравились пользователям намного больше, чем фильмы, которые они обычно оценивают.

In [210]:
result = rt_df.groupBy('item_id') \
        .agg(F.round(F.avg('rating_diff'), 2).alias('avg_rating_diff')) \
        .join(metadata_df.select('item_id', 'title'), on='item_id') \
        .orderBy(F.col('avg_rating_diff').desc())
result.show()

+-------+---------------+--------------------+
|item_id|avg_rating_diff|               title|
+-------+---------------+--------------------+
| 159135|           3.42|   The Viking (1931)|
| 159471|           3.42|Evening's Civil T...|
| 159125|           3.42|Anybody's Son Wil...|
| 215251|           3.14|   The Scheme (2020)|
| 121919|           2.77|The Good Mother (...|
| 194434|           2.72|   Adrenaline (1990)|
| 203086|           2.67|Truth and Justice...|
| 151989|           2.65|    The Thorn (1971)|
| 216159|           2.45|He's Such a Girl ...|
| 203882|           2.42|Dead in the Water...|
| 159473|           2.42|Building a Broken...|
| 213469|           2.34|         Coda (2020)|
| 155923|           2.31|         Sing (1989)|
| 222821|           2.31|This Changes Ever...|
| 177209|            2.3|      Acı Aşk (2009)|
| 219031|           2.29|Pornography: A Se...|
| 231289|           2.29|LIFE BEYOND: Chap...|
| 231287|           2.29|LIFE BEYOND: Chap...|
| 217270|    

## Задание 4.7
Для каждого фильма посчитать среднюю оценку по каждому тегу. И для каждого фильма найти список фильмов с похожими оценками.

Похожесть определить по 64-битному sim-хешу. Критерий похожести - отличие не более, чем в 3 битах.

Стоимость - 30 баллов

In [213]:
survey_answers_df.show(5)

+-------+-----+------+-------+
|item_id|score|tag_id|user_id|
+-------+-----+------+-------+
|   3108|    3| 50126| 978707|
|   2858|    1| 50126| 978707|
|   1269|    1| 50126| 978707|
|   1136|    1| 50126| 978707|
|   1220|    1| 50126| 978707|
+-------+-----+------+-------+
only showing top 5 rows



In [214]:
survey_answers_df.count()

58903

Группируем по фильму и тегу. Считаем среднюю оценку для каждого тега для каждого фильма. Находим хеш средней оценки.

In [219]:
sa_df = survey_answers_df \
                        .groupBy('item_id', 'tag_id').agg(F.round(F.avg('score'), 2).alias('avg_score')) \
                        .withColumn('hash', F.xxhash64('avg_score'))
sa_df.show(5)

+-------+------+---------+--------------------+
|item_id|tag_id|avg_score|                hash|
+-------+------+---------+--------------------+
|    648|103285|      2.0| 8973626371093010265|
|   1199|  6270|      5.0| 2166310823221407924|
|   2278| 31115|      1.0|-2162451265447482029|
|    750| 94700|      5.0| 2166310823221407924|
|    223| 33344|      1.0|-2162451265447482029|
+-------+------+---------+--------------------+
only showing top 5 rows



Создадим копию данных и переименуем столбцы чтобы при cross join-е не было путаницы со столбцами

In [223]:
sa_df1 = sa_df \
        .withColumnRenamed('item_id','other_item_id') \
        .withColumnRenamed('tag_id','other_tag_id') \
        .withColumnRenamed('avg_score','other_avg_score') \
        .withColumnRenamed('hash','other_hash')

Cross join-ним таблицы чтобы получить все комбинации пар фильмов. В отдельном столбце считаем разницу хешей.

Данные должны соответсвовать требованиям:
* Теги для фильмов одинаковые
* Сравниваемые фильмы разные
* Разница хешей для оценки тегов не больше 3

In [227]:
result = sa_df.join(sa_df1, how='cross') \
            .withColumn('diff_hash', F.col('hash') - F.col('other_hash')) \
            .filter(F.col('tag_id') == F.col('other_tag_id')) \
            .filter(F.col('item_id') != F.col('other_item_id')) \
            .filter(F.abs(F.col('diff_hash')) <= 3)
result.show(5)

+-------+------+---------+-------------------+-------------+------------+---------------+-------------------+---------+
|item_id|tag_id|avg_score|               hash|other_item_id|other_tag_id|other_avg_score|         other_hash|diff_hash|
+-------+------+---------+-------------------+-------------+------------+---------------+-------------------+---------+
|    648|103285|      2.0|8973626371093010265|         3948|      103285|            2.0|8973626371093010265|        0|
|    648|103285|      2.0|8973626371093010265|         2642|      103285|            2.0|8973626371093010265|        0|
|    648|103285|      2.0|8973626371093010265|         8387|      103285|            2.0|8973626371093010265|        0|
|    648|103285|      2.0|8973626371093010265|         1374|      103285|            2.0|8973626371093010265|        0|
|    648|103285|      2.0|8973626371093010265|          908|      103285|            2.0|8973626371093010265|        0|
+-------+------+---------+--------------

Для наглядности выведем названия похожих фильмов и тег, который их объединяет

In [235]:
res = result.select('item_id', 'other_item_id', 'tag_id')
res = res.join(metadata_df.select('item_id', 'title'), on='item_id') \
        .withColumnRenamed('title','first_title').select('tag_id', 'other_item_id', 'first_title')

res = res.join(metadata_df.select('item_id', 'title'), res.other_item_id == metadata_df.item_id) \
        .withColumnRenamed('title','second_title')

res.join(tags_df, on='tag_id').select('first_title', 'second_title', 'tag').show()

+--------------------+--------------------+---------+
|         first_title|        second_title|      tag|
+--------------------+--------------------+---------+
|Mission: Impossib...|Meet the Parents ...|espionage|
|Mission: Impossib...| Superman III (1983)|espionage|
|Mission: Impossib...|Police Academy: M...|espionage|
|Mission: Impossib...|Star Trek II: The...|espionage|
|Mission: Impossib...|North by Northwes...|espionage|
|Mission: Impossib...|     Iron Man (2008)|espionage|
|Mission: Impossib...|         Hair (1979)|espionage|
|Mission: Impossib...|Sound of Music, T...|espionage|
|Mission: Impossib...|         Hoot (2006)|espionage|
|Mission: Impossib...|Star Wars: Episod...|espionage|
|Mission: Impossib...| Total Recall (1990)|espionage|
|Mission: Impossib...|Die Hard: With a ...|espionage|
|Mission: Impossib...|   No Way Out (1987)|espionage|
|Mission: Impossib...|       Harvey (1950)|espionage|
|       Brazil (1985)|Back to the Futur...|   future|
|       Brazil (1985)| Total

In [236]:
res.count()

841046