## SearchEngine Class

In [1]:
# Data Manipulation
import numpy as np
import pandas as pd

class SearchEngine():
    def __init__(self, data):
        '''
        SearchEngine class
        Input: a songs dataset extracted from spotify API
        '''
        self.data = data

    def target_song(self, title='', artist=''):
        '''
        Search engine function for the target song
        Input: song title and/or artist
        Output: self.target dataset song
        '''
        # transform input strings in lowercase
        title = str(title).lower()
        artist = str(artist).lower()
        # filter self.data on the desired song
        if title != '' and artist != '':
            self.target = self.data[self.data[
                'artists'].str.lower().str.contains(artist)]
            self.target = self.target[self.target[
                'name'].str.lower().str.contains(title)]
        elif title != '':
            self.target = self.data[self.data[
                'name'].str.lower().str.contains(title)]
        elif artist != '':
            self.target = self.data[self.data[
                'artists'].str.lower().str.contains(artist)]
        else:
            print('Please select a song title and artist')
        # keep only the first song result
        self.target = self.target.head(1)
        self.artist = self.target["artists"].to_string(
            index=False).strip("['").strip("']")
        self.title = self.target["name"].to_string(index=False)
        print(f'TITLE: {self.title}')
        print(f'ARTIST: {self.artist}')

## Preprocessor Class

In [2]:
# Data Manipulation
import numpy as np
import pandas as pd
pd.set_option('display.max_columns', None)
from sklearn.utils import shuffle

# Pipeline and Column Transformers
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.compose import ColumnTransformer, make_column_selector, make_column_transformer
from sklearn import set_config
#set_config(display = "diagram")

# Scaling
from sklearn.preprocessing import RobustScaler, StandardScaler, MinMaxScaler, OneHotEncoder

# Package classes
# from music_similarity.search_engine import SearchEngine

class Preprocessor():
    def __init__(self, se, ae):
        '''
        Preprocessor class
        Input: a songs dataset extracted from spotify API
        '''
        # Search Engine
        self.se = se
        # Api Extractor
        self.ae = ae
        # Categorical columns
        self.categorical_features = ['key']
        # Numerical columns
        self.numeric_features = ['energy', 'loudness', 
                            'acousticness', 'instrumentalness',
                            'liveness', 
                            'valence', 'tempo', 'sp1', 
                            'sp2', 'sp3', 'sp4', 'sp5', 
                            'sp6', 'sp7', 'sp8', 'sp9', 
                            'sp10', 'sp11', 'sp12', 'tm1', 
                            'tm2', 'tm3', 'tm4', 'tm5', 
                            'tm6', 'tm7', 'tm8', 'tm9', 
                            'tm10', 'tm11', 'tm12', 'mode', 'explicit']
        self.string_features = ['name', 'artists']
        # Categorical transformer
        self.categorical_transformer = Pipeline(steps=[(
            'onehot', OneHotEncoder(handle_unknown='ignore'))])
        # Numerical transformer
        self.numeric_transformer = Pipeline(steps=[(
            'scaler', MinMaxScaler())])
        # String transformer
        # self.string_transformer = Pipeline(steps=[(
        #     'pass', 'passthrough')])
        # Transformer
        self.transformer = ColumnTransformer(
            transformers=[
                ('minmax', self.numeric_transformer, self.numeric_features),
                ('cat', self.categorical_transformer, self.categorical_features)])

    def scale_se(self):
        '''
        Adapting data with target song present in the local dataset
        '''
        # Fit and transform data and target
        self.X_mmscaled = self.transformer.fit_transform(self.se.data)
        # Only apply a transformation to the target
        self.X_target_mmscaled = self.transformer.transform(self.se.target)
        # Transform np.array to dataset
        self.X_mmscaled = pd.DataFrame(self.X_mmscaled, 
                        columns=self.transformer.get_feature_names_out())
        self.X_target_mmscaled = pd.DataFrame(self.X_target_mmscaled, 
                        columns=self.transformer.get_feature_names_out())

    def scale_ae(self):
        '''
        Adapting data with target song present in Spotify database
        '''
        # Fit and transform data and target
        self.X_mmscaled = self.transformer.fit_transform(self.se.data)
        self.X_target_mmscaled = self.transformer.transform(self.ae.df_tfa)
        # Transform np.array to dataset
        self.X_mmscaled = pd.DataFrame(self.X_mmscaled, 
                        columns=self.transformer.get_feature_names_out())
        self.X_target_mmscaled = pd.DataFrame(self.X_target_mmscaled, 
                        columns=self.transformer.get_feature_names_out())


