<a href="https://colab.research.google.com/github/gve0456/Emotion-Detection-and-Music-Recommended-system/blob/main/millionsong_new.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import pandas as pd
import numpy as np
import kagglehub
import joblib
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split
from scipy import stats
from google.colab import drive
from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

In [None]:
# Download the dataset through kagglehub. The value it returns is used as the
# root directory that the loading cell walks via `extract_path`.
extract_path = dataset_path = kagglehub.dataset_download(
    "undefinenull/million-song-dataset-spotify-lastfm"
)

print(f"Dataset downloaded and extracted to: {extract_path}")

Using Colab cache for faster access to the 'million-song-dataset-spotify-lastfm' dataset.
Dataset downloaded and extracted to: /kaggle/input/million-song-dataset-spotify-lastfm


In [None]:
# Load every CSV/TSV file found under the download directory into a dict keyed
# by the bare file name (not the full path).
dataframes = {}

for root, dirs, files in os.walk(extract_path):
    for file in files:
        file_path = os.path.join(root, file)
        if not file.endswith((".csv", ".tsv")):
            continue
        # .tsv files are tab separated; .csv files use pandas' default separator.
        read_kwargs = {} if file.endswith(".csv") else {"sep": "\t"}
        try:
            df_temp = pd.read_csv(file_path, **read_kwargs)
            dataframes[file] = df_temp
            print(f"Loaded {file} with shape {df_temp.shape}")
        except Exception as e:
            # Best effort: report the failure and keep scanning the other files.
            print(f"Could not load {file}: {e}")


# `df` is the working frame for every later cell; it stays None when the
# track metadata file was not among the loaded files.
if 'Music Info.csv' not in dataframes:
    print("\n'Music Info.csv' not found in the dataset.")
    df = None
else:
    df = dataframes['Music Info.csv'].copy()
    print("\nProcessing 'Music Info.csv':")
    print(df.head())

Loaded Music Info.csv with shape (50683, 21)
Loaded User Listening History.csv with shape (9711301, 3)

Processing 'Music Info.csv':
             track_id             name           artist  \
0  TRIOREW128F424EAF0   Mr. Brightside      The Killers   
1  TRRIVDJ128F429B0E8       Wonderwall            Oasis   
2  TROUVHL128F426C441  Come as You Are          Nirvana   
3  TRUEIND128F93038C4      Take Me Out  Franz Ferdinand   
4  TRLNZBD128F935E4D8            Creep        Radiohead   

                                 spotify_preview_url              spotify_id  \
0  https://p.scdn.co/mp3-preview/4d26180e6961fd46...  09ZQ5TmUG8TSL56n0knqrj   
1  https://p.scdn.co/mp3-preview/d012e536916c927b...  06UfBBDISthj1ZJAtX4xjj   
2  https://p.scdn.co/mp3-preview/a1c11bb1cb231031...  0keNu0t0tqsWtExGM3nT1D   
3  https://p.scdn.co/mp3-preview/399c401370438be4...  0ancVQ9wEcHVd0RrGICTE4   
4  https://p.scdn.co/mp3-preview/e7eb60e9466bc3a2...  01QoK9DA7VTeTSE3MNzp4I   

                               

In [None]:
if df is not None:
    print("\nHandling missing values...")
    # Keep only columns whose share of missing values is below the threshold.
    threshold = 0.5
    missing_share = df.isnull().mean()
    df = df.loc[:, missing_share < threshold].copy()

    for col in df.columns:
        column = df[col]
        if column.dtype in ["float64", "int64"]:
            # Numeric columns are filled with their median.
            fill_value = column.median()
        else:
            # Everything else is filled with the most frequent value; a column
            # without any mode is left untouched.
            modes = column.mode()
            if modes.empty:
                continue
            fill_value = modes[0]
        df[col] = column.fillna(fill_value)

    print("Missing values after cleaning:", df.isnull().sum().sum())


Handling missing values...
Missing values after cleaning: 0


