In [None]:
import os
import sys
import numpy as np
import pandas as pd
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
import time
from urllib import request
from urllib.request import urlretrieve
import librosa
from librosa import display
import matplotlib.pyplot as plt
import skimage
from shutil import copy2
import random
from pathlib import Path

In [None]:
#get authorization to access spotify

def spotify_login(cid, secret):
    """Create a spotipy client authenticated with application credentials.

    Args:
        cid: Passed to SpotifyClientCredentials as ``client_id``.
        secret: Passed to SpotifyClientCredentials as ``client_secret``.

    Returns:
        A ``spotipy.Spotify`` instance using that credentials manager.
    """
    return spotipy.Spotify(
        client_credentials_manager=SpotifyClientCredentials(
            client_id=cid,
            client_secret=secret,
        )
    )

# Credentials are taken from the environment so real keys never need to be
# saved inside the notebook; the masked placeholders stay as the fallback
# values, so the cell behaves as before when the variables are unset.
cid = os.environ.get("SPOTIPY_CLIENT_ID", "##############################")
secret = os.environ.get("SPOTIPY_CLIENT_SECRET", "##############################")

# Module-level client used by every helper below that talks to Spotify.
sp = spotify_login(cid, secret)

In [None]:
#functions for making a list of artists of a given genre

# Cut-off compared against the global `last_added` counter: the playlist and
# related-artist searches stop trying further artists once the counter
# reaches this value.
search_limit = 50

def populate_artist_list(genre):
    """Return the artist ids for ``genre``, building and saving them if needed.

    A previously saved list for the genre is reused when one exists.
    Otherwise the artists are gathered from the first playlists returned by a
    Spotify search for the genre, and that new list is written to the artist
    CSV.

    Args:
        genre: Genre name; used as the search query and as the saved label.

    Returns:
        The saved ids (a list) or the freshly gathered ids (a set).
    """
    num_playlists = 3
    print("Checking for existing artist list for " + genre)
    genre_artists = get_saved_artist_list(genre)
    newly_built = not genre_artists
    if newly_built:
        print("No existing list found.")
        print("Making a list of artists for " + genre + ".")
        playlists = get_playlists_by_genre(genre, num_playlists)
        genre_artists = make_genre_artist_list(genre, playlists)
    print(str(len(genre_artists)) + " found.")
    if newly_built:
        # Only persist lists built during this call. Re-saving a list that was
        # just loaded would append the same rows to the CSV again on every run.
        save_artist_list(genre_artists, genre)
    return genre_artists

def filter_existing_artists(artist_list):
    """Remove artists that already appear in the track CSV.

    Args:
        artist_list: Iterable of artist ids that are candidates for download.

    Returns:
        ``artist_list`` unchanged when the track CSV is missing or has no
        ``artist_ids`` column; otherwise a new list without the ids found in
        that column.
    """
    print("Checking database to see if we already downloaded any artists.")
    artist_count = len(artist_list)
    if not os.path.exists(track_filepath):
        print("No existing artist list found.")
        return artist_list

    # on_bad_lines="skip" is the replacement for error_bad_lines=False, which
    # pandas 2.0 removed.
    df = pd.read_csv(track_filepath, on_bad_lines="skip")
    if 'artist_ids' not in df.columns:
        # A CSV without track columns carries no information about artists.
        print("No existing artist list found.")
        return artist_list

    # Cells hold the text form of a list of ids; stripping brackets and quotes
    # recovers the id of single-artist rows. Missing cells are dropped because
    # they are floats and cannot be stripped.
    # NOTE(review): rows listing several artists do not reduce to one id and
    # therefore never match - confirm this is intended.
    existing_artists = {
        str(artist).strip("['']")
        for artist in df['artist_ids'].dropna().unique()
    }
    filtered_list = [artist for artist in artist_list if artist not in existing_artists]
    print(str(artist_count - len(filtered_list)) + " existing artists removed from list.")
    return filtered_list