## Playlist Class

In [8]:
# Data Manipulation
import numpy as np
import pandas as pd
pd.set_option('display.max_columns', None)

# Unsupervised Learning
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans

# K-nn modelling
from sklearn.neighbors import NearestNeighbors

# Package classes
# from music_similarity.search_engine import SearchEngine
# from music_similarity.preprocessor import Preprocessor

class Playlist():
    def __init__(self, preprocessor, se):
        '''
        Extractor class
        Input: preprocessor class dataset extracted from spotify API
        '''
        self.preprocessor = preprocessor
        self.se = se
        self.playlist_songs = 15

    def build_model(self):
        '''
        Model builder function
        '''
        # Define the model
        self.model=NearestNeighbors(
            n_neighbors=self.playlist_songs + 1).fit(
            self.preprocessor.X_mmscaled)
        # Extract index and distance of self.playlist_songs+1
        # number of colest songs
        self.distance, self.index=self.model.kneighbors(
            self.preprocessor.X_target_mmscaled,
            n_neighbors=self.playlist_songs + 1)
        # Copy found index rows from the original not scaled dataset
        self.playlist = self.se.data.iloc[self.index[0],:]
        self.playlist['distance'] = self.distance[0]
        # Remove the target song from the list
        self.playlist = self.playlist.tail(self.playlist_songs)
        # Ordering the playlist on distance, ascending order
        self.playlist = self.playlist.sort_values(
            by=['distance'], ascending=True, ignore_index=True)
        # Drop not necessary columns
        self.playlist = self.playlist[['name', 'artists', 'distance']]
        # Strip square brackets from the artists strings
        self.playlist['artists'] = self.playlist['artists'].apply(
            lambda x: x.strip("['").strip("']"))
        # Set starting index from 0 to 1
        self.playlist.index += 1

## API DATA query

In [18]:
# Data Manipulation
import numpy as np
import pandas as pd
pd.set_option('display.max_columns', None)

# API packages
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
import time

