# MapReduce Exercise #3

## Netflix Recommendation Engine

### Background:

Netflix is an American company that started in the **DVD by mail** business around 1998. In 2007 it
expanded its business with the introduction of streaming media (such as online TV series and movies). In 2013 Netflix
debuted its first series, **House of Cards**.

Today Netflix is available in over 190 countries, has 86 million subscribers,
and produces around 126 original series or films a year.

### Recommendation Engine

The core of Netflix's business is its recommendation algorithm. It recommends
movies and series that users might like (thus keeping them attracted to the service) and gives the company insight
into what kind of content its users like (directors, actors, genres, etc.), so the company can produce content that
fits its users' tastes.


### Netflix Prize
The Netflix Prize was an open competition for the best algorithm to predict user ratings for films, based
on previous ratings (without any other information about the users or films). The grand prize of $1,000,000 was
awarded in 2009 to the **BellKor's Pragmatic Chaos** team, which bested Netflix's own algorithm for predicting ratings by **10.06%**.


### Types of Recommendation Engines
1. Recommend the most popular items
    * Always recommend the items that are liked by the largest number of users.
    * Fast and easy to implement.
    * No personalization (all the users will get the same recommendation).
    * Example: news portals.

2. Content based algorithm
    * Idea: If you like an item then you will also like "similar" items.
    * Works well when it's easy to determine the properties of each item (so we can define which items are "similar").
    * Example: movie or song recommendations.
    
3. Collaborative filtering algorithm
    * Idea: If person A likes items 1, 2, 3 and person B likes items 2, 3, 4, then they have similar interests, so A should
      like item 4 and B should like item 1.
    * The algorithm is entirely based on past behavior and is not dependent on any additional information.
    * Example: product recommendations in e-commerce (like Amazon).


---

## The exercise
* We will build our own Recommendation System using MapReduce.
* The type of engine we want to implement is a version of the **"Content based algorithm"**.

### Similarity
We define similarity between two movies like this:

`sim(movie1, movie2) = Number of users who liked both movie1 and movie2`

#### Example:
If:

* `User A likes movies: 1,2,3,4`
* `User B likes movies: 1,2,5`
* `User C likes movies: 1,2,4`

Then:
```
| sim | 1 | 2 | 3 | 4 | 5 |
|-----|---|---|---|---|---|
| 1   | - | 3 | 1 | 2 | 1 |
| 2   | 3 | - | 1 | 2 | 1 |
| 3   | 1 | 1 | - | 1 | 0 |
| 4   | 2 | 2 | 1 | - | 0 |
| 5   | 1 | 1 | 0 | 0 | - |
```


* What movie is the most similar to movie 1?
    * Just go to the row of movie 1 and find the movie with the highest **similarity value**. In this case it's movie 2, with a similarity of 3.
    * Therefore, if a new user "likes" movie 1, our recommendation system will recommend that they also watch movie 2.
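
As a minimal sketch of the definition above, the similarity for users A, B and C can be computed in plain Python (no MapReduce yet). The names `likes`, `fans` and `sim` are illustrative and not part of the exercise files.

```python
# Liked movies per user, taken from the example above.
likes = {
    "A": {1, 2, 3, 4},
    "B": {1, 2, 5},
    "C": {1, 2, 4},
}

# Invert the mapping: movie -> set of users who liked it.
fans = {}
for user, movies in likes.items():
    for movie in movies:
        fans.setdefault(movie, set()).add(user)


def sim(movie1, movie2):
    """Number of users who liked both movie1 and movie2."""
    return len(fans.get(movie1, set()) & fans.get(movie2, set()))


# Most similar movie to movie 1 (ties go to the lower movie ID).
others = sorted(m for m in fans if m != 1)
best = max(others, key=lambda m: sim(1, m))
print(best, sim(1, best))  # prints: 2 3
```

This direct approach keeps every user set in memory, which is fine for three users but is exactly what stops working at Netflix scale.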

### The Challenge
What's the difficulty in this approach?

None really! Just a little bit of planning from our side. Let's define the steps to solve this problem:

#### Step 1 - List of pairs
For every user, we want to build a list of all pairs of movies that user liked (each pair written with the smaller movie ID first),
for example:
```
User A: (1,2) (1,3) (1,4) (2,3) (2,4) (3,4)
User B: (1,2) (1,5) (2,5)
User C: (1,2) (1,4) (2,4)
```
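
One possible way to generate these pairs in Python is `itertools.combinations` over the sorted list of liked movies. The helper name `movie_pairs` is an assumption made for this sketch.

```python
from itertools import combinations


def movie_pairs(liked_movies):
    """Return all unordered pairs of liked movies, smaller ID first."""
    return list(combinations(sorted(liked_movies), 2))


print(movie_pairs([1, 2, 5]))  # prints: [(1, 2), (1, 5), (2, 5)]
```

Sorting first guarantees that the same two movies always produce the same pair, whatever order the user liked them in.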

##### _Bonus Question_
How many pairs will there be for each user?

Answer: `(n choose 2) = n!/((n-2)!2!)`, where `n = # movies liked`
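
A quick check of this formula, assuming Python 3.8+ for `math.comb`; the helper name `pair_count` is illustrative only.

```python
from itertools import combinations
from math import comb  # available in Python 3.8+


def pair_count(n):
    """Number of unordered pairs among n liked movies: n! / ((n-2)! 2!) = n(n-1)/2."""
    return comb(n, 2)


# User A likes 4 movies; the count matches the 6 pairs listed in Step 1.
print(pair_count(4), len(list(combinations([1, 2, 3, 4], 2))))  # prints: 6 6
```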

#### Step 2 - Similarity Value 
Count how many times each pair appears across all users (this will give us the "similarity value" defined above), for example:
```
(1,2) = 3
(1,3) = 1
(1,4) = 2
You get the point...
```

With this information we can build our similarity table and the recommendation engine is done!
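
Steps 1 and 2 together can be sketched on a single machine with `collections.Counter`. This is only an in-memory illustration of the counting idea, using the three example users; the variable names are assumptions.

```python
from collections import Counter
from itertools import combinations

# Liked movies per user, from the example above.
likes = {"A": [1, 2, 3, 4], "B": [1, 2, 5], "C": [1, 2, 4]}

pair_counts = Counter()
for movies in likes.values():
    # Step 1: all pairs for this user; Step 2: add one to each pair's count.
    pair_counts.update(combinations(sorted(movies), 2))

print(pair_counts[(1, 2)], pair_counts[(1, 3)], pair_counts[(1, 4)])  # prints: 3 1 2
```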

#### A Big Data problem
We can implement the algorithm without any problem. But what will happen if we try to apply it to all of Netflix's data? Let's do some math.

* Netflix has `86 million users`.
* Let's imagine that on average every user liked `50 movies`.
* Then for every user we will have `(50 choose 2) = 1225` pairs of movies.
* In total, there would be: `86,000,000 * 1225 = 105,350,000,000` pairs!
* That would be approximately `1.4 Terabytes (!!)` of produced output.
* And for every pair we need to count how many times it appears! (In other words, we would need to search all 1.4 TB for every pair to find its duplicates!!)
* This will take a loooooot of time and complex logic, because a single machine won't be able to keep all that data in memory!
* This is a classic problem that can be solved using MapReduce, and that will be our task.

Calculation for geeks - How did we arrive at 1.4 TB?
* Java int = `4 bytes`.
* Java char = `2 bytes`.
* Assume we save each pair as above; then we have 3 chars for `(`, `)` and `,` (`3 * 2 = 6 bytes`) plus two integers for the movie IDs (`2 * 4 = 8 bytes`).
* So each pair will be stored in `14 bytes`.
* `1 GB = 1,073,741,824 bytes`
* `105,350,000,000 pairs * 14 bytes = 1,474,900,000,000 bytes ≈ 1,373 GB ≈ 1.4 TB`
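
The same arithmetic can be reproduced in a few lines of Python. The figures (86 million users, 50 liked movies on average, 14 bytes per pair) are the assumptions stated above, not measured values.

```python
from math import comb

users = 86_000_000
liked_per_user = 50               # assumed average from the text
bytes_per_pair = 3 * 2 + 2 * 4    # three 2-byte chars + two 4-byte ints = 14

pairs_per_user = comb(liked_per_user, 2)
total_pairs = users * pairs_per_user
total_bytes = total_pairs * bytes_per_pair
total_gb = total_bytes / 1_073_741_824  # using 1 GB = 2**30 bytes, as above

print(pairs_per_user)   # prints: 1225
print(total_pairs)      # prints: 105350000000
print(total_bytes)      # prints: 1474900000000
print(int(total_gb))    # prints: 1373
```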

 
---
 
## The implementation
### Goal:
* For each movie, find the top 10 movies with highest similarity (and their 'similarity value').

 
### Input:
* File: `ratings_small.csv`.
* Schema: `userId, movieId, rating, timestamp`.

### Output:
* File: `recommendations.csv`.
* Schema: `movieId, first_similar_movie_id (x 'Similarity Value'), ... tenth_similar_movie_id (x 'Similarity Value')`
* Example: `5380, 780 (x23), 588 (x18), ..., 8665 (x15)`.
    * Meaning: If a user likes movie `5380`, we will recommend movie `780` as the first option, etc.

### Jobs:
Take this as a helpful hint; you are free to come up with another algorithm to solve the problem.
Each job can be a map step, a reduce step, or a mix of both.

#### Job #1
Convert the input file into a file that contains the userID followed by a list of all the movies the user liked.
Schema: `userID, movie1Liked, movie2Liked, ..., movieNLiked`

#### Job #2
Compute all movie pairs (with the mapper) and the similarity between two movies (reducer)

Hint: The reducer's output should be: `Key: movie1ID, Value: <similarityValue, movie2ID>` 

#### Job #3
Receive a movie together with all the movies whose similarity to it is greater than 0, and write the movieID and the 10 movies with the highest similarity (reducer).

Merge the output into a new file `recommendations_merged.csv`.

#### Cleaning the output
Once you confirm that the output in `recommendations_merged.csv` is as expected, remove the string `(x 'Similarity Value')` from each column
so you end up with a dataset of only numbers.

Save this output to a new file: `recommendations_clean.csv`

* Schema: `movieId, first_similar_movie_id, ... tenth_similar_movie_id`
* Example: `5380, 780, 588, ..., 8665`.
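
One possible way to strip the similarity suffix is a regular expression. This is a sketch that assumes the suffix always has the form ` (xN)` with `N` an integer, as in the example row; `clean_row` is a hypothetical helper name.

```python
import re


def clean_row(row):
    """Drop every ' (xN)' similarity suffix from one output row."""
    return re.sub(r"\s*\(x\d+\)", "", row)


print(clean_row("5380, 780 (x23), 588 (x18), 8665 (x15)"))
# prints: 5380, 780, 588, 8665
```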

## Netflix MRJob

```python
from mrjob.job import MRJob
from mrjob.step import MRStep


class NetflixRecommendations(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_1_user_and_movie,
                   reducer=self.reducer_1_user_and_all_movies),
            MRStep(mapper=self.mapper_2_compute_pairs,
                   reducer=self.reducer_2_sim_between_two_movies),
            MRStep(reducer=self.reducer_3_top_10_similar_movies),
        ]

    def mapper_1_user_and_movie(self, _, line):
        """
        Output: Key: userID, Value: movieID
        Every rated movie is treated as "liked"; the CSV header row is skipped.
        """
        userID, movieID, rating, timestamp = line.split(',')
        if rating != 'rating':
            yield userID, movieID

    def reducer_1_user_and_all_movies(self, user, movies):
        """
        Output: Key: userID, Value: list of all the movies liked by the user
        """
        yield user, list(movies)

    def mapper_2_compute_pairs(self, user, liked_movies):
        """
        For each pair of movies liked by the same user,
        generate the following output: Key: <movie1, movie2>, Value: 1
        So we can count in the reducer the number of times each pair appears.
        """
        for i in range(len(liked_movies)):
            for j in range(i + 1, len(liked_movies)):
                movie1 = liked_movies[i]
                movie2 = liked_movies[j]
                # Order each pair consistently so (a, b) and (b, a) share one key.
                if movie1 <= movie2:
                    yield (movie1, movie2), 1
                else:
                    yield (movie2, movie1), 1

    def reducer_2_sim_between_two_movies(self, pair, counts):
        """
        Count the number of times each pair appears
        """
        similarity_value = sum(counts)
        movie1, movie2 = pair
        if movie1 != '' and movie2 != '':
            # Emit both directions so every movie gets its own list of similar movies.
            yield movie1, (movie2, similarity_value)
            yield movie2, (movie1, similarity_value)

    def reducer_3_top_10_similar_movies(self, movie1, movie2_simval_values):
        """
        Sort and save movies to output
        """
        # This can be improved by using a heap of size 10 so we don't load the whole list into memory.
        movie2_simval = list(movie2_simval_values)
        # Sort by similarity value, highest first. There are many other ways of sorting, this is one of them.
        movie2_simval_sorted = sorted(movie2_simval, key=lambda x: x[1], reverse=True)
        top_10 = movie2_simval_sorted[:10]

        # Format each entry as "movieID (xSimilarity)", as described in the Output schema.
        yield movie1, ",".join("{} (x{})".format(movie2, simval) for movie2, simval in top_10)
```
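
The heap idea mentioned in the comment of `reducer_3_top_10_similar_movies` could look like the sketch below, which uses the standard library's `heapq.nlargest`. It is shown as a standalone function rather than as part of the job; `top_n_similar` is a hypothetical name.

```python
import heapq


def top_n_similar(movie_sim_pairs, n=10):
    """Return the n (movie, similarity) pairs with the highest similarity.

    Accepts any iterable (including a generator), so the full list of
    pairs never has to be materialised or fully sorted.
    """
    return heapq.nlargest(n, movie_sim_pairs, key=lambda pair: pair[1])


pairs = iter([("1", 2), ("780", 23), ("8665", 15), ("588", 18)])
print(top_n_similar(pairs, n=2))  # prints: [('780', 23), ('588', 18)]
```

Inside the reducer, the same call could be applied directly to the values iterator instead of calling `list(...)` and `sorted(...)`.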


```python
input_path = "/dbfs/FileStore/tables/ratings_small.csv"
output_path = "/dbfs/FileStore/tables/netflix_recommendations"
no_dbfs = output_path[5:]  # the same path without the leading '/dbfs'
NetflixRecommendations(args=[input_path, "-o", output_path + '/output']).execute()
dbutils.fs.head(no_dbfs + '/output/part-00000')
```

## Cleaning the output

#### Merge output files

```python
import os

parts = os.listdir(output_path + '/output')
output_file = os.path.join(output_path, 'recommendations_merged.csv')

# What's the problem with merging files this way? How can we solve it?
with open(output_file, 'w') as outfile:
    for filename in parts:
        with open(os.path.join(output_path, 'output', filename), 'r') as infile:
            outfile.write(infile.read())
```
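
One possible answer to the question in the merge cell: `os.listdir` returns files in arbitrary order, the directory may also contain files that are not output parts (for example a `_SUCCESS` marker, depending on the runner), and `infile.read()` loads each part fully into memory. A minimal sketch that addresses these points follows; `merge_parts` is a hypothetical helper, and the `part-` prefix is assumed from the `part-00000` file used earlier.

```python
import os


def merge_parts(parts_dir, merged_path):
    """Concatenate part-* files in name order, skipping any other files."""
    parts = sorted(f for f in os.listdir(parts_dir) if f.startswith("part-"))
    with open(merged_path, "w") as outfile:
        for filename in parts:
            with open(os.path.join(parts_dir, filename)) as infile:
                for line in infile:  # stream line by line instead of read()
                    outfile.write(line)
    return parts
```

It could be called as `merge_parts(os.path.join(output_path, 'output'), output_file)` in place of the loop above.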

```python
# Check the merged output looks right
with open(output_file) as f:
    for line in f:
        print(line, end='')  # lines already end with a newline
```

#### Clean merged output file

```python
clean_out_file = os.path.join(output_path, 'recommendations_clean.csv')
with open(output_file, "r") as f_read:
    with open(clean_out_file, 'w') as outfile:
        for line in f_read:
            # Each line is '"movie"<TAB>"rec1,rec2,..."'; drop the trailing newline
            # first so it does not end up inside the last recommendation.
            movie, recommendations = line.rstrip('\n').split('\t')

            movie = movie.replace('"', '')
            # Keep only the movie ID of each entry, dropping any " (xN)" suffix.
            recommendations = [r.split(' ')[0] for r in recommendations.replace('"', '').split(',')]

            row = "{},{}".format(movie, ",".join(recommendations))
            outfile.write(row)
            outfile.write('\n')
```

```python
# Check the cleaned output looks right
with open(clean_out_file) as f:
    for line in f:
        print(line, end='')  # lines already end with a newline
```

```sql
%sql
-- Set the path to be the same as 'output_path' without '/dbfs'
DROP TABLE IF EXISTS nextflix;
CREATE TABLE IF NOT EXISTS nextflix(
movie STRING
,recommended_1 STRING
,recommended_2 STRING
,recommended_3 STRING
,recommended_4 STRING
,recommended_5 STRING
,recommended_6 STRING
,recommended_7 STRING
,recommended_8 STRING
,recommended_9 STRING
,recommended_10 STRING) 
USING CSV OPTIONS (path "/FileStore/tables/netflix_recommendations/recommendations_clean.csv", header "false", delimiter=',');
SELECT * FROM nextflix;
```