import modules

In [1]:
import os
from dotenv import load_dotenv
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window


read the enviroments

In [2]:
def get_config() -> dict:
    load_dotenv()
    host = os.getenv("PG_HOST")
    user = os.getenv("PG_USER")
    password = os.getenv("PG_PASSWORD")
    db_name = os.getenv("PG_DB")
    port = os.getenv("PG_HOST_PORT")
    url = f"jdbc:postgresql://{host}:{port}/{db_name}"
    res = {
        "host" : host,
        "user" : user,
        "password" : password,
        "db_name" : db_name,
        "port" : port,
        "db_url" : url
    }
    return res

Create the session

In [3]:
spark = SparkSession.builder.appName("spark-practice")\
            .config("spark.jars.packages", "org.postgresql:postgresql:42.7.4")\
            .config("spark.sql.shuffle.partitions", "8")\
            .getOrCreate() 

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/24 15:50:46 WARN Utils: Your hostname, MacBook-Pro.local, resolves to a loopback address: 127.0.0.1; using 10.231.33.155 instead (on interface en0)
25/11/24 15:50:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/m/.ivy2.5.2/cache
The jars for the packages stored in: /Users/m/.ivy2.5.2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e31aad01-33ae-47f2-847d-d53441f8dac2;1.0
	confs: [default]
	found org.postgresql#postgresql;42.7.4 in central
	found org.checkerframework#checker-qual;3.42.0 in central
:: resolution report :: resolve 77ms :: artifacts dl 3ms
	:: modules in use:
	org.checkerfr

create the function read_table

In [4]:
def read_table(spark : SparkSession, table_name: str)-> DataFrame:
    config = get_config()

    reader = (spark.read.format("jdbc")\
             .option("url", config["db_url"])\
             .option("dbtable", table_name)\
             .option("user", config["user"])\
             .option("password", config["password"])\
             .option("driver", "org.postgresql.Driver"))
    return reader.load()

Create the dataframes of needable tables 

In [5]:
df_film = read_table(spark, "film")
df_film_category = read_table(spark, "film_category")
df_category = read_table(spark, "category")
df_actor = read_table(spark, "actor")
df_film_actor = read_table(spark, "film_actor")
df_inventory = read_table(spark, "inventory")
df_rental = read_table(spark, 'rental')
df_payment = read_table(spark, 'payment')
df_customer = read_table(spark, 'customer')
df_address = read_table(spark, 'address')
df_city = read_table(spark, "city")


make the query: Output the number of movies in each category, sorted in descending order. 

In [6]:
result1 = df_category.alias("c")\
    .join(df_film_category.alias("fc"), on="category_id", how="inner")\
    .groupBy("c.name")\
    .agg(F.count("fc.film_id").alias("amount"))\
    .withColumnRenamed("name", "category")\
    .select("category", "amount")\
    .orderBy(F.desc("amount"))

result1.show()

+-----------+------+
|   category|amount|
+-----------+------+
|      Music|   152|
|      Drama|   152|
|     Travel|   151|
|      Games|   150|
|    Foreign|   150|
|   Children|   150|
|     Sci-Fi|   149|
|     Action|   149|
|  Animation|   148|
|   Classics|   147|
|     Family|   147|
|        New|   147|
|Documentary|   145|
|     Sports|   145|
|     Comedy|   143|
|     Horror|   142|
+-----------+------+



make the query: Output the 10 actors whose movies rented the most, sorted in descending order. 

In [9]:
result2 = df_actor.alias("a")\
                .join(df_film_actor.alias("fa"), on="actor_id", how="inner")\
                .join(df_inventory.alias("i"), on="film_id", how="inner")\
                .join(df_rental.alias("r"), on="inventory_id", how="inner")\
                .groupBy('a.actor_id', F.concat_ws(' ', F.col("a.last_name"), F.col("a.first_name")).alias('actor_name'))\
                .agg(F.count('a.actor_id').alias('amount'))\
                .orderBy(F.desc("amount"), F.col('actor_name'))\
                .limit(10)
result2.show()

+--------+------------------+------+
|actor_id|        actor_name|amount|
+--------+------------------+------+
|     107|    DEGENERES GINA|   753|
|     181|    CARREY MATTHEW|   678|
|     198|       KEITEL MARY|   674|
|     144|WITHERSPOON ANGELA|   654|
|     102|       TORN WALTER|   640|
|      60|       BERRY HENRY|   612|
|     150|       NOLTE JAYNE|   611|
|      37|        BOLGER VAL|   605|
|      23|     KILMER SANDRA|   604|
|      90|      GUINESS SEAN|   599|
+--------+------------------+------+



make the query: Output the category of movies on which the most money was spent. 

