#### Imports

In [1]:
import os
from pyspark.mllib.recommendation import ALS
import math

### Loading the Netflix dataset

In [3]:
netflix_dataset_path = "./../../Datasets/Netflix-prize-data/"

In [4]:
combined_data_1_location = os.path.join(netflix_dataset_path, "combined_data_1.txt")
combined_data_2_location = os.path.join(netflix_dataset_path, "combined_data_2.txt")
combined_data_3_location = os.path.join(netflix_dataset_path, "combined_data_3.txt")
combined_data_4_location = os.path.join(netflix_dataset_path, "combined_data_4.txt")

help_location = f'{combined_data_1_location},{combined_data_2_location},{combined_data_3_location},{combined_data_4_location}'

Combine all data to the same sark context:

In [7]:
combined_data = sc.textFile(help_location).cache()

In [8]:
print(f'There are {combined_data.count()} elements in the combined RDD')

There are 100498277 elements in the combined RDD


The available data represent a movie id, followed by many reviews from different users. We wish to structure the data and group them by each individual user, since our target is to predict the rating that each user would assign to a movie. The following example shows that for the movied with id 1, the user with id 1488844 has rated it with 3 stars out of 5, with the rating date to be the 6th of Spetmeber, 2005. The next lines follow the same pattern, until all reviews are exhausted and then a single line with the number '2:' will show up, followed by the corresponding reviews for the second movie etc.

In order to efficiently perform the Alternating Least Squares algorith, we will need to re-map the form of these lines into the pattern 'user_id, movie_id, rating'. We do not care about the date of the ratings, since we do not intend to use them in our model. 

#### Spark transformations

Remove the dates of ratings from the data:

In [9]:
def remove_dates(single_line):
    """
    Removes the date of reviews. 
    
    Inputs:
        single line -> A single line of the original RDD, represented as a list fo strings.
        
    Outputs:
        new_line -> A list of strings without the date. 
        
    We need to take special care of the lines that represent a movie id, which only have a single element 
    in their list representation.
    """
    
    if len(single_line)==1:
        return single_line
    else:
        return single_line[:-1]

In [10]:
no_dates_rdd = combined_data.map(lambda line: line.split(",")).map(lambda lst: remove_dates(lst)).cache()

In [11]:
no_dates_rdd.take(10)

[['1:'],
 ['1488844', '3'],
 ['822109', '5'],
 ['885013', '4'],
 ['30878', '4'],
 ['823519', '3'],
 ['893988', '3'],
 ['124105', '4'],
 ['1248029', '3'],
 ['1842128', '4']]

#### Assign a movie id to the rating

In [12]:
def append_acc(file_path, acc):
    
    movie_id = -1
    with open(file_path, 'r') as cd:
        for line in cd:
            if ':' in line:
                movie_id = line.split(':')[0]
            else:
                acc.append(movie_id)
    
    return acc

In [13]:
acc = append_acc(combined_data_1_location, [])
acc = append_acc(combined_data_2_location, acc)
acc = append_acc(combined_data_3_location, acc)
acc = append_acc(combined_data_4_location, acc)

In [14]:
len(acc)

100480507

From inspecting the 'combined_data_1.txt' file we can see that the movie id 2 is found in line 549. Taking into account that lines are indexed from number 1, and the first line contains the movie id 1, we expect to meet 547 reviews for the movie with id 1. furthermore, we expect to see that the 547th review is about the movie with id 2.

In [15]:
acc[546]

'1'

In [16]:
acc[547]

'2'

In order to be completely sure that no mistake was made, we check if there is an unexpected value to the first 546 elements

In [17]:
def find_unexpected(lst, correct_elem):
    """
    Finds if an unexpected element is found among elem in a list.
    
    Inputs:
        lst -> The list under consideration.
        elem -> The expected element to the list.
        
    Returns:
        None
        
    Outputs:
        A message of correctness or not.
    """
    
    for index, elem in enumerate(lst):
        if elem!=correct_elem:
            print(f'Found unexpected element: {elem} in position {index}')
            return
                  
    print("No unexpected element was found!")

In [18]:
find_unexpected(acc[:547], '1')

No unexpected element was found!


In [19]:
find_unexpected(acc[:548], '1')

Found unexpected element: 2 in position 547


Remove the movie id elements fromm the RDD

In [20]:
no_movie_ids_rdd = no_dates_rdd.filter(lambda elem: ':' not in elem[0]).cache()

