# Spotify Tracks
The objective is to understand the characteristics of the best songs. This script interfaces with the 
Spotify API to load track data.

In [1]:
# Packages to interact with the system and local environment
import os
from dotenv import load_dotenv
import time

# Requests and encoding packages to interact with Spotify API
import base64
import requests
from urllib.parse import urlencode 

# Analytics code
import pandas as pd
import random
import re

In [35]:
def random_wait(t = 5, duration = 's'):
    '''
    Pause the script for a random amount of time.

    Arguments:
    t: int, upper bound (inclusive) of the random integer that is drawn from
        [1, t]; values below 1 are treated as 1
    duration: str, one of 'short', 's', 'long', 'l'; determines whether the
        pause is short (1 + random_int/10 seconds) or long (random_int
        seconds). Any other value results in no pause.
    '''

    # Draw from [1, t] so the caller-supplied bound is actually used; clamp to
    # 1 because random.randint raises when the upper bound is below the lower
    rt = random.randint(1, max(1, t))

    if duration in ['s', 'short']:
        time.sleep(1 + rt/10)

    elif duration in ['l', 'long']:
        time.sleep(rt)

In [36]:
# Populate the environment from .env, then read the Spotify credentials from it
load_dotenv()
ClientID, ClientSecret = (
    os.getenv(key) for key in ('SPOTIFY_CLIENT_ID', 'SPOTIFY_CLIENT_SECRET')
)

In [37]:
# Build the HTTP Basic authorization header for the token request:
# base64 of "<client id>:<client secret>"
client_cred = '{}:{}'.format(ClientID, ClientSecret)
client_cred_b64 = base64.b64encode(client_cred.encode('utf-8'))
token_headers = {'Authorization' : 'Basic ' + client_cred_b64.decode('utf-8')}

In [38]:
# Get access token using the client_credentials grant
# A timeout keeps the notebook from hanging indefinitely if the accounts
# service does not respond
APIKey = requests.post(
    url = 'https://accounts.spotify.com/api/token', 
    data = {'grant_type' : 'client_credentials'},
    headers = token_headers,
    timeout = 30
)

# Fail here with the HTTP error (e.g. rejected credentials) instead of later
# with a KeyError when the token fields are read from the response body
APIKey.raise_for_status()

In [39]:
# Authorization header passed to every API request: "<token type> <access token>"
token_info = APIKey.json()
auth_header = {
    'Authorization' : '{} {}'.format(token_info['token_type'], token_info['access_token'])
}

In [40]:
# Several different endpoints, can specify which to use with ep
# This then becomes part of the url in endpoint
ep = 'search'
endpoint = 'https://api.spotify.com/v1/'+ep

# Playlist column names: raw search-result column -> name used in this notebook
col_dict = {
    'id' : 'playlist_id',
    'name' : 'playlist_name',
    'owner.display_name' : 'playlist_owner', 
    'tracks.total' : 'playlist_n_tracks',
    'tracks.href' : 'tracks_href'
}


def _search_playlists(query):
    '''
    Run one playlist search against the search endpoint.

    Arguments:
    query: str, text passed as the q parameter of the search

    Returns a dataframe with one row per returned playlist (flattened with
    pd.json_normalize); the dataframe has no columns when nothing is returned.
    Raises requests.HTTPError when the API answers with an error status.
    '''

    # Search takes query (q) and type (artist, playlist)
    search_data = urlencode(
        {
            'q' : query,
            'type' : 'playlist',
            'limit' : '50'
        }
    )

    # Add to url to pass to get
    search_url = f'{endpoint}?{search_data}'

    # Timeout so a stalled connection cannot hang the notebook
    r = requests.get(search_url, headers = auth_header, timeout = 30)
    r.raise_for_status()

    # Defensive: skip null entries so json_normalize only sees dictionaries
    items = [item for item in r.json()['playlists']['items'] if item]

    return pd.json_normalize(items)


