In [None]:
import spotipy
import json
import time
import numpy as np
import pandas as pd
from spotipy.oauth2 import SpotifyClientCredentials
from getpass import getpass
from itertools import groupby
from operator import itemgetter
from pymongo import MongoClient
from pymongo import UpdateOne
from random import randint

### Create Spotify client

In [None]:
# Ask for the Spotify API credentials interactively so they never end up
# stored in the notebook.
client_id = getpass(prompt='Enter Spotify Client ID: ')
client_secret = getpass(prompt='Enter Spotify Client Secret: ')

# Client authenticated through the client-credentials manager built from the
# values entered above.
spotify_client = spotipy.Spotify(
    client_credentials_manager=SpotifyClientCredentials(
        client_id=client_id,
        client_secret=client_secret,
    ),
)

### Create MongoDB client

In [None]:
# Ask for the MongoDB credentials interactively so they never end up stored in
# the notebook.
mongo_db_username = getpass(prompt='Enter MongoDB username: ')
mongo_db_password = getpass(prompt='Enter MongoDB password: ')

# Pass the credentials as keyword options instead of formatting them into the
# URI: a username or password containing characters such as '@', ':' or '/'
# would otherwise need percent-escaping and would corrupt the connection string.
mongo_db_client = MongoClient(
    'mongodb://localhost:27018',
    username=mongo_db_username,
    password=mongo_db_password,
)

# Handle to the 'spotify-db' database
db = mongo_db_client['spotify-db']

# Handle to the collection that stores tracks
song_collection = db['song-collection']

# Handle to the collection that stores artists
artist_collection = db['artist-collection']

### Helper functions

In [None]:
def extract_album_data(album):
    """Reduce an album mapping to the fields kept alongside each track.

    Args:
        album: Mapping with the keys 'id', 'name', 'release_date',
            'total_tracks' and 'artists'; 'artists' is an iterable of
            mappings that each have 'id' and 'name'.

    Returns:
        A new dict with the four scalar fields copied over and 'artists'
        reduced to a list of {'id', 'name'} dicts.

    Raises:
        KeyError: If one of the expected keys is missing.
    """
    summary = {
        field: album[field]
        for field in ('id', 'name', 'release_date', 'total_tracks')
    }

    album_artists = []
    for performer in album['artists']:
        album_artists.append({'id': performer['id'], 'name': performer['name']})
    summary['artists'] = album_artists

    return summary


def extract_artists_data(artists):
    """Keep only the 'id' and 'name' of every artist mapping.

    Args:
        artists: Iterable of mappings that each have 'id' and 'name'.

    Returns:
        A new list of {'id', 'name'} dicts, in the same order as the input.

    Raises:
        KeyError: If an artist lacks 'id' or 'name'.
    """
    trimmed = []
    for entry in artists:
        trimmed.append({'id': entry['id'], 'name': entry['name']})
    return trimmed


def try_extract_data(item):
    """Extract the stored fields of a track item, best-effort.

    Args:
        item: Track mapping with 'id', 'name', 'album' and 'artists'.

    Returns:
        A dict with 'id', 'name', 'album' (via extract_album_data) and
        'artists' (via extract_artists_data), or an empty dict when the item
        is malformed (missing keys, None, wrong types, ...).
    """
    try:
        return {
            'id': item['id'],
            'name': item['name'],
            'album': extract_album_data(item['album']),
            'artists': extract_artists_data(item['artists'])
        }
    except Exception:
        # Malformed items are deliberately skipped, but unlike a bare
        # `except:` this lets KeyboardInterrupt / SystemExit propagate so a
        # long crawl can still be stopped.
        return {}


