In [None]:
# Machine Learning Assignment - Exercise 01
# Prof. Klaus Berberich
# Students:
# Aaron Dassen
# Jan Beckhausen
# Germain Girndt

In [None]:
# IMPORTS AND DEFINITIONS ONLY

import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split


def analyze_feature_individually(feature):
    """Print the readable name, mean and variance of a single feature.

    Parameters
    ----------
    feature
        Object exposing ``name``, ``mean()`` and ``var()`` (the notebook
        passes pandas Series). ``name`` is looked up in the notebook-level
        ``acronym_to_name`` mapping to obtain the printed label.

    Returns
    -------
    None
        The statistics are only printed, formatted with two decimals.
    """
    display_name = acronym_to_name.get(feature.name)
    summary_lines = (
        f"Analyzed Feature: {display_name}",
        f"Mean Value: {feature.mean():.2f}",
        # The trailing newline leaves a blank line after each feature summary.
        f"Variance: {feature.var():.2f}\n",
    )
    for line in summary_lines:
        print(line)

def analyze_features_jointly(feature_one, feature_two):
    """Print the covariance and Pearson correlation of two features.

    Parameters
    ----------
    feature_one, feature_two
        Objects exposing pandas-style ``cov`` / ``corr`` methods (the
        notebook passes pandas Series).

    Returns
    -------
    None
        Both statistics are only printed, formatted with two decimals.
    """
    joint_statistics = {
        "Covariance": feature_one.cov(feature_two),
        "Pearson's Correlation": feature_one.corr(feature_two, method='pearson'),
    }
    for label, value in joint_statistics.items():
        print(f"{label}: {value:.2f}")
    
def create_figure(feature_x, feature_y):
    """Create a scatter plot of ``feature_y`` against ``feature_x``.

    Parameters
    ----------
    feature_x, feature_y
        Values for the horizontal and vertical axis. Their ``name``
        attributes are looked up in the notebook-level ``acronym_to_name``
        mapping to build the axis labels and the title.

    Returns
    -------
    matplotlib.figure.Figure
        A new figure holding a single axes with the scatter plot.
    """
    x_label = acronym_to_name.get(feature_x.name)
    y_label = acronym_to_name.get(feature_y.name)

    figure = plt.figure()
    ax = figure.add_subplot(1, 1, 1)

    # Each point is coloured by |x| + |y| through the 'plasma' colormap.
    point_colors = np.abs(feature_x) + np.abs(feature_y)
    ax.scatter(feature_x, feature_y, c=point_colors, cmap='plasma')

    ax.set_xlabel(x_label)
    ax.set_ylabel(y_label)
    ax.set_title(f"{x_label} x {y_label}")

    return figure
    

def create_figure_with_regression_line(feature_x, feature_y, w0_star, w1_star):
    """Create the scatter plot of two features and overlay a regression line.

    Parameters
    ----------
    feature_x, feature_y
        Values for the horizontal and vertical axis; forwarded unchanged to
        ``create_figure``.
    w0_star : float
        Intercept of the line.
    w1_star : float
        Slope of the line.

    Returns
    -------
    matplotlib.figure.Figure
        The figure produced by ``create_figure`` with the red line
        ``y = w0_star + w1_star * x`` and a legend added to its first axes.
    """
    figure = create_figure(feature_x, feature_y)
    ax = figure.get_axes()[0]

    # A straight line is fully determined by two points. Drawing it between the
    # smallest and largest finite x avoids tracing n overlapping segments in
    # sample order and avoids gaps where x is NaN.
    x_values = np.asarray(feature_x, dtype=float)
    x_values = x_values[np.isfinite(x_values)]
    if x_values.size:
        line_x = np.array([x_values.min(), x_values.max()])
        line_y = w0_star + w1_star * line_x
        ax.plot(line_x, line_y, color='red', label='Regression Line')
        ax.legend()

    return figure

    
# Load the data.
# The CSV location can be overridden with the SPOTIFY_TOP_100_CSV environment
# variable so the notebook is not tied to one machine; when the variable is
# unset the original path is used unchanged.
PATH_TO_DATA = os.environ.get(
    "SPOTIFY_TOP_100_CSV",
    "/Users/germaingirndt/source/machine_learning/prog_exercise_01/task_01/kaggle/Spotify 2010 - 2019 Top 100.csv",
)
data = pd.read_csv(PATH_TO_DATA)












In [None]:
# Readable labels (used for plot axes, titles and printed summaries) for the
# column acronyms of the data set.
acronym_to_name = {
    "bpm": "Beats per Minute (bpm)",
    "nrgy": "Energy (nrgy)",
    "dnce": "Danceability (dnce)",
    "val": "Mood (val)",
    "pop": "Popularity (pop)"
}