In [None]:
if df is not None:
    print("\nRemoving outliers...")
    numeric_cols = df.select_dtypes(include=["int64", "float64"]).columns
    if not numeric_cols.empty:
        # Absolute z-score of every numeric cell, computed column by column.
        z_scores = np.abs(stats.zscore(df[numeric_cols]))
        initial_rows = df.shape[0]
        # A zero-variance column produces NaN z-scores (0/0). `NaN < 3` is
        # False, so a plain comparison would discard every row as soon as one
        # numeric column is constant. A NaN score carries no evidence of an
        # outlier, so it is treated as "within limits".
        within_limits = (z_scores < 3) | np.isnan(z_scores)
        # Keep a row only when all of its numeric values are within 3 sigma.
        df = df[within_limits.all(axis=1)].copy()
        print(f"Shape after outlier removal: {df.shape} ({initial_rows - df.shape[0]} rows removed)")
    else:
        print("No numeric columns found for outlier removal.")


Removing outliers...
Shape after outlier removal: (45264, 20) (5419 rows removed)


In [None]:
if df is not None:
    print("\nHandling infinite values...")
    numeric_cols_after_outliers = df.select_dtypes(include=["int64", "float64"]).columns
    if numeric_cols_after_outliers.empty:
        print("No numeric columns to handle infinite values.")
    else:
        # Turn +/-inf into NaN across the whole frame, then fill each affected
        # numeric column with the median of its finite values.
        df = df.replace([np.inf, -np.inf], np.nan)
        for col in numeric_cols_after_outliers:
            column = df[col]
            if not column.isnull().any():
                continue
            finite_median = column[np.isfinite(column)].median()
            df[col] = column.fillna(finite_median)

        print("Infinite values and resulting NaNs handled.")


Handling infinite values...
Infinite values and resulting NaNs handled.


In [None]:
if df is not None:
    print("\nEncoding categorical features...")
    label_encoders = {}          # column name -> fitted LabelEncoder
    high_cardinality_cols = []   # text columns with more than 50 distinct values
    categorical_cols = df.select_dtypes(include=["object", "category"]).columns
    for col in categorical_cols:
        if pd.api.types.is_object_dtype(df[col]) or pd.api.types.is_string_dtype(df[col]):
            if df[col].nunique() <= 50:
                # Low-cardinality text column: replace labels with integer codes.
                le = LabelEncoder()
                df[col] = le.fit_transform(df[col].astype(str))
                label_encoders[col] = le
            else:
                high_cardinality_cols.append(col)

    # The high-cardinality columns are removed from the feature frame below.
    # Keep an index-aligned copy first, otherwise rows of `df` can no longer be
    # mapped back to the values that were dropped (e.g. to display them next to
    # a recommendation): `df.join(song_metadata)` restores them.
    song_metadata = df[high_cardinality_cols].copy()

    df = df.drop(columns=high_cardinality_cols).copy()
    print(f"Dropped high cardinality columns: {high_cardinality_cols}")
    print(f"Shape after encoding categorical features: {df.shape}")


Encoding categorical features...
Dropped high cardinality columns: ['track_id', 'name', 'artist', 'spotify_preview_url', 'spotify_id', 'tags']
Shape after encoding categorical features: (45264, 14)


In [None]:
if df is not None:
    print("\nScaling numeric columns...")
    numeric_cols_after_encoding = df.select_dtypes(include=["int64", "float64"]).columns
    if not numeric_cols_after_encoding.empty:
        # NOTE(review): the scaler is fitted on the whole frame, while the
        # train/test split happens in a later cell, so test rows influence the
        # scaling statistics. Confirm this is acceptable for the use case.
        scaler = StandardScaler()
        print("Performing final check and handling of non-finite values before scaling...")
        for col in numeric_cols_after_encoding:
            finite_mask = np.isfinite(df[col])
            if not finite_mask.all():
                print(f"Column '{col}' has non-finite values before scaling. Replacing with median of finite values.")
                finite_median = df.loc[finite_mask, col].median()
                # Assign the result back instead of calling
                # `df[col].replace(..., inplace=True)`: that form mutates a
                # temporary Series and, with pandas Copy-on-Write, never
                # reaches `df`, leaving the non-finite values in place.
                df[col] = df[col].replace([np.inf, -np.inf, np.nan], finite_median)
            df[col] = df[col].astype(np.float64)

        df[numeric_cols_after_encoding] = scaler.fit_transform(df[numeric_cols_after_encoding])
        print("Numeric columns scaled.")
    else:
        print("No numeric columns to scale after encoding.")


