In [53]:
# import the PySpark module
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.window import Window
from pyspark.sql import functions as F

In [54]:
# initialize the SparkContext
spark

In [55]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [56]:
# read the songs data from the CSV file
songs = spark.read.csv('../data/songs/songs.csv', header=True, inferSchema=True)

In [57]:
songs.show(10)

+---+--------------------+--------------------+--------------------+--------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-----------+
| id|            track_id|             artists|          album_name|          track_name|popularity|duration_ms|explicit|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|track_genre|
+---+--------------------+--------------------+--------------------+--------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-----------+
|  0|5SuOikwiRyPMVoIQD...|         Gen Hoshino|              Comedy|              Comedy|        73|     230666|   False|       0.676| 0.461|  1|  -6.746|   0|      0.143|      0.0322|         1.01E-6|   0.358|  0.715| 87.917|           4

In [58]:
# read the songs data from the CSV file
users = spark.read.csv('../data/users/users.csv', header=True, inferSchema=True)

In [59]:
users.show(10)

+-------+-----------------+--------+-------------+----------+
|user_id|        user_name|user_age| user_country|created_at|
+-------+-----------------+--------+-------------+----------+
|      1|     Norma Fisher|      65|United States|2024-02-07|
|      2|   Jorge Sullivan|      28|United States|2024-11-28|
|      3|  Elizabeth Woods|      19|United States|2024-11-16|
|      4|     Susan Wagner|      45|United States|2024-06-14|
|      5| Peter Montgomery|      61|United States|2024-07-24|
|      6| Theodore Mcgrath|      58|United States|2024-12-12|
|      7|Stephanie Collins|      68|United States|2024-04-16|
|      8| Stephanie Sutton|      53|United States|2024-05-04|
|      9|   Brian Hamilton|      34|United States|2024-09-15|
|     10|       Susan Levy|      18|United States|2024-09-08|
+-------+-----------------+--------+-------------+----------+
only showing top 10 rows



In [60]:
# read the streams data from the CSV file
streams1 = spark.read.csv('../data/streams/streams1.csv', header=True, inferSchema=True)
streams2 = spark.read.csv('../data/streams/streams2.csv', header=True, inferSchema=True)
streams3 = spark.read.csv('../data/streams/streams3.csv', header=True, inferSchema=True)

In [61]:
# concat the streams data
streams = streams1.union(streams2).union(streams3)
streams.show(10)


+-------+--------------------+-------------------+
|user_id|            track_id|        listen_time|
+-------+--------------------+-------------------+
|  26213|4dBa8T7oDV9WvGr7k...|2024-06-25 17:43:13|
|   6937|4osgfFTICMkcGbbig...|2024-06-25 07:26:00|
|  21407|2LoQWx41KeqOrSFra...|2024-06-25 13:25:26|
|  47146|7cfG5lFeJWEgpSnub...|2024-06-25 18:17:50|
|  38594|6tilCYbheGMHo3Hw4...|2024-06-25 17:33:21|
|  14209|2QuOheWJqShIBIYC1...|2024-06-25 02:52:20|
|  26986|6qBSGvyUzqNQv8Xtn...|2024-06-25 22:32:51|
|   8173|1wXSL0SAzd7mX0LM8...|2024-06-25 11:59:10|
|  12950|0L7Nv6ToXLRAWId4e...|2024-06-25 17:54:30|
|   2898|7tnE9vy6FCRtbZql5...|2024-06-25 18:30:31|
+-------+--------------------+-------------------+
only showing top 10 rows



In [62]:
# rename the first track_id column to avoid ambiguity
streams = streams.withColumnRenamed("track_id", "track_id_1")
streams = streams.withColumnRenamed("user_id", "user_id_1")

In [63]:
streams.describe().show()

+-------+------------------+--------------------+
|summary|         user_id_1|          track_id_1|
+-------+------------------+--------------------+
|  count|             34038|               34038|
|   mean|24934.808420001176|                NULL|
| stddev|14444.125256146368|                NULL|
|    min|                 3|0000vdREvCVMxbQTk...|
|    max|             49999|7zxpdh3EqMq2JCkOI...|
+-------+------------------+--------------------+



In [64]:
songs.describe().show()

