In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
bookDF = spark.read.json('dataset/goodreads_books_comics_graphic.json')
interactionDF = spark.read.json('dataset/goodreads_interactions_comics_graphic.json')

In [3]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="book_id", outputCol="book_idx",stringOrderType='alphabetAsc')
bookDF = bookDF.select('book_id','title')
bookMapping = indexer.fit(bookDF).transform(bookDF)
# bookRDD = bookMapping.rdd.map(lambda x: (str(x.book_id),str(x.title), str(x.book_idx))).cache()
bookMapping.take(3)
bookID = bookMapping.select('book_id','book_idx')

In [4]:
indexer = StringIndexer(inputCol="user_id", outputCol="user_idx",stringOrderType='alphabetAsc')
interactionDF = interactionDF.select('user_id','book_id','rating')
indexed = indexer.fit(interactionDF).transform(interactionDF)
indexed = indexed.join(bookID,'book_id','inner').cache()

In [10]:
toTrain = indexed.rdd \
    .map(lambda x: (int(x.user_idx),int(x.book_idx),str(x.rating)))
toTrain.take(3)

[(27610, 114, '4'), (299836, 114, '0'), (227421, 114, '0')]

In [11]:
# split data
training_RDD, validation_RDD, test_RDD = toTrain.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [12]:
from pyspark.mllib.recommendation import ALS
import math

seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1

for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print ('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print ('The best model was trained with rank %s' % best_rank)

KeyboardInterrupt: 