### Данные для работы

- sp_movies.csv
- sp_ratings.csv
- sp_tags.csv 

### Подключение PySpark, загрузка библиотек, настройка изображений

In [11]:
!pip install pyspark



In [20]:
import numpy as np
import pandas as pd

import seaborn as sns
from matplotlib import pyplot as plt

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, countDistinct, count, size , when
from pyspark.sql.functions import regexp_extract, split

# Seaborn theme used by every plot drawn later in the notebook.
sns.set_style("darkgrid")

# Global matplotlib defaults: figure geometry plus legend/axis/tick font sizes.
params = {
    "legend.fontsize": "medium",
    "figure.figsize": (10, 8),
    "figure.dpi": 100,
    "axes.labelsize": "medium",
    "axes.titlesize": "medium",
    "xtick.labelsize": "medium",
    "ytick.labelsize": "medium",
}
plt.rcParams.update(params)


### Запуск сессии PySpark

In [13]:
# Obtain the Spark session for this notebook under the application name
# "EDA Films".
spark = (
    SparkSession.builder
    .appName("EDA Films")
    .getOrCreate()
)

### Загрузка и обзор датасета

In [14]:
# Read the three source tables. Each file has a header row, and column types
# are inferred from the data rather than declared up front.
df_movies = spark.read.options(header=True, inferSchema=True).csv("sp_movies.csv")
df_ratings = spark.read.options(header=True, inferSchema=True).csv("sp_ratings.csv")
df_tags = spark.read.options(header=True, inferSchema=True).csv("sp_tags.csv")

                                                                                

In [15]:
# Preview the first three rows of the movies table.
df_movies.show(n=3)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
+-------+--------------------+--------------------+
only showing top 3 rows



In [16]:
# Preview the first three rows of the ratings table.
df_ratings.show(n=3)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    110|   1.0|1425941529|
|     1|    147|   4.5|1425942435|
|     1|    858|   5.0|1425941523|
+------+-------+------+----------+
only showing top 3 rows



In [17]:
# Preview the first three rows of the tags table.
df_tags.show(n=3)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
+------+-------+---------------+----------+
only showing top 3 rows



### Статистика данных

In [21]:
# Number of distinct users who submitted at least one rating.
# The label was misspelled ("пользоваелей") and lacked the comma before the
# participial phrase.
print("Количество пользователей, поставивших оценку:")
df_ratings.select(countDistinct("userId")).show()

Количество пользоваелей поставивших оценку:




+----------------------+
|count(DISTINCT userId)|
+----------------------+
|                270896|
+----------------------+



                                                                                

In [23]:
# Number of distinct movies that appear in the ratings table.
print("Количество оцененных фильмов:")
df_ratings.agg(countDistinct("movieId")).show()

Количество оцененных фильмов:




+-----------------------+
|count(DISTINCT movieId)|
+-----------------------+
|                  45115|
+-----------------------+



                                                                                

In [24]:
# Number of distinct movies that have at least one tag.
# The previous label announced a count of comments, but the query counts
# distinct movieId values in the tags table, so the label now says so.
print("Количество фильмов с комментариями (tags):")
df_tags.select(countDistinct("movieId")).show()

Количество комментариев (tags) фильмов:
+-----------------------+
|count(DISTINCT movieId)|
+-----------------------+
|                   1572|
+-----------------------+



In [25]:
# Number of distinct tag values across all movies.
# The query de-duplicates tags, so the label states "unique" explicitly
# instead of implying a total count of comments.
print("Количество уникальных комментариев (tags):")
df_tags.select(countDistinct("tag")).show()

Количество комментариев к фильмам:
+-------------------+
|count(DISTINCT tag)|
+-------------------+
|               1589|
+-------------------+



### Предобработка и настройка данных

In [26]:
# Aliased handles on the three source tables: df1 = ratings, df2 = tags,
# df3 = movies. Presumably the aliases are used to qualify columns in later
# joins — TODO confirm against the cells that consume df1/df2.
df1, df2, df3 = (
    df_ratings.alias("df1"),
    df_tags.alias("df2"),
    df_movies.alias("df3"),
)

