In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.appName("Chapter4-4").getOrCreate()
RATINGS_CSV_LOCATION = "/home/jovyan/data-sets/ml-latest-small/ratings.csv"

In [3]:
ratings = (
    spark.read.csv(
        path=RATINGS_CSV_LOCATION,
        sep=",",
        header=True,
        quote='"',
        schema="userId INT, movieId INT, rating DOUBLE, timestamp INT",
    )
    # .withColumn("timestamp", f.to_timestamp(f.from_unixtime("timestamp")))
    .select("userId", "movieId", "rating")
    .cache()
)

The ALS class has this signature:

```python
class pyspark.ml.recommendation.ALS(
    rank=10,
    maxIter=10,
    regParam=0.1,
    numUserBlocks=10,
    numItemBlocks=10,
    implicitPrefs=False,
    alpha=1.0,
    userCol="user",
    itemCol="item",
    seed=None,
    ratingCol="rating",
    nonnegative=False,
    checkpointInterval=10,
    intermediateStorageLevel="MEMORY_AND_DISK",
    finalStorageLevel="MEMORY_AND_DISK",
    coldStartStrategy="nan",
)
```

In [4]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [5]:
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
)

(training_data, validation_data) = ratings.randomSplit([8.0, 2.0])

evaluator = RegressionEvaluator(
    metricName="rmse", labelCol="rating", predictionCol="prediction"
)

model = als.fit(training_data)
predictions = model.transform(validation_data)

In [6]:
predictions.show(10, False)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|436   |471    |3.0   |3.685289  |
|555   |471    |3.0   |4.1700554 |
|176   |471    |5.0   |3.9755733 |
|136   |471    |4.0   |3.17576   |
|273   |471    |5.0   |3.438043  |
|216   |471    |3.0   |3.7373188 |
|469   |471    |5.0   |3.1684062 |
|608   |471    |1.5   |2.751447  |
|191   |496    |5.0   |NaN       |
|307   |833    |1.0   |1.1041915 |
+------+-------+------+----------+
only showing top 10 rows



In [7]:
rmse = evaluator.evaluate(predictions.na.drop())

In [8]:
print(rmse)

0.8841250685065566


In [9]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

parameter_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [1, 5, 10])
    .addGrid(als.maxIter, [20])
    .addGrid(als.regParam, [0.05, 0.1])
    .build()
)

In [10]:
type(parameter_grid)

list

In [11]:
from pprint import pprint

pprint(parameter_grid)

[{Param(parent='ALS_7d5cfdb1146e', name='maxIter', doc='max number of iterations (>= 0).'): 20,
  Param(parent='ALS_7d5cfdb1146e', name='rank', doc='rank of the factorization'): 1,
  Param(parent='ALS_7d5cfdb1146e', name='regParam', doc='regularization parameter (>= 0).'): 0.05},
 {Param(parent='ALS_7d5cfdb1146e', name='maxIter', doc='max number of iterations (>= 0).'): 20,
  Param(parent='ALS_7d5cfdb1146e', name='rank', doc='rank of the factorization'): 1,
  Param(parent='ALS_7d5cfdb1146e', name='regParam', doc='regularization parameter (>= 0).'): 0.1},
 {Param(parent='ALS_7d5cfdb1146e', name='maxIter', doc='max number of iterations (>= 0).'): 20,
  Param(parent='ALS_7d5cfdb1146e', name='rank', doc='rank of the factorization'): 5,
  Param(parent='ALS_7d5cfdb1146e', name='regParam', doc='regularization parameter (>= 0).'): 0.05},
 {Param(parent='ALS_7d5cfdb1146e', name='maxIter', doc='max number of iterations (>= 0).'): 20,
  Param(parent='ALS_7d5cfdb1146e', name='rank', doc='rank of t

In [12]:
crossvalidator = CrossValidator(
    estimator=als,
    estimatorParamMaps=parameter_grid,
    evaluator=evaluator,
    numFolds=2,
)

crossval_model = crossvalidator.fit(training_data)
predictions = crossval_model.transform(validation_data)


In [13]:
rmse = evaluator.evaluate(predictions.na.drop())
print(rmse)

0.9414003459442237


In [14]:
model = crossval_model.bestModel