def get_playlists_by_genre(genre, limit):
    """Search Spotify playlists using ``genre`` as the query.

    Args:
        genre: Query string passed to the search.
        limit: Forwarded as the search ``limit``.

    Returns:
        The ``items`` found under ``playlists`` in the search response.
    """
    response = sp.search(genre, limit=limit, type='playlist')
    playlist_page = response['playlists']
    return playlist_page['items']
                    
def make_genre_artist_list(genre, playlists):
    """Collect new artist ids by walking the tracks of the given playlists.

    Args:
        genre: Not used directly in this function; the genre check happens in
            ``add_artist``, which reads the module-level ``genre`` name.
        playlists: Playlist objects providing ``id`` and ``owner.id``.

    Returns:
        A set of artist ids that were not yet present in the artist CSV.
    """
    global last_added
    genre_artists = set()
    # The artist CSV does not exist until the first list has been saved, so a
    # missing file simply means that no artist is known yet.
    if os.path.exists(artist_filepath):
        existing_artists = pd.read_csv(artist_filepath)['id'].tolist()
    else:
        existing_artists = []
    for playlist in playlists:
        # Each playlist starts with a fresh miss counter.
        last_added = 0
        playlist_id = playlist['id']
        user_id = playlist['owner']['id']
        if user_id and playlist_id:
            playlist_tracks = get_playlist_tracks(user_id, playlist_id)
            find_artists_from_playlist(playlist_tracks, genre_artists, existing_artists)
    return genre_artists

def get_playlist_tracks(user_id, playlist_id):
    """Return every track item of a playlist, following the paging links.

    Args:
        user_id: Id of the playlist owner.
        playlist_id: Id of the playlist.

    Returns:
        The ``items`` of the first page, extended in place with the ``items``
        of each following page.
    """
    page = sp.user_playlist_tracks(user_id, playlist_id)
    collected = page['items']
    while True:
        if not page['next']:
            return collected
        page = sp.next(page)
        collected += page['items']

def find_artists_from_playlist(playlist, genre_artists, existing_artists):
    """Try to add the artists of each playlist entry, plus their related artists.

    Entries are skipped once the global ``last_added`` counter has reached
    ``search_limit``. Artists already in ``existing_artists`` are not tried.

    Args:
        playlist: Sequence of playlist items, each holding a ``track``.
        genre_artists: Set that receives accepted artist ids.
        existing_artists: Ids already stored in the artist CSV.
    """
    for entry in playlist:
        if last_added >= search_limit:
            continue
        for artist_id in get_track_artists(entry['track']):
            if artist_id in existing_artists:
                continue
            add_artist(artist_id, genre_artists, existing_artists)
            add_related_artists(artist_id, genre_artists, existing_artists)
                    
def get_track_artists(track):
    """Return the ids of a track's artists, or an empty list for a falsy track."""
    if not track:
        return []
    return [performer['id'] for performer in track['artists']]

def add_artist(artist_id, genre_artists, existing_artists):
    """Add ``artist_id`` to ``genre_artists`` when the artist has the target genre.

    The target genre is read from the module-level name ``genre``; it is not
    a parameter of this function. The global ``last_added`` counter is reset
    to 0 when an artist is added and incremented on every other outcome.

    Args:
        artist_id: Spotify artist id; falsy values are rejected.
        genre_artists: Set of ids collected so far; updated in place.
        existing_artists: Ids already stored in the artist CSV.

    Returns:
        True when the artist carries the target genre (whether or not it was
        newly added), False otherwise.
    """
    global last_added
    is_candidate = bool(artist_id) and artist_id not in genre_artists
    # The artist lookup only happens for candidates (short-circuit).
    if is_candidate and genre in sp.artist(artist_id)['genres']:
        if artist_id in existing_artists:
            last_added += 1
        else:
            genre_artists.add(artist_id)
            last_added = 0
        return True
    last_added += 1
    return False

