In [0]:
# Notebook parameters, in creation order:
#   BASE_URL   - root of the Spotify Web API
#   TOKEN_URL  - token endpoint used for the client-credentials grant
#   PlaylistId - playlist to analyse (empty string by default)
for _widget_name, _widget_default in (
    ("BASE_URL", "https://api.spotify.com/v1"),
    ("TOKEN_URL", "https://accounts.spotify.com/api/token"),
    ("PlaylistId", ""),
):
    dbutils.widgets.text(_widget_name, _widget_default)

In [0]:
# Spotify app credentials, read (in this order) from the "keys-scope" secret scope.
client_id, client_secret = (
    dbutils.secrets.get("keys-scope", secret_key)
    for secret_key in ("client_id", "client_secret")
)

In [0]:
import json
import time
from typing import Optional

import pyspark.sql.functions as F
import requests
from pyspark.sql import SparkSession

# SparkContext handle; SpotifyAPI uses it to parallelize JSON payloads into RDDs for spark.read.json.
sc = spark.sparkContext

class SpotifyAPI:
    """Minimal Spotify Web API client using the client-credentials grant.

    Credentials come from the notebook globals ``client_id`` / ``client_secret``;
    the API root and token endpoint come from the ``BASE_URL`` / ``TOKEN_URL`` widgets.
    """

    # Seconds to wait for any single HTTP request before giving up.
    REQUEST_TIMEOUT = 30
    # Treat a cached token as expired this many seconds before its stated lifetime ends.
    TOKEN_EXPIRY_MARGIN = 60

    def __init__(self) -> None:
        self.client_id = client_id
        self.client_secret = client_secret
        self.base_url = dbutils.widgets.get("BASE_URL")
        self.token_url = dbutils.widgets.get("TOKEN_URL")
        # Cached bearer token and the time.monotonic() value after which it is refreshed.
        self._token = None
        self._token_expires_at = 0.0

    def get_access_token(self) -> str:
        """Return a bearer token, reusing the cached one while it is still valid.

        The token endpoint is only called when no token is cached or the cached
        one is about to expire, instead of once per API request.

        Raises:
            requests.HTTPError: if the token endpoint answers with an error status.
        """
        # getattr keeps this working for instances created without __init__.
        cached = getattr(self, "_token", None)
        if cached is not None and time.monotonic() < getattr(self, "_token_expires_at", 0.0):
            return cached

        response = requests.post(
            self.token_url,
            data={"grant_type": "client_credentials"},
            auth=(self.client_id, self.client_secret),
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        payload = response.json()

        self._token = payload["access_token"]
        # ``expires_in`` is the token lifetime in seconds; if it is missing the token
        # is treated as already expired, so the next call fetches a fresh one.
        lifetime = float(payload.get("expires_in", 0)) - self.TOKEN_EXPIRY_MARGIN
        self._token_expires_at = time.monotonic() + max(lifetime, 0.0)
        return self._token

    def _build_url(self, endpoint: str, object_id: Optional[str] = None) -> str:
        """Join base URL, endpoint and optional object path without a dangling slash."""
        url = f"{self.base_url}/{endpoint}"
        return f"{url}/{object_id}" if object_id else url

    def parse_json(self, endpoint: str, object_id: Optional[str] = None, params: Optional[dict] = None) -> Optional[dict]:
        """GET ``{base_url}/{endpoint}[/{object_id}]`` and return the decoded JSON body.

        Args:
            endpoint: API path segment, e.g. ``"albums"``.
            object_id: optional trailing path; may contain slashes (e.g. ``"<id>/tracks"``).
            params: optional query-string parameters.

        Returns:
            The parsed JSON body on HTTP 200; otherwise ``None`` after printing the status.
        """
        url = self._build_url(endpoint, object_id)
        response = requests.get(
            url,
            headers={"Authorization": f"Bearer {self.get_access_token()}"},
            params=params,
            timeout=self.REQUEST_TIMEOUT,
        )

        if response.status_code == 200:
            return response.json()

        print(f"Failed to retrieve data from {url} due to the status code: {response.status_code}.")
        return None

    def process_single_row_objects(self, endpoint: str, objects: list) -> object:
        """Fetch ``endpoint/<id>`` for every id in ``objects`` and load all results as one DataFrame.

        Ids whose request fails (``parse_json`` returned None) are skipped rather
        than being turned into JSON ``null`` rows.
        """
        payloads = (self.parse_json(endpoint, obj) for obj in objects)
        json_lines = [json.dumps(payload) for payload in payloads if payload is not None]
        return spark.read.json(sc.parallelize(json_lines))

    @staticmethod
    def read_json_to_df(obj) -> object:
        """Serialize ``obj`` (a dict or a list of dicts) to JSON and read it as a Spark DataFrame."""
        return spark.read.json(sc.parallelize([json.dumps(obj)]))


In [0]:
if __name__ == "__main__":
    # NOTE(review): in this notebook the guard isolates nothing — later cells call the
    # helpers defined below (e.g. get_tracks_list) at top level, so they rely on it being true.
    # Define new object with API methods to process Spotify data
    sp = SpotifyAPI()

    # Playlist to analyse, read from the "PlaylistId" widget.
    playlist_id = dbutils.widgets.get("PlaylistId")

    def process_playlists(playlist_id: str, page_size: int = 100):
        """Return every item of a playlist's track listing as a Spark DataFrame.

        The playlist-tracks endpoint is paginated, so pages are requested with an
        increasing ``offset`` until a response carries no ``next`` link.

        Args:
            playlist_id: Spotify playlist id.
            page_size: items requested per page (Spotify documents 100 as the maximum).

        Raises:
            RuntimeError: if a page cannot be fetched (``sp.parse_json`` returned None).
        """
        items = []
        offset = 0
        while True:
            page = sp.parse_json(
                "playlists",
                f"{playlist_id}/tracks",
                {"offset": offset, "limit": page_size},
            )
            if page is None:
                raise RuntimeError(
                    f"Could not fetch tracks for playlist {playlist_id!r} at offset {offset}."
                )

            page_items = page["items"]
            items.extend(page_items)
            # Stop on the last page; the empty-page check also prevents an endless loop.
            if not page.get("next") or not page_items:
                break
            offset += len(page_items)

        return sp.read_json_to_df(items)
    
    def get_most_popular_genres(playlist_id: str):
        """Return the playlist's track items as a DataFrame.

        NOTE(review): despite the name, no genre computation happens here — this
        simply delegates to ``process_playlists``. TODO implement or rename.
        """
        return process_playlists(playlist_id)
    
    def get_recommendations(
        track_ids: list,
        limit: int = 10,
        min_energy: float = 0.5,
        min_popularity: int = 75,
    ):
        """Fetch Spotify recommendations seeded by ``track_ids`` as a DataFrame.

        Args:
            track_ids: seed track ids; the recommendations endpoint accepts 1 to 5 seeds.
            limit: number of recommendations requested.
            min_energy: lower bound forwarded as ``min_energy``.
            min_popularity: lower bound forwarded as ``min_popularity``.

        Raises:
            ValueError: if the number of seeds is outside 1..5.
            RuntimeError: if the request fails (``sp.parse_json`` returned None).
        """
        max_seeds = 5
        track_ids = list(track_ids)
        if not 1 <= len(track_ids) <= max_seeds:
            raise ValueError(f"Expected 1-{max_seeds} seed tracks, got {len(track_ids)}.")

        params = {
            "seed_tracks": ",".join(track_ids),
            "limit": limit,
            "min_energy": min_energy,
            "min_popularity": min_popularity,
        }
        response = sp.parse_json("recommendations", None, params)
        if response is None:
            raise RuntimeError("Spotify recommendations request failed.")

        return sp.read_json_to_df(response)

    def get_artists_name_ids(playlist_id: str):
        """Return one (artist_id, artist_name) row per artist credit on the playlist's tracks.

        ``track.artists`` is an array of artist structs, so exploding it keeps each
        id paired with its own name. This avoids exploding the id and name arrays
        separately and re-joining them on a generated row id. Repeated artists
        (one artist on several tracks) still yield one row per credit.
        """
        return (
            process_playlists(playlist_id)
            .select(F.explode(F.col("track.artists")).alias("artist"))
            .select(
                F.col("artist.id").alias("artist_id"),
                F.col("artist.name").alias("artist_name"),
            )
        )

    def get_playlists(playlist_id: str):
        """Flatten the playlist's track items into one row per (track, artist name).

        Album and track attributes repeat for every artist credited on a track,
        because ``track.artists.name`` is exploded.
        """
        album_columns = [
            F.col("track.album.album_type"),
            F.col("track.album.id").alias("album_id"),
            F.col("track.album.name").alias("album_name"),
            F.col("track.album.release_date"),
            F.col("track.album.total_tracks"),
        ]
        artist_column = F.explode(F.col("track.artists.name")).alias("artist_name")
        track_columns = [
            F.col("track.duration_ms"),
            F.col("track.id").alias("track_id"),
            F.col("track.name").alias("track_name"),
            F.col("track.popularity"),
        ]

        playlist_items = process_playlists(playlist_id)
        return playlist_items.select(*album_columns, artist_column, *track_columns)
    
    def get_albums(albums_ids: list):
        """Fetch the given albums and return one row per album track.

        Columns: album_name, release_date, type, artist_name (first credited
        artist only), track_id (exploded from ``tracks.items.id``) and popularity.
        NOTE(review): ``tracks.items`` looks like a single page of the album's
        tracks — confirm very long albums are complete.
        """
        fetched_albums = sp.process_single_row_objects("albums", albums_ids)
        first_artist = F.col("artists.name")[0].alias("artist_name")
        exploded_track_id = F.explode(F.col("tracks.items.id")).alias("track_id")

        return fetched_albums.select(
            F.col("name").alias("album_name"),
            F.col("release_date"),
            F.col("type"),
            first_artist,
            exploded_track_id,
            F.col("popularity"),
        )

    def get_albums_from_playlist(playlist_id: str):
        """Return every track of every album that appears on the playlist.

        ``get_playlists`` yields one row per (track, artist), so the same album id
        can occur many times. Ids are de-duplicated (and nulls dropped) before
        fetching, so each album is requested from the API only once.
        """
        album_rows = get_playlists(playlist_id).select("album_id").distinct().collect()
        albums_ids = [row.album_id for row in album_rows if row.album_id is not None]

        return get_albums(albums_ids).distinct()
    
    def get_tracks_list(playlist_id: str):
        """Collect the ids of all tracks on the playlist's albums into a Python list."""
        album_tracks = get_albums_from_playlist(playlist_id)
        id_rows = album_tracks.select(F.col("track_id")).collect()
        return [row[0] for row in id_rows]

    def get_tracks(tracks_list: list):
        """Fetch the full track object for every id in ``tracks_list`` as one DataFrame."""
        endpoint = "tracks"
        return sp.process_single_row_objects(endpoint, tracks_list)
    
    def get_track_audio_features(tracks_list: list):
        """Fetch the audio-features object for every track id as one DataFrame."""
        endpoint = "audio-features"
        return sp.process_single_row_objects(endpoint, tracks_list)


In [0]:
# Ids of every track on the playlist's albums. get_audio_features_for_tracks() below reads this
# notebook global, so rebinding `tracks_list` elsewhere changes what that function fetches.
tracks_list = get_tracks_list(playlist_id)

def get_albums_tracks_based_on_playlist(playlist_id: str):
    """Join the playlist's album tracks with their full Spotify track objects.

    Track ids come from the albums DataFrame fetched here, not from the notebook
    global ``tracks_list``. The result therefore does not depend on state left by
    other cells. Ids are de-duplicated so a repeated id cannot duplicate join rows.

    Returns:
        DataFrame with album_name, release_date, artist_name, track_id,
        track_name, duration_seconds, popularity (track-level) and track_number.
    """
    albums = get_albums_from_playlist(playlist_id)
    track_ids = [row.track_id for row in albums.select("track_id").distinct().collect()]
    tracks = get_tracks(track_ids)

    albums_tracks_joined = (
        albums
        .join(tracks, albums["track_id"] == tracks["id"], how="inner")
        # Both sides carry a `popularity` column; keep the track-level one.
        .drop(albums["popularity"])
    )

    return albums_tracks_joined.select(
        F.col("album_name"),
        F.col("release_date"),
        F.col("artist_name"),
        F.col("track_id"),
        F.col("name").alias("track_name"),
        (F.col("duration_ms") / 1000).alias("duration_seconds"),
        F.col("popularity"),
        F.col("track_number"),
    )

def get_audio_features_for_tracks(track_ids: list = None):
    """Fetch audio features for tracks and keep the columns used by the analysis.

    Args:
        track_ids: ids to look up. When omitted, the notebook global
            ``tracks_list`` is used (backward-compatible default). Pass the list
            explicitly to avoid depending on whatever that global currently holds.

    Returns:
        DataFrame with track_id plus the selected audio-feature columns.
    """
    ids = tracks_list if track_ids is None else track_ids
    audio_features_per_track = get_track_audio_features(ids)

    return audio_features_per_track.select(
        F.col("id").alias("track_id"),
        F.col("acousticness"),
        F.col("danceability"),
        F.col("duration_ms"),
        F.col("energy"),
        F.col("instrumentalness"),
        F.col("key"),
        F.col("tempo"),
        F.col("liveness"),
        F.col("loudness"),
        F.col("time_signature"),
        F.col("valence"),
    )


In [0]:
full_albums_from_playlist = get_albums_tracks_based_on_playlist(playlist_id)
tracks_audio_features = get_audio_features_for_tracks()

# Inner join on track_id: only tracks that also have an audio-features row are kept.
full_albums_with_audio_features = full_albums_from_playlist.join(
    tracks_audio_features, on="track_id", how="inner"
)

# Expose both DataFrames to the %sql cells that follow.
full_albums_from_playlist.createOrReplaceTempView("full_albums")
full_albums_with_audio_features.createOrReplaceTempView("full_albums_with_audio_features")

In [0]:
%sql

-- Target database and tables for the Spotify playlist analysis.
-- IF NOT EXISTS: editing the column lists below has no effect on tables that already exist.
CREATE DATABASE IF NOT EXISTS spotify;

-- Album tracks reachable from the playlist, loaded from the `full_albums` temp view.
-- NOTE(review): partitioning by ArtistName on a small table yields many tiny partitions;
-- consider dropping PARTITIONED BY — confirm expected data volume first.
CREATE TABLE IF NOT EXISTS spotify.full_albums (
  ArtistName STRING,
  AlbumName STRING,
  ReleaseDate DATE,
  TrackId STRING,
  TrackName STRING,
  DurationSeconds DECIMAL(8, 2),
  Duration STRING,                -- "mm:ss" text form of DurationSeconds
  Popularity INT,
  TrackNumber INT
) PARTITIONED BY (ArtistName);

-- Same track columns joined with per-track audio features from the
-- `full_albums_with_audio_features` temp view.
CREATE TABLE IF NOT EXISTS spotify.full_albums_with_audio_features (
  ArtistName STRING,
  AlbumName STRING,
  ReleaseDate DATE,
  TrackId STRING,
  TrackName STRING,
  DurationSeconds DECIMAL(8, 2),
  Duration STRING,                -- "mm:ss" text form of DurationSeconds
  Popularity INT,
  TrackNumber INT,
  Acousticness DECIMAL(8, 6),
  Danceability DECIMAL(5, 3),
  Energy DECIMAL(5, 3),
  Instrumentalness DECIMAL(10, 8),
  Key INT,
  Tempo DECIMAL(10, 6),
  Liveness DECIMAL(8, 6),
  Loudness DECIMAL(5, 3),
  TimeSignature INT,
  Valence DECIMAL(5, 3)
) PARTITIONED BY (ArtistName)

In [0]:
%sql

-- Rebuild spotify.full_albums from the `full_albums` temp view registered in Python.
-- Duration is built arithmetically rather than with from_unixtime(..., 'mm:ss'):
-- from_unixtime formats a timestamp in the session time zone (shifting the minutes in
-- zones with non-whole-hour offsets) and wraps at 60 minutes.
INSERT OVERWRITE TABLE spotify.full_albums
SELECT
  artist_name AS ArtistName,
  album_name AS AlbumName,
  release_date AS ReleaseDate,
  track_id AS TrackId,
  track_name AS TrackName,
  duration_seconds AS DurationSeconds,
  format_string(
    '%02d:%02d',
    CAST(duration_seconds AS BIGINT) DIV 60,
    CAST(duration_seconds AS BIGINT) % 60
  ) AS Duration,
  popularity AS Popularity,
  track_number AS TrackNumber
FROM
  full_albums;


num_affected_rows,num_inserted_rows
57,57


In [0]:
%sql

-- Rebuild spotify.full_albums_with_audio_features from the matching temp view.
-- Duration is built arithmetically rather than with from_unixtime(..., 'mm:ss'):
-- from_unixtime formats a timestamp in the session time zone (shifting the minutes in
-- zones with non-whole-hour offsets) and wraps at 60 minutes.
INSERT OVERWRITE TABLE spotify.full_albums_with_audio_features
SELECT
  artist_name AS ArtistName,
  album_name AS AlbumName,
  release_date AS ReleaseDate,
  track_id AS TrackId,
  track_name AS TrackName,
  duration_seconds AS DurationSeconds,
  format_string(
    '%02d:%02d',
    CAST(duration_seconds AS BIGINT) DIV 60,
    CAST(duration_seconds AS BIGINT) % 60
  ) AS Duration,
  popularity AS Popularity,
  track_number AS TrackNumber,
  acousticness AS Acousticness,
  danceability AS Danceability,
  energy AS Energy,
  instrumentalness AS Instrumentalness,
  key AS Key,
  tempo AS Tempo,
  liveness AS Liveness,
  loudness AS Loudness,
  time_signature AS TimeSignature,
  valence AS Valence
FROM
  full_albums_with_audio_features;

num_affected_rows,num_inserted_rows
57,57


In [0]:
# Five albums with the largest summed track duration (seconds) in the audio-features table.
# AlbumName is a secondary sort key so ties at the LIMIT boundary resolve deterministically.
# NOTE(review): grouping by AlbumName alone merges distinct albums that share a title.
top_5_longest_albums = spark.sql("""
    SELECT
        AlbumName,
        SUM(DurationSeconds) AS AlbumDuration
    FROM
        spotify.full_albums_with_audio_features
    GROUP BY
        AlbumName
    ORDER BY
        AlbumDuration DESC,
        AlbumName
    LIMIT 5
""")


In [0]:
import pyspark.sql.functions as F

# Label every track by popularity: 0-40 -> NotPopular, 41-100 -> Popular.
albums_with_most_popular_songs = spark.sql("""
    SELECT
        AlbumName,
        CASE
            WHEN Popularity BETWEEN 0 AND 40 THEN 'NotPopular'
            WHEN Popularity BETWEEN 41 AND 100 THEN 'Popular'
            ELSE NULL
        END AS Popularity
    FROM
        spotify.full_albums_with_audio_features
""")

# Listing the pivot values fixes the output schema: both columns exist even when no track
# lands in a bucket, and Spark skips the extra job to discover the distinct values.
# They are given in the sorted order Spark would otherwise infer.
POPULARITY_BUCKETS = ["NotPopular", "Popular"]

# Per album: how many tracks fall into each popularity bucket (missing counts -> 0).
albums_per_popularity = (
    albums_with_most_popular_songs
    .groupBy("AlbumName")
    .pivot("Popularity", POPULARITY_BUCKETS)
    .agg(F.count("Popularity"))
    .na.fill(0)
)

In [0]:
# Label every track by energy with non-overlapping ranges:
#   [0, 0.3] -> NotEnergetic, (0.3, 0.6] -> MediumEnergetic, (0.6, 1] -> Energetic.
# These match what the previous overlapping BETWEEN ranges produced (first match wins).
albums_energy = spark.sql("""
    SELECT
        AlbumName,
        CASE
            WHEN Energy >= 0 AND Energy <= 0.3 THEN 'NotEnergetic'
            WHEN Energy > 0.3 AND Energy <= 0.6 THEN 'MediumEnergetic'
            WHEN Energy > 0.6 AND Energy <= 1 THEN 'Energetic'
            ELSE NULL
        END AS Energy
    FROM
        spotify.full_albums_with_audio_features
""")

# Explicit pivot values, in the sorted order Spark would infer, keep the output schema
# stable even when a bucket is empty and avoid an extra distinct-values job.
ENERGY_BUCKETS = ["Energetic", "MediumEnergetic", "NotEnergetic"]

# Per album: how many tracks fall into each energy bucket (missing counts -> 0).
albums_per_energy = (
    albums_energy
    .groupBy("AlbumName")
    .pivot("Energy", ENERGY_BUCKETS)
    .agg(F.count("Energy"))
    .na.fill(0)
)

In [0]:
# Integer key code -> note name (0 = C ... 11 = B).
audio_keys = {
    0: 'C',
    1: 'C#',
    2: 'D',
    3: 'D#',
    4: 'E',
    5: 'F',
    6: 'F#',
    7: 'G',
    8: 'G#',
    9: 'A',
    10: 'A#',
    11: 'B'
}

# (id, Key) rows for a small lookup table joined against the track data below.
audio_keys_map = list(audio_keys.items())
df_audio_keys_map = spark.createDataFrame(audio_keys_map, ['id', 'Key'])
df_audio_keys_map.createOrReplaceTempView("df_audio_keys_map")

# Number of tracks per musical key.
# NOTE(review): the INNER JOIN drops tracks whose Key is outside 0-11 (Spotify documents -1
# for "no key detected") — confirm that exclusion is intended.
tracks_per_keys = spark.sql("""
    SELECT
        dakm.Key,
        COUNT(fawaf.Key) AS NoOfTracksInKey
    FROM spotify.full_albums_with_audio_features AS fawaf
    INNER JOIN df_audio_keys_map AS dakm
        ON fawaf.Key = dakm.id
    GROUP BY dakm.Key
""")

In [0]:
from pyspark.sql.types import StringType

# Seed recommendations with the five most popular tracks. A dedicated name is used so the
# notebook global `tracks_list` (read by get_audio_features_for_tracks) is not overwritten.
top_5_track_ids = [
    row.TrackId
    for row in spark.sql(
        "SELECT TrackId FROM spotify.full_albums_with_audio_features "
        "ORDER BY Popularity DESC LIMIT 5"
    ).collect()
]

recommendations = get_recommendations(top_5_track_ids)

# Ids of the recommended tracks, flattened from the response's `tracks` array.
recommended_tracks_based_on_top_5 = [
    row.TrackId
    for row in recommendations
    .select(F.explode(F.col("tracks.id")).alias("TrackId"))
    .collect()
]

# concat_ws joins the artist names directly; the former cast-to-string + bracket-stripping
# regex also removed any '[' / ']' that were part of an artist's name.
# NOTE(review): ArtistName is taken from the album's artists, not the track's own
# `artists` array — confirm this is intended (e.g. for compilation albums).
recommended_tracks = (
    get_tracks(recommended_tracks_based_on_top_5)
    .select(
        F.concat_ws(", ", F.col("album.artists.name")).alias("ArtistName"),
        F.col("album.name").alias("AlbumName"),
        F.col("name").alias("TrackName"),
        F.col("popularity").alias("Popularity"),
    )
    .orderBy(F.desc("Popularity"))
)


In [0]:
# Persist each result DataFrame as a managed table in the `spotify` database (overwritten
# on every run). No "header" option: it is a CSV data-source option and does not apply
# to saveAsTable writes in the default table format.
RESULT_TABLES = {
    "spotify.top_5_longest_albums": top_5_longest_albums,
    "spotify.albums_per_popularity": albums_per_popularity,
    "spotify.albums_per_energy": albums_per_energy,
    "spotify.tracks_per_keys": tracks_per_keys,
    "spotify.recommended_tracks": recommended_tracks,
}

for table_name, result_df in RESULT_TABLES.items():
    result_df.write.mode("overwrite").saveAsTable(table_name)