In [1]:
from spark import get_spark_session, full_dfs
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, desc, count as count_fn, concat_ws, col, sum, when, lower, cast
table_names = ["actor", "address", "category", "city", "country", "customer", "film", "film_actor", "film_category", "inventory", "language", "packages_apt_postgresql_org", "packages_yum_postgresql_org", "payment", "rental", "staff", "store"]
spark = get_spark_session()
dfs = full_dfs(spark, table_names)

In [2]:
#query 1
films_and_categories = dfs["film_category"].join(dfs["category"], on="category_id", how="inner")
films_and_categories.groupby('name') \
                    .agg(count_fn("*").alias("film_count")) \
                    .orderBy(desc('film_count')) \
                    .show() 

+-----------+----------+
|       name|film_count|
+-----------+----------+
|      Drama|       152|
|      Music|       152|
|     Travel|       151|
|    Foreign|       150|
|      Games|       150|
|   Children|       150|
|     Action|       149|
|     Sci-Fi|       149|
|  Animation|       148|
|     Family|       147|
|   Classics|       147|
|        New|       147|
|     Sports|       145|
|Documentary|       145|
|     Comedy|       143|
|     Horror|       142|
+-----------+----------+



In [3]:
#query 2
joined_tables = dfs["rental"].join(dfs["inventory"], on = 'inventory_id', how = "inner") \
                            .join(dfs["film_actor"], on = 'film_id', how = "inner") \
                            .join(dfs["actor"], on = 'actor_id', how = "inner")

joined_tables.withColumn('full_name', concat_ws(" ", col('first_name'), col("last_name"))) \
             .groupBy("full_name") \
             .agg(count_fn("*").alias("rented_films")) \
             .orderBy(desc("rented_films")) \
             .show(10)

+------------------+------------+
|         full_name|rented_films|
+------------------+------------+
|       SUSAN DAVIS|         825|
|    GINA DEGENERES|         753|
|    MATTHEW CARREY|         678|
|       MARY KEITEL|         674|
|ANGELA WITHERSPOON|         654|
|       WALTER TORN|         640|
|       HENRY BERRY|         612|
|       JAYNE NOLTE|         611|
|        VAL BOLGER|         605|
|     SANDRA KILMER|         604|
+------------------+------------+
only showing top 10 rows


In [4]:
#query 3
joined_tables = dfs["rental"].join(dfs["inventory"], on = 'inventory_id', how = "inner") \
                             .join(dfs["film_category"], on = 'film_id', how = "inner") \
                             .join(dfs["payment"], on = 'rental_id', how = "inner") \
                             .join(dfs["category"], on = 'category_id', how = "inner")

joined_tables.groupBy("name") \
             .agg(sum("amount").alias("revenue")) \
             .orderBy(desc("revenue")) \
             .show(1)
            

+-------+--------+
|   name| revenue|
+-------+--------+
|Foreign|10507.67|
+-------+--------+
only showing top 1 row


In [5]:
#query 4
joined_tables = dfs["film"].join(dfs["inventory"], on = 'film_id', how = "left")

joined_tables.select(col("title")).where(col("inventory_id").isNull()).show()

+--------------------+
|               title|
+--------------------+
|      CHOCOLATE DUCK|
|       BUTCH PANTHER|
|        VOLUME HOUSE|
|      ORDER BETRAYED|
|        TADPOLE PARK|
|    KILL BROTHERHOOD|
|FRANKENSTEIN STRA...|
|    CROSSING DIVORCE|
|    SUICIDES SILENCE|
|       CATCH AMISTAD|
|     PERDITION FARGO|
|       FLOATS GARDEN|
|           GUMP DATE|
|        WALLS ARTIST|
|  GLADIATOR WESTWARD|
|         HOCUS FRIDA|
|ARSENIC INDEPENDENCE|
|         MUPPET MILE|
|   FIREHOUSE VIETNAM|
|       ROOF CHAMPION|
+--------------------+
only showing top 20 rows


In [6]:
#query 5
joined_tables = dfs["category"].join(dfs["film_category"], on = 'category_id', how = "inner") \
                               .join(dfs["film_actor"], on = 'film_id', how = "inner") \
                               .join(dfs["actor"], on = 'actor_id', how = "inner") 

count_films_in_children_category = joined_tables.filter(col("name") == "Children") \
                                                          .withColumn("full_name", concat_ws(" ", col("first_name"), col("last_name"))) \
                                                          .groupBy("full_name") \
                                                          .agg(count_fn("*").alias("films_count")) \
                                
windowSpec = Window.orderBy(desc("films_count"))
final_result = count_films_in_children_category.withColumn("rank", rank().over(windowSpec))
final_result.where(col("rank") <= 3).show()

+------------+-----------+----+
|   full_name|films_count|rank|
+------------+-----------+----+
|SIDNEY CROWE|          9|   1|
|RICHARD PENN|          9|   1|
|EWAN GOODING|          9|   1|
+------------+-----------+----+



In [7]:
#query 6
joined_tables = dfs["customer"].join(dfs["address"], on = 'address_id', how = "inner") \
                               .join(dfs["city"], on = 'city_id', how = "inner")

joined_tables.groupBy("city") \
            .agg(
                count_fn(when(col("active") == 0, 1)).alias("inactive_customers"),
                count_fn(when(col("active") == 1, 1)).alias("active_customers")
    ) \
            .orderBy(desc("inactive_customers"))\
             .show()

+------------------+------------------+----------------+
|              city|inactive_customers|active_customers|
+------------------+------------------+----------------+
|          Uluberia|                 1|               0|
|         Najafabad|                 1|               0|
|         Pingxiang|                 1|               0|
|          Xiangfan|                 1|               0|
|        Kumbakonam|                 1|               0|
|       Szkesfehrvr|                 1|               0|
|  Charlotte Amalie|                 1|               0|
|            Kamyin|                 1|               0|
|            Daxian|                 1|               0|
|     Coatzacoalcos|                 1|               0|
|           Wroclaw|                 1|               0|
|            Ktahya|                 1|               0|
|            Amroha|                 1|               0|
|   Southend-on-Sea|                 1|               0|
|           Bat Yam|           

In [8]:
#query 7

joined_tables = dfs["rental"].join(dfs["customer"], on = 'customer_id', how = "inner") \
                             .join(dfs["address"], on = 'address_id', how = "inner") \
                             .join(dfs["city"], on = 'city_id', how = "inner") \
                             .join(dfs["inventory"], on = 'inventory_id', how = "inner") \
                             .join(dfs["film_category"], on = 'film_id', how = "inner") \
                             .join(dfs["category"], on = 'category_id', how = "inner") 

cities_starts_with_a = joined_tables.filter(lower(col("city")).startswith("a"))

cities_starts_with_a.groupBy("name") \
                    .agg(
                        sum((col("return_date").cast("long") - col("rental_date").cast("long")) / 3600).alias("total_hours")
                    ).orderBy(desc("total_hours")).show(1)

cities_starts_with_dash = joined_tables.filter(col("city").contains("-"))
cities_starts_with_dash.groupBy("name") \
                    .agg(
                        sum((col("return_date").cast("long") - col("rental_date").cast("long")) / 3600).alias("total_hours")
                    ).orderBy(desc("total_hours")).show(1)

+--------+------------------+
|    name|       total_hours|
+--------+------------------+
|Children|25834.199999999997|
+--------+------------------+
only showing top 1 row
+-----+------------------+
| name|       total_hours|
+-----+------------------+
|Drama|14556.033333333333|
+-----+------------------+
only showing top 1 row