Scaling numeric columns...
Performing final check and handling of non-finite values before scaling...
Numeric columns scaled.


In [None]:

if df is not None and not df.empty:
    print("\nStarting K-Means Clustering for Mood Categorization...")

    # Audio features used for clustering; only those present in `df` are used.
    clustering_features = ['danceability', 'energy', 'acousticness', 'instrumentalness', 'valence', 'tempo']
    clustering_cols = [col for col in clustering_features if col in df.columns]

    if not clustering_cols:
        print(" Required clustering features not found. Skipping K-Means.")
    else:
        X_scaled_clust = df[clustering_cols]
        K_FINAL = 4
        # Fixed seed and explicit n_init keep the cluster assignment reproducible.
        kmeans = KMeans(n_clusters=K_FINAL, random_state=42, n_init=10)
        df['mood_cluster'] = kmeans.fit_predict(X_scaled_clust)

        print(f"K-Means clustering done with K={K_FINAL}. New 'mood_cluster' column added.")
        # Clustering tolerates missing features, so the preview must too:
        # only show the preview columns that actually exist, instead of
        # raising KeyError when one of them is absent.
        preview_cols = [col for col in ['danceability', 'energy', 'valence'] if col in df.columns]
        print(df[preview_cols + ['mood_cluster']].head(5))


if df is not None and 'mood_cluster' in df.columns:

    # Tag for any cluster that is not matched to a mood profile.
    FALLBACK_TAG = "neutral ambient"

    # K-Means cluster ids are arbitrary: nothing guarantees that cluster 0 is
    # the "happy" one. Each mood tag is therefore described by the direction in
    # which its audio features are expected to deviate from the dataset average
    # (+ = above average, - = below), and clusters are matched to tags by
    # comparing these profiles with the clusters' mean features.
    # Heuristic weights derived from the words in the tag strings; assumes
    # higher `valence` means more positive-sounding music - TODO confirm
    # against the dataset's feature definitions and tune if needed.
    MOOD_PROFILES = {
        "energetic danceable party happy": {'danceability': 1.0, 'energy': 1.0, 'valence': 1.0},
        "calm peaceful relaxing acoustic": {'energy': -1.0, 'acousticness': 1.0, 'instrumentalness': 0.5},
        "sad melancholic slow minor_key": {'valence': -1.0, 'energy': -0.5, 'tempo': -1.0},
        "aggressive loud rock powerful": {'energy': 1.0, 'valence': -1.0, 'acousticness': -1.0},
    }

    profile_features = sorted({feature for profile in MOOD_PROFILES.values() for feature in profile})
    profile_cols = [
        col for col in profile_features
        if col in df.columns and pd.api.types.is_numeric_dtype(df[col])
    ]

    cluster_tags = {}
    if profile_cols:
        # Express every cluster's mean feature vector in standard deviations
        # from the overall mean, so the sign tells "above/below average"
        # regardless of whether `df` was scaled beforehand.
        overall_mean = df[profile_cols].mean()
        overall_std = df[profile_cols].std(ddof=0).replace(0, np.nan)
        centroids = df.groupby('mood_cluster')[profile_cols].mean()
        centroid_z = ((centroids - overall_mean) / overall_std).fillna(0.0)

        # Rows: features, columns: mood tags; features a tag does not mention weigh 0.
        weights = pd.DataFrame(MOOD_PROFILES).reindex(profile_cols).fillna(0.0)

        # Score matrix (clusters x tags). Greedy one-to-one matching: take the
        # best remaining (cluster, tag) pair until clusters or tags run out.
        remaining = centroid_z.dot(weights)
        while not remaining.empty:
            row_pos, col_pos = np.unravel_index(np.argmax(remaining.to_numpy()), remaining.shape)
            matched_cluster = remaining.index[row_pos]
            matched_tag = remaining.columns[col_pos]
            cluster_tags[matched_cluster] = matched_tag
            remaining = remaining.drop(index=matched_cluster, columns=matched_tag)
    else:
        # No audio feature available to ground the mapping: keep the
        # previous fixed id -> tag table.
        cluster_tags = {
            0: "energetic danceable party happy",
            1: "calm peaceful relaxing acoustic",
            2: "sad melancholic slow minor_key",
            3: "aggressive loud rock powerful",
        }

    def map_cluster_to_tag(cluster_id):
        """Return the mood tag string assigned to `cluster_id`.

        Clusters without an assigned tag get "neutral ambient".
        """
        return cluster_tags.get(cluster_id, FALLBACK_TAG)

    df['music_tags'] = df['mood_cluster'].apply(map_cluster_to_tag)
    print("\n'music_tags' feature created by mapping K-Means clusters.")
    for assigned_cluster, assigned_tag in cluster_tags.items():
        print(f"  cluster {assigned_cluster} -> {assigned_tag}")




