# Problem 2: Analyzing the Netflix Data

## Table of Content

#### a) how many distinct items and how many distinct users are there in the test set?
#### b) overlappingitems and overlappingusers
#### c) Best approach to implement the collaborative filtering 
#### d) Applying Correlation Coefficient and rerunning the code

#### Loading the files 

In [1]:
# Change to the location of data files
dbfs_dir = 's3://archanamaroldsde.bucket/'
test = dbfs_dir + '/TestingRatings.txt'
train = dbfs_dir + '/TrainingRatings.txt'
names= dbfs_dir + '/movie_titles.txt'

In [2]:
test

's3://archanamaroldsde.bucket//TestingRatings.txt'

In [3]:
#MovieID,UserID,Rating

In [4]:
from pyspark.sql.functions import regexp_extract
from pyspark.sql.types import *
import pandas
tests=sqlContext.read.text(test)
trains=sqlContext.read.text(train)
movie_names=sqlContext.read.text(names)

In [5]:
import pyspark.sql.functions as f
test_data = tests.select(f.split(tests.value,",")).rdd.flatMap(lambda x: x).toDF(schema=["movieID","userID", "rating"])
train_data = trains.select(f.split(trains.value,",")).rdd.flatMap(lambda x: x).toDF(schema=["movieID","userID", "rating"])
movie_name = movie_names.select(f.split(movie_names.value,",")).rdd.flatMap(lambda x: x).toDF(schema=["movieID","year", "title"])

In [7]:
#converting types
test_data = test_data.withColumn("movieID",test_data["movieID"].cast(IntegerType()))
test_data = test_data.withColumn("userID",test_data["userID"].cast(IntegerType()))
test_data = test_data.withColumn("rating",test_data["rating"].cast(FloatType()))

train_data = train_data.withColumn("movieID",train_data["movieID"].cast(IntegerType()))
train_data = train_data.withColumn("userID",train_data["userID"].cast(IntegerType()))
train_data = train_data.withColumn("rating",train_data["rating"].cast(FloatType()))

movie_name = movie_name.withColumn("movieID",movie_name["movieID"].cast(IntegerType()))
movie_name = movie_name.withColumn("year",movie_name["year"].cast(IntegerType()))
movie_name = movie_name.withColumn("title",movie_name["title"].cast(StringType()))

In [8]:
print(test_data.show(3))
print(train_data.show(3))
print(movie_name.show(3))

+-------+-------+------+
|movieID| userID|rating|
+-------+-------+------+
|      8| 573364|   1.0|
|      8|2149668|   3.0|
|      8|1089184|   3.0|
+-------+-------+------+
only showing top 3 rows

None
+-------+-------+------+
|movieID| userID|rating|
+-------+-------+------+
|      8|1744889|   1.0|
|      8|1395430|   2.0|
|      8|1205593|   4.0|
+-------+-------+------+
only showing top 3 rows

None
+-------+----+--------------------+
|movieID|year|               title|
+-------+----+--------------------+
|      1|2003|     Dinosaur Planet|
|      2|2004|Isle of Man TT 20...|
|      3|1997|           Character|
+-------+----+--------------------+
only showing top 3 rows

None


In [9]:
test_data.cache()
train_data.cache()
assert test_data.is_cached
assert train_data.is_cached

In [10]:
train_data_count = train_data.count()
test_data_count = test_data.count()
print('There are %s rows in the train datasets' % (train_data_count))
print('There are %s rows in the test datasets' % (test_data_count))

There are 3255352 rows in the train datasets
There are 100478 rows in the test datasets


# a) how many distinct items and how many distinct users are there in the test set?

In [11]:
d1=test_data.select('movieID').distinct().count()
print("Distinct movies:")
print(d1)

Distinct movies:
1701


In [12]:
d2=test_data.select('userID').distinct().count()
print("Distinct users")
print(d2)

Distinct users
27555


# b) overlappingitems and overlappingusers

part B has the analysis part. The prediction part is covered in part D

### b 1) user similarities are measured by overlappingitems
### user 199435

