In [2]:
import numpy as np
import pandas as pd
from subprocess import call
from sklearn.metrics import mean_squared_error
import math

## Data Handling

In [3]:
# Load the data
df_full = pd.read_csv('../data_raw/data_train.csv')
df_train = pd.read_csv('../data_raw/cross_validation/train_split_4.csv')
df_test = pd.read_csv('../data_raw/cross_validation/test_split_4.csv')

dic_full = {
    'user_id': [int(str(x).partition("_")[0][1:]) for x in df_full['Id']],
    'item_id': [int(str(x).partition("_")[2][1:]) for x in df_full['Id']],
    #'combined': [(str(x).partition("_")[0][1:],str(x).partition("_")[2][1:]) for x in df['Id']],
    'rating': [float(x) for x in df_full['Prediction']],
}
dic_train = {
    'user_id': [int(str(x).partition("_")[0][1:]) for x in df_train['Id']],
    'item_id': [int(str(x).partition("_")[2][1:]) for x in df_train['Id']],
    'rating': [float(x) for x in df_train['Prediction']],
}
dic_test = {
    'user_id': [int(str(x).partition("_")[0][1:]) for x in df_test['Id']],
    'item_id': [int(str(x).partition("_")[2][1:]) for x in df_test['Id']],
    'rating': [float(x) for x in df_test['Prediction']],
}

full_data = pd.DataFrame(dic_full)
train_data = pd.DataFrame(dic_train)
test_data = pd.DataFrame(dic_test)
full_data[:100]


Unnamed: 0,user_id,item_id,rating
0,44,1,4.0
1,61,1,3.0
2,67,1,4.0
3,72,1,3.0
4,86,1,5.0
...,...,...,...
95,2706,1,4.0
96,2820,1,3.0
97,2883,1,2.0
98,2939,1,3.0


In [73]:
n_users = 10000
n_items = 1000
ratings = np.zeros((n_users, n_items))
train = np.zeros((n_users, n_items))
test = np.zeros((n_users, n_items))

# Create train and test sets via Tobi
for row in full_data.itertuples(index = False):
    ratings[int(row.user_id) - 1, int(row.item_id) - 1] = int(row.rating)

for row in train_data.itertuples(index = False):
    train[int(row.user_id) - 1, int(row.item_id) - 1] = int(row.rating)

for row in test_data.itertuples(index = False):
    test[int(row.user_id) - 1, int(row.item_id) - 1] = int(row.rating)


print(ratings[:10, :10])

[[0. 0. 0. 0. 0. 0. 0. 0. 0. 5.]
 [0. 0. 0. 3. 0. 5. 0. 4. 0. 0.]
 [0. 0. 0. 2. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 2. 0. 0. 0. 5. 0. 3. 0. 0.]
 [0. 0. 0. 0. 0. 5. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 5. 0. 0. 0. 3.]
 [0. 0. 0. 1. 0. 5. 0. 5. 0. 0.]]


In [74]:
# normalization per item
train_T = np.transpose(train)
test_T = np.transpose(test)
print(train_T.shape)
avgs, stds = [], []
for item in range(n_items):
    mask = np.nonzero(train_T[item])
    mask_test = np.nonzero(test_T[item])
    avg = np.mean(train_T[item][mask])
    std = np.std(train_T[item][mask])
    avgs.append(avg)
    stds.append(std)
    train_T[item][mask] = (train_T[item][mask] - avg) / std
    test_T[item][mask_test] = (test_T[item][mask_test] - avg) / std


train = np.transpose(train_T)
test_norm = np.transpose(test_T)
print(train.shape)
print(train[:10][:10])

(1000, 10000)
(10000, 1000)
[[ 0.          0.          0.         ...  0.          0.
   0.        ]
 [ 0.          0.          0.         ...  2.34660803 -0.08553854
  -0.45348009]
 [ 0.          0.          0.         ...  0.          0.
   0.        ]
 ...
 [ 0.          0.          0.         ...  0.          0.
   0.        ]
 [ 0.          0.          0.         ...  0.          0.
   0.        ]
 [ 0.          0.          0.         ...  0.          0.
   0.        ]]


## Model

In [1]:
# Todo:
# - change n_iter to some stop condition (often converges after 10 iterations)

