### API Initialization

In [2]:
import spotipy
from spotipy.oauth2 import SpotifyOAuth
from spotipy.oauth2 import SpotifyClientCredentials
import json
import os
import csv
import time
import retrying
import pandas as pd
from surprise import Dataset, Reader, KNNBasic

#########################################################################################
#######################               INIT            ###################################
#########################################################################################
# setting API keys
# os.environ['SPOTIPY_CLIENT_ID'] = "YOUR KEY HERE"
# os.environ['SPOTIPY_CLIENT_SECRET'] = "YOUR KEY HERE"
# os.environ['SPOTIPY_REDIRECT_URI'] = "https://localhost:8888/callback"

# declaring what this app can control within a user's spotify
scope = ["user-library-read", "user-read-private", "user-library-modify", "playlist-read-private",
"playlist-read-collaborative", "user-read-currently-playing", "playlist-modify-public", "playlist-modify-private",
"user-top-read", "streaming", "user-read-recently-played", "app-remote-control"]


#Connecting to user's spotify data
sp = spotipy.Spotify(auth_manager=SpotifyOAuth(scope=scope))

#get top artists and tracks as json to test
top_artists = sp.current_user_top_artists(limit=50, offset=0, time_range='short_term')
top_tracks = sp.current_user_top_tracks(limit=50, offset=0, time_range='short_term')
print(top_artists)