In [70]:
# lets pick an user 199435
movies_user=test_data.filter("userID  == 199435")
print("movies of userid 199435 in test set")
movies_user.show(5)
#lets collect them in a list
list1 = movies_user.select('movieID')
array = [int(row.movieID) for row in list1.collect()]

movies of userid 199435 in test set
+-------+------+------+
|movieID|userID|rating|
+-------+------+------+
|    443|199435|   3.0|
|   4852|199435|   2.0|
|   7145|199435|   3.0|
|   8596|199435|   5.0|
|  10082|199435|   2.0|
+-------+------+------+
only showing top 5 rows



In [71]:
print("No of movies user watched in the train set:")
print(movies_user.count())

No of movies user watched in the train set:
8


In [72]:
#now lets get all the users in the train set, who has watched the same movie as 199435 watched
count=train_data[train_data['movieID'].isin(array)].count()
print("The number of users in the train set, who has watched the same movie as 199435 watched are:", count)
users=train_data[train_data['movieID'].isin(array)]
users.show(5)

The number of users in the train set, who has watched the same movie as 199435 watched are: 92691
+-------+-------+------+
|movieID| userID|rating|
+-------+-------+------+
|    443| 364518|   3.0|
|    443| 716091|   4.0|
|    443|1601783|   4.0|
|    443| 306466|   3.0|
|    443| 160977|   5.0|
+-------+-------+------+
only showing top 5 rows



In [75]:
from pyspark.sql import functions as F

x=users.groupBy("userID").agg(F.count('movieID').alias("count1"))
y=x.orderBy(x.count1.desc()).limit(10)
#storing in an array
list2 = y.select('userID')
array2 = [int(row.userID) for row in list2.collect()]

### average overlap of items rated by the users in the training set for users in the test set 

In [76]:
from pyspark.sql import functions as F
users2=users[users['userID'].isin(array2)]
print("Average over all the items of that users similar to the userid 199435 ")
users3= users2.groupBy("movieID").agg(F.mean('rating'). alias('prediction'))
users3.show()

Average over all the items of that users similar to the userid 199435 
+-------+----------+
|movieID|prediction|
+-------+----------+
|  10082|       2.8|
|   4852|       3.4|
|   8596|       4.5|
|  14144|       3.9|
|    443|       3.4|
|  12778|       3.6|
|   7145|       4.2|
|  14712|       3.6|
+-------+----------+



In [77]:
#actual values
movies_user.show()

+-------+------+------+
|movieID|userID|rating|
+-------+------+------+
|    443|199435|   3.0|
|   4852|199435|   2.0|
|   7145|199435|   3.0|
|   8596|199435|   5.0|
|  10082|199435|   2.0|
|  12778|199435|   2.0|
|  14144|199435|   3.0|
|  14712|199435|   3.0|
+-------+------+------+



### prediction

In [78]:
final1 = movies_user.join(users3, on=['movieID' ], how='inner')
final1.show()

+-------+------+------+----------+
|movieID|userID|rating|prediction|
+-------+------+------+----------+
|  10082|199435|   2.0|       2.8|
|   4852|199435|   2.0|       3.4|
|   8596|199435|   5.0|       4.5|
|  14144|199435|   3.0|       3.9|
|    443|199435|   3.0|       3.4|
|  12778|199435|   2.0|       3.6|
|   7145|199435|   3.0|       4.2|
|  14712|199435|   3.0|       3.6|
+-------+------+------+----------+



 the predicted and actual are close values

#### tried for few more userids

In [82]:
# lets pick an user 199435
#movies_user=test_data.filter("userID  == 573364")
#movies_user=test_data.filter("userID  == 2149668")
movies_user=test_data.filter("userID  == 1089184")

#print("movies of userid 199435 in test set")
#movies_user.show(5)
#lets collect them in a list
list1 = movies_user.select('movieID')
array = [int(row.movieID) for row in list1.collect()]

count=train_data[train_data['movieID'].isin(array)].count()
#print("The number of users in the train set, who has watched the same movie as 199435 watched are:", count)
users=train_data[train_data['movieID'].isin(array)]
#users.show(5)

from pyspark.sql import functions as F

