In [10]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "4g") \
    .appName('movieRecommendationPySpark') \
    .getOrCreate()

In [11]:
ratings = (
    spark.read.csv(
        path = "../data/ml-25m/ratings.csv",
        sep=",", header=True,quote='"',schema="userId INT, movieId INT, rating DOUBLE, timestamp INT",
    ).select("userId", "movieId", "rating")
    .cache()
)

In [12]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [13]:
als = ALS(
    userCol ="userId",
    itemCol ="movieId",
    ratingCol = "rating",
)
(training_data, validation_data) = ratings.randomSplit([8.0,2.0])

evaluator = RegressionEvaluator(
    metricName="rmse",labelCol="rating",predictionCol="prediction"
)

model = als.fit(training_data)

prediction = model.transform(validation_data)



KeyboardInterrupt: 

In [5]:
# prediction.show(10,False)
rmse = evaluator.evaluate(prediction.na.drop())
print(rmse)

0.8036587337662193


In [8]:
# userFactors=model.userFactors
# itemFactors = model.itemFactors
# userFactors.sort('id').show(5,False)
# itemFactors.sort('id').show(5,False)
# import numpy as np
# user91Features = model.userFactors.filter(f.col('id')==91).select(f.col('features')).rdd.flatMap(lambda x:x).collect()[0]
# item471Features = model.itemFactors.filter(f.col('id')==471).select(f.col('features')).rdd.flatMap(lambda x:x).collect()[0]

# print(user91Features)
# print(item471Features)
# print('Predicted rating of user 91 for movie 471: ' + str(np.dot(user91Features, item471Features)))
model.save("../modelrecommendation")

In [None]:
# from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# param_grid =  (
#     ParamGridBuilder().addGrid(als.rank,[5,10]).addGrid(als.maxIter,[20]).addGrid(als.regParam,[0.05,1]).build()
# )
# print(param_grid)

[{Param(parent='ALS_74c46e483e0a', name='rank', doc='rank of the factorization'): 5, Param(parent='ALS_74c46e483e0a', name='maxIter', doc='max number of iterations (>= 0).'): 20, Param(parent='ALS_74c46e483e0a', name='regParam', doc='regularization parameter (>= 0).'): 0.05}, {Param(parent='ALS_74c46e483e0a', name='rank', doc='rank of the factorization'): 5, Param(parent='ALS_74c46e483e0a', name='maxIter', doc='max number of iterations (>= 0).'): 20, Param(parent='ALS_74c46e483e0a', name='regParam', doc='regularization parameter (>= 0).'): 1.0}, {Param(parent='ALS_74c46e483e0a', name='rank', doc='rank of the factorization'): 10, Param(parent='ALS_74c46e483e0a', name='maxIter', doc='max number of iterations (>= 0).'): 20, Param(parent='ALS_74c46e483e0a', name='regParam', doc='regularization parameter (>= 0).'): 0.05}, {Param(parent='ALS_74c46e483e0a', name='rank', doc='rank of the factorization'): 10, Param(parent='ALS_74c46e483e0a', name='maxIter', doc='max number of iterations (>= 0).

In [None]:
# rmse = evaluator.evaluate(prediction.na.drop())
# print(rmse)

0.8751367980191345


In [None]:
# model.userFactors.show(5,False)

+---+-------------------------------------------------------------------------------------------------------------------------+
|id |features                                                                                                                 |
+---+-------------------------------------------------------------------------------------------------------------------------+
|10 |[0.234548, -0.31415012, 0.053325303, -0.7197765, 0.011542611, -0.9042637, -0.44329885, 1.1098106, 1.0720254, -0.85817134]|
|20 |[0.5004493, -0.18024325, 0.08042228, -0.4120407, -0.82010114, 0.24364965, -0.32089552, 1.5661156, 1.0555433, -0.18861222]|
|30 |[0.29699945, 0.5122262, 0.7743655, 0.26239362, -0.23122218, -0.22499156, 0.2205979, 1.5108275, 1.0886868, -0.24839555]   |
|40 |[0.962316, 0.24725015, 0.2506125, 0.18621314, -0.3400564, 0.069881424, 0.31563887, 1.7079871, 0.6658317, 0.10538066]     |
|50 |[0.11444859, 0.07726337, -0.04935533, -0.12985186, -0.479587, 0.25315705, 0.28283778, 1.2838686, 0.

In [None]:
# prediction.toPandas()

Unnamed: 0,userId,movieId,rating,prediction
0,593,1580,1.5,3.060124
1,597,1580,3.0,3.781061
2,34,1580,2.5,3.102221
3,368,2366,4.0,3.169171
4,115,1580,4.0,3.457590
...,...,...,...,...
20082,535,2706,4.0,1.946153
20083,420,48385,3.5,3.572902
20084,18,2706,3.5,3.319682
20085,483,66785,4.0,3.423220


In [None]:
rec_all_users = model.recommendForAllUsers(5).cache()
# rec_all_users.show(5,False)
# rec_all_users.printSchema()



In [None]:
# recommendations_for_user91 = rec_all_users.filter(f.col('userId')==110)
# movies = (
#     spark.read.csv(
#         path = "./data/ml-25/movies.csv",
#         sep=",",
#         header = True,
#         quote='"',
#         schema = "movieId INT, title STRING, genres STRING",
#     )
# )
# movieid = recommendations_for_user91.withColumn('movie',f.explode('recommendations')).withColumn('movie',f.col('movie.movieId')).select('movie')
# movie_for_user91= movies.join(movieid,movies.movieId==movieid.movie,'inner').withColumn("title_year",f.split(f.col("title"),"\(")).withColumn("year",f.substring(f.col("title_year").getItem(1),0,4)).withColumn("titlestring",f.col("title_year").getItem(0)).select('movieId','titlestring','year')
# movie_for_user91.show(5,False)

+-------+------------------------------------------+----+
|movieId|titlestring                               |year|
+-------+------------------------------------------+----+
|3022   |General, The                              |1926|
|3379   |On the Beach                              |1959|
|7096   |Rivers and Tides                          |2001|
|89904  |The Artist                                |2011|
|177593 |Three Billboards Outside Ebbing, Missouri |2017|
+-------+------------------------------------------+----+



In [14]:
ratedMovies = ratings.filter(f.col('userId')==110).select('movieId').rdd.flatMap(lambda x:x).collect()

movies_to_be_rated = (
    ratings.filter(~ f.col('movieId').isin(ratedMovies))
    .select('movieId').distinct().withColumn('userId',f.lit(110))
)
# movies_to_be_rated.sort('movieId').show(5)

user_movie_predictions = model.transform(movies_to_be_rated)
user_movie_predictions.filter(~f.isnan('prediction')).orderBy('prediction',ascending=False).show(5)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 216.0 failed 1 times, most recent failure: Lost task 5.0 in stage 216.0 (TID 607) (DESKTOP-46CMPLJ.mshome.net executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:188)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:108)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:131)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:535)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:189)
	at java.net.ServerSocket.implAccept(ServerSocket.java:545)
	at java.net.ServerSocket.accept(ServerSocket.java:513)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:175)
	... 14 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:188)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:108)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:131)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:535)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:189)
	at java.net.ServerSocket.implAccept(ServerSocket.java:545)
	at java.net.ServerSocket.accept(ServerSocket.java:513)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:175)
	... 14 more
