In [79]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import os
from dotenv import load_dotenv
from pyspark.sql.window import Window
import findspark

findspark.init()

In [2]:
load_dotenv("C:/Users/qwerty/Desktop/tem/pagila/.env")

POSTGRES_USER = os.getenv("POSTGRES_USER")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD")
POSTGRES_PORT = os.getenv("POSTGRES_PORT")
HOST = os.getenv("HOST")

spark = SparkSession.builder \
    .appName("5_task") \
    .config("spark.driver.extraClassPath", "C:/Users/qwerty/Desktop/tem/postgresql-42.6.0.jar") \
    .getOrCreate()

In [3]:
table_names = spark.read.format('jdbc'). \
     options(
         url=f"jdbc:postgresql://{HOST}:{POSTGRES_PORT}/{POSTGRES_USER}", 
         dbtable='information_schema.tables',
         user=POSTGRES_USER,
         password=POSTGRES_PASSWORD,
         driver='org.postgresql.Driver'). \
     load().\
filter("table_schema = 'public'").select("table_name")
table_names.collect()

[Row(table_name='film_actor'),
 Row(table_name='address'),
 Row(table_name='city'),
 Row(table_name='actor'),
 Row(table_name='actor_info'),
 Row(table_name='category'),
 Row(table_name='country'),
 Row(table_name='customer'),
 Row(table_name='customer_list'),
 Row(table_name='film_list'),
 Row(table_name='nicer_but_slower_film_list'),
 Row(table_name='film'),
 Row(table_name='film_category'),
 Row(table_name='inventory'),
 Row(table_name='language'),
 Row(table_name='sales_by_film_category'),
 Row(table_name='rental'),
 Row(table_name='sales_by_store'),
 Row(table_name='staff_list'),
 Row(table_name='payment'),
 Row(table_name='store'),
 Row(table_name='payment_p2022_02'),
 Row(table_name='payment_p2022_07'),
 Row(table_name='payment_p2022_04'),
 Row(table_name='payment_p2022_05'),
 Row(table_name='staff'),
 Row(table_name='payment_p2022_01'),
 Row(table_name='payment_p2022_03'),
 Row(table_name='payment_p2022_06')]

In [4]:
table_names_list = [row.table_name for row in table_names.collect()]
print(table_names_list)

['film_actor', 'address', 'city', 'actor', 'actor_info', 'category', 'country', 'customer', 'customer_list', 'film_list', 'nicer_but_slower_film_list', 'film', 'film_category', 'inventory', 'language', 'sales_by_film_category', 'rental', 'sales_by_store', 'staff_list', 'payment', 'store', 'payment_p2022_02', 'payment_p2022_07', 'payment_p2022_04', 'payment_p2022_05', 'staff', 'payment_p2022_01', 'payment_p2022_03', 'payment_p2022_06']


In [5]:
dfs = {}
for table_name in table_names_list:
    dfs[table_name] = spark.read \
            .format("jdbc") \
            .option("jdbc.driverClassName", "org.postgresql.Driver") \
            .option("url", f"jdbc:postgresql://{HOST}:{POSTGRES_PORT}/{POSTGRES_USER}") \
            .option("dbtable", table_name) \
            .option("user", POSTGRES_USER) \
            .option("password", POSTGRES_PASSWORD) \
            .load()

In [6]:
dfs['category'].show()

+-----------+-----------+-------------------+
|category_id|       name|        last_update|
+-----------+-----------+-------------------+
|          1|     Action|2022-02-15 12:46:27|
|          2|  Animation|2022-02-15 12:46:27|
|          3|   Children|2022-02-15 12:46:27|
|          4|   Classics|2022-02-15 12:46:27|
|          5|     Comedy|2022-02-15 12:46:27|
|          6|Documentary|2022-02-15 12:46:27|
|          7|      Drama|2022-02-15 12:46:27|
|          8|     Family|2022-02-15 12:46:27|
|          9|    Foreign|2022-02-15 12:46:27|
|         10|      Games|2022-02-15 12:46:27|
|         11|     Horror|2022-02-15 12:46:27|
|         12|      Music|2022-02-15 12:46:27|
|         13|        New|2022-02-15 12:46:27|
|         14|     Sci-Fi|2022-02-15 12:46:27|
|         15|     Sports|2022-02-15 12:46:27|
|         16|     Travel|2022-02-15 12:46:27|
+-----------+-----------+-------------------+



In [7]:
# Вывести количество фильмов в каждой категории, отсортировать по убыванию.

