# Merchant recommender service using Spark - Building the recommender

This notebook explains how to use Shopback's affinity based data similar to  [MovieLens dataset](http://grouplens.org/datasets/movielens/) to build a merchant recommender model using [collaborative filtering](https://en.wikipedia.org/wiki/Recommender_system#Collaborative_filtering) with [Spark's Alternating Least Saqures](https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html) implementation. It is about getting and parsing movies and ratings data into Spark RDDs. The second is about building and using the recommender and persisting it for later use in our on-line recommender system.    

It is also [**publicly available since 2014 at Spark Summit**](https://databricks-training.s3.amazonaws.com/movie-recommendation-with-mllib.html)

## Getting and processing the data

The list of task we can pre-compute includes:  

- Loading and parsing the dataset. Persisting the resulting RDD for later use.  
- Building the recommender model using the complete dataset. Persist the dataset for later use.  

This notebook explains the first of these tasks.  

### Loading and parsing datasets

No we are ready to read in each of the files and create an RDD consisting of parsed lines.  

Each line in the ratings dataset (`ratings.csv`) is formatted as:  

`userId,storeid,rating`  

Each line in the movies (`stores.csv`) dataset is formatted as:  

`storeID,merchant_name`

The format of these files is uniform and simple, so we can use Python [`split()`](https://docs.python.org/2/library/stdtypes.html#str.split) to parse their lines once they are loaded into RDDs. Parsing the merchants and ratings files yields two RDDs:  

* For each line in the ratings dataset, we create a tuple of `(UserID, MerchantID, Affinity)`.  
* For each line in the merchants dataset, we create a tuple of `(MerchantID, Merchant Name)`.

So let's load the raw ratings data. We need to filter out the header, included in each file.    

In [26]:
ratings_file = os.path.join('Final_input.csv')

ratings_raw_data = sc.textFile(ratings_file)
ratings_raw_data_header = ratings_raw_data.take(1)[0]

In [27]:
ratings_raw_data_header

u'account_id,Merchant,store_id,Visit,Merchant,Affinity'

Now we can parse the raw data into a new RDD.  

In [28]:
ratings_data = ratings_raw_data.filter(lambda line: line!=ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[2]),float(tokens[5]))).cache()

For illustrative purposes, we can take the first few lines of our RDD to see the result. In the final script we don't call any Spark action (e.g. `take`) until needed, since they trigger actual computations in the cluster.  

In [29]:
ratings_data.take(3)

[(1, 16, 0.0), (1, 1, 0.0), (1, 4, 0.0)]

We proceed in a similar way with the `merchants.csv` file.

In [30]:
merchants_file = os.path.join('Store_ID.csv')

merchants_raw_data = sc.textFile(merchants_file)
merchants_raw_data_header = merchants_raw_data.take(1)[0]

merchants_data = merchants_raw_data.filter(lambda line: line!=merchants_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),str(tokens[1]))).cache()
    
merchants_data.take(3)

[(1, 'Groupon'), (2, 'ZALORA'), (3, 'Lazada')]

The following sections introduce *Collaborative Filtering* and explain how to use *Spark MLlib* to build a recommender model. We will close the tutorial by explaining how a model such this is used to make recommendations, and how to persist it for later use (e.g. in our Python/flask web-service).

## Collaborative Filtering

In Collaborative filtering we make predictions (filtering) about the interests of a user by collecting preferences or taste information from many users (collaborating). The underlying assumption is that if a user A has the same opinion as a user B on an issue, A is more likely to have B's opinion on a different issue x than to have the opinion on x of a user chosen randomly.  

