In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=ace48be5c5cf7b5f7ede178b7f23c00d3fced115d546a7e4b1c7fc4c4d0319bb
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


# 1) Чтение данных


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date
from pyspark.sql.functions import mean, col, when, max, sum, year, count, avg

# Создание SparkSession
spark = SparkSession.builder.appName("Read CSV Example").getOrCreate()

# Чтение CSV-файла
df_movies = spark.read.csv("/content/sample_data/movies.csv", header=True, inferSchema=True)
df_movie_actors = spark.read.csv("/content/sample_data/movie_actors.csv", header=True, inferSchema=True)
df_actors = spark.read.csv("/content/sample_data/actors.csv", header=True, inferSchema=True)



In [None]:
# Печать схемы DataFrame
df_movies.printSchema()

# Показ первых 5 строк
df_movies.show(5)

root
 |-- movie_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- budget: double (nullable = true)

+--------+-------+------+------------+-------------+
|movie_id|  title| genre|release_date|       budget|
+--------+-------+------+------------+-------------+
|       1|Movie_1|Horror|  2000-12-31|8.660058311E7|
|       2|Movie_2|Comedy|  2001-12-31|1.274740083E7|
|       3|Movie_3|Action|  2002-12-31| 1.80157747E7|
|       4|Movie_4| Drama|  2003-12-31|4.817612061E7|
|       5|Movie_5| Drama|  2004-12-31| 7.40501611E7|
+--------+-------+------+------------+-------------+
only showing top 5 rows



In [None]:
# Печать схемы DataFrame
df_movie_actors.printSchema()

# Показ первых 5 строк
df_movie_actors.show(5)

root
 |-- movie_id: integer (nullable = true)
 |-- actor_id: integer (nullable = true)

+--------+--------+
|movie_id|actor_id|
+--------+--------+
|       1|      25|
|      16|       5|
|       6|      16|
|      16|      11|
|      14|      21|
+--------+--------+
only showing top 5 rows



In [None]:
# Печать схемы DataFrame
df_actors.printSchema()

# Показ первых 5 строк
df_actors.show(5)

root
 |-- actor_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- country: string (nullable = true)

+--------+-------+----------+-------+
|actor_id|   name|birth_date|country|
+--------+-------+----------+-------+
|       1|Actor_1|1960-12-31| Canada|
|       2|Actor_2|1962-12-31|     UK|
|       3|Actor_3|1964-12-31|     UK|
|       4|Actor_4|1966-12-31|     UK|
|       5|Actor_5|1968-12-31|  India|
+--------+-------+----------+-------+
only showing top 5 rows



# 2) Создание временных таблиц

#### Создадим временные таблицы для данных о фильмах, актерах и связях между ними.

In [None]:
df_actors.createOrReplaceTempView("actors")
df_movie_actors.createOrReplaceTempView("movie_actors")
df_movies.createOrReplaceTempView("movies")


# 3)  SQL запросы

#### Найдём топ-5 жанров по количеству фильмов.


In [None]:
res = spark.sql("""
SELECT genre, COUNT(*) as cnt_movies
FROM movies
GROUP BY genre
ORDER BY cnt_movies DESC
""")

# Показ результатов
res.show()

+------+----------+
| genre|cnt_movies|
+------+----------+
| Drama|         6|
|Action|         6|
|Comedy|         4|
|Horror|         2|
|Sci-Fi|         2|
+------+----------+



#### Найдём актера с наибольшим количеством фильмов.


In [None]:
res = spark.sql("""


WITH t1 AS
(SELECT a.actor_id, COUNT(*) as cnt_movies
FROM actors a
INNER JOIN movie_actors ma
ON a.actor_id = ma.actor_id
GROUP BY a.actor_id
ORDER BY cnt_movies DESC
LIMIT 1)
SELECT name FROM actors WHERE actor_id = (SELECT t1.actor_id FROM t1)
""")



# Показ результатов
res.show()

+--------+
|    name|
+--------+
|Actor_17|
+--------+



#### Подсчитаем средний бюджет фильмов по жанрам.


In [None]:
# Выполнение JOIN-запроса с использованием SQL
res = spark.sql("""
SELECT genre, AVG(budget) as avg_budget
FROM movies
GROUP BY genre
ORDER BY avg_budget DESC
""")

# Показ результатов
res.show()

+------+--------------------+
| genre|          avg_budget|
+------+--------------------+
|Horror|      8.7281876775E7|
|Sci-Fi|       7.809715175E7|
| Drama| 6.076021856166667E7|
|Comedy|     5.20709662225E7|
|Action|2.7492742561666667E7|
+------+--------------------+



#### Найдём фильмы, в которых снялось более одного актера из одной страны.

In [None]:
res = spark.sql("""

SELECT m.title, a.country, COUNT(*) as num_actors
FROM movies m
JOIN movie_actors ma ON m.movie_id = ma.movie_id
JOIN actors a ON ma.actor_id = a.actor_id
GROUP BY m.title, a.country
HAVING num_actors > 1
""")

# Показ результатов
res.show()

+--------+---------+----------+
|   title|  country|num_actors|
+--------+---------+----------+
| Movie_7|    India|         2|
| Movie_3|      USA|         2|
|Movie_10|       UK|         2|
|Movie_15|    India|         2|
|Movie_18|Australia|         2|
| Movie_1|    India|         3|
| Movie_2|      USA|         2|
| Movie_7|      USA|         2|
|Movie_10|      USA|         2|
+--------+---------+----------+



#  4) Результаты