# Reviews: ALS product recommender

In [24]:
# import libraries

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.sql import functions as F
from pyspark.sql.functions import *

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [25]:
# Start (or reuse an existing) Spark session named 'rec'; all DataFrames below hang off it.
spark = (
    SparkSession.builder
    .appName('rec')
    .getOrCreate()
)

In [26]:
# Load the raw review export; the first row supplies column names and Spark infers column types.
reviews = spark.read.csv(
    'Review_new.csv',
    header=True,
    inferSchema=True,
)

In [27]:
# ALS needs integer user/item ids and a numeric rating, so coerce each column to its target type.
# NOTE(review): values that fail the cast presumably become null (Spark's non-ANSI default) — confirm.
target_types = {
    'customer_id': IntegerType(),
    'product_id': IntegerType(),
    'rating': DoubleType(),
}
for column_name, spark_type in target_types.items():
    reviews = reviews.withColumn(column_name, reviews[column_name].cast(spark_type))

In [28]:
# Discard any row missing one of the three columns ALS consumes.
reviews = reviews.dropna(how='any', subset=['customer_id', 'product_id', 'rating'])

In [29]:
# Snapshot the cleaned reviews to a local CSV.
# NOTE: toPandas() collects every row onto the driver, so this only suits data that fits in local memory.
reviews.toPandas().to_csv(path_or_buf="final_review.csv")

In [30]:
# Distinct users, distinct products, and total number of ratings.
# Computed in ONE Spark job: three separate .count() actions would each rescan the
# (uncached) source. countDistinct ignores nulls, whereas distinct().count() would count a null as
# one value; the na.drop above removes nulls from these columns, so the two agree here.
counts_row = reviews.agg(
    F.countDistinct('customer_id').alias('users'),
    F.countDistinct('product_id').alias('prods'),
    F.count('*').alias('ratings'),
).first()
users = counts_row['users']
prods = counts_row['prods']
# (sic) name kept as-is because the sparsity cell reads `numberator`.
numberator = counts_row['ratings']

In [31]:
# Number of cells in the full user x product rating matrix (every possible pair).
denominator = prods * users
denominator

1058240750

In [32]:
# Sparsity = fraction of the user x product matrix with no observed rating.
# Python 3 `/` is already true division, so no float coercion is needed.
sparsity = 1 - numberator / denominator
# Display the value; the original cell computed it but never showed it.
sparsity

In [33]:
# Keep only the three columns ALS consumes.
reviews_sub = reviews.select(['customer_id', 'product_id', 'rating'])

In [34]:
# Export the three ALS input columns to a local CSV.
# NOTE: toPandas() collects the whole DataFrame onto the driver.
reviews_sub.toPandas().to_csv(path_or_buf="nearly_review.csv")

In [35]:
# Hold out ~20% of the ratings for evaluation.
# A fixed seed makes the split reproducible on re-run. Without one, the split changes each
# run, and so do the RMSE, the per-user tables and the recommendations below.
SPLIT_SEED = 42
(training, test) = reviews_sub.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

In [36]:
# Collaborative-filtering model via alternating least squares.
# coldStartStrategy='drop' removes predictions for users/items unseen during fit, so
# they do not appear as NaN in the evaluation. nonnegative=True constrains the latent
# factors to be non-negative.
als_params = dict(
    userCol='customer_id',
    itemCol='product_id',
    ratingCol='rating',
    maxIter=15,
    regParam=0.1,
    nonnegative=True,
    coldStartStrategy='drop',
)
als = ALS(**als_params)
model = als.fit(training)

In [37]:
# Score the held-out ratings; these predictions feed the RMSE computed below.
predictions = model.transform(dataset=test)

In [38]:
# Root-mean-squared error between the observed `rating` and the model's `prediction`.
evaluator = RegressionEvaluator(
    predictionCol='prediction',
    labelCol='rating',
    metricName='rmse',
)

In [39]:
# Test-set RMSE, shown as the cell output; the original cell computed it but never showed it.
rmse = evaluator.evaluate(predictions)
rmse

In [40]:
# Top-N recommendations for every user, ranked by predicted rating.
# (The old comment said 10, but the code has always requested 5.)
N_RECS = 5
user_recs = model.recommendForAllUsers(N_RECS)

In [41]:
# Products this example customer rated in the held-out test split, highest rating first.
userID = 6177374
user_test_ratings = test.filter(test['customer_id'] == userID)
user_test_ratings.orderBy('rating', ascending=False).show()

+-----------+----------+------+
|customer_id|product_id|rating|
+-----------+----------+------+
|    6177374|  47321729|   5.0|
|    6177374|  47452735|   5.0|
|    6177374|    416613|   5.0|
|    6177374|  44009404|   5.0|
|    6177374|  32033717|   5.0|
|    6177374|  71198812|   5.0|
|    6177374|  28571379|   5.0|
|    6177374|  52539829|   5.0|
|    6177374|  53447698|   5.0|
+-----------+----------+------+



In [42]:
# Products the same customer rated in the training split, highest rating first.
# Filter on `training`'s own column. The original used test['customer_id'], a column
# belonging to a different DataFrame, which only happens to resolve because both
# splits share lineage.
training.filter(training['customer_id'] == userID).sort('rating', ascending=False).show()

+-----------+----------+------+
|customer_id|product_id|rating|
+-----------+----------+------+
|    6177374|  15239985|   5.0|
|    6177374|  38458616|   5.0|
|    6177374|  38458616|   5.0|
|    6177374|  32033717|   5.0|
|    6177374|  35726089|   5.0|
|    6177374|    702132|   5.0|
|    6177374|  71051598|   5.0|
|    6177374|  71197117|   5.0|
|    6177374|  23459272|   5.0|
|    6177374|  23556574|   5.0|
|    6177374|   4497817|   5.0|
|    6177374|  47499193|   5.0|
|    6177374|    845378|   5.0|
|    6177374|   7817447|   5.0|
|    6177374|  75186039|   5.0|
|    6177374|    555019|   5.0|
|    6177374|  60030176|   5.0|
|    6177374|  49661643|   5.0|
|    6177374|  51030375|   5.0|
|    6177374|  52070229|   5.0|
+-----------+----------+------+
only showing top 20 rows



In [43]:
# Pull out the precomputed recommendations for the example customer.
result = user_recs.where(user_recs.customer_id == userID)
result.show(truncate=False)

+-----------+---------------------------------------------------------------------------------------------------------------+
|customer_id|recommendations                                                                                                |
+-----------+---------------------------------------------------------------------------------------------------------------+
|6177374    |[{57654514, 6.998508}, {20007977, 6.93751}, {74776144, 6.8708816}, {54349904, 6.795362}, {57440303, 6.7288017}]|
+-----------+---------------------------------------------------------------------------------------------------------------+



In [44]:
# One row per recommendation: explode the `recommendations` array; the exploded element
# lands in a column Spark names `col`.
result = result.select('customer_id', explode('recommendations'))

In [45]:
# Copy each field of the exploded `col` struct into its own top-level column.
for field_name in ('product_id', 'rating'):
    result = result.withColumn(field_name, result.col.getField(field_name))

In [46]:
# Persist every user's top-N recommendations as Parquet, replacing any previous output.
user_recs.write.mode('overwrite').parquet('user_recs.parquet')