The image below (from [Wikipedia](https://en.wikipedia.org/?title=Collaborative_filtering)) shows an example of collaborative filtering. At first, people rate different items (like videos, images, games). Then, the system makes predictions about a user's rating for an item not rated yet. The new predictions are built upon the existing ratings of other users with similar ratings with the active user. In the image, the system predicts that the user will not like the video.  

![collaborative filtering](https://upload.wikimedia.org/wikipedia/commons/5/52/Collaborative_filtering.gif)

Spark MLlib library for Machine Learning provides a [Collaborative Filtering](https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html) implementation by using [Alternating Least Squares](http://dl.acm.org/citation.cfm?id=1608614). The implementation in MLlib has the following parameters:  

- numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).  
- rank is the number of latent factors in the model.  
- iterations is the number of iterations to run.  
- lambda specifies the regularization parameter in ALS.  
- implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data.  
- alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.  


## Selecting ALS parameters

In order to determine the best ALS parameters, we need first to split it into train, validation, and test datasets.

In [31]:
training_RDD, validation_RDD, test_RDD = ratings_data.randomSplit([6, 2, 2], seed=0L)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

Now we can proceed with the training phase. 

In [32]:
from pyspark.mllib.recommendation import ALS
import math

In [33]:
seed = 5L
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print 'For rank %s the RMSE is %s' % (rank, error)
    if error < min_error:
        min_error = error
        best_rank = rank

print 'The best model was trained with rank %s' % best_rank

For rank 4 the RMSE is 0.215259181275
For rank 8 the RMSE is 0.215127988361
For rank 12 the RMSE is 0.215151993856
The best model was trained with rank 8


But let's explain this a little bit. First, let's have a look at how our predictions look.  

In [34]:
predictions.take(3)

[((98319, 15), 0.001720996763899785),
 ((15813, 15), 0.0029729293239385943),
 ((61266, 15), 0.0)]

Basically we have the UserID, the MovieID, and the Rating, as we have in our ratings dataset. In this case the predictions third element, the rating for that movie and user, is the predicted by our ALS model.

Then we join these with our validation data (the one that includes ratings) and the result looks as follows:  

In [35]:
rates_and_preds.take(3)

[((22717, 6), (0.17, 0.04679419737175364)),
 ((47922, 3), (0.0, 0.01507274185881634)),
 ((20273, 9), (0.0, 0.0))]

To that, we apply a squared difference and the we use the `mean()` action to get the MSE and apply `sqrt`.

Finally we test the selected model.

In [36]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print 'For testing data the RMSE is %s' % (error)

For testing data the RMSE is 0.217612766924


## How to make recommendations

Although we aim at building an merchant recommender, now that we know how to have our recommender model ready, we can give it a try providing some merchant recommendations.

When using collaborative filtering, getting recommendations is not as simple as predicting for the new entries using a previously generated model. Instead, we need to train again the model but including the new user preferences in order to compare them with other users in the dataset. That is, the recommender needs to be trained every time we have new user ratings (although a single model can be used by multiple users of course!). This makes the process expensive, and it is one of the reasons why scalability is a problem (and Spark a solution!). Once we have our model trained, we can reuse it to obtain top recomendations for a given user or an individual rating for a particular merchant. These are less costly operations than training the model itself.    

In [40]:
merchant_names = merchants_data.map(lambda x: (int(x[0]),x[1]))

In [42]:
print "There are %s merchants in the dataset" % (merchant_names.count())

There are 25 merchants in the dataset


Another thing we want to do, is give recommendations of movies with a certain minimum number of ratings. For that, we need to count the number of ratings per merchant.  

In [43]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

merchant_ID_with_ratings_RDD = (ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
merchant_ID_with_avg_ratings_RDD = merchant_ID_with_ratings_RDD.map(get_counts_and_averages)
merchant_rating_counts_RDD = merchant_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

### Adding new user ratings

Now we need to rate some merchants for the new user. We will put them in a new RDD and we will use the user ID 0, that is not assigned in the dataset. 

In [44]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,3,0.5), # Lazada
     (0,4,0.25), # RedMart
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print 'New user ratings: %s' % new_user_ratings_RDD.take(10)

New user ratings: [(0, 3, 0.5), (0, 4, 0.25)]


Now we add them to the data we will use to train our recommender model. We use Spark's `union()` transformation for this.  

In [45]:
new_ratings_RDD = ratings_data.union(new_user_ratings_RDD)

And finally we train the ALS model using all the parameters we selected before (when using the small dataset).

In [46]:
from time import time

t0 = time()
new_ratings_model = ALS.train(new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print "New model trained in %s seconds" % round(tt,3)

New model trained in 1.851 seconds


It took some time. We will need to repeat that every time a user add new ratings. Ideally we will do this in batches, and not for every single rating that comes into the system for every user.

### Getting top recommendations

Let's now get some recommendations! For that we will get an RDD with all the movies the new user hasn't rated yet. We will them together with the model to predict ratings.  

In [48]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just merchant IDs
new_user_unrated_merchants_RDD = (ratings_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))
# Use the input RDD, new_user_unrated_merchants_RDD, with new_ratings_model.predictAll() to predict new ratings for the merchants
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_merchants_RDD)

We have our recommendations ready. Now we can print out the 25 merchants with the highest predicted ratings. And join them with the merchants RDD to get the merchant names, and ratings count in order to get merchants with a minimum number of counts. First we will do the join and see what does the result looks like.

In [58]:
# Transform new_user_recommendations_RDD into pairs of the form (Merchant ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_name_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(merchant_names).join(merchant_rating_counts_RDD)

In [59]:
new_user_recommendations_rating_name_and_count_RDD.take(12)

[(1, ((0.18588031331650415, 'Groupon'), 17673)),
 (1, ((0.18588031331650415, 'Groupon'), 17673)),
 (1, ((0.18588031331650415, 'Groupon'), 17673)),
 (1, ((0.18588031331650415, 'Groupon'), 17673)),
 (1, ((0.18588031331650415, 'Groupon'), 17673)),
 (1, ((0.18588031331650415, 'Groupon'), 17673)),
 (1, ((0.18588031331650415, 'Groupon'), 17673)),
 (22, ((0.599511208127164, 'Booking.com'), 260)),
 (22, ((0.599511208127164, 'Booking.com'), 260)),
 (22, ((0.599511208127164, 'Booking.com'), 260)),
 (2, ((0.17926402914694517, 'ZALORA'), 16433)),
 (23, ((0.48003086064160616, 'HotelClub'), 29))]

So we need to flat this down a bit in order to have `(Merchant Namew, Rating, Ratings Count)`.

In [51]:
new_user_recommendations_rating_name_and_count_RDD = \
    new_user_recommendations_rating_name_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

Finally, get the highest rated recommendations for the new user, filtering out movies with less than 25 ratings.

In [52]:
top_merchants = new_user_recommendations_rating_name_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended merchants (with more than 25 affinity):\n%s' %
        '\n'.join(map(str, top_merchants)))

TOP recommended merchants (with more than 25 affinity):
('Booking.com', 0.599511208127164, 260)
('Booking.com', 0.599511208127164, 260)
('Booking.com', 0.599511208127164, 260)
('HotelClub', 0.48003086064160616, 29)
('HotelClub', 0.48003086064160616, 29)
('Groupon', 0.18588031331650415, 17673)
('Groupon', 0.18588031331650415, 17673)
('Groupon', 0.18588031331650415, 17673)
('Groupon', 0.18588031331650415, 17673)
('Groupon', 0.18588031331650415, 17673)
('Groupon', 0.18588031331650415, 17673)
('Groupon', 0.18588031331650415, 17673)
('ZALORA', 0.17926402914694517, 16433)
('Apple', 0.13102541089348396, 1004)
('Apple', 0.13102541089348396, 1004)
('Apple', 0.13102541089348396, 1004)
('Taobao', 0.08430277922600536, 5699)
('Taobao', 0.08430277922600536, 5699)
('Taobao', 0.08430277922600536, 5699)
('Taobao', 0.08430277922600536, 5699)
('Shopbop', 0.08025878945362683, 1578)
('HipVan', 0.07407002598122106, 1318)
('HipVan', 0.07407002598122106, 1318)
('HipVan', 0.07407002598122106, 1318)
('HipVan', 

### Getting individual ratings

Another useful usecase is getting the predicted rating for a particular merchant for a given user. The process is similar to the previous retreival of top recommendations but, instead of using `predcitAll` with every single merchant the user hasn't rated yet, we will just pass the method a single entry with the movie we want to predict the rating for.  

In [38]:
my_merchant = sc.parallelize([(0, 500)]) # 
individual_merchant_rating_RDD = new_ratings_model.predictAll(new_user_unrated_merchants_RDD)
individual_merchant_rating_RDD.take(1)

[Rating(user=0, product=315, rating=0.0)]