## Imports

In [None]:
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
import pandas as pd
import requests
import json
import musicbrainzngs
import sqlalchemy
from sqlalchemy import text
from dotenv import load_dotenv
import os
import time

## Initial setup
- load env variables
- create connection to the sql database
- create a spotify API client

In [None]:
# Identify this application (name and version) to the MusicBrainz web service.
musicbrainzngs.set_useragent(app='application-project', version='0.0.1')

In [None]:
# Load the variables of the .env file into the process environment.
load_dotenv()

# Credentials for the Spotify and Last FM APIs.
# Every value is None when the corresponding variable is not set.
SPOTIFY_CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID")
SPOTIFY_CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET")
LAST_FM_API_KEY = os.getenv("LAST_FM_API")

# SQL login, target schema/table and connection settings.
SQL_USERNAME = os.getenv('SQL_USERNAME')
SQL_PASSWORD = os.getenv('SQL_PASSWORD')
SQL_SCHEMA = os.getenv('SQL_SCHEMA')
SQL_TABLE = os.getenv('SQL_TABLE')
SQL_DIALECT = os.getenv('SQL_DIALECT')
# NOTE: the variable name is misspelt, but it is kept because the engine setup reads it.
SQL_DIRVER = os.getenv('SQL_DRIVER')               # pip install mysqlclient
SQL_HOST = os.getenv('SQL_HOST')
SQL_PORT = os.getenv('SQL_PORT')

In [None]:
# Build the connection URL from its components instead of formatting one string:
# URL.create escapes special characters in the credentials (e.g. '@', ':' or '/'
# in the password), which would otherwise corrupt the URL.
connection_url = sqlalchemy.engine.URL.create(
    drivername=f'{SQL_DIALECT}+{SQL_DIRVER}',
    username=SQL_USERNAME,
    password=SQL_PASSWORD,
    host=SQL_HOST,
    # The environment only provides strings; an unset port falls back to the driver default.
    port=int(SQL_PORT) if SQL_PORT else None,
)
engine = sqlalchemy.create_engine(connection_url)

# Make sure the database used by this notebook exists before any table is written.
with engine.connect() as connection:
    connection.execute(text('Create database if not exists eighties'))

In [None]:
# Spotify API client, authorised with the client id/secret read from the environment.
client_credentials_manager = SpotifyClientCredentials(
    client_id=SPOTIFY_CLIENT_ID,
    client_secret=SPOTIFY_CLIENT_SECRET,
)
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)

## Functions

In [None]:
def get_genres(isrc, api_key=LAST_FM_API_KEY):
    '''
    Retrieves the genres of a song via the Last FM API.

    First the ISRC number is used to look up the MusicBrainz id of the recording
    via the MusicBrainz API. That id is then used to request the top tags of the
    track from the Last FM API. Every failure is printed and results in an empty
    string, so the caller never has to handle an exception.

    Parameter
    ---------
    isrc: string
        ISRC number of the song

    api_key: string
        API key for the Last FM API

    Return
    ------
    genre_list: string
        Genre names separated by commas; empty string if no genres were found
    '''
    genre_list = ''
    try:
        # Use musicbrainz to get the musicbrainz id (https://musicbrainz.org/doc/MusicBrainz_API)
        recordings = musicbrainzngs.get_recordings_by_isrc(isrc)
        isrcs = recordings['isrc'] if 'isrc' in recordings else ''
        recording_list = isrcs['recording-list'] if 'recording-list' in isrcs else ''
    except Exception as e:
        print('Failed to to retrieve recordings from the musicbraniz API')
        print(e)
        return genre_list

    if not recording_list:
        return genre_list
    if len(recording_list) > 1:
        # Only the first recording is used; the message makes the ambiguity visible.
        print(f'Multiple recordings with isrc: {isrc}')

    if 'id' not in recording_list[0]:
        return genre_list
    mbid = recording_list[0]['id']

    # Use Last FM to get the genres (https://www.last.fm/api/show/track.getInfo)
    try:
        response = requests.get(
            # https keeps the API key out of clear-text traffic.
            'https://ws.audioscrobbler.com/2.0/',
            # Passing params lets requests do the URL encoding.
            params={
                'method': 'track.getInfo',
                'api_key': api_key,
                'mbid': mbid,
                'format': 'json',
            },
            # Without a timeout a stalled connection would block the crawl forever.
            timeout=10,
        )
        data = response.json()
    except Exception as e:
        print('Failed to retrieve genres from the Last FM API')
        print(e)
        return genre_list

    # Walk down data['track']['toptags']['tag'], tolerating missing or oddly typed levels.
    track = data.get('track') if isinstance(data, dict) else None
    toptags = track.get('toptags') if isinstance(track, dict) else None
    tags = toptags.get('tag') if isinstance(toptags, dict) else None

    # Defensive: accept a single tag object as well as a list of tags.
    if isinstance(tags, dict):
        tags = [tags]
    if not isinstance(tags, list):
        return genre_list

    names = [tag['name'] for tag in tags if isinstance(tag, dict) and 'name' in tag]
    return ','.join(str(name) for name in names)
    
    

