## Alternating Least Squares (ALS)

In [2]:
'''
Implements Alternating Least Squares (ALS) to create a recommender system for a subset of the Netflix dataset.
'''
import matplotlib.pyplot as plt
import numpy as np
import argparse
from scipy.sparse import csc_array, csr_array, csc_matrix, csr_matrix
import pandas as pd
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import ray
from random import shuffle


from more_itertools import chunked

In [3]:
ray_cores = 8
ray.init(num_cpus=ray_cores, ignore_reinit_error=True)

RayContext(dashboard_url='', python_version='3.9.13', ray_version='1.13.0', ray_commit='e4ce38d001dbbe09cd21c497fedd03d692b2be3e', address_info={'node_ip_address': '127.0.0.1', 'raylet_ip_address': '127.0.0.1', 'redis_address': None, 'object_store_address': '/tmp/ray/session_2022-07-11_01-39-29_738956_2587/sockets/plasma_store', 'raylet_socket_name': '/tmp/ray/session_2022-07-11_01-39-29_738956_2587/sockets/raylet', 'webui_url': '', 'session_dir': '/tmp/ray/session_2022-07-11_01-39-29_738956_2587', 'metrics_export_port': 53279, 'gcs_address': '127.0.0.1:50788', 'address': '127.0.0.1:50788', 'node_id': '4bc09a47edada935a71f08879629b5bfdf815dd6bb41b34d883180a7'})

In [4]:
# file, out_dir, n_features = '../datasets/netflix/ratings.csv.gz', '../datasets/netflix/mf/', 300
# file, out_dir, n_features = '../datasets/movie_lens/ratings.csv.gz', '../datasets/movie_lens/mf/', 200
# file, out_dir, n_features = '../datasets/kgrec/music_ratings.csv.gz', '../datasets/kgrec/mf/', 50
file, out_dir, n_features = '../datasets/spotify/ratings.csv.gz', '../datasets/spotify/mf/', 300
original_data = pd.read_csv(file)

print(original_data.head())
# chceck ids
assert original_data['user_id'].nunique() == original_data['user_id'].max() + 1
assert original_data['item_id'].nunique() == original_data['item_id'].max() + 1

   user_id  item_id
0   549000        0
1   549000        1
2   549000        2
3   549000        3
4   549000        4


In [5]:
# keep only users with at least n ratings
n = 10
num_ratings = original_data.groupby('user_id').size()
index_size_ok = num_ratings[num_ratings >= n].index
index_size_low = num_ratings[num_ratings < n].index
original_data_ok = original_data[original_data['user_id'].isin(index_size_ok)]
original_data_low = original_data[original_data['user_id'].isin(index_size_low)]

In [6]:
def load_and_process_df(file):
    '''
    Loads a dataframe from a file and returns a sparse matricies.
    '''
    # df = pd.read_csv(file)
    df = original_data_ok.copy()
    df_low = original_data_low.copy()

    if 'user_id' not in df.columns or 'item_id' not in df.columns:
        raise Exception('Dataframe does not have user_id and item_id columns')
    if not 'rating' in df.columns:
        df['rating'] = 1
        df_low['rating'] = 1
    
    df = df[['user_id', 'item_id', 'rating']]
    df_low = df_low[['user_id', 'item_id', 'rating']]


    # split to training and testing data 80:20 for each user
    training, testing = train_test_split(df, test_size=0.2, stratify=df['user_id'])
    # training = pd.concat([training, df_low])

    num_of_users = df['user_id'].max() + 1
    num_of_items = df['item_id'].max() + 1

    training_data_csc = csc_matrix((training['rating'], (training['user_id'], training['item_id'])), shape=(num_of_users, num_of_items))
    training_data_t_csc = csc_matrix((training['rating'], (training['item_id'], training['user_id'])), shape=(num_of_items, num_of_users))
    testing_data_csc = csc_matrix((testing['rating'], (testing['user_id'], testing['item_id'])), shape=(num_of_users, num_of_items))
    
    # check if the last value is only in training data xor testing data
    # print(training_data)
    uid = int(df.iloc[-1]['user_id'])
    iid = int(df.iloc[-1]['item_id'])
    rating = df.iloc[-1].rating
    in_train = training_data_csc[uid, iid] == rating
    in_test = testing_data_csc[df.iloc[-1]['user_id'], df.iloc[-1]['item_id']] == df.iloc[-1].rating
    assert (in_train and not in_test) or (not in_train and in_test), 'Dataframe is not split correctly'
    
    return training_data_csc, training_data_t_csc, testing_data_csc


training_data_csc, training_data_t_csc, testing_data_csc = load_and_process_df('../datasets/netflix/ratings.csv.gz')
# training_data_t = training_data.T
# testing_data_t = testing_data.T

print(training_data_csc.shape)
print(testing_data_csc.shape)
print(training_data_t_csc.shape)
# print(testing_data_t.shape)

(1000000, 2262292)
(1000000, 2262292)
(2262292, 1000000)


In [9]:
@ray.remote
def process_single_index_ray(j, l2_lambda, data_ray, fixed_matrix_ray):
    nonzeros = data_ray[:,j].nonzero()[0]
    y = data_ray[nonzeros, j].todense()
    X = fixed_matrix_ray[:, nonzeros]
    return np.squeeze(np.linalg.inv(X.dot(X.T) + l2_lambda * np.eye(X.shape[0])).dot(X.dot(y)))

