In [2]:
sc

# Importing the Libraries

In [114]:
import pyspark

from pyspark import SparkConf,SparkContext
from pyspark.sql.functions import regexp_extract
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType
from pyspark.sql import Window
from pyspark.sql import DataFrameStatFunctions as statFunc


import pandas as pd
import numpy as np
import ipywidgets as widgets
from IPython.display import display, clear_output
from contextlib import contextmanager
import warnings
warnings.filterwarnings('ignore')

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

## Creating a spark session

In [115]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('COLLABORATIVE FILTERING USING THE NETFLIX DATA').getOrCreate()

## Creating the three dataframes for training, testing and movie data sets

In [116]:
from pyspark.sql.types import *

movies_df_schema = StructType(
  [StructField('movieId', IntegerType()),
   StructField('yearOfRelease', IntegerType()),
   StructField('title', StringType())]
)
Training_df_schema = StructType(
  [StructField('movieId', IntegerType()),
   StructField('userId', IntegerType()),
   StructField('ratings',DoubleType())]
)

Testing_df_schema = StructType(
  [StructField('movieId', IntegerType()),
   StructField('userId', IntegerType()),
   StructField('ratings',DoubleType())]
)

In [117]:
#Creating the training,testing and movie dataframes

df_training = sqlContext.read.format('txt').options(inferSchema=True).schema(Training_df_schema).csv('s3://chaitratadagadsci/Netflix/TrainingRatings.txt')
 
df_testing = sqlContext.read.format('txt').options(inferSchema=True).schema(Testing_df_schema).csv('s3://chaitratadagadsci/Netflix/TestingRatings.txt')
 
df_movies = sqlContext.read.format('txt').options(inferSchema=True).schema(movies_df_schema).csv('s3://chaitratadagadsci/Netflix/movie_titles.txt')

In [118]:
#Count of each dataframes    
training_count = df_training.count()
testing_count = df_testing.count()
movies_count = df_movies.count()

In [119]:
#Printing the data and count
print('There are %s samples in training set , %s samples in testing set and %s samples in movies in the datasets' % (training_count,testing_count, movies_count))
print('Training:')
df_training.show(5)
print('Testing:')
df_testing.show(5)
print('Movies:')
df_movies.show(5,truncate=False)

There are 3255352 samples in training set , 100478 samples in testing set and 17770 samples in movies in the datasets
Training:
+-------+-------+-------+
|movieId| userId|ratings|
+-------+-------+-------+
|      8|1744889|    1.0|
|      8|1395430|    2.0|
|      8|1205593|    4.0|
|      8|1488844|    4.0|
|      8|1447354|    1.0|
+-------+-------+-------+
only showing top 5 rows

Testing:
+-------+-------+-------+
|movieId| userId|ratings|
+-------+-------+-------+
|      8| 573364|    1.0|
|      8|2149668|    3.0|
|      8|1089184|    3.0|
|      8|2465894|    3.0|
|      8| 534508|    1.0|
+-------+-------+-------+
only showing top 5 rows

Movies:
+-------+-------------+----------------------------+
|movieId|yearOfRelease|title                       |
+-------+-------------+----------------------------+
|1      |2003         |Dinosaur Planet             |
|2      |2004         |Isle of Man TT 2004 Review  |
|3      |1997         |Character                   |
|4      |1994      

In [120]:
df_training.show()

+-------+-------+-------+
|movieId| userId|ratings|
+-------+-------+-------+
|      8|1744889|    1.0|
|      8|1395430|    2.0|
|      8|1205593|    4.0|
|      8|1488844|    4.0|
|      8|1447354|    1.0|
|      8| 306466|    4.0|
|      8|1331154|    4.0|
|      8|1818178|    3.0|
|      8| 991725|    4.0|
|      8|1987434|    4.0|
|      8|1765381|    4.0|
|      8| 433803|    3.0|
|      8|1148143|    2.0|
|      8|1174811|    5.0|
|      8|1684516|    3.0|
|      8| 754781|    4.0|
|      8| 567025|    4.0|
|      8|1623132|    4.0|
|      8|1567095|    3.0|
|      8|1666394|    5.0|
+-------+-------+-------+
only showing top 20 rows



In [121]:
df_testing.show()

+-------+-------+-------+
|movieId| userId|ratings|
+-------+-------+-------+
|      8| 573364|    1.0|
|      8|2149668|    3.0|
|      8|1089184|    3.0|
|      8|2465894|    3.0|
|      8| 534508|    1.0|
|      8| 992921|    4.0|
|      8| 595054|    4.0|
|      8|1298304|    4.0|
|      8|1661600|    4.0|
|      8| 553787|    2.0|
|      8|1309839|    3.0|
|      8| 727242|    1.0|
|      8|1437668|    4.0|
|      8|2170930|    1.0|
|      8|1780876|    5.0|
|      8|   9660|    3.0|
|      8|2379200|    4.0|
|      8| 563186|    5.0|
|      8|1539617|    4.0|
|      8|1656839|    1.0|
+-------+-------+-------+
only showing top 20 rows