#lets take only those users who has watched more than 200 movies as the main user movies are 260
x=users.groupBy("userID").agg(F.count('movieID').alias("count1"))
y=x.orderBy(x.count1.desc()).limit(10)
#storing in an array
list2 = y.select('userID')
array2 = [int(row.userID) for row in list2.collect()]



from pyspark.sql import functions as F
users2=users[users['userID'].isin(array2)]

users3= users2.groupBy("movieID").agg(F.mean('rating'). alias('prediction'))
#users3.show()

final3 = movies_user.join(users3, on=['movieID' ], how='inner')

print("Average over all the items of that users similar to the userid 573364 ")
final1.show()
print("Average over all the items of that users similar to the userid 2149668 ")
final2.show()
print("Average over all the items of that users similar to the userid 1089184 ")
final3.show()



Average over all the items of that users similar to the userid 573364 
+-------+------+------+----------+
|movieID|userID|rating|prediction|
+-------+------+------+----------+
|      8|573364|   1.0|       2.9|
|   2913|573364|   4.0|       3.9|
|    398|573364|   3.0|       3.4|
|   2640|573364|   3.0|       4.1|
+-------+------+------+----------+

Average over all the items of that users similar to the userid 2149668 
+-------+-------+------+----------+
|movieID| userID|rating|prediction|
+-------+-------+------+----------+
|      8|2149668|   3.0|       2.7|
|   1046|2149668|   3.0|       3.2|
|   6190|2149668|   3.0|       3.3|
|  12778|2149668|   3.0|       4.2|
|   8699|2149668|   4.0|       3.3|
|   8039|2149668|   4.0|       3.3|
+-------+-------+------+----------+

Average over all the items of that users similar to the userid 1089184 
+-------+-------+------+----------+
|movieID| userID|rating|prediction|
+-------+-------+------+----------+
|      8|1089184|   3.0|       3.0|

### b 2) item similarities are measured by overlappingusers
### average overlap of users that rated items in the training set for items appearing in the test set
### lets consider movieid 8

In [90]:
# lets pick a movie
users_movie=test_data.filter("movieID  == 28")
print("users of movieid 28 in train set")
users_movie.show(5)
#lets collect them in a list
list3 = users_movie.select('userID')
array3 = [int(row.userID) for row in list3.collect()]


users of movieid 28 in train set
+-------+-------+------+
|movieID| userID|rating|
+-------+-------+------+
|     28| 991725|   3.0|
|     28|2628220|   4.0|
|     28| 946314|   4.0|
|     28|2370740|   4.0|
|     28|1100912|   4.0|
+-------+-------+------+
only showing top 5 rows



In [85]:
print("No of users who watched this movie in the train set:")
print(users_movie.count())

No of users who watched this movie in the train set:
96


In [91]:
#now lets get all the movies in the train set, that are watched by the same users who watched movie 28
count=train_data[train_data['userID'].isin(array3)].count()
print("The number of movies in the train set, that are been watched by the same users who watched movie 28:", count)
movies=train_data[train_data['userID'].isin(array3)]
movies.show(5)

The number of movies in the train set, that are been watched by the same users who watched movie 28: 44773
+-------+-------+------+
|movieID| userID|rating|
+-------+-------+------+
|      8| 991725|   4.0|
|      8| 603277|   1.0|
|      8|1645535|   3.0|
|      8|2487958|   5.0|
|      8| 303821|   3.0|
+-------+-------+------+
only showing top 5 rows



In [92]:
from pyspark.sql import functions as F

x=movies.groupBy("movieID").agg(F.count('userID').alias("count1"))
y=x.orderBy(x.count1.desc()).limit(4)
y.show()


+-------+------+
|movieID|count1|
+-------+------+
|   6287|   330|
|   6971|   326|
|  10947|   314|
|  15582|   313|
+-------+------+



In [93]:
#storing in an array
list2 = y.select('movieID')
array2 = [int(row.movieID) for row in list2.collect()]

from pyspark.sql import functions as F
movies2=movies[movies['movieID'].isin(array2)]
print("Average over all the users of whose movies similar to the movie 28 ")
movies3=movies2.groupBy("userID").agg(F.mean('rating').alias('predicted'))
movies3=movies3.orderBy(movies3.predicted.desc())
movies3.show(30)

