In [None]:
# Refresh the apt package index before the system packages are installed below.
! apt update

[33m0% [Working][0m            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Get:6 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:7 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:12 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
H

In [None]:
# Install the headless Java 8 JDK quietly (stdout is discarded).
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download the Spark 3.0.0 binary distribution (Hadoop 3.2 build) from the Apache archive.
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
# Extract the Spark tarball into the current working directory.
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

In [None]:
#Unzip the file
!unzip book1M.zip

Archive:  book1M.zip
   creating: book1M/
  inflating: book1M/BX-Book-Ratings.csv  
  inflating: book1M/BX-Books.csv     
  inflating: book1M/BX-Users.csv     
  inflating: book1M/explicit_ratings_books.csv  


In [None]:
###################### SPARK SETUP ################################
#Install findspark
!pip install -q findspark

In [None]:
!pip install py4j



In [None]:
# Point the process environment at the Java and Spark installations
import os

os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.0-bin-hadoop3.2",
})

In [None]:
# Run findspark first, then import pyspark and start (or reuse) a Spark
# session on the local master using all available cores ("local[*]").
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
sc = spark.sparkContext
sc

In [None]:
import os
import time

from pyspark.sql import SparkSession
import pyspark 

from pyspark.sql.functions import lit
from pyspark.ml.feature import IndexToString
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit,CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml import Pipeline, PipelineModel

import math
import numpy as np
import pandas as pd


import seaborn as sns
import matplotlib.pyplot as plt

%matplotlib inline

In [None]:
from pyspark.sql import functions as f

**Read the data file**

In [None]:
# Load the explicit book ratings: the first row is the header and column types are inferred
df = spark.read.csv(
    '/content/book1M/explicit_ratings_books.csv',
    header=True,
    inferSchema=True,
)

In [None]:
# Print the column names and the types inferred while reading the CSV.
df.printSchema()

root
 |-- userID: integer (nullable = true)
 |-- ISBN: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- author: string (nullable = true)
 |-- yearOfPublication: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)



In [None]:
# Preview the first 10 rows; long cell values are truncated in the printout.
df.show(10,truncate=True)

+------+----------+------+--------------------+----------------+-----------------+--------------------+---+-------------+
|userID|      ISBN|rating|               title|          author|yearOfPublication|           publisher|age|      country|
+------+----------+------+--------------------+----------------+-----------------+--------------------+---+-------------+
|276726|0155061224|     5|    Rites of Passage|      Judith Rae|             2001|              Heinle| 34|          usa|
|276729|052165615X|     3|      Help!: Level 1|   Philip Prowse|             1999|Cambridge Univers...| 16|      croatia|
|276729|0521795028|     6|The Amsterdam Con...|     Sue Leather|             2001|Cambridge Univers...| 16|      croatia|
|276744|038550120X|     7|     A Painted House|    JOHN GRISHAM|             2001|           Doubleday| 34|          usa|
|276747|0060517794|     9|Little Altars Eve...|   Rebecca Wells|             2003|         HarperTorch| 25|          usa|
|276747|0671537458|     

In [None]:
# Total number of rows (one per explicit rating) in the loaded dataset.
df.count()

383842

In [None]:
# describe() only builds a DataFrame of summary statistics; without show()
# the cell merely echoes the DataFrame's column list instead of the numbers.
df.describe().show()

DataFrame[summary: string, userID: string, ISBN: string, rating: string, title: string, author: string, yearOfPublication: string, publisher: string, age: string, country: string]

**Collaborative Filtering: data modeling using Alternating Least Squares (ALS) matrix factorization.
Select the required columns (userID, ISBN, rating).**

In [None]:
# The recommender only needs the user, the item and the rating, so keep
# just those three columns and drop the remaining book/user metadata.
data = df.select("userID", "ISBN", "rating")
data.show()

