In [0]:
import os

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession

In [0]:
# Build (or reuse) the Spark session for the ALS recommender.
# The MongoDB connection string carries a username and password, so it is read
# from the MONGODB_URI environment variable instead of being committed with the
# notebook; a missing variable fails fast with KeyError.
# NOTE(review): the password that used to be embedded here should be treated
# as leaked and rotated.
sparkSession = (
    SparkSession.builder
    .appName('Veri Maratonu ALS Tavsiye Sistemi')
    .config("spark.mongodb.output.uri", os.environ["MONGODB_URI"])
    .getOrCreate()
)

In [0]:
# Echo the session object so the notebook renders it as this cell's output.
sparkSession

In [0]:
# Load the ratings CSV: the first row is treated as the header and column
# types are inferred from the data.
rawDF = sparkSession.read.csv(
    "/FileStore/tables/ratings.csv",
    header=True,
    inferSchema=True,
)

In [0]:
# Preview the raw ratings (20 rows, truncated cells: the defaults, spelled out).
rawDF.show(n=20, truncate=True)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
|     1|   1263|   2.0|1260759151|
|     1|   1287|   2.0|1260759187|
|     1|   1293|   2.0|1260759148|
|     1|   1339|   3.5|1260759125|
|     1|   1343|   2.0|1260759131|
|     1|   1371|   2.5|1260759135|
|     1|   1405|   1.0|1260759203|
|     1|   1953|   4.0|1260759191|
|     1|   2105|   4.0|1260759139|
|     1|   2150|   3.0|1260759194|
|     1|   2193|   2.0|1260759198|
|     1|   2294|   2.0|1260759108|
|     1|   2455|   2.5|1260759113|
|     1|   2968|   1.0|1260759200|
|     1|   3671|   3.0|1260759117|
+------+-------+------+----------+
only showing top 20 rows



In [0]:
# Print the column names and the types inferred for the raw ratings frame.
rawDF.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [0]:
# The model is trained on (userId, movieId, rating) only; discard the timestamp.
mlDF = rawDF.drop("timestamp")

In [0]:
# Preview the training frame after dropping the timestamp column.
mlDF.show(n=20, truncate=True)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|     31|   2.5|
|     1|   1029|   3.0|
|     1|   1061|   3.0|
|     1|   1129|   2.0|
|     1|   1172|   4.0|
|     1|   1263|   2.0|
|     1|   1287|   2.0|
|     1|   1293|   2.0|
|     1|   1339|   3.5|
|     1|   1343|   2.0|
|     1|   1371|   2.5|
|     1|   1405|   1.0|
|     1|   1953|   4.0|
|     1|   2105|   4.0|
|     1|   2150|   3.0|
|     1|   2193|   2.0|
|     1|   2294|   2.0|
|     1|   2455|   2.5|
|     1|   2968|   1.0|
|     1|   3671|   3.0|
+------+-------+------+
only showing top 20 rows



In [0]:
# ALS estimator over the userId / movieId / rating columns.
# coldStartStrategy="drop" removes rows whose prediction would be NaN (users or
# movies absent from the training split), so the evaluator never sees NaNs.
# nonnegative=True applies non-negativity constraints to the factorisation.
# The fixed seed makes the factor initialisation repeatable between runs.
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
    seed=42,
)

In [0]:
# 80/20 train/test split. The seed pins the random assignment so repeated runs
# over the same input produce the same split (and therefore comparable metrics).
(trainDF, testDF) = mlDF.randomSplit([0.8, 0.2], seed=42)

In [0]:
# Number of rows that landed in the training split.
trainDF.count()

Out[75]: 79996

In [0]:
# Number of rows that landed in the test split.
testDF.count()

Out[76]: 20008

In [0]:
# Fit the ALS estimator `als` on the training split.
model = als.fit(trainDF)

In [0]:
# Score the test split; the result carries a `prediction` column alongside
# the actual `rating`.
predictDF = model.transform(testDF)

In [0]:
# Preview actual ratings next to the model's predictions.
predictDF.show(n=20, truncate=True)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   148|     29|   3.5| 4.3450456|
|   148|    172|   2.5| 2.8763885|
|   148|    364|   4.0| 4.0569887|
|   148|    480|   4.5| 3.9974854|
|   148|    904|   5.0| 4.5001197|
|   148|   1247|   4.0| 4.2599726|
|   148|   1249|   3.5| 4.1916094|
|   148|   1269|   5.0|  4.090647|
|   148|   1288|   4.5| 4.3818746|
|   148|   1676|   3.5| 3.1046002|
|   148|   1732|   4.0| 3.8960655|
|   148|   1779|   3.5| 2.8224945|
|   148|   1921|   4.5| 3.6940336|
|   148|   2085|   4.0| 3.5332987|
|   148|   2184|   4.5|  4.031855|
|   148|   2324|   4.0| 4.6116457|
|   148|   2571|   4.5| 4.5279284|
|   148|   3262|   3.0| 3.2998357|
|   148|   4105|   3.0| 3.4870784|
|   148|   5956|   3.5| 3.6180902|
+------+-------+------+----------+
only showing top 20 rows



In [0]:
# Evaluator for the root-mean-square error between the actual `rating`
# column and the model's `prediction` column.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)

In [0]:
# Compute the metric configured on `evaluator` over the scored test rows.
rmse = evaluator.evaluate(predictDF)

In [0]:
# Report the test-set RMSE.
print(rmse)

0.9119005560780818


In [0]:
# Top-3 recommended movies for every user in the fitted model.
userRecommendDF = model.recommendForAllUsers(3)

In [0]:
# Top-3 recommended users for every movie in the fitted model.
itemRecommendDF = model.recommendForAllItems(3)

In [0]:
# Persist the per-user recommendations to MongoDB (verimaratonu.alsalluser).
# The connection string contains credentials, so it comes from the MONGODB_URI
# environment variable rather than being hardcoded; KeyError if it is unset.
# Mode "append" adds to the existing collection, so re-running this cell
# writes the recommendations again.
(
    userRecommendDF.write.format("mongo")
    .mode("append")
    .option("uri", os.environ["MONGODB_URI"])
    .option("database", "verimaratonu")
    .option("collection", "alsalluser")
    .save()
)

In [0]:
# Persist the per-movie recommendations to MongoDB (verimaratonu.alsallitems).
# The connection string contains credentials, so it comes from the MONGODB_URI
# environment variable rather than being hardcoded; KeyError if it is unset.
# Mode "append" adds to the existing collection, so re-running this cell
# writes the recommendations again.
(
    itemRecommendDF.write.format("mongo")
    .mode("append")
    .option("uri", os.environ["MONGODB_URI"])
    .option("database", "verimaratonu")
    .option("collection", "alsallitems")
    .save()
)