def add_related_artists(artist_id, genre_artists, existing_artists):
    """Recursively try to add the artists Spotify lists as related.

    A related artist is only tried while the global ``last_added`` counter is
    below ``search_limit``; the recursion descends into every related artist
    for which ``add_artist`` returns True.

    Args:
        artist_id: Artist whose related artists are examined; falsy ids are
            ignored.
        genre_artists: Set of ids collected so far; updated in place.
        existing_artists: Ids already stored in the artist CSV.
    """
    if artist_id:
        for candidate in sp.artist_related_artists(artist_id)['artists']:
            if last_added >= search_limit:
                continue
            if add_artist(candidate['id'], genre_artists, existing_artists):
                add_related_artists(candidate['id'], genre_artists, existing_artists)

In [None]:
#functions for getting a given artist's albums so they can be added to a dictionary

# Upper bound on how many of an artist's fetched albums get_album_samples keeps.
album_sample_per_artist = 5

def get_artist_albums(artist_id):
    """Fetch up to ten albums of the artist and return a random sample of them."""
    fetch_limit = 10
    response = sp.artist_albums(artist_id, limit=fetch_limit)
    return get_album_samples(response['items'])

def get_album_samples(albums):
    """Pick a random subset of ``albums`` without repetition.

    The subset holds ``album_sample_per_artist`` albums, or all of them when
    fewer are available.
    """
    available = len(albums)
    if album_sample_per_artist < available:
        keep = album_sample_per_artist
    else:
        keep = available
    chosen_positions = random.sample(range(available), keep)
    return [albums[position] for position in chosen_positions]

def add_artist_albums(albums, artist_id):
    """Build a dictionary holding the track data of all given albums.

    Args:
        albums: Album objects to read tracks from.
        artist_id: Artist the tracks must belong to.

    Returns:
        A new dict filled by ``add_album_tracks`` for each album in turn.
    """
    collected = {}
    for record in albums:
        add_album_tracks(collected, record, artist_id)
    return collected

def add_album_tracks(dictionary, album, artist_id):
    """Add every track of ``album`` that has a preview URL to ``dictionary``.

    Args:
        dictionary: Track-data dict that is filled in place.
        album: Album object; its ``id`` is used to request the track list.
        artist_id: Artist the tracks must belong to.
    """
    album_id = album['id']
    for item in sp.album_tracks(album_id)['items']:
        if not item['preview_url']:
            continue
        add_track(dictionary, item, artist_id)

In [None]:
#functions for adding the data of a given track to a dictionary

def download_artist_list_track_data(genre_artists):
    """Download and save the track data of every artist in ``genre_artists``.

    The genre named in the progress message is read from the module-level
    name ``genre``. A progress line is printed after every tenth artist and
    after the last one.

    Args:
        genre_artists: Sized iterable of artist ids.
    """
    update_interval = 10
    print("Getting track data for " + genre + " artists.")
    for count, artist_id in enumerate(genre_artists, start=1):
        print("Adding " + sp.artist(artist_id)['name'])
        albums = get_artist_albums(artist_id)
        save_track_data(add_artist_albums(albums, artist_id))
        total = len(genre_artists)
        if count == total or count % update_interval == 0:
            print(str(count) + "/" + str(total) + " added.")

def add_track(dictionary, track, artist_id):
    """Store the data of ``track`` in ``dictionary`` under the track's id.

    Tracks whose artists do not include ``artist_id`` are skipped with a
    message. The genre label comes from the module-level name ``genre``.

    Args:
        dictionary: Track-data dict that is filled in place.
        track: Track object as returned for an album's track list.
        artist_id: Artist the track must belong to.
    """
    artists = track['artists']
    if artist_match(artist_id, artists):
        track_id = track['id']
        # The entry is registered before the remaining lookups are made.
        track_data = dictionary[track_id] = {}
        track_features = get_track_features(track_id)
        track_sections = get_track_sections(track_id)
        add_track_data(track_data, track, artists)
        add_track_features(track_data, track_features)
        add_track_sections(track_data, track_sections)
        add_track_popularity(track_data, track_id)
        add_artist_genre(track_data, genre)
    else:
        print("Skipping " + track['name'] + " by " + artists[0]['name'])
    