def search_with_retry(query, type, field_to_extract, max_retries=5, backoff_factor=0.1):
    """Run a Spotify search, retrying with exponential backoff on failure.

    Args:
        query: Search query passed to Spotify (e.g. 'genre:rock').
        type: Item type to search for ('track', 'artist', ...). The name
            shadows the builtin but is kept so keyword callers keep working.
        field_to_extract: Key of the search response to return
            (e.g. 'tracks' or 'artists').
        max_retries: Maximum number of attempts before giving up.
        backoff_factor: Base delay in seconds; after failed attempt i the
            function sleeps backoff_factor * 2**i before trying again.

    Returns:
        The value stored under field_to_extract in the first successful
        response, or None when every attempt failed.
    """
    for attempt in range(max_retries):
        try:
            return spotify_client.search(q=f'{query}', type=type, limit=50)[field_to_extract]
        except Exception as e:
            has_attempts_left = attempt < max_retries - 1
            outcome = 'Trying again.' if has_attempts_left else 'No attempts left.'
            # Report the cause: without it an auth failure, a rate limit and
            # a malformed query all look the same in the output.
            print(f"Could not get results when searching '{query}' ({e!r}). {outcome}")
            if has_attempts_left:
                time.sleep(backoff_factor * (2 ** attempt))

    print(f"Max retries exceeded. Ignoring '{query}'.")
    return None


def next_with_retry(results, field_to_extract, max_retries=5, backoff_factor=0.1):
    """Fetch the page that follows `results`, giving up on the first failure.

    Despite the name, no retry is performed: a failing page is reported and
    skipped. `max_retries` and `backoff_factor` are accepted only to keep the
    signature stable and are currently ignored.

    Args:
        results: The current paging object (must contain 'offset' for the
            failure message).
        field_to_extract: Key of the next-page response to return
            (e.g. 'tracks' or 'artists').
        max_retries: Unused.
        backoff_factor: Unused.

    Returns:
        The value stored under field_to_extract in the next page, or None
        when the page could not be fetched.
    """
    try:
        return spotify_client.next(results)[field_to_extract]
    except Exception as e:
        offset = results['offset']
        # Include the cause so an expected end-of-paging error can be told
        # apart from a network or authentication problem.
        print(f"Could not get next page ({e!r}). Current offset: '{offset}'. Ignoring page at offset '{offset + 50}'.")
        return None


def get_all_tracks_by_genre(genre):
    """Collect the extracted tracks of every reachable result page for a genre.

    Args:
        genre: Genre name; it is searched as 'genre:<genre>'.

    Returns:
        A list with one try_extract_data result per item of every page that
        could be fetched (items that failed extraction appear as {}). The
        list is empty when the initial search fails; paging stops at the
        first page that cannot be fetched.
    """
    page = search_with_retry(f'genre:{genre}', 'track', 'tracks')
    if page is None:
        return []

    collected = []
    while True:
        collected.extend([try_extract_data(entry) for entry in page['items']])
        if page['next'] is None:
            break
        page = next_with_retry(page, 'tracks')
        if page is None:
            break

    return collected


def try_upsert_tracks(tracks):
    """Upsert tracks into song_collection, using each track's 'id' as `_id`.

    Args:
        tracks: Iterable of track dicts, or None. Every usable track has an
            'id' key; its remaining keys are written with `$set`. Entries
            that are empty or lack 'id' are skipped.

    Returns:
        None. Write failures are printed, never raised.
    """
    if tracks is None:
        # An upstream enrichment step returns None on failure; there is
        # nothing to write in that case.
        return

    bulk_updates = []
    skipped = 0
    for track in tracks:
        if not track or 'id' not in track:
            skipped += 1
            continue
        # Build the update from a copy so the caller's dict keeps its 'id'.
        fields = {key: value for key, value in track.items() if key != 'id'}
        bulk_updates.append(UpdateOne({"_id": track['id']}, {"$set": fields}, upsert=True))

    if skipped:
        print(f"Skipped '{skipped}' tracks without an id.")

    if not bulk_updates:
        # bulk_write rejects an empty operation list, so skip the call.
        return

    try:
        song_collection.bulk_write(bulk_updates)
    except Exception as e:
        print("MongoDB error: ", e)


