# Music Recommendation Engine Notebook

## Code

In [None]:

from custom_utils import load_and_concatenate_parquet_files
from dotenv import load_dotenv
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import MinMaxScaler
from spotipy.oauth2 import SpotifyOAuth
import hdbscan
import joblib
import numpy as np
import os
import pandas as pd
import spotipy
import plotly.express as px
from sklearn.decomposition import PCA

class MusicRecommendation:
    def __init__(
        self,
        weight_year: float,
        weight_popularity: float,
        artist_name: str = None,
        song_name: str = None,
        recommendation_can_be_same_artist: bool = None,
        number_of_recommendations: int = None,
        use_weighted_system: bool = None,
        filter_by_genre: str = None,
        filter_only_by_genre: bool = None,
        selected_cluster_algorithm: str = None,
    ):
        """Create the Spotify client, load the song catalogue and store the settings.

        Args:
            weight_year: Factor applied to the normalized release year when
                weighted scoring is used.
            weight_popularity: Factor applied to the normalized popularity when
                weighted scoring is used.
            artist_name: Artist of the seed song.
            song_name: Title of the seed song.
            recommendation_can_be_same_artist: Whether songs by the seed artist
                may be recommended.
            number_of_recommendations: How many songs to return.
            use_weighted_system: Whether newer/more popular songs are preferred.
            filter_by_genre: Genre that candidates must belong to ("" = none).
            filter_only_by_genre: Skip the cluster filter and use the genre only.
            selected_cluster_algorithm: "kmeans", "hdbscan" or "both".

        Every optional argument left as None is asked for interactively by
        ``get_user_input``.
        """
        # Spotify credentials are read from environment variables after
        # load_dotenv() has been called.
        load_dotenv()
        spotify_auth = SpotifyOAuth(
            client_id=os.getenv('SPOTIFY_CLIENT_ID'),
            client_secret=os.getenv('SPOTIFY_CLIENT_SECRET'),
            redirect_uri=os.getenv('SPOTIFY_REDIRECT_URI'),
            scope="user-library-read",
        )
        self.sp = spotipy.Spotify(auth_manager=spotify_auth)

        # Full preprocessed catalogue; used for genre validation and for the
        # final output rows.
        self.original_data = load_and_concatenate_parquet_files("data/preprocessed_spotify_data")

        # Directories holding the clustered subsets and their fitted models.
        self.kmeans_only_path = "data/kmeans_clustered_subset"
        self.hdbscan_only_path = "data/hdbscan_clustered_subset"
        self.kmeans_hdbscan_path = "data/kmeans_hdbscan_clustered_subset"

        # Audio-feature columns used for scaling, clustering and neighbor search.
        self.metrics = [
            'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness',
            'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo',
            'time_signature',
        ]

        # Seed song.
        self.artist_name = artist_name
        self.song_name = song_name

        # Ranking settings.
        self.weight_year = weight_year
        self.weight_popularity = weight_popularity
        self.use_weighted_system = use_weighted_system
        self.number_of_recommendations = number_of_recommendations

        # Candidate filtering settings.
        # NOTE: despite its name, this attribute stores
        # `recommendation_can_be_same_artist` unchanged, i.e. a truthy value
        # means songs by the seed artist are allowed.
        self.only_recommend_new_artists = recommendation_can_be_same_artist
        self.filter_by_genre = filter_by_genre
        self.filter_only_by_genre = filter_only_by_genre
        self.selected_cluster_algorithm = selected_cluster_algorithm

        # Populated by get_user_input once the clustering choice is known.
        self.clustered_data = None
    
    def get_user_input(self) -> None:
        """Prompt for every setting that is still None, then load the clustered data.

        Values passed to the constructor are kept. After the prompts, the
        parquet subset matching ``selected_cluster_algorithm`` is loaded into
        ``self.clustered_data``; any value other than "kmeans" or "hdbscan"
        (including None) selects the combined KMeans+HDBSCAN subset.
        """
        if self.artist_name is None:
            while not self.artist_name:
                self.artist_name = input("Enter the artist name: ")

        if self.song_name is None:
            while not self.song_name:
                self.song_name = input("Enter the song name: ")

        if self.only_recommend_new_artists is None:
            # NOTE: despite its name, this attribute means "songs by the same
            # artist are allowed" - the constructor stores
            # `recommendation_can_be_same_artist` in it and the candidate filter
            # excludes the seed artist only when it is falsy. The question is
            # therefore phrased so that "yes" keeps the same artist.
            self.only_recommend_new_artists = input("Can recommendations come from the same artist? (yes/no): ").strip().lower() == 'yes'

        if self.number_of_recommendations is None:
            # Keep asking until a whole number between 1 and 10 is entered;
            # non-numeric input is rejected instead of raising ValueError.
            while True:
                try:
                    requested_count = int(input("How many songs should be recommended (<=10): "))
                except ValueError:
                    print("Invalid number")
                    continue
                if 1 <= requested_count <= 10:
                    self.number_of_recommendations = requested_count
                    break
                print("Invalid number")

        if self.use_weighted_system is None:
            self.use_weighted_system = input("Should newer/more popular songs be preferred? (yes/no): ").strip().lower() == 'yes'

        if self.filter_by_genre is None:
            self.filter_by_genre = input("Enter a genre you want to filter by: ").strip()

        # An empty genre means "no genre filter". An unknown genre can be
        # corrected up to `max_genre_retries` times; the answer given on the
        # last retry is validated before giving up.
        max_genre_retries = 3
        known_genres = self.original_data["genre"].unique()
        retries_used = 0
        while self.filter_by_genre != "" and self.filter_by_genre not in known_genres:
            if retries_used == max_genre_retries:
                print("Max tries reached. Not filtering by genre")
                self.filter_by_genre = ""
                break
            print(f"Invalid genre. Tries remaining: {max_genre_retries - retries_used}")
            print("Please select one of the following genres:")
            print(known_genres)
            self.filter_by_genre = input("Enter a genre you want to filter by: ").strip()
            retries_used += 1

        # Only meaningful when a genre filter is actually active.
        if self.filter_only_by_genre is None and self.filter_by_genre:
            self.filter_only_by_genre = input("Only filter by genre and not by cluster? (yes/no): ").strip().lower() == 'yes'

        # No clustering choice is needed when only the genre is used.
        if self.selected_cluster_algorithm is None and not self.filter_only_by_genre:
            valid_algorithms = ("kmeans", "hdbscan", "both")
            self.selected_cluster_algorithm = input("Which clustering algorithm to use? (kmeans/hdbscan/both): ").strip().lower()
            while self.selected_cluster_algorithm not in valid_algorithms:
                print("Invalid choice")
                self.selected_cluster_algorithm = input("Which clustering algorithm to use? (kmeans/hdbscan/both): ").strip().lower()

        subset_paths = {"kmeans": self.kmeans_only_path, "hdbscan": self.hdbscan_only_path}
        path = subset_paths.get(self.selected_cluster_algorithm, self.kmeans_hdbscan_path)
        self.clustered_data = load_and_concatenate_parquet_files(path)
    
    def fetch_spotify_data(self, song_name: str, artist_name: str) -> pd.DataFrame:
        results = self.sp.search(q=f"track:{song_name} artist:{artist_name}", type="track", limit=1)
        
        if not results['tracks']['items']:
            print("Track not found")
            return pd.DataFrame()  # Return an empty DataFrame if the track is not found
        
        track = results['tracks']['items'][0]
        track_id = track['id']
        track_name = track['name']
        artist_name = track['artists'][0]['name']
        popularity = track['popularity']
        release_date = track['album']['release_date']
        year = int(release_date.split('-')[0])
        duration_ms = track['duration_ms']
        audio_features = self.sp.audio_features(track_id)[0]
        
        data = {
            'danceability': audio_features['danceability'],
            'energy': audio_features['energy'],
            'key': audio_features['key'],
            'loudness': audio_features['loudness'],
            'mode': audio_features['mode'],
            'speechiness': audio_features['speechiness'],
            'acousticness': audio_features['acousticness'],
            'instrumentalness': audio_features['instrumentalness'],
            'liveness': audio_features['liveness'],
            'valence': audio_features['valence'],
            'tempo': audio_features['tempo'],
            'time_signature': audio_features['time_signature'],
            'artist_name': artist_name,
            'track_name': track_name,
            'track_id': track_id,
            'popularity': popularity,
            'year': year,
            'duration_ms': duration_ms
        }
        
        song = pd.DataFrame([data])
        
        self.artist_name = song["artist_name"].values[0]
        self.song_name = song["track_name"].values[0]
        print("Song found on Spotify")
        self.print_preview_urls(song)
        return song
    
    def prepare_song_data(self, song: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
        """Scale the seed song together with the clustered catalogue.

        Any catalogue rows with the seed song's title and artist are removed
        first, so that the seed song cannot be recommended and so that the
        name-based lookup below matches only the appended seed row.

        Args:
            song: One-row DataFrame describing the seed song.

        Returns:
            ``(song_standardized, all_song_standardized)``: the scaled seed
            row, and the scaled catalogue without that row.
        """
        clustered_data = self.remove_song_from_clustered_data_if_present(song, self.clustered_data)

        # The seed song is scaled in the same pass as the catalogue so both
        # share one min/max range per metric.
        all_songs = pd.concat([clustered_data, song], ignore_index=True)
        all_song_standardized = self.normalize_data(all_songs, self.metrics)

        is_seed_song = (
            (all_song_standardized["track_name"] == self.song_name)
            & (all_song_standardized["artist_name"] == self.artist_name)
        )
        song_standardized = all_song_standardized[is_seed_song]
        all_song_standardized = all_song_standardized.drop(song_standardized.index)

        return song_standardized, all_song_standardized
    
    def normalize_data(self, data: pd.DataFrame, metric_columns:list[str]) -> pd.DataFrame:
        """Min-max scale ``metric_columns`` and re-attach the remaining columns.

        The result has a fresh 0..n-1 index, with the scaled metric columns
        first, followed by every other column of ``data``.
        """
        metric_values = data[metric_columns].copy()
        passthrough = data.drop(columns=metric_columns).reset_index(drop=True).copy()

        scaled_values = MinMaxScaler().fit_transform(metric_values)
        scaled_metrics = pd.DataFrame(scaled_values, columns=metric_columns)

        # Both frames carry a positional index, so joining on it lines rows up again.
        return pd.merge(scaled_metrics, passthrough, left_index=True, right_index=True)
    
    def remove_song_from_clustered_data_if_present(self, song: pd.DataFrame, data: pd.DataFrame) -> pd.DataFrame:
        matched_rows = data[(data["track_name"] == song["track_name"].values[0]) & (data["artist_name"] == song["artist_name"].values[0])]
        if not matched_rows.empty:
            print(f"Removing {len(matched_rows)} rows from possible recommendations")
            if len(matched_rows) > 1:
                print("Rows to be removed:")
                display(matched_rows.head(5))
            indexes_to_remove = matched_rows.index
            return data.drop(indexes_to_remove)
        else:
            return data
        
    def filter_possible_recommendations(self, data, kmeans_cluster=None, hdbscan_cluster=None) -> pd.DataFrame:
        if self.filter_by_genre != "":
            data = data[data["genre"] == self.filter_by_genre]
        if self.filter_only_by_genre:
            return data
        
        if kmeans_cluster is not None:
            data = data[data["kmeans_cluster"] == kmeans_cluster]
        if hdbscan_cluster is not None:
            data = data[data["hdbscan_cluster"] == hdbscan_cluster]     
        if not self.only_recommend_new_artists:
            data = data[data["artist_name"] != self.artist_name]
        return data

    def get_song_cluster(self, song: pd.DataFrame, selected_clustering_method: str | None) -> int:
        kmeans_cluster = None
        hdbscan_cluster = None
        if selected_clustering_method is None or self.filter_only_by_genre:
            return kmeans_cluster, hdbscan_cluster
        
        if selected_clustering_method == "hdbscan":
            hdbscan_cluster = self.use_hdbscan(song, self.hdbscan_only_path)
            print(f"Predicted HDBSCAN cluster: {hdbscan_cluster}")
            
        elif selected_clustering_method == "kmeans":
            kmeans_cluster = self.use_kmeans(song, self.kmeans_only_path)
            print(f"Predicted KMeans cluster: {kmeans_cluster}")
            
        elif selected_clustering_method == "both":
            kmeans_cluster = self.use_kmeans(song, self.kmeans_hdbscan_path)
            hdbscan_cluster = self.use_hdbscan(song, self.kmeans_hdbscan_path, number=kmeans_cluster)
            print(f"Predicted KMeans cluster: {kmeans_cluster}, HDBSCAN cluster: {hdbscan_cluster}")
        return kmeans_cluster, hdbscan_cluster
    
    def use_kmeans(self, song: pd.DataFrame, path) -> int:
        """Load ``kmeans_model.pkl`` from ``path`` and predict the cluster of the first row of ``song``."""
        model = joblib.load(f"{path}/kmeans_model.pkl")
        predicted_labels = model.predict(song[self.metrics])
        return predicted_labels[0]
    
    def use_hdbscan(self, song: pd.DataFrame, path, number=None) -> int:
        """Predict the HDBSCAN cluster of ``song`` with a model stored under ``path``.

        The model file is ``hdbscan_model.pkl``, or ``hdbscan_model_<number>.pkl``
        when ``number`` is given.
        """
        suffix = f'_{number}' if number is not None else ''
        model = joblib.load(f"{path}/hdbscan_model{suffix}.pkl")

        # approximate_predict is given a single sample as a (1, n_metrics) array.
        sample = song[self.metrics].values.reshape(1, -1)
        labels, _ = hdbscan.approximate_predict(model, sample)
        return labels[0]

    def find_nearest_neighbors(self, song: pd.DataFrame, possible_recommendations: pd.DataFrame) -> tuple:
        number_of_neighbors = 100 if len(possible_recommendations) > 100 else len(possible_recommendations)
        knn_model = NearestNeighbors(n_neighbors=number_of_neighbors)
        cluster_data = possible_recommendations[self.metrics]
        knn_model.fit(cluster_data)
        distances, indices = knn_model.kneighbors(song[self.metrics], n_neighbors=number_of_neighbors)
        neighbors_df = possible_recommendations.iloc[indices[0]].copy()
        neighbors_df["distance"] = distances[0]
        return neighbors_df, distances[0]

    def get_weighted_scores(self, neighbors_df: pd.DataFrame, neighbor_distances: np.ndarray) -> pd.DataFrame:
        """Re-rank neighbors by closeness plus a boost for newer and more popular songs.

        The score of each neighbor is ``1 / distance`` plus its min-max scaled
        year and popularity multiplied by ``weight_year`` and
        ``weight_popularity``. The returned copy carries the score in a
        "weighted_score" column and is ordered from highest to lowest score.
        """
        ranked = neighbors_df.copy()

        # Column 0 is the scaled year, column 1 the scaled popularity.
        scaled = MinMaxScaler().fit_transform(ranked[['year', 'popularity']])
        year_scaled = scaled[:, 0]
        popularity_scaled = scaled[:, 1]

        closeness = 1 / (neighbor_distances + 1e-8)  # Avoid division by zero
        boost = year_scaled * self.weight_year + popularity_scaled * self.weight_popularity
        combined = closeness + boost
        ranked["weighted_score"] = combined

        best_first = np.argsort(combined)[::-1]  # Highest score first
        return ranked.iloc[best_first].copy()

    def print_preview_urls(self, song_df: pd.DataFrame) -> None:
        """Print title, artist and Spotify preview URL for every row of ``song_df``.

        Each row's "track_id" is looked up through the Spotify client; a
        fallback line is printed when the track has no preview URL.
        """
        for _, entry in song_df.iterrows():
            track_details = self.sp.track(entry['track_id'])
            preview_link = track_details.get('preview_url')

            print(f"Track: {entry['track_name']} by {entry['artist_name']}")
            if preview_link:
                print(f"Preview URL: {preview_link}")
            else:
                print("Preview URL not available.")
    
    def prepare_results(self, neighbors_df: pd.DataFrame,song_standardized, all_song_standardized) -> pd.DataFrame:
        """Plot the top neighbors, print them and return them with their original values.

        Args:
            neighbors_df: Ranked neighbors; the first
                ``number_of_recommendations`` rows are used.
            song_standardized: Scaled seed song (used for the plot only).
            all_song_standardized: Scaled catalogue (used for the plot only).

        Returns:
            The recommended songs taken from ``self.original_data``, joined with
            the remaining neighbor columns on "track_id" and sorted by
            "weighted_score" (descending) or "distance" (ascending).
        """
        top_matches = neighbors_df.head(self.number_of_recommendations)
        self.plot_pca_scatter(
            possible_recommendations=all_song_standardized,
            recommended_songs=top_matches,
            user_selected_song=song_standardized,
        )

        # Drop the columns that are taken from the original data instead, so
        # the merge below does not duplicate them.
        columns_from_original = ["artist_name", "track_name", "year", "popularity", "duration_ms", "genre"] + self.metrics
        top_matches = top_matches.drop(columns=columns_from_original)

        is_recommended = self.original_data["track_id"].isin(top_matches["track_id"])
        original_rows = self.original_data[is_recommended].copy()
        songs_to_recommend = pd.merge(original_rows, top_matches, on="track_id")

        print("\nRecommended songs:")
        if self.use_weighted_system:
            songs_to_recommend = songs_to_recommend.sort_values(by="weighted_score", ascending=False)
        else:
            songs_to_recommend = songs_to_recommend.sort_values(by="distance", ascending=True)
        self.print_preview_urls(songs_to_recommend)
        return songs_to_recommend

    def plot_pca_scatter(self, possible_recommendations: pd.DataFrame, recommended_songs:pd.DataFrame, user_selected_song:pd.DataFrame, n_components=2, circle_opacity=0.5):
        """Show a 2D PCA scatter plot of the catalogue, the recommendations and the seed song.

        Args:
            possible_recommendations: Songs forming the background of the plot;
                at most 100000 of them are sampled and used to fit the PCA.
            recommended_songs: Songs drawn as recommendations.
            user_selected_song: The seed song.
            n_components: Number of PCA components to compute. The plot always
                uses PC1 and PC2.
            circle_opacity: Marker opacity of the background songs.

        All three frames must contain the ``self.metrics`` columns as well as
        'genre', 'artist_name' and 'track_name'.
        """
        
        def prepare_pca_df(data, pca, source_label):
            """Apply PCA and add relevant columns to the resulting DataFrame."""
            pca_transformed = pca.transform(data[self.metrics])
            pca_df = pd.DataFrame(pca_transformed, columns=[f'PC{i+1}' for i in range(n_components)])
            pca_df['source'] = source_label
            # The index is reset so the descriptive columns line up with the
            # positional index of the freshly built PCA frame.
            pca_df = pd.concat([pca_df, data[['genre', 'danceability', 'energy', 'valence', 'artist_name', 'track_name']].reset_index(drop=True)], axis=1)
            return pca_df
        
        # Cap the background at 100000 songs; the fixed seed keeps the sample reproducible.
        sample_size = 100000 if len(possible_recommendations) > 100000 else len(possible_recommendations)
        possible_recommendations = possible_recommendations.sample(n=sample_size, random_state=42)
        

        # The projection is fitted on the background sample only and then
        # applied unchanged to the recommendations and the seed song.
        pca = PCA(n_components=n_components)
        pca.fit(possible_recommendations[self.metrics])

        possible_recommendations_pca = prepare_pca_df(possible_recommendations, pca, 'all_songs')
        recommended_songs_pca = prepare_pca_df(recommended_songs, pca, 'recommended_songs')
        user_selected_song_pca = prepare_pca_df(user_selected_song, pca, 'user_selected_song')
   
        df_combined = pd.concat([possible_recommendations_pca, recommended_songs_pca, user_selected_song_pca], ignore_index=True)

        df_combined['danceability'] = df_combined['danceability'].round(4)
        df_combined['energy'] = df_combined['energy'].round(4)
        df_combined['valence'] = df_combined['valence'].round(4)
        df_combined["genre"] = df_combined["genre"].fillna("Unknown")
        
        # Remove background points that duplicate a recommended song so it is
        # not drawn twice.
        # NOTE(review): title and artist are matched independently, so a
        # background song is also removed when its title equals one
        # recommendation's title and its artist equals another recommendation's
        # artist.
        df_combined = df_combined[~((df_combined['source'] == 'all_songs') &
          (df_combined['track_name'].isin(recommended_songs_pca["track_name"].unique())) & df_combined['artist_name'].isin(recommended_songs_pca["artist_name"].unique()))
                ]
        
        marker_shapes = {'all_songs': 'circle', 'recommended_songs': 'triangle-up', 'user_selected_song': 'cross'}
        
        fig = px.scatter(
            df_combined, 
            x='PC1', y='PC2', 
            color='source', symbol='source',
            hover_data={'PC1': False, 'PC2': False, 'genre': True, 'artist_name': True, 'track_name': True, 
                        'danceability': True, 'energy': True, 'valence': True},
            symbol_map=marker_shapes, 
            title='PCA Scatter Plot',
        )

        # Default marker size first, then per-trace overrides: background
        # points become translucent, highlighted points become larger.
        fig.update_traces(marker=dict(size=8))
        fig.for_each_trace(
            lambda trace: trace.update(marker=dict(opacity=circle_opacity)) if trace.name == 'all_songs' else ()
        )
        fig.for_each_trace(
            lambda trace: trace.update(marker=dict(size=16)) if trace.name != 'all_songs' else ()
        )

        # NOTE(review): the customdata indices below presumably follow the
        # order of the True entries in `hover_data` above (genre, artist_name,
        # track_name, danceability, energy, valence) - confirm when changing
        # either of them.
        hover_template = (
            "<b>%{customdata[2]}</b><br>"  # Track name
            "Artist: %{customdata[1]}<br>"
            "Genre: %{customdata[0]}<br>"
            "Danceability: %{customdata[3]:.4f}<br>"
            "Energy: %{customdata[4]:.4f}<br>"
            "Valence: %{customdata[5]:.4f}<extra></extra>"
        )

        fig.update_traces(hovertemplate=hover_template)
        fig.show()
    
    def recommend_songs(self) -> pd.DataFrame | None:
        """Run the complete recommendation flow.

        Steps: collect the settings, look the seed song up on Spotify, scale
        it together with the clustered catalogue, predict its cluster(s),
        filter the candidates, search the nearest neighbors, optionally
        re-rank them by weighted score, then plot and print the result.

        Returns:
            The recommended songs, or None when the seed song could not be
            fetched or no candidate song is left after filtering.
        """
        self.get_user_input()
        song = self.fetch_spotify_data(self.song_name, self.artist_name)

        if song.empty:
            print("No match found. Please try again.")
            return None

        song_standardized, all_song_standardized = self.prepare_song_data(song)

        kmeans_cluster, hdbscan_cluster = self.get_song_cluster(song_standardized, self.selected_cluster_algorithm)

        possible_recommendations = self.filter_possible_recommendations(data=all_song_standardized, kmeans_cluster=kmeans_cluster, hdbscan_cluster=hdbscan_cluster)

        # The genre, cluster and artist filters can remove every candidate; a
        # neighbor search over zero rows would only fail with an unhelpful error.
        if possible_recommendations.empty:
            print("No songs match the selected filters. Please try again.")
            return None

        neighbors_df, neighbor_distances = self.find_nearest_neighbors(song_standardized, possible_recommendations)

        if self.use_weighted_system:
            neighbors_df = self.get_weighted_scores(neighbors_df, neighbor_distances)

        songs_to_recommend = self.prepare_results(neighbors_df=neighbors_df, song_standardized=song_standardized, all_song_standardized=all_song_standardized)

        return songs_to_recommend

## Get Recommendations

### Tutorial

Just execute the cell below to get recommendations.\
You will be asked to enter the following things:
- Artist Name: The Artist Name of the song you want to base the recommendations on.
- Song Name: The Name of the Song you want to base the recommendations on.
- Number of Recommendations: How many recommendations do you want?
- Can be same Artist: Can recommendations come from the same artist or do you just want to discover other artists? *(Optional)*
- Filter by Genre: You can add the name of a genre that you want the recommendations to be in. *(Optional)*
- Only filter by Genre: You can decide if you want to only use Genre as a cluster. *(Optional)*
- Select Cluster Algorithm: You can select the cluster algorithm which is used for the first recommendation filtering.
- Use Weighted System: You can prioritize more popular and more recent songs if set to True. The weights can be tweaked as class parameters. *(Optional)*

All these values can either be set via **User Input Field** or **Class Input Parameters**.

*It is recommended to also add a genre filter for the best possible results.*

**Important:**\
You need to have a `.cache` and `.env` variable set up and have installed all libraries in the `requirements.txt` file.\
For more information, take a look at the *README*.

In [None]:
# Every setting left as None is asked for interactively when the
# recommendation run starts; the two weights only matter when the weighted
# system is enabled.
recommended_songs_df = MusicRecommendation(
    # Seed song
    artist_name=None,
    song_name=None,
    # Candidate filtering
    recommendation_can_be_same_artist=None,
    filter_by_genre=None,
    filter_only_by_genre=None,
    selected_cluster_algorithm=None,
    # Ranking
    number_of_recommendations=None,
    use_weighted_system=None,
    weight_year=0.6,
    weight_popularity=0.4,
).recommend_songs()
display(recommended_songs_df)