In [122]:
df_movies.show()

+-------+-------------+--------------------+
|movieId|yearOfRelease|               title|
+-------+-------------+--------------------+
|      1|         2003|     Dinosaur Planet|
|      2|         2004|Isle of Man TT 20...|
|      3|         1997|           Character|
|      4|         1994|Paula Abdul's Get...|
|      5|         2004|The Rise and Fall...|
|      6|         1997|                Sick|
|      7|         1992|               8 Man|
|      8|         2004|What the #$*! Do ...|
|      9|         1991|Class of Nuke 'Em...|
|     10|         2001|             Fighter|
|     11|         1999|Full Frame: Docum...|
|     12|         1947|My Favorite Brunette|
|     13|         2003|Lord of the Rings...|
|     14|         1982|  Nature: Antarctica|
|     15|         1988|Neil Diamond: Gre...|
|     16|         1996|           Screamers|
|     17|         2005|           7 Seconds|
|     18|         1994|    Immortal Beloved|
|     19|         2000|By Dawn's Early L...|
|     20| 

In [41]:
from pyspark.sql import functions as F


movie_ids_with_avg_ratings_df = df_testing.groupBy('movieId').agg(F.count(df_testing.ratings).alias("count"), F.avg(df_testing.ratings).alias("average"))
print ('movie_ids_with_avg_ratings_df:')
movie_ids_with_avg_ratings_df.show(3, truncate=False)

movie_names_df = movie_ids_with_avg_ratings_df.join(df_movies, movie_ids_with_avg_ratings_df.movieId == df_movies.ID)
movie_names_with_avg_ratings_df = (movie_names_df.select(movie_names_df["average"], movie_names_df["title"], movie_names_df["count"], movie_names_df["movieId"])).orderBy(["average"], ascending = 0)


print ('movie_names_with_avg_ratings_df:')
movie_names_with_avg_ratings_df.show(3, truncate=False)

movie_ids_with_avg_ratings_df:
+-------+-----+-----------------+
|movieId|count|average          |
+-------+-----+-----------------+
|2366   |47   |2.851063829787234|
|11317  |35   |3.085714285714286|
|13289  |1    |4.0              |
+-------+-----+-----------------+
only showing top 3 rows

movie_names_with_avg_ratings_df:
+-------+-------------------------+-----+-------+
|average|title                    |count|movieId|
+-------+-------------------------+-----+-------+
|5.0    |Dragnet                  |3    |13965  |
|5.0    |Vanished                 |1    |12705  |
|5.0    |Working with Orson Welles|1    |9930   |
+-------+-------------------------+-----+-------+
only showing top 3 rows



In [42]:
movies_with_500_ratings_or_more = movie_names_with_avg_ratings_df.filter("count >= 500")
print ('Movies with highest ratings:')
movies_with_500_ratings_or_more.show(20, truncate=False)

Movies with highest ratings:
+------------------+--------------------------------+-----+-------+
|average           |title                           |count|movieId|
+------------------+--------------------------------+-----+-------+
|4.4535073409461665|The Godfather                   |613  |12293  |
|4.354838709677419 |The Incredibles                 |620  |10947  |
|4.336330935251799 |The Godfather                   |556  |3290   |
|4.123304562268804 |Ferris Bueller's Day Off        |811  |6971   |
|4.091172214182344 |Seven                           |691  |8596   |
|4.00990099009901  |Finding Neverland               |505  |2913   |
|4.0071942446043165|Terminator 2: Extreme Edition   |695  |8915   |
|4.006613756613756 |Rain Man                        |756  |4640   |
|3.9323671497584543|When Harry Met Sally            |621  |2660   |
|3.9323181049069373|Office Space                    |591  |13614  |
|3.876470588235294 |Good Morning                    |680  |6408   |
|3.868627450980392 

In [43]:
seed = 1800009193
(split_60_df, split_a_20_df, split_b_20_df) = df_training.randomSplit([6.0, 2.0, 2.0], seed)

# Let's cache these datasets for performance
training_df = split_60_df.cache()
validation_df = split_a_20_df.cache()
test_df = split_b_20_df.cache()

print('Training: {0}, validation: {1}, test: {2}\n'.format(
  training_df.count(), validation_df.count(), test_df.count())
)
training_df.show(3)
validation_df.show(3)
test_df.show(3)

