In [1]:
import numpy as np
import spotipy
from spotipy.oauth2 import SpotifyOAuth
from spotipy.oauth2 import SpotifyClientCredentials
import random
import time
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import Lasso
from sklearn.linear_model import Ridge
from sklearn.linear_model import RidgeCV
from sklearn.metrics import mean_squared_error
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV

The history saving thread hit an unexpected error (DatabaseError('database disk image is malformed')).History will not be written to the database.


In [None]:
#Connect to Spotify API Developer Account
sp = spotipy.Spotify(
    client_credentials_manager=SpotifyClientCredentials(
        client_id="f4499c1475cc48b3bb23c9e8215f37ac",
        client_secret="a2ced9ec9e3845d4b1be65d5e0f097e4"
    )
)

In [None]:
# Function to fetch metadata for multiple songs
def fetch_songs_metadata(sp, prefixes, output_file, max_songs=30000):
    collected_songs = []
    try:
        for prefix in prefixes:
            for offset in range(0, 1000, 50):  # Pagination
                if len(collected_songs) >= max_songs:
                    break  # Stop collecting if max_songs is reached
                try:
                    results = sp.search(q=f"{prefix} year:2022", type="track", limit=50, offset=offset)
                    for track in results['tracks']['items']:
                        # Extract required metadata
                        song_data = {
                            "id": track['id'],
                            "name": track['name'],
                            "artist": track['artists'][0]['name'],
                            "popularity": track['popularity'],
                            "duration_ms": track['duration_ms'],
                            "explicit": track['explicit']
                        }
                        collected_songs.append(song_data)
                        if len(collected_songs) >= max_songs:
                            break  # Stop collecting if max_songs is reached

                    if len(results['tracks']['items']) == 0:
                        break  # No more results for this prefix

                    # Delay to respect rate limits
                    time.sleep(1)

                except spotipy.exceptions.SpotifyException as e:
                    if e.http_status == 429:  # Rate limit error
                        retry_after = int(e.headers.get('Retry-After', 5))
                        print(f"Rate limited. Retrying after {retry_after} seconds...")
                        time.sleep(retry_after)
                    else:
                        print(f"SpotifyException: {e}")
                        break

        # Save collected data to a CSV file
        pd.DataFrame(collected_songs).to_csv(output_file, index=False)
        print(f"Collected metadata for {len(collected_songs)} songs. Saved to {output_file}.")

    except Exception as e:
        print(f"Critical error during song collection: {e}")

# Example usage
prefixes = [chr(i) for i in range(97, 123)]  # 'a' to 'z'
output_file = "songs_metadata.csv"

# Collect metadata for up to 100 songs
fetch_songs_metadata(sp, prefixes, output_file, max_songs=30000)


In [2]:
#Lasso Regression Unoptimized:

# Assuming your dataset is saved in a CSV or DataFrame called 'df'
df = pd.read_csv('songs_metadata.csv')

# Independent and dependent variables
X = df[['artist', 'duration_ms', 'explicit']]
y = df['popularity']

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Preprocessing pipeline
preprocessor = ColumnTransformer(
    transformers=[
        ('artist_enc', OneHotEncoder(handle_unknown='ignore'), ['artist']),
        ('scale', StandardScaler(), ['duration_ms']),
        ('pass', 'passthrough', ['explicit'])
    ]
)

# Build pipeline with Lasso regression
model = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('lasso', Lasso(alpha=0.1))  # Adjust alpha as needed
])

# Fit the model
model.fit(X_train, y_train)

# Predict and evaluate
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
test_rmse = np.sqrt(mse)
print("Mean Squared Error (MSE):", mse)
print(f"Test set MSE: {test_rmse}")

Mean Squared Error (MSE): 134.04618002779034
Test set MSE: 11.577831404360246


In [3]:
#Optimized Lasso Regression (Best Model)

# Define the preprocessing pipeline (same as before)
preprocessor = ColumnTransformer(
    transformers=[
        ('artist_enc', OneHotEncoder(handle_unknown='ignore'), ['artist']),
        ('scale', StandardScaler(), ['duration_ms']),
        ('pass', 'passthrough', ['explicit'])
    ]
)

# Lasso regression pipeline
model = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('lasso', Lasso())
])

# Define the grid of alpha values to search
param_grid = {
    'lasso__alpha': np.logspace(-3, 2, 20)  # Alphas ranging from 0.001 to 100
}

# Perform Grid Search with cross-validation
grid_search = GridSearchCV(
    model,
    param_grid,
    scoring='neg_mean_squared_error',  # Use MSE as the scoring metric (negated)
    cv=5,  # 5-fold cross-validation
    n_jobs=-1,  # Use all available cores
    verbose=1
)

# Fit the grid search to the training data
grid_search.fit(X_train, y_train)

# Best alpha and corresponding MSE
best_alpha = grid_search.best_params_['lasso__alpha']
best_mse = -grid_search.best_score_

print(f"Best alpha: {best_alpha}")
print(f"Best cross-validated MSE: {best_mse}")

