### Import library

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=4c21ce7022de44115d829211a9320e714694423ff01ea157db4b22ba505607e7
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
import os
import time

# spark imports
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction, explode, desc
from pyspark.sql.types import StringType, ArrayType
from pyspark.mllib.recommendation import ALS

# data science imports
import math
import numpy as np
import pandas as pd

# visualization imports
import seaborn as sns
import matplotlib.pyplot as plt

%matplotlib inline

### Preprocessing

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
import pandas as pd
import math
data_path = "/content/drive/MyDrive/WebMining/explicit_ratings_en.csv"
test_path = "/content/drive/MyDrive/WebMining/test.csv"
df = pd.read_csv(data_path,sep=",", index_col=False).drop(['created_at', "watch_percentage"], axis=1)
df['rating'] = df['rating'].apply(lambda x: math.ceil(x/2))
df

Unnamed: 0,user_id,item_id,rating
0,224557,510,5
1,224557,615,5
2,224557,7680,5
3,224293,510,5
4,224293,515,5
...,...,...,...
3654,605220,376915,5
3655,605220,376916,5
3656,605220,376924,5
3657,605220,376925,5


In [5]:
df_test = pd.read_csv(test_path, sep=",")
df_test = df_test.iloc[: , 1:].drop(['created_at', "watch_percentage"], axis=1)
# df_test["rating"] = df_test["rating"] /2
df_test

Unnamed: 0,user_id,item_id,rating
0,533545,7104,3
1,594733,876,7
2,502991,888,3
3,482942,7104,2
4,493749,183716,1
...,...,...,...
354,478592,739,10
355,456376,126663,10
356,277945,64251,10
357,104074,45216,10


In [6]:
df_test['rating'] = df_test['rating'].apply(lambda x: math.ceil(x/2))
df_test

Unnamed: 0,user_id,item_id,rating
0,533545,7104,2
1,594733,876,4
2,502991,888,2
3,482942,7104,1
4,493749,183716,1
...,...,...,...
354,478592,739,5
355,456376,126663,5
356,277945,64251,5
357,104074,45216,5


In [8]:
df_train = df.copy()
for i in range(len(df_test)):
    dup = df[(df["user_id"] == df_test.iloc[i, 0]) & (df["item_id"] == df_test.iloc[i, 1])].index
    df_train.drop(dup , inplace=True)


Delete users which rate <= 3 items

In [15]:
value_count = df_train['item_id'].value_counts()
min_count = 3
filtered_values = value_count[value_count >= min_count].index
filtered_values

Int64Index([   510,  43457,    512,   7626,  16366,    545,    733,  43458,
            183716,  52609,
            ...
              7653, 280150,  12655,   7111,  84715,  62150,  45232,  43102,
             64256,  45043],
           dtype='int64', length=349)

In [17]:
df_filtered = df[df['item_id'].isin(filtered_values)]
df_filtered

Unnamed: 0,user_id,item_id,rating
0,224557,510,5
1,224557,615,5
2,224557,7680,5
3,224293,510,5
4,224293,515,5
...,...,...,...
3650,603582,312324,5
3651,602980,312324,5
3652,603725,312324,2
3654,605220,376915,5


In [19]:
df_filtered.to_csv("train.csv", index=False)

### Load data to spark

In [20]:
# spark config
spark = SparkSession \
    .builder \
    .appName("ALS RecSys") \
    .config("spark.driver.maxResultSize", "96g") \
    .config("spark.driver.memory", "96g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.master", "local[12]") \
    .getOrCreate()
# get spark context
sc = spark.sparkContext

In [21]:
train_path = "/content/train.csv"
ratings = sc.textFile(train_path)
header = ratings.take(1)[0]
rating_data = ratings \
    .filter(lambda line: line!=header) \
    .map(lambda line: line.split(",")) \
    .map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2]))) \
    .cache()
rating_data.take(3)

[(224557, 510, 5.0), (224557, 615, 5.0), (224557, 7680, 5.0)]

In [22]:
rating_test = sc.textFile(test_path)
header = rating_test.take(1)[0]
rating_data_test = rating_test \
    .filter(lambda line: line!=header) \
    .map(lambda line: line.split(",")) \
    .map(lambda tokens: (int(tokens[1]), int(tokens[2]), float(tokens[5]))) \
    .cache()
