In [1]:
import findspark
findspark.init('/home/kant/spark-2.4.4-bin-hadoop2.7')

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('Recomm System').getOrCreate()

In [6]:
data = spark.read.csv('movielens_ratings.csv',inferSchema=True, header=True)

In [7]:
data.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- userId: integer (nullable = true)



In [8]:
from pyspark.ml.recommendation import ALS

In [9]:
from pyspark.ml.evaluation import RegressionEvaluator

In [12]:
data.show()

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      2|   3.0|     0|
|      3|   1.0|     0|
|      5|   2.0|     0|
|      9|   4.0|     0|
|     11|   1.0|     0|
|     12|   2.0|     0|
|     15|   1.0|     0|
|     17|   1.0|     0|
|     19|   1.0|     0|
|     21|   1.0|     0|
|     23|   1.0|     0|
|     26|   3.0|     0|
|     27|   1.0|     0|
|     28|   1.0|     0|
|     29|   1.0|     0|
|     30|   1.0|     0|
|     31|   1.0|     0|
|     34|   1.0|     0|
|     37|   1.0|     0|
|     41|   2.0|     0|
+-------+------+------+
only showing top 20 rows



In [13]:
data.describe().show()

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|               1.0|                 0|
|    max|                99|               5.0|                29|
+-------+------------------+------------------+------------------+



In [14]:
training,test = data.randomSplit([0.8,0.2], seed=22)

In [15]:
als = ALS(maxIter=5, regParam=0.01, userCol='userId',itemCol='movieId', ratingCol='rating')

In [28]:
model = als.fit(training)

In [29]:
result = model.transform(test)

In [30]:
result.show()

+-------+------+------+-----------+
|movieId|rating|userId| prediction|
+-------+------+------+-----------+
|     31|   1.0|    13|  2.4573302|
|     31|   2.0|    25|  1.4530871|
|     85|   3.0|     1|  1.0600268|
|     85|   2.0|    20|  1.3161325|
|     85|   1.0|     5|-0.47454488|
|     85|   1.0|    23|   1.208001|
|     65|   1.0|    22| 0.14238077|
|     65|   1.0|    16|  3.0290678|
|     65|   2.0|    15|  1.9280043|
|     53|   2.0|    19|  2.0671425|
|     53|   5.0|     8|   1.142489|
|     53|   1.0|     7| 0.43845567|
|     53|   5.0|    21|  1.9923346|
|     78|   1.0|    12|-0.34021962|
|     34|   4.0|     2|   2.067679|
|     34|   1.0|     0| 0.86823946|
|     81|   1.0|     7|  2.3480508|
|     81|   2.0|    29|  1.0709865|
|     81|   4.0|    11| -1.2700993|
|     81|   3.0|    18|  2.8143103|
+-------+------+------+-----------+
only showing top 20 rows



In [31]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating',predictionCol='prediction')

In [32]:
rmse = evaluator.evaluate(result)

In [33]:
print("RMSE")
rmse

RMSE


1.7420414770284125

In [34]:
test.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- userId: integer (nullable = true)



In [38]:
single_user = test.filter(test['userId']==11).select('movieId','userId')

In [39]:
recommendations = model.transform(single_user)

In [40]:
recommendations.orderBy('prediction',ascending=False).show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|     51|    11| 4.9817705|
|     80|    11| 2.9702358|
|     20|    11|  2.866027|
|     79|    11| 1.9840186|
|     32|    11| 1.7812376|
|     64|    11| 1.0605448|
|     99|    11| 0.9259043|
|     97|    11| 0.5417483|
|     37|    11|0.35563374|
|     50|    11|0.22347133|
|     81|    11|-1.2700993|
+-------+------+----------+

