In [15]:
from pyspark.sql import SparkSession, functions as f, types as t

In [16]:
spark = SparkSession.builder.master('local[3]').getOrCreate()

In [17]:
print(spark.version)

3.3.0


In [18]:
actor_df = spark.read.csv('./data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('./data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('./data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('./data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('./data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('./data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('./data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('./data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('./data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('./data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('./data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('./data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('./data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('./data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('./data/store.csv', header=True, inferSchema=True)

# Домашнє завдання на тему Spark SQL

Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.

- Дампи таблиць знаходяться в папці `data`. Датафрейми таблиць вже створені в клітинці вище.
- Можете створювати стільки нових клітинок, скільки вам необхідно.
- Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`)
- код має бути оформлений у відповідності із одним із стилем, показаним лектором на занятті 13.

**Увага!**
Використовувати мову запитів SQL безпосередньо забороняється, потрібно використовувати виключно DataFrame API!


1.
Вивести кількість фільмів в кожній категорії.
Результат відсортувати за спаданням.

In [103]:
films_count_by_category_df = category_df\
    .join(film_category_df,
        category_df.category_id == film_category_df.category_id, 'left'
    ).select(
        category_df.category_id,
        category_df.name.alias("category_name"),
    ).groupBy(
        category_df.category_id,
        f.col("category_name"),
    ).count(
    ).sort(
        f.col("count").desc(),
    ).persist()

In [105]:
films_count_by_category_df.show(films_count_by_category_df.count(), False)

+-----------+-------------+-----+
|category_id|category_name|count|
+-----------+-------------+-----+
|15         |Sports       |74   |
|9          |Foreign      |73   |
|8          |Family       |69   |
|6          |Documentary  |68   |
|2          |Animation    |66   |
|1          |Action       |64   |
|13         |New          |63   |
|7          |Drama        |62   |
|14         |Sci-Fi       |61   |
|10         |Games        |61   |
|3          |Children     |60   |
|5          |Comedy       |58   |
|4          |Classics     |57   |
|16         |Travel       |57   |
|11         |Horror       |56   |
|12         |Music        |51   |
+-----------+-------------+-----+



2.
Вивести 10 акторів, чиї фільми брали на прокат найбільше.
Результат відсортувати за спаданням.

In [67]:
top10_popular_actors_df = film_actor_df\
    .join(actor_df, film_actor_df.actor_id == actor_df.actor_id, 'inner')\
    .select(
        actor_df.actor_id,
        actor_df.first_name,
        actor_df.last_name
    ).groupBy(
        actor_df.actor_id,
        actor_df.first_name,
        actor_df.last_name
    ).agg(
        f.count(actor_df.actor_id).alias("total_film_count")
    ).sort(
        f.col("total_film_count").desc(),
    ).limit(10)

In [68]:
top10_popular_actors_df.show()

+--------+----------+-----------+----------------+
|actor_id|first_name|  last_name|total_film_count|
+--------+----------+-----------+----------------+
|     107|      GINA|  DEGENERES|              42|
|     102|    WALTER|       TORN|              41|
|     198|      MARY|     KEITEL|              40|
|     181|   MATTHEW|     CARREY|              39|
|      23|    SANDRA|     KILMER|              37|
|      81|  SCARLETT|      DAMON|              36|
|     144|    ANGELA|WITHERSPOON|              35|
|      60|     HENRY|      BERRY|              35|
|      13|       UMA|       WOOD|              35|
|      37|       VAL|     BOLGER|              35|
+--------+----------+-----------+----------------+



3.
Вивести категорія фільмів, на яку було витрачено найбільше грошей
в прокаті

In [160]:
# I want fetch category_name. But when I add this to groupBy clause then explain shows
#   that category_name will be used to calculate hash. So I try to avoid this by second category_df join
most_expensive_film_category_df = category_df\
    .join(film_category_df, category_df.category_id == film_category_df.category_id, 'inner')\
    .join(film_df, film_category_df.film_id == film_df.film_id, 'inner')\
    .groupBy(
        category_df.category_id,
    ).agg(
        f.sum(film_df.replacement_cost).alias("sum_production_cost")
    ).sort(
        f.col("sum_production_cost").desc(),
    ).limit(1)\
    .join(
        category_df.alias("category_lookup"),
        category_df.category_id == f.col("category_lookup.category_id"),
        'inner'
    ).select(
        f.col("category_lookup.category_id"),
        f.col("category_lookup.name").alias('category_name'),
        f.col("sum_production_cost")
    )

In [162]:
most_expensive_film_category_df.first()

Row(category_id=15, category_name='Sports', sum_production_cost=1509.2600000000004)

4.
Вивести назви фільмів, яких не має в inventory.

In [101]:
no_inventory_films_df = film_df\
    .join(inventory_df, inventory_df.film_id == film_df.film_id, 'left_anti')\
    .select(film_df.title)\
    .persist()

In [102]:
no_inventory_films_df.show(no_inventory_films_df.count(), False)

+----------------------+
|title                 |
+----------------------+
|ALICE FANTASIA        |
|APOLLO TEEN           |
|ARGONAUTS TOWN        |
|ARK RIDGEMONT         |
|ARSENIC INDEPENDENCE  |
|BOONDOCK BALLROOM     |
|BUTCH PANTHER         |
|CATCH AMISTAD         |
|CHINATOWN GLADIATOR   |
|CHOCOLATE DUCK        |
|COMMANDMENTS EXPRESS  |
|CROSSING DIVORCE      |
|CROWDS TELEMARK       |
|CRYSTAL BREAKING      |
|DAZED PUNK            |
|DELIVERANCE MULHOLLAND|
|FIREHOUSE VIETNAM     |
|FLOATS GARDEN         |
|FRANKENSTEIN STRANGER |
|GLADIATOR WESTWARD    |
|GUMP DATE             |
|HATE HANDICAP         |
|HOCUS FRIDA           |
|KENTUCKIAN GIANT      |
|KILL BROTHERHOOD      |
|MUPPET MILE           |
|ORDER BETRAYED        |
|PEARL DESTINY         |
|PERDITION FARGO       |
|PSYCHO SHRUNK         |
|RAIDERS ANTITRUST     |
|RAINBOW SHOCK         |
|ROOF CHAMPION         |
|SISTER FREDDY         |
|SKY MIRACLE           |
|SUICIDES SILENCE      |
|TADPOLE PARK          |


5.
Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [167]:
top3_actors_in_children_films = category_df\
    .select(category_df.category_id)\
    .filter(category_df.name == 'Children')\
    .join(film_category_df, category_df.category_id == film_category_df.category_id, 'inner')\
    .join(film_actor_df, film_category_df.film_id == film_actor_df.film_id, 'inner')\
    .join(actor_df, film_actor_df.actor_id == actor_df.actor_id, 'inner')\
    .groupBy(
        actor_df.actor_id,
    ).agg(
        f.count(film_actor_df.actor_id).alias("total_films_count"),
    ).sort(
        f.col("total_films_count").desc(),
    ).limit(3)\
    .join(
        actor_df.alias("actor_lookup"),
        actor_df.actor_id == f.col("actor_lookup.actor_id"),
        'inner'
    ).select(
        f.col("actor_lookup.actor_id"),
        f.col("actor_lookup.first_name"),
        f.col("actor_lookup.last_name"),
        f.col("total_films_count")
    )

In [168]:
top3_actors_in_children_films.show()

+--------+----------+---------+-----------------+
|actor_id|first_name|last_name|total_films_count|
+--------+----------+---------+-----------------+
|      17|     HELEN|   VOIGHT|                7|
|      80|     RALPH|     CRUZ|                5|
|     140|    WHOOPI|     HURT|                5|
+--------+----------+---------+-----------------+



Stop Spark session:

In [169]:
spark.stop()