In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m11.4 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=d676a79da3dc9bb14d6194a5e38f6362b6a0d3a59513c73c466432064aadb19f
  Stored in directory: /root/.cache/pip/wheels/b1/59/a0/a1a0624b5e865fd389919c1a10f53aec9b12195d6747710baf
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [None]:
#Libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [None]:
# Reuse an existing Spark session if one is active, otherwise start one.
spark = (
    SparkSession.builder
    .appName("RecommendationSystem")
    .getOrCreate()
)

In [None]:
# Every column of the ratings file is declared as a nullable integer.
rating_columns = ("userID", "movieID", "rating", "unixTimestamp")
myschema = StructType(
    [StructField(column, IntegerType(), True) for column in rating_columns]
)

# Read u.data as tab-delimited CSV using the explicit schema above.
ratings_reader = spark.read.format("csv").option("delimiter", "\t").schema(myschema)
df = ratings_reader.load("u.data")

In [None]:
# Count, mean, stddev, min and max for each column of the loaded ratings.
raw_stats = df.describe()
raw_stats.show()

+-------+------------------+------------------+------------------+-----------------+
|summary|            userID|           movieID|            rating|    unixTimestamp|
+-------+------------------+------------------+------------------+-----------------+
|  count|            100000|            100000|            100000|           100000|
|   mean|         462.48475|         425.53013|           3.52986|8.8352885148862E8|
| stddev|266.61442012750905|330.79835632558473|1.1256735991443214|5343856.189502848|
|    min|                 1|                 1|                 1|        874724710|
|    max|               943|              1682|                 5|        893286638|
+-------+------------------+------------------+------------------+-----------------+



In [None]:
# The timestamp is not passed to ALS below (only user, item and rating are),
# so remove it. `df` is rebound because later cells read that name.
unused_columns = ["unixTimestamp"]
df = df.drop(*unused_columns)

In [None]:
# Summary statistics again, now for the frame without the timestamp column.
trimmed_stats = df.describe()
trimmed_stats.show()

+-------+------------------+------------------+------------------+
|summary|            userID|           movieID|            rating|
+-------+------------------+------------------+------------------+
|  count|            100000|            100000|            100000|
|   mean|         462.48475|         425.53013|           3.52986|
| stddev|266.61442012750905|330.79835632558473|1.1256735991443214|
|    min|                 1|                 1|                 1|
|    max|               943|              1682|                 5|
+-------+------------------+------------------+------------------+



In [None]:
# Random 80/20 train/test split; the seed is fixed so the split can be repeated.
split_weights = [0.8, 0.2]
train, test = df.randomSplit(split_weights, seed=27)

In [None]:
# Alternating Least Squares estimator for explicit ratings.
# An explicit seed pins the random initialisation of the latent factors, so
# the fitted model (and the metrics derived from it) can be reproduced across
# kernel restarts, matching the seeded train/test split.
# NOTE(review): regParam=0.01 is fairly weak regularisation; the prediction
# summary in this notebook shows values outside the 1-5 rating scale, so this
# hyperparameter is worth tuning.
als = ALS(
    maxIter=10,
    regParam=0.01,
    userCol="userID",
    itemCol="movieID",
    ratingCol="rating",
    seed=27,
)

In [None]:
# Fit the factorisation on the training split, then switch the cold-start
# strategy to "drop" so rows whose prediction would be NaN are omitted from
# transform() output. The setter returns the model itself, so it can be chained.
model_col = als.fit(train).setColdStartStrategy("drop")
model_col

ALSModel: uid=ALS_2ada4bb9cd06, rank=10

In [None]:
# Score the held-out ratings; the result carries an extra `prediction` column.
pred = model_col.transform(dataset=test)

In [None]:
# Summary statistics of the scored test set, including the predicted ratings.
pred_stats = pred.describe()
pred_stats.show()

+-------+------------------+------------------+------------------+------------------+
|summary|            userID|           movieID|            rating|        prediction|
+-------+------------------+------------------+------------------+------------------+
|  count|             20097|             20097|             20097|             20097|
|   mean|462.41633079564116|424.83594566353185|3.5384385729213315| 3.503691946704773|
| stddev|268.01904127668723| 329.2804043654686|1.1283688986929494|0.9762263565978738|
|    min|                 1|                 1|                 1|        -4.3135796|
|    max|               943|              1664|                 5|         10.111652|
+-------+------------------+------------------+------------------+------------------+



In [None]:
# Peek at the first 20 scored rows (actual rating next to the prediction).
pred.show(n=20)

+------+-------+------+----------+
|userID|movieID|rating|prediction|
+------+-------+------+----------+
|    85|    496|     4| 4.1257744|
|   580|    471|     3|  3.278465|
|   458|    496|     3| 2.5580938|
|   613|    471|     3| 3.9795089|
|   593|    471|     3| 3.9884386|
|   847|    496|     4|  4.421972|
|   406|    463|     5|   3.29274|
|   731|    496|     5| 5.1285243|
|    26|    148|     3| 2.4580867|
|   577|    471|     3|  3.992433|
|    44|    148|     4| 2.9922082|
|   271|    496|     5|  4.398302|
|   606|    833|     5| 3.0456917|
|   908|    496|     5|  4.224905|
|   916|    148|     2| 2.4652426|
|   103|    471|     4| 3.8080456|
|   236|    148|     4| 2.8052418|
|   236|    496|     3| 4.9839306|
|   548|    471|     5| 3.4697282|
|   222|    471|     3| 3.5952637|
+------+-------+------+----------+
only showing top 20 rows



In [None]:
# Root-mean-square error between the actual and predicted ratings in `pred`.
# The evaluator is named `evaluator` rather than `eval` so that Python's
# builtin eval() is not shadowed for the rest of the kernel session.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(pred)
print("RMSE: ", rmse)

RMSE:  1.0835917250920846


In [None]:
# Top 5 recommended movies for every user known to the model.
user_recs = model_col.recommendForAllUsers(numItems=5)

In [None]:
# Recommendations for user 10 as a list of Row(movieID, rating).
# first() fetches just the matching row, so there is no need to cache a frame
# that is never reused (and was never unpersisted). If the user has no
# recommendations row, an empty list is shown instead of raising IndexError.
user_10_row = (
    user_recs
    .where(user_recs.userID == 10)
    .select("recommendations")
    .first()
)
user_10_row["recommendations"] if user_10_row is not None else []

[Row(movieID=1643, rating=5.746307373046875),
 Row(movieID=1664, rating=5.214720249176025),
 Row(movieID=701, rating=5.124952793121338),
 Row(movieID=318, rating=5.095107555389404),
 Row(movieID=64, rating=5.042793273925781)]