In [1]:
from dotenv import load_dotenv
import os

load_dotenv()

USER = os.getenv('USER')
PASSWORD = os.getenv('PASSWORD')

In [2]:
from pyspark.sql import SparkSession, DataFrame, functions as f
import pyspark.sql.types as t

In [3]:
spark = (SparkSession.
         builder.
         config("spark.jars",r"D:\spark_demo\Spark_Demo_Course\spark-3.1.3-bin-hadoop2.7\jars\mysql-connector-java-8.0.30.jar").
         master("local").
         appName("driver").
         getOrCreate()
)

In [4]:
connector = (spark.read
        .format("jdbc")\
        .option("url", "jdbc:mysql://localhost:3306/sakila")\
        .option("user", USER)\
        .option("password", PASSWORD)\
        .option("driver", "com.mysql.jdbc.Driver")
        )

In [5]:
def table_reader(table_name:str) -> DataFrame:
    return connector.option("dbtable", table_name).load()

In [6]:
actor = table_reader("actor")
address = table_reader("address")
category = table_reader("category")
city = table_reader("city")
country = table_reader("country")
customer = table_reader("customer")
film = table_reader("film")
film_actor = table_reader("film_actor")
film_category = table_reader("film_category")
inventory = table_reader("inventory")
language = table_reader("language")
payment = table_reader("payment")
rental = table_reader("rental")
staff = table_reader("staff")
store = table_reader("store")

In [7]:
# 1
(category.join(film_category, on="category_id").
     groupBy("name").
     count().
     orderBy(f.col("count").desc()).
     select("name", "count").
     show()
)

+-----------+-----+
|       name|count|
+-----------+-----+
|     Sports|   74|
|    Foreign|   73|
|     Family|   69|
|Documentary|   68|
|  Animation|   66|
|     Action|   64|
|        New|   63|
|      Drama|   62|
|     Sci-Fi|   61|
|      Games|   61|
|   Children|   60|
|     Comedy|   58|
|     Travel|   57|
|   Classics|   57|
|     Horror|   56|
|      Music|   51|
+-----------+-----+



In [8]:
# 2
(actor.join(film_actor, on='actor_id').
        join(film, on='film_id').
        groupBy('first_name','last_name').
        sum('rental_duration').
        orderBy(f.col('sum(rental_duration)').desc()).
        show(10)
)

+----------+---------+--------------------+
|first_name|last_name|sum(rental_duration)|
+----------+---------+--------------------+
|     SUSAN|    DAVIS|                 242|
|      GINA|DEGENERES|                 209|
|    WALTER|     TORN|                 201|
|      MARY|   KEITEL|                 192|
|   MATTHEW|   CARREY|                 190|
|   GROUCHO|    DUNST|                 183|
|    ANGELA|   HUDSON|                 183|
|    SANDRA|   KILMER|                 181|
|     HENRY|    BERRY|                 180|
|       UMA|     WOOD|                 179|
+----------+---------+--------------------+
only showing top 10 rows



In [9]:
# 3
(category.join(film_category, on='category_id').
     join(film, on='film_id').
     groupBy('name').
     agg(
         f.sum("replacement_cost").alias('Sum')
     ).
     orderBy(f.col("Sum").desc()).
     select("name").
     show(1)
)

+------+
|  name|
+------+
|Sports|
+------+
only showing top 1 row



In [10]:
# 4
(film.join(inventory,'film_id','left').
     groupBy('title').
     count().
     filter(f.col('count') == 1).
     orderBy('title').
     select('title').
     show(100)
)

+--------------------+
|               title|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
|           GUMP DATE|
|       HATE HANDICAP|
|         HOCUS FRIDA|
|    KENTUCKIAN GIANT|
|    KILL BROTHERHOOD|
|         MUPPET MILE|
|      ORDER BETRAYED|
|       PEARL DESTINY|
|     PERDITION FARGO|
|       PSYCHO SHRUNK|
|   RAIDERS ANTITRUST|
|       RAINBOW SHOCK|
|       ROOF CHAMPION|
|       SISTER FREDDY|
|         SKY MIRACLE|
|    SUICIDES SILENCE|
|        TADPOLE PARK|
|    TREASURE COMMAND|
|   VILLAIN DESPERATE|
|        VOLUME HOUSE|
|          