+-------+-----------------+--------------------+------------------+----------------------------+--------------+------------------+--------------------+------------------+-----------------+--------------------+-----------------+------------------+----------------+------------------+-------------------+------------------+-----------------+-------------------+-----------------+------------------+------------------+
|summary|               id|            track_id|           artists|                  album_name|    track_name|        popularity|         duration_ms|          explicit|     danceability|              energy|              key|          loudness|            mode|       speechiness|       acousticness|  instrumentalness|         liveness|            valence|            tempo|    time_signature|       track_genre|
+-------+-----------------+--------------------+------------------+----------------------------+--------------+------------------+--------------------+-----------------

In [65]:
users.describe().show()

+-------+-----------------+--------------+------------------+-------------+
|summary|          user_id|     user_name|          user_age| user_country|
+-------+-----------------+--------------+------------------+-------------+
|  count|            50000|         50000|             50000|        50000|
|   mean|          25000.5|          NULL|          43.56998|         NULL|
| stddev|14433.90106658626|          NULL|14.996324902949087|         NULL|
|    min|                1|Aaron Alvarado|                18|    Australia|
|    max|            50000|   Zoe Walters|                69|United States|
+-------+-----------------+--------------+------------------+-------------+



In [80]:
data = streams.join(users, streams.user_id_1 == users.user_id, how='left').join(songs, streams.track_id_1 == songs.track_id, how='left')

In [82]:
data.describe().show()

+-------+------------------+--------------------+------------------+--------------+------------------+-------------+------------------+--------------------+------------------+----------------------------+--------------+------------------+--------------------+------------------+-----------------+------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+-----------------+-------------------+------------------+-----------------+------------------+
|summary|         user_id_1|          track_id_1|           user_id|     user_name|          user_age| user_country|                id|            track_id|           artists|                  album_name|    track_name|        popularity|         duration_ms|          explicit|     danceability|            energy|               key|          loudness|              mode|        speechiness|      acousticness|   instrumentalness|         liveness|            valence|     

In [67]:
# Join the songs and users dataframes
song_users = songs.join(users, songs.id == users.user_id, how='left')

In [68]:
# Join the song_users and streams dataframes
song_users = song_users.join(streams, song_users.id == streams.user_id_1, how='left')

In [69]:
song_users.describe().show()

+-------+-----------------+--------------------+------------------+----------------------------+--------------+-----------------+--------------------+------------------+------------------+--------------------+-----------------+------------------+------------------+------------------+-------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+--------------+------------------+-------------+------------------+--------------------+
|summary|               id|            track_id|           artists|                  album_name|    track_name|       popularity|         duration_ms|          explicit|      danceability|              energy|              key|          loudness|              mode|       speechiness|       acousticness|  instrumentalness|         liveness|           valence|             tempo|    time_signature|       track_genre|           user_id|     user_name|          user_age| user

In [70]:
song_users.show(10)

+---+--------------------+--------------------+--------------------+--------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-----------+-------+-----------------+--------+-------------+----------+---------+--------------------+-------------------+
| id|            track_id|             artists|          album_name|          track_name|popularity|duration_ms|explicit|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|track_genre|user_id|        user_name|user_age| user_country|created_at|user_id_1|          track_id_1|        listen_time|
+---+--------------------+--------------------+--------------------+--------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-----------+-------+---------

In [71]:
song_users.columns

['id',
 'track_id',
 'artists',
 'album_name',
 'track_name',
 'popularity',
 'duration_ms',
 'explicit',
 'danceability',
 'energy',
 'key',
 'loudness',
 'mode',
 'speechiness',
 'acousticness',
 'instrumentalness',
 'liveness',
 'valence',
 'tempo',
 'time_signature',
 'track_genre',
 'user_id',
 'user_name',
 'user_age',
 'user_country',
 'created_at',
 'user_id_1',
 'track_id_1',
 'listen_time']

In [72]:
listen_count = song_users.groupBy("track_genre", F.to_date("created_at").alias("date")) \
                 .agg(F.count("track_id").alias("listen_count"))
listen_count.show()