{'items': [{'external_urls': {'spotify': 'https://open.spotify.com/artist/5oifjQw72WO7Jut07fVWMy'}, 'followers': {'href': None, 'total': 0}, 'genres': [], 'href': 'https://api.spotify.com/v1/artists/5oifjQw72WO7Jut07fVWMy', 'id': '5oifjQw72WO7Jut07fVWMy', 'images': [{'height': 640, 'url': 'https://i.scdn.co/image/ab6761610000e5ebb05f625e8f337fd8756d253f', 'width': 640}, {'height': 320, 'url': 'https://i.scdn.co/image/ab67616100005174b05f625e8f337fd8756d253f', 'width': 320}, {'height': 160, 'url': 'https://i.scdn.co/image/ab6761610000f178b05f625e8f337fd8756d253f', 'width': 160}], 'name': 'KAYTRAMINÉ', 'popularity': 68, 'type': 'artist', 'uri': 'spotify:artist:5oifjQw72WO7Jut07fVWMy'}, {'external_urls': {'spotify': 'https://open.spotify.com/artist/1wlzPS1hSNrkriIIwLFTmU'}, 'followers': {'href': None, 'total': 0}, 'genres': ['alternative hip hop', 'chill abstract hip hop', 'indie hip hop'], 'href': 'https://api.spotify.com/v1/artists/1wlzPS1hSNrkriIIwLFTmU', 'id': '1wlzPS1hSNrkriIIwLFTmU'

### Helper functions
For ease of data handling

In [18]:
@retrying.retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_top_tracks():
    return sp.current_user_top_tracks(limit=50, offset=0, time_range='short_term')

@retrying.retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_tracks(tracks):
    return sp.tracks(tracks)['tracks']

@retrying.retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_audio_analysis(track_id):
    return sp.audio_analysis(track_id)['track']

@retrying.retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_audio_features(track_id):
    return sp.audio_features(track_id)[0]

@retrying.retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_related_artists(artist_id):
    return sp.artist_related_artists(artist_id)['artists']
    
@retrying.retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_artist_albums(artist_id):
    return sp.artist_albums(artist_id)['items']

@retrying.retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_album_tracks(album_id):
    return sp.album_tracks(album_id)['items']

In [19]:
def getTop50songs():
    ## gets the user's top 50 most listened to songs.
    top_tracks = get_top_tracks();
    ids = []
    for i in range(50):
        id_ = top_tracks['items'][i]['id']
        ids.append(id_)
    
    return ids
    

In [20]:
def top50songsDriver():
    ## gets top 50 songs, adds all necessary track data, and writes to file.
    print("Getting your top 50 most listened to songs...")
    top_50_ids = getTop50songs()
    print("Extracting data from your songs...")
    top_50_samples = generateSamples(top_50_ids)
    print("Writing to file...")
    writeLabelsToFile(rowFields(), "base")
    writeSongsToFile(top_50_samples, "base")
    print("Complete.")
    
    

In [21]:
def getTopArtistInfo():
    ## gets info on user's top 50 artists.
    ids = []
    names = []
    for i in range(50):
        ids.append(top_artists['items'][i]['id'])
        names.append(top_artists['items'][i]['name'])

    return ids, names

In [26]:
def generateSamples(tracks):
    # constructs all needed attributes from batch of tracks
    genAttributes = ['name', 'popularity', 'explicit']
    analysisAttributes =  ['num_samples', 'duration', 'offset_seconds', 'window_seconds', 'analysis_sample_rate', 'end_of_fade_in', 'start_of_fade_out', 
                'loudness', 'tempo', 'tempo_confidence', 'time_signature', 'time_signature_confidence', 'key', 'key_confidence', 'mode', 'mode_confidence']
    featureAttributes = ['danceability', 'energy', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence']
    samples = []
    
    track_chunks = [tracks[i:i+50] for i in range(0, len(tracks), 50)]
    
    for chunk in track_chunks:
        tracks_info = get_tracks(chunk)
        
        for track in tracks_info:
            sample = []
            
            sample.append(track['artists'][0]['name'])

            for att in genAttributes:
                if att in track:
                    sample.append(track[att])
                else:
                    sample.append(None)
                
            track_id = track['id']

            analysis = get_audio_analysis(track_id)
            time.sleep(2.5) # avoiding rate limiting
            for att in analysisAttributes:
                if analysis and att in analysis:
                    sample.append(analysis[att])
                else:
                    sample.append(None)

            features = get_audio_features(track_id)
            time.sleep(2.5) # avoiding rate limiting
            for att in featureAttributes:
                if features and att in features:
                    sample.append(features[att])
                else:
                    sample.append(None)

            samples.append(sample)
    return samples

In [27]:
def rowFields():
    return ['id', 'artist_name(s)', 'song_name', 'popularity', 'explicit?',
        'num_samples', 'duration', 'offset_seconds', 'window_seconds', 
        'analysis_sample_rate', 'end_of_fade_in', 'start_of_fade_out', 
        'loudness', 'tempo', 'tempo_confidence', 'time_signature', 
        'time_signature_confidence', 'key', 'key_confidence', 'mode', 
        'mode_confidence', 'danceability', 'energy', 'speechiness', 
        'acousticness', 'instrumentalness', 'liveness', 'valence']

In [28]:
def writeLabelsToFile(fields, fileType):
    filename = "Spotify_" + fileType + "_data.txt"
    with open(filename, 'a', encoding="utf-8") as file:
        write = csv.writer(file)
        write.writerow(fields)

In [29]:
def writeSongsToFile(data, fileType):
    filename = "Spotify_" + fileType + "_data.txt"
    with open(filename, 'a', encoding="utf-8") as file:
        write = csv.writer(file)
        for sample in data:
            write.writerow(sample)

In [30]:
def searchRelated(artists_to_search, artists_searched):
    ## takes a list of artists and returns all related artists from each artist.
    newArtistIds = []
    newArtistNames = []

    for i in artists_to_search:
        related_artists = get_related_artists(i);
        for j in range(len(related_artists)):
            if related_artists[j]['id'] not in artists_to_search and related_artists[j]['id'] not in artists_searched:
                newArtistIds.append(related_artists[j]['id'])
                newArtistNames.append(related_artists[j]['name'])

    return newArtistIds, newArtistNames

In [31]:
def baseScrape():
    """Takes top 50 artists, scrapes related artists from all 50."""
    artists_ids = []
    artists_names = []
    print("Performing base scrape...")
    for i in range(50):
        artist_id = top_tracks['items'][i]['artists'][0]['id']
        artists_searching = []
        artists_searching.append(artist_id)
        if i == 0:
            artists_ids.append(artist_id)
            artists_names.append(top_tracks['items'][i]['artists'][0]['name'])
            searchedIds, searchedNames = searchRelated(artists_searching,[])
        else:
            searchedIds, searchedNames = searchRelated(artists_searching, artists_ids)

        for j in range(len(searchedNames)):
            artists_ids.append(searchedIds[j])
            artists_names.append(searchedNames[j])

    return artists_ids, artists_names

In [32]:
def recursiveScrape(ids, names, step):
    """Takes artists/ids from base scrape, and computes a recursive scrape for x amount of steps.
        *** Not currently in use *** """

    if step == 0:
        return ids, names
    else:
        for id_ in ids:
            artist_id = id_
            artists_searching = []
            artists_searching.append(artist_id)
            searchedIds, searchedNames = searchRelated(artists_searching, ids)

            for j in range(len(searchedNames)):
                ids.append(searchedIds[j])
                names.append(searchedNames[j])

        return recursiveScrape(ids, names, step-1)

In [33]:
def getSongsFromArtist(artist_id):
    # gets all songs from an artist
    artist_track_ids = []
    ##get album ids from artist id
    artist_albums = get_artist_albums(artist_id)
    album_ids = []
    for i in range(len(artist_albums)):
        album_id = artist_albums[i]['id']
        if album_id not in album_ids:
            album_ids.append(album_id)

            ##now have all albums from artist.
            ## get every song from every album.
                
            album_tracks = get_album_tracks(album_id)
            for j in range(len(album_tracks)):
                track_id = album_tracks[j]['id']
                print("Track id added: {}".format(track_id))
                artist_track_ids.append(track_id)
                

    return artist_track_ids

In [34]:
def scrapedSongsDriver(artist_ids):
    writeLabelsToFile(rowFields(), "scraped")
    for artist in artist_ids:
        print("Gathering songs from id: {}".format(artist))
        artist_tracks = getSongsFromArtist(artist)
        artist_track_samples = generateSamples(artist_tracks)
        writeSongsToFile(artist_track_samples, "scraped")
        

In [72]:
def mainScrapingDriver():
    """Stores user's songs + public scraped songs in respective files for learning + reccomending"""

    ## get top 50 artists info
    print("Getting your top artists info...")
    ids, names = getTopArtistInfo()
    for i in range(len(ids)):
        print("{} : {}".format(ids[i], names[i]))

    # Store user songs
    print("Analyzing your top 50 songs...")
    top50songsDriver() ##gets 50 most listened to songs, writes necessary track info to file

    ## Perform base artist scrape
    print("Searching for more artists...")
    scrapedArtistIds, scrapedArtistNames = baseScrape() 

    print("Artist scrape complete.")
    print(scrapedArtistNames)
    
    print("Scraping tracks from collected Artists...")
    scrapedSongsDriver(scrapedArtistIds)
    print("All tracks gathered. Proceeding to analyze and select songs...")

In [36]:
mainScrapingDriver()

Getting your top artists info...
2exebQUDoIoT0dXA8BcN1P : Home
5oifjQw72WO7Jut07fVWMy : KAYTRAMINÉ
1wlzPS1hSNrkriIIwLFTmU : MIKE
1vmLIa1VRY38hZoar8AyYS : Chester Watson
6qgnBH6iDM91ipVXv28OMu : KAYTRANADA
17Zu03OgBVxgLxWmRUyNOJ : Knxwledge
2Ng5YIxfFxELV3scQVISlB : B. Cool-Aid
6yJ6QQ3Y5l0s0tn7b0arrO : JPEGMAFIA
2GqaakAnuhfpY4drbXrEmL : Devonwho
2pAWfrd7WFF3XhVt9GooDL : MF DOOM
5j93hwFBNo29RJMsWvtzj8 : Zelooperz
7HY1ISUuRotG01FVu0PKWh : mynameisntjmack
0Y4inQK6OespitzD6ijMwb : Freddie Gibbs
39vtb2iiz3079nqfL5nfFc : billy woods
62v3nR2gE0z1AFSFHxKepe : ぬいぐるみクレヨン Lush Crayon
40ZElxHldNyvn7x8WRC6fh : Pink Siifu
6OjtkJDlAZzlzAydEn78cK : Bktherula
4DpmPt7gfAAq7WEx0E1X8s : piri
6P7H3ai06vU1sGvdpBwDmE : Steely Dan
1peoXq0RPx7czVoFjloeDQ : MAVI
3A5tHz1SfngyOZM2gItYKu : Earl Sweatshirt
3BLx7avD36sNpMNA1nZ7Dj : Take Van
3ZMur3elMyOs248ah86NRk : Mach-Hommy
53xeKWbSRuGgTxViJTAZKC : Sideshow
78rUTD7y6Cy67W1RVzYs7t : PinkPantheress
7r8EHfxHZHU16sUV3BEH1t : Macabre Plaza
4WjLhyf6zlQ4R9v7uVDN2E : bbrain

ReadTimeout: HTTPSConnectionPool(host='api.spotify.com', port=443): Read timed out. (read timeout=5)

### Producing Reccomendations

In [13]:
from sklearn.metrics.pairwise import cosine_similarity

top_songs = pd.read_csv("Spotify_base_data.txt")
similar_songs = pd.read_csv("Spotify_scraped_data.txt")

merged_data = pd.concat([top_songs, similar_songs], ignore_index=True)

# Identify the column name representing the song identifier
song_id_column = 'id'  # Replace 'id' with the actual column name

# Filter the dataset based on relevant attributes
relevant_attributes = ['id', 'artist_name(s)', 'song_name', 'popularity', 'explicit?', 'duration', 'loudness', 
                       'tempo', 'danceability', 'energy', 'acousticness', 'time_signature', 'key', 'mode', 'speechiness',
                      'instrumentalness', 'liveness', 'valence']
filtered_data = merged_data[relevant_attributes]

# Create a user-item matrix
user_item_matrix = pd.pivot_table(filtered_data, values='popularity', index=song_id_column)

# Calculate cosine similarity
similarity_matrix = cosine_similarity(user_item_matrix.fillna(0))

# Get the indices of your top songs
your_top_song_indices = [user_item_matrix.index.get_loc(song_id) for song_id in your_top_songs]

# Calculate song similarities with your top songs
similarities_with_top_songs = pd.DataFrame(similarity_matrix[your_top_song_indices].sum(axis=0), columns=['similarity'])
similarities_with_top_songs[song_id_column] = user_item_matrix.index

# Merge with the original dataset to get song names and track IDs
recommended_songs = pd.merge(similarities_with_top_songs, filtered_data, on=song_id_column)

# Sort the songs based on similarity
recommended_songs = recommended_songs.sort_values(by='similarity', ascending=False)

# Select the top 100 songs as recommendations
recommended_songs = recommended_songs.head(100)

# Print the recommended songs
for index, row in recommended_songs.iterrows():
    print(f"Song Name: {row['song_name']}")
    print(f"Artist: {row['artist_name(s)']}")
    print(f"Similarity: {row['similarity']}")
    print(f"Track ID: {row['id']}")
    print()


Song Name: 29
Artist: Round Of Applause
Similarity: 149.0
Track ID: 2oo7

Song Name: 45
Artist: All Is Lost
Similarity: 149.0
Track ID: Surf Curse

Song Name: 46
Artist: Arrow
Similarity: 149.0
Track ID: Surf Curse

Song Name: 37
Artist: Cathy
Similarity: 149.0
Track ID: Surf Curse

Song Name: 48
Artist: Sugar
Similarity: 149.0
Track ID: Surf Curse

Song Name: 40
Artist: Lost Honor
Similarity: 149.0
Track ID: Surf Curse

Song Name: 44
Artist: Self Portrait
Similarity: 149.0
Track ID: Surf Curse

Song Name: 37
Artist: Unwell
Similarity: 149.0
Track ID: Surf Curse

Song Name: 31
Artist: Strange
Similarity: 149.0
Track ID: Surf Curse

Song Name: 44
Artist: TVI
Similarity: 149.0
Track ID: Surf Curse

Song Name: 34
Artist: Little Rock ‘n’ Roller
Similarity: 149.0
Track ID: Surf Curse

Song Name: 30
Artist: No Tomorrows
Similarity: 149.0
Track ID: Surf Curse

Song Name: 38
Artist: Fear City
Similarity: 149.0
Track ID: Surf Curse

Song Name: 37
Artist: Randall Flagg
Similarity: 149.0
Track ID