### Importing libraries and dataset 

In [1]:
import pyspark
sc.stop()
from pyspark import SparkConf,SparkContext
from pyspark.sql.functions import regexp_extract
from pyspark.sql.types import *
from pyspark.sql import SQLContext

import pandas as pd
import sklearn.metrics as metrics
import numpy as np
from sklearn.neighbors import NearestNeighbors
from scipy.spatial.distance import correlation
from sklearn.metrics.pairwise import pairwise_distances
import ipywidgets as widgets
from IPython.display import display, clear_output
from contextlib import contextmanager
import warnings
warnings.filterwarnings('ignore')
import os, sys
import re

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

from pyspark.sql.functions import isnan, count, when, col, desc, udf, col,rand
from pyspark.sql.functions import sort_array, asc, avg
from pyspark.sql.functions import min as Fmin
from pyspark.sql.functions import max as Fmax
from pyspark.sql.functions import stddev as Fstddev
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler


from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

from pyspark.sql import Window

from pyspark.sql.functions import sum as Fsum
from pyspark.sql.functions import rank 
import pyspark.sql.functions as F
from pyspark.sql import DataFrameStatFunctions as statFunc
from pyspark.sql.functions import first
from pyspark.sql.functions import lit

from pyspark.sql.functions import col, countDistinct

In [2]:
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

df_schema = StructType([StructField('movieId', IntegerType()),StructField('userId', IntegerType()),
   StructField('rating', DoubleType())])

df_movie_schema = StructType([StructField('movieId',IntegerType()),StructField('year_of_release',IntegerType())
                        ,StructField('Title',StringType())])

In [3]:
train_df = sqlContext.read.format('csv').options(header=True, inferSchema=False).schema(df_schema).load('s3a://netfinal/TrainingRatings.txt')
train_df.show(5)

+-------+-------+------+
|movieId| userId|rating|
+-------+-------+------+
|      8|1395430|   2.0|
|      8|1205593|   4.0|
|      8|1488844|   4.0|
|      8|1447354|   1.0|
|      8| 306466|   4.0|
+-------+-------+------+
only showing top 5 rows



In [4]:
test_df = sqlContext.read.format('csv').options(header=True, inferSchema=False).schema(df_schema).load('s3a://netfinal/TestingRatings.txt')
test_df.show(5)

+-------+-------+------+
|movieId| userId|rating|
+-------+-------+------+
|      8|2149668|   3.0|
|      8|1089184|   3.0|
|      8|2465894|   3.0|
|      8| 534508|   1.0|
|      8| 992921|   4.0|
+-------+-------+------+
only showing top 5 rows



In [5]:
movies_df = sqlContext.read.format('csv').options(header=True, inferSchema=False).schema(df_movie_schema).load('s3://netfinal/movie_titles.txt')
movies_df.show(5,truncate=False)

+-------+---------------+----------------------------+
|movieId|year_of_release|Title                       |
+-------+---------------+----------------------------+
|2      |2004           |Isle of Man TT 2004 Review  |
|3      |1997           |Character                   |
|4      |1994           |Paula Abdul's Get Up & Dance|
|5      |2004           |The Rise and Fall of ECW    |
|6      |1997           |Sick                        |
+-------+---------------+----------------------------+
only showing top 5 rows



In [6]:
train_df = train_df.join(movies_df,on=['movieId'],how='inner')
test_df = test_df.join(movies_df,on=['movieId'],how='inner')

### Analysis 

In [7]:
#Distinct users in the test set
distinct_users = test_df.agg(countDistinct(col("userId")).alias("Distinct_users"))
distinct_users.show()

+--------------+
|Distinct_users|
+--------------+
|         27555|
+--------------+



In [8]:
#Distinct movies in test set
distinct_movies = test_df.agg(countDistinct(col("movieId")).alias("Distinct_movies"))
distinct_movies.show()

+---------------+
|Distinct_movies|
+---------------+
|           1701|
+---------------+



In [9]:
# Most number of ratings given by a user
user_rated= train_df.groupBy('userId').count().show(1)

+-------+-----+
| userId|count|
+-------+-----+
|1001129|  640|
+-------+-----+
only showing top 1 row



In [10]:
train_set = train_df.toPandas()
test_set = test_df.toPandas()

In [11]:
# Top 10 movies by their average rating
sort1= train_set.groupby('Title')['rating'].mean().sort_values(ascending=False)
sort1.head(10)