# Setting variables used by the following snippets
# Order matters: target first, then the nominal genre, then the numeric features.
feature_names = ["pop", "top genre", "bpm", "nrgy", "dnce", "val"]

features_df = data[feature_names]
pop_series = features_df["pop"]
genre_series = features_df["top genre"]

# Numeric features only (everything after "pop" and "top genre").
other_features_df = features_df[feature_names[2:]]



In [None]:
# Taking a look at the data

# Keep only the rows in which every selected column has a value.
df_cleaned = features_df.dropna()

# Note: NaNs in the summary below do not indicate missing data.
# They appear because numeric statistics are not computed for non-numeric
# columns and vice versa.
df_cleaned.describe(include="all").round(2)

In [None]:
# 1.1a - Create scatter plots and compute Pearson's correlation coefficients
for column in other_features_df:
    feature_series = other_features_df[column]

    # Scatter plot of popularity against the current feature.
    create_figure(feature_series, pop_series)
    plt.show()

    # Per-feature statistics first, then the joint statistics of the pair.
    analyze_feature_individually(pop_series)
    analyze_feature_individually(feature_series)
    analyze_features_jointly(pop_series, feature_series)

    # Visual separator between two features.
    print("\n" + "-" * 100)

In [None]:
# 1.1b - Ordinary Least Squares:
# Determine optimal coefficients, add a regression line to the plot, compute mean squared error (MSE)

for _, feature_series in other_features_df.items():

    # Defining X and Y
    Y = pop_series
    X = feature_series

    # Dropping rows where either Y or X have NaNs, so that the fit, the MSE and
    # the plot below all use exactly the same observations.
    df = pd.DataFrame({Y.name: Y, X.name: X}).dropna(how='any')

    filtered_Y = df[Y.name]
    filtered_X = df[X.name]

    # Both columns come from the same NaN-free frame, so they already have the
    # same length and index; no further trimming is needed.
    treated_Y = filtered_Y
    # Converting the 1d feature into a 2d (n_samples, 1) array (required by model.fit())
    treated_X = filtered_X.values.reshape(-1, 1)

    # Creating a linear regression model and fitting it to the data
    model = LinearRegression()
    model.fit(treated_X, treated_Y)

    # Separating coefficients
    w0_star = model.intercept_
    w1_star = model.coef_[0]

    # Calculating the mean squared error (MSE) on the data used for the fit
    residuals = model.predict(treated_X) - treated_Y.to_numpy()
    mse = np.mean(residuals ** 2)

    # Plotting the chart with the rows the model was actually fitted on
    create_figure_with_regression_line(filtered_X, filtered_Y, w0_star, w1_star)
    plt.show()

    # Printing Info
    print(f"Feature: {acronym_to_name.get(X.name)}")
    print(f"w0*: {w0_star}")
    print(f"w1*: {w1_star}")
    print(f"Mean Squared Error: {mse}")
    print("\n" + "-"*50)





In [None]:
# 1.1c - Multiple linear regression
# Model for all features (including the nominal feature genre)
# Randomly split data into training (80%) and test (20%)
# Determine optimal coefficients for the training and mean squared error for the test data


def print_multiple_linear_regression_test(X, Y):
    """Fit a linear regression on a train split and print its test results.

    The data is split 80 % / 20 % with a fixed ``random_state`` so repeated
    runs use the same split. The model is fitted on the training part; the
    mean squared error is computed on the test part.

    Parameters
    ----------
    X
        Feature matrix passed to ``train_test_split`` and ``LinearRegression``.
    Y
        Target values matching the rows of ``X``.

    Returns
    -------
    None
        The MSE, intercept and coefficients are only printed.
    """
    X_train, X_test, y_train, y_test = train_test_split(
        X, Y, test_size=0.2, random_state=2023
    )

    model = LinearRegression().fit(X_train, y_train)

    # Mean squared error on the held-out test rows.
    squared_errors = (model.predict(X_test) - y_test) ** 2
    mse = sum(squared_errors) / len(y_test)

    report_lines = [
        "Results:",
        f"Mean Squared Error: {mse}",
        f"w0*: {model.intercept_}",
        f"w1...n*: {model.coef_}",
        "\n" + "-"*50,
    ]
    for line in report_lines:
        print(line)

def clean_na(X, Y):
    """Drop every row in which the target or any feature is NaN.

    Features and target are concatenated column-wise, incomplete rows are
    removed, and the result is split back by column *position*. Splitting by
    position (instead of by ``Y.name``) keeps the function correct when ``X``
    contains a column carrying the same label as ``Y``, or when ``Y`` has no
    name.

    Parameters
    ----------
    X : pandas.DataFrame
        Feature columns.
    Y : pandas.Series
        Target values, combined with ``X`` on the index.

    Returns
    -------
    list
        ``[cleaned_X_df, cleaned_Y_series]`` restricted to the complete rows.
    """
    complete_rows = pd.concat([X, Y], axis=1).dropna(how='any')

    # The first X.shape[1] columns come from X; the single remaining one is Y.
    number_of_features = X.shape[1]
    cleaned_X_df = complete_rows.iloc[:, :number_of_features]
    cleaned_Y_series = complete_rows.iloc[:, number_of_features]

    return [cleaned_X_df, cleaned_Y_series]

