## IMPORT NECESSARY LIBRARIES

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import *

## STATE THE INPUT AND OUTPUT URI 

In [4]:
input_uri = "mongodb://localhost:27017/Amazon.Reviews"
output_uri = "mongodb://localhost:27017/Amazon.Reviews"

## MAKE A SPARK SESSION AND SET MONGODB CONNECTOR

In [5]:
spark = SparkSession.builder \
    .appName("myProject") \
    .config("spark.mongodb.input.uri", input_uri) \
    .config("spark.mongodb.output.uri", output_uri) \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.2") \
    .config("spark.driver.memory", "10g") \
    .config("spark.executor.memory", "10g") \
    .getOrCreate()

In [4]:
spark

## READING FROM MONGODB

In [27]:
df_mongo = spark.read.format("mongo").load()

## NOW FOR COLLABORATIVE FILTERING 

In [28]:
df_collab = df_mongo.select('reviewerID', 'asin', 'overall')
df_collab.createOrReplaceTempView("df_collab")

In [None]:
df_distinct_asin = spark.sql("SELECT DISTINCT asin FROM data")

In [16]:
df_distinct_asin.coalesce(1).write.option("header",True) \
 .csv("asin_csv")

In [17]:
df_distinct_reviewer = spark.sql("SELECT DISTINCT reviewerID FROM data")

In [18]:
df_distinct_reviewer.coalesce(1).write.option("header", True) \
    .csv("reviewerID_csv")

In [29]:
df_asin = spark.read.csv("asin_csv", header=True)
df_asin.createOrReplaceTempView("df_asin")

In [None]:
df_reviewer = spark.read.csv("reviewerID_csv", header=True)
df_reviewer.createOrReplaceTempView("df_reviewer")

In [None]:
df_joined.describe

In [None]:
df_joined = spark.sql("SELECT ROW_NUMBER() OVER (ORDER BY df_collab.reviewerID) - 1 AS reviewer_index, \
                    ROW_NUMBER() OVER (ORDER BY df_collab.asin) - 1 AS asin_index, \
                    CAST(df_collab.overall AS INTEGER) AS overall \
                    FROM df_collab \
                    INNER JOIN df_asin ON df_collab.asin = df_asin.asin \
                    INNER JOIN df_reviewer ON df_collab.reviewerID = df_reviewer.reviewerID")

In [None]:
df_joined.write \
    .option("header", True) \
    .option("nullValue", "") \
    .option("quote", "") \
    .option("escape", "") \
    .option("mode", "overwrite") \
    .csv("All_joined_csv")

In [6]:
df_joined_csv = spark.read.csv("All_Joined_csv", header=True)
df_joined_csv.createOrReplaceTempView("df_joined_csv")

## FILTERING OUT THE DATA

In [7]:
query = """
SELECT *
FROM df_joined_csv where overall > 4
"""

filtered_df = spark.sql(query)

## Get the product IDs with the most reviews

In [8]:
popular_products = filtered_df.groupBy("asin_index").count().orderBy(col("count").desc()).limit(300).select("asin_index")

In [None]:
popular_products.count()

## Filter for reviews of popular products

In [7]:
filtered_df = filtered_df.join(popular_products, on="asin_index")

## ALS MODEL PREDICTION

In [7]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col
from pyspark.ml.recommendation import ALSModel
import pyspark.sql.functions as F

In [9]:
filtered_df = filtered_df.select(col("reviewer_index").cast("int"), col("asin_index").cast("int"), col("overall").cast("int"))
filtered_df.describe

<bound method DataFrame.describe of DataFrame[reviewer_index: int, asin_index: int, overall: int]>

In [None]:
filtered_df.count()

In [10]:
(training, test) = filtered_df.randomSplit([.7, .3])

In [11]:
training.count()

104056989

## SAVE TRAINING DATASET TO USE AFTER MODEL TRAIN

In [13]:
training.coalesce(1).write.option("header", True).csv("training_csv")