We need to ensure that the number of elements in this RDD is the same with the number of elements in the accumulator:

In [21]:
no_movie_ids_rdd.count()

100480507

In [22]:
no_movie_ids_rdd.take(5)

[['1488844', '3'],
 ['822109', '5'],
 ['885013', '4'],
 ['30878', '4'],
 ['823519', '3']]

Since it is the same, we can proceed to the next steps!

Zip the RDDs with their index:

In [23]:
indexed_rdd = no_movie_ids_rdd.zipWithIndex().cache()

In [24]:
indexed_rdd.count()

100480507

In [25]:
indexed_rdd.take(5)

[(['1488844', '3'], 0),
 (['822109', '5'], 1),
 (['885013', '4'], 2),
 (['30878', '4'], 3),
 (['823519', '3'], 4)]

Now we can use the accumulator along with the indexes to assign a movie ID to each review, in the form of 
(user_id, movie_id, rating):

In [26]:
joint_rdd = indexed_rdd.map(lambda tpl: (int(tpl[0][0]), int(acc[tpl[1]]), int(tpl[0][1]))).cache()

In [27]:
joint_rdd.count()

100480507

In [28]:
joint_rdd.take(5)

[(1488844, 1, 3),
 (822109, 1, 5),
 (885013, 1, 4),
 (30878, 1, 4),
 (823519, 1, 3)]

Next step is to reorganize the reviews based on the user id. First we need to create a key-value pair schema:

In [29]:
key_value_schema = joint_rdd.map(lambda elem: (elem[0], (elem[1], elem[2]))).cache()

In [30]:
key_value_schema.count()

100480507

In [31]:
key_value_schema.take(5)

[(1488844, (1, 3)),
 (822109, (1, 5)),
 (885013, (1, 4)),
 (30878, (1, 4)),
 (823519, (1, 3))]

No we just need to sort by value and revert to the required form:

In [32]:
sorted_rdd = key_value_schema.sortByKey().cache()

In [33]:
sorted_rdd.count()

100480507

In [34]:
sorted_rdd.take(5)

[(6, (30, 3)), (6, (157, 3)), (6, (173, 4)), (6, (175, 5)), (6, (191, 2))]

Restore the sorted rdd in the requested format:

In [35]:
final_rdd = sorted_rdd.map(lambda elem: (elem[0], elem[1][0], elem[1][1])).cache()

In [36]:
final_rdd.count()

100480507

In [37]:
final_rdd.take(5)

[(6, 30, 3), (6, 157, 3), (6, 173, 4), (6, 175, 5), (6, 191, 2)]

## Alternating Least Squares in the Netflix Dataset

We split the data in a training, validation and test set. We have a huge number of data, so the split will be performed in 80%-10%-10% ratio respectively.

In [38]:
training_set, validation_set, test_set = final_rdd.randomSplit([8,1,1], seed= 400)

In [39]:
training_set.take(5)

[(6, 30, 3), (6, 175, 5), (6, 191, 2), (6, 197, 3), (6, 241, 3)]

In [40]:
validation_set.take(5)

[(6, 564, 4), (6, 658, 3), (6, 825, 3), (6, 1145, 3), (6, 1180, 3)]

In [41]:
test_set.take(5)

[(6, 157, 3), (6, 173, 4), (6, 445, 3), (6, 501, 3), (6, 705, 3)]

We also need to have a second instance of the validation and test sets without their ratings, so that we can predict the reviews and compare with the real ones:

In [42]:
validation_set_to_predict = validation_set.map(lambda elem: (elem[0], elem[1]))
test_set_to_predict = test_set.map(lambda elem: (elem[0], elem[1]))

#### Setting a handful of training parameters:

In [43]:
iterations = 10
regularization_parameters = [0.01, 0.05, 0.1]
ranks = [2, 4, 6, 8, 10, 12]

In [44]:
min_error = 1e3
best_rank = -1
best_regularization = -1

In [None]:
for rank in ranks:
    
    for regularization_parameter in regularization_parameters:
    
        model = ALS.train(training_set, rank, seed=5, iterations=iterations,
                          lambda_=regularization_parameter)
        predictions = model.predictAll(validation_set_to_predict).map(lambda elem: ((elem[0], elem[1]), elem[2]))
        true_and_predicted_rates = validation_set.map(lambda elem: ((int(elem[0]), int(elem[1])), float(elem[2]))).join(predictions)
        error = math.sqrt(true_and_predicted_rates.map(lambda elem: (elem[1][0] - elem[1][1])**2).mean())
        print('--------------------------------------')
        print (f'For rank {rank} and amount of regularization {regularization_parameter} the RMSE is {error}')
        if error < min_error:
            min_error = error
            best_rank = rank
            best_regularization = regularization_parameter

