In [1]:
import pyspark 
from pyspark.context import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import SparkSession

#sc = SparkContext('local', 'pyspark')
#sqlContext = HiveContext(sc)
#·spark = SparkSession.builder \
#   .master("local") \
#   .appName("Recomendador") \
#   .config("spark.executor.memory", '5gb') \
#   .config("spark.executor.heartbeatInterval", "10000000") \
#   .config("spark.network.timeout" , "10000000") 
#   .getOrCreate()

spark = SparkSession.builder \
   .master("local") \
   .appName("Linear Regression Model") \
   .config("spark.driver.memory", "14g") \
   .config("spark.executor.memory", '12gb') \
   .config("spark.driver.maxResultSize", '12gb') \
   .getOrCreate()  

In [2]:

movies = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load('movies.csv')
movies.take(5)

[Row(movieId='1', title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'),
 Row(movieId='2', title='Jumanji (1995)', genres='Adventure|Children|Fantasy'),
 Row(movieId='3', title='Grumpier Old Men (1995)', genres='Comedy|Romance'),
 Row(movieId='4', title='Waiting to Exhale (1995)', genres='Comedy|Drama|Romance'),
 Row(movieId='5', title='Father of the Bride Part II (1995)', genres='Comedy')]

In [3]:
ratings = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load('ratings.csv')
ratings.take(5)

[Row(userId='1', movieId='2', rating='3.5', timestamp='1112486027'),
 Row(userId='1', movieId='29', rating='3.5', timestamp='1112484676'),
 Row(userId='1', movieId='32', rating='3.5', timestamp='1112484819'),
 Row(userId='1', movieId='47', rating='3.5', timestamp='1112484727'),
 Row(userId='1', movieId='50', rating='3.5', timestamp='1112484580')]

In [4]:
print (spark.version)

2.3.3


In [5]:

print ("Number of partitions for the movies DataFrame: " + str(movies.rdd.getNumPartitions()))
print ("Number of partitions for the ratings DataFrame: " + str(ratings.rdd.getNumPartitions()))



Number of partitions for the movies DataFrame: 1
Number of partitions for the ratings DataFrame: 4


In [6]:
repartitionedRatings = ratings.repartition(10)
print ("Number of partitions for the ratings DataFrame: " + str(ratings.rdd.getNumPartitions()))
print ("Number of partitions for the repartitionedRatings DataFrame: " + str(repartitionedRatings.rdd.getNumPartitions()))


Number of partitions for the ratings DataFrame: 4
Number of partitions for the repartitionedRatings DataFrame: 10


In [7]:
print ("Number of ratings: " + str(repartitionedRatings.count()))

Number of ratings: 20000263


In [8]:
repartitionedRatings.cache()

DataFrame[userId: string, movieId: string, rating: string, timestamp: string]

In [9]:
print ("Number of ratings: " + str(repartitionedRatings.count()))

Number of ratings: 20000263


In [10]:

movies.show(truncate=False)



+-------+-------------------------------------+-------------------------------------------+
|movieId|title                                |genres                                     |
+-------+-------------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                     |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                       |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)              |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)             |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)   |Comedy                                     |
|6      |Heat (1995)                          |Action|Crime|Thriller                      |
|7      |Sabrina (1995)                       |Comedy|Romance                             |
|8      |Tom and Huck (1995)                  |Adventure|Children               

In [11]:
movies.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [12]:
movies_file_name = 'movies.csv'
ratings_file_name = 'ratings.csv'

movies = spark.read.csv(movies_file_name, header=True, inferSchema=True).repartition(10).cache()
ratings = spark.read.csv(ratings_file_name, header=True, inferSchema=True).repartition(10).cache()

In [13]:


movies.printSchema()
ratings.printSchema()



root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [14]:
ratings.describe().show()

