In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import os
import sklearn


In [None]:
import kagglehub

if os.path.exists("/home/abhay/.cache/kagglehub/datasets/salvatorerastelli/spotify-and-youtube/versions/2"):
    path = "/home/abhay/.cache/kagglehub/datasets/salvatorerastelli/spotify-and-youtube/versions/2"
else:
    path = kagglehub.dataset_download("salvatorerastelli/spotify-and-youtube")

print("Path to dataset files:", path)

In [None]:
from sklearn.model_selection import train_test_split

all_data = pd.read_csv(path + "/Spotify_Youtube.csv")

train, test = train_test_split(all_data, test_size=0.2, random_state=42)
train, val = train_test_split(train, test_size=0.2, random_state=42)


In [None]:
train.head(3)

In [None]:
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import RobustScaler
from sklearn.decomposition import PCA
import copy

def clip(x, mean, std, num_std):
    if x > mean + num_std * std:
        return mean + num_std * std
    elif x < mean - num_std * std:
        return mean - num_std * std
    else:
        return x

def get_train_df(df: pd.DataFrame, numerical_predictors: list, categorical_predictors: list, output: str):
    before_dropped_len = len(df)

    all_cols = numerical_predictors + categorical_predictors + [output]
    all_numerical_cols = numerical_predictors + [output]

    df = df.dropna(subset=all_cols)
    df = df.copy()

    print(f'Dropped {before_dropped_len - len(df)} rows with NaN values. Remaining rows: {len(df)}')

    # Filter out rows in train where Views > mean + std
    output_mean = df[output].mean()
    output_std = df[output].std()
    df = df[df[output] <= output_mean + output_std]

    df[output].hist(bins=100)
    plt.title(f"Distribution of {output} after filtering")
    plt.show()

    total_count = 0

    for col in numerical_predictors:
        mean = df[col].mean()
        std = df[col].std()
        count = ((df[col] - mean) / std).abs() > 3
        # print(f'Column {col} has {count.sum()} values more than 3 standard deviations from the mean')
        total_count += count.sum()

        # print(f"{col} max before clipping:", df[col].max())
        # print(f"{col} min before clipping:", df[col].min())
        # print(f"{col} mean before clipping:", df[col].mean())
        # print(f"{col} std before clipping:", df[col].std())

        df[col] = df[col].apply(lambda x: clip(x, mean, std, 2))  # Ensure reassignment

        # print(f"{col} max after clipping:", df[col].max())
        # print(f"{col} min after clipping:", df[col].min())
    
    print(f'Total outliers: {total_count}. Percentage: {total_count / len(df) * 100:.2f}%')
    

    if output == "Views":
        df[output] = df[output].apply(lambda x: np.sqrt(x))
    
    if 'Duration_ms' in df.columns:
        df['Duration_ms'] = df['Duration_ms'].apply(lambda x: np.sqrt(x))
    if 'Key' in df.columns:
        df['key_angle'] = df['Key'].apply(lambda x: x * 30)
        df['key_height'] = df['key_angle'].apply(lambda x: np.sin(np.radians(x)))
        df['key_width'] = df['key_angle'].apply(lambda x: np.cos(np.radians(x)))
        numerical_predictors.remove("Key")
        numerical_predictors.append("key_height")
        numerical_predictors.append("key_width")

    df['mean_views'] = df['Artist'].map(df.groupby('Artist')['Views'].mean()).apply(np.sqrt)


    nrows = int(len(numerical_predictors) / 4) + 1
    fig, axes = plt.subplots(nrows, ncols=4, figsize=(60, 20))
    count = 0
    for row in range(nrows):
        for col in range(4):
            if count < len(numerical_predictors):
                axes[row, col].hist(df[numerical_predictors[count]], bins=50)
                axes[row, col].set_title(f"Column {numerical_predictors[count]}")
                count += 1
    if count <= len(numerical_predictors):
        axes[row, col].hist(df[output], bins=50)
    plt.tight_layout()
    plt.show()

    if "Instrumentalness" in df.columns:
        instrumental_mean, instrumental_std = df["Instrumentalness"].mean(), df["Instrumentalness"].std()
        df['instrumental_binary'] = df["Instrumentalness"].apply(lambda x: 1 if x > instrumental_mean + 1 * instrumental_std or x < instrumental_mean - 1 * instrumental_std else 0)

        numerical_predictors.remove("Instrumentalness")
        numerical_predictors.append("instrumental_binary")
    print("Numerical predictors train: ", numerical_predictors)

    x_scaler = StandardScaler()
    x_scaler.fit(df[numerical_predictors])
    df[numerical_predictors] = x_scaler.transform(df[numerical_predictors])

    x = df[numerical_predictors].to_numpy()

    nrows = int(x.shape[1] / 4) + 1
    fig, axes = plt.subplots(nrows, ncols=4, figsize=(60, 20))
    count = 0
    for row in range(nrows):
        for col in range(4):
            if count < x.shape[1]:
                axes[row, col].hist(x[:, count], bins=50)
                axes[row, col].set_title(f"Column {col}")
                count += 1
    # if count <= len(numerical_predictors):
    #     axes[row, col].hist(df[output], bins=50)
    plt.tight_layout()
    plt.show()

    y = df[output].to_numpy().ravel() # Use ravel() to ensure y is 1D

    pca = None
    pca = PCA(n_components=7)
    pca.fit(x)
    x = pca.transform(x)
    explained_variance_ratio = pca.explained_variance_ratio_
    plt.figure(figsize=(10, 6))
    plt.plot(range(1, len(explained_variance_ratio) + 1), explained_variance_ratio, marker='o', linestyle='--')
    plt.title('Scree Plot')
    plt.xlabel('Principal Component')
    plt.ylabel('Explained Variance Ratio')
    plt.xticks(range(1, len(explained_variance_ratio) + 1))
    plt.grid(True)
    plt.show()
    print("PCA explained variance ratio:", explained_variance_ratio)
    print("PCA components shape:", pca.components_.shape)

    # TODO if going to add categorical variables, do it here (after)

    return x, y, x_scaler, df['mean_views'], pca