class ApiExtractor:
    def __init__(self, se):
        '''
        Extractor class
        Input: preprocessor class dataset extracted from spotify API
        '''
        # credentials need to be exported via the command line
        self.auth_manager = SpotifyClientCredentials()
        self.sp_connection = spotipy.Spotify(auth_manager=self.auth_manager)
        self.search_limit = 10
        self.se = se

    def get_track_base_attrs(self, title, artist):
        '''
        Function that returns the basic features of a desired song
        if it's not present in the local dataset
        '''
        self.ta_response = self.sp_connection.search(
            q="artist:" + artist + " track:" + title,
            type="track",
            limit=1)

        # parse attributes from track
        self.track = self.ta_response['tracks']['items'][0]
        self.track_name = self.track['name']
        self.track_uri = self.track['uri']
        self.track_popularity = self.track['popularity']
        self.track_explicit = self.track['explicit']
        self.track_artists = [artist['name'] for artist in self.track['artists']]

        # get track features
        self.track_features = self.sp_connection.audio_features(tracks = self.track_uri)[0]

        return [self.track_name, self.track_artists, self.track_popularity,
                self.track_features['danceability'], self.track_features['valence'],
                self.track_features['energy'], self.track_explicit, self.track_features['key'],
                self.track_features['liveness'], self.track_features['loudness'],
                self.track_features['speechiness'], self.track_features['tempo']]

    def get_track_full_attrs(self, title, artist):
        '''
        Function that returns the advanced features of a desired song
        if it's not present in the local dataset
        '''
        # get search features

        # Perform API search request
        time.sleep(0.001)
        # Try the query
        # try:
        self.tfa_response = self.sp_connection.search(
                                    q="artist:" + artist + " track:" + title,
                                    type="track",
                                    limit=1)
        # If there is an exception:
        # print an error log
        # except:
        #     print(f"Oops! artist & song not found")

        # Saving the track informations
        self.tfa_track = self.tfa_response['tracks']['items'][0]
        # saving uri of the track
        self.tfa_song_uri = self.tfa_track['uri']
        # saving name of the track
        self.tfa_song_name = self.tfa_track['name']
        # saving artists of the track
        self.tfa_song_artists = [artist['name'] for artist in self.tfa_track['artists']]
        # saving popularity of the track
        self.tfa_song_popularity = self.tfa_track['popularity']
        # saving explicit of the track
        self.tfa_song_explicit = self.tfa_track['explicit']

        self.df_tfa = pd.DataFrame()
        self.df_tfa['uri'] = self.tfa_song_uri
        self.df_tfa['name'] = self.tfa_song_name
        self.df_tfa['artists'] = self.tfa_song_artists
        self.df_tfa['popularity'] = self.tfa_song_popularity
        self.df_tfa['explicit'] = self.tfa_song_explicit

        # Rewrite uri and name is needed
        self.df_tfa['uri'] = self.tfa_song_uri
        self.df_tfa['name'] = self.tfa_song_name

        # get audio_analysis features

        time.sleep(0.015)
        # Perform API audio_analysis request
        # try:
        self.tfa_track_analysis = self.sp_connection.audio_analysis(
                track_id = self.tfa_song_uri)
        # except:
        #    print(f"Oops! artist & song not found")
        # Reset segments arrays before new song analysis
        self.tfa_df_song_segment = pd.DataFrame()
        
        self.tfa_segment_duration = []
        self.tfa_segment_pitch = []
        self.tfa_segment_timbre = []
        # Extract pitch and timbre for each segment of the song
        for segment in range(len(self.tfa_track_analysis['segments'])):
            self.tfa_segment_pitch.append(self.tfa_track_analysis['segments'][segment]['pitches'])
            self.tfa_segment_timbre.append(self.tfa_track_analysis['segments'][segment]['timbre'])
        # Reset df_song_segments dataframe
        self.tfa_df_song_segment = pd.DataFrame()

        # create pitch and timbre columns with np arrays
        self.tfa_df_song_segment['segment_pitch'] = self.tfa_segment_pitch
        self.tfa_df_song_segment['segment_timbre'] = self.tfa_segment_timbre
        # Split the array elements in different columns
        self.df_split_pitches = pd.DataFrame(self.tfa_df_song_segment['segment_pitch'].tolist(), columns=['sp1', 'sp2', 'sp3', 'sp4', 'sp5', 'sp6', 'sp7', 'sp8', 'sp9', 'sp10', 'sp11', 'sp12'])
        self.df_split_timbres = pd.DataFrame(self.tfa_df_song_segment['segment_timbre'].tolist(), columns=['tm1', 'tm2', 'tm3', 'tm4', 'tm5', 'tm6', 'tm7', 'tm8', 'tm9', 'tm10', 'tm11', 'tm12'])
        # Add new columns to df_segment
        self.tfa_df_song_segment = pd.concat([self.tfa_df_song_segment, self.df_split_pitches, self.df_split_timbres], axis=1)
        self.tfa_df_song_segment.drop(['segment_pitch', 'segment_timbre'], axis = 1, inplace = True)
        # Transpose mean serie into a dataframe row
        self.tfa_df_song_segment = self.tfa_df_song_segment.mean().to_frame().T

        # get audio_features features

        time.sleep(0.015)
        # Perform API audio_features request
        # try:
        self.tfa_track_features = self.sp_connection.audio_features(tracks = self.tfa_song_uri)[0]
        # except:
        #     print("Oops! artist & song with no features")
        self.tfa_df_track_features = pd.DataFrame()
        self.tfa_df_track_features = pd.DataFrame.from_dict([self.tfa_track_features])
        try:
            self.tfa_df_track_features.drop(columns=[
            'type',
            'track_href',
            'analysis_url',
            'duration_ms',
            'time_signature',
            'id',
            'uri'], inplace=True)
        except:
            print("Oops! artist & song with no features")
        # Merging all the features together
        self.df_tfa = pd.concat([self.df_tfa, self.tfa_df_track_features, self.tfa_df_song_segment], axis=1, join='inner')

    def create_df_songs(self):
        '''
        Function that create self.df_songs dataframe
        with the following features:

        - uri
        - name
        - artists
        - popularity
        - explicit
        features
        '''
        # extract the list of artists in a np array
        # artists_array from the baseline dataset
        self.artists_array = np.array([])
        for artist in self.se.data.artists.unique():
            artist_cleaned = artist.replace('"[', '').replace(']"', '')
            artist_cleaned = artist_cleaned.replace("['", '').replace("']", '')
            artist_cleaned = artist_cleaned.replace("', '", ', ')
            artist_cleaned_list = artist_cleaned.split(', ')
            self.artists_array = np.append(self.artists_array, artist_cleaned[:98])
            if len(artist_cleaned_list) > 1:
                for subartist in artist_cleaned_list:
                    self.artists_array = np.append(self.artists_array, subartist[:98])
        self.artists_array = np.unique(self.artists_array)

        # find 10 most popular songs for each artist
        # creating features np arrays to store first 5 informations
        self.songs_uri_array = np.array([])
        self.songs_name_array = np.array([])
        self.songs_artists_array = np.array([])
        self.songs_popularity_array = np.array([])
        self.songs_explicit_array = np.array([])

        # Progress counter
        counter = 0
        for artist in self.artists_array:
            # Perform API search request
            counter = counter + 1
            # Log the progress status
            if counter in np.arange(100, 7500, 200):
                print(f'{counter} / {len(self.artists_array)}')
            time.sleep(0.01)
            # print(f'{artist} ...')
            # Try the query
            try:
                self.asl_response = self.sp_connection.search(
                                    q="artist:" + artist,
                                    type="track",
                                    limit=self.search_limit)
            # If there is an exception:
            # jump to the next loop
            except:
                print(f"Oops! {artist} not valid.  Skipped...")
                continue

            # iteration over the self.search_limit songs
            for i in range(len(self.asl_response['tracks']['items'])):
                self.asl_track = self.asl_response['tracks']['items'][i]
                artists_list = [artist['name'] for artist in self.asl_track['artists']]
                artists_string = ', '.join(artists_list)
                # append uri of the track
                self.songs_uri_array = np.append(
                    self.songs_uri_array, self.asl_track['uri'])
                # append name of the track
                self.songs_name_array = np.append(
                    self.songs_name_array, self.asl_track['name'])
                # append artists of the track
                self.songs_artists_array = np.append(
                    self.songs_artists_array, artists_string)
                # append popularity of the track
                self.songs_popularity_array = np.append(
                    self.songs_popularity_array, self.asl_track['popularity'])
                # append explicit of the track
                self.songs_explicit_array = np.append(
                    self.songs_explicit_array, self.asl_track['explicit'])

        self.df_songs = pd.DataFrame()
        self.df_songs['uri'] = self.songs_uri_array
        self.df_songs['name'] = self.songs_name_array
        self.df_songs['artists'] = self.songs_artists_array
        self.df_songs['popularity'] = self.songs_popularity_array
        self.df_songs['explicit'] = self.songs_explicit_array
        #self.df_songs.to_csv('../raw_data/songs.csv')

    def create_df_audio_analysis(self):
        '''
        Function that create a self.df_analysis dataframe
        with the following features:

        - 12 pitch features
        - 12 timbre features
        - uri
        '''
        # From uri array list, query pitch and timbre features
        self.df_analysis = pd.DataFrame()
        self.df_songs_csv = pd.read_csv('../raw_data/songs.csv')
        self.songs_uri_array_csv = self.df_songs_csv['uri']
        # For testing purposes only iterate over the first 3 uri
        #for uri in self.songs_uri_array[:3]:
        #for uri in self.songs_uri_array:

        # Progress counter
        counter = 0
        for uri in self.songs_uri_array_csv:
            counter = counter + 1
            # Log the progress status
            if counter in np.arange(100, 50000, 100):
                print(f'{counter} / {len(self.songs_uri_array_csv)}')
            time.sleep(0.015)
            # print(f'{uri} ...')
            # Perform API audio_analysis request
            try:
                self.track_analysis = self.sp_connection.audio_analysis(track_id = uri)
            except:
                print(f"Oops! {uri} not valid.  Skipped...")
                continue
            # Reset segments arrays before new song analysis
            self.segment_duration = []
            self.segment_pitch = []
            self.segment_timbre = []
            # Extract pitch and timbre for each segment of the song
            for segment in range(len(self.track_analysis['segments'])):
                self.segment_pitch.append(self.track_analysis['segments'][segment]['pitches'])
                self.segment_timbre.append(self.track_analysis['segments'][segment]['timbre'])
            # Reset df_song_segments dataframe
            self.df_song_segments = pd.DataFrame()
            # create pitch and timbre columns with np arrays
            self.df_song_segments['segment_pitch'] = self.segment_pitch
            self.df_song_segments['segment_timbre'] = self.segment_timbre
            # Split the array elements in different columns
            self.df_split_pitches = pd.DataFrame(self.df_song_segments['segment_pitch'].tolist(), columns=['sp1', 'sp2', 'sp3', 'sp4', 'sp5', 'sp6', 'sp7', 'sp8', 'sp9', 'sp10', 'sp11', 'sp12'])
            self.df_split_timbres = pd.DataFrame(self.df_song_segments['segment_timbre'].tolist(), columns=['tm1', 'tm2', 'tm3', 'tm4', 'tm5', 'tm6', 'tm7', 'tm8', 'tm9', 'tm10', 'tm11', 'tm12'])
            # Add new columns to df_segment
            self.df_song_segments = pd.concat([self.df_song_segments, self.df_split_pitches, self.df_split_timbres], axis=1)
            self.df_song_segments.drop(['segment_pitch', 'segment_timbre'], axis = 1, inplace = True)
            # Transpose mean serie into a dataframe row
            self.df_song_segments = self.df_song_segments.mean().to_frame().T
            # Add uri column to perform the future merge
            self.df_song_segments['uri'] = uri
            # Add song features to the main dataframe
            self.df_analysis = pd.concat([self.df_analysis, self.df_song_segments])
        #self.df_analysis.to_csv('../raw_data/analysis.csv')

    def create_df_audio_features(self):
        '''
        Function that create a self.df_features dataframe
        with the following features:

        - danceability
        - energy
        - key
        - loudness
        - mode
        - speechiness
        - acousticness
        - instrumentalness
        - liveness
        - valence
        - tempo
        - uri
        '''
        self.df_features = pd.DataFrame()
        self.df_songs_csv = pd.read_csv('../raw_data/songs.csv')
        self.songs_uri_array_csv = self.df_songs_csv['uri']
        #for uri in self.songs_uri_array[:2]:
 
        # Progress counter
        counter = 0
        for uri in self.songs_uri_array_csv:
            counter = counter + 1
            # Log the progress status
            if counter in np.arange(100, 70000, 100):
                print(f'{counter} / {len(self.songs_uri_array_csv)}')
            time.sleep(0.015)
            # Perform API audio_features request
            try:
                self.track_features = self.sp_connection.audio_features(tracks = uri)[0]
            except:
                print(f"Oops! {uri} not valid.  Skipped...")
                continue
            self.df_track_features = pd.DataFrame()
            self.df_track_features = pd.DataFrame.from_dict([self.track_features])
            try:
                self.df_track_features.drop(columns=[
                'type',
                'track_href',
                'analysis_url',
                'duration_ms',
                'time_signature',
                'id'], inplace=True)
            except:
                print(f"Oops! {uri} has no features.  Skipped...")
            self.df_features = pd.concat([self.df_features, self.df_track_features])
        #self.df_features.to_csv('../raw_data/features.csv')

    def merge_dataframes(self):
        '''
        Function that merge all the dataframe toghether
        '''
        self.full_data = pd.merge(self.df_songs, self.df_analysis, how='outer', on='uri')
        self.full_data = pd.merge(self.full_data, self.df_features, how='outer', on='uri')

