In [1]:
import findspark
findspark.init()

In [2]:
from __future__ import print_function

import sys

import numpy as np
from numpy.random import rand
from numpy import matrix
from pyspark.sql import SparkSession

In [3]:
import pandas as pd

In [4]:
def loadDatasets(movies_path='../data/movie.csv', ratings_path='../data/rating.csv'):
    """Read the movie and rating CSV files into pandas DataFrames.

    Parameters
    ----------
    movies_path : str, optional
        Location of the movies CSV. Defaults to the path this notebook
        has always read from.
    ratings_path : str, optional
        Location of the ratings CSV. Defaults to the path this notebook
        has always read from.

    Returns
    -------
    tuple
        ``(movies, ratings)``, each exactly as produced by ``pd.read_csv``.
    """
    print('Loading datasets')
    movies = pd.read_csv(movies_path)
    ratings = pd.read_csv(ratings_path)
    print('Datasets loaded successfully')
    return movies, ratings

In [5]:
def preprocessData(user_movie_rating, factor=0.25):
    """Keep the leading ``factor`` share of the rows, cleaned for modelling.

    The first ``int(n_rows * factor)`` rows are taken by position, rows that
    contain any missing value are removed, and the ``timestamp`` column is
    dropped from the result.
    """
    row_limit = int(user_movie_rating.shape[0] * factor)
    leading_rows = user_movie_rating.iloc[:row_limit, :]
    return leading_rows.dropna().drop('timestamp', axis=1)

In [6]:
def getCharateristicMatrix(user_movie_rating):
    """Pivot long-format ratings into a dense user x movie matrix.

    Parameters
    ----------
    user_movie_rating : pandas.DataFrame
        Must provide ``userId``, ``movieId`` and ``rating`` columns, with at
        most one row per (userId, movieId) pair (``pivot`` raises otherwise).

    Returns
    -------
    tuple
        ``(characteristic_matrix, mappings)``. The matrix has one row per
        user and one column per movie, with 0 where no rating exists.
        ``mappings`` holds four dicts: ``user_mapping`` / ``movie_mapping``
        translate a matrix index to an id, and ``r_user_mapping`` /
        ``r_movie_mapping`` translate an id back to its matrix index.
    """
    # Keyword arguments: positional index/columns are rejected by pandas 2.x.
    characteristic_df = user_movie_rating.pivot(
        index='userId', columns='movieId', values='rating'
    ).fillna(0)
    # `.values` replaces DataFrame.as_matrix(), which was removed in pandas 1.0.
    characteristic_matrix = characteristic_df.values

    # Row order follows the pivot index, column order follows the pivot columns.
    user_ids = characteristic_df.index.tolist()
    movie_ids = characteristic_df.columns.tolist()

    mappings = {
        'user_mapping': dict(enumerate(user_ids)),
        'movie_mapping': dict(enumerate(movie_ids)),
        'r_user_mapping': {user_id: position for position, user_id in enumerate(user_ids)},
        'r_movie_mapping': {movie_id: position for position, movie_id in enumerate(movie_ids)},
    }
    return characteristic_matrix, mappings

In [7]:
movies, ratings = loadDatasets()

Loading datasets
Datasets loaded successfully


In [8]:
ratings_subset = preprocessData(ratings, factor=0.5)
characteristic_matrix, mappings = getCharateristicMatrix(ratings_subset)
# train_matrix, test_matrix = self.train_test_split(characteristic_matrix)

In [9]:
spark = SparkSession\
        .builder\
        .appName("MovieRecommender")\
        .getOrCreate()


sc = spark.sparkContext

In [10]:
characteristic_matrix.shape

(69141, 23304)

In [11]:
n_characteristic_matrix = characteristic_matrix[:, :]

In [12]:
n_characteristic_matrix = n_characteristic_matrix.T

In [13]:
n_characteristic_matrix = np.mat(n_characteristic_matrix)

In [14]:
n_characteristic_matrix.shape

(23304, 69141)

In [None]:
# Problem dimensions, read from the transposed rating matrix
# (rows of n_characteristic_matrix -> M, columns -> U).
M = n_characteristic_matrix.shape[0]
U = n_characteristic_matrix.shape[1]
F = 20  # width of each factor matrix (columns of ms and us)
ITERATIONS = 5  # number of passes of the training loop
partitions = 2  # partition count handed to sc.parallelize

R = n_characteristic_matrix
print(R.shape)
# Random starting factors: ms is M x F, us is U x F.
# NOTE(review): no seed is set in this cell, so the starting point depends on
# whatever NumPy's global generator state is when the cell runs.
ms = matrix(rand(M, F))
us = matrix(rand(U, F))