def _spotify_top_hits(raw_df):
    '''
    Reduce raw search results to Spotify-owned "Top Hits" playlists.

    Arguments:
    raw_df: dataframe returned by _search_playlists

    Returns a dataframe with the renamed columns from col_dict plus 'year',
    the first four-digit number found in the playlist name (NaN when the name
    has none). An empty dataframe with those columns is returned when raw_df
    has no rows.
    '''

    if raw_df.empty:
        return pd.DataFrame(columns = list(col_dict.values()) + ['year'])

    # Only keep spotify playlists; na = False treats a missing name as no match
    keep = (
        (raw_df['owner.display_name'] == 'Spotify')
        & raw_df['name'].str.contains('Top Hits', na = False)
    )

    # Subset playlist name reference data and rename columns
    top_hits = raw_df.loc[keep, list(col_dict)].rename(columns = col_dict)

    # Extract year from playlist name (raw string avoids an invalid escape)
    top_hits['year'] = top_hits['playlist_name'].str.extract(r'(\d{4})', expand = False)

    return top_hits


# Get initial request: broad search for top hits playlists
playlist_df = _search_playlists('top hits of')
plist_df = _spotify_top_hits(playlist_df)

# List of all years for which data are extracted
yr_list = [str(y) for y in range(1980, 2025)]

# Initialize counter of follow-up searches
counter = 0

# Loop through all years in yr list and search for the ones still missing
for y in yr_list:

    years_found = set(plist_df['year'])

    # Stopping conditions: every year is covered or too many extra searches
    if set(yr_list).issubset(years_found) or counter > 15:
        break

    # Year already covered, no search needed
    if y in years_found:
        continue

    print(counter)

    # Search for the top hits playlist of this specific year
    add_pdf = _spotify_top_hits(_search_playlists('top hits '+y))

    # Join data frames, leaving empty frames out of the concatenation
    if plist_df.empty:
        plist_df = add_pdf
    elif not add_pdf.empty:
        plist_df = pd.concat([plist_df, add_pdf])

    # Iterate counter 
    counter += 1

    # Wait a random amount of time
    random_wait(t = 10)

# Drop duplicates from plist and reset index
plist_df = plist_df.drop_duplicates().reset_index(drop = True)

# Print message to indicate that script has completed
print('Script Executed')

0
1
2
3
4
5
6
Script Executed


In [42]:
# Store columns to be kept for tracks
track_cols  = [
    'track.id', 'track.name', 'track.popularity', 'track.available_markets', 'track.explicit',
    'track.album.id', 'track.album.name', 'track.album.type', 'track.album.release_date', 
    'track.album.total_tracks', 'track.track_number', 'n_artists'
]

# Store columns for audio features
audio_feature_cols = [
    'id', 'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness',
    'liveness', 'valence', 'tempo', 'time_signature', 'duration_ms'
]

# Audio features are requested in batches of at most 100 track ids
af_batch_size = 100


def _fetch_playlist_items(href):
    '''
    Download the items of one playlist.

    Arguments:
    href: str, url of the playlist's tracks (tracks_href in plist_df)

    Returns a list with the 'items' entries of every page that was fetched.
    Raises requests.HTTPError when the API answers with an error status.
    '''

    items = []
    url = href

    while url:
        # Timeout so a stalled connection cannot hang the notebook
        r2 = requests.get(url, headers = auth_header, timeout = 30)
        r2.raise_for_status()

        # Wait a random amount of time after api pull
        random_wait(t = 5)

        page = r2.json()

        # Defensive: skip null entries so json_normalize only sees dictionaries
        items.extend(item for item in page['items'] if item)

        # NOTE(review): relies on the response carrying a 'next' url that is
        # null on the last page (confirm against the Spotify paging docs);
        # when the key is absent only the first page is read, as before
        url = page.get('next')

    return items