+------+----------+------+
|userID|      ISBN|rating|
+------+----------+------+
|276726|0155061224|     5|
|276729|052165615X|     3|
|276729|0521795028|     6|
|276744|038550120X|     7|
|276747|0060517794|     9|
|276747|0671537458|     9|
|276747|0679776818|     8|
|276747|0943066433|     7|
|276747|1885408226|     7|
|276748|0747558167|     6|
|276751|3596218098|     8|
|276754|0684867621|     8|
|276755|0451166892|     5|
|276762|0380711524|     5|
|276762|3453092007|     8|
|276772|0553572369|     7|
|276772|3499230933|    10|
|276772|3596151465|    10|
|276774|3442136644|     9|
|276786|8437606322|     8|
+------+----------+------+
only showing top 20 rows



In [None]:
# Map the userID and ISBN columns to numeric indices (<column>_index) for ALS.
# The id columns are taken from an ordered list instead of a set difference so
# the pipeline stages, and therefore the output column order, are the same on
# every run (set iteration order over strings is not stable across sessions).
id_columns = [column for column in data.columns if column != "rating"]
s_indexer = [
    StringIndexer(inputCol=column, outputCol=column + "_index")
    for column in id_columns
]
pipeline = Pipeline(stages=s_indexer)
dftransform = pipeline.fit(data).transform(data)
dftransform.show()

+------+----------+------+------------+----------+
|userID|      ISBN|rating|userID_index|ISBN_index|
+------+----------+------+------------+----------+
|276726|0155061224|     5|     56337.0|   58420.0|
|276729|052165615X|     3|     26364.0|   87191.0|
|276729|0521795028|     6|     26364.0|   87208.0|
|276744|038550120X|     7|     56338.0|     216.0|
|276747|0060517794|     9|     12196.0|    1070.0|
|276747|0671537458|     9|     12196.0|    2592.0|
|276747|0679776818|     8|     12196.0|    1951.0|
|276747|0943066433|     7|     12196.0|  124008.0|
|276747|1885408226|     7|     12196.0|  137096.0|
|276748|0747558167|     6|     56339.0|   42440.0|
|276751|3596218098|     8|     56340.0|  145073.0|
|276754|0684867621|     8|     56341.0|     374.0|
|276755|0451166892|     5|     56342.0|     198.0|
|276762|0380711524|     5|     26365.0|    2698.0|
|276762|3453092007|     8|     26365.0|   27412.0|
|276772|0553572369|     7|     18853.0|    6882.0|
|276772|3499230933|    10|     

Split the cleaned dataset into two parts: training data (80%) and
test data (20%).

In [None]:
# Randomly split the data: 80% goes to train and the remaining 20% to test.
# A fixed seed keeps the split, and every metric computed from it, reproducible
# across kernel restarts.
train, test = dftransform.randomSplit([0.8, 0.2], seed=42)
print("  Train dataset:", train.count(), "rows")
print("  Test dataset: ", test.count(), "rows")

  Train dataset: 307153 rows
  Test dataset:  76689 rows


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [None]:
# Baseline recommender: fit ALS with its default hyper-parameters on the
# training split. The evaluator defined below measures RMSE between the
# "rating" label and the model's "prediction" column.
model = ALS(
    userCol="userID_index",
    itemCol="ISBN_index",
    ratingCol="rating",
    nonnegative=True,
    coldStartStrategy="drop",
).fit(train)

from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)

In [None]:
# Score the held-out test split and print the RMSE of the baseline ALS model.
# The RMSE is computed once and reused, instead of running the transform and
# evaluation a second time just for the print statement.
predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)
print("New RMSE: ", rmse)

New RMSE:  2.4802905847663936


***Implementing ALS with Cross Validation***

In [None]:
# Import the required functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [None]:
# Try to improve on the baseline model by tuning ALS with cross validation.
# coldStartStrategy is set to 'drop' so the evaluation metric does not come out as NaN.
model = ALS(
    userCol="userID_index",
    itemCol="ISBN_index",
    ratingCol="rating",
    nonnegative=True,
    coldStartStrategy="drop",
)

# Hyper-parameter grid built with ParamGridBuilder. Two parameters are tuned:
#   1. regParam (regularization): 0.1, 0.05, 0.01, 0.001
#   2. rank (number of latent factors): 5, 10, 20, 30
paramGrid = (
    ParamGridBuilder()
    .addGrid(model.regParam, [0.1, 0.05, 0.01, 0.001])
    .addGrid(model.rank, [5, 10, 20, 30])
    .build()
)

