In [2]:
from pyspark.sql import SparkSession, functions as F, types as T

In [3]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [4]:
print(spark.version)

3.4.1


In [5]:
actor_df = spark.read.csv('./data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('./data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('./data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('./data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('./data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('./data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('./data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('./data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('./data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('./data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('./data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('./data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('./data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('./data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('./data/store.csv', header=True, inferSchema=True)

# Домашнє завдання на тему Spark SQL

Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.

- Дампи таблиць знаходяться в папці `data`. Датафрейми таблиць вже створені в клітинці вище.
- Можете створювати стільки нових клітинок, скільки вам необхідно.
- Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`)
- код має бути оформлений у відповідності із одним із стилем, показаним лектором на занятті 13.

**Увага!**
Використовувати мову запитів SQL безпосередньо забороняється, потрібно використовувати виключно DataFrame API!


1.
Вивести кількість фільмів в кожній категорії.
Результат відсортувати за спаданням.

In [89]:
categories_filmc_cnt_df = film_category_df \
    .groupBy("category_id") \
    .agg(F.count("film_id").alias("cnt_films"))

categories_filmc_cnt_df \
    .join(category_df, on="category_id", how="left") \
    .select("name", "cnt_films") \
    .sort(categories_filmc_cnt_df["cnt_films"].desc()) \
    .show()

+-----------+---------+
|       name|cnt_films|
+-----------+---------+
|     Sports|       74|
|    Foreign|       73|
|     Family|       69|
|Documentary|       68|
|  Animation|       66|
|     Action|       64|
|        New|       63|
|      Drama|       62|
|      Games|       61|
|     Sci-Fi|       61|
|   Children|       60|
|     Comedy|       58|
|     Travel|       57|
|   Classics|       57|
|     Horror|       56|
|      Music|       51|
+-----------+---------+



2.
Вивести 10 акторів, чиї фільми брали на прокат найбільше.
Результат відсортувати за спаданням.

In [91]:
res_df = rental_df \
    .join(inventory_df, on="inventory_id", how="left") \
    .join(film_actor_df, on="film_id", how="left") \
    .join(actor_df, on="actor_id", how="left") \
    .groupBy("actor_id", "first_name", "last_name") \
    .agg(F.count("actor_id").alias("cnt")) \
    .select("first_name", "last_name", "cnt")

res_df.sort(res_df["cnt"].desc()).limit(10).show()

+----------+-----------+---+
|first_name|  last_name|cnt|
+----------+-----------+---+
|      GINA|  DEGENERES|753|
|   MATTHEW|     CARREY|678|
|      MARY|     KEITEL|674|
|    ANGELA|WITHERSPOON|654|
|    WALTER|       TORN|640|
|     HENRY|      BERRY|612|
|     JAYNE|      NOLTE|611|
|       VAL|     BOLGER|605|
|    SANDRA|     KILMER|604|
|      SEAN|    GUINESS|599|
+----------+-----------+---+



3.
Вивести категорія фільмів, на яку було витрачено найбільше грошей
в прокаті

In [92]:
res_df = payment_df \
    .join(rental_df, on="rental_id", how="left") \
    .join(inventory_df, on="inventory_id", how="left") \
    .join(film_category_df, on="film_id", how="left") \
    .join(category_df, on="category_id", how="left") \
    .groupBy("name") \
    .agg(F.sum("amount").alias("sum_amount")) \
    .orderBy(["sum_amount"], ascending=0) \
    .limit(1).show()

+------+-----------------+
|  name|       sum_amount|
+------+-----------------+
|Sports|5314.209999999847|
+------+-----------------+



4.
Вивести назви фільмів, яких не має в inventory.

In [93]:
film_id_inv_df = inventory_df.select("film_id").distinct()
film_id_df = film_df.select("film_id").distinct()

need_film_id_df = film_id_df.subtract(film_id_inv_df)

res_df = film_df \
    .join(need_film_id_df, on="film_id", how="inner") \
    .select("film_id", "title") \
    .show()

+-------+--------------------+
|film_id|               title|
+-------+--------------------+
|     14|      ALICE FANTASIA|
|     33|         APOLLO TEEN|
|     36|      ARGONAUTS TOWN|
|     38|       ARK RIDGEMONT|
|     41|ARSENIC INDEPENDENCE|
|     87|   BOONDOCK BALLROOM|
|    108|       BUTCH PANTHER|
|    128|       CATCH AMISTAD|
|    144| CHINATOWN GLADIATOR|
|    148|      CHOCOLATE DUCK|
|    171|COMMANDMENTS EXPRESS|
|    192|    CROSSING DIVORCE|
|    195|     CROWDS TELEMARK|
|    198|    CRYSTAL BREAKING|
|    217|          DAZED PUNK|
|    221|DELIVERANCE MULHO...|
|    318|   FIREHOUSE VIETNAM|
|    325|       FLOATS GARDEN|
|    332|FRANKENSTEIN STRA...|
|    359|  GLADIATOR WESTWARD|
+-------+--------------------+
only showing top 20 rows



5.
Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [94]:
children_films_df = category_df \
    .join(film_category_df, on="category_id", how="left") \
    .join(film_actor_df, on="film_id", how="left") \
    .filter(category_df.name == "Children") \
    .groupBy(film_actor_df.actor_id) \
    .agg(F.count("*").alias("cnt")) \
    .orderBy("cnt", ascending=0) \
    .limit(3)

res_df = children_films_df \
    .join(actor_df, on="actor_id", how="left") \
    .select(F.concat(actor_df.first_name, F.lit(" "), actor_df.last_name) \
    .alias("name")) \
    .show()

+------------+
|        name|
+------------+
|HELEN VOIGHT|
|  RALPH CRUZ|
| WHOOPI HURT|
+------------+



Stop Spark session:

In [95]:
spark.stop()