## Import Modules

In [3]:
import pandas as pd
import json
from sklearn import preprocessing
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from chunkdot import CosineSimilarityTopK

## Import challenge data

In [15]:
# Load JSON data from file
with open('data/challenge_set.json', 'r') as file: # Replace with local dataset path
    data = json.load(file)

# Initialize an empty list to collect all track data
all_tracks = []

# Loop through each playlist in the dataset
for playlist in data['playlists']:
    for track in playlist['tracks']:
        # Add playlist-level information to each track record
        track_info = {
            'playlist_name': playlist.get('name', 'Unknown'),
            'playlist_pid': playlist['pid'],
            'playlist_num_tracks': playlist['num_tracks'],
            'track_pos': track['pos'],
            'artist_name': track['artist_name'],
            'track_uri': track['track_uri'],
            'artist_uri': track['artist_uri'],
            'track_name': track['track_name'],
            'album_uri': track['album_uri'],
            'duration_ms': track['duration_ms'],
            'album_name': track['album_name']
        }
        all_tracks.append(track_info)

# Convert the list of track dictionaries to a DataFrame
df_spotify = pd.DataFrame(all_tracks)

## Import and clean additional data

### Import and clean lyrics

In [5]:
from langdetect import detect

# Import lyrics
df_lyrics = pd.read_csv('lyrics.csv')

# Detect language for every lyric in df_lyrics
detected_languages = [detect(text) for text in df_lyrics['lyrics']]

# Append the list of detected languages to the DataFrame as a new column
df_lyrics['lan_lyrics'] = detected_languages

# Drop rows with lyrics that are not in english (3449 non english lyrics)
index_lyrics = df_lyrics[ (df_lyrics['lan_lyrics'] != 'en')].index
df_lyrics.drop(index_lyrics , inplace=True)

# Remove lyrics that are no lyrics
rows_to_remove = str("abcdefghijklmnopqrst|by year: |the notorious b.i.g.'s songs|highest to lowest|total:")

# Drop rows with incorrect lyrics (2126 incorrect lyrics)
index_remove = df_lyrics.loc[df_lyrics.lyrics.str.contains(rows_to_remove),:].index
df_lyrics.drop(index_remove , inplace=True)

### Create and build word vectors for lyrics

In [6]:
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords

#Link: https://www.kaggle.com/code/zeeshanlatif/countvectorizer-vs-tfidfvectorizer

# Removing stopwords from the data
stop_words = stopwords.words("english")
df_lyrics['lyrics'] = df_lyrics['lyrics'].apply(lambda x: " ".join(word for word in x.split() if word not in stop_words))

# applying lemmatization
wnl = WordNetLemmatizer()
df_lyrics['lyrics'] = df_lyrics['lyrics'].apply(lambda x: " ".join(wnl.lemmatize(word) for word in x.split()))

# Build vectors
from sklearn.feature_extraction.text import TfidfVectorizer

# Initialize a TF-IDF Vectorizer
vectorizer = TfidfVectorizer(lowercase=True)
tfidf_matrix = vectorizer.fit_transform(df_lyrics['lyrics'])
tfidf_matrix.shape

(58405, 127753)

### Create sentiment scores with Vader

In [7]:
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# Initialize the VADER sentiment intensity analyzer
analyzer = SentimentIntensityAnalyzer()

# Function to get the compound sentiment score
def get_sentiment(text):
    score = analyzer.polarity_scores(text)
    return score['compound']  # Return the compound score

# Apply the sentiment analysis function to the cleaned lyrics
df_lyrics['vader_sentiment_score'] = df_lyrics['lyrics'].apply(get_sentiment)

def assign_sentiment(score):
    if score > 0.05:
        return 'positive'
    elif score < -0.05:
        return 'negative'
    else:
        return 'neutral'

# Apply sentiment label assignment
df_lyrics['vader_sentiment_label'] = df_lyrics['vader_sentiment_score'].apply(assign_sentiment)

### Combine lyrics and original data set

In [16]:
df_spotify = df_spotify.merge(df_lyrics, on='track_uri', how='left')

# After the merge there are 22322 rows without lyrics which are removed here
df_spotify.dropna(inplace=True)

In [17]:
df_spotify.shape

(251731, 15)

### Import genre data

In [20]:
df_genres = pd.read_csv('dummy_encoded_genres.csv')
#df_genres.drop(labels='Unnamed: 0', axis=1, inplace=True)

# Merge genre with the rest
df_spotify = df_spotify.merge(df_genres, on='artist_uri', how='left')

In [66]:
#df_sentiment = pd.read_csv('data/sentiment_data.csv')

## Create cosine similarity model

### Drop unwanted features 

In [25]:
drop_list = ['playlist_name', 'playlist_num_tracks', 'artist_name', 'track_uri',
             'track_name', 'duration_ms', 'album_name', 'lyrics', 'lan_lyrics', 'vader_sentiment_score'] 

In [26]:
df_spotify_pipeline = df_spotify.drop(columns=drop_list)

### Set up pipline

In [27]:
#numeric_features = ['vader_sentiment_score']
categorical_features = ['playlist_pid', 'artist_uri', 'album_uri', 'track_pos', 'vader_sentiment_label']

#numeric_transformer = Pipeline(steps=[("scaler", StandardScaler())])
categorical_transformer = Pipeline(steps=[("encoder", OneHotEncoder())])

preprocessor = ColumnTransformer(
    transformers=[
        #("num", numeric_transformer, numeric_features),
        ("cat", categorical_transformer, categorical_features),
    ]
)