Training: 1953267, validation: 651277, test: 650808

+-------+------+-------+
|movieId|userId|ratings|
+-------+------+-------+
|      8|  1333|    3.0|
|      8|  4706|    5.0|
|      8|  5652|    3.0|
+-------+------+-------+
only showing top 3 rows

+-------+------+-------+
|movieId|userId|ratings|
+-------+------+-------+
|      8|  3363|    2.0|
|      8|  6460|    5.0|
|      8|  9321|    3.0|
+-------+------+-------+
only showing top 3 rows

+-------+------+-------+
|movieId|userId|ratings|
+-------+------+-------+
|      8|     7|    5.0|
|      8|  3321|    1.0|
|      8|  3604|    3.0|
+-------+------+-------+
only showing top 3 rows



## HOW MANY DISTINCT USERS AND DISTINCT ITEMS ARE THERE IN THE TEST SET ?

In [123]:
#finding out the distinct users and items/movies in the testing set

print("The distinct items in testing set are :",df_testing.select('movieID').distinct().count())
print("The distinct users in testing set are :",df_testing.select('userID').distinct().count())


The distinct items in testing set are : 1701
The distinct users in testing set are : 27555


In [124]:
#finding out the distinct users and items(movies) in the training set 

print("The distinct items in training set are :",df_training.select('movieID').distinct().count())
print("The distinct users in training set are :",df_training.select('userID').distinct().count())

The distinct items in training set are : 1821
The distinct users in training set are : 28978


# Finding Overall Rates for Users and Movies in both the sets 

## Some Initial analysis on training and testing sets 

In [125]:
#lets convert to a Pandas Dataframe 

pd_training = df_training.toPandas()

pd_testing = df_testing.toPandas()


In [126]:
#descrivbing the training dataset 

pd_training.describe()

Unnamed: 0,movieId,userId,ratings
count,3255352.0,3255352.0,3255352.0
mean,8724.66,1327058.0,3.481188
std,5107.402,762688.7,1.082873
min,8.0,7.0,1.0
25%,3893.0,671697.0,3.0
50%,8825.0,1322467.0,4.0
75%,13326.0,1988873.0,4.0
max,17742.0,2649285.0,5.0


In [127]:
pd_testing.describe()

Unnamed: 0,movieId,userId,ratings
count,100478.0,100478.0,100478.0
mean,8701.547792,1329956.0,3.479458
std,5098.075495,762504.1,1.08528
min,8.0,7.0,1.0
25%,3893.0,677430.0,3.0
50%,8699.0,1325031.0,4.0
75%,13298.0,1995052.0,4.0
max,17742.0,2649285.0,5.0


In [128]:
pd_training.tail()

Unnamed: 0,movieId,userId,ratings
3255347,17742,46222,3.0
3255348,17742,2534701,1.0
3255349,17742,208724,3.0
3255350,17742,483107,2.0
3255351,17742,1181331,2.0


In [129]:
pd_testing.tail()

Unnamed: 0,movieId,userId,ratings
100473,17742,1898310,2.0
100474,17742,716096,4.0
100475,17742,38115,3.0
100476,17742,2646347,5.0
100477,17742,273576,2.0


## Testing for null values in both the sets

In [130]:
pd_training.isnull().sum()

movieId    0
userId     0
ratings    0
dtype: int64

In [131]:
pd_testing.isnull().sum()

movieId    0
userId     0
ratings    0
dtype: int64

In [132]:
print("The counts of both the dataframes are ","Training dataset ==> ",len(pd_training),"Testing dataset ==>",len(pd_testing))

The counts of both the dataframes are  Training dataset ==>  3255352 Testing dataset ==> 100478


# Average Movie Ratings

In [133]:
#checking overall average rate of the movies with their counts  in the training set

averageMovieRatingsTraining = pd.DataFrame(pd_training.groupby('movieId')['ratings'].mean())

averageMovieRatingsTraining['counts'] =pd.DataFrame(pd_training.groupby('movieId')['ratings'].count())

averageMovieRatingsTraining.head()

Unnamed: 0_level_0,ratings,counts
movieId,Unnamed: 1_level_1,Unnamed: 2_level_1
8,3.055104,2831
28,3.760127,12244
43,2.310345,58
48,3.620648,1666
61,2.385965,57


In [134]:
#checking overall average rate of the movies with their counts  in the testing set

averageMovieRatingsTesting = pd.DataFrame(pd_testing.groupby('movieId')['ratings'].mean())

averageMovieRatingsTesting['counts'] =pd.DataFrame(pd_testing.groupby('movieId')['ratings'].count())

averageMovieRatingsTesting.head()