result_query_1 = dfs['category'] \
    .join(dfs['film_category'], dfs['category'].category_id == dfs['film_category'].category_id) \
    .join(dfs['film'], dfs['film_category'].film_id == dfs['film'].film_id) \
    .groupBy(dfs['category'].name.alias('category_name')) \
    .agg(count(dfs['film'].film_id).alias('film_count')) \
    .orderBy(col('film_count').desc())

result_query_1.show()

+-------------+----------+
|category_name|film_count|
+-------------+----------+
|       Sports|        74|
|      Foreign|        73|
|       Family|        69|
|  Documentary|        68|
|    Animation|        66|
|       Action|        64|
|          New|        63|
|        Drama|        62|
|       Sci-Fi|        61|
|        Games|        61|
|     Children|        60|
|       Comedy|        58|
|     Classics|        57|
|       Travel|        57|
|       Horror|        56|
|        Music|        51|
+-------------+----------+



In [8]:
# Вывести 10 актеров, чьи фильмы большего всего арендовали, отсортировать по убыванию.

result_query_2 = dfs["actor"] \
    .join(dfs["film_actor"], "actor_id")\
    .join(dfs["film"], "film_id")\
    .join(dfs["inventory"], "film_id")\
    .join(dfs["rental"], "inventory_id")\
    .groupBy(dfs["actor"].actor_id, dfs["actor"].first_name, dfs["actor"].last_name)\
    .agg(count(dfs["rental"].rental_id).alias("rental_count"))\
    .orderBy("rental_count", ascending=False)

result_query_2.limit(10).show()

+--------+----------+-----------+------------+
|actor_id|first_name|  last_name|rental_count|
+--------+----------+-----------+------------+
|     107|      GINA|  DEGENERES|         753|
|     181|   MATTHEW|     CARREY|         678|
|     198|      MARY|     KEITEL|         674|
|     144|    ANGELA|WITHERSPOON|         654|
|     102|    WALTER|       TORN|         640|
|      60|     HENRY|      BERRY|         612|
|     150|     JAYNE|      NOLTE|         611|
|      37|       VAL|     BOLGER|         605|
|      23|    SANDRA|     KILMER|         604|
|      90|      SEAN|    GUINESS|         599|
+--------+----------+-----------+------------+



In [9]:
# Вывести категорию фильмов, на которую потратили больше всего денег.

result_query_3 = dfs["rental"]\
    .join(dfs["inventory"], "inventory_id")\
    .join(dfs["film"], "film_id")\
    .join(dfs["film_category"], "film_id")\
    .join(dfs["category"], "category_id")\
    .groupBy(dfs["category"].name)\
    .agg(sum(dfs["film"].rental_rate).alias("total_spent"))\
    .orderBy("total_spent", ascending=False)\
    .limit(1)

result_query_3.show()

+------+-----------+
|  name|total_spent|
+------+-----------+
|Sports|    3617.21|
+------+-----------+



In [17]:
# Вывести названия фильмов, которых нет в inventory.

result_query_4 = dfs["film"]\
    .join(dfs["inventory"], "film_id", "left")\
    .where(dfs["inventory"].film_id.isNull())\
    .select(dfs["film"].title)\
    .orderBy("title")

result_query_4.show()

+--------------------+
|               title|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
+--------------------+
only showing top 20 rows



In [18]:
# Вывести топ 3 актеров, которые больше всего появлялись в фильмах в категории “Children”.
# Если у нескольких актеров одинаковое кол-во фильмов, вывести всех

top_children_actors = (
    dfs["actor"].join(dfs["film_actor"], "actor_id")\
    .join(dfs["film_category"], "film_id")\
    .join(dfs["category"], "category_id")\
    .where(dfs["category"].name == "Children")\
    .groupBy("actor_id", "first_name", "last_name")\
    .agg(count("*").alias("count_films"))\
    .withColumn(
        "rank", dense_rank().over(Window.orderBy(desc("count_films")))
    )
)

result_query_5 = (
    top_children_actors.select("actor_id", "first_name", "last_name", "count_films")
    .where("rank <= 3")
)
result_query_5.show()

