## Import required libraries

In [2]:
# get .env params
import os
from dotenv import load_dotenv

# get spotipy lib, define client auth
import spotipy
from spotipy.oauth2 import SpotifyOAuth

# analysis libs
import pandas as pd

## User-defined functions (UDFs)

In [3]:
### aggregate track stats
def agg_pl_stats(df):
    """
    Aggregate per-playlist duration and popularity statistics from a flattened track df.

    :param df: DataFrame with one row per track and at least the columns
        "playlist_id", "duration_ms" and "popularity"
    :return: DataFrame with one row per playlist_id and the columns
        tot_duration, avg_duration, std_duration (rounded to whole seconds),
        avg_popularity, std_popularity (rounded to integers) and
        min_popularity, max_popularity (as aggregated).
        The std columns are 0 for playlists containing a single track.
    """

    agg = (
        df.groupby("playlist_id").agg(
                tot_duration   = ("duration_ms", "sum"),
                avg_duration   = ("duration_ms", "mean"),
                std_duration   = ("duration_ms", "std"),

                avg_popularity = ("popularity", "mean"),
                min_popularity = ("popularity", "min"),
                max_popularity = ("popularity", "max"),
                std_popularity = ("popularity", "std")
            ).reset_index()
    )

    # the sample standard deviation of a single-track playlist is NaN, and NaN
    # cannot be cast to int; report zero spread instead of raising
    std_cols = ["std_duration", "std_popularity"]
    agg[std_cols] = agg[std_cols].fillna(0)

    # convert ms to s and cast float to int
    for col in ("tot_duration", "avg_duration", "std_duration"):
        agg[col] = (agg[col] / 1000).round().astype(int)

    # popularity statistics only need rounding
    for col in ("avg_popularity", "std_popularity"):
        agg[col] = agg[col].round().astype(int)

    return agg

In [34]:
# read credentials and the user id from the environment / .env file
load_dotenv()

# authorisation manager for the Spotify OAuth flow; the scope covers reading the
# user's profile plus private and collaborative playlists
auth_manager = SpotifyOAuth(
    os.getenv("SPOTIPY_CLIENT_ID"),
    client_secret = os.getenv("SPOTIPY_CLIENT_SECRET"),
    redirect_uri = "http://127.0.0.1:9090",
    scope = "user-read-email user-read-private playlist-read-private playlist-read-collaborative",
    open_browser = True
)


sp = spotipy.Spotify(auth_manager=auth_manager)

# first page of the user's playlists
playlists = sp.user_playlists(
    user = os.getenv("USER_ID"),
)

# user_playlists returns a single page; follow the "next" links and collect every
# page into playlists["items"] so playlists beyond the first page are not missed
page = playlists
while page and page.get("next"):
    page = sp.next(page)
    if page:
        playlists["items"].extend(page.get("items", []))

# playlists in scope are named "mm.yyyy"; build the list of expected names
## more work needed for automation
year_starting = 2024
years = 2
months = 12

# years and months covered, in chronological order
_year_span = range(year_starting, year_starting + years)
_month_span = range(1, months + 1)

pl_names = [f"{m:02d}.{y}" for y in _year_span for m in _month_span]

# tabulate the playlist items returned by the initial data load
pl_info = pd.DataFrame(playlists["items"])

# boolean mask: playlists whose name is in the date range defined previously
pl_filter = pl_info["name"].isin(pl_names)

# keep only the playlists in scope (2024 - 2025 inc.) and the columns used later,
# renumbering the rows from zero
pl_scope = (
    pl_info
    .loc[pl_filter, ["images", "name", "tracks", "id"]]
    .reset_index(drop = True)
)

### get playlist covers
num_pl = len(pl_scope)

# one cover-image lookup per playlist, in the row order of pl_scope
pl_covers = [sp.playlist_cover_image(pl_id) for pl_id in pl_scope["id"]]

# replace the whole column in one assignment: writing through
# pl_scope["images"].iloc[i] is chained assignment, which pandas warns about and
# which does not modify pl_scope when Copy-on-Write is enabled
pl_scope["images"] = pd.Series(pl_covers, index = pl_scope.index, dtype = object)

### get playlist track information

# fields requested for every playlist item; "next" is requested as well so the
# paging link survives the field filter and long playlists can be read in full
TRACK_FIELDS = (
    "items.track.id, items.track.name, items.track.duration_ms, items.track.popularity,"
    "items.track.album.id, items.track.album.images, items.track.album.name, items.track.album.artists, items.track.album.release_date,"
    "items.track.artists.id, items.track.artists.name,"
    "next"
)

