<a href="https://colab.research.google.com/github/JJtheNOOB/Recommender_system/blob/master/Recommender_System_with_ALS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
#grab java and pyspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
!tar xf spark-2.4.3-bin-hadoop2.7.tgz
!pip install -q findspark
#Changing directories
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7"

import findspark
findspark.init()

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder #Measure the performance of the model / fine tuning hyperparameters

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

In [0]:
def loadMovieNames():
    movieNames = {}
    with open("u.item", encoding = "ISO-8859-1") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

def parseInput(line):
    fields = line.split()
    return Row(userID = int(fields[0]), movieID = int(fields[1]), rating = float(fields[2]))

In [0]:
# Create a SparkSession
spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

# Load up our movie ID -> name dictionary
movieNames = loadMovieNames()

# Get the raw data
lines = spark.sparkContext.textFile("u.data")
# Convert it to a RDD of Row objects with (movieID, rating)
movies = lines.map(parseInput)
# Convert that to a DataFrame
movie_ratings = spark.createDataFrame(movies)

In [11]:
movie_ratings.show(10)

+-------+------+------+
|movieID|rating|userID|
+-------+------+------+
|     50|   5.0|     0|
|    172|   5.0|     0|
|    133|   1.0|     0|
|    242|   3.0|   196|
|    302|   3.0|   186|
|    377|   1.0|    22|
|     51|   2.0|   244|
|    346|   1.0|   166|
|    474|   4.0|   298|
|    265|   2.0|   115|
+-------+------+------+
only showing top 10 rows



In [0]:
#Create test and train set
(train, test) = movie_ratings.randomSplit([0.8, 0.2])

In [0]:
#Create ALS model
#We do not want any negative predictions
als = ALS(userCol = 'userID', itemCol = 'movieID', ratingCol = 'rating', coldStartStrategy= 'drop', nonnegative= True)

In [0]:
#Tuning model using ParamGridBuilder
param_grid = ParamGridBuilder() \
.addGrid(als.rank, [12, 13, 14]) \
.addGrid(als.maxIter, [18, 19, 20]) \
.addGrid(als.regParam, [.17, .18, .19]) \
.build()

In [0]:
#Define evaluator as RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

In [0]:
#Build cross validation with TrainValidationSplit
tvs = TrainValidationSplit(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator)

#cv = CrossValidator(estimator = als, estimatorParamMaps = param_grid, evaluator = evaluator, numFolds = 3)

In [0]:
#Fit AlS model to training dataset
model = tvs.fit(train)

In [0]:
#Extract the best model
best_model = model.bestModel

In [0]:
#Generate predictions and evaluate using RMSE
predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)

In [65]:
#Print evaluation metrics and model parameters
print("RMSE = " + str(rmse))
print("**Best Model**")
print("  Rank: {}".format(best_model.rank))
print("  MaxIter: {}".format(best_model._java_obj.parent().getMaxIter()))
print("  RegParam: {}".format(best_model._java_obj.parent().getRegParam()))

RMSE = 0.9140117363885056
**Best Model**
  Rank: 10
  MaxIter: 10
  RegParam: 0.1


In [35]:
best_model._java_obj.parent().getRegParam()

0.1

In [46]:
predictions.sort("userID", "rating").show()

+-------+------+------+----------+
|movieID|rating|userID|prediction|
+-------+------+------+----------+
|    247|   1.0|     1| 1.5480329|
|    254|   1.0|     1| 1.7032442|
|    104|   1.0|     1| 1.5732744|
|    213|   2.0|     1| 4.0008407|
|    155|   2.0|     1| 2.8452184|
|    105|   2.0|     1| 2.3112144|
|     27|   2.0|     1| 2.6857083|
|    149|   2.0|     1| 3.4934838|
|     30|   3.0|     1| 4.2438455|
|    272|   3.0|     1|  4.513656|
|     83|   3.0|     1|  3.981583|
|      4|   3.0|     1| 3.8707056|
|    215|   3.0|     1| 3.7305856|
|    200|   3.0|     1|  3.845335|
|    189|   3.0|     1|  4.022809|
|      2|   3.0|     1| 2.9541957|
|    205|   3.0|     1|  4.137641|
|    232|   3.0|     1| 2.9891453|
|     38|   3.0|     1|   2.28269|
|    214|   4.0|     1| 3.6274855|
+-------+------+------+----------+
only showing top 20 rows



In [0]:
#Top 10 recommendor for each user
user_rec = best_model.recommendForAllUsers(10)

In [0]:
def get_recs_for_user(recs):
  '''
  recs should be for a specific user
  '''
  recs = recs.select("recommendations.movieID", "recommendations.rating")
  movies = recs.select("movieId").toPandas().iloc[0, 0]
  ratings = recs.select("rating").toPandas().iloc[0, 0]
  ratings_matrix = pd.DataFrame(movies, columns = ["movieId"])
  ratings_matrix["ratings"] = ratings
  ratings_matrix_ps = spark.createDataFrame(data = ratings_matrix)
  return ratings_matrix_ps
  

In [61]:
import pandas as pd
from pyspark.sql import SQLContext
db = get_recs_for_user(user_rec)
db.show()

+-------+------------------+
|movieId|           ratings|
+-------+------------------+
|   1159| 6.172708511352539|
|   1260| 5.787796974182129|
|    113| 5.293854236602783|
|   1294|5.2445831298828125|
|   1589| 5.174691677093506|
|    361| 5.171027183532715|
|   1217| 5.169522762298584|
|   1061| 5.141602516174316|
|   1275| 5.087820529937744|
|    394| 5.043245315551758|
+-------+------------------+

