In [None]:
# Load the autoreload extension
%load_ext autoreload

# Set autoreload to reload all modules every time a cell is executed
%autoreload 2

In [None]:
import random
import numpy as np

# Seed NumPy's and Python's global random number generators with one shared
# value so that every randomised step is repeatable from run to run.
seed = 0
np.random.seed(seed)
random.seed(seed)

In [None]:
import time
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PolynomialFeatures
from sklearn.model_selection import cross_val_score
from plotly import graph_objects as go


def benchmark(trainset, recommender):
    """
    Gets the elapsed time for fitting a recommender to a trainset.

    Args:
        trainset: Training data, handed unchanged to ``recommender.fit``.
        recommender: Any object exposing a ``fit(trainset)`` method.

    Returns:
        The elapsed time of the ``fit`` call in seconds as a float, or
        ``None`` when fitting raised ``MemoryError`` (in which case
        "Memory error" is printed). Any other exception propagates.
    """
    # perf_counter() is a monotonic, high-resolution clock meant for measuring
    # intervals; time.time() can jump when the system clock is adjusted.
    start_time = time.perf_counter()
    try:
        recommender.fit(trainset)
    except MemoryError:
        print("Memory error")
        return None
    return time.perf_counter() - start_time


def get_polynomial_degree_that_fits_best(x, y, degrees=range(1, 10), cv=5):
    """
    Gets the polynomial degree that fits best to the data.

    Each candidate degree is scored with the mean cross-validated mean squared
    error of a LinearRegression fitted on the PolynomialFeatures expansion of
    ``x``. The degree with the lowest score wins; on a tie the candidate that
    was tried first is kept.

    Args:
        x: One-dimensional numpy array of sample positions.
        y: Target values, one per entry of ``x``.
        degrees: Candidate degrees, tried in the given order (default 1..9).
        cv: Number of cross-validation folds (default 5). An integer value is
            capped at the number of samples so that small data sets do not
            fail with "more splits than samples"; any other value is passed
            to ``cross_val_score`` unchanged.

    Returns:
        The candidate degree with the lowest mean cross-validated MSE.

    Raises:
        ValueError: If ``degrees`` is empty, or (from scikit-learn) if there
            are too few samples to cross-validate at all.
    """
    candidate_degrees = list(degrees)
    if not candidate_degrees:
        raise ValueError("degrees must contain at least one candidate degree")

    if isinstance(cv, int):
        cv = min(cv, len(y))

    # The column-vector view of x is the same for every candidate degree.
    x_column = x.reshape(-1, 1)

    best_degree = candidate_degrees[0]
    best_mse = np.inf
    for degree in candidate_degrees:
        x_poly = PolynomialFeatures(degree).fit_transform(x_column)
        # The scorer yields negated MSE values (higher is better); flip the sign back.
        scores = -cross_val_score(
            LinearRegression(), x_poly, y, cv=cv, scoring="neg_mean_squared_error"
        )
        average_mse = np.mean(scores)
        if average_mse < best_mse:
            best_mse = average_mse
            best_degree = degree
    return best_degree


def get_polynomial(x: np.ndarray, y: np.ndarray):
    """
    Fits a polynomial regression of automatically selected degree to the data.

    Samples whose ``y`` value is missing (``None`` or NaN) are left out of the
    fit. ``benchmark`` reports ``None`` when a fit ran out of memory, and such
    an entry would otherwise turn ``y`` into an object array that the
    regression cannot handle.

    Args:
        x: Sample positions (list or one-dimensional array).
        y: Measured values, one per entry of ``x`` (list or array); entries
            may be ``None``/NaN for missing measurements.

    Returns:
        A ``(model, polynomial)`` tuple: the fitted LinearRegression and the
        fitted PolynomialFeatures transformer that produces its inputs.
    """
    # dtype=float turns None entries into NaN, which can then be masked out.
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    measured = ~np.isnan(y)
    x, y = x[measured], y[measured]

    degree = get_polynomial_degree_that_fits_best(x, y)
    polynomial = PolynomialFeatures(degree)
    x_polynomial = polynomial.fit_transform(x.reshape(-1, 1))
    model = LinearRegression()
    model.fit(x_polynomial, y)

    return model, polynomial