print(ms.shape, us.shape)

# Number of rows of R per broadcast band.
m_offset = 3000

# Number of columns of R per broadcast band.
u_offset = 10000

In [22]:
# Broadcast R in horizontal bands of m_offset rows each; the eighth band takes
# every row left over after the first seven.
movie_bands = [R[m_offset*k:(k+1)*m_offset] for k in range(7)]
movie_bands.append(R[m_offset*7:])

movie_broadcasts = []
for band_no, band in enumerate(movie_bands, 1):
    # Report the number of cells in the band, then ship it to the executors.
    print("Done : %d, " % band_no, band.shape[0]*band.shape[1])
    movie_broadcasts.append(sc.broadcast(band))

# The training loop refers to the bands by these individual names.
(b_movie1, b_movie2, b_movie3, b_movie4,
 b_movie5, b_movie6, b_movie7, b_movie8) = movie_broadcasts

Done : 1,  207423000
Done : 2,  207423000
Done : 3,  207423000
Done : 4,  207423000
Done : 5,  207423000
Done : 6,  207423000
Done : 7,  207423000
Done : 8,  159300864


In [23]:
# Broadcast R in vertical bands of u_offset columns each; the eleventh band
# takes every column from 10*u_offset onwards. Bands that start past the last
# column of R are empty (zero cells) but are still broadcast.
user_bands = [R[:, u_offset*k:(k+1)*u_offset] for k in range(10)]
user_bands.append(R[:, u_offset*10:])

user_broadcasts = []
for band_no, band in enumerate(user_bands, 1):
    # The label of the last band carries one extra trailing space.
    if band_no < 11:
        label = "C Done : %d, " % band_no
    else:
        label = "C Done : 11,  "
    print(label, band.shape[0]*band.shape[1])
    user_broadcasts.append(sc.broadcast(band))

# The training loop refers to the bands by these individual names.
(b_user1, b_user2, b_user3, b_user4, b_user5, b_user6,
 b_user7, b_user8, b_user9, b_user10, b_user11) = user_broadcasts



# Initial factor matrices, shared with the executors.
msb = sc.broadcast(ms)
usb = sc.broadcast(us)

C Done : 1,  233040000
C Done : 2,  233040000
C Done : 3,  233040000
C Done : 4,  233040000
C Done : 5,  233040000
C Done : 6,  233040000
C Done : 7,  213021864
C Done : 8,  0
C Done : 9,  0
C Done : 10,  0
C Done : 11,   0


In [26]:
# Training loop: every pass recomputes all movie factors from the current user
# factors, then all user factors from the freshly computed movie factors, and
# finally reports the reconstruction error of R.
for i in range(ITERATIONS):
        # One updateMovie call per movie index, spread over `partitions` tasks.
        ms = sc.parallelize(range(M), partitions) \
               .map(lambda x: updateMovie(x, usb.value, b_movie1.value, b_movie2.value, b_movie3.value, b_movie4.value, b_movie5.value, b_movie6.value, b_movie7.value, b_movie8.value, m_offset)) \
               .collect()
        # collect() returns a list of per-movie results, so np.array(ms) is
        # 3-d; indexing [:, :, 0] keeps the first two dimensions as the matrix.
        ms = matrix(np.array(ms)[:, :, 0])
        # Re-broadcast so the user update below reads the new movie factors.
        msb = sc.broadcast(ms)

        # One updateUser call per user index; each column band of R is
        # transposed so that a user becomes a row of its band.
        us = sc.parallelize(range(U), partitions) \
               .map(lambda x: updateUser(x, msb.value, b_user1.value.T, b_user2.value.T, b_user3.value.T, b_user4.value.T, b_user5.value.T, b_user6.value.T, b_user7.value.T, b_user8.value.T, b_user9.value.T, b_user10.value.T, b_user11.value.T, u_offset)) \
               .collect()
        us = matrix(np.array(us)[:, :, 0])
        # Re-broadcast so the next pass's movie update reads the new user factors.
        usb = sc.broadcast(us)

        # rmse is evaluated on the driver against the full matrix R.
        error = rmse(R, ms, us)
        print("Iteration %d:" % i)
        print("\nRMSE: %5.4f\n" % error)

Iteration 0:

RMSE: 0.2823

Iteration 1:

RMSE: 0.2545

Iteration 2:

RMSE: 0.2418

Iteration 3:

RMSE: 0.2372

Iteration 4:

RMSE: 0.2352



In [27]:
import pickle

In [28]:
with open('ms.bin', mode='wb') as model_binary:
    pickle.dump(ms, model_binary)

