<a href="https://colab.research.google.com/github/dinhhungGM/RecommendationSystemUsingBigData/blob/main/Recommendation_System_BIGDATA_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Approach with Spark

In [7]:
import findspark
findspark.init()

In [8]:
from pyspark.sql.functions import col, explode
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# sc = SparkContext
# sc.setCheckpointDir('checkpoint')
spark = SparkSession.builder.appName('Group 7 - Recommendation System') \
.config('spark.sql.execution.arrow.pyspark.enabled', True)\
.config('spark.driver.memory','8G')\
.config('spark.ui.showConsoleProgress', True)\
.config('spark.sql.repl.eagerEval.enabled', True)\
.getOrCreate()

In [9]:
# Data is downloaded from https://www.kaggle.com/bandikarthik/movie-recommendation-system
movies = spark.read.csv('../MovieLens/movie.csv', header=True, inferSchema=True)
ratings = spark.read.csv('../MovieLens/rating.csv',  header=True, inferSchema=True)

                                                                                

In [4]:
movies.limit(5).show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+



In [4]:
ratings.limit(5).show()

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      2|   3.5|2005-04-02 23:53:47|
|     1|     29|   3.5|2005-04-02 23:31:16|
|     1|     32|   3.5|2005-04-02 23:33:39|
|     1|     47|   3.5|2005-04-02 23:32:07|
|     1|     50|   3.5|2005-04-02 23:29:40|
+------+-------+------+-------------------+



In [6]:
print(ratings.agg({"rating": "max"}).collect()[0])
print(ratings.agg({"rating": "min"}).collect()[0])

                                                                                

Row(max(rating)=5.0)




Row(min(rating)=0.5)


                                                                                

# Implementing ALS(Alternating Least Square) algorithm in Spark

In [15]:
# Create test and train set
(train, test) = ratings.randomSplit([0.8, 0.2], seed = 1234)

# Create ALS model
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True, implicitPrefs = False
          , coldStartStrategy="drop")


In [12]:
# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [100]) \
            .addGrid(als.regParam, [.15]) \
            .build()
            #             .addGrid(als.maxIter, [5, 50, 100, 200]) \
           
# Define evaluator as RMSE and print length of evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  1


In [16]:
# Build cross validation using CrossValidator
# numFolds=3 means the CrossValidator will create 3 different models.
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)

In [17]:
# We fit the cross validator to the 'train' dataset
model = cv.fit(train)

# We Extract best model from the cv model above
best_model = model.bestModel

21/07/16 22:20:52 WARN CacheManager: Asked to cache already cached data.
21/07/16 22:20:52 WARN CacheManager: Asked to cache already cached data.
21/07/16 22:21:14 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/07/16 22:21:14 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [19]:
predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)
print(f"Root mean square error: {rmse}")
print("====BEST MODEL ====")
print(f"BEST RANK: {best_model.rank}")
print(f"maxIter: {best_model._java_obj.parent().getMaxIter()}")
print(f"regParam: {best_model._java_obj.parent().getRegParam()}")



Root mean square error: 0.8143051599489648
====BEST MODEL ====
BEST RANK: 10
maxIter: 10
regParam: 0.1


                                                                                

In [20]:
predictions.agg({"prediction": "max"}).collect()[0]

                                                                                

Row(max(prediction)=6.4292802810668945)

# Movie Recommendation

In [21]:
# Generate n Recommendations for all users
recommendations = best_model.recommendForAllUsers(10)
recommendations.limit(10).show()