def try_upsert_artists(artists):
    """Upsert artists into artist_collection, using each artist's 'id' as `_id`.

    Args:
        artists: Iterable of artist dicts, or None. Every usable artist has
            an 'id' key; its remaining keys are written with `$set`. Entries
            that are empty or lack 'id' are skipped.

    Returns:
        None. Write failures are printed, never raised.
    """
    if artists is None:
        return

    bulk_updates = []
    skipped = 0
    for artist in artists:
        if not artist or 'id' not in artist:
            skipped += 1
            continue
        # Build the update from a copy so the caller's dict keeps its 'id'.
        fields = {key: value for key, value in artist.items() if key != 'id'}
        bulk_updates.append(UpdateOne({"_id": artist['id']}, {"$set": fields}, upsert=True))

    if skipped:
        print(f"Skipped '{skipped}' artists without an id.")

    if not bulk_updates:
        # bulk_write rejects an empty operation list, so skip the call.
        return

    try:
        artist_collection.bulk_write(bulk_updates)
    except Exception as e:
        print("MongoDB error: ", e)


def get_audio_features_with_retry(ids, max_retries=5, backoff_factor=0.1):
    """Fetch audio features for track ids, retrying with exponential backoff.

    Args:
        ids: Track ids forwarded to spotify_client.audio_features.
        max_retries: Maximum number of attempts before giving up.
        backoff_factor: Base delay in seconds; after failed attempt i the
            function sleeps backoff_factor * 2**i before trying again.

    Returns:
        Whatever spotify_client.audio_features returns on the first
        successful attempt, or None when every attempt failed.
    """
    for attempt in range(max_retries):
        try:
            return spotify_client.audio_features(ids)
        except Exception as e:
            has_attempts_left = attempt < max_retries - 1
            outcome = 'Trying again.' if has_attempts_left else 'No attempts left.'
            # Report the cause so repeated failures can be diagnosed.
            print(f"Could not get audio features ({e!r}). {outcome}")
            if has_attempts_left:
                time.sleep(backoff_factor * (2 ** attempt))

    print("Max retries exceeded. Ignoring audio features retrieval.")
    return None


def try_enrich_tracks(tracks, max_retries=5, backoff_factor=0.1):
    """Attach audio features to extracted tracks.

    Args:
        tracks: Iterable of track dicts as produced by try_extract_data;
            empty dicts (failed extractions) are dropped.
        max_retries: Forwarded to get_audio_features_with_retry.
        backoff_factor: Forwarded to get_audio_features_with_retry.

    Returns:
        A list with one copy of each non-empty track extended by an
        'audio_features' key, an empty list when there is no usable track,
        or None when the audio features could not be retrieved.
    """
    # Filter first, then pair: the ids and the tracks zipped with the result
    # must come from the same list, otherwise every track after a failed
    # extraction would receive another track's audio features.
    valid_tracks = [track for track in tracks if track != {}]
    if not valid_tracks:
        return []

    ids = [track['id'] for track in valid_tracks]
    audio_features_list = get_audio_features_with_retry(
        ids, max_retries=max_retries, backoff_factor=backoff_factor
    )
    if audio_features_list is None:
        return None

    return [
        {**track, 'audio_features': audio_features}
        for track, audio_features in zip(valid_tracks, audio_features_list)
    ]


def _upsert_track_page(items):
    """Extract, enrich and upsert the tracks of one search-result page."""
    extracted = [try_extract_data(item) for item in items]
    enriched = try_enrich_tracks(extracted)
    if enriched is None:
        # Enrichment failed. Passing None on would crash the upsert, and
        # skipping the page would lose its tracks, so store them without
        # audio features instead.
        print("Audio features unavailable. Upserting tracks without audio_features.")
        enriched = [track for track in extracted if track]
    try_upsert_tracks(enriched)


def upsert_all_tracks(query):
    """Search tracks for `query` and upsert every reachable result page.

    Args:
        query: Spotify search query (e.g. "artist:<name>").

    Returns:
        None. Paging stops silently when the initial search fails, when a
        following page cannot be fetched, or when there is no next page.
    """
    results = search_with_retry(query, 'track', 'tracks')
    while results is not None:
        _upsert_track_page(results['items'])
        if results['next'] is None:
            break
        results = next_with_retry(results, 'tracks')