Starting K-Means Clustering for Mood Categorization...
K-Means clustering done with K=4. New 'mood_cluster' column added.
   danceability    energy   valence  mood_cluster
0     -0.821463  0.912236 -0.774594             1
1     -0.514642  0.804503  0.826756             1
2      0.047863  0.531028  0.405963             1
3     -1.253285 -0.140231  0.199463             1
4      0.087636 -1.109827 -1.304482             1

'music_tags' feature created by mapping K-Means clusters.


In [None]:
if df is not None:
    print("\nSplitting data into train and test sets...")
    if df.empty:
        # Nothing to split; `train_df` / `test_df` are not created in this case.
        print("Dataframe is empty after preprocessing. Skipping train/test split.")
    else:
        # Hold out 20% of the rows; the fixed seed makes the split repeatable.
        split_parts = train_test_split(df, test_size=0.2, random_state=42)
        train_df, test_df = split_parts
        print(f"Train set shape: {train_df.shape}")
        print(f"Test set shape: {test_df.shape}")


Splitting data into train and test sets...
Train set shape: (36211, 14)
Test set shape: (9053, 14)


In [None]:
if df is not None and not df.empty:
    print("\nSaving preprocessed data and transformers to Google Drive...")

    # The target lives under the Drive mount point. Without mounting first,
    # `os.makedirs` would just create a plain folder on the ephemeral Colab VM
    # and nothing would reach Google Drive.
    drive_mount_point = "/content/drive"
    if not os.path.ismount(drive_mount_point):
        drive.mount(drive_mount_point)

    output_dir = "/content/drive/MyDrive/preprocessed_million_song_dataset"
    os.makedirs(output_dir, exist_ok=True)

    train_df.to_csv(os.path.join(output_dir, "train.csv"), index=False)
    test_df.to_csv(os.path.join(output_dir, "test.csv"), index=False)

    # Track what was really written so the summary below does not claim more.
    saved_transformers = []
    if 'scaler' in locals():
        joblib.dump(scaler, os.path.join(output_dir, "scaler.pkl"))
        print("Scaler saved.")
        saved_transformers.append("scaler")
    if label_encoders:
        # Skipped when no column was label-encoded (empty dict).
        joblib.dump(label_encoders, os.path.join(output_dir, "label_encoders.pkl"))
        print("Label encoders saved.")
        saved_transformers.append("label encoders")

    print(f"Preprocessed datasets saved to Google Drive at: {output_dir}")
    if saved_transformers:
        summary = " and ".join(saved_transformers)
        print(f"{summary[0].upper()}{summary[1:]} saved for future use.")