In [None]:
def filter_track_features(track):
    '''
    Filters the relevant features of a track and returns them in a JSON-like object.

    The audio features are requested from the Spotify API and the genres are
    looked up via get_genres (only if the track has an ISRC). Every value that
    is not available is set to an empty string.

    Parameter
    ---------
    track: Object
        Track returned by the spotify API. Must contain the key 'id'.

    Return
    ------
    relevant_features: Object
        JSON Object that contains the relevant features
    '''
    audio_feature_keys = (
        'danceability',
        'energy',
        'key',
        'loudness',
        'mode',
        'speechiness',
        'acousticness',
        'instrumentalness',
        'liveness',
        'valence',
        'tempo',
        'duration_ms',
        'time_signature',
    )

    # The API may answer with no list at all or with a null entry for a track
    # that has no audio features; fall back to an empty dict so the lookups
    # below yield '' instead of raising a TypeError.
    audio_features = sp.audio_features(track['id'])
    features = (audio_features[0] if audio_features else None) or {}

    # 'external_ids' may be missing or null.
    external_ids = track.get('external_ids') or {}
    isrc = external_ids.get('isrc', '')

    relevant_features = {
        'name': track.get('name', ''),
        'popularity': track.get('popularity', ''),
        'genres': get_genres(isrc) if isrc else '',
    }
    for feature_key in audio_feature_keys:
        relevant_features[feature_key] = features.get(feature_key, '')
    relevant_features['isrc'] = isrc
    return relevant_features

In [None]:
def save_df_to_sql(df: pd.DataFrame, table_name=SQL_TABLE, schema=SQL_SCHEMA, if_exists='replace'):
    '''
    Writes a DataFrame into a table of the SQL database.

    Errors are not raised to the caller; they are only printed.

    Parameter
    ---------
    df: pd.DataFrame
        DataFrame that should be written

    table_name: string; default=SQL_TABLE (.env)
        Name of the table the DataFrame is written to.

    schema: string; default=SQL_SCHEMA (.env)
        Schema of the database that contains the table

    if_exists: string; default="replace"
        What to do if the table already exists. Possible values are "replace", "fail", "append".
    '''
    try:
        df.to_sql(name=table_name, con=engine, schema=schema, if_exists=if_exists)
    except Exception as error:
        print(error)

In [None]:
def read_df_from_sql(table_name=SQL_TABLE, schema=SQL_SCHEMA):
    '''
    Loads a complete SQL table into a DataFrame.

    Errors are not raised to the caller; they are printed and None is returned.

    Parameter
    ---------
    table_name: string; default=SQL_TABLE (.env)
        Name of the table that should be read

    schema: string; default=SQL_SCHEMA (.env)
        Name of the SQL schema that contains the table

    Return
    ------
    df: pd.DataFrame
        Content of the SQL table; None if reading failed
    '''
    try:
        with engine.connect() as connection:
            table = pd.read_sql_table(table_name, con=connection, schema=schema)
        return table
    except Exception as error:
        print(error)
    return None

