### Подключение PySpark, загрузка библиотек, настройка изображений

In [21]:
import numpy as np 
import pandas as pd

import seaborn as sns
import matplotlib.pyplot as plt

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, countDistinct, count, size, when, regexp_extract, split

sns.set_style('darkgrid')
params = {'legend.fontsize' : 'medium', 'figure.figsize' : (10,8), 'figure.dpi' : 100, 'axes.labelsize': 'medium', 'axes.titlesize': 'medium', 'xtick.labelsize' : 'medium', 'ytick.labelsize' : 'medium'}
plt.rcParams.update(params)

### Запуск сессии PySpark

In [22]:
spark = SparkSession.builder.appName('EDA Films').getOrCreate()

### Загрузка и обзор датасета

In [23]:
df_movies = spark.read.csv('sp_movies.csv', header=True, inferSchema=True)
df_ratings = spark.read.csv('sp_ratings.csv', header=True, inferSchema=True)
df_tags = spark.read.csv('sp_tags.csv', header=True, inferSchema=True)

In [24]:
df_movies.show(3)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
+-------+--------------------+--------------------+
only showing top 3 rows



In [25]:
df_ratings.show(3)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
+------+-------+------+---------+
only showing top 3 rows



In [26]:
df_tags.show(3)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
+------+-------+---------------+----------+
only showing top 3 rows



### Статистика данных

In [27]:
print('Количество пользователей поставивших оценку:')
df_ratings.select(countDistinct('userId')).show()

Количество пользователей поставивших оценку:
+----------------------+
|count(DISTINCT userId)|
+----------------------+
|                   610|
+----------------------+



In [28]:
print('Количество оцененных фильмов:')
df_ratings.select(countDistinct('movieId')).show()

Количество оцененных фильмов:
+-----------------------+
|count(DISTINCT movieId)|
+-----------------------+
|                   9724|
+-----------------------+



In [29]:
print('Количество комментариев (tags) фильмов:')
df_tags.select(countDistinct('movieId')).show()

Количество комментариев (tags) фильмов:
+-----------------------+
|count(DISTINCT movieId)|
+-----------------------+
|                   1572|
+-----------------------+



In [30]:
print('Количество комментариев к фильмам:')
df_tags.select(countDistinct('tag')).show()

Количество комментариев к фильмам:
+-------------------+
|count(DISTINCT tag)|
+-------------------+
|               1589|
+-------------------+



### Предобработка и настройка данных

In [31]:
df1 = df_ratings.alias('df1')
df2 = df_tags.alias('df2')
df3 = df_movies.alias('df3')

In [32]:
df3.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

### Извлечение года из названия

In [33]:
df3 = df3.withColumn('year', regexp_extract(df3['title'], r'\((\d{4})\)', 1))

In [34]:
df3.show()

+-------+--------------------+--------------------+----+
|movieId|               title|              genres|year|
+-------+--------------------+--------------------+----+
|      1|    Toy Story (1995)|Adventure|Animati...|1995|
|      2|      Jumanji (1995)|Adventure|Childre...|1995|
|      3|Grumpier Old Men ...|      Comedy|Romance|1995|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|1995|
|      5|Father of the Bri...|              Comedy|1995|
|      6|         Heat (1995)|Action|Crime|Thri...|1995|
|      7|      Sabrina (1995)|      Comedy|Romance|1995|
|      8| Tom and Huck (1995)|  Adventure|Children|1995|
|      9| Sudden Death (1995)|              Action|1995|
|     10|    GoldenEye (1995)|Action|Adventure|...|1995|
|     11|American Presiden...|Comedy|Drama|Romance|1995|
|     12|Dracula: Dead and...|       Comedy|Horror|1995|
|     13|        Balto (1995)|Adventure|Animati...|1995|
|     14|        Nixon (1995)|               Drama|1995|
|     15|Cutthroat Island ...|A

### Извлечение данных о жанрах

In [35]:
split_expr = split(df3['genres'], '\\|')
for i in range(1,11):
    df3 = df3.withColumn('genre{}'.format(i), split_expr.getItem(i-1))

genre_columns = {'genre{}'.format(i) for i in range(1,11)}
for col_name in genre_columns:
    df3 = df3.withColumn(col_name, col(col_name).cast('string').alias(col_name))

genre_count_expr = sum(when(col(col_name) != '0', 1).otherwise(0) for col_name in genre_columns)
df3 = df3.withColumn('genre_count', genre_count_expr)

