In [1]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from pyspark.sql.types import FloatType, IntegerType, LongType
# Data: MovieLens "ml-latest-small" from GroupLens
# (http://files.grouplens.org/datasets/movielens/ml-latest-small.zip)

# File location and type
file_location = "/FileStore/tables/movies.csv"
file_type = "csv"

# CSV options: every column is read as a string; numeric columns are cast
# explicitly below.
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

# Only movieId is numeric; every other column (e.g. title, genres) stays a string.
df = df.withColumn('movieId', df['movieId'].cast(IntegerType()))

display(df)
# pandas copy of the movies table (default RangeIndex, movieId kept as a column).
movie_data = df.toPandas()

In [2]:
# Count of movies per distinct `genres` value, most frequent first (top 100).
_df = df.groupBy("genres").count()
display(_df.orderBy(_df["count"].desc()).limit(100))

In [3]:
# File location and type
file_location = "/FileStore/tables/ratings.csv"
file_type = "csv"

# CSV options: every column is read as a string and cast explicitly below.
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

# Cast the string columns to numeric types (columns that are absent are skipped).
# timestamp must NOT be FloatType: a 32-bit float cannot represent integers
# above 2**24 (~16.7 million) exactly, so epoch-style timestamps (presumably
# seconds since 1970 -- confirm against the dataset README) would be rounded.
# LongType keeps them exact.
column_types = {
    'rating': FloatType(),
    'timestamp': LongType(),
    'userId': IntegerType(),
    'movieId': IntegerType(),
}
for col_name, col_type in column_types.items():
    if col_name in df.columns:
        df = df.withColumn(col_name, df[col_name].cast(col_type))

# pandas copy of the ratings table (default RangeIndex).
movie_ratings = df.toPandas()

In [4]:
# How often each rating value occurs, most common first.
R_df = df.groupBy("rating").count()
display(R_df.orderBy(R_df["count"].desc()))

In [5]:
# Attach every rating to its movie.
# DataFrame.join(on=...) matches the LEFT column against the INDEX of the right
# frame. movie_ratings has a plain RangeIndex (row numbers), so it is indexed by
# movieId first. drop=False keeps a movieId column on the right too, so the
# result still has the 'movieId_' (movies) / 'movieId' (ratings) column pair.
# how='left' keeps movies without any rating (their rating is NaN).
joined = movie_data.join(
    movie_ratings.set_index('movieId', drop=False),
    on='movieId',
    how='left',
    lsuffix='_',
    sort=True,
)

In [6]:
# Keep only the movie title and its rating. Selecting the wanted columns
# (instead of an in-place drop) leaves the cell safe to re-run.
joined = joined[['title', 'rating']]


In [7]:
# First five (title, rating) rows.
joined.head(n=5)

In [8]:
# All (title, rating) rows, highest rating first.
display(joined.sort_values(by="rating", ascending=False))

In [9]:
# NOTE(review): stray expression -- it only echoes the FloatType repr and nothing
# downstream depends on it; candidate for deletion.
FloatType()

In [10]:
def movieTitle(movieId):
    """Return the title of the movie whose ``movieId`` column equals ``movieId``.

    ``movie_data`` comes from ``df.toPandas()`` and therefore has a positional
    RangeIndex, so the lookup has to match on the ``movieId`` column.
    ``movie_data.at[movieId, 'title']`` would instead return whatever movie sits
    at row position ``movieId`` (or fail for ids beyond the last row).

    Raises:
        KeyError: if no row of ``movie_data`` has this ``movieId``.
    """
    matches = movie_data.loc[movie_data['movieId'] == movieId, 'title']
    if matches.empty:
        raise KeyError(movieId)
    return matches.iloc[0]
movieTitle(1)


In [11]:
def movieGenre(movieId):
    """Return the ``genres`` value of the movie whose ``movieId`` column equals ``movieId``.

    As with ``movieTitle``, ``movie_data`` has a positional RangeIndex, so the
    lookup matches on the ``movieId`` column rather than on the row label.

    Raises:
        KeyError: if no row of ``movie_data`` has this ``movieId``.
    """
    matches = movie_data.loc[movie_data['movieId'] == movieId, 'genres']
    if matches.empty:
        raise KeyError(movieId)
    return matches.iloc[0]