In [None]:
# Make initial request to get total number of results
def get_number_of_tracks(release_year, start_letters):
    '''
    Asks the spotify API how many tracks match a year / name-prefix query.

    Only a single track is requested, because just the reported total is of interest.
    Errors are printed and counted as zero tracks.

    Parameter
    ---------
    release_year: int
        Year the tracks were released

    start_letters: string
        Letters the track names start with

    Return
    ------
    num: int
        Total number of tracks reported for the query; 0 if the total is missing or the request failed.
        The max number is 1000. If 1000 is returned, it is possible that the number is higher.
    '''
    try:
        response = sp.search(q=f'year:{release_year} track:{start_letters}*', type='track', limit=1, offset=0)
        if 'tracks' not in response:
            return 0
        track_page = response['tracks']
        if 'total' in track_page:
            return track_page['total']
        return 0
    except Exception as error:
        print(error)
    return 0

In [None]:
def req_query_tracks(release_year, start_letters = '', limit=50):
    '''
    Recursively queries all tracks spotify returns for a specific query.

    Each letter a-z is appended to start_letters. If the resulting prefix yields
    fewer than 1000 tracks, all result pages are fetched and the filtered
    features are appended to the global DataFrame df. Otherwise the prefix is
    refined by recursing with one more letter.

    A page or a track that fails is printed and skipped, so a persistent error
    cannot stall the crawl.

    Parameter
    ---------
    release_year: int
        Year the tracks were released

    start_letters: string, default=''
        Letters the songs start with

    limit: int; default=50
        Number of tracks that should be queried at once. Max number is 50

    Raises
    ------
    ValueError
        If limit is smaller than 1 (the paging could never advance)
    '''
    global df
    if limit < 1:
        raise ValueError('limit must be at least 1')

    for letter in 'abcdefghijklmnopqrstuvwxyz':
        letters = start_letters + letter
        total_results = get_number_of_tracks(release_year, letters)

        if total_results >= 1000:
            # Too many hits to page through: narrow the prefix, keeping the caller's page size.
            req_query_tracks(release_year, letters, limit)
            continue

        print(release_year, letters)
        # range() always moves on to the next page, even if the current one fails.
        for offset in range(0, total_results, limit):
            try:
                result = sp.search(q=f'year:{release_year} track:{letters}*', type='track', limit=limit, offset=offset)
                items = (result.get('tracks') or {}).get('items') or []
            except Exception as e:
                print(e)
                continue

            track_features = []
            for track in items:
                # One broken track must not discard the rest of the page.
                try:
                    track_features.append(filter_track_features(track))
                except Exception as e:
                    print(e)

            if track_features:
                df = pd.concat([df, pd.DataFrame(track_features)], ignore_index=True)

## Code

In [None]:
# Empty DataFrame that collects the features of all queried tracks.
columns = [
    'name',
    'popularity',
    'danceability',
    'energy',
    'key',
    'loudness',
    'mode',
    'speechiness',
    'acousticness',
    'instrumentalness',
    'liveness',
    'valence',
    'tempo',
    'duration_ms',
    'time_signature',
    'isrc',
    'genres',
]
df = pd.DataFrame(columns=columns)

In [None]:
# Trial run: fetch a small sample of tracks released in 1980, one track per request.
query = 'year:1980'
limit = 1
offset = 0

# A first request reports how many tracks match the query in total.
result = sp.search(q=query, type='track', limit=1, offset=0)
total_results = result['tracks']['total']

# Page through the results until offset reaches 10 and append the features to df.
while offset < 10:
    result = sp.search(q=query, type='track', limit=limit, offset=offset)
    track_features = [filter_track_features(track) for track in result['tracks']['items']]
    df = pd.concat([df, pd.DataFrame(track_features)], ignore_index=True)
    offset += limit

In [None]:
# Crawl every release year from 1980 through 1989, starting with an empty name prefix.
for year in range(1980, 1990):
    req_query_tracks(year, start_letters='')