def cf_ridge_regression_column_wise(target_matrix, fixed_matrix, data_ray, l2_lambda):
    '''
    Solves a ridge regression problem using a closed form solution:
        w_i = (X'X + lambda * I)^-1 X'y
    for all i in the target matrix.
    '''
    fixed_matrix_ray = ray.put(fixed_matrix)
    matrix_column_indexes = list(range(target_matrix.shape[1]))
    shuffle(matrix_column_indexes)

    with tqdm(total=target_matrix.shape[1]) as pbar:
        for chunk_j in chunked(matrix_column_indexes, n=100):
            futures = []
            for j in chunk_j:
                futures.append(process_single_index_ray.remote(j, l2_lambda, data_ray, fixed_matrix_ray))
            results = ray.get(futures)
            for j, result in zip(chunk_j, results):
                target_matrix[:,j] = result
            pbar.update(len(chunk_j))
    
    del fixed_matrix_ray


def sum_squared_error(gt_data, U, M):
    U_small = U[:,0:1000].T
    M_small = M[:,0:1000]
    resulting = U_small.dot(M_small)
    gt_data_subset = gt_data[0:1000,0:1000]
    diff = gt_data_subset - resulting
    error = diff[gt_data_subset[0:1000,0:1000].nonzero()]
    error2 = (np.array(error) ** 2).sum()
    return error2
    # return (np.array((training_data[0:1000,0:1000] - U[0:1000,:].T.dot(M[0:1000,:]))[training_data[0:1000,0:1000].nonzero()]) ** 2).sum()

# Initialize the parameters
total_ratings = 100_000_000

converge = 1e-6

max_steps = 100
l2_lambda = 0.1
delta = converge + 1.
cur_error = 1.
cur_step = 0

verbose = True

U_features = np.ones((n_features, training_data_csc.shape[0])) # features x users
I_features = np.ones((n_features, training_data_csc.shape[1])) # features x items

training_trace = []
testing_trace = []

train_error = sum_squared_error(training_data_csc, U_features, I_features)
test_error = sum_squared_error(testing_data_csc, U_features, I_features)
training_trace.append(np.sqrt(train_error / total_ratings))
testing_trace.append(np.sqrt(test_error / total_ratings))

print('Training error: {0}'.format(np.sqrt(train_error / total_ratings)))
print('Testing error: {0}'.format(np.sqrt(test_error / total_ratings)))


while delta > converge and cur_step < max_steps:
    print('Step #{0}'.format(cur_step))

    # Use the closed-form solution for the ridge-regression subproblems
    training_data_csc_ray = ray.put(training_data_csc)
    training_data_t_csc_ray = ray.put(training_data_t_csc)
    # slice_size = 50
    print('Fitting M')
    # cf_ridge_regression_column_wise_m(target_matrix=I_features, fixed_matrix=U_features, data_ray=training_data_csc_ray, l2_lambda=l2_lambda, slice_size=slice_size)
    cf_ridge_regression_column_wise(target_matrix=I_features, fixed_matrix=U_features, data_ray=training_data_csc_ray, l2_lambda=l2_lambda)
    print('Fitting U')
    # cf_ridge_regression_column_wise_m(target_matrix=U_features, fixed_matrix=I_features, data_ray=training_data_t_csc_ray, l2_lambda=l2_lambda, slice_size=slice_size)
    cf_ridge_regression_column_wise(target_matrix=U_features, fixed_matrix=I_features, data_ray=training_data_t_csc_ray, l2_lambda=l2_lambda)

    # Track performance in terms of RMSE on both the testing and training sets
    train_error = sum_squared_error(training_data_csc, U_features, I_features)
    test_error = sum_squared_error(testing_data_csc, U_features, I_features)
    training_trace.append(np.sqrt(train_error / total_ratings))
    testing_trace.append(np.sqrt(test_error / total_ratings))

    # Track convergence
    prev_error = cur_error
    cur_error = train_error
    delta = np.abs(prev_error - cur_error) / (prev_error + converge)
    print('Training error: {0}'.format(np.sqrt(train_error / total_ratings)))
    print('Testing error: {0}'.format(np.sqrt(test_error / total_ratings)))
    print('Delta: {0}'.format(delta))
    # Update the step counter
    cur_step += 1



Training error: 1.7961796123996063
Testing error: 0.9278640148211375
Step #0
Fitting M


  3%|▎         | 73654/2262292 [00:10<05:17, 6896.94it/s] [2m[36m(raylet)[0m Spilled 10494 MiB, 9 objects, write throughput 575 MiB/s.
100%|██████████| 2262292/2262292 [05:25<00:00, 6947.68it/s]
21633it [01:21, 266.85it/s]


KeyboardInterrupt: 

In [None]:
# save features
#create output directory
from pathlib import Path


Path.mkdir(Path(out_dir), exist_ok=True)

np.save(out_dir + 'U_features.npy', U_features)
np.save(out_dir + 'I_features.npy', I_features)

# print statistics about features
print('U_features shape: {0}'.format(U_features.shape))
print('I_features shape: {0}'.format(I_features.shape))
print(U_features)
print(I_features)

In [None]:

plt.figure()
plt.plot(np.arange(cur_step + 1), np.array(training_trace), label='Training RMSE')
plt.plot(np.arange(cur_step + 1), np.array(testing_trace), label='Testing RMSE')
plt.yscale('log')
# plt.savefig(args.plot_results, bbox_inches='tight')
plt.show()