In [1]:
# Env check: log the interpreter, working directory and OS this notebook ran
# with, so saved outputs can be traced back to the environment that produced them.
import os
import platform
import sys

env_report = [
    ("Python executable:", sys.executable),
    ("Python version:", sys.version),
    ("Current working directory:", os.getcwd()),
    ("Platform:", platform.platform()),
]
for label, value in env_report:
    print(label, value)

Python executable: /opt/miniconda3/envs/music/bin/python
Python version: 3.12.11 | packaged by Anaconda, Inc. | (main, Jun  5 2025, 08:03:38) [Clang 14.0.6 ]
Current working directory: /Users/zodenath/Desktop/projects/listenbrainz-data-analysis/notebooks
Platform: macOS-15.6-arm64-arm-64bit


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as psf
from pyspark.sql.types import StructType, StructField, StringType, LongType

In [3]:
# Locate the PostgreSQL JDBC driver jar under <project root>/jars.
# The kernel may be started either from the repository root or from the
# notebooks/ directory; in the latter case the project root is its parent.
cwd = os.getcwd()
proj_dir = os.path.abspath("..") if cwd.endswith("notebooks") else cwd
jar_dir = os.path.join(proj_dir, "jars")
jar = os.path.join(jar_dir, "postgresql-42.6.0.jar")
jar

'/Users/zodenath/Desktop/projects/listenbrainz-data-analysis/jars/postgresql-42.6.0.jar'

In [4]:
# Local SparkSession with the PostgreSQL JDBC driver jar on the classpath.
# - bindAddress/host pin the driver to the loopback interface (presumably to
#   avoid hostname-resolution problems when running on a laptop).
# - The SQL session time zone is pinned to UTC. `listened_at` is a Unix epoch,
#   and every timestamp/date derived from it (listened_at_ts, listened_date)
#   is rendered in the session time zone. Without this setting that zone is
#   the JVM's local zone, so per-day answers would depend on where the
#   notebook happens to run.
spark = (
    SparkSession.builder
    .appName("Listenbrainz process")
    .config("spark.jars", jar)
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "localhost")
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()
)

25/08/10 21:43:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [None]:
#df = spark.read.json(f"{proj_dir}/data/bronze/dataset.txt")
#df.printSchema()

In [5]:
# Explicit read schema for the raw listen records. Only the fields used
# downstream are declared; all fields are nullable.
track_metadata_schema = StructType([
    StructField("track_name", StringType(), True),
    StructField("artist_name", StringType(), True),
    StructField("release_name", StringType(), True),
])

schema = StructType([
    StructField("listened_at", LongType(), True),
    StructField("recording_msid", StringType(), True),
    StructField("track_metadata", track_metadata_schema, True),
    StructField("user_name", StringType(), True),
])

In [6]:
# Load the raw listens from the bronze storage layer, applying the explicit
# schema defined above instead of letting Spark infer one.
bronze_dataset_path = f"{proj_dir}/data/bronze/dataset.txt"
df = spark.read.schema(schema).json(bronze_dataset_path)
df.show(5)

