In [1]:
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, concat_ws, collect_list, count, avg, upper, round, desc, asc, dense_rank, countDistinct, row_number
from pyspark.sql.window import Window
import os
from dotenv import load_dotenv, dotenv_values

In [3]:
# VARIABLES NEEDED TO MAKE CONNECTION AND TABLE NAMES

config = dotenv_values(".env")
database = config["DATABASE"]
user = config["USER"]
password = config["PASS"]
port = config["PORT"]
server = config["SERVER"]
jdbc_url = f"jdbc:sqlserver://{server}:{port};databaseName={database};encrypt=true;trustServerCertificate=true"
jdbc_driver_path = config["JDBC_DRIVER_PATH"]

album, artist, customer, employee, genre, invoice, invoice_line, media_type, playlist, playlist_track, track = "dbo.Album", "dbo.Artist",\
    "dbo.Customer", "dbo.Employee", "dbo.Genre",\
    "dbo.Invoice", "dbo.InvoiceLine", "dbo.MediaType",\
    "dbo.Playlist", "dbo.PlaylistTrack", "dbo.Track"

In [4]:
# CREATE SPARK SESSION 

spark = SparkSession.builder \
    .appName("PySpark using Chinook DB") \
    .master("local") \
    .config("spark.driver.extraClassPath", jdbc_driver_path) \
    .getOrCreate()

In [5]:
# CREATE DF FOR EACH TABLE USING JDBC

album_df = spark.read.format("jdbc").option("url", jdbc_url).option("dbtable", album).option("user", user)\
    .option("password", password).option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").load()

artist_df = spark.read.format("jdbc").option("url", jdbc_url).option("dbtable", artist).option("user", user)\
    .option("password", password).option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").load()

customer_df = spark.read.format("jdbc").option("url", jdbc_url).option("dbtable", customer).option("user", user)\
    .option("password", password).option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").load()

employee_df = spark.read.format("jdbc").option("url", jdbc_url).option("dbtable", employee).option("user", user)\
    .option("password", password).option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").load()

genre_df = spark.read.format("jdbc").option("url", jdbc_url).option("dbtable", genre).option("user", user)\
    .option("password", password).option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").load()

invoice_df = spark.read.format("jdbc").option("url", jdbc_url).option("dbtable", invoice).option("user", user)\
    .option("password", password).option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").load()

invoice_line_df = spark.read.format("jdbc").option("url", jdbc_url).option("dbtable", invoice_line).option("user", user)\
    .option("password", password).option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").load()

media_type_df = spark.read.format("jdbc").option("url", jdbc_url).option("dbtable", media_type).option("user", user)\
    .option("password", password).option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").load()

playlist_df = spark.read.format("jdbc").option("url", jdbc_url).option("dbtable", playlist).option("user", user)\
    .option("password", password).option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").load()

playlist_track_df = spark.read.format("jdbc").option("url", jdbc_url).option("dbtable", playlist_track).option("user", user)\
    .option("password", password).option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").load()

track_df = spark.read.format("jdbc").option("url", jdbc_url).option("dbtable", track).option("user", user)\
    .option("password", password).option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").load()


EASY

In [8]:
# 1. Retrieve the total sales (invoiced amount) for each customer.

q1_df = customer_df.alias("c").join(invoice_df.alias("i"), col("c.CustomerId") == col("i.CustomerId"), "inner")\
    .groupBy(col("c.CustomerId").alias("Customer"))\
    .agg(round(sum(col("i.Total")), 1).alias("Total_Sales"))\
    .show()

+--------+-----------+
|Customer|Total_Sales|
+--------+-----------+
|      31|       37.6|
|      53|       37.6|
|      34|       39.6|
|      28|       43.6|
|      27|       37.6|
|      26|       47.6|
|      44|       41.6|
|      12|       37.6|
|      22|       39.6|
|      47|       37.6|
|       1|       39.6|
|      52|       37.6|
|      13|       37.6|
|      16|       37.6|
|       6|       49.6|
|       3|       39.6|
|      40|       38.6|
|      20|       39.6|
|      57|       46.6|
|      54|       37.6|
+--------+-----------+
only showing top 20 rows



In [61]:
# 2. List all albums by a specific artist (e.g., “Queen”).

q2_df = artist_df.alias("ar").join(album_df.alias("al"), col("ar.ArtistId") == col("al.ArtistId"), "inner")\
    .groupBy(col("ar.ArtistId"), col("ar.Name"))\
    .agg(concat_ws(", ", collect_list(col("al.Title"))).alias("Albums"))\
    .show()

