### Installing importing the necessary packages

In [None]:
# Install Pyspark
!pip install pyspark



In [7]:
# Import packages
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, mean, max, countDistinct, sum as _sum
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import collect_list, concat_ws

In [2]:
# Initialize Spark session
spark = SparkSession.builder.appName("MusicStreamingKPIs").getOrCreate()

## Loading the data

In [3]:
# Load the streams, users, and songs data
streams_df = spark.read.csv("drive/MyDrive/Colab Notebooks/data/data/streams/streams1.csv", header=True, inferSchema=True)
users_df = spark.read.csv("drive/MyDrive/Colab Notebooks/data/data/users/users.csv", header=True, inferSchema=True)
songs_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("multiLine", "true") \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .option("mode", "PERMISSIVE") \
    .csv("drive/MyDrive/Colab Notebooks/data/data/songs/songs.csv")

# Show the schema and some sample data
streams_df.printSchema()
streams_df.show(5)

users_df.printSchema()
users_df.show(5)

songs_df.printSchema()
songs_df.show(5)

root
 |-- user_id: integer (nullable = true)
 |-- track_id: string (nullable = true)
 |-- listen_time: timestamp (nullable = true)

+-------+--------------------+-------------------+
|user_id|            track_id|        listen_time|
+-------+--------------------+-------------------+
|  26213|4dBa8T7oDV9WvGr7k...|2024-06-25 17:43:13|
|   6937|4osgfFTICMkcGbbig...|2024-06-25 07:26:00|
|  21407|2LoQWx41KeqOrSFra...|2024-06-25 13:25:26|
|  47146|7cfG5lFeJWEgpSnub...|2024-06-25 18:17:50|
|  38594|6tilCYbheGMHo3Hw4...|2024-06-25 17:33:21|
+-------+--------------------+-------------------+
only showing top 5 rows

root
 |-- user_id: integer (nullable = true)
 |-- user_name: string (nullable = true)
 |-- user_age: integer (nullable = true)
 |-- user_country: string (nullable = true)
 |-- created_at: date (nullable = true)

+-------+----------------+--------+-------------+----------+
|user_id|       user_name|user_age| user_country|created_at|
+-------+----------------+--------+-------------+-

## EDA

In [4]:
# Checking for missing values

print("Streams missing values:")
streams_df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in streams_df.columns]).show()

print("Users missing values:")
users_df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in users_df.columns]).show()

print("Songs missing values:")
songs_df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in songs_df.columns]).show()

Streams missing values:
+-------+--------+-----------+
|user_id|track_id|listen_time|
+-------+--------+-----------+
|      0|       0|          0|
+-------+--------+-----------+

Users missing values:
+-------+---------+--------+------------+----------+
|user_id|user_name|user_age|user_country|created_at|
+-------+---------+--------+------------+----------+
|      0|        0|       0|           0|         0|
+-------+---------+--------+------------+----------+

Songs missing values:
+---+--------+-------+----------+----------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-----+--------------+-----------+
| id|track_id|artists|album_name|track_name|popularity|duration_ms|explicit|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|tempo|time_signature|track_genre|
+---+--------+-------+----------+----------+----------+-----------+--------+------------+------+

In [5]:
# Replace the missing values in songs with "Unknown"
songs_df = songs_df.na.fill("Unknown")

print("Songs missing values:")
songs_df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in songs_df.columns]).show()

Songs missing values:
+---+--------+-------+----------+----------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-----+--------------+-----------+
| id|track_id|artists|album_name|track_name|popularity|duration_ms|explicit|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|tempo|time_signature|track_genre|
+---+--------+-------+----------+----------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-----+--------------+-----------+
|  0|       0|      0|         0|         0|         0|          0|       0|           0|     0|  0|       0|   0|          0|           0|               0|       0|      0|    0|             0|          0|
+---+--------+-------+----------+----------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+-----------

## Transformation and KPI's Computations

In [6]:
# Merging songs and steams data and converting listen_time to tinestamp to extract the date

