In [1]:
import json
import os

import numpy as np
import pandas as pd
import requests

### Get a user's recently played songs

- `limit`: The maximum number of items to return. Default: 20. Minimum: 1. Maximum: 50.
- `after`: A Unix timestamp in milliseconds. Returns all items after (but not including) this cursor position. If `after` is specified, `before` must not be specified.
- `before`: A Unix timestamp in milliseconds. Returns all items before (but not including) this cursor position. If `before` is specified, `after` must not be specified.

In [2]:
def get_recent_songs(user_id, token, **kwargs):
    """Return the track ids of the recently played songs.

    Parameters
    ----------
    user_id : str
        Not used by this function (the request goes to the `/me/` endpoint);
        kept so existing callers keep working.
    token : str
        Access token sent as a `Bearer` authorization header.
    **kwargs
        Query parameters forwarded to the endpoint, e.g. `limit`, `after`
        or `before`.

    Returns
    -------
    list of str
        One track id per returned history item, in response order.

    Raises
    ------
    requests.HTTPError
        If the API answers with an error status (raised by
        `response.raise_for_status()`).
    """
    url = "https://api.spotify.com/v1/me/player/recently-played"
    headers = {"Accept": "application/json",
               "Content-Type": "application/json",
               "Authorization": f"Bearer {token}"}

    # Let requests build and URL-encode the query string instead of
    # concatenating it by hand; the timeout keeps a dead connection from
    # blocking the notebook indefinitely.
    response = requests.get(url, headers=headers, params=kwargs, timeout=10)
    print("status:", response.status_code)

    # Fail with the HTTP error itself rather than a later KeyError on 'items'.
    response.raise_for_status()

    items = response.json().get('items', [])
    # Skip entries whose track object is missing/null.
    return [item['track']['id'] for item in items if item.get('track')]

### Get information about a song

- `market`: The market (country code, e.g. `US`) in which to look up the track.

In [3]:
def _fetch_json(url, token, params=None, max_attempts=3, timeout=10):
    """GET `url` with the bearer token and return the decoded JSON object.

    The request is retried up to `max_attempts` times when sending it or
    decoding the body raises; each error is printed. An empty dict is
    returned when every attempt fails or the body is not a JSON object.
    """
    headers = {"Accept": "application/json",
               "Content-Type": "application/json",
               "Authorization": f"Bearer {token}"}
    for _ in range(max_attempts):
        try:
            payload = requests.get(url, headers=headers, params=params,
                                   timeout=timeout).json()
        except Exception as e:
            print(e)
            continue
        return payload if isinstance(payload, dict) else {}
    return {}


def get_song(user_id, token, song_id, market):
    """Return a one-row DataFrame describing a track, indexed by track id.

    The row holds the audio-features payload (minus the `type`, `uri`,
    `track_href` and `analysis_url` columns) plus three added columns:
    `artist_id` (first listed artist), `albuma_id` and `genre` (the list of
    genres of that artist). Each added value is None when unavailable.

    Parameters
    ----------
    user_id : str
        Not used by this function; kept for signature compatibility.
    token : str
        Access token sent as a `Bearer` authorization header.
    song_id : str
        Id of the track to look up.
    market : str
        Sent as the `market` query parameter of the track request.

    Returns
    -------
    pandas.DataFrame or None
        None (after printing a message) when the audio features of the song
        cannot be obtained.
    """
    ## get audio features of the song/track
    features = _fetch_json(f"https://api.spotify.com/v1/audio-features/{song_id}", token)
    df_response = pd.json_normalize(features)
    if 'id' not in df_response.columns:
        # Without features the final frame cannot be built, so skip the
        # remaining requests.
        print("Cannot find features about this song: ", song_id)
        return None

    ## get album information and the first artist
    track = _fetch_json(f"https://api.spotify.com/v1/tracks/{song_id}", token,
                        params={'market': market})
    album_id = (track.get('album') or {}).get('id')
    artists = track.get('artists') or []
    artist_id = artists[0].get('id') if artists else None

    ## get genre of the artist (only when an artist id is known, so that
    ## ".../artists/None" is never requested)
    genre = None
    if artist_id:
        artist = _fetch_json(f"https://api.spotify.com/v1/artists/{artist_id}", token)
        genre = artist.get('genres')

    df_response['artist_id'] = artist_id if artist_id else None
    # NOTE(review): 'albuma_id' looks like a typo for 'album_id'; the column
    # name is kept unchanged because callers may rely on it.
    df_response['albuma_id'] = album_id if album_id else None
    df_response['genre'] = [genre] if genre else None
    try:
        return df_response.drop(['type', 'uri', 'track_href', 'analysis_url'], axis=1).set_index('id')
    except KeyError:
        print("Cannot find features about this song: ", song_id)
        return None

## Generate a target playlist

- `workout_plan`: A list of `(minute, energy_level)` checkpoints. For example, `workout_plan=[(0, 'trough'), (8, 'peak'), (20, 'trough')]` means: raise the energy from "trough" to "peak" over the first 8 minutes, then lower it back to "trough" over the next 12 minutes.
- `history_args`: Parameters used to extract recently played songs. For example, `history_args = {'limit': 15, 'after': 28885117}`.
- `start_song`: The first song in the target playlist.
- `end_song`: The last song in the target playlist.
- `energy_intensity`: Define the energy intensity in a numeric way. For example, `energy_intensity = {'peak': 0.8, 'trough': 0.3}`.
- `pars_weight`: Weights of the `energy` and `duration_ms` terms in the weighted sum of absolute differences used to score each candidate song.
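- `genre`: A list of genre names. If it is not None, songs whose artist genres share no entry with this list are filtered out (songs with no genre information are kept).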

In [4]:
def generate_target(user_id, token, market, workout_plan, history_args = {}, start_song = None, end_song = None, 
                    energy_intensity = {'peak': 0.8, 'trough': 0.3}, 
                    pars_weight = {'energy': 0.6, 'duration_ms': 0.4},
                    genre = None):
    """Build an ordered list of track ids that follows a workout energy plan.

    Candidate songs come from `get_recent_songs(user_id, token, **history_args)`
    and are described with `get_song`. Between consecutive checkpoints of
    `workout_plan` songs are added greedily: only songs with higher energy
    than the current one are considered when heading to the 'peak' level,
    only lower-energy songs otherwise, and the candidate with the smallest
    weighted sum of absolute differences (energy vs. current song, energy
    vs. target level, duration vs. remaining time in minutes) is chosen.

    Parameters
    ----------
    user_id, token, market
        Forwarded to `get_recent_songs` / `get_song`.
    workout_plan : list of (minutes, level)
        Checkpoints; `level` is a key of `energy_intensity` or 'none'.
        Consecutive repeats and 'none' entries are removed first.
    history_args : dict
        Keyword arguments for `get_recent_songs`. Only read, never mutated.
    start_song, end_song : str or None
        First / last track id. When None, the candidate whose energy is
        closest to the first / last plan level is used.
    energy_intensity : dict
        Numeric energy for each level name; must contain 'peak'.
    pars_weight : dict
        Weights for 'energy' and 'duration_ms'; its keys also select the
        feature columns kept for scoring.
    genre : list of str or None
        When given, songs whose genre list shares no entry with it are
        dropped; songs without genre information are kept.

    Returns
    -------
    list of str
        Track ids: start song, selected songs, end song. A selected song is
        removed from the pool, so it is not selected again.
    """
    ms_per_minute = 60 * 1000

    song_ids = get_recent_songs(user_id, token, **history_args)
    song_df = pd.concat([get_song(user_id, token, song_id, market) for song_id in song_ids])
    song_df = song_df[~song_df.index.duplicated(keep='first')] ## remove duplicate rows

    # Full feature table, kept so durations/energies can be read back
    # without calling get_song (three HTTP requests) again.
    known = song_df
    fetched = {}

    def _feature(song_id, column):
        """Read one feature of a song, fetching it only if it is not in `known`."""
        if song_id in known.index:
            return known.at[song_id, column]
        if song_id not in fetched:
            fetched[song_id] = get_song(user_id, token, song_id, market)
        return fetched[song_id][column].iloc[0]

    if genre is not None:
        wanted = set(genre)
        # Keep songs without a genre list; drop songs with no overlap.
        keep = [not isinstance(song_genres, (list, tuple)) or bool(wanted.intersection(song_genres))
                for song_genres in song_df['genre']]
        song_df = song_df.loc[keep]

    song_df = song_df[list(pars_weight)]

    ## remove duplicates: drop 'none' checkpoints and checkpoints repeating
    ## the level of the one just before them
    deduped = [workout_plan[i] for i in range(1, len(workout_plan))
               if workout_plan[i][1] != workout_plan[i-1][1] and workout_plan[i][1] != 'none']
    if workout_plan[0][1] != 'none':
        deduped = [workout_plan[0]] + deduped
    workout_plan = deduped

    if start_song is None:
        start_level = energy_intensity[workout_plan[0][1]]
        start_song = (song_df['energy'] - start_level).abs().idxmin()
    # Also covers a caller-supplied start song that is part of the pool.
    song_df = song_df.drop(index=start_song, errors='ignore')
    if end_song is None:
        end_level = energy_intensity[workout_plan[-1][1]]
        end_song = (song_df['energy'] - end_level).abs().idxmin()
    song_df = song_df.drop(index=end_song, errors='ignore')

    songs = [start_song]
    total_durations = _feature(start_song, 'duration_ms')
    current_energy = _feature(start_song, 'energy')

    print('Start!')
    for i in range(1, len(workout_plan)):
        next_position = workout_plan[i][0] * ms_per_minute
        if i == len(workout_plan)-1:
            # Leave room for the closing song in the last segment.
            next_position = next_position - _feature(end_song, 'duration_ms')
        next_energy = energy_intensity[workout_plan[i][1]]
        rising = next_energy == energy_intensity['peak']

        while total_durations < next_position:
            if rising:
                candidates = song_df[song_df['energy'] > current_energy]
            else:
                candidates = song_df[song_df['energy'] < current_energy]
            if candidates.shape[0] < 1:
                break

            remaining = next_position - total_durations
            # If some candidate still fits in the remaining time, weigh both
            # energy terms equally; otherwise favour closeness to the target.
            if (candidates['duration_ms'] <= remaining).any():
                split = (0.5, 0.5)
            else:
                split = (0.2, 0.8)
            weight_current = split[0] * pars_weight['energy']
            weight_next = split[1] * pars_weight['energy']

            score = ((candidates['energy'] - current_energy).abs() * weight_current
                     + (candidates['energy'] - next_energy).abs() * weight_next
                     + ((candidates['duration_ms'] - remaining) / ms_per_minute).abs()
                       * pars_weight['duration_ms'])
            song_id = score.idxmin()

            total_durations += song_df.at[song_id, 'duration_ms']
            current_energy = song_df.at[song_id, 'energy']
            songs.append(song_id)
            print('Add song:', song_id)
            # Remove the pick from the pool so it cannot be chosen again.
            song_df = song_df.drop(index=song_id)
    songs.append(end_song)
    return songs

# Test

In [5]:
# settings
# The access token is read from the SPOTIFY_TOKEN environment variable so
# that no credential is stored in the notebook.
token = os.environ.get("SPOTIFY_TOKEN", "")
user_id = "pbwppse1hilahmk43ls424ao4"
if not token:
    print("SPOTIFY_TOKEN is not set; requests will be sent without a valid bearer token.")

In [6]:
# Inputs for the playlist generation runs below.
market = "US"
history_args = dict(limit=15, after=28885117)  # an empty dict means using default values
workout_plan = [
    (0, "trough"),
    (8, "peak"),
    (20, "trough"),
    (28, "peak"),
]
energy_intensity = dict(peak=0.8, trough=0.3)
pars_weight = dict(energy=0.8, duration_ms=0.2)

In [7]:
# Build the playlist from the listening history, without a genre filter.
songs = generate_target(
    user_id,
    token,
    market,
    workout_plan,
    history_args,
    energy_intensity=energy_intensity,
    pars_weight=pars_weight,
)
songs

status: 200
Start!
Add song: 7bzks4LGpQUuPKBzJ6iQ7y
Add song: 6llAqB4o2GS2Sx0rKvDIK0
Add song: 7nBR4Tt431p1MTgv3lVsmX
Add song: 7dF4sXP1ISIharu35gQ86k


['4MvbRbrOEsJgdYRGNGBjTE',
 '7bzks4LGpQUuPKBzJ6iQ7y',
 '6llAqB4o2GS2Sx0rKvDIK0',
 '7nBR4Tt431p1MTgv3lVsmX',
 '7dF4sXP1ISIharu35gQ86k',
 '6mybKC52hIM1WYfp73CaOl']

See the duration for each song and the total duration

In [8]:
# Duration (ms) of every playlist song, followed by the total length in minutes.
durations = pd.concat(
    [get_song(user_id, token, song_id, market)['duration_ms'] for song_id in songs]
)
durations, durations.sum() / 60/ 1000

(id
 4MvbRbrOEsJgdYRGNGBjTE    202964
 7bzks4LGpQUuPKBzJ6iQ7y    274387
 6llAqB4o2GS2Sx0rKvDIK0    215240
 7nBR4Tt431p1MTgv3lVsmX    409558
 7dF4sXP1ISIharu35gQ86k    273904
 6mybKC52hIM1WYfp73CaOl    363415
 Name: duration_ms, dtype: int64,
 28.991133333333334)

See the energy for each song

In [9]:
# Energy of every playlist song, in playlist order.
pd.concat(
    [get_song(user_id, token, song_id, market)['energy'] for song_id in songs]
)

id
4MvbRbrOEsJgdYRGNGBjTE    0.454
7bzks4LGpQUuPKBzJ6iQ7y    0.700
6llAqB4o2GS2Sx0rKvDIK0    0.902
7nBR4Tt431p1MTgv3lVsmX    0.685
7dF4sXP1ISIharu35gQ86k    0.655
6mybKC52hIM1WYfp73CaOl    0.744
Name: energy, dtype: float64

#### Test with genre

You can go to https://developer.spotify.com/console/get-available-genre-seeds/ to find all available genres.

In [10]:
# Genre names used to filter the candidate songs in the next run.
genre = ["new french touch"]

In [11]:
# Same run as before, now restricted to the genres listed in `genre`.
songs = generate_target(
    user_id,
    token,
    market,
    workout_plan,
    history_args,
    energy_intensity=energy_intensity,
    pars_weight=pars_weight,
    genre=genre,
)
songs

status: 200
Start!
Add song: 7dF4sXP1ISIharu35gQ86k
Add song: 6Jk7mNRofCpHStChx1EYOj
Add song: 7nBR4Tt431p1MTgv3lVsmX
Add song: 7JPjG8J0sXCIh9KkWBy4vw


['6n0E5CPhtGPLW1544OlSTg',
 '7dF4sXP1ISIharu35gQ86k',
 '6Jk7mNRofCpHStChx1EYOj',
 '7nBR4Tt431p1MTgv3lVsmX',
 '7JPjG8J0sXCIh9KkWBy4vw',
 '7bzks4LGpQUuPKBzJ6iQ7y']

In [12]:
# Duration (ms) of every song in the genre-filtered playlist, and the total in minutes.
durations = pd.concat(
    [get_song(user_id, token, song_id, market)['duration_ms'] for song_id in songs]
)
durations, durations.sum() / 60/ 1000

(id
 6n0E5CPhtGPLW1544OlSTg    293493
 7dF4sXP1ISIharu35gQ86k    273904
 6Jk7mNRofCpHStChx1EYOj    299013
 7nBR4Tt431p1MTgv3lVsmX    409558
 7JPjG8J0sXCIh9KkWBy4vw    297089
 7bzks4LGpQUuPKBzJ6iQ7y    274387
 Name: duration_ms, dtype: int64,
 30.790733333333332)

In [13]:
# Energy of every song in the genre-filtered playlist, in playlist order.
pd.concat(
    [get_song(user_id, token, song_id, market)['energy'] for song_id in songs]
)

id
6n0E5CPhtGPLW1544OlSTg    0.562
7dF4sXP1ISIharu35gQ86k    0.655
6Jk7mNRofCpHStChx1EYOj    0.646
7nBR4Tt431p1MTgv3lVsmX    0.685
7JPjG8J0sXCIh9KkWBy4vw    0.937
7bzks4LGpQUuPKBzJ6iQ7y    0.700
Name: energy, dtype: float64