+-----------+--------------------+--------------------+---------+
|listened_at|      recording_msid|      track_metadata|user_name|
+-----------+--------------------+--------------------+---------+
| 1555286560|1e1b2aa0-b2db-42e...|{Love In the Time...|  NichoBI|
| 1555286378|283062c8-75e2-406...|{Cornflake, Withe...|  NichoBI|
| 1555286137|8fe0c93d-f44a-415...|{Providence, With...|  NichoBI|
| 1555246360|5998f0ac-2350-4ee...|{Boots of Spanish...|  NichoBI|
| 1555246191|e3912d35-54e4-421...|{Trouble, Cat Ste...|  NichoBI|
+-----------+--------------------+--------------------+---------+
only showing top 5 rows


In [7]:

# Flatten the nested track_metadata struct and derive the time columns.
# `timestamp_seconds` (Spark >= 3.1) converts epoch seconds directly to a
# TIMESTAMP. The former from_unixtime -> string -> timestamp round-trip went
# through local wall-clock text, which is ambiguous during the daylight-saving
# fall-back hour: such a listen could be shifted by an hour and then collide
# with another listen in the later dropDuplicates. The date is derived from
# the same timestamp column so the two columns always agree.
listened_at_col = psf.timestamp_seconds("listened_at")

df_flat = df.select(
    psf.col("user_name"),
    listened_at_col.alias("listened_at_ts"),
    psf.to_date(listened_at_col).alias("listened_date"),
    psf.col("recording_msid"),
    psf.col("track_metadata.track_name").alias("track_name"),
    psf.col("track_metadata.artist_name").alias("artist_name"),
    psf.col("track_metadata.release_name").alias("release_name"),
)


In [8]:
# Show the flattened schema (column names, types, nullability) produced by the select above.
df_flat.printSchema()

root
 |-- user_name: string (nullable = true)
 |-- listened_at_ts: timestamp (nullable = true)
 |-- listened_date: date (nullable = true)
 |-- recording_msid: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- release_name: string (nullable = true)



In [9]:
# Sample data validation: recording_msid is assumed to be a UUID
# (8-4-4-4-12 hexadecimal digits). Rows failing the check are reported here;
# a sample invalid record was created in the data to exercise this check.
uuid_regex = "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"
# `rlike` on a NULL id yields NULL, and `~NULL` is still NULL, so a plain
# negated filter would silently hide missing ids (which the later valid-only
# filter also drops). Treat missing ids as invalid explicitly.
df_invalid_uuid = df_flat.filter(
    psf.col("recording_msid").isNull()
    | ~psf.col("recording_msid").rlike(uuid_regex)
)
print("Invalid recording IDs :")
df_invalid_uuid.select("recording_msid").distinct().show(truncate=False)



Invalid recording IDs :


[Stage 1:>                                                        (0 + 10) / 10]

+------------------------------------+
|recording_msid                      |
+------------------------------------+
|aba7cc6a-xxxx-4a27-88aa-c4c7c5fb5d83|
+------------------------------------+



                                                                                

In [10]:
# Keep only rows whose recording_msid passed the UUID check, then drop repeat
# listens (same user, same recording, same second).
# A new name is used instead of re-binding `df_flat`, so this cell is
# idempotent and re-running the validation cell above still inspects the
# unfiltered data.
df_valid = df_flat.filter(psf.col("recording_msid").rlike(uuid_regex))
df_final = df_valid.dropDuplicates(["user_name", "recording_msid", "listened_at_ts"])
df_final.show(10, truncate=False)



+--------------+-------------------+-------------+------------------------------------+------------------+--------------+---------------+
|user_name     |listened_at_ts     |listened_date|recording_msid                      |track_name        |artist_name   |release_name   |
+--------------+-------------------+-------------+------------------------------------+------------------+--------------+---------------+
|6d6f7274686f6e|2019-02-12 09:29:59|2019-02-12   |000a98d0-02ac-4206-afde-5c985a5e4dcc|Hypersleep        |Volkor X      |This Means War |
|6d6f7274686f6e|2019-03-01 20:52:41|2019-03-01   |009ed5a8-6bca-4c42-bbdc-66bd7c282c4b|Echoes of Divinity|George Kollias|INVICTUS       |
|6d6f7274686f6e|2019-01-26 23:44:52|2019-01-26   |00b970e6-6b69-4667-8c5b-f0f555d7e372|The First Supper  |Daughters     |Daughters      |
|6d6f7274686f6e|2019-01-29 09:30:21|2019-01-29   |00b970e6-6b69-4667-8c5b-f0f555d7e372|The First Supper  |Daughters     |Daughters      |
|6d6f7274686f6e|2019-02-03 12:39:1

                                                                                

In [11]:
# Number of listen rows remaining after UUID filtering and de-duplication.
df_final.count()

                                                                                

333033

### Write data to the PostgreSQL staging tables

In [None]:
import yaml

# Connection settings for the PostgreSQL staging database, read from YAML.
# The file is looked up under <project root>/config first, consistent with
# how the jar and data paths are resolved from `proj_dir` (the kernel runs in
# notebooks/). If that file is absent, the previous working-directory-relative
# path is used, so existing layouts keep working.
_db_config_candidates = [
    os.path.join(proj_dir, "config", "db_config.yaml"),
    os.path.join("config", "db_config.yaml"),
]
db_config_path = next(
    (path for path in _db_config_candidates if os.path.exists(path)),
    _db_config_candidates[0],
)
with open(db_config_path, "r") as f:
    # safe_load returns None for an empty file; normalise to an empty mapping.
    conf = yaml.safe_load(f) or {}

# Fail early with a clear message rather than a bare KeyError further down.
required_keys = {"host", "port", "database", "user", "password"}
missing_keys = required_keys - set(conf)
if missing_keys:
    raise KeyError(f"{db_config_path} is missing keys: {sorted(missing_keys)}")

postgres_url = f"jdbc:postgresql://{conf['host']}:{conf['port']}/{conf['database']}"
properties = {
    "user": conf["user"],
    "password": conf["password"],
    "driver": "org.postgresql.Driver",
}

In [16]:
# Replace the contents of the staging table with the cleaned listens.
(
    df_final.write
    .mode("overwrite")
    .jdbc(url=postgres_url, table="music.user_listens_staging", properties=properties)
)

                                                                                

### Answering BI queries in PySpark
- Who are the top 10 users with respect to number of songs listened to?

In [15]:
# Q. Who are the top 10 users with respect to number of songs listened to?
# `user_name` is a secondary sort key, so users tied on the count come back
# in a stable order and the 10-row cut-off is deterministic across runs.
top_10_users = (
    df_final.groupBy("user_name")
    .agg(psf.count("*").alias("songs_listened"))
    .orderBy(psf.desc("songs_listened"), psf.asc("user_name"))
    .limit(10)
)
top_10_users.show()



+--------------+--------------+
|     user_name|songs_listened|
+--------------+--------------+
|           hds|         46885|
|       Groschi|         14959|
| Silent Singer|         13005|
|         phdnk|         12861|
|6d6f7274686f6e|         11544|
|      reverbel|          8398|
|    Cl�psHydra|          8318|
|InvincibleAsia|          7804|
|      cimualte|          7356|
|         inhji|          6349|
+--------------+--------------+



                                                                                

In [36]:
# Q. How many users did listen to some song on the 1st of March 2019?
# Compare against an explicit DATE value rather than relying on Spark's
# implicit string-to-date coercion in the comparison.
march_first = psf.to_date(psf.lit("2019-03-01"))
users_on_day = (
    df_final.where(psf.col("listened_date") == march_first)
    .select("user_name")
    .distinct()
)
users_march1 = users_on_day.count()
print(f"Number of users on 2019-03-01: {users_march1}")



Number of users on 2019-03-01: 76


                                                                                

In [37]:
# Q. For every user, what was the first song the user listened to?

from pyspark.sql import Window

# Ascending order puts NULLs first by default, which would make a listen with
# no timestamp the "first song"; push those to the end instead.
# `recording_msid` breaks ties between listens in the same second, so the
# chosen row is deterministic.
w = Window.partitionBy("user_name").orderBy(
    psf.asc_nulls_last("listened_at_ts"),
    psf.asc("recording_msid"),
)

first_song_per_user = (
    df_final.withColumn("rn", psf.row_number().over(w))
    .filter(psf.col("rn") == 1)
    .select("user_name", "track_name", "listened_at_ts")
)
first_song_per_user.show()



+---------------+--------------------+-------------------+
|      user_name|          track_name|     listened_at_ts|
+---------------+--------------------+-------------------+
| 6d6f7274686f6e|  The Leper Affinity|2019-01-01 11:41:51|
|  Adsky_traktor|Сердце с долгом р...|2019-01-01 10:24:44|
|      AllSparks|               Fever|2019-01-02 09:48:19|
|   AlwinHummels|    Geef me je angst|2019-02-24 12:40:47|
|          Arcor|     Exsultate Justi|2019-01-01 02:22:23|
|AscendedGravity|              Amoeba|2019-01-02 00:01:17|
|   Bezvezenator|              Devour|2019-01-01 07:19:22|
|      BiamBioum|Beirut (14.12.16 ...|2019-01-07 14:56:07|
|     BlackGauna|             Visionz|2019-01-01 19:36:09|
|      Boris_Neo|      Keep You Close|2019-01-22 17:48:27|
|   BornabeWylde|                Wow.|2019-04-14 16:01:05|
|     Bound2Fate|       Home Invasion|2019-01-01 01:07:01|
| Canis_L_Sapien|              Wolves|2019-01-03 09:39:29|
|     Cl�psHydra|                 Hym|2019-01-01 14:09:3

                                                                                

In [38]:
# Q. For each user, the top 3 days on which they had the most listens.
# Ties on the listen count are broken by the earlier date, so the
# row_number() cut-off at 3 is deterministic across runs.
w_top3 = Window.partitionBy("user_name").orderBy(
    psf.desc("number_of_listens"), psf.asc("listened_date")
)

top3_days_per_user = (
    df_final
    # Listens without a timestamp have no date and do not belong to any day.
    .filter(psf.col("listened_date").isNotNull())
    .groupBy("user_name", "listened_date")
    .agg(psf.count("*").alias("number_of_listens"))
    .withColumn("rn", psf.row_number().over(w_top3))
    .filter(psf.col("rn") <= 3)
    .orderBy("user_name", psf.desc("number_of_listens"), psf.asc("listened_date"))
    .select("user_name", "number_of_listens", "listened_date")
)
top3_days_per_user.show(10)



+--------------+-----------------+-------------+
|     user_name|number_of_listens|listened_date|
+--------------+-----------------+-------------+
|6d6f7274686f6e|              200|   2019-01-27|
|6d6f7274686f6e|              195|   2019-01-14|
|6d6f7274686f6e|              193|   2019-01-16|
| Adsky_traktor|              109|   2019-01-03|
| Adsky_traktor|               99|   2019-01-05|
| Adsky_traktor|               86|   2019-01-04|
|     AllSparks|              114|   2019-01-31|
|     AllSparks|               81|   2019-01-23|
|     AllSparks|               71|   2019-01-07|
|  AlwinHummels|                1|   2019-02-24|
+--------------+-----------------+-------------+
only showing top 10 rows


                                                                                

In [39]:
# c) Rolling 7-day active users.
# For every calendar date D, count the distinct users with at least one listen
# in the trailing window [D - 6, D], and express that count as a percentage
# of all users in the dataset.
ACTIVE_WINDOW_DAYS = 7

users_per_day = df_final.select("listened_date", "user_name").distinct()

# Every calendar date from the first to the last observed day. Using the full
# range (rather than only days that had listens) keeps days without listens
# in the output: users from the preceding days are still active on them.
all_dates = (
    users_per_day
    .agg(
        psf.min("listened_date").alias("first_day"),
        psf.max("listened_date").alias("last_day"),
    )
    .select(psf.explode(psf.sequence("first_day", "last_day")).alias("listened_date"))
)

# A listen on day d makes the user active on days d .. d + 6. Expanding each
# (user, day) pair into those dates turns the original range join into a
# plain equi-join on listened_date.
user_active_dates = (
    users_per_day
    .filter(psf.col("listened_date").isNotNull())
    .select(
        "user_name",
        psf.explode(
            psf.sequence(
                psf.col("listened_date"),
                psf.date_add(psf.col("listened_date"), ACTIVE_WINDOW_DAYS - 1),
            )
        ).alias("listened_date"),
    )
)

# Left join so dates with no active users still appear (countDistinct -> 0).
active_users_window = (
    all_dates
    .join(user_active_dates, on="listened_date", how="left")
    .groupBy("listened_date")
    .agg(psf.countDistinct("user_name").alias("number_active_users"))
)

# Total unique users for the percentage calculation.
total_users = df_final.select("user_name").distinct().count()

# Add the percentage column; guard against an empty dataset (division by zero).
active_users_final = (
    active_users_window
    .withColumn(
        "percentage_active_users",
        psf.col("number_active_users") / psf.lit(total_users) * 100
        if total_users
        else psf.lit(None).cast("double"),
    )
    .orderBy("listened_date")
)
active_users_final.show()




+-------------+-------------------+-----------------------+
|listened_date|number_active_users|percentage_active_users|
+-------------+-------------------+-----------------------+
|   2019-01-01|                 70|      34.65346534653465|
|   2019-01-02|                 95|      47.02970297029702|
|   2019-01-03|                102|     50.495049504950494|
|   2019-01-04|                107|      52.97029702970298|
|   2019-01-05|                108|      53.46534653465347|
|   2019-01-06|                110|      54.45544554455446|
|   2019-01-07|                114|      56.43564356435643|
|   2019-01-08|                113|     55.940594059405946|
|   2019-01-09|                115|     56.930693069306926|
|   2019-01-10|                115|     56.930693069306926|
|   2019-01-11|                114|      56.43564356435643|
|   2019-01-12|                113|     55.940594059405946|
|   2019-01-13|                114|      56.43564356435643|
|   2019-01-14|                119|     

                                                                                

In [None]:
# Shut down the SparkSession created at the top of the notebook.
spark.stop()