In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Movies Analysis") \
    .config("spark.jars", "C:\\Users\\Admin\\Downloads\\postgresql-42.7.8.jar") \
    .getOrCreate()

url = "jdbc:postgresql://localhost:5432/pagila"
properties = {
    "user": "postgres",
    "password": "root",
    "driver": "org.postgresql.Driver"
}

films_df = spark.read.jdbc(url=url, table="film", properties=properties)
actors_df = spark.read.jdbc(url=url, table="actor", properties=properties)
rental_df = spark.read.jdbc(url=url, table="rental", properties=properties)
category_df = spark.read.jdbc(url=url, table="category", properties=properties)
inventory_df = spark.read.jdbc(url=url, table="inventory", properties=properties)
film_category_df = spark.read.jdbc(url=url, table="film_category", properties=properties)
customer_df = spark.read.jdbc(url=url, table="customer", properties=properties)
address_df = spark.read.jdbc(url=url, table="address", properties=properties)
city_df = spark.read.jdbc(url=url, table="city", properties=properties)


In [5]:
film_category_df = spark.read.jdbc(url=url, table="film_category", properties=properties)

category_count = category_df \
    .join(film_category_df, "category_id") \
    .join(films_df, "film_id") \
    .groupBy("name") \
    .agg(F.count("film_id").alias("film_count")) \
    .orderBy(F.desc("film_count"))

category_count.show()


+-----------+----------+
|       name|film_count|
+-----------+----------+
|      Drama|       152|
|      Music|       152|
|     Travel|       151|
|    Foreign|       150|
|      Games|       150|
|   Children|       150|
|     Action|       149|
|     Sci-Fi|       149|
|  Animation|       148|
|     Family|       147|
|   Classics|       147|
|        New|       147|
|     Sports|       145|
|Documentary|       145|
|     Comedy|       143|
|     Horror|       142|
+-----------+----------+



In [4]:
film_actor_df = spark.read.jdbc(url=url, table="film_actor", properties=properties)

from pyspark.sql import functions as F

actor_rental_count = actors_df \
    .join(film_actor_df, "actor_id") \
    .join(films_df, "film_id") \
    .join(inventory_df, "film_id") \
    .join(rental_df, "inventory_id") \
    .groupBy("actor_id", "first_name", "last_name") \
    .agg(F.count("rental_id").alias("rental_count")) \
    .orderBy(F.desc("rental_count")) \
    .limit(10)

actor_rental_count.show()


+--------+----------+-----------+------------+
|actor_id|first_name|  last_name|rental_count|
+--------+----------+-----------+------------+
|     107|      GINA|  DEGENERES|         753|
|     181|   MATTHEW|     CARREY|         678|
|     198|      MARY|     KEITEL|         674|
|     144|    ANGELA|WITHERSPOON|         654|
|     102|    WALTER|       TORN|         640|
|      60|     HENRY|      BERRY|         612|
|     150|     JAYNE|      NOLTE|         611|
|      37|       VAL|     BOLGER|         605|
|      23|    SANDRA|     KILMER|         604|
|      90|      SEAN|    GUINESS|         599|
+--------+----------+-----------+------------+



In [6]:
payment_df = spark.read.jdbc(url=url, table="payment", properties=properties)

category_revenue = category_df \
    .join(film_category_df, "category_id") \
    .join(films_df, "film_id") \
    .join(inventory_df, "film_id") \
    .join(rental_df, "inventory_id") \
    .join(payment_df, "rental_id") \
    .groupBy("name") \
    .agg(F.sum("amount").alias("total_revenue")) \
    .orderBy(F.desc("total_revenue"))

category_revenue.show(1)


+-------+-------------+
|   name|total_revenue|
+-------+-------------+
|Foreign|     10507.67|
+-------+-------------+
only showing top 1 row



In [7]:
films_not_in_inventory = films_df.join(inventory_df, "film_id", "left_anti")

films_not_in_inventory.select("title").show()


+--------------------+
|               title|
+--------------------+
|      CHOCOLATE DUCK|
|       BUTCH PANTHER|
|        VOLUME HOUSE|
|      ORDER BETRAYED|
|        TADPOLE PARK|
|    KILL BROTHERHOOD|
|FRANKENSTEIN STRA...|
|    CROSSING DIVORCE|
|    SUICIDES SILENCE|
|       CATCH AMISTAD|
|     PERDITION FARGO|
|       FLOATS GARDEN|
|           GUMP DATE|
|        WALLS ARTIST|
|  GLADIATOR WESTWARD|
|         HOCUS FRIDA|
|ARSENIC INDEPENDENCE|
|         MUPPET MILE|
|   FIREHOUSE VIETNAM|
|       ROOF CHAMPION|
+--------------------+
only showing top 20 rows



