In [37]:
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.sql.functions import col, count, sum, desc

In [38]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [39]:
print(spark.version)

3.4.1


In [40]:
actor_df = spark.read.csv('./data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('./data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('./data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('./data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('./data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('./data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('./data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('./data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('./data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('./data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('./data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('./data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('./data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('./data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('./data/store.csv', header=True, inferSchema=True)

# Домашнє завдання на тему Spark SQL

Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.

- Дампи таблиць знаходяться в папці `data`. Датафрейми таблиць вже створені в клітинці вище.
- Можете створювати стільки нових клітинок, скільки вам необхідно.
- Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`)
- код має бути оформлений у відповідності із одним із стилем, показаним лектором на занятті 13.

**Увага!**
Використовувати мову запитів SQL безпосередньо забороняється, потрібно використовувати виключно DataFrame API!


1.
Вивести кількість фільмів в кожній категорії.
Результат відсортувати за спаданням.

In [41]:

result1 = (film_actor_df
           .join(rental_df, film_actor_df["film_id"] == rental_df["inventory_id"], "inner")
           .groupBy("actor_id")
           .agg(count("actor_id").alias("rental_count"))
           .join(actor_df, "actor_id")
           .select("first_name", "last_name", "rental_count")
           .orderBy(desc("rental_count"))
           .limit(10))
result1.show()


+----------+-----------+------------+
|first_name|  last_name|rental_count|
+----------+-----------+------------+
|    WALTER|       TORN|         144|
|    SANDRA|     KILMER|         137|
|      MARY|     KEITEL|         134|
|   MATTHEW|     CARREY|         131|
|      GINA|  DEGENERES|         129|
|    ANGELA|WITHERSPOON|         128|
|  SCARLETT|      DAMON|         125|
|   GROUCHO|      DUNST|         125|
|     HENRY|      BERRY|         125|
|     JAYNE|      NOLTE|         123|
+----------+-----------+------------+



2.
Вивести 10 акторів, чиї фільми брали на прокат найбільше.
Результат відсортувати за спаданням.

In [42]:
result1 = (film_actor_df
           .join(inventory_df, "film_id")
           .join(rental_df, "inventory_id")
           .groupBy("actor_id")
           .agg(count("actor_id").alias("rental_count"))
           .join(actor_df, "actor_id")
           .select("first_name", "last_name", "rental_count")
           .orderBy(desc("rental_count"))
           .limit(10))
result1.show()

+----------+-----------+------------+
|first_name|  last_name|rental_count|
+----------+-----------+------------+
|      GINA|  DEGENERES|         753|
|   MATTHEW|     CARREY|         678|
|      MARY|     KEITEL|         674|
|    ANGELA|WITHERSPOON|         654|
|    WALTER|       TORN|         640|
|     HENRY|      BERRY|         612|
|     JAYNE|      NOLTE|         611|
|       VAL|     BOLGER|         605|
|    SANDRA|     KILMER|         604|
|      SEAN|    GUINESS|         599|
+----------+-----------+------------+



3.
Вивести категорія фільмів, на яку було витрачено найбільше грошей
в прокаті

In [43]:
# тут має бути розвʼязок задачі
result3 = (film_df
           .join(inventory_df, "film_id", "left_anti")
           .select("title"))
result3.show()

+--------------------+
|               title|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
+--------------------+
only showing top 20 rows



4.
Вивести назви фільмів, яких не має в inventory.

In [44]:
# тут має бути розвʼязок задачі
result3 = (film_df
           .join(inventory_df, film_df["film_id"] == inventory_df["film_id"], "left_anti")
           .select("title"))
result3.show()

+--------------------+
|               title|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
+--------------------+
only showing top 20 rows



5.
Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [45]:
result4 = (film_actor_df
           .join(film_category_df, "film_id")
           .join(category_df, "category_id")
           .filter(col("name") == "Children")
           .groupBy("actor_id")
           .agg(count("actor_id").alias("film_count"))
           .join(actor_df, "actor_id")
           .select("first_name", "last_name", "film_count")
           .orderBy(desc("film_count"))
           .limit(3))
result4.show()

+----------+---------+----------+
|first_name|last_name|film_count|
+----------+---------+----------+
|     HELEN|   VOIGHT|         7|
|     RALPH|     CRUZ|         5|
|    WHOOPI|     HURT|         5|
+----------+---------+----------+



Stop Spark session:

In [46]:
spark.stop()