+-------+------------------+-----------------+------------------+--------------------+
|summary|            userId|          movieId|            rating|           timestamp|
+-------+------------------+-----------------+------------------+--------------------+
|  count|          20000263|         20000263|          20000263|            20000263|
|   mean| 69045.87258292554|9041.567330339605|3.5255285642993797|1.1009179216771154E9|
| stddev|40038.626653162835|19789.47744541314| 1.051988919294246|1.6216942478272852E8|
|    min|                 1|                1|               0.5|           789652004|
|    max|            138493|           131262|               5.0|          1427784002|
+-------+------------------+-----------------+------------------+--------------------+



In [15]:


print ("Number of different users: " + str(ratings.select('userId').distinct().count()))
print ("Number of different movies: " + str(ratings.select('movieId').distinct().count()))
print ("Number of movies with at least one rating strictly higher than 4: " + str(ratings.filter('rating > 4').select('movieId').distinct().count()))



Number of different users: 138493
Number of different movies: 26744
Number of movies with at least one rating strictly higher than 4: 17218


In [16]:
ratings.createOrReplaceTempView('ratings')
spark.sql("SELECT COUNT(DISTINCT(movieId)) AS nb FROM ratings WHERE rating > 4").show()

+-----+
|   nb|
+-----+
|17218|
+-----+



In [17]:
ratings_url = ratings_file_name
sql = "SELECT * FROM csv.`" + ratings_url + "`"
spark.sql(sql).take(2)

[Row(_c0='userId', _c1='movieId', _c2='rating', _c3='timestamp'),
 Row(_c0='1', _c1='2', _c2='3.5', _c3='1112486027')]

In [18]:
#import pandas as pd

#ratings.toPandas().head()






In [19]:


#import seaborn as sns
#%matplotlib inline

#ratingsPandas = ratings.toPandas()
#sns.lmplot(x='userId', y='movieId', data=ratingsPandas, fit_reg=False)



In [20]:
#sns.palplot(sns.diverging_palette(10, 133, sep=80, n=10))

In [21]:


#lm = sns.lmplot(x='userId', y='movieId', hue='rating', data=ratingsPandas, fit_reg=False, size=10, aspect=2, palette=sns.diverging_palette(10, 133, sep=80, n=10))
#axes = lm.axes
#axes[0,0].set_ylim(0,163949) # max movieId is 163949
#axes[0,0].set_xlim(0,671) # max userId is 671
#lm



In [22]:
#sns.violinplot([ratingsPandas.rating])

In [23]:


spark.sql("""
    SELECT *, 100 * nb_ratings/matrix_size AS percentage
    FROM (
        SELECT nb_users, nb_movies, nb_ratings, nb_users * nb_movies AS matrix_size
        FROM (
            SELECT COUNT(*) AS nb_ratings, COUNT(DISTINCT(movieId)) AS nb_movies, COUNT(DISTINCT(userId)) AS nb_users
            FROM ratings
        )
    )
""").show()



+--------+---------+----------+-----------+-----------------+
|nb_users|nb_movies|nb_ratings|matrix_size|       percentage|
+--------+---------+----------+-----------+-----------------+
|  138493|    26744|  20000263| 3703856792|0.539984781355445|
+--------+---------+----------+-----------+-----------------+



In [24]:
from pyspark.ml.recommendation import ALS

model = ALS(userCol="userId", itemCol="movieId", ratingCol="rating").fit(ratings)

In [25]:
predictions = model.transform(ratings)
predictions.show()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
| 74757|    148|   3.5|1064853335|  2.773692|
| 96393|    148|   3.0| 970170090|  2.680015|
| 53338|    148|   1.0| 834319825| 2.6870646|
| 22684|    148|   4.0| 832057800| 2.8577313|
| 97435|    148|   4.0|1042483722|  2.987529|
|136222|    148|   2.0| 849125057| 2.4274337|
|137949|    148|   4.0| 950909863| 3.1353018|
| 19067|    148|   2.0| 833483264| 1.5067592|
| 87301|    148|   2.0| 974945135| 2.6679068|
| 88527|    148|   2.0| 965659724|  2.308216|
|108726|    148|   3.0| 948831793| 2.8115225|
| 92852|    148|   3.0| 839813031| 2.5579293|
|123246|    148|   3.0| 833017056|  3.155644|
| 20132|    148|   3.0|1021775793|  2.655308|
| 22884|    148|   3.0| 944947868| 2.5870566|
| 96427|    148|   3.0| 860111242| 3.0752568|
| 10303|    148|   3.0| 940857361| 2.9307692|
| 36821|    148|   4.0| 979367598|  2.936095|
| 83090|    148|   2.0|1030400425|