+-----------+----------+------------+
|track_genre|      date|listen_count|
+-----------+----------+------------+
|   acoustic|2024-05-20|           7|
|   acoustic|2024-08-13|           2|
|   afrobeat|2024-11-23|           2|
|   alt-rock|2024-08-26|           3|
|   alt-rock|2024-09-25|           7|
|   alt-rock|2024-06-03|           4|
|alternative|2024-10-31|           9|
|alternative|2024-09-04|           1|
|alternative|2024-02-11|           2|
|alternative|2024-01-18|           1|
|    ambient|2024-11-06|          10|
|    ambient|2024-08-08|           5|
|      anime|2024-11-02|           5|
|      anime|2024-04-09|           9|
|black-metal|2024-04-10|           6|
|  bluegrass|2024-03-14|           2|
|  bluegrass|2024-04-22|           3|
|    british|2024-10-11|           3|
|   cantopop|2024-02-26|           6|
|   cantopop|2024-04-11|           1|
+-----------+----------+------------+
only showing top 20 rows



In [73]:
song_users.columns

['id',
 'track_id',
 'artists',
 'album_name',
 'track_name',
 'popularity',
 'duration_ms',
 'explicit',
 'danceability',
 'energy',
 'key',
 'loudness',
 'mode',
 'speechiness',
 'acousticness',
 'instrumentalness',
 'liveness',
 'valence',
 'tempo',
 'time_signature',
 'track_genre',
 'user_id',
 'user_name',
 'user_age',
 'user_country',
 'created_at',
 'user_id_1',
 'track_id_1',
 'listen_time']

In [74]:
unique_listeners = song_users.groupBy("track_genre", F.to_date("created_at").alias("date")) \
                     .agg(F.countDistinct("user_id_1").alias("unique_listeners"))
unique_listeners.show()

+-----------+----------+----------------+
|track_genre|      date|unique_listeners|
+-----------+----------+----------------+
|  bluegrass|2024-04-22|               1|
|      anime|2024-11-02|               3|
|   alt-rock|2024-09-25|               2|
|   afrobeat|2024-11-23|               1|
|     comedy|2024-02-26|               2|
|      anime|2024-04-09|               2|
|   acoustic|2024-05-20|               3|
|      disco|2024-09-23|               2|
|   children|2024-02-03|               1|
|    ambient|2024-08-08|               3|
|black-metal|2024-04-10|               1|
|  bluegrass|2024-03-14|               1|
|  classical|2024-03-12|               1|
|      dance|2024-04-11|               1|
| deep-house|2024-08-04|               1|
|   cantopop|2024-02-26|               2|
|    ambient|2024-11-06|               2|
|alternative|2024-10-31|               3|
|   acoustic|2024-08-13|               1|
|      chill|2024-11-07|               1|
+-----------+----------+----------

In [75]:
total_listening_time = song_users.groupBy("track_genre", F.to_date("created_at").alias("date")) \
                         .agg(F.sum("duration_ms").alias("total_listening_time"))
total_listening_time.show()

+-----------+----------+--------------------+
|track_genre|      date|total_listening_time|
+-----------+----------+--------------------+
|   acoustic|2024-05-20|           1549666.0|
|   acoustic|2024-08-13|            288280.0|
|   afrobeat|2024-11-23|            782559.0|
|   alt-rock|2024-08-26|            637030.0|
|   alt-rock|2024-09-25|           1702426.0|
|   alt-rock|2024-06-03|            948265.0|
|alternative|2024-10-31|           2189045.0|
|alternative|2024-09-04|            170771.0|
|alternative|2024-02-11|            287121.0|
|alternative|2024-01-18|            234760.0|
|    ambient|2024-11-06|           2821263.0|
|    ambient|2024-08-08|           1195224.0|
|      anime|2024-11-02|            724924.0|
|      anime|2024-04-09|           2037219.0|
|black-metal|2024-04-10|           1326182.0|
|  bluegrass|2024-03-14|            419593.0|
|  bluegrass|2024-04-22|            219000.0|
|    british|2024-10-11|            611801.0|
|   cantopop|2024-02-26|          

In [76]:
avg_listening_time_per_user = total_listening_time.join(unique_listeners, ["track_genre", "date"]) \
                                                  .withColumn("avg_listening_time_per_user", 
                                                              F.col("total_listening_time") / F.col("unique_listeners"))
avg_listening_time_per_user.show()