def get_val_df(df: pd.DataFrame, numerical_predictors: list, categorical_predictors: list, output: str, x_scaler=None, mean_views=None, pca=None):
    all_cols = numerical_predictors + categorical_predictors + [output]
    df = df.dropna(subset=all_cols).copy()

    for col in numerical_predictors:
        mean = df[col].mean()
        std = df[col].std()
        df[col] = df[col].apply(lambda x: clip(x, mean, std, 2))

    if output == "Views":
        df[output] = df[output].apply(lambda x: np.sqrt(x))
    if 'Duration_ms' in df.columns:
        df['Duration_ms'] = df['Duration_ms'].apply(lambda x: np.sqrt(x))
    if 'Key' in df.columns:
        df['key_angle'] = df['Key'].apply(lambda x: x * 30)
        df['key_height'] = df['key_angle'].apply(lambda x: np.sin(np.radians(x)))
        df['key_width'] = df['key_angle'].apply(lambda x: np.cos(np.radians(x)))
        numerical_predictors.remove("Key")
        numerical_predictors.append("key_height")
        numerical_predictors.append("key_width")

    if mean_views is not None:
        df['mean_views'] = df['Artist'].map(mean_views).apply(np.sqrt)
    else:
        df['mean_views'] = df['Artist'].map(df.groupby('Artist')[output].mean()).apply(np.sqrt)

    feature_list = list(numerical_predictors)
    if "Instrumentalness" in df.columns and "Instrumentalness" in feature_list:
        instrumental_mean, instrumental_std = df["Instrumentalness"].mean(), df["Instrumentalness"].std()
        df['instrumental_binary'] = df["Instrumentalness"].apply(
            lambda x: 1 if x > instrumental_mean + 1 * instrumental_std or x < instrumental_mean - 1 * instrumental_std else 0)
        feature_list.remove("Instrumentalness")
        feature_list.append("instrumental_binary")

    if x_scaler is not None:
        df[feature_list] = x_scaler.transform(df[feature_list])

    x = df[feature_list].to_numpy()

    if pca is not None:
        x = pca.transform(x)

    y = df[output].to_numpy().ravel()
    return x, y

In [None]:
cols_to_use = [
    'Danceability',
    'Energy',
    'Key',
    'Loudness',
    'Speechiness',
    'Acousticness',
    'Instrumentalness',
    'Liveness', # like energy
    'Valence',
    'Tempo',
    'Duration_ms',
]

train[cols_to_use].describe()

In [None]:
val_cols_to_use = copy.deepcopy(cols_to_use)
test_cols_to_use = copy.deepcopy(cols_to_use)
train_x_np, train_y_np, x_scaler, mean_views, pca = get_train_df(train, cols_to_use, [], 'Views')

val_x_np, val_y_np = get_val_df(val, val_cols_to_use, [], 'Views', x_scaler, mean_views, pca)
test_x_np, test_y_np = get_val_df(test, test_cols_to_use, [], 'Views', x_scaler, mean_views, pca)