Unnamed: 0_level_0,ratings,counts
movieId,Unnamed: 1_level_1,Unnamed: 2_level_1
8,3.072917,96
28,3.698667,375
43,4.0,1
48,3.622642,53
61,4.0,3


In [135]:
averageMovieRatingsTesting.sort_values(by=['counts'],ascending=False).head(10)

Unnamed: 0_level_0,ratings,counts
movieId,Unnamed: 1_level_1,Unnamed: 2_level_1
6971,4.123305,811
4640,4.006614,756
6287,3.716418,737
9728,3.807365,706
8915,4.007194,695
4432,3.674855,692
8596,4.091172,691
6408,3.876471,680
1406,3.233129,652
1744,3.752308,650


# Average User Ratings

In [136]:
#checking overall average rate of the user with their counts  in the training set

averageUserRatingsTraining = pd.DataFrame(pd_training.groupby('userId')['ratings'].mean())

averageUserRatingsTraining['counts'] =pd.DataFrame(pd_training.groupby('userId')['ratings'].count())

averageUserRatingsTraining.head()

Unnamed: 0_level_0,ratings,counts
userId,Unnamed: 1_level_1,Unnamed: 2_level_1
7,3.903846,104
79,3.630952,84
199,3.943662,71
481,4.351351,74
769,3.193878,98


In [137]:
#checking overall average rate of the user with their counts  in the testing set

averageUserRatingsTesting = pd.DataFrame(pd_testing.groupby('userId')['ratings'].mean())

averageUserRatingsTesting['counts'] =pd.DataFrame(pd_testing.groupby('userId')['ratings'].count())

averageUserRatingsTesting.tail()

Unnamed: 0_level_0,ratings,counts
userId,Unnamed: 1_level_1,Unnamed: 2_level_1
2648869,2.777778,9
2648885,4.0,5
2649120,5.0,2
2649267,4.142857,7
2649285,2.333333,3


In [138]:
averageUserRatingsTesting.sort_values(by=['counts'],ascending=False).head(10)

Unnamed: 0_level_0,ratings,counts
userId,Unnamed: 1_level_1,Unnamed: 2_level_1
1664010,4.3,70
2439493,1.307692,52
305344,1.711538,52
387418,1.941176,51
1314869,3.263158,38
2118461,4.117647,34
1932594,2.321429,28
491531,2.62963,27
2606799,2.888889,27
727242,1.36,25


## ESTIMATED AVERAGE OVERLAP OF ITEMS FOR USERS

In [139]:
#Pivoting the table with userID as index and the movieId as columns for our training data

userItemRating = pd_training.pivot_table(index='userId', columns='movieId', values='ratings')

userItemRating.head()

movieId,8,28,43,48,61,64,66,92,96,111,...,17654,17660,17689,17693,17706,17725,17728,17734,17741,17742
userId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
7,5.0,4.0,,,,,,,,,...,,,,,,,,,,
79,,,,,,,,,,,...,,,,,,,,,,
199,,,,,,,,,,4.0,...,,,,,,,,,,
481,,,,,,,,,,5.0,...,,,,,,,,,,
769,,,,,,,,,,,...,,,,,,,,,,


In [140]:
pd_testing.head()

Unnamed: 0,movieId,userId,ratings
0,8,573364,1.0
1,8,2149668,3.0
2,8,1089184,3.0
3,8,2465894,3.0
4,8,534508,1.0


In [141]:
userItemRating.fillna(0, inplace = True)

userMovieRating = userItemRating.astype(np.int32)

userMovieRating.head()

movieId,8,28,43,48,61,64,66,92,96,111,...,17654,17660,17689,17693,17706,17725,17728,17734,17741,17742
userId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
7,5,4,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
79,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
199,0,0,0,0,0,0,0,0,0,4,...,0,0,0,0,0,0,0,0,0,0
481,0,0,0,0,0,0,0,0,0,5,...,0,0,0,0,0,0,0,0,0,0
769,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0



## I am using the top 10 users and movies which are having the most counts from the test dataset.


In [142]:
userMovieRating[17742].head(50)

userId
7       0
79      0
199     0
481     0
769     0
906     0
1310    0
1333    0
1427    0
1442    0
1457    0
1500    0
1527    0
1918    0
2000    0
2128    0
2213    0
2225    0
2307    0
2455    0
2469    0
2678    0
2787    0
2905    0
2976    0
3039    0
3321    0
3363    0
3458    0
3595    0
3604    0
3694    0
3718    0
3798    0
3870    0
3998    0
4057    0
4247    0
4306    0
4315    0
4409    0
4421    0
4576    0
4597    0
4679    0
4706    0
4783    0
4905    0
4906    0
4983    0
Name: 17742, dtype: int32

