# CUNY DATA612 Summer 2020 - Project 5

# John K. Hancock

### Project Scope
The goal of this project is to give you practice beginning to work with a distributed recommender system. It is sufficient for this assignment to build out your application on a single node.
Adapt one of your recommendation systems to work with Apache Spark and compare the performance with your previous iteration. Consider the efficiency of the system and the added complexity of using Spark. You may complete the assignment using PySpark (Python), SparkR (R), sparklyr (R), or Scala. Please include in your conclusion: For your given recommender system’s data, algorithm(s), and (envisioned) implementation, at what point would you see moving to a distributed platform such as Spark becoming necessary?

### Project Plan

For this project, I switched to Python, specifically Apache Spark through the PySpark libraries, in order to learn something new. This project was a real stretch for me. I chose to build the movie recommender on a distributed system using Databricks, and I created the model with Alternating Least Squares ("ALS"). The project begins with a discussion of ALS. Next, I prepare and explore the data. I then split the data into train and test sets, discuss the ALS parameters and hyperparameters, fit the model, make predictions, and finally evaluate the model.

##### Citation: This project is based on the DataCamp class: "Building Recommendation Engines with PySpark" by Jamen Long
###### https://learn.datacamp.com/courses/recommendation-engines-in-pyspark

### Alternating Least Squares (ALS)

Collaborative filtering models, commonly used in recommender systems, find users who share similar interests in order to recommend products to them. User-user collaborative filtering finds users who are similar to the target user. Item-item collaborative filtering finds and recommends items that are similar to items the target user has already rated. Normally, user ratings are recorded in a matrix where the users are the rows and the items are the columns. Most often this matrix is sparse, because most users won't rate most products.

Enter Alternating Least Squares ("ALS"). ALS is a matrix factorization method: it factors the ratings matrix into two smaller matrixes, starting from random numbers, and then repeatedly re-populates the two matrixes until their product approximates the original matrix. In each step, it holds the original matrix and the first factor matrix constant while it adjusts the second factor matrix, and then it holds the second constant while it adjusts the first. In an unconstrained factorization, the factor matrixes can contain negative values. Spark's ALS can optionally constrain the factors to be non-negative (the "nonnegative" parameter), which is the setting used in this project. The dimension of the factor matrixes that doesn't match the original matrix is called the rank, or the number of latent features, and we can choose it. The Root Mean Squared Error (RMSE) measures how close the product of the factor matrixes is to the known values of the original matrix. Through iterations, ALS adjusts both factor matrixes to reduce this error. The end result is a dense matrix which can be used to fill in the missing values of the sparse matrix.
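To make the alternating step concrete, here is a minimal sketch in plain Python. It is not Spark's implementation: it assumes a rank of 1 (a single latent feature) so that each least-squares update has a simple closed form, and the function names `als_rank1` and `rmse` and the toy ratings are made up for illustration.

```python
import random

def als_rank1(ratings, n_users, n_items, reg=0.05, iters=10, seed=0):
    """Rank-1 ALS on observed (user, item, rating) triples (illustrative only)."""
    rng = random.Random(seed)
    # Start both factor vectors with random positive numbers.
    u = [rng.random() + 0.1 for _ in range(n_users)]
    v = [rng.random() + 0.1 for _ in range(n_items)]
    for _ in range(iters):
        # Hold the item factors fixed and solve for each user factor.
        for i in range(n_users):
            num = sum(r * v[item] for user, item, r in ratings if user == i)
            den = sum(v[item] ** 2 for user, item, r in ratings if user == i) + reg
            u[i] = num / den
        # Hold the user factors fixed and solve for each item factor.
        for j in range(n_items):
            num = sum(r * u[user] for user, item, r in ratings if item == j)
            den = sum(u[user] ** 2 for user, item, r in ratings if item == j) + reg
            v[j] = num / den
    return u, v

def rmse(ratings, u, v):
    """Root mean squared error over the observed ratings only."""
    errors = [(r - u[user] * v[item]) ** 2 for user, item, r in ratings]
    return (sum(errors) / len(errors)) ** 0.5

# Toy 3x3 ratings matrix with two missing cells: (user 0, item 2) and (user 2, item 1).
observed = [
    (0, 0, 2.0), (0, 1, 1.0),
    (1, 0, 4.0), (1, 1, 2.0), (1, 2, 4.0),
    (2, 0, 3.0), (2, 2, 3.0),
]
u, v = als_rank1(observed, n_users=3, n_items=3, reg=0.01, iters=50)
print("RMSE on observed ratings:", round(rmse(observed, u, v), 4))
print("Predicted rating for user 0, item 2:", round(u[0] * v[2], 2))
```

The product `u[i] * v[j]` is defined for every cell, including the two that were never rated, which is how the dense result fills in the sparse matrix.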

### Load the Libraries

In [6]:
#Import the pyspark sql libraries.  These are used to execute sql queries.
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, avg, min

#Import the pyspark machine learning libraries
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.mllib.recommendation import MatrixFactorizationModel, Rating
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.evaluation import BinaryClassificationMetrics


sqlContext = SQLContext(sc)

import numpy as np
import pandas as pd

### Data Preparation and Exploration
In this section, I load the GroupLens MovieLens ratings dataset. Using the textFile method on the Spark context object, "sc", the data is read into Spark's primary data structure, the Resilient Distributed Dataset (RDD). The name breaks down as follows: Resilient, meaning fault tolerant; Distributed, meaning the data resides on multiple nodes; and Dataset, meaning the collection of records.

In [8]:
data = sc.textFile("/FileStore/tables/ratings.csv")
data.take(5)

I then converted the RDD into a Spark data frame using the Spark SQL read.csv method.

In [10]:
ratings = spark.read.csv(data, header=True, inferSchema=True)
ratings.take(5)

The columns of the dataset are 'userId', 'movieId', 'rating', and 'timestamp'.  Over the next three blocks, we see the column names, the first 20 rows of the dataset, and the schema.

In [12]:
print(ratings.columns)

In [13]:
# show() prints the rows itself and returns None, so it is not wrapped in print()
ratings.show()

In [14]:
ratings

The following block computes the sparsity of the ratings matrix, which shows that the matrix is 98.3% empty.

In [16]:
# Count the total number of ratings in the dataset
numerator = ratings.select("rating").count()

# Count the number of distinct userIds and distinct movieIds
num_users = ratings.select("userId").distinct().count()
num_movies = ratings.select("movieId").distinct().count()
denominator = num_users * num_movies
sparsity = (1.0 - (numerator *1.0)/denominator)*100
print("The ratings dataframe is ", "%.2f" % sparsity + "% empty.")

In [17]:
ratings.select("rating").count()
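The same sparsity arithmetic can be checked by hand on a tiny example. The sketch below is plain Python rather than PySpark; the function name `sparsity_percent` and the toy ratings are hypothetical, and it assumes each (user, movie) pair appears at most once.

```python
def sparsity_percent(ratings):
    """Percentage of empty cells in the user-by-movie ratings matrix."""
    n_ratings = len(ratings)
    users = {user for user, _, _ in ratings}
    movies = {movie for _, movie, _ in ratings}
    # Every user could in principle rate every movie.
    possible = len(users) * len(movies)
    return (1.0 - n_ratings / possible) * 100

# 3 users x 3 movies = 9 possible cells, 4 of them filled.
toy = [(1, 10, 4.0), (1, 20, 3.5), (2, 10, 5.0), (3, 30, 2.0)]
print("The toy ratings matrix is %.2f%% empty." % sparsity_percent(toy))
# prints: The toy ratings matrix is 55.56% empty.
```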

Below we see the first 20 rows of ratings from users with a userId below 100

In [19]:
ratings.filter(col("userId") < 100).show()

##### Users with the most ratings

In [21]:
print("Users with the most ratings: ")
display(ratings.groupBy("userId").count().sort(col("count").desc()))

| userId | count |
| --- | --- |
| 414 | 2698 |
| 599 | 2478 |
| 474 | 2108 |
| 448 | 1864 |
| 274 | 1346 |
| 610 | 1302 |
| 68 | 1260 |
| 380 | 1218 |
| 606 | 1115 |
| 288 | 1055 |


In [22]:
print("Users with the most ratings.")
ratings.groupBy("userId").count().sort(col("count").desc()).show()


In [23]:
print("Users with the fewest ratings: ")
ratings.groupBy("userId").count().sort(col("count").asc()).show()

##### Users with the Fewest Ratings

In [25]:
print("Users with the fewest ratings: ")
display(ratings.groupBy("userId").count().sort(col("count").asc()))

| userId | count |
| --- | --- |
| 406 | 20 |
| 431 | 20 |
| 194 | 20 |
| 53 | 20 |
| 569 | 20 |
| 320 | 20 |
| 576 | 20 |
| 257 | 20 |
| 595 | 20 |
| 189 | 20 |


In [26]:
print("Movies with the fewest ratings: ")
ratings.groupBy("movieId").count().sort(col("count").asc()).show()

##### The average number of ratings per movie is 10.37

In [28]:
# Avg num ratings per movie
print("Avg num ratings per movie: ")
ratings.groupBy("movieId").count().select(avg("count")).show()

##### The fewest ratings by any user is 20

In [30]:
print("User with the fewest ratings: ")
ratings.groupBy("userId").count().select(min("count")).show()

##### The average number of ratings by a user is 165.30

In [32]:
# Avg num ratings per users
ratings.groupBy("userId").count().select(avg("count")).show()

### Split Data into train and test

The data is split into train and test sets using randomSplit.  I also cached both sets to speed up processing.

In [34]:
# Create test and train set
(train, test) = ratings.randomSplit([0.8, 0.2], seed = 1234)
train.cache()
test.cache()
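As a conceptual illustration of a weighted, seeded split, the sketch below assigns each row to a split with one random draw. It is plain Python and not Spark's implementation; the function name `random_split` is hypothetical. It shows two properties worth keeping in mind: the same seed reproduces the same split, and the split sizes are only approximately 80/20 because each row is assigned independently.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split using a single random draw per row."""
    total = sum(weights)
    # Cumulative, normalized boundaries, e.g. [0.8, 1.0] for weights [0.8, 0.2].
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for k, bound in enumerate(bounds):
            # The last split catches any draw left over by rounding.
            if draw < bound or k == len(bounds) - 1:
                splits[k].append(row)
                break
    return splits

rows = list(range(1000))
train_rows, test_rows = random_split(rows, [0.8, 0.2], seed=1234)
print(len(train_rows), len(test_rows))
```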

### Create the ALS model - Parameters and Hyperparameters

userCol, itemCol and ratingCol are the parameters taken from the dataset.

"rank" is the number of latent features, which ALS allows you to set

"maxIter" tells ALS how many times to adjust the matrixes to reduce the RMSE. The model below sets it to 3 to keep the run time short

"regParam" is the regularization parameter, a penalty on the size of the factors that is added to the error being minimized to keep the model from overfitting

"nonnegative" ensures no negative numbers in the matrix factors

"coldStartStrategy" setting it to "drop" removes test rows for users or movies that appear only in the test set. The model has no factors for them, so their predictions would be NaN and would make the RMSE NaN as well

"implicitPrefs" setting it to False means that I want the ALS model to treat the ratings as explicit preferences

In [36]:
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", 
          rank =15,
          regParam = .05,
          maxIter = 3,
          nonnegative = True,
          coldStartStrategy="drop",
          implicitPrefs = False)
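The effect of `coldStartStrategy="drop"` on the evaluation can be pictured with a small plain-Python sketch. This is an illustration of the idea, not Spark's code: the function name `drop_cold_start` and the toy rows are hypothetical. A test row can only be scored if both its user and its movie were seen during training.

```python
def drop_cold_start(train, test):
    """Keep only the test rows whose user and movie both appear in train."""
    seen_users = {user for user, _, _ in train}
    seen_movies = {movie for _, movie, _ in train}
    return [
        (user, movie, rating)
        for user, movie, rating in test
        if user in seen_users and movie in seen_movies
    ]

toy_train = [(1, 10, 4.0), (2, 10, 3.0), (2, 20, 5.0)]
toy_test = [
    (1, 20, 4.5),  # user 1 and movie 20 are both in train: kept
    (3, 10, 2.0),  # user 3 never appears in train: dropped
    (1, 30, 3.0),  # movie 30 never appears in train: dropped
]
print(drop_cold_start(toy_train, toy_test))
# prints: [(1, 20, 4.5)]
```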

In [37]:
type(als)

Below, I fit the model to the training set

In [39]:
model = als.fit(train)

Below I make predictions on the test set.

In [41]:
# Generate predictions for the test set
predictions = model.transform(test)
predictions.take(5)


### Evaluate the Model

Below I used the RegressionEvaluator class to create an evaluator object with the metricName "rmse"

In [43]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

In [44]:
print(evaluator.getMetricName())
print(evaluator.getLabelCol())
print(evaluator.getPredictionCol())

In [45]:
# Calculate and print the RMSE of the predictions on the test set
RMSE = evaluator.evaluate(predictions)
print(RMSE)
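For reference, the RMSE metric itself is simple to compute by hand. The sketch below is plain Python with made-up (actual, predicted) pairs; the function name `rmse` is only for illustration and is not part of the PySpark API.

```python
import math

def rmse(pairs):
    """Root mean squared error over (actual, predicted) rating pairs."""
    squared_errors = [(actual - predicted) ** 2 for actual, predicted in pairs]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Squared errors are 0.25, 0.0, 1.0 and 0.25, so the mean is 0.375.
toy_pairs = [(4.0, 3.5), (3.0, 3.0), (5.0, 4.0), (2.0, 2.5)]
print(round(rmse(toy_pairs), 4))
# prints: 0.6124
```

Because the errors are squared before averaging, one prediction that is off by a full star counts as much as four predictions that are each off by half a star.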

### Summary

The RMSE for this model is .911. By comparison, the UBCF model in Project 2 resulted in a .99 RMSE, so the ALS model improved on the results.  The overall speed and efficiency of the Apache Spark model was also far superior: Project 2 had 22,112 ratings and this project had 100,836 ratings, yet the entire processing time for this project was lower. In my final project, I will use cross validation to find the best model.