In [36]:
df3 = df3.drop('genres')

In [37]:
df3.show()

+-------+--------------------+----+---------+---------+--------+------+--------+------+------+------+------+-------+-----------+
|movieId|               title|year|   genre1|   genre2|  genre3|genre4|  genre5|genre6|genre7|genre8|genre9|genre10|genre_count|
+-------+--------------------+----+---------+---------+--------+------+--------+------+------+------+------+-------+-----------+
|      1|    Toy Story (1995)|1995|Adventure|Animation|Children|Comedy| Fantasy|  NULL|  NULL|  NULL|  NULL|   NULL|          5|
|      2|      Jumanji (1995)|1995|Adventure| Children| Fantasy|  NULL|    NULL|  NULL|  NULL|  NULL|  NULL|   NULL|          3|
|      3|Grumpier Old Men ...|1995|   Comedy|  Romance|    NULL|  NULL|    NULL|  NULL|  NULL|  NULL|  NULL|   NULL|          2|
|      4|Waiting to Exhale...|1995|   Comedy|    Drama| Romance|  NULL|    NULL|  NULL|  NULL|  NULL|  NULL|   NULL|          3|
|      5|Father of the Bri...|1995|   Comedy|     NULL|    NULL|  NULL|    NULL|  NULL|  NULL|  N

### Анализ данных

In [38]:
rating_avg = df1.groupBy('movieId').agg(mean('rating').alias('rating_avg'))
rating_avg = rating_avg.withColumnRenamed('movieId', 'movieId_avg')
rating_avg.show(3)

+-----------+-----------------+
|movieId_avg|       rating_avg|
+-----------+-----------------+
|       1580|3.487878787878788|
|       2366|             3.64|
|       3175|             3.58|
+-----------+-----------------+
only showing top 3 rows



In [39]:
rating_count = df1.groupBy('movieId').agg(count('rating').alias('rating_count'))
rating_count = rating_count.withColumnRenamed('movieId', 'movieId_count')
rating_count.show(3)

+-------------+------------+
|movieId_count|rating_count|
+-------------+------------+
|         1580|         165|
|         2366|          25|
|         3175|          75|
+-------------+------------+
only showing top 3 rows



In [40]:
user_rating = df1.groupBy('userId').agg(mean('rating').alias('user_rating_avg'))
user_rating = user_rating.withColumnRenamed('userId', 'userId_avg')
user_rating.show(3)

+----------+------------------+
|userId_avg|   user_rating_avg|
+----------+------------------+
|       148|3.7395833333333335|
|       463| 3.787878787878788|
|       471|             3.875|
+----------+------------------+
only showing top 3 rows



In [41]:
user_count = df1.groupBy('userId').agg(count('rating').alias('user_rating_count'))
user_count = user_count.withColumnRenamed('userId', 'userId_count')
user_count.show(3)

+------------+-----------------+
|userId_count|user_rating_count|
+------------+-----------------+
|         148|               48|
|         463|               33|
|         471|               28|
+------------+-----------------+
only showing top 3 rows



In [42]:
df_movie = rating_avg.join(rating_count, col('movieId_avg') == col('movieId_count'), 'inner').drop('movieId_count')
df_movie.show(3)

+-----------+-----------------+------------+
|movieId_avg|       rating_avg|rating_count|
+-----------+-----------------+------------+
|       1580|3.487878787878788|         165|
|       2366|             3.64|          25|
|       3175|             3.58|          75|
+-----------+-----------------+------------+
only showing top 3 rows



In [43]:
df_user = user_rating.join(user_count, col('userId_avg') == col('userId_count'), 'inner').drop('userId_count')
df_user.show(3)

+----------+------------------+-----------------+
|userId_avg|   user_rating_avg|user_rating_count|
+----------+------------------+-----------------+
|       148|3.7395833333333335|               48|
|       463| 3.787878787878788|               33|
|       471|             3.875|               28|
+----------+------------------+-----------------+
only showing top 3 rows



In [44]:
df_user.sort(col('user_rating_count').desc()).show()