def merge_less_frequent_genres(one_hot_encoded_genre, min_share=0.005, other_genre_category="_others"):
    """Collapse rare one-hot genre columns into a single catch-all column.

    A column is considered rare when its column sum is below
    ``min_share * number_of_rows``. All rare columns are replaced by one
    column that holds the row-wise maximum of the replaced columns.

    The input frame is left untouched; a new frame is returned.

    Parameters
    ----------
    one_hot_encoded_genre : pandas.DataFrame
        One-hot encoded genre indicators (one column per genre).
    min_share : float, optional
        Minimum share of rows a genre needs in order to keep its own column.
    other_genre_category : str, optional
        Name of the catch-all column.

    Returns
    -------
    pandas.DataFrame
        Frame with the frequent genre columns followed, if any column was
        rare, by the catch-all column.
    """
    # Minimum number of occurrences to keep the genre category as is
    number_of_data_points = one_hot_encoded_genre.shape[0]
    genre_threshold = number_of_data_points * min_share

    # Identify genre categories with fewer occurrences than the threshold
    genre_counts = one_hot_encoded_genre.sum()
    less_frequent_genres = genre_counts[genre_counts < genre_threshold].index.tolist()

    if not less_frequent_genres:
        return one_hot_encoded_genre.copy()

    # Compute the merged indicator before dropping, and assign it after
    # dropping, so an existing catch-all column that is itself rare is folded
    # into the result instead of being overwritten and then removed.
    merged_indicator = one_hot_encoded_genre[less_frequent_genres].max(axis=1)
    merged = one_hot_encoded_genre.drop(columns=less_frequent_genres)
    merged[other_genre_category] = merged_indicator

    return merged

def merge_similar_categories(one_hot_encoded_genre, big_genre_categories=None):
    """Merge one-hot genre columns that share a keyword into one column each.

    For every keyword, all columns whose name contains the keyword are
    replaced by a single 0/1 column named ``"_" + keyword`` that is 1 for the
    rows where any of the replaced columns is truthy. Keywords are processed
    in order, so a column matching several keywords is absorbed by the first
    one. A keyword without matching columns still yields an all-zero column.

    The input frame is left untouched; a new frame is returned.

    Parameters
    ----------
    one_hot_encoded_genre : pandas.DataFrame
        One-hot encoded genre indicators with string column names.
    big_genre_categories : list of str, optional
        Keywords to merge on, in priority order. Defaults to the notebook's
        original list of broad genres.

    Returns
    -------
    pandas.DataFrame
        Frame in which the matched columns are replaced by the merged ones.
    """
    if big_genre_categories is None:
        big_genre_categories = [
            "rock", "hip hop", "rap", "dance", "r&b", "reggae", "electro",
            "indie", "wave", "house", "metal", "french", "techno", "afro",
            "funk", "country", "latin", "pop",
        ]

    merged = one_hot_encoded_genre.copy()

    for genre_category in big_genre_categories:
        # Get all column names containing the substring genre_category
        genre_columns = [col for col in merged.columns if genre_category in col]

        # Read the matching columns before touching the frame, so an already
        # existing merged column contributes its values instead of being reset.
        belongs_to_category = merged[genre_columns].any(axis=1)

        # Replace the original genre columns by the single merged 0/1 column
        merged = merged.drop(columns=genre_columns)
        merged["_" + genre_category] = belongs_to_category.astype("int64")

    return merged
        
        
# Run 1: numeric features only.
print("Multiple linear regression WITHOUT one hot encoding (good results)")
X, Y = clean_na(other_features_df.copy(), pop_series.copy())
print_multiple_linear_regression_test(X, Y)

# Run 2: numeric features plus one column per raw genre.
print("Multiple linear regression WITH one hot encoding (worst results)")
# The genre named 'pop' is renamed so it cannot collide with the target feature 'pop'.
one_hot_encoded_genre = pd.get_dummies(genre_series).rename(columns={"pop": "_pop music"})

features_with_raw_genres = pd.concat([other_features_df, one_hot_encoded_genre], axis=1)
X, Y = clean_na(features_with_raw_genres, pop_series.copy())
print_multiple_linear_regression_test(X, Y)