In [8]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

children_actors = actors_df \
    .join(film_actor_df, "actor_id") \
    .join(films_df, "film_id") \
    .join(film_category_df, "film_id") \
    .join(category_df, "category_id") \
    .filter(F.col("name") == "Children") \
    .groupBy("actor_id", "first_name", "last_name") \
    .agg(F.count("film_id").alias("film_count"))

window = Window.orderBy(F.desc("film_count"))
top3_children_actors = children_actors.withColumn("rank", F.dense_rank().over(window)) \
    .filter(F.col("rank") <= 3)

top3_children_actors.select("first_name", "last_name", "film_count").show()


+----------+---------+----------+
|first_name|last_name|film_count|
+----------+---------+----------+
|    SIDNEY|    CROWE|         9|
|      EWAN|  GOODING|         9|
|   RICHARD|     PENN|         9|
|   SPENCER|     PECK|         8|
|       KIM|    ALLEN|         8|
|      MARY|    TANDY|         8|
|      ALEC|    WAYNE|         8|
|       DAN|   HARRIS|         8|
|   RUSSELL|   TEMPLE|         8|
|   MATTHEW|   CARREY|         8|
|      JANE|  JACKMAN|         8|
|      JADA|    RYDER|         8|
|     JAMES|     PITT|         7|
|    WARREN|    NOLTE|         7|
|  JULIANNE|    DENCH|         7|
|    AUDREY|  OLIVIER|         7|
|      GENE|   WILLIS|         7|
|    ANGELA|   HUDSON|         7|
|     DARYL| WAHLBERG|         7|
|   KENNETH|     TORN|         7|
+----------+---------+----------+
only showing top 20 rows



In [9]:
from pyspark.sql import functions as F

city_client_status = customer_df \
    .join(address_df, "address_id") \
    .join(city_df, "city_id") \
    .groupBy("city") \
    .agg(
        F.sum(F.col("active")).alias("active_count"),
        F.sum(F.when(F.col("active") == 0, 1).otherwise(0)).alias("inactive_count")
    ) \
    .orderBy(F.desc("inactive_count"))

city_client_status.show()


+------------------+------------+--------------+
|              city|active_count|inactive_count|
+------------------+------------+--------------+
|          Uluberia|           0|             1|
|         Najafabad|           0|             1|
|         Pingxiang|           0|             1|
|          Xiangfan|           0|             1|
|        Kumbakonam|           0|             1|
|       Szkesfehrvr|           0|             1|
|  Charlotte Amalie|           0|             1|
|            Kamyin|           0|             1|
|            Daxian|           0|             1|
|     Coatzacoalcos|           0|             1|
|           Wroclaw|           0|             1|
|            Ktahya|           0|             1|
|            Amroha|           0|             1|
|   Southend-on-Sea|           0|             1|
|           Bat Yam|           0|             1|
|          Fengshan|           1|             0|
|A Corua (La Corua)|           1|             0|
|           El Alto|

In [12]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

city_rental = rental_df \
    .join(inventory_df, "inventory_id") \
    .join(films_df, "film_id") \
    .join(film_category_df, "film_id") \
    .join(category_df, "category_id") \
    .join(customer_df, "customer_id") \
    .join(address_df, "address_id") \
    .join(city_df, "city_id") \
    .withColumn("rental_hours", F.col("rental_duration")) \
    .withColumn(
        "city_group",
        F.when(F.col("city").startswith("A"), "A_cities")
         .when(F.col("city").contains("-"), "dash_cities")
    )

window_spec = Window.partitionBy("city_group")

city_category_hours = city_rental \
    .filter(F.col("city_group").isNotNull()) \
    .groupBy("city_group", "name") \
    .agg(F.sum("rental_hours").alias("total_rental_hours")) \
    .withColumn("max_hours", F.max("total_rental_hours").over(window_spec)) \
    .filter(F.col("total_rental_hours") == F.col("max_hours")) \
    .select("city_group", "name", "total_rental_hours")

city_category_hours.show()


+-----------+--------+------------------+
| city_group|    name|total_rental_hours|
+-----------+--------+------------------+
|   A_cities|Children|               955|
|dash_cities|  Sci-Fi|               576|
+-----------+--------+------------------+