+--------+----------+---------+-----------+
|actor_id|first_name|last_name|count_films|
+--------+----------+---------+-----------+
|      17|     HELEN|   VOIGHT|          7|
|     127|     KEVIN|  GARLAND|          5|
|      80|     RALPH|     CRUZ|          5|
|      66|      MARY|    TANDY|          5|
|     140|    WHOOPI|     HURT|          5|
|      81|  SCARLETT|    DAMON|          4|
|     109| SYLVESTER|     DERN|          4|
|      23|    SANDRA|   KILMER|          4|
|     187|     RENEE|     BALL|          4|
|      92|   KIRSTEN|   AKROYD|          4|
|     173|      ALAN| DREYFUSS|          4|
|     101|     SUSAN|    DAVIS|          4|
|     150|     JAYNE|    NOLTE|          4|
|      13|       UMA|     WOOD|          4|
|     131|      JANE|  JACKMAN|          4|
|      58| CHRISTIAN|   AKROYD|          4|
|     142|      JADA|    RYDER|          4|
|      93|     ELLEN|  PRESLEY|          4|
|      37|       VAL|   BOLGER|          4|
+--------+----------+---------+-

In [23]:
# Вывести города с количеством активных и неактивных клиентов (активный — customer.active = 1).
# Отсортировать по количеству неактивных клиентов по убыванию.


joined_df = dfs["address"]\
    .join(dfs["city"], "city_id")\
    .join(dfs["customer"], "address_id")

result_query_6 = joined_df\
    .groupBy("city")\
    .agg(
        count(when(joined_df.active == 1, 1)).alias("active_customers"),
        count(when(joined_df.active == 0, 1)).alias("inactive_customers")
        )\
    .orderBy(col("inactive_customers").desc())

result_query_6.show()

+------------------+----------------+------------------+
|              city|active_customers|inactive_customers|
+------------------+----------------+------------------+
|           Wroclaw|               0|                 1|
|  Charlotte Amalie|               0|                 1|
|            Ktahya|               0|                 1|
|         Pingxiang|               0|                 1|
|       Szkesfehrvr|               0|                 1|
|            Daxian|               0|                 1|
|     Coatzacoalcos|               0|                 1|
|           Bat Yam|               0|                 1|
|   Southend-on-Sea|               0|                 1|
|          Uluberia|               0|                 1|
|         Najafabad|               0|                 1|
|          Xiangfan|               0|                 1|
|        Kumbakonam|               0|                 1|
|            Kamyin|               0|                 1|
|            Amroha|           

In [121]:
# Вывести категорию фильмов, у которой самое большое кол-во часов суммарной аренды
# в городах (customer.address_id в этом city), и которые начинаются на букву “a”.
# Тоже самое сделать для городов в которых есть символ “-”.

# create DataFrame film
film = (
    dfs['film']
    .join(dfs['film_category'], 'film_id', 'left')
    .join(dfs['category'], 'category_id', 'left')
    .select('film_id', 'title', 'name')
)
film.createOrReplaceTempView('film_full')

# create DataFrame  max_rental_hours
max_rental_hours = (
    dfs['city']
    .join(dfs['address'], 'city_id')
    .join(dfs['customer'], 'address_id')
    .join(dfs['rental'], 'customer_id')
    .join(dfs['inventory'], 'inventory_id')
    .join(film, 'film_id')
    .where(upper("city").like("A%") | upper("city").like("%-%"))
    .groupBy(
        col('city').alias('city_name'),
        when(upper("city").like("A%"), "Starts with A")\
                          .when(upper("city").like("%-%"), "Has hyphen")\
                          .alias('city_category'),
        col('name').alias('film_category')
    )
    .agg(sum(((dfs['rental'].return_date.cast("long") - dfs['rental'].rental_date.cast("long")) / 3600)\
             .cast("int")).alias('total_rental_hours'))
)

# create DataFrame sum_rental_hours_by_city_category
sum_rental_hours_by_city_category = (
    max_rental_hours
    .groupBy('city_category', 'film_category')
    .agg(sum('total_rental_hours').alias('max_total_rental_hours'))
)

# create DataFrame max_rental_hours_by_city
window = Window.partitionBy('city_category').orderBy(sum_rental_hours_by_city_category['max_total_rental_hours'].desc())
max_rental_hours_by_city = (
    sum_rental_hours_by_city_category
    .select(
        'city_category',
        'film_category',
        'max_total_rental_hours',
        row_number().over(window).alias('rn')
    )
    .where('rn = 1')
    .orderBy('city_category')
)

# result
max_rental_hours_by_city.select('city_category', 'film_category', 'max_total_rental_hours').show()

+-------------+-------------+----------------------+
|city_category|film_category|max_total_rental_hours|
+-------------+-------------+----------------------+
|   Has hyphen|      Foreign|                  5538|
|Starts with A|       Sports|                 12309|
+-------------+-------------+----------------------+