Title
Ghost in the Shell: Stand Alone Complex: 2nd Gig    4.500000
The Sopranos: Season 1                              4.422819
The Best of Friends: Vol. 3                         4.418619
The Best of Friends: Vol. 4                         4.414831
The Sopranos: Season 3                              4.410690
The Godfather                                       4.407814
Dead Like Me: Season 2                              4.383886
Inu-Yasha                                           4.372822
Finding Nemo (Full-screen)                          4.352884
The Incredibles                                     4.339148
Name: rating, dtype: float64

In [12]:
# Top 10 movies with highest number of ratings 
most_rated= train_set.groupby('Title')['rating'].sum().sort_values(ascending=False)
most_rated.head(10)

Title
The Godfather                    167422.0
Ferris Bueller's Day Off         103701.0
Rain Man                          95216.0
Seven                             94398.0
The Incredibles                   92029.0
Pretty Woman                      90891.0
As Good as It Gets                88670.0
The Italian Job                   82862.0
Terminator 2: Extreme Edition     81889.0
Good Morning                      80580.0
Name: rating, dtype: float64

### Creating sample set from dataset

In [13]:
sample_set = pd.DataFrame(data = train_set)
sample_set = sample_set.drop_duplicates()

### User based and item based collaborative filtering 

In [14]:
ratings_new = sample_set.pivot_table(index='userId', columns='movieId', values='rating',fill_value=0)
ratings_new.head()

movieId,8,28,43,48,61,64,66,92,96,111,...,17654,17660,17689,17693,17706,17725,17728,17734,17741,17742
userId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
7,5,4,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
79,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
199,0,0,0,0,0,0,0,0,0,4,...,0,0,0,0,0,0,0,0,0,0
481,0,0,0,0,0,0,0,0,0,5,...,0,0,0,0,0,0,0,0,0,0
769,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [15]:
def user_similarities(user_id, rating_table, metric ='cosine', k=10):
    similarity_score=[]
    user_ids=[]
    knn = NearestNeighbors(metric = metric, algorithm = 'brute') 
    knn.fit(rating_table)
    loc = rating_table.index.get_loc(user_id)
    distances, user_ids = knn.kneighbors(rating_table.iloc[loc, :].values.reshape(1, -1), n_neighbors = k+1)
    similarity_score = 1 - distances.flatten()
    similarity_score = pd.DataFrame(data = similarity_score)
    user_ids = pd.DataFrame(data = user_ids).T
    sim_users = pd.concat([similarity_score, user_ids.reindex(similarity_score.index)], axis=1)
    sim_users.columns = ['similarity_score','user_ids']
    
    return sim_users

In [16]:
#Finding user similarities score for userId 481
user_similarities(481,ratings_new)

Unnamed: 0,similarity_score,user_ids
0,1.0,3
1,0.606244,6188
2,0.597981,15680
3,0.593259,12687
4,0.585067,22490
5,0.578425,9485
6,0.576827,27920
7,0.563754,927
8,0.562031,24795
9,0.5609,1161


In [17]:
ratings_2 = sample_set.pivot_table(index='movieId', columns='userId', values='rating',fill_value=0)

In [18]:
def movie_similarities(movie_id, rating_table, metric ='cosine', k=10):
    similarity_score=[]
    movie_ids=[]
    knn = NearestNeighbors(metric = metric, algorithm = 'brute') 
    knn.fit(rating_table)
    loc = rating_table.index.get_loc(movie_id)
    distances, movie_ids = knn.kneighbors(rating_table.iloc[loc, :].values.reshape(1, -1), n_neighbors = k+1)
    similarity_score = 1 - distances.flatten()
    similarity_score = pd.DataFrame(data = similarity_score)
    movie_ids = pd.DataFrame(data = movie_ids).T
    movie_sim = pd.concat([similarity_score, movie_ids.reindex(similarity_score.index)], axis=1)
    movie_sim.columns = ['similarity_score','movieId']
    
    return movie_sim    

In [19]:
movies_set = movies_df.toPandas()
movie_sim1 = movie_similarities(8,ratings_2)
similar_movies_set = pd.merge(movie_sim1,movies_set,on='movieId')
similar_movies_set.head(10)

Unnamed: 0,similarity_score,movieId,year_of_release,Title
0,0.298464,1228,2004.0,The Carol Burnett Show: Let's Bump Up the Lights!
1,0.291148,232,1989.0,Gross Anatomy
2,0.278488,299,2001.0,Bridget Jones's Diary
3,0.27152,328,2002.0,Deftones: Live in Hawaii
4,0.263171,1071,1995.0,The Ice Princess
5,0.262434,705,1989.0,Major League
6,0.261318,975,1997.0,Convict 762
7,0.26126,478,1962.0,The Beverly Hillbillies
8,0.258058,1093,1980.0,The Leg Fighters
9,0.254881,875,1998.0,Zakhm
