In [1]:
# Locate the local Spark installation so that `import pyspark` in the next cell works.
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
# NOTE(review): the wildcard import above shadows Python built-ins such as
# sum/max/min/round with Spark column functions. It is kept because later
# cells use unqualified names like col, count, when and isnan.

# Spark session: local mode with 2 threads, the MongoDB Spark connector
# package, and 5g of driver memory.
# NOTE(review): confirm the connector version matches the installed
# Spark/Scala version.
spark = (
    SparkSession.builder
    .master('local[2]')
    .appName('AMAZON_BOOK')
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:2.4.1')
    .config("spark.driver.memory", "5g")
    .getOrCreate()
)

In [3]:
# MongoDB source: database AMAZON_BOOK, collection RATE.
mongo_uri = "mongodb://localhost:27017/AMAZON_BOOK.RATE"

# Load the collection and keep at most 1,000,000 rows.
# NOTE(review): `.limit()` without an ordering is not guaranteed to return the
# same rows every time the DataFrame is re-evaluated. Cache before any step
# that needs a stable row set.
df_rate = (
    spark.read
    .format("mongo")
    .option("uri", mongo_uri)
    .load()
    .limit(1000000)
)

# Preview the first rows
df_rate.show()

+----------+-----+--------------------+--------------+--------------------+-----+--------------------+--------------------+
|        Id|Price|               Title|       User_id|                 _id|score|             summary|                text|
+----------+-----+--------------------+--------------+--------------------+-----+--------------------+--------------------+
|1558746153| NULL|Chicken Soup for ...| AEKP4FJRWRGZT|{6570367e824b9730...|  5.0|             Helpful|Shows you what ot...|
|1882931173| NULL|Its Only Art If I...| AVCGYZL8FQQTD|{6570367e824b9730...|  4.0|Nice collection o...|This is only for ...|
|1558746153| NULL|Chicken Soup for ...|          NULL|{6570367e824b9730...|  5.0|"This book hit th...|This book was ver...|
|0826414346| NULL|Dr. Seuss: Americ...|A30TK6U7DNS82R|{6570367e824b9730...|  5.0|   Really Enjoyed It|I don't care much...|
|0826414346| NULL|Dr. Seuss: Americ...|A3UH4UZ4RSVO82|{6570367e824b9730...|  5.0|Essential for eve...|"If people become...|
|1558746

In [4]:
# The review summary is not used by the cells below; remove it.
df_rate = df_rate.drop(*['summary'])
df_rate

DataFrame[Id: string, Price: string, Title: string, User_id: string, _id: struct<oid:string>, score: string, text: string]

In [5]:
df_rate.show(n=20)  # first 20 rows; long strings are truncated for display

+----------+-----+--------------------+--------------+--------------------+-----+--------------------+
|        Id|Price|               Title|       User_id|                 _id|score|                text|
+----------+-----+--------------------+--------------+--------------------+-----+--------------------+
|1558746153| NULL|Chicken Soup for ...| AEKP4FJRWRGZT|{6570367e824b9730...|  5.0|Shows you what ot...|
|1882931173| NULL|Its Only Art If I...| AVCGYZL8FQQTD|{6570367e824b9730...|  4.0|This is only for ...|
|1558746153| NULL|Chicken Soup for ...|          NULL|{6570367e824b9730...|  5.0|This book was ver...|
|0826414346| NULL|Dr. Seuss: Americ...|A30TK6U7DNS82R|{6570367e824b9730...|  5.0|I don't care much...|
|0826414346| NULL|Dr. Seuss: Americ...|A3UH4UZ4RSVO82|{6570367e824b9730...|  5.0|"If people become...|
|1558746153| NULL|Chicken Soup for ...|          NULL|{6570367e824b9730...|  4.0|well me and my fr...|
|1558746153| NULL|Chicken Soup for ...|          NULL|{6570367e824b9730..

In [6]:
# Neither the Mongo `_id` struct nor `Price` (NULL in every previewed row) is needed.
df_rate = df_rate.drop('_id', 'Price')
df_rate

DataFrame[Id: string, Title: string, User_id: string, score: string, text: string]

In [7]:
# `score` arrives as a string column; ALS below needs a numeric rating column.
# NOTE(review): values that cannot be parsed presumably become NULL (non-ANSI
# cast); they are counted and dropped in the following cells.
df_rate = df_rate.withColumn("score", df_rate["score"].cast("float"))
df_rate

DataFrame[Id: string, Title: string, User_id: string, score: float, text: string]

In [8]:
# The review body is not used by the rating-based recommender; remove it.
# Use the exact column name `text`. Dropping 'Text' only matched because Spark
# resolves column names case-insensitively by default
# (spark.sql.caseSensitive=false), and would silently do nothing otherwise.
df_rate = df_rate.drop('text')
df_rate

DataFrame[Id: string, Title: string, User_id: string, score: float]

In [9]:
# How many rows have a NULL or NaN score (e.g. missing, or not parseable as float)?
score_missing = isnan('score') | col('score').isNull()
df_rate.select([count(when(score_missing, 'score'))]).show()

+-----------------------------------------------------------------+
|count(CASE WHEN (isnan(score) OR (score IS NULL)) THEN score END)|
+-----------------------------------------------------------------+
|                                                             5729|
+-----------------------------------------------------------------+



In [10]:
df_rate = df_rate.dropna(how="any", subset=["score"])  # remove rows whose score is NULL/NaN

In [11]:
# Re-count NULL/NaN scores after dropping them; the expected result is 0.
df_rate.select(count(when(isnan('score') | col('score').isNull(), 'score'))).show()

+-----------------------------------------------------------------+
|count(CASE WHEN (isnan(score) OR (score IS NULL)) THEN score END)|
+-----------------------------------------------------------------+
|                                                                0|
+-----------------------------------------------------------------+



In [12]:
df_rate.select(col('score')).show(20)  # sample of the cleaned float scores

+-----+
|score|
+-----+
|  5.0|
|  4.0|
|  5.0|
|  5.0|
|  5.0|
|  4.0|
|  4.0|
|  4.0|
|  5.0|
|  4.0|
|  5.0|
|  4.0|
|  5.0|
|  5.0|
|  5.0|
|  5.0|
|  5.0|
|  5.0|
|  4.0|
|  5.0|
+-----+
only showing top 20 rows



In [13]:
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator


# `df_rate` is a MongoDB read truncated with `.limit()`, which is not guaranteed
# to yield the same rows on each evaluation. Cache and materialise it once, so
# that the three indexer fits, the transform below and the later randomSplit
# all work on the same rows (and MongoDB is scanned only once).
df_rate.cache()
df_rate.count()

# Convert string columns to numerical indices.
# handleInvalid="skip" filters out rows whose value is NULL (or unseen), so rows
# with a missing Id / Title / User_id do not appear in df_rate_indexed.
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index", handleInvalid="skip").fit(df_rate)
    for column in ['Id', 'Title', 'User_id']
]

# Chain the fitted indexer models and apply them to produce the *_index columns.
pipeline = Pipeline(stages=indexers)
df_rate_indexed = pipeline.fit(df_rate).transform(df_rate)

In [14]:
# Confirm the Id_index / Title_index / User_id_index columns were added next to the originals.
df_rate_indexed.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- User_id: string (nullable = true)
 |-- score: float (nullable = true)
 |-- Id_index: double (nullable = false)
 |-- Title_index: double (nullable = false)
 |-- User_id_index: double (nullable = false)



In [15]:

# Create ALS model (users x book titles, explicit 1-5 style scores).
# seed is pinned: pyspark derives ALS's default seed from Python's hash() of the
# class name, which can differ between kernel sessions, so without it the
# factor initialisation (and therefore the RMSE) is not reproducible.
als = ALS(
    userCol="User_id_index",
    itemCol="Title_index",
    ratingCol="score",
    rank=15,      # Adjust this parameter
    maxIter=15,
    regParam=0.1, # Adjust this parameter
    coldStartStrategy="drop",  # drop test rows whose user/item was unseen in training
    seed=1234,
)


# Split the data into training and test sets.
# Cache the parent first. Each split is computed from its own evaluation of
# df_rate_indexed, whose upstream MongoDB `.limit()` read is not guaranteed to
# be deterministic. Without a cache, training and test could overlap or miss rows.
df_rate_indexed.cache()
(training, test) = df_rate_indexed.randomSplit([0.8, 0.2], seed=1234)

# Fit the ALS model to the training data
model = als.fit(training)

# Make predictions on the test data
predictions = model.transform(test)

# Evaluate the model
evaluator = RegressionEvaluator(metricName="rmse", labelCol="score", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) = {rmse}")

Root Mean Squared Error (RMSE) = 1.6910231008300247


In [18]:

# Create ALS model: same setup as the rank=15 run, with a larger rank and more iterations.
# seed is pinned: pyspark derives ALS's default seed from Python's hash() of the
# class name, which can differ between kernel sessions, so without it the
# factor initialisation (and therefore the RMSE) is not reproducible.
als = ALS(
    userCol="User_id_index",
    itemCol="Title_index",
    ratingCol="score",
    rank=50,      # Adjust this parameter
    maxIter=20,
    regParam=0.1, # Adjust this parameter
    coldStartStrategy="drop",  # drop test rows whose user/item was unseen in training
    seed=1234,
)


# Split the data into training and test sets.
# Cache the parent first. Each split is computed from its own evaluation of
# df_rate_indexed, whose upstream MongoDB `.limit()` read is not guaranteed to
# be deterministic. Without a cache, training and test could overlap or miss rows.
# (Calling cache() on an already-cached DataFrame is harmless.)
df_rate_indexed.cache()
(training, test) = df_rate_indexed.randomSplit([0.8, 0.2], seed=1234)

# Fit the ALS model to the training data
model = als.fit(training)

# Make predictions on the test data
predictions = model.transform(test)

# Evaluate the model
evaluator = RegressionEvaluator(metricName="rmse", labelCol="score", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) = {rmse}")

Root Mean Squared Error (RMSE) = 1.5040903560449084


In [17]:
# NOTE(review): this cell ran as In[17], i.e. before In[18] retrained `model`
# with rank=50, so the saved output below was not produced by that model.
# Re-run it after the training cell to get recommendations from the current model.
# Top-5 items (Title_index values) per user. The scores are raw ALS predictions
# and are not clipped to the rating scale.
userRecs = model.recommendForAllUsers(numItems=5)

# Show the recommendations with the full arrays visible
userRecs.show(n=20, truncate=False)

+-------------+----------------------------------------------------------------------------------------------------+
|User_id_index|recommendations                                                                                     |
+-------------+----------------------------------------------------------------------------------------------------+
|1            |[{9994, 5.9698405}, {10882, 5.7770405}, {11277, 5.7585497}, {11979, 5.7438855}, {3456, 5.743495}]   |
|3            |[{56495, 6.4719734}, {66459, 6.072416}, {33491, 5.9220114}, {11175, 5.875997}, {6849, 5.8391876}]   |
|6            |[{15651, 6.038165}, {22066, 5.840315}, {7832, 5.798223}, {22632, 5.7839284}, {60924, 5.6988163}]    |
|12           |[{56495, 5.767653}, {14754, 5.479958}, {9087, 5.339184}, {7412, 5.246888}, {6184, 5.242036}]        |
|13           |[{52411, 6.301175}, {11175, 6.296687}, {64065, 6.206318}, {70918, 6.1913047}, {56495, 6.1319957}]   |
|16           |[{52411, 6.165396}, {13267, 5.9807725}, {49132, 5