# Collecting My Streaming Data from Spotify's API

In [None]:
import ast
import json
from datetime import datetime
from os import listdir
from os.path import isfile, join
from typing import List, Optional, Tuple

# Function to help us extract our data from JSON files
def get_streamings(path: str = 'MyData') -> List[dict]:
    """Return every streaming record found in a Spotify "MyData" dump.

    Reads each ``StreamingHistory<N>.json`` file inside *path* in numeric
    order of ``N``, concatenates the records and adds a ``'datetime'`` key
    to every record, parsed from its ``'endTime'`` string. Track features
    are not acquired here.

    Args:
        path: Folder that contains the ``StreamingHistory<N>.json`` files.

    Returns:
        A list of streaming dicts, each extended with a ``'datetime'`` entry.

    Raises:
        FileNotFoundError: If *path* does not exist.
        ValueError: If a file is not valid JSON or an ``endTime`` value does
            not match ``%Y-%m-%d %H:%M``.
    """
    prefix = 'StreamingHistory'

    # Collect (index, filename) pairs so StreamingHistory10 sorts after
    # StreamingHistory2 and multi-digit indices are not skipped.
    indexed_files = []
    for filename in listdir(path):
        stem = filename.split('.')[0]
        suffix = stem[len(prefix):]
        if stem.startswith(prefix) and suffix.isdecimal():
            indexed_files.append((int(suffix), filename))
    indexed_files.sort()

    all_streamings = []
    for _, filename in indexed_files:
        # Build the file name from `path` rather than a hard-coded folder.
        with open(join(path, filename), 'r', encoding='UTF-8') as f:
            # The dump is JSON: json handles true/false/null and escapes
            # that a Python-literal parser would reject or misread.
            all_streamings.extend(json.load(f))

    # adding datetime field
    for streaming in all_streamings:
        streaming['datetime'] = datetime.strptime(streaming['endTime'],
                                                  '%Y-%m-%d %H:%M')
    return all_streamings

In [None]:
import spotipy.util as util

# Request a user access token from Spotify through spotipy's helper.
# 'AAA', 'BBB' and 'CCC' look like placeholders: replace them with your own
# username and application credentials before running this cell.
username = 'AAA'
client_id = 'BBB'
client_secret = 'CCC'
# NOTE(review): presumably this must match the redirect URI registered for
# the Spotify application -- confirm.
redirect_uri = 'http://localhost:7777/callback'
scope = 'user-read-recently-played'

# `token` is read as a module-level name by the cells further down.
token = util.prompt_for_user_token(username=username,
                                    scope=scope,
                                    client_id=client_id,
                                    client_secret=client_secret,
                                    redirect_uri=redirect_uri)

In [None]:
# Print the token obtained from the authorisation call.
# NOTE(review): this writes a live access token into the cell output --
# clear the output before sharing the notebook.
print(token)

In [None]:
import requests

# Function to get the Spotify IDs of the songs I listen to
def get_api_id(track_info: str, token: str,
               artist: Optional[str] = None) -> Optional[str]:
    """Look up the Spotify track ID of a song through the search endpoint.

    Args:
        track_info: Either a bare track name or a ``"<track>___<artist>"``
            key. Only the part before the first ``"___"`` is searched for.
        token: OAuth bearer token sent in the ``Authorization`` header.
        artist: Artist to prefer among the search results. When omitted,
            the part of *track_info* after the last ``"___"`` is used, if
            there is one.

    Returns:
        The ID of the first result whose first listed artist equals
        *artist* (ignoring surrounding whitespace); otherwise the ID of the
        first result. ``None`` if the request fails, the response has an
        unexpected shape, or no track is found.
    """
    parts = track_info.split("___")
    track_name = parts[0]
    # Only fall back to the embedded artist when the caller gave none, and
    # never mistake a bare track name for an artist.
    if artist is None and len(parts) > 1:
        artist = parts[-1]

    headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + token,
    }
    params = [
        ('q', track_name),
        ('type', 'track'),
    ]
    if artist:
        # NOTE(review): 'artist' does not appear to be a documented search
        # parameter (field filters go inside 'q'); kept as-is so the query
        # sent to Spotify is unchanged -- confirm against the API reference.
        params.append(('artist', artist))

    try:
        response = requests.get('https://api.spotify.com/v1/search',
                                headers=headers, params=params, timeout=5)
        results = response.json()['tracks']['items']
        first_result = results[0]
        # The first hit is not necessarily by the requested artist, so look
        # for an exact artist match before falling back to it.
        if artist:
            wanted = artist.strip()
            for result in results:
                if wanted == result['artists'][0]['name'].strip():
                    return result['id']
        return first_result['id']
    except (requests.RequestException, ValueError, KeyError,
            IndexError, TypeError):
        # Network failure, non-JSON body, error payload or empty result.
        return None

