In [235]:
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import numpy as np

In [576]:
ratings = pd.read_csv("ratings.csv")
movies = pd.read_csv("movies.csv")

In [272]:
ratings.head()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,1,4.0,964982703
1,1,3,4.0,964981247
2,1,6,4.0,964982224
3,1,47,5.0,964983815
4,1,50,5.0,964982931


In [237]:
from sklearn.model_selection import train_test_split
train_data, test_data = train_test_split(ratings, test_size=0.5, stratify = ratings['userId'], random_state=42)

In [238]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
sqlContext.createDataFrame(test_data)



DataFrame[userId: bigint, movieId: bigint, rating: double, timestamp: bigint]

In [291]:
train_data.to_csv('train.csv')

In [530]:
gold_data = sc.textFile('train.csv')

In [531]:
gold_data.take(5)

[',userId,movieId,rating,timestamp',
 '95565,600,3160,3.5,1237742428',
 '28017,192,266,4.0,835129083',
 '69156,448,3146,4.0,1038736356',
 '55211,367,1,5.0,997811550']

In [532]:
from itertools import islice
gold_data = gold_data.mapPartitionsWithIndex(lambda index,iterator: islice(iterator,1,None) if index==0 else iterator).map(lambda row:row.split(',')).map(lambda row: row[1:4])

In [533]:
gold_data.take(5)

[['600', '3160', '3.5'],
 ['192', '266', '4.0'],
 ['448', '3146', '4.0'],
 ['367', '1', '5.0'],
 ['495', '8533', '5.0']]

In [534]:
distinct_users_dict = gold_data.map(lambda x: x[0]).distinct().zipWithIndex()\
    .map(lambda x: (x[0], x[1])).collectAsMap()

distinct_movies_dict = gold_data.map(lambda x: x[1]).distinct().zipWithIndex()\
    .map(lambda x: (x[0], x[1])).collectAsMap()

In [535]:
len(distinct_movies_dict)

7502

In [536]:
movie_avg_rating_dict = gold_data.map(lambda row: (row[1], float(row[2]))).groupByKey().mapValues(list)\
    .map(lambda b: (b[0], float(float(sum(b[1])) / len(b[1])))).collectAsMap()

user_avg_rating_dict = gold_data.map(lambda row: (row[0], float(row[2]))).groupByKey().mapValues(list)\
    .map(lambda b: (b[0], float(float(sum(b[1])) / len(b[1])))).collectAsMap()

In [537]:
user_rated_movies_ratings = gold_data.map(lambda x: (x[0], (x[1], float(x[2]))))\
    .groupByKey().mapValues(dict).collectAsMap()

movie_rated_users_ratings = gold_data.map(lambda x: (x[1], (x[0], float(x[2]))))\
    .groupByKey().mapValues(dict).collectAsMap()

In [538]:
gold_data.take(5)

[['600', '3160', '3.5'],
 ['192', '266', '4.0'],
 ['448', '3146', '4.0'],
 ['367', '1', '5.0'],
 ['495', '8533', '5.0']]

In [539]:
data = gold_data.map(lambda row:[row[0], row[1]])

In [540]:
data.take(5)

[['600', '3160'],
 ['192', '266'],
 ['448', '3146'],
 ['367', '1'],
 ['495', '8533']]

## Make a Sparse Matrix

In [541]:
movie_rated_by_users_dict = data.map(lambda x: (x[1], x[0])).groupByKey().mapValues(set).collectAsMap()

In [542]:
user_rated_movies_dict = data.map(lambda x: (x[0], x[1])).groupByKey().mapValues(set).collectAsMap()

In [543]:
def handle_cold_start(query):
    user = query[0]
    movie = query[1]
    if movie not in distinct_movies_dict.keys():

        if user not in distinct_users_dict.keys():
            return (user, movie, 3.0)
        return (user,movie,get_user_avg_rating(user))
    elif user not in distinct_users_dict.keys():
        return (user, movie, get_movie_avg_rating(movie))
    else:
        return (user,movie,"hello")

