In [19]:
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.sql.functions import col, count, concat_ws, sum as _sum

In [6]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [None]:
print(spark.version)

In [3]:
actor_df = spark.read.csv('./data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('./data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('./data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('./data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('./data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('./data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('./data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('./data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('./data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('./data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('./data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('./data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('./data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('./data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('./data/store.csv', header=True, inferSchema=True)

# Домашнє завдання на тему Spark SQL

Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.

- Дампи таблиць знаходяться в папці `data`. Датафрейми таблиць вже створені в клітинці вище.
- Можете створювати стільки нових клітинок, скільки вам необхідно.
- Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`)
- код має бути оформлений у відповідності із одним із стилем, показаним лектором на занятті 13.

**Увага!**
Використовувати мову запитів SQL безпосередньо забороняється, потрібно використовувати виключно DataFrame API!


1.
Вивести кількість фільмів в кожній категорії.
Результат відсортувати за спаданням.

In [7]:
t = film_category_df.join(category_df, film_category_df.category_id == category_df.category_id) \
                 .select(col("name"), col("film_id"))

result = t.groupBy("name") \
          .agg(count("film_id").alias("count_films")) \
          .orderBy(col("count_films").desc())

result.show()

+-----------+-----------+
|       name|count_films|
+-----------+-----------+
|     Sports|         74|
|    Foreign|         73|
|     Family|         69|
|Documentary|         68|
|  Animation|         66|
|     Action|         64|
|        New|         63|
|      Drama|         62|
|      Games|         61|
|     Sci-Fi|         61|
|   Children|         60|
|     Comedy|         58|
|     Travel|         57|
|   Classics|         57|
|     Horror|         56|
|      Music|         51|
+-----------+-----------+



2.
Вивести 10 акторів, чиї фільми брали на прокат найбільше.
Результат відсортувати за спаданням.

In [9]:
joined_df = rental_df.join(inventory_df, rental_df.inventory_id == inventory_df.inventory_id) \
                  .join(film_df, inventory_df.film_id == film_df.film_id) \
                  .join(film_actor_df, film_df.film_id == film_actor_df.film_id) \
                  .join(actor_df, film_actor_df.actor_id == actor_df.actor_id)

joined_df = joined_df.withColumn("actor_name", concat_ws(" ", col("first_name"), col("last_name")))

result = joined_df.groupBy("actor_name") \
                  .agg(count("rental_id").alias("total_rent")) \
                  .orderBy(col("total_rent").desc()) \
                  .limit(10)

result.show()

+------------------+----------+
|        actor_name|total_rent|
+------------------+----------+
|       SUSAN DAVIS|       825|
|    GINA DEGENERES|       753|
|    MATTHEW CARREY|       678|
|       MARY KEITEL|       674|
|ANGELA WITHERSPOON|       654|
|       WALTER TORN|       640|
|       HENRY BERRY|       612|
|       JAYNE NOLTE|       611|
|        VAL BOLGER|       605|
|     SANDRA KILMER|       604|
+------------------+----------+



3.
Вивести категорія фільмів, на яку було витрачено найбільше грошей
в прокаті

In [21]:
joined_df = payment_df.join(rental_df, payment_df.rental_id == rental_df.rental_id) \
                   .join(inventory_df, rental_df.inventory_id == inventory_df.inventory_id) \
                   .join(film_df, inventory_df.film_id == film_df.film_id) \
                   .join(film_category_df, film_df.film_id == film_category_df.film_id) \
                   .join(category_df, film_category_df.category_id == category_df.category_id)

result = joined_df.groupBy("name") \
                  .agg(_sum("amount").alias("total_amount")) \
                  .orderBy(col("total_amount").desc()) \
                  .limit(1) \
                  .select(col("name").alias("category"))

result.show()

+--------+
|category|
+--------+
|  Sports|
+--------+



4.
Вивести назви фільмів, яких не має в inventory.

In [22]:
result = film_df.join(inventory_df, film_df.film_id == inventory_df.film_id, "left_anti") \
             .select(col("title").alias("film_titles")) \
             .distinct()

result.show()

+--------------------+
|         film_titles|
+--------------------+
|       RAINBOW SHOCK|
|           GUMP DATE|
|         HOCUS FRIDA|
|    TREASURE COMMAND|
| CHINATOWN GLADIATOR|
|        WALLS ARTIST|
|      ARGONAUTS TOWN|
|       PSYCHO SHRUNK|
|   FIREHOUSE VIETNAM|
|DELIVERANCE MULHO...|
|       ROOF CHAMPION|
|        TADPOLE PARK|
|         APOLLO TEEN|
|       HATE HANDICAP|
|       PEARL DESTINY|
|COMMANDMENTS EXPRESS|
|        VOLUME HOUSE|
|     CROWDS TELEMARK|
|   RAIDERS ANTITRUST|
|    KILL BROTHERHOOD|
+--------------------+
only showing top 20 rows



5.
Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [24]:
children_category = category_df.filter(col("name") == "Children")

joined_df = children_category.alias('c') \
                             .join(film_category_df.alias('fc'), col('c.category_id') == col('fc.category_id')) \
                             .join(film_actor_df.alias('fa'), col('fc.film_id') == col('fa.film_id')) \
                             .join(actor_df.alias('a'), col('fa.actor_id') == col('a.actor_id'))

joined_df = joined_df.withColumn("actor_name", concat_ws(" ", col("first_name"), col("last_name")))

result = joined_df.groupBy("actor_name") \
                  .agg(count("fa.film_id").alias("total_films")) \
                  .orderBy(col("total_films").desc()) \
                  .limit(3)

result.show()

+------------+-----------+
|  actor_name|total_films|
+------------+-----------+
|HELEN VOIGHT|          7|
| SUSAN DAVIS|          6|
|  MARY TANDY|          5|
+------------+-----------+



Stop Spark session:

In [25]:
spark.stop()