In [11]:
from pyspark.sql import SparkSession, functions as f


In [12]:
spark = SparkSession.builder.master('local[3]').getOrCreate()

In [13]:
print(spark.version)

3.3.0


In [4]:
actor_df = spark.read.csv('./data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('./data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('./data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('./data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('./data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('./data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('./data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('./data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('./data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('./data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('./data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('./data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('./data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('./data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('./data/store.csv', header=True, inferSchema=True)

                                                                                

#### 1. Вивести кількість фільмів в кожній категорії. Результат відсортувати за спаданням.

In [5]:
film_df.select(
    'film_id'
).join(
    film_category_df, 'film_id', 'inner'
).select(
    'category_id'
).join(
    category_df, 'category_id', 'inner'
).select(
    f.col('name').alias('category_name')
).groupBy(
    'category_name'
).agg(
    f.count("*").alias('films_count')
).orderBy(
    f.col('films_count').desc()
).show()

+-------------+-----------+
|category_name|films_count|
+-------------+-----------+
|       Sports|         74|
|      Foreign|         73|
|       Family|         69|
|  Documentary|         68|
|    Animation|         66|
|       Action|         64|
|          New|         63|
|        Drama|         62|
|        Games|         61|
|       Sci-Fi|         61|
|     Children|         60|
|       Comedy|         58|
|       Travel|         57|
|     Classics|         57|
|       Horror|         56|
|        Music|         51|
+-------------+-----------+



#### 2. Вивести 10 акторів, чиї фільми брали на прокат найбільше. Результат відсортувати за спаданням.

In [6]:
rental_df.select(
    'inventory_id'
).join(
    inventory_df, 'inventory_id', 'inner'
).select(
    'film_id'
).join(
    film_actor_df, 'film_id', 'inner'
).select(
    'actor_id'
).join(
    actor_df, 'actor_id', 'inner'
).select(
    f.concat(f.col('first_name'), f.lit(' '), f.col('last_name')).alias('actor_name')
).groupBy(
    'actor_name'
).agg(
    f.count("*").alias('rental_count')
).orderBy(
    f.col('rental_count').desc()
).show(10)

+------------------+------------+
|        actor_name|rental_count|
+------------------+------------+
|       SUSAN DAVIS|         825|
|    GINA DEGENERES|         753|
|    MATTHEW CARREY|         678|
|       MARY KEITEL|         674|
|ANGELA WITHERSPOON|         654|
|       WALTER TORN|         640|
|       HENRY BERRY|         612|
|       JAYNE NOLTE|         611|
|        VAL BOLGER|         605|
|     SANDRA KILMER|         604|
+------------------+------------+
only showing top 10 rows



#### 3. Вивести категорія фільмів, на яку було витрачено найбільше грошей в прокаті

In [7]:
payment_df.select(
    'rental_id', 'amount'
).join(
    rental_df, 'rental_id', 'inner'
).select(
    'inventory_id', 'amount'
).join(
    inventory_df, 'inventory_id', 'inner'
).select(
    'film_id', 'amount'
).join(
    film_category_df, 'film_id', 'inner'
).join(
    category_df, 'category_id', 'inner'
).select(
    f.col('name').alias('top_category') , 'amount'
).groupBy(
    'top_category'
).agg(
    f.sum('amount').alias('paid')
).orderBy(
    f.col('paid').desc()
).show(1)

+------------+-----------------+
|top_category|             paid|
+------------+-----------------+
|      Sports|5314.209999999848|
+------------+-----------------+
only showing top 1 row



#### 4. Вивести назви фільмів, яких не має в inventory.

In [8]:
film_df.select(
    'title', 'film_id'
).join(
    inventory_df, 'film_id', 'leftsemi'
).select(
    'title'
).show()

+-------------------+
|              title|
+-------------------+
|   ACADEMY DINOSAUR|
|     ACE GOLDFINGER|
|   ADAPTATION HOLES|
|   AFFAIR PREJUDICE|
|        AFRICAN EGG|
|       AGENT TRUMAN|
|    AIRPLANE SIERRA|
|    AIRPORT POLLOCK|
|      ALABAMA DEVIL|
|   ALADDIN CALENDAR|
|    ALAMO VIDEOTAPE|
|     ALASKA PHANTOM|
|         DATE SPEED|
|        ALI FOREVER|
|       ALIEN CENTER|
|    ALLEY EVOLUTION|
|         ALONE TRIP|
|      ALTER VICTORY|
|       AMADEUS HOLY|
|AMELIE HELLFIGHTERS|
+-------------------+
only showing top 20 rows



#### 5. Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [9]:
film_df.select(
    'film_id'
).join(
    film_actor_df, 'film_id', 'inner'
).select(
    'film_id', 'actor_id'
).join(
    actor_df, 'actor_id', 'inner'
).select(
    'film_id', 'first_name', 'last_name'
).join(
    film_category_df, 'film_id', 'inner'
).select(
    'category_id', 'first_name', 'last_name'
).join(
    category_df, 'category_id', 'inner'
).where(
    f.col('name') == 'Children'
).select(
    f.concat(f.col('first_name'), f.lit(' '), f.col('last_name')).alias('actor_name')
).groupBy(
    'actor_name'
).agg(
    f.count('*').alias('_cnt_category')
).orderBy(
    f.col('_cnt_category').desc()
).select(
    'actor_name'
).show(3)

+------------+
|  actor_name|
+------------+
|HELEN VOIGHT|
| SUSAN DAVIS|
|  MARY TANDY|
+------------+
only showing top 3 rows



In [10]:
spark.stop()