+-----------+----------+--------------------+----------------+---------------------------+
|track_genre|      date|total_listening_time|unique_listeners|avg_listening_time_per_user|
+-----------+----------+--------------------+----------------+---------------------------+
|  bluegrass|2024-04-22|            219000.0|               1|                   219000.0|
|      anime|2024-11-02|            724924.0|               3|         241641.33333333334|
|   alt-rock|2024-09-25|           1702426.0|               2|                   851213.0|
|   afrobeat|2024-11-23|            782559.0|               1|                   782559.0|
|     comedy|2024-02-26|           1303957.0|               2|                   651978.5|
|      anime|2024-04-09|           2037219.0|               2|                  1018609.5|
|   acoustic|2024-05-20|           1549666.0|               3|          516555.3333333333|
|      disco|2024-09-23|           1162248.0|               2|                   581124.0|

In [77]:
# Step 2: Compute listen count per song per genre per day
song_listen_count = song_users.groupBy( 
    F.to_date("listen_time").alias("date"), 
    "track_name",
    "track_genre"
).agg(F.countDistinct("track_id").alias("listen_count"))

# Step 3: Define ranking window for top songs per genre per day
song_rank_window = Window.partitionBy("date").orderBy(F.desc("listen_count"))

# Step 4: Rank songs and filter for the top 3 per genre per day
top_songs_per_genre = song_listen_count.withColumn("rank", F.rank().over(song_rank_window)) \
                                     .filter(F.col("rank") <= 3)

# Step 5: Show results
top_songs_per_genre.show()


+----------+--------------------+-----------+------------+----+
|      date|          track_name|track_genre|listen_count|rank|
+----------+--------------------+-----------+------------+----+
|      NULL|Rockin' Around Th...| rockabilly|          45|   1|
|      NULL|Little Saint Nick...| psych-rock|          38|   2|
|      NULL|Let It Snow! Let ...|       jazz|          32|   3|
|2024-06-25|     Run Rudolph Run|      blues|          15|   1|
|2024-06-25|  Frosty The Snowman|      blues|          12|   2|
|2024-06-25|           Last Last|      dance|          10|   3|
|2024-06-25|Cozy Little Chris...|      dance|          10|   3|
|2024-06-25|Devil Doesn't Bar...|    electro|          10|   3|
+----------+--------------------+-----------+------------+----+



In [78]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Ensure created_at exists and is correctly formatted
song_users = song_users.withColumn("created_at", F.to_timestamp("created_at"))

# Step 1: Compute listen count per genre per day
genre_listen_count = song_users.groupBy(
    "track_genre",
    F.to_date("listen_time").alias("date")  # Ensure conversion to date format
).agg(F.countDistinct("track_id").alias("listen_count"))

# Step 2: Define a ranking window for top genres per day
genre_rank_window = Window.partitionBy("date").orderBy(F.desc("listen_count"))

# Step 3: Apply ranking and filter for the top 5 genres per day
top_genres_per_day = genre_listen_count.withColumn("rank", F.rank().over(genre_rank_window)) \
                                       .filter(F.col("rank") <= 5)
top_genres_per_day.show()

+-------------+----------+------------+----+
|  track_genre|      date|listen_count|rank|
+-------------+----------+------------+----+
|        tango|      NULL|         999|   1|
|        study|      NULL|         998|   2|
|  heavy-metal|      NULL|         997|   3|
|        sleep|      NULL|         996|   4|
|       j-idol|      NULL|         995|   5|
|     cantopop|2024-06-25|         378|   1|
|        forro|2024-06-25|         377|   2|
|chicago-house|2024-06-25|         375|   3|
|       disney|2024-06-25|         373|   4|
|        chill|2024-06-25|         372|   5|
+-------------+----------+------------+----+



In [79]:
# Add a date column
song_users = song_users.withColumn("created_date", F.to_date("created_at"))

# 1. Daily Genre-Level KPIs
# Listen count, unique listeners, total listening time, average listening time per user
kpis = song_users.groupBy("created_date", "track_genre").agg(
    F.count("track_id").alias("listen_count"),
    F.countDistinct("user_id").alias("unique_listeners"),
    F.sum("duration_ms").alias("total_listening_time")
).withColumn(
    "avg_listening_time_per_user", 
    F.col("total_listening_time") / F.col("unique_listeners") / 1000  # converting ms to seconds
)

# 2. Top 3 Songs per Genre per Day
song_counts = song_users.groupBy("created_date", "track_genre", "track_name").agg(
    F.count("track_id").alias("listen_count")
)

