In [1]:
from pyspark.sql import SparkSession, functions as F, types as T

In [2]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [3]:
print(spark.version)

3.4.1


In [4]:
actor_df = spark.read.csv('./data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('./data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('./data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('./data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('./data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('./data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('./data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('./data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('./data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('./data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('./data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('./data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('./data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('./data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('./data/store.csv', header=True, inferSchema=True)

# Домашнє завдання на тему Spark SQL

Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.

- Дампи таблиць знаходяться в папці `data`. Датафрейми таблиць вже створені в клітинці вище.
- Можете створювати стільки нових клітинок, скільки вам необхідно.
- Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`)
- код має бути оформлений у відповідності із одним із стилем, показаним лектором на занятті 13.

**Увага!**
Використовувати мову запитів SQL безпосередньо забороняється, потрібно використовувати виключно DataFrame API!


1.
Вивести кількість фільмів в кожній категорії.
Результат відсортувати за спаданням.

In [14]:
film_category_df.groupBy('category_id').count()\
    .join(category_df, how='left', on='category_id')\
        .select('name', 'count').sort('count', ascending=False).show()

+-----------+-----+
|       name|count|
+-----------+-----+
|     Sports|   74|
|    Foreign|   73|
|     Family|   69|
|Documentary|   68|
|  Animation|   66|
|     Action|   64|
|        New|   63|
|      Drama|   62|
|      Games|   61|
|     Sci-Fi|   61|
|   Children|   60|
|     Comedy|   58|
|     Travel|   57|
|   Classics|   57|
|     Horror|   56|
|      Music|   51|
+-----------+-----+



2.
Вивести 10 акторів, чиї фільми брали на прокат найбільше.
Результат відсортувати за спаданням.

In [48]:
## Count number of rental for each film_id which was in inventory
rental_count_df = rental_df.join(inventory_df, how='left', on='inventory_id')\
                           .select('film_id').groupBy('film_id').count()

actor_df.join(film_actor_df, how='left', on='actor_id')\
    .join(rental_count_df, how='left', on='film_id')\
    .groupBy('actor_id').agg({'count': 'sum',
                              'first_name': 'max',
                              'last_name': 'max',})\
    .select(F.col('max(first_name)').alias('first_name'),
            F.col('max(last_name)').alias('last_name'),
            F.col('sum(count)').alias('count'))\
    .sort('sum(count)', ascending=False).limit(10).show()

+----------+-----------+-----+
|first_name|  last_name|count|
+----------+-----------+-----+
|      GINA|  DEGENERES|  753|
|   MATTHEW|     CARREY|  678|
|      MARY|     KEITEL|  674|
|    ANGELA|WITHERSPOON|  654|
|    WALTER|       TORN|  640|
|     HENRY|      BERRY|  612|
|     JAYNE|      NOLTE|  611|
|       VAL|     BOLGER|  605|
|    SANDRA|     KILMER|  604|
|      SEAN|    GUINESS|  599|
+----------+-----------+-----+



3.
Вивести категорія фільмів, на яку було витрачено найбільше грошей
в прокаті

In [61]:
film_rental_revenue_df = payment_df.join(rental_df, on='rental_id')\
                                .join(inventory_df, on='inventory_id')\
                                .groupBy('film_id').agg(F.sum('amount').alias('rental_revenue'))

film_rental_revenue_df.join(film_category_df, on='film_id')\
                        .join(category_df, on='category_id')\
                        .groupBy('category_id')\
                        .agg(F.sum('rental_revenue').alias('category_rental_revenue'), 
                             F.max('name').alias('name'))\
                        .select('name', 'category_rental_revenue')\
                        .sort('category_rental_revenue', ascending=False).show()


+-----------+-----------------------+
|       name|category_rental_revenue|
+-----------+-----------------------+
|     Sports|                5314.21|
|     Sci-Fi|                4756.98|
|  Animation|      4656.299999999999|
|      Drama|      4587.390000000001|
|     Comedy|      4383.580000000001|
|     Action|      4375.849999999999|
|        New|     4361.5700000000015|
|      Games|      4281.329999999998|
|    Foreign|                4270.67|
|     Family|      4226.070000000001|
|Documentary|     4217.5199999999995|
|     Horror|     3722.5399999999995|
|   Children|     3655.5499999999997|
|   Classics|      3639.589999999999|
|     Travel|     3549.6400000000003|
|      Music|     3417.7200000000003|
+-----------+-----------------------+



4.
Вивести назви фільмів, яких не має в inventory.

In [70]:
film_df.select('title').distinct()\
    .exceptAll(film_df.join(inventory_df, on='film_id')
                      .select('title').distinct()
).show()

+--------------------+
|               title|
+--------------------+
|       RAINBOW SHOCK|
|           GUMP DATE|
|         HOCUS FRIDA|
|    TREASURE COMMAND|
| CHINATOWN GLADIATOR|
|        WALLS ARTIST|
|      ARGONAUTS TOWN|
|       PSYCHO SHRUNK|
|   FIREHOUSE VIETNAM|
|DELIVERANCE MULHO...|
|       ROOF CHAMPION|
|        TADPOLE PARK|
|         APOLLO TEEN|
|       HATE HANDICAP|
|       PEARL DESTINY|
|COMMANDMENTS EXPRESS|
|        VOLUME HOUSE|
|     CROWDS TELEMARK|
|   RAIDERS ANTITRUST|
|    KILL BROTHERHOOD|
+--------------------+
only showing top 20 rows



5.
Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [87]:
children_films = category_df.filter(F.col('name') == 'Children')\
                        .join(film_category_df, on='category_id')

actor_df.join(film_actor_df, how='left', on='actor_id')\
        .join(children_films, how='left', on='film_id')\
        .groupBy('actor_id').agg({'film_id': 'count',
                                  'first_name': 'max',
                                  'last_name': 'max',})\
        .select(F.col('max(first_name)').alias('first_name'),
                F.col('max(last_name)').alias('last_name'),
                F.col('count(film_id)').alias('count'))\
        .sort('count', ascending=False).limit(3).show()

+----------+---------+-----+
|first_name|last_name|count|
+----------+---------+-----+
|      GINA|DEGENERES|   42|
|    WALTER|     TORN|   41|
|      MARY|   KEITEL|   40|
+----------+---------+-----+



Stop Spark session:

In [None]:
spark.stop()