# Alternating Least Squares (ALS) Recommender V1.0

In [31]:
# import necessary libraries
from pyspark.sql import SparkSession

# instantiate SparkSession object
# spark = SparkSession.builder.master('local').getOrCreate()

spark = SparkSession\
        .builder\
        .appName('ALSExample').config('spark.driver.host', 'localhost')\
        .getOrCreate()

In [32]:
# read in the dataset into pyspark DataFrame
action_ratings = spark.read.csv('./data/ratings-v2.csv', header='true', inferSchema='true')

Check the data types of each of the columns to ensure that they are a type that makes sense given the column.

In [33]:
action_ratings.dtypes

[('userId', 'int'),
 ('actionId', 'int'),
 ('rating', 'double'),
 ('timestamp', 'int')]

We aren't going to need the timestamp, so we can go ahead and remove that column.

In [34]:
action_ratings = action_ratings.drop('timestamp')

### Fitting the Alternating Least Squares Model


* Import `ALS` from `pyspark.ml.recommendation` module 
* Use the `.randomSplit()` method on the pyspark DataFrame to separate the dataset into training and test sets
* Fit the Alternating Least Squares Model to the training dataset. Make sure to set the `userCol`, `itemCol`, and `ratingCol` to the appropriate columns given this dataset. Then fit the data to the training set and assign it to a variable model  

In [35]:
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.recommendation import ALS

# split into training and testing sets
(training, test) = action_ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5,rank=4, regParam=0.01, userCol='userId', itemCol='actionId', ratingCol='rating',
          coldStartStrategy='drop')

# fit the ALS model to the training set
model = als.fit(training)

Now you've fit the model, and it's time to evaluate it to determine just how well it performed.

* Import `RegressionEvalutor` from `pyspark.ml.evaluation` 
* Generate predictions with your model for the test set by using the `.transform()` method on your ALS model 
* Evaluate your model and print out the RMSE from your test set 

In [36]:
# importing appropriate library
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating',
                                predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
print('Root-mean-square error = ' + str(rmse))

Root-mean-square error = 0.9929050579482068


### Cross-validation to Find the Optimal Model

Find the optimal values for the parameters of the ALS model. Use the built-in `CrossValidator` in PySpark with a suitable param grid and determine the optimal model. Try with the parameters:

* regularization = [0.01, 0.001, 0.1]
* rank = [4, 10, 50]


In [37]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# initialize the ALS model
als_model = ALS(userCol='userId', itemCol='actionId', 
                ratingCol='rating', coldStartStrategy='drop')

# create the parameter grid                 
params = ParamGridBuilder()\
          .addGrid(als_model.regParam, [0.01, 0.001, 0.1])\
          .addGrid(als_model.rank, [4, 10, 50]).build()


# instantiating crossvalidator estimator
cv = CrossValidator(estimator=als_model, estimatorParamMaps=params,evaluator=evaluator,parallelism=4)
best_model = cv.fit(action_ratings)    

# We see the best model has a rank of 50, so we will use that in our future models with this dataset
best_model.bestModel.rank

                                                                                

50

## Getting Recommendations

Now it's time to actually get some recommendations! The ALS model has built-in methods called `.recommendForUserSubset()` and `.recommendForAllUsers()`. We'll start off with using a subset of users. 

In [38]:
users = action_ratings.select(als.getUserCol()).distinct().limit(1)
userSubsetRecs = model.recommendForUserSubset(users, 10)
recs = userSubsetRecs.take(1)

We can now see we have a list of rows with recommended items. Now try and get the name of the top recommended action by way of the function you just created, using number one item for this user.

In [39]:
# use indexing to obtain the action id of top predicted rated item
first_recommendation = recs[0]['recommendations'][0][0]

Of course, you can also make recommendations for everyone, although this will take longer. In the next line, we are creating an RDD with the top 5 recommendations for every user and then selecting one user to find out his predictions:

In [40]:
recommendations = model.recommendForAllUsers(5)
recommendations.where(recommendations.userId == 3).collect()

[Row(userId=3, recommendations=[Row(actionId=72167, rating=7.379295825958252), Row(actionId=3142, rating=6.3740620613098145), Row(actionId=4663, rating=6.285203456878662), Row(actionId=1866, rating=6.064001083374023), Row(actionId=33830, rating=5.827307224273682)])]

### Getting Predictions for a New User

Now, it's time to put together all that you've learned in this section to create a function that will take in a new user and some actions they've rated and then return $n$ number of highest recommended actions. This function will have multiple different steps to it:

* Adding the new ratings into the DataFrame (hint: look into using the `.union()` method) 
* Fitting the ALS model  
* Make recommendations for the user of choice 
* Print out the names of the top $n$ recommendations in a reader-friendly manner 



In [41]:
def new_user_recs(user_id, new_ratings, rating_df, num_recs):
    # turn the new_recommendations list into a spark DataFrame
    new_user_ratings = spark.createDataFrame(new_ratings,rating_df.columns)
    
    # combine the new ratings df with the rating_df
    action_ratings_combined = rating_df.union(new_user_ratings)
    
    # split the dataframe into a train and test set
#     (training, test) = action_ratings_combined.randomSplit([0.8, 0.2],seed=0)
    
    # create an ALS model and fit it
    als = ALS(maxIter=5,rank=50, regParam=0.01, userCol="userId", itemCol="actionId", ratingCol="rating",
          coldStartStrategy="drop")
    model = als.fit(action_ratings_combined)
    
    # make recommendations for all users using the recommendForAllUsers method
    recommendations = model.recommendForAllUsers(num_recs)
    
    # get recommendations specifically for the new user that has been added to the DataFrame
    recs_for_user = recommendations.where(recommendations.userId == user_id).take(1)
    
    for ranking, (action_id, rating) in enumerate(recs_for_user[0]['recommendations']):
        action_string = action_id
        print('Recommendation {}: {}  | predicted score :{}'.format(ranking+1,action_string,rating))
        

In [42]:
user_id = 100000
user_ratings_1 = [(user_id,3253,5),
                  (user_id,2459,5),
                  (user_id,2513,4),
                  (user_id,6502,5),
                  (user_id,1091,5),
                  (user_id,441,4)]
new_user_recs(user_id,
             new_ratings=user_ratings_1,
             rating_df=action_ratings,
             num_recs = 10)

Recommendation 1: 318  | predicted score :5.573312759399414
Recommendation 2: 593  | predicted score :5.422237873077393
Recommendation 3: 1198  | predicted score :5.411786079406738
Recommendation 4: 356  | predicted score :5.4110493659973145
Recommendation 5: 1270  | predicted score :5.398866653442383
Recommendation 6: 919  | predicted score :5.389611721038818
Recommendation 7: 260  | predicted score :5.38344144821167
Recommendation 8: 58559  | predicted score :5.342963695526123
Recommendation 9: 7153  | predicted score :5.339134216308594
Recommendation 10: 4993  | predicted score :5.3378376960754395