# number of artist ids sent per sp.artists() call
ARTIST_BATCH_SIZE = 50


def _first_genre(artist_ids, genres_by_artist):
    """
    Return the first genre of the first artist (in the given order) that has any
    genre listed in genres_by_artist, or None when no artist has one.
    """
    for artist_id in artist_ids:
        genres = genres_by_artist.get(artist_id, [])
        if genres:
            return genres[0]
    return None


# define 'flattened' list for collecting normalised json responses
flattened = [] # list of dfs, one per playlist
pl_tracks = [] # raw json items, one list per playlist
pl_name_map = dict(zip(pl_scope["id"], pl_scope["name"])) # get map for name for each playlist id
artists_info = {} # artist id -> list of genres; shared across playlists so each artist is looked up once

for pl_id in pl_scope["id"]:

    # Get track data, following the "next" links until every page has been read
    response = sp.playlist_items(
        playlist_id = pl_id,
        fields = TRACK_FIELDS
    )
    items = list(response.get("items", []))
    while response and response.get("next"):
        response = sp.next(response)
        if response:
            items.extend(response.get("items", []))

    pl_tracks.append(items)

    # call returns dicts for some items, extract desired fields
    for item in items:
        track = item.get("track")
        if not track:
            continue

        # extract album image url
        album = track.get("album")
        if album:
            images = album.get("images", [])
            album["image"] = images[0]["url"] if images else None

        # extract artist names and ids as separate cols
        artist_list = track.get("artists", [])
        if isinstance(artist_list, list) and artist_list:
            track["artist_names"] = [a.get("name") for a in artist_list]
            track["artist_ids"] = [a.get("id") for a in artist_list]
        else:
            track["artist_names"] = []
            track["artist_ids"] = []

    # Now normalize: one row per playlist item, in the order of `items`
    df_flat = pd.json_normalize(items, sep="_")

    # remove the "track_" prefix; rename() is used instead of the .str accessor
    # because an empty playlist yields a frame whose columns are not strings
    df_flat = df_flat.rename(
        columns = lambda col: col[len("track_"):] if col.startswith("track_") else col
    )

    # drop dict columns no longer needed (absent when the playlist has no tracks)
    df_flat = df_flat.drop(
        columns = ["artists", "album_images", "album_artists"],
        errors = "ignore"
    )

    # get playlist id for tracks
    df_flat["playlist_id"] = pl_id
    df_flat["playlist_name"] = pl_name_map[pl_id]

    # get genre information
    # this returns a list of genres per artist. With multiple artists we hit API rate limit, so batch
    # one entry per item, aligned with the rows of df_flat; items without a track
    # become an empty dict so they contribute no artists and get no genre
    tracks = [item.get("track") or {} for item in items]

    # unique artist ids of this playlist that have not been looked up yet;
    # missing ids (None) are skipped because they cannot be looked up
    wanted_ids = {aid for t in tracks for aid in t.get("artist_ids", []) if aid}
    missing_ids = [aid for aid in wanted_ids if aid not in artists_info]

    # Lookup artist genres in batches
    for i in range(0, len(missing_ids), ARTIST_BATCH_SIZE):
        batch = missing_ids[i:i + ARTIST_BATCH_SIZE]
        resp = sp.artists(batch)
        for a in resp["artists"]:
            if a:
                artists_info[a["id"]] = a.get("genres", [])
        # remember ids that came back without data so they are not requested again
        for aid in batch:
            artists_info.setdefault(aid, [])

    # top genre per track: main artist's first genre, otherwise the first genre
    # found among the remaining artists. Assigned by row position rather than
    # merged on "id", so repeated or missing track ids cannot multiply rows.
    df_flat["top_genre"] = pd.Series(
        [_first_genre(t.get("artist_ids", []), artists_info) for t in tracks],
        index = df_flat.index,
        dtype = object
    )

    # append to output list
    flattened.append(df_flat)

# playlist names follow "mm.yyyy"; parse them into a datetime column on every
# per-playlist frame (the frames are modified in place)
for df in flattened:
    name_col = df["playlist_name"]
    df["playlist_date"] = pd.to_datetime(name_col, format = "%m.%Y")

In [36]:
# persist the results with IPython's %store magic so they can be restored
# in another kernel with `%store -r`
%store flattened
%store pl_scope

Stored 'flattened' (list)
Stored 'pl_scope' (DataFrame)
