In [None]:
from pyspark.sql import SparkSession

In [None]:
# CSV parsing options such as header/inferSchema are supplied per read through
# DataFrameReader.option(). The previous "spark.sql.sources.header" and
# "spark.sql.sources.inferSchema" session settings are not documented Spark
# properties and are not consulted by the CSV reader, so they are omitted.
spark = SparkSession.builder.appName("ETL").getOrCreate()

In [None]:
# Songs file: header row, inferred column types, multi-line records, with '"'
# used both as the quote and as the escape character; PERMISSIVE parse mode.
song_df = (
    spark.read
    .option("mode", "PERMISSIVE")
    .option("escape", "\"")
    .option("quote", "\"")
    .option("multiLine", "true")
    .option("inferSchema", "true")
    .option("header", "true")
    .csv("/content/songs.csv")
)

# Users file: header row and inferred column types only.
user_df = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv("/content/users.csv")
)

In [None]:
# Load the three stream extracts with identical reader settings
# (header row present, column types inferred).
streams1, streams2, streams3 = [
    spark.read.option("header", "true").option("inferSchema", "true").csv(stream_path)
    for stream_path in (
        "/content/streams1.csv",
        "/content/streams2.csv",
        "/content/streams3.csv",
    )
]

In [None]:
# Stack the three stream extracts. unionByName aligns columns by name (union()
# aligns by position), so a file with reordered or renamed columns raises an
# error instead of silently mixing values across columns.
merged_streams = streams1.unionByName(streams2).unionByName(streams3)

In [None]:
# Preview the combined stream rows (show() prints the first 20 rows by default).
merged_streams.show()

+-------+--------------------+-------------------+
|user_id|            track_id|        listen_time|
+-------+--------------------+-------------------+
|  26213|4dBa8T7oDV9WvGr7k...|2024-06-25 17:43:13|
|   6937|4osgfFTICMkcGbbig...|2024-06-25 07:26:00|
|  21407|2LoQWx41KeqOrSFra...|2024-06-25 13:25:26|
|  47146|7cfG5lFeJWEgpSnub...|2024-06-25 18:17:50|
|  38594|6tilCYbheGMHo3Hw4...|2024-06-25 17:33:21|
|  14209|2QuOheWJqShIBIYC1...|2024-06-25 02:52:20|
|  26986|6qBSGvyUzqNQv8Xtn...|2024-06-25 22:32:51|
|   8173|1wXSL0SAzd7mX0LM8...|2024-06-25 11:59:10|
|  12950|0L7Nv6ToXLRAWId4e...|2024-06-25 17:54:30|
|   2898|7tnE9vy6FCRtbZql5...|2024-06-25 18:30:31|
|   5873|5QAzf7c3dNfcuXkvx...|2024-06-25 14:48:02|
|  30933|4cJhBmeJ7KiBeuy7o...|2024-06-25 14:07:00|
|   8892|0bcy0hFL8G0IfV0mp...|2024-06-25 14:58:10|
|  28613|6tDtuaCX0sIea6o8Q...|2024-06-25 20:50:59|
|  13663|5G67YWTwKKRBkk1EH...|2024-06-25 04:22:32|
|  21063|1OAkkP5JGtwDZbsE6...|2024-06-25 14:59:09|
|  48747|2goJK85OkKFSLkv7k...|2

In [None]:
# printSchema() prints the schema tree itself and returns None, so it is called
# directly; wrapping it in display() only rendered an extra "None" per frame.
merged_streams.printSchema()
song_df.printSchema()
user_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- track_id: string (nullable = true)
 |-- listen_time: timestamp (nullable = true)



None

