# Create SparkSession in Apache Spark

In [2]:
from pyspark.sql import SparkSession


In [3]:
# Get (or create, if none is active) the session used by every cell below.
spark = (
    SparkSession.builder
    .appName("Spark-KPIs")
    .getOrCreate()
)


In [5]:
# Display the active SparkSession as the cell output to confirm it is running.
spark

## Loading songs data


In [6]:
# Read the songs CSV: column names come from the header row, column types are inferred by Spark.
songs_file_path = "/content/songs.csv"
songs_df = spark.read.csv(
    songs_file_path,
    header=True,
    inferSchema=True,
)
songs_df.printSchema()
songs_df.show(n=20)

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/content/songs.csv.

## Data Cleaning


In [None]:
# Number of song rows as loaded (run in order, this is before any cleaning).
raw_song_rows = songs_df.count()
raw_song_rows

In [None]:
# Summary statistics for the songs columns.
songs_summary = songs_df.describe()
songs_summary.show()

In [None]:
# Check for missing values: cast each column's isNull() flag to 0/1 and sum it,
# giving one null count per column.
# Note: this import shadows Python's builtin `sum` with pyspark's for later cells.
from pyspark.sql.functions import col, isnan, sum, count

null_count_exprs = [
    sum(col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in songs_df.columns
]
songs_df.select(null_count_exprs).show()


In [None]:
# Remove every song row that has a null in any column, then report the remaining row count.
songs_df = songs_df.dropna(how="any")
songs_df.count()

In [None]:
# Remove rows that are exact duplicates across all columns and report how many remain.
songs_df_unique = songs_df.dropDuplicates()
unique_song_rows = songs_df_unique.count()
unique_song_rows


In [None]:
# df_casted = df.withColumn("Age", df["Age"].cast("Integer"))
# df_casted.printSchema()

## Loading user data

In [None]:
# Read the users CSV: column names come from the header row, column types are inferred by Spark.
users_file_path = "/content/users.csv"
users_df = spark.read.csv(
    users_file_path,
    header=True,
    inferSchema=True,
)
users_df.printSchema()
users_df.show(n=20)

In [None]:
# Number of user rows as loaded.
raw_user_rows = users_df.count()
raw_user_rows

In [None]:
# Summary statistics for the users columns.
users_summary = users_df.describe()
users_summary.show()

In [None]:
# Check for missing values: one summed 0/1 isNull() flag per users column.
from pyspark.sql.functions import col, isnan, sum

null_count_exprs = [
    sum(col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in users_df.columns
]
users_df.select(null_count_exprs).show()

In [None]:

# Remove user rows that are exact duplicates across all columns and report how many remain.
users_df_unique = users_df.dropDuplicates()
unique_user_rows = users_df_unique.count()
unique_user_rows

## Loading stream data


In [None]:
# Read the three stream extracts with the same options; print one schema as a sample.
stream_paths = ["/content/streams1.csv", "/content/streams2.csv", "/content/streams3.csv"]
streams1, streams2, streams3 = [
    spark.read.csv(path, header=True, inferSchema=True) for path in stream_paths
]
streams3.printSchema()

In [None]:
# Concatenate the three stream extracts into one DataFrame.
# unionByName matches columns by name rather than by position. If one CSV lists
# its columns in a different order, plain union() would silently place values in
# the wrong column. unionByName aligns them, or raises if the column sets differ.
streams = streams1.unionByName(streams2).unionByName(streams3)
streams.show(20)

In [None]:
# Rename streams' track_id to track_id_1 so the join with songs (which also has
# a track_id column) yields unambiguous column names.
streams = streams.withColumnRenamed(existing="track_id", new="track_id_1")


In [None]:
# Summary statistics for the combined streams data.
streams_summary = streams.describe()
streams_summary.show()

## Joining song data with streams and user data

In [None]:
# Join every stream event to its song (track_id == track_id_1) and to the
# listening user (user_id).
# The de-duplicated frames from the cleaning cells are used here. Joining against
# frames that still contain duplicate song or user rows would fan out the
# matching stream events and inflate every downstream KPI.
# (songs_df_unique is built from songs_df after dropna, so nulls are excluded too.)
song_user_streams = (
    songs_df_unique
    .join(streams, songs_df_unique.track_id == streams.track_id_1, 'inner')
    .join(users_df_unique, streams.user_id == users_df_unique.user_id, 'inner')
    .select(songs_df_unique['*'],
            streams.user_id.alias('stream_user_id'),
            streams.track_id_1,
            streams.listen_time,
            users_df_unique.user_id.alias('user_user_id'),
            users_df_unique.user_name,
            users_df_unique.user_age,
            users_df_unique.user_country,
            users_df_unique.created_at)
)

song_user_streams.show()

In [None]:
# Summary statistics for the joined song/stream/user data.
joined_summary = song_user_streams.describe()
joined_summary.show()

In [None]:
# Column names available for the KPI computations below.
joined_columns = song_user_streams.columns
joined_columns

# Computing KPIs

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import to_date, count, countDistinct, desc, rank

In [None]:
# Listens (stream rows) per genre per day.
# The day is taken from the stream's listen_time. created_at comes from the
# users table and presumably marks when the user account was created, so
# grouping on it would bucket every listen under the user's sign-up date.
# NOTE(review): assumes listen_time is a timestamp/date string to_date can parse.
listen_count = song_user_streams.groupBy("track_genre", to_date("listen_time").alias("date")) \
                 .agg(count("track_id").alias("listen_count"))
listen_count.show()

In [None]:
# Distinct listeners per genre per day.
# The day is taken from the stream's listen_time. created_at comes from the
# users table (presumably account creation), not from the listen itself.
# NOTE(review): assumes listen_time is a timestamp/date string to_date can parse.
unique_listeners = song_user_streams.groupBy("track_genre", to_date("listen_time").alias("date")) \
                     .agg(countDistinct("user_user_id").alias("unique_listeners"))
unique_listeners.show()

In [None]:
# Total listening time per genre per day: the sum of duration_ms over every listen.
# (Presumably assumes each listen played the whole track — confirm.)
# The day is taken from the stream's listen_time, not the users-table created_at.
# NOTE(review): assumes listen_time is a timestamp/date string to_date can parse.
# `sum` here is pyspark.sql.functions.sum, imported (shadowing the builtin) in the
# missing-value cells above.
total_listening_time = song_user_streams.groupBy("track_genre", to_date("listen_time").alias("date")) \
                         .agg(sum("duration_ms").alias("total_listening_time"))
total_listening_time.show()

In [None]:
# Average listening time per listener for each (track_genre, date) present in
# both the total-time and unique-listener frames: total time / distinct listeners.
avg_listening_time_per_user = (
    total_listening_time
    .join(unique_listeners, on=["track_genre", "date"])
    .withColumn(
        "avg_listening_time_per_user",
        col("total_listening_time") / col("unique_listeners"),
    )
)
avg_listening_time_per_user.show()

In [None]:
# Top 3 songs per genre per day, ranked by number of listens.
# Step 1: Count listens (stream rows) per song per genre per listen date.
# count() is used rather than countDistinct("track_id"). Within a group keyed by
# track_name, countDistinct("track_id") only counts how many distinct ids share
# that name (typically 1), so it does not measure plays and every song ties at rank 1.
# The date comes from listen_time, not the users-table created_at.
# NOTE(review): assumes listen_time is a timestamp/date string to_date can parse.
song_listen_count = song_user_streams.groupBy(
    to_date("listen_time").alias("date"),
    "track_genre",
    "track_name"
).agg(count("track_id").alias("listen_count"))

# Step 2: Rank songs within each (date, genre) by listen count, highest first.
song_rank_window = Window.partitionBy("date", "track_genre").orderBy(desc("listen_count"))

# Step 3: Keep ranks 1-3. rank() gives ties the same rank, so a partition may
# return more than 3 rows.
top_songs_per_genre = song_listen_count.withColumn("rank", rank().over(song_rank_window)) \
                                     .filter(col("rank") <= 3)

top_songs_per_genre.show()

In [None]:
# top_songs_per_genre = song_user_streams.groupBy(
#     to_date("created_at").alias("date"),
#     "track_name",
#     "track_genre"
# ).agg(countDistinct("track_id").alias("listen_count"))\
#  .withColumn("rank", rank().over(Window.partitionBy("date").orderBy(desc("listen_count"))))\
#         .filter(col("rank") <= 3)

# top_songs_per_genre.show()

In [None]:

# Step 1: Count listens (stream rows) per song per genre per listen date.
# count() is used, not countDistinct("track_id"). Grouped by track_name,
# countDistinct("track_id") counts distinct ids sharing a name (typically 1),
# not plays. The date comes from listen_time, not the users-table created_at.
# NOTE(review): assumes listen_time is a timestamp/date string to_date can parse.
song_listen_count = song_user_streams.groupBy(
    to_date("listen_time").alias("date"), "track_genre", "track_name"
).agg(
    count("track_id").alias("listen_count")  # Count the number of plays per song
)

# Step 2: Define ranking window partitioned by date & genre, ordered by listen count
rank_window = Window.partitionBy("date", "track_genre").orderBy(desc("listen_count"))

# Step 3: Rank songs and filter for the top 3 per genre per day
# (ties share a rank, so a partition may return more than 3 rows)
top_songs_per_genre = song_listen_count.withColumn("rank", rank().over(rank_window)) \
                                       .filter(col("rank") <= 3)

# Step 4: Show results
top_songs_per_genre.show()

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window


# Top 5 genres per day, ranked by number of listens.
# Step 1: Compute listen count per genre per listen date.
# The date comes from the stream's listen_time. created_at comes from the users
# table (presumably account creation) and would bucket listens by sign-up day.
# NOTE(review): assumes listen_time is a timestamp/date string to_date can parse.
genre_listen_count = song_user_streams.groupBy(
    F.to_date("listen_time").alias("date"),
    "track_genre"
).agg(F.count("track_id").alias("genre_listen_count"))

# Step 2: Define a ranking window for top genres per day
genre_rank_window = Window.partitionBy("date").orderBy(F.desc("genre_listen_count"))

# Step 3: Apply ranking and filter for the top 5 genres per day
# (rank() gives ties the same rank, so a day may return more than 5 rows)
top_genres_per_day = genre_listen_count.withColumn("5_gen_rank", F.rank().over(genre_rank_window)) \
                                       .filter(F.col("5_gen_rank") <= 5)

top_genres_per_day.show()