class ExplicitMF:
    """
    Training the ALS model, while updating the user and item factors.
    """

    def __init__(self, n_iters, n_factors, reg):
        self.reg = reg
        self.n_iters = n_iters
        self.n_factors = n_factors  
        self.n_user, self.n_item = train.shape
        self.user_factors = np.random.random((self.n_user, self.n_factors))
        self.item_factors = np.random.random((self.n_item, self.n_factors))
        
        
    def fit(self, train, test):
        """
        train the model. Doing ALS steps
        """

        self.test_rmse_record  = []
        self.train_rmse_record = []   
        for i in range(self.n_iters):
            print('iteration: ', i)
            self.user_factors = self._als_step(train, self.user_factors, self.item_factors)
            self.item_factors = self._als_step(train.T, self.item_factors, self.user_factors)
            predictions = self.predict()
            test_rmse = self.compute_rmse(test, predictions)
            train_rmse = self.compute_rmse(train, predictions)
            self.test_rmse_record.append(test_rmse)
            self.train_rmse_record.append(train_rmse)
        
        return self    
    
    def _als_step(self, ratings, solve_vecs, fixed_vecs):
        """
        Doing an ALS step as seen in the lecutre.
        """
        for i in range(solve_vecs.shape[0]):
            A = np.zeros((self.n_factors, self.n_factors))
            b = np.zeros((self.n_factors,))
            for j in range(fixed_vecs.shape[0]):
                if ratings[i, j] != 0:
                    A += np.outer(fixed_vecs[j], fixed_vecs[j])
                    b += ratings[i][j] * fixed_vecs[j]
            A += self.reg * np.eye(self.n_factors)
            solve_vecs[i] = np.linalg.solve(A, b).T
        
        return solve_vecs
    
    def predict(self):
        """predict ratings for every user and item"""
        pred = self.user_factors.dot(self.item_factors.T)
        return pred
    
    @staticmethod
    def compute_rmse(y_true, y_pred):
        """ignore zero terms prior to comparing the mse"""
        mask = np.nonzero(y_true)
        mse = mean_squared_error(y_true[mask], y_pred[mask])
        return math.sqrt(mse)

In [76]:
model = ExplicitMF(n_iters = 20, n_factors = 30, reg = 5.0)
model.fit(train, test_norm)
print(model.train_rmse_record)

iteration:  0
iteration:  1
iteration:  2
iteration:  3
iteration:  4
iteration:  5
iteration:  6
iteration:  7
iteration:  8
iteration:  9
iteration:  10
iteration:  11
iteration:  12
iteration:  13
iteration:  14
iteration:  15
iteration:  16
iteration:  17
iteration:  18
iteration:  19
[0.8941612767307773, 0.8009665240994799, 0.7758816958385989, 0.7615298830541224, 0.7527317955480755, 0.7469116032890195, 0.7427886653157787, 0.7397125919546231, 0.7373249917787268, 0.7354123062687097, 0.7338403288975182, 0.7325207645598608, 0.7313930647951241, 0.7304142768255687, 0.7295531334444967, 0.7287865221450797, 0.7280973215898928, 0.7274729545281443, 0.7269042685680689, 0.7263845849806015]


In [None]:
# print(model.test_mse_record)
pred = model.predict()
pred_T = np.transpose(pred)
for item in range(n_items):
    pred_T[item] = pred_T[item] * stds[item] + avgs[item]
    for i in range(len(pred_T[item])):
        if pred_T[item][i] < 1:
            pred_T[item][i] = 1
        elif pred_T[item][i] > 5:
            pred_T[item][i] = 5

pred = np.transpose(pred_T)

print(f'RMSE: {model.compute_rmse(test, pred)}')
print('\n', pred[:10][:10])
print('\n', train[:10][:10])

## Cross Validation

In [None]:
# Big Cross Validation
regs = [0.5, 0.1, 0.05]
n_factors = [25, 30, 35]

best = (0, 0)
best_rmse = 100
for reg in regs:
    for n_factor in n_factors:
        model = ExplicitMF(n_iters = 100, n_factors = n_factor, reg = reg)
        model.fit(train, test)
        pred = model.predict()
        pred_T = np.transpose(pred)
        for item in range(n_items):
            pred_T[item] = pred_T[item] * stds[item] + avgs[item]
        pred = np.transpose(pred_T)
        rmse = model.compute_rmse(test, pred)
        if rmse < best_rmse:
            best_rmse = rmse
            best = (reg, n_factor)
            print(f'New best Hyperparameters: {best} with RMSE: {best_rmse}')


