In [9]:
from pyspark.sql import SparkSession, functions as F, types as T

In [2]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [3]:
print(spark.version)

3.4.1


In [4]:
actor_df = spark.read.csv('./data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('./data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('./data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('./data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('./data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('./data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('./data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('./data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('./data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('./data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('./data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('./data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('./data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('./data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('./data/store.csv', header=True, inferSchema=True)

# Домашнє завдання на тему Spark SQL

Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.

- Дампи таблиць знаходяться в папці `data`. Датафрейми таблиць вже створені в клітинці вище.
- Можете створювати стільки нових клітинок, скільки вам необхідно.
- Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`)
- код має бути оформлений у відповідності із одним із стилем, показаним лектором на занятті 13.

**Увага!**
Використовувати мову запитів SQL безпосередньо забороняється, потрібно використовувати виключно DataFrame API!


1.
Вивести кількість фільмів в кожній категорії.
Результат відсортувати за спаданням.

In [5]:
joined_df = film_category_df.join(category_df, 'category_id').select(category_df.name)

category_count = (
    joined_df.groupby('name')
    .count()
    .withColumnsRenamed({'count': 'total'})
    .orderBy(F.desc('total'))
)

category_count.show()

+-----------+-----+
|       name|total|
+-----------+-----+
|     Sports|   74|
|    Foreign|   73|
|     Family|   69|
|Documentary|   68|
|  Animation|   66|
|     Action|   64|
|        New|   63|
|      Drama|   62|
|      Games|   61|
|     Sci-Fi|   61|
|   Children|   60|
|     Comedy|   58|
|     Travel|   57|
|   Classics|   57|
|     Horror|   56|
|      Music|   51|
+-----------+-----+


2.
Вивести 10 акторів, чиї фільми брали на прокат найбільше.
Результат відсортувати за спаданням.

In [13]:
# тут має бути розвʼязок задачі
joined_df = (rental_df
                .join(inventory_df, 'inventory_id')
                .join(film_actor_df, 'film_id')
                .join(actor_df, 'actor_id')
                .withColumn('actor_name', F.concat(actor_df.first_name , F.lit(' ') , actor_df.last_name))
                .select('actor_name')
          )
final_df = (joined_df
                .groupby('actor_name')
                .count()
                .orderBy(F.desc('count'))
                .limit(10)
                .select('actor_name')
)

final_df.show()

+------------------+
|        actor_name|
+------------------+
|       SUSAN DAVIS|
|    GINA DEGENERES|
|    MATTHEW CARREY|
|       MARY KEITEL|
|ANGELA WITHERSPOON|
|       WALTER TORN|
|       HENRY BERRY|
|       JAYNE NOLTE|
|        VAL BOLGER|
|     SANDRA KILMER|
+------------------+


3.
Вивести категорія фільмів, на яку було витрачено найбільше грошей
в прокаті

In [39]:
joined_df = (film_df
                .join(film_category_df, 'film_id')
                .join(category_df, 'category_id')
             )

final_df = (joined_df
                .withColumn('total', joined_df.rental_duration * joined_df.rental_rate)
                .groupby('name')
                .sum('total')
                .orderBy(F.desc('sum(total)'))
                .limit(1)
                .select('name')
)

final_df.show()

+-------+
|   name|
+-------+
|Foreign|
+-------+


4.
Вивести назви фільмів, яких не має в inventory.

In [44]:
# тут має бути розвʼязок задачі
df = (film_df
             .join(inventory_df, 'film_id', 'left')
             .where(inventory_df.film_id.isNull())
             .select('title')
)

df.show()

+--------------------+
|               title|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
+--------------------+


5.
Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [50]:
# тут має бути розвʼязок задачі
joined_df = (film_actor_df
                .join(film_category_df, 'film_id')
                .join(category_df, 'category_id')
                .join(actor_df, 'actor_id')
          )
final_df = (joined_df
                .where(joined_df.name == 'Children')
                .withColumn('actor_name', F.concat(joined_df.first_name , F.lit(' ') , joined_df.last_name))
                .groupBy('actor_name')
                .count()
                .orderBy(F.desc('count'))
                .limit(3)
                .select('actor_name')
            )

final_df.show()

+------------+
|  actor_name|
+------------+
|HELEN VOIGHT|
| SUSAN DAVIS|
|  MARY TANDY|
+------------+


Stop Spark session:

In [51]:
spark.stop()