In [29]:
# Inspect the first twenty movie rows before any transformation.
df3.show(n=20)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

### Извлечение года из названия

In [30]:
# Derive `year` from the first four-digit number in parentheses in the title,
# e.g. "Toy Story (1995)" -> "1995". The value is kept as text; no numeric
# cast is applied here.
df3 = df3.withColumn(
    "year",
    regexp_extract(col("title"), r"\((\d{4})\)", 1),
)

In [31]:
# Check the newly added `year` column on the first three rows.
df3.show(n=3)

+-------+--------------------+--------------------+----+
|movieId|               title|              genres|year|
+-------+--------------------+--------------------+----+
|      1|    Toy Story (1995)|Adventure|Animati...|1995|
|      2|      Jumanji (1995)|Adventure|Childre...|1995|
|      3|Grumpier Old Men ...|      Comedy|Romance|1995|
+-------+--------------------+--------------------+----+
only showing top 3 rows



### Извлечение данных о жанрах

In [33]:
# Split the pipe-delimited `genres` string into positional columns
# genre1..genre10 and record how many of those slots are filled per movie.
MAX_GENRES = 10  # number of genre slots materialised as columns

split_expr = split(df3["genres"], "\\|")
genre_columns = ["genre{}".format(i) for i in range(1, MAX_GENRES + 1)]

# Add all genre columns in a single projection. Calling withColumn in a loop
# stacks one projection per call and inflates the query plan; withColumns still
# replaces same-named columns, so re-running this cell stays safe.
# split() already yields strings, so no extra cast is required. Slots beyond a
# movie's last genre come back as NULL.
df3 = df3.withColumns(
    {name: split_expr.getItem(idx) for idx, name in enumerate(genre_columns)}
)

# Count the filled slots with an explicit NULL check instead of comparing with
# a placeholder string (which only worked because NULL comparisons are not true).
genre_count_expr = sum(
    when(col(name).isNotNull(), 1).otherwise(0) for name in genre_columns
)
df3 = df3.withColumn("genre_count", genre_count_expr)

In [35]:
# Remove the raw pipe-delimited `genres` column; its values were already split
# into the genre1..genre10 columns.
# NOTE: the genre-splitting cell reads df3["genres"], so it cannot be re-run
# after this cell without rebuilding df3 first.
df3 = df3.drop("genres")

In [36]:
# Display the reshaped movies table (first twenty rows, long values truncated).
df3.show(n=20, truncate=True)

+-------+--------------------+----+---------+---------+--------+------+--------+------+------+------+------+-------+-----------+
|movieId|               title|year|   genre1|   genre2|  genre3|genre4|  genre5|genre6|genre7|genre8|genre9|genre10|genre_count|
+-------+--------------------+----+---------+---------+--------+------+--------+------+------+------+------+-------+-----------+
|      1|    Toy Story (1995)|1995|Adventure|Animation|Children|Comedy| Fantasy|  NULL|  NULL|  NULL|  NULL|   NULL|          5|
|      2|      Jumanji (1995)|1995|Adventure| Children| Fantasy|  NULL|    NULL|  NULL|  NULL|  NULL|  NULL|   NULL|          3|
|      3|Grumpier Old Men ...|1995|   Comedy|  Romance|    NULL|  NULL|    NULL|  NULL|  NULL|  NULL|  NULL|   NULL|          2|
|      4|Waiting to Exhale...|1995|   Comedy|    Drama| Romance|  NULL|    NULL|  NULL|  NULL|  NULL|  NULL|   NULL|          3|
|      5|Father of the Bri...|1995|   Comedy|     NULL|    NULL|  NULL|    NULL|  NULL|  NULL|  N

23/11/15 00:29:53 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 317099 ms exceeds timeout 120000 ms
23/11/15 00:29:53 WARN SparkContext: Killing executors is not supported by current scheduler.
23/11/15 00:29:53 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$