In [1]:
from pyspark.sql import SparkSession

# Settings for the ingestion session: where the MongoDB collection lives, the
# connector package Spark has to pull in, and the driver/executor sizing.
mongo_session_settings = (
    ("spark.mongodb.input.uri", "mongodb://127.0.0.1/amazon.review"),
    ("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.2"),
    ('spark.driver.memory', '10g'),
    ('spark.executor.memory', '15g'),
    ('spark.executor.cores', 4),
)

session_builder = SparkSession.builder.appName('phase2')
for setting_key, setting_value in mongo_session_settings:
    # Builder.config returns the builder, so the settings are applied in order.
    session_builder = session_builder.config(setting_key, setting_value)

spark = session_builder.getOrCreate()
spark

In [2]:
# Read the `amazon.review` collection through the MongoDB Spark connector and
# preview the first rows.
mongo_reader = spark.read.format("com.mongodb.spark.sql.DefaultSource")
mongo_reader = mongo_reader.option("uri", "mongodb://127.0.0.1/amazon.review")
data = mongo_reader.load()
data.show(5)

+--------------------+----------+-------+--------------------+--------------+------------+--------------+--------+----+
|                 _id|      asin|overall|          reviewText|    reviewerID|reviewerName|unixReviewTime|verified|vote|
+--------------------+----------+-------+--------------------+--------------+------------+--------------+--------+----+
|{644edb388c152636...|B01C6DXMX0|    5.0|   Love this product|A1MWPEXTKRZTVU|      cherie|    1506384000|   false|null|
|{644edb388c152636...|B00BWYVWLO|    5.0|It's exactly what...|A1QYUQDUVUV05V|       toots|    1438819200|    true|null|
|{644edb388c152636...|B01C6DXMX0|    4.0|great product , e...|A15Q1O8A4WWL0M| Greg Willis|    1506297600|   false|null|
|{644edb388c152636...|B01C6DXMX0|    5.0|It just works great!|A1K0Z9BW3SYMBB|     Alan L.|    1506211200|   false|null|
|{644edb388c152636...|B01C6DXMX0|    5.0|This camera works...|A2NIFC4XEWL3EC|         BMW|    1506211200|    true|null|
+--------------------+----------+-------

In [3]:
# Discard every column the rating model does not use; what remains is the
# product id (asin), the rating (overall) and the reviewerID.
unused_columns = ["_id", "reviewText", "reviewerName", "unixReviewTime", "verified", "vote"]
df = data.drop(*unused_columns)
df.show(3)

+----------+-------+--------------+
|      asin|overall|    reviewerID|
+----------+-------+--------------+
|B01C6DXMX0|    5.0|A1MWPEXTKRZTVU|
|B00BWYVWLO|    5.0|A1QYUQDUVUV05V|
|B01C6DXMX0|    4.0|A15Q1O8A4WWL0M|
+----------+-------+--------------+
only showing top 3 rows



In [6]:
# Remove every row that has a null in any of the remaining columns.
df1 = df.dropna()

# Saving dataframe in parquet format for faster queries

In [7]:
# Persist the cleaned ratings as Parquet. coalesce(1) collapses the frame to a
# single partition so the output directory holds one part file.
# The former `.option("header", "true")` was removed: "header" is a CSV option
# and has no meaning for Parquet, which stores the schema in the file itself.
# NOTE(review): later cells read 'dataset.parquet', not 'data.parquet' —
# confirm whether the output is renamed by hand between steps.
df1.coalesce(1).write.mode('overwrite').parquet('data.parquet')

In [10]:
# Shut down the ingestion session; the next section builds a fresh session
# without the MongoDB connector settings.
spark.stop()

# Preparing data for ML model

In [2]:
from pyspark.sql import SparkSession

# Resource sizing for the data-preparation session.
prep_session_settings = (
    ('spark.driver.memory', '10g'),
    ('spark.executor.memory', '15g'),
    ('spark.executor.cores', 8),
)

prep_builder = SparkSession.builder.appName('phase2')
for setting_key, setting_value in prep_session_settings:
    prep_builder = prep_builder.config(setting_key, setting_value)

spark = prep_builder.getOrCreate()
spark

## Due to memory issues, we created separate files for distinct values

In [3]:
# Load the ratings stored in Parquet format.
df = spark.read.format("parquet").load('dataset.parquet')

In [4]:
# One row per distinct product id.
asin_only = df.select("asin")
df2 = asin_only.distinct()

In [8]:
# Write the distinct product ids as one CSV part file with a header row,
# replacing any previous output in 'asin2'.
asin_single_partition = df2.coalesce(1)
asin_single_partition.write.csv('asin2', mode='overwrite', header="true")

In [6]:
# One row per distinct reviewer id.
reviewer_only = df.select("reviewerID")
df3 = reviewer_only.distinct()

In [7]:
# Write the distinct reviewer ids as one CSV part file with a header row.
# NOTE(review): the output directory is named 'asin' although df3 holds
# reviewerID values — confirm the name is intentional.
reviewer_single_partition = df3.coalesce(1)
reviewer_single_partition.write.csv('asin', mode='overwrite', header="true")

## Assigning unique index in each file

In [1]:
from pyspark.sql import SparkSession

# Resource sizing for the indexing/joining session.
index_session_settings = (
    ('spark.driver.memory', '10g'),
    ('spark.executor.memory', '15g'),
    ('spark.executor.cores', 8),
)

index_builder = SparkSession.builder.appName('phase2')
for setting_key, setting_value in index_session_settings:
    index_builder = index_builder.config(setting_key, setting_value)

spark = index_builder.getOrCreate()
spark

## ASIN

In [3]:
# Load the distinct product ids; the first CSV line is the column header.
product_reader = spark.read.option("header", True)
df = product_reader.csv('product.csv')
df.show(3)

+----------+
|      asin|
+----------+
|B000K6B5J4|
|B01E709KWW|
|B00HB2ZVWM|
+----------+
only showing top 3 rows



In [4]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Number the products 1, 2, 3, ... in ascending asin order. The window has no
# partitionBy, so the ordering is global over the whole frame.
asin_order = Window.orderBy("asin")
asin_rank = row_number().over(asin_order)
updated = df.withColumn("index", asin_rank)

updated.show(3)

+----------+-----+
|      asin|index|
+----------+-----+
|0000000116|    1|
|0000000868|    2|
|0000001589|    3|
+----------+-----+
only showing top 3 rows



In [5]:
# Save the asin -> index table as one CSV part file with a header row.
indexed_single_partition = updated.coalesce(1)
indexed_single_partition.write.csv('updated_asin', mode='overwrite', header="true")

## ReviewerID

In [3]:
# Load the reviewer id table; the first CSV line is the column header.
reviewer_reader = spark.read.option("header", True)
df = reviewer_reader.csv('unique_reviewer.csv')
df.show(3)

+--------------------+-----+
|          reviewerID|index|
+--------------------+-----+
|A0000040I1OM9N4SGBD8|    1|
|A0000074RA15UCBH3ON5|    2|
|A000013090ZI3HIT9N5V|    3|
+--------------------+-----+
only showing top 3 rows



Unique reviewers

In [4]:
# Number of rows in the reviewer table loaded above.
df.count()

43531850

In [8]:
# Number the reviewers 1, 2, 3, ... in ascending reviewerID order. Because the
# column is named "index", any existing "index" column in df is replaced.
reviewer_order = Window.orderBy("reviewerID")
reviewer_rank = row_number().over(reviewer_order)
updated = df.withColumn("index", reviewer_rank)
updated.show(3)

+--------------------+-----+
|          reviewerID|index|
+--------------------+-----+
|A0000040I1OM9N4SGBD8|    1|
|A0000074RA15UCBH3ON5|    2|
|A000013090ZI3HIT9N5V|    3|
+--------------------+-----+
only showing top 3 rows



In [9]:
# Save the reviewerID -> index table as one CSV part file with a header row.
indexed_single_partition = updated.coalesce(1)
indexed_single_partition.write.csv('updated_reviewer', mode='overwrite', header="true")

# Inner-joining the index tables with the main data DataFrame

## ASIN 

In [2]:
# Main ratings table (asin, overall, reviewerID) stored as Parquet.
df1 = spark.read.format("parquet").load('dataset.parquet')

In [5]:
# asin -> index lookup table; the first CSV line is the column header.
asin_index_reader = spark.read.option("header", True)
df2 = asin_index_reader.csv('unique_asin.csv')
df2.show(1)

+----------+-----+
|      asin|index|
+----------+-----+
|0000000116|    1|
+----------+-----+
only showing top 1 row



In [8]:
# Attach the numeric product index to every rating. Joining on an expression
# keeps the asin column of both sides in the result.
same_asin = df2.asin == df1.asin
combined = df2.join(df1, on=same_asin, how="inner")
combined.show(1)

+----------+-----+----------+-------+--------------+
|      asin|index|      asin|overall|    reviewerID|
+----------+-----+----------+-------+--------------+
|0001714422|  169|0001714422|    5.0|A1LTMOQIYR3UTJ|
+----------+-----+----------+-------+--------------+
only showing top 1 row



In [9]:
# Drop asin by name now that the numeric index stands in for it.
# NOTE(review): `combined` carries two asin columns (one per join side);
# presumably both are removed here — confirm against the written output.
final = combined.drop('asin')

In [10]:
# Save the asin-indexed ratings as one CSV part file with a header row.
final_single_partition = final.coalesce(1)
final_single_partition.write.csv('combined_asin', mode='overwrite', header="true")

## ReviewerID

In [3]:
# Ratings that already carry the product index; first CSV line is the header.
indexed_ratings_reader = spark.read.option("header", True)
df1 = indexed_ratings_reader.csv('combined_with_asin.csv')
df1.show(1)

+-----+-------+--------------+
|index|overall|    reviewerID|
+-----+-------+--------------+
|  169|    5.0|A1LTMOQIYR3UTJ|
+-----+-------+--------------+
only showing top 1 row



In [4]:
# reviewerID -> index lookup table; the first CSV line is the column header.
reviewer_index_reader = spark.read.option("header", True)
df2 = reviewer_index_reader.csv('unique_reviewer.csv')
df2.show(1)

+--------------------+-----+
|          reviewerID|index|
+--------------------+-----+
|A0000040I1OM9N4SGBD8|    1|
+--------------------+-----+
only showing top 1 row



In [5]:
# Rename the reviewer table's "index" so it cannot clash with the product
# "index" column already present in the ratings.
df3 = df2.withColumnRenamed(existing="index", new="reviewer_index")
df3.show(1)

+--------------------+--------------+
|          reviewerID|reviewer_index|
+--------------------+--------------+
|A0000040I1OM9N4SGBD8|             1|
+--------------------+--------------+
only showing top 1 row



In [10]:
# Attach the numeric reviewer index to every rating. Joining on an expression
# keeps the reviewerID column of both sides in the result.
same_reviewer = df1.reviewerID == df3.reviewerID
combined = df1.join(df3, on=same_reviewer, how="inner")

In [11]:
# Drop reviewerID by name now that reviewer_index stands in for it.
# NOTE(review): `combined` carries two reviewerID columns (one per join side);
# presumably both are removed here — confirm against the written output.
final = combined.drop('reviewerID')

In [12]:
# Save the fully indexed ratings as one CSV part file with a header row.
final_single_partition = final.coalesce(1)
final_single_partition.write.csv('final_combined', mode='overwrite', header="true")

# TRAINING MODEL

In [1]:
from pyspark.sql import SparkSession

# Resource sizing for the model-training session.
training_session_settings = (
    ('spark.driver.memory', '10g'),
    ('spark.executor.memory', '15g'),
    ('spark.executor.cores', 8),
)

training_builder = SparkSession.builder.appName('phase2')
for setting_key, setting_value in training_session_settings:
    training_builder = training_builder.config(setting_key, setting_value)

spark = training_builder.getOrCreate()
spark

In [2]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [3]:
# Indexed ratings (index, overall, reviewer_index). No schema is supplied, so
# every column is read as a string.
ratings_reader = spark.read.option("header", True)
df = ratings_reader.csv('final.csv')
df.show(5)

+--------+-------+--------------+
|   index|overall|reviewer_index|
+--------+-------+--------------+
| 4422427|    5.0|           518|
|10297439|    5.0|           518|
| 7434094|    5.0|           543|
| 4086771|    5.0|           543|
| 4086771|    5.0|           543|
+--------+-------+--------------+
only showing top 5 rows



In [4]:
# Inspect the column types of the frame just read from CSV.
df.printSchema()

root
 |-- index: string (nullable = true)
 |-- overall: string (nullable = true)
 |-- reviewer_index: string (nullable = true)



In [5]:
# Convert the three string columns to integers, one column per step; df4 is
# the fully typed frame.
df2 = df.withColumn(
    "index",
    df["index"].astype('integer'),
)
df3 = df2.withColumn(
    "overall",
    df["overall"].astype('integer'),
)
df4 = df3.withColumn(
    "reviewer_index",
    df["reviewer_index"].astype('integer'),
)

In [6]:
# Confirm that all three columns are now integers.
df4.printSchema()

root
 |-- index: integer (nullable = true)
 |-- overall: integer (nullable = true)
 |-- reviewer_index: integer (nullable = true)



In [7]:
# Expose the typed ratings to Spark SQL as the temporary view "test".
# createOrReplaceTempView is used instead of createTempView so that re-running
# this cell does not fail because the view already exists.
df4.createOrReplaceTempView("test")

In [8]:
# Keep only the ratings written by reviewers who have more than 50 rows in
# the "test" view.
active_reviewer_query = "select * from test where reviewer_index in (select reviewer_index from test group by reviewer_index having count(reviewer_index) > 50)"
result1 = spark.sql(active_reviewer_query)

In [9]:
# Save the filtered ratings as one CSV part file with a header row.
# NOTE(review): the directory is called '30query' although the filter above
# uses a threshold of 50 and the next cell reads '50_query.csv' — confirm the
# intended name.
filtered_single_partition = result1.coalesce(1)
filtered_single_partition.write.csv('30query', mode='overwrite', header="true")

In [9]:
# Ratings restricted to active reviewers; the first CSV line is the header.
active_ratings_reader = spark.read.option("header", True)
df = active_ratings_reader.csv('50_query.csv')
df.show(5)

+--------+-------+--------------+
|   index|overall|reviewer_index|
+--------+-------+--------------+
|10632623|      5|          5984|
| 7108016|      5|          5984|
|11671156|      5|          5984|
| 9036280|      5|          5984|
|13143720|      4|          5984|
+--------+-------+--------------+
only showing top 5 rows



In [11]:
# Convert the columns of the re-read CSV to integers again, one column per
# step; df4 is the typed frame used for training.
df2 = df.withColumn(
    "index",
    df["index"].astype('integer'),
)
df3 = df2.withColumn(
    "overall",
    df["overall"].astype('integer'),
)
df4 = df3.withColumn(
    "reviewer_index",
    df["reviewer_index"].astype('integer'),
)

In [12]:
# Confirm that all three columns are integers before training.
df4.printSchema()

root
 |-- index: integer (nullable = true)
 |-- overall: integer (nullable = true)
 |-- reviewer_index: integer (nullable = true)



In [13]:
# Random 80/20 train/test split; the fixed seed keeps it repeatable.
split_weights = [0.8, 0.2]
training, test = df4.randomSplit(split_weights, seed=50)

In [17]:
# Save the training split as one CSV part file with a header row.
training_single_partition = training.coalesce(1)
training_single_partition.write.csv('train50', mode='overwrite', header="true")

In [14]:
# ALS recommender: reviewers are the users, products are the items, and the
# star rating is the target.
als = ALS(
    rank=6,
    maxIter=3,
    regParam=0.09,
    userCol="reviewer_index",
    itemCol="index",
    ratingCol="overall",
    nonnegative=True,
    # Drop rows whose prediction is NaN so they do not poison the RMSE.
    coldStartStrategy="drop",
)

In [15]:
# Fit the ALS model on the training split.
model=als.fit(training)

In [16]:
# Persist the fitted model. Going through write().overwrite() lets this cell
# be re-run: plain model.save() fails when '50_set' already exists, whereas
# every other writer in this notebook uses overwrite mode.
model.write().overwrite().save('50_set')

In [16]:
# Score the held-out split and report the root-mean-square error between the
# predicted and the actual rating.
predictions = model.transform(test)
evaluator = RegressionEvaluator(
    labelCol="overall",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print(f"RMSE={rmse}")

RMSE=1.0328847009803104


In [None]:
# Expose `df` to Spark SQL as the temporary view "ids".
# createOrReplaceTempView is used instead of createTempView so that re-running
# this cell does not fail because the view already exists.
# NOTE(review): `df` is the full frame read from '50_query.csv', not the
# `training` split, while the final output is named 'unique_train_50' —
# confirm which frame is intended.
df.createOrReplaceTempView("ids")

In [None]:
# Distinct reviewer indices present in the "ids" view.
distinct_reviewer_query = "select distinct(reviewer_index) from ids"
result1 = spark.sql(distinct_reviewer_query)

In [None]:
# Save the distinct reviewer indices as one CSV part file with a header row.
reviewer_ids_single_partition = result1.coalesce(1)
reviewer_ids_single_partition.write.csv('unique_train_50', mode='overwrite', header="true")