In [0]:
from pyspark.sql import SparkSession

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.functions import when, count, col
import pandas as pd
import numpy as np

In [0]:
spark = SparkSession.builder.appName('myproj').getOrCreate()

In [0]:
#import ratings data

file_location = "/FileStore/tables/ml_100k/u_user.csv"
file_type = "csv"

# CSV options
infer_schema = "True"
first_row_is_header = "True"
delimiter = "|"

# The applied options are for CSV files. For other file types, these will be ignored.
user = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(user)

# Create Temporary Tables
user.createOrReplaceTempView("user")

userId,age,gender,occupation,zipcode
1,24,M,technician,85711
2,53,F,other,94043
3,23,M,writer,32067
4,24,M,technician,43537
5,33,F,other,15213
6,42,M,executive,98101
7,57,M,administrator,91344
8,36,M,administrator,05201
9,29,M,student,01002
10,53,M,lawyer,90703


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.functions import when, count, col
df = user.select(['userId',  'age',  'gender',  'occupation',  'zipcode'])
na_report=df.select([count(when(isnull(c), c)).alias(c) for c in df.columns])# dimensions of the dataframe
print("Number of Rows: ",df.count() ,"   Number of Columns: ", len(df.columns))
# we should handle the missing values by imputation unless too many of them are emtpy
# na.drop() method is not recommended for avoiding biased except for the dependent variable
na_report.show()

In [0]:
%sql

select count(distinct(occupation)) from user

count(DISTINCT occupation)
21


In [0]:
display(sqlContext.sql("select distinct(occupation) as occupation_type from user"))

occupation_type
librarian
retired
lawyer
none
writer
programmer
marketing
other
executive
scientist


In [0]:
%sql

select age from user

age
24
53
23
24
33
42
57
36
29
53


In [0]:
%sql

select gender from user

gender
M
F
M
M
F
M
M
M
M
M


In [0]:
user_df = user.toPandas()

In [0]:
user_df1 = user_df.drop(columns = ['zipcode'])
user_df1=pd.get_dummies(user_df1)

In [0]:
user_df1 = spark.createDataFrame(user_df1)

In [0]:
#user_df1.head()
#user_df1 = spark.createDataFrame(user_df1)
user_df1.createOrReplaceTempView("user_df1")

In [0]:
display(user_df1)

userId,age,gender_F,gender_M,occupation_administrator,occupation_artist,occupation_doctor,occupation_educator,occupation_engineer,occupation_entertainment,occupation_executive,occupation_healthcare,occupation_homemaker,occupation_lawyer,occupation_librarian,occupation_marketing,occupation_none,occupation_other,occupation_programmer,occupation_retired,occupation_salesman,occupation_scientist,occupation_student,occupation_technician,occupation_writer
1,24,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0
2,53,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0
3,23,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1
4,24,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0
5,33,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0
6,42,0,1,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0
7,57,0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
8,36,0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
9,29,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0
10,53,0,1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0


In [0]:
#import ratings data

file_location = "/FileStore/tables/ml_100k/u_rating.csv"
file_type = "csv"

# CSV options
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
ratings = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(ratings)

# Create Temporary Tables
ratings.createOrReplaceTempView("ratings")

userId,movieId,rating,timestamp
196,242,3,881250949
186,302,3,891717742
22,377,1,878887116
244,51,2,880606923
166,346,1,886397596
298,474,4,884182806
115,265,2,881171488
253,465,5,891628467
305,451,3,886324817
6,86,3,883603013


In [0]:
#import ratings data
file_location = "/FileStore/tables/ml_100k/u_movie.csv"
file_type = "csv"

# CSV options
infer_schema = "True"
first_row_is_header = "True"
delimiter = "|"

# The applied options are for CSV files. For other file types, these will be ignored.
movies = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(movies)

# Create Temporary Tables
movies.createOrReplaceTempView("movies")