def print_polynomial(model: "LinearRegression"):
    """
    Prints the degree and the equation of a fitted polynomial regression.

    The model is expected to have been fitted on a PolynomialFeatures
    expansion whose first column is the constant bias feature, as
    ``get_polynomial`` produces. Any object exposing ``coef_`` and
    ``intercept_`` works.

    Args:
        model: Fitted linear model; ``coef_[i]`` is the weight of ``x^i``.
    """
    coefficients = model.coef_
    # LinearRegression keeps its fitted intercept in intercept_, not in coef_;
    # coef_[0] is only the weight of the bias feature column. The constant
    # term of the polynomial is the sum of both.
    constant_term = model.intercept_ + coefficients[0]

    terms = [f"{constant_term}"]
    terms.extend(f"{coefficients[i]} * x^{i}" for i in range(1, len(coefficients)))

    print("Polynomial degree: ", len(coefficients) - 1)
    print("Y = " + " + ".join(terms))


def plot_polynomials(x: np.array, x_title: str, polynomial_data):
    """
    Draws measured runtimes and their fitted polynomial curves in one figure.

    Args:
        x: Sample positions shared by every series.
        x_title: Label of the x axis.
        polynomial_data: Sequence of ``(y, model, poly, name)`` tuples: the
            measured values, the fitted model, the feature transformer that
            feeds the model, and the legend name of the series.
    """
    figure = go.Figure(
        layout=go.Layout(
            title="Polynomial Regression",
            xaxis=dict(title=x_title),
            yaxis=dict(title="Runtime in seconds"),
        )
    )

    # The curves are evaluated from the smallest sampled x up to twice the largest one.
    curve_x = np.linspace(min(x), 2 * max(x), 100)

    for index, (measurements, model, poly, name) in enumerate(polynomial_data):
        # Spread the hues evenly over the colour wheel, one per series.
        hue = index * 360 / len(polynomial_data)
        color = f"hsl({hue}, 100%, 50%)"

        figure.add_trace(
            go.Scatter(
                x=x,
                y=measurements,
                mode="markers",
                name=name,
                marker=dict(size=10, color=color),
            )
        )

        curve_y = model.predict(poly.transform(curve_x.reshape(-1, 1)))
        figure.add_trace(
            go.Scatter(
                x=curve_x,
                y=curve_y,
                mode="lines",
                name=f"{name} Polynomial Curve",
                line=dict(color=color),
                showlegend=False,
            )
        )

    figure.show()

In [None]:
# Global experiment parameters
# Both values are handed to RandomDataset by the experiment cells, as its
# third and fourth positional argument respectively.
# NOTE(review): presumably the upper bound of the generated ratings - confirm against RandomDataset.
RATING_SCALE = 5
# NOTE(review): presumably the targeted sparsity of the generated rating matrix - confirm against RandomDataset.
SPARSITY_TARGET = 0.1

In [None]:
import gc
from dataset.random_dataset import RandomDataset
from surprise.prediction_algorithms import SVD
from surprise.prediction_algorithms import KNNBasic
from recommenders.grecond_recommender import GreConDRecommender
from recommenders.binaps_recommender import BinaPsRecommender
from recommenders import DEBUG_LOGGER


# Experiment parameters
# Runtime as a function of the number of items, with a fixed number of users.
# Ten item counts are measured: 100, 5100, ..., 45100.
NUMBERS_OF_ITEMS = np.arange(100, 50100, 5000)
NUMBER_OF_USERS = 1000

# One entry per item count, in the order of NUMBERS_OF_ITEMS.
# ratings_numbers is collected here but not read again within this cell.
ratings_numbers = []
svd_times = []
knn_times = []
binaps_times = []