def _fetch_audio_features(track_ids):
    '''
    Download audio features for a list of track ids.

    Arguments:
    track_ids: list of str, ids of the tracks

    Returns a dataframe with the columns in audio_feature_cols ('id' renamed
    to 'track_id') and at most one row per track id.
    Raises requests.HTTPError when the API answers with an error status.
    '''

    # Request every id only once, keeping the original order
    unique_ids = list(dict.fromkeys(track_ids))

    features = []

    # Slice by position so each request carries at most af_batch_size ids and
    # no id is sent twice
    for start in range(0, len(unique_ids), af_batch_size):
        tid = ','.join(unique_ids[start:start + af_batch_size])

        # Store url for audio features
        af_url = 'https://api.spotify.com/v1/audio-features?ids='+tid

        # Request audio features
        r3 = requests.get(af_url, headers = auth_header, timeout = 30)
        r3.raise_for_status()

        # Wait for random amount of time
        random_wait(t = 5)

        # Defensive: skip null entries so json_normalize only sees dictionaries
        features.extend(f for f in r3.json()['audio_features'] if f)

    # Store in dataframe; keep the expected columns even when nothing came back
    if features:
        af_df = pd.json_normalize(features)
    else:
        af_df = pd.DataFrame(columns = audio_feature_cols)

    # Get only desired columns for audio features
    af_df = af_df[audio_feature_cols].rename(columns = {'id': 'track_id'})

    # One row per track so the later left merge cannot multiply track rows
    return af_df.drop_duplicates(subset = 'track_id')


def _expand_artists(track_df):
    '''
    Expand each track's list of artists into artist<i>_id and artist<i>_name
    columns, one pair per artist position, indexed like track_df.
    '''

    # Expand each artist to its own column
    artist_df = pd.DataFrame(track_df['track.artists'].tolist(), index = track_df.index)

    # Get number of artists
    n_artists = artist_df.shape[1]

    # Loop through columns and extract artist and artist id
    for i in range(n_artists):
        artist_df['artist'+str(i)+'_id'] = artist_df[i].str['id']
        artist_df['artist'+str(i)+'_name'] = artist_df[i].str['name']

    # Subset artists to only labeled columns
    return artist_df.iloc[: , n_artists:]


# Collect one dataframe per playlist and concatenate once after the loop
playlist_frames = []

# Loop through playlist df and load track data and audio features
for _, row in plist_df.iterrows():

    # Extract href and id
    href = row['tracks_href']
    pl_id = row['playlist_id']

    # Make track df from the playlist items
    track_df = pd.json_normalize(_fetch_playlist_items(href))

    # Nothing usable came back for this playlist
    if 'track.id' not in track_df.columns:
        continue

    # Rows without a track id cannot be joined into the audio-features request
    track_df = track_df.dropna(subset = ['track.id']).reset_index(drop = True)

    if track_df.empty:
        continue

    # Number of artist
    track_df = track_df.assign(n_artists = track_df['track.artists'].str.len())

    # Merge artist and track data
    df1 = track_df[track_cols].merge(
        _expand_artists(track_df), how = 'left', left_index = True, right_index = True
    )

    # Replace periods with underscores
    df1.columns = [c.replace('.', '_') for c in df1.columns]

    # Count number of markets for track
    df1['n_markets'] = df1['track_available_markets'].str.len()

    # Add playlist id to df so it can be mapped to playlist_df
    df1['playlist_id'] = pl_id

    # Merge audio features
    af_df = _fetch_audio_features(track_df['track.id'].tolist())
    df1 = df1.merge(af_df, how = 'left', on = 'track_id')

    playlist_frames.append(df1)

# Combine all playlists; fall back to an empty frame that still has the merge key
if playlist_frames:
    df = pd.concat(playlist_frames)
else:
    df = pd.DataFrame(columns = ['playlist_id'])

# Merge df to playlist
df = plist_df.merge(df, how = 'inner', on = ['playlist_id'])

# Print message to indicate script has run
print('Script Executed')

Script Executed


In [80]:
# Exclude every row whose playlist year is 2024
# ("This Is Top Hits 2024" seems to be bogus)
is_2024 = df['year'] == '2024'
dfs = df[~is_2024]

# Write the remaining rows to csv
dfs.to_csv('SpotifyTopHits.csv')