+--------+--------------------+--------------------+
|ArtistId|                Name|              Albums|
+--------+--------------------+--------------------+
|       1|               AC/DC|For Those About T...|
|       2|              Accept|Balls to the Wall...|
|       3|           Aerosmith|            Big Ones|
|       4|   Alanis Morissette|  Jagged Little Pill|
|       5|     Alice In Chains|            Facelift|
|       6|Antônio Carlos Jobim|Warner 25 Anos, C...|
|       7|        Apocalyptica|Plays Metallica B...|
|       8|          Audioslave|Audioslave, Out O...|
|       9|            BackBeat| BackBeat Soundtrack|
|      10|        Billy Cobham|The Best Of Billy...|
|      11| Black Label Society|Alcohol Fueled Br...|
|      12|       Black Sabbath|Black Sabbath, Bl...|
|      13|          Body Count|          Body Count|
|      14|     Bruce Dickinson|    Chemical Wedding|
|      15|           Buddy Guy|The Best Of Buddy...|
|      16|      Caetano Veloso|Prenda Minha, S

In [65]:
# 3. Get a list of tracks from a specific genre (e.g., “Rock”).

q3_df = track_df.alias("t").join(genre_df.alias("g"), col("t.GenreId")==col("g.GenreId"), "inner")\
    .groupBy(col("g.name").alias("Genre")).agg(concat_ws("; ", collect_list(col("t.name"))).alias("Tracks"))\
    .show()

+-----------------+--------------------+
|            Genre|              Tracks|
+-----------------+--------------------+
|            World|Pura Elegancia; C...|
|      Hip Hop/Rap|ZeroVinteUm; Quei...|
|         TV Shows|Occupation / Prec...|
|       Bossa Nova|Samba Da Bênção; ...|
|            Latin|Jorge Da Capadóci...|
| Sci Fi & Fantasy|Crossroads, Pt. 1...|
|            Metal|Enter Sandman; Ma...|
|             Rock|For Those About T...|
|             Jazz|Desafinado; Garot...|
|         R&B/Soul|Please Please Ple...|
|            Drama|Don't Look Back; ...|
|       Soundtrack|Vai-Vai 2001; X-9...|
|Electronica/Dance|Just Another Stor...|
|        Classical|Symphony No. 3 in...|
|            Blues|First Time I Met ...|
|      Alternative|War Pigs; Say Hel...|
|    Rock And Roll|Money; Long Tall ...|
|  Science Fiction|Battlestar Galact...|
|              Pop|Dig-Dig, Lambe-La...|
|      Heavy Metal|Wildest Dreams; R...|
+-----------------+--------------------+
only showing top

In [13]:
# 4. Find the total number of invoices for each customer.

q4_df = customer_df.alias("c").join(invoice_df.alias("i"), col("c.customerid") == col("i.customerid"), "inner")\
    .groupBy(col("c.customerid"), col("c.firstname"))\
    .agg(count(col("i.invoiceid")))\
    .show()

+----------+---------+------------------+
|customerid|firstname|count(i.invoiceid)|
+----------+---------+------------------+
|        31|   Martha|                 7|
|        53|     Phil|                 7|
|        34|     João|                 7|
|        28|    Julia|                 7|
|        26|  Richard|                 7|
|        27|  Patrick|                 7|
|        44|    Terhi|                 7|
|        12|  Roberto|                 7|
|        22|  Heather|                 7|
|        47|    Lucas|                 7|
|         1|     Luís|                 7|
|        52|     Emma|                 7|
|        13| Fernanda|                 7|
|         6|   Helena|                 7|
|        16|    Frank|                 7|
|         3| François|                 7|
|        20|      Dan|                 7|
|        40|Dominique|                 7|
|        57|     Luis|                 7|
|        54|    Steve|                 7|
+----------+---------+------------

In [12]:
# 5 .Display the average track length (milliseconds) for each album.

q5_df = album_df.alias("al").join(track_df.alias("t"),col("al.albumid") == col("t.albumid"),"inner")\
    .groupBy(col("al.albumid").alias("Album no."), col("al.title").alias("Album Name"))\
    .agg(round(avg(col("t.milliseconds")), 1).alias("Length"))\
    .show()

