In [128]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import col, count
from pyspark.sql.functions import round

In [132]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [133]:
print(spark.version)

3.5.3


In [134]:
actor_df = spark.read.csv("./data/actor.csv", header=True, inferSchema=True)
address_df = spark.read.csv("./data/address.csv", header=True, inferSchema=True)
category_df = spark.read.csv("./data/category.csv", header=True, inferSchema=True)
city_df = spark.read.csv("./data/city.csv", header=True, inferSchema=True)
country_df = spark.read.csv("./data/country.csv", header=True, inferSchema=True)
customer_df = spark.read.csv("./data/customer.csv", header=True, inferSchema=True)
film_df = spark.read.csv("./data/film.csv", header=True, inferSchema=True)
film_actor_df = spark.read.csv("./data/film_actor.csv", header=True, inferSchema=True)
film_category_df = spark.read.csv("./data/film_category.csv", header=True, inferSchema=True)
inventory_df = spark.read.csv("./data/inventory.csv", header=True, inferSchema=True)
language_df = spark.read.csv("./data/language.csv", header=True, inferSchema=True)
payment_df = spark.read.csv("./data/payment.csv", header=True, inferSchema=True)
rental_df = spark.read.csv("./data/rental.csv", header=True, inferSchema=True)
staff_df = spark.read.csv("./data/staff.csv", header=True, inferSchema=True)
store_df = spark.read.csv("./data/store.csv", header=True, inferSchema=True)

# Домашнє завдання на тему Spark SQL

Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.

- Дампи таблиць знаходяться в папці `data`. Датафрейми таблиць вже створені в клітинці вище.
- Можете створювати стільки нових клітинок, скільки вам необхідно.
- Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`)
- код має бути оформлений у відповідності із одним із стилем, показаним лектором на занятті 13.

**Увага!**
Використовувати мову запитів SQL безпосередньо забороняється, потрібно використовувати виключно DataFrame API!


1.
Вивести кількість фільмів в кожній категорії.
Результат відсортувати за спаданням.

In [152]:
count_film_by_category_df = (
    film_category_df
    .join(category_df, film_category_df["category_id"] == category_df["category_id"])
    .groupBy("name")
    .count()
    .orderBy(col("count").desc())
    .show())

+-----------+-----+
|       name|count|
+-----------+-----+
|     Sports|   74|
|    Foreign|   73|
|     Family|   69|
|Documentary|   68|
|  Animation|   66|
|     Action|   64|
|        New|   63|
|      Drama|   62|
|      Games|   61|
|     Sci-Fi|   61|
|   Children|   60|
|     Comedy|   58|
|     Travel|   57|
|   Classics|   57|
|     Horror|   56|
|      Music|   51|
+-----------+-----+



2.
Вивести 10 акторів, чиї фільми брали на прокат найбільше.
Результат відсортувати за спаданням.

In [72]:
top_10_actors = (
    actor_df
    .join(film_actor_df, actor_df["actor_id"] == film_actor_df["actor_id"])
    .join(film_df, film_actor_df["film_id"] == film_df["film_id"])
    .join(inventory_df, film_df["film_id"] == inventory_df["film_id"])
    .join(rental_df, inventory_df["inventory_id"] == rental_df["inventory_id"])
    .groupBy(F.concat(actor_df["first_name"], F.lit(" "), actor_df["last_name"]).alias("actor_full_name"))
    .agg(F.count(rental_df["rental_id"]).alias("rental_count"))
    .orderBy(F.desc("rental_count"))
    .select("actor_full_name")
    .limit(10)
    .show()
)

+------------------+
|   actor_full_name|
+------------------+
|       SUSAN DAVIS|
|    GINA DEGENERES|
|    MATTHEW CARREY|
|       MARY KEITEL|
|ANGELA WITHERSPOON|
|       WALTER TORN|
|       HENRY BERRY|
|       JAYNE NOLTE|
|        VAL BOLGER|
|     SANDRA KILMER|
+------------------+



3.
Вивести категорія фільмів, на яку було витрачено найбільше грошей
в прокаті

In [99]:
rental_film_df = rental_df.join(inventory_df, rental_df.inventory_id == inventory_df.inventory_id)

category_cost = (
    rental_film_df
    .join(film_category_df, rental_film_df.film_id == film_category_df.film_id)
    .join(category_df, film_category_df.category_id == category_df.category_id)
    .join(payment_df, rental_df.rental_id == payment_df.rental_id)
    .groupBy("name").sum("amount").alias("total_cost")
)

category_revenue_rounded = (
    category_cost
    .withColumn("total_cost", round(category_cost["sum(amount)"], 2))
    .select("name", "total_cost")
    .orderBy("total_cost", ascending=False).show(1)
)

+------+----------+
|  name|total_cost|
+------+----------+
|Sports|   5314.21|
+------+----------+
only showing top 1 row



4.
Вивести назви фільмів, яких не має в inventory.

+--------------------+
|               title|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
+--------------------+
only showing top 20 rows



5.
Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

+---------------+
|actor_full_name|
+---------------+
|   HELEN VOIGHT|
|     MARY TANDY|
|    WHOOPI HURT|
+---------------+
only showing top 3 rows



Stop Spark session:

In [130]:
spark.stop()