In [6]:
sc

In [7]:
import pandas as pd
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql.types import *



In [8]:
#lines = spark.read.csv("hdfs:/user/data/ratings.csv", header="true",inferSchema="true").rdd
#lines = spark.read.csv("gs://dataset-rs/ml-20m/ratings.csv", header="true",inferSchema="true").rdd
lines = spark.read.csv("/home/aleja/Documentos/datasets/ml-20m/ratings.csv", header="true",inferSchema="true").rdd

lines.take(2)

[Row(userId=1, movieId=2, rating=3.5, timestamp=1112486027),
 Row(userId=1, movieId=29, rating=3.5, timestamp=1112484676)]

In [9]:

df_ratings = spark.createDataFrame(lines) 

df_ratings

DataFrame[userId: bigint, movieId: bigint, rating: double, timestamp: bigint]

In [10]:
df_ratings.rdd.getNumPartitions()

4

In [11]:
#newdf = ratings.limit(20000)
ratings = df_ratings.sample(False,fraction=0.5, seed=1)
ratings.select('userId').count()

9998448

In [12]:
ratings = ratings.repartition(1)
ratings.rdd.getNumPartitions()

1

In [13]:
ratings.select('userId').distinct().count()

138493

In [14]:
ratings.select('movieId').distinct().count()

24066

In [15]:
(training, test) = ratings.randomSplit([0.8, 0.2])
training.printSchema()


root
 |-- userId: long (nullable = true)
 |-- movieId: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)



In [16]:
training.show(2)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      2|   3.5|1112486027|
|     1|     32|   3.5|1112484819|
+------+-------+------+----------+
only showing top 2 rows



In [17]:
training.filter(training['rating'] > 4).show(10)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|   4993|   5.0|1112484682|
|     1|   7153|   5.0|1112484633|
|     1|   8507|   5.0|1094786027|
|     2|     62|   5.0| 974820598|
|     2|     70|   5.0| 974820691|
|     2|    480|   5.0| 974820720|
|     2|    541|   5.0| 974821014|
|     2|    589|   5.0| 974820658|
|     2|    924|   5.0| 974821014|
|     2|   1214|   5.0| 974821014|
+------+-------+------+----------+
only showing top 10 rows



ALS
-numBlocks is the number of blocks the users and items will be partitioned into in order to parallelize computation (defaults to 10).
-rank is the number of latent factors in the model (defaults to 10).
-maxIter is the maximum number of iterations to run (defaults to 10).
-regParam specifies the regularization parameter in ALS (defaults to 1.0).
-implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data (defaults to false which means using explicit feedback).
-alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations (defaults to 1.0).
-nonnegative specifies whether or not to use nonnegative constraints for least squares (defaults to false).


In [18]:
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",coldStartStrategy="drop")


In [19]:
model = als.fit(training)
model.rank

10

In [20]:
#para usar despues
predictions_Data= test.select(test.columns[:2])

In [21]:
predictions_Data = sorted(predictions_Data.collect(), key=lambda r:r[0])

In [22]:
predictions_Data[0]

Row(userId=1, movieId=253)

In [23]:
predictions = model.transform(test)

In [24]:
predictions.show()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
| 74757|    148|   3.5|1064853335| 3.1865687|
| 87301|    148|   2.0| 974945135| 2.0974596|
| 44979|    148|   3.0| 830778220| 3.8049493|
| 81218|    148|   1.0| 833850593| 1.0988066|
| 41389|    148|   3.0| 840444102| 3.1667418|
| 86098|    148|   3.0| 842162037| 3.1746902|
|110991|    148|   2.5|1207008368|   3.39181|
|130531|    148|   1.0| 831284829|0.64112157|
|122639|    148|   3.0| 835454413| 2.4494636|
|107982|    148|   3.0|1146472014| 3.1706975|
| 42767|    148|   2.0| 828638788| 1.4618912|
| 72315|    148|   3.0| 852173782| 2.6381068|
|  5814|    148|   3.0| 859547410|  3.331962|
| 12539|    148|   3.0| 956789580| 3.1249228|
|107346|    148|   3.0| 853421750|  2.705225|
| 57653|    148|   4.0|1008641404| 3.0928473|
| 11635|    148|   3.0| 834361463| 2.2119815|
|  7318|    148|   4.0| 829770010| 3.3710506|
| 74142|    148|   3.0| 982179803|

In [25]:
# Evaluate the model by computing the RMSE on the test data


evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")
rmse = evaluator.evaluate(predictions.na.drop())
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.8641321407684669


In [26]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
userRecs.show(truncate=False)

