<a href="https://colab.research.google.com/github/dinhhungGM/RecommendationSystemUsingBigData/blob/main/Recommendation_System_BIGDATA_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Approach with Spark

In [1]:
import findspark
findspark.init()

In [1]:
from pyspark.sql.functions import col, explode
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# sc = SparkContext
# sc.setCheckpointDir('checkpoint')
spark = SparkSession.builder.appName('Group 7 - Recommendation System').config('spark.sql.execution.arrow.pyspark.enabled', True)\
.config('spark.driver.memory','8G')\
.config('spark.ui.showConsoleProgress', True)\
.config('spark.sql.repl.eagerEval.enabled', True)\
.getOrCreate()

21/07/16 22:09:17 WARN Utils: Your hostname, zas resolves to a loopback address: 127.0.1.1; using 192.168.1.225 instead (on interface wlp0s20f3)
21/07/16 22:09:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/07/16 22:09:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Data is downloaded from https://www.kaggle.com/bandikarthik/movie-recommendation-system
movies = spark.read.csv('../MovieLens/movie.csv', header=True, inferSchema=True)
ratings = spark.read.csv('../MovieLens/rating.csv',  header=True, inferSchema=True)

                                                                                

In [4]:
movies.limit(5).show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+



# Calculating sparsity of data

In [5]:
numerator = ratings.select("Rating").count()

# Count the number of distinct userIds and distinct movieIds
unique_users = ratings.select("UserID").distinct().count()
unique_movies = ratings.select("MovieID").distinct().count()

# Set the denominator equal to the number of users multiplied by the number of movies
denominator = unique_users * unique_movies

# Divide the numerator by the denominator
sparsity = (1.0 - (numerator *1.0)/denominator)*100
print("The ratings data is ", "%.2f" % sparsity + "% empty.")



The ratings data is  99.46% empty.


                                                                                

# Implementing ALS(Alternating Least Square) algorithm in Spark

In [5]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [6]:
(trainData, testData) = ratings.randomSplit([0.8, 0.2], seed = 1234)

# Create ALS model
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True, implicitPrefs = False
          , coldStartStrategy="drop")

In [7]:
%%time
param_grid = ParamGridBuilder() \
.addGrid(als.rank, [100]) \
.addGrid(als.regParam, [.01]) \
.build()


# rank is the number of latent factors in the model (defaults to 10).
# maxIter is the maximum number of iterations to run (defaults to 10).
# regParam specifies the regularization parameter in ALS (defaults to 1.0).


evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")
cv = CrossValidator(estimator=als,
                            estimatorParamMaps=param_grid,
                            evaluator=evaluator,
                            numFolds=3) 

model = cv.fit(trainData)


21/07/16 21:45:16 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/07/16 21:45:16 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

CPU times: user 574 ms, sys: 137 ms, total: 711 ms
Wall time: 3min 50s


In [8]:
best_model = model.bestModel
predictions = best_model.transform(testData)
rmse = evaluator.evaluate(predictions)
print(f"Root mean square error: {rmse}")
print("====BEST MODEL ====")
print(f"BEST RANK: {best_model.rank}")
print(f"maxIter: {best_model._java_obj.parent().getMaxIter()}")
print(f"regParam: {best_model._java_obj.parent().getRegParam()}")



Root mean square error: 0.8058084628531729
====BEST MODEL ====
BEST RANK: 14
maxIter: 10
regParam: 0.01


                                                                                

In [19]:
predictions.agg({"prediction": "max"}).collect()[0]

                                                                                

Row(max(prediction)=9.730632781982422)

# Movie Recommendation

In [20]:


# Generate n Recommendations for all users
recommendations = best_model.recommendForAllUsers(10)
recommendations.limit(10).show()





