Arun Ganesh \
Final Code \
CSC 369 


# Amazon Product Recommender with ALS model and PySpark



---



In [50]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
openjdk-8-jdk-headless is already the newest version (8u312-b07-0ubuntu1~18.04).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 45 not upgraded.


In [51]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import types as sparktypes
from pyspark.sql.functions import col
from pyspark.sql import SparkSession, column
spark = SparkSession.builder.appName('rs_evaluator').getOrCreate()

In [52]:
import pyspark.sql.functions as F
df = spark.read.csv('Amazon_Consumer_Reviews.csv', inferSchema=True, header=True)

df = df.withColumn("reviews.rating",F.col('`reviews.rating`').cast("integer"))

df.printSchema()


root
 |-- id: string (nullable = true)
 |-- dateAdded: timestamp (nullable = true)
 |-- dateUpdated: timestamp (nullable = true)
 |-- name: string (nullable = true)
 |-- asins: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- primaryCategories: string (nullable = true)
 |-- imageURLs: string (nullable = true)
 |-- keys: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- manufacturerNumber: string (nullable = true)
 |-- reviews.date: string (nullable = true)
 |-- reviews.dateAdded: string (nullable = true)
 |-- reviews.dateSeen: string (nullable = true)
 |-- reviews.doRecommend: string (nullable = true)
 |-- reviews.id: string (nullable = true)
 |-- reviews.numHelpful: string (nullable = true)
 |-- reviews.rating: integer (nullable = true)
 |-- reviews.sourceURLs: string (nullable = true)
 |-- reviews.text: string (nullable = true)
 |-- reviews.title: string (nullable = true)
 |-- reviews.username: string

In [53]:
import re
from pyspark.sql.functions import col
def rename_cols(df):
    for column in df.columns:
        new_column = column.replace('.','')
        df = df.withColumnRenamed(column, new_column)
    return df
df2 = rename_cols(df)
df2.columns



['id',
 'dateAdded',
 'dateUpdated',
 'name',
 'asins',
 'brand',
 'categories',
 'primaryCategories',
 'imageURLs',
 'keys',
 'manufacturer',
 'manufacturerNumber',
 'reviewsdate',
 'reviewsdateAdded',
 'reviewsdateSeen',
 'reviewsdoRecommend',
 'reviewsid',
 'reviewsnumHelpful',
 'reviewsrating',
 'reviewssourceURLs',
 'reviewstext',
 'reviewstitle',
 'reviewsusername',
 'sourceURLs']

In [54]:
df3 = df2.select('reviewsusername', 'id', 'reviewsrating')
df3.groupBy('reviewsusername').count().orderBy('count', ascending=False).show(10)

+---------------+-----+
|reviewsusername|count|
+---------------+-----+
|           Mike|   22|
|          Chris|   11|
|           Nick|   10|
|           Bill|   10|
|           Tony|   10|
|          Bobby|    9|
|           Dave|    9|
|           Rick|    8|
|           John|    8|
|          Steve|    8|
+---------------+-----+
only showing top 10 rows



In [55]:
df3.groupBy('reviewsrating').count().orderBy('count', ascending=False).show(20)


+-------------+-----+
|reviewsrating|count|
+-------------+-----+
|            5| 2691|
|         null| 1049|
|            4|  877|
|            3|  161|
|            0|  108|
|            1|   60|
|            2|   47|
|            9|    1|
|           97|    1|
|            6|    1|
|           12|    1|
|            8|    1|
|           49|    1|
|           13|    1|
+-------------+-----+



In [56]:
df_4 = df3.filter("reviewsrating != 9 AND reviewsrating != 8 AND reviewsrating != 13 AND reviewsrating != 6 AND reviewsrating != 49 AND reviewsrating != 12")
df_4.select(col("reviewsrating").cast('int').alias("reviewsrating")).printSchema()
df_4.show(10)

root
 |-- reviewsrating: integer (nullable = true)

+--------------------+--------------------+-------------+
|     reviewsusername|                  id|reviewsrating|
+--------------------+--------------------+-------------+
|           Too small|AVqVGZNvQMlgsOJE6eUY|            0|
|Great light reade...|AVqVGZNvQMlgsOJE6eUY|            0|
| Great for the price|AVqVGZNvQMlgsOJE6eUY|            0|
|         A Great Buy|AVqVGZNvQMlgsOJE6eUY|            3|
|Solid entry-level...|AVqVGZNvQMlgsOJE6eUY|            0|
|          Good ebook|AVqVGZNvQMlgsOJE6eUY|            0|
|Light Weight - Ma...|AVqVGZNvQMlgsOJE6eUY|            0|
|    not good quality|AVqVGZNvQMlgsOJE6eUY|            0|
|          best ebook|AVqVGZNvQMlgsOJE6eUY|            0|
|       Great Product|AVqVGZNvQMlgsOJE6eUY|            0|
+--------------------+--------------------+-------------+
only showing top 10 rows



