### DATA 612 - Project #5

Zach Alexander  
7/4/2020

### Instructions

Adapt one of your recommendation systems to work with Apache Spark and compare the performance with your previous iteration. Consider the efficiency of the system and the added complexity of using Spark. You may complete the assignment using PySpark (Python), SparkR (R), sparklyr (R), or Scala. Please include in your conclusion: For your given recommender system’s data, algorithm(s), and (envisioned) implementation, at what point would you see moving to a distributed platform such as Spark becoming necessary?

##### Declaring time-tracking functions to measure performance in PySpark 

Before diving into the work, in order to measure and compare the performance of my work in Databricks (utilizing PySpark), relative to my work in R in previous weeks, I decided to load in two functions `tic()` and `toc()`. These will help set up a time window to measure the time elapsed from the beginning to the end of a certain piece of code. I'll use this throughout, but wanted to make sure I have these ready to go for my work in the following lines.

In [4]:
def tic():
    #Homemade version of matlab tic and toc functions
    import time
    global startTime_for_tictoc
    startTime_for_tictoc = time.time()

def toc():
    import time
    if 'startTime_for_tictoc' in globals():
        print("Elapsed time is " + str(time.time() - startTime_for_tictoc) + " seconds.")
    else:
        print("Toc: start time not set")
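
As an alternative to the global-variable approach above, the same measurement can be sketched as a context manager. This is only an illustration: the name `timed` and the `label` parameter are hypothetical and not part of the original notebook, and `time.perf_counter()` is used here because it is intended for measuring short durations.

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(label="block"):
    """Time the enclosed block and print the elapsed seconds (hypothetical helper)."""
    result = {"seconds": None}
    start = time.perf_counter()
    try:
        yield result
    finally:
        # Record and report the elapsed time even if the block raises
        result["seconds"] = time.perf_counter() - start
        print("{}: {:.4f} seconds".format(label, result["seconds"]))

# Usage: wrap any piece of code whose duration should be measured
with timed("sum of squares") as t:
    total = sum(i * i for i in range(100000))
```

Because the start time lives inside the `with` block rather than in a global, nested or repeated timings cannot overwrite each other.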

### Loading in Movies Data

Similar to how our professor walked through this process in lecture this past week, I'll use the MovieLens data, which I also analyzed extensively during the first few weeks of the course. Since I ran multiple algorithms and built recommender systems on these ratings in past projects, I'll be able to see whether an algorithm run in PySpark performs better than my previous iterations in R.

##### Saving the movies data as a csv file

First, I saved the movies data as a separate csv file and uploaded it to the "Data" section of Databricks. After setting the file type to csv, treating the first row as a header, and using a comma as the delimiter, I created a Spark dataframe and saved it as `movies_df`:

In [7]:
# File location and type
movies_file_location = "/FileStore/tables/movies-2.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
movies_df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(movies_file_location)

# display(movies_df)

### Loading in the Ratings Data

With the movies data loaded in as a Spark dataframe, I then followed the same process for the ratings data. Since we'll be using the ratings data to build our model later, we also need to define a schema that specifies the data type of each of the four columns. I saved the schema as `movie_schema` and read the ratings data into a dataframe, `ratings_df`:

In [9]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType

ratings_file_location = "/FileStore/tables/ratings-2.csv"

movie_schema = StructType([
  StructField("userId", IntegerType()),
  StructField("movieId", IntegerType()),
  StructField("rating", DoubleType()),
  StructField("timestamp", DoubleType())
])

# The applied options are for CSV files. For other file types, these will be ignored.
ratings_df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .schema(movie_schema) \
  .load(ratings_file_location)

# display(ratings_df)

We can also take a quick look at the dimensions of the dataframe:

In [11]:
print("Number of rows in ratings_df dataset : {}".format(str(ratings_df.count())))
print("Number of columns in ratings_df dataset : {}".format(str(len(ratings_df.columns))))

### Splitting into Training and Testing Datasets

Now, with our ratings and movies data loaded into Databricks successfully, I then was able to split the full `ratings_df` into a training and test dataset. I decided to do a split of 80/20, similar to past weeks. In the end, we can see the dimensions of our training and test datasets below:

In [13]:
(training, test) = ratings_df.randomSplit([0.8, 0.2])

In [14]:
print("Number of rows in training dataset : {}".format(str(training.count())))
print("Number of columns in training dataset : {}".format(str(len(training.columns))))
print("Number of rows in test dataset : {}".format(str(test.count())))
print("Number of columns in test dataset : {}".format(str(len(test.columns))))

In [15]:
from pyspark.ml.recommendation import ALS

tic()
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")
toc()

In [16]:
tic()
model = als.fit(training)
toc()

In [17]:
tic()
predictions = model.transform(test)
toc()

In [18]:
# display(predictions)

In [19]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    metricName="rmse", labelCol="rating", 
    predictionCol="prediction")

rmse = evaluator.evaluate(predictions)

In [20]:
# Evaluation results
print("RMSE : {}".format(str(rmse)))
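
For reference, the RMSE metric requested from the evaluator is the square root of the mean squared difference between actual and predicted ratings. The plain-Python sketch below illustrates that formula on a few made-up ratings; the function name `rmse` and the sample values are hypothetical and are not taken from the MovieLens results.

```python
import math

def rmse(actual, predicted):
    """Root mean squared error between two equal-length lists of ratings."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Made-up ratings purely for illustration
example_rmse = rmse([4.0, 3.0, 5.0], [3.5, 3.0, 4.0])
print("Example RMSE : {:.4f}".format(example_rmse))
```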

### Assessing performance

After running the matrix factorization method above (ALS), and measuring the elapsed time from fitting the training data to the model as well as the time it took to make the predictions, we can see the following when we compare these processing times to our SVD factorization method in R in past weeks:

In [22]:
import pandas as pd

performance_table = pd.DataFrame([['PySpark', "{:.2f}".format(14.17) + " seconds", "{:.2f}".format(0.02) + " seconds"],
             ['R', "{:.2f}".format(1.7) + " seconds", "{:.2f}".format(0.06) + " seconds"]],
            columns = ['Platform', 'Fitting Training Data to Model/Computations', 'Making Predictions'])

performance_table

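|   | Platform | Fitting Training Data to Model/Computations | Making Predictions |
|---|----------|---------------------------------------------|--------------------|
| 0 | PySpark | 14.17 seconds | 0.02 seconds |
| 1 | R | 1.70 seconds | 0.06 seconds |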


Interestingly, R was faster at fitting the training data to the matrix factorization model (1.70 seconds versus 14.17 seconds in PySpark). However, the measured prediction step on the test set was about **3 times faster** in PySpark than in R (0.02 seconds versus 0.06 seconds). One caveat: Spark evaluates DataFrame transformations lazily, so the time measured around `model.transform(test)` may reflect building the query plan rather than computing every prediction.
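
A possible way to time the prediction step so that the work is actually executed is to wrap an action such as `count()`, since a transformation like `model.transform(test)` on its own only describes the computation. The helper below is a sketch under that assumption: `time_action` and `StandInFrame` are hypothetical names, and the stand-in class exists only so the example runs without a Spark cluster.

```python
import time

def time_action(df):
    """Time a full pass over `df` by calling its count() action (hypothetical helper)."""
    start = time.perf_counter()
    n_rows = df.count()  # on a Spark DataFrame, count() is an action and triggers execution
    return n_rows, time.perf_counter() - start

class StandInFrame:
    """Hypothetical stand-in for a Spark DataFrame so this sketch runs locally."""
    def __init__(self, rows):
        self._rows = rows

    def count(self):
        return len(self._rows)

n_rows, seconds = time_action(StandInFrame([1, 2, 3]))
print("Counted {} rows in {:.6f} seconds".format(n_rows, seconds))
```

On Databricks the call might look like `time_action(model.transform(test))`, which would measure the predictions being computed rather than only planned.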

### Second Algorithm Test -- Attempting to run SVD in PySpark and comparing to R

After reading a fair bit of documentation, it seems that running SVD in PySpark is not very straightforward. However, I'm going to attempt it here and measure its performance against the process I outlined in my Project #3 assignment a few weeks ago. Although I've already seen that a similar matrix factorization method (ALS) appears to make predictions faster in PySpark, I wanted to see if I could do an "apples to apples" comparison from one platform to the other.

Therefore, I decided to load in the user-movie matrix that I developed in R for Project #3. Its missing values have already been imputed with the row mean, so it should be ready to run through SVD here in PySpark.
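
To make the row-mean imputation concrete, here is a small plain-Python illustration. It is not the R code from Project #3: the function name `impute_row_means`, the toy ratings, and the choice to fill an entirely empty row with 0.0 are all assumptions made for this sketch.

```python
def impute_row_means(matrix):
    """Replace None entries in each row with the mean of that row's observed values."""
    imputed = []
    for row in matrix:
        observed = [v for v in row if v is not None]
        # Assumption: a row with no ratings at all is filled with 0.0
        row_mean = sum(observed) / len(observed) if observed else 0.0
        imputed.append([row_mean if v is None else v for v in row])
    return imputed

# Toy user-movie matrix: rows are users, columns are movies, None is a missing rating
ratings = [
    [5.0, None, 3.0],
    [None, 4.0, None],
]
filled = impute_row_means(ratings)
print(filled)
```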

Below, you can see that I've successfully loaded in this user-movie matrix and assigned it to a spark dataframe of `R_movie_matrix`:

In [25]:
file_location = "/FileStore/tables/movie_matrix-1.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
R_movie_matrix = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

# display(R_movie_matrix)

We can confirm that this matrix has the same rows and columns as our matrix in R.

In [27]:
print("Number of rows in R_movie_matrix dataset : {}".format(str(R_movie_matrix.count())))
print("Number of columns in R_movie_matrix dataset : {}".format(str(len(R_movie_matrix.columns))))

Next, we need to convert our Spark dataframe into a distributed RowMatrix (backed by an RDD) in order to compute the SVD and do matrix multiplication on it later. Therefore, I mapped each row of the dataframe to a list and created a RowMatrix:

In [29]:
from pyspark.mllib.linalg import Vectors, DenseMatrix
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix, RowMatrix

rdd = R_movie_matrix.rdd.map(list)
mat = RowMatrix(rdd)


With our RowMatrix ready to go, I was then able to use the `computeSVD()` function in PySpark to run singular value decomposition with k equal to 20, which was the same value as in my optimal SVD computations in R in Project #3. The goal here is to obtain the same singular values for s:

In [31]:
svd = mat.computeSVD(20, computeU=True)
U = svd.U
s = svd.s
V = svd.V

After running SVD on our matrix, we can indeed see that we obtain the same singular values for s:

In [33]:
print("Singular values are: %s" % s)

I was then able to save my U, s, and V variables, and created a sigma variable below that holds the singular values of s on the diagonal of a square matrix. I also computed the transpose of V. I created two new DenseMatrix variables in order to do this.

In [35]:
import numpy as np

sigma = DenseMatrix(len(s), len(s), np.diag(s).ravel("F"))

V_transpose = DenseMatrix(V.numCols, V.numRows, V.toArray().transpose().ravel("F"))

With the new values computed and saved in proper PySpark formats, I was able to multiply the orthonormal matrix U by the diagonal matrix sigma and then by the transpose of the orthonormal matrix V to obtain the rank-20 reconstruction of the matrix, which contains the ratings predictions.

In [37]:
mat_ = U.multiply(sigma).multiply(V_transpose)
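
The same U, sigma, V-transpose product can be sketched locally with NumPy on a toy matrix. This is only an illustration of the rank-k reconstruction idea, not the distributed computation above: the function name `rank_k_approximation` and the toy values are hypothetical.

```python
import numpy as np

def rank_k_approximation(matrix, k):
    """Reconstruct `matrix` from its k largest singular values and vectors."""
    # np.linalg.svd returns V already transposed (Vt), with singular values in descending order
    U, s, Vt = np.linalg.svd(matrix, full_matrices=False)
    return U[:, :k] @ np.diag(s[:k]) @ Vt[:k, :]

# Toy 3x3 "ratings" matrix purely for illustration
toy = np.array([
    [5.0, 4.0, 1.0],
    [4.0, 5.0, 1.0],
    [1.0, 1.0, 5.0],
])
approx = rank_k_approximation(toy, k=2)
print(approx.shape)
```

Keeping all singular values returns the original matrix, while a smaller k gives a lower-rank approximation with the same dimensions.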

As we can see below, our new `mat_` matrix has the same dimensions as our original matrix we started with, which was to be expected. However, `mat_` should have our new prediction ratings computed for all user-movie combinations.

In [39]:
print(mat_.numCols())
print(mat_.numRows())

After doing a quick check on my `movie_matrix_svd` matrix that I computed in R for Project #3, I'm able to calculate the same prediction ratings here in PySpark. We can confirm by looking at the values of the first user, which are indeed the same as the user-predicted ratings for my Project #3 dataframe in R.

In [41]:
mat_.rows.first()

### Assessing performance of SVD in PySpark and R

After doing this apples-to-apples comparison between the two platforms, we can see the following processing time breakdown:

In [43]:
performance_table = pd.DataFrame([['PySpark SVD', "{:.2f}".format(502) + " seconds", "{:.2f}".format(0.07) + " seconds"],
             ['R SVD', "{:.2f}".format(1.7) + " seconds", "{:.2f}".format(0.06) + " seconds"]],
            columns = ['Platform/Method', 'Computing SVD', 'Making Predictions'])

performance_table

|   | Platform/Method | Computing SVD | Making Predictions |
|---|-----------------|---------------|--------------------|
| 0 | PySpark SVD | 502.00 seconds | 0.07 seconds |
| 1 | R SVD | 1.70 seconds | 0.06 seconds |


### Conclusion

After working through both ALS and SVD in PySpark, I've found a few takeaways:  

+ ALS in PySpark seemed to perform about **three times faster** than SVD in R when looking strictly at its ability to make ratings predictions  
+ Fitting ALS to the training data in PySpark was much slower than computing singular values in R, about 14 seconds versus about 2 seconds (although this is not a like-for-like comparison, so it doesn't hold much weight)  

Although we cannot draw many concrete conclusions from this initial comparison, it does look like PySpark may be more efficient at making predictions once the data has been processed and/or the model has been fit. When we also ran the SVD technique in PySpark and compared it to the same computations from Project #3, the results were broadly consistent with these findings:  

+ SVD computation in PySpark was *very* slow, taking about 502 seconds (~8.4 minutes) with a k-value of 20. This was much slower than the approximately 2 seconds it took to run this function in R.  
+ However, once SVD had been computed in PySpark, it seemed to make predictions on our user-movie matrix **at about the same efficiency** as we saw in R (about 0.06 seconds for each).  


##### Envisioned Implementation
This makes me consider a few things about implementation:

1) Using SVD in PySpark may not be the most efficient technique for matrix factorization and dimensionality reduction. If we were to roll this out to production, we'd seriously want to think through the implications of using a technique that takes about 8.4 minutes to process a relatively small dataset of ratings. If SVD did yield the best predictions and was chosen for production, it would be important to think about how the system would handle new ratings. It would likely be advantageous to update the predictions (based on new data) in batches, in order to prevent slow processing times on the front-end of an application. However, if ALS were a good alternative to SVD, it may be more worthwhile to use it instead.

2) PySpark seems to do a good job of making predictions and utilizing the data it has available once it has been processed. Although we saw slower processing times in PySpark than we did in R, the efficiencies of making faster predictions may be more beneficial when thinking about implementation. As mentioned above, batch updates could help keep the models as up to date as possible, while still having the advantage of faster prediction times.

In the end, it would be important to weigh the pros and cons of these two factors when determining which algorithms, techniques and platforms to use to implement a recommender system in production.  

##### Transitioning to Distributed Platform
I could definitely see it becoming necessary to transition to a distributed platform when the volume of data reaches into the millions or billions of ratings. Because PySpark can spread computations across a cluster, its fixed overhead matters less and less as the data volume grows. For our example utilizing the MovieLens data (and doing computations over a thousand rows), we didn't see the full advantages of Spark. However, once an application or system is scaled to meet the demand of a much larger volume of users/clients (i.e. millions/billions, big data, etc.), it would no longer be practical to run these resource-intensive algorithms and techniques in a non-distributed fashion.
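
One rough way to reason about that tipping point is to estimate how much memory a dense user-movie matrix would need on a single machine. The sketch below is a back-of-the-envelope calculation only: the function name `dense_matrix_gib` and the user and movie counts are hypothetical and are not measurements from this project.

```python
def dense_matrix_gib(n_users, n_items, bytes_per_value=8):
    """Approximate size in GiB of a dense n_users x n_items matrix of 8-byte floats."""
    return n_users * n_items * bytes_per_value / 2 ** 30

# Hypothetical sizes purely for illustration
small = dense_matrix_gib(1000, 10000)
large = dense_matrix_gib(10000000, 100000)

print("Small matrix : {:.3f} GiB".format(small))
print("Large matrix : {:.1f} GiB".format(large))
```

A matrix at the smaller scale fits comfortably in the memory of a laptop, while one at the larger scale would not fit in the memory of a typical single machine, which is where a distributed platform such as Spark starts to look necessary.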

##### Added Complexity of Using Spark
I will say that it was a bit tricky to get acclimated to Spark this week. The resources are helpful, but at this point it's difficult for me to fully wrap my head around the nuances of distributed data types such as RowMatrices and Spark dataframes, and how they operate differently from something like a Pandas dataframe. With time, I'm sure these things become more familiar, but for those running algorithms/machine learning techniques on smaller datasets, I can definitely see why some data scientists prefer a non-distributed system: it saves time, avoids confusion, and is sometimes more efficient.