def upsert_all_artist(query):
    """Search artists for `query` and upsert every reachable result page.

    Each artist is stored with its 'id', its 'name' and a 'tracks_retrieved'
    flag set to False.

    Args:
        query: Spotify search query (e.g. 'genre:<genre>').

    Returns:
        An empty list when the initial search fails, otherwise None.
    """
    page = search_with_retry(query, 'artist', 'artists')
    if page is None:
        return []

    while True:
        batch = []
        for entry in page['items']:
            batch.append({'id': entry['id'], 'name': entry['name'], 'tracks_retrieved': False})
        try_upsert_artists(batch)

        if page['next'] is None:
            return
        page = next_with_retry(page, 'artists')
        if page is None:
            return

### Load Genres from genres.txt

In [None]:
# Read one genre per line. Surrounding whitespace (including the '\r' left by
# Windows line endings) is stripped and blank lines are skipped, so no empty
# or malformed 'genre:' query is produced later on.
with open('genres.txt', 'r') as f:
    genres = [line.strip() for line in f if line.strip()]

print(f"'{len(genres)}' genres found.")

### Retrieve Artists from Genres

In [None]:
# Artists are only fetched while the artist collection is still empty;
# otherwise the stored ones are reused.
nb_artists_in_mongodb = artist_collection.count_documents({})

if nb_artists_in_mongodb == 0:
    # One artist search per genre loaded from genres.txt.
    for genre in genres:
        print(f"Starting retrieval of artists for genre '{genre}'.")
        upsert_all_artist(f'genre:{genre}')
        print(f"Retrieval of artists for genre '{genre}' done.")
else:
    print(f"There are '{nb_artists_in_mongodb}' artists stored. No need to retrieve them again.")

### Retrieve all Tracks from Artists

In [None]:
# Artists whose tracks_retrieved flag is still False have not been processed.
nb_unhandled_artists = artist_collection.count_documents({"tracks_retrieved":False})

if nb_unhandled_artists > 0:
    print(f"'{nb_unhandled_artists}' unhandled artists found. Starting tracks retrieval.")
    # Materialise the cursor up front; the flag is updated while iterating.
    unhandled_artists = list(artist_collection.find({"tracks_retrieved":False}))
    print('Unhandled artists retrieved.')
    artist_nb = 1
    for artist in unhandled_artists:
        print(f"Starting tracks upsert for artist '{artist['_id']}'. ({artist_nb}/{nb_unhandled_artists}).")
        # Every result page of the artist search is upserted.
        upsert_all_tracks(f"artist:{artist['name']}")
        print(f"Tracks upsert for artist '{artist['_id']}' done.")
        # Flag the artist as processed so later runs skip it.
        artist_collection.update_one({"_id": artist['_id']}, {"$set": {"tracks_retrieved": True}})
        print('tracks_retrieved value set to true.')
        artist_nb += 1
else:
    print('There is no unhandled artist. No need to retrieve tracks.')


### Retrieve first Tracks from Artists

In [None]:
# Artists whose tracks_retrieved flag is still False have not been processed.
nb_unhandled_artists = artist_collection.count_documents({"tracks_retrieved":False})

if nb_unhandled_artists == 0:
    print('There is no unhandled artist. No need to retrieve tracks.')
else:
    print(f"'{nb_unhandled_artists}' unhandled artists found. Starting tracks retrieval.")
    unhandled_artists = list(artist_collection.find({"tracks_retrieved":False}))
    print('Unhandled artists retrieved.')
    # enumerate keeps the progress counter correct even when an artist is
    # skipped with `continue`.
    for artist_nb, artist in enumerate(unhandled_artists, start=1):
        print(f"Starting tracks upsert for artist '{artist['_id']}'. ({artist_nb}/{nb_unhandled_artists}).")
        # Only the first result page of the artist search is used here.
        results = search_with_retry(f"artist:{artist['name']}", 'track', 'tracks')
        if results is None:
            # The artist stays unhandled so a later run can try again.
            print(f"No result found for artist '{artist['_id']}'.")
            continue

        extracted_tracks = [try_extract_data(item) for item in results['items']]
        tracks = try_enrich_tracks(extracted_tracks)
        if tracks is None:
            # Enrichment failed. Passing None on would crash the upsert, so
            # store the tracks without audio features instead.
            print("Audio features unavailable. Upserting tracks without audio_features.")
            tracks = [track for track in extracted_tracks if track]
        try_upsert_tracks(tracks)
        print(f"Tracks upsert for artist '{artist['_id']}' done.")
        artist_collection.update_one({"_id": artist['_id']}, {"$set": {"tracks_retrieved": True}})
        print('tracks_retrieved value set to true.')

