Spark initialization

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MoviesAnalytics") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0") \
    .getOrCreate()

print(f"Spark v{spark.version} is sucesfully initialized.")

Spark v3.5.0 is sucesfully initialized.


Importing libriaries needed for execution of PySpark queries:

In [2]:
from pyspark.sql.functions import sum, col, desc, dense_rank, when, unix_timestamp, lower
from pyspark.sql.window import Window

Establishing connection with pagila database and importing dataframes from it:

In [3]:
jdbc_url = "jdbc:postgresql://db:5432/pagila"
connection_properties = {
    "user": "postgres",
    "password": "password",
    "driver": "org.postgresql.Driver"
}

db_actor = spark.read.jdbc(url=jdbc_url, table="actor", properties=connection_properties)
db_address = spark.read.jdbc(url=jdbc_url, table="address", properties=connection_properties)
db_category = spark.read.jdbc(url=jdbc_url, table="category", properties=connection_properties)
db_store = spark.read.jdbc(url=jdbc_url, table="store", properties=connection_properties)
db_city = spark.read.jdbc(url=jdbc_url, table="city", properties=connection_properties)
db_customer = spark.read.jdbc(url=jdbc_url, table="customer", properties=connection_properties)
db_film = spark.read.jdbc(url=jdbc_url, table="film", properties=connection_properties)
db_film_actor = spark.read.jdbc(url=jdbc_url, table="film_actor", properties=connection_properties)
db_film_category = spark.read.jdbc(url=jdbc_url, table="film_category", properties=connection_properties)
db_inventory = spark.read.jdbc(url=jdbc_url, table="inventory", properties=connection_properties)
db_language = spark.read.jdbc(url=jdbc_url, table="language", properties=connection_properties)
db_payment = spark.read.jdbc(url=jdbc_url, table="payment", properties=connection_properties)
db_rental = spark.read.jdbc(url=jdbc_url, table="rental", properties=connection_properties)
db_staff = spark.read.jdbc(url=jdbc_url, table="staff", properties=connection_properties)
db_store = spark.read.jdbc(url=jdbc_url, table="store", properties=connection_properties)
db_payment = spark.read.jdbc(url=jdbc_url, table="payment", properties=connection_properties)

The number of movies in each category (descending order) 

In [4]:
q1_result = db_film_category \
    .join(db_category, "category_id") \
    .groupBy("name") \
    .count() \
    .orderBy(col("count").desc())

q1_result.show()

+-----------+-----+
|       name|count|
+-----------+-----+
|     Sports|   74|
|    Foreign|   73|
|     Family|   69|
|Documentary|   68|
|  Animation|   66|
|     Action|   64|
|        New|   63|
|      Drama|   62|
|      Games|   61|
|     Sci-Fi|   61|
|   Children|   60|
|     Comedy|   58|
|     Travel|   57|
|   Classics|   57|
|     Horror|   56|
|      Music|   51|
+-----------+-----+



10 actors whose movies rented the most (descending order)

In [14]:
q2_result = db_rental \
    .join(db_inventory, "inventory_id") \
    .join(db_film_actor, "film_id") \
    .join(db_actor, "actor_id") \
    .groupBy("first_name", "last_name") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(10)

q2_result.show()
    

+----------+-----------+-----+
|first_name|  last_name|count|
+----------+-----------+-----+
|     SUSAN|      DAVIS|  825|
|      GINA|  DEGENERES|  753|
|   MATTHEW|     CARREY|  678|
|      MARY|     KEITEL|  674|
|    ANGELA|WITHERSPOON|  654|
|    WALTER|       TORN|  640|
|     HENRY|      BERRY|  612|
|     JAYNE|      NOLTE|  611|
|       VAL|     BOLGER|  605|
|    SANDRA|     KILMER|  604|
+----------+-----------+-----+



The category of movies on which the most money was spent

In [None]:
q3_result = db_payment \
    .join(db_rental, "rental_id") \
    .join(db_inventory, "inventory_id") \
    .join(db_film_category, "film_id") \
    .join(db_category, "category_id") \
    .groupBy("name") \
    .agg((sum("amount")).alias("money spent")) \
    .orderBy(col("money spent").desc()) \
    .limit(1)

q3_result.show()

+------+-----------+
|  name|money spent|
+------+-----------+
|Sports|    5314.21|
+------+-----------+



Movies that are not in the inventory