In [None]:
# Testing get_api_id: look up one known song and print the ID that comes
# back (get_api_id returns None when the lookup fails).
adoreyou_id = get_api_id('Adore You', token, artist = 'Harry Styles')
print(adoreyou_id)

In [None]:
import pandas as pd
import spotipy

# Function to save the ids of the songs in my Streaming History
def get_saved_ids(tracks, path: str = 'MyData/track_ids.csv') -> dict:
    """Load previously saved track IDs from a CSV file.

    The file is expected to have one header row followed by
    ``<track key>,<id>`` rows, i.e. the layout produced by
    ``DataFrame.to_csv`` for a single-column frame indexed by track key.

    Args:
        tracks: Iterable of track keys that should appear in the result.
        path: Location of the CSV file with the saved IDs.

    Returns:
        A dict mapping every key in *tracks* to ``None``, updated with each
        ``key -> id`` pair from the file whose ID is present. Keys found in
        the file but not in *tracks* are included as well. If the file is
        missing or unreadable the dict contains only ``None`` values.
    """
    track_ids = {track: None for track in tracks}
    if not isfile(path):
        return track_ids

    try:
        # header=0 drops the file's own header row in favour of `names`;
        # dtype=str keeps keys and IDs as text, blanks are read as NaN.
        saved = pd.read_csv(path, names=['name', 'idd'], header=0, dtype=str)
    except Exception as exc:
        # Best effort: a corrupt cache only means IDs are fetched again.
        print(f'Error. Failed to recover saved IDs! ({exc})')
        return track_ids

    added_tracks = 0
    for name, idd in zip(saved['name'], saved['idd']):
        # A missing ID must stay None so the track is looked up again.
        if pd.isna(name) or pd.isna(idd):
            continue
        track_ids[name] = idd
        added_tracks += 1
    print(f'Saved IDs successfully recovered for {added_tracks} tracks.')
    return track_ids

# Function to get the API features
def get_api_features(track_id: str, token: str) -> Optional[dict]:
    """Fetch the audio features of one track through spotipy.

    Args:
        track_id: Spotify ID of the track.
        token: Access token used to build the spotipy client.

    Returns:
        The first element of the ``audio_features`` response for
        ``[track_id]``, or ``None`` if the call raises.
    """
    sp = spotipy.Spotify(auth=token)
    try:
        features = sp.audio_features([track_id])
        return features[0]
    except Exception:
        # Best effort per track; unlike a bare `except`, this still lets
        # KeyboardInterrupt/SystemExit stop a long-running loop.
        return None

# Function to get the album names
def get_album(track_id: str, token: str) -> Tuple[Optional[str], Optional[str]]:
    """Fetch the album name and album ID of one track through spotipy.

    Args:
        track_id: Spotify ID of the track.
        token: Access token used to build the spotipy client.

    Returns:
        ``(album_name, album_id)`` read from the track's ``'album'`` entry,
        or ``(None, None)`` if the lookup raises or the entry is missing.
    """
    sp = spotipy.Spotify(auth=token)
    try:
        track = sp.track(track_id)
        album = track['album']
        return album['name'], album['id']
    except Exception:
        # Best effort per track; KeyboardInterrupt/SystemExit still propagate.
        return None, None

# Function to save all the features in a CSV file
def get_saved_features(tracks, path = 'MyData/features.csv'):
    """Load previously saved track features from a CSV file.

    Args:
        tracks: Iterable of track keys that should appear in the result.
        path: Location of the features CSV; its first column is used as
            the index (track key).

    Returns:
        A dict mapping every key in *tracks* to ``None``, updated with a
        ``{column: value}`` dict for each row of the file that has no
        missing value. Rows for keys outside *tracks* are included too.
        If the file does not exist, all values are ``None``.
    """
    track_features = {track: None for track in tracks}
    if not isfile(path):
        print("Did not find features file.")
        return track_features

    features_df = pd.read_csv(path, index_col = 0)
    n_recovered_tracks = 0
    # iterrows yields one Series per row even if a key is duplicated.
    for track, features in features_df.iterrows():
        if features.isna().any():
            # Incomplete rows stay None so they are fetched again later.
            continue
        track_features[track] = dict(features)
        n_recovered_tracks += 1
    print(f"Added features for {n_recovered_tracks} tracks.")
    return track_features


In [None]:
# Testing get_api_features: fetch the audio features for the ID looked up
# earlier and print them (get_api_features returns None when the call fails).
adoreyou_features = get_api_features(adoreyou_id, token)
print(adoreyou_features)

In [None]:
from time import sleep

