In [20]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import max, avg, min, count, concat_ws, sum, round
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc, dense_rank
from pyspark.sql.functions import when, datediff, current_date

In [2]:
spark = (
    SparkSession.builder
    .appName("Pagila_spark")
    .master("local[*]")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.4")
    .getOrCreate()
)

In [3]:
spark

In [4]:
def read_table(name):
    df = spark.read.jdbc(
        url="jdbc:postgresql://localhost:5432/postgres",
        table=name,
        properties= {
            "user": "postgres", 
            "password": "123456", 
            "driver": "org.postgresql.Driver"
        }
    )
    return df

In [5]:
# tables_names = "(select table_name from information_schema.tables where table_schema='public'and table_type='BASE TABLE') as names"
# df_tables_names = read_table(url, tables_names, properties)
# df_tables_names.show()

In [6]:
df_film_actor = read_table("film_actor")
df_address = read_table("address")
df_city = read_table("city" )
df_actor = read_table("actor" )
df_category = read_table( "category" )
df_country = read_table( "country" )
df_customer = read_table( "customer" )
df_film = read_table( "film" )
df_film_category = read_table( "film_category" )
df_inventory = read_table( "inventory" )
df_language = read_table( "language" )
df_rental = read_table( "rental" )
df_payment = read_table( "payment" )
df_store = read_table( "store" )
df_staff = read_table( "staff" )

## Вывести количество фильмов в каждой категории, отсортировать по убыванию

In [7]:
df_res_q1 = df_category.join(df_film_category, on="category_id", how="inner")
df_res_q1 = df_res_q1.groupBy("name").agg(count("film_id").alias("films_count")).orderBy(desc("films_count"))
df_res_q1.show()

+-----------+-----------+
|       name|films_count|
+-----------+-----------+
|      Drama|        152|
|      Music|        152|
|     Travel|        151|
|    Foreign|        150|
|      Games|        150|
|   Children|        150|
|     Action|        149|
|     Sci-Fi|        149|
|  Animation|        148|
|     Family|        147|
|   Classics|        147|
|        New|        147|
|     Sports|        145|
|Documentary|        145|
|     Comedy|        143|
|     Horror|        142|
+-----------+-----------+



## Вывести 10 актеров, чьи фильмы большего всего арендовали, отсортировать по убыванию

In [8]:
df_res_q2 = (
    df_actor.join(df_film_actor, on="actor_id", how="inner")
    .join(df_inventory, on="film_id", how="inner")
    .join(df_rental, on="inventory_id", how="inner")
)
  
df_res_q2 = (
    df_res_q2.groupBy(col("actor_id"), concat_ws(" ", col("first_name"), col("last_name")).alias("name"))
    .agg(count("rental_id").alias("rental_count"))
    .orderBy(desc("rental_count"))
    .limit(10)
) 

df_res_q2.show()

+--------+------------------+------------+
|actor_id|              name|rental_count|
+--------+------------------+------------+
|     107|    GINA DEGENERES|         753|
|     181|    MATTHEW CARREY|         678|
|     198|       MARY KEITEL|         674|
|     144|ANGELA WITHERSPOON|         654|
|     102|       WALTER TORN|         640|
|      60|       HENRY BERRY|         612|
|     150|       JAYNE NOLTE|         611|
|      37|        VAL BOLGER|         605|
|      23|     SANDRA KILMER|         604|
|      90|      SEAN GUINESS|         599|
+--------+------------------+------------+



## Вывести категорию фильмов, на которую потратили больше всего денег

In [9]:
df_res_q3 = (
    df_category.join(df_film_category, on="category_id", how="inner")
    .join(df_inventory, on="film_id", how="inner")
    .join(df_rental, on="inventory_id", how="inner")
    .join(df_payment, on="rental_id", how="inner")
)
df_res_q3 = (
    df_res_q3.groupBy("name")
    .agg(sum("amount").alias("total_cost"))
    .orderBy(desc("total_cost"))
    .limit(1)
)