+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   148|[{128366, 11.8917...|
|   463|[{96030, 9.862978...|
|   471|[{96030, 9.586279...|
|   496|[{128366, 8.97899...|
|   833|[{96030, 13.61601...|
|  1088|[{96255, 10.12521...|
|  1238|[{96030, 8.942157...|
|  1342|[{61913, 10.79397...|
|  1580|[{50347, 14.56270...|
|  1591|[{119705, 11.4136...|
+------+--------------------+



                                                                                

### 7th User’s Actual Preference:

In [14]:
ratings.join(movies, on='movieId').filter('userId = 50') \
.sort('rating', ascending=True).limit(10).show()



+-------+------+------+-------------------+--------------------+--------------------+
|movieId|userId|rating|          timestamp|               title|              genres|
+-------+------+------+-------------------+--------------------+--------------------+
|    344|    50|   2.0|2007-06-24 09:34:28|Ace Ventura: Pet ...|              Comedy|
|   2710|    50|   2.0|2007-06-24 09:39:36|Blair Witch Proje...|Drama|Horror|Thri...|
|    319|    50|   3.0|2007-06-12 11:05:32|Shallow Grave (1994)|Comedy|Drama|Thri...|
|   1407|    50|   3.0|2007-06-24 09:40:59|       Scream (1996)|Comedy|Horror|Mys...|
|   3147|    50|   3.0|2007-06-24 09:42:47|Green Mile, The (...|         Crime|Drama|
|   4022|    50|   3.0|2007-06-24 09:43:43|    Cast Away (2000)|               Drama|
|   8784|    50|   3.0|2007-06-20 14:09:57| Garden State (2004)|Comedy|Drama|Romance|
|  36276|    50|   3.0|2007-06-20 13:58:21|Hidden (a.k.a. Ca...|Drama|Mystery|Thr...|
|    597|    50|   3.5|2007-06-24 09:35:14| Pretty Wom

                                                                                

### 7th User’s ALS Recommentions

In [12]:
recommendations = recommendations.withColumn("rec_exp", explode("recommendations")).select('userId', col("rec_exp.movieId"), col("rec_exp.rating"))
recommendations.join(movies, on='movieId').filter('userId = 50').show()

                                                                                

+-------+------+---------+--------------------+--------------------+
|movieId|userId|   rating|               title|              genres|
+-------+------+---------+--------------------+--------------------+
|  96030|    50| 8.454298|Weekend It Lives,...|              Horror|
|  96255|    50| 8.019306|On Top of the Wha...|             Fantasy|
|  74061|    50|7.2264924|Rahtree: Flower o...|Comedy|Drama|Horr...|
|  26968|    50| 6.842768|  Cremaster 5 (1997)|       Drama|Musical|
| 120821|    50|6.7726007|The War at Home (...|     Documentary|War|
| 128366|    50| 6.758215|Patton Oswalt: Tr...|              Comedy|
|  97300|    50| 6.668107|Björk: Volumen (1...|   Animation|Musical|
|  94900|    50| 6.502551|    Desi Boyz (2011)|        Comedy|Drama|
| 112577|    50|6.4785285|Willie & Phil (1980)|Comedy|Drama|Romance|
|  79236|    50| 6.382984|       Yumeji (1991)|       Drama|Fantasy|
+-------+------+---------+--------------------+--------------------+



# Approach with Dask

# KNN

In [None]:
!pip install "dask[complete]"

In [None]:
import joblib
from scipy.sparse import csr_matrix
from sklearn.neighbors import NearestNeighbors
import numpy as np

In [None]:
!python -m pip install dask distributed --upgrade

In [None]:
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=4, processes=False, memory_limit='8GB')

# If we doesn't convert userId to category then will met errors

model_knn= NearestNeighbors(metric='cosine', algorithm='brute', n_neighbors=20)
movies_users= ratings.limit(1000000).toPandas().pivot(index='movieId', columns='userId',values='rating').fillna(0)
    

In [None]:
import dask
import dask.dataframe as dd
ratings_dask_df = dd.read_csv('./drive/MyDrive/BigDataProject/ratings.csv')

In [None]:
ratings_dask_df.head()

In [None]:
with joblib.parallel_backend('dask'):
    mat_movies_users=csr_matrix(movies_users.values)
    model_knn.fit(mat_movies_users)

In [None]:
!pip install fuzzywuzzy
from fuzzywuzzy import process
def recommender(movie_name, data, model, n_recommendations ):
    df_movies = movies.toPandas()
    model.fit(data)
    idx=process.extractOne(movie_name, df_movies['title'])[2]
    print('Movie Selected: ', df_movies['title'][idx], 'Index: ',idx)
    print('Searching for recommendations.....')
    distances, indices=model.kneighbors(data[idx], n_neighbors=n_recommendations)
    for i in indices:
        print(df_movies['title'][i].where(i!=idx))
    
recommender('Toy Story (1995)', mat_movies_users, model_knn,20)

# SVD

## Basend on funk-svd is a Python 3 library implementing a fast version of the famous SVD algorithm popularized by Simon Funk during the Neflix Prize contest.

In [None]:
!pip install git+https://github.com/gbolmier/funk-svd

In [None]:
import pandas as pd
from funk_svd import SVD

In [None]:
%%time
with joblib.parallel_backend('dask'):
  movies_df = movies.toPandas()
  rating_df = ratings.toPandas()

In [None]:
movies_df.head(5)

In [None]:
rating_df.columns = ['u_id', 'i_id', 'rating', 'timestamps']
movies_df.columns = ['i_id', 'title', 'genres']
rating_df

In [None]:
%%time
from sklearn.metrics import mean_squared_error, mean_absolute_error
# movielens18.drop(columns = 'timestamp', inplace = True)

with joblib.parallel_backend('dask'):
  train = rating_df.sample(frac=0.8)
  val = rating_df.drop(train.index.tolist()).sample(frac=0.5, random_state=8)
  test = rating_df.drop(train.index.tolist()).drop(val.index.tolist())

In [None]:
train

In [None]:
lr, reg, factors = (0.01, 0.03, 90)

with joblib.parallel_backend('dask'):
  svd = SVD(lr=lr, reg=reg, n_epochs=20, n_factors=factors,
            min_rating=0.5, max_rating=5)
  svd.fit(X=train, X_val=val)

pred = svd.predict(test)
mae = mean_absolute_error(test["rating"], pred)
rmse = np.sqrt(mean_squared_error(test["rating"], pred))
print("Test MAE:  {:.2f}".format(mae))
print("Test RMSE: {:.2f}".format(rmse))
print('{} factors, {} lr, {} reg'.format(factors, lr, reg))

#User Recommendations

In [None]:
n_m = len(rating_df.i_id.unique())

#  Initialize my ratings
my_ratings = np.zeros(n_m)


my_ratings[4993] = 5
my_ratings[1080] = 5
my_ratings[260] = 5
my_ratings[4896] = 5
my_ratings[1196] = 5
my_ratings[1210] = 5
my_ratings[2628] = 5
my_ratings[5378] = 5

print('User ratings:')
print('-----------------')

for i, val in enumerate(my_ratings):
    if val > 0:
        print('Rated %d stars: %s' % (val, movies_df.loc[movies_df.i_id==i].title.values))

In [None]:

print("Adding your recommendations!")
items_id = [item[0] for item in np.argwhere(my_ratings>0)]
ratings_list = my_ratings[np.where(my_ratings>0)]
user_id = np.asarray([0] * len(ratings_list))

user_ratings = pd.DataFrame(list(zip(user_id, items_id, ratings_list)), columns=['u_id', 'i_id', 'rating'])



In [None]:
try:
    rating_df = rating_df.drop(columns=['timestamps'])
except:
    pass
data_with_user = rating_df.append(user_ratings, ignore_index=True)



with joblib.parallel_backend('dask'):
  train_user = data_with_user.sample(frac=0.8)
  val_user = data_with_user.drop(train_user.index.tolist()).sample(frac=0.5, random_state=8)
  test_user = data_with_user.drop(train_user.index.tolist()).drop(val_user.index.tolist())



In [None]:
from itertools import product


def funk_svd_predict(userID, data_with_user, movies_df):
    userID = [userID]

    # all_users = data_with_user.u_id.unique()
    all_movies = data_with_user.i_id.unique()
    recommendations = pd.DataFrame(list(product(userID, all_movies)), columns=['u_id', 'i_id'])

    #Getting predictions for the selected userID
    pred_train = svd.predict(recommendations)
    recommendations['prediction'] = pred_train
    recommendations.head(10)

    sorted_user_predictions = recommendations.sort_values(by='prediction', ascending=False)

    user_ratings = data_with_user[data_with_user.u_id == userID[0]]
    user_ratings.columns = ['u_id',	'i_id', 'rating']
    # Recommend the highest predicted rating movies that the user hasn't seen yet.
    recommendations = movies_df[~movies_df['i_id'].isin(user_ratings['i_id'])].\
        merge(pd.DataFrame(sorted_user_predictions).reset_index(drop=True), how = 'inner', left_on = 'i_id', right_on = 'i_id').\
        sort_values(by='prediction', ascending = False)#.drop(['i_id'],axis=1)

    rated_df = movies_df[movies_df['i_id'].isin(user_ratings['i_id'])].\
        merge(pd.DataFrame(data_with_user).reset_index(drop=True), how = 'inner', left_on = 'i_id', right_on = 'i_id')
    rated_df = rated_df.loc[rated_df.u_id==userID[0]].sort_values(by='rating', ascending = False)
    
    return recommendations, rated_df
recommendations, rated_df = funk_svd_predict(0, data_with_user, movies_df)

In [None]:
rated_df

In [None]:
recommendations.head(10)