In [143]:
def average(user):
    summed =0
    count =0

    for i in range(len(user)):
        if(user[i]==0):
            summed = summed +0
        else:
            summed= user[i] +summed
            count= 1+ count
    avgg=summed/count
    return avgg

In [144]:
pd_testing.tail()

Unnamed: 0,movieId,userId,ratings
100473,17742,1898310,2.0
100474,17742,716096,4.0
100475,17742,38115,3.0
100476,17742,2646347,5.0
100477,17742,273576,2.0


In [145]:
user1664010= list(userMovieRating.loc[1664010])
avg1664010= average(user1664010)
print(avg1664010)

4.2384364820846905


In [146]:
user2439493 = list(userMovieRating.loc[2439493])

avg2439493= average(user2439493)

print(avg2439493)

1.225609756097561


In [147]:
user305344 = list(userMovieRating.loc[305344])

avg305344 = average(user305344)

print(avg305344)

1.904382470119522


In [148]:
user387418 = list(userMovieRating.loc[387418])

avg387418= average(user387418)

print(avg387418)

1.8405963302752293


In [149]:
user1314869 = list(userMovieRating.loc[1314869])

avg1314869= average(user1314869)

print(avg1314869)

2.970984455958549


In [150]:
user2118461 = list(userMovieRating.loc[2118461])

avg2118461= average(user2118461)

print(avg2118461)

4.088453747467927


In [151]:
user1932594 = list(userMovieRating.loc[1932594])

avg1932594 = average(user1932594)

print(avg1932594)

2.3195754716981134


In [152]:
user491531 = list(userMovieRating.loc[491531])

avg491531= average(user491531)

print(avg491531)

2.9102384291725105


In [153]:
user2606799 = list(userMovieRating.loc[2606799])

avg2606799 = average(user2606799)

print(avg2606799)

2.817982456140351


In [154]:
user727242 = list(userMovieRating.loc[727242])

avg727242 = average(user727242)

print(avg727242)

1.1890694239290989


In [155]:
ESTIMATED_AVERAGE_ITEMS_USERS = avg1314869+avg387418+avg305344+avg2439493+avg1664010+avg727242+avg2606799+avg491531+avg1932594+avg2118461

print("ESTIMATED AVERAGE OVERLAP OF ITEMS FOR USERS is : ",ESTIMATED_AVERAGE_ITEMS_USERS/10)

ESTIMATED AVERAGE OVERLAP OF ITEMS FOR USERS is :  2.550532902294355


### ESTIMATED AVERAGE OVERLAP OF ITEMS FOR USERS is :  2.550532902294355

## ESTIMATED AVERAGE OVERLAP OF USERS FOR ITEMS# 

In [156]:
#creating a utility matrix with movieId as index and the userId as columns to training data

movieUserRating = pd_training.pivot_table(index='movieId', columns='userId', values='ratings')

movieUserRating.head()

userId,7,79,199,481,769,906,1310,1333,1427,1442,...,2648572,2648589,2648730,2648734,2648853,2648869,2648885,2649120,2649267,2649285
movieId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
8,5.0,,,,,,,3.0,,,...,,,,,,,,,,
28,4.0,,,,,3.0,3.0,2.0,,4.0,...,,3.0,4.0,,2.0,,4.0,,,4.0
43,,,,,,,,,,,...,,,,,,,,,,
48,,,,,,,,,,,...,,,,,,,,,,
61,,,,,,,,,,,...,,,,,,,,,,


In [157]:
movieUserRating.fillna(0, inplace = True)

movieUserRating = movieUserRating.astype(np.int32)

movieUserRating.head()

userId,7,79,199,481,769,906,1310,1333,1427,1442,...,2648572,2648589,2648730,2648734,2648853,2648869,2648885,2649120,2649267,2649285
movieId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
8,5,0,0,0,0,0,0,3,0,0,...,0,0,0,0,0,0,0,0,0,0
28,4,0,0,0,0,3,3,2,0,4,...,0,3,4,0,2,0,4,0,0,4
43,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
48,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
61,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [158]:
movie6971= list(movieUserRating.loc[6971])

avg6971= average(movie6971)

print(avg6971)

4.071815611748076


In [159]:

movie4640= list(movieUserRating.loc[4640])

avg4640 = average(movie4640)

print(avg4640)

4.047438894792774


In [160]:

movie6287= list(movieUserRating.loc[6287])

avg6287= average(movie6287)

print(avg6287)

3.7261099495756977


In [161]:

movie9728  = list(movieUserRating.loc[9728])

avg9728= average(movie9728)

print(avg9728)

