In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=387dd7e5a77dc7871b50112b8daaac5459b91f86e0fdf56c3c026e60e8b32127
  Stored in directory: /root/.cache/pip/wheels/9f/34/a4/159aa12d0a510d5ff7c8f0220abbea42e5d81ecf588c4fd884
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [25]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator

In [4]:
from pyspark.sql import SparkSession

spark=SparkSession.builder.appName("test_app").getOrCreate()

In [5]:
movies_df=spark.read.format('csv').option("header",True).option("inferSchema",True).option("mode","PERMISSIVE").load("/content/movies.csv")
movies_df.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [8]:
rating_df=spark.read.format('csv').option("header",True).option("inferSchema",True).option("mode","PERMISSIVE").load("/content/ratings.csv")
rating_df.show()


+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [16]:
join_df=movies_df.join(rating_df,'movieId',"left")

In [20]:
join_df.count()

100854

In [17]:
(train,test)=join_df.randomSplit([0.8,0.2])

In [21]:
train.count()

80767

In [22]:
test.count()

20087

In [24]:
from pyspark.ml.recommendation import ALS
als=ALS(userCol="userId",itemCol="movieId",ratingCol="rating",nonnegative=True,implicitPrefs=False,coldStartStrategy="drop")

In [26]:
param_grid=ParamGridBuilder() \
           .addGrid(als.rank,[10,50,100,150])\
           .addGrid(als.regParam,[.01,0.05,.1,.15]) \
           .build()

In [27]:
evaluator=RegressionEvaluator(
          metricName="rmse",
          labelCol="rating",
          predictionCol="prediction"
)

In [28]:
cv=CrossValidator(estimator=als,estimatorParamMaps=param_grid,evaluator=evaluator,numFolds=5)

In [None]:
model=cv.fit(train)
best_model=model.bestModel
test_predictions=best_model.transform(test)
RMSE=evaluator.evaluate(test_predictions)
print(RMSE)

In [None]:
recommendations=best_model.recommendForAllUsers(5)

In [None]:
df=recommendations

In [None]:
df2=df.withColumn("movie_rating",explode("recommendations"))

In [None]:
from pyspark.sql.functions import col
display(df2.select("userId",col("movieid_rating.movieId"),col("movieId_rating.rating")))