# Submission

In [38]:
# normalize training data
ratings_T = np.transpose(ratings)
avgs_full, stds_full = [], []
for item in range(n_items):
    mask = np.nonzero(ratings_T[item])
    avg = np.mean(ratings_T[item][mask])
    std = np.std(ratings_T[item][mask])
    avgs_full.append(avg)
    stds_full.append(std)
    ratings_T[item][mask] = ratings_T[item][mask] - avg / std

ratings = np.transpose(ratings_T)

# train the model
print('\n', 'Training the model...')
model = ExplicitMF(n_iters = 200, n_factors = 30, reg = 0.1)
model.fit(ratings, test)

# predict ratings
pred = model.predict()
pred_T = np.transpose(pred)
for item in range(n_items):
    pred_T[item] = pred_T[item] * stds_full[item] + avgs_full[item]
pred = np.transpose(pred_T)
print(pred)

# write to submission-file
sample_sub = pd.read_csv("../data_raw/sampleSubmission.csv")
prediction = []
for cell_id in sample_sub.Id:
    row, col = cell_id.split("_")
    prediction.append(pred[int(row[1:])-1, int(col[1:])-1])
sample_sub.Prediction = prediction
sample_sub.to_csv("../data/als.csv", index=False)
sample_sub

[[3.36702062 3.50447268 3.49662615 ... 3.22360299 3.33021409 3.68486082]
 [3.36264126 3.50196089 3.48604375 ... 3.37077123 3.39492532 3.70106834]
 [3.36657948 3.50439185 3.5122743  ... 3.26365413 3.38339937 3.64334944]
 ...
 [3.38493845 3.49272399 3.4756898  ... 3.22355556 3.3733448  3.73381173]
 [3.40735796 3.53016222 3.49874185 ... 3.21514371 3.33079054 3.77398583]
 [3.37612169 3.53682238 3.52696103 ... 3.31148591 3.34532214 3.74647044]]


Unnamed: 0,Id,Prediction
0,r37_c1,3.413237
1,r73_c1,3.366761
2,r156_c1,3.533388
3,r160_c1,3.442889
4,r248_c1,3.432730
...,...,...
1176947,r9974_c1000,3.681848
1176948,r9977_c1000,3.707133
1176949,r9978_c1000,3.602696
1176950,r9982_c1000,3.570269


# Library Code

In [4]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS


def tune_ALS(train_data, validation_data, maxIter, regParams, ranks):
    """
    grid search function to select the best model based on RMSE of
    validation data
    Parameters
    ----------
    train_data: spark DF with columns ['userId', 'movieId', 'rating']
    
    validation_data: spark DF with columns ['userId', 'movieId', 'rating']
    
    maxIter: int, max number of learning iterations
    
    regParams: list of float, one dimension of hyper-param tuning grid
    
    ranks: list of float, one dimension of hyper-param tuning grid
    
    Return
    ------
    The best fitted ALS model with lowest RMSE score on validation data
    """
    # initial
    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    best_model = None
    for rank in ranks:
        for reg in regParams:
            # get ALS model
            als = ALS().setMaxIter(maxIter).setRank(rank).setRegParam(reg)
            # train ALS model
            print(f'Training model with rank {rank} and regularization {reg}')
            model = als.fit(train_data)
            # evaluate the model by computing the RMSE on the validation data
            predictions = model.transform(validation_data)
            evaluator = RegressionEvaluator(metricName="rmse",
                                            labelCol="rating",
                                            predictionCol="prediction")
            rmse = evaluator.evaluate(predictions)
            print('{} latent factors and regularization = {}: '
                  'validation RMSE is {}'.format(rank, reg, rmse))
            if rmse < min_error:
                min_error = rmse
                best_rank = rank
                best_regularization = reg
                best_model = model
    print('\nThe best model has {} latent factors and '
          'regularization = {} with RSME = {}'.format(best_rank, best_regularization, min_error))
    return best_model