In [57]:
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, IndexToString

In [58]:
string_indexer = StringIndexer(inputCol="id", outputCol="id_int")
model = string_indexer.fit(df_4)
df_5 = model.transform(df_4)

In [59]:
df_5.groupBy('id_int').count().orderBy('count', ascending=False).show(10)


+------+-----+
|id_int|count|
+------+-----+
|   0.0|  650|
|   1.0|  590|
|   2.0|  561|
|   3.0|  467|
|   4.0|  371|
|   5.0|  225|
|   6.0|  217|
|   7.0|  195|
|   8.0|  159|
|   9.0|  106|
+------+-----+
only showing top 10 rows



In [60]:
string_indexer = StringIndexer(inputCol="reviewsusername", outputCol="userid")
model = string_indexer.fit(df_5)
df6 = model.transform(df_5)
df6.select(col("reviewsrating").cast('int').alias("reviewsrating")).printSchema()

root
 |-- reviewsrating: integer (nullable = true)



In [61]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col

df6.show(5)
df6.select(col("reviewsrating").cast("integer")).printSchema()
df6.printSchema()

+--------------------+--------------------+-------------+------+------+
|     reviewsusername|                  id|reviewsrating|id_int|userid|
+--------------------+--------------------+-------------+------+------+
|           Too small|AVqVGZNvQMlgsOJE6eUY|            0|  10.0|2418.0|
|Great light reade...|AVqVGZNvQMlgsOJE6eUY|            0|  10.0|1262.0|
| Great for the price|AVqVGZNvQMlgsOJE6eUY|            0|  10.0|1259.0|
|         A Great Buy|AVqVGZNvQMlgsOJE6eUY|            3|  10.0| 586.0|
|Solid entry-level...|AVqVGZNvQMlgsOJE6eUY|            0|  10.0|2250.0|
+--------------------+--------------------+-------------+------+------+
only showing top 5 rows

root
 |-- reviewsrating: integer (nullable = true)

root
 |-- reviewsusername: string (nullable = true)
 |-- id: string (nullable = true)
 |-- reviewsrating: integer (nullable = true)
 |-- id_int: double (nullable = false)
 |-- userid: double (nullable = false)



In [62]:
train, test = df6.randomSplit([0.75,0.25])
train.count()

2917

In [63]:
test.count()


1028

In [64]:
from pyspark.ml.recommendation import ALS

rs_test = ALS(maxIter=10, regParam=0.01, userCol='userid', itemCol='id_int', ratingCol='reviewsrating', nonnegative=True, coldStartStrategy="drop")
rs_test = rs_test.fit(train)
pred = rs_test.transform(test)

In [65]:
from pyspark.ml.evaluation import RegressionEvaluator

rs_evaluator = RegressionEvaluator(metricName='rmse', predictionCol='prediction', labelCol='reviewsrating')
rmse = rs_evaluator.evaluate(pred)
print(rmse)

1.9527562460622925


In [66]:
# Depending on userid calculate what products will be most suitable for user
total = df6.select('id_int').distinct()
used = df6.filter(df6['userid'] == 20).select('id_int').distinct()
used = used.withColumnRenamed("id_int", "id_int_used")
joined = total.join(used, total.id_int == used.id_int_used, how='left')
news = joined.where(col('id_int_used').isNull()).select(col('id_int')).distinct()
news = news.withColumn("userid",lit(int(20)))
news.show(5)

+------+------+
|id_int|userid|
+------+------+
|  18.0|    20|
|   1.0|    20|
|   4.0|    20|
|   2.0|    20|
|  10.0|    20|
+------+------+
only showing top 5 rows



In [67]:
# Top 5 products for user with ASL model
rec = rs_test.transform(news).orderBy('prediction', ascending=False)
rec.createOrReplaceTempView('rec')
rec_5 = spark.sql('SELECT id_int FROM rec LIMIT 5')
prod_id = rec_5.join(df6, rec_5.id_int == df6.id_int, how='left')
prod_id.select('id').join(df2, prod_id.id == df2.id, how='left').select('name').distinct().show(truncate = False)

+------------------------------------------------------------------------------------------------+
|name                                                                                            |
+------------------------------------------------------------------------------------------------+
|"Amazon Kindle E-Reader 6"" Wifi (8th Generation                                                |
|Amazon Fire TV with 4K Ultra HD and Alexa Voice Remote (Pendant Design) | Streaming Media Player|
|Amazon - Kindle Voyage - 4GB - Wi-Fi + 3G - Black                                               |
|"Amazon - Kindle Voyage - 6"" - 4GB - Black"                                                    |
|Amazon Tap - Alexa-Enabled Portable Bluetooth Speaker                                           |
+------------------------------------------------------------------------------------------------+