movieGenre(1)

In [12]:
# Data preprocessing for a huge dataset (not required here): keep only ratings
# whose movieId appears in movie_data. movie_data is indexed by row position,
# so match against its movieId column, not its index:
# movie_ratings = movie_ratings[movie_ratings['movieId'].isin(movie_data['movieId'])]

def favMovie(userId, N):
    """Return the ``N`` highest-rated rows of ``userId`` from ``movie_ratings``.

    The returned frame keeps the ``movie_ratings`` columns and adds ``Title``
    and ``Genre`` (looked up via ``movieTitle`` / ``movieGenre``), highest
    rating first.
    """
    userRatings = movie_ratings[movie_ratings['userId'] == userId]
    topRated = userRatings.sort_values('rating', ascending=False).head(N)
    # assign() builds a new frame instead of writing columns into a slice of
    # movie_ratings (which raises SettingWithCopyWarning).
    return topRated.assign(
        Title=topRated['movieId'].apply(movieTitle),
        Genre=topRated['movieId'].apply(movieGenre),
    )
favMovie(1, 12)

In [13]:
# (rows, columns) of the ratings frame and of the movies frame.
movie_ratings.shape, movie_data.shape

In [14]:
# Number of ratings per movieId (value_counts sorts most-rated first).
# Presumably each user rates a movie at most once, so this is users per movie.
userPerMovieID = movie_ratings['movieId'].value_counts()
userPerMovieID.head()

In [15]:
# One entry per distinct movieId that has at least one rating.
userPerMovieID.shape

In [16]:
# Optional preprocessing for a huge dataset (not required here): keep only
# movies rated by more than MIN_USERS_PER_MOVIE users, which makes the matrix
# less sparse. Leave as None to use every rating.
MIN_USERS_PER_MOVIE = None

if MIN_USERS_PER_MOVIE is None:
    ratingsForMatrix = movie_ratings
else:
    # userPerMovieID is indexed by movieId, so filter on the movieId column
    # (movie_ratings' own index is just row numbers).
    popularMovieIds = userPerMovieID[userPerMovieID > MIN_USERS_PER_MOVIE].index
    ratingsForMatrix = movie_ratings[movie_ratings['movieId'].isin(popularMovieIds)]

# One row per userId, one column per movieId; each cell is the (mean) rating,
# NaN where the user did not rate the movie.
userMovieRatingMatrix = pd.pivot_table(ratingsForMatrix, index=['userId'], columns=['movieId'], values='rating')
userMovieRatingMatrix.head(10)

In [17]:
user1 = 100
user2 = 200

# user1's row of the rating matrix: movieId -> rating (NaN where unrated).
user1_ratings = userMovieRatingMatrix.loc[user1]
user1_ratings.head()

In [18]:
# user2's row of the rating matrix: movieId -> rating (NaN where unrated).
user2_ratings = userMovieRatingMatrix.loc[user2]
user2_ratings.head()

In [19]:
from scipy.spatial.distance import hamming
# hamming() returns the fraction of positions at which the two vectors differ.
# NaN never compares equal, so a movie that neither user rated also counts as
# a disagreement.
hamming(user1_ratings, user2_ratings)

In [20]:
# Wrapping it up in a function
def distance(user1, user2):
    """Hamming distance between the rating rows of two users.

    Returns the fraction of movie columns of ``userMovieRatingMatrix`` where the
    two users' ratings differ. NaN (unrated) never equals anything, so a movie
    neither user rated also counts as a difference. Returns ``np.nan`` when
    either user id is not a row of the matrix.
    """
    try:
        # .loc reads the row directly; transpose()[user] would copy the whole
        # matrix on every call, and nearestNeighbours calls this once per user.
        ratings1 = userMovieRatingMatrix.loc[user1]
        ratings2 = userMovieRatingMatrix.loc[user2]
    except KeyError:
        return np.nan
    return hamming(ratings1, ratings2)
distance(100,200)

In [21]:
user = 1
# Every user id in the rating matrix except the active user.
allusers = pd.DataFrame(userMovieRatingMatrix.index)
allusers = allusers.loc[allusers['userId'] != user]
allusers.head()