In [10]:
df_res_q3.show()

+-------+----------+
|   name|total_cost|
+-------+----------+
|Foreign|  10507.67|
+-------+----------+



## Вывести названия фильмов, которых нет в inventory. Написать запрос без использования оператора IN

In [11]:
df_res_q4 = df_film.join(df_inventory, on="film_id", how="left")
df_res_q4 = df_res_q4.select("title").filter(col("inventory_id").isNull())

In [12]:
df_res_q4.show()

+--------------------+
|               title|
+--------------------+
|      CHOCOLATE DUCK|
|       BUTCH PANTHER|
|        VOLUME HOUSE|
|      ORDER BETRAYED|
|        TADPOLE PARK|
|    KILL BROTHERHOOD|
|FRANKENSTEIN STRA...|
|    CROSSING DIVORCE|
|    SUICIDES SILENCE|
|       CATCH AMISTAD|
|     PERDITION FARGO|
|       FLOATS GARDEN|
|           GUMP DATE|
|        WALLS ARTIST|
|  GLADIATOR WESTWARD|
|         HOCUS FRIDA|
|ARSENIC INDEPENDENCE|
|         MUPPET MILE|
|   FIREHOUSE VIETNAM|
|       ROOF CHAMPION|
+--------------------+
only showing top 20 rows



In [13]:
df_res_q4.count()

42

## Вывести топ 3 актеров, которые больше всего появлялись в фильмах в категории “Children”. Если у нескольких актеров одинаковое кол-во фильмов, вывести всех

In [14]:
df_res_q5 = (
    df_actor.join(df_film_actor, on="actor_id", how="inner")
    .join(df_film_category, on="film_id", how="inner")
    .join(df_category, on="category_id", how="inner")
    .filter(col("name") == "Children")
)

df_res_q5 = (
    df_res_q5.groupBy(concat_ws(" ", col("first_name"), col("last_name")).alias("actor_name"))
     .agg(count(col("film_id")).alias("films_count"))
)

windowSpec = Window.orderBy(desc("films_count"))

df_res_q5 = (
    df_res_q5.withColumn("rank", dense_rank().over(windowSpec))
    .filter(col("rank") <= 3)
    .orderBy("rank")
)

In [15]:
df_res_q5.show()

+----------------+-----------+----+
|      actor_name|films_count|rank|
+----------------+-----------+----+
|    SIDNEY CROWE|          9|   1|
|    RICHARD PENN|          9|   1|
|    EWAN GOODING|          9|   1|
|      DAN HARRIS|          8|   2|
|       KIM ALLEN|          8|   2|
|      ALEC WAYNE|          8|   2|
|      MARY TANDY|          8|   2|
|    JANE JACKMAN|          8|   2|
|  RUSSELL TEMPLE|          8|   2|
|    SPENCER PECK|          8|   2|
|  MATTHEW CARREY|          8|   2|
|     SUSAN DAVIS|          8|   2|
|      JADA RYDER|          8|   2|
|   ANGELA HUDSON|          7|   3|
|    WARREN NOLTE|          7|   3|
|MINNIE ZELLWEGER|          7|   3|
|     GENE WILLIS|          7|   3|
|  AUDREY OLIVIER|          7|   3|
|  JULIANNE DENCH|          7|   3|
|      JAMES PITT|          7|   3|
+----------------+-----------+----+
only showing top 20 rows



## Вывести города с количеством активных и неактивных клиентов (активный — customer.active = 1). Отсортировать по количеству неактивных клиентов по убыванию

In [16]:
df_res_q6 = (
    df_city.join(df_address, on=["city_id"], how="inner")
    .join(df_customer, on=["address_id"], how="inner")
)

df_res_q6 = (
    df_res_q6.groupBy("city")
    .agg(
        sum(when(col("active") == 1, 1).otherwise(0)).alias("active_clients"),
        sum(when(col("active") == 0, 1).otherwise(0)).alias("inactive_clients")
    )
    .orderBy(col("inactive_clients").desc())
)