In [10]:
result3 = df_category.alias("c")\
                .join(df_film_category.alias("fc"), on='category_id', how='inner')\
                .join(df_inventory.alias('i'), on='film_id', how='inner')\
                .join(df_rental.alias('r'), on='inventory_id', how="inner")\
                .join(df_payment.alias('p'), on='rental_id', how="inner")\
                .filter(F.col("p.amount") > 0)\
                .groupBy('c.category_id', 'c.name')\
                .agg(F.sum("p.amount").alias("price"))\
                .orderBy(F.desc('price'))\
                .limit(1)
result3.show()

+-----------+-------+--------+
|category_id|   name|   price|
+-----------+-------+--------+
|          9|Foreign|10507.67|
+-----------+-------+--------+



make the query: Output the names of movies that are not in the inventory. 

In [23]:
result4 = df_film.alias('f')\
                .join(df_inventory.alias('i'), on='film_id', how="left_anti")\
                .select('f.film_id', 'f.title')
result4.show(100, truncate=False)


+-------+----------------------+
|film_id|title                 |
+-------+----------------------+
|14     |ALICE FANTASIA        |
|38     |ARK RIDGEMONT         |
|148    |CHOCOLATE DUCK        |
|171    |COMMANDMENTS EXPRESS  |
|198    |CRYSTAL BREAKING      |
|221    |DELIVERANCE MULHOLLAND|
|802    |SKY MIRACLE           |
|712    |RAIDERS ANTITRUST     |
|742    |ROOF CHAMPION         |
|860    |SUICIDES SILENCE      |
|108    |BUTCH PANTHER         |
|195    |CROWDS TELEMARK       |
|801    |SISTER FREDDY         |
|943    |VILLAIN DESPERATE     |
|950    |VOLUME HOUSE          |
|33     |APOLLO TEEN           |
|41     |ARSENIC INDEPENDENCE  |
|217    |DAZED PUNK            |
|332    |FRANKENSTEIN STRANGER |
|642    |ORDER BETRAYED        |
|607    |MUPPET MILE           |
|192    |CROSSING DIVORCE      |
|404    |HATE HANDICAP         |
|669    |PEARL DESTINY         |
|671    |PERDITION FARGO       |
|701    |PSYCHO SHRUNK         |
|713    |RAINBOW SHOCK         |
|874    |T

make the query: Output the top 3 actors who have appeared most in movies in the “Children” category. If several actors have the same number of movies, output all of them. 

In [12]:
counts = df_actor.alias('a')\
            .join(df_film_actor.alias('fa'), F.col("a.actor_id") == F.col("fa.actor_id"), "inner")\
            .join(df_film_category.alias('fc'), F.col("fa.film_id") == F.col("fc.film_id"), "inner")\
            .join(df_category.alias("c"), F.col("fc.category_id") == F.col("c.category_id"), 'inner')\
            .where(F.col("c.name") == "Children")\
            .groupBy('a.actor_id', F.concat_ws(' ', "a.last_name", "a.first_name").alias("actor_name"))\
            .agg(F.count('fa.actor_id').alias("films_cnt"))
ranked = counts.withColumn("rnk", F.dense_rank().over(Window.orderBy(F.desc("films_cnt"))))
result5 = ranked.filter(F.col("rnk") <= 3)\
          .select("actor_id", "actor_name", "films_cnt")\
          .orderBy(F.desc("films_cnt"), "actor_name")
result5.show(30, truncate=False)

25/11/07 16:12:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/07 16:12:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/07 16:12:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/07 16:12:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/07 16:12:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/07 16:12:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/11/07 1

+--------+----------------+---------+
|actor_id|actor_name      |films_cnt|
+--------+----------------+---------+
|105     |CROWE SIDNEY    |9        |
|139     |GOODING EWAN    |9        |
|133     |PENN RICHARD    |9        |
|145     |ALLEN KIM       |8        |
|181     |CARREY MATTHEW  |8        |
|56      |HARRIS DAN      |8        |
|131     |JACKMAN JANE    |8        |
|87      |PECK SPENCER    |8        |
|142     |RYDER JADA      |8        |
|66      |TANDY MARY      |8        |
|149     |TEMPLE RUSSELL  |8        |
|29      |WAYNE ALEC      |8        |
|123     |DENCH JULIANNE  |7        |
|65      |HUDSON ANGELA   |7        |
|108     |NOLTE WARREN    |7        |
|34      |OLIVIER AUDREY  |7        |
|84      |PITT JAMES      |7        |
|94      |TORN KENNETH    |7        |
|17      |VOIGHT HELEN    |7        |
|95      |WAHLBERG DARYL  |7        |
|96      |WILLIS GENE     |7        |
|85      |ZELLWEGER MINNIE|7        |
+--------+----------------+---------+



