In [2]:
import findspark
findspark.init()

In [3]:
# Necessary libraries

import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import functions as f
from pyspark.sql.window import Window

In [4]:
# create Spark Session 

spark = SparkSession.builder \
    .appName("Postgres_to_PySpark") \
    .config("spark.jars", r"C:\Program Files\spark\jars\postgresql-42.6.0.jar") \
    .getOrCreate()

In [5]:
# Connect to postresql database 

jdbc_url = "jdbc:postgresql://localhost:5432/pagila"
properties = {
    "user": "postgres",
    "password": "victor",
    "driver": "org.postgresql.Driver"  
}

In [6]:
spark

In [7]:
# Necessary table for queries

film = spark.read.jdbc(url=jdbc_url, table="film", properties=properties)
film_category = spark.read.jdbc(url=jdbc_url, table="film_category", properties=properties)
category = spark.read.jdbc(url=jdbc_url, table="category", properties=properties)
film_actor = spark.read.jdbc(url=jdbc_url, table="film_actor", properties=properties)
actor = spark.read.jdbc(url=jdbc_url, table="actor", properties=properties)
inventory = spark.read.jdbc(url=jdbc_url, table="inventory", properties=properties)
rental = spark.read.jdbc(url=jdbc_url, table="rental", properties=properties)
city = spark.read.jdbc(url=jdbc_url, table="city", properties=properties)
address = spark.read.jdbc(url=jdbc_url, table="address", properties=properties)
customer = spark.read.jdbc(url=jdbc_url, table="customer", properties=properties)

In [9]:
# Q1: Number of films in each category

result_1 = (
    film.alias("f1")
    .join(film_category.alias("f2"), col("f1.film_id") == col("f2.film_id"), "inner")
    .join(category.alias("f3"), col("f2.category_id") == col("f3.category_id"), "inner")
    .groupBy(col("f3.category_id"), col("f3.name"))
    .agg(f.count("f1.film_id").alias("quantity"))
    .orderBy(col("quantity").desc())
)

result_1.show()

+-----------+-----------+--------+
|category_id|       name|quantity|
+-----------+-----------+--------+
|         15|     Sports|      74|
|          9|    Foreign|      73|
|          8|     Family|      69|
|          6|Documentary|      68|
|          2|  Animation|      66|
|          1|     Action|      64|
|         13|        New|      63|
|          7|      Drama|      62|
|         10|      Games|      61|
|         14|     Sci-Fi|      61|
|          3|   Children|      60|
|          5|     Comedy|      58|
|          4|   Classics|      57|
|         16|     Travel|      57|
|         11|     Horror|      56|
|         12|      Music|      51|
+-----------+-----------+--------+



In [10]:
#Q2: The 10 actors whose films were rented the most

result_2 = (
    actor.alias("a1")
    .join(film_actor.alias("f1"), col("a1.actor_id") == col("f1.actor_id"), "inner")
    .join(film.alias("f2"), col("f1.film_id") == col("f2.film_id"), "inner")
    .join(inventory.alias("i1"), col("f2.film_id") ==  col("i1.inventory_id"), "inner")
    .join(rental.alias("r1"), col("i1.inventory_id") == col("r1.inventory_id"), "inner")
    .groupBy(col("a1.actor_id"), f.concat_ws(" ", col("a1.first_name"), col("a1.last_name")).alias("full_name"))
    .agg(f.count(col("r1.rental_id")).alias("rental_count"))
    .orderBy(col("rental_count").desc())
    .limit(10)
         )

result_2.show()

+--------+------------------+------------+
|actor_id|         full_name|rental_count|
+--------+------------------+------------+
|     102|       WALTER TORN|         144|
|      23|     SANDRA KILMER|         137|
|     198|       MARY KEITEL|         134|
|     181|    MATTHEW CARREY|         131|
|     107|    GINA DEGENERES|         129|
|     144|ANGELA WITHERSPOON|         128|
|      60|       HENRY BERRY|         125|
|     106|     GROUCHO DUNST|         125|
|      81|    SCARLETT DAMON|         125|
|     150|       JAYNE NOLTE|         123|
+--------+------------------+------------+



In [11]:
#Q3: The category of films on which the most money was spent

result_3 = (
    film.alias("f1")
    .join(film_category.alias("f2"), col("f2.film_id") == col("f1.film_id"), "inner")
    .join(category.alias("c1"), col("c1.category_id") == col("f2.category_id"), "inner")
    .groupBy(col("c1.name"))
    .agg(f.sum(col("f1.replacement_cost")).alias("total_cost"))
    .orderBy(col("total_cost").desc()) 
    .limit(1)
)
         
result_3.show()

+------+----------+
|  name|total_cost|
+------+----------+
|Sports|   1509.26|
+------+----------+



In [12]:
#Q4: Movies that does not have inventory

result_4 = (
    film.alias("f1")
    .join(inventory.alias("i1"), col("f1.film_id") == col("i1.film_id"), "left")
    .filter(col("i1.inventory_id").isNull())
    .select(col("f1.film_id"), col("f1.title"))
    )

result_4.show()

+-------+--------------------+
|film_id|               title|
+-------+--------------------+
|    148|      CHOCOLATE DUCK|
|    108|       BUTCH PANTHER|
|    950|        VOLUME HOUSE|
|    642|      ORDER BETRAYED|
|    874|        TADPOLE PARK|
|    497|    KILL BROTHERHOOD|
|    332|FRANKENSTEIN STRA...|
|    192|    CROSSING DIVORCE|
|    860|    SUICIDES SILENCE|
|    128|       CATCH AMISTAD|
|    671|     PERDITION FARGO|
|    325|       FLOATS GARDEN|
|    386|           GUMP DATE|
|    955|        WALLS ARTIST|
|    359|  GLADIATOR WESTWARD|
|    419|         HOCUS FRIDA|
|     41|ARSENIC INDEPENDENCE|
|    607|         MUPPET MILE|
|    318|   FIREHOUSE VIETNAM|
|    742|       ROOF CHAMPION|
+-------+--------------------+
only showing top 20 rows



