In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=2cfd0c3218767533d7080906de264fa560299d47494ff85be1dbaff5c14aaeed
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
# Import Apache Spark SQL
from pyspark.sql import SparkSession

# Create (or reuse) a Spark session that runs locally on all available CPU cores
# ("local[*]"). getOrCreate() returns the already-active session if there is one,
# so re-running this cell does not start a second session.
# (The placeholder "spark.some.config.option" setting copied from the Spark docs
# was removed: it is not a real Spark option and had no effect.)
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Hello Pyspark")
    .getOrCreate()
)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

# Fixed seed for the train/test split. Without it, every Restart & Run All
# evaluates on a different random split, so the RMSE (and any comparison of
# hyperparameter settings) is not reproducible.
SPLIT_SEED = 42

# Each input line is "userId::movieId::rating::timestamp"; parse it into a Row.
lines = spark.read.text("sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)

# 80% of the ratings for training, 20% held out for evaluation.
(training, test) = ratings.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

In [None]:
# Fit an ALS (alternating least squares) recommender on the training split.
# coldStartStrategy="drop" drops rows whose prediction would be NaN (e.g. a user
# or movie that never appears in the training split), so the RMSE is not NaN.
# NOTE(review): the results notes report maxIter=10, regParam=0.5 as the best
# setting, while this cell runs maxIter=15, regParam=1.0 — confirm which is intended.
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    maxIter=15,
    regParam=1.0,
    coldStartStrategy="drop",
)
model = als.fit(training)

In [None]:
# Score the held-out test split: compare ALS predictions with the true ratings
# using root-mean-square error.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 1.5023941602641004


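**RMSE for different ALS settings** (`maxIter`, `regParam`):

- `maxIter=5`, `regParam=0.01`: RMSE = 1.8145
- `maxIter=10`, `regParam=0.5`: RMSE = 1.2214
- `maxIter=20`, `regParam=1.0`: run failed with a "Request Too Large" error
- `maxIter=15`, `regParam=1.0`: RMSE = 1.5023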

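`maxIter=10` with `regParam=0.5` gave the lowest RMSE. These runs used an unseeded random train/test split, so each number comes from a different split and the comparison is only approximate.

The performance also depends on the size of the dataset.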

In [None]:
# How many recommendations to produce per user / per movie.
num_recs = 10

# Best-scoring movies for every user.
userRecs = model.recommendForAllUsers(num_recs)
userRecs.show()

# Best-scoring users for every movie.
movieRecs = model.recommendForAllItems(num_recs)
movieRecs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    20|[{32, 1.5048801},...|
|    10|[{32, 1.3171088},...|
|     0|[{32, 1.072787}, ...|
|     1|[{32, 1.1952989},...|
|    21|[{32, 1.1602384},...|
|    11|[{32, 1.9412564},...|
|    12|[{32, 1.549379}, ...|
|    22|[{32, 1.8191476},...|
|     2|[{32, 1.7550833},...|
|    13|[{32, 1.2230905},...|
|     3|[{32, 1.5511663},...|
|    23|[{32, 1.881327}, ...|
|     4|[{32, 1.2932472},...|
|    24|[{32, 1.4221613},...|
|    14|[{32, 1.4982002},...|
|     5|[{32, 1.4009567},...|
|    15|[{32, 1.0645754},...|
|    25|[{32, 1.2116536},...|
|    26|[{32, 1.6276153},...|
|     6|[{32, 1.1234887},...|
+------+--------------------+
only showing top 20 rows

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     20|[{11, 0.94409543}...|
|     40|[{11, 1.0766476},...|
|     10|[{11, 1.1317606},...|
|     50|[{11, 1.1283668},...|
|     80|[{11, 1.5739257},...|
|     

In [None]:
# Recommendations for a small sample of ids: `sample_size` distinct users and
# movies, `num_recs` recommendations each.
# NOTE(review): limit() without an orderBy does not pin which ids are picked,
# so the sampled users/movies may differ between runs.
sample_size, num_recs = 3, 10

# Best-scoring movies for the sampled users.
users = ratings.select(als.getUserCol()).distinct().limit(sample_size)
userSubsetRecs = model.recommendForUserSubset(users, num_recs)
userSubsetRecs.show()

# Best-scoring users for the sampled movies.
movies = ratings.select(als.getItemCol()).distinct().limit(sample_size)
movieSubSetRecs = model.recommendForItemSubset(movies, num_recs)
movieSubSetRecs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    26|[{32, 1.6276153},...|
|    19|[{32, 1.2559781},...|
|    29|[{32, 1.6720246},...|
+------+--------------------+

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     65|[{11, 0.70824015}...|
|     26|[{11, 0.7648331},...|
|     29|[{11, 1.5966567},...|
+-------+--------------------+