In [None]:
nrows = int(train_x_np.shape[1] / 4) + 1
fig, axes = plt.subplots(nrows, ncols=4, figsize=(60, 20))
count = 0
for row in range(nrows):
    for col in range(4):
        if count < train_x_np.shape[1]:
            axes[row, col].hist(train_x_np[:, count], bins=50)
            axes[row, col].set_title(f"Column {col}")
            count += 1
# if count <= len(numerical_predictors):
#     axes[row, col].hist(df[output], bins=50)
plt.tight_layout()
plt.show()

In [None]:
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PolynomialFeatures
from sklearn.ensemble import RandomForestRegressor


linear_model = LinearRegression().fit(train_x_np, train_y_np)

print("Model Coefficients:", linear_model.coef_)


poly = PolynomialFeatures(degree=3)
poly.fit(train_x_np)
train_x_poly = poly.transform(train_x_np)
poly_model = LinearRegression().fit(train_x_poly, train_y_np)
print("Polynomial Model Coefficients:", poly_model.coef_)


# Create and fit the Random Forest Regressor
tree = RandomForestRegressor(n_estimators=100)
tree.fit(train_x_np, train_y_np)

print("Random Forest Feature Importances:", tree.feature_importances_)


# print(f"Model score on train data: {model.score(train_x_np, train_y_np)}")

In [None]:
from sklearn.metrics import mean_squared_error, r2_score

def print_scores(y_actual, y_pred, name=str):
    mse = mean_squared_error(y_actual, y_pred)
    r2 = r2_score(y_actual, y_pred)

    print(f"{name} Mean Squared Error (Scaled): {mse}")
    print(f"{name} R^2 Score (Scaled): {r2}")

    fig, ax = plt.subplots()
    ax.scatter(y_actual, y_pred, alpha=0.5)
    # ax.plot([y_actual.min(), y_actual.max()], [y_actual.min(), y_actual.max()], 'r--', lw=2)
    ax.set_xlabel("Actual Views")
    ax.set_ylabel("Predicted Views")
    ax.set_title(f"Actual vs Predicted Scaled Views {name}")
    ax.set_aspect('equal', adjustable='box')
    plt.grid(True)
    plt.show()

print_scores(val_y_np, linear_model.predict(val_x_np), "Linear Regression")
print_scores(val_y_np, poly_model.predict(poly.transform(val_x_np)), "Polynomial Regression")
print_scores(val_y_np, tree.predict(val_x_np), "Random Forest Regression")



In [None]:
import random as rd

def plot_predications_vs_actual(y_actual, y_pred, number_plotting: int, name: str):
    indices = rd.sample(range(len(y_pred)), number_plotting) # Use sample to avoid duplicates

    w, x = 0.35, np.arange(number_plotting)

    fig, ax = plt.subplots(figsize=(12, 6))

    y_pred_sampled = y_pred[indices]
    y_actual_sampled = y_actual[indices]

    ax.bar(x - w, y_actual_sampled.squeeze(), width=w, label='Actual')
    ax.bar(x, y_pred_sampled.squeeze(), width=w, label=f'Predicted {name}')
    ax.set_xticks(x)
    ax.set_xticklabels([f'Song {i+1}' for i in range(number_plotting)])
    ax.set_ylabel("Scaled Views")
    ax.set_title(f"Predicted vs Actual Scaled Views for {number_plotting} Random Samples using {name}")
    ax.legend()
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    plt.show()


number_plotting = 15
plot_predications_vs_actual(val_y_np, linear_model.predict(val_x_np), number_plotting, "Linear")
plot_predications_vs_actual(val_y_np, poly_model.predict(poly.transform(val_x_np)), number_plotting, "Polynomial")
plot_predications_vs_actual(val_y_np, tree.predict(val_x_np), number_plotting, "Random Forest")

# Final use of test data

In [None]:
print_scores(test_y_np, tree.predict(test_x_np), "Tree")
plot_predications_vs_actual(test_y_np, tree.predict(test_x_np), number_plotting, "Tree")

# Useful tree plots

In [None]:
test_x_np.shape

In [None]:
from sklearn.tree import plot_tree
import matplotlib.pyplot as plt

tree_to_plot = tree.estimators_[0]

# Plot the decision tree
plt.figure(figsize=(20, 10))
plot_tree(tree_to_plot, feature_names=cols_to_use, filled=True, rounded=True, fontsize=10)
plt.title("Decision Tree from Random Forest")
plt.show()