In [1]:
from pyspark.sql import SparkSession, functions as F, types as T, Window

In [2]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [3]:
print(spark.version)

3.4.1


In [4]:
actor_df = spark.read.csv('./data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('./data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('./data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('./data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('./data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('./data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('./data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('./data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('./data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('./data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('./data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('./data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('./data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('./data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('./data/store.csv', header=True, inferSchema=True)

# Домашнє завдання на тему Spark SQL

Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.

- Дампи таблиць знаходяться в папці `data`. Датафрейми таблиць вже створені в клітинці вище.
- Можете створювати стільки нових клітинок, скільки вам необхідно.
- Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`)
- код має бути оформлений у відповідності із одним із стилем, показаним лектором на занятті 13.

**Увага!**
Використовувати мову запитів SQL безпосередньо забороняється, потрібно використовувати виключно DataFrame API!


1.
Вивести кількість фільмів в кожній категорії.
Результат відсортувати за спаданням.

In [5]:
joined_df = category_df.join(film_category_df, ['category_id'], 'left')
joined_df\
    .groupby(['category_id','name'])\
    .agg(
        F.count('category_id').alias('count')
    )\
    .orderBy(F.col('count').desc())\
    .show()

+-----------+-----------+-----+
|category_id|       name|count|
+-----------+-----------+-----+
|         15|     Sports|   74|
|          9|    Foreign|   73|
|          8|     Family|   69|
|          6|Documentary|   68|
|          2|  Animation|   66|
|          1|     Action|   64|
|         13|        New|   63|
|          7|      Drama|   62|
|         14|     Sci-Fi|   61|
|         10|      Games|   61|
|          3|   Children|   60|
|          5|     Comedy|   58|
|          4|   Classics|   57|
|         16|     Travel|   57|
|         11|     Horror|   56|
|         12|      Music|   51|
+-----------+-----------+-----+



2.
Вивести 10 акторів, чиї фільми брали на прокат найбільше.
Результат відсортувати за спаданням.

In [35]:
joined_df = rental_df\
    .join(inventory_df, ['inventory_id'], 'inner')\
    .join(film_df, ['film_id'], 'inner')\
    .join(film_actor_df, ['film_id'], 'inner')\
    .join(actor_df, ['actor_id'], 'inner')

window = Window.orderBy([F.col('count').desc()])

joined_df\
    .groupBy(['actor_id','first_name','last_name'])\
    .agg(F.count('rental_id').alias('count'))\
    .withColumn('rank',F.rank().over(window))\
    .filter(F.col('rank')<=10)\
    .orderBy(F.col('count').desc())\
    .drop('rank')\
    .show()

+--------+----------+-----------+-----+
|actor_id|first_name|  last_name|count|
+--------+----------+-----------+-----+
|     107|      GINA|  DEGENERES|  753|
|     181|   MATTHEW|     CARREY|  678|
|     198|      MARY|     KEITEL|  674|
|     144|    ANGELA|WITHERSPOON|  654|
|     102|    WALTER|       TORN|  640|
|      60|     HENRY|      BERRY|  612|
|     150|     JAYNE|      NOLTE|  611|
|      37|       VAL|     BOLGER|  605|
|      23|    SANDRA|     KILMER|  604|
|      90|      SEAN|    GUINESS|  599|
+--------+----------+-----------+-----+



3.
Вивести категорія фільмів, на яку було витрачено найбільше грошей
в прокаті

In [16]:
joined_df = rental_df\
    .join(inventory_df, ['inventory_id'], 'inner')\
    .join(film_df, ['film_id'], 'inner')\
    .join(film_category_df, ['film_id'], 'inner')\
    .join(category_df, ['category_id'], 'inner')

window = Window.orderBy([F.col('sum').desc()])

joined_df\
    .groupby(['category_id', 'name'])\
    .agg(F.sum('rental_rate').alias('sum'))\
    .withColumn('rank', F.rank().over(window))\
    .filter(F.col('rank') == 1)\
    .drop('rank')\
    .show()

+-----------+------+------------------+
|category_id|  name|               sum|
+-----------+------+------------------+
|         15|Sports|3617.2099999998945|
+-----------+------+------------------+



4.
Вивести назви фільмів, яких не має в inventory.

In [34]:
film_df\
    .join(inventory_df, ['film_id'], 'left_anti')\
    .select('title')\
    .show(n=100)

+--------------------+
|               title|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
|           GUMP DATE|
|       HATE HANDICAP|
|         HOCUS FRIDA|
|    KENTUCKIAN GIANT|
|    KILL BROTHERHOOD|
|         MUPPET MILE|
|      ORDER BETRAYED|
|       PEARL DESTINY|
|     PERDITION FARGO|
|       PSYCHO SHRUNK|
|   RAIDERS ANTITRUST|
|       RAINBOW SHOCK|
|       ROOF CHAMPION|
|       SISTER FREDDY|
|         SKY MIRACLE|
|    SUICIDES SILENCE|
|        TADPOLE PARK|
|    TREASURE COMMAND|
|   VILLAIN DESPERATE|
|        VOLUME HOUSE|
|          

5.
Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [36]:
joined_df = actor_df\
    .join(film_actor_df, ['actor_id'], 'inner')\
    .join(film_df, ['film_id'], 'inner')\
    .join(film_category_df, ['film_id'], 'inner')\
    .join(category_df, ['category_id'], 'inner')

window = Window.orderBy(F.col('count').desc())

joined_df\
    .filter(F.col('name') == 'Children')\
    .groupby(['actor_id', 'first_name', 'last_name'])\
    .agg(F.count('actor_id').alias('count'))\
    .withColumn('rank', F.rank().over(window))\
    .filter(F.col('rank') <= 3)\
    .orderBy(F.col('count').desc())\
    .drop('rank')\
    .show()

+--------+----------+---------+-----+
|actor_id|first_name|last_name|count|
+--------+----------+---------+-----+
|      17|     HELEN|   VOIGHT|    7|
|     127|     KEVIN|  GARLAND|    5|
|      80|     RALPH|     CRUZ|    5|
|      66|      MARY|    TANDY|    5|
|     140|    WHOOPI|     HURT|    5|
+--------+----------+---------+-----+



Stop Spark session:

In [37]:
spark.stop()