Average over all the users of whose movies similar to the movie 28 
+-------+---------+
| userID|predicted|
+-------+---------+
|1518104|      5.0|
| 830282|      5.0|
| 393730|      5.0|
|1563429|      5.0|
|1290593|      5.0|
|1249490|      5.0|
| 591184|      5.0|
|2229986|      5.0|
|2101762|      5.0|
| 336578|      5.0|
|1704223|      5.0|
|  75384|      5.0|
|2252217|      5.0|
| 875719|      5.0|
| 282447|      5.0|
| 452001|      5.0|
|2179596|      5.0|
| 459468|      5.0|
|1745577|      5.0|
|1646639|      5.0|
| 633303|      5.0|
|2057611|      5.0|
| 714802|      5.0|
|2554750|      5.0|
|2375058|      5.0|
|  62655|      5.0|
|1787038|      5.0|
| 498716|     4.75|
|1242044|     4.75|
|2015182|     4.75|
+-------+---------+
only showing top 30 rows



#### prediction

In [94]:
final1 = users_movie.join(movies3, on=['userID' ], how='inner')
final1.show(25)

+-------+-------+------+---------+
| userID|movieID|rating|predicted|
+-------+-------+------+---------+
| 459468|     28|   5.0|      5.0|
|  62655|     28|   3.0|      5.0|
|  75384|     28|   4.0|      5.0|
| 452001|     28|   5.0|      5.0|
| 282447|     28|   4.0|      5.0|
|1646639|     28|   4.0|      5.0|
| 714802|     28|   5.0|      5.0|
| 591184|     28|   4.0|      5.0|
|2554750|     28|   5.0|      5.0|
|1290593|     28|   4.0|      5.0|
| 633303|     28|   5.0|      5.0|
|1249490|     28|   5.0|      5.0|
|1563429|     28|   3.0|      5.0|
|1704223|     28|   2.0|      5.0|
|1787038|     28|   3.0|      5.0|
| 875719|     28|   5.0|      5.0|
|2375058|     28|   4.0|      5.0|
|2252217|     28|   4.0|      5.0|
|2179596|     28|   5.0|      5.0|
|1745577|     28|   5.0|      5.0|
| 830282|     28|   5.0|      5.0|
| 393730|     28|   3.0|      5.0|
|2229986|     28|   3.0|      5.0|
|1518104|     28|   4.0|      5.0|
|2057611|     28|   5.0|      5.0|
+-------+-------+---

 the predicted and actual are close values

In [None]:
#tries for few more examples

In [100]:
# lets pick an user 199435
#users_movie=test_data.filter("movieID  == 2913")
#users_movie=test_data.filter("movieID  == 398")
users_movie=test_data.filter("movieID  == 2640")

#print("users of movieid 28 in train set")
#users_movie.show(5)
#lets collect them in a list
list3 = users_movie.select('userID')
array3 = [int(row.userID) for row in list3.collect()]


#now lets get all the movies in the train set, that are watched by the same users who watched movie 28
count=train_data[train_data['userID'].isin(array3)].count()
#print("The number of movies in the train set, that are been watched by the same users who watched movie 28:", count)
movies=train_data[train_data['userID'].isin(array3)]
#movies.show(5)

from pyspark.sql import functions as F

#lets take only those users who has watched more than 200 movies as the main user movies are 260
x=movies.groupBy("movieID").agg(F.count('userID').alias("count1"))
y=x.orderBy(x.count1.desc()).limit(4)
#y.show()

#storing in an array
list2 = y.select('movieID')
array2 = [int(row.movieID) for row in list2.collect()]

from pyspark.sql import functions as F
movies2=movies[movies['movieID'].isin(array2)]
movies3=movies2.groupBy("userID").agg(F.mean('rating').alias('predicted'))
movies3=movies3.orderBy(movies3.predicted.desc())
#movies3.show(30)

final2 = users_movie.join(movies3, on=['userID' ], how='inner')

print("Average over all the users of whose movies similar to the movie 2913 ")
final1.show(10)
print("Average over all the users of whose movies similar to the movie 398 ")
final2.show(10)
print("Average over all the users of whose movies similar to the movie 2640 ")
final3.show(10)

