In [1]:
from pyspark.sql import SparkSession, functions as f, types as t

In [2]:
spark = SparkSession.builder.appName('lect_13_home_task').getOrCreate()

In [3]:
actor_df = spark.read.csv('./data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('./data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('./data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('./data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('./data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('./data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('./data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('./data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('./data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('./data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('./data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('./data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('./data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('./data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('./data/store.csv', header=True, inferSchema=True)

# Домашнє завдання на тему Spark SQL

Задачі з домашнього завдання на SQL потрібно розвʼязати за допомогою Spark SQL DataFrame API.

- Дампи таблиць знаходяться в папці `data`. Датафрейми таблиць вже створені в клітинці вище.
- Можете створювати стільки нових клітинок, скільки вам необхідно.
- Розвʼязок кожної задачі має бути відображений в самому файлі (використати метод `.show()`)

**Увага!**
Використовувати мову запитів SQL безпосередньо забороняється, потрібно використовувати виключно DataFrame API!


1.
Вивести кількість фільмів в кожній категорії.
Результат відсортувати за спаданням.

In [55]:
# SELECT 	c."name" , count(*) cnt
# FROM	public.film_category fc 
#		JOIN public.film f ON fc.film_id = f.film_id 
#		JOIN public.category c ON fc.category_id = c.category_id 
#GROUP BY c."name" 		
#ORDER BY cnt DESC; 
df = film_category_df  \
    .join(film_df, film_df.film_id==film_category_df.film_id)  \
    .join(category_df, category_df.category_id==film_category_df.category_id)  \
    .select(category_df.name)  \
    .groupBy(category_df.name)  \
    .count()  \
    .withColumnRenamed("count","cnt")
df.show();


+-----------+---+
|       name|cnt|
+-----------+---+
|    Foreign| 73|
|     Sports| 74|
|      Drama| 62|
|Documentary| 68|
|     Travel| 57|
|     Family| 69|
|      Games| 61|
|   Classics| 57|
|  Animation| 66|
|      Music| 51|
|     Horror| 56|
|        New| 63|
|     Comedy| 58|
|   Children| 60|
|     Action| 64|
|     Sci-Fi| 61|
+-----------+---+



2.
Вивести 10 акторів, чиї фільми брали на прокат найбільше.
Результат відсортувати за спаданням.

In [56]:
#SELECT 	a.first_name, a.last_name, COUNT(*) cnt
#FROM 	public.actor a 
#		JOIN public.film_actor fa ON a.actor_id = fa.actor_id 
#		JOIN public.inventory i ON i.film_id = fa.film_id 
#		JOIN public.rental r ON r.inventory_id = i.inventory_id 
#GROUP BY a.first_name, a.last_name
#ORDER BY cnt DESC
#LIMIT 10;
df = actor_df  \
        .join(film_actor_df, actor_df.actor_id==film_actor_df.actor_id)  \
        .join(inventory_df, inventory_df.film_id==film_actor_df.film_id)  \
        .join(rental_df, rental_df.inventory_id==inventory_df.inventory_id)  \
        .select(actor_df.first_name, actor_df.last_name)  \
        .groupBy(actor_df.first_name, actor_df.last_name)  \
        .count().withColumnRenamed("count","cnt")  \
        .orderBy(f.col("cnt").desc())
df.show(10);

+----------+-----------+---+
|first_name|  last_name|cnt|
+----------+-----------+---+
|     SUSAN|      DAVIS|825|
|      GINA|  DEGENERES|753|
|   MATTHEW|     CARREY|678|
|      MARY|     KEITEL|674|
|    ANGELA|WITHERSPOON|654|
|    WALTER|       TORN|640|
|     HENRY|      BERRY|612|
|     JAYNE|      NOLTE|611|
|       VAL|     BOLGER|605|
|    SANDRA|     KILMER|604|
+----------+-----------+---+
only showing top 10 rows



3.
Вивести категорія фільмів, на яку було витрачено найбільше грошей
в прокаті

In [133]:
#SELECT 	category 
#FROM 	public.sales_by_film_category sbfc 
#ORDER BY total_sales DESC
#LIMIT 1;
#SELECT c.name AS category,
#    sum(p.amount) AS total_sales
#   FROM payment p
#     JOIN rental r ON p.rental_id = r.rental_id
#     JOIN inventory i ON r.inventory_id = i.inventory_id
#     JOIN film f ON i.film_id = f.film_id
#     JOIN film_category fc ON f.film_id = fc.film_id
#     JOIN category c ON fc.category_id = c.category_id
#  GROUP BY c.name
#  ORDER BY (sum(p.amount)) DESC;
df = payment_df  \
    .join(rental_df, payment_df.rental_id==rental_df.rental_id)  \
    .join(inventory_df, rental_df.inventory_id==inventory_df.inventory_id)  \
    .join(film_df, inventory_df.film_id==film_df.film_id)  \
    .join(film_category_df, film_df.film_id==film_category_df.film_id)  \
    .join(category_df, film_category_df.category_id==category_df.category_id)  \
    .select(category_df.name, payment_df.amount) 

df_sum = df.groupBy("name").sum("amount").orderBy(f.col("sum(amount)").desc())
df_sum.select("name").show(1)

+------+
|  name|
+------+
|Sports|
+------+
only showing top 1 row



4.
Вивести назви фільмів, яких не має в inventory.
Запит має бути без оператора IN

In [139]:
#SELECT 	f.title 
#FROM 	public.film f 
#		LEFT JOIN public.inventory i ON f.film_id = i.film_id 
#WHERE 	i.film_id IS null; 
df = film_df  \
    .join(inventory_df, film_df.film_id==inventory_df.film_id, "LEFT")  \
    .select(film_df.title)  \
    .where(inventory_df.film_id.isNull())
df.show()

+--------------------+
|               title|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
|           GUMP DATE|
|       HATE HANDICAP|
|         HOCUS FRIDA|
|    KENTUCKIAN GIANT|
|    KILL BROTHERHOOD|
|         MUPPET MILE|
|      ORDER BETRAYED|
|       PEARL DESTINY|
|     PERDITION FARGO|
|       PSYCHO SHRUNK|
|   RAIDERS ANTITRUST|
|       RAINBOW SHOCK|
|       ROOF CHAMPION|
|       SISTER FREDDY|
|         SKY MIRACLE|
|    SUICIDES SILENCE|
|        TADPOLE PARK|
|    TREASURE COMMAND|
|   VILLAIN DESPERATE|
|        VOLUME HOUSE|
|          

5.
Вивести топ 3 актори, які найбільше зʼявлялись в категорії фільмів “Children”

In [142]:
#SELECT 	a.first_name, a.last_name, count(fc.*) cnt
#FROM 	public.actor a 
#		JOIN public.film_actor fa ON a.actor_id = fa.actor_id 
#		JOIN public.film_category fc ON fa.film_id = fc.film_id 
#		JOIN public.category c ON c.category_id = fc.category_id 
#WHERE	c."name" = 'Children'	
#GROUP BY a.first_name, a.last_name
#ORDER BY cnt DESC
#LIMIT 3;	
df = actor_df  \
    .join(film_actor_df, actor_df.actor_id==film_actor_df.actor_id)  \
    .join(film_category_df, film_actor_df.film_id==film_category_df.film_id) \
    .join(category_df, film_category_df.category_id==category_df.category_id) \
    .select(actor_df.first_name, actor_df.last_name)  \
    .where(category_df.name == "Children")   \
    .groupBy(actor_df.first_name, actor_df.last_name)  \
    .count().withColumnRenamed("count","cnt")  \
    .orderBy(f.col("cnt").desc())    
df.show(3)

+----------+---------+---+
|first_name|last_name|cnt|
+----------+---------+---+
|     HELEN|   VOIGHT|  7|
|     SUSAN|    DAVIS|  6|
|     KEVIN|  GARLAND|  5|
+----------+---------+---+
only showing top 3 rows



6.
Вивести міста з кількістю активних та неактивних клієнтів
(в активних customer.active = 1).
Результат відсортувати за кількістю неактивних клієнтів за спаданням.

In [173]:
#SELECT  c2.city, SUM(CASE WHEN c.active = 1 THEN 1 ELSE 0 END) active_customer, SUM(CASE WHEN c.active = 1 THEN 0 ELSE 1 END) not_active_customer
#FROM	public.customer c 
#		JOIN public.address a ON c.address_id = a.address_id 
#		JOIN public.city c2 ON a.city_id = c2.city_id 
#GROUP BY c2.city
#ORDER BY not_active_customer DESC;
from pyspark.sql.functions import when
df = customer_df  \
    .join(address_df, customer_df.address_id==address_df.address_id) \
    .join(city_df, address_df.city_id==city_df.city_id) \
    .select(city_df.city, when(customer_df.active == 1, 1).otherwise(0).alias('active_customer'), when(customer_df.active == 1, 0).otherwise(1).alias('not_active_customer'))

df_sum = df  \
        .groupBy(df.city) \
        .agg({"active_customer": "sum", "not_active_customer": "sum"})  \
        .orderBy(f.col("sum(not_active_customer)").desc())
    
df_sum.show()    


+------------------+------------------------+--------------------+
|              city|sum(not_active_customer)|sum(active_customer)|
+------------------+------------------------+--------------------+
|         Pingxiang|                       1|                   0|
|       Szkesfehrvr|                       1|                   0|
|  Charlotte Amalie|                       1|                   0|
|         Najafabad|                       1|                   0|
|           Wroclaw|                       1|                   0|
|            Ktahya|                       1|                   0|
|   Southend-on-Sea|                       1|                   0|
|           Bat Yam|                       1|                   0|
|            Amroha|                       1|                   0|
|            Kamyin|                       1|                   0|
|          Xiangfan|                       1|                   0|
|            Daxian|                       1|                 