In [6]:
import os
from getpass import getpass

from pyspark.sql import SparkSession, DataFrameReader
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
#from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# Create (or reuse) the SparkSession entry point, running with master 'local'.
# The builder chain is wrapped in parentheses rather than backslash
# continuations: every builder stage must come *before* .getOrCreate(), and
# an optional stage can be toggled by (un)commenting a single line without
# breaking line continuation.
spark = (
    SparkSession.builder
    .appName('NetLens')
    .master('local')
    # .config(conf=conf)  # optional SparkConf; must precede getOrCreate()
    .getOrCreate()
)

# Connect to the penguin PostgreSQL database, read the ratings table and keep
# only the three columns the ALS model needs.
#
# Credentials are taken from the environment instead of being hard-coded, so
# they are not published with the notebook:
#   PENGUIN_DB_USER      (defaults to 'ai261')
#   PENGUIN_DB_PASSWORD  (prompted for interactively when unset)
# NOTE(review): the password previously committed in this cell should be
# treated as compromised and rotated.
url = 'jdbc:postgresql://penguin.kent.ac.uk/ai261'
properties = {
    'user': os.environ.get('PENGUIN_DB_USER', 'ai261'),
    'password': (
        os.environ.get('PENGUIN_DB_PASSWORD')
        or getpass('Password for penguin DB: ')
    ),
    'driver': 'org.postgresql.Driver',
}
df = spark.read.jdbc(url=url, table='ratings', properties=properties)
df = df.select(['userid', 'movieid', 'rating'])

# Split the ratings 80/20 into training and test sets.
# A fixed seed makes the split reproducible, and with it the RMSE reported
# below (as long as the ratings table is unchanged). Without a seed, a
# change in RMSE could come from a different split rather than from a
# different ALS configuration.
(training, test) = df.randomSplit([0.8, 0.2], seed=5)

# Both sets derive from a JDBC read; caching avoids re-querying the database
# each time Spark evaluates them (ALS iterates over the training data).
training.cache()
test.cache()

# Configure the ALS (alternating least squares) estimator. The chosen values
# sit at the centre of the ranges in the grid-search sketch further down.
# coldStartStrategy='drop' discards test rows whose user or movie never
# appeared in training, so they cannot produce NaN predictions (and a NaN RMSE).
als = ALS(
    userCol='userid',
    itemCol='movieid',
    ratingCol='rating',
    rank=11,
    maxIter=7,
    regParam=0.17,
    seed=5,
    coldStartStrategy='drop',
)

# Optional hyper-parameter search (needs the pyspark.ml.tuning import):
# param_grid = (ParamGridBuilder()
#               .addGrid(als.rank, [10, 11, 12])
#               .addGrid(als.maxIter, [6, 7, 8])
#               .addGrid(als.regParam, [.16, .17, .18])
#               .addGrid(als.seed, [0, 5, 10])
#               .build())
# tvs = TrainValidationSplit(estimator=als, estimatorParamMaps=param_grid,
#                            evaluator=evaluator)
# model = tvs.fit(training).bestModel
# NOTE(review): `seed` only changes the random initialisation; picking the
# "best" seed mostly fits noise and is not a real hyper-parameter.

# Root-mean-squared error between the true rating and the model's prediction.
evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='rating',
    predictionCol='prediction',
)

# Train on the training split, then score the held-out test split.
model = als.fit(training)
predictions = model.transform(test)

# RMSE over the test rows that survived cold-start dropping.
rmse = evaluator.evaluate(predictions)
print(f'RMSE: {rmse}')

# Ten highest-scoring movie recommendations for every user. show() prints
# the first 20 users and truncates the long recommendation arrays.
mr = model.recommendForAllUsers(numItems=10)
mr.show(n=20, truncate=True)

# Hyper-parameters in effect, via the public API (no private model._java_obj):
# print(f'Rank: {model.rank}')
# print(f'MaxIter: {als.getMaxIter()}')
# print(f'RegParam: {als.getRegParam()}')
# print(f'Seed: {als.getSeed()}')
# NOTE(review): for a model taken from a tuning run's bestModel, only `rank`
# is readable from the model itself; the `als` getters report the values set
# on the estimator, not the tuned ones.

# Persist / reload with the pyspark.ml API. The RDD-based
# MatrixFactorizationModel (pyspark.mllib) cannot load an ml ALSModel.
# model.write().overwrite().save('/path/to/model')
# from pyspark.ml.recommendation import ALSModel
# loaded_model = ALSModel.load('/path/to/model')

# spark.stop()  # presumably left disabled so later cells can reuse the session

RMSE: 0.8699558347162722
+------+--------------------+
|userid|     recommendations|
+------+--------------------+
|   471|[[3379, 4.837304]...|
|   463|[[3379, 4.9989934...|
|   496|[[3379, 4.867341]...|
|   148|[[67618, 4.577209...|
|   540|[[3379, 5.320341]...|
|   392|[[6818, 5.365404]...|
|   243|[[67618, 5.595224...|
|    31|[[67618, 5.259552...|
|   516|[[3379, 5.0185905...|
|   580|[[60943, 5.004139...|
|   251|[[3379, 5.903529]...|
|   451|[[3379, 5.6381693...|
|    85|[[1140, 4.8237133...|
|   137|[[3379, 5.0201507...|
|    65|[[3379, 5.0349503...|
|   458|[[67618, 5.518743...|
|   481|[[42730, 4.075913...|
|    53|[[3379, 6.907212]...|
|   255|[[1194, 4.350659]...|
|   588|[[3379, 4.702357]...|
+------+--------------------+
only showing top 20 rows

