In [1]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

### Adjust `spark.driver.memory` for your system. You can omit it, but if you hit a Java out-of-memory error you will need to set it.

In [2]:
# Build (or reuse) the Spark session used by every cell below.
# NOTE(review): the memory sizes and the local scratch directory are specific
# to the original author's machine -- adjust them before running elsewhere.
# For a session with default settings, use instead:
#   SparkSession.builder.appName('Recommendation_system').getOrCreate()
spark = (
    SparkSession.builder
    .appName('Recommendation_system')
    .config('spark.executor.memory', '5g')
    .config('spark.driver.memory', '120g')
    .config('spark.local.dir', '/home/haris/raid/tmp')
    .getOrCreate()
)

### Displaying the session object shows a link to the Spark UI, where you can monitor the cluster status

In [3]:
# Bare expression: the notebook displays the SparkSession object as this cell's output.
spark

In [4]:
# Load the ratings file; the first row of the CSV supplies the column names.
# No schema is passed here, so the numeric columns are cast explicitly later on.
df = spark.read.csv(path='all_rating.csv', header=True)

In [5]:
# Preview the first ten rows of the raw ratings table.
df.show(n=10)

+---+--------+-------+------+----------+
|_c0|movie_id|user_id|rating|      date|
+---+--------+-------+------+----------+
|  0|       1|1488844|     3|2005-09-06|
|  1|       1| 822109|     5|2005-05-13|
|  2|       1| 885013|     4|2005-10-19|
|  3|       1|  30878|     4|2005-12-26|
|  4|       1| 823519|     3|2004-05-03|
|  5|       1| 893988|     3|2005-11-17|
|  6|       1| 124105|     4|2004-08-05|
|  7|       1|1248029|     3|2004-04-22|
|  8|       1|1842128|     4|2004-05-09|
|  9|       1|2238063|     3|2005-05-11|
+---+--------+-------+------+----------+
only showing top 10 rows



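### ALS needs numeric columns: `user_id` and `movie_id` must be integers and `rating` must be numeric. The CSV was read without a schema, so every column is a string and has to be cast first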

In [6]:
# Keep only the three columns ALS uses and cast each one to an integer,
# preserving the original column names and order.
als_data = df.select(
    *[
        df[column_name].cast(IntegerType()).alias(column_name)
        for column_name in ("movie_id", "user_id", "rating")
    ]
)

In [7]:
# Hold out 20% of the ratings for testing. The seed is fixed so that the same
# rows land in each split every time the notebook is re-run.
training, test = als_data.randomSplit([0.8, 0.2], seed=42)

In [8]:
# Explicit-feedback ALS estimator over (user_id, movie_id, rating).
# - coldStartStrategy="drop": rows that receive a NaN prediction are dropped
#   before evaluation.
# - nonnegative=True: use non-negative constraints for the least-squares solve.
# - seed: fixed so model fitting is repeatable across runs.
# Hyper-parameters can also be set manually, e.g.
#   als.setMaxIter(5).setRegParam(0.09).setRank(25)
als = ALS(
    userCol="user_id",
    itemCol="movie_id",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
    implicitPrefs=False,
    seed=42,
)

In [9]:
# Root-mean-square error between the true "rating" and the model's "prediction".
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)

### Run the following three cells to fit a single model. Otherwise, skip ahead to the Parameter Grid Search

In [10]:
# Fit one ALS model on the training split using the estimator's current parameters.
model = als.fit(dataset=training)

In [11]:
# Score the held-out ratings with the fitted model, then compute the RMSE.
predictions = model.transform(dataset=test)
rmse = evaluator.evaluate(dataset=predictions)

In [12]:
# Report the test-set error of the single model.
print(f"RMSE={rmse}")

RMSE=0.8721375797478425


### Parameter Grid Search 

In [13]:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator

In [14]:
# Grid of candidate values for the ALS regularization parameter.
# Further axes can be searched by chaining more grids before .build(), e.g.
#   .addGrid(als.maxIter, [5, 250, 500])
#   .addGrid(als.rank, [5, 80, 120])
# NOTE(review): 1.5 is far larger than the other two values -- confirm it is
# intentional and not a typo for 0.15.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.regParam, [0.05, 0.1, 1.5])
    .build()
)

In [15]:
# 5-fold cross-validation of the ALS estimator over param_grid, scored by RMSE.
# The seed is fixed so the rows assigned to each fold are the same on every run.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
)

In [16]:
# Run the cross-validated grid search on the training split.
# Note: this rebinds `model` to the result of the cross-validator fit.
model = cv.fit(dataset=training)

In [17]:
# Take the best-performing model found by the cross-validated search.
best_model = model.bestModel

In [18]:
# Score the held-out ratings with the best model, then compute its RMSE.
predictions = best_model.transform(dataset=test)
rmse = evaluator.evaluate(dataset=predictions)

In [19]:
# Summarize the winning model. MaxIter and RegParam are read from the parent
# of the underlying Java model object, fetched once and reused.
best_parent = best_model._java_obj.parent()
summary_lines = [
    "**Best Model**",
    f"RMSE = {rmse}",
    f" Rank: {best_model.rank}",
    f" MaxIter: {best_parent.getMaxIter()}",
    f" RegParam: {best_parent.getRegParam()}",
]
print("\n".join(summary_lines))

**Best Model**
RMSE = 0.8560327465597013
 Rank: 10
 MaxIter: 10
 RegParam: 0.05