In [26]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
print ("El error cuadrático medio para nuestro modelo es:: " + str(evaluator.evaluate(predictions)))

El error cuadrático medio para nuestro modelo es: 0.779277791407944


In [27]:


(trainingRatings, testRatings) = ratings.randomSplit([80.0, 20.0])



In [28]:


als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating")
model = als.fit(trainingRatings)
predictions = model.transform(testRatings)



In [29]:
predictions.show()


+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
| 88527|    148|   2.0| 965659724| 2.2641451|
| 20132|    148|   3.0|1021775793| 2.7054284|
| 13170|    148|   3.0| 885524891| 1.1319512|
| 60081|    148|   2.0| 837850255| 2.8961163|
|130122|    148|   3.0| 837811440| 2.8323267|
| 54726|    148|   5.0| 832703670| 3.3310623|
| 94994|    148|   4.0| 833661877| 2.7445264|
| 46380|    148|   4.0| 828462479| 2.8272812|
| 75781|    148|   3.0| 895230335| 2.8665926|
| 77165|    148|   3.0| 840699559|  2.869279|
| 68242|    148|   3.0|1047397251| 2.1411123|
| 35498|    148|   3.0| 848777439| 2.7368045|
| 81824|    148|   3.0| 847089543|   2.54909|
|109910|    148|   2.0| 907093395| 2.2866628|
| 86098|    148|   3.0| 842162037| 2.7881277|
| 10434|    148|   3.0| 837033792| 2.6159565|
|127911|    148|   1.0| 935288631|  2.132514|
| 91231|    148|   4.0|1025350818| 2.7349982|
| 89588|    148|   3.0|1049216998|

In [30]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
print ("The root mean squared error for our model is: " + str(evaluator.evaluate(predictions)))

The root mean squared error for our model is: nan


In [31]:
avgRatings = ratings.select('rating').groupBy().avg().first()[0]
print ("The average rating in the dataset is: " + str(avgRatings))

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
print ("The root mean squared error for our model is: " + str(evaluator.evaluate(predictions.na.fill(avgRatings))))

The average rating in the dataset is: 3.5255285642993797
The root mean squared error for our model is: 0.805761885113466


In [32]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
print ("The root mean squared error for our model is: " + str(evaluator.evaluate(predictions.na.drop())))

The root mean squared error for our model is: 0.805617065671183


In [33]:
def repeatALS(data, k=3, userCol="userId", itemCol="movieId", ratingCol="rating", metricName="rmse"):
    evaluations = []
    for i in range(0, k):  
        (trainingSet, testingSet) = data.randomSplit([k-1.0, 1.0])
        als = ALS(userCol=userCol, itemCol=itemCol, ratingCol=ratingCol)
        model = als.fit(trainingSet)
        predictions = model.transform(testingSet)
        evaluator = RegressionEvaluator(metricName=metricName, labelCol="rating", predictionCol="prediction")
        evaluation = evaluator.evaluate(predictions.na.drop())
        print ("Loop " + str(i+1) + ": " + metricName + " = " + str(evaluation))
        evaluations.append(evaluation)
    return sum(evaluations)/float(len(evaluations))



In [34]:
print ("RMSE = " + str(repeatALS(ratings, k=4)))

Loop 1: rmse = 0.8056646639129105
Loop 2: rmse = 0.8061619472392852
Loop 3: rmse = 0.8058760169694468
Loop 4: rmse = 0.8063192947753541
RMSE = 0.8060054807242492


In [35]:

