In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=09e9de0b9200d0a4c52db19178dfa013a07f372b23bb2c3563b78b7ee4234e33
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


# **Importing necessary Libraries**

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from datetime import datetime

# **Creating a Spark Session**

In [None]:
spark = SparkSession.builder.appName('PySpark-Operations').getOrCreate()

# **Load the Data**

In [None]:
# Read CSV files into PySpark DataFrames
actor_df = spark.read.csv('/content/drive/MyDrive/Datasets/dvdrental/actor.csv', header=True, inferSchema=True)
address_df = spark.read.csv('/content/drive/MyDrive/Datasets/dvdrental/address.csv', header=True, inferSchema=True)
category_df = spark.read.csv('/content/drive/MyDrive/Datasets/dvdrental/category.csv', header=True, inferSchema=True)
city_df = spark.read.csv('/content/drive/MyDrive/Datasets/dvdrental/city.csv', header=True, inferSchema=True)
country_df = spark.read.csv('/content/drive/MyDrive/Datasets/dvdrental/country.csv', header=True, inferSchema=True)
customer_df = spark.read.csv('/content/drive/MyDrive/Datasets/dvdrental/customer.csv', header=True, inferSchema=True)
film_df = spark.read.csv('/content/drive/MyDrive/Datasets/dvdrental/film.csv', header=True, inferSchema=True)
film_actor_df = spark.read.csv('/content/drive/MyDrive/Datasets/dvdrental/film_actor.csv', header=True, inferSchema=True)
film_category_df = spark.read.csv('/content/drive/MyDrive/Datasets/dvdrental/film_category.csv', header=True, inferSchema=True)
inventory_df = spark.read.csv('/content/drive/MyDrive/Datasets/dvdrental/inventory.csv', header=True, inferSchema=True)
language_df = spark.read.csv('/content/drive/MyDrive/Datasets/dvdrental/language.csv', header=True, inferSchema=True)
payment_df = spark.read.csv('/content/drive/MyDrive/Datasets/dvdrental/payment.csv', header=True, inferSchema=True)
rental_df = spark.read.csv('/content/drive/MyDrive/Datasets/dvdrental/rental.csv', header=True, inferSchema=True)
staff_df = spark.read.csv('/content/drive/MyDrive/Datasets/dvdrental/staff.csv', header=True, inferSchema=True)
store_df = spark.read.csv('/content/drive/MyDrive/Datasets/dvdrental/store.csv', header=True, inferSchema=True)

# **1. PySpark Operations:**
Write SQL queries to perform the following operations:

1) Select distinct values for key columns in each table.

2) Join relevant tables to create a consolidated view.

3) Calculate summary statistics for important columns.

4) Filter and sort data based on specific conditions.


# **1) Select distinct values for key columns in each dataframes**

In [None]:
# Actor table
actor_distinct = actor_df.select('actor_id', 'first_name', 'last_name').distinct()
actor_distinct.show()

+--------+----------+------------+
|actor_id|first_name|   last_name|
+--------+----------+------------+
|      52|    Carmen|        Hunt|
|     164|  Humphrey|      Willis|
|      22|     Elvis|        Marx|
|      29|      Alec|       Wayne|
|     115|  Harrison|        Bale|
|      98|     Chris|     Bridges|
|      16|      Fred|     Costner|
|      21|   Kirsten|     Paltrow|
|     166|      Nick|   Degeneres|
|       5|    Johnny|Lollobrigida|
|      39|    Goldie|       Brody|
|      64|       Ray|   Johansson|
|     190|    Audrey|      Bailey|
|     161|    Harvey|        Hope|
|       8|   Matthew|   Johansson|
|      97|       Meg|       Hawke|
|     101|     Susan|       Davis|
|     124|  Scarlett|      Bening|
|     183|   Russell|       Close|
|      32|       Tim|     Hackman|
+--------+----------+------------+
only showing top 20 rows



In [None]:
# Address table
address_distinct = address_df.select('address_id', 'address', 'district', 'city_id').distinct()
address_distinct.show()