--------------------------------------
For rank 2 and amount of regularization 0.01 the RMSE is 0.88697628814868
--------------------------------------
For rank 2 and amount of regularization 0.05 the RMSE is 0.8871776166390885
--------------------------------------
For rank 2 and amount of regularization 0.1 the RMSE is 0.8914959584054967
--------------------------------------
For rank 4 and amount of regularization 0.01 the RMSE is 0.8681876907047872
--------------------------------------
For rank 4 and amount of regularization 0.05 the RMSE is 0.8672640338039733
--------------------------------------
For rank 4 and amount of regularization 0.1 the RMSE is 0.8753160642908002
--------------------------------------
For rank 6 and amount of regularization 0.01 the RMSE is 0.8596104071174606
--------------------------------------
For rank 6 and amount of regularization 0.05 the RMSE is 0.8561725042723435
--------------------------------------
For rank 6 and amount of regularization 0.1 t

#### Test the model's RMSE performance in the test set.

In [None]:
model = ALS.train(training_small_ratings, best_rank, seed=5, iterations=iterations,
                  lambda_=best_regularization)
test_set_predictions = model.predictAll(test_set_to_predict).map(lambda r: ((elem[0], elem[1]), elem[2]))
test_set_true_and_predicted_rates = test_set.map(lambda elem: ((int(elem[0]), int(elem[1])), float(elem[2]))).join(test_set_predictions)
test_set_error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

In [None]:
print(f'Test set RMSE performance: {test_set_error}')

## Generate predictions for new users

In [138]:
!ls ../../Datasets/Netflix-prize-data/