Average over all the users of whose movies similar to the movie 2913 
+-------+-------+------+---------+
| userID|movieID|rating|predicted|
+-------+-------+------+---------+
|1063188|   2913|   5.0|      5.0|
|1832810|   2913|   5.0|      5.0|
| 430738|   2913|   5.0|      5.0|
|1419126|   2913|   4.0|      5.0|
|1929128|   2913|   4.0|      5.0|
|1185461|   2913|   3.0|      5.0|
|2518772|   2913|   5.0|      5.0|
|2309341|   2913|   4.0|      5.0|
| 132731|   2913|   4.0|      5.0|
| 155620|   2913|   5.0|      5.0|
+-------+-------+------+---------+
only showing top 10 rows

Average over all the users of whose movies similar to the movie 398 
+-------+-------+------+---------+
| userID|movieID|rating|predicted|
+-------+-------+------+---------+
| 858298|   2640|   4.0|      5.0|
| 923103|   2640|   3.0|      5.0|
|2197203|   2640|   4.0|      5.0|
| 941011|   2640|   3.0|      5.0|
|  50259|   2640|   4.0|      5.0|
|1637299|   2640|   5.0|      5.0|
| 552356|   2640|   4.0|      

# c) Best approach to implement the collaborative filtering 

There are several methods of finding similar users such as JACCARD, COSINE, NORMALISATION, and the one we will be using here is going to be based on the Correlation Function. 

It is used to measure the strength of a linear association between two variables. 
It is used to measure the strength of a linear association between two variables. The formula for finding this coefficient between sets X and Y with N values can be seen in the image below. 

Why are we using Pearson Correlation?
Pearson correlation is invariant to scaling, i.e. multiplying all elements by a nonzero constant or adding any constant to all elements. For example, if you have two vectors X and Y,then, pearson(X, Y) == pearson(X, 2 * Y + 3). This is a pretty important property in recommendation systems because for example two users might rate two series of items totally different in terms of absolute rates, but they would be similar users (i.e. with similar ideas) with similar rates in various scales .
The values given by the formula vary from r = -1 to r = 1, where 1 forms a direct correlation between the two entities (it means a perfect positive correlation) and -1 forms a perfect negative correlation. 
In our case, a 1 means that the two users have similar tastes while a -1 means the opposite.



Jaccard distance is not a suitable?

Jaccard distance is not a suitable measure for the kind of data we are considering, because We could ignore values in the matrix and focus only on the sets of items rated meaning it takes the union and intersections of user A and B. If our problem statement was only about watching movies and not about ratings, then Jaccard distance would be a good choice to use. But, in this project, we are dealing with detailed ratings (1 being lowest and 5 being highest), the Jaccard distance loses important information. 


# d) Applying Correlation Coefficient and rerunning the code

The process for creating a User Based recommendation system is as follows:
1.	First select a user with the movies the user has watched

2.	Based on his rating to movies, find the top X neighbours/ similar users

3.	Get the watched movie record of the user for each neighbour.

4.	Calculate a similarity score using the formula

5.	Recommend the items with the highest score



## User-based Collaborative Filtering 

In [33]:
#train_data
#test_data

 #### lets pick an user

In [13]:
# lets pick an user 1395430  from test dataset
movies_user=test_data.filter("userID  == 199435")
print("movies of userid 199435  in train set")

#there are many movies, let take only top ones that has rating 5. 
x=movies_user.orderBy(movies_user.rating.desc())
userInput=x.limit(10)
userInput.show()
#collect the movieids in an array
l = userInput.select('movieID')
a = [int(row.movieID) for row in l.collect()]

movies of userid 199435  in train set
+-------+------+------+
|movieID|userID|rating|
+-------+------+------+
|   8596|199435|   5.0|
|    443|199435|   3.0|
|   7145|199435|   3.0|
|  14144|199435|   3.0|
|  14712|199435|   3.0|
|   4852|199435|   2.0|
|  10082|199435|   2.0|
|  12778|199435|   2.0|
+-------+------+------+



 
 
 
 ####  The users who has seen the same movies from train dataset
 
 
 