3.8246204278812974


In [162]:
8915
movie8915 = list(movieUserRating.loc[8915])

avg8915 = average(movie8915)

print(avg8915)

3.9663373050469826


In [163]:
#4432
movie4432= list(movieUserRating.loc[4432])

avg4432 = average(movie4432)

print(avg4432)

3.6721471305118545


In [164]:
#8596
movie8596= list(movieUserRating.loc[8596])

avg8596 = average(movie8596)

print(avg8596)

4.103368832862421


In [165]:
#6408
movie6408= list(movieUserRating.loc[6408])

avg6408 = average(movie6408)

print(avg6408)

3.8013020096235492


In [166]:

movie1406= list(movieUserRating.loc[1406])

avg1406 = average(movie1406)

print(avg1406)

3.2807910426057876


In [167]:
movie1744= list(movieUserRating.loc[1744])

avg1744 = average(movie1744)

print(avg1744)

3.802730883813307


In [168]:

ESTIMATED_AVERAGE_USER_ITEMS = avg6971+avg4640+avg6287+avg9728+avg8915+avg1744+avg1406+avg6408+avg8596+avg4432

print("ESTIMATED AVERAGE OVERLAP OF USERS FOR ITEMS : ",ESTIMATED_AVERAGE_USER_ITEMS/10)

ESTIMATED AVERAGE OVERLAP OF USERS FOR ITEMS :  3.8296662088461746


### ESTIMATED AVERAGE OVERLAP OF USERS FOR ITEMS :  3.8296662088461746

# ALS Model Approach

In [169]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [170]:
#Joining the actual dataframes with movie dataframe for accurate understandings

df_training_join = df_training.join(df_movies,on=['movieId'],how='inner')

df_testing_join = df_testing.join(df_movies,on=['movieId'],how='inner')

### Approach 1, maxIter = 5 , regParam = 0.08

In [171]:
als = ALS(maxIter=5, regParam=0.08, userCol="userId", itemCol="movieId", ratingCol="ratings",
          coldStartStrategy="drop")
model1 = als.fit(df_training_join)



# Evaluate the model by computing the RMSE on the test data
predictions1 = model1.transform(df_testing_join)
evaluator = RegressionEvaluator(metricName="mse", labelCol="ratings",
                                predictionCol="prediction")
                                            
evaluator1 = RegressionEvaluator(metricName="rmse", labelCol="ratings",
                                predictionCol="prediction")



rmse = evaluator1.evaluate(predictions1)
mse = evaluator.evaluate(predictions1)

print("Mean squared error = " + str(mse))
print("Root-mean-square error = " + str(rmse))

Mean squared error = 0.7622417455545198
Root-mean-square error = 0.8730645712400199


### With the parameters maxIter = 5 , regParam = 0.08, Root-mean-square error is 0.860 and Mean squared error is 0.739

In [176]:
print("Following are the predictions results for the approach 1 model. \n")

predictions1.show()

Following are the predictions results for the approach 1 model. 

+-------+-------+-------+-------------+--------------------+----------+
|movieId| userId|ratings|yearOfRelease|               title|prediction|
+-------+-------+-------+-------------+--------------------+----------+
|      8| 573364|    1.0|         2004|What the #$*! Do ...|  2.808439|
|      8|2149668|    3.0|         2004|What the #$*! Do ...|   2.95522|
|      8|1089184|    3.0|         2004|What the #$*! Do ...| 1.9884332|
|      8|2465894|    3.0|         2004|What the #$*! Do ...|   2.01401|
|      8| 534508|    1.0|         2004|What the #$*! Do ...| 2.8113463|
|      8| 992921|    4.0|         2004|What the #$*! Do ...| 2.6006048|
|      8| 595054|    4.0|         2004|What the #$*! Do ...| 2.7244895|
|      8|1298304|    4.0|         2004|What the #$*! Do ...| 3.7037585|
|      8|1661600|    4.0|         2004|What the #$*! Do ...| 3.2884686|
|      8| 553787|    2.0|         2004|What the #$*! Do ...|  2.779989

In [177]:
userRecs = model1.recommendForAllUsers(10)

movieRecs = model1.recommendForAllItems(10)

In [178]:
#Recommendations for all users from model1 approach

pd_userRecs = userRecs.toPandas()

pd_userRecs.head()

Unnamed: 0,userId,recommendations
0,481,"[(14648, 4.8505401611328125), (7569, 4.8420782..."
1,2678,"[(6991, 4.134357929229736), (7569, 4.114245891..."
2,3595,"[(12293, 4.304803848266602), (3290, 4.16258287..."
3,6460,"[(12232, 4.4317169189453125), (12293, 4.231027..."
4,7284,"[(6991, 5.153048515319824), (8933, 5.116566658..."


