In [1]:
import math
import os
from pyspark.mllib.recommendation import ALS
from pyspark import SparkContext, SparkConf

import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

logger.info("Loading Rating data...")
sc = SparkContext()
dataset_path = os.path.join('../datasets', 'BOOK')
ratings_file_path = os.path.join(dataset_path, 'Ratings.csv')
# Preprocessing ratings data
ratings_raw_RDD = sc.textFile(ratings_file_path)
ratings_raw_data_header = ratings_raw_RDD.take(1)[0]
ratings_RDD = ratings_raw_RDD.filter(lambda line: line!=ratings_raw_data_header)\
    .map(lambda line: line.split(";"))\
    .map(lambda tokens: (int(tokens[0][1:-1]), abs(hash(tokens[1][1:-1])) % (10 ** 8), int(tokens[2][1:-1]))).cache()

# Splitting data into train, test, and validate sets
test, train, validate = ratings_RDD.randomSplit(weights=[0.3, 0.6, 0.1], seed=1)

test = test.map(lambda token: (token[0], token[1]))

# Setting parameters for ALS model
seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12, 16, 20, 30]
errors = [0, 0, 0, 0, 0, 0]
err = 0
tolerance = 0.02

# Training ALS model with different ranks and finding the best model
min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(train, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(test).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validate.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print(f'For rank {rank} the RMSE is {error}')
    if error < min_error:
        min_error = error
        best_rank = rank

print(f'The best model was trained with rank {best_rank}')


INFO:__main__:Loading Rating data...
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/31 22:13:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/31 22:13:16 WARN BlockManager: Task 1 already completed, not releasing lock for rdd_3_0
24/03/31 22:13:20 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 2 (TID 2): Attempting to kill Python Worker
24/03/31 22:13:24 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 3 (TID 3): Attempting to kill Python Worker
24/03/31 22:13:31 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/03/31 22:13:31 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
24/03/31 22:13:37 WARN BlockManager: Task 206 already completed, not releasing lock for rdd_3_0
24/03/31 22:13

For rank 4 the RMSE is 6.340276637195192


24/03/31 22:13:48 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 181 (TID 240): Attempting to kill Python Worker
24/03/31 22:13:52 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 182 (TID 241): Attempting to kill Python Worker
24/03/31 22:13:59 WARN BlockManager: Task 444 already completed, not releasing lock for rdd_3_0
24/03/31 22:14:06 WARN BlockManager: Task 477 already completed, not releasing lock for rdd_3_0


For rank 8 the RMSE is 2.391180911253368


24/03/31 22:14:10 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 360 (TID 478): Attempting to kill Python Worker
24/03/31 22:14:14 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 361 (TID 479): Attempting to kill Python Worker
24/03/31 22:14:22 WARN BlockManager: Task 682 already completed, not releasing lock for rdd_3_0
24/03/31 22:14:27 WARN BlockManager: Task 715 already completed, not releasing lock for rdd_3_0


For rank 12 the RMSE is 2.6713517485332616


24/03/31 22:14:31 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 539 (TID 716): Attempting to kill Python Worker
24/03/31 22:14:35 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 540 (TID 717): Attempting to kill Python Worker
24/03/31 22:14:44 WARN BlockManager: Task 920 already completed, not releasing lock for rdd_3_0
24/03/31 22:14:49 WARN BlockManager: Task 953 already completed, not releasing lock for rdd_3_0


For rank 16 the RMSE is 4.173401658626104


24/03/31 22:14:53 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 718 (TID 954): Attempting to kill Python Worker
24/03/31 22:14:57 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 719 (TID 955): Attempting to kill Python Worker
24/03/31 22:15:08 WARN BlockManager: Task 1158 already completed, not releasing lock for rdd_3_0
                                                                                

For rank 20 the RMSE is 4.440313270484995


24/03/31 22:15:14 WARN BlockManager: Task 1191 already completed, not releasing lock for rdd_3_0
24/03/31 22:15:18 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 897 (TID 1192): Attempting to kill Python Worker
24/03/31 22:15:22 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 898 (TID 1193): Attempting to kill Python Worker
24/03/31 22:15:40 WARN BlockManager: Task 1396 already completed, not releasing lock for rdd_3_0

For rank 30 the RMSE is 5.448719406907229
The best model was trained with rank 8


                                                                                