In [4]:
from pyspark.sql import SparkSession, functions as F, types as T

In [5]:
spark = SparkSession.builder.master("spark://spark-master:7077").config("spark.jars.packages", 
                                                                        "org.apache.hadoop:hadoop-aws-2.7.3").appName("spark-hw").getOrCreate()

In [6]:
print(spark.version)

3.5.5


In [7]:
actor_df = spark.read.csv('../data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('../data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('../data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('../data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('../data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('../data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('../data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('../data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('../data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('../data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('../data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('../data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('../data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('../data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('../data/store.csv', header=True, inferSchema=True)

                                                                                

# Домашнє завдання на тему Spark SQL

Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.

- Дампи таблиць знаходяться в папці `data`. Датафрейми таблиць вже створені в клітинці вище.
- Можете створювати стільки нових клітинок, скільки вам необхідно.
- Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`)
- код має бути оформлений у відповідності із одним із стилем, показаним лектором на занятті 13.

**Увага!**
Використовувати мову запитів SQL безпосередньо забороняється, потрібно використовувати виключно DataFrame API!


1.
Вивести кількість фільмів в кожній категорії.
Результат відсортувати за спаданням.

In [24]:
film_with_fc_df = film_df.join(
    film_category_df,
    film_df.film_id == film_category_df.film_id,
    "left"
)

film_with_category_df = film_with_fc_df.join(
    category_df,
    film_category_df.category_id == category_df.category_id,
    "left"
)

film_with_category_sorted_df = film_with_category_df.groupBy(
    category_df["name"].alias("film_category")
).agg(
    F.countDistinct(film_df.film_id).alias("films_count")
).orderBy(
    F.col("films_count").desc()
)

film_with_category_sorted_df.show()

+-------------+-----------+
|film_category|films_count|
+-------------+-----------+
|       Sports|         74|
|      Foreign|         73|
|       Family|         69|
|  Documentary|         68|
|    Animation|         66|
|       Action|         64|
|          New|         63|
|        Drama|         62|
|        Games|         61|
|       Sci-Fi|         61|
|     Children|         60|
|       Comedy|         58|
|       Travel|         57|
|     Classics|         57|
|       Horror|         56|
|        Music|         51|
+-------------+-----------+



2.
Вивести 10 акторів, чиї фільми брали на прокат найбільше.
Результат відсортувати за спаданням.

In [23]:
rental_inventory_df = rental_df.join(
    inventory_df,
    on=rental_df["inventory_id"] == inventory_df["inventory_id"],
    how="left"
)

film_actor_rental_inventory_df = rental_inventory_df.join(
    film_actor_df,
    on=inventory_df["film_id"] == film_actor_df["film_id"],
    how="left"
)

actor_rental_df = film_actor_rental_inventory_df.join(
    actor_df,
    on=film_actor_df["actor_id"] == actor_df["actor_id"],
    how="left"
)

top_rented_actor_films_df = actor_rental_df.groupBy(
    film_actor_df["actor_id"],
    actor_df["first_name"],
    actor_df["last_name"]
).agg(
    F.countDistinct(F.col("rental_id")).alias("rentals_count")
).withColumn(
    "actor",
    F.concat_ws(" ", F.col("first_name"), F.col("last_name"))
).select(
    "actor", "rentals_count"
).orderBy(
    F.col("rentals_count").desc()
).limit(10)

top_rented_actor_films_df.show()

[Stage 52:>                                                         (0 + 1) / 1]

+------------------+-------------+
|             actor|rentals_count|
+------------------+-------------+
|    GINA DEGENERES|          753|
|    MATTHEW CARREY|          678|
|       MARY KEITEL|          674|
|ANGELA WITHERSPOON|          654|
|       WALTER TORN|          640|
|       HENRY BERRY|          612|
|       JAYNE NOLTE|          611|
|        VAL BOLGER|          605|
|     SANDRA KILMER|          604|
|      SEAN GUINESS|          599|
+------------------+-------------+



                                                                                

3.
Вивести категорія фільмів, на яку було витрачено найбільше грошей
в прокаті

In [25]:
payment_with_rental_df = payment_df.join(
    rental_df,
    payment_df.rental_id == rental_df.rental_id,
    "left"
)

payment_with_inventory_df = payment_with_rental_df.join(
    inventory_df,
    rental_df.inventory_id == inventory_df.inventory_id,
    "left"
)

payment_with_film_category_df = payment_with_inventory_df.join(
    film_category_df,
    inventory_df.film_id == film_category_df.film_id,
    "left"
)

payment_with_category_df = payment_with_film_category_df.join(
    category_df,
    film_category_df.category_id == category_df.category_id,
    "left"
)

category_revenue_df = payment_with_category_df.groupBy(
    category_df["name"]
).agg(
    F.sum(payment_df.amount).alias("rental_revenue")
).orderBy(
    F.col("rental_revenue").desc()
).limit(1)

category_revenue_df.show()

+------+-----------------+
|  name|   rental_revenue|
+------+-----------------+
|Sports|5314.209999999847|
+------+-----------------+



4.
Вивести назви фільмів, яких не має в inventory.

In [29]:
film_inventory_joined_df = film_df.join(
    inventory_df,
    film_df.film_id == inventory_df.film_id,
    "left"
)

films_without_inventory_df = film_inventory_joined_df.filter(
    inventory_df.inventory_id.isNull()
).select(
    film_df.title
)

films_without_inventory_df.show()

+--------------------+
|               title|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
+--------------------+
only showing top 20 rows



5.
Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [28]:
film_actor_category_joined_df = film_actor_df.join(
    actor_df,
    film_actor_df.actor_id == actor_df.actor_id,
    "inner"
).join(
    film_category_df,
    film_actor_df.film_id == film_category_df.film_id,
    "inner"
).join(
    category_df,
    film_category_df.category_id == category_df.category_id,
    "inner"
)

actors_children_category_df = film_actor_category_joined_df.filter(
    category_df["name"] == "Children"
)

actor_children_film_count_df = actors_children_category_df.groupBy(
    actor_df.actor_id,
    F.concat_ws(" ", actor_df.first_name, actor_df.last_name).alias("actor")
).agg(
    F.count(film_actor_df.film_id).alias("films_count")
).orderBy(
    F.col("films_count").desc()
).limit(3)

actor_children_film_count_df.show()

+--------+------------+-----------+
|actor_id|       actor|films_count|
+--------+------------+-----------+
|      17|HELEN VOIGHT|          7|
|     140| WHOOPI HURT|          5|
|      80|  RALPH CRUZ|          5|
+--------+------------+-----------+



Stop Spark session:

In [30]:
spark.stop()