In [11]:
# 5
temp_table = (actor.join(film_actor, on='actor_id').
                  join(film_category, on='film_id').
                  join(category, on='category_id').
                  filter(f.col('name') == 'Children').
                  withColumn('full_name', f.concat_ws(' ','first_name','last_name')).
                  groupBy('full_name').
                  count().
                  orderBy(f.col('count').desc()).
                  select('full_name','count')
             )

In [12]:
(temp_table.
 filter(f.col('count').isin(temp_table.select('count').limit(3).rdd.map(lambda row: row[0]).collect())).
 orderBy(f.col('count').desc()).
 show()
)

+-------------+-----+
|    full_name|count|
+-------------+-----+
| HELEN VOIGHT|    7|
|  SUSAN DAVIS|    6|
|   MARY TANDY|    5|
|   RALPH CRUZ|    5|
|  WHOOPI HURT|    5|
|KEVIN GARLAND|    5|
+-------------+-----+



In [13]:
# 6
(city.join(address, on='city_id').
     join(customer, on='address_id').
     groupBy('city').
     agg(
     f.sum(f.col("active").cast('int')).alias("active_clients"),
     (f.count(f.col("active").cast('int'))-f.sum(f.col("active").cast('int'))).alias("inactive_clients")
     ).
     orderBy(f.col('inactive_clients').desc()).
     show()
)

+----------------+--------------+----------------+
|            city|active_clients|inactive_clients|
+----------------+--------------+----------------+
|          Ktahya|             0|               1|
|Charlotte Amalie|             0|               1|
|         Wroclaw|             0|               1|
|       Pingxiang|             0|               1|
|     Szkesfehrvr|             0|               1|
|          Daxian|             0|               1|
|   Coatzacoalcos|             0|               1|
|         Bat Yam|             0|               1|
| Southend-on-Sea|             0|               1|
|        Uluberia|             0|               1|
|       Najafabad|             0|               1|
|        Xiangfan|             0|               1|
|      Kumbakonam|             0|               1|
|          Kamyin|             0|               1|
|          Amroha|             0|               1|
|        Chisinau|             1|               0|
|         Esfahan|             

In [14]:
# 7
temp_table = (city.join(address, on='city_id').
                  join(customer, on='address_id').
                  join(rental, on='customer_id').
                  join(inventory, on='inventory_id').
                  join(film_category, on='film_id').
                  join(category, on='category_id').
                  filter(f.col('return_date').isNotNull()).
                  withColumnRenamed('name', 'category').
                  withColumnRenamed('rental_date', 'd1').
                  withColumnRenamed('return_date', 'd2').
                  select('category', 'd1', 'd2', 'city')
             )


In [15]:
tb1 = (temp_table.
       withColumn('diff', (f.col('d2').cast('int')-f.col('d1').cast('int'))).
       filter(f.col('city').like('A%')).
       groupBy('category').
       agg(f.sum('diff').alias('time')).
       orderBy(f.col('time').desc()).
       select('category', 'time').
       limit(1)
      )

In [16]:
tb2 = (temp_table.
       withColumn('diff', (f.col('d2').cast('int')-f.col('d1').cast('int'))).
       filter(f.col('city').like('%-%')).
       groupBy('category').
       agg(f.sum('diff').alias('time')).
       orderBy(f.col('time').desc()).
       select('category', 'time').
       limit(1)
      )

In [17]:
tb1.union(tb2).show()

+--------+--------+
|category|    time|
+--------+--------+
|  Sports|44497260|
| Foreign|23299740|
+--------+--------+