+----------+--------------------+----------------+-------+
|address_id|             address|        district|city_id|
+----------+--------------------+----------------+-------+
|       130|  1666 Qomsheh Drive|        So Paulo|    410|
|       134|      758 Junan Lane|            Gois|    190|
|       369|   817 Laredo Avenue|         Jalisco|    188|
|        20|360 Toulouse Parkway|         England|    495|
|       511|  1152 al-Qatif Lane|Kalimantan Barat|    412|
|       424|1948 Bayugan Parkway|           Bihar|    264|
|       585|      1208 Tama Loop|          Ninawa|    344|
|       274| 920 Kumbakonam Loop|      California|    446|
|       354|  953 Hodeida Street|Southern Tagalog|    221|
|       464| 76 Kermanshah Manor|         Esfahan|    423|
|       561| 1497 Fengshan Drive|   KwaZulu-Natal|    112|
|        37|127 Purnea (Purni...|        Piemonte|     17|
|       569| 1342 Abha Boulevard|        Bukarest|     95|
|        45|   42 Brindisi Place|         Yerevan|    58

In [None]:
# Category table
category_distinct = category_df.select('category_id', 'name').distinct()
category_distinct.show()

+-----------+-----------+
|category_id|       name|
+-----------+-----------+
|         11|     Horror|
|         14|     Sci-Fi|
|          1|     Action|
|          4|   Classics|
|          3|   Children|
|          6|Documentary|
|          8|     Family|
|          2|  Animation|
|         10|      Games|
|         15|     Sports|
|          9|    Foreign|
|          5|     Comedy|
|          7|      Drama|
|         13|        New|
|         16|     Travel|
|         12|      Music|
+-----------+-----------+



In [None]:
# City table
city_distinct = city_df.select('city_id', 'city', 'country_id').distinct()
city_distinct.show()

+-------+-----------------+----------+
|city_id|             city|country_id|
+-------+-----------------+----------+
|     39|           Atinsk|        80|
|    239|           Jhansi|        44|
|    273|       Klerksdorp|        85|
|     47|           Baiyin|        23|
|    289|         La Plata|         6|
|    111| Charlotte Amalie|       106|
|    140|           Dayton|       103|
|    238|           Jelets|        80|
|     25|   Angra dos Reis|        15|
|    249|            Junan|        23|
|    559|Uttarpara-Kotrung|        44|
|    100|         Cam Ranh|       105|
|    275|          Konotop|       100|
|    325|         Mannheim|        38|
|    375|            Okara|        72|
|    431|        Rio Claro|        15|
|    358|     Nakhon Sawan|        94|
|    430|    Richmond Hill|        20|
|     35|         Ashgabat|        98|
|    115|         Chisinau|        61|
+-------+-----------------+----------+
only showing top 20 rows



In [None]:
# Country table
country_distinct = country_df.select('country_id', 'country').distinct()
country_distinct.show()

+----------+--------------------+
|country_id|             country|
+----------+--------------------+
|        13|             Belarus|
|        20|              Canada|
|        79|              Runion|
|        69|             Nigeria|
|        17|            Bulgaria|
|       104|           Venezuela|
|        37|              Gambia|
|       108|          Yugoslavia|
|        45|           Indonesia|
|         5|            Anguilla|
|        62|             Morocco|
|        36|    French Polynesia|
|        53|              Kuwait|
|        51|           Kazakstan|
|        41|Holy See (Vatican...|
|        43|             Hungary|
|        67|         Netherlands|
|        68|         New Zealand|
|        85|        South Africa|
|         7|             Armenia|
+----------+--------------------+
only showing top 20 rows



In [None]:
# Customer table
customer_distinct = customer_df.select('customer_id', 'first_name', 'last_name').distinct()
customer_distinct.show()

+-----------+----------+---------+
|customer_id|first_name|last_name|
+-----------+----------+---------+
|         94|     Norma| Gonzales|
|        432|     Edwin|     Burk|
|        529|      Erik|  Guillen|
|        564|       Bob| Pfeiffer|
|        232| Constance|     Reid|
|        249|      Dora|   Medina|
|        260|   Christy|   Vargas|
|        492|    Lester|    Kraus|
|        216|   Natalie|    Meyer|
|        360|     Ralph| Madrigal|
|        458|     Lloyd|     Dowd|
|        591|      Kent|Arsenault|
|        161| Geraldine|  Perkins|
|        304|     David|    Royal|
|        328|   Jeffrey|    Spear|
|        375|     Aaron|    Selby|
|        408|    Manuel|  Murrell|
|        447|  Clifford|   Bowens|
|        387|     Jesse|Schilling|
|        574|    Julian|     Vest|
+-----------+----------+---------+
only showing top 20 rows



In [None]:
# Film table
film_distinct = film_df.select('film_id', 'title', 'release_year').distinct()
film_distinct.show()

+-------+-------------------+------------+
|film_id|              title|release_year|
+-------+-------------------+------------+
|    273|   Effect Gladiator|        2006|
|    730|Ridgemont Submarine|        2006|
|    766|      Savannah Town|        2006|
|     38|      Ark Ridgemont|        2006|
|    120|  Caribbean Liberty|        2006|
|    170|    Command Darling|        2006|
|    233|    Disciple Mother|        2006|
|    379|       Greedy Roots|        2006|
|    474|         Jade Bunch|        2006|
|    595|         Moon Bunch|        2006|
|    658|      Paris Weekend|        2006|
|    679|     Pilot Hoosiers|        2006|
|    781|        Seven Swarm|        2006|
|     42| Artist Coldblooded|        2006|
|    215|          Dawn Pond|        2006|
|    364|    Godfather Diary|        2006|
|     20|Amelie Hellfighters|        2006|
|    147|     Chocolat Harry|        2006|
|    246|Doubtfire Labyrinth|        2006|
|    306|     Feathers Metal|        2006|
+-------+--

In [None]:
# Film Actor table
film_actor_distinct = film_actor_df.select('film_id', 'actor_id').distinct()
film_actor_distinct.show()

+-------+--------+
|film_id|actor_id|
+-------+--------+
|    858|       4|
|    179|       8|
|    154|      13|
|    427|      13|
|    527|      20|
|    977|      20|
|    856|      23|
|    336|      34|
|    559|      48|
|    571|      65|
|    577|      65|
|     32|      79|
|    519|      83|
|    522|      85|
|    967|      99|
|    759|     100|
|    394|     104|
|    894|     111|
|    322|     113|
|    910|     129|
+-------+--------+
only showing top 20 rows



In [None]:
# Film Category table
film_category_distinct = film_category_df.select('film_id', 'category_id').distinct()
film_category_distinct.show()

+-------+-----------+
|film_id|category_id|
+-------+-----------+
|    247|          5|
|    393|          6|
|    688|          3|
|    105|          1|
|    181|         16|
|    336|          6|
|    542|          1|
|    597|         10|
|    665|         11|
|    684|         10|
|    902|         15|
|    990|         11|
|    713|          6|
|    850|          1|
|    901|          2|
|    373|          3|
|    411|         12|
|    461|          2|
|    584|          9|
|    629|          6|
+-------+-----------+
only showing top 20 rows



In [None]:
# Inventory table
inventory_distinct = inventory_df.select('inventory_id', 'film_id', 'store_id').distinct()
inventory_distinct.show()

+------------+-------+--------+
|inventory_id|film_id|store_id|
+------------+-------+--------+
|        1033|    231|       2|
|        1424|    311|       2|
|        1758|    382|       1|
|        2153|    465|       2|
|        2575|    563|       2|
|        2612|    573|       1|
|        2719|    596|       2|
|        2733|    599|       2|
|        2955|    648|       1|
|        3296|    725|       2|
|        3371|    739|       2|
|        3587|    784|       2|
|        3675|    804|       1|
|        4089|    891|       1|
|        4213|    916|       1|
|        4265|    928|       1|
|        4581|   1000|       2|
|         253|     57|       1|
|         584|    127|       2|
|         674|    147|       1|
+------------+-------+--------+
only showing top 20 rows



In [None]:
# Language table
language_distinct = language_df.select('language_id', 'name').distinct()
language_distinct.show()

+-----------+--------------------+
|language_id|                name|
+-----------+--------------------+
|          3|Japanese            |
|          4|Mandarin            |
|          6|German              |
|          5|French              |
|          2|Italian             |
|          1|English             |
+-----------+--------------------+



In [None]:
# Payment table
payment_distinct = payment_df.select('payment_id', 'customer_id').distinct()
payment_distinct.show()

+----------+-----------+
|payment_id|customer_id|
+----------+-----------+
|     17598|        364|
|     17685|        386|
|     17904|        447|
|     18213|        526|
|     18258|        540|
|     18367|        572|
|     18583|         24|
|     18630|         31|
|     18649|         35|
|     18675|         42|
|     18749|         62|
|     18850|         89|
|     19091|        151|
|     19553|        270|
|     19565|        271|
|     19569|        272|
|     19622|        276|
|     19700|        284|
|     19732|        288|
|     19798|        295|
+----------+-----------+
only showing top 20 rows



In [None]:
# Rental table
rental_distinct = rental_df.select('rental_id', 'rental_date', 'inventory_id', 'customer_id').distinct()
rental_distinct.show()

+---------+---------------+------------+-----------+
|rental_id|    rental_date|inventory_id|customer_id|
+---------+---------------+------------+-----------+
|      531| 5/28/2005 5:23|        1113|        589|
|     1535| 6/16/2005 0:52|        1096|        202|
|     1614| 6/16/2005 6:58|          23|        489|
|     2173| 6/18/2005 0:08|         995|         65|
|     2255| 6/18/2005 5:21|          24|        410|
|     2363|6/18/2005 13:33|        3497|          1|
|     2457|6/18/2005 19:38|        1277|         91|
|     2481|6/18/2005 21:08|         800|        322|
|     2877| 6/20/2005 1:07|         485|        267|
|     2984| 6/20/2005 8:43|        1819|        435|
|     3129|6/20/2005 18:57|        1789|        495|
|     3212| 6/21/2005 1:04|        4178|         21|
|     3405|6/21/2005 15:58|        2344|        155|
|     3570|  7/6/2005 3:23|        1300|        450|
|     3574|  7/6/2005 3:36|        2799|        225|
|     4320| 7/7/2005 17:51|        3437|      

In [None]:
# Staff table
staff_distinct = staff_df.select('staff_id', 'first_name', 'last_name').distinct()
staff_distinct.show()

+--------+----------+---------+
|staff_id|first_name|last_name|
+--------+----------+---------+
|       2|       Jon| Stephens|
|       1|      Mike|  Hillyer|
+--------+----------+---------+



In [None]:
# Store table
store_distinct = store_df.select('store_id', 'manager_staff_id', 'address_id').distinct()
store_distinct.show()

+--------+----------------+----------+
|store_id|manager_staff_id|address_id|
+--------+----------------+----------+
|       2|               2|         2|
|       1|               1|         1|
+--------+----------------+----------+



# **2) Join relevant tables to create a consolidated view.**

In [None]:
# Join the customer DataFrame with the rental DataFrame
customer_rental_join = customer_df.join(rental_df, customer_df.customer_id == rental_df.customer_id, 'inner')

# Select the columns for the new DataFrame
customer_rental_view = customer_rental_join.select(
    customer_df.customer_id.alias("customer_customer_id"),
    rental_df.customer_id.alias("rental_customer_id"),
    customer_df.first_name,
    customer_df.last_name,
    rental_df.rental_id,
    rental_df.inventory_id
)

# Show the resulting DataFrame
customer_rental_view.show()

+--------------------+------------------+----------+-----------+---------+------------+
|customer_customer_id|rental_customer_id|first_name|  last_name|rental_id|inventory_id|
+--------------------+------------------+----------+-----------+---------+------------+
|                 459|               459|     Tommy|    Collazo|        2|        1525|
|                 408|               408|    Manuel|    Murrell|        3|        1711|
|                 333|               333|    Andrew|      Purdy|        4|        2452|
|                 222|               222|   Delores|     Hansen|        5|        2079|
|                 549|               549|    Nelson|Christenson|        6|        2792|
|                 269|               269| Cassandra|    Walters|        7|        3995|
|                 239|               239|    Minnie|     Romero|        8|        2346|
|                 126|               126|     Ellen|    Simpson|        9|        2580|
|                 399|          

In [None]:
# Join the actor DataFrame with the film_actor DataFrame
actor_film_actor_join = actor_df.join(film_actor_df, actor_df.actor_id == film_actor_df.actor_id, 'left')

# Select the columns for the new DataFrame
actor_film_actor_view = actor_film_actor_join.select(
    actor_df.actor_id.alias("actor_actor_id"),
    film_actor_df.actor_id.alias("film_actor_actor_id"),
    actor_df.first_name,
    actor_df.last_name,
    film_actor_df.film_id
)

# Show the resulting DataFrame
actor_film_actor_view.show()

+--------------+-------------------+----------+---------+-------+
|actor_actor_id|film_actor_actor_id|first_name|last_name|film_id|
+--------------+-------------------+----------+---------+-------+
|             1|                  1|  Penelope|  Guiness|    980|
|             1|                  1|  Penelope|  Guiness|    970|
|             1|                  1|  Penelope|  Guiness|    939|
|             1|                  1|  Penelope|  Guiness|    832|
|             1|                  1|  Penelope|  Guiness|    749|
|             1|                  1|  Penelope|  Guiness|    635|
|             1|                  1|  Penelope|  Guiness|    605|
|             1|                  1|  Penelope|  Guiness|    509|
|             1|                  1|  Penelope|  Guiness|    506|
|             1|                  1|  Penelope|  Guiness|    499|
|             1|                  1|  Penelope|  Guiness|    438|
|             1|                  1|  Penelope|  Guiness|    361|
|         

In [None]:
# Join the language DataFrame with the film DataFrame
language_film_join = language_df.join(film_df, language_df.language_id == film_df.language_id, 'right')

# Select the columns for the new DataFrame
language_film_view = language_film_join.select(
    language_df.language_id.alias("language_language_id"),
    film_df.language_id.alias("film_language_id"),
    language_df.name,
    film_df.film_id,
    film_df.title,
    film_df.release_year,
    film_df.length,
    film_df.rating
)

# Show the resulting DataFrame
language_film_view.show()

+--------------------+----------------+--------------------+-------+-----------------+------------+------+------+
|language_language_id|film_language_id|                name|film_id|            title|release_year|length|rating|
+--------------------+----------------+--------------------+-------+-----------------+------------+------+------+
|                   1|               1|English             |    133|  Chamber Italian|        2006|   117| NC-17|
|                   1|               1|English             |    384| Grosse Wonderful|        2006|    49|     R|
|                   1|               1|English             |      8|  Airport Pollock|        2006|    54|     R|
|                   1|               1|English             |     98|Bright Encounters|        2006|    73| PG-13|
|                   1|               1|English             |      1| Academy Dinosaur|        2006|    86|    PG|
|                   1|               1|English             |      2|   Ace Goldfinger|  

In [None]:
# Join the country DataFrame with the city DataFrame
country_city_join = country_df.join(city_df, country_df.country_id == city_df.country_id, 'full_outer')

# Select the columns for the new DataFrame
country_city_view = country_city_join.select(
    country_df.country_id.alias("country_country_id"),
    city_df.country_id.alias("city_country_id"),
    country_df.country,
    city_df.city
)

# Show the resulting DataFrame
country_city_view.show()

+------------------+---------------+--------------+--------------------+
|country_country_id|city_country_id|       country|                city|
+------------------+---------------+--------------+--------------------+
|                 1|              1|   Afghanistan|               Kabul|
|                 2|              2|       Algeria|               Batna|
|                 2|              2|       Algeria|               Bchar|
|                 2|              2|       Algeria|              Skikda|
|                 3|              3|American Samoa|              Tafuna|
|                 4|              4|        Angola|            Benguela|
|                 4|              4|        Angola|              Namibe|
|                 5|              5|      Anguilla|          South Hill|
|                 6|              6|     Argentina|     Almirante Brown|
|                 6|              6|     Argentina|          Avellaneda|
|                 6|              6|     Argentina|

# **3) Calculate summary statistics for important columns.**

In [None]:
# Calculate the total number of films in the database
total_films = film_df.count()
print("Total number of films in the database:", total_films)

Total number of films in the database: 1000


In [None]:
# Calculate the average length of films
average_length = film_df.agg({'length': 'avg'}).collect()[0][0]
print("Average length of films:", average_length)

Average length of films: 115.272


In [None]:
# Determine the number of active and inactive customers
customer_count = customer_df.groupBy('active').count()
customer_count.show()

+------+-----+
|active|count|
+------+-----+
|     1|  584|
|     0|   15|
+------+-----+



In [None]:
# Determine the total number of films in each category
film_count_per_category = film_category_df.join(category_df, film_category_df.category_id == category_df.category_id) \
                                           .groupBy(category_df.name.alias('category')) \
                                           .agg(F.count('film_id').alias('film_count')) \
                                           .orderBy('category')
film_count_per_category.show()

+-----------+----------+
|   category|film_count|
+-----------+----------+
|     Action|        64|
|  Animation|        66|
|   Children|        60|
|   Classics|        57|
|     Comedy|        58|
|Documentary|        68|
|      Drama|        62|
|     Family|        69|
|    Foreign|        73|
|      Games|        61|
|     Horror|        56|
|      Music|        51|
|        New|        63|
|     Sci-Fi|        61|
|     Sports|        74|
|     Travel|        57|
+-----------+----------+



In [None]:
# Calculate the total revenue generated from each store
total_revenue_per_store = store_df.join(staff_df, store_df.manager_staff_id == staff_df.staff_id) \
                                  .join(payment_df, staff_df.staff_id == payment_df.staff_id) \
                                  .groupBy(store_df["store_id"]) \
                                  .agg({'amount': 'sum'}) \
                                  .withColumnRenamed('sum(amount)', 'total_revenue')
total_revenue_per_store.show()

+--------+------------------+
|store_id|     total_revenue|
+--------+------------------+
|       1|30252.120000004612|
|       2|31059.920000004782|
+--------+------------------+



# **4) Filter and sort data based on specific conditions.**

In [None]:
# How many actors have 8 letters only in their first names.
actors_with_8_letters = actor_df.filter(F.length("first_name") == 8).count()
print("Number of actors with 8 letters in their first names:", actors_with_8_letters)

Number of actors with 8 letters in their first names: 16


In [None]:
# Count the number of actors whose first_names don't start with an 'A'.
actors_without_A = actor_df.filter(~F.col("first_name").startswith("A")).count()
print("Number of actors whose first names don't start with 'A':", actors_without_A)

Number of actors whose first names don't start with 'A': 187


In [None]:
# Find actor names that start with 'P' followed by any letter from 'a' to 'e' then any other letter.
pattern = "^P[a-e].*"
actors_matching_pattern = actor_df.filter(F.col("first_name").rlike(pattern)).show()

+--------+----------+---------+-----------+
|actor_id|first_name|last_name|last_update|
+--------+----------+---------+-----------+
|       1|  Penelope|  Guiness|    47:57.6|
|      46|    Parker| Goldberg|    47:57.6|
|      54|  Penelope|  Pinkett|    47:57.6|
|     104|  Penelope|   Cronyn|    47:57.6|
|     120|  Penelope|   Monroe|    47:57.6|
+--------+----------+---------+-----------+



In [None]:
# Which movies have been rented so far.
joined_df = rental_df.join(inventory_df, rental_df.inventory_id == inventory_df.inventory_id)
distinct_film_ids = joined_df.select("film_id").distinct()
movies_rented_so_far = film_df.join(distinct_film_ids, film_df.film_id == distinct_film_ids.film_id).select("title")
movies_rented_so_far.show()

+-------------------+
|              title|
+-------------------+
|    Island Exorcist|
|      Kick Savannah|
|   Instinct Airport|
|    Splendor Patton|
|      Submarine Bed|
|    Doors President|
|       Lucky Flying|
|   Newton Labyrinth|
|       Torque Bound|
|      Rock Instinct|
|       Hall Cassidy|
|      Apache Divine|
|        Legend Jedi|
|Dragonfly Strangers|
|        Mine Titans|
|     Charade Duffel|
|         Igby Maker|
|   Bonnie Holocaust|
|         Sling Luke|
|   Behavior Runaway|
+-------------------+
only showing top 20 rows



In [None]:
# Display the names of the actors that acted in more than 20 movies.
actors_more_than_20_movies = (film_actor_df
    .join(actor_df, film_actor_df.actor_id == actor_df.actor_id)
    .groupBy("first_name", "last_name")
    .agg(F.count("film_id").alias("movie_count"))
    .filter("movie_count > 20")
    .orderBy(F.asc("movie_count"))
)
actors_more_than_20_movies.show()

+-----------+-----------+-----------+
| first_name|  last_name|movie_count|
+-----------+-----------+-----------+
|Christopher|       West|         21|
|    Kenneth|    Paltrow|         21|
|      Kevin|      Bloom|         21|
|    Spencer|       Peck|         21|
|        Dan|       Torn|         22|
|         Ed|      Chase|         22|
|       Adam|     Hopper|         22|
|      Meryl|      Allen|         22|
|  Christian|      Gable|         22|
|       Gene|    Hopkins|         22|
|   Jennifer|      Davis|         22|
|  Sylvester|       Dern|         22|
|        Fay|       Wood|         22|
|       Nick|  Degeneres|         22|
|       Burt|     Temple|         23|
|   Michelle|Mcconaughey|         23|
|    Jessica|     Bailey|         23|
|       Lisa|     Monroe|         23|
|        Ben|     Harris|         23|
|        Tim|    Hackman|         23|
+-----------+-----------+-----------+
only showing top 20 rows



# **Question Set:**
Formulate 15 questions based on the dataset, ranging from easy to difficult.

Questions:

1) Retrieve all the distinct country names from the country table.

2) List the titles of films along with their categories.

