Recommender Systems using PySpark
=============
Recommender systems have become very important for understanding user behavior, and they are used extensively in e-commerce, user modelling, etc. PySpark is one of the best tools for implementing recommender systems on data of up to about 5 GB, because Spark's in-memory processing makes its computations extremely fast.
> Never update the PATH env variable in /etc/environment directly the way you modify other variables. Instead, add the expanded path to the existing PATH env variable declaration.

What is Map-Reduce Operation?
-----
MapReduce consists of three phases:

1. Mapping - Lines are converted to (key, value) pairs
2. Shuffle and Sorting - All (key, value) pairs with the same key go to a single reducer
3. Reduction - Processing of the data corresponding to each key

For more information, see [http://www.tutorialspoint.com/map_reduce/map_reduce_introduction.htm]
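
The three phases can be sketched in plain Python with a word count, the classic MapReduce example. This is only a single-machine illustration: the function names and the sample lines are made up for this sketch, and a real cluster would run each phase on many machines.

```python
from collections import defaultdict


def map_phase(lines):
    """Mapping: turn each line into (word, 1) pairs."""
    for line in lines:
        for word in line.split():
            yield (word.lower(), 1)


def shuffle_phase(pairs):
    """Shuffle and sort: collect all values that share a key in one place."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(sorted(groups.items()))


def reduce_phase(groups):
    """Reduction: process the values collected for each key."""
    return {key: sum(values) for key, values in groups.items()}


# Hypothetical input lines
lines = ["Deer Bear River", "Car Car River", "Deer Car Bear"]
word_counts = reduce_phase(shuffle_phase(map_phase(lines)))
print(word_counts)
```

Every value for a given word ends up in exactly one group before reduction, which is the per-key guarantee the shuffle phase provides.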

Hadoop writes the output of each mapper to disk at the end of the mapping phase, which consumes a lot of time. The shuffle phase also takes time, because each reducer machine must receive all the data for a key or group of keys atomically. By atomic I mean that the data corresponding to a single key can't be stored across different reducer machines. Finally, reading the reducer input and writing the result to its final location again takes a toll, because of the latency introduced by IO operations.
These problems largely go away with Spark, as it processes data in memory using RDDs and tries to minimize disk access. Its only disk IO is during the sort and shuffle phase.




![Map Reduce word count Diagram](MapReduceWordCountOverview1.png "Word Count - A map-reduce example")

What is HDFS File System and how does a program execute on a cluster?
-----
HDFS stands for Hadoop Distributed File System. It is used to manage data stored in a distributed manner across many machines. Say I have 50 PCs or 50 EC2 machines in a Hadoop cluster, each with 1 TB of storage, 4 to 8 GB of RAM and a 2.5 GHz processor. Using all of these resources together is what we call a distributed system.
Now say I have 10 TB of data to process. A single machine cannot store it, so I need to spread it across all these machines and manage it there. HDFS helps you do exactly that. It creates a virtual file system with a master, or namenode, which holds the metadata about the storage on the different machines and presents it as a single large unit. The actual data resides on the individual machines (the 50 PCs or AWS virtual machines), which store it as datanodes. HDFS cleverly integrates these storages into one giant virtual storage, which is why it is so popular in the Big Data community.



Functional Programming
============
A programming paradigm is called purely functional if the result of a function depends only on its input and the function changes nothing except its output. For example, setting a global variable inside a function is non-functional. In many other programming paradigms functions are second-class citizens, but in functional programming functions are treated on par with any other datatype: a function can be passed to another function as input, etc. Some examples are JavaScript, Scala, and lambda functions in Python.
Spark is written in Scala, and hence its API and structure share quite a lot of similarities with Scala.
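
A small sketch of these ideas in Python. All names here (`impure_increment`, `pure_add`, `apply_twice`) are made up for illustration.

```python
counter = 0


def impure_increment(x):
    """Non-functional: reads and modifies a global variable."""
    global counter
    counter += x
    return counter


def pure_add(total, x):
    """Purely functional: the result depends only on the inputs."""
    return total + x


def apply_twice(func, value):
    """Functions are values: func is passed in like any other argument."""
    return func(func(value))


double = lambda n: n * 2

first_call = impure_increment(5)     # 5
second_call = impure_increment(5)    # 10: same input, different result
pure_result = pure_add(10, 5)        # 15, no matter how often it is called
quadrupled = apply_twice(double, 3)  # 12
print(first_call, second_call, pure_result, quadrupled)
```

The impure function returns different results for the same input because it depends on hidden state, which is exactly what makes such code hard to distribute across machines.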

Features of Spark
============
Spark has six basic properties that make it robust:

1. Immutability
2. Lazy execution and lineage support
3. Cacheable
4. Type inferred
5. Compile-time safe
6. Functional in nature

It has three basic ways of treating data:

1. RDD - Basic building blocks
2. DataFrames - For optimized, query-like execution
3. Datasets - DataFrames + type support

We will only look at RDDs, to give context for the later topics in this tutorial. RDD stands for Resilient Distributed Dataset. As the name suggests, it is an immutable dataset that behaves like a single unit while residing on different systems. HDFS is commonly used as the distributed storage for the data behind RDDs. However, the processing happens in memory, and Spark falls back to disk only when memory is insufficient for the computation.
Lazy execution means that Spark only executes when there is a store or print command, or in other words whenever output is requested. A good analogy is how we prepare for exams: the syllabus keeps piling up, but we start studying only 24 hours before the exam.
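
Immutability, lineage and lazy execution can be illustrated with a toy class in plain Python. `LazyDataset` is a hypothetical stand-in written for this sketch, not part of Spark: transformations only record what should happen, and the work is done when `collect()` asks for output.

```python
class LazyDataset:
    """Toy stand-in for an RDD (hypothetical, not a Spark class)."""

    def __init__(self, source, lineage=()):
        self._source = source    # original data, never modified
        self._lineage = lineage  # recorded transformations, in order

    def map(self, func):
        # Transformation: returns a new dataset, nothing is computed yet.
        return LazyDataset(self._source, self._lineage + (("map", func),))

    def filter(self, predicate):
        # Transformation: again only recorded in the lineage.
        return LazyDataset(self._source, self._lineage + (("filter", predicate),))

    def collect(self):
        # Action: replay the lineage over the source and return the result.
        items = iter(self._source)
        for kind, func in self._lineage:
            items = map(func, items) if kind == "map" else filter(func, items)
        return list(items)


calls = []


def traced_square(n):
    calls.append(n)  # side effect used only to observe when work happens
    return n * n


numbers = LazyDataset([1, 2, 3, 4, 5])
pipeline = numbers.map(traced_square).filter(lambda n: n > 5)
calls_before_action = len(calls)  # still 0: nothing has run yet
result = pipeline.collect()       # output requested, so the work happens now
print(calls_before_action, result, len(calls))
```

Because each transformation returns a new object and keeps the full lineage, the original `numbers` dataset is untouched and any result can be recomputed from the source.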


Basics of pyspark
======
PySpark is a Python wrapper over Spark. The important thing is to understand how it works with the SparkContext. Let us look at a few examples to understand it in brief.
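
A minimal sketch of typical SparkContext usage, assuming PySpark is installed and a local Spark runtime is available. The helper names (`to_word_pairs`, `add`, `run_examples`) and the sample data are made up for illustration.

```python
def to_word_pairs(line):
    """Split a line into (word, 1) pairs; used with RDD.flatMap."""
    return [(word.lower(), 1) for word in line.split()]


def add(a, b):
    """Combine two values; used with RDD.reduce and RDD.reduceByKey."""
    return a + b


def run_examples():
    # Imported inside the function so the helpers above can be tried
    # without a Spark installation.
    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    # parallelize() distributes a local Python collection as an RDD.
    numbers = sc.parallelize(range(1, 11))
    # filter() and map() are transformations: they are only recorded here.
    evens_squared = numbers.filter(lambda n: n % 2 == 0).map(lambda n: n * n)
    # collect() and reduce() are actions: they trigger the computation.
    print(evens_squared.collect())
    print(evens_squared.reduce(add))

    # Word count: flatMap to (word, 1) pairs, then sum the counts per key.
    lines = sc.parallelize(["Deer Bear River", "Car Car River"])
    word_counts = lines.flatMap(to_word_pairs).reduceByKey(add)
    print(sorted(word_counts.collect()))

    # Stop the context so that a new one can be created later.
    sc.stop()
```

Calling `run_examples()` on a machine with PySpark prints the squared even numbers, their sum, and the word counts. Only one SparkContext can be active at a time, which is why the sketch stops it at the end.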





See the link below for more examples:
[https://github.com/mahmoudparsian/pyspark-tutorial]


Recommender Systems
======
Today, the e-commerce industry has a heavy demand for understanding user behaviour. It employs various methods for this, from bootstrapping in knowledge graphs to NLP. A recommender system is one specific application of the same.
![Recomender System ](recommender1.png "Recommender System")
Almost all recommender systems store a sparse user-item matrix. The example considered in this tutorial is predicting the rating a user would give to a movie they have not yet watched. Here the item is a movie.
You can also use user-user or item-item similarity, which uses cosine similarity to measure the correlation between two users or two items.
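
As a rough illustration of item-item similarity, the sketch below computes the cosine similarity between sparse rating vectors stored as dicts. The ratings are invented for the example, and treating a missing rating as 0 is an assumption of this sketch, not something the tutorial prescribes.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two sparse rating vectors (dicts)."""
    shared = set(a) & set(b)
    dot = sum(a[k] * b[k] for k in shared)
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


# Hypothetical ratings: item -> {user: rating}; absent users have not rated it.
item_ratings = {
    "movie_a": {"u1": 5.0, "u2": 4.0, "u3": 1.0},
    "movie_b": {"u1": 4.0, "u2": 5.0},
    "movie_c": {"u3": 5.0},
}

sim_ab = cosine_similarity(item_ratings["movie_a"], item_ratings["movie_b"])
sim_ac = cosine_similarity(item_ratings["movie_a"], item_ratings["movie_c"])
print(round(sim_ab, 3), round(sim_ac, 3))
```

Here `movie_a` comes out much closer to `movie_b` than to `movie_c`, because the same users rated the first two highly.
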
![User Item Matrix](movies.png "User Item Matrix")
In the above matrix, two elements are unknown. The way to find them is to factorize the matrix into U and V such that the product UV is as close as possible to the original matrix. The number of constraints in this optimization problem is the same as the number of known elements in the matrix. We can also combine these constraints as shown in the figure below.

We are going to use the Alternating Least Squares (ALS) method to predict the unknown elements of the matrix.
![Alternating Least Square - Matrix Factorization ](matrix_factorization.png "Matrix Factorization")
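
To make the alternating idea concrete, here is a toy rank-1 version in plain Python. It is a simplified sketch, not the MLlib implementation: each user and each item gets a single factor, and the function name, the small ratings matrix and the parameter values are made up for illustration. With one side fixed, every factor on the other side has a closed-form regularized least-squares solution computed from the known ratings only.

```python
def als_rank_one(ratings, n_users, n_items, reg=0.1, iterations=20):
    """Rank-1 ALS on a sparse dict {(user, item): rating}."""
    u = [1.0] * n_users
    v = [1.0] * n_items
    for _ in range(iterations):
        # Fix v and solve the least-squares problem for each user factor.
        for i in range(n_users):
            num = sum(r * v[item] for (user, item), r in ratings.items() if user == i)
            den = reg + sum(v[item] ** 2 for (user, item) in ratings if user == i)
            u[i] = num / den
        # Fix u and solve the least-squares problem for each item factor.
        for j in range(n_items):
            num = sum(r * u[user] for (user, item), r in ratings.items() if item == j)
            den = reg + sum(u[user] ** 2 for (user, item) in ratings if item == j)
            v[j] = num / den
    return u, v


# Hypothetical 3x3 user-item matrix; entries (0, 2) and (2, 1) are unknown.
# The known entries follow the pattern [1, 2, 2] x [1, 2, 2].
known = {
    (0, 0): 1.0, (0, 1): 2.0,
    (1, 0): 2.0, (1, 1): 4.0, (1, 2): 4.0,
    (2, 0): 2.0, (2, 2): 4.0,
}
u, v = als_rank_one(known, n_users=3, n_items=3, reg=0.01, iterations=20)
pred_02 = u[0] * v[2]  # prediction for the unknown entry (0, 2)
pred_21 = u[2] * v[1]  # prediction for the unknown entry (2, 1)
print(round(pred_02, 2), round(pred_21, 2))
```

The predictions land close to 2 and 4, the values that complete the pattern. In the full algorithm each user and item has a vector of `rank` factors instead of one number, which is the parameter tuned later in this tutorial.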




Recommender in Pyspark
======


We will be using the ALS algorithm, so we import it first. The next step is to read the data from its source into an RDD. This is done via the SparkContext.

In [1]:
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()

from pyspark.mllib.recommendation import ALS
#All Data
ratings_raw_data = sc.textFile('ml-latest-small/ratings.csv')
movies_raw_data = sc.textFile('ml-latest-small/movies.csv')
#Headers (first line of each file)
ratings_raw_data_header = ratings_raw_data.take(1)[0]
movies_raw_data_header = movies_raw_data.take(1)[0]


Let us look at the contents of ratings.csv and movies.csv before we start getting recommendations for a new user.

In [2]:
with open('ml-latest-small/ratings.csv', 'r') as f:
    for idx, line in enumerate(f):
        print(line)
        if idx > 5:
            break

userId,movieId,rating,timestamp

1,16,4.0,1217897793

1,24,1.5,1217895807

1,32,4.0,1217896246

1,47,4.0,1217896556

1,50,4.0,1217896523

1,110,4.0,1217896150



In [4]:
with open('ml-latest-small/movies.csv', 'r') as f:
    for idx, line in enumerate(f):
        print(line)
        if idx > 5:
            break

movieId,title,genres

1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy

2,Jumanji (1995),Adventure|Children|Fantasy

3,Grumpier Old Men (1995),Comedy|Romance

4,Waiting to Exhale (1995),Comedy|Drama|Romance

5,Father of the Bride Part II (1995),Comedy

6,Heat (1995),Action|Crime|Thriller



In [3]:
#Drop the header, split each line and keep (userId, movieId, rating)
ratings_data = ratings_raw_data.filter(lambda line: line!=ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()
#Print Ratings RDD Data
print(ratings_raw_data_header.split(',')[0:-1])
for elem in ratings_data.take(5):
    print(elem)

[u'userId', u'movieId', u'rating']
(u'1', u'16', u'4.0')
(u'1', u'24', u'1.5')
(u'1', u'32', u'4.0')
(u'1', u'47', u'4.0')
(u'1', u'50', u'4.0')


In [5]:
#Drop the header, split each line and keep (movieId, title)
#Note: a plain split(",") truncates quoted titles that contain commas
movies_data = movies_raw_data.filter(lambda line: line!=movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
#Print Movies RDD Data
print(movies_raw_data_header.split(',')[0:-1])
for elem in movies_data.take(5):
    print(elem)

[u'movieId', u'title']
(u'1', u'Toy Story (1995)')
(u'2', u'Jumanji (1995)')
(u'3', u'Grumpier Old Men (1995)')
(u'4', u'Waiting to Exhale (1995)')
(u'5', u'Father of the Bride Part II (1995)')


`Splitting the data into training and validation sets is done via the randomSplit function in PySpark.`
> Note that the operations on an RDD are functional in nature. In other words, the lambda functions evaluate expressions on the object state rather than introducing any unknown side effects. Read this blog post about functional programming: http://blog.jenkster.com/2015/12/what-is-functional-programming.html
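
A weighted random split can be pictured with the following plain-Python sketch. It is a conceptual illustration with a made-up `random_split` helper, not Spark's implementation: the weights are normalized, and each element draws one random number that decides which split it falls into. This is why the resulting sizes are only approximately 80% and 20%.

```python
import random


def random_split(items, weights, seed=0):
    """Assign every item to exactly one split, with probability
    proportional to the corresponding weight."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative upper bounds of the normalized weights, e.g. [8, 2] -> [0.8, 1.0]
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for item in items:
        x = rng.random()
        for idx, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding gap.
            if x < upper or idx == len(bounds) - 1:
                splits[idx].append(item)
                break
    return splits


train, validation = random_split(range(1000), [8, 2], seed=0)
print(len(train), len(validation))
```

Fixing the seed makes the split reproducible, which matters when comparing models trained with different parameters on the same data.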

In [7]:
#Split the ratings roughly 80/20; randomSplit normalizes the weights
training_RDD, validation_RDD = ratings_data.randomSplit([8, 2], seed=0)
print('Length of Train Set is :  %s' % training_RDD.count())
print('Length of Validation Set is :  %s' % validation_RDD.count())

Length of Train Set is :  84132
Length of Validation Set is :  21207


In [8]:
#Keep only (userId, movieId); these are the pairs to predict ratings for
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
#Printing a subset of modified data
print(ratings_raw_data_header.split(',')[0:2])
for elem in validation_for_predict_RDD.take(5):
    print(elem)

[u'userId', u'movieId']
(u'1', u'16')
(u'1', u'150')
(u'1', u'161')
(u'1', u'165')
(u'1', u'204')


In [9]:
import math

seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = []

min_error = float('inf')
best_rank = -1
for rank in ranks:
    #Train an ALS model with the given number of latent factors
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    #Predict ratings for the held-out (userId, movieId) pairs
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    #Join actual and predicted ratings on the (userId, movieId) key
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    #Root mean squared error over the validation set
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors.append(error)
    print('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print('The best model was trained with rank %s' % best_rank)

For rank 4 the RMSE is 0.893634858349
For rank 8 the RMSE is 0.896745287132
For rank 12 the RMSE is 0.901753642561
The best model was trained with rank 4
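
The error computed in the loop is the root mean squared error (RMSE) over the (userId, movieId) pairs that appear in both the actual and the predicted ratings. A plain-Python sketch of the same join-then-RMSE logic, with dicts standing in for the keyed RDDs and invented ratings:

```python
import math


def rmse(actual, predicted):
    """RMSE over the (user, movie) keys present in both dicts (inner join)."""
    shared_keys = set(actual) & set(predicted)
    if not shared_keys:
        raise ValueError("no overlapping (user, movie) pairs")
    squared_errors = [(actual[k] - predicted[k]) ** 2 for k in shared_keys]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Hypothetical ratings keyed by (userId, movieId)
actual = {(1, 16): 4.0, (1, 24): 1.5, (2, 16): 3.0}
predicted = {(1, 16): 3.5, (1, 24): 2.5, (3, 99): 4.0}

error = rmse(actual, predicted)
print(error)
```

Only the two pairs present on both sides contribute to the error, just as the RDD `join` drops keys that have no match on the other side.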


In [10]:
print('((userId, movieId), rating)')
#predictions still holds the output of the last model trained above (rank 12)
for elem in predictions.take(5):
    print(elem)

((userId, movieId), rating)
((568, 1084), 3.9166594902518033)
((188, 1084), 4.216447131934498)
((33, 1084), 4.302628872278145)
((525, 1084), 3.8478979869621077)
((109, 1084), 4.464118455087252)


In [11]:
#Key both RDDs by (userId, movieId) and join actual with predicted ratings
actual_rating = validation_RDD.map(lambda x: ((int(x[0]), int(x[1])), float(x[2])))
predicted_rating = predictions
joined_rating = actual_rating.join(predicted_rating)
for elem in joined_rating.take(5):
    print(elem)

((62, 2412), (1.5, 1.770752830443994))
((554, 1380), (3.0, 3.672310492776139))
((145, 903), (4.0, 4.269041620122563))
((403, 51709), (3.0, 3.507399339351336))
((262, 1270), (3.0, 3.8147462594987043))


References
--------
`These instructions are an adapted version of the tutorial linked below. I want to thank codementor for the amazing tutorial on recommender systems via PySpark.`
https://www.codementor.io/spark/tutorial/building-a-recommender-with-apache-spark-python-example-app-part1