def make_recommendations(self, fav_movie, n_recommendations):
    """
    make top n movie recommendations
    Parameters
    ----------
    fav_movie: str, name of user input movie
    n_recommendations: int, top n recommendations
    """
    # get data
    movie_user_mat_sparse, hashmap = self._prep_data()
    # get recommendations
    raw_recommends = self._inference(
        self.model, movie_user_mat_sparse, hashmap,
        fav_movie, n_recommendations)
    # print results
    reverse_hashmap = {v: k for k, v in hashmap.items()}
    print('Recommendations for {}:'.format(fav_movie))
    for i, (idx, dist) in enumerate(raw_recommends):
        print('{0}: {1}, with distance '
              'of {2}'.format(i+1, reverse_hashmap[idx], dist))

In [None]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

In [6]:
train_lib = list(zip(train_data.user_id, train_data.item_id, train_data.rating))
test_lib = list(zip(test_data.user_id, test_data.item_id, test_data.rating))
full_lib = train_lib + test_lib

df_full = spark.createDataFrame(full_lib, ["user", "item", "rating"])
df = spark.createDataFrame([(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)], ["user", "item", "rating"])


In [None]:
# only for manual testing
als = ALS().setMaxIter(15).setRank(30).setRegParam(0.1)
# train ALS model
model = als.fit(df_train)
# evaluate the model by computing the RMSE on the validation data

predictions = model.transform(df_test)
print(predictions.show())

