In [16]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PagilaTest") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.3") \
    .getOrCreate()


jdbc_url = "jdbc:postgresql://pagila:5432/postgres" 
connection_properties = {
    "user": "postgres",
    "password": "123456", 
    "driver": "org.postgresql.Driver"
}

try:
    df_actors = spark.read.jdbc(url=jdbc_url, table="actor", properties=connection_properties)
    
    df_actors.show(5)    
except Exception as e:
    print("Ошибка подключения:")
    print(e)

+--------+----------+------------+-------------------+
|actor_id|first_name|   last_name|        last_update|
+--------+----------+------------+-------------------+
|       1|  PENELOPE|     GUINESS|2022-02-15 09:34:33|
|       2|      NICK|    WAHLBERG|2022-02-15 09:34:33|
|       3|        ED|       CHASE|2022-02-15 09:34:33|
|       4|  JENNIFER|       DAVIS|2022-02-15 09:34:33|
|       5|    JOHNNY|LOLLOBRIGIDA|2022-02-15 09:34:33|
+--------+----------+------------+-------------------+
only showing top 5 rows



In [3]:
import sys
print(sys.version)
print(sys.executable)

3.11.6 | packaged by conda-forge | (main, Oct  3 2023, 10:40:35) [GCC 12.3.0]
/opt/conda/bin/python


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, desc, sum, avg, when, dense_rank, lower, unix_timestamp
from pyspark.sql.window import Window

jdbc_url = "jdbc:postgresql://pagila:5432/postgres"
connection_properties = {
    "user": "postgres",
    "password": "123456",
    "driver": "org.postgresql.Driver"
}

spark = SparkSession.builder \
    .appName("Pagila_Analysis") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.3") \
    .getOrCreate()

def load_table(table_name):
    return spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)

df_category      = load_table("category")
df_film_category = load_table("film_category")
df_film          = load_table("film")
df_actor         = load_table("actor")
df_film_actor    = load_table("film_actor")
df_inventory     = load_table("inventory")
df_rental        = load_table("rental")
df_payment       = load_table("payment")
df_customer      = load_table("customer")
df_address       = load_table("address")
df_city          = load_table("city")
df_country       = load_table("country")


In [6]:
task1_result = df_category.join(df_film_category, on="category_id", how="inner") \
    .groupBy("name") \
    .agg(count("film_id").alias("movie_count")) \
    .orderBy(col("movie_count").desc())

task1_result.show()

Task 1 Result:
+-----------+-----------+
|       name|movie_count|
+-----------+-----------+
|      Drama|        152|
|      Music|        152|
|     Travel|        151|
|    Foreign|        150|
|      Games|        150|
|   Children|        150|
|     Action|        149|
|     Sci-Fi|        149|
|  Animation|        148|
|     Family|        147|
|   Classics|        147|
|        New|        147|
|     Sports|        145|
|Documentary|        145|
|     Comedy|        143|
|     Horror|        142|
+-----------+-----------+



In [8]:
task2_result = df_actor.join(df_film_actor, on="actor_id", how="inner") \
    .join(df_inventory, on="film_id", how="inner") \
    .join(df_rental, on="inventory_id", how="inner") \
    .groupBy("first_name", "last_name") \
    .agg(count("rental_id").alias("rentals_count")) \
    .orderBy(col("rentals_count").desc()) \
    .limit(10)
task2_result.show()

+----------+-----------+-------------+
|first_name|  last_name|rentals_count|
+----------+-----------+-------------+
|     SUSAN|      DAVIS|          825|
|      GINA|  DEGENERES|          753|
|   MATTHEW|     CARREY|          678|
|      MARY|     KEITEL|          674|
|    ANGELA|WITHERSPOON|          654|
|    WALTER|       TORN|          640|
|     HENRY|      BERRY|          612|
|     JAYNE|      NOLTE|          611|
|       VAL|     BOLGER|          605|
|    SANDRA|     KILMER|          604|
+----------+-----------+-------------+



In [9]:
df_category.join(df_film_category, "category_id") \
    .join(df_inventory, "film_id") \
    .join(df_rental, "inventory_id") \
    .join(df_payment, "rental_id") \
    .groupBy("name") \
    .agg(sum("amount").alias("total_sales")) \
    .orderBy(col("total_sales").desc()) \
    .limit(1) \
    .show()

+-------+-----------+
|   name|total_sales|
+-------+-----------+
|Foreign|   10507.67|
+-------+-----------+



In [10]:
df_film.join(df_inventory, "film_id", "left_anti") \
    .select("title") \
    .show()