+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   148|[{120821, 6.22960...|
|   463|[{3226, 6.3365936...|
|   471|[{3226, 5.771446}...|
|   496|[{121029, 6.44937...|
|   833|[{3226, 6.089091}...|
|  1088|[{3226, 5.434558}...|
|  1238|[{3226, 5.8392224...|
|  1342|[{121029, 6.59056...|
|  1580|[{120821, 5.34024...|
|  1591|[{3226, 6.2007923...|
+------+--------------------+



                                                                                

### 7th User’s Actual Preference:

In [11]:
ratings.join(movies, on='movieId').filter('userId = 7') \
.sort('rating', ascending=False).limit(10)

                                                                                

movieId,userId,rating,timestamp,title,genres
912,7,5.0,2002-01-16 18:09:56,Casablanca (1942),Drama|Romance
3179,7,5.0,2002-01-16 19:22:51,Angela's Ashes (1...,Drama
1077,7,5.0,2002-01-16 18:48:18,Sleeper (1973),Comedy|Sci-Fi
750,7,5.0,2002-01-16 18:44:19,Dr. Strangelove o...,Comedy|War
1196,7,5.0,2002-01-16 18:09:32,Star Wars: Episod...,Action|Adventure|...
587,7,5.0,2002-01-16 19:10:20,Ghost (1990),Comedy|Drama|Fant...
1210,7,5.0,2002-01-16 18:10:54,Star Wars: Episod...,Action|Adventure|...
1721,7,5.0,2002-01-16 19:06:05,Titanic (1997),Drama|Romance
2942,7,5.0,2002-01-16 18:38:41,Flashdance (1983),Drama|Romance
2028,7,5.0,2002-01-16 18:24:41,Saving Private Ry...,Action|Drama|War


### 7th User’s ALS Recommentions

In [23]:
recommendations = recommendations.withColumn("rec_exp", explode("recommendations")).select('userId', 
col("rec_exp.movieId"), col("rec_exp.rating"))
recommendations.join(movies, on='movieId').filter('userId = 7').show()

                                                                                

+-------+------+---------+--------------------+--------------------+
|movieId|userId|   rating|               title|              genres|
+-------+------+---------+--------------------+--------------------+
|   3226|     7| 5.637633|Hellhounds on My ...|         Documentary|
| 121029|     7| 5.573067|No Distance Left ...|         Documentary|
| 120821|     7| 5.295107|The War at Home (...|     Documentary|War|
| 129536|     7|5.0036817|Code Name Coq Rou...|  (no genres listed)|
| 114070|     7|4.9300246|Good Job:  Storie...|         Documentary|
| 128366|     7|4.8328657|Patton Oswalt: Tr...|              Comedy|
| 117907|     7| 4.705026|My Brother Tom (2...|               Drama|
| 129451|     7| 4.669075|    Ingenious (2009)|Comedy|Drama|Romance|
| 112473|     7|4.6646147|Stuart: A Life Ba...|               Drama|
| 129243|     7| 4.609404|Afstiros katallil...|              Comedy|
+-------+------+---------+--------------------+--------------------+



# Approach with Dask

# KNN

In [29]:
!pip install "dask[complete]"

  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes
Collecting bokeh!=2.0.0,>=1.0.0; extra == "complete"
  Downloading bokeh-2.3.3.tar.gz (10.7 MB)
[K     |████████████████████████████████| 10.7 MB 1.5 MB/s eta 0:00:01    |███▊                            | 1.3 MB 195 kB/s eta 0:00:49
[?25hCollecting distributed==2021.07.0; extra == "complete"
  Downloading distributed-2021.7.0-py3-none-any.whl (1.0 MB)
[K     |████████████████████████████████| 1.0 MB 1.8 MB/s eta 0:00:01
Collecting pillow>=7.1.0
  Downloading Pillow-8.3.1-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.whl (3.0 MB)
[K     |████████████████████████████████| 3.0 MB 663 kB/s eta 0:00:01
Collecting psutil>=5.0
  Downloading psutil-5.8.0-cp38-cp38-manylinux2010_x86_64.whl (296 kB)
[K     |████████████████████████████████| 296 kB 649 kB/s eta 0:00:01
Collecting sortedcontainers!=2.0.0,!=2.0.1
  Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl (29 kB)
Collecting tblib>=

In [4]:
import joblib
from scipy.sparse import csr_matrix
from sklearn.neighbors import NearestNeighbors
import numpy as np

In [None]:
!python -m pip install dask distributed --upgrade

In [None]:
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=4, processes=False, memory_limit='8GB')

# If we doesn't convert userId to category then will met errors

model_knn= NearestNeighbors(metric='cosine', algorithm='brute', n_neighbors=20)
movies_users= ratings.limit(1000000).toPandas().pivot(index='movieId', columns='userId',values='rating').fillna(0)
    

In [None]:
import dask
import dask.dataframe as dd
ratings_dask_df = dd.read_csv('./drive/MyDrive/BigDataProject/ratings.csv')

In [None]:
ratings_dask_df.head()

In [None]:
with joblib.parallel_backend('dask'):
    mat_movies_users=csr_matrix(movies_users.values)
    model_knn.fit(mat_movies_users)

In [None]:
!pip install fuzzywuzzy
from fuzzywuzzy import process
def recommender(movie_name, data, model, n_recommendations ):
    df_movies = movies.toPandas()
    model.fit(data)
    idx=process.extractOne(movie_name, df_movies['title'])[2]
    print('Movie Selected: ', df_movies['title'][idx], 'Index: ',idx)
    print('Searching for recommendations.....')
    distances, indices=model.kneighbors(data[idx], n_neighbors=n_recommendations)
    for i in indices:
        print(df_movies['title'][i].where(i!=idx))
    
recommender('Toy Story (1995)', mat_movies_users, model_knn,20)

# SVD

## Basend on funk-svd is a Python 3 library implementing a fast version of the famous SVD algorithm popularized by Simon Funk during the Neflix Prize contest.

In [24]:
!pip install git+https://github.com/gbolmier/funk-svd

  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes
Collecting git+https://github.com/gbolmier/funk-svd
  Cloning https://github.com/gbolmier/funk-svd to /tmp/pip-req-build-nu5nkxsc
  Running command git clone -q https://github.com/gbolmier/funk-svd /tmp/pip-req-build-nu5nkxsc
Building wheels for collected packages: funk-svd
  Building wheel for funk-svd (setup.py) ... [?25ldone
[?25h  Created wheel for funk-svd: filename=funk_svd-0.0.1.dev1-py3-none-any.whl size=9047 sha256=331a6b2a1409b96f8b99d74555e59c19408c428fb9b203629e0c365a5c65e95c
  Stored in directory: /tmp/pip-ephem-wheel-cache-40ftelzt/wheels/f8/93/18/db4114b3fafc2eb9a319db1e3b3c3465db51d1fdc1d4f2e769
Successfully built funk-svd
Installing collected packages: funk-svd
Successfully installed funk-svd-0.0.1.dev1


In [25]:
import pandas as pd
from funk_svd import SVD

In [27]:
movies_df = movies.toPandas()
rating_df = ratings.toPandas()

                                                                                

In [None]:
movies_df.head(5)

In [28]:
rating_df.columns = ['u_id', 'i_id', 'rating', 'timestamps']
movies_df.columns = ['i_id', 'title', 'genres']
rating_df

Unnamed: 0,u_id,i_id,rating,timestamps
0,1,2,3.5,2005-04-02 23:53:47
1,1,29,3.5,2005-04-02 23:31:16
2,1,32,3.5,2005-04-02 23:33:39
3,1,47,3.5,2005-04-02 23:32:07
4,1,50,3.5,2005-04-02 23:29:40
...,...,...,...,...
20000258,138493,68954,4.5,2009-11-13 15:42:00
20000259,138493,69526,4.5,2009-12-03 18:31:48
20000260,138493,69644,3.0,2009-12-07 18:10:57
20000261,138493,70286,5.0,2009-11-13 15:42:24


In [None]:
%%time
from sklearn.metrics import mean_squared_error, mean_absolute_error
# movielens18.drop(columns = 'timestamp', inplace = True)

with joblib.parallel_backend('dask'):
  train = rating_df.sample(frac=0.8)
  val = rating_df.drop(train.index.tolist()).sample(frac=0.5, random_state=8)
  test = rating_df.drop(train.index.tolist()).drop(val.index.tolist())

In [None]:
train

In [None]:
lr, reg, factors = (0.01, 0.03, 90)

with joblib.parallel_backend('dask'):
  svd = SVD(lr=lr, reg=reg, n_epochs=20, n_factors=factors,
            min_rating=0.5, max_rating=5)
  svd.fit(X=train, X_val=val)

pred = svd.predict(test)
mae = mean_absolute_error(test["rating"], pred)
rmse = np.sqrt(mean_squared_error(test["rating"], pred))
print("Test MAE:  {:.2f}".format(mae))
print("Test RMSE: {:.2f}".format(rmse))
print('{} factors, {} lr, {} reg'.format(factors, lr, reg))

#User Recommendations

In [None]:
n_m = len(rating_df.i_id.unique())

#  Initialize my ratings
my_ratings = np.zeros(n_m)


my_ratings[4993] = 5
my_ratings[1080] = 5
my_ratings[260] = 5
my_ratings[4896] = 5
my_ratings[1196] = 5
my_ratings[1210] = 5
my_ratings[2628] = 5
my_ratings[5378] = 5

print('User ratings:')
print('-----------------')

for i, val in enumerate(my_ratings):
    if val > 0:
        print('Rated %d stars: %s' % (val, movies_df.loc[movies_df.i_id==i].title.values))

In [None]:

print("Adding your recommendations!")
items_id = [item[0] for item in np.argwhere(my_ratings>0)]
ratings_list = my_ratings[np.where(my_ratings>0)]
user_id = np.asarray([0] * len(ratings_list))

user_ratings = pd.DataFrame(list(zip(user_id, items_id, ratings_list)), columns=['u_id', 'i_id', 'rating'])



In [None]:
try:
    rating_df = rating_df.drop(columns=['timestamps'])
except:
    pass
data_with_user = rating_df.append(user_ratings, ignore_index=True)



with joblib.parallel_backend('dask'):
  train_user = data_with_user.sample(frac=0.8)
  val_user = data_with_user.drop(train_user.index.tolist()).sample(frac=0.5, random_state=8)
  test_user = data_with_user.drop(train_user.index.tolist()).drop(val_user.index.tolist())



In [None]:
from itertools import product


def funk_svd_predict(userID, data_with_user, movies_df):
    userID = [userID]

    # all_users = data_with_user.u_id.unique()
    all_movies = data_with_user.i_id.unique()
    recommendations = pd.DataFrame(list(product(userID, all_movies)), columns=['u_id', 'i_id'])

    #Getting predictions for the selected userID
    pred_train = svd.predict(recommendations)
    recommendations['prediction'] = pred_train
    recommendations.head(10)

    sorted_user_predictions = recommendations.sort_values(by='prediction', ascending=False)

    user_ratings = data_with_user[data_with_user.u_id == userID[0]]
    user_ratings.columns = ['u_id',	'i_id', 'rating']
    # Recommend the highest predicted rating movies that the user hasn't seen yet.
    recommendations = movies_df[~movies_df['i_id'].isin(user_ratings['i_id'])].\
        merge(pd.DataFrame(sorted_user_predictions).reset_index(drop=True), how = 'inner', left_on = 'i_id', right_on = 'i_id').\
        sort_values(by='prediction', ascending = False)#.drop(['i_id'],axis=1)

    rated_df = movies_df[movies_df['i_id'].isin(user_ratings['i_id'])].\
        merge(pd.DataFrame(data_with_user).reset_index(drop=True), how = 'inner', left_on = 'i_id', right_on = 'i_id')
    rated_df = rated_df.loc[rated_df.u_id==userID[0]].sort_values(by='rating', ascending = False)
    
    return recommendations, rated_df
recommendations, rated_df = funk_svd_predict(0, data_with_user, movies_df)

In [None]:
rated_df

In [None]:
recommendations.head(10)