else:
    print("Skipping data saving as dataframe is empty or not loaded.")


Saving preprocessed data and transformers to Google Drive...
Scaler saved.
Preprocessed datasets saved to Google Drive at: /content/drive/MyDrive/preprocessed_million_song_dataset
Scaler and label encoders saved for future use.


In [None]:

# Define the Emotion-to-Tag mapping that links the output from emotion.ipynb to this engine
EMOTION_MAPPING = {
    'happy': 'energetic upbeat danceable major_key',
    'sad': 'melancholic slow minor_key acoustic',
    'angry': 'aggressive loud rock powerful fast',
    'calm': 'calm peaceful relaxing ambient slow',
    'surprise': 'energetic danceable fast major_key',
    'fear': 'ominous anxious minor_key fast',
    'neutral': 'chill ambient laid_back'
}

def generate_playlist_recommendation(detected_emotion, music_df, n_recommendations=5):
    """
    Recommends a playlist based on the detected emotion using Content-Based Filtering.

    The emotion is translated into a tag query via EMOTION_MAPPING, the
    `music_tags` column of `music_df` is vectorised with TF-IDF, and the rows
    with the highest cosine similarity to the query are returned.

    Parameters
    ----------
    detected_emotion : str
        Key of EMOTION_MAPPING (e.g. 'happy', 'sad').
    music_df : pandas.DataFrame
        Must contain a `music_tags` column. If it also has an `artist` column
        and a `title` or `name` column, those are included in the result.
    n_recommendations : int, default 5
        Maximum number of rows to return; values <= 0 yield an empty result.

    Returns
    -------
    pandas.DataFrame
        Up to `n_recommendations` rows, most similar first, with a fresh
        0..n-1 index. For an unmapped emotion a one-row frame with columns
        `Title` / `Artist` describing the error is returned instead.

    Raises
    ------
    KeyError
        If `music_df` has no `music_tags` column.
    """
    if detected_emotion not in EMOTION_MAPPING:
        return pd.DataFrame({'Title': ["Error: Emotion not mapped"], 'Artist': ["N/A"]})

    if 'music_tags' not in music_df.columns:
        raise KeyError("music_df must contain a 'music_tags' column")

    # The track title may be stored as `title` or as `name` (the column that
    # 'Music Info.csv' uses); accept either so titles are not silently omitted.
    title_col = next((col for col in ('title', 'name') if col in music_df.columns), None)
    if title_col is not None and 'artist' in music_df.columns:
        output_cols = [title_col, 'artist', 'music_tags']
    else:
        output_cols = ['music_tags']

    # Nothing to rank or nothing requested: return an empty, correctly shaped
    # frame. A negative count would otherwise slice out almost every row, and
    # an empty corpus would make the vectorizer fail.
    if music_df.empty or n_recommendations <= 0:
        return music_df.iloc[0:0][output_cols].reset_index(drop=True)

    query_features = EMOTION_MAPPING[detected_emotion]

    # The vocabulary is learned from the songs' tags; query words outside it
    # contribute nothing to the similarity.
    tfidf = TfidfVectorizer(stop_words='english')
    music_matrix = tfidf.fit_transform(music_df['music_tags'].fillna(''))

    query_vector = tfidf.transform([query_features])

    cosine_similarities = cosine_similarity(query_vector, music_matrix).flatten()

    # Positions of the n highest similarities, best first.
    top_indices = cosine_similarities.argsort()[:-n_recommendations-1:-1]

    recommendations = music_df.iloc[top_indices]
    return recommendations[output_cols].reset_index(drop=True)


In [None]:
# Show which columns are left in the working frame. Guarded like the other
# cells, because `df` is None when 'Music Info.csv' could not be loaded.
if df is not None:
    print(df.columns)

Index(['year', 'duration_ms', 'danceability', 'energy', 'key', 'loudness',
       'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness',
       'valence', 'tempo', 'time_signature'],
      dtype='object')