+--------------------+
|               title|
+--------------------+
|      CHOCOLATE DUCK|
|       BUTCH PANTHER|
|        VOLUME HOUSE|
|      ORDER BETRAYED|
|        TADPOLE PARK|
|    KILL BROTHERHOOD|
|FRANKENSTEIN STRA...|
|    CROSSING DIVORCE|
|    SUICIDES SILENCE|
|       CATCH AMISTAD|
|     PERDITION FARGO|
|       FLOATS GARDEN|
|           GUMP DATE|
|        WALLS ARTIST|
|  GLADIATOR WESTWARD|
|         HOCUS FRIDA|
|ARSENIC INDEPENDENCE|
|         MUPPET MILE|
|   FIREHOUSE VIETNAM|
|       ROOF CHAMPION|
+--------------------+
only showing top 20 rows



In [11]:
window_spec = Window.orderBy(col("film_count").desc())

df_actor.join(df_film_actor, "actor_id") \
    .join(df_film_category, "film_id") \
    .join(df_category, "category_id") \
    .filter(col("name") == "Children") \
    .groupBy("first_name", "last_name") \
    .agg(count("film_id").alias("film_count")) \
    .withColumn("rank", dense_rank().over(window_spec)) \
    .filter(col("rank") <= 3) \
    .show()

+----------+---------+----------+----+
|first_name|last_name|film_count|rank|
+----------+---------+----------+----+
|    SIDNEY|    CROWE|         9|   1|
|   RICHARD|     PENN|         9|   1|
|      EWAN|  GOODING|         9|   1|
|      JANE|  JACKMAN|         8|   2|
|     SUSAN|    DAVIS|         8|   2|
|   RUSSELL|   TEMPLE|         8|   2|
|   SPENCER|     PECK|         8|   2|
|      MARY|    TANDY|         8|   2|
|       DAN|   HARRIS|         8|   2|
|      ALEC|    WAYNE|         8|   2|
|       KIM|    ALLEN|         8|   2|
|      JADA|    RYDER|         8|   2|
|   MATTHEW|   CARREY|         8|   2|
|    MINNIE|ZELLWEGER|         7|   3|
|    ANGELA|   HUDSON|         7|   3|
|     HELEN|   VOIGHT|         7|   3|
|     DARYL| WAHLBERG|         7|   3|
|      GENE|   WILLIS|         7|   3|
|    WARREN|    NOLTE|         7|   3|
|     JAMES|     PITT|         7|   3|
+----------+---------+----------+----+
only showing top 20 rows



In [12]:
df_city.join(df_address, "city_id") \
    .join(df_customer, "address_id") \
    .groupBy("city") \
    .agg(sum("active").alias("active_customers"), \
         sum(when(col("active") == 0, 1).otherwise(0)).alias("inactive_customers")) \
    .orderBy(col("inactive_customers").desc()) \
    .show()

+------------------+----------------+------------------+
|              city|active_customers|inactive_customers|
+------------------+----------------+------------------+
|          Uluberia|               0|                 1|
|         Najafabad|               0|                 1|
|         Pingxiang|               0|                 1|
|          Xiangfan|               0|                 1|
|        Kumbakonam|               0|                 1|
|       Szkesfehrvr|               0|                 1|
|  Charlotte Amalie|               0|                 1|
|            Kamyin|               0|                 1|
|            Daxian|               0|                 1|
|     Coatzacoalcos|               0|                 1|
|           Wroclaw|               0|                 1|
|            Ktahya|               0|                 1|
|           Bat Yam|               0|                 1|
|   Southend-on-Sea|               0|                 1|
|            Amroha|           

In [14]:
df_city.filter(lower(col("city")).startswith("a")) \
    .join(df_address, "city_id") \
    .join(df_customer, "address_id") \
    .join(df_rental, "customer_id") \
    .join(df_inventory, "inventory_id") \
    .join(df_film_category, "film_id") \
    .join(df_category, "category_id") \
    .groupBy("name") \
    .agg(sum((unix_timestamp("return_date") - unix_timestamp("rental_date"))/3600).alias("total_hours")) \
    .orderBy(col("total_hours").desc()) \
    .limit(1) \
    .show()
df_city.filter(col("city").contains("-")) \
    .join(df_address, "city_id") \
    .join(df_customer, "address_id") \
    .join(df_rental, "customer_id") \
    .join(df_inventory, "inventory_id") \
    .join(df_film_category, "film_id") \
    .join(df_category, "category_id") \
    .groupBy("name") \
    .agg(sum((unix_timestamp("return_date") - unix_timestamp("rental_date"))/3600).alias("total_hours")) \
    .orderBy(col("total_hours").desc()) \
    .limit(1) \
    .show()

+--------+------------------+
|    name|       total_hours|
+--------+------------------+
|Children|25834.199999999997|
+--------+------------------+

+-----+------------------+
| name|       total_hours|
+-----+------------------+
|Drama|14556.033333333333|
+-----+------------------+

