# Recommender System

___
**Challenges**
    - Provide good user experience to every user
    - Satisfy the business objectives
    - Measure its impact
    - Scaling
    - Expired items
    - Cold start - new users, new products
    - Sparsity of user preference
___    
**Types of Recommenders**
    - Content based
        - using explicit features of the users and/or items
    - Collaborative filtering
        - implicit features
        - based on observed interactions rather than metadata
    - Hybrid
___    
    
**Alternating Least Squares (ALS) - Method of Choice**

There are many techniques for generating recommendations, such as
    - Matrix factorization
    - Co-occurrence analysis
    - Content based filtering
    - Graph based algorithms
    - Hybrids
    
My dataset contains mainly implicit feedback, so I chose the ALS recommender, a widely used matrix factorization algorithm that uses alternating least squares with weighted-lambda regularization. It factors the user-to-item matrix into a user-to-feature matrix and an item-to-feature matrix. This approach was also used successfully in the Netflix Prize competition.

One of the big strengths of an ALS-based recommender, compared to a user- or item-based recommender, is its ability to handle large, sparse datasets, and it often predicts better as well. That makes it a good fit for this dataset.
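
To make the factorization concrete, here is a minimal NumPy sketch of alternating least squares on a tiny matrix. It is an illustration under simplifying assumptions (a dense matrix with every entry treated as observed, and plain L2 regularization rather than the weighted-lambda variant), not Spark's implementation. The function `als_factorize` and its parameters are hypothetical names.

```python
import numpy as np

def als_factorize(R, rank=2, reg=0.1, iterations=20, seed=0):
    """Factor R (users x items) into U (users x rank) and V (items x rank)."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    ridge = reg * np.eye(rank)
    for _ in range(iterations):
        # Hold V fixed and solve the regularized least squares for all users.
        U = np.linalg.solve(V.T @ V + ridge, V.T @ R.T).T
        # Hold U fixed and solve the regularized least squares for all items.
        V = np.linalg.solve(U.T @ U + ridge, U.T @ R).T
    return U, V

# Toy user-to-item matrix with two clear groups of users (made-up data).
R = np.array([[1.0, 1.0, 0.0, 0.0],
              [1.0, 1.0, 0.0, 0.0],
              [0.0, 0.0, 1.0, 1.0],
              [0.0, 0.0, 1.0, 1.0]])
U, V = als_factorize(R, rank=2, reg=0.01)
approx = U @ V.T  # reconstruction of R from the two factor matrices
```

Each half-step is an ordinary ridge regression, which is why the method alternates: with one factor matrix held fixed, the problem for the other is linear and has a closed-form solution.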
___

In [1]:
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry
from pyspark.sql import Row, SQLContext
from pyspark.sql.functions import lit, udf
import pandas as pd
import numpy as np

___
The data file contains the following 3 columns:
    - timestamp
    - user_id
    - product_id
I drop the timestamp column, which is not relevant for my model.
___

In [2]:
df = pd.read_csv("purchases.csv", sep=";", names=["timestamp", "user_id", "product_id"], header=None)
df = df.drop("timestamp", axis=1)

___
Here, I group the data by user and keep only the **unique products** that each user has purchased. A few users have purchased the same item more than once; keeping those repeats would be redundant and memory intensive.
___

In [3]:
rawdata = df.groupby("user_id").product_id.apply(set)

___
The ALS implementation in Spark's machine learning library requires integer user and product IDs. Strings such as the product_ids in this dataset cannot be used directly.

I therefore create one dictionary that assigns an integer to each product_id and a second dictionary that maps each integer back to its product_id.
___

In [4]:
product_to_ix = {prod:i for i, prod in enumerate(df.product_id.unique())}
ix_to_product = {i:prod for i, prod in enumerate(df.product_id.unique())}
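
The reverse lookup can also be derived from the forward mapping, which keeps the two dictionaries consistent by construction. A small self-contained sketch with made-up product IDs (`sample_products`, `sample_to_ix` and `ix_to_sample` are hypothetical names):

```python
sample_products = ["AB-1", "CD-2", "AB-1", "EF-3"]  # hypothetical IDs, one repeated

# dict.fromkeys keeps first-seen order and drops duplicates.
unique_products = list(dict.fromkeys(sample_products))
sample_to_ix = {prod: i for i, prod in enumerate(unique_products)}
# Invert the forward mapping instead of enumerating a second time.
ix_to_sample = {i: prod for prod, i in sample_to_ix.items()}

encoded = [sample_to_ix[p] for p in sample_products]
decoded = [ix_to_sample[i] for i in encoded]
```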

___
Convert the grouped pandas data into a list of `Rating(user, product, rating)` objects, the input format that Spark's ALS expects. Every observed purchase gets a rating of 1.
___

In [5]:
def expand_user(a):
    return [Rating(user, product_to_ix[item], 1) for user in a.index for item in a[user]]
ratings = expand_user(rawdata)

In [6]:
sc = SparkContext.getOrCreate()  # reuse the notebook's context if one already exists
ratingsRDD = sc.parallelize(ratings)

___
To evaluate the model and tune its parameters, I randomly split the data into a training set (80%) and a test set (20%).
___

In [7]:
training_RDD, test_RDD = ratingsRDD.randomSplit([8, 2], seed=123)
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

___
Now, the data is ready for training and testing. 

___

Things to note:
    - It's hard to test a recommender system offline; the real test comes once it is in production
    - Feedback from users on the recommended products is the best measure of how well the recommender works
___


Tuning parameters:
    - lambda, the regularization parameter
        - Counteracts overfitting of the model
    - rank
        - Number of latent factors
        - Increasing rank lowers the training RMSE, but the test RMSE can rise again once the model overfits
        - Increasing rank is **computationally expensive**
    - Number of iterations
        - Tune this until RMSE stops decreasing. A good value usually lies somewhere between 5 and 20
    - Alpha - confidence parameter
        - Relevant for implicit data like the one we have
        - Only used by `ALS.trainImplicit`; `ALS.train` has no alpha parameter
        - The original paper suggests 40; the default in `pyspark.mllib`'s `ALS.trainImplicit` is 0.01
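
For implicit feedback, the original paper (Hu, Koren and Volinsky) converts each raw observation r into a binary preference p and a confidence c = 1 + alpha * r. A short NumPy sketch of that transformation on a made-up purchase-count matrix (`counts` is hypothetical data, and alpha = 40 is the value suggested in the paper):

```python
import numpy as np

# Hypothetical purchase counts: rows are users, columns are products.
counts = np.array([[0, 1, 3],
                   [2, 0, 0]], dtype=float)
alpha = 40.0

preference = (counts > 0).astype(float)  # p = 1 if the user interacted at all, else 0
confidence = 1.0 + alpha * counts        # c = 1 + alpha * r
```

Unobserved pairs keep a confidence of 1, so they still count as weak evidence of no interest, while repeated purchases are weighted much more heavily.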

___

**Note**: I get memory errors for higher ranks, so for this working example I restrict the rank to a maximum of 25.
___

In [9]:
seed = 4242
iterations = 10
regularization_parameter =[i * 0.01 for i in range(1, 20, 2)]
ranks = [3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25]
# Build each row separately; [[0]*n] * m would make every row the same list object
errors = [[0] * len(regularization_parameter) for _ in ranks]

min_error = float('inf')
best_lambda = -1
best_lambda_index = -1
best_model = None
best_rank = -1
best_rank_index = -1


# Loop over all combinations of lambda and rank to find the parameters that minimize the RMSE
for i, rank in enumerate(ranks):
    for j, regParam in enumerate(regularization_parameter):
        model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations, lambda_=regParam)
        predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
        rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
        error = np.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
        errors[i][j] = error
        print('For lambda %s and rank %s the RMSE is %s' % (regParam, rank, error))
        if error < min_error:
            min_error = error
            best_lambda = regParam
            best_model = model
            best_rank = rank
            best_rank_index = i
            best_lambda_index = j
        with open('sparkLogging', 'a') as f:
            f.write("RMSE on testing set: {}, with rank: {}, lambda: {}\n".format(error, rank, regParam))


print('The best model was trained with lambda %s, rank %s and RMSE: %s' % (best_lambda, best_rank, min_error))

with open('sparkLoggingBest', 'a') as f:
    f.write("RMSE on testing set: {}, with rank: {} at index {}, lambda: {} at index {}\n".format(errors[best_rank_index][best_lambda_index], best_rank, best_rank_index, best_lambda, best_lambda_index))



For lambda 0.01 and rank 3 the RMSE is 0.561399001661
For lambda 0.03 and rank 3 the RMSE is 0.288752568934
For lambda 0.05 and rank 3 the RMSE is 0.216985722377
For lambda 0.07 and rank 3 the RMSE is 0.194333427767
For lambda 0.09 and rank 3 the RMSE is 0.18901181331
For lambda 0.11 and rank 3 the RMSE is 0.192734344792
For lambda 0.13 and rank 3 the RMSE is 0.201245941937
For lambda 0.15 and rank 3 the RMSE is 0.212475473862
For lambda 0.17 and rank 3 the RMSE is 0.22573564362
For lambda 0.19 and rank 3 the RMSE is 0.240504605629
For lambda 0.01 and rank 5 the RMSE is 0.551075230103
For lambda 0.03 and rank 5 the RMSE is 0.250500729443
For lambda 0.05 and rank 5 the RMSE is 0.184466149679
For lambda 0.07 and rank 5 the RMSE is 0.172463567727
For lambda 0.09 and rank 5 the RMSE is 0.175188894463
For lambda 0.11 and rank 5 the RMSE is 0.18363965509
For lambda 0.13 and rank 5 the RMSE is 0.195089331643
For lambda 0.15 and rank 5 the RMSE is 0.208422095848
For lambda 0.17 and rank 5 the 

___
**The best model was trained with lambda 0.07 and rank 17, giving an RMSE of 0.149434352372**
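
The RMSE values in the grid search come from joining actual and predicted ratings on the (user, product) key and averaging the squared differences. The same computation in plain Python, on made-up numbers (`actual` and `predicted` are hypothetical data). Pairs without a prediction drop out of the inner join, so they do not contribute to the error:

```python
import math

# Hypothetical test ratings and model predictions, keyed by (user, product).
actual = {(1, 10): 1.0, (1, 11): 1.0, (2, 10): 1.0}
predicted = {(1, 10): 0.9, (1, 11): 0.7}  # no prediction for (2, 10)

# Inner join on the key: keep only pairs present on both sides.
joined = [(actual[key], predicted[key]) for key in actual if key in predicted]

# Root of the mean squared difference between actual and predicted ratings.
rmse = math.sqrt(sum((a - p) ** 2 for a, p in joined) / len(joined))
```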

___

Now I tune the number of iterations. With more than 25 iterations I run into memory errors.
___

In [11]:
seed = 4242
iterations = [5, 10, 15, 20]
regParam = 0.07
rank = 17

for iteration in iterations:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iteration, lambda_=regParam)
    predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = np.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    print('For %d iterations, the RMSE is %s' % (iteration, error))

For 5 iterations, the RMSE is 0.33973580526
For 10 iterations, the RMSE is 0.149434352372
For 15 iterations, the RMSE is 0.135585874754
For 20 iterations, the RMSE is 0.133802020976


___
Now I train the final model on the full dataset, using the best parameters found above.
___

In [13]:
model = ALS.train(ratingsRDD, rank=17, seed=4242, iterations=20, lambda_=0.07)

In [33]:
# Replace users with list(df.user_id.unique()) to get recommendations for all users
users = [4, 1, 12, 11, 10]
for user in users:
    best_5_recommendations = model.recommendProducts(user, 5)
    print("_"*50)
    print("\nFor user %d we recommend the following 5 products:\n" %user)
    for recommendation in best_5_recommendations:
        print(" "*10, ix_to_product[recommendation.product])
        

__________________________________________________

For user 4 we recommend the following 5 products:

           AC016EL50CPHALID-1749
           AP082ELAD87SANID-176936
           IL086ELABGL9ANID-74783
           DR901TBAC245ANID-110616
           SA848ELAD130ANID-165269
__________________________________________________

For user 1 we recommend the following 5 products:

           AP082ELAD69IANID-173547
           KE263SPABEIOANID-71986
           NO749ELAD9EXANID-178702
           AP082EL51ANKALID-348
           DI747EL56RLNANID-50444
__________________________________________________

For user 12 we recommend the following 5 products:

           AP082EL51ANKALID-348
           NO749ELAD9EXANID-178702
           KE263SPABEIOANID-71986
           AP082ELAD69IANID-173547
           AC016EL58BKFALID-941
__________________________________________________

For user 11 we recommend the following 5 products:

           AC016EL58BKFALID-941
           AD029EL42BKVALID-957
           W