# Guards against tracks that are not by the target artist (for example on
# compilation albums).
def artist_match(artist_id, artists):
    """Return True when any entry of ``artists`` has ``artist_id`` as its id."""
    return any(performer['id'] == artist_id for performer in artists)

def get_track_features(track_id):
    """Request the audio features of one track and return the last result entry."""
    feature_list = sp.audio_features(track_id)
    return feature_list.pop()

def get_track_sections(track_id):
    """Return the ``sections`` of a track's audio analysis, or None on failure.

    Some tracks raise when their audio analysis is requested; for those the
    track is looked up again so that a message naming the track and its first
    artist can be printed.
    """
    try:
        analysis = sp.audio_analysis(track_id)
        sections = analysis['sections']
    except Exception:
        info = sp.track(track_id)
        title = info['name']
        performer = info['artists'][0]['name']
        print(title + " by " + performer + " does not have audio analysis.")
        return None
    return sections

def add_track_data(track_data, track, artists):
    """Copy the basic fields of ``track`` into ``track_data``.

    Args:
        track_data: Dict that receives the fields; updated in place.
        track: Track object providing ``name``, ``uri``, ``duration_ms``,
            ``explicit`` and ``preview_url``.
        artists: Artist objects providing ``name`` and ``id``.
    """
    # Commas are removed from the name to avoid messing up the csv file.
    track_data['track_name'] = track['name'].replace(',', '')
    track_data['artist_names'] = [artist['name'] for artist in artists]
    track_data['artist_ids'] = [artist['id'] for artist in artists]
    track_data['uri'] = track['uri']
    track_data['duration_ms'] = track['duration_ms']
    # Stored as a plain 0/1 int. np.where on a scalar condition would put a
    # zero-dimensional array into the dict instead of a number.
    track_data['explicit'] = int(track['explicit'] == True)
    track_data['preview_url'] = track['preview_url']

def add_track_features(track_data, features):
    """Copy the audio-feature fields from ``features`` into ``track_data``.

    Nothing is copied when ``features`` is falsy.
    """
    if not features:
        return
    feature_names = (
        'key',
        'mode',
        'time_signature',
        'acousticness',
        'danceability',
        'energy',
        'instrumentalness',
        'liveness',
        'loudness',
        'speechiness',
        'tempo',
        'valence',
    )
    for feature_name in feature_names:
        track_data[feature_name] = features[feature_name]
        
def add_track_sections(track_data, sections):
    """Record the number of sections in ``track_data``; skipped when ``sections`` is falsy."""
    if not sections:
        return
    track_data['num_sections'] = len(sections)
        
def add_track_popularity(track_data, track_id):
    """Look the track up by id and store its ``popularity`` in ``track_data``."""
    track_data['popularity'] = sp.track(track_id)['popularity']
    
def add_artist_genre(track_data, genre):
    """Store ``genre`` in ``track_data`` under the ``genre`` key."""
    track_data.update({'genre': genre})

In [None]:
#functions for converting between dictionaries, dataframes, and csv

# Locations of the two CSV files, both resolved against sys.path[0]:
# the artist list (columns `id` and `genre`) and the per-track data.
# NOTE(review): what sys.path[0] points at depends on how the kernel was
# started - confirm it is the intended data directory.
artist_filepath = os.path.join(sys.path[0], "metal_artist_list_data.csv")
track_filepath = os.path.join(sys.path[0], "metal_subgenre_data.csv")

def save_artist_list(artists, genre):
    """Append the given artist ids, all labelled with ``genre``, to the artist CSV."""
    rows = pd.DataFrame({'id': [*artists], 'genre': genre})
    add_to_csv(rows, artist_filepath)