In [22]:
# Distance from the active user to every other user. assign() returns a new
# frame instead of adding a column to the filtered slice from the previous
# cell (which raises SettingWithCopyWarning); re-running simply recomputes it.
allusers = allusers.assign(distance=allusers['userId'].apply(lambda other: distance(user, other)))
allusers.head()

In [23]:
K = 10
# The K users closest to the active user, smallest distance first.
KNearestUsers = allusers.sort_values(by='distance')['userId'].head(K)
KNearestUsers

In [24]:
# Wrapping it up in a function
def nearestNeighbours(user, K=10):
    """Return the ids of the ``K`` users closest to ``user``.

    Closeness is ``distance()`` between rows of ``userMovieRatingMatrix``;
    ``user`` itself is excluded, and NaN distances sort last.

    Returns:
        pandas Series of userIds, nearest first.
    """
    allusers = pd.DataFrame(userMovieRatingMatrix.index)
    others = allusers[allusers['userId'] != user]
    # assign() builds a new frame rather than adding a column to a filtered
    # slice, which would raise SettingWithCopyWarning.
    others = others.assign(distance=others['userId'].apply(lambda other: distance(user, other)))
    return others.sort_values('distance', ascending=True)['userId'][:K]

KNearestNeighbours = nearestNeighbours(1,5)
KNearestNeighbours

In [25]:
# Rating-matrix rows that belong to the nearest neighbours of the active user.
NNratings = userMovieRatingMatrix.loc[userMovieRatingMatrix.index.isin(KNearestNeighbours)]
NNratings


In [26]:
# Average rating of each movie among the nearest neighbours of the active user.
# DataFrame.mean() skips NaN like np.nanmean, but returns NaN for a movie no
# neighbour rated without emitting a RuntimeWarning; dropna() removes those.
avgRating = NNratings.mean().dropna()
avgRating.head()

In [27]:
# movieIds the active user has already rated (non-NaN entries of their row).
moviesAlreadySeen = userMovieRatingMatrix.loc[user].dropna().index
moviesAlreadySeen

In [28]:
# Keep only candidate movies the active user has not rated yet.
avgRating = avgRating.loc[~avgRating.index.isin(moviesAlreadySeen)]

In [29]:
N = 3
# movieIds of the N candidates with the highest neighbour average, best first.
topNMovieId = avgRating.sort_values(ascending=False).head(N).index
topNMovieId

In [30]:
# Column dtypes, non-null counts and memory usage of the movies frame.
movie_data.info()

In [31]:
# Re-display the movieIds selected in the top-N cell above.
topNMovieId

In [32]:
# Wrapping it up in a function
def topN(user, N=3, K=10):
    """Recommend ``N`` unseen movies for ``user`` from their ``K`` nearest neighbours.

    Candidates are movies rated by at least one neighbour and not yet rated by
    ``user``; they are ranked by the neighbours' mean rating.

    Returns:
        DataFrame with columns ``Movie`` (title) and ``Genre``, best first.
    """
    KnearestUsers = nearestNeighbours(user, K)
    NNRatings = userMovieRatingMatrix[userMovieRatingMatrix.index.isin(KnearestUsers)]
    # mean() skips NaN and yields NaN (without a RuntimeWarning) for movies no
    # neighbour rated; those are dropped.
    avgRating = NNRatings.mean().dropna()
    # .loc reads the user's row without transposing the whole matrix.
    moviesAlreadySeen = userMovieRatingMatrix.loc[user].dropna().index
    avgRating = avgRating[~avgRating.index.isin(moviesAlreadySeen)]
    topNMovieId = pd.Series(avgRating.sort_values(ascending=False).index[:N])
    return pd.DataFrame({'Movie': topNMovieId.apply(movieTitle), 'Genre': topNMovieId.apply(movieGenre)})


In [34]:
# Silence RuntimeWarnings (e.g. np.nanmean's "Mean of empty slice" for movies
# no neighbour rated) for this call only, instead of for the rest of the session.
import warnings

with warnings.catch_warnings():
    warnings.simplefilter("ignore", category=RuntimeWarning)
    genre = topN(3,5)


In [35]:
# Recommendations for user 3 (columns Movie, Genre) returned by topN(3, 5) above.
genre

In [36]:
# Number of recommended movies per Genre value.
display(genre.groupby(by='Genre').agg('count'))