In [29]:
with open('us.bin', mode='wb') as model_binary:
    pickle.dump(us, model_binary)

In [30]:
with open('mappings.bin', mode='wb') as model_binary:
    pickle.dump(mappings, model_binary)

In [34]:
class BestModel(object):
    """Plain container bundling the pieces a recommender needs.

    The four constructor arguments are stored unchanged under the attributes
    ``movies``, ``model``, ``ratings`` and ``mappings``.
    """

    def __init__(self, movies, model, ratings, mappings):
        self.movies, self.model = movies, model
        self.ratings, self.mappings = ratings, mappings

In [68]:
from os import path
import os

In [69]:
os.getcwd()

'/Users/amit/WorkPro/warzone/movie-recommender/spark'

In [70]:
ls ../

Dockerfile                  [1m[36mmodel[m[m/
[1m[36mdata[m[m/                       [1m[36mnotebooks[m[m/
derby.log                   requirements.txt
docker-compose.yml          [1m[36mspark[m[m/
init.py                     [1m[36mstatic[m[m/
init.pyc                    [1m[36mtemplates[m[m/
jupyter_notebook_config.py  [1m[36mutils[m[m/
[1m[36mmetastore_db[m[m/


In [None]:
/Users/amit/WorkPro/warzone/movie-recommender/model

In [72]:
path.dirname( path.dirname( path.abspath('/Users/amit/WorkPro/warzone/movie-recommender/model') ))

'/Users/amit/WorkPro/warzone'

In [82]:
import sys
sys.path.append('/Users/amit/WorkPro/warzone/movie-recommender/')

In [78]:
from model import BestModel

In [63]:
class AlternatingLeastSquare(object):
    """Holder for a pair of factor matrices with dot-product prediction.

    ``users_matrix`` has one row per user and ``movies_matrix`` one row per
    movie; a predicted rating is the dot product of the two matching rows.
    """
    USERS_MATRIX = 'users_matrix'
    MOVIES_MATRIX = 'movies_matrix'

    def __init__(self, user_matrix, movies_matrix):
        # Constructing an instance reseeds NumPy's global random generator.
        np.random.seed(0)
        self.movies_matrix = movies_matrix
        self.users_matrix = user_matrix

    def predict(self, user, movie):
        """Return the dot product of one user's and one movie's factor rows."""
        user_factors = self.users_matrix[user, :]
        movie_factors = self.movies_matrix[movie, :]
        return user_factors.dot(movie_factors.T)

    def predict_all(self):
        """Return the users x movies matrix of all pairwise dot products."""
        return self.users_matrix.dot(self.movies_matrix.T)

In [86]:
from model import AlternatingLeastSquare

In [89]:
als = AlternatingLeastSquare(np.array([[]]))

In [90]:
als.users_matrix = np.array(us)
als.movies_matrix = np.array(ms)

In [47]:
als = AlternatingLeastSquare(np.array(us), np.array(ms))

In [91]:
best_model = BestModel(movies, als, ratings_subset, mappings)

In [92]:
with open('../data/best_model.bin', mode='wb') as model_binary:
    pickle.dump(best_model, model_binary)

In [93]:
movies.head()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [133]:
cat = 'Comedy'

In [134]:
movies[movies['genres'].str.contains(cat)][::-1][:20]

Unnamed: 0,movieId,title,genres
27274,131256,"Feuer, Eis & Dosenbier (2002)",Comedy
27273,131254,Kein Bund für's Leben (2007),Comedy
27272,131252,Forklift Driver Klaus: The First Day on the Jo...,Comedy|Horror
27271,131250,No More School (2000),Comedy
27270,131248,Brother Bear 2 (2006),Adventure|Animation|Children|Comedy|Fantasy
27269,131243,Werner - Gekotzt wird später (2003),Animation|Comedy
27268,131241,Ants in the Pants (2000),Comedy|Romance
27267,131239,Three Quarter Moon (2011),Comedy|Drama
27266,131237,What Men Talk About (2010),Comedy
27265,131231,Standby (2014),Comedy|Romance


In [135]:
ids = movies[movies['genres'].str.contains(cat)][::-1][:20]['movieId']

In [136]:
ids.values

array([131256, 131254, 131252, 131250, 131248, 131243, 131241, 131239,
       131237, 131231, 131158, 131156, 131154, 131152, 131150, 131148,
       131146, 131144, 131142, 131138])

In [137]:
ratings_subset[ratings_subset['movieId'].isin(ids.values)]

Unnamed: 0,userId,movieId,rating
9117176,63046,131231,3.5


In [182]:
movies[movies['title'].str.contains('Amelie')]

Unnamed: 0,movieId,title,genres
4877,4973,"Amelie (Fabuleux destin d'Amélie Poulain, Le) ...",Comedy|Romance


In [193]:
ans = set()
for x in movies['genres'].unique():
    x = x.split('|')
    for y in x:
        ans.add(y)

In [195]:
ans = list(ans)

In [224]:
movies.columns

Index([u'movieId', u'title', u'genres'], dtype='object')

TypeError: 'RangeIndex' object is not callable

In [242]:
def fu(row, genre):
    """Return 1 if ``genre`` is one of the genres listed in ``row``, else 0.

    ``row['genres']`` is a ``'|'``-separated string. It is split into
    individual labels and compared label by label, so a genre name that only
    appears as part of a longer label does not count as a match.
    """
    return int(genre in row['genres'].split('|'))

In [243]:
movies['Comedy'] = movies.apply(lambda x: fu(x, 'Comedy'), axis=1)

In [246]:
int(True)

1

In [244]:
movies.head()

Unnamed: 0,movieId,title,genres,Comedy
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy,1
1,2,Jumanji (1995),Adventure|Children|Fantasy,0
2,3,Grumpier Old Men (1995),Comedy|Romance,1
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance,1
4,5,Father of the Bride Part II (1995),Comedy,1


In [220]:
print(ans)

['Mystery', 'Drama', 'Western', 'Sci-Fi', 'Horror', 'Film-Noir', 'Crime', 'Romance', 'Fantasy', 'Musical', 'Animation', 'War', 'Adventure', 'Action', 'Comedy', 'Documentary', 'Children', 'Thriller']


In [None]:
for gen in ans:
    movies[gen] = 0

In [187]:
ratings_subset[ratings_subset['userId'] == 353].sort_values('rating', ascending=False)[:30]['movieId'].values

array([ 541, 1032, 3996, 3718, 1394,  608, 1204, 1148, 1080, 4226,  926,
        923,  919,  745,  720, 1348, 1617, 3793, 3851, 2361, 1028,  912,
       1537, 1214, 1210, 1207, 3783, 1041, 3578, 3408])

In [210]:
# A real list rather than map(): on Python 3 a map object is a one-shot
# iterator, so it would display as <map ...> and be empty after its first use.
years = [str(year) for year in range(2015, 2020)]

In [211]:
years

['2015', '2016', '2017', '2018', '2019']

In [219]:
movies[movies['title'].str.contains('|'.join(years)) & ~movies['title'].str.startswith('2')][:20]

Unnamed: 0,movieId,title,genres
23865,113345,Jupiter Ascending (2015),Action|Adventure|Sci-Fi
24855,117466,In the Heart of the Sea (2015),Action|Adventure|Drama
25254,119145,Kingsman: The Secret Service (2015),Action|Adventure|Comedy|Crime
25460,120466,Chappie (2015),Action|Thriller
25490,120635,Taken 3 (2015),Action|Crime|Thriller
25491,120637,Blackhat (2015),Action|Crime|Drama|Mystery|Thriller
25495,120757,Beck - Rum 302 (2015),Crime|Mystery|Thriller
25527,120825,The Woman in Black 2: Angel of Death (2015),Drama|Horror|Thriller
25786,122147,Beck - Familjen (2015),Crime|Mystery|Thriller
26049,124867,Justice League: Throne of Atlantis (2015),Action|Animation


In [142]:
groups = ratings_subset.groupby(by='movieId')['rating'].agg(['sum','count'])

In [147]:
groups = groups.reset_index()

In [148]:
groups['averageRating'] = groups['sum']/(1.0*groups['count'])

In [149]:
groups.head()

Unnamed: 0,movieId,sum,count,averageRating
0,1,97035.0,24736,3.922825
1,2,35477.0,11049,3.210879
2,3,19951.5,6369,3.132595
3,4,3932.5,1360,2.891544
4,5,18583.5,6047,3.073177


In [150]:
C = groups['averageRating'].mean()

In [153]:
m = groups['count'].quantile(0.90)

In [155]:
q_movies = groups.copy().loc[groups['count'] >= m]

In [156]:
q_movies.shape

(2336, 4)

In [160]:
def bayesian_average(x, m=m, C=C):
    """Blend a movie's own mean rating with a prior mean, weighted by votes.

    ``x`` supplies ``count`` and ``averageRating``. The movie's mean is
    weighted by ``count / (count + m)`` and ``C`` by ``m / (m + count)``.
    The defaults for ``m`` and ``C`` are taken from the module-level ``m``
    and ``C`` at the moment this function is defined.
    """
    votes = x['count']
    mean_rating = x['averageRating']
    own_share = votes/(votes+m) * mean_rating
    prior_share = m/(m+votes) * C
    return own_share + prior_share

In [161]:
q_movies['score'] = q_movies.apply(bayesian_average, axis=1)

In [162]:
q_movies = q_movies.sort_values('score', ascending=False)

In [170]:
q_movies[:10]['movieId'].values

array([ 318,  858,   50,  527, 1221, 1193, 2959,  912, 1198,  750])

In [164]:
movies[movies['movieId'].isin(q_movies['movieId'].values)].head(10)

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy
5,6,Heat (1995),Action|Crime|Thriller
6,7,Sabrina (1995),Comedy|Romance
8,9,Sudden Death (1995),Action
9,10,GoldenEye (1995),Action|Adventure|Thriller
10,11,"American President, The (1995)",Comedy|Drama|Romance


In [167]:
pd.merge(movies, q_movies, how='inner',left_on="movieId" , right_on='movieId').sort_values('score', ascending=False)

Unnamed: 0,movieId,title,genres,sum,count,averageRating,score
184,318,"Shawshank Redemption, The (1994)",Crime|Drama,140795.5,31624,4.452172,4.419437
409,858,"Godfather, The (1972)",Crime|Drama,90179.5,20692,4.358182,4.312344
40,50,"Usual Suspects, The (1995)",Crime|Mystery|Thriller,102312.5,23571,4.340609,4.300768
292,527,Schindler's List (1993),Drama|War,108121.0,25096,4.308296,4.271810
584,1221,"Godfather: Part II, The (1974)",Crime|Drama,58218.0,13611,4.277276,4.213475
559,1193,One Flew Over the Cuckoo's Nest (1975),Drama,63212.5,14872,4.250437,4.193153
1290,2959,Fight Club (1999),Action|Crime|Drama|Thriller,84974.5,20086,4.230534,4.188314
428,912,Casablanca (1942),Drama|Romance,51799.5,12176,4.254230,4.184816
563,1198,Raiders of the Lost Ark (Indiana Jones and the...,Action|Adventure,91355.5,21659,4.217900,4.179092
375,750,Dr. Strangelove or: How I Learned to Stop Worr...,Comedy|War,49700.0,11716,4.242062,4.170886


In [40]:
import numpy as np

class Recommender(object):
    """Serve movie recommendations from a bundled factor model.

    ``model_obj`` must expose ``model`` (with ``users_matrix`` and
    ``movies_matrix``), ``mappings`` (the index/id dictionaries), ``ratings``
    (a DataFrame with ``userId`` and ``movieId`` columns) and ``movies``
    (a DataFrame with a ``movieId`` column).
    """

    def __init__(self, model_obj):
        # Constructing an instance reseeds NumPy's global random generator.
        np.random.seed(0)
        self.model_obj = model_obj
        self.model = self.model_obj.model
        self.mappings = self.model_obj.mappings
        self.ratings = self.model_obj.ratings
        self.movies = self.model_obj.movies

    def getUserIdsFromMatrixIndexes(self, matrix_indexes, user_mapping, preserve_order=True):
        """Translate matrix row indexes into user ids via ``user_mapping``.

        Input order is always kept; ``preserve_order`` is accepted for
        interface compatibility and is not consulted.
        """
        return [user_mapping[index] for index in matrix_indexes]

    def getMovieIdsFromMatrixIndexes(self, matrix_indexes, preserve_order=True):
        """Translate matrix indexes into movie ids via the stored mapping.

        Input order is always kept; ``preserve_order`` is accepted for
        interface compatibility and is not consulted.
        """
        movie_mapping = self.mappings['movie_mapping']
        return [movie_mapping[index] for index in matrix_indexes]

    def getWatchedMovies(self, user_id):
        """Return the list of movie ids that ``user_id`` has rated."""
        rated = self.ratings.loc[self.ratings['userId'] == user_id, 'movieId']
        return rated.values.tolist()

    def recommendMoviesTo(self, user_id, limit=50):
        """Return up to ``limit`` movie ids, highest predicted score first.

        Raises ``KeyError`` when ``user_id`` is not in ``r_user_mapping``.
        Movies the user has already rated are not removed here; pass the
        result through ``filterWatchedMovies`` for that.
        """
        user_index = self.mappings['r_user_mapping'][user_id]
        user_vector = self.model.users_matrix[user_index, :]
        # assumes both factor matrices are plain 2-d ndarrays, so this is a
        # 1-d score vector -- TODO confirm for any other model object
        user_ratings = user_vector.dot(self.model.movies_matrix.T)
        highest_rated_movie_indexes = user_ratings.argsort()[::-1][:limit]
        return self.getMovieIdsFromMatrixIndexes(highest_rated_movie_indexes)

    def displayMovies(self, movie_ids):
        """Return the first matching ``movies`` row, as a list, for each id.

        Raises ``IndexError`` when an id has no row in ``movies``.
        """
        return [
            self.movies[self.movies['movieId'] == movie_id].values.tolist()[0]
            for movie_id in movie_ids
        ]

    def filterWatchedMovies(self, user_id, ordered_movie_ids):
        """Drop movies ``user_id`` has rated, keeping the incoming order.

        ``ordered_movie_ids`` is traversed exactly once, so any iterable
        (including a generator) is handled correctly.
        """
        rated_movies = set(self.getWatchedMovies(user_id))
        return [movie_id for movie_id in ordered_movie_ids if movie_id not in rated_movies]

# if __name__ == '__main__':
#     recommender = Recommender()
#     movie_ids = recommender.recommendMoviesTo(1, limit=300)
#     print movie_ids
#     print recommender.displayMovies(movie_ids)




In [49]:
recom = Recommender(best_model)

In [51]:
ids = recom.recommendMoviesTo(1, limit=300)

In [52]:
recom.displayMovies(ids)

[[1196,
  'Star Wars: Episode V - The Empire Strikes Back (1980)',
  'Action|Adventure|Sci-Fi'],
 [4993,
  'Lord of the Rings: The Fellowship of the Ring, The (2001)',
  'Adventure|Fantasy'],
 [260, 'Star Wars: Episode IV - A New Hope (1977)', 'Action|Adventure|Sci-Fi'],
 [2571, 'Matrix, The (1999)', 'Action|Sci-Fi|Thriller'],
 [541, 'Blade Runner (1982)', 'Action|Sci-Fi|Thriller'],
 [1198,
  'Raiders of the Lost Ark (Indiana Jones and the Raiders of the Lost Ark) (1981)',
  'Action|Adventure'],
 [1214, 'Alien (1979)', 'Horror|Sci-Fi'],
 [5952, 'Lord of the Rings: The Two Towers, The (2002)', 'Adventure|Fantasy'],
 [1136, 'Monty Python and the Holy Grail (1975)', 'Adventure|Comedy|Fantasy'],
 [1200, 'Aliens (1986)', 'Action|Adventure|Horror|Sci-Fi'],
 [7153,
  'Lord of the Rings: The Return of the King, The (2003)',
  'Action|Adventure|Drama|Fantasy'],
 [296, 'Pulp Fiction (1994)', 'Comedy|Crime|Drama|Thriller'],
 [1210,
  'Star Wars: Episode VI - Return of the Jedi (1983)',
  'Action|

In [25]:
LAMBDA = 0.01   # regularization
np.random.seed(42)


def rmse(R, ms, us):
    """Root-mean-square error between ``R`` and the product ``ms . us^T``.

    The squared error is averaged over every cell of ``R`` (its own size is
    used, not any module-level dimension constants). Inputs may be
    ``numpy.matrix`` or 2-d ``ndarray`` objects.
    """
    diff = np.asarray(R) - np.asarray(ms).dot(np.asarray(us).T)
    return np.sqrt(np.sum(np.power(diff, 2)) / diff.size)


def _select_chunk_row(i, chunks, offset):
    """Return row ``i`` of a matrix that was split into ``offset``-row chunks."""
    chunk_index = i // offset
    if i < 0 or chunk_index >= len(chunks):
        raise IndexError(
            'index %d is outside the %d chunks of %d rows' % (i, len(chunks), offset))
    return chunks[chunk_index][i - chunk_index * offset, :]


def _solve_regularized(mat, target_row):
    """Solve ``(mat^T mat + LAMBDA * n_rows * I) w = mat^T target_row^T`` for ``w``."""
    n_rows, n_factors = mat.shape
    # `*` is a matrix product here: the operands are numpy.matrix objects.
    XtX = mat.T * mat
    Xty = mat.T * target_row.T
    # Adds LAMBDA * n_rows to every diagonal entry.
    XtX = XtX + LAMBDA * n_rows * np.eye(n_factors)
    return np.linalg.solve(XtX, Xty)


def updateMovie(i, mat, movie_rows1, movie_rows2, movie_rows3, movie_rows4, movie_rows5, movie_rows6, movie_rows7, movie_rows8, m_offset):
    """Recompute the factor column for movie ``i``.

    Parameters
    ----------
    i : int
        Movie index into the full rating matrix.
    mat : numpy.matrix
        Current user factors, one row per user.
    movie_rows1 .. movie_rows8 : numpy.matrix
        Consecutive row bands of the rating matrix, ``m_offset`` rows each
        (the last band may be shorter).
    m_offset : int
        Number of rows per band.

    Returns
    -------
    numpy.matrix
        Column vector with one entry per factor.

    Raises
    ------
    IndexError
        If ``i`` is negative or lies beyond the eight bands.
    """
    bands = (movie_rows1, movie_rows2, movie_rows3, movie_rows4,
             movie_rows5, movie_rows6, movie_rows7, movie_rows8)
    return _solve_regularized(mat, _select_chunk_row(i, bands, m_offset))



def updateUser(i, mat, user_cols1, user_cols2, user_cols3, user_cols4, user_cols5, user_cols6, user_cols7, user_cols8, user_cols9, user_cols10, user_cols11, u_offset):
    """Recompute the factor column for user ``i``.

    Parameters
    ----------
    i : int
        User index into the full rating matrix.
    mat : numpy.matrix
        Current movie factors, one row per movie.
    user_cols1 .. user_cols11 : numpy.matrix
        Consecutive bands of users, already transposed so that each user is
        one row of its band; ``u_offset`` users per band (trailing bands may
        be shorter or empty).
    u_offset : int
        Number of users per band.

    Returns
    -------
    numpy.matrix
        Column vector with one entry per factor.

    Raises
    ------
    IndexError
        If ``i`` is negative, lies beyond the eleven bands, or points past
        the end of its band.
    """
    bands = (user_cols1, user_cols2, user_cols3, user_cols4, user_cols5, user_cols6,
             user_cols7, user_cols8, user_cols9, user_cols10, user_cols11)
    return _solve_regularized(mat, _select_chunk_row(i, bands, u_offset))

# offset = 11652

# m_offset = 10000

# Rb1 = sc.broadcast(R[0:offset])
# Rb2 = sc.broadcast(R[offset:2*offset])

# Rbm1 = sc.broadcast(R[:, 0:m_offset])
# Rbm2 = sc.broadcast(R[:, m_offset:2*m_offset])

In [44]:
type(ms)

numpy.matrixlib.defmatrix.matrix

In [45]:
arr = np.array(ms)

In [46]:
arr.shape

(23304, 20)

In [7]:
spark = SparkSession\
        .builder\
        .appName("MovieRecommender")\
        .getOrCreate()


sc = spark.sparkContext

In [3]:
ratings = sc.textFile('../data/rating.csv')
# Drop a leading header row when there is one: its first field is a column
# name rather than a number, and int() below would fail on it.
first_line = ratings.first()
if not first_line.split(',')[0].strip().strip('"').isdigit():
    ratings = ratings.filter(lambda line: line != first_line)
# Keep the first three comma-separated fields as (userId, movieId, rating).
ratings = ratings.map(lambda line: line.split(',')).map(lambda x: (int(x[0]), int(x[1]), float(x[2])))
ratings_df = ratings.toDF(['userId', 'movieId', 'rating'])

In [4]:
ratings_df = ratings_df.drop('timestamp')

In [5]:
ratings_df.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      2|   3.5|
|     1|     29|   3.5|
|     1|     32|   3.5|
|     1|     47|   3.5|
|     1|     50|   3.5|
|     1|    112|   3.5|
|     1|    151|   4.0|
|     1|    223|   4.0|
|     1|    253|   4.0|
|     1|    260|   4.0|
|     1|    293|   4.0|
|     1|    296|   4.0|
|     1|    318|   4.0|
|     1|    337|   3.5|
|     1|    367|   3.5|
|     1|    541|   4.0|
|     1|    589|   3.5|
|     1|    593|   3.5|
|     1|    653|   3.0|
|     1|    919|   3.5|
+------+-------+------+
only showing top 20 rows



In [6]:
d = ratings_df.groupBy("userId").pivot("movieId").avg("rating")

KeyboardInterrupt: 

In [10]:
spark.conf.set('spark.sql.pivotMaxValues', u'50000')

In [65]:
d = d.fillna(0)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:40857)
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:40857)

In [64]:
data_array =  np.array(d.collect())

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 40296)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/socketserver.py", line 316, in _ha

Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:40857)

In [55]:
data_array.shape

(91, 2890)

In [57]:
d.columns

['userId',
 '1',
 '2',
 '3',
 '4',
 '5',
 '6',
 '7',
 '8',
 '9',
 '10',
 '11',
 '12',
 '14',
 '15',
 '16',
 '17',
 '18',
 '19',
 '20',
 '21',
 '22',
 '23',
 '24',
 '25',
 '28',
 '29',
 '31',
 '32',
 '34',
 '36',
 '39',
 '41',
 '42',
 '43',
 '44',
 '45',
 '46',
 '47',
 '48',
 '50',
 '52',
 '55',
 '57',
 '58',
 '60',
 '61',
 '62',
 '63',
 '64',
 '65',
 '67',
 '68',
 '69',
 '70',
 '73',
 '74',
 '76',
 '78',
 '79',
 '81',
 '85',
 '86',
 '88',
 '89',
 '92',
 '93',
 '94',
 '95',
 '97',
 '100',
 '101',
 '102',
 '104',
 '105',
 '107',
 '110',
 '111',
 '112',
 '114',
 '122',
 '124',
 '125',
 '132',
 '135',
 '140',
 '141',
 '144',
 '145',
 '147',
 '149',
 '150',
 '151',
 '153',
 '154',
 '155',
 '156',
 '157',
 '158',
 '159',
 '160',
 '161',
 '162',
 '163',
 '164',
 '165',
 '168',
 '170',
 '171',
 '172',
 '173',
 '174',
 '175',
 '176',
 '177',
 '179',
 '180',
 '181',
 '185',
 '186',
 '188',
 '190',
 '191',
 '193',
 '194',
 '195',
 '196',
 '198',
 '203',
 '204',
 '206',
 '207',
 '208',
 '213',
 '2

In [59]:
data_array[0:3, 1:]

array([[None, 3.0, None, ..., None, None, None],
       [None, None, None, ..., None, None, None],
       [None, None, None, ..., None, None, None]], dtype=object)

In [29]:
d.show(n=1)

+------+----+---+----+----+----+----+----+----+---+----+---+----+----+---+---+----+----+---+---+---+---+---+---+----+----+----+---+---+---+----+----+----+---+----+---+----+---+---+----+---+----+----+---+----+---+----+----+----+----+----+----+----+----+---+----+----+----+----+----+----+----+----+----+----+----+---+----+----+----+----+----+----+---+----+----+---+----+---+----+---+----+----+---+---+----+----+----+---+---+----+---+---+---+----+----+----+---+---+---+---+---+----+----+---+---+---+---+----+---+---+----+----+----+----+---+----+----+---+----+---+----+---+----+----+----+---+----+----+---+----+----+---+----+----+---+---+----+----+---+---+----+----+---+----+---+---+---+----+---+----+----+----+---+----+---+---+---+----+---+---+---+----+----+----+----+----+----+---+---+---+---+---+---+----+----+---+----+---+---+----+---+----+---+---+---+----+---+----+---+---+----+----+----+----+----+---+----+---+---+---+---+----+----+----+----+----+---+---+----+----+---+----+----+----+---+---+----+-

In [53]:
type(d)

pyspark.sql.dataframe.DataFrame

In [33]:
dir(d)

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattr__',
 '__getattribute__',
 '__getitem__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_collectAsArrow',
 '_jcols',
 '_jdf',
 '_jmap',
 '_jseq',
 '_lazy_rdd',
 '_repr_html_',
 '_sc',
 '_schema',
 '_sort_cols',
 '_support_repr_html',
 'agg',
 'alias',
 'approxQuantile',
 'cache',
 'checkpoint',
 'coalesce',
 'colRegex',
 'collect',
 'columns',
 'corr',
 'count',
 'cov',
 'createGlobalTempView',
 'createOrReplaceGlobalTempView',
 'createOrReplaceTempView',
 'createTempView',
 'crossJoin',
 'crosstab',
 'cube',
 'describe',
 'distinct',
 'drop',
 'dropDuplicates',
 'drop_duplicates',
 'dropna',
 'dtypes',
 'exceptAll',
 'explain',
 'fillna',
 'filter',
 'first',
 'foreach',
 'f

In [49]:
df = pd.read_csv('../data/subset.csv', header = None, names = ['userId', 'movieId', 'rating', 'timestamp'])

In [8]:
small_ratings_raw_data = sc.textFile('../data/rating.csv')
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]


In [9]:
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [10]:
small_ratings_data.take(3)

[('1', '2', '3.5'), ('1', '29', '3.5'), ('1', '32', '3.5')]

In [12]:
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [16]:
from pyspark.mllib.recommendation import ALS
import math

seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

# Single rank evaluated by this cell; named once so the training call and the
# report below cannot drift apart.
rank = 12

model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                  lambda_=regularization_parameter)

# Key predictions and held-out ratings by (user, movie) so they can be joined.
predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
# RMSE over the joined (actual, predicted) pairs.
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
errors[err] = error
err += 1
print('For rank %s the RMSE is %s' % (rank, error))

For rank 12 the RMSE is 0.812032979620398
