In [3]:
from pyspark.sql import SparkSession, functions as F, types as T

In [4]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [5]:
print(spark.version)

3.4.1


In [6]:
actor_df = spark.read.csv('./data/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('./data/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('./data/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('./data/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('./data/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('./data/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('./data/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('./data/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('./data/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('./data/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('./data/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('./data/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('./data/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('./data/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('./data/store.csv', header=True, inferSchema=True)

In [12]:
film_with_category_df = film_df.join(film_category_df, "film_id")
full_category_info_df = film_with_category_df.join(category_df, film_with_category_df.category_id == category_df.category_id)
category_counts_df = full_category_info_df.groupBy("name").count()
sorted_category_counts_df = category_counts_df.sort(F.desc("count"))
sorted_category_counts_df.show()

+-----------+-----+
|       name|count|
+-----------+-----+
|     Sports|   74|
|    Foreign|   73|
|     Family|   69|
|Documentary|   68|
|  Animation|   66|
|     Action|   64|
|        New|   63|
|      Drama|   62|
|      Games|   61|
|     Sci-Fi|   61|
|   Children|   60|
|     Comedy|   58|
|     Travel|   57|
|   Classics|   57|
|     Horror|   56|
|      Music|   51|
+-----------+-----+



In [13]:
rentals_with_film_id = rental_df.join(inventory_df, "inventory_id")
rentals_with_actors = rentals_with_film_id.join(film_actor_df, "film_id")
actor_rental_counts = rentals_with_actors.groupBy("actor_id").count()
top_actors_df = actor_rental_counts.join(actor_df, "actor_id")
top_10_actors_df = top_actors_df.sort(F.desc("count")).limit(10)
top_10_actors_df.select("first_name", "last_name", "count").show()

+----------+-----------+-----+
|first_name|  last_name|count|
+----------+-----------+-----+
|      GINA|  DEGENERES|  753|
|   MATTHEW|     CARREY|  678|
|      MARY|     KEITEL|  674|
|    ANGELA|WITHERSPOON|  654|
|    WALTER|       TORN|  640|
|     HENRY|      BERRY|  612|
|     JAYNE|      NOLTE|  611|
|       VAL|     BOLGER|  605|
|    SANDRA|     KILMER|  604|
|      SEAN|    GUINESS|  599|
+----------+-----------+-----+



In [14]:
payments_with_inventory = payment_df.join(rental_df, "rental_id")
payments_with_film = payments_with_inventory.join(inventory_df, "inventory_id")
payments_with_category = payments_with_film.join(film_category_df, "film_id")
category_payment_sums = payments_with_category.groupBy("category_id").agg(F.sum("amount").alias("total_amount"))
category_payment_with_names = category_payment_sums.join(category_df, "category_id")
top_category_df = category_payment_with_names.sort(F.desc("total_amount")).limit(1)
top_category_df.select("name", "total_amount").show()

+------+-----------------+
|  name|     total_amount|
+------+-----------------+
|Sports|5314.209999999848|
+------+-----------------+



In [15]:
films_not_in_inventory = film_df.join(inventory_df, "film_id", "left_anti")
films_not_in_inventory.select("title").show()

+--------------------+
|               title|
+--------------------+
|      ALICE FANTASIA|
|         APOLLO TEEN|
|      ARGONAUTS TOWN|
|       ARK RIDGEMONT|
|ARSENIC INDEPENDENCE|
|   BOONDOCK BALLROOM|
|       BUTCH PANTHER|
|       CATCH AMISTAD|
| CHINATOWN GLADIATOR|
|      CHOCOLATE DUCK|
|COMMANDMENTS EXPRESS|
|    CROSSING DIVORCE|
|     CROWDS TELEMARK|
|    CRYSTAL BREAKING|
|          DAZED PUNK|
|DELIVERANCE MULHO...|
|   FIREHOUSE VIETNAM|
|       FLOATS GARDEN|
|FRANKENSTEIN STRA...|
|  GLADIATOR WESTWARD|
+--------------------+
only showing top 20 rows



In [16]:
actors_with_categories = film_actor_df.join(film_category_df, "film_id")
full_category_info = actors_with_categories.join(category_df, actors_with_categories.category_id == category_df.category_id)
children_category = full_category_info.filter(full_category_info.name == "Children")
actor_counts_in_children = children_category.groupBy("actor_id").count()
actor_names_with_counts = actor_counts_in_children.join(actor_df, "actor_id")
top_3_actors_in_children = actor_names_with_counts.sort(F.desc("count")).limit(3)
top_3_actors_in_children.select("first_name", "last_name", "count").show()


+----------+---------+-----+
|first_name|last_name|count|
+----------+---------+-----+
|     HELEN|   VOIGHT|    7|
|     RALPH|     CRUZ|    5|
|    WHOOPI|     HURT|    5|
+----------+---------+-----+



In [17]:
spark.stop()