merged_df = streams_df.join(songs_df, on="track_id", how="left")
merged_df = merged_df.withColumn("listen_time", F.to_timestamp(col("listen_time")))
merged_df = merged_df.withColumn("date", F.to_date(col("listen_time")))
merged_df.show(5)
merged_df.printSchema()

+--------------------+-------+-------------------+------+-------------------+--------------------+--------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-----------+----------+
|            track_id|user_id|        listen_time|    id|            artists|          album_name|          track_name|popularity|duration_ms|explicit|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|track_genre|      date|
+--------------------+-------+-------------------+------+-------------------+--------------------+--------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-----------+----------+
|2QuOheWJqShIBIYC1...|  14209|2024-06-25 02:52:20| 82146|       Hoodoo Gurus|               Crank|The Right Time - 

In [20]:
# Compute daily genre-level KPIs
genre_kpis = merged_df.groupBy("date", "track_genre").agg(
    count("track_id").alias("listen_count"),  # Total listen count per genre per day
    countDistinct("user_id").alias("unique_listeners"),  # Unique listeners per genre per day
    _sum("duration_ms").alias("total_listening_time"),  # Total listening time per genre per day
    mean("duration_ms").alias("avg_listening_time_per_user")  # Average listening time per user per day
)

# Compute the most popular songs per genre per day
song_per_genre_rank_window = Window.partitionBy("date", "track_genre").orderBy(F.desc("track_listen_count"))

song_plays = merged_df.groupBy("date", "track_genre", "track_name").agg(
    count("track_id").alias("track_listen_count")
)

song_per_genre_rank = song_plays.withColumn("rank", F.row_number().over(song_per_genre_rank_window))
top_3_songs_per_genre_per_day = song_per_genre_rank.filter(col("rank") <= 3).orderBy("date", "track_genre", "rank")

# top_3_songs_per_genre_per_day.show()

# Aggregate the top 3 songs per genre per day into a single column
top_3_songs_aggregated = top_3_songs_per_genre_per_day.groupBy("date", "track_genre").agg(
    concat_ws("|", collect_list("track_name")).alias("top_3_songs_per_genre")  # Concatenate track names with "|"
)

# Join the aggregated top 3 songs with genre_kpis
genre_kpis_with_top_songs = genre_kpis.join(
    top_3_songs_aggregated,
    on=["date", "track_genre"],
    how="left"
)

# Compute the top 5 genres per day based on listen count
top_5_genre = Window.partitionBy("date").orderBy(F.desc("listen_count"))

top_5_genres_per_day = genre_kpis.withColumn("rank", F.row_number().over(top_5_genre)) \
    .filter(col("rank") <= 5).orderBy("date", "rank")
    # .select("date", "rank", "track_genre", "listen_count")

# Aggregating the top 5 genre per day into a single column
top_5_genres_aggregated = top_5_genres_per_day.groupBy("date").agg(
    concat_ws("|", collect_list("track_genre")).alias("top_5_genres_per_day")
)

# Join the aggregated top 5 genre with genre_kpis
complete_genre_kpis = genre_kpis_with_top_songs.join(
    top_5_genres_aggregated,
    on="date",
    how="left"
)

# Order by date asc and listen_count desc
complete_genre_kpis = complete_genre_kpis.orderBy("date", F.desc("listen_count"))

# Show the final genre_kpis with top 3 songs
complete_genre_kpis.show()

+----------+--------------+------------+----------------+--------------------+---------------------------+-------------------------+--------------------+
|      date|   track_genre|listen_count|unique_listeners|total_listening_time|avg_listening_time_per_user|    top_3_songs_per_genre|top_5_genres_per_day|
+----------+--------------+------------+----------------+--------------------+---------------------------+-------------------------+--------------------+
|2024-06-25|         dance|         150|             150|            29195954|         194639.69333333333|     Slidin'|Monster|L...|dance|black-metal...|
|2024-06-25|   black-metal|         148|             148|            44607204|         301400.02702702704|     Towards Forgotten...|dance|black-metal...|
|2024-06-25|         anime|         148|             146|            30483930|                   205972.5|     Attention, please...|dance|black-metal...|
|2024-06-25|         malay|         142|             141|            3620586