In [7]:
import numpy as np
import pandas as pd

# Relative path to the raw CSV dataset; read with pd.read_csv in a later cell.
file_path = '../data/raw/dataset.csv'

In [8]:
def ridge_regression(X, y, alpha, penalize_intercept=True):
    """Fit a ridge regression in closed form and return its coefficients.

    A constant column named 'bias' is prepended to ``X`` and the regularized
    normal equations ``(Xb.T @ Xb + alpha * P) @ w = Xb.T @ y`` are solved
    for ``w``, where ``Xb`` is the design matrix including the bias column
    and ``P`` is the penalty matrix.

    Parameters
    ----------
    X : pandas.DataFrame
        Feature matrix, one row per sample. Every column must be convertible
        to float.
    y : pandas.Series or array-like
        Target values. They are matched to the rows of ``X`` by position.
    alpha : float
        Regularization strength multiplying the penalty matrix.
    penalize_intercept : bool, default True
        When True the penalty matrix is the full identity, so the bias term
        is shrunk together with the feature weights. Pass False to leave the
        bias term unpenalized.

    Returns
    -------
    numpy.ndarray
        Coefficients ordered as ``[bias, *X.columns]``.

    Raises
    ------
    numpy.linalg.LinAlgError
        If the regularized system is singular.
    """
    X_bias = pd.concat([pd.Series(1, index=X.index, name='bias'), X], axis=1)

    # Do the linear algebra on plain float arrays so the result is always an
    # ndarray, independent of how pandas and numpy dispatch mixed `@` operands.
    design = X_bias.to_numpy(dtype=float)
    targets = np.asarray(y, dtype=float)

    penalty = alpha * np.identity(design.shape[1])
    if not penalize_intercept:
        # Column 0 is the bias column, so zero its diagonal entry.
        penalty[0, 0] = 0.0

    # Solve the linear system directly instead of forming an explicit inverse.
    coefficients = np.linalg.solve(design.T @ design + penalty, design.T @ targets)

    return coefficients

def predict(X, coefficients):
    """Compute linear-model predictions for the rows of ``X``.

    A constant column named 'bias' is placed in front of the feature columns
    and the resulting design matrix is multiplied by ``coefficients``, so the
    first coefficient acts as the intercept.
    """
    intercept_column = pd.Series(1, index=X.index, name='bias')
    design = pd.concat([intercept_column, X], axis=1)
    return design @ coefficients

def mean_squared_error(y_true, y_pred):
    """Return the mean of the squared differences between targets and predictions."""
    residuals = y_true - y_pred
    return np.mean(np.square(residuals))

def k_fold_cross_validation(df, k, alpha_values, target_column='popularity', stop_condition=None):
    """Estimate the validation error of ridge regression for several alphas.

    The rows of ``df`` are split, in their current order, into ``k``
    contiguous folds. For each alpha, a model is trained on all folds but one
    and scored with mean squared error on the held-out fold; the per-fold
    errors are then averaged.

    Parameters
    ----------
    df : pandas.DataFrame
        Data containing the feature columns and ``target_column``. Rows are
        not shuffled here.
    k : int
        Number of folds; must satisfy ``1 <= k <= len(df)``.
    alpha_values : iterable of float
        Regularization strengths to evaluate, in order.
    target_column : str, default 'popularity'
        Name of the column to predict.
    stop_condition : callable or None
        Optional early-exit predicate that takes a list of errors and returns
        a truthy value to stop. It is called after every fold with the fold
        errors collected so far for the current alpha, and after every alpha
        with the averaged errors collected so far.

    Returns
    -------
    list
        Mean validation error for each evaluated alpha, in the order of
        ``alpha_values``. The list is shorter than ``alpha_values`` when
        ``stop_condition`` ended the search early, and an alpha's entry
        averages only the folds evaluated before an early fold-level stop.

    Raises
    ------
    ValueError
        If ``k`` is smaller than 1 or larger than the number of rows.
    """
    n_rows = len(df)
    if k < 1 or k > n_rows:
        raise ValueError(
            f"k must be between 1 and the number of rows ({n_rows}), got {k}"
        )

    X = df.drop(columns=[target_column])
    y = df[target_column]

    fold_size = n_rows // k
    # The last fold runs to the end of the data so the n_rows % k trailing
    # rows are validated too instead of only ever being used for training.
    fold_bounds = [
        (i * fold_size, n_rows if i == k - 1 else (i + 1) * fold_size)
        for i in range(k)
    ]

    errors = []

    for alpha in alpha_values:
        alpha_errors = []
        for start, end in fold_bounds:
            # Split the data into training and validation sets
            X_train = pd.concat([X.iloc[:start], X.iloc[end:]])
            y_train = pd.concat([y.iloc[:start], y.iloc[end:]])
            X_val = X.iloc[start:end]
            y_val = y.iloc[start:end]

            # Train ridge regression model
            coefficients = ridge_regression(X_train, y_train, alpha)

            # Make predictions on the validation set
            predictions = predict(X_val, coefficients)

            # Calculate mean squared error
            mse = mean_squared_error(y_val, predictions)
            alpha_errors.append(mse)

            # Check the stop condition on the fold errors gathered so far,
            # the same kind of input as the per-alpha check below.
            if stop_condition is not None and stop_condition(alpha_errors):
                break

        # Average the errors across folds for the current alpha
        errors.append(np.mean(alpha_errors))

        # Check the stop condition after each alpha value
        if stop_condition is not None and stop_condition(errors):
            break

    return errors