[31mREADME.md[m[m           [31mcombined_data_2.txt[m[m [31mcombined_data_4.txt[m[m [31mprobe.txt[m[m
[31mcombined_data_1.txt[m[m [31mcombined_data_3.txt[m[m [31mmovie_titles.csv[m[m    [31mqualifying.txt[m[m


In [148]:
movies_path = os.path.join(netflix_dataset_path, 'movie_titles.csv')
movies = sc.textFile(movies_path)

In [149]:
movies.take(5)

['1,2003,Dinosaur Planet',
 '2,2004,Isle of Man TT 2004 Review',
 '3,1997,Character',
 "4,1994,Paula Abdul's Get Up & Dance",
 '5,2004,The Rise and Fall of ECW']

Re-arrange to format movie_id, title, year:

In [150]:
movies = movies.map(lambda temp: temp.split(",")).map(lambda movie: [movie[0], movie[2], movie[1]])

In [151]:
movies.take(5)

[['1', 'Dinosaur Planet', '2003'],
 ['2', 'Isle of Man TT 2004 Review', '2004'],
 ['3', 'Character', '1997'],
 ['4', "Paula Abdul's Get Up & Dance", '1994'],
 ['5', 'The Rise and Fall of ECW', '2004']]

Now we need to get the average ratings of the movies, based on the user reviews:

In [153]:
def avg(lst):
    
    return sum(lst)/len(lst)

In [171]:
movie_and_ratings_temp = \
    final_rdd.map(lambda elem: (elem[1], elem[2])).groupByKey()

In [172]:
movie_and_ratings_temp.count()

17770

In [173]:
movie_and_ratings_temp.take(1)

[(4914, <pyspark.resultiterable.ResultIterable at 0x111ab9cc0>)]

In [176]:
movie_and_ratings_temp1 = movie_and_ratings_temp.map(lambda elem: (elem[0], list(elem[1])))

In [178]:
movie_and_ratings_temp1.take(1)

[(4914,
  [4,
   4,
   3,
   4,
   4,
   2,
   2,
   4,
   4,
   5,
   3,
   3,
   1,
   3,
   3,
   4,
   3,
   3,
   3,
   3,
   4,
   2,
   3,
   3,
   4,
   3,
   3,
   4,
   3,
   1,
   4,
   2,
   2,
   4,
   3,
   4,
   4,
   2,
   4,
   3,
   5,
   3,
   2,
   4,
   4,
   4,
   4,
   3,
   5,
   4,
   4,
   2,
   2,
   5,
   4,
   3,
   4,
   2,
   5,
   4,
   3,
   1,
   4,
   3,
   3,
   3,
   2,
   4,
   4,
   3,
   2,
   4,
   4,
   3,
   4,
   5,
   3,
   4,
   2,
   2,
   3,
   3,
   3,
   3,
   2,
   3,
   4,
   5,
   2,
   5,
   3,
   5,
   3,
   3,
   2,
   3,
   2,
   2,
   4,
   2,
   4,
   3,
   4,
   4,
   3,
   3,
   3,
   4,
   3,
   2,
   2,
   4,
   3,
   3,
   5,
   1,
   4,
   5,
   3,
   3,
   4,
   1,
   3,
   3,
   3,
   4,
   3,
   5,
   4,
   3,
   3,
   3,
   2,
   3,
   2,
   3,
   3,
   4,
   4,
   3,
   4,
   5,
   5,
   3,
   2,
   4,
   3,
   1,
   3,
   4,
   1,
   4,
   3,
   3,
   4,
   3,
   3,
   5,
   4,
   3,
   2,
   3,
   4,
   4,
   1,
  

We then create a new RDD that contains the same values as in the form of ( movie_id, (average_review, number_of_reviewers) )

In [179]:
movie_and_ratings = movie_and_ratings_temp1.map(lambda elem: (int(elem[0]), (avg(elem[1]), len(elem[1]))))

In [180]:
movie_and_ratings.take(5)

[(4914, (3.1278435059677716, 38289)),
 (5607, (3.4454226399381356, 16811)),
 (8379, (3.768257100274985, 47639)),
 (8883, (3.8640426532852437, 19131)),
 (9954, (3.938280235693598, 10013))]

#### Generate predictions for new users

Let's generate a fake user by rating some of the available movies

In [184]:
movies.take(1000)

[['1', 'Dinosaur Planet', '2003'],
 ['2', 'Isle of Man TT 2004 Review', '2004'],
 ['3', 'Character', '1997'],
 ['4', "Paula Abdul's Get Up & Dance", '1994'],
 ['5', 'The Rise and Fall of ECW', '2004'],
 ['6', 'Sick', '1997'],
 ['7', '8 Man', '1992'],
 ['8', 'What the #$*! Do We Know!?', '2004'],
 ['9', "Class of Nuke 'Em High 2", '1991'],
 ['10', 'Fighter', '2001'],
 ['11', 'Full Frame: Documentary Shorts', '1999'],
 ['12', 'My Favorite Brunette', '1947'],
 ['13',
  'Lord of the Rings: The Return of the King: Extended Edition: Bonus Material',
  '2003'],
 ['14', 'Nature: Antarctica', '1982'],
 ['15', 'Neil Diamond: Greatest Hits Live', '1988'],
 ['16', 'Screamers', '1996'],
 ['17', '7 Seconds', '2005'],
 ['18', 'Immortal Beloved', '1994'],
 ['19', "By Dawn's Early Light", '2000'],
 ['20', 'Seeta Aur Geeta', '1972'],
 ['21', 'Strange Relations', '2002'],
 ['22', 'Chump Change', '2000'],
 ['23',
  "Clifford: Clifford Saves the Day! / Clifford's Fluffiest Friend Cleo",
  '2001'],
 ['24', 

In [None]:
fake_user_id = 0

fake_user_ratings= [
    "['13',
  'Lord of the Rings: The Return of the King: Extended Edition: Bonus Material',
  '2003'],"
" ['175', 'Reservoir Dogs', '1992'],
"
]

 ['189', 'Airplane II: The Sequel', '1982'],
 ['199', 'The Deer Hunter', '1978'],
 ['252', 'Stuart Little 2', '2002'],
 ['263', 'Dragon Ball: Tournament Saga', '2001'],
 ['273', 'Taxi', '2004'],
 ['290', 'Harold and Kumar Go to White Castle', '2004'],
 ['299', "Bridget Jones's Diary", '2001'],
 ['316', 'Futurama: Monster Robot Maniac Fun Collection', '1999'],
 ['362', 'The Flintstones in Viva Rock Vegas', '2000'],
 ['457', 'Kill Bill: Vol. 2', '2004'],
 ['468', 'The Matrix: Revolutions', '2003'],


fake_user_rdd = 