In [None]:
# Display how many artists have already been handled, i.e. how many artist
# documents have their tracks_retrieved flag set to True.

artist_collection.count_documents({"tracks_retrieved":True})

### Set audio_features on Tracks that don't have one

In [None]:
# Load every stored track whose audio_features field is missing or null.
tracks_without_audio_features = list(
    song_collection.find({
        "$or": [
            {"audio_features": {"$exists": False}},
            {"audio_features": {"$eq": None}},
        ]
    })
)
print(f"Found '{len(tracks_without_audio_features)}' tracks without audio_features")

def try_enrich_mongodb_tracks(tracks, max_retries=5, backoff_factor=0.1):
    """Attach audio features to track documents read from MongoDB.

    Args:
        tracks: Iterable of track documents, each with an '_id' key holding
            the track id.
        max_retries: Forwarded to get_audio_features_with_retry.
        backoff_factor: Forwarded to get_audio_features_with_retry.

    Returns:
        A list with one copy of each document extended by an
        'audio_features' key, an empty list when `tracks` is empty, or None
        when the audio features could not be retrieved.
    """
    ids = [track['_id'] for track in tracks]
    if not ids:
        # Nothing to look up; avoid a pointless request with no ids.
        return []

    # Forward the retry settings, which were previously accepted but ignored.
    audio_features_list = get_audio_features_with_retry(
        ids, max_retries=max_retries, backoff_factor=backoff_factor
    )
    if audio_features_list is None:
        return None

    return [
        {**track, 'audio_features': audio_features}
        for track, audio_features in zip(tracks, audio_features_list)
    ]

def try_upsert_tracks_bulk(mongodb_tracks):
    """Write track documents back to song_collection in one bulk operation.

    Each document is matched on its '_id' and its other fields are written
    with `$set`. No upsert is requested, so only existing documents are
    updated.

    Args:
        mongodb_tracks: Iterable of track documents that each have an '_id'
            key, or None.

    Returns:
        None. Failures are printed, never raised.
    """
    if mongodb_tracks is None:
        # The enrichment step returns None on failure; nothing to write.
        return

    bulk_updates = []
    for track in mongodb_tracks:
        try:
            # '_id' identifies the document and is immutable, so it is used
            # in the filter only and left out of the `$set` payload.
            fields = {key: value for key, value in track.items() if key != '_id'}
            bulk_updates.append(UpdateOne({"_id": track['_id']}, {"$set": fields}))
        except Exception as e:
            print("Error occured: ", e)

    if not bulk_updates:
        # bulk_write rejects an empty operation list, so skip the call.
        return

    try:
        song_collection.bulk_write(bulk_updates)
    except Exception as e:
        print("Error occured: ", e)

# The audio-features lookup is done in groups of at most 100 track ids
# (the group size the original split was aiming for).
AUDIO_FEATURES_BATCH_SIZE = 100

# Plain slicing yields lists of at most AUDIO_FEATURES_BATCH_SIZE documents;
# an empty input simply produces no iteration.
for group_number, start in enumerate(
    range(0, len(tracks_without_audio_features), AUDIO_FEATURES_BATCH_SIZE), start=1
):
    batch = tracks_without_audio_features[start:start + AUDIO_FEATURES_BATCH_SIZE]
    print(f"Starting enrichment for group '{group_number}'.")
    enriched_tracks = try_enrich_mongodb_tracks(batch)
    if enriched_tracks is None:
        # Enrichment failed for this group; skip it instead of crashing so
        # the remaining groups are still processed.
        print(f"Enrichment for group '{group_number}' failed. Skipping group.")
        continue
    print(f"Enrichment for group '{group_number}' done.")
    try_upsert_tracks_bulk(enriched_tracks)
    print(f"Upserted '{len(enriched_tracks)}' tracks on group '{group_number}'.")