# Spark MLlib

__The outline of this notebook__:

I. Hadoop ecosystem and Spark 

II. Recommendation System using Spark MLlib

## Hadoop ecosystem and Spark 

Before diving deep into Spark, we first need to understand Hadoop and its ecosystem. The Hadoop ecosystem is shown in the figure below.

![hadoop-ecosystem](spark-imgs/Hadoop-Ecosystem.jpg)

A very good article about the Hadoop ecosystem can be found [here](https://data-flair.training/blogs/hadoop-ecosystem-components/). It is worth reading for beginners.

Spark does not appear in the figure above. Nevertheless, Apache Spark was created as an alternative to Hadoop's MapReduce implementation, with the goal of gaining efficiencies measured in orders of magnitude.

The Spark core is complemented by a set of powerful, higher-level libraries that can be used seamlessly in the same application. These libraries currently include Spark SQL, Spark Streaming, MLlib (for machine learning), GraphX and SparkR, each of which is described further in [this article](https://www.toptal.com/spark/introduction-to-apache-spark).

![spark-components](spark-imgs/Apache-Spark-Components.jpg)

Additional key features of Spark include:

- Currently provides APIs in Scala, Java, Python and R.
- Integrates well with the Hadoop ecosystem and data sources (HDFS, Amazon S3, Hive, HBase, Cassandra, etc.).
- Can run on clusters managed by Hadoop YARN or Apache Mesos, and can also run standalone.

There are many concepts to learn about Hadoop and Spark. However, we focus on Spark MLlib in this notebook.

## Recommendation System using Spark MLlib

### Types of Recommendation Engines

- Content-Based
- Collaborative Filtering
- Matrix Factorization
- Deep Learning 

Spark MLlib implements collaborative filtering with the Alternating Least Squares (ALS) algorithm, which is itself a matrix factorization technique.
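
ALS views the ratings as a sparse user-item matrix R and approximates it by the product of a user-factor matrix U and an item-factor matrix V (R ≈ U Vᵀ). With V held fixed, finding each user's row of U is an ordinary regularized least-squares (ridge regression) problem, and vice versa, so ALS alternates between the two. The NumPy sketch below illustrates this on a made-up 4x4 matrix in which 0 marks a missing rating. It is a single-machine toy, not Spark's distributed implementation, and the names `als`, `rank`, `reg` and `n_iters` are our own (they play roughly the roles of Spark's `rank`, `regParam` and `maxIter`).

```python
import numpy as np

def als(R, rank=2, reg=0.1, n_iters=20, seed=0):
    """Factor R (users x items) into U @ V.T using only the observed entries.

    Assumption of this sketch: a 0 in R means "not rated".
    """
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    mask = R > 0
    ridge = reg * np.eye(rank)
    for _ in range(n_iters):
        # Fix V and solve one small ridge regression per user.
        for u in range(n_users):
            Vu = V[mask[u]]  # factors of the items user u rated
            U[u] = np.linalg.solve(Vu.T @ Vu + ridge, Vu.T @ R[u, mask[u]])
        # Fix U and solve one small ridge regression per item.
        for i in range(n_items):
            Ui = U[mask[:, i]]  # factors of the users who rated item i
            V[i] = np.linalg.solve(Ui.T @ Ui + ridge, Ui.T @ R[mask[:, i], i])
    return U, V

def observed_rmse(R, U, V):
    """RMSE of U @ V.T against the observed (non-zero) ratings only."""
    mask = R > 0
    err = (R - U @ V.T)[mask]
    return float(np.sqrt(np.mean(err ** 2)))

# Made-up ratings: 4 users x 4 movies, 0 = not rated.
R = np.array([
    [5, 4, 0, 1],
    [4, 0, 1, 1],
    [1, 1, 0, 5],
    [0, 1, 5, 4],
], dtype=float)

U, V = als(R)
print("RMSE on observed ratings:", observed_rmse(R, U, V))
print("Predicted full matrix:\n", np.round(U @ V.T, 2))
```

The zero cells of `U @ V.T` are the model's guesses for the missing ratings; that is the sense in which matrix factorization "fills in" the matrix.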

With collaborative filtering, we make predictions (filtering) about the interests of a user by collecting preference or taste information from many users (collaborating). The underlying assumption is that if user A has the same opinion as user B on one issue, A is more likely to share B's opinion on a different issue x than the opinion on x of a randomly chosen user.
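
To make this assumption concrete, the sketch below predicts a missing rating from the ratings of similar users: each user's ratings are centered on their own mean, similarity is the cosine of those centered ratings over co-rated items, and the prediction is the user's mean plus a similarity-weighted average of the neighbors' deviations. This is classic user-based collaborative filtering, shown only to illustrate the idea; it is not what Spark MLlib does (MLlib uses ALS), and the toy data and the helper names `mean`, `similarity` and `predict` are hypothetical.

```python
from math import sqrt

# Hypothetical toy data: user -> {item: rating}
ratings = {
    "A": {"x": 5, "y": 4, "z": 1},
    "B": {"x": 5, "y": 5, "z": 1, "w": 2},
    "C": {"x": 1, "y": 2, "z": 5, "w": 5},
}

def mean(user):
    """Average rating given by `user`."""
    r = ratings[user]
    return sum(r.values()) / len(r)

def similarity(u, v):
    """Cosine similarity of mean-centered ratings on items both users rated."""
    common = ratings[u].keys() & ratings[v].keys()
    if not common:
        return 0.0
    du = [ratings[u][i] - mean(u) for i in common]
    dv = [ratings[v][i] - mean(v) for i in common]
    num = sum(a * b for a, b in zip(du, dv))
    den = sqrt(sum(a * a for a in du)) * sqrt(sum(b * b for b in dv))
    return num / den if den else 0.0

def predict(user, item):
    """User's mean plus similarity-weighted deviations of neighbors who rated `item`."""
    pairs = [(similarity(user, v), r[item] - mean(v))
             for v, r in ratings.items() if v != user and item in r]
    weight = sum(abs(s) for s, _ in pairs)
    if weight == 0:
        return None  # nobody comparable has rated the item (cold start)
    return mean(user) + sum(s * d for s, d in pairs) / weight

print(round(predict("A", "w"), 2))  # 1.83
```

A agrees with B (who rated `w` low) and disagrees with C (who rated it high), so both neighbors push A's predicted rating for `w` below A's average.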

The image below (from Wikipedia) shows an example of collaborative filtering. First, people rate different items (such as videos, images or games). Then the system predicts the active user's rating for an item they have not rated yet, based on the existing ratings of other users whose past ratings are similar to the active user's. In the image, the system predicts that the user will not like the video.

![collaborative-filtering](https://upload.wikimedia.org/wikipedia/commons/5/52/Collaborative_filtering.gif)

Let's see this all in action!

### Set up and Imports

To start, create a `SparkSession`, then import the ALS recommendation algorithm and the regression evaluator.

```python
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.appName('recommender').getOrCreate()
```

### Load data 

```python
df = spark.read.csv('movilen-dataset/movies_ratings_df.csv', inferSchema=True, header=True)
```

```python
df.printSchema()
```

```
root
 |-- _c0: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
```

```python
# Keep only the user, item and rating columns that ALS needs
df = df.select(['user_id', 'movie_id', 'rating'])
```

```python
df.describe().show()
```

```
+-------+-----------------+------------------+------------------+
|summary|          user_id|          movie_id|            rating|
+-------+-----------------+------------------+------------------+
|  count|          1000209|           1000209|           1000209|
|   mean|3024.512347919285|1865.5398981612843| 3.581564453029317|
| stddev|1728.412694899974| 1096.040689457246|1.1171018453732597|
|    min|                1|                 1|                 1|
|    max|             6040|              3952|                 5|
+-------+-----------------+------------------+------------------+
```
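
The summary also shows how sparse the rating matrix is. The quick calculation below treats the maximum IDs (6040 users, 3952 movies) as the matrix dimensions; IDs may have gaps, so this overestimates the matrix size and the true density can only be higher. Even so, only about 4% of the possible user-movie pairs carry a rating, which is exactly the situation collaborative filtering is designed for.

```python
# Figures copied from the describe() output above.
n_ratings = 1_000_209
max_user_id = 6040
max_movie_id = 3952

# Upper bound on the number of cells in the user x movie matrix
# (IDs may have gaps, so the real matrix may be smaller).
cells = max_user_id * max_movie_id
density = n_ratings / cells
print(f"{cells:,} cells, density ~ {density:.2%}")  # 23,870,080 cells, density ~ 4.19%
```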

### Training the Model and Making Predictions

To train the model and make predictions, we need a training set and a test set. Here roughly 80% of the rows are randomly assigned to training and the remaining 20% are held out for evaluation.

```python
training, test = df.randomSplit([0.8, 0.2])
```
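
`randomSplit` assigns rows by random sampling, so the 80/20 proportions are approximate: in this run the test set ends up with 199,488 of the 1,000,209 rows (about 19.9%). Passing a seed, e.g. `df.randomSplit([0.8, 0.2], seed=42)`, makes the split reproducible. A purely random row split can also leave some users or movies with no ratings in the training set, which is where the cold-start NaNs below come from. The plain-Python sketch imitates per-row random assignment to show why split sizes fluctuate; `random_split` is a hypothetical helper, not Spark's actual sampling code.

```python
import random

def random_split(rows, train_fraction=0.8, seed=42):
    """Send each row to train with probability train_fraction, else to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(10_000))
train, test = random_split(rows)
print(len(train), len(test))  # close to, but generally not exactly, 8000 / 2000
```

Because the generator is seeded, calling `random_split(rows)` again returns the same two lists.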

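```python
# maxIter: number of ALS sweeps; regParam: L2 regularization strength
als = ALS(maxIter=5, regParam=0.01, userCol='user_id', itemCol='movie_id', ratingCol='rating')

model = als.fit(training)            # learn user and item latent factors
predictions = model.transform(test)  # add a 'prediction' column to the test rows
```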
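
The fitted `ALSModel` stores one latent-factor vector per user and per movie (exposed as `model.userFactors` and `model.itemFactors`, with `rank`, 10 by default, numbers each), and `transform` scores a (user, movie) pair with the dot product of the two vectors. The plain-Python sketch below mimics that scoring step with made-up rank-3 factors and hypothetical IDs. It also shows where cold-start NaNs come from: an ID with no learned vector cannot be scored. Nothing bounds the dot product to the 1-5 star range either, which is why predictions such as -7.5 or 8.4 can appear.

```python
import math

# Made-up rank-3 factors for hypothetical IDs (Spark's default rank is 10).
user_factors = {1: [0.9, -0.4, 0.8]}
item_factors = {10: [1.1, 0.2, 0.9], 20: [2.0, 1.5, -1.0]}

def predict(user_id, movie_id):
    """Dot product of the user and item factors; NaN if either ID was unseen."""
    u = user_factors.get(user_id)
    v = item_factors.get(movie_id)
    if u is None or v is None:
        return math.nan  # cold start: no learned vector for this ID
    return sum(a * b for a, b in zip(u, v))

print(predict(1, 10))  # known user and movie
print(predict(1, 99))  # movie never seen in training -> nan
```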

### Cold Start Predictions

When the test set contains cold-start users or items (ones that did not appear in the training data), the model produces NaN predictions for them, as shown in the summary below. These NaNs also make the root-mean-square error evaluate to NaN.

```python
predictions.describe().show()
```

```
+-------+------------------+------------------+------------------+----------+
|summary|           user_id|          movie_id|            rating|prediction|
+-------+------------------+------------------+------------------+----------+
|  count|            199488|            199488|            199488|    199488|
|   mean|3024.0575272698106|1867.3710248235484| 3.584807106191851|       NaN|
| stddev|1732.0740352907983|1097.9873513030911|1.1175103643076427|       NaN|
|    min|                 1|                 1|                 1|-7.5180225|
|    max|              6040|              3952|                 5|       NaN|
+-------+------------------+------------------+------------------+----------+
```

To solve this problem, drop the affected rows with `predictions.na.drop()`. A more streamlined way is to pass `coldStartStrategy="drop"` to the `ALS` constructor, so that `transform` drops rows with NaN predictions automatically.

```python
predictions = predictions.na.drop()
predictions.describe().show()
```

```
+-------+------------------+------------------+------------------+------------------+
|summary|           user_id|          movie_id|            rating|        prediction|
+-------+------------------+------------------+------------------+------------------+
|  count|            199456|            199456|            199456|            199456|
|   mean| 3024.089002085673|1867.3020064575646| 3.584976135087438|3.5557930856596407|
| stddev|1732.0920121091578|1097.9597800874194|1.1174216829378705|0.8012721307528049|
|    min|                 1|                 1|                 1|        -7.5180225|
|    max|              6040|              3952|                 5|          8.444759|
+-------+------------------+------------------+------------------+------------------+
```
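
Why does a handful of NaNs matter so much? RMSE averages over every row, and any arithmetic involving NaN yields NaN, so one cold-start row is enough to turn the whole metric into NaN (here, dropping them removes only 32 of 199,488 rows). The plain-Python sketch below, with hypothetical numbers, shows the effect and the fix of filtering out NaN predictions first, which is what `predictions.na.drop()` does for the Spark DataFrame.

```python
import math

def rmse(pairs):
    """Root-mean-square error over (rating, prediction) pairs."""
    return math.sqrt(sum((r - p) ** 2 for r, p in pairs) / len(pairs))

pairs = [(5, 4.5), (3, 3.5), (4, math.nan)]  # last row: a cold-start prediction

print(rmse(pairs))  # nan: a single NaN poisons the sum
kept = [(r, p) for r, p in pairs if not math.isnan(p)]
print(rmse(kept))   # 0.5
```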

```python
predictions.show()

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

print(str(rmse))
print("Root-mean-square error = " + str(rmse))
```

```
+-------+--------+------+----------+
|user_id|movie_id|rating|prediction|
+-------+--------+------+----------+
|    673|     148|     5| 3.3422542|
|   5333|     148|     3| 2.2194846|
|    482|     148|     2| 2.8373501|
|   4169|     463|     2|  2.827493|
|     26|     463|     3| 3.9142697|
|   3328|     463|     4|  3.803137|
|   3032|     463|     4| 4.3333335|
|    202|     463|     3|  2.901569|
|    721|     463|     4| 3.4054084|
|   1980|     463|     2| 2.4942303|
|    392|     471|     4| 3.7530158|
|   5614|     471|     5| 4.0558205|
|   1699|     471|     5|    3.8143|
|   3704|     471|     5|  4.530177|
|   5345|     471|     4| 4.2094007|
|    588|     471|     4|  4.133444|
|     78|     471|     4| 3.7239463|
|   4172|     471|     3| 3.7793446|
|   3685|     471|     5| 4.5230346|
|   4482|     471|     4| 3.5656407|
+-------+--------+------+----------+
only showing top 20 rows

0.8927010502873391
Root-mean-square error = 0.8927010502873391
```

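The RMSE is expressed in the same units as the `rating` column (stars), so a value of about 0.89 means a typical prediction is off by a little under one star on the 1-5 scale.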
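
To see what the metric computes, here is RMSE worked by hand in plain Python for the first three rows of the predictions table above: square each error, average the squares, then take the square root. Three rows are far too few to be representative, so the result differs from the 0.89 computed over the full test set.

```python
import math

# First three rows of the predictions table above: (rating, prediction)
rows = [(5, 3.3422542), (3, 2.2194846), (2, 2.8373501)]

squared_errors = [(r - p) ** 2 for r, p in rows]
rmse = math.sqrt(sum(squared_errors) / len(squared_errors))
print(round(rmse, 3))  # 1.163
```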


So now that we have the model, how would we actually supply a recommendation to a user?

The same way we did with the test data! For example:

```python
single_user = test.filter(test['user_id'] == 6).select(['movie_id', 'user_id'])
```

```python
# User had 15 ratings in the test data set
# Realistically this should be some sort of hold out set!
single_user.show()
```

```
+--------+-------+
|movie_id|user_id|
+--------+-------+
|      17|      6|
|      34|      6|
|     296|      6|
|     595|      6|
|     920|      6|
|    1101|      6|
|    1688|      6|
|    1806|      6|
|    1947|      6|
|    2006|      6|
|    2082|      6|
|    2100|      6|
|    2506|      6|
|    3524|      6|
|    3682|      6|
+--------+-------+
```


```python
recommendations = model.transform(single_user)
```

```python
recommendations.orderBy('prediction', ascending=False).show()
```

```
+--------+-------+----------+
|movie_id|user_id|prediction|
+--------+-------+----------+
|    2506|      6|  4.853216|
|     920|      6| 4.7831426|
|    1688|      6| 4.6620526|
|    1101|      6| 4.5707235|
|    1947|      6|  4.472553|
|    2006|      6| 4.2910757|
|     595|      6| 4.1513247|
|    2082|      6| 4.1028743|
|      17|      6| 3.8527308|
|    3524|      6| 3.6430182|
|    2100|      6| 3.5029235|
|    3682|      6| 3.5004683|
|      34|      6| 3.2677448|
|    1806|      6| 3.2543948|
|     296|      6| 1.6700982|
+--------+-------+----------+
```
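
Scoring only the movies a user already rated in the test set is useful for checking the model, but a real recommender ranks movies the user has not rated. `ALSModel` provides `recommendForAllUsers(numItems)` and `recommendForUserSubset(dataset, numItems)` for this (for example `model.recommendForUserSubset(single_user, 10)`), which score items with the same dot product of latent factors. The plain-Python sketch below shows the idea with made-up rank-2 factors and hypothetical movie IDs: score every candidate, skip movies the user has already rated, and keep the top N. The helper `top_n` is our own, not part of Spark.

```python
import heapq

# Made-up rank-2 factors for one user and a handful of hypothetical movies.
user_vec = [1.0, 0.5]
item_factors = {1: [3.0, 1.0], 2: [1.0, 2.0], 3: [2.5, 2.0],
                4: [0.5, -1.0], 5: [4.0, 0.5]}
already_rated = {1, 5}

def top_n(user_vec, item_factors, exclude, n=2):
    """Return the n highest-scoring (movie_id, score) pairs not in `exclude`."""
    scores = ((movie_id, sum(a * b for a, b in zip(user_vec, vec)))
              for movie_id, vec in item_factors.items() if movie_id not in exclude)
    return heapq.nlargest(n, scores, key=lambda pair: pair[1])

print(top_n(user_vec, item_factors, already_rated))  # [(3, 3.5), (2, 2.0)]
```

Using `heapq.nlargest` avoids sorting the full candidate list, which matters when there are thousands of movies per user.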