def get_saved_artist_list(genre):
    """Load the artist ids previously saved for ``genre``.

    Args:
        genre: Value the ``genre`` column must equal.

    Returns:
        The unique ``id`` values of the matching rows as a list, or an empty
        list when the artist CSV does not exist.
    """
    if not os.path.exists(artist_filepath):
        return []
    # on_bad_lines="skip" is the replacement for error_bad_lines=False, which
    # pandas 2.0 removed.
    saved = pd.read_csv(artist_filepath, on_bad_lines="skip")
    return saved.loc[saved['genre'] == genre, 'id'].unique().tolist()

def save_track_data(dictionary):
    """Append the tracks collected in ``dictionary`` to the track CSV.

    Args:
        dictionary: Mapping of track id to that track's data dict. The
            mapping itself is left untouched.
    """
    if not dictionary:
        # Nothing to persist. Writing an empty frame to a file that does not
        # exist yet would create a CSV without the track columns, which
        # filter_existing_artists cannot read.
        return
    add_to_csv(make_track_dataframe(dictionary), track_filepath)

def make_track_dataframe(tracks_dict):
    """Turn a ``{track_id: {field: value}}`` dict into a DataFrame.

    Args:
        tracks_dict: Mapping of track id to that track's data dict.

    Returns:
        A DataFrame with one row per track: a ``track_id`` column followed by
        one column per field.
    """
    # The ids only become a column (named "index") after reset_index, so the
    # rename has to come second; done first it silently changes nothing.
    # NOTE(review): CSVs written before this fix carry the ids in a column
    # called "index" - confirm whether existing files need migrating.
    frame = pd.DataFrame.from_dict(tracks_dict).T.reset_index()
    return frame.rename(columns={"index": "track_id"})

def add_to_csv(new_data, filepath):
    """Write ``new_data`` to ``filepath``, after any rows the file already holds.

    Args:
        new_data: DataFrame with the rows to store.
        filepath: CSV file to create or extend.

    A missing file is created from ``new_data`` alone. An existing file is
    read, combined with ``new_data`` and rewritten without an index column.
    """
    if not os.path.exists(filepath):
        new_data.to_csv(filepath, index=False)
        return
    # on_bad_lines="skip" replaces error_bad_lines=False and pd.concat
    # replaces DataFrame.append; pandas 2.0 removed both old spellings.
    database = pd.read_csv(filepath, on_bad_lines="skip")
    database = pd.concat([database, new_data], ignore_index=True)
    database.to_csv(filepath, index=False)

In [None]:
# Genres to collect; each one is used as the Spotify search query and as the
# label stored with the artists and tracks.
genres = ["thrash metal", "death metal", "black metal", "grindcore", "melodic death metal", 
          "power metal", "hardcore", "hard rock", "metalcore", "deathcore", "nwobhm", "doom metal", 
          "nu metal", "folk metal", "blackgaze", "glam metal", "goregrind", "industrial metal", 
          "mathcore", "progressive metal", "djent", "stoner rock", "sludge metal", "punk", 
          "crust punk", "technical death metal", "funeral doom", "symphonic metal", "drone metal", 
          "screamo", "slam death metal", "depressive black metal"]

# The loop variable must stay named `genre`: add_artist, add_track and
# download_artist_list_track_data read it as a global instead of taking it
# as a parameter.
for genre in genres: 
    # Build (or load) the artist list, drop artists already in the track CSV,
    # then download and save the track data of the remaining artists.
    genre_artists = populate_artist_list(genre)      
    genre_artists = filter_existing_artists(genre_artists)       
    download_artist_list_track_data(genre_artists)

In [None]:
# Load the collected track CSV and display ten randomly chosen rows.
spotify_data = pd.read_csv(track_filepath)
spotify_data.sample(10)

In [None]:
# Number of collected tracks per genre label.
spotify_data['genre'].value_counts()