# Test set performance
best_model = grid_search.best_estimator_
y_test_pred = best_model.predict(X_test)
test_mse = mean_squared_error(y_test, y_test_pred)
test_rmse = np.sqrt(test_mse)
print(f"Test set MSE: {test_mse}")
print(f"Test set MSE: {test_rmse}")

Fitting 5 folds for each of 20 candidates, totalling 100 fits
Best alpha: 0.001
Best cross-validated MSE: 85.36714963684665
Test set MSE: 80.72035179365008
Test set MSE: 8.984450556024564


In [4]:
#Standard Linear Regressor (No Regularization)

# Features and target variable
X = df[['artist', 'duration_ms', 'explicit']]
y = df['popularity']

# One-hot encode the 'artist' column and keep the other features
# Define the transformer for categorical encoding
preprocessor = ColumnTransformer(
    transformers=[
        ('artist_encoder', OneHotEncoder(handle_unknown='ignore'), ['artist']),
    ],
    remainder='passthrough'  # Keep the other numeric and boolean features
)

# Create the pipeline with preprocessing and regression model
model = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('regressor', LinearRegression())
])

# Split data into training and test sets (70/30 split)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Train the model
model.fit(X_train, y_train)

# Predict on the test set
y_pred = model.predict(X_test)

# Calculate the Mean Squared Error
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
print(f"Mean Squared Error on the test set: {mse}")
print(f"RMSE on the test set: {rmse}")

Mean Squared Error on the test set: 134.14224191516598
RMSE on the test set: 11.581979188168402


In [4]:
#Linear Regressor with Optimized Ridge Regularization

# Define the preprocessing for categorical and numerical features
preprocessor = ColumnTransformer(
    transformers=[
        ('artist_encoder', OneHotEncoder(handle_unknown='ignore'), ['artist']),
    ],
    remainder='passthrough'  # Keep the numerical and boolean features unchanged
)

# Define Ridge Regression pipeline
ridge_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('ridge_regressor', Ridge())  # No alpha here; GridSearchCV will handle it
])

# Define a range of alpha values
alpha_values = [0.01, 0.1, 1, 10, 100, 1000]
param_grid = {'ridge_regressor__alpha': alpha_values}

# Use GridSearchCV to find the best alpha
grid_search = GridSearchCV(
    estimator=ridge_pipeline,
    param_grid=param_grid,
    scoring='neg_mean_squared_error',  # Use negative MSE
    cv=5,  # 5-fold cross-validation
    verbose=1
)

# Fit the model
grid_search.fit(X_train, y_train)

# Get the best alpha
best_alpha = grid_search.best_params_['ridge_regressor__alpha']
print(f"Optimal alpha: {best_alpha}")

# Evaluate the model on the test set
best_model = grid_search.best_estimator_
y_pred = best_model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)

print(f"Mean Squared Error (MSE): {mse}")
print(f"Root Mean Squared Error (RMSE): {rmse}")

Fitting 5 folds for each of 6 candidates, totalling 30 fits
Optimal alpha: 1000
Mean Squared Error (MSE): 134.14224191516453
Root Mean Squared Error (RMSE): 11.581979188168338


In [5]:
from sklearn.linear_model import Ridge
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import mean_squared_error
import numpy as np

# Define the preprocessing pipeline (same as before)
preprocessor = ColumnTransformer(
    transformers=[
        ('artist_enc', OneHotEncoder(handle_unknown='ignore'), ['artist']),
        ('scale', StandardScaler(), ['duration_ms']),
        ('pass', 'passthrough', ['explicit'])
    ]
)

# Ridge regression pipeline
model = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('ridge', Ridge())
])

# Define the grid of alpha values to search
param_grid = {
    'ridge__alpha': np.logspace(-3, 2, 20)  # Alphas ranging from 0.001 to 100
}

# Perform Grid Search with cross-validation
grid_search = GridSearchCV(
    model,
    param_grid,
    scoring='neg_mean_squared_error',  # Use MSE as the scoring metric (negated)
    cv=5,  # 5-fold cross-validation
    n_jobs=-1,  # Use all available cores
    verbose=1
)

# Fit the grid search to the training data
grid_search.fit(X_train, y_train)

# Best alpha and corresponding MSE
best_alpha = grid_search.best_params_['ridge__alpha']
best_mse = -grid_search.best_score_

print(f"Best alpha: {best_alpha}")
print(f"Best cross-validated MSE: {best_mse}")

# Test set performance
best_model = grid_search.best_estimator_
y_test_pred = best_model.predict(X_test)
test_mse = mean_squared_error(y_test, y_test_pred)
test_rmse = np.sqrt(test_mse)
print(f"Test set MSE: {test_mse}")
print(f"Test set RMSE: {test_rmse}")


Fitting 5 folds for each of 20 candidates, totalling 100 fits
Best alpha: 0.42813323987193913
Best cross-validated MSE: 77.9505026275579
Test set MSE: 72.062473501812
Test set RMSE: 8.488961862431236