def main():
    """Build ``MyData/final.csv``: streaming history joined with features.

    Steps:
      1. Load the streaming history and derive the unique track keys
         (``"<trackName>___<artistName>"``).
      2. Recover saved track IDs, ask Spotify for the missing ones and save
         the result to ``MyData/track_ids.csv``.
      3. Recover saved features, ask Spotify for the missing ones (plus the
         album name/ID) and save the result to ``MyData/features.csv``.
      4. Attach the features to every streaming that has them and write
         ``MyData/final.csv``.

    Uses the module-level ``token`` for all Spotify requests. Lookups are
    best effort: a track that fails is left without ID/features so a later
    run can try again.
    """
    streamings = get_streamings()
    print(f'Recovered {len(streamings)} streamings.')

    # getting the unique tracks in our history
    # The artist is part of the key as several artists can share a song name
    tracks = {f"{streaming['trackName']}___{streaming['artistName']}"
              for streaming in streamings}
    print(f'Discovered {len(tracks)} unique tracks.')

    # getting saved ids for tracks
    track_ids = get_saved_ids(tracks)

    # checking tracks that still miss idd
    tracks_missing_idd = sum(1 for track in tracks
                             if track_ids.get(track) is None)
    print(f'There are {tracks_missing_idd} tracks missing ID.')

    if tracks_missing_idd > 0:
        # using spotify API to recover track ids
        # note: this method works only for tracks.
        # podcasts and other items will be ignored.
        print('Connecting to Spotify to recover tracks IDs.')
        sleep(3)
        id_length = 22
        # Only values are replaced, so iterating over items() is safe.
        for track, idd in track_ids.items():
            if idd is not None:
                continue
            try:
                found_idd = get_api_id(track, token)
            except Exception:
                # Best effort: one failed lookup must not abort the run.
                continue
            track_ids[track] = found_idd
            if found_idd is not None:
                print(f"{found_idd:<{id_length}} : {', '.join(track.split('___'))}")

        # how many tracks did we identify?
        identified_tracks = [track for track in track_ids
                             if track_ids[track] is not None]
        print(f'Successfully recovered the ID of {len(identified_tracks)} tracks.')

        # how many items did we fail to identify?
        n_tracks_without_id = len(track_ids) - len(identified_tracks)
        print(f"Failed to identify {n_tracks_without_id} items. "
              "However, some of these may not be tracks (e.g. podcasts).")

        # using pandas to save tracks ids (so we don't have to API them in the future)
        ids_path = 'MyData/track_ids.csv'
        ids_dataframe = pd.DataFrame.from_dict(track_ids,
                                               orient = 'index')
        ids_dataframe.to_csv(ids_path)
        print(f'track ids saved to {ids_path}.')

    # recovering saved features
    track_features = get_saved_features(tracks)
    # A set keeps the membership test in the loop below O(1).
    tracks_without_features = {track for track in tracks
                               if track_features.get(track) is None}
    print(f"There are still {len(tracks_without_features)} tracks without features.")
    path = 'MyData/features.csv'

    # connecting to spotify API to retrieve missing features
    if tracks_without_features:
        print('Connecting to Spotify to extract features...')
        acquired = 0
        for track, idd in track_ids.items():
            if idd is None or track not in tracks_without_features:
                continue
            try:
                features = get_api_features(idd, token)
                if not features:
                    # Nothing came back; leave the track without features.
                    continue
                features['albumName'], features['albumID'] = get_album(idd, token)
            except Exception:
                # Best effort: skip this track, keep processing the rest.
                continue
            track_features[track] = features
            acquired += 1
            print(f"Acquired features: {', '.join(track.split('___'))}. Total: {acquired}")

        tracks_without_features = [track for track in tracks
                                   if track_features.get(track) is None]
        print(f'Successfully recovered features of {acquired} tracks.')
        if len(tracks_without_features):
            print(f'Failed to identify {len(tracks_without_features)} items. Some of these may not be tracks.')

        # saving features
        # pandas cannot build a frame when every value is a scalar None, so
        # only write the file once at least one track has features.
        if any(features is not None for features in track_features.values()):
            features_dataframe = pd.DataFrame(track_features).T
            features_dataframe.to_csv(path)
            print(f'Saved features to {path}.')

    # joining features and streamings
    print('Adding features to streamings...')
    streamings_with_features = []
    for streaming in sorted(streamings, key= lambda x: x['endTime']):
        track = streaming['trackName'] + "___" + streaming['artistName']
        features = track_features.get(track)
        if features:
            streamings_with_features.append({'name': track, **streaming, **features})
    print(f'Added features to {len(streamings_with_features)} streamings.')
    print('Saving streamings...')
    df_final = pd.DataFrame(streamings_with_features)
    df_final.to_csv('MyData/final.csv')
    # Guard against an empty history, which would divide by zero.
    if streamings:
        perc_featured = round(len(streamings_with_features) / len(streamings) *100, 2)
    else:
        perc_featured = 0.0
    print(f"Done! Percentage of streamings with features: {perc_featured}%.")
    print("Run the script again to try getting more information from Spotify.")


# Run the whole pipeline only when this file is executed directly.
if __name__ == '__main__':
    main()