+----------+------------------+-----------------+
|userId_avg|   user_rating_avg|user_rating_count|
+----------+------------------+-----------------+
|       414| 3.391957005189029|             2698|
|       599|2.6420500403551253|             2478|
|       474| 3.398956356736243|             2108|
|       448|2.8473712446351933|             1864|
|       274| 3.235884101040119|             1346|
|       610|3.6885560675883258|             1302|
|        68| 3.233730158730159|             1260|
|       380|3.6732348111658455|             1218|
|       606|3.6573991031390136|             1115|
|       288|3.1459715639810426|             1055|
|       249|3.6964627151051626|             1046|
|       387|3.2585199610516065|             1027|
|       182|3.5112589559877176|              977|
|       307|2.6656410256410257|              975|
|       603|3.5079533404029695|              943|
|       298| 2.363684771033014|              939|
|       177| 3.375553097345133|              904|


In [45]:
df_movie.sort(col('rating_avg').desc()).show()

+-----------+----------+------------+
|movieId_avg|rating_avg|rating_count|
+-----------+----------+------------+
|      26350|       5.0|           1|
|       3795|       5.0|           1|
|      25887|       5.0|           1|
|     157775|       5.0|           1|
|        633|       5.0|           1|
|      33138|       5.0|           1|
|      67618|       5.0|           1|
|        876|       5.0|           1|
|        496|       5.0|           1|
|      27373|       5.0|           1|
|     113829|       5.0|           1|
|      53578|       5.0|           1|
|     152711|       5.0|           1|
|     118894|       5.0|           1|
|         53|       5.0|           2|
|     160644|       5.0|           1|
|        148|       5.0|           1|
|       8911|       5.0|           1|
|     147300|       5.0|           1|
|      84273|       5.0|           1|
+-----------+----------+------------+
only showing top 20 rows



In [46]:
df_movies = df_movie.withColumnRenamed('movieId_avg', 'movieId')

In [47]:
dfk = df3.select('movieId', df3['genre1'])
df2 = df2.join(dfk, on='movieId', how='inner')

In [48]:
df2.show()

+-------+------+----------------+----------+---------+
|movieId|userId|             tag| timestamp|   genre1|
+-------+------+----------------+----------+---------+
|      1|   567|             fun|1525286013|Adventure|
|      1|   474|           pixar|1137206825|Adventure|
|      1|   336|           pixar|1139045764|Adventure|
|      2|   474|            game|1137375552|Adventure|
|      2|    62|  Robin Williams|1528843907|Adventure|
|      2|    62|magic board game|1528843932|Adventure|
|      2|    62|         fantasy|1528843929|Adventure|
|      3|   289|             old|1143424860|   Comedy|
|      3|   289|           moldy|1143424860|   Comedy|
|      5|   474|          remake|1137373903|   Comedy|
|      5|   474|       pregnancy|1137373903|   Comedy|
|      7|   474|          remake|1137375642|   Comedy|
|     11|   474|       president|1137374904|   Comedy|
|     11|   474|        politics|1137374904|   Comedy|
|     14|   474|       president|1137375623|    Drama|
|     14| 

In [49]:
user_tags = df2.groupBy("userId", "movieId").agg(count("tag").alias("tag_count"))
max_tag_count = user_tags.agg({"tag_count": "max"}).collect()[0][0]
user_tags_max = user_tags.filter(user_tags["tag_count"] == max_tag_count)

user_tags_max.show()

+------+-------+---------+
|userId|movieId|tag_count|
+------+-------+---------+
|   599|    296|      173|
+------+-------+---------+



In [50]:
user_tagcount = df2.groupBy("userId").agg(count("tag").alias("tag_count"))
user_tagcount.show(3)

+------+---------+
|userId|tag_count|
+------+---------+
|   513|        3|
|   193|       20|
|   300|        1|
+------+---------+
only showing top 3 rows



In [51]:
movie_tagcount = df2.groupBy("movieId").agg(count("tag").alias("tag_count"))
cols = ["movieId", "genre_count", "genre1"]
dfs = df3.select(cols)
movie_tagcount = movie_tagcount.join(dfs, on="movieId", how="inner")

movie_tagcount.show(3)

+-------+---------+-----------+------+
|movieId|tag_count|genre_count|genre1|
+-------+---------+-----------+------+
|    471|        1|          1|Comedy|
|   1088|        2|          3| Drama|
|   1580|        1|          3|Action|
+-------+---------+-----------+------+
only showing top 3 rows



In [52]:
df3 = df3.join(df_movie, on="movieId", how="inner")
df3 = df3.withColumnRenamed("avg_rating", "rating")
df3.show(3)

AnalysisException: [UNRESOLVED_USING_COLUMN_FOR_JOIN] USING column `movieId` cannot be resolved on the right side of the join. The right-side columns: [`movieId_avg`, `rating_avg`, `rating_count`].