In [175]:
#Movie Recommendations for all users from model1 approach

pd_movieRecs = movieRecs.toPandas()

pd_movieRecs.head()

Unnamed: 0,movieId,recommendations
0,4190,"[(300908, 4.489530563354492), (1486829, 4.3192..."
1,3220,"[(1080361, 4.988464832305908), (1596531, 4.928..."
2,11240,"[(794999, 5.105077266693115), (27061, 5.077645..."
3,6110,"[(1663569, 4.814904689788818), (794999, 4.8123..."
4,8260,"[(507082, 3.9599781036376953), (1890819, 3.942..."


### Approach 2, maxIter = 10 , regParam = 0.01

In [179]:
als = ALS(maxIter=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="ratings",
          coldStartStrategy="drop")
model2 = als.fit(df_training_join)



# Evaluate the model by computing the RMSE on the test data
predictions2 = model2.transform(df_testing_join)
evaluator = RegressionEvaluator(metricName="mse", labelCol="ratings",
                                predictionCol="prediction")
                                            
evaluator1 = RegressionEvaluator(metricName="rmse", labelCol="ratings",
                                predictionCol="prediction")



rmse = evaluator1.evaluate(predictions2)
mse = evaluator.evaluate(predictions2)

print("Mean squared error = " + str(mse))
print("Root-mean-square error = " + str(rmse))

Mean squared error = 0.7084090743120849
Root-mean-square error = 0.8416704071737848


### With the parameters maxIter = 10 , regParam = 0.01, Root-mean-square error is 0.84 and Mean squared error is 0.70

In [180]:
predictions2.show()

+-------+-------+-------+-------------+--------------------+----------+
|movieId| userId|ratings|yearOfRelease|               title|prediction|
+-------+-------+-------+-------------+--------------------+----------+
|      8| 573364|    1.0|         2004|What the #$*! Do ...| 2.4632375|
|      8|2149668|    3.0|         2004|What the #$*! Do ...| 2.5270288|
|      8|1089184|    3.0|         2004|What the #$*! Do ...| 1.5026662|
|      8|2465894|    3.0|         2004|What the #$*! Do ...| 1.0114069|
|      8| 534508|    1.0|         2004|What the #$*! Do ...| 2.9000576|
|      8| 992921|    4.0|         2004|What the #$*! Do ...|  2.957749|
|      8| 595054|    4.0|         2004|What the #$*! Do ...| 2.9656339|
|      8|1298304|    4.0|         2004|What the #$*! Do ...| 4.6261888|
|      8|1661600|    4.0|         2004|What the #$*! Do ...|  4.068904|
|      8| 553787|    2.0|         2004|What the #$*! Do ...| 2.8306088|
|      8|1309839|    3.0|         2004|What the #$*! Do ...| 2.1

In [181]:
userRecs2 = model2.recommendForAllUsers(10)

movieRecs2 = model2.recommendForAllItems(10)