In [None]:
def stop_condition(iteration, max_iterations=1000):
    """Report whether ``iteration`` has reached ``max_iterations`` (inclusive)."""
    limit_reached = max_iterations <= iteration
    return limit_reached

def stop_condition2(coefficients, threshold=1e-4):
    """Report whether every coefficient has absolute value below ``threshold``.

    Returns True for an empty ``coefficients`` iterable.
    """
    for coef in coefficients:
        if not (abs(coef) < threshold):
            return False
    return True

def stop_condition3(validation_errors, threshold=1e-4):
    """Report whether at least one validation error exceeds ``threshold``.

    Returns False for an empty ``validation_errors`` iterable.

    NOTE(review): with the default threshold this is True as soon as any
    error is larger than 1e-4 -- confirm that stopping on large errors
    (rather than on small ones) is the intended criterion.
    """
    for error in validation_errors:
        if error > threshold:
            return True
    return False


In [9]:
# Load the raw dataset from disk.
df = pd.read_csv(file_path)
# Shuffle every row (frac=1); the fixed random_state makes the order reproducible.
shuffled_df = df.sample(frac=1, random_state=42)
# Fraction of the shuffled rows assigned to the training split.
train_percentage = 0.7

# Number of training rows, truncated to an integer.
train_size = int(len(shuffled_df) * train_percentage)

# The first train_size shuffled rows form the training split, the rest the test split.
df_train = shuffled_df.iloc[:train_size]
df_test = shuffled_df.iloc[train_size:]

# Candidate regularization strengths and the number of cross-validation folds.
alpha_values = [0.1, 1.0, 10.0]
k_fold = 5

# NOTE(review): stop_condition3 returns True as soon as any value it is given
# exceeds 1e-4, so the search can end before every alpha is evaluated and
# `errors` may then be shorter than `alpha_values` -- confirm this is intended.
errors = k_fold_cross_validation(df_train, k_fold, alpha_values, stop_condition=stop_condition3)
print(errors)

In [None]:
# Pick the candidate alpha whose cross-validation error is the smallest
best_alpha = alpha_values[np.asarray(errors).argmin()]
print("Best alpha:", best_alpha)

In [None]:
# Refit on the complete training split using the selected regularization strength
final_coefficients = ridge_regression(
    X=df_train.drop(columns=['popularity']),
    y=df_train['popularity'],
    alpha=best_alpha,
)

In [None]:
# Score the fitted model on the held-out test split
test_predictions = predict(
    X=df_test.drop(columns=['popularity']),
    coefficients=final_coefficients,
)
test_mse = mean_squared_error(y_true=df_test['popularity'], y_pred=test_predictions)
print("Test Mean Squared Error:", test_mse)