for number_of_items in NUMBERS_OF_ITEMS:
    print(f"Number of items: {number_of_items}")
    # A fresh dataset and trainset are built for every item count.
    dataset = RandomDataset(NUMBER_OF_USERS, number_of_items, RATING_SCALE, SPARSITY_TARGET)
    trainset = dataset.construct_trainset(dataset.raw_ratings)
    ratings_numbers.append(trainset.n_ratings)

    # Each recommender is timed on the same trainset. After every measurement
    # the model is dropped and a collection is forced, so memory held by the
    # previous model can be reclaimed before the next fit starts.
    recommender = SVD(n_factors=100, n_epochs=100, biased=False)
    elapsed_time = benchmark(trainset, recommender)
    svd_times.append(elapsed_time)
    del recommender
    gc.collect()

    recommender = KNNBasic(sim_options={"name": "cosine", "user_based": True}, verbose=False)
    elapsed_time = benchmark(trainset, recommender)
    knn_times.append(elapsed_time)
    del recommender
    gc.collect()

    recommender = BinaPsRecommender(
        epochs=1000,
        hidden_dimension_neurons_number=100,
        logger=DEBUG_LOGGER,
    )
    elapsed_time = benchmark(trainset, recommender)
    binaps_times.append(elapsed_time)
    del recommender
    gc.collect()

    # Release the data of this item count before the next, larger one is generated.
    del dataset
    del trainset
    gc.collect()

# Fit one runtime polynomial per recommender over the measured item counts.
# NOTE(review): benchmark() yields None when a fit raises MemoryError; such an
# entry is passed to get_polynomial() as is.
svd_model, svd_polynomial = get_polynomial(NUMBERS_OF_ITEMS, svd_times)
knn_model, knn_polynomial = get_polynomial(NUMBERS_OF_ITEMS, knn_times)
binaps_model, binaps_polynomial = get_polynomial(NUMBERS_OF_ITEMS, binaps_times)

In [None]:
# Print the fitted runtime polynomial of every recommender, then plot the
# measured fit times over the number of items together with the fitted curves.
for recommender_name, fitted_model in (
    ("SVD", svd_model),
    ("KNN", knn_model),
    ("BinaPs", binaps_model),
):
    print(recommender_name)
    print_polynomial(fitted_model)

plot_polynomials(
    x=NUMBERS_OF_ITEMS,
    x_title="Number of items",
    polynomial_data=[
        (svd_times, svd_model, svd_polynomial, "SVD"),
        (knn_times, knn_model, knn_polynomial, "KNN"),
        (binaps_times, binaps_model, binaps_polynomial, "BinaPs"),
    ],
)

In [None]:

import gc
from dataset.random_dataset import RandomDataset
from surprise.prediction_algorithms import SVD
from surprise.prediction_algorithms import KNNBasic
from recommenders.grecond_recommender import GreConDRecommender
from recommenders.binaps_recommender import BinaPsRecommender
from recommenders import DEBUG_LOGGER


# Experiment parameters
# Runtime as a function of the number of items for SVD and GreConD, with a
# fixed number of users. Five item counts are measured: 100, 1100, ..., 4100.
NUMBERS_OF_ITEMS = np.arange(100, 5000, 1000)
NUMBER_OF_USERS = 1000

# One entry per item count, in the order of NUMBERS_OF_ITEMS.
# NOTE(review): knn_times and binaps_times are reset to empty lists here but
# never filled in this cell, so the measurements stored under these names
# by the earlier item-scaling cell are discarded.
ratings_numbers = []
svd_times = []
knn_times = []
grecond_times = []
binaps_times = []

for number_of_items in NUMBERS_OF_ITEMS:
    print(f"Number of items: {number_of_items}")
    # A fresh dataset and trainset are built for every item count.
    dataset = RandomDataset(NUMBER_OF_USERS, number_of_items, RATING_SCALE, SPARSITY_TARGET)
    trainset = dataset.construct_trainset(dataset.raw_ratings)
    ratings_numbers.append(trainset.n_ratings)

    # The model is dropped and a collection is forced after the measurement,
    # so its memory can be reclaimed before the next fit starts.
    recommender = SVD(n_factors=100, n_epochs=100, biased=False)
    elapsed_time = benchmark(trainset, recommender)
    svd_times.append(elapsed_time)
    del recommender
    gc.collect()

    # GreConD is the last recommender of the iteration, so the model and the
    # data of this item count are released together.
    recommender = GreConDRecommender()
    elapsed_time = benchmark(trainset, recommender)
    grecond_times.append(elapsed_time)
    del recommender
    del dataset
    del trainset
    gc.collect()

# Fit one runtime polynomial per recommender over the measured item counts.
svd_model, svd_polynomial = get_polynomial(NUMBERS_OF_ITEMS, svd_times)
grecond_model, grecond_polynomial = get_polynomial(NUMBERS_OF_ITEMS, grecond_times)