In [182]:
userRecs2.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   481|[[15567, 6.077099...|
|  2678|[[6991, 7.311792]...|
|  3595|[[9854, 5.490034]...|
|  6460|[[10025, 6.418989...|
|  7284|[[6991, 10.731239...|
|  7576|[[6406, 5.4165072...|
|  9597|[[12952, 5.294409...|
| 15191|[[6287, 4.957383]...|
| 15846|[[10025, 5.053463...|
| 20461|[[15809, 5.633242...|
| 20774|[[7806, 4.467826]...|
| 26258|[[6894, 4.8676343...|
| 27608|[[6991, 6.3618126...|
| 28346|[[1444, 4.7926564...|
| 30941|[[6991, 6.070702]...|
| 30976|[[3359, 7.116294]...|
| 31203|[[5142, 6.3571777...|
| 36822|[[3359, 6.7574663...|
| 40851|[[9973, 5.586188]...|
| 41068|[[9854, 5.0963683...|
+------+--------------------+
only showing top 20 rows



In [183]:
movieRecs2.show()

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|   4190|[[396985, 10.5164...|
|   3220|[[1080361, 5.9857...|
|  11240|[[1839424, 6.5068...|
|   6110|[[8135, 5.3616295...|
|   8260|[[770230, 6.18032...|
|    481|[[941786, 5.68070...|
|   6911|[[685613, 5.18767...|
|  11041|[[941786, 5.39375...|
|  15841|[[685613, 5.21089...|
|  15051|[[770230, 7.70414...|
|   8851|[[1925991, 5.1384...|
|   1061|[[602277, 5.62927...|
|  16232|[[2567407, 5.0674...|
|   9492|[[868600, 7.16008...|
|    192|[[1468972, 6.0410...|
|   9482|[[1060026, 6.1768...|
|   6522|[[1851283, 8.8597...|
|  10082|[[1360119, 5.1870...|
|    122|[[1482568, 5.0187...|
|  16283|[[2239984, 5.1503...|
+-------+--------------------+
only showing top 20 rows



### From the above two model performances we can say the RMSE value is comparitively less when maxIter = 10 and regParam = 0.01. Hence this would be our best ALS model.

## Creating a new user for testing purpose and adding self rated movie ratings to our dataset 

In [184]:
user = 's3://chaitratadagadsci/Netflix/myuser.txt'

myUser_df_schema = StructType(
  [StructField('movieId', IntegerType()),
   StructField('userId', IntegerType()),
   StructField('ratings', StringType())])

In [185]:
myUser_df = sqlContext.read.format('txt').options(inferSchema=True).schema(myUser_df_schema).csv(user)

myUser_df.show()

+-------+------+-------+
|movieId|userId|ratings|
+-------+------+-------+
|      8|     1|    4.0|
|     24|     1|    3.0|
|     41|     1|    4.0|
|     43|     1|    4.0|
|     61|     1|    2.0|
|     64|     1|    4.0|
|     48|     1|    4.0|
|     30|     1|    3.0|
|     12|     1|    4.0|
|     10|     1|    4.0|
+-------+------+-------+



In [186]:
myUser_df = myUser_df.join(df_movies,on=['movieId'],how='inner')

In [187]:
myUser_df.show()

+-------+------+-------+-------------+--------------------+
|movieId|userId|ratings|yearOfRelease|               title|
+-------+------+-------+-------------+--------------------+
|      8|     1|    4.0|         2004|What the #$*! Do ...|
|     10|     1|    4.0|         2001|             Fighter|
|     12|     1|    4.0|         1947|My Favorite Brunette|
|     24|     1|    3.0|         1981| My Bloody Valentine|
|     30|     1|    3.0|         2003|Something's Gotta...|
|     41|     1|    4.0|         2000|       Horror Vision|
|     43|     1|    4.0|         2000|      Silent Service|
|     48|     1|    4.0|         2001|      Justice League|
|     61|     1|    2.0|         1999|Ricky Martin: One...|
|     64|     1|    4.0|         2001|     Outside the Law|
+-------+------+-------+-------------+--------------------+



In [188]:
df_training_union = df_training_join.union(myUser_df)

df_testing_union = df_testing_join.union(myUser_df)

In [189]:
df_training_union.show()

+-------+-------+-------+-------------+--------------------+
|movieId| userId|ratings|yearOfRelease|               title|
+-------+-------+-------+-------------+--------------------+
|      8|1744889|    1.0|         2004|What the #$*! Do ...|
|      8|1395430|    2.0|         2004|What the #$*! Do ...|
|      8|1205593|    4.0|         2004|What the #$*! Do ...|
|      8|1488844|    4.0|         2004|What the #$*! Do ...|
|      8|1447354|    1.0|         2004|What the #$*! Do ...|
|      8| 306466|    4.0|         2004|What the #$*! Do ...|
|      8|1331154|    4.0|         2004|What the #$*! Do ...|
|      8|1818178|    3.0|         2004|What the #$*! Do ...|
|      8| 991725|    4.0|         2004|What the #$*! Do ...|
|      8|1987434|    4.0|         2004|What the #$*! Do ...|
|      8|1765381|    4.0|         2004|What the #$*! Do ...|
|      8| 433803|    3.0|         2004|What the #$*! Do ...|
|      8|1148143|    2.0|         2004|What the #$*! Do ...|
|      8|1174811|    5.0

In [190]:
UserID = 1
self = df_training_union.filter(df_training_union.userId == UserID)
self.show(30)

+-------+------+-------+-------------+--------------------+
|movieId|userId|ratings|yearOfRelease|               title|
+-------+------+-------+-------------+--------------------+
|      8|     1|    4.0|         2004|What the #$*! Do ...|
|     10|     1|    4.0|         2001|             Fighter|
|     12|     1|    4.0|         1947|My Favorite Brunette|
|     24|     1|    3.0|         1981| My Bloody Valentine|
|     30|     1|    3.0|         2003|Something's Gotta...|
|     41|     1|    4.0|         2000|       Horror Vision|
|     43|     1|    4.0|         2000|      Silent Service|
|     48|     1|    4.0|         2001|      Justice League|
|     61|     1|    2.0|         1999|Ricky Martin: One...|
|     64|     1|    4.0|         2001|     Outside the Law|
+-------+------+-------+-------------+--------------------+