cos_sim = CosineSimilarityTopK(top_k=50)

cos_sim_pipeline = Pipeline(steps=[("preprocessor", preprocessor), ("cos_sim", cos_sim)])

cos_sim_pipeline

### Compute cosine similarity matrix

In [28]:
# Compute cos_sim matrix
sim_matrix = cos_sim_pipeline.fit_transform(df_spotify_pipeline)

# Convert csr.matrix to Dataframe
sim_matrix_df = pd.DataFrame.sparse.from_spmatrix(sim_matrix)
sim_matrix_df.shape

(251731, 251731)

In [2]:
# code for exporting/storing the cos sim matrix
""" # code from here: https://stackoverflow.com/questions/75158465/saving-large-sparse-arrays-in-hdf5-using-pickle
import numpy as np
import scipy as sp

# Save sparse matrix
sp.sparse.save_npz('sparse_matrix.npz', sim_matrix) # documentation: https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.save_npz.html
# load matrix e.g. in a different file a and assign to a dataframe
sim_matrix_test = sp.sparse.load_npz('sparse_matrix.npz')
sim_matrix_test_df = pd.DataFrame.sparse.from_spmatrix(sim_matrix_test) """

" # code from here: https://stackoverflow.com/questions/75158465/saving-large-sparse-arrays-in-hdf5-using-pickle\nimport numpy as np\nimport scipy as sp\n\n# Save sparse matrix\nsp.sparse.save_npz('sparse_matrix.npz', sim_matrix) # documentation: https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.save_npz.html\n# load matrix e.g. in a different file a and assign to a dataframe\nsim_matrix_test = sp.sparse.load_npz('sparse_matrix.npz')\nsim_matrix_test_df = pd.DataFrame.sparse.from_spmatrix(sim_matrix_test) "

## Make recommendations

In [49]:
""" # Code from neuefische google colab; modified so it works with duplicates for track input
# Build index with track identifiers
track_uri = df_spotify['track_uri']
indices = pd.Series(df_spotify.index, index=df_spotify['track_uri'])

# Function that get track recommendations based on the cosine similarity 
def track_recommendations(track):

    #get the index of the track we put into the function
    idx = indices[track].iloc[0]

    #calculate all cosine similarities to that track and store it in a list
    sim_scores = list(enumerate(sim_matrix_df[idx]))

    #sort the list staring with the highest similarity
    sim_scores = sorted(sim_scores, key=lambda x: x[1], reverse=True)
    #sim_scores = sim_scores.drop_duplicates(keep='first')

    # get the similarities from 1:1001 (not starting with 0 because it is the same track)
    # We overshoot here on purpose so there is leeway to remove duplicates and still end up 
    # with the correct amount of predictions to return (this is a very lazy fix...)
    sim_scores = sim_scores[1:1001]

    #get the indeces of that 1000 tracks
    track_indices = [i[0] for i in sim_scores]

    # Remove duplicates from our selection of 1000 tracks and
    # return the track uris of a duplicate-free subset of 500 tracks
    recommended_tracks = track_uri.iloc[track_indices].drop_duplicates(keep='first').iloc[:4]
    return recommended_tracks """


# Code from neuefische google colab; modified so it works with duplicates for track input

# Build index with track uris
track_uri = df_spotify['track_uri']
indices = pd.Series(df_spotify.index, index=df_spotify['track_uri'])

# Function that get track recommendations based on the cosine similarity 
def track_recommendations(track):

    #get the index of the track we put into the function
    idx = indices[track].iloc[0]

    #catch duplicates
    # duplicates = indices[track].iloc[1:]
    # dup_list = list(enumerate(sim_matrix_df[duplicates]))

    #calculate all cosine similarities to that track and store it in a list
    sim_scores = list(enumerate(sim_matrix_df[idx]))

    #sort the list staring with the highest similarity
    sim_scores = sorted(sim_scores, key=lambda x: x[1], reverse=True)
    #sim_scores = sim_scores.drop_duplicates(keep='first')

    # get the similarities from 1:1001 (not starting with 0 because it is the same track)
    # We overshoot here on purpose so there is leeway to remove duplicates and still end up 
    # with the correct amount of predictions to return (this is a very lazy fix...)
    sim_scores = sim_scores[1:100]

    #get the indeces of that 1000 tracks
    track_indices = [i[0] for i in sim_scores]

    # Remove duplicates from our selection of 1000 tracks and
    # return the track uris of a duplicate-free subset of 500 tracks
    recommended_tracks = track_uri.iloc[track_indices].drop_duplicates(keep='first').iloc[1:4]
    return recommended_tracks.to_list()


In [50]:
# Get recommendations for multiple tracks
track_list = ['spotify:track:0uppYCG86ajpV2hSR3dJJ0', 'spotify:track:3d9DChrdc6BOeFsbrZ3Is0']
recommended = []
for track in track_list:
    print(track)
    x = track_recommendations(track)
    for i in x:
        recommended.append(i)

# Here the second track throws an error
# track_list = ['spotify:track:0uppYCG86ajpV2hSR3dJJ0', 'spotify:track:2pAho4WqtK5hQtgImHzT74']

spotify:track:0uppYCG86ajpV2hSR3dJJ0
spotify:track:3d9DChrdc6BOeFsbrZ3Is0


In [51]:
from CreatePlaylist import CreatePlaylist

spotify_api = CreatePlaylist()
my_playlist = spotify_api.create_playlist(name="Test für Robin", description="My new Sentify playlist")
test_uri = recommended
spotify_api.add_tracks_to_playlist(my_playlist['id'], test_uri)