make the query: Output cities with the number of active and inactive customers (active - customer.active = 1). Sort by the number of inactive customers in descending order. 

In [25]:
result6 = df_city.alias("c")\
            .join(df_address.alias("a"), F.col("c.city_id") == F.col("a.city_id"), "inner")\
            .join(df_customer.alias("cs"), F.col("a.address_id") == F.col("cs.address_id"), "inner")\
            .groupBy('c.city')\
            .agg(
                 F.sum("cs.active").alias("active_count"),
                 (F.count("*") - F.sum("cs.active")).alias("inactive_count")
                )\
            .orderBy(F.desc('inactive_count'))
result6.show(600, truncate=False)

+--------------------------+------------+--------------+
|city                      |active_count|inactive_count|
+--------------------------+------------+--------------+
|Kamyin                    |0           |1             |
|Uluberia                  |0           |1             |
|Ktahya                    |0           |1             |
|Najafabad                 |0           |1             |
|Coatzacoalcos             |0           |1             |
|Bat Yam                   |0           |1             |
|Charlotte Amalie          |0           |1             |
|Pingxiang                 |0           |1             |
|Amroha                    |0           |1             |
|Southend-on-Sea           |0           |1             |
|Szkesfehrvr               |0           |1             |
|Xiangfan                  |0           |1             |
|Daxian                    |0           |1             |
|Kumbakonam                |0           |1             |
|Wroclaw                   |0  

make the query: Output the category of movies that have the highest number of total rental hours in the cities (customer.address_id in this city), and that start with the letter “a”. Do the same for cities with a “-” symbol.

In [44]:
def hours_by_city_category(cat_predicate=None, city_predicate=None):
    df = df_category.alias("cat")\
        .join(df_film_category.alias("fc"), F.col("cat.category_id") == F.col("fc.category_id"), "inner")\
        .join(df_inventory.alias("i"), F.col("fc.film_id") == F.col("i.film_id"), "inner")\
        .join(df_rental.alias("r"), F.col("i.inventory_id") == F.col("r.inventory_id"), "inner")\
        .join(df_customer.alias("cu"), F.col("r.customer_id") == F.col("cu.customer_id"), "inner")\
        .join(df_address.alias("a"), F.col("cu.address_id") == F.col("a.address_id"), "inner")\
        .join(df_city.alias("c"), F.col("a.city_id") == F.col("c.city_id"), "inner")
    if cat_predicate is not None:
        df = df.filter(cat_predicate)
    if city_predicate is not None:
        df = df.filter(city_predicate)

    return df.withColumn(
            "hours",
            (F.col("r.return_date").cast("long") - F.col("r.rental_date").cast("long")) / F.lit(3600.0)
        )\
        .groupBy(
            F.col("c.city").alias("city"),
            F.col("cat.name").alias("category")
        )\
        .agg(F.sum("hours").alias("hours_total"))
  

a_city   = hours_by_city_category(cat_predicate=F.col("cat.name").ilike("a%"))
dash_city = hours_by_city_category(city_predicate=F.col("c.city").like("%-%"))

w = Window.partitionBy("city").orderBy(F.desc("hours_total"))

a_ranked   = a_city.withColumn("rnk", F.rank().over(w)).filter(F.col("rnk")==1)
dash_ranked = dash_city.withColumn("rnk", F.rank().over(w)).filter(F.col("rnk")==1)

result7 = a_ranked.select(
        F.lit("starts_with_a").alias("bucket"),
        "city", "category", "hours_total"
    )\
    .unionByName(
        dash_ranked.select(
            F.lit("has_dash").alias("bucket"),
            "city", "category", "hours_total"
        )
    ).orderBy("bucket", "city", "category")

result7.show(1000, truncate=False)

+-------------+--------------------------+-----------+------------------+
|bucket       |city                      |category   |hours_total       |
+-------------+--------------------------+-----------+------------------+
|has_dash     |Augusta-Richmond County   |Games      |1361.7333333333336|
|has_dash     |Beni-Mellal               |Drama      |982.2333333333333 |
|has_dash     |Donostia-San Sebastin     |Drama      |1066.7333333333333|
|has_dash     |Effon-Alaiye              |Animation  |1053.8333333333333|
|has_dash     |Hubli-Dharwad             |Documentary|835.4666666666667 |
|has_dash     |Jalib al-Shuyukh          |Music      |775.6500000000001 |
|has_dash     |Jastrzebie-Zdrj           |Horror     |1450.0166666666664|
|has_dash     |Kamjanets-Podilskyi       |Sci-Fi     |1064.2333333333333|
|has_dash     |Kirovo-Tepetsk            |Comedy     |1246.3166666666666|
|has_dash     |Lapu-Lapu                 |New        |1204.4            |
|has_dash     |Mwene-Ditu             