# 5-fold cross validator over the grid, scored with the RMSE evaluator
crossvalidation = CrossValidator(
    estimator=model,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
)

In [None]:

# Build cross validation using CrossValidator
# NOTE(review): this repeats the `crossvalidation` object built in the previous
# cell with the same arguments; the fit cell uses `crossvalidation`, so `cv`
# appears to be unused in this notebook - confirm and remove one of the two.
cv = CrossValidator(estimator=model, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

In [None]:
# Fit the cross validator to the 'train' dataset.
# NOTE: this rebinds `model` from the ALS estimator to the fitted
# cross-validation result.
model = crossvalidation.fit(train)

# Extract the best model found by the cross validation above.
best_model = model.bestModel

***Printing the Best Model's parameter values***

In [None]:
# Show what kind of object the best model is
print(type(best_model))
print("**Best Model**")
# The tuned hyper-parameters are read from the Java-side parent estimator of
# the fitted model; fetch it once and query it for each value.
best_estimator = best_model._java_obj.parent()
print("Rank: ", best_estimator.getRank())
print("MaxIter: ", best_estimator.getMaxIter())
print("RegParam: ", best_estimator.getRegParam())
# RMSE of the best cross-validated model on the held-out test data
best_predictions = best_model.transform(test)
print("Best RMSE value is: ", evaluator.evaluate(best_predictions))

In [None]:
# Define the RMSE evaluator (rating vs. prediction) and report how many
# parameter combinations the grid contains.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
print("Num models to be tested: ", len(paramGrid))

In [None]:
# Predict ratings for the test split with the best cross-validated model
# and preview the first 10 rows.
pred = best_model.transform(test)
pred.show(10)

In [None]:
# Attach book metadata to the predictions. The previous version joined against
# a `Movies` table on "movieId", neither of which exists in this notebook.
# `df` holds one row per rating, so de-duplicate on ISBN first to get a single
# metadata row per book and avoid multiplying prediction rows in the join.
book_info = df.select("ISBN", "title", "author").dropDuplicates(["ISBN"])
pred.join(book_info, "ISBN").select("userID", "title", "author", "prediction").show(5)

+------+--------------------+------+----------+
|userId|               title|genres|prediction|
+------+--------------------+------+----------+
|   597|Hudsucker Proxy, ...|Comedy|  4.671783|
|   602|Hudsucker Proxy, ...|Comedy| 3.5895188|
|   409|Hudsucker Proxy, ...|Comedy| 3.9152715|
|   610|Hudsucker Proxy, ...|Comedy|  3.603048|
|   217|Hudsucker Proxy, ...|Comedy| 2.7374995|
+------+--------------------+------+----------+
only showing top 5 rows



***Data Sparsity and Cold Start***

In [None]:
def get_mat_sparsity(ratings, user_col="userId", item_col="movieId"):
    """Print and return the sparsity (in percent) of a user-item ratings matrix.

    Sparsity is the share of (user, item) pairs that have no rating:
    ``(1 - n_ratings / (n_users * n_items)) * 100``.

    Args:
        ratings: DataFrame with one row per rating. It must contain a
            "rating" column plus the user and item id columns.
        user_col: Name of the user id column.
        item_col: Name of the item id column.

    Returns:
        The sparsity as a percentage, or None when the DataFrame contains no
        users or no items (the ratio is undefined in that case).
    """
    # Count the total number of ratings in the dataset
    count_nonzero = ratings.select("rating").count()

    # Size of the full user x item matrix
    n_users = ratings.select(user_col).distinct().count()
    n_items = ratings.select(item_col).distinct().count()
    total_elements = n_users * n_items

    # Guard against dividing by zero on an empty DataFrame
    if total_elements == 0:
        print("The ratings dataframe is empty; sparsity is undefined.")
        return None

    # Divide the numerator by the denominator
    sparsity = (1.0 - (count_nonzero * 1.0) / total_elements) * 100
    print("The ratings dataframe is ", "%.2f" % sparsity + "% sparse.")
    return sparsity

# This notebook's ratings live in `data` with the columns userID / ISBN / rating.
sparsity_pct = get_mat_sparsity(data, user_col="userID", item_col="ISBN")

The ratings dataframe is  99.82% sparse.