In [14]:
similaruser = train_data[train_data['movieID'].isin(a)]
similaruser.show(5)
similaruser.count()

+-------+-------+------+
|movieID| userID|rating|
+-------+-------+------+
|    443| 364518|   3.0|
|    443| 716091|   4.0|
|    443|1601783|   4.0|
|    443| 306466|   3.0|
|    443| 160977|   5.0|
+-------+-------+------+
only showing top 5 rows



92691

In [15]:
#lets take only 100
#similarusers=similaruser.limit(100)

We dont want to run for whole data so lets take only 100

In [16]:
similarusers=similaruser.select('userID').distinct()
similar=similarusers.limit(100)

In [17]:
#collecting in a list
l1 = similar.select('userID')
similaruser_ids = [int(row.userID) for row in l1.collect()]

#### Calculating the Pearson Correlation between input user and subset group, and store it in a dictionary

In [18]:
pearsonCorrelationDict={}
import  numpy as np
for i in similaruser_ids:
    #get each userid from the similarusers file and sort it by movieid
    group =  similaruser[similaruser['userID'].isin(i)]
    group=group.orderBy(group.movieID.desc())
    #group.show()
    #store them in an array 
    l2 = group.select('movieID')
    a2 = [int(row.movieID) for row in l2.collect()]
    #get each userInput  file and sort it by movieid
    userInput1=userInput.orderBy(userInput.movieID.desc())
    #userInput1.show()
    #get the number of rows of the groupfile
    nRatings = group.count()
    #Get the rating scores for the movies that they both have in common
    temp_df = userInput1[userInput1['movieID'].isin(a2)] 
    l3 = temp_df.select('rating')
    #store in a list
    rating_tempdf = [int(row.rating) for row in l3.collect()]
    #print("rating_tempdf",rating_tempdf)
    #Get the rating scores for the group data
    l4 = group.select('rating')
    #store in a list
    rating_group = [int(row.rating) for row in l4.collect()]
    #print("rating_group",rating_group)
    
    #Now let's calculate the pearson correlation between two users, so called, x and y
    Sxx = sum([i**2 for i in rating_tempdf]) - pow(sum(rating_tempdf),2)/float(nRatings)
    Syy = sum([i**2 for i in rating_group]) - pow(sum(rating_group),2)/float(nRatings)
    Sxy = sum( i*j for i, j in zip(rating_tempdf, rating_group)) - sum(rating_tempdf)*sum(rating_group)/float(nRatings)
    
    if Sxx != 0 and Syy != 0:
        pearsonCorrelationDict[i] = Sxy/(np.sqrt(Sxx*Syy))
    else:
        pearsonCorrelationDict[i] = 0 

In [19]:
pearsonCorrelationDict.items()

