In [1]:
from pyspark.sql import SparkSession, functions as F, types as T

In [2]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [3]:
print(spark.version)

3.4.1


In [4]:
actor_df = spark.read.csv('./data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('./data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('./data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('./data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('./data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('./data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('./data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('./data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('./data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('./data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('./data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('./data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('./data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('./data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('./data/store.csv', header=True, inferSchema=True)

# Домашнє завдання на тему Spark SQL

Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.

- Дампи таблиць знаходяться в папці `data`. Датафрейми таблиць вже створені в клітинці вище.
- Можете створювати стільки нових клітинок, скільки вам необхідно.
- Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`)
- код має бути оформлений у відповідності із одним із стилем, показаним лектором на занятті 13.

**Увага!**
Використовувати мову запитів SQL безпосередньо забороняється, потрібно використовувати виключно DataFrame API!


1.
Вивести кількість фільмів в кожній категорії.
Результат відсортувати за спаданням.

In [5]:
from pyspark.sql.functions import countDistinct

result1_df = film_category_df.join(category_df, film_category_df.category_id == category_df.category_id) \
    .groupBy(category_df.name) \
    .agg(countDistinct(film_category_df.film_id).alias("film_count")) \
    .orderBy("film_count", ascending=False)
      
result1_df.show()

+-----------+----------+
|       name|film_count|
+-----------+----------+
|     Sports|        74|
|    Foreign|        73|
|     Family|        69|
|Documentary|        68|
|  Animation|        66|
|     Action|        64|
|        New|        63|
|      Drama|        62|
|      Games|        61|
|     Sci-Fi|        61|
|   Children|        60|
|     Comedy|        58|
|     Travel|        57|
|   Classics|        57|
|     Horror|        56|
|      Music|        51|
+-----------+----------+



2.
Вивести 10 акторів, чиї фільми брали на прокат найбільше.
Результат відсортувати за спаданням.

In [6]:
from pyspark.sql.functions import concat_ws, count

result2_df = rental_df.join(inventory_df, rental_df.inventory_id == inventory_df.inventory_id) \
    .join(film_actor_df, inventory_df.film_id == film_actor_df.film_id) \
    .join(actor_df, film_actor_df.actor_id == actor_df.actor_id) \
    .withColumn("actor", concat_ws(' ', actor_df.first_name, actor_df.last_name)) \
    .groupBy("actor") \
    .agg(count(rental_df.rental_id).alias("rental_count")) \
    .orderBy("rental_count", ascending=False) \
    .limit(10)

result2_df.show()

+------------------+------------+
|             actor|rental_count|
+------------------+------------+
|       SUSAN DAVIS|         825|
|    GINA DEGENERES|         753|
|    MATTHEW CARREY|         678|
|       MARY KEITEL|         674|
|ANGELA WITHERSPOON|         654|
|       WALTER TORN|         640|
|       HENRY BERRY|         612|
|       JAYNE NOLTE|         611|
|        VAL BOLGER|         605|
|     SANDRA KILMER|         604|
+------------------+------------+



3.
Вивести категорія фільмів, на яку було витрачено найбільше грошей
в прокаті

In [7]:
from pyspark.sql.functions import sum

result3_df = rental_df.join(inventory_df, rental_df.inventory_id == inventory_df.inventory_id) \
    .join(film_category_df, inventory_df.film_id == film_category_df.film_id) \
    .join(category_df, film_category_df.category_id == category_df.category_id) \
    .join(payment_df, rental_df.rental_id == payment_df.rental_id) \
    .groupBy(category_df.name) \
    .agg(sum(payment_df.amount).alias("total_amount")) \
    .orderBy("total_amount", ascending=False) \
    .limit(1)

result3_df.show()

+------+-----------------+
|  name|     total_amount|
+------+-----------------+
|Sports|5314.209999999848|
+------+-----------------+



4.
Вивести назви фільмів, яких не має в inventory.

In [8]:
result4_df = film_df.join(inventory_df, film_df.film_id == inventory_df.film_id, how='left') \
    .filter(inventory_df.film_id.isNull()) \
    .select(film_df.title).distinct()

result4_df.show()

+--------------------+
|               title|
+--------------------+
|       RAINBOW SHOCK|
|           GUMP DATE|
|         HOCUS FRIDA|
|    TREASURE COMMAND|
| CHINATOWN GLADIATOR|
|        WALLS ARTIST|
|      ARGONAUTS TOWN|
|       PSYCHO SHRUNK|
|   FIREHOUSE VIETNAM|
|DELIVERANCE MULHO...|
|       ROOF CHAMPION|
|        TADPOLE PARK|
|         APOLLO TEEN|
|       HATE HANDICAP|
|       PEARL DESTINY|
|COMMANDMENTS EXPRESS|
|        VOLUME HOUSE|
|     CROWDS TELEMARK|
|   RAIDERS ANTITRUST|
|    KILL BROTHERHOOD|
+--------------------+
only showing top 20 rows



5.
Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [9]:
result5_df = actor_df.join(film_actor_df, actor_df.actor_id == film_actor_df.actor_id) \
    .join(film_category_df, film_actor_df.film_id == film_category_df.film_id) \
    .join(category_df, film_category_df.category_id == category_df.category_id) \
    .filter(category_df.name == 'Children') \
    .withColumn("actor", concat_ws(' ', actor_df.first_name, actor_df.last_name)) \
    .groupBy("actor") \
    .agg(count(film_actor_df.film_id).alias("film_count")) \
    .orderBy("film_count", ascending=False) \
    .limit(3)

result5_df.show()

+------------+----------+
|       actor|film_count|
+------------+----------+
|HELEN VOIGHT|         7|
| SUSAN DAVIS|         6|
|  MARY TANDY|         5|
+------------+----------+



Stop Spark session:

In [10]:
spark.stop()