In [30]:
q4_result = db_film \
    .join(db_inventory, "film_id", "left_anti") \
    .select("title")

q4_result.show()

+--------------------+
|               title|
+--------------------+
|      CHOCOLATE DUCK|
|       BUTCH PANTHER|
|        VOLUME HOUSE|
|      ORDER BETRAYED|
|        TADPOLE PARK|
|    KILL BROTHERHOOD|
|FRANKENSTEIN STRA...|
|    CROSSING DIVORCE|
|    SUICIDES SILENCE|
|       CATCH AMISTAD|
|     PERDITION FARGO|
|       FLOATS GARDEN|
|           GUMP DATE|
|        WALLS ARTIST|
|  GLADIATOR WESTWARD|
|         HOCUS FRIDA|
|ARSENIC INDEPENDENCE|
|         MUPPET MILE|
|   FIREHOUSE VIETNAM|
|       ROOF CHAMPION|
+--------------------+
only showing top 20 rows



Top 3 actors who have appeared most in movies in the “Children” category

In [14]:
q5_joins = db_category \
    .join(db_film_category, "category_id") \
    .filter(col("name") == "Children") \
    .join(db_film_actor, "film_id") \
    .join(db_actor, "actor_id") \
    .groupBy("first_name", "last_name") \
    .count()

windowSpec = Window.orderBy(desc("count"))

q5_result = q5_joins \
    .withColumn("rank", dense_rank().over(windowSpec)) \
    .filter(col("rank") <= 3) \
    .orderBy(col("rank"))

q5_result.show()

+----------+---------+-----+----+
|first_name|last_name|count|rank|
+----------+---------+-----+----+
|     HELEN|   VOIGHT|    7|   1|
|     SUSAN|    DAVIS|    6|   2|
|     KEVIN|  GARLAND|    5|   3|
|     RALPH|     CRUZ|    5|   3|
|    WHOOPI|     HURT|    5|   3|
|      MARY|    TANDY|    5|   3|
+----------+---------+-----+----+



Cities with the number of active and inactive customers

In [18]:
q6_result = db_customer \
    .join(db_address, "address_id") \
    .join(db_city, "city_id") \
    .groupBy("city") \
    .agg(
        sum(when(col("active") == 1, 1).otherwise(0)).alias("active_customers"),
        sum(when(col("active") == 0, 1).otherwise(0)).alias("inactive_customers")
    ) \
    .orderBy(col("inactive_customers").desc())

q6_result.show()

+------------------+----------------+------------------+
|              city|active_customers|inactive_customers|
+------------------+----------------+------------------+
|          Uluberia|               0|                 1|
|         Najafabad|               0|                 1|
|         Pingxiang|               0|                 1|
|          Xiangfan|               0|                 1|
|        Kumbakonam|               0|                 1|
|       Szkesfehrvr|               0|                 1|
|  Charlotte Amalie|               0|                 1|
|            Kamyin|               0|                 1|
|            Daxian|               0|                 1|
|     Coatzacoalcos|               0|                 1|
|           Wroclaw|               0|                 1|
|            Ktahya|               0|                 1|
|            Amroha|               0|                 1|
|   Southend-on-Sea|               0|                 1|
|           Bat Yam|           

The category of movies that have the highest number of total rental hours in the cities, and that start with the letter “a” or has a “-” symbol

In [25]:
q7_dataframe = db_rental \
    .filter(col("return_date").isNotNull()) \
    .withColumn("rental_hours", (unix_timestamp("return_date") - unix_timestamp("rental_date")) / 3600) \
    .join(db_inventory, "inventory_id") \
    .join(db_film_category, "film_id") \
    .join(db_category, "category_id") \
    .join(db_customer, "customer_id") \
    .join(db_address, "address_id") \
    .join(db_city, "city_id") \
    .select(
        col("city"),
        col("name").alias("category_name"),
        col("rental_hours")
    )

q7_result = q7_dataframe \
    .filter(
        (lower(col("city")).startswith("a")) | (col("city").contains("-"))
    ) \
    .groupBy("category_name") \
    .agg(sum("rental_hours").alias("total_hours")) \
    .orderBy(desc("total_hours")) \
    .limit(1)

q7_result.show()


+-------------+-----------------+
|category_name|      total_hours|
+-------------+-----------------+
|       Sports|16858.16666666667|
+-------------+-----------------+

