# Spotify Extended Streaming History Rehydrator 

*The aim of this code is to rehydrate the three 'extended streaming histories' gathered as part of the 'Mood Music' dataset: it extracts them from a JSON file, merges and tidies them into a format that matches the yearly streaming data gathered, and then retrieves the artist ids, artist information, and audio features from each of the files from Spotify's API*

To run this code you need to change: 
- The Spotify API client id, secret id, and url 
- The filepaths (marked as FILEPATH) to match your computer 
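- The 'XXXX' placeholders: in the file-handling cells they stand for one of the three four-character personIDs found at the beginning of the filenames (in the authentication cell they stand for your Spotify credentials); the comments next to each placeholder explain what to fill in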

In [10]:
import pandas as pd
import numpy as np
import json
import os
from tqdm import tqdm
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials

## Merging the JSON files 
----
*Takes the relevant JSON extended streaming history files, adds a personID column, saves each person's data as a tab-separated (.tsv) file, and then merges these into one big streaming history file*

In [None]:
#import every file beginning with the four relevant letters and merge them 

def merge_json_files(files, output_path, person_id_length=4, name_prefix_length=14):
    """Merge the JSON streaming-history files of one person into a single TSV file.

    Every file in ``files`` is read with ``pd.read_json`` and the rows are
    stacked in list order. A ``personID`` column is inserted as the first
    column, and the result is written to
    ``<output_path>/<start of first filename>_merged.tsv``.

    Parameters
    ----------
    files : list of str
        Paths of the JSON files belonging to ONE person. The personID and the
        output name are both taken from the first entry.
    output_path : str
        Folder the merged TSV file is written into.
    person_id_length : int, default 4
        Number of leading filename characters that make up the personID.
    name_prefix_length : int, default 14
        Number of leading filename characters reused for the output filename.

    Raises
    ------
    ValueError
        If ``files`` is empty (there is no filename to take the personID from).
    """
    if not files:
        raise ValueError("merge_json_files() needs at least one input file")

    # reads every file first and concatenates once, instead of growing a dataframe inside the loop
    frames = []
    for file in files:
        # explicit encoding so non-ASCII track/artist names do not depend on the platform default
        with open(file, 'r', encoding='utf-8') as f:
            frames.append(pd.read_json(f))
    merged_data = pd.concat(frames, ignore_index=True)

    # the personID sits at the start of the file NAME, so slice the basename rather than the
    # full path (slicing the path at a fixed offset only works for one specific folder length)
    first_name = os.path.basename(files[0])
    merged_data.insert(0, 'personID', first_name[:person_id_length])

    # constructs the output file name (based on first file name); output_path is joined once only
    output_file = os.path.join(output_path, first_name[:name_prefix_length] + '_merged.tsv')

    # saves the merged data as a tab-separated file without the index column
    merged_data.to_csv(output_file, sep='\t', index=False)

In [None]:
#reads every per-person merged .tsv file and stacks them into one combined file
def merge_all_csv_files(files, output_path):
    """Merge several tab-separated streaming files into ``all_merged_with_ids.tsv``.

    Parameters
    ----------
    files : list of str
        Paths of tab-separated files; their rows are stacked in list order.
    output_path : str
        Folder that ``all_merged_with_ids.tsv`` is written into.
    """
    # reads everything first and concatenates once instead of growing a dataframe in the loop
    frames = [pd.read_csv(file, sep='\t') for file in files]
    # an empty file list still produces an (empty) output file
    merged_data = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

    # the output name is fixed; output_path is joined exactly once so that a
    # relative output folder is not duplicated in the resulting path
    output_file = os.path.join(output_path, 'all_merged_with_ids.tsv')

    #saves the full merged dataset as a tab-separated file without the index column
    merged_data.to_csv(output_file, sep='\t', index=False)

In [None]:
#set your input and output filepaths 
# file_path is the folder scanned for input files; output is the folder passed to the merge functions
file_path = (r'FILEPATH')
output = (r'OUTPUT_FILEPATH')

#creates a list of files based on the inputted personID (put one in at a time marked XXXX)
# NOTE: the condition matches "XXXX" anywhere in the filename, not only at its start
files_to_merge = [os.path.join(file_path, f) for f in os.listdir(file_path) if "XXXX" in f]

In [None]:
# merges the JSON files listed in files_to_merge and writes the merged .tsv into the output folder
merge_json_files(files_to_merge, output)

In [None]:
# merge_all_csv_files() reads every listed file with pd.read_csv(sep='\t'), so files_to_merge presumably
# has to be rebuilt to list the per-person *_merged.tsv files before this cell is run - TODO confirm
merge_all_csv_files(files_to_merge, output)

## Tidying the dataset without metadata
-----
*Tidying the merged dataset to make it match the information found in the yearly streaming histories*

In [None]:
#imports the [above] merged file as untidy_data [replace FILEPATH with the folder it was saved in]
# os.path.join keeps the path valid on every operating system (a hard-coded '\' only works on Windows)
untidy_data = pd.read_csv(os.path.join(r'FILEPATH', 'all_merged_with_ids.tsv'), sep="\t")

In [None]:
#removes and renames relevant columns to make it match yearly dataset 
def tidy_data(data:pd.DataFrame)-> pd.DataFrame:
    """Return a copy of the merged data with the column layout of the yearly datasets.

    Columns that are not needed are dropped, and the extended-history column
    names are renamed to ``endTime``, ``msPlayed``, ``trackName``,
    ``artistName`` and ``trackID``. The input dataframe is left untouched.

    Parameters
    ----------
    data : pd.DataFrame
        The merged streaming history.

    Returns
    -------
    pd.DataFrame
        A new dataframe holding the remaining (renamed) columns.
    """
    #columns that are not needed in the tidied dataset
    unwanted_columns = [
        'username', 'platform', 'conn_country', 'master_metadata_album_album_name',
        'ip_addr_decrypted', 'user_agent_decrypted', 'reason_start',
        'reason_end', 'shuffle', 'skipped', 'offline', 'offline_timestamp',
        'episode_name', 'episode_show_name', 'spotify_episode_uri',
        'incognito_mode', 'endTime', 'artistName', 'trackName', 'msPlayed',
    ]

    #extended-history names -> names used in the yearly streaming data
    column_renames = {
        'ts': 'endTime',
        'ms_played': 'msPlayed',
        'master_metadata_track_name': 'trackName',
        'master_metadata_album_artist_name': 'artistName',
        'spotify_track_uri': 'trackID',
    }

    #errors='ignore': a listed column that is absent from this particular export is skipped
    #instead of aborting the whole tidy-up with a KeyError
    singlefile_df = data.drop(columns=unwanted_columns, errors='ignore')

    #the old 'endTime'/'msPlayed'/... columns are dropped above, so the renamed ones cannot clash
    singlefile_df = singlefile_df.rename(columns=column_renames)

    #returns tidied dataset
    return singlefile_df

In [None]:
# NOTE: this rebinds the name tidy_data from the function to the dataframe it returns, so this cell
# can only be run again after the cell that defines tidy_data() has been re-run
tidy_data = tidy_data(untidy_data)

In [None]:
#converts the endTime column into the correct format for filtering by dates
tidy_data['endTime']=pd.to_datetime(tidy_data['endTime'])

#keeps only the rows whose endTime is on or after 2022-12-31 (i.e. drops everything earlier) to make it a similar timeframe as the yearly streaming data 
filtered_df = tidy_data.loc[(tidy_data['endTime'] >= '2022-12-31')]

In [None]:
#saves out the tidied dataset as a tab-separated file (no index column) named merged_and_tidied.tsv [replace FILEPATH with your folder]
filtered_df.to_csv(os.path.join(r'FILEPATH', 'merged_and_tidied.tsv'), sep='\t', index=False)

## Spotify Authentication
----
*Creates an access token to Spotify's API, allowing you to make API calls for relevant information; must be done before running any code below*

In [11]:
#setting environmental variables and authenticating to Spotify using SpotifyClientCredentials [input your Spotify credentials here]; for more information on how to create them, see here:
#https://developer.spotify.com/documentation/web-api/concepts/apps
# NOTE(review): replace the XXXX placeholders with your own values, and avoid saving/sharing the notebook with real credentials filled in

os.environ['SPOTIPY_CLIENT_ID']="XXXX"
os.environ['SPOTIPY_CLIENT_SECRET']="XXXX"
os.environ['SPOTIPY_REDIRECT_URI']='XXXX'

#authenticates to Spotify
# SpotifyClientCredentials() is given no arguments here, so it presumably picks the credentials up from the SPOTIPY_* variables set above - TODO confirm
spotify = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())

### Testing the token
----
*Just here to make sure everything is working correctly - the below code should take ~10s to run; if it exceeds 15s, terminate and try authenticating to Spotify again*

In [None]:
# test block to check token has worked [only run if necessary]; found from the spotipy example page: https://spotipy.readthedocs.io/en/2.24.0/
birdy_uri = 'spotify:artist:2WX2uTcsvV5OnS0inACecP'
# requests the first page of albums for the artist uri above
results = spotify.artist_albums(birdy_uri, album_type='album')
albums = results['items']
# keeps requesting the following page for as long as the last response has a truthy 'next' entry
while results['next']:
    results = spotify.next(results)
    albums.extend(results['items'])

# prints the name of every album collected above
for album in albums:
    print(album['name'])

## Creating the unique_tracks dataframe
----
*Extracts the unique track ids from the above file*

In [None]:
#function for creating dataframe containing unique track ids from the extended streaming history; adds in batch identifiers and sorts out the columns
tracks_batch_size = 50
audio_batch_size = 100

def unique_extractor(track_ids:pd.DataFrame) -> pd.DataFrame:
    """Build the 'indicator' dataframe: one row per unique, non-missing trackID.

    Parameters
    ----------
    track_ids : pd.DataFrame
        Streaming history with a ``trackID`` column.

    Returns
    -------
    pd.DataFrame
        Columns ``trackID``, ``tracks_batch_id``, ``audio_batch_id`` (both
        1-based batch numbers built from ``tracks_batch_size`` and
        ``audio_batch_size``) and an empty ``artistID`` column that is filled
        in later.
    """
    #rows without a trackID cannot be looked up, so they are left out of the unique list
    #(otherwise the missing value itself would count as one 'unique track')
    track_ids_unique = track_ids['trackID'].dropna().unique()
    unique_tracks_df = pd.DataFrame({'trackID': track_ids_unique})

    #adds in batch identifiers: integer division of the row number by the batch size, counted from 1
    row_numbers = np.arange(len(unique_tracks_df))
    unique_tracks_df['tracks_batch_id'] = 1 + row_numbers // tracks_batch_size
    unique_tracks_df['audio_batch_id'] = 1 + row_numbers // audio_batch_size

    #creates empty NoneType indicator column for artistID (used later)
    unique_tracks_df['artistID'] = None

    #returns unique_tracks dataframe 
    return unique_tracks_df

In [5]:
#runs the unique extractor function on the merged json extended streaming histories [fill in filepath and filename]
# os.path.join keeps the path valid on every operating system (a hard-coded '\' only works on Windows)
track_ids_full = pd.read_csv(os.path.join(r'FILEPATH', 'FILENAME.tsv'), sep="\t")
unique_tracks_df = unique_extractor(track_ids_full)

In [None]:
#saves out the unique tracks dataframe as 'indicator' file (indicator.tsv in the current working directory, tab-separated, no index column)
unique_tracks_df.to_csv(os.path.join(".", "indicator.tsv"), sep = "\t", index=False)

## Retrieving Artist Ids
----
*Set of functions to retrieve the artist ids from the get.tracks() endpoint of spotify's API using the tracks() function from spotipy in batches*

In [6]:
def get_next_tracks(indicator:pd.DataFrame, batch_size:int = 50) -> list:
    """Return the next batch of trackIDs that still need an artistID.

    The batch starts on the row after the last row whose ``artistID`` is
    filled (or on the first row when nothing is filled yet).

    Parameters
    ----------
    indicator : pd.DataFrame
        Dataframe with ``trackID`` and ``artistID`` columns.
    batch_size : int, default 50
        Maximum number of trackIDs in the batch.

    Returns
    -------
    list
        Up to ``batch_size`` trackIDs with missing values removed; empty when
        there is nothing left after the last filled row.
    """
    #finds the POSITION of the last filled artistID row, so the .iloc slice below
    #is correct whatever the dataframe's index labels are
    filled_positions = np.flatnonzero(indicator['artistID'].notna().to_numpy())

    if len(filled_positions) == 0:
        #nothing retrieved yet: this is the first batch
        start_position = 0
    else:
        start_position = int(filled_positions[-1]) + 1
        print(start_position)
    end_position = start_position + batch_size
    print(end_position)

    #takes the batch between the start and end positions
    batch = indicator['trackID'].iloc[start_position:end_position]

    #removes the na from the series before it is turned into a list
    return batch.dropna().to_list()

In [7]:
def retrieve_artist_id(track_ids:list) -> pd.DataFrame:
    """Call the API for a batch of tracks and return each track's first artist uri.

    Parameters
    ----------
    track_ids : list
        The trackIDs to look up with ``spotify.tracks``.

    Returns
    -------
    pd.DataFrame
        Columns ``trackID`` (the uri reported for the track) and
        ``artistID_new`` (uri of the first listed artist). Entries that come
        back empty are left out; both columns exist even when no row does.
    """
    #calls API for track information
    track_info = spotify.tracks(track_ids)

    #creates empty list
    artist_ids = []

    #iterates through tracks and appends artist id to artists_ids
    for track in tqdm(track_info['tracks']):
        #an entry without data (None) or without any artist cannot give an artist uri; skipping it
        #keeps the rest of the batch usable (same idea as the None guard in retrieve_audio_info)
        if not track or not track.get('artists'):
            continue
        artist_ids.append({
            'trackID': track['uri'],
            'artistID_new': track['artists'][0]['uri'],
        })

    #creates dataframe from artist_ids; explicit columns keep the merge key present when the list is empty
    return pd.DataFrame(artist_ids, columns=['trackID', 'artistID_new'])

In [8]:
def read_write_artist_ids():
    """Fetch artistIDs for the next batch of tracks and rewrite indicator.tsv.

    Reads ``indicator.tsv`` from the current working directory, asks
    ``get_next_tracks`` for the next batch, retrieves the artist ids with
    ``retrieve_artist_id``, fills them into the ``artistID`` column and
    writes the file back.

    Returns
    -------
    int
        Number of rows after the last filled ``artistID`` row; ``0`` means
        there is nothing left to request. ``0`` is also returned (after
        printing a message) when a batch fills in nothing new, so the calling
        loop cannot repeat the same request forever.
    """
    indicator_path = os.path.join(".", "indicator.tsv")

    #reads in indicator file 
    indicator = pd.read_csv(indicator_path, sep = "\t")

    #runs above functions to retrieve the batch for the API call and then call the API for the artistID
    batch = get_next_tracks(indicator)
    if not batch:
        #nothing left to look up, so there is no reason to call the API with an empty list
        return 0
    new_artists = retrieve_artist_id(batch)

    #joins new_artists to indicator file
    new_indicator = pd.merge(indicator, new_artists, on='trackID', how='left')

    #fills the gaps by assigning the combined column back; an inplace fillna on a column
    #selection is a chained assignment and does not reliably update the dataframe itself
    new_indicator['artistID'] = new_indicator['artistID'].combine_first(new_indicator['artistID_new'])
    new_indicator = new_indicator.drop(columns='artistID_new')

    #rewrites the indicator file to include the new retrieved information
    new_indicator.to_csv(indicator_path, sep = "\t", index=False)

    #stops if this batch did not fill in anything new (e.g. none of its tracks could be matched)
    filled_before = indicator['artistID'].notna().sum()
    filled_after = new_indicator['artistID'].notna().sum()
    if filled_after == filled_before:
        print(f"no new artistIDs were retrieved for this batch; stopping with "
              f"{len(new_indicator.index) - filled_after} rows still unfilled")
        return 0

    #check if there are more rows to be filled: rows after the last filled one (0 when the last row is filled)
    return len(new_indicator.index) - (new_indicator['artistID'].last_valid_index() + 1)

In [None]:
#calls read_write_artist_ids() repeatedly, one batch per call, until it reports 0 rows left or a call raises an error
rowsleft = 10 # any non-zero start value, so that the loop body runs at least once
while rowsleft != 0:
    rowsleft = read_write_artist_ids()

## Retrieving Artist Info
----
*Set of functions to retrieve the artist information from the get.artists() endpoint of Spotify's API using the artists() function from spotipy in batches*

In [None]:
#creates a new indicator file for the artist info with empty columns, indicator column, and unique artist ids 
#reads in indicator 
indicator = pd.read_csv(os.path.join(".", "indicator.tsv"), sep = "\t")

#keeps one row per artistID; rows without an artistID are removed first because there is nothing
#to look up for them. The empty (None) columns for the information to be retrieved are added with
#assign(), which builds a new dataframe instead of writing into the result of drop_duplicates()
indicator_unique_artists = (
    indicator
    .dropna(subset=['artistID'])
    .drop_duplicates(subset=['artistID'])
    .assign(artistINFO=None, artist_genre=None, artist_popularity=None, name=None)
)

#saves out as artist_indicator 
indicator_unique_artists.to_csv(os.path.join(".", "artist_indicator.tsv"), sep = "\t", index=False)

In [37]:
def get_next_artists(indicator:pd.DataFrame, batch_size:int = 50) -> list:
    """Return the next batch of artistIDs that still need artist information.

    The batch starts on the row after the last row whose ``artistINFO`` is
    filled (or on the first row when nothing is filled yet).

    Parameters
    ----------
    indicator : pd.DataFrame
        Dataframe with ``artistID`` and ``artistINFO`` columns.
    batch_size : int, default 50
        Maximum number of artistIDs in the batch.

    Returns
    -------
    list
        Up to ``batch_size`` artistIDs with missing values removed; empty when
        there is nothing left after the last filled row.
    """
    #finds the POSITION of the last filled artistINFO row, so the .iloc slice below
    #is correct whatever the dataframe's index labels are
    filled_positions = np.flatnonzero(indicator['artistINFO'].notna().to_numpy())

    if len(filled_positions) == 0:
        #nothing retrieved yet: this is the first batch
        start_position = 0
    else:
        start_position = int(filled_positions[-1]) + 1
        print(start_position)
    end_position = start_position + batch_size
    print(end_position)

    #takes the batch between start and end positions defined 
    batch = indicator['artistID'].iloc[start_position:end_position]

    #removes the na from the series before it is turned into a list and returns the batch list
    return batch.dropna().to_list()

In [32]:
def retrieve_artist_info(artist_ids:list) -> pd.DataFrame:
    """Retrieve name, genres and popularity for a batch of artists from the API.

    Parameters
    ----------
    artist_ids : list
        The artistIDs to look up with ``spotify.artists``.

    Returns
    -------
    pd.DataFrame
        Columns ``artistID``, ``name_new``, ``artist_genre_new``,
        ``artist_popularity_new`` and ``artistINFO_new`` (always the string
        ``'Retrieved'``). Entries that come back as None are left out; the
        columns exist even when no row does.
    """
    #retrieves the artist information from the API 
    artist_info = spotify.artists(artist_ids)

    #creates artist_details from the list/dict retrieved by spotify.artists()
    artist_details = []
    for artist in tqdm(artist_info['artists']):
        #an entry without data (None) has nothing to extract; skipping it keeps the rest of the
        #batch usable (same idea as the None guard in retrieve_audio_info)
        if artist is None:
            continue
        artist_details.append({
            'artistID': artist['uri'],
            'name_new': artist['name'],
            'artist_genre_new': artist['genres'],
            'artist_popularity_new': artist['popularity'],
            'artistINFO_new': 'Retrieved',
        })

    #returns artist_details as a dataframe; explicit columns keep the merge key present when the list is empty
    columns = ['artistID', 'name_new', 'artist_genre_new', 'artist_popularity_new', 'artistINFO_new']
    return pd.DataFrame(artist_details, columns=columns)

In [43]:
def read_write_artist_info():
    """Fetch artist information for the next batch and rewrite artist_indicator.tsv.

    Reads ``artist_indicator.tsv`` from the current working directory, asks
    ``get_next_artists`` for the next batch, retrieves the details with
    ``retrieve_artist_info``, fills them into the ``artistINFO``, ``name``,
    ``artist_genre`` and ``artist_popularity`` columns and writes the file back.

    Returns
    -------
    int
        Number of rows after the last filled ``artistINFO`` row; ``0`` means
        there is nothing left to request. ``0`` is also returned (after
        printing a message) when a batch fills in nothing new, so the calling
        loop cannot repeat the same request forever.
    """
    indicator_path = os.path.join(".", "artist_indicator.tsv")

    #imports the artist_indicator
    indicator = pd.read_csv(indicator_path, sep = "\t")

    #runs the above functions to retrieve the batch of information from the API 
    batch = get_next_artists(indicator)
    if not batch:
        #nothing left to look up, so there is no reason to call the API with an empty list
        return 0
    new_artist_info = retrieve_artist_info(batch)

    #joins new_artist_info to indicator file
    new_indicator = pd.merge(indicator, new_artist_info, on='artistID', how='left')

    #fills in correct columns by assigning the combined column back; an inplace fillna on a
    #column selection is a chained assignment and does not reliably update the dataframe itself
    filled_columns = ['artistINFO', 'name', 'artist_genre', 'artist_popularity']
    for col in filled_columns:
        new_indicator[col] = new_indicator[col].combine_first(new_indicator[f'{col}_new'])

    #drops extra columns 
    new_indicator = new_indicator.drop(columns=[f'{col}_new' for col in filled_columns])
    new_indicator.to_csv(indicator_path, sep = "\t", index=False)

    #stops if this batch did not fill in anything new (e.g. none of its artists could be matched)
    filled_before = indicator['artistINFO'].notna().sum()
    filled_after = new_indicator['artistINFO'].notna().sum()
    if filled_after == filled_before:
        print(f"no new artist information was retrieved for this batch; stopping with "
              f"{len(new_indicator.index) - filled_after} rows still unfilled")
        return 0

    #checks if there are more rows to be filled: rows after the last filled one (0 when the last row is filled)
    return len(new_indicator.index) - (new_indicator['artistINFO'].last_valid_index() + 1)

In [None]:
#calls read_write_artist_info() repeatedly, one batch per call, until it reports 0 rows left or a call raises an error
rowsleft = 10 # any non-zero start value, so that the loop body runs at least once
while rowsleft != 0:
    rowsleft = read_write_artist_info()

## Retrieving Audio Info
----
*Set of functions to retrieve the audio features from the audio-features endpoint of Spotify's API using the audio_features() function from spotipy in batches*

In [111]:
#builds the audio_indicator file: the indicator file plus one empty column per audio field
#names of the empty columns to add (audio fields plus the audioINFO marker column)
colnames = [
    'speechiness', 'danceability', 'tempo', 'energy', 'key', 'loudness', 'mode',
    'acousticness', 'instrumentalness', 'liveness', 'valence', 'duration_ms', 'time_signature',
    'audioINFO', 'type', 'uri', 'track_href', 'analysis_url',
]

#loads the indicator file and appends every column in colnames, filled with None
indicator = pd.read_csv(os.path.join(".", "indicator.tsv"), sep="\t")
indicator = indicator.assign(**dict.fromkeys(colnames))

#writes the result out as audio_indicator
indicator.to_csv(os.path.join(".", "audio_indicator.tsv"), sep="\t", index=False)

In [5]:
def get_next_audio(indicator:pd.DataFrame, batch_size:int = 100) -> list:
    """Return the next batch of trackIDs that still need audio information.

    The batch starts on the row after the last row whose ``audioINFO`` is
    filled (or on the first row when nothing is filled yet).

    Parameters
    ----------
    indicator : pd.DataFrame
        Dataframe with ``trackID`` and ``audioINFO`` columns.
    batch_size : int, default 100
        Maximum number of trackIDs in the batch.

    Returns
    -------
    list
        Up to ``batch_size`` trackIDs with missing values removed; empty when
        there is nothing left after the last filled row.
    """
    #finds the POSITION of the last filled audioINFO row, so the .iloc slice below
    #is correct whatever the dataframe's index labels are
    filled_positions = np.flatnonzero(indicator['audioINFO'].notna().to_numpy())

    if len(filled_positions) == 0:
        #nothing retrieved yet: this is the first batch
        start_position = 0
    else:
        start_position = int(filled_positions[-1]) + 1
        print(start_position)
    end_position = start_position + batch_size
    print(end_position)

    #creates the batch
    batch = indicator['trackID'].iloc[start_position:end_position]

    #removes the na from the series before it is turned into a list and returns the batch list
    return batch.dropna().to_list()

In [67]:
def retrieve_audio_info(track_ids:list) -> pd.DataFrame:
    """Retrieve the audio features for a batch of tracks from the API.

    Parameters
    ----------
    track_ids : list
        The trackIDs to look up with ``spotify.audio_features``.

    Returns
    -------
    pd.DataFrame
        One row per returned entry, with the entry's ``uri`` renamed to
        ``trackID``, the ``id`` column removed and ``audioINFO`` set to
        ``'Retrieved'``. Entries that come back as None keep the requested
        trackID and have no feature values.
    """
    #calls API for audio information
    audio_info = spotify.audio_features(track_ids)

    #avoids any NA values triggering the function to stop completely: an entry that came back
    #as None is replaced by a row carrying only the requested id, so it can still be matched
    #to its own row and marked as processed
    #(assumes the API returns one entry per requested id, in request order - TODO confirm)
    rows = [
        entry if entry is not None else {'uri': requested_id}
        for requested_id, entry in zip(track_ids, audio_info)
    ]
    audio_info_df = pd.DataFrame(rows)

    #fills in audio info indicator column
    audio_info_df['audioINFO'] = 'Retrieved'

    #drops/renames columns; 'id' is absent when every entry of the batch came back as None
    audio_info_df = audio_info_df.rename(columns={'uri': 'trackID'})
    audio_info_df = audio_info_df.drop(columns='id', errors='ignore')

    #returns the audio_info_df
    return audio_info_df

In [49]:
def read_write_audio_info():
    """Fetch audio information for the next batch and rewrite audio_indicator.tsv.

    Reads ``audio_indicator.tsv`` from the current working directory, asks
    ``get_next_audio`` for the next batch, retrieves the features with
    ``retrieve_audio_info``, fills them into the matching columns and writes
    the file back.

    Returns
    -------
    int
        Number of rows after the last filled ``audioINFO`` row; ``0`` means
        there is nothing left to request. ``0`` is also returned (after
        printing a message) when a batch fills in nothing new, so the calling
        loop cannot repeat the same request forever.
    """
    indicator_path = os.path.join(".", "audio_indicator.tsv")

    #imports the audio_indicator file
    indicator = pd.read_csv(indicator_path, sep = "\t")

    #runs the above functions to retrieve the audioINFO from the API   
    batch = get_next_audio(indicator)
    if not batch:
        #nothing left to look up, so there is no reason to call the API with an empty list
        return 0
    new_audio = retrieve_audio_info(batch)

    #rows without a trackID cannot be attributed to a track (and a missing key would be matched
    #against a missing key in the indicator); duplicates would multiply rows in the merge
    new_audio = new_audio.dropna(subset=['trackID']).drop_duplicates(subset='trackID')

    #columns present on both sides; only these receive the _og/_new suffixes in the merge below
    shared = [col for col in new_audio.columns if col != 'trackID' and col in indicator.columns]

    #merges the indicator dataframe with the retrieved information dataframe, and marks duplicate columns with suffixes _og (for old info) and _new (for new info)
    merged_df = pd.merge(indicator, new_audio, on='trackID', how='left', suffixes=('_og', '_new'))

    #keeps the old value where there is one and the new value otherwise, then drops the suffixed duplicates
    for col in shared:
        merged_df[col] = merged_df[f'{col}_og'].combine_first(merged_df[f'{col}_new'])
    merged_df = merged_df.drop(columns=[f'{col}_og' for col in shared] +
                                       [f'{col}_new' for col in shared])

    #keeps the file's column order stable between runs (columns new to the file go last)
    extra_columns = [col for col in merged_df.columns if col not in indicator.columns]
    merged_df = merged_df[list(indicator.columns) + extra_columns]

    #rewrites the audio_indicator file with the new information
    merged_df.to_csv(indicator_path, sep = "\t", index=False)

    #stops if this batch did not fill in anything new (e.g. none of its tracks could be matched)
    filled_before = indicator['audioINFO'].notna().sum()
    filled_after = merged_df['audioINFO'].notna().sum()
    if filled_after == filled_before:
        print(f"no new audio information was retrieved for this batch; stopping with "
              f"{len(merged_df.index) - filled_after} rows still unfilled")
        return 0

    #checks if there are more batches left: rows after the last filled one (0 when the last row is filled)
    return len(merged_df.index) - (merged_df['audioINFO'].last_valid_index() + 1)


In [None]:
#calls read_write_audio_info() repeatedly, one batch per call, until it reports 0 rows left or a call raises an error
rowsleft = 10 # any non-zero start value, so that the loop body runs at least once
while rowsleft != 0:
    #the result has to go back into rowsleft, otherwise the loop condition can never change
    rowsleft = read_write_audio_info()

## Merging artist and audio information
----
*Tidies up and merges the artist and audio indicators, and combines them back into the full track ids dataset*

In [55]:
def unique_audio_artist_merger(audio:pd.DataFrame, artists:pd.DataFrame):
    """tidies up and merges the audio and artist indicator files based on artistID; produces a unique_metadata file containing all relevant information for unique tracks"""
    
    #tidies up audio empty & unecessary columns
    audio.drop(columns=['artistINFO', 'artist_genre', 'artist_popularity', 'name'])
    artists.drop(columns='trackID', inplace=True)

    #merges audio and artist files
    master_info_df = pd.merge(audio, artists, on='artistID', how='left', suffixes=('_audio', '_artist'))
    #tidies up duplicate columns 
    master_info_df = master_info_df.drop(columns=[col for col in master_info_df.columns if col.endswith('_audio')])

    master_info_df.drop(columns =['tracks_batch_id_artist', 'audio_batch_id_artist', 'uri', 'audioINFO', 'artistINFO_artist'], inplace=True)
    master_info_df.rename(columns = {'artist_genre_artist':'artist_genre', 'name_artist':'name', 'artist_popularity_artist':'artist_popularity'}, inplace=True)

    #saves out as a file containing artist and audio information for unique tracks 
    master_info_df.to_csv(os.path.join(".", "unique_metadata.tsv"), sep = "\t", index=False)

In [56]:
#loads in the artists and audio information indicators created above (both are read from the current working directory)
artists = pd.read_csv(os.path.join(".", "artist_indicator.tsv"), sep = "\t")
audio = pd.read_csv(os.path.join(".", "audio_indicator.tsv"), sep = "\t")

#runs the function on the dataframes loaded in above; it writes unique_metadata.tsv and returns nothing
unique_audio_artist_merger(audio, artists)

In [78]:
def back_to_full_df(unique_metadata:pd.DataFrame, track_ids_full:pd.DataFrame):
    """Attach the per-track metadata to every row of the full listening history.

    The full history (which repeats trackIDs once per play) is left-joined
    with the unique metadata on ``trackID``, the column names are aligned with
    the non-extended datasets, and the result is written to
    ``complete_metadata.tsv`` in the current working directory.
    """
    #names used by the non-extended datasets
    renamed_columns = {
        'artist_genre': 'genres',
        'artist_popularity': 'popularity',
        'name': 'returned_artist',
    }

    #left join keeps every listening event; clashing column names get _og (full dataset) / _retrieved (unique dataset)
    combined = track_ids_full.merge(
        unique_metadata,
        on='trackID',
        how='left',
        suffixes=('_og', '_retrieved'),
    )

    #removes the link columns and applies the new names
    combined = combined.drop(columns=['analysis_url', 'track_href'])
    combined = combined.rename(columns=renamed_columns)

    #empty column, added to match the non-extended datasets' layout
    combined['returned_track'] = None

    #writes the complete_metadata file
    target = os.path.join(".", "complete_metadata.tsv")
    combined.to_csv(target, sep = "\t", index=False)

In [81]:
#imports full track ids dataframe and unique track ids dataframe [remember to replace FILEPATH with your path]
# os.path.join keeps the path valid on every operating system (a hard-coded '\' only works on Windows)
track_ids_full = pd.read_csv(os.path.join(r'FILEPATH', 'merged_and_tidied.tsv'), sep="\t")
unique_metadata = pd.read_csv(os.path.join(".", "unique_metadata.tsv"), sep = "\t")

#runs the above function to create the complete_metadata file 
back_to_full_df(unique_metadata, track_ids_full)

In [91]:
#imports the complete_metadata file 
master_metadata = pd.read_csv(os.path.join(".", "complete_metadata.tsv"), sep = "\t")

#prints the list of unique personIDs in the dataset (a bare expression is only displayed when it is the last line of a cell)
print(pd.unique(master_metadata['personID']))

#saves out as unique files for each personID input [go through each of the 3 unique personIDs, replacing XXXX in both lines]
filtered_df = master_metadata[master_metadata['personID'] =='XXXX'] 
filtered_df.to_csv(os.path.join(".", "XXXX_hydrated.tsv"), sep = "\t", index=False)