# Music Genre Classification from Spotify Streaming History
## CS156 Machine Learning Pipeline Project

This notebook implements a machine learning pipeline to classify music genres using personal Spotify streaming history data. The project demonstrates the complete machine learning lifecycle from data collection to model evaluation.

## Table of Contents
1. [Data Collection and Description](#1.-Data-Collection-and-Description)
2. [Data Loading and Conversion](#2.-Data-Loading-and-Conversion)
3. [Data Preprocessing and EDA](#3.-Data-Preprocessing-and-EDA)
4. [Analysis Planning](#4.-Analysis-Planning)
5. [Model Selection](#5.-Model-Selection)
6. [Model Training](#6.-Model-Training)
7. [Model Evaluation](#7.-Model-Evaluation)
8. [Results Visualization](#8.-Results-Visualization)
9. [Executive Summary](#9.-Executive-Summary)
10. [References](#10.-References)

Let's begin by setting up our environment and importing required libraries.

In [None]:
# Import required libraries
import os
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import librosa
import warnings
from pathlib import Path
from datetime import datetime

# Ignore warnings
warnings.filterwarnings('ignore')

# Set plot style (the 'seaborn' matplotlib style name was removed in matplotlib 3.8)
sns.set_theme()
%matplotlib inline

# 1. Data Collection and Description

## Dataset Overview

The dataset used in this project consists of personal Spotify streaming history data. This data includes:

1. Streaming history JSON files from Spotify
2. Audio preview files for feature extraction
3. Audio features and characteristics

### Data Sources

1. **Spotify Account Data**: 
   - Personal streaming history downloaded from Spotify
   - Contains track names, artists, and listening timestamps
   - Format: JSON files (`StreamingHistory*.json`)

2. **Audio Previews**:
   - 30-second preview clips of tracks
   - Used for extracting audio features
   - Format: MP3 files
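
An export can be split across several files that match the `StreamingHistory*.json` pattern. The following is a minimal sketch, not the notebook's actual loader, of reading every matching file in a folder into one list of records. The helper name `load_all_history` and the demo file contents are assumptions made for illustration.

```python
import json
import tempfile
from pathlib import Path

def load_all_history(data_dir, pattern="StreamingHistory*.json"):
    """Concatenate the records from every matching JSON file in data_dir."""
    records = []
    # sorted() gives a stable, name-based file order
    for path in sorted(Path(data_dir).glob(pattern)):
        with open(path, "r", encoding="utf-8") as f:
            records.extend(json.load(f))
    return records

# Demo with two made-up files in a temporary folder
with tempfile.TemporaryDirectory() as tmp:
    first = [{"trackName": "Track A", "artistName": "Artist A", "msPlayed": 90000}]
    second = [
        {"trackName": "Track B", "artistName": "Artist B", "msPlayed": 12000},
        {"trackName": "Track C", "artistName": "Artist A", "msPlayed": 200000},
    ]
    Path(tmp, "StreamingHistory0.json").write_text(json.dumps(first), encoding="utf-8")
    Path(tmp, "StreamingHistory1.json").write_text(json.dumps(second), encoding="utf-8")
    all_records = load_all_history(tmp)

print(len(all_records))
```

The resulting list can be passed straight to `pd.DataFrame(...)`, which is what the single-file loader in this notebook does with one file's contents.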

### Sampling Methodology

The data covers my personal listening history from 2023 to 2025. Because it comes from a single listener, it reflects my own preferences rather than listening behavior in general. This time range is intended to provide:

1. Recent listening patterns are captured
2. Sufficient data for meaningful analysis
3. Diverse genre representation

Let's start by examining our raw data.

In [None]:
# Function to load streaming history from JSON files
def load_streaming_history(file_path):
    """Load and parse Spotify streaming history JSON file."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        return pd.DataFrame(data)
    except Exception as e:
        print(f"Error loading file {file_path}: {str(e)}")
        return None

# Load the streaming history data
file_path = 'Streaming_History_Audio_2023-2025_0.json'
streaming_df = load_streaming_history(file_path)

if streaming_df is not None:
    print("Dataset Overview:")
    print(f"Number of entries: {len(streaming_df)}")
    print("\nFirst few rows:")
    display(streaming_df.head())
    print("\nColumns:", streaming_df.columns.tolist())

# 2. Data Loading and Conversion

In this section, we'll convert our raw data into a format suitable for machine learning. This involves:

1. Parsing JSON data into a structured DataFrame
2. Converting timestamps and durations
3. Creating a proper data structure for analysis
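
The `DataLoader` in this section reads the columns `endTime`, `artistName`, `trackName`, and `msPlayed`. As far as I know, files named `Streaming_History_Audio_*.json` come from Spotify's extended streaming history export, which uses different keys (`ts`, `ms_played`, `master_metadata_track_name`, `master_metadata_album_artist_name`). Assuming that mapping holds for this export, the sketch below renames the columns before the timestamp and duration conversions. The mapping, the helper `normalize_columns`, and the demo record are all assumptions, so check them against `streaming_df.columns` first.

```python
import pandas as pd

# Assumed mapping from extended-history keys to the column names used in this notebook
EXTENDED_TO_STANDARD = {
    "ts": "endTime",
    "ms_played": "msPlayed",
    "master_metadata_track_name": "trackName",
    "master_metadata_album_artist_name": "artistName",
}

def normalize_columns(df):
    """Return a copy of df with any extended-history columns renamed."""
    present = {old: new for old, new in EXTENDED_TO_STANDARD.items() if old in df.columns}
    return df.rename(columns=present)

# One made-up record in the assumed extended-history shape
records = [
    {
        "ts": "2023-05-01T14:23:45Z",
        "ms_played": 90000,
        "master_metadata_track_name": "Track A",
        "master_metadata_album_artist_name": "Artist A",
    }
]

demo = normalize_columns(pd.DataFrame(records))
demo["endTime"] = pd.to_datetime(demo["endTime"])      # string -> timestamp
demo["hour"] = demo["endTime"].dt.hour                 # hour of day, 0-23
demo["day_of_week"] = demo["endTime"].dt.day_name()
demo["minutesPlayed"] = demo["msPlayed"] / 1000 / 60   # ms -> s -> min
print(demo[["trackName", "hour", "day_of_week", "minutesPlayed"]])
```

If the columns already use the `endTime` / `msPlayed` names, `normalize_columns` leaves the frame unchanged.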

## Data Loading Pipeline

We'll create a `DataLoader` class to handle the data loading and initial processing:

In [None]:
class DataLoader:
    """Class for loading and initial processing of Spotify streaming data."""
    
    def __init__(self, file_path):
        self.file_path = file_path
        self.data = None
    
    def load_data(self):
        """Load JSON data into pandas DataFrame."""
        try:
            with open(self.file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            self.data = pd.DataFrame(data)
            return True
        except Exception as e:
            print(f"Error loading data: {str(e)}")
            return False
    
    def parse_timestamps(self):
        """Convert timestamp strings to datetime objects."""
        if self.data is not None:
            self.data['endTime'] = pd.to_datetime(self.data['endTime'])
            self.data['date'] = self.data['endTime'].dt.date
            self.data['hour'] = self.data['endTime'].dt.hour
            self.data['day_of_week'] = self.data['endTime'].dt.day_name()
    
    def convert_durations(self):
        """Convert milliseconds played to minutes and seconds."""
        if self.data is not None:
            self.data['secondsPlayed'] = self.data['msPlayed'] / 1000
            self.data['minutesPlayed'] = self.data['secondsPlayed'] / 60
    
    def process_data(self):
        """Run all processing steps."""
        if self.load_data():
            self.parse_timestamps()
            self.convert_durations()
            return self.data
        return None

# Create data loader instance and process data
loader = DataLoader('Streaming_History_Audio_2023-2025_0.json')
processed_df = loader.process_data()

if processed_df is not None:
    print("Processed Data Overview:")
    print(f"Number of entries: {len(processed_df)}")
    print("\nFirst few rows:")
    display(processed_df.head())
    print("\nColumns:", processed_df.columns.tolist())

# 3. Data Preprocessing and EDA

In this section, we'll clean our data and perform exploratory data analysis. This includes:

1. Data cleaning
2. Feature engineering
3. Basic statistics and visualizations

## Data Cleaning

Let's start by cleaning our dataset:

In [None]:
class DataCleaner:
    """Class for cleaning and preprocessing streaming data."""
    
    def __init__(self, df):
        self.df = df.copy()
    
    def remove_short_plays(self, min_seconds=30):
        """Remove tracks played for less than min_seconds."""
        initial_count = len(self.df)
        self.df = self.df[self.df['secondsPlayed'] >= min_seconds]
        removed = initial_count - len(self.df)
        print(f"Removed {removed} short plays (<{min_seconds}s)")
    
    def clean_names(self):
        """Clean artist and track names."""
        self.df['artistName'] = self.df['artistName'].str.strip()
        self.df['trackName'] = self.df['trackName'].str.strip()
    
    def remove_duplicates(self):
        """Remove duplicate entries based on timestamp and track info."""
        initial_count = len(self.df)
        self.df = self.df.drop_duplicates(
            subset=['endTime', 'artistName', 'trackName'],
            keep='first'
        )
        removed = initial_count - len(self.df)
        print(f"Removed {removed} duplicate entries")
    
    def clean_data(self):
        """Run all cleaning steps."""
        self.remove_short_plays()
        self.clean_names()
        self.remove_duplicates()
        return self.df

# Clean the data
cleaner = DataCleaner(processed_df)
cleaned_df = cleaner.clean_data()

print("\nCleaned Data Overview:")
print(f"Final number of entries: {len(cleaned_df)}")
display(cleaned_df.head())

## Exploratory Data Analysis

Let's analyze our cleaned dataset to understand patterns and distributions:

In [None]:
def plot_listening_patterns(df):
    """Create visualizations of listening patterns."""
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    
    # 1. Listening time by hour
    hourly_counts = df.groupby('hour').size()
    sns.barplot(x=hourly_counts.index, y=hourly_counts.values, ax=axes[0,0])
    axes[0,0].set_title('Listening Activity by Hour')
    axes[0,0].set_xlabel('Hour of Day')
    axes[0,0].set_ylabel('Number of Tracks')
    
    # 2. Listening time by day of week
    day_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
    daily_counts = df.groupby('day_of_week').size()
    daily_counts = daily_counts.reindex(day_order)
    sns.barplot(x=daily_counts.index, y=daily_counts.values, ax=axes[0,1])
    axes[0,1].set_title('Listening Activity by Day of Week')
    axes[0,1].set_xlabel('Day of Week')
    axes[0,1].set_ylabel('Number of Tracks')
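    # plt.xticks would act on the last-created subplot, so rotate this axis explicitly
    axes[0,1].tick_params(axis='x', rotation=45)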
    
    # 3. Distribution of play durations (time actually listened, not full track length)
    sns.histplot(data=df, x='minutesPlayed', bins=50, ax=axes[1,0])
    axes[1,0].set_title('Distribution of Play Durations')
    axes[1,0].set_xlabel('Minutes Played')
    axes[1,0].set_ylabel('Count')
    
    # 4. Top artists
    top_artists = df['artistName'].value_counts().head(10)
    sns.barplot(x=top_artists.values, y=top_artists.index, ax=axes[1,1])
    axes[1,1].set_title('Top 10 Most Played Artists')
    axes[1,1].set_xlabel('Number of Tracks')
    
    plt.tight_layout()
    plt.show()

# Generate visualizations
plot_listening_patterns(cleaned_df)

# Print basic statistics
print("\nBasic Statistics:")
print(f"Total unique tracks: {cleaned_df['trackName'].nunique()}")
print(f"Total unique artists: {cleaned_df['artistName'].nunique()}")
print(f"Average listening time per track: {cleaned_df['minutesPlayed'].mean():.2f} minutes")

# 4. Analysis Planning

Now that we have cleaned and analyzed our data, let's plan our genre classification approach:

## Classification Task Definition

We will build a music genre classifier using the following:

1. **Input Features**:
   - Audio features extracted using librosa
   - Temporal features (tempo, rhythm)
   - Spectral features (MFCC, spectral centroid, etc.)

2. **Target Variable**:
   - Music genre (multi-class classification)
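
To make the spectral features less abstract: the spectral centroid is the magnitude-weighted mean frequency of a signal, so "brighter" sounds have a higher centroid. The sketch below computes it with NumPy over a whole signal for two synthetic sine tones. This is a simplification for illustration only. librosa computes the centroid frame by frame, and the function name `spectral_centroid_hz` is an assumption, not part of any library.

```python
import numpy as np

def spectral_centroid_hz(y, sr):
    """Magnitude-weighted mean frequency of the whole signal, in Hz."""
    magnitudes = np.abs(np.fft.rfft(y))
    freqs = np.fft.rfftfreq(len(y), d=1.0 / sr)
    return float(np.sum(freqs * magnitudes) / np.sum(magnitudes))

sr = 22050                      # same sample rate the extractor uses by default
t = np.arange(sr) / sr          # one second of sample times
low_tone = np.sin(2 * np.pi * 440 * t)
high_tone = np.sin(2 * np.pi * 4000 * t)

centroid_low = spectral_centroid_hz(low_tone, sr)
centroid_high = spectral_centroid_hz(high_tone, sr)
print(f"440 Hz tone:  centroid = {centroid_low:.1f} Hz")
print(f"4000 Hz tone: centroid = {centroid_high:.1f} Hz")
```

For a pure tone the centroid sits at the tone's own frequency. Real music spreads energy over many frequencies, so the value summarizes where most of that energy lies.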

Let's prepare our feature extraction pipeline:

In [None]:
class AudioFeatureExtractor:
    """Extract audio features from MP3 files using librosa."""
    
    def __init__(self, sample_rate=22050):
        self.sample_rate = sample_rate
    
    def load_audio(self, audio_path, duration=30):
        """Load audio file with specified duration."""
        try:
            y, sr = librosa.load(audio_path, sr=self.sample_rate, duration=duration)
            return y, sr
        except Exception as e:
            print(f"Error loading audio: {str(e)}")
            return None, None
    
    def extract_features(self, y, sr):
        """Extract various audio features."""
        features = {}
        
        try:
            # Rhythm features (newer librosa versions return tempo as a 1-element array)
            tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
            features['tempo'] = float(np.atleast_1d(tempo)[0])
            
            # Spectral features
            features['spectral_centroid'] = np.mean(librosa.feature.spectral_centroid(y=y, sr=sr))
            features['spectral_bandwidth'] = np.mean(librosa.feature.spectral_bandwidth(y=y, sr=sr))
            features['spectral_rolloff'] = np.mean(librosa.feature.spectral_rolloff(y=y, sr=sr))
            
            # MFCC features
            mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
            for i, mfcc in enumerate(mfccs):
                features[f'mfcc_{i}'] = np.mean(mfcc)
            
            return features
        except Exception as e:
            print(f"Error extracting features: {str(e)}")
            return None
    
    def process_audio_file(self, audio_path):
        """Process a single audio file and extract features."""
        y, sr = self.load_audio(audio_path)
        if y is not None:
            return self.extract_features(y, sr)
        return None

# Example usage (commented out as we don't have audio files yet)
'''
extractor = AudioFeatureExtractor()
features = extractor.process_audio_file('path_to_audio.mp3')
if features:
    print("Extracted features:", features)
'''

# 5. Model Selection

For our genre classification task, we'll compare several models:

1. **Random Forest Classifier**
   - Ensemble method combining multiple decision trees
   - Good for handling non-linear relationships
   - Built-in feature importance

2. **Support Vector Machine (SVM)**
   - Effective for high-dimensional data
   - Maximum-margin formulation with well-studied generalization behavior
   - Kernel tricks for non-linear classification

3. **Gradient Boosting Classifier**
   - Sequential ensemble method
   - Often among the strongest performers on tabular feature sets
   - Can accommodate imbalanced classes through sample weights
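
Before any tuning, the three candidates can be compared on equal footing with stratified cross-validation. The sketch below runs that comparison on synthetic data, since the real feature matrix does not exist yet. The data from `make_classification` (17 features to mirror tempo, three spectral means, and 13 MFCC means), the three-class setup, and the reduced estimator counts are all placeholder assumptions, so the printed scores say nothing about genre classification itself.

```python
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

# Synthetic stand-in for the audio feature matrix and genre labels
X, y = make_classification(
    n_samples=150, n_features=17, n_informative=8, n_classes=3, random_state=42
)

candidates = {
    "rf": RandomForestClassifier(n_estimators=50, random_state=42),
    "svm": SVC(random_state=42),
    "gb": GradientBoostingClassifier(n_estimators=50, random_state=42),
}

# Stratified folds keep the class proportions similar in every fold
cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)

cv_scores = {}
for name, clf in candidates.items():
    # Scaling lives inside the pipeline so it is fit on training folds only
    model = make_pipeline(StandardScaler(), clf)
    cv_scores[name] = cross_val_score(model, X, y, cv=cv, scoring="accuracy").mean()
    print(f"{name}: mean CV accuracy = {cv_scores[name]:.3f}")
```

Keeping the scaler inside the pipeline matters most for the SVM, which is sensitive to feature scale. The tree-based models are largely unaffected by it.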

Let's set up our model pipeline:

In [None]:
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.pipeline import Pipeline

class GenreClassifier:
    """Music genre classification model pipeline."""
    
    def __init__(self, model_type='rf'):
        self.model_type = model_type
        self.pipeline = None
        self.setup_pipeline()
    
    def setup_pipeline(self):
        """Create the model pipeline with preprocessing and classifier."""
        if self.model_type == 'rf':
            clf = RandomForestClassifier(random_state=42)
            param_grid = {
                'classifier__n_estimators': [100, 200, 300],
                'classifier__max_depth': [10, 20, 30, None]
            }
        elif self.model_type == 'svm':
            clf = SVC(random_state=42)
            param_grid = {
                'classifier__C': [0.1, 1, 10],
                'classifier__kernel': ['rbf', 'linear']
            }
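        elif self.model_type == 'gb':  # gradient boosting
            clf = GradientBoostingClassifier(random_state=42)
            param_grid = {
                'classifier__n_estimators': [100, 200],
                'classifier__learning_rate': [0.01, 0.1]
            }
        else:
            # Fail loudly instead of silently falling back to gradient boosting
            raise ValueError(f"Unknown model_type: {self.model_type!r} (expected 'rf', 'svm', or 'gb')")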
        
        self.pipeline = Pipeline([
            ('scaler', StandardScaler()),
            ('classifier', clf)
        ])
        
        self.param_grid = param_grid
    
    def train(self, X_train, y_train, cv=5):
        """Train the model using cross-validation."""
        grid_search = GridSearchCV(
            self.pipeline,
            self.param_grid,
            cv=cv,
            scoring='accuracy',
            n_jobs=-1
        )
        grid_search.fit(X_train, y_train)
        self.pipeline = grid_search.best_estimator_
        return grid_search.best_score_, grid_search.best_params_

# Example usage (commented out as we don't have features yet)
'''
# Prepare data
X_train, X_test, y_train, y_test = train_test_split(
    features_matrix,
    genre_labels,
    test_size=0.2,
    random_state=42
)

# Train model
classifier = GenreClassifier(model_type='rf')
best_score, best_params = classifier.train(X_train, y_train)
print(f"Best CV score: {best_score}")
print(f"Best parameters: {best_params}")
'''

# 6. Model Training

Once we have our features extracted and models set up, we'll train our models with cross-validation and hyperparameter tuning. The actual training will be performed when we have our feature matrix ready.
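
Until the real feature matrix exists, the split, tune, and hold-out flow can be rehearsed on synthetic data. This is a minimal sketch under stated assumptions: the data comes from `make_classification`, only the SVM's `C` parameter is searched, and `stratify=y` is my addition to keep class proportions equal across the split. It is not a record of the project's actual training run.

```python
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

# Synthetic stand-in for the feature matrix and genre labels
X, y = make_classification(
    n_samples=200, n_features=17, n_informative=8, n_classes=3, random_state=0
)

# Hold out 20% for the final check; stratify=y preserves class proportions
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

pipeline = Pipeline([
    ("scaler", StandardScaler()),
    ("classifier", SVC(random_state=42)),
])

# Tune C with cross-validation on the training split only
search = GridSearchCV(pipeline, {"classifier__C": [0.1, 1, 10]}, cv=3, scoring="accuracy")
search.fit(X_train, y_train)

# The held-out split is touched once, after tuning is finished
test_accuracy = search.score(X_test, y_test)
print(f"Best C: {search.best_params_['classifier__C']}")
print(f"Held-out accuracy: {test_accuracy:.3f}")
```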

For now, let's prepare the evaluation metrics we'll use:

In [None]:
from sklearn.metrics import classification_report, confusion_matrix

def evaluate_model(model, X_test, y_test):
    """Evaluate model performance with various metrics."""
    # Make predictions
    y_pred = model.predict(X_test)
    
    # Print classification report
    print("Classification Report:")
    print(classification_report(y_test, y_pred))
    
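    # Create confusion matrix with an explicit, sorted label order
    labels = sorted(set(y_test) | set(y_pred))
    cm = confusion_matrix(y_test, y_pred, labels=labels)
    
    # Plot confusion matrix with the genre names on both axes
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=labels, yticklabels=labels)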
    plt.title('Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.show()

# Example usage (commented out as we don't have a trained model yet)
'''
evaluate_model(classifier.pipeline, X_test, y_test)
'''

# Next Steps

To complete our machine learning pipeline, we need to:

1. Extract audio features from our dataset
2. Create the feature matrix and labels
3. Train and evaluate our models
4. Visualize results
5. Write up conclusions and recommendations

The next step is to implement the audio feature extraction pipeline. We'll do this by:

1. Setting up the audio processing environment
2. Downloading preview URLs for our tracks
3. Extracting features from the audio files
4. Creating our feature matrix
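
The last step can be sketched as follows, assuming each track yields a feature dictionary like the one `AudioFeatureExtractor.extract_features` returns (or `None` when extraction fails), together with a genre label. Where the genre labels come from is not settled in this notebook, so the labels and feature values below are made up for illustration.

```python
import pandas as pd

# Hypothetical per-track results: (feature dict or None, genre label)
extracted = [
    ({"tempo": 120.0, "spectral_centroid": 1800.0, "mfcc_0": -210.5}, "rock"),
    (None, "jazz"),  # extraction failed for this track, so it is skipped
    ({"tempo": 92.0, "spectral_centroid": 1400.0, "mfcc_0": -250.1}, "jazz"),
]

# Keep only tracks with features and attach the label to each row
rows = [dict(feats, genre=genre) for feats, genre in extracted if feats is not None]
table = pd.DataFrame(rows)

features_matrix = table.drop(columns="genre").to_numpy()  # shape: (n_tracks, n_features)
genre_labels = table["genre"].to_numpy()                  # shape: (n_tracks,)

print(features_matrix.shape)
print(genre_labels)
```

The names `features_matrix` and `genre_labels` match the placeholders in the commented-out training example, so the two pieces can be connected once real features are available.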