3) Calculate the maximum length of films in the database.

4) Retrieve all the rental records where the return date is null.

5) What are the addresses of each store?

6) Count the number of films in each category, but only for categories with more than 10 films.

7) What is the name of the customer who lives in the city 'Apeldoorn'?

8) Update the email of the staff member with ID 101 to 'newemail@example.com'.

9) Write a query to create a count of movies in each of the 4 filmlen_groups:
1 hour or less
Between 1-2 hours
Between 2-3 hours
More than 3 hours

10) Select the titles of the movies that have the highest replacement cost.

11) Insert a new category named 'Documentary' into the category table.

12) Combine first_name and last_name from the customer table to become full_name.

13) Show how many inventory items are available at each store.

14) What is the total amount paid by each customer for all their rentals? For each customer, print their name and the total amount paid.

15) What payments have amounts between 3 USD and 5 USD?

# **Implement PySpark code to answer the set of 15 questions.**

In [None]:
# 1. Retrieve all the distinct country names from the country table.
distinct_countries = country_df.select('country').distinct()
distinct_countries.show()

+--------------------+
|             country|
+--------------------+
|                Chad|
|            Anguilla|
|            Paraguay|
|               Yemen|
|             Senegal|
|              Sweden|
|         Philippines|
|               Tonga|
|            Malaysia|
|              Turkey|
|              Malawi|
|                Iraq|
|             Germany|
|         Afghanistan|
|            Cambodia|
|               Sudan|
|              France|
|              Greece|
|Holy See (Vatican...|
|           Sri Lanka|
+--------------------+
only showing top 20 rows



In [None]:
# 2. List the titles of films along with their categories.
film_category_join = film_df.join(film_category_df, film_df.film_id == film_category_df.film_id)\
                     .join(category_df, film_category_df.category_id == category_df.category_id)
film_category_join.select('title', F.col('name').alias('categories')).orderBy('title').show()

+-------------------+-----------+
|              title| categories|
+-------------------+-----------+
|   Academy Dinosaur|Documentary|
|     Ace Goldfinger|     Horror|
|   Adaptation Holes|Documentary|
|   Affair Prejudice|     Horror|
|        African Egg|     Family|
|       Agent Truman|    Foreign|
|    Airplane Sierra|     Comedy|
|    Airport Pollock|     Horror|
|      Alabama Devil|     Horror|
|   Aladdin Calendar|     Sports|
|    Alamo Videotape|    Foreign|
|     Alaska Phantom|      Music|
|        Ali Forever|     Horror|
|     Alice Fantasia|   Classics|
|       Alien Center|    Foreign|
|    Alley Evolution|    Foreign|
|         Alone Trip|      Music|
|      Alter Victory|  Animation|
|       Amadeus Holy|     Action|
|Amelie Hellfighters|      Music|
+-------------------+-----------+
only showing top 20 rows



In [None]:
# 3. Calculate the maximum length of films in the database.
max_length = film_df.select(F.max('length').alias('maximum_length_of_films'))
max_length.show()

+-----------------------+
|maximum_length_of_films|
+-----------------------+
|                    185|
+-----------------------+



In [None]:
# 4. Retrieve all the rental records where the return date is null.
rental_df.filter(rental_df.return_date.isNull()).show()

+---------+---------------+------------+-----------+-----------+--------+--------------+
|rental_id|    rental_date|inventory_id|customer_id|return_date|staff_id|   last_update|
+---------+---------------+------------+-----------+-----------+--------+--------------+
|    11496|2/14/2006 15:16|        2047|        155|       NULL|       1|2/16/2006 2:30|
|    11541|2/14/2006 15:16|        2026|        335|       NULL|       1|2/16/2006 2:30|
|    12101|2/14/2006 15:16|        1556|        479|       NULL|       1|2/16/2006 2:30|
|    11563|2/14/2006 15:16|        1545|         83|       NULL|       1|2/16/2006 2:30|
|    11577|2/14/2006 15:16|        4106|        219|       NULL|       2|2/16/2006 2:30|
|    11593|2/14/2006 15:16|         817|         99|       NULL|       1|2/16/2006 2:30|
|    11611|2/14/2006 15:16|        1857|        192|       NULL|       2|2/16/2006 2:30|
|    11646|2/14/2006 15:16|         478|         11|       NULL|       2|2/16/2006 2:30|
|    11652|2/14/2006 

In [None]:
# 5. What are the addresses of each store?
store_address_join = store_df.join(address_df, store_df.address_id == address_df.address_id)
store_address_join.select('store_id', 'address', 'address2').show()

+--------+------------------+--------+
|store_id|           address|address2|
+--------+------------------+--------+
|       1| 47 MySakila Drive|    NULL|
|       2|28 MySQL Boulevard|    NULL|
+--------+------------------+--------+



In [None]:
# 6. Count the number of films in each category, but only for categories with more than 10 films.
film_category_count = film_category_df.groupBy('category_id').count()
film_category_count_filtered = film_category_count.filter(film_category_count['count'] > 10)

film_category_count_filtered = film_category_count_filtered.join(category_df,
                                                                 'category_id',
                                                                 'inner') \
    .select(F.col('name').alias('category'), 'count') \
    .orderBy('count')

film_category_count_filtered.show()

+-----------+-----+
|   category|count|
+-----------+-----+
|      Music|   51|
|     Horror|   56|
|     Travel|   57|
|   Classics|   57|
|     Comedy|   58|
|   Children|   60|
|      Games|   61|
|     Sci-Fi|   61|
|      Drama|   62|
|        New|   63|
|     Action|   64|
|  Animation|   66|
|Documentary|   68|
|     Family|   69|
|    Foreign|   73|
|     Sports|   74|
+-----------+-----+



In [None]:
# 7. What is the name of the customer who lives in the city 'Apeldoorn'?
customer_city_join = customer_df.join(address_df, customer_df.address_id == address_df.address_id)\
                                      .join(city_df, address_df.city_id == city_df.city_id)
customer_city_join.filter(city_df.city == 'Apeldoorn').select('first_name', 'last_name').show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|    Rhonda|  Kennedy|
+----------+---------+



In [None]:
# 8. Update the email of the staff member with ID 101 to 'newemail@example.com'.
staff_df_upd = staff_df.withColumn('email', F.when(staff_df.staff_id == 1, 'newemail@example.com').otherwise(staff_df.email))
staff_df_upd.select('staff_id', 'email').show()

+--------+--------------------+
|staff_id|               email|
+--------+--------------------+
|       1|newemail@example.com|
|       2|Jon.Stephens@saki...|
+--------+--------------------+



In [None]:
"""
9. Write a query you to create a count of movies in each of the 4 filmlen_groups:
1 hour or less, Between 1-2 hours, Between 2-3 hours, More than 3 hours.

filmlen_groups		filmcount_bylencat
1 hour or less			104
Between 1-2 hours		439
Between 2-3 hours		418
More than 3 hours		39

"""

# Define the film length groups using when and otherwise functions
film_df = film_df.withColumn("filmlen_groups",
                             F.when(F.col("length") <= 60, "1 hour or less")
                             .when((F.col("length") > 60) & (F.col("length") <= 120), "Between 1-2 hours")
                             .when((F.col("length") > 120) & (F.col("length") <= 180), "Between 2-3 hours")
                             .otherwise("More than 3 hours"))

# Calculate the count of films in each length category
film_count_by_length = film_df \
    .groupBy("filmlen_groups") \
    .agg(F.count("title").alias("filmcount_bylencat")) \
    .orderBy("filmlen_groups")

# Show the results with the specified format
film_count_by_length \
    .select(F.col("filmlen_groups").alias("filmlen_groups"),
            F.col("filmcount_bylencat").alias("filmcount_bylencat")).show()

+-----------------+------------------+
|   filmlen_groups|filmcount_bylencat|
+-----------------+------------------+
|   1 hour or less|               104|
|Between 1-2 hours|               439|
|Between 2-3 hours|               418|
|More than 3 hours|                39|
+-----------------+------------------+



In [None]:
# 10. Select the titles of the movies that have the highest replacement cost.
max_replacement_cost = film_df.agg({"replacement_cost": "max"}).collect()[0][0]
highest_replacement_cost_films = film_df.filter(film_df.replacement_cost == max_replacement_cost)

titles_highest_replacement_cost = highest_replacement_cost_films.select("title", "replacement_cost")
titles_highest_replacement_cost.show()

+--------------------+----------------+
|               title|replacement_cost|
+--------------------+----------------+
|        Arabia Dogma|           29.99|
|Ballroom Mockingbird|           29.99|
|       Blindness Gun|           29.99|
|    Bonnie Holocaust|           29.99|
| Chariots Conspiracy|           29.99|
|  Clockwork Paradise|           29.99|
|        Clyde Theory|           29.99|
|  Cruelty Unforgiven|           29.99|
|    Cupboard Sinners|           29.99|
|Desperate Trainsp...|           29.99|
|           Dirty Ace|           29.99|
|        Doctor Grail|           29.99|
|        Earth Vision|           29.99|
|      Everyone Craft|           29.99|
|Extraordinary Con...|           29.99|
|       Fantasia Park|           29.99|
|        Feud Frogmen|           29.99|
|   Flatliners Killer|           29.99|
|       Floats Garden|           29.99|
|      Gilmore Boiled|           29.99|
+--------------------+----------------+
only showing top 20 rows



In [None]:
# 11. Insert a new category named 'Documentary' into the category table.

# Define the schema for the new DataFrame
schema = StructType([
    StructField("category_id", StringType(), nullable=True),
    StructField("name", StringType(), nullable=False),
    StructField("last_update", TimestampType(), nullable=False)
])

# Create a new DataFrame with the 'Documentary' category
new_category_df = spark.createDataFrame([(None, 'Documentary', datetime.now())], schema)

# Union the new DataFrame with the existing category DataFrame
category_df_upd = category_df.union(new_category_df)

# Show the updated DataFrame
category_df_upd.show()

+-----------+-----------+--------------------+
|category_id|       name|         last_update|
+-----------+-----------+--------------------+
|          1|     Action|      2/15/2006 9:46|
|          2|  Animation|      2/15/2006 9:46|
|          3|   Children|      2/15/2006 9:46|
|          4|   Classics|      2/15/2006 9:46|
|          5|     Comedy|      2/15/2006 9:46|
|          6|Documentary|      2/15/2006 9:46|
|          7|      Drama|      2/15/2006 9:46|
|          8|     Family|      2/15/2006 9:46|
|          9|    Foreign|      2/15/2006 9:46|
|         10|      Games|      2/15/2006 9:46|
|         11|     Horror|      2/15/2006 9:46|
|         12|      Music|      2/15/2006 9:46|
|         13|        New|      2/15/2006 9:46|
|         14|     Sci-Fi|      2/15/2006 9:46|
|         15|     Sports|      2/15/2006 9:46|
|         16|     Travel|      2/15/2006 9:46|
|       NULL|Documentary|2024-01-27 09:52:...|
+-----------+-----------+--------------------+



In [None]:
# 12. Combine first_name and last_name from the customer table to become full_name.
customer_full_name = customer_df.withColumn('full_name', F.concat(F.col('first_name'), F.lit(' '), F.col('last_name')))
customer_full_name.select('first_name', 'last_name', 'full_name').show()

+----------+---------+----------------+
|first_name|last_name|       full_name|
+----------+---------+----------------+
|     Jared|      Ely|       Jared Ely|
|      Mary|    Smith|      Mary Smith|
|  Patricia|  Johnson|Patricia Johnson|
|     Linda| Williams|  Linda Williams|
|   Barbara|    Jones|   Barbara Jones|
| Elizabeth|    Brown| Elizabeth Brown|
|  Jennifer|    Davis|  Jennifer Davis|
|     Maria|   Miller|    Maria Miller|
|     Susan|   Wilson|    Susan Wilson|
|  Margaret|    Moore|  Margaret Moore|
|   Dorothy|   Taylor|  Dorothy Taylor|
|      Lisa| Anderson|   Lisa Anderson|
|     Nancy|   Thomas|    Nancy Thomas|
|     Karen|  Jackson|   Karen Jackson|
|     Betty|    White|     Betty White|
|     Helen|   Harris|    Helen Harris|
|    Sandra|   Martin|   Sandra Martin|
|     Donna| Thompson|  Donna Thompson|
|     Carol|   Garcia|    Carol Garcia|
|      Ruth| Martinez|   Ruth Martinez|
+----------+---------+----------------+
only showing top 20 rows



In [None]:
# 13. Show how many inventory items are available at each store.
inventory_count_per_store = inventory_df.groupBy('store_id').count()
inventory_count_per_store.show()

+--------+-----+
|store_id|count|
+--------+-----+
|       1| 2270|
|       2| 2311|
+--------+-----+



In [None]:
"""
14. What is the total amount paid by each customer for all their rentals?
For each customer print their name and the total amount paid.
"""

# Group by customer_id and sum the amount paid
total_amount_paid_per_customer = payment_df.groupBy('customer_id') \
    .agg(F.sum('amount').alias('total_amount_paid'))

# Join with customer_df to get customer names
total_amount_paid_per_customer = total_amount_paid_per_customer.join(customer_df,
                                                                     'customer_id',
                                                                     'inner') \
    .select('first_name', 'last_name', 'total_amount_paid')

# Order the result by total_amount_paid in descending order
total_amount_paid_per_customer = total_amount_paid_per_customer.orderBy(F.desc('total_amount_paid'))

# Show the result
total_amount_paid_per_customer.show()

+----------+---------+------------------+
|first_name|last_name| total_amount_paid|
+----------+---------+------------------+
|   Eleanor|     Hunt| 211.5500000000001|
|      Karl|     Seal|208.58000000000013|
|    Marion|   Snyder|194.61000000000007|
|    Rhonda|  Kennedy|191.62000000000006|
|     Clara|     Shaw|189.60000000000005|
|     Tommy|  Collazo|183.63000000000002|
|       Ana|  Bradley|167.67000000000002|
|    Curtis|     Irby|            167.62|
|    Marcia|     Dean|166.61000000000004|
|      Mike|      Way|            162.67|
|    Arnold|   Havens|            161.68|
|    Wesley|     Bull|            158.65|
|    Gordon|   Allard|            157.69|
|     Louis|    Leone|            156.66|
|      Lena|   Jensen|             154.7|
|       Tim|     Cary|            154.66|
|    Warren|  Sherrod|152.69000000000003|
|     Steve|Mackenzie|            152.68|
|  Brittany|    Riley|            151.73|
|       Guy| Brownlee|            151.69|
+----------+---------+------------

In [None]:
# 15. What payments have amounts between 3 USD and 5 USD?
payments_between_3_and_5 = payment_df.filter((payment_df.amount >= 3) & (payment_df.amount <= 5))
payments_between_3_and_5.select('customer_id', 'payment_id', 'amount').orderBy('amount').show()

+-----------+----------+------+
|customer_id|payment_id|amount|
+-----------+----------+------+
|        269|     31919|  3.98|
|        361|     31945|  3.98|
|        448|     31965|  3.98|
|        457|     31969|  3.98|
|         15|     32014|  3.98|
|         43|     32025|  3.98|
|        175|     32065|  3.98|
|        228|     32088|  3.98|
|        364|     17601|  3.99|
|        431|     17849|  3.99|
|        371|     17623|  3.99|
|        344|     17519|  3.99|
|        372|     17627|  3.99|
|        400|     17741|  3.99|
|        403|     17748|  3.99|
|        350|     17541|  3.99|
|        405|     17759|  3.99|
|        377|     17639|  3.99|
|        406|     17763|  3.99|
|        380|     17653|  3.99|
+-----------+----------+------+
only showing top 20 rows