In [None]:
# Print the fitted runtime polynomial of SVD and GreConD, then plot the
# measured fit times over the number of items together with the fitted curves.
for recommender_name, fitted_model in (
    ("SVD", svd_model),
    ("GreConD", grecond_model),
):
    print(recommender_name)
    print_polynomial(fitted_model)

plot_polynomials(
    x=NUMBERS_OF_ITEMS,
    x_title="Number of items",
    polynomial_data=[
        (svd_times, svd_model, svd_polynomial, "SVD"),
        (grecond_times, grecond_model, grecond_polynomial, "GreConDKNN"),
    ],
)

In [None]:
import gc
from dataset.random_dataset import RandomDataset
from surprise.prediction_algorithms import SVD
from surprise.prediction_algorithms import KNNBasic
from recommenders.grecond_recommender import GreConDRecommender
from recommenders.binaps_recommender import BinaPsRecommender


# Experiment parameters
# Runtime as a function of the number of users, with a fixed number of items.
# The fixed item count gets its own singular name (mirroring NUMBER_OF_USERS
# in the item-scaling cells) instead of rebinding NUMBERS_OF_ITEMS, the array
# of item counts that the item-scaling cells and their plots read, to an int.
NUMBER_OF_ITEMS = 1000
# Five user counts are measured: 1000, 6000, ..., 21000.
NUMBERS_OF_USERS = np.arange(1000, 22000, 5000)

# One entry per user count, in the order of NUMBERS_OF_USERS.
ratings_numbers = []
svd_times = []
knn_times = []
grecond_times = []
binaps_times = []

for number_of_users in NUMBERS_OF_USERS:
    print(f"Number of users: {number_of_users}")
    # A fresh dataset and trainset are built for every user count.
    dataset = RandomDataset(number_of_users, NUMBER_OF_ITEMS, RATING_SCALE, SPARSITY_TARGET)
    trainset = dataset.construct_trainset(dataset.raw_ratings)
    ratings_numbers.append(trainset.n_ratings)

    # Each recommender is timed on the same trainset. After every measurement
    # the model is dropped and a collection is forced, so memory held by the
    # previous model can be reclaimed before the next fit starts.
    recommender = SVD(n_factors=100, n_epochs=100, biased=False)
    elapsed_time = benchmark(trainset, recommender)
    svd_times.append(elapsed_time)
    del recommender
    gc.collect()

    recommender = KNNBasic(sim_options={"name": "cosine", "user_based": True}, verbose=False)
    elapsed_time = benchmark(trainset, recommender)
    knn_times.append(elapsed_time)
    del recommender
    gc.collect()

    recommender = GreConDRecommender()
    elapsed_time = benchmark(trainset, recommender)
    grecond_times.append(elapsed_time)
    del recommender
    gc.collect()

    recommender = BinaPsRecommender(epochs=1000)
    elapsed_time = benchmark(trainset, recommender)
    binaps_times.append(elapsed_time)
    del recommender
    gc.collect()

    # Release the data of this user count before the next, larger one is generated.
    del dataset
    del trainset
    gc.collect()

# Fit one runtime polynomial per recommender over the measured user counts.
svd_model, svd_polynomial = get_polynomial(NUMBERS_OF_USERS, svd_times)
knn_model, knn_polynomial = get_polynomial(NUMBERS_OF_USERS, knn_times)
grecond_model, grecond_polynomial = get_polynomial(NUMBERS_OF_USERS, grecond_times)
binaps_model, binaps_polynomial = get_polynomial(NUMBERS_OF_USERS, binaps_times)

In [None]:
# Print the fitted runtime polynomial of every recommender, then plot the
# measured fit times over the number of users together with the fitted curves.
for recommender_name, fitted_model in (
    ("SVD", svd_model),
    ("KNN", knn_model),
    ("GreConD", grecond_model),
    ("BinaPs", binaps_model),
):
    print(recommender_name)
    print_polynomial(fitted_model)

plot_polynomials(
    x=NUMBERS_OF_USERS,
    x_title="Number of users",
    polynomial_data=[
        (svd_times, svd_model, svd_polynomial, "SVD"),
        (knn_times, knn_model, knn_polynomial, "KNN"),
        (grecond_times, grecond_model, grecond_polynomial, "GreConDKNN"),
        (binaps_times, binaps_model, binaps_polynomial, "BinaPs"),
    ],
)