In [63]:
from pyspark.sql import SparkSession, functions as f, types as t

In [64]:
spark = SparkSession.builder.appName('lect_13_home_task').getOrCreate()

In [65]:
actor_df = spark.read.csv('./data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('./data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('./data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('./data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('./data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('./data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('./data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('./data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('./data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('./data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('./data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('./data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('./data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('./data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('./data/store.csv', header=True, inferSchema=True)

# Домашнє завдання на тему Spark SQL

Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.

- Дампи таблиць знаходяться в папці `data`. Датафрейми таблиць вже створені в клітинці вище.
- Можете створювати стільки нових клітинок, скільки вам необхідно.
- Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`)

**Увага!**
Використовувати мову запитів SQL безпосередньо забороняється, потрібно використовувати виключно DataFrame API!


1.
Вивести кількість фільмів в кожній категорії.
Результат відсортувати за спаданням.

In [66]:
category_df\
    .join(film_category_df,
          "category_id", "left")\
    .groupby(category_df.name)\
    .agg(f.count(category_df.name).alias("film_count"))\
    .orderBy(f.col("film_count").desc())\
    .select(f.col("name").alias("Film category"),
            f.col("film_count").alias("Number of films"))\
    .show()

+-------------+---------------+
|Film category|Number of films|
+-------------+---------------+
|       Sports|             74|
|      Foreign|             73|
|       Family|             69|
|  Documentary|             68|
|    Animation|             66|
|       Action|             64|
|          New|             63|
|        Drama|             62|
|        Games|             61|
|       Sci-Fi|             61|
|     Children|             60|
|       Comedy|             58|
|       Travel|             57|
|     Classics|             57|
|       Horror|             56|
|        Music|             51|
+-------------+---------------+



2.
Вивести 10 акторів, чиї фільми брали на прокат найбільше.
Результат відсортувати за спаданням.

In [67]:
rental_df.join(inventory_df, "inventory_id")\
    .groupby("film_id")\
    .agg(f.count("film_id").alias("rent_count"))\
    .join(film_actor_df, "film_id")\
    .groupby("actor_id")\
    .agg(f.sum("rent_count").alias("film_rent_count"))\
    .join(actor_df, "actor_id")\
    .select(
        f.col("first_name").alias("First name"),
        f.col("last_name").alias("Last name"),
        f.col("film_rent_count").alias("Number of rents of the actor's films"))\
    .orderBy(f.col("Film_rent_count").desc())\
    .limit(10)\
    .show()

+----------+-----------+------------------------------------+
|First name|  Last name|Number of rents of the actor's films|
+----------+-----------+------------------------------------+
|      GINA|  DEGENERES|                                 753|
|   MATTHEW|     CARREY|                                 678|
|      MARY|     KEITEL|                                 674|
|    ANGELA|WITHERSPOON|                                 654|
|    WALTER|       TORN|                                 640|
|     HENRY|      BERRY|                                 612|
|     JAYNE|      NOLTE|                                 611|
|       VAL|     BOLGER|                                 605|
|    SANDRA|     KILMER|                                 604|
|      SEAN|    GUINESS|                                 599|
+----------+-----------+------------------------------------+



3.
Вивести категорія фільмів, на яку було витрачено найбільше грошей
в прокаті

In [68]:
payment_df\
    .join(rental_df, "rental_id")\
    .groupby("inventory_id")\
    .agg(f.sum(f.col("amount")).alias("inventory_rent_amount"))\
    .join(inventory_df, "inventory_id")\
    .groupby("film_id")\
    .agg(f.sum(f.col("inventory_rent_amount")).alias("film_rent_amount"))\
    .join(film_category_df, "film_id")\
    .groupby("category_id")\
    .agg(f.sum(f.col("film_rent_amount")).alias("category_rent_amount"))\
    .join(category_df, "category_id")\
    .orderBy(f.col("category_rent_amount").desc())\
    .limit(1)\
    .select(f.col("name").alias("Film category"))\
    .show()

+-------------+
|Film category|
+-------------+
|       Sports|
+-------------+



4.
Вивести назви фільмів, яких не має в inventory.
Запит має бути без оператора IN

In [69]:
film_df.join(inventory_df, "film_id", "left")\
    .filter(inventory_df.film_id.isNull())\
    .select(f.col("title").alias("Film"))\
    .show()

+--------------------+
|                Film|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
+--------------------+
only showing top 20 rows



5.
Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [70]:
film_category_df.join(category_df, "category_id")\
    .join(film_actor_df, "film_id")\
    .filter(category_df.name == 'Children')\
    .groupby("actor_id")\
    .agg(f.count(f.col("actor_id")).alias("film_count"))\
    .orderBy(f.col("film_count").desc())\
    .limit(3)\
    .join(actor_df, "actor_id")\
    .select(f.col("first_name").alias("First name"),
        f.col("last_name").alias("Last name"),
        f.col("film_count").alias("Number of films"))\
    .show()

+----------+---------+---------------+
|First name|Last name|Number of films|
+----------+---------+---------------+
|     HELEN|   VOIGHT|              7|
|     RALPH|     CRUZ|              5|
|    WHOOPI|     HURT|              5|
+----------+---------+---------------+



6.
Вивести міста з кількістю активних та неактивних клієнтів
(в активних customer.active = 1).
Результат відсортувати за кількістю неактивних клієнтів за спаданням.

In [71]:
cnt_cond = lambda cond: f.sum(f.when(cond, 1).otherwise(0))
customer_df.join(address_df, "address_id")\
    .groupby("city_id")\
    .agg(cnt_cond(f.col('active') == 1).alias('active'),
         cnt_cond(f.col('active') ==0).alias('inactive'))\
    .join(city_df, "city_id")\
    .orderBy(f.col("inactive").desc(), f.col("city"))\
    .select(f.col("city").alias("City"),
            f.col("active").alias("Number of active clients"),
            f.col("inactive").alias("Number of inactive clients"))\
    .show(1000)

+--------------------+------------------------+--------------------------+
|                City|Number of active clients|Number of inactive clients|
+--------------------+------------------------+--------------------------+
|              Amroha|                       0|                         1|
|             Bat Yam|                       0|                         1|
|    Charlotte Amalie|                       0|                         1|
|       Coatzacoalcos|                       0|                         1|
|              Daxian|                       0|                         1|
|              Kamyin|                       0|                         1|
|              Ktahya|                       0|                         1|
|          Kumbakonam|                       0|                         1|
|           Najafabad|                       0|                         1|
|           Pingxiang|                       0|                         1|
|     Southend-on-Sea|   