In [25]:
from pyspark.sql import SparkSession, functions as F, types as T

In [26]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [27]:
print(spark.version)

3.4.1


In [28]:
actor_df = spark.read.csv('./data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('./data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('./data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('./data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('./data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('./data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('./data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('./data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('./data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('./data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('./data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('./data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('./data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('./data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('./data/store.csv', header=True, inferSchema=True)

# Домашнє завдання на тему Spark SQL

Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.

- Дампи таблиць знаходяться в папці `data`. Датафрейми таблиць вже створені в клітинці вище.
- Можете створювати стільки нових клітинок, скільки вам необхідно.
- Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`)
- код має бути оформлений у відповідності із одним із стилем, показаним лектором на занятті 13.

**Увага!**
Використовувати мову запитів SQL безпосередньо забороняється, потрібно використовувати виключно DataFrame API!


1.
Вивести кількість фільмів в кожній категорії.
Результат відсортувати за спаданням.

In [29]:
film_category_df \
    .groupBy(['category_id']) \
    .count() \
    .orderBy(F.desc('count')) \
    .show()

+-----------+-----+
|category_id|count|
+-----------+-----+
|         15|   74|
|          9|   73|
|          8|   69|
|          6|   68|
|          2|   66|
|          1|   64|
|         13|   63|
|          7|   62|
|         10|   61|
|         14|   61|
|          3|   60|
|          5|   58|
|         16|   57|
|          4|   57|
|         11|   56|
|         12|   51|
+-----------+-----+



2.
Вивести 10 акторів, чиї фільми брали на прокат найбільше.
Результат відсортувати за спаданням.

In [30]:
top_10_film_rentals_df = inventory_df \
    .groupBy(['film_id']) \
    .count() \
    .orderBy(F.desc('count')) \
    .limit(10) \

popular_actors_df = film_actor_df \
    .join(top_10_film_rentals_df, top_10_film_rentals_df['film_id'] == film_actor_df['film_id']) \
    .join(actor_df, actor_df['actor_id'] == film_actor_df['actor_id'], 'inner') \
    .select(F.concat(actor_df['first_name'], F.lit(' '), actor_df['last_name']).alias('Popular Actors')) \

popular_actors_df \
    .limit(10) \
    .show()

+----------------+
|  Popular Actors|
+----------------+
|PENELOPE GUINESS|
|   NICK WAHLBERG|
| BETTE NICHOLSON|
| CHRISTIAN GABLE|
|      KARL BERRY|
|    CUBA OLIVIER|
|    CUBA OLIVIER|
|        DAN TORN|
|   LUCILLE TRACY|
|    RIP CRAWFORD|
+----------------+



3.
Вивести категорія фільмів, на яку було витрачено найбільше грошей
в прокаті

In [31]:
category_payment_rentals_df = film_category_df \
    .join(inventory_df, inventory_df['film_id'] == film_category_df['film_id']) \
    .join(rental_df, inventory_df['inventory_id'] == rental_df['inventory_id']) \
    .join(payment_df, payment_df['rental_id'] == rental_df['rental_id'])

category_revenue_df = category_payment_rentals_df \
    .groupBy('category_id') \
    .agg(F.sum("amount").alias("revenue")) \

category_df \
    .join(category_revenue_df, category_revenue_df['category_id'] == category_df['category_id']) \
    .orderBy(F.desc(category_revenue_df['revenue'])) \
    .select([category_df['name'], category_revenue_df['revenue']]) \
    .show()

+-----------+------------------+
|       name|           revenue|
+-----------+------------------+
|     Sports| 5314.209999999843|
|     Sci-Fi| 4756.979999999873|
|  Animation| 4656.299999999867|
|      Drama|4587.3899999998885|
|     Comedy| 4383.579999999895|
|     Action| 4375.849999999871|
|        New| 4361.569999999892|
|      Games| 4281.329999999897|
|    Foreign| 4270.669999999886|
|     Family| 4226.069999999887|
|Documentary| 4217.519999999893|
|     Horror|  3722.53999999992|
|   Children|3655.5499999999115|
|   Classics|3639.5899999999156|
|     Travel|3549.6399999999226|
|      Music|3417.7199999999216|
+-----------+------------------+



4.
Вивести назви фільмів, яких не має в inventory.

In [32]:
distinct_films_in_inventory_df = inventory_df \
    .select(inventory_df['film_id']) \
    .distinct()

film_df \
    .join(distinct_films_in_inventory_df, distinct_films_in_inventory_df['film_id'] == film_df['film_id'], 'left_anti') \
    .select('title') \
    .show()

+--------------------+
|               title|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
+--------------------+
only showing top 20 rows



5.
Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [34]:
children_movies_df = film_df \
    .join(film_category_df, film_category_df['film_id'] == film_df['film_id'], 'inner') \
    .filter(film_category_df['category_id'] == 3) \
    .select([film_df['film_id']])

actors_in_children_movies_df = actor_df \
    .join(film_actor_df, film_actor_df['actor_id'] == actor_df['actor_id'], 'inner') \
    .join(children_movies_df, children_movies_df['film_id'] == film_actor_df['actor_id'], 'inner') \
    .select([
        film_actor_df['actor_id'].alias('actor_id'),
        F.concat(
            actor_df['first_name'],
            F.lit(" "),
            actor_df['last_name']).alias('actor_full_name')
    ])

actors_in_children_movies_df \
    .groupBy('actor_id', 'actor_full_name') \
    .agg(F.count('actor_id').alias('number_of_movies_played')) \
    .select(['actor_id', 'number_of_movies_played', 'actor_full_name']) \
    .orderBy(F.desc('number_of_movies_played')) \
    .limit(3) \
    .show()

+--------+-----------------------+---------------+
|actor_id|number_of_movies_played|actor_full_name|
+--------+-----------------------+---------------+
|     157|                     32|   GRETA MALDEN|
|     168|                     31|    WILL WILSON|
|     149|                     31| RUSSELL TEMPLE|
+--------+-----------------------+---------------+



Stop Spark session:

In [35]:
spark.stop()