In [19]:
# if 'se' in globals():
#     del se
# spotify = pd.read_csv('../raw_data/ML_spotify_data.csv', index_col=0)
# se = SearchEngine(spotify)

# if 'ae' in globals():
#     del ae
# ae = ApiExtractor(se)
# ae.create_df_songs()
# ae.create_df_audio_analysis()
# ae.create_df_audio_features()


In [21]:
if 'se' in globals():
    del se
spotify = pd.read_csv('../raw_data/full_data.csv', index_col=0)
se = SearchEngine(spotify)
se.target_song("another one bite", "queen")

if 'ae' in globals():
    del ae
ae = ApiExtractor(se)
ae.get_track_full_attrs("another one bite", "queen")

if 'preprocessor' in globals():
    del preprocessor
preprocessor = Preprocessor(se, ae)
preprocessor.scale_ae()

if 'playlist' in globals():
    del playlist
    
playlist = Playlist(preprocessor, se)
playlist.build_model()

playlist.playlist

TITLE: Another One Bites The Dust - Remastered 2011
ARTIST: Queen


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self.playlist['distance'] = self.distance[0]


Unnamed: 0,name,artists,distance
1,I'm In Love,"Evelyn ""Champagne"" King",0.507674
2,Mister Magic,"Grover Washington, Jr.",0.531467
3,Thank You,Landa Feat. Jazze PHA,0.54274
4,Cola Bottle Baby,Edwin Birdsong,0.543041
5,The Only One (feat. Angela Johnson) - Extended...,"Micky More & Andy Tee, Angela Johnson",0.552582
6,Autumn Eyes,Edwin Birdsong,0.564832
7,Black Market,Weather Report,0.576858
8,Steam,Leon Bridges,0.587435
9,"Ain't Nothin' Gonna Keep Me From You - 12"" Ver...",Teri DeSario,0.591218
10,It's a Love Thing,The Whispers,0.595445