# Run 3: numeric features plus merged genre columns
# (similar genres grouped first, then rare ones collapsed).
print("Multiple linear regression WITH TREATED one hot encoding (best results!!!)")
# A copy is passed so the raw one-hot frame stays available for later cells.
grouped_genres = merge_similar_categories(one_hot_encoded_genre.copy())
treated_one_hot_encoded_genre = merge_less_frequent_genres(grouped_genres)

features_with_treated_genres = pd.concat([other_features_df, treated_one_hot_encoded_genre], axis=1)
X, Y = clean_na(features_with_treated_genres, pop_series.copy())

print_multiple_linear_regression_test(X, Y)

In [None]:
# One Hot Encoding visualization:
# Prints the per-category counts and summary statistics of the genre encoding,
# first for the raw one-hot frame and then for the treated (merged) one.
print("One hot encoded WITHOUT treatement:\n")
print("\n\n\nCategory count:")
print(one_hot_encoded_genre.sum())
# "\n\n\n" (not "\n\n\D"): the stray backslash was an invalid escape sequence
# and was printed literally in front of "Description:".
print("\n\n\nDescription:")
print(one_hot_encoded_genre.describe(include="all").round(2))

print("One hot encoded WITH treatment:\n")
print("\n\n\nCategory count:")
print(treated_one_hot_encoded_genre.sum())
print("\n\n\nDescription:")
print(treated_one_hot_encoded_genre.describe(include="all").round(2))








In [None]:
# Draft 1 (Not part of the exercise)

import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.mplot3d import Axes3D

# Seeded fake data so the draft produces the same points on every run.
np.random.seed(1)
num_points = 100

# Each point draws its x, y and z coordinate (in that order) from N(0, 1).
points = [
    {
        'x': np.random.normal(0, 1),
        'y': np.random.normal(0, 1),
        'z': np.random.normal(0, 1)
    }
    for _ in range(num_points)
]

# 3D scatter plot, one red "x" marker per point.
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')

for point in points:
    ax.scatter(point["x"], point["y"], point["z"], color="red", marker="x")

# Axis labels
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')




In [None]:
# Draft 2 (Not part of the exercise)

# Target and a single feature (second column of the numeric features).
Y = pop_series
X = other_features_df.iloc[:, 1]

# Keep only the rows in which both the target and the feature are present.
df = pd.DataFrame({Y.name: Y, X.name: X})
df = df.dropna(how='any')

cleaned_Y, cleaned_X = df.get(Y.name), df.get(X.name)

# From here on X and Y refer to the cleaned series.
Y, X = cleaned_Y, cleaned_X



def calculate_w1_star(X, Y):
    """Return the ordinary-least-squares slope w1* of Y regressed on X.

    Uses the centred closed form
    ``sum((x - mean(x)) * (y - mean(y))) / sum((x - mean(x)) ** 2)``,
    which is algebraically equal to
    ``(n*sum(xy) - sum(x)*sum(y)) / (n*sum(x^2) - sum(x)^2)`` but does not
    subtract two huge, nearly equal numbers, so it stays accurate for float
    data with large magnitudes.

    Parameters
    ----------
    X, Y : array-like of numbers
        Observations paired by position (pandas Series, arrays or lists).

    Returns
    -------
    float
        The slope. NaN (with a NumPy warning) when X is empty or constant.

    Raises
    ------
    ValueError
        If X and Y do not have the same shape.
    """
    x = np.asarray(X, dtype=float)
    y = np.asarray(Y, dtype=float)
    if x.shape != y.shape:
        raise ValueError(
            f"X and Y must have the same shape, got {x.shape} and {y.shape}"
        )

    x_centered = x - x.mean()
    y_centered = y - y.mean()

    w1_star = (x_centered * y_centered).sum() / (x_centered ** 2).sum()
    return w1_star
    
def calculate_w0_star(X, Y, w1_star):
    """Return the intercept w0* = mean(Y) - w1_star * mean(X).

    Parameters
    ----------
    X, Y
        Objects exposing ``mean()`` (the notebook passes pandas Series).
    w1_star : float
        Slope of the regression line.
    """
    mean_of_y = Y.mean()
    mean_of_x = X.mean()
    return mean_of_y - w1_star * mean_of_x
    
def predict_y(x):
    """Predict the target for ``x`` with a line fitted on the global X and Y.

    The slope and intercept are recomputed from the notebook-level ``X`` and
    ``Y`` on every call and printed before the prediction is returned.

    Parameters
    ----------
    x
        Feature value (or values) to evaluate the fitted line at.

    Returns
    -------
    The value of ``w0* + w1* * x``.
    """
    slope = calculate_w1_star(X, Y)
    intercept = calculate_w0_star(X, Y, slope)

    for label, coefficient in (("w0*", intercept), ("w1*", slope)):
        print(f"{label}: {coefficient}")

    return intercept + slope * x


# Example call; the returned prediction is the cell's displayed output.
predict_y(100)