+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                                    |
+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|148   |[[53853,13.008501], [32811,10.503595], [56779,10.057627], [57868,9.83757], [109953,9.833212], [103721,9.817218], [107910,9.77579], [100581,9.583071], [42602,9.350698], [69464,9.33087]]           |
|463   |[[105084,10.110758], [73529,9.168722], [112907,9.099682], [100902,8.650026], [82051,8.576818], [87040,8.571697], [82055,8.529364], [57868,8.357774], [104583,8.319008], [792

In [27]:
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
movieRecs.show(truncate=False)

+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|movieId|recommendations                                                                                                                                                                                   |
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1580   |[[38788,5.931761], [67233,5.89398], [97160,5.7156553], [111191,5.6878886], [36893,5.6826572], [69605,5.659505], [135200,5.6463013], [32498,5.641259], [111273,5.6393633], [4404,5.6339464]]       |
|4900   |[[61007,11.748933], [1425,10.661896], [28169,10.044614], [18643,9.590103], [119956,9.202298], [107804,9.098656], [7591,8.981808], [137282,8.64775], [62488,8.455342], [1688

In [28]:
movies_data = spark.read.csv("/home/aleja/Documentos/datasets/ml-20m/movies.csv", header="true",inferSchema="true").rdd
df_movies = spark.createDataFrame(movies_data)
df_movies

DataFrame[movieId: bigint, title: string, genres: string]

In [29]:
df_movies.select('movieId').count()

27278

In [30]:
movies = df_movies.sample(False,fraction=0.5, seed=1)
movies.select('movieId').count()

13710

In [31]:
movies.show(truncate=False)

+-------+----------------------------------------------------+-------------------------------------------+
|movieId|title                                               |genres                                     |
+-------+----------------------------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                                    |Adventure|Animation|Children|Comedy|Fantasy|
|3      |Grumpier Old Men (1995)                             |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)                            |Comedy|Drama|Romance                       |
|7      |Sabrina (1995)                                      |Comedy|Romance                             |
|9      |Sudden Death (1995)                                 |Action                                     |
|12     |Dracula: Dead and Loving It (1995)                  |Comedy|Horror                              |
|13     |Balto (1995)                

In [32]:
movies = movies.repartition(1)
movies.rdd.getNumPartitions()

1

In [33]:
data = ratings.select("movieId").distinct().withColumn("userId", lit(148))
data.show()


+-------+------+
|movieId|userId|
+-------+------+
|   2529|   148|
|    474|   148|
|  45726|   148|
|     29|   148|
|  60756|   148|
|   1950|   148|
| 106002|   148|
| 106100|   148|
|    964|   148|
|   2927|   148|
|   3091|   148|
|   1806|   148|
|   1677|   148|
|   3764|   148|
|     26|   148|
|   5385|   148|
|  51709|   148|
|  96829|   148|
|  51418|   148|
|   2453|   148|
+-------+------+
only showing top 20 rows



In [40]:
datamv = ratings.filter(ratings.userId == 148).select("movieId", "userId")
datamv.show()

+-------+------+
|movieId|userId|
+-------+------+
|     17|   148|
|     18|   148|
|     39|   148|
|     46|   148|
|     86|   148|
|    222|   148|
|    224|   148|
|    252|   148|
|    342|   148|
|    353|   148|
|    356|   148|
|    362|   148|
|    468|   148|
|    597|   148|
|    708|   148|
|    902|   148|
|    914|   148|
|    916|   148|
|    933|   148|
|   1057|   148|
+-------+------+
only showing top 20 rows



In [41]:
data_pred = model.transform(data.subtract(datamv)).dropna().orderBy("prediction",ascending=False).limit(5).select("movieId", "prediction")
data_pred.show()

+-------+----------+
|movieId|prediction|
+-------+----------+
|  53853| 13.008501|
|  32811| 10.503595|
|  56779| 10.057627|
|  57868|   9.83757|
| 109953|  9.833212|
+-------+----------+



In [42]:
data_pred

DataFrame[movieId: bigint, prediction: float]

In [43]:
movies

DataFrame[movieId: bigint, title: string, genres: string]

In [44]:
rec_mv = data_pred.join(movies, data_pred.movieId == movies.movieId)
#select(data_pred.movieId, movies.title, data_pred.prediction)
rec_mv.show()

Py4JJavaError: An error occurred while calling o468.showString.
: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:123)
	at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:248)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:126)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:197)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:82)
	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
	at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:235)
	at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:263)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80)
	at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:235)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:38)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:331)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:372)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:311)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2861)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
	at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2842)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2841)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2150)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2363)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:241)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
from pyspark.sql.functions import lit



def recommendMovies(model, user, nbRecommendations):
     # Create a Spark DataFrame with the specified user and all the movies listed in the ratings DataFrame
    dataSet = ratings.select("movieId").distinct().withColumn("userId", lit(user))

    # Create a Spark DataFrame with the movies that have already been rated by this user
    moviesAlreadyRated = ratings.filter(ratings.userId == user).select("movieId", "userId")

    # Apply the recommender system to the data set without the already rated movies to predict ratings
    predictions = model.transform(dataSet.subtract(moviesAlreadyRated)).dropna().orderBy("prediction", ascending=False).limit(nbRecommendations).select("movieId", "prediction")
    
    # Join with the movies DataFrame to get the movies titles and genres
    recommendations = predictions.join(movies, predictions.movieId == movies.movieId).select(predictions.movieId, movies.title, movies.genres, predictions.prediction)

    return recommendations.show(truncate=False)

In [None]:
print ("Recommendations for user 133:")
recommendMovies(model,148,10)

Now we can use the various prediction functions on the model variable.

*predict*: - Return a single floating point value
*predictAll*: -Returns RDD of Rating Objects
*recommendUsers*: -Returns a List of Ratings in Descending Order by Rating
*recommendProducts*: -Returns a List of Ratings
*recommendProductsForUsers*:-Returns RDD with(UserID, (RatingObj, RatingObj, …) ) where RatingObj is sorted descending by rating
*recommendUsersforProducts*:-Returns RDD with(ProductID, (RatingObj, RatingObj, …) ) where RatingObj is sorted descending by rating
