In [0]:
import os
import json
import re

import logging
logging.basicConfig(level=logging.DEBUG, handlers=[logging.StreamHandler()])
logger = logging.getLogger("mdp_prop")

from tqdm.notebook import tqdm

import pyspark.sql.functions as sf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

spark


## Config

Prerequisite is to allow the Databrick environment to load from ADLS container. Follow this tutorial:

[Tutorial: Connect to Azure Data Lake Storage Gen2](https://learn.microsoft.com/en-gb/azure/databricks/connect/storage/tutorial-azure-storage)

In [0]:
# Set an Entra ID for the Databricks workspace
DBK_SECRET_SCOPE = "tichack2024kv" # Databricks secret scope to access Azure Key Vault
AKV_KEY_NAME = "analytical-databricks-key" # Azure Key Vault
ENTRA_APP_ID = "9156dfe1-254b-4047-9f1a-a8fd3e79787d" # Entra ID for the Databricks workspace
ENTRA_DIRECTORY_ID = "565f1c8e-754e-473e-8352-ac5b86a38c93" # Tenant ID of Entra App

# ADLS container with the data
STORAGE_ACC = "agenticaiamlws" # Storage Account
ADLS_CONTAINER = "azureml-blobstore-03a975f6-17cd-4334-a581-d30d363b62ab"

In [0]:
service_credential = dbutils.secrets.get(scope=DBK_SECRET_SCOPE, key=AKV_KEY_NAME)

## Extract MDP Zip

[Mounting cloud object storage on Azure Databricks](https://learn.microsoft.com/en-gb/azure/databricks/dbfs/mounts)

In [0]:
# # Already mounted to /mnt/adls/ No need to mount twice
# configs = {"fs.azure.account.auth.type": "OAuth",
#           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
#           "fs.azure.account.oauth2.client.id": ENTRA_APP_ID,
#           "fs.azure.account.oauth2.client.secret": service_credential,
#           "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{ENTRA_DIRECTORY_ID}/oauth2/token"}

# dbutils.fs.mount(
#     source=f"abfss://{ADLS_CONTAINER}@{STORAGE_ACC}.dfs.core.windows.net",
#     mount_point="/mnt/adls",
#     extra_configs=configs
# )

In [0]:
# import zipfile
# import os

# Define Paths
adls_mnt_path = "/dbfs/mnt/adls_2"

# # Fake path
# zip_adls_file = "<fake/path>" # Challenge path
# extract_path = "<fake/path>"

# # Challenge
# zip_adls_file = "million_playlist_dataset/spotify_million_playlist_dataset_challenge.zip" # Challenge path
# extract_path = "/dbfs/mnt/adls/challenge_dataset/"

# # MLD
# zip_adls_file = "million_playlist_dataset/spotify_million_playlist_dataset.zip" # MPD path
# extract_path = "/dbfs/mnt/adls/mld_dataset/"

# # Create extract folder
# zip_path = os.path.join(adls_mnt_path, zip_adls_file)  # Databricks paths use /dbfs/
# os.makedirs(extract_path, exist_ok=True)

The extraction can take 30 minutes. Total extracted JSON size ~33GB and 1000 files.

In [0]:
# # Extract ZIP file
# with zipfile.ZipFile(zip_path, 'r') as zf:
#     zf.extractall(extract_path)

## Flatten MPD JSONs

Convert MPD json to parquet.

In [0]:
# Set spark connection to ADLS
spark.conf.set(f"fs.azure.account.auth.type.{STORAGE_ACC}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{STORAGE_ACC}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{STORAGE_ACC}.dfs.core.windows.net", ENTRA_APP_ID)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{STORAGE_ACC}.dfs.core.windows.net", service_credential)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{STORAGE_ACC}.dfs.core.windows.net", f"https://login.microsoftonline.com/{ENTRA_DIRECTORY_ID}/oauth2/token")

# # Spark configurations (set in the Databricks compute configuration)
# spark.conf.set("spark.databricks.cluster.profile", "singleNode")
# spark.conf.set("spark.master", "local[*, 4]")
# spark.conf.set("spark.driver.maxResultSize", "8g")

# Test spark connection
df_titanic = spark.read.csv(f"abfss://{ADLS_CONTAINER}@{STORAGE_ACC}.dfs.core.windows.net/titanic.csv", header=True)
display(df_titanic)

In [0]:
# Million Playlist Dataset paths
mdp_adls_path = f"abfss://{ADLS_CONTAINER}@{STORAGE_ACC}.dfs.core.windows.net/" # adls path
mpd_json_datastore = "mld_dataset/data/" # mpd json path

# List all files in the ADLS
logger.info("List JSON files as ADLS path.")
mdp_json_paths = []
for file in dbutils.fs.ls(os.path.join(mdp_adls_path, mpd_json_datastore)):
    mdp_json_paths.append(file.path)

# List all files in local mount
logger.info("List JSON files as local path.")
mdp_json_local = "mld_dataset/data"
mdp_json_paths_local = []
for file in dbutils.fs.ls(f"/mnt/adls_2/{mdp_json_local}"):
    mdp_json_paths_local.append(file.path)

mdp_json_paths_local

Convert JSON to parquet.

In [0]:

from typing import Any

def flatten_mpd_json(json_paths: list[str], parquet_path: str) -> None:
    for fpath in tqdm(json_paths):
        fpath = re.sub("dbfs:", "/dbfs", fpath) # Convert to Databricks path
        with open(fpath, 'r') as fi:
            data = json.load(fi)
        if 'playlists' in data:
            playlists = data['playlists']
            df_playlists = (
                spark.createDataFrame(playlists)
                .withColumn("track", sf.explode("tracks"))    
                .drop("tracks")
            )
            # Extract track attributes as column
            playlist_columns = df_playlists.columns
            track_columns = [
                sf.col("track").getItem(k).alias(
                    "track_" + k if k not in ['track_name', 'track_uri'] else k
                )
                for k in [
                "pos", "artist_name", "track_uri", "artist_uri", 
                "track_name", "album_uri", "duration_ms", "album_name"
            ]]
            df_playlists = df_playlists.select(*(playlist_columns + track_columns)).drop("track")
            # Save to parquet
            fname_parts = os.path.basename(fpath).split(".")[:-1]
            fname_parts.append('parquet')
            fname = '.'.join(fname_parts)
            (
                df_playlists
                .coalesce(1)
                .write
                .mode("overwrite")
                .format("parquet")
                .save(os.path.join(parquet_path, fname))
            )
        else:
            playlist = []
            logger.error(f"No playlists in {fpath}")


# # Convert MDP json to parquet and save to ADLS
mpd_parquet_raw = "mpd_parquet_raw"
mdp_parquet_raw_path = os.path.join(adls_mnt_path, mpd_parquet_raw)
logging.info("Saving MDP json to %s", mdp_parquet_raw_path)
# os.makedirs(os.path.join(adls_mnt_path, mpd_parquet_raw), exist_ok=True)
# flatten_mpd_json(
#     json_paths = mdp_json_paths_local,
#     parquet_path = os.path.join(mdp_adls_path, mpd_parquet_raw)
# )


In [0]:
# Load from ADLS
df_playlists_raw = spark.read.parquet(os.path.join(mdp_adls_path, mpd_parquet_raw, "*.parquet"))
print(df_playlists_raw.count())
display(df_playlists_raw)


## Load Track Features

Load track features from Spotify API.

Note: the remote ADLS container is sometimes mounted to /dbfs/dbfs/mnt/adls_2 and sometimes mounted to /dbfs/mnt/adls_2.

In [0]:
# # Write unique track and artist ID table to ADLS
# df_unique_tracks = df_playlists_raw.groupby("track_uri", "track_album_uri", "track_artist_uri").agg(sf.count("track_pos").alias("freq"))
# mpd_unique_tracks_file = os.path.join("/mnt/adls_2", "unique_tracks.parquet") 
# print(df_unique_tracks.count())
# df_unique_tracks.coalesce(1).write.mode("overwrite").format("parquet").save(mpd_unique_tracks_file)

# Read unique track and artist ID table back
fpath = os.path.join(mdp_adls_path, "unique_tracks.parquet")
logger.info("Load unique tracks from %s", fpath)
df_unique_tracks = spark.read.parquet(fpath)

display(df_unique_tracks)

In [0]:
# Load track and artist features from Spotify API
import re
import time
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
from dotenv import load_dotenv
from typing import Optional, Sequence
import pandas as pd

# Load environment variables
load_dotenv(override=True)


class SpotifyFeatureLoader:
    """
    Load track features from spotify.
    """
    # Declare hints
    client_cred: SpotifyClientCredentials

    def __init__(
        self,
        client_id: Optional[str] = os.getenv("SPOTIFY_CLIENT_ID"),
        client_secret: Optional[str] = os.getenv("SPOTIFY_CLIENT_SECRET")
    ):
        # Connect to Spotify App
        self.client_cred = SpotifyClientCredentials(
            client_id=client_id,
            client_secret=client_secret
        )

    def load_spotify_track_features(
        self, 
        uri: Sequence[str],
        batch_size: Optional[int] = 100,
        sleep_time: Optional[int] = None
    ):
        chunks = [uri[i:i+batch_size] for i in range(0, len(uri), batch_size)]
        chunks_df_list = []
        for chunk in tqdm(chunks):
            df_tracks = self._load_spotify_track_features(chunk)
            chunks_df_list.append(df_tracks)
            if sleep_time:
                time.sleep(sleep_time)
        return pd.concat(chunks_df_list, ignore_index=True)

    def _load_spotify_track_features(
        self,
        uri: Sequence[str],
        
    ) -> None:
        """
        Note: Audio features endpoint is deprecated by spotify.
        Notification: [Introducing some changes to our Web API](https://developer.spotify.com/blog/2024-11-27-changes-to-the-web-api)
        Forum discussion: [Changes to Web API](https://community.spotify.com/t5/Spotify-for-Developers/Changes-to-Web-API/td-p/6540414)
        """
        sp = spotipy.Spotify(
            client_credentials_manager=self.client_cred,
            requests_timeout=10,
            retries=10
        )
        # Get track attributes
        uri_stripped = [re.sub("spotify:track:", "", u) for u in uri]
        tracks_json = sp.tracks(uri_stripped)
        # # Get track audio features (Audio feature endpoints is deprecated by Spotify)
        # time.sleep(1) # Sleep 20 milliseconds before next call
        # audio_features = sp.audio_features(uri_stripped)
        df_tracks = pd.json_normalize(tracks_json["tracks"])[[
            'popularity', 'href', 'name', 'uri', 'duration_ms'
        ]]
        return df_tracks


In [0]:
pd_tracks_uri = df_unique_tracks[['track_uri']].toPandas()

In [0]:
pd_tracks_uri[:200]['track_uri']

In [0]:
sfl = SpotifyFeatureLoader()
test_track_uri = ["spotify:track:3oLXQgbcC34Oo4gFHCeTmi", "spotify:track:6TRp2628QKH3kY6KrCnjqp"]
df_test_tracks = sfl.load_spotify_track_features(pd_tracks_uri[:2000]['track_uri'].to_list(), 100)
df_test_tracks


In [0]:
df_test_tracks

In [0]:
pd.json_normalize(tracks_json["tracks"])[['popularity', 'href', 'name', 'uri', 'duration_ms']]

In [0]:
tracks_json

Get unique artist table

In [0]:
data['info']

In [0]:
data['playlists'][0]

In [0]:
# Load MDP. About 6 minutes
df = spark.read.option("multiline", "true").json(mdp_json_paths[0])


```
+--------------------+--------------------+
|                info|           playlists|
+--------------------+--------------------+
|{2017-12-03 08:41...|[{false, NULL, 11...|
+--------------------+--------------------+
```

In [0]:
display(df[["playlists"]])

An example playlist entry.

```
{
        "name": "musical",
        "collaborative": "false",
        "pid": 5,
        "modified_at": 1493424000,
        "num_albums": 7,
        "num_tracks": 12,
        "num_followers": 1,
        "num_edits": 2,
        "duration_ms": 2657366,
        "num_artists": 6,
        "tracks": [
            {
                "pos": 0,
                "artist_name": "Degiheugi",
                "track_uri": "spotify:track:7vqa3sDmtEaVJ2gcvxtRID",
                "artist_uri": "spotify:artist:3V2paBXEoZIAhfZRJmo2jL",
                "track_name": "Finalement",
                "album_uri": "spotify:album:2KrRMJ9z7Xjoz1Az4O6UML",
                "duration_ms": 166264,
                "album_name": "Dancing Chords and Fireflies"
            },
            {
                "pos": 1,
                "artist_name": "Degiheugi",
                "track_uri": "spotify:track:23EOmJivOZ88WJPUbIPjh6",
                "artist_uri": "spotify:artist:3V2paBXEoZIAhfZRJmo2jL",
                "track_name": "Betty",
                "album_uri": "spotify:album:3lUSlvjUoHNA8IkNTqURqd",
                "duration_ms": 235534,
                "album_name": "Endless Smile"
            },
            {
                "pos": 2,
                "artist_name": "Degiheugi",
                "track_uri": "spotify:track:1vaffTCJxkyqeJY7zF9a55",
                "artist_uri": "spotify:artist:3V2paBXEoZIAhfZRJmo2jL",
                "track_name": "Some Beat in My Head",
                "album_uri": "spotify:album:2KrRMJ9z7Xjoz1Az4O6UML",
                "duration_ms": 268050,
                "album_name": "Dancing Chords and Fireflies"
            },
            // 8 tracks omitted
            {
                "pos": 11,
                "artist_name": "Mo' Horizons",
                "track_uri": "spotify:track:7iwx00eBzeSSSy6xfESyWN",
                "artist_uri": "spotify:artist:3tuX54dqgS8LsGUvNzgrpP",
                "track_name": "Fever 99\u00b0",
                "album_uri": "spotify:album:2Fg1t2tyOSGWkVYHlFfXVf",
                "duration_ms": 364320,
                "album_name": "Come Touch The Sun"
            }
        ],

    }
```