dict_items([(128389, -0.22075539284417398), (2228253, 0), (2311863, 0.0), (2629660, 0.6123724356957946), (1742759, 0.4482758620689666), (279120, 0), (1553158, 0), (2088272, 1.0), (1896167, -1.0), (2358799, 0.7559289460184533), (15846, 0.49999999999999667), (953170, 0.41403933560541256), (1552084, 0.2886751345948129), (455334, 0.9449111825230695), (1629521, 0.47368421052631576), (2531111, -0.5000000000000008), (1497891, 0.866025403784439), (637596, 0.866025403784439), (973051, 0), (2250628, 0), (1628484, 0.0), (446160, 0.2075143391598224), (675056, -0.9449111825230686), (1909175, 1.0), (1434507, 0), (1704384, 0.49999999999999933), (1214262, 1.0), (2613898, -0.866025403784439), (216558, 1.000000000000004), (2305305, 0.7559289460184573), (761430, 0), (836945, 0.866025403784439), (1213587, 0), (1610263, 0.7385489458759964), (434567, 0.4999999999999982), (377808, 0.9449111825230686), (1849621, 0.981980506061966), (2339191, 0.9449111825230686), (2482819, 1.0), (1919244, 1.0), (1607539, 0.188

#### Converting dictionary to dataframe and shorting in descending order


In [20]:
DF = pandas.DataFrame.from_dict(pearsonCorrelationDict, orient='index')
DF.columns = ['similarityIndex']
DF['userId'] = DF.index
DF.index = range(len(DF))
ddf = spark.createDataFrame(DF)

In [21]:
ddf.show()

+--------------------+-------+
|     similarityIndex| userId|
+--------------------+-------+
|-0.22075539284417398| 128389|
|                 0.0|2228253|
|                 0.0|2311863|
|  0.6123724356957946|2629660|
|  0.4482758620689666|1742759|
|                 0.0| 279120|
|                 0.0|1553158|
|                 1.0|2088272|
|                -1.0|1896167|
|  0.7559289460184533|2358799|
| 0.49999999999999667|  15846|
| 0.41403933560541256| 953170|
|  0.2886751345948129|1552084|
|  0.9449111825230695| 455334|
| 0.47368421052631576|1629521|
| -0.5000000000000008|2531111|
|   0.866025403784439|1497891|
|   0.866025403784439| 637596|
|                 0.0| 973051|
|                 0.0|2250628|
+--------------------+-------+
only showing top 20 rows



#### The top x similar users to input user

In [22]:
topUsers=ddf.orderBy(ddf.similarityIndex.desc())

In [23]:
topUsers.show()

+------------------+-------+
|   similarityIndex| userId|
+------------------+-------+
| 1.000000000000004| 168902|
| 1.000000000000004| 216558|
|               1.0|2482819|
|               1.0|1919244|
|               1.0| 561364|
|               1.0|2520933|
|               1.0|2120968|
|               1.0|1909175|
|               1.0|2088272|
|               1.0|1214262|
| 0.981980506061966|1849621|
|0.9449111825230695| 455334|
|0.9449111825230686|2339191|
|0.9449111825230686| 377808|
|0.9449111825230686| 839232|
|0.9271726499455306|2134425|
| 0.899228803025897|1684416|
| 0.866025403784439| 836945|
| 0.866025403784439| 637596|
| 0.866025403784439|1497891|
+------------------+-------+
only showing top 20 rows



## Now, recommending movies to the input user.

#### Taking the weighted average of the ratings of the movies using the Pearson Correlation as the weight.
But to do this, we first need to get the movies watched by the users in our __DF__ from the ratings dataframe and then store their correlation in a new column called _similarityIndex". This is achieved below by merging of these two tables.

In [24]:
new_df = topUsers.join(similaruser, on=['userId' ], how='inner')

In [25]:
new_df.show()

+-------+-------------------+-------+------+
| userId|    similarityIndex|movieID|rating|
+-------+-------------------+-------+------+
|1742759| 0.4482758620689666|    443|   2.0|
| 460258|                0.0|    443|   4.0|
|1221563| 0.6882472016116852|    443|   4.0|
|2153439|                0.0|    443|   5.0|
| 861862|0.13245323570650439|    443|   4.0|
| 839232| 0.9449111825230686|    443|   4.0|
| 973051|                0.0|    443|   5.0|
|1434507|                0.0|    443|   5.0|
|2271702|0.41522739926870117|    443|   5.0|
| 637596|  0.866025403784439|    443|   3.0|
| 110938| 0.4969039949999533|    443|   4.0|
|1005202| 0.6488856845230502|    443|   4.0|
|1559165| 0.6123724356957964|    443|   4.0|
|1896167|               -1.0|    443|   4.0|
| 328283|                0.0|    443|   5.0|
|1684416|  0.899228803025897|    443|   2.0|
| 381625|                0.0|    443|   4.0|
|2520933|                1.0|    443|   3.0|
|1069088| 0.3333333333333333|    443|   3.0|
| 761430| 

Lets calculate the similarity index by multiply the movie rating by its weight, 
then sum up the new ratings and divide it by the sum of the weights.

It shows the idea of all similar users to candidate movies for the input user:

In [26]:
from pyspark.sql.functions import col

In [27]:
#multiply
new_df1=new_df.withColumn('weightedRating', (col('similarityIndex') * col('rating')))
new_df1.show()

+-------+-------------------+-------+------+------------------+
| userId|    similarityIndex|movieID|rating|    weightedRating|
+-------+-------------------+-------+------+------------------+
|1742759| 0.4482758620689666|    443|   2.0|0.8965517241379332|
| 460258|                0.0|    443|   4.0|               0.0|
|1221563| 0.6882472016116852|    443|   4.0|2.7529888064467407|
|2153439|                0.0|    443|   5.0|               0.0|
| 861862|0.13245323570650439|    443|   4.0|0.5298129428260175|
| 839232| 0.9449111825230686|    443|   4.0|3.7796447300922744|
| 973051|                0.0|    443|   5.0|               0.0|
|1434507|                0.0|    443|   5.0|               0.0|
|2271702|0.41522739926870117|    443|   5.0| 2.076136996343506|
| 637596|  0.866025403784439|    443|   3.0| 2.598076211353317|
| 110938| 0.4969039949999533|    443|   4.0|1.9876159799998132|
|1005202| 0.6488856845230502|    443|   4.0|2.5955427380922007|
|1559165| 0.6123724356957964|    443|   

In [28]:
from pyspark.sql import functions as F
temp=new_df1.groupBy('movieID').agg(F.sum(new_df1.similarityIndex).alias("SI_sum"),F.sum(new_df1.weightedRating).alias("WR_sum"))

In [29]:
temp.show()

+-------+------------------+------------------+
|movieID|            SI_sum|            WR_sum|
+-------+------------------+------------------+
|  10082| 6.762463820697189|18.512183134051664|
|   4852| 6.202397253085562| 17.49336617354312|
|   8596|28.005071129964108|139.25153230950056|
|  14144| 5.424940182070157| 19.35571539109325|
|    443| 7.545557310351263| 26.00181992951127|
|  12778|15.420285028238624| 49.15830219942137|
|   7145| 8.981991714248839| 29.32700876370245|
|  14712|20.449127741359334|  59.8559381486637|
+-------+------------------+------------------+



#### weighted average

In [30]:
recommendation=temp.withColumn('weighted_average', (col('WR_sum') / col('SI_sum')))
recommendation=recommendation.select("movieID","weighted_average")
recommendation=recommendation.orderBy(recommendation.weighted_average.desc()).limit(10)


#### These are the movies that  are the predicted rating of 1395430

#### Predicted

In [31]:
recommendation.show()

+-------+------------------+
|movieID|  weighted_average|
+-------+------------------+
|   8596| 4.972368456529574|
|  14144|3.5679131458564965|
|    443| 3.445977395710858|
|   7145| 3.265089714698658|
|  12778|3.1878984149384726|
|  14712| 2.927065589580245|
|   4852|2.8204201472004296|
|  10082| 2.737490894575036|
+-------+------------------+



In [32]:
m = recommendation.select('movieID')
n = [int(row.movieID) for row in m.collect()]

#### Actual

In [33]:
actual=movies_user[movies_user['movieID'].isin(n)]

In [34]:
actual.show()

+-------+------+------+
|movieID|userID|rating|
+-------+------+------+
|    443|199435|   3.0|
|   4852|199435|   2.0|
|   7145|199435|   3.0|
|   8596|199435|   5.0|
|  10082|199435|   2.0|
|  12778|199435|   2.0|
|  14144|199435|   3.0|
|  14712|199435|   3.0|
+-------+------+------+



In [36]:
final1 = userInput.join(recommendation, on=['movieID' ], how='inner')

In [39]:
final1.show()

+-------+------+------+------------------+
|movieID|userID|rating|  weighted_average|
+-------+------+------+------------------+
|   8596|199435|   5.0| 4.972368456529574|
|    443|199435|   3.0|3.4459773957108584|
|   7145|199435|   3.0| 3.265089714698658|
|  14144|199435|   3.0| 3.567913145856496|
|  14712|199435|   3.0| 2.927065589580245|
|   4852|199435|   2.0|2.8204201472004296|
|  10082|199435|   2.0|2.7374908945750356|
|  12778|199435|   2.0|3.1878984149384726|
+-------+------+------+------------------+



In [None]:
#if we want to see the name of the movie, join it with movie_titles dataset

In [52]:
final2= final1.join(movie_name, on=['movieID'], how='inner')