root
 |-- id: integer (nullable = true)
 |-- track_id: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- album_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- explicit: boolean (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: integer (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: integer (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- time_signature: integer (nullable = true)
 |-- track_genre: string (nullable = true)



None

root
 |-- user_id: integer (nullable = true)
 |-- user_name: string (nullable = true)
 |-- user_age: integer (nullable = true)
 |-- user_country: string (nullable = true)
 |-- created_at: date (nullable = true)



None

In [None]:
# Columns to keep for each file
# Songs: the track_id join key plus the name, length and genre used by the KPIs.
song_columns = ["track_id", "track_name", "duration_ms", "track_genre"]
# Users: only the identifier is retained.
user_columns = ["user_id"]
# Streams: all three columns present in the stream files are kept.
streams_columns = ["user_id", "track_id", "listen_time"]

In [None]:
# Project every DataFrame down to the columns listed for it.
song_df, user_df, merged_streams = (
    song_df.select(*song_columns),
    user_df.select(*user_columns),
    merged_streams.select(*streams_columns),
)

In [None]:
# printSchema() prints the schema tree itself and returns None, so it is called
# directly; wrapping it in display() only rendered an extra "None" per frame.
merged_streams.printSchema()
song_df.printSchema()
user_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- track_id: string (nullable = true)
 |-- listen_time: timestamp (nullable = true)



None

root
 |-- track_id: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- track_genre: string (nullable = true)



None

root
 |-- user_id: integer (nullable = true)



None

In [None]:
# Merge files for computation: every stream row is kept (left joins) and
# enriched with the song attributes matched on track_id.
# NOTE(review): user_df holds only user_id at this point, so the second join adds
# no columns; it can only duplicate stream rows if users.csv repeats a user_id.
# Confirm whether it is meant to validate users (inner join) or can be removed.
merged_df = merged_streams.join(song_df, "track_id", "left").join(user_df, "user_id", "left")

# printSchema() and show() print their output and return None, so they are
# called directly instead of being wrapped in display().
merged_df.printSchema()
merged_df.show()

root
 |-- user_id: integer (nullable = true)
 |-- track_id: string (nullable = true)
 |-- listen_time: timestamp (nullable = true)
 |-- track_name: string (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- track_genre: string (nullable = true)



None

+-------+--------------------+-------------------+--------------------+-----------+-----------------+
|user_id|            track_id|        listen_time|          track_name|duration_ms|      track_genre|
+-------+--------------------+-------------------+--------------------+-----------+-----------------+
|  26213|4dBa8T7oDV9WvGr7k...|2024-06-25 17:43:13|Don't Let the Lig...|     229517|             rock|
|   6937|4osgfFTICMkcGbbig...|2024-06-25 07:26:00|            Novacane|     302346|             soul|
|  21407|2LoQWx41KeqOrSFra...|2024-06-25 13:25:26|           Harpooned|      47160|        grindcore|
|  47146|7cfG5lFeJWEgpSnub...|2024-06-25 18:17:50|The Forgotten Kin...|     334599|      heavy-metal|
|  38594|6tilCYbheGMHo3Hw4...|2024-06-25 17:33:21| Short Dressed Woman|     252133|            piano|
|  14209|2QuOheWJqShIBIYC1...|2024-06-25 02:52:20|The Right Time - ...|     233560|        power-pop|
|  26986|6qBSGvyUzqNQv8Xtn...|2024-06-25 22:32:51|Everybody Have Fu...|     287360

None

In [None]:
# Perform transformations and cleaning before computations:
# first discard rows that contain a null in any column, then collapse rows that
# are exact duplicates across all columns.
merged_df = merged_df.na.drop().dropDuplicates()

# # strip """ from
# merged_df = merged_df.withColumn("track_name", F.regexp_replace("track_name", r'(^\"|\"$)', ""))

# # strip whitespace
# merged_df = merged_df.withColumn("track_name", F.trim(merged_df["track_name"]))

# # replace / with ,
# merged_df = merged_df.withColumn("track_genre", F.regexp_replace("track_genre", "/", ","))

# # set max space to 1
# merged_df = merged_df.withColumn("track_genre", F.regexp_replace("track_genre", "  ", " "))

- Listen Count: Total number of times tracks in a genre have been played in a day.
- Unique Listeners: Distinct users who streamed a track in a given genre per day.
- Total Listening Time: The cumulative listening time for tracks in a genre per day.
- Average Listening Time per User: The mean listening duration per user per day.
- Top 3 Songs per Genre per Day: The most played songs in each genre daily.
- Top 5 Genres per Day: The five most popular genres based on listen count per day.

In [None]:
# Start computations
import pyspark.sql.functions as F
from pyspark.sql.window import Window
# Add listen_date from listen_time
merged_df = merged_df.withColumn("listen_date", F.col("listen_time").cast("date"))

# Calculate duration in minutes (duration_ms is in milliseconds).
# NOTE(review): duration_ms is the track length from the songs file, so the
# "listening time" KPIs assume each stream played the whole track -- confirm.
merged_df = merged_df.withColumn("duration_minutes", F.col("duration_ms") / 60000)

# One row per (day, genre) carrying all four scalar KPIs.
daily_genre_metrics = merged_df.groupBy("listen_date", "track_genre").agg(
    # Listen Count: Total number of times tracks in a genre have been played in a day.
    F.count("*").alias("listen_count"),

    # Unique Listeners: Distinct users who streamed a track in a given genre per day
    F.countDistinct("user_id").alias("unique_listeners"),

    # Total Listening Time: The cumulative listening time for tracks in a genre per day
    F.round(F.sum("duration_minutes"), 2).alias("total_duration_minutes"),

    # Average Listening Time per User: derived from the unrounded total so the
    # value is rounded only once (dividing the already-rounded total could shift
    # the second decimal place).
    F.round(
        F.sum("duration_minutes") / F.countDistinct("user_id"), 2
    ).alias("avg_listening_time_per_user"),
)

In [None]:
# Calculate Top 3 Songs per Genre per Day
# Plays per (day, genre, track name).
song_plays = merged_df.groupBy("listen_date", "track_genre", "track_name").agg(
    F.count("*").alias("track_listen_count")
)

# Rank songs inside each (day, genre) by play count. Equal counts are broken
# alphabetically by track_name so row_number() assigns the same ranks on every
# run; ordering by count alone leaves tied rows in arbitrary order.
song_rank_window = Window.partitionBy("listen_date", "track_genre").orderBy(
    F.desc("track_listen_count"), F.asc("track_name")
)
song_plays_rank = song_plays.withColumn("rank", F.row_number().over(song_rank_window))
top_3_songs_per_genre = song_plays_rank.filter(F.col("rank") <= 3)
# top_3_songs_per_genre = top_3_songs_per_genre.drop("rank")
top_3_songs_per_genre.show()

+-----------+-----------+--------------------+------------------+----+
|listen_date|track_genre|          track_name|track_listen_count|rank|
+-----------+-----------+--------------------+------------------+----+
| 2024-06-25|   acoustic|You and Me on the...|                 5|   1|
| 2024-06-25|   acoustic|   Winter Wonderland|                 4|   2|
| 2024-06-25|   acoustic|             DBS Out|                 3|   3|
| 2024-06-25|   afrobeat|              Makoti|                 4|   1|
| 2024-06-25|   afrobeat|      Expensive Shit|                 4|   2|
| 2024-06-25|   afrobeat|             Raminho|                 3|   3|
| 2024-06-25|   alt-rock|                Sway|                 4|   1|
| 2024-06-25|   alt-rock|           Mr. Jones|                 4|   2|
| 2024-06-25|   alt-rock|Symphony Of Destr...|                 3|   3|
| 2024-06-25|alternative|             abcdefu|                 6|   1|
| 2024-06-25|alternative|         Living Hell|                 5|   2|
| 2024

In [None]:
# Calculate Top 5 Genres per Day
# Rank genres within each day by listen_count. Equal counts are broken
# alphabetically by track_genre so row_number() assigns the same ranks on every
# run; ordering by count alone leaves tied rows in arbitrary order.
genre_rank_window = Window.partitionBy("listen_date").orderBy(
    F.desc("listen_count"), F.asc("track_genre")
)
top_5_genres_per_day = daily_genre_metrics.withColumn("rank", F.row_number().over(genre_rank_window))
top_5_genres_per_day = top_5_genres_per_day.filter(F.col("rank") <= 5)
# top_5_genres_per_day = top_5_genres_per_day.drop("rank")
top_5_genres_per_day.show()

+-----------+-----------+------------+----------------+----------------------+---------------------------+----+
|listen_date|track_genre|listen_count|unique_listeners|total_duration_minutes|avg_listening_time_per_user|rank|
+-----------+-----------+------------+----------------+----------------------+---------------------------+----+
| 2024-06-25|   children|         418|             417|                963.64|                       2.31|   1|
| 2024-06-25|      anime|         415|             413|               1453.86|                       3.52|   2|
| 2024-06-25|     disney|         413|             409|               1104.66|                        2.7|   3|
| 2024-06-25|   cantopop|         407|             406|               1526.08|                       3.76|   4|
| 2024-06-25|      happy|         395|             391|               1603.89|                        4.1|   5|
+-----------+-----------+------------+----------------+----------------------+--------------------------

In [None]:
# Preview the daily genre KPIs. No rounding happens here: the duration columns
# were already rounded to two decimal places when the metrics were computed.
daily_genre_metrics.show()
# print(f"Shape: {daily_genre_metrics.count()} rows, {len(daily_genre_metrics.columns)} columns")

+-----------+-----------+------------+----------------+----------------------+---------------------------+
|listen_date|track_genre|listen_count|unique_listeners|total_duration_minutes|avg_listening_time_per_user|
+-----------+-----------+------------+----------------+----------------------+---------------------------+
| 2024-06-25|      metal|          94|              94|                407.87|                       4.34|
| 2024-06-25|death-metal|         367|             364|               1520.13|                       4.18|
| 2024-06-25|  sertanejo|         329|             328|               1107.23|                       3.38|
| 2024-06-25|      tango|         390|             389|               1205.77|                        3.1|
| 2024-06-25|  hardstyle|         315|             313|               1154.38|                       3.69|
| 2024-06-25|     guitar|         332|             329|               1284.39|                        3.9|
| 2024-06-25|      indie|          46

In [None]:
!pip install boto3

In [None]:
# Save kpis to dynamoDB
import boto3
from botocore.config import Config
from decimal import Decimal
import os

# Credentials are never written into the notebook source. Values already present
# in the environment are kept (the previous empty-string assignments overwrote
# them); anything missing is requested with getpass so it is not echoed or saved.
# Leaving a prompt blank sets nothing and lets boto3 use its default credential chain.
from getpass import getpass

for _aws_var in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
    if not os.environ.get(_aws_var):
        _entered = getpass(f"{_aws_var} (leave blank to use the default credential chain): ")
        if _entered:
            os.environ[_aws_var] = _entered

# Single source of truth for the region used by the DynamoDB resource below.
AWS_REGION = 'eu-west-1'
os.environ["AWS_DEFAULT_REGION"] = AWS_REGION

dynamodb = boto3.resource('dynamodb', region_name=AWS_REGION)
table = dynamodb.Table('daily-genre-kpis')

# partition key genre::date (String)
# sort key metric-type (String)

def prepare_data_for_dynamo(df):
    """Build DynamoDB items for the daily per-genre KPI rows.

    Args:
        df: Object whose ``collect()`` returns rows indexable by column name,
            each providing ``track_genre``, ``listen_date``, ``listen_count``,
            ``unique_listeners``, ``total_duration_minutes`` and
            ``avg_listening_time_per_user``.

    Returns:
        list[dict]: One item per row, keyed by ``genre::date`` (partition key,
        ``"<genre>::<date>"``) and ``metric-type`` (sort key, always
        ``'daily-genre-kpis'``), with every numeric KPI stored as ``Decimal``.
    """
    numeric_fields = (
        'listen_count',
        'unique_listeners',
        'total_duration_minutes',
        'avg_listening_time_per_user',
    )

    def _to_item(record):
        item = {
            'genre::date': f"{record['track_genre']}::{record['listen_date']}",
            'metric-type': 'daily-genre-kpis',
            'genre': record['track_genre'],
        }
        # Numbers go through str() first so the Decimal holds the printed value
        # rather than the float's full binary expansion.
        for field in numeric_fields:
            item[field] = Decimal(str(record[field]))
        return item

    return [_to_item(record) for record in df.collect()]



data = prepare_data_for_dynamo(daily_genre_metrics)

# batch_writer buffers the items into BatchWriteItem requests and resends any
# unprocessed ones, instead of issuing one PutItem round trip per row.
# overwrite_by_pkeys de-duplicates items sharing the table's key within a batch,
# keeping the last one (the same outcome as repeated put_item calls).
with table.batch_writer(overwrite_by_pkeys=['genre::date', 'metric-type']) as batch:
    for item in data:
        batch.put_item(Item=item)


In [None]:
# Spot-check the first two rows of the ranked top-3 songs table.
top_3_songs_per_genre.show(2)

+-----------+-----------+--------------------+------------------+----+
|listen_date|track_genre|          track_name|track_listen_count|rank|
+-----------+-----------+--------------------+------------------+----+
| 2024-06-25|   acoustic|You and Me on the...|                 5|   1|
| 2024-06-25|   acoustic|   Winter Wonderland|                 4|   2|
+-----------+-----------+--------------------+------------------+----+
only showing top 2 rows



In [None]:
# Collapse the ranked rows into one list of song names per (day, genre).
# collect_list gives no ordering guarantee after the shuffle, so (rank, name)
# pairs are collected and sorted by rank before the names are extracted; the
# list therefore always runs from rank 1 to rank 3.
# Relies on the "rank" column still being present on top_3_songs_per_genre.
grouped_df_top3 = (
    top_3_songs_per_genre
    .groupBy("listen_date", "track_genre")
    .agg(
        F.sort_array(F.collect_list(F.struct("rank", "track_name"))).alias("ranked_songs")
    )
    .select(
        "listen_date",
        "track_genre",
        # Selecting a field on an array of structs yields an array of that field.
        F.col("ranked_songs.track_name").alias("top_3_songs"),
    )
)

# printSchema() and show() print their output and return None, so they are
# called directly instead of being wrapped in display().
grouped_df_top3.printSchema()
grouped_df_top3.show()

root
 |-- listen_date: date (nullable = true)
 |-- track_genre: string (nullable = true)
 |-- top_3_songs: array (nullable = false)
 |    |-- element: string (containsNull = false)



None

+-----------+-------------+------------------------+
|listen_date|  track_genre|             top_3_songs|
+-----------+-------------+------------------------+
| 2024-06-25|     acoustic|    [You and Me on th...|
| 2024-06-25|     afrobeat|    [Makoti, Expensiv...|
| 2024-06-25|     alt-rock|    [Sway, Mr. Jones,...|
| 2024-06-25|  alternative|    [abcdefu, Living ...|
| 2024-06-25|      ambient|    [Reverie, What Ge...|
| 2024-06-25|        anime|[Mind brand, 自閉円頓...|
| 2024-06-25|  black-metal|    [The Death of Lov...|
| 2024-06-25|    bluegrass|    [Long Journey Hom...|
| 2024-06-25|        blues|    [Run Rudolph Run,...|
| 2024-06-25|       brazil|    [Até Quando Esper...|
| 2024-06-25|    breakbeat|    [Electro Glide In...|
| 2024-06-25|      british|    [Paper Thin, Gree...|
| 2024-06-25|     cantopop|      [想你, 白玫瑰, 海]|
| 2024-06-25|chicago-house|    [Wanna Give It Up...|
| 2024-06-25|     children|    [The Chipmunk Son...|
| 2024-06-25|        chill|    [Midnight River (...|
| 2

None

In [None]:
def prepare_data_for_dynamo_top_3(df):
    """Build DynamoDB items holding the top-3 song list of each genre and day.

    Args:
        df: Object whose ``collect()`` returns rows indexable by column name,
            each providing ``track_genre``, ``listen_date`` and ``top_3_songs``.

    Returns:
        list[dict]: One item per row with the ``genre::date`` partition key
        (``"<genre>::<date>"``), the ``metric-type`` sort key set to
        ``'top-3-songs'``, the genre, and the song list passed through as is.
    """
    return [
        {
            'genre::date': f"{record['track_genre']}::{record['listen_date']}",
            'metric-type': 'top-3-songs',
            'genre': record['track_genre'],
            'top_3_songs': record['top_3_songs'],
        }
        for record in df.collect()
    ]

data = prepare_data_for_dynamo_top_3(grouped_df_top3)

# data[:2]
# batch_writer buffers the items into BatchWriteItem requests and resends any
# unprocessed ones, instead of issuing one PutItem round trip per row.
# overwrite_by_pkeys de-duplicates items sharing the table's key within a batch,
# keeping the last one (the same outcome as repeated put_item calls).
with table.batch_writer(overwrite_by_pkeys=['genre::date', 'metric-type']) as batch:
    for item in data:
        batch.put_item(Item=item)

In [None]:
# Collapse the top-5 genre rows into one list of genres per day.
# collect_list gives no ordering guarantee after the shuffle, so (rank, genre)
# pairs are collected and sorted by rank before the genres are extracted; the
# list therefore always runs from rank 1 to rank 5.
# Relies on the "rank" column still being present on top_5_genres_per_day.
grouped_df_top5 = (
    top_5_genres_per_day
    .groupBy("listen_date")
    .agg(
        F.sort_array(F.collect_list(F.struct("rank", "track_genre"))).alias("ranked_genres")
    )
    .select(
        "listen_date",
        # Selecting a field on an array of structs yields an array of that field.
        F.col("ranked_genres.track_genre").alias("top_5_genres"),
    )
)

# printSchema() and show() print their output and return None, so they are
# called directly instead of being wrapped in display().
grouped_df_top5.printSchema()
grouped_df_top5.show()

root
 |-- listen_date: date (nullable = true)
 |-- top_5_genres: array (nullable = false)
 |    |-- element: string (containsNull = false)



None

+-----------+--------------------+
|listen_date|        top_5_genres|
+-----------+--------------------+
| 2024-06-25|[children, anime,...|
+-----------+--------------------+



None

In [None]:
def prepare_data_for_dynamo_top_5(df):
    """Build DynamoDB items holding the top-5 genre list of each day.

    Args:
        df: Object whose ``collect()`` returns rows indexable by column name,
            each providing ``listen_date`` and ``top_5_genres``.

    Returns:
        list[dict]: One item per row with the ``genre::date`` partition key set
        to ``"top5::<date>"``, the ``metric-type`` sort key set to
        ``'top-5-genres'``, and the genre list passed through as is.
    """
    def _as_item(record):
        return {
            'genre::date': f"top5::{record['listen_date']}",
            'metric-type': 'top-5-genres',
            'top_5_genres': record['top_5_genres'],
        }

    return list(map(_as_item, df.collect()))

data = prepare_data_for_dynamo_top_5(grouped_df_top5)

# data[:2]
# batch_writer buffers the items into BatchWriteItem requests and resends any
# unprocessed ones, instead of issuing one PutItem round trip per row.
# overwrite_by_pkeys de-duplicates items sharing the table's key within a batch,
# keeping the last one (the same outcome as repeated put_item calls).
with table.batch_writer(overwrite_by_pkeys=['genre::date', 'metric-type']) as batch:
    for item in data:
        batch.put_item(Item=item)