## SAVE TEST DATASET TO USE AFTER MODEL TRAIN AS WELL

In [14]:
test.coalesce(1).write.option("header", True).csv("test_csv")

## READ THE TRAINING AND TESTING DATASET

In [15]:
training_df = spark.read.csv("training_csv", header=True)
training_df = training_df.select(col("reviewer_index").cast("int"), col("asin_index").cast("int"), col("overall").cast("int"))
training_df.count()

104056989

In [16]:
test_df = spark.read.csv("test_csv", header=True)
test_df = test_df.select(col("reviewer_index").cast("int"), col("asin_index").cast("int"), col("overall").cast("int")).distinct()
test_df.count()

44592961

## TRAINING MODEL

In [17]:
als = ALS(maxIter=2, rank=6, userCol='reviewer_index', itemCol='asin_index', ratingCol='overall', coldStartStrategy='drop')

In [None]:
model = als.fit(training_df)SS

## SAVE THE MODEL 

In [14]:
model.save("ALS")

## LOAD THE SAVED MODEL 

In [9]:
model = ALSModel.load("ALS")

## GET RECOMMENDATIONS FROM MODEL

In [43]:
user_recs=model.recommendForAllUsers(20).show(10)

+--------------+--------------------+
|reviewer_index|     recommendations|
+--------------+--------------------+
|       3221405|[{100006060, 4.96...|
|       3627754|[{100015935, 4.96...|
|       8511114|[{100011693, 4.96...|
|       9737584|[{100013635, 4.96...|
|      10246362|[{100016101, 4.96...|
|      11582635|[{100002437, 4.96...|
|      11930560|[{100011262, 4.96...|
|      12191461|[{100013570, 4.96...|
|      12296296|[{100016143, 4.96...|
|      12614929|[{100021718, 3.96...|
+--------------+--------------------+
only showing top 10 rows



## EVALUATING MODEL 

In [16]:
evaluator=RegressionEvaluator(metricName="rmse",labelCol="overall",predictionCol="prediction")
predictions=model.transform(test)
rmse=evaluator.evaluate(predictions)
print("RMSE="+str(rmse))
predictions.show()

RMSE=0.0021583798928685068
+--------------+----------+-------+----------+
|reviewer_index|asin_index|overall|prediction|
+--------------+----------+-------+----------+
|      10760282|   1000127|      4| 3.9974043|
|      83476127|   1000146|      5|  4.997922|
|     218680506|    100010|      5| 4.9979224|
|     219591444|   1000073|      5|  4.997922|
|       1614443|  10000600|      5| 4.9979224|
|      30351220|  10000021|      5| 4.9979224|
|      52928479|  10000528|      5|  4.997922|
|      75138366|  10000989|      4| 3.9974043|
|      84206832|  10001183|      5| 4.9979224|
|      92575672|  10001380|      5|  4.997922|
|      98123032|  10001599|      5|  4.997922|
|     100427680|  10001677|      5|  4.997922|
|     108372703|  10001910|      5|  4.997922|
|     123574269|  10001272|      5|  4.997922|
|     126250063|  10001331|      4| 3.9974043|
|     184678585|  10002084|      5|  4.997922|
|     185276685|  10002100|      5| 4.9979215|
|     192470325|  10001751|      

## GET RECOMMENDED BY PRODUCT ID

In [12]:
df_reviewer = spark.read.csv("reviewerID_csv", header=True)
df_reviewer.createOrReplaceTempView("df_reviewer")

In [9]:
df_reviewer.show(10)

+--------------+
|    reviewerID|
+--------------+
| AIQDYZVLGWHER|
|A3O3B0H45VICX0|
|A3EPQDDI5QLVR0|
|A3O8XET7PDCNSF|
| AECVG05HBE9I0|
|A3A5UPM2KP70S2|
|A22A0KTRXCN74T|
|A13OHG9NAYCRVG|
|A170DUCSTHOIGS|
|A3R5X3HI509L7X|
+--------------+
only showing top 10 rows



In [10]:
df_reviewer = spark.sql("SELECT reviewerID, ROW_NUMBER() OVER (ORDER BY reviewerID) - 1 AS reviewer_index FROM df_reviewer")
df_reviewer.show()

+--------------------+--------------+
|          reviewerID|reviewer_index|
+--------------------+--------------+
|A0000040I1OM9N4SGBD8|             0|
|A0000074RA15UCBH3ON5|             1|
|A000013090ZI3HIT9N5V|             2|
|A0000148KSJ81F2E3O7V|             3|
|A0000188NWOSI5X2PMSN|             4|
|A0000196KBA0ICH151EG|             5|
|A00003323X6I53YWRGN0|             6|
|A000033826RVJH496D4A|             7|
|A0000378ZNUHTQUDNNHR|             8|
|A0000448ZD4QU0AQCOH8|             9|
|A0000618JRL8NVY0J0AN|            10|
|A00006301SOXP1JTSSEW|            11|
|A00007664HEMMTK5IAWX|            12|
|A00007762BKXYRMOCC0A|            13|
|A0000862BTSWL73O3J0Y|            14|
|A00008882A0PUVHCTDUP|            15|
|A0000932YCOC06EWVVQY|            16|
|A0000966VPR3PHG0J8GV|            17|
|A0000978V0GBY0646VAM|            18|
|A00009928J2TXTYX144F|            19|
+--------------------+--------------+
only showing top 20 rows



In [None]:
df_reviewer.coalesce(1).write.option("header", True) \
    .csv("reviewerID_reviewerIndex_csv")

In [31]:
df_reviewer = spark.read.csv("reviewerID_reviewerIndex_csv", header=True)
df_reviewer.createOrReplaceTempView("df_reviewer")

In [13]:
df_reviewer = df_reviewer.withColumn("reviewer_index", col("reviewer_index").cast("integer"))
df_reviewer.describe

<bound method DataFrame.describe of DataFrame[reviewerID: string, reviewer_index: int]>

In [17]:
# Count the number of reviews for each asin
df_asin_reviews = df_collab.groupBy("asin").agg(count("*").alias("Total_reviews"))

# Join df_asin_reviews with df_asin on the asin column
df_asin = df_asin.join(df_asin_reviews, "asin")
df_asin.createOrReplaceTempView("df_asin")

In [19]:
reviewer_ids = spark.sql("""
    SELECT DISTINCT df_collab.reviewerID
    FROM df_collab
    INNER JOIN df_asin ON df_collab.asin = df_asin.asin
    WHERE df_collab.overall > 3.5 AND df_asin.Total_reviews >= 1000
""")

# Join df_reviewer with reviewer_ids
df_reviewer_filtered = df_reviewer.join(reviewer_ids, on="reviewerID", how="inner")

In [20]:
df_reviewer_filtered

DataFrame[reviewerID: string, reviewer_index: int]

In [22]:
df_reviewer_filtered.coalesce(1).write.option("header", True) \
    .csv("reviewerID_reviewerIndex_filtered")

In [23]:
df_reviewer = spark.read.csv("reviewerID_reviewerIndex_filtered", header=True)
df_reviewer.createOrReplaceTempView("df_reviewer")

In [24]:
df_reviewer.show(10)

+--------------------+--------------+
|          reviewerID|reviewer_index|
+--------------------+--------------+
|A0007154ZQMFVTRTSP3X|           135|
|A002556217M3R4LLKZHR|           436|
|A0031944XK4KVC58YJ6I|           542|
| A0044154XURSDJT4N4I|           755|
|A0072319W2B8TUXQHHD1|          1210|
|A0074159BRRBJRK1EAOY|          1237|
|A0089557DXY1OSDNNIEC|          1492|
|A0092907G6YM6TJ2KMKK|          1544|
|A009516920GSIZCMS1JK|          1578|
|A0098677SNWYKXZKBJB0|          1639|
+--------------------+--------------+
only showing top 10 rows