+---------+--------------------+---------+
|Album no.|          Album Name|   Length|
+---------+--------------------+---------+
|      148|         Black Album| 313268.7|
|      243|The Best Of Van H...| 255881.2|
|       31|          Bongo Fury| 273992.1|
|       85|As Canções de Eu ...| 206159.7|
|      137|The Song Remains ...| 588794.2|
|      251|The Office, Season 3|1532683.8|
|       65|        Stormbringer| 244119.7|
|       53|        Vozes do MPB| 204191.3|
|      255|Instant Karma: Th...| 223255.9|
|      133|     Led Zeppelin II| 277652.1|
|      296|A Copland Celebra...| 198064.0|
|       78|        Deixa Entrar| 203032.6|
|      322|               Frank| 275982.5|
|      321|       Back to Black| 212004.2|
|      108|   Rock In Rio [CD1]| 338658.8|
|      155|           St. Anger| 409732.5|
|       34|Chill: Brazil (Di...| 248321.1|
|      193|Blood Sugar Sex M...| 261004.3|
|      211|         The Singles| 214704.4|
|      101|             Killers| 232369.1|
+---------+

In [8]:
# 6. Retrieve all customers from the “USA” and their invoices.

q6_df = customer_df.alias("c").join(invoice_df.alias("i"), col("c.customerid") == col("i.customerid"), "inner")\
    .select(col("c.customerid"), col("c.firstname"), col("i.invoiceid"), col("i.billingcountry"))\
    .where(upper(col("i.billingcountry")) == "USA")\
    .show()

+----------+---------+---------+--------------+
|customerid|firstname|invoiceid|billingcountry|
+----------+---------+---------+--------------+
|        28|    Julia|       71|           USA|
|        28|    Julia|       82|           USA|
|        28|    Julia|      137|           USA|
|        28|    Julia|      266|           USA|
|        28|    Julia|      289|           USA|
|        28|    Julia|      311|           USA|
|        28|    Julia|      363|           USA|
|        27|  Patrick|       39|           USA|
|        26|  Richard|       70|           USA|
|        26|  Richard|       93|           USA|
|        26|  Richard|      115|           USA|
|        26|  Richard|      167|           USA|
|        27|  Patrick|      168|           USA|
|        27|  Patrick|      191|           USA|
|        27|  Patrick|      213|           USA|
|        27|  Patrick|      265|           USA|
|        26|  Richard|      288|           USA|
|        26|  Richard|      299|        

In [14]:
# 7. Show the total number of tracks in each genre.

q7_df = genre_df.alias("g").join(track_df.alias("t"), col("g.genreid") == col("t.genreid"), "inner")\
    .groupBy(col("g.name").alias("Genre")) \
    .agg(count(col("t.trackid")).alias("no_of_tracks"))\
    .show()

+-----------------+------------+
|            Genre|no_of_tracks|
+-----------------+------------+
|            World|          28|
|      Hip Hop/Rap|          35|
|         TV Shows|          93|
|       Bossa Nova|          15|
|            Latin|         579|
| Sci Fi & Fantasy|          26|
|            Metal|         374|
|             Rock|        1297|
|             Jazz|         130|
|         R&B/Soul|          61|
|            Drama|          64|
|       Soundtrack|          43|
|Electronica/Dance|          30|
|        Classical|          74|
|            Blues|          81|
|      Alternative|          40|
|    Rock And Roll|          12|
|  Science Fiction|          13|
|              Pop|          48|
|      Heavy Metal|          28|
+-----------------+------------+
only showing top 20 rows



In [17]:
# 8. List all tracks for a specific album (e.g., "Abbey Road").

q8_df = album_df.alias("al").join(track_df.alias("t"), col("al.albumid") == col("t.albumid"), "inner")\
    .groupBy(col("al.title").alias("Album")) \
    .agg(concat_ws("; ", collect_list(col("t.name"))).alias("Tracks")) \
    .show()


+--------------------+--------------------+
|               Album|              Tracks|
+--------------------+--------------------+
|...And Justice Fo...|Blackened; ...And...|
|20th Century Mast...|Rock You Like a H...|
|A Copland Celebra...|Fanfare for the C...|
|A Matter of Life ...|Different World; ...|
|     A Real Dead One|The Number Of The...|
|     A Real Live One|Be Quick Or Be De...|
|  A Soprano Inspired|           Ave Maria|
|A TempestadeTempe...|Natália; L'Avvent...|
|             A-Sides|Nothing To Say; F...|
|       Ace Of Spades|Ace Of Spades; Lo...|
|        Achtung Baby|Zoo Station; Even...|
|            Acústico|Comida; Go Back; ...|
|        Acústico MTV|Vulcão Dub - Fui ...|
| Acústico MTV [Live]|Girassol; A Sombr...|
|Adams, John: The ...|Two Fanfares for ...|
|Adorate Deum: Gre...|Intoitus: Adorate...|
|      Afrociberdelia|Mateus Enter; O C...|
|   Album Of The Year|Collision; Strips...|
|Alcohol Fueled Br...|Intro/ Low Down; ...|
|Alcohol Fueled Br...|Heart Of G

In [44]:
# 9. Find customers who have not placed any invoices.

q9_df = customer_df.alias("c").join(invoice_df.alias("i"), col("c.customerid") == col("i.customerid"), "leftanti").select(col("c.customerid")).show()

+----------+
|customerid|
+----------+
|        60|
+----------+



In [47]:
# 10. Retrieve the most popular genres by total number of tracks.

q10_df = genre_df.alias("g").join(track_df.alias("t"), col("g.genreid") == col("t.genreid"), "inner") \
    .groupBy(col("g.name").alias("Genre")) \
    .agg(count(col("t.trackid")).alias("Total_tracks"))\
    .orderBy(desc("Total_tracks"))\
    .show()

+------------------+------------+
|             Genre|Total_tracks|
+------------------+------------+
|              Rock|        1297|
|             Latin|         579|
|             Metal|         374|
|Alternative & Punk|         332|
|              Jazz|         130|
|          TV Shows|          93|
|             Blues|          81|
|         Classical|          74|
|             Drama|          64|
|          R&B/Soul|          61|
|            Reggae|          58|
|               Pop|          48|
|        Soundtrack|          43|
|       Alternative|          40|
|       Hip Hop/Rap|          35|
| Electronica/Dance|          30|
|             World|          28|
|       Heavy Metal|          28|
|  Sci Fi & Fantasy|          26|
|    Easy Listening|          24|
+------------------+------------+
only showing top 20 rows



MEDIUM

In [63]:
# 11. Find the top 5 customers who have spent the most on purchases.

q11_df = customer_df.alias("c").join(invoice_df.alias("i"), col("c.customerid") == col("i.customerid"), "inner")\
    .groupBy(col("c.customerid"))  \
    .agg(sum(col("i.total")).alias("Spend")) \
    .withColumn("rank", dense_rank().over(Window.orderBy(desc("Spend")))) \
    .where(col("rank")<=5) \
    .show()

+----------+-----+----+
|customerid|Spend|rank|
+----------+-----+----+
|         6|49.62|   1|
|        26|47.62|   2|
|        57|46.62|   3|
|        45|45.62|   4|
|        46|45.62|   4|
|        28|43.62|   5|
|        37|43.62|   5|
|        24|43.62|   5|
+----------+-----+----+



In [67]:
# 12. For each genre, calculate the total sales (invoice total) made from tracks in that genre.

q12_df = genre_df.alias("g")\
    .join(track_df.alias("t"), col("g.genreid") == col("t.genreid"), "inner")\
    .join(invoice_line_df.alias("il"), col("t.trackid") == col("il.trackid"), "inner")\
    .join(invoice_df.alias("i"), col("il.invoiceid") == col("i.invoiceid"), "inner")\
    .groupBy(col("g.genreid"), col("g.name").alias("Genre"))\
    .agg(sum(col("i.total")).alias("Sales By Genre"))\
    .show(100)

+-------+------------------+--------------+
|genreid|             Genre|Sales By Genre|
+-------+------------------+--------------+
|     18|   Science Fiction|        102.41|
|     15| Electronica/Dance|        149.62|
|     24|         Classical|        317.04|
|     17|       Hip Hop/Rap|        166.41|
|     11|        Bossa Nova|         86.13|
|     13|       Heavy Metal|        161.37|
|     21|             Drama|        544.61|
|     12|    Easy Listening|        138.60|
|     23|       Alternative|        211.17|
|     22|            Comedy|        112.30|
|     20|  Sci Fi & Fantasy|        198.87|
|     19|          TV Shows|        817.71|
|      8|            Reggae|        332.64|
|      2|              Jazz|        746.46|
|     10|        Soundtrack|        242.55|
|      3|             Metal|       2093.13|
|      5|     Rock And Roll|         83.16|
|     16|             World|        182.18|
|      4|Alternative & Punk|       1961.66|
|      1|              Rock|    

In [69]:
# 13. Get the total number of tracks purchased by each customer.

q13_df = customer_df.alias("c")\
    .join(invoice_df.alias("i"), col("c.customerid") == col("i.customerid"), "inner") \
    .join(invoice_line_df.alias("il"), col("i.invoiceid") == col("il.invoiceid"), "inner") \
    .join(track_df.alias("t"), col("il.trackid") == col("t.trackid"), "inner") \
    .groupBy(col("c.customerid"), col("c.firstname"))\
    .agg(count(col("t.trackid")).alias("total_tracks"))\
    .show()

+----------+---------+------------+
|customerid|firstname|total_tracks|
+----------+---------+------------+
|        31|   Martha|          38|
|        53|     Phil|          38|
|        34|     João|          38|
|        28|    Julia|          38|
|        26|  Richard|          38|
|        27|  Patrick|          38|
|        44|    Terhi|          38|
|        12|  Roberto|          38|
|        22|  Heather|          38|
|        47|    Lucas|          38|
|         1|     Luís|          38|
|        52|     Emma|          38|
|        13| Fernanda|          38|
|         6|   Helena|          38|
|        16|    Frank|          38|
|         3| François|          38|
|        20|      Dan|          38|
|        40|Dominique|          38|
|        57|     Luis|          38|
|        54|    Steve|          38|
+----------+---------+------------+
only showing top 20 rows



In [72]:
# 14. Show the top 10 most purchased tracks across all invoices.

q14_df = invoice_df.alias("i")\
    .join(invoice_line_df.alias("il"), col("i.invoiceid") == col("il.invoiceid"), "inner")\
    .join(track_df.alias("t"), col("il.trackid") == col("t.trackid"), "inner")\
    .groupBy(col("t.trackid"))\
    .agg(count(col("i.invoiceid")).alias("total_tracks"))\
    .withColumn("rank", dense_rank().over(Window.orderBy(desc("total_tracks"))))\
    .where(col("rank") <= 10)\
    .show()

+-------+------------+----+
|trackid|total_tracks|rank|
+-------+------------+----+
|    496|           2|   1|
|    858|           2|   1|
|   3488|           2|   1|
|   2572|           2|   1|
|   1322|           2|   1|
|   1139|           2|   1|
|   2259|           2|   1|
|   2096|           2|   1|
|   2249|           2|   1|
|   2027|           2|   1|
|   1157|           2|   1|
|    472|           2|   1|
|   1888|           2|   1|
|   1808|           2|   1|
|    857|           2|   1|
|    976|           2|   1|
|   1331|           2|   1|
|    744|           2|   1|
|   1344|           2|   1|
|    211|           2|   1|
+-------+------------+----+
only showing top 20 rows



In [80]:
# 15. Calculate the average invoice total for each country.
"""
# Using Group By
q15_df = customer_df.alias("c").join(invoice_df.alias("i"), col("c.customerid") == col("i.customerid"), "inner")\
    .groupBy(col("c.country"))\
    .agg(avg(col("i.total")).alias("avg_invoice"))\
    .show()
"""
# Using window function

q15_df = customer_df.alias("c").join(invoice_df.alias("i"), col("c.customerid") == col("i.customerid"), "inner")\
    .withColumn("avg_sales", avg(col("i.total")).over(Window.partitionBy(col("c.country"))))\
    .select(col("c.country"), col("avg_sales")).distinct()\
    .show()



+--------------+---------+
|       country|avg_sales|
+--------------+---------+
|     Argentina| 5.374286|
|     Australia| 5.374286|
|       Austria| 6.088571|
|       Belgium| 5.374286|
|        Brazil| 5.431429|
|        Canada| 5.427857|
|         Chile| 6.660000|
|Czech Republic| 6.445714|
|       Denmark| 5.374286|
|       Finland| 5.945714|
|        France| 5.574286|
|       Germany| 5.588571|
|       Hungary| 6.517143|
|         India| 5.789231|
|       Ireland| 6.517143|
|         Italy| 5.374286|
|   Netherlands| 5.802857|
|        Norway| 5.660000|
|        Poland| 5.374286|
|      Portugal| 5.517143|
+--------------+---------+
only showing top 20 rows



In [88]:
# 16. List the customers who have bought tracks from more than one genre.

q16_df = customer_df.alias("c")\
    .join(invoice_df.alias("i"), col("c.customerid") == col("i.customerid"), "inner") \
    .join(invoice_line_df.alias("il"), col("i.invoiceid") == col("il.invoiceid"), "inner") \
    .join(track_df.alias("t"), col("il.trackid") == col("t.trackid"), "inner") \
    .join(genre_df.alias("g"), col("t.genreid") == col("g.genreid"), "inner") \
    .groupBy(col("c.customerid"), col("c.firstname"))\
    .agg(countDistinct("g.name").alias("total_genre"))\
    .filter(col("total_genre")>1)\
    .show()

+----------+---------+-----------+
|customerid|firstname|total_genre|
+----------+---------+-----------+
|        45| Ladislav|         11|
|        11|Alexandre|          6|
|         4|    Bjørn|          8|
|        23|     John|          9|
|        56|    Diego|          7|
|        36|   Hannah|          6|
|        33|    Ellie|          6|
|        34|     João|         10|
|        52|     Emma|          5|
|        51|   Joakim|          7|
|        37|     Fynn|         10|
|        25|   Victor|          8|
|        30|   Edward|          6|
|        38|   Niklas|          7|
|        57|     Luis|         12|
|        55|     Mark|          6|
|        53|     Phil|          6|
|        24|    Frank|         10|
|        35| Madalena|          6|
|        44|    Terhi|          7|
+----------+---------+-----------+
only showing top 20 rows



In [90]:
# 17. Find the artists whose tracks have generated the most revenue. (added by me: top 10 artists)

q17_df = artist_df.alias("ar")\
    .join(album_df.alias("al"), col("ar.artistid") == col("al.artistid"), "inner") \
    .join(track_df.alias("t"), col("al.albumid") == col("t.albumid"), "inner") \
    .join(invoice_line_df.alias("il"), col("t.trackid") == col("il.trackid"), "inner") \
    .join(invoice_df.alias("i"), col("il.invoiceid") == col("i.invoiceid"), "inner") \
    .groupBy(col("ar.artistid"), col("ar.name"))\
    .agg(sum(col("i.total")).alias("revenue"))\
    .withColumn("rank", dense_rank().over(Window.orderBy(desc("revenue"))))\
    .filter(col("rank") <= 10)\
    .show()

+--------+-------------+-------+----+
|artistid|         name|revenue|rank|
+--------+-------------+-------+----+
|      90|  Iron Maiden|1233.54|   1|
|     150|           U2| 895.59|   2|
|     149|         Lost| 833.70|   3|
|      22| Led Zeppelin| 620.73|   4|
|      50|    Metallica| 599.94|   5|
|      58|  Deep Purple| 550.44|   6|
|     118|    Pearl Jam| 408.87|   7|
|     100|Lenny Kravitz| 372.51|   8|
|     152|    Van Halen| 336.82|   9|
|     156|   The Office| 328.80|  10|
+--------+-------------+-------+----+



In [95]:
# 18. Display the number of invoices and total revenue generated by each employee (sales representative).

q18_df = employee_df.alias("e")\
    .join(customer_df.alias("c"), [col("e.reportsto") == col("c.supportrepid"), col("e.title") == "Sales Support Agent"], "inner")\
    .join(invoice_df.alias("i"), col("c.customerid") == col("i.customerid"))\
    .groupBy("e.employeeid")\
    .agg(countDistinct("i.invoiceid").alias("no_of_invoices"), sum(col("i.total")).alias("revenue")) \
    .show()

+----------+--------------+-------+
|employeeid|no_of_invoices|revenue|
+----------+--------------+-------+
|         9|           146| 833.04|
|        10|           126| 720.16|
+----------+--------------+-------+



In [107]:
# 19. For each customer, calculate the total amount spent on tracks from each genre.

q19_df = customer_df.alias("c")\
    .join(invoice_df.alias("i"), col("c.customerid") == col("i.customerid"), "inner")\
    .join(invoice_line_df.alias("il"), col("i.invoiceid") == col("il.invoiceid"), "inner")\
    .join(track_df.alias("t"), col("il.trackid") == col("t.trackid"), "inner")\
    .join(genre_df.alias("g"), col("t.genreid") == col("g.genreid"), "inner")\
    .groupBy(col("c.customerid"), col("g.genreid"), col("g.name"))\
    .agg(sum(col("i.total")).alias("total_purchase"))\
    .orderBy("c.customerid")\
    .show()

"""
# Using window function

q19_window_df = customer_df.alias("c") \
    .join(invoice_df.alias("i"), col("c.customerid") == col("i.customerid"), "inner") \
    .join(invoice_line_df.alias("il"), col("i.invoiceid") == col("il.invoiceid"), "inner") \
    .join(track_df.alias("t"), col("il.trackid") == col("t.trackid"), "inner") \
    .join(genre_df.alias("g"), col("t.genreid") == col("g.genreid"), "inner") \
    .withColumn("total_purchase", sum("i.total").over(Window.partitionBy("c.customerid", "g.genreid").orderBy("c.customerid"))) \
    .select("c.customerid", "g.genreid", "g.name", "total_purchase") \
    .distinct() \
    .orderBy("c.customerid") \
    .show()
"""

+----------+-------+------------------+--------------+
|customerid|genreid|              name|total_purchase|
+----------+-------+------------------+--------------+
|         1|      3|             Metal|         17.82|
|         1|      1|              Rock|         89.10|
|         1|      7|             Latin|        122.76|
|         1|      8|            Reggae|         41.58|
|         1|      9|               Pop|         27.72|
|         1|     10|        Soundtrack|         27.72|
|         1|     20|  Sci Fi & Fantasy|          7.96|
|         1|     24|         Classical|          3.96|
|         2|      6|             Blues|         47.52|
|         2|      4|Alternative & Punk|         17.82|
|         2|      3|             Metal|         17.82|
|         2|      1|              Rock|        150.48|
|         2|      7|             Latin|         55.44|
|         2|      9|               Pop|         13.86|
|         2|     10|        Soundtrack|         31.68|
|         

In [111]:
# 20. Retrieve all albums with more than 10 tracks and list the number of tracks for each.

q20_df = album_df.alias("al")\
    .join(track_df.alias("t"), col("al.albumid") == col("t.albumid"), "inner")\
    .groupBy(col("al.albumid"), col("al.title"))\
    .agg(count(col("t.trackid")).alias("no_of_tracks"))\
    .orderBy(col("no_of_tracks"))\
    .filter(col("no_of_tracks")>=10)\
    .show()

+-------+--------------------+------------+
|albumid|               title|no_of_tracks|
+-------+--------------------+------------+
|    177|      The Beast Live|          10|
|    108|   Rock In Rio [CD1]|          10|
|    101|             Killers|          10|
|     28|            Na Pista|          10|
|    103|Live At Donington...|          10|
|    209|       Live [Disc 1]|          10|
|      1|For Those About T...|          10|
|    269|   Temple of the Dog|          10|
|    205|             In Step|          10|
|    191|        Cesta Básica|          10|
|    127|BBC Sessions [Dis...|          10|
|     17|Black Sabbath Vol...|          10|
|     49|The Essential Mil...|          10|
|     97|     Brave New World|          10|
|    239|                 War|          10|
|     80|In Your Honor [Di...|          10|
|    240|             Zooropa|          10|
|    121|Surfing with the ...|          10|
|    105|No Prayer For The...|          10|
|    150|        Kill 'Em All|  

3 WINDOW FUNCTION QUESTIONS

In [12]:
# 1. Retrieve the top 3 most purchased tracks by each customer, based on the total number of times each track was purchased by that customer.

q1_windowdf = customer_df.alias("c")\
    .join(invoice_df.alias("i"), col("c.customerid") == col("i.customerid"), "inner")\
    .join(invoice_line_df.alias("il"), col("i.invoiceid") == col("il.invoiceid"), "inner")\
    .join(track_df.alias("t"), col("il.trackid") == col("t.trackid"), "inner")\
    .groupBy(col("c.customerid"), col("t.name"), col("t.trackid"))\
    .agg(count("t.trackid").alias("track_purchases"))\
    .withColumn("rank", dense_rank().over(Window.partitionBy(col("c.customerid")).orderBy(col("track_purchases").desc())))\
    .where(col("rank") <= 3)\
    .show()



+----------+--------------------+-------+---------------+----+
|customerid|                name|trackid|track_purchases|rank|
+----------+--------------------+-------+---------------+----+
|         1|   Take the Celestra|   3248|              1|   1|
|         1| Experiment In Terra|   3247|              1|   1|
|         1|            Cold Gin|    453|              1|   1|
|         1|            Strutter|    451|              1|   1|
|         1|    Calling Dr. Love|    449|              1|   1|
|         1|   Shout It Out Loud|    447|              1|   1|
|         1|                Coma|   1173|              1|   1|
|         1|      Garden of Eden|   1169|              1|   1|
|         1|      Back off Bitch|   1165|              1|   1|
|         1|Don't Cry (Original)|   1161|              1|   1|
|         1|        Rocket Queen|   1157|              1|   1|
|         1|     Think About You|   1153|              1|   1|
|         1|All Along The Wat...|   2991|              

In [17]:
# 2. For each customer, display the running total (cumulative sum) of their invoice amounts, ordered by the invoice date. 
# Include columns for CustomerId, InvoiceId, InvoiceDate, and RunningTotal.

q2_windowdf = customer_df.alias("c")\
    .join(invoice_df.alias("i"), col("c.customerid") == col("i.customerid"), "inner")\
    .withColumn("running_total", sum(col("i.total")).over(Window.partitionBy(col("c.customerid")).orderBy(col("i.invoicedate"))))\
    .select(col("c.customerid"), col("i.invoiceid"), col("i.invoicedate"), col("running_total"))\
    .show(1000)
    

+----------+---------+-------------------+-------------+
|customerid|invoiceid|        invoicedate|running_total|
+----------+---------+-------------------+-------------+
|         1|       98|2022-03-11 00:00:00|         3.98|
|         1|      121|2022-06-13 00:00:00|         7.94|
|         1|      143|2022-09-15 00:00:00|        13.88|
|         1|      195|2023-05-06 00:00:00|        14.87|
|         1|      316|2024-10-27 00:00:00|        16.85|
|         1|      327|2024-12-07 00:00:00|        30.71|
|         1|      382|2025-08-07 00:00:00|        39.62|
|         2|        1|2021-01-01 00:00:00|         1.98|
|         2|       12|2021-02-11 00:00:00|        15.84|
|         2|       67|2021-10-12 00:00:00|        24.75|
|         2|      196|2023-05-19 00:00:00|        26.73|
|         2|      219|2023-08-21 00:00:00|        30.69|
|         2|      241|2023-11-23 00:00:00|        36.63|
|         2|      293|2024-07-13 00:00:00|        37.62|
|         3|       99|2022-03-1

In [28]:
# 3.Rank the artists based on the total number of tracks purchased from them, 
# but also include a column showing the ranking at each invoice date for each artist, so the rank updates as more purchases are made.

q3_windowdf = artist_df.alias("ar")\
    .join(album_df.alias("al"), col("ar.artistid") == col("al.artistid"), "inner")\
    .join(track_df.alias("t"), col("al.albumid") == col("t.albumid"), "inner")\
    .join(invoice_line_df.alias("il"), col("t.trackid") == col("il.trackid"), "inner")\
    .join(invoice_df.alias("i"), col("il.invoiceid") == col("i.invoiceid"), "inner")\
    .groupBy(col("ar.artistid"), col("ar.name"), col("i.invoicedate"))\
    .agg(count("t.trackid").alias("track_count"))\
    .withColumn("track_sum", sum(col("track_count")).over(Window.partitionBy(col("ar.artistid")).orderBy(col("i.invoicedate"))))\
    .withColumn("rank", dense_rank().over(Window.orderBy(col("track_sum").desc())))\
    .show()\

    


+--------+------------+-------------------+-----------+---------+----+
|artistid|        name|        invoicedate|track_count|track_sum|rank|
+--------+------------+-------------------+-----------+---------+----+
|      90| Iron Maiden|2025-05-06 00:00:00|          9|      140|   1|
|      90| Iron Maiden|2025-05-03 00:00:00|          6|      131|   2|
|      90| Iron Maiden|2025-05-02 00:00:00|          4|      125|   3|
|      90| Iron Maiden|2025-05-01 00:00:00|          4|      121|   4|
|      90| Iron Maiden|2025-04-18 00:00:00|          1|      117|   5|
|      90| Iron Maiden|2025-04-10 00:00:00|         12|      116|   6|
|     150|          U2|2025-12-09 00:00:00|          9|      107|   7|
|      90| Iron Maiden|2024-01-27 00:00:00|          5|      104|   8|
|      90| Iron Maiden|2024-01-24 00:00:00|          6|       99|   9|
|     150|          U2|2025-12-06 00:00:00|          6|       98|  10|
|      90| Iron Maiden|2024-01-23 00:00:00|          4|       93|  11|
|     

In [31]:
# 4. For each customer, calculate the cumulative total of tracks they have purchased, 
# ranked by the total number of tracks purchased up to each invoice date. Display the 
# running total for each customer along with their ranking across all customers.

q4_windowdf = customer_df.alias("c")\
    .join(invoice_df.alias("i"), col("c.customerid") == col("i.customerid"), "inner")\
    .join(invoice_line_df.alias("il"), col("i.invoiceid") == col("il.invoiceid"), "inner")\
    .join(track_df.alias("t"), col("il.trackid") == col("t.trackid"), "inner")\
    .groupBy(col("c.customerid"), col("i.invoicedate"), col("i.total"))\
    .agg(count("t.trackid").alias("track_count"))\
    .withColumn("cumulative_sum", sum(col("track_count")).over(Window.partitionBy("c.customerid"))) \
    .withColumn("track_purchase_rank", dense_rank().over(Window.partitionBy("i.invoicedate").orderBy("track_count")))\
    .withColumn("running_total", sum(col("i.total")).over(Window.partitionBy(col("c.customerid"))))\
    .withColumn("running_total_rank", dense_rank().over(Window.orderBy(col("running_total"))))

In [32]:
q4_windowdf.printSchema()

root
 |-- customerid: integer (nullable = true)
 |-- invoicedate: timestamp (nullable = true)
 |-- total: decimal(10,2) (nullable = true)
 |-- track_count: long (nullable = false)
 |-- cumulative_sum: long (nullable = true)
 |-- track_purchase_rank: integer (nullable = false)
 |-- running_total: decimal(20,2) (nullable = true)
 |-- running_total_rank: integer (nullable = false)



In [5]:
#spark.stop()