# INM432 Big Data Coursework 2016/2017 Part 2: Spark Pipelines and Evaluation of Scaling of Algorithms

### Team Members: Ryan Nazareth and Aimore Dutra 

---

# 1) Introduction
With the internet connecting everything and everyone, it has become easy to access a large amount of information. However, this ease of access has also brought problems: consumers face an overwhelming number of items and lose time trying to find what they are looking for.

Hence, large companies with vast product catalogues are keen to advertise their products in a smart way that helps their clients find what they want.

Nowadays, Recommendation Systems are being developed to address this problem.

---


## 1.1) Task

Our task is to create a Recommender System that can suggest new movies to users based on their preferences (ratings).

There are several possible approaches for the recommendation task [1]:

##### 1) Recommend the most popular items
##### 2) Use a classifier to make recommendation
##### 3) Collaborative Filtering

#### We chose the Collaborative Filtering technique because it gives more personalised recommendations and makes more efficient use of the data.

The Collaborative Filtering approach has two main types:
* a) User to User
* b) Item to Item

The item-to-item variant tends to be more accurate and computationally cheaper in most cases.
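To make the item-to-item idea concrete, here is a minimal sketch that compares movies by the ratings they received from the same users. The toy ratings, the item names and the choice of cosine similarity are illustrative assumptions, not part of the coursework pipeline, which uses ALS instead.

```python
import math

# Toy ratings, purely illustrative: {user: {item: rating}}
ratings = {
    "u1": {"A": 5.0, "B": 4.0},
    "u2": {"A": 4.0, "B": 5.0, "C": 1.0},
    "u3": {"B": 2.0, "C": 5.0},
}

def item_vector(item):
    """Ratings given to `item`, keyed by user."""
    return {u: r[item] for u, r in ratings.items() if item in r}

def cosine(a, b):
    """Cosine similarity between two sparse rating vectors."""
    shared = set(a) & set(b)
    dot = sum(a[u] * b[u] for u in shared)
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

sim_ab = cosine(item_vector("A"), item_vector("B"))
sim_ac = cosine(item_vector("A"), item_vector("C"))
print(round(sim_ab, 3), round(sim_ac, 3))
```

Items A and B are rated similarly by the same users, so their similarity is much higher than that of A and C. An item-to-item recommender would suggest B to someone who liked A.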




> #### References
[1] https://www.analyticsvidhya.com/blog/2016/06/quick-guide-build-recommendation-engine-python/


## 1.2) Dataset

*Movies and, more recently, series have become increasingly popular thanks to their quality and the quantity available. Advances in technology have made them cheaper and quicker to produce, and there are now millions of movies and series available.
Not only is more content being created, but existing content is also being kept available. As a result, viewers find it difficult to discover new video entertainment that they like.*



The dataset selected for the coursework is "ml-20m" from MovieLens, a movie recommendation service [1,2]. We chose it because it is large and, most importantly, it contains user ratings, which allow us to use the Collaborative Filtering technique. The details of the dataset are below:

- 27,278 movies (with 19 different Genres)
- 138,493 users
- 465,564 tag applications 
- and 20,000,263 ratings (from 0.5 to 5 stars, in half-star increments)

These data were created by users between January 09, 1995 and March 31, 2015.

The data are divided into six CSV files with the following columns:
- genome-scores.csv: movieId,tagId,relevance
- genome-tags.csv:   tagId,tag
- links.csv:         movieId,imdbId,tmdbId
- movies.csv:        movieId,title,genres
- ratings.csv:       userId,movieId,rating,timestamp
- tags.csv:          userId,movieId,tag,timestamp


> #### References
[1] F. Maxwell Harper and Joseph A. Konstan. 2015. The MovieLens Datasets: History and Context. ACM Transactions on Interactive Intelligent Systems (TiiS) 5, 4, Article 19 (December 2015), 19 pages. DOI=http://dx.doi.org/10.1145/2827872

>[2] http://files.grouplens.org/datasets/movielens/ml-20m-README.html

## 1.3) Learning Algorithm - ALS

Collaborative filtering is commonly used for recommender systems [1]. These techniques aim to fill in the missing entries of a user-item association matrix. spark.ml currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. spark.ml uses the alternating least squares (ALS) algorithm to learn these latent factors. The implementation in spark.ml has the following parameters:

"
- **numBlocks** is the number of blocks the users and items will be partitioned into in order to parallelize computation (defaults to 10).
- **rank** is the number of latent factors in the model (defaults to 10).
- **maxIter** is the maximum number of iterations to run (defaults to 10).
- **regParam** specifies the regularization parameter in ALS (defaults to 1.0).
- **implicitPrefs** specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data (defaults to false which means using explicit feedback).
- **alpha** is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations (defaults to 1.0).
- **nonnegative** specifies whether or not to use nonnegative constraints for least squares (defaults to false)."
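The roles of **rank**, **maxIter** and **regParam** can be seen in a small single-machine sketch of explicit-feedback ALS. This is an illustrative NumPy version under simplifying assumptions (a dense toy matrix where 0 means "not rated", plain ridge regularization); it does not reproduce Spark's blocked, distributed implementation or the way Spark scales the regularization term. The function name `als_explicit` and the toy matrix are hypothetical.

```python
import numpy as np

def als_explicit(R, mask, rank=2, max_iter=10, reg_param=0.1, seed=0):
    """Factorise R ~= U @ V.T using only the entries where mask is True."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    eye = reg_param * np.eye(rank)
    for _ in range(max_iter):
        # Fix the item factors and solve a small ridge regression per user
        for u in range(n_users):
            idx = mask[u]
            if idx.any():
                Vu = V[idx]
                U[u] = np.linalg.solve(Vu.T @ Vu + eye, Vu.T @ R[u, idx])
        # Fix the user factors and solve a small ridge regression per item
        for i in range(n_items):
            idx = mask[:, i]
            if idx.any():
                Ui = U[idx]
                V[i] = np.linalg.solve(Ui.T @ Ui + eye, Ui.T @ R[idx, i])
    return U, V

# Toy user-item matrix (assumption: 0 stands for "not rated")
R = np.array([[5.0, 4.0, 0.0, 1.0],
              [4.0, 0.0, 1.0, 1.0],
              [1.0, 1.0, 5.0, 0.0],
              [0.0, 1.0, 4.0, 5.0]])
mask = R > 0

U, V = als_explicit(R, mask, rank=2, max_iter=20, reg_param=0.1)
pred = U @ V.T  # includes predictions for the missing entries
rmse = np.sqrt(np.mean((pred[mask] - R[mask]) ** 2))
print("observed-entry RMSE:", round(float(rmse), 4))
```

Each iteration alternates between the two sets of factors, which is where the name of the algorithm comes from. A larger `rank` enlarges every linear system that is solved, and a larger `max_iter` repeats the two sweeps more often, so both increase training time.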

We will focus on the **rank** and **maxIter** parameters, which are the two parameters varied in the grid search in Section 2.2.


### Implicit vs Explicit Feedback
We will use the explicit-feedback variant because the dataset contains explicit ratings and no implicit feedback (e.g. views, clicks, purchases, likes, shares etc.).
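For reference, the difference between the two variants can be written as two objectives. The notation below is an assumption introduced for illustration: $x_u$ and $y_i$ are the latent vectors of user $u$ and item $i$, $\mathcal{K}$ is the set of observed ratings $r_{ui}$, and $\lambda$ corresponds to **regParam**. The implicit form follows the formulation of Hu, Koren and Volinsky, where $\alpha$ corresponds to **alpha**. Spark's exact regularization scaling is not reproduced here.

```latex
% Explicit feedback: fit only the observed ratings
\min_{X,Y} \sum_{(u,i) \in \mathcal{K}} \left( r_{ui} - x_u^{\top} y_i \right)^2
  + \lambda \left( \sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2 \right)

% Implicit feedback: fit a binary preference p_{ui} for every (u,i) pair,
% weighted by a confidence c_{ui} that grows with the observed count r_{ui}
\min_{X,Y} \sum_{u,i} c_{ui} \left( p_{ui} - x_u^{\top} y_i \right)^2
  + \lambda \left( \sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2 \right),
\qquad
p_{ui} = \begin{cases} 1 & r_{ui} > 0 \\ 0 & r_{ui} = 0 \end{cases},
\qquad
c_{ui} = 1 + \alpha \, r_{ui}
```

In the explicit case the model tries to reproduce the star rating itself, which matches the MovieLens data used here.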

### Train-Validation Split
In addition to CrossValidator Spark also offers TrainValidationSplit for hyper-parameter tuning. TrainValidationSplit only evaluates each combination of parameters once, as opposed to k times in the case of CrossValidator. It is therefore less expensive, but will not produce as reliable results when the training dataset is not sufficiently large. [2]
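The cost difference can be made concrete by counting model fits. The sketch below uses the grid values that appear in the grid search of Section 2.2; the fold count of 3 is an assumption for illustration, and the helper function names are hypothetical. The counts exclude any final refit of the best parameters on the full training data.

```python
from itertools import product

# Grid values taken from the grid search in Section 2.2
rank_values = [1, 3, 10, 20]
max_iter_values = [1, 3, 10, 20]
param_grid = [{"rank": r, "maxIter": m}
              for r, m in product(rank_values, max_iter_values)]

def fits_train_validation_split(grid):
    """Each parameter combination is trained and evaluated once."""
    return len(grid)

def fits_cross_validator(grid, num_folds):
    """Each parameter combination is trained and evaluated once per fold."""
    return len(grid) * num_folds

tvs_fits = fits_train_validation_split(param_grid)
cv_fits = fits_cross_validator(param_grid, num_folds=3)
print(len(param_grid), tvs_fits, cv_fits)
```

With 16 parameter combinations, a 3-fold cross-validation would train three times as many ALS models as a single train-validation split.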



> #### References
[1] https://spark.apache.org/docs/latest/ml-collaborative-filtering.html

>[2] https://spark.apache.org/docs/latest/ml-tuning.html

## 1.4) Considerations
Since we use a collaborative filtering algorithm to predict ratings and recommend movies, we do not need Feature Extractors, Transformers or Selectors [1]: the algorithm only requires user ratings arranged as a user-item matrix. We therefore focus on the modelling and on commenting on the results for different parameters. At the end, we also try to use our trained system to recommend movies to a new user and print the 10 most relevant ones based on their ratings.


>#### References
[1]  https://spark.apache.org/docs/1.6.0/ml-features.html

---
# 2) Code


## 2.1) Loading data and applying transformations

In [159]:
## Import and Load Dataset

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import DoubleType
from pyspark.sql.types import IntegerType
import numpy as np
import math 
import time

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Load data from the path to a dataframe called "rating"
## Small Dataset
rating = spark.read.format("csv").option("header", "true").load("hdfs://saltdean/data/movielens/ml-latest-small/ratings.csv")
## Large Dataset
# rating = spark.read.format("csv").option("header", "true").load("hdfs://saltdean/data/movielens/ml-20m/ratings.csv")
# Check which features are present 
print("rating")
rating.show()

rating
+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
|     1|   1263|   2.0|1260759151|
|     1|   1287|   2.0|1260759187|
|     1|   1293|   2.0|1260759148|
|     1|   1339|   3.5|1260759125|
|     1|   1343|   2.0|1260759131|
|     1|   1371|   2.5|1260759135|
|     1|   1405|   1.0|1260759203|
|     1|   1953|   4.0|1260759191|
|     1|   2105|   4.0|1260759139|
|     1|   2150|   3.0|1260759194|
|     1|   2193|   2.0|1260759198|
|     1|   2294|   2.0|1260759108|
|     1|   2455|   2.5|1260759113|
|     1|   2968|   1.0|1260759200|
|     1|   3671|   3.0|1260759117|
+------+-------+------+----------+
only showing top 20 rows



In [160]:
## Change the Dataset size

# (rating, garbage) = rating.randomSplit([0.999, 0.001]) # ~100,000 rows
(rating, garbage) = rating.randomSplit([0.1, 0.9])       # ~10,000 rows
# (rating, garbage) = rating.randomSplit([0.01, 0.99])   # ~1000 rows
# (rating, garbage) = rating.randomSplit([0.001, 0.999]) # ~100 rows
# Print dataset size
print('dataset data size: ', rating.count(),' rows') 
# --------------------------------------------------------------------------- #
# Split the data into training (80%) and hold-out testing data (20%)
(training, test) = rating.randomSplit([0.8, 0.2])
# --------------------------------------------------------------------------- #
# Reduce the training data set 
(training, garbage) = training.randomSplit([0.5, 0.5])   # ~50%
# (training, garbage) = training.randomSplit([0.25, 0.75]) # ~25% 
# (training, garbage) = training.randomSplit([0.1, 0.9])   # ~10% 
# (training, garbage) = training.randomSplit([0.01, 0.99]) # ~1% 
# --------------------------------------------------------------------------- #
# Print training data size
print('training data size: ',training.count(),' rows') 
# Print test data size
print('test data size: ',test.count(),' rows')

dataset data size:  9837  rows
training data size:  3941  rows
test data size:  1988  rows
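The printed sizes are not exact fractions of the input because the weights passed to `randomSplit` are expected proportions, not exact ones. The pure-Python sketch below illustrates the idea of assigning each row to a split with one random draw per row. It is an illustration under that assumption, not Spark's implementation, and the function name `random_split` is hypothetical.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split by drawing one uniform number per row."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound of each split
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for k, upper in enumerate(bounds):
            # The last split catches any floating-point remainder
            if x < upper or k == len(bounds) - 1:
                splits[k].append(row)
                break
    return splits

rows = list(range(10000))
training_rows, test_rows = random_split(rows, [0.8, 0.2], seed=5)
print(len(training_rows), len(test_rows))
```

The split sizes vary around 80% and 20% from one seed to another. `DataFrame.randomSplit` also accepts a `seed` argument, which makes the split repeatable between runs and would make timings for different dataset sizes easier to compare.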


In [161]:
## Manipulation of the Data

# Load data from path to dataframe called "movies" (all the movies)
movies = spark.read.format("csv").option("header", "true").load("hdfs://saltdean/data/movielens/ml-20m/movies.csv")
# Check which features are present 
print("movies")
movies.show()

# Join the columns of "movies" with "training" and "test" datasets into "movielens_training" and "movielens_test"
movielens_training = training.join(movies, "movieId")
movielens_test = test.join(movies, "movieId")

# Cast data type from String to Integer and Double for training and test datasets
movielens_training = movielens_training.withColumn("movieId", training["movieId"].cast(IntegerType()))
movielens_training = movielens_training.withColumn("rating", training["rating"].cast(DoubleType()))
movielens_training = movielens_training.withColumn("timestamp", training["timestamp"].cast(IntegerType()))
movielens_training = movielens_training.withColumn("userId", training["userId"].cast(IntegerType()))

movielens_test = movielens_test.withColumn("movieId", test["movieId"].cast(IntegerType()))
movielens_test = movielens_test.withColumn("rating", test["rating"].cast(DoubleType()))
movielens_test = movielens_test.withColumn("timestamp", test["timestamp"].cast(IntegerType()))
movielens_test = movielens_test.withColumn("userId", test["userId"].cast(IntegerType()))

# ----- Print the training -----
print("\n Training")
training_count = movielens_training.count()
print('movielens_training data size: ',training_count,' rows') 
movielens_training.show(50)
# Print the types used in each column
print("Type of each Column \n")
movielens_training.printSchema()

# ----- Print the test -----
print("\n Test")
test_count=movielens_test.count()
print('movielens_test data size: ',test_count,' rows') 
movielens_test.show(50)
# Print the types used in each column
print("Type of each Column \n")
movielens_test.printSchema()

# Rebuild the full (reduced) dataset from the training and test subsets
print("\n All Dataset")
all_count = test_count+training_count
print('All Dataset size: ',(all_count),' rows') 
movie_ratings = movielens_training.union(movielens_test)  # union() replaces the deprecated unionAll()
movie_ratings.show()

movies
+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|    

## 2.2) Training and Testing the System
- Split Training (80%) and Testing(20%) data
- Change the training data size
- Do a Grid Search to select the best model
- Predict test data using the best model
- Evaluate the best model's performance and time taken for training and testing

In [162]:
## Create the recommender system 

print("Create Training Algorithm (ALS)")
# Create an Alternating Least Squares learning algorithm (Estimator)
als = ALS(rank=10, 
          maxIter=10,
          userCol="userId",   
          itemCol="movieId",  
          ratingCol="rating")
### numBlocks is the number of blocks the users and items will be partitioned into in order to parallelize computation (defaults to 10).
### rank is the number of latent factors in the model (defaults to 10).
### maxIter is the maximum number of iterations to run (defaults to 10).
### regParam specifies the regularization parameter in ALS (the spark.ml guide quotes a default of 1.0, but the pyspark.ml ALS constructor default is 0.1).
print("Create Parameter Grid Builder")
## Create a ParamGridBuilder to construct a grid of parameters to search over. (ParameterMaps)
# paramGrid = ParamGridBuilder().addGrid(als.rank, [1,3,10,20])\
#                                 .addGrid(als.maxIter, [1,3,10,20])\
#                                 .build()
            
## No Grid Search
paramGrid = ParamGridBuilder().build() # * UnComment this line and comment the block above to run quickly

# A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# Here the Estimator is ALS and the Evaluator is a RegressionEvaluator computing the RMSE.
regEval = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

#### ------------------------ TRAINING ------------------------ ####
# Get start time
s=time.time() # training time

# Train and Validate models
tvs = TrainValidationSplit(estimator=als,
                            estimatorParamMaps=paramGrid,
                            evaluator=regEval,
                            # 80% of the data will be used for training, 20% for validation.
                            trainRatio=0.8, seed=5)

# Run TrainValidationSplit to train and choose the best set of parameters.
model = tvs.fit(movielens_training)

# Get the best model parameters
best_model = model.bestModel
maxIter = (best_model
    ._java_obj     # Get Java object
    .parent()      # Get parent (ALS estimator)
    .getMaxIter()) # Get best maxIter

rank = best_model.rank # Get best rank
print("rank:", rank)
print("maxIter:", maxIter)
print('\n-----------')

# Get end time
e=time.time() # training time

# Evaluate the model's predictions on the training data
predictions = best_model.transform(movielens_training)

# Replace any NaN predictions with 0 (NaN values arise from the cold start problem)
# predictions = predictions.dropna()
predictions = predictions.fillna(0)

# Evaluate the overall performance of the model by computing the Root-mean-square error (RMSE) on the Training data
rmse = regEval.evaluate(predictions)
print("Training Error (RMS) = ", round(rmse, 4))

## Print the size of training data
# print('Training data size: ',training.count(),' rows')  
     
# Print the time spent to train
print('Training time: ',round(e-s, 4),' seconds')

#### ------------------------ TESTING ------------------------ ####
# Get start time
s=time.time() # testing time

# Predict ratings for the hold-out test data.
# Note: transform() is lazy, so this interval only covers building the query plan;
# the predictions are actually computed when evaluate() runs below.
predictions = best_model.transform(movielens_test)

# Get end time
e=time.time() # testing time

# Drop any rows with NaN predictions (due to the cold start problem)
predictions = predictions.dropna()
# predictions = predictions.fillna(0)
               
# Evaluate the overall performance of the model by computing the Root-mean-square error (RMSE) on the Test data
rmse = regEval.evaluate(predictions)
print('')
print("Test Error (RMS) = ", round(rmse, 4))

## Print the size of test data
# print('Test data size: ',test.count(),' rows')  
     
# Print the time spent to test
print('Test time: ',round(e-s, 4),' seconds')

####------ Results for training and testing without Grid Search -------####
## Training data size: ~80,000 rows
# Training Error (RMS) =  0.6382  || Test Error (RMS) =  1.1182
# Training time:  10.497  seconds || Test time:  0.0601  seconds

## Training data size: ~8,000 rows
# Training Error (RMS) =  0.2326  || Test Error (RMS) =  2.8208
# Training time:  7.6864  seconds || Test time:  0.0603  seconds
    
## Training data size: ~800 rows
# Training Error (RMS) =   0.0547 || Test Error (RMS) =  3.7398
# Training time:  6.492  seconds  || Test time:  0.0514  seconds
 
## Training data size: ~80 rows
# Training Error (RMS) =  0.0501  || Test Error (RMS) =  3.7025
# Training time:  5.419  seconds  || Test time:  0.0504  seconds

# As we can see, the larger the dataset, the more computationally expensive training is.
# For small datasets the training error is low and the test error is high (i.e. overfitting).
# When the dataset is large, the model generalizes better, reducing overfitting.
# One important point is that only when the dataset is really large (around 100,000 rows)
# does the model start to make good predictions.

# The Parameter Grid Search was applied to the largest dataset size and the results are documented below:
# Training Error (RMS) =  2.4116    || Test Error (RMS) =  3.2335
# Training time:  251.6069  seconds || Test time:  0.0446  seconds

Create Training Algorithm (ALS)
Create Parameter Grid Builder
rank: 10
maxIter: 10

-----------
Training Error (RMS) =  0.1224
Training time:  20.8864  seconds

Test Error (RMS) =  2.1485
Test time:  0.0658  seconds
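The cell above handles NaN predictions differently for the two sets: `fillna(0)` for the training data and `dropna()` for the test data. The choice can change the reported RMSE considerably, as the small pure-Python sketch below suggests. The toy (rating, prediction) pairs are invented for illustration.

```python
import math

def rmse(pairs):
    """Root-mean-square error over (label, prediction) pairs."""
    return math.sqrt(sum((y - p) ** 2 for y, p in pairs) / len(pairs))

# Toy (rating, prediction) pairs; NaN marks a user or movie unseen in training
pairs = [(4.0, 3.5), (3.0, 3.0), (5.0, float("nan")), (2.0, 2.5)]

# Equivalent of dropna(): ignore rows without a prediction
dropped = [(y, p) for y, p in pairs if not math.isnan(p)]
# Equivalent of fillna(0): treat a missing prediction as a rating of 0
filled = [(y, 0.0 if math.isnan(p) else p) for y, p in pairs]

rmse_drop = rmse(dropped)
rmse_fill = rmse(filled)
print(round(rmse_drop, 4), round(rmse_fill, 4))
```

Filling with 0 counts every cold-start row as a large error, while dropping the rows measures accuracy only where the model can predict. Using the same strategy for both sets keeps the training and test errors comparable.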


## 2.3) Adding a new user and recommending movies (Extra Work)

In [180]:
## Suggest movies to a new user

from pyspark.sql.functions import desc

# Ratings given by the new user: (userId, movieId, rating)
new_user_ratings = [(0,2,2),    # Jumanji
     (0,32,3),   # Twelve Monkeys
     (0,589,5),  # Terminator 2
     (0,50,4),   # Usual Suspects
     (0,1080,4), # Monty Python 
     (0,1278,1), # Young Frankenstein
     (0,1266,1), # Unforgiven 
     (0,1249,1), # Femme Nikita 
     (0,1090,1), # Platoon 
     (0,919,1) , # Wizard of Oz
     (0,47,5)]   # Seven 

df1 = spark.createDataFrame(new_user_ratings, ["userId", "movieId", "rating"])

movies = movies.withColumn("movieId", movies["movieId"].cast(IntegerType()))

# IDs of the movies the new user has already rated
my_rated_movie_ids = [row["movieId"] for row in df1.collect()]

# Candidates: rows of the reduced dataset, excluding the movies rated by the new user.
# Note: these rows keep the userId of the user who rated them, so the predictions
# below are scored for those existing users, not for userId 0.
candid = movie_ratings.filter(~movie_ratings["movieId"].isin(my_rated_movie_ids))
# candid.show()

## Score the candidates with the best model
predictions = best_model.transform(candid)
# predictions.show()

## Drop rows with NaN predictions
predictions = predictions.dropna()
# predictions.show()

# Sort by highest predictions
recommendations = predictions.sort(desc("prediction"))

# Remove duplicated movies (keeps one row per movieId; which row is kept is not guaranteed)
rec = recommendations.dropDuplicates(["movieId"])

# Sort again by highest predictions
recommend = rec.sort(desc("prediction"))

# Print Recommendations
print("\nRecommendations\n")
recommend.show(10)

print("\nRecommended Movies\n")
recommend.select("title").show(10)


Recommendations

+-------+------+------+----------+--------------------+--------------------+----------+
|movieId|userId|rating| timestamp|               title|              genres|prediction|
+-------+------+------+----------+--------------------+--------------------+----------+
|    527|   214|   5.0| 978381970|Schindler's List ...|           Drama|War| 5.0367374|
|    760|   153|   5.0|1046739825|   Stalingrad (1993)|           Drama|War| 5.0352488|
|    750|   214|   5.0| 978468585|Dr. Strangelove o...|          Comedy|War|  5.022012|
|   1617|   216|   4.5|1095792780|L.A. Confidential...|Crime|Film-Noir|M...|  5.005936|
|  79469|   450|   5.0|1475736993|Northerners, The ...|              Comedy| 5.0019436|
|  30810|   275|   5.0|1326920440|Life Aquatic with...|Adventure|Comedy|...|  4.998276|
|  53123|   117|   5.0|1320636446|         Once (2006)|Drama|Musical|Rom...|  4.988882|
|   2571|   565|   5.0| 954602535|  Matrix, The (1999)|Action|Sci-Fi|Thr...| 4.9887924|
|   1276|   11
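The model above was trained without the new user's ratings, so ALS holds no latent vector for userId 0, and the predictions shown belong to existing users. One possible way to personalise the list without retraining is to "fold in" the new user: keep the item factors fixed and solve a single regularised least-squares problem for the new user's vector. The sketch below shows the idea in NumPy. The item factors, indices and ratings are hypothetical toy values; with Spark they could be taken from the trained model's `itemFactors`.

```python
import numpy as np

def fold_in_user(item_factors, rated_idx, ratings, reg_param=0.1):
    """Solve a ridge regression for one user's latent vector, item factors fixed."""
    Y = item_factors[rated_idx]
    rank = item_factors.shape[1]
    A = Y.T @ Y + reg_param * np.eye(rank)
    b = Y.T @ np.asarray(ratings, dtype=float)
    return np.linalg.solve(A, b)

# Hypothetical item factors (5 movies, rank 2); a trained model would supply these
item_factors = np.array([[1.0, 0.0],
                         [0.9, 0.1],
                         [0.0, 1.0],
                         [0.1, 0.9],
                         [0.8, 0.2]])

rated_idx = [0, 2]      # rows of the movies the new user has rated
ratings = [5.0, 1.0]    # the new user's ratings for those movies

user_vec = fold_in_user(item_factors, rated_idx, ratings)
scores = item_factors @ user_vec
scores[rated_idx] = -np.inf       # never recommend what was already rated
top = np.argsort(-scores)[:2]     # indices of the two best unrated movies
print(top)
```

The user liked movie 0 and disliked movie 2, so the movies whose factors resemble movie 0 rank first. The alternative is to append the new user's ratings to the training data and refit ALS, which is more expensive but also updates the item factors.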

# 3) Conclusions and Discussions

We implemented a collaborative filtering approach which treats the values in the user-item matrix as explicit user preferences (i.e. ratings directly given by the user). The ALS algorithm in Spark decomposes the user-item matrix into a product of latent factor matrices, which are then used to predict ratings. We therefore obtain a system that can recommend movies based on the user's preferences (ratings). Although this is an efficient way to predict unavailable ratings and hence recommend products, we came across some issues in this work. Firstly, there is the cold start problem: users or movies that do not appear in the training data have no latent factors. This generated 'NaN' prediction estimates for those rows, which had to be removed before evaluation. Secondly, collaborative filtering only achieves good accuracy with a larger training set. Reducing the training set size substantially increased the test root-mean-square error, from about 1.12 with ~80,000 training rows to about 3.7 with ~800 rows or fewer.

The hyperparameters used in our grid search were the rank and the number of iterations. Generally, the larger the rank, the better the model, as more latent factors are used; the downside is the computational time required. We chose an arbitrary set of values for each parameter in our grid (1, 3, 10 and 20). The MLlib library also provides an option for implicit feedback, which treats the values in the user-item matrix as implicit indications of preference (e.g. clicks, views etc.), where the higher the number, the greater the confidence in the user's preference [2]. Latent factors are then inferred from this information.

We split our dataset into training and test sets and used the grid search approach mentioned above to train multiple models on the training set. The best model is then used to predict ratings for the test set, on which we compute the Root Mean Square Error (RMSE).

Finally, we used our trained model to recommend additional movies for a new user based on the ratings they gave to 11 movies.

#### References

[1] Zhou, Y., Wilkinson, D., Schreiber, R. and Pan, R., 2008, June. Large-scale parallel collaborative filtering for the netflix prize. In International Conference on Algorithmic Applications in Management (pp. 337-348). Springer Berlin Heidelberg.

[2] Hu, Y., Koren, Y. and Volinsky, C., 2008, December. Collaborative filtering for implicit feedback datasets. In Data Mining, 2008. ICDM'08. Eighth IEEE International Conference on (pp. 263-272). Ieee.