movieId,title,releasedate,videoreleasedate,IMDbURL,unknown,Action,Adventure,Animation,Children,Comedy,Crime,Documentary,Drama,Fantasy,Film_Noir,Horror,Musical,Mystery,Romance,Sci_Fi,Thriller,War,Western,_c24
1,Toy Story (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Toy%20Story%20(1995),0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,
2,GoldenEye (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?GoldenEye%20(1995),0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,
3,Four Rooms (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995),0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,
4,Get Shorty (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Get%20Shorty%20(1995),0,1,0,0,0,1,0,0,1,0,0,0,0,0,0,0,0,0,0,
5,Copycat (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Copycat%20(1995),0,0,0,0,0,0,1,0,1,0,0,0,0,0,0,0,1,0,0,
6,Shanghai Triad (Yao a yao yao dao waipo qiao) (1995),01-Jan-1995,,http://us.imdb.com/Title?Yao+a+yao+yao+dao+waipo+qiao+(1995),0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,
7,Twelve Monkeys (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Twelve%20Monkeys%20(1995),0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,1,0,0,0,
8,Babe (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Babe%20(1995),0,0,0,0,1,1,0,0,1,0,0,0,0,0,0,0,0,0,0,
9,Dead Man Walking (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Dead%20Man%20Walking%20(1995),0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,
10,Richard III (1995),22-Jan-1996,,http://us.imdb.com/M/title-exact?Richard%20III%20(1995),0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,1,0,


In [0]:
%sql

select ratings.userId, movies.title, ratings.rating from ratings left join movies on ratings.movieId= movies.movieId where ratings.rating > 4 and ratings.userId=16

userId,title,rating
16,"Shawshank Redemption, The (1994)",5
16,Singin' in the Rain (1952),5
16,"Graduate, The (1967)",5
16,"Professional, The (1994)",5
16,"Sting, The (1973)",5
16,Babe (1995),5
16,12 Angry Men (1957),5
16,This Is Spinal Tap (1984),5
16,Pulp Fiction (1994),5
16,Amadeus (1984),5


In [0]:
%sql

select * from user where userId=16

userId,age,gender,occupation,zipcode
16,21,M,entertainment,10309


In [0]:
from pyspark.sql.functions import from_unixtime, unix_timestamp
movies1 = movies.withColumn('releasedate1', from_unixtime(unix_timestamp('releasedate', 'dd-MMM-yyyy')))
#df.show()

In [0]:
#display(movies1)

In [0]:
from pyspark.sql.functions import date_format, col
movies1 = movies1.withColumn("releasedate1", date_format(col("releasedate1"), "MM-dd-yyyy"))

In [0]:
movies1=movies1.withColumn(
    "releasedate1",
    to_date(col("releasedate1"),"MM-dd-yyyy")
  )

In [0]:
#display(movies1)

In [0]:
movies1=movies1.withColumn("datesDiff", datediff(current_date(),col("releasedate1")))

In [0]:
#movies1 = spark.createDataFrame(movies)
movies1.createOrReplaceTempView("movies1")

merge dataset user(user_df1), ratings(ratings) & movies(df)

In [0]:
#df = ratings.join(movies1, ratings.movieId == movies1.movieId).select(ratings.movieId, ratings.userId, #ratings.rating,movies1.title,movies1.unknown,movies1.Action,movies1.Adventure,movies1.Animation,movies1.Children,movies1.Comedy,movies1.Crim#e,movies1.Documentary,movies1.Drama,movies1.Fantasy,movies1.Film_Noir,movies1.Horror,movies1.Musical,movies1.Mystery,movies1.Romance,movies1#.Sci_Fi,movies1.Thriller,movies1.War,movies1.Western,movies1.datesDiff)

In [0]:
df = ratings.join(movies, ratings.movieId == movies.movieId,how='right').select(movies.movieId, ratings.userId, ratings.rating,movies.title,movies.unknown,movies.Action,movies.Adventure,movies.Animation,movies.Children,movies.Comedy,movies.Crime,movies.Documentary,movies.Drama,movies.Fantasy,movies.Film_Noir,movies.Horror,movies.Musical,movies.Mystery,movies.Romance,movies.Sci_Fi,movies.Thriller,movies.War,movies.Western )

In [0]:
display(df)

movieId,userId,rating,title,unknown,Action,Adventure,Animation,Children,Comedy,Crime,Documentary,Drama,Fantasy,Film_Noir,Horror,Musical,Mystery,Romance,Sci_Fi,Thriller,War,Western
1,593,3,Toy Story (1995),0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0
1,800,4,Toy Story (1995),0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0
1,786,4,Toy Story (1995),0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0
1,747,5,Toy Story (1995),0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0
1,895,4,Toy Story (1995),0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0
1,682,4,Toy Story (1995),0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0
1,398,5,Toy Story (1995),0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0
1,445,3,Toy Story (1995),0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0
1,709,4,Toy Story (1995),0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0
1,265,5,Toy Story (1995),0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0


In [0]:
df.createOrReplaceTempView("df")

In [0]:
dff = df.join(user_df1, df.userId == user_df1.userId,how='right').select(df.movieId, user_df1.userId, df.rating,df.title,df.unknown,df.Action,df.Adventure,df.Animation,df.Children,df.Comedy,df.Crime,df.Documentary,df.Drama,df.Fantasy,df.Film_Noir,df.Horror,df.Musical,df.Mystery,df.Romance,df.Sci_Fi,df.Thriller,df.War,df.Western,user_df1.age,user_df1.gender_F,user_df1.gender_M,user_df1.occupation_administrator,user_df1.occupation_artist,user_df1.occupation_doctor,user_df1.occupation_educator,user_df1.occupation_engineer,user_df1.occupation_entertainment,user_df1.occupation_executive,user_df1.occupation_healthcare,user_df1.occupation_homemaker,user_df1.occupation_lawyer,user_df1.occupation_librarian,user_df1.occupation_marketing,user_df1.occupation_none,user_df1.occupation_other,user_df1.occupation_programmer,user_df1.occupation_retired,user_df1.occupation_salesman,user_df1.occupation_scientist,user_df1.occupation_student,user_df1.occupation_technician,user_df1.occupation_writer)

In [0]:
dff.createOrReplaceTempView("dff")

In [0]:
%sql

select * from dff where rating is null

movieId,userId,rating,title,unknown,Action,Adventure,Animation,Children,Comedy,Crime,Documentary,Drama,Fantasy,Film_Noir,Horror,Musical,Mystery,Romance,Sci_Fi,Thriller,War,Western,age,gender_F,gender_M,occupation_administrator,occupation_artist,occupation_doctor,occupation_educator,occupation_engineer,occupation_entertainment,occupation_executive,occupation_healthcare,occupation_homemaker,occupation_lawyer,occupation_librarian,occupation_marketing,occupation_none,occupation_other,occupation_programmer,occupation_retired,occupation_salesman,occupation_scientist,occupation_student,occupation_technician,occupation_writer


In [0]:
%sql
select * from dff

movieId,userId,rating,title,unknown,Action,Adventure,Animation,Children,Comedy,Crime,Documentary,Drama,Fantasy,Film_Noir,Horror,Musical,Mystery,Romance,Sci_Fi,Thriller,War,Western,age,gender_F,gender_M,occupation_administrator,occupation_artist,occupation_doctor,occupation_educator,occupation_engineer,occupation_entertainment,occupation_executive,occupation_healthcare,occupation_homemaker,occupation_lawyer,occupation_librarian,occupation_marketing,occupation_none,occupation_other,occupation_programmer,occupation_retired,occupation_salesman,occupation_scientist,occupation_student,occupation_technician,occupation_writer
122,26,1,"Cable Guy, The (1996)",0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,49,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
845,26,3,That Thing You Do! (1996),0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,49,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
683,26,3,Rocket Man (1997),0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,49,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
288,26,4,Scream (1996),0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,1,0,0,49,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
323,26,2,Dante's Peak (1997),0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,49,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
248,26,3,Grosse Pointe Blank (1997),0,0,0,0,0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,49,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1010,26,2,Basquiat (1996),0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,49,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
328,26,2,Conspiracy Theory (1997),0,1,0,0,0,0,0,0,0,0,0,0,0,1,1,0,1,0,0,49,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1013,26,1,Anaconda (1997),0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,49,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
282,26,4,"Time to Kill, A (1996)",0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,49,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [0]:
%sql

select distinct(rating) from ratings

rating
1
3
5
4
2


In [0]:
%sql

select ratings.rating, user.gender, count(ratings.userId) as count_g from ratings left join user on ratings.userId=user.UserId group by ratings.rating, user.gender sort by user. gender

rating,gender,count_g
5,F,5975
1,F,1894
3,F,6784
2,F,2784
4,F,8303
4,M,25871
5,M,15226
2,M,8586
1,M,4216
3,M,20361


In [0]:
%sql

select ratings.rating, user.gender, ratings.userId from ratings, user 

rating,gender,userId
3,M,196
3,F,196
3,M,196
3,M,196
3,F,196
3,M,196
3,M,196
3,M,196
3,M,196
3,M,196


**Neural Network**

In [0]:
from pyspark.sql.functions import when
ratio = 0.91
def weight_balance(labels):
    return when(labels == 1, ratio).otherwise(1*(1-ratio))
new_df = dff.withColumn('weights', weight_balance(col('rating')))

In [0]:
display(new_df)

movieId,userId,rating,title,unknown,Action,Adventure,Animation,Children,Comedy,Crime,Documentary,Drama,Fantasy,Film_Noir,Horror,Musical,Mystery,Romance,Sci_Fi,Thriller,War,Western,age,gender_F,gender_M,occupation_administrator,occupation_artist,occupation_doctor,occupation_educator,occupation_engineer,occupation_entertainment,occupation_executive,occupation_healthcare,occupation_homemaker,occupation_lawyer,occupation_librarian,occupation_marketing,occupation_none,occupation_other,occupation_programmer,occupation_retired,occupation_salesman,occupation_scientist,occupation_student,occupation_technician,occupation_writer,weights
122,26,1,"Cable Guy, The (1996)",0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,49,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0.91
845,26,3,That Thing You Do! (1996),0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,49,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0.0899999999999999
683,26,3,Rocket Man (1997),0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,49,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0.0899999999999999
288,26,4,Scream (1996),0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,1,0,0,49,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0.0899999999999999
323,26,2,Dante's Peak (1997),0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,49,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0.0899999999999999
248,26,3,Grosse Pointe Blank (1997),0,0,0,0,0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,49,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0.0899999999999999
1010,26,2,Basquiat (1996),0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,49,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0.0899999999999999
328,26,2,Conspiracy Theory (1997),0,1,0,0,0,0,0,0,0,0,0,0,0,1,1,0,1,0,0,49,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0.0899999999999999
1013,26,1,Anaconda (1997),0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,49,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0.91
282,26,4,"Time to Kill, A (1996)",0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,49,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0.0899999999999999


In [0]:
!pip install scikit-neuralnetwork
from sklearn.neural_network import MLPRegressor

In [0]:
from sklearn import preprocessing
x_MinMax = preprocessing.MinMaxScaler()
y_MinMax = preprocessing.MinMaxScaler()

In [0]:
dff_pd=new_df.toPandas()
dff_pd=dff_pd.drop(columns = ['gender_F','occupation_writer'])

In [0]:
y = dff_pd['rating']
features = dff_pd.columns.drop(['rating','userId','movieId','title'])
X = dff_pd[features].copy()
x = x_MinMax.fit_transform(X)

In [0]:
from sklearn.model_selection import train_test_split
np.random.seed(2020)
x_train, x_test, y_train, y_test = train_test_split(x,y,test_size = 0.2)

In [0]:
from sklearn.neural_network import MLPRegressor
fit1 = MLPRegressor(random_state=1, max_iter=500)
fit1.fit(x_train,y_train)
pred1_train = fit1.predict(x_train)

In [0]:
from sklearn.metrics import mean_squared_error
mse_1 = mean_squared_error(pred1_train,y_train)
print ("Train ERROR = ", mse_1)
pred1_test = fit1.predict(x_test)
mse_2 = mean_squared_error(pred1_test,y_test)
print ("Test ERROR = ", mse_2)

In [0]:
pred1_test = fit1.predict(x)

In [0]:
dff_pd['prediction']= pred1_test

In [0]:
dff_pd1 = spark.createDataFrame(dff_pd)

In [0]:
dff_pd1.createOrReplaceTempView("dff_pd1")

In [0]:
%sql

select * from dff_pd1 where rating is null

movieId,userId,rating,title,unknown,Action,Adventure,Animation,Children,Comedy,Crime,Documentary,Drama,Fantasy,Film_Noir,Horror,Musical,Mystery,Romance,Sci_Fi,Thriller,War,Western,age,gender_M,occupation_administrator,occupation_artist,occupation_doctor,occupation_educator,occupation_engineer,occupation_entertainment,occupation_executive,occupation_healthcare,occupation_homemaker,occupation_lawyer,occupation_librarian,occupation_marketing,occupation_none,occupation_other,occupation_programmer,occupation_retired,occupation_salesman,occupation_scientist,occupation_student,occupation_technician,weights,prediction


In [0]:
%sql

select userId, movieId,rating,prediction from dff_pd1 where userId=305 and movieId in (427,483,50,189,923)

userId,movieId,rating,prediction
305,923,5,3.8217503977313663
305,189,5,4.142554492050305
305,50,5,4.359856383952327
305,483,5,3.918102485197722
305,427,5,3.8217503977313663


In [0]:
%sql

select userId, movieId, title, prediction, rating from dff_pd1 where userId=16 sort by prediction desc

userId,movieId,title,prediction,rating
16,172,"Empire Strikes Back, The (1980)",4.808275528482107,5
16,127,"Godfather, The (1972)",4.78247823382557,5
16,182,GoodFellas (1990),4.429298088911999,5
16,76,Carlito's Way (1993),4.429298088911999,5
16,504,Bonnie and Clyde (1967),4.429298088911999,5
16,56,Pulp Fiction (1994),4.429298088911999,5
16,22,Braveheart (1995),4.423280057572734,5
16,89,Blade Runner (1982),4.386940684599838,2
16,661,High Noon (1952),4.351170422987781,4
16,498,"African Queen, The (1951)",4.3293403985768055,5


***ALS***

In [0]:
#converting movies data into pandas dataframe

moives_df = movies.toPandas()
moives_df.head()

Unnamed: 0,movieId,title,releasedate,videoreleasedate,IMDbURL,unknown,Action,Adventure,Animation,Children,Comedy,Crime,Documentary,Drama,Fantasy,Film_Noir,Horror,Musical,Mystery,Romance,Sci_Fi,Thriller,War,Western,_c24
0,1,Toy Story (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Toy%20Story%2...,0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,
1,2,GoldenEye (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?GoldenEye%20(...,0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,
2,3,Four Rooms (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Four%20Rooms%...,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,
3,4,Get Shorty (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Get%20Shorty%...,0,1,0,0,0,1,0,0,1,0,0,0,0,0,0,0,0,0,0,
4,5,Copycat (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Copycat%20(1995),0,0,0,0,0,0,1,0,1,0,0,0,0,0,0,0,1,0,0,


In [0]:
print('Number of different users: {}'.format(ratings.select('userId').distinct().count()))
print('Number of different movies: {}'.format(ratings.select('movieId').distinct().count()))
print('Number of movies with at least one rating strictly higher than 4: {}'.format(ratings.filter('rating > 4').select('movieId').distinct().count()))

In [0]:
ratings.createOrReplaceTempView('ratings')
spark.sql('SELECT COUNT(DISTINCT(movieId)) AS nb FROM ratings WHERE rating > 4').show()

In [0]:
import pandas as pd

ratings.toPandas().head()

Unnamed: 0,userId,movieId,rating,timestamp
0,196,242,3,881250949
1,186,302,3,891717742
2,22,377,1,878887116
3,244,51,2,880606923
4,166,346,1,886397596


In [0]:
spark.sql('''
    SELECT *, 100 * nb_ratings/matrix_size AS percentage
    FROM (
        SELECT nb_users, nb_movies, nb_ratings, nb_users * nb_movies AS matrix_size
        FROM (
            SELECT COUNT(*) AS nb_ratings, COUNT(DISTINCT(movieId)) AS nb_movies, COUNT(DISTINCT(userId)) AS nb_users
            FROM ratings
        )
    )
''').toPandas().head()

Unnamed: 0,nb_users,nb_movies,nb_ratings,matrix_size,percentage
0,943,1682,100000,1586126,6.304669


In [0]:
from pyspark.ml.recommendation import ALS

model = ALS(userCol='userId', itemCol='movieId', ratingCol='rating').fit(ratings)

In [0]:
predictions = model.transform(ratings)
predictions.toPandas().head()

Unnamed: 0,userId,movieId,rating,timestamp,prediction
0,148,496,3,877015066,3.938978
1,496,496,1,876066424,2.812282
2,392,463,3,891038946,4.073119
3,540,471,4,882157706,3.630294
4,897,496,5,879994113,4.200776


In [0]:
#predictions1 = spark.createDataFrame(predictions)
predictions.createOrReplaceTempView("predictions2")

In [0]:
#%sql

#select userId, movieId,rating,prediction from predictions1 where userId=305 order by movieId 

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName='mse', labelCol='rating', predictionCol='prediction')
print('The mean squared error for our model is: {}'.format(evaluator.evaluate(predictions)))

In [0]:
(trainingRatings, testRatings) = ratings.randomSplit([80.0, 20.0])

In [0]:
als = ALS(userCol='userId', itemCol='movieId', ratingCol='rating')
model = als.fit(trainingRatings)
predictions = model.transform(testRatings)

In [0]:
predictions.toPandas().head()

Unnamed: 0,userId,movieId,rating,timestamp,prediction
0,148,1,4,877019411,4.110066
1,148,7,5,877017054,4.19242
2,148,78,1,877399018,1.893532
3,148,132,4,877020715,4.372531
4,148,168,5,877015900,4.727008


In [0]:
#predictions1 = spark.createDataFrame(predictions)
predictions.createOrReplaceTempView("predictions1")

In [0]:
%sql
select * from predictions1

userId,movieId,rating,timestamp,prediction
85,496,4,879453781,3.8405895
251,471,3,886272319,3.5469673
580,471,3,884125018,3.6614206
458,496,3,886398289,4.0548954
883,463,3,891693058,4.254629
883,1591,3,891695570,2.889025
255,833,4,883216902,2.4369702
406,496,4,879445378,3.5687747
731,496,5,886179040,3.9335246
26,471,2,891371676,3.0690165


In [0]:
%sql

select userId, movieId,rating,prediction from predictions1 where userId=305 order by rating

userId,movieId,rating,prediction
305,179,1,3.6002448
305,1073,1,3.3755019
305,52,2,3.5710902
305,144,2,3.0895884
305,597,2,2.0980175
305,91,2,3.38988
305,222,2,2.8360639
305,212,3,3.1985908
305,602,3,3.226202
305,770,3,3.2599864


In [0]:
evaluator = RegressionEvaluator(metricName='mse', labelCol='rating', predictionCol='prediction')
print('The ?mean squared error for our model is: {}'.format(evaluator.evaluate(predictions)))

In [0]:
avgRatings = ratings.select('rating').groupBy().avg().first()[0]
print ('The average rating in the dataset is: {}'.format(avgRatings))

evaluator = RegressionEvaluator(metricName='mse', labelCol='rating', predictionCol='prediction')
print ('The mean squared error for our model is: {}'.format(evaluator.evaluate(predictions.na.fill(avgRatings))))

In [0]:
evaluator = RegressionEvaluator(metricName='mse', labelCol='rating', predictionCol='prediction')
print ('The mean squared error for our model is: {}'.format(evaluator.evaluate(predictions.na.drop())))

In [0]:
def repeatALS(data, k=3, userCol='userId', itemCol='movieId', ratingCol='rating', metricName='mse'):
    evaluations = []
    for i in range(0, k):  
        (trainingSet, testingSet) = data.randomSplit([k - 1.0, 1.0])
        als = ALS(userCol=userCol, itemCol=itemCol, ratingCol=ratingCol)
        model = als.fit(trainingSet)
        predictions = model.transform(testingSet)
        evaluator = RegressionEvaluator(metricName=metricName, labelCol='rating', predictionCol='prediction')
        evaluation = evaluator.evaluate(predictions.na.drop())
        print('Loop {}: {} = {}'.format(i + 1, metricName, evaluation))
        evaluations.append(evaluation)
    return sum(evaluations) / float(len(evaluations))

In [0]:
#print('MSE = {}'.format(repeatALS(ratings, k=4)))


In [0]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

(trainingRatings, validationRatings) = ratings.randomSplit([80.0, 20.0])
evaluator = RegressionEvaluator(metricName='mse', labelCol='rating', predictionCol='prediction')

paramGrid = ParamGridBuilder().addGrid(als.rank, [1, 5, 10]).addGrid(als.maxIter, [20]).addGrid(als.regParam, [0.05, 0.1, 0.5]).build()

crossval = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=2)
cvModel = crossval.fit(trainingRatings)
predictions = cvModel.transform(validationRatings)

print('The mean squared error for our model is: {}'.format(evaluator.evaluate(predictions.na.drop())))

In [0]:
from pyspark.sql.functions import lit

def recommendMovies(model, user, nbRecommendations):
    # Create a Spark DataFrame with the specified user and all the movies listed in the ratings DataFrame
    dataSet = ratings.select('movieId').distinct().withColumn('userId', lit(user))

    # Create a Spark DataFrame with the movies that have already been rated by this user
    moviesAlreadyRated = ratings.filter(ratings.userId == user).select('movieId', 'userId')

    # Apply the recommender system to the data set without the already rated movies to predict ratings
    predictions = model.transform(dataSet.subtract(moviesAlreadyRated)).dropna().orderBy('prediction', ascending=False).limit(nbRecommendations).select('movieId', 'prediction')

    # Join with the movies DataFrame to get the movies titles and genres
    recommendations = predictions.join(movies, predictions.movieId == movies.movieId).select(predictions.movieId, movies.title,  predictions.prediction)

#     recommendations.show(truncate=False)
    return recommendations

In [0]:
%sql

select * from user where userId=305

userId,age,gender,occupation,zipcode
305,23,M,programmer,94086


In [0]:
print('Recommendations for user 305:')
recommendMovies(model, 305, 10).toPandas()

Unnamed: 0,movieId,title,prediction
0,114,Wallace & Gromit: The Best of Aardman Animatio...,4.188755
1,124,Lone Star (1996),4.044257
2,513,"Third Man, The (1949)",4.095413
3,515,"Boot, Das (1981)",4.000786
4,694,Persuasion (1995),4.058099
5,896,"Sweet Hereafter, The (1997)",4.102649
6,958,To Live (Huozhe) (1994),4.049252
7,1449,Pather Panchali (1955),4.154473
8,1463,"Boys, Les (1997)",4.52656
9,1536,Aiqing wansui (1994),4.040064


In [0]:
print('Recommendations for user 4:')
recommendMovies(model, 4, 10).toPandas()


Unnamed: 0,movieId,title,prediction
0,169,"Wrong Trousers, The (1993)",5.568966
1,733,Go Fish (1994),5.32138
2,745,"Ruling Class, The (1972)",5.406593
3,753,Burnt By the Sun (1994),5.374434
4,867,"Whole Wide World, The (1996)",5.575703
5,958,To Live (Huozhe) (1994),5.646121
6,1062,Four Days in September (1997),5.398902
7,1137,Beautiful Thing (1996),5.342711
8,1167,"Sum of Us, The (1994)",5.382503
9,1466,Margaret's Museum (1995),5.5746


In [0]:
print('Recommendations for user 16:')
recommendMovies(model, 16, 10).toPandas()

Unnamed: 0,movieId,title,prediction
0,50,Star Wars (1977),5.061309
1,187,"Godfather: Part II, The (1974)",4.927236
2,192,Raging Bull (1980),4.940132
3,483,Casablanca (1942),5.050618
4,837,Meet John Doe (1941),5.000682
5,1122,They Made Me a Criminal (1939),4.976582
6,1269,Love in the Afternoon (1957),5.07077
7,1449,Pather Panchali (1955),5.310487
8,1450,Golden Earrings (1947),5.261733
9,1467,"Saint of Fort Washington, The (1993)",5.393768


In [0]:
print('Recommendations for user 21:')
recommendMovies(model, 21, 10).toPandas()


Unnamed: 0,movieId,title,prediction
0,12,"Usual Suspects, The (1995)",4.25814
1,119,Maya Lin: A Strong Clear Vision (1994),4.300254
2,169,"Wrong Trousers, The (1993)",4.259159
3,187,"Godfather: Part II, The (1974)",4.236284
4,357,One Flew Over the Cuckoo's Nest (1975),4.281776
5,896,"Sweet Hereafter, The (1997)",4.300021
6,1449,Pather Panchali (1955),4.518641
7,1463,"Boys, Les (1997)",4.272986
8,1467,"Saint of Fort Washington, The (1993)",4.239812
9,1536,Aiqing wansui (1994),4.486767


In [0]:
%sql

select ratings.userId, ratings.movieId, movies.title, ratings.rating from ratings left join movies on ratings.movieId= movies.movieId where ratings.rating > 4 and ratings.userId=305 order by rating desc

userId,movieId,title,rating
305,427,To Kill a Mockingbird (1962),5
305,483,Casablanca (1942),5
305,50,Star Wars (1977),5
305,189,"Grand Day Out, A (1992)",5
305,923,Raise the Red Lantern (1991),5
305,529,My Life as a Dog (Mitt liv som hund) (1985),5
305,209,This Is Spinal Tap (1984),5
305,216,When Harry Met Sally... (1989),5
305,169,"Wrong Trousers, The (1993)",5
305,357,One Flew Over the Cuckoo's Nest (1975),5


In [0]:
%sql

select movies.title from movies where movieId in (6906,1368,1058,433,26294)

title
Heathers (1989)
"War, The (1994)"
Mina Tannenbaum (1994)