def kfoldALS(data, k=3, userCol="userId", itemCol="movieId", ratingCol="rating", metricName="rmse"):
    evaluations = []
    weights = [1.0] * k
    splits = data.randomSplit(weights)
    for i in range(0, k):  
        testingSet = splits[i]
        trainingSet = spark.createDataFrame(sc.emptyRDD(), data.schema)
        for j in range(0, k):
            if i == j:
                continue
            else:
                trainingSet = trainingSet.union(splits[j])
        als = ALS(userCol=userCol, itemCol=itemCol, ratingCol=ratingCol)
        model = als.fit(trainingSet)
        predictions = model.transform(testingSet)
        evaluator = RegressionEvaluator(metricName=metricName, labelCol="rating", predictionCol="prediction")
        evaluation = evaluator.evaluate(predictions.na.drop())
        print ("Loop " + str(i+1) + ": " + metricName + " = " + str(evaluation))
        evaluations.append(evaluation)
    return sum(evaluations)/float(len(evaluations))



In [36]:
print ("RMSE = " + str(kfoldALS(ratings, k=4)))

Loop 1: rmse = 0.8060031982561844
Loop 2: rmse = 0.8060467963052774
Loop 3: rmse = 0.8059659720574583
Loop 4: rmse = 0.805918756109011
RMSE = 0.8059836806819829


In [37]:
print ("RMSE = " + str(kfoldALS(ratings, k=10)))

Loop 1: rmse = 0.8048771593190648
Loop 2: rmse = 0.8054179576399685
Loop 3: rmse = 0.804122110369164
Loop 4: rmse = 0.804993093327777
Loop 5: rmse = 0.8052123895992385
Loop 6: rmse = 0.8051736224763585
Loop 7: rmse = 0.8047446017929901
Loop 8: rmse = 0.8052915758751963
Loop 9: rmse = 0.8053031155619893
Loop 10: rmse = 0.8049827538797963
RMSE = 0.8050118379841542


In [38]:
from pyspark.sql.functions import lit

def recommendMovies(model, user, nbRecommendations):
    # Create a Spark DataFrame with the specified user and all the movies listed in the ratings DataFrame
    dataSet = ratings.select("movieId").distinct().withColumn("userId", lit(user))

    # Create a Spark DataFrame with the movies that have already been rated by this user
    moviesAlreadyRated = ratings.filter(ratings.userId == user).select("movieId", "userId")

    # Apply the recommender system to the data set without the already rated movies to predict ratings
    predictions = model.transform(dataSet.subtract(moviesAlreadyRated)).dropna().orderBy("prediction", ascending=False).limit(nbRecommendations).select("movieId", "prediction")

    # Join with the movies DataFrame to get the movies titles and genres
    recommendations = predictions.join(movies, predictions.movieId == movies.movieId).select(predictions.movieId, movies.title, movies.genres, predictions.prediction)

    recommendations.show(truncate=False)

In [39]:
print ("Recommendations for user 133:")
recommendMovies(model, 133, 10)
print ("Recommendations for user 471:")
recommendMovies(model, 471, 10)
print ("Recommendations for user 496:")
recommendMovies(model, 496, 10)

Recommendations for user 133:
+-------+-----------------------------------------------------+--------------------+----------+
|movieId|title                                                |genres              |prediction|
+-------+-----------------------------------------------------+--------------------+----------+
|112473 |Stuart: A Life Backward (2007)                       |Drama               |4.485329  |
|56869  |Drained (O cheiro do Ralo) (2006)                    |Comedy              |4.470287  |
|81117  |Moth, The (Cma) (1980)                               |Drama               |4.4433036 |
|100553 |Frozen Planet (2011)                                 |Documentary         |4.4707317 |
|120134 |Doggiewoggiez! Poochiewoochiez! (2012)               |Comedy              |4.5738916 |
|130347 |Bill Hicks: Sane Man (1989)                          |Comedy              |4.7635846 |
|116183 |It's Love I'm After (1937)                           |Comedy              |4.498064  |
|101717 |E