In [13]:
#Q5: Top 3 actors who appears in category 'Children'

children = (
    actor.alias("a1")
    .join(film_actor.alias("fa"), col("a1.actor_id") == col("fa.actor_id"), "inner")
    .join(film.alias("f1"), col("fa.film_id") == col("f1.film_id"), "inner")
    .join(film_category.alias("fc"), col("f1.film_id") == col("fc.film_id"), "inner")
    .join(category.alias("c"), col("c.category_id") == col("fc.category_id"), "inner")
    .filter(col("c.name") == "Children")
    .groupBy(col("a1.actor_id"), f.concat_ws(" ", col("a1.first_name"), col("a1.last_name")).alias("full_name"), col("c.name"))
    .agg(f.count(col("a1.actor_id")).alias("appearance_count")) 
)

window = Window.orderBy(col("appearance_count").desc())
ranked_children = children.withColumn("top", f.dense_rank().over(window))

result_5 = ranked_children.filter(col("top").between(1,3))
                                    
result_5.show()

+--------+----------------+--------+----------------+---+
|actor_id|       full_name|    name|appearance_count|top|
+--------+----------------+--------+----------------+---+
|      17|    HELEN VOIGHT|Children|               7|  1|
|      66|      MARY TANDY|Children|               5|  2|
|     127|   KEVIN GARLAND|Children|               5|  2|
|     140|     WHOOPI HURT|Children|               5|  2|
|      80|      RALPH CRUZ|Children|               5|  2|
|     131|    JANE JACKMAN|Children|               4|  3|
|     109|  SYLVESTER DERN|Children|               4|  3|
|     187|      RENEE BALL|Children|               4|  3|
|      37|      VAL BOLGER|Children|               4|  3|
|      23|   SANDRA KILMER|Children|               4|  3|
|      81|  SCARLETT DAMON|Children|               4|  3|
|     150|     JAYNE NOLTE|Children|               4|  3|
|     173|   ALAN DREYFUSS|Children|               4|  3|
|      58|CHRISTIAN AKROYD|Children|               4|  3|
|     101|    

In [14]:
# Cities that have active and inactive customer

result_6 = (
    city.alias("c1")
    .join(address.alias("a1"), col("c1.city_id") == col("a1.city_id"), "inner")
    .join(customer.alias("c2"), col("c2.address_id") == col("a1.address_id"), "inner")
    .groupBy(col("c1.city_id"), col("c1.city"))
    .agg(
        f.count(f.when(col("c2.active") == 0, 1).otherwise(0)).alias("inactive_customers"),
        f.count(f.when(col("c2.active") == 1, 1).otherwise(0)).alias("active_customers")
    )
    .orderBy(f.count(f.when(col("c2.active") == 0, 1).otherwise(0).alias("inactive_customers")).desc())
)

result_6.show()

+-------+--------------------+------------------+----------------+
|city_id|                city|inactive_customers|active_customers|
+-------+--------------------+------------------+----------------+
|    312|              London|                 2|               2|
|     42|              Aurora|                 2|               2|
|    148|            Duisburg|                 1|               1|
|    463|              Sasebo|                 1|               1|
|    471|            Shenzhen|                 1|               1|
|    496|           Southport|                 1|               1|
|    243|             Jodhpur|                 1|               1|
|    392|               Paarl|                 1|               1|
|    540|            Tongliao|                 1|               1|
|     31|                Arak|                 1|               1|
|    516|              Tafuna|                 1|               1|
|     85|            Boksburg|                 1|             

In [15]:
#Q7: The category of movies that has the highest number of total rental hours in cities

rent = (
    rental.alias("r1")
    .join(inventory.alias("i1"), col("r1.inventory_id") == col("i1.inventory_id"), "inner")
    .join(film.alias("f1"), col("f1.film_id") == col("i1.film_id"), "inner")
    .join(film_category.alias("f2"), col("f1.film_id") == col("f2.film_id"), "inner")
    .join(category.alias("c1"), col("c1.category_id") == col("f2.category_id"), "inner")
    .groupBy(col("c1.category_id"), col("c1.name"), col("f1.length"))
    .agg(f.count(col("rental_id")).alias("total_rentals"))
    
)

total_1 = (
    rent
    .filter(col("name").startswith("A"))
    .groupBy(col("category_id"), col("name"))
    .agg(
        f.sum(col("length") * col("total_rentals")).alias("hours_sum"))
)

total_2 = (
    rent
    .filter(col("name").rlike("-"))
    .groupBy(col("category_id"), col("name"))
    .agg(
        f.sum(col("length") * col("total_rentals")).alias("hours_sum"))
)

max_hours_1 = total_1.agg(f.max("hours_sum").alias("max_hours_sum")).collect()[0]["max_hours_sum"]
filter_1 = total_1.filter(total_1.hours_sum == max_hours_1)

max_hours_2 = total_2.agg(f.max("hours_sum").alias("max_hours_sum")).collect()[0]["max_hours_sum"]
filter_2 = total_2.filter(total_2.hours_sum == max_hours_2)

result_7 = filter_1.select("category_id", "name", "hours_sum").union(filter_2.select("category_id", "name", "hours_sum"))
result_7.show()

+-----------+---------+---------+
|category_id|     name|hours_sum|
+-----------+---------+---------+
|          2|Animation|   126836|
|         14|   Sci-Fi|   116227|
+-----------+---------+---------+