In [17]:
df_res_q6.show()

+------------------+--------------+----------------+
|              city|active_clients|inactive_clients|
+------------------+--------------+----------------+
|          Uluberia|             0|               1|
|         Najafabad|             0|               1|
|         Pingxiang|             0|               1|
|          Xiangfan|             0|               1|
|        Kumbakonam|             0|               1|
|       Szkesfehrvr|             0|               1|
|  Charlotte Amalie|             0|               1|
|            Kamyin|             0|               1|
|            Daxian|             0|               1|
|     Coatzacoalcos|             0|               1|
|           Wroclaw|             0|               1|
|            Ktahya|             0|               1|
|           Bat Yam|             0|               1|
|   Southend-on-Sea|             0|               1|
|            Amroha|             0|               1|
|A Corua (La Corua)|             1|           

## Вывести категорию фильмов, у которой самое большое кол-во часов суммарной аренды в городах (customer.address_id в этом city), и которые начинаются на букву “a”. Тоже самое сделать для городов в которых есть символ “-”

In [21]:
df_rent_hours = (
    df_city.join(df_address, on="city_id", how="inner")
    .join(df_customer, on="address_id", how="inner")
    .join(df_rental, on="customer_id", how="inner")
    .join(df_inventory, on="inventory_id", how="inner")
    .join(df_film_category, on="film_id", how="inner")
    .join(df_category, on="category_id", how="inner")
    .filter(col("return_date").isNotNull())
)


df_rent_hours = (
    df_rent_hours.select(
        col("city").alias("city_name"),  
        col("name").alias("category"),  
        col("return_date").cast("long").alias("return_date_long"),
        col("rental_date").cast("long").alias("rental_date_long")
    )
    .groupBy("city_name", "category")  
    .agg(round(sum((col("return_date_long") - col("rental_date_long"))/3600)).alias("total_rental_hours"))
)

df_max_hours_a = (
    df_rent_hours.filter(col("category").like("A%"))  
    .groupBy("city_name", "category")  
    .agg(max("total_rental_hours").alias("max_hours"))  
)

windowSpec = Window.partitionBy("city_name").orderBy(desc("total_rental_hours"))

df_max_hours_cities = (
    df_rent_hours.filter(col("city_name").like("%-%"))
    .withColumn("row_num", row_number().over(windowSpec))                       

)

df_res_q7 = (
    df_max_hours_a.union(df_max_hours_cities.filter(col("row_num") == 1)
        .select(col("city_name"), col("category"), col("total_rental_hours").alias("max_hours"))
    )
    .orderBy(desc("max_hours"))
)

df_res_q7.show()


+--------------------+---------+---------+
|           city_name| category|max_hours|
+--------------------+---------+---------+
|           Molodetno|   Action|   1579.0|
|         Saint-Denis|      New|   1508.0|
|             Hodeida|   Action|   1505.0|
|     Jastrzebie-Zdrj|   Horror|   1450.0|
|                Fuyu|Animation|   1431.0|
|             Bijapur|   Action|   1395.0|
|        Garden Grove|Animation|   1384.0|
|Augusta-Richmond ...|    Games|   1362.0|
|              Yangor|Animation|   1334.0|
|            Bhilwara|Animation|   1252.0|
|       Richmond Hill|   Action|   1251.0|
|      Kirovo-Tepetsk|   Comedy|   1246.0|
|            Erlangen|   Action|   1242.0|
|          Santa Rosa|   Action|   1222.0|
|          Cape Coral|Animation|   1219.0|
|Kowloon and New K...|   Action|   1215.0|
|         Zhezqazghan|   Action|   1213.0|
|           Sincelejo|   Action|   1208.0|
|           Lapu-Lapu|      New|   1204.0|
|                Abha|   Action|   1185.0|
+----------