rating_data_test.take(3)

[(533545, 7104, 3.0), (594733, 876, 7.0), (502991, 888, 3.0)]

### Split data

In [23]:
train, validation = rating_data.randomSplit([8, 2], seed=99)
N = len(train.collect())
test = rating_data_test
# cache data
train.cache()
validation.cache()
test.cache()

PythonRDD[8] at RDD at PythonRDD.scala:53

### Train model

In [24]:
def train_ALS(train_data, validation_data, num_iters, reg_param, ranks):
    """
    Grid Search Function to select the best model based on RMSE of hold-out data
    """
    # initial
    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    best_model = None
    for rank in ranks:
        for reg in reg_param:
            # train ALS model
            model = ALS.train(
                ratings=train_data,    # (userID, productID, rating) tuple
                iterations=num_iters,
                rank=rank,
                lambda_=reg,           # regularization param
                seed=99)
            # make prediction
            valid_data = validation_data.map(lambda p: (p[0], p[1]))
            predictions = model.predictAll(valid_data).map(lambda r: ((r[0], r[1]), r[2]))
            # get the rating result
            ratesAndPreds = validation_data.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
            # get the RMSE
            MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
            error = math.sqrt(MSE)
            print('{} latent factors and regularization = {}: validation RMSE is {}'.format(rank, reg, error))
            if error < min_error:
                min_error = error
                best_rank = rank
                best_regularization = reg
                best_model = model
    print('\nThe best model has {} latent factors and regularization = {}'.format(best_rank, best_regularization))
    return best_model

In [25]:
# hyper-param config
num_iterations = 10
ranks = [8, 10, 12, 14, 16, 18, 20]
reg_params = [0.01, 0.05, 0.1, 0.2, 0.5, 0.7]

# grid search and select best model
start_time = time.time()
final_model = train_ALS(train, validation, num_iterations, reg_params, ranks)

print ('Total Runtime: {:.2f} seconds'.format(time.time() - start_time))

8 latent factors and regularization = 0.01: validation RMSE is 3.2963796440844897
8 latent factors and regularization = 0.05: validation RMSE is 1.969708565857005
8 latent factors and regularization = 0.1: validation RMSE is 1.6818485059211445
8 latent factors and regularization = 0.2: validation RMSE is 1.5521929980716471
8 latent factors and regularization = 0.5: validation RMSE is 1.5011936502793175
8 latent factors and regularization = 0.7: validation RMSE is 1.5438846332877987
10 latent factors and regularization = 0.01: validation RMSE is 3.0403881255714165
10 latent factors and regularization = 0.05: validation RMSE is 1.9115526832685124
10 latent factors and regularization = 0.1: validation RMSE is 1.6005435788787217
10 latent factors and regularization = 0.2: validation RMSE is 1.4662924179343213
10 latent factors and regularization = 0.5: validation RMSE is 1.444705934810555
10 latent factors and regularization = 0.7: validation RMSE is 1.4896024476883305
12 latent factors an

### Evaluation

In [26]:
# make prediction using test data
test_data = test.map(lambda p: (p[0], p[1]))
predictions = final_model.predictAll(test_data).map(lambda r: ((r[0], r[1]), r[2]))
# get the rating result
ratesAndPreds = test.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
# get the RMSE
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
error = math.sqrt(MSE)
print('The out-of-sample RMSE of rating predictions is', round(error, 4))

The out-of-sample RMSE of rating predictions is 3.5902


In [27]:
predictions.take(10)

[((572184, 7626), 4.350150361597729),
 ((283584, 7104), 1.052291517963099),
 ((283584, 43457), 1.603832805374682),
 ((568716, 66852), 4.606424165002528),
 ((284292, 768), 3.1737652763449447),
 ((502596, 7626), 2.554865127704305),
 ((550968, 886), 3.363970689993319),
 ((124548, 745), 1.0036157520405753),
 ((124548, 545), 1.1791684671618314),
 ((124548, 511), 1.0412729915565144)]

In [29]:
df_test.head(10)

Unnamed: 0,user_id,item_id,rating
0,533545,7104,2
1,594733,876,4
2,502991,888,2
3,482942,7104,1
4,493749,183716,1
5,274455,853,1
6,304796,7626,3
7,566747,312324,1
8,478099,545,1
9,288206,7626,2