song_rank_window = Window.partitionBy("created_date", "track_genre").orderBy(F.desc("listen_count"))

ranked_songs = song_counts.withColumn("rank", F.rank().over(song_rank_window)).filter(F.col("rank") <= 3)

# Aggregate top 3 songs into a single string per genre per day
top3_songs_per_genre = ranked_songs.groupBy("created_date", "track_genre").agg(
    F.concat_ws(", ", F.collect_list("track_name")).alias("top_3_songs")
)

# 3. Top 5 Genres per Day
genre_counts_per_day = song_users.groupBy("created_date", "track_genre").agg(
    F.count("track_id").alias("genre_listen_count")
)

genre_rank_window = Window.partitionBy("created_date").orderBy(F.desc("genre_listen_count"))

ranked_genres = genre_counts_per_day.withColumn("rank", F.rank().over(genre_rank_window)).filter(F.col("rank") <= 5)

# Aggregate top 5 genres into a single string per day
top5_genres_per_day = ranked_genres.groupBy("created_date").agg(
    F.concat_ws(", ", F.collect_list("track_genre")).alias("top_5_genres")
)

# Join all results
final_kpis = kpis \
    .join(top3_songs_per_genre, ["created_date", "track_genre"], "left") \
    .join(top5_genres_per_day, "created_date", "left")

# Show final daily genre-level KPIs
final_kpis.show(truncate=False)


+------------+--------------+------------+----------------+--------------------+---------------------------+------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------+
|created_date|track_genre   |listen_count|unique_listeners|total_listening_time|avg_listening_time_per_user|top_3_songs                                                                                                             |top_5_genres                                                                                |
+------------+--------------+------------+----------------+--------------------+---------------------------+------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------+
|2024-08-07  |breakbeat     |6 

In [83]:
# Add a date column
data = data.withColumn("created_date", F.to_date("created_at"))

# 1. Daily Genre-Level KPIs
# Listen count, unique listeners, total listening time, average listening time per user
kpis = data.groupBy("created_date", "track_genre").agg(
    F.count("track_id").alias("listen_count"),
    F.countDistinct("user_id").alias("unique_listeners"),
    F.sum("duration_ms").alias("total_listening_time")
).withColumn(
    "avg_listening_time_per_user", 
    F.col("total_listening_time") / F.col("unique_listeners") / 1000  # converting ms to seconds
)

# 2. Top 3 Songs per Genre per Day
song_counts = data.groupBy("created_date", "track_genre", "track_name").agg(
    F.count("track_id").alias("listen_count")
)

song_rank_window = Window.partitionBy("created_date", "track_genre").orderBy(F.desc("listen_count"))

ranked_songs = song_counts.withColumn("rank", F.rank().over(song_rank_window)).filter(F.col("rank") <= 3)

# Aggregate top 3 songs into a single string per genre per day
top3_songs_per_genre = ranked_songs.groupBy("created_date", "track_genre").agg(
    F.concat_ws(", ", F.collect_list("track_name")).alias("top_3_songs")
)

# 3. Top 5 Genres per Day
genre_counts_per_day = data.groupBy("created_date", "track_genre").agg(
    F.count("track_id").alias("genre_listen_count")
)

genre_rank_window = Window.partitionBy("created_date").orderBy(F.desc("genre_listen_count"))

ranked_genres = genre_counts_per_day.withColumn("rank", F.rank().over(genre_rank_window)).filter(F.col("rank") <= 5)

# Aggregate top 5 genres into a single string per day
top5_genres_per_day = ranked_genres.groupBy("created_date").agg(
    F.concat_ws(", ", F.collect_list("track_genre")).alias("top_5_genres")
)

# Join all results
final_kpis = kpis \
    .join(top3_songs_per_genre, ["created_date", "track_genre"], "left") \
    .join(top5_genres_per_day, "created_date", "left")

# Show final daily genre-level KPIs
final_kpis.show(truncate=False)


+------------+-------------+------------+----------------+--------------------+---------------------------+-----------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|created_date|track_genre  |listen_count|unique_listeners|total_listening_time|avg_listening_time_per_user|top_3_songs                                                                  |top_5_genres                                                                                                                                                                                            |
+------------+-------------+------------+----------------+--------------------+---------------------------+-----------------------------------------------------------------------------+-----------------------------------------