In [544]:
import math
def pearson_correlation(pair, commons, user_rated_movies_ratings):
    weights_list = []
    ans = 0.0
    if len(commons) > 1:
        avg1 = sum(user_rated_movies_ratings[user][pair[0]]
                   for user in commons)/len(commons)
        avg2 = sum(user_rated_movies_ratings[user][pair[1]]
                   for user in commons)/len(commons)
        numerator = sum((user_rated_movies_ratings[user][pair[0]] - avg1) * (
            user_rated_movies_ratings[user][pair[1]] - avg2) for user in commons)
        denominator = math.sqrt(sum((user_rated_movies_ratings[user][pair[0]] - avg1)**2 for user in commons)) * math.sqrt(
            sum((user_rated_movies_ratings[user][pair[1]] - avg2)**2 for user in commons))
        if denominator > 0 and numerator > 0:
            ans = numerator/denominator
    else:
        numerator = get_movie_avg_rating(pair[0])
        denominator = get_movie_avg_rating(pair[1])
        ans = numerator / denominator
        if ans > 1:
            ans = 1 / ans
    return ans

In [545]:
def get_pairs(query):
    movies_rated_by_user = user_rated_movies_dict[query[0]]
    return (query, [(query[1], m) for m in movies_rated_by_user])

# Gets list of users who have rated both movies in the pair
def common_users(pair):
    return (pair, movie_rated_by_users_dict[pair[0]].intersection(movie_rated_by_users_dict[pair[1]]))

In [546]:
def get_movie_avg_rating(movie):
    return movie_avg_rating_dict[movie]


def get_user_avg_rating(user):
    return user_avg_rating_dict[user]

In [547]:
def pred_rating(entry, user_rated_movies_ratings):
    user = entry[0][0]
    numerator = 0
    denominator = 0
    for pair, pc in entry[1]:
        numerator += user_rated_movies_ratings[user][pair[1]] * pc
        denominator += pc
    pred = numerator / denominator
    return (entry[0], pred)

In [548]:
pred = data.map(get_pairs)\
.map(lambda query_pair_list: (query_pair_list[0], [common_users(pair) for pair in query_pair_list[1]]))\
    .map(lambda query_pairs_commons: (query_pairs_commons[0], sorted([(pair, pearson_correlation(pair, commons, user_rated_movies_ratings)) for pair, commons in query_pairs_commons[1]],key= lambda x: x[1], reverse=True))[0:3])\
    .map(lambda final: pred_rating(final, user_rated_movies_ratings))\
.map(lambda x: (x[0][0],x[0][1],x[1]))

In [549]:
pred.take(1)

[('600', '3160', 2.756871214031464)]

In [550]:
final_pred = pred.map(lambda x:float(x[2]))

In [551]:
def cal_rmse(true, test):
    true = true.map(lambda x: ((x[0], x[1]), float(x[2])))
    test = test.map(lambda x: ((x[0], x[1]), float(x[2])))
    mse = true.join(test).map(lambda x: (x[1][0], x[1][1]))\
        .map(lambda x: (x[0] - x[1])**2).mean()
    rmse = math.sqrt(mse)
    return rmse

In [552]:
print(cal_rmse(gold_data, pred))

0.7841211206995575


## Movies from the training data that the user rated

In [592]:
x = input("Enter User ID")
print(type(x))
all_movies = []
for movie_id in user_rated_movies_dict[x]:
  rating = train_data.loc[train_data['movieId'] == int(x), 'rating'].values
  all_movies.append(movies.loc[movies['movieId'] == int(movie_id), 'title'].values)
final = list(zip(all_movies, rating))

Enter User ID1
<class 'str'>


## Test Data

In [605]:
for ele in final:
  print(f"{ele[0][0]} has been rated of {ele[1]} by User {x}")

Dumb & Dumber (Dumb and Dumber) (1994) has been rated of 5.0 by User 1
Gulliver's Travels (1939) has been rated of 4.5 by User 1
Henry V (1989) has been rated of 3.5 by User 1
Sister Act (1992) has been rated of 4.0 by User 1
Highlander (1986) has been rated of 4.0 by User 1
Space Jam (1996) has been rated of 4.0 by User 1
Logan's Run (1976) has been rated of 4.0 by User 1
Con Air (1997) has been rated of 5.0 by User 1
American Beauty (1999) has been rated of 4.5 by User 1
Live and Let Die (1973) has been rated of 5.0 by User 1
Star Wars: Episode I - The Phantom Menace (1999) has been rated of 4.0 by User 1
South Park: Bigger, Longer and Uncut (1999) has been rated of 3.0 by User 1
Heat (1995) has been rated of 3.0 by User 1
Shaft (2000) has been rated of 4.0 by User 1
Wayne's World (1992) has been rated of 4.0 by User 1
Go (1999) has been rated of 2.5 by User 1
Robin Hood (1973) has been rated of 5.0 by User 1
Green Mile, The (1999) has been rated of 3.0 by User 1
Quiet Man, The (1952