evaluator = RegressionEvaluator(metricName="rmse",
                                labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print('RMSE on test data = {}'.format(rmse))

In [None]:
model = tune_ALS(df_train, df_test, maxIter = 12, regParams = [0.05, 0.1, 0.2], ranks = [10, 12])
print('Training complete')
pred = model.transform(df_test)


evaluator = RegressionEvaluator(metricName="rmse",
                                            labelCol="rating",
                                            predictionCol="prediction")
rmse = evaluator.evaluate(pred)
print('RSME: {}' .format(rmse))

# Submission Library

In [14]:
# predict ratings
sample_sub = pd.read_csv("../data_raw/sampleSubmission.csv")
to_predict = []
for i, cell_id in enumerate(sample_sub.Id):
    row, col = cell_id.split("_")
    to_predict.append((int(row[1:]), int(col[1:])))
df_sub = spark.createDataFrame(to_predict, ["user", "item"])

ensemble = 5
average = np.asarray([0.0] * len(to_predict))
for i in range(ensemble):
    als = ALS(seed=i).setMaxIter(15).setRank(30).setRegParam(0.1)
    print(f'Training model {i+1}/{ensemble}')
    model = als.fit(df_full)
    predictions = model.transform(df_sub)
    pandas_df = predictions.toPandas()
    pandas_df = pandas_df.sort_values(by=['item', 'user'], ascending=True)
    average += np.asarray(pandas_df['prediction'].values.tolist())
    print(pandas_df[:3])
average = average / ensemble

print(pandas_df[:5])
print(predictions.show())

# write to csv
sample_sub.Prediction = average
sample_sub.to_csv("../data/als.csv", index=False)
sample_sub

Training model 1/5
22/07/22 19:00:44 WARN TaskSetManager: Stage 1503 contains a task of very large size (19247 KiB). The maximum recommended task size is 1000 KiB.


[Stage 1503:>                                                       (0 + 1) / 1]

22/07/22 19:00:48 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 1503 (TID 3752): Attempting to kill Python Worker
22/07/22 19:00:48 WARN TaskSetManager: Stage 1504 contains a task of very large size (19247 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

22/07/22 19:01:14 WARN TaskSetManager: Stage 1577 contains a task of very large size (8901 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

        user  item  prediction
500501    37     1    3.208503
789489    73     1    3.000793
822574   156     1    3.679502
Training model 2/5
22/07/22 19:01:21 WARN TaskSetManager: Stage 1620 contains a task of very large size (19247 KiB). The maximum recommended task size is 1000 KiB.


[Stage 1620:>                                                       (0 + 1) / 1]

22/07/22 19:01:25 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 1620 (TID 4120): Attempting to kill Python Worker
22/07/22 19:01:25 WARN TaskSetManager: Stage 1621 contains a task of very large size (19247 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

22/07/22 19:01:51 WARN TaskSetManager: Stage 1694 contains a task of very large size (8901 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

        user  item  prediction
500501    37     1    3.228759
789489    73     1    2.980950
822574   156     1    3.667770
Training model 3/5
22/07/22 19:01:58 WARN TaskSetManager: Stage 1737 contains a task of very large size (19247 KiB). The maximum recommended task size is 1000 KiB.


[Stage 1737:>                                                       (0 + 1) / 1]

22/07/22 19:02:02 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 1737 (TID 4488): Attempting to kill Python Worker
22/07/22 19:02:02 WARN TaskSetManager: Stage 1738 contains a task of very large size (19247 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

22/07/22 19:02:28 WARN TaskSetManager: Stage 1811 contains a task of very large size (8901 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

        user  item  prediction
500501    37     1    3.224522
789489    73     1    2.964534
822574   156     1    3.662883
Training model 4/5
22/07/22 19:02:34 WARN TaskSetManager: Stage 1854 contains a task of very large size (19247 KiB). The maximum recommended task size is 1000 KiB.


[Stage 1854:>                                                       (0 + 1) / 1]

22/07/22 19:02:38 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 1854 (TID 4856): Attempting to kill Python Worker
22/07/22 19:02:38 WARN TaskSetManager: Stage 1855 contains a task of very large size (19247 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

22/07/22 19:03:04 WARN TaskSetManager: Stage 1928 contains a task of very large size (8901 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

        user  item  prediction
500501    37     1    3.208846
789489    73     1    3.029075
822574   156     1    3.647963
Training model 5/5
22/07/22 19:03:11 WARN TaskSetManager: Stage 1971 contains a task of very large size (19247 KiB). The maximum recommended task size is 1000 KiB.


[Stage 1971:>                                                       (0 + 1) / 1]

22/07/22 19:03:15 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 1971 (TID 5224): Attempting to kill Python Worker


                                                                                

22/07/22 19:03:16 WARN TaskSetManager: Stage 1972 contains a task of very large size (19247 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

22/07/22 19:03:41 WARN TaskSetManager: Stage 2045 contains a task of very large size (8901 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

        user  item  prediction
500501    37     1    3.165476
789489    73     1    3.012002
822574   156     1    3.676820
         user  item  prediction
500501     37     1    3.165476
789489     73     1    3.012002
822574    156     1    3.676820
805715    160     1    3.271206
1016742   248     1    3.242455
22/07/22 19:03:48 WARN TaskSetManager: Stage 2120 contains a task of very large size (8901 KiB). The maximum recommended task size is 1000 KiB.
+----+----+----------+
|user|item|prediction|
+----+----+----------+
|  37|   1| 3.1654763|
|  73|   1| 3.0120022|
| 156|   1| 3.6768196|
| 160|   1|  3.271206|
| 248|   1| 3.2424545|
| 256|   1| 3.3523054|
| 284|   1| 3.0376537|
| 400|   1| 3.0315964|
| 416|   1| 3.5371642|
| 456|   1| 3.2642639|
| 474|   1| 2.5645995|
| 495|   1| 3.1156893|
| 515|   1| 1.7636681|
| 518|   1| 3.4835413|
| 521|   1| 3.9568653|
| 559|   1| 2.7974775|
| 596|   1| 3.1955385|
| 614|   1| 3.4028368|
| 621|   1| 2.9847357|
| 661|   1| 3.1761184|
+----+----+

Unnamed: 0,Id,Prediction
0,r37_c1,3.207221
1,r73_c1,2.997471
2,r156_c1,3.666987
3,r160_c1,3.276756
4,r248_c1,3.231271
...,...,...
1176947,r9974_c1000,3.383960
1176948,r9977_c1000,3.483347
1176949,r9978_c1000,2.910252
1176950,r9982_c1000,3.124082


In [28]:
pandas_df = predictions.toPandas()
pandas_df = pandas_df.sort_values(by=['item', 'user'], ascending=True)
print(pandas_df[:30])

sample_sub.Prediction = pandas_df['prediction'].values.tolist()
sample_sub.to_csv("../data/als.csv", index=False)
sample_sub

Unnamed: 0,Id,Prediction
0,r37_c1,3.173383
1,r73_c1,2.997006
2,r156_c1,3.646690
3,r160_c1,3.256994
4,r248_c1,3.261189
...,...,...
1176947,r9974_c1000,3.395679
1176948,r9977_c1000,3.484329
1176949,r9978_c1000,2.893643
1176950,r9982_c1000,3.079063
