# Importing Necessary Libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd
import matplotlib.pyplot as plt

### Starting a Spark Session

In [2]:
spark = SparkSession.builder.appName("Ecommerce Item Recommendation").getOrCreate()

23/12/12 02:22:13 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


### Storing the data in the HDFS

In [3]:
!hdfs dfs -rm /user/bigdata/Product_Reviews.csv

Deleted /user/bigdata/Product_Reviews.csv


In [4]:
!hdfs dfs -put /home/Product_Reviews.csv /user/bigdata


In [5]:
!hdfs dfs -ls /user/bigdata/

Found 1 items
-rw-r--r--   2 root hadoop    6934726 2023-12-12 02:22 /user/bigdata/Product_Reviews.csv


### Loading the data from the HDFS to the Spark dataframe

In [6]:
dataset = spark.read.csv('/user/bigdata/Product_Reviews.csv', header = False, inferSchema = True)

                                                                                

### Changing the name of the Columns in the dataframe dataframe

In [7]:
columns = ['UserId', 'ProductId', 'Rating', 'TimeStamp']
dataset = dataset.toDF(*columns)

In [8]:
dataset.printSchema()

root
 |-- UserId: string (nullable = true)
 |-- ProductId: string (nullable = true)
 |-- Rating: double (nullable = true)
 |-- TimeStamp: integer (nullable = true)



# Data Preprocessing

### Dropping the TimeStamp column

In [10]:
dataset = dataset.drop("TimeStamp")

### Understanding the Distribution of Ratings

### No of Ratings User Distributions

In [12]:
dataset.groupBy("UserId").count().show()



+--------------+-----+
|        UserId|count|
+--------------+-----+
|A2LAFL37V088EU|    2|
|A20YA7V169MKPE|    1|
|A3SNQ55CZZGV1F|    2|
| A39L0QMLQHJF3|    2|
|A23MKAKSMHX3IB|    1|
|A3KTCAGEZ1P56M|    1|
|A3PTVB51FV5OPB|    1|
|A12DVG1FD6DEQE|    1|
|A1IKQCEYH8HZM1|    1|
| A73O73IS6R6NZ|    1|
|A31XTLHV15BWCR|    1|
|A32I48EP8E14TD|    3|
|A1MT9I4QGVH3AS|    1|
|A1OZKCP83VQDWN|    1|
|A2G7WA9SLVQWH6|    1|
|A3OMCDDQV9HDMQ|    1|
| A6HOWM08PLFZ5|    2|
|A1JJSCLG40POB0|    1|
|A1CNLU84K5ZIOZ|    1|
| AUAX1QWUCYKSX|    3|
+--------------+-----+
only showing top 20 rows



                                                                                

### No of Ratings Per Item

In [13]:
dataset.groupBy("ProductId").count().show()



+----------+-----+
| ProductId|count|
+----------+-----+
|B00HG51IA8|   18|
|B00HHKDYMW|    5|
|B00HHR0SYW|  100|
|B00HNF1CLG|    3|
|B00HNKGMPW|    1|
|B00HNKTKQA|    1|
|B00HQMN3P4|    1|
|B00HSG8BQY|    1|
|B00HT8S1LQ|    1|
|B00HTU7J08|    1|
|B00HUC9RD2|    1|
|B00HUG1C7M|    1|
|B00HVKAXU4|    3|
|B00HVUZG38|    1|
|B00HWL2AVM|    4|
|B00HXQDYWK|    3|
|B00HZ4JYRY|    1|
|B00I46JGV6|    1|
|B00IASX4BG|    2|
|B00IAZR98I|    1|
+----------+-----+
only showing top 20 rows



                                                                                

### Dropping Missing values

In [14]:
dataset = dataset.dropna()

### Handling Categorical Values(Converting String Values to Integer Values)

In [15]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import IntegerType, DoubleType

In [16]:
# Instantiate StringIndexer
userIndexer = StringIndexer(inputCol="UserId", outputCol="TransformedUserId")

# Transform the String values to Double Values
tempDF = userIndexer.fit(dataset).transform(dataset)

#Transform the double values to Integer Values
UserTransformedDF = tempDF.select(tempDF['TransformedUserId'].cast(IntegerType()), tempDF['ProductId'], tempDF['Rating'])


                                                                                

In [17]:
UserTransformedDF.show(10)

23/12/12 02:23:36 WARN DAGScheduler: Broadcasting large task binary with size 4.9 MiB
                                                                                

+-----------------+----------+------+
|TransformedUserId| ProductId|Rating|
+-----------------+----------+------+
|            17751|B00BQYF21G|   4.0|
|            33190|B00BQYF220|   5.0|
|            77758|B00BQYF220|   5.0|
|            25021|B00BQYF220|   5.0|
|            55150|B00BQYF220|   5.0|
|            88870|B00BQYF220|   5.0|
|            91265|B00BQYF220|   5.0|
|           123819|B00BQYF220|   5.0|
|            98461|B00BQYF220|   2.0|
|              577|B00BQYF220|   4.0|
+-----------------+----------+------+
only showing top 10 rows



In [18]:
# Create a StringIndexer Instance for the ProductId
productIndexer = StringIndexer(inputCol='ProductId', outputCol = 'TransformedProductId')

# Transform the String values into Double values
ProductTransformedDF = productIndexer.fit(UserTransformedDF).transform(UserTransformedDF)

# Transform the Double vaues into Integer Values
dataframe = ProductTransformedDF.select(ProductTransformedDF['TransformedUserId'], ProductTransformedDF['TransformedProductId'].cast(IntegerType()), ProductTransformedDF['Rating'])

                                                                                

In [19]:
dataframe.show(10)

23/12/12 02:23:45 WARN DAGScheduler: Broadcasting large task binary with size 6.1 MiB


+-----------------+--------------------+------+
|TransformedUserId|TransformedProductId|Rating|
+-----------------+--------------------+------+
|            17751|               17212|   4.0|
|            33190|                2805|   5.0|
|            77758|                2805|   5.0|
|            25021|                2805|   5.0|
|            55150|                2805|   5.0|
|            88870|                2805|   5.0|
|            91265|                2805|   5.0|
|           123819|                2805|   5.0|
|            98461|                2805|   2.0|
|              577|                2805|   4.0|
+-----------------+--------------------+------+
only showing top 10 rows



                                                                                

In [20]:
# Changing the column Names
columnNames = ['UserId', 'ProductId', 'Rating']

dataframe = dataframe.toDF(*columnNames)

In [21]:
dataframe.printSchema()

root
 |-- UserId: integer (nullable = true)
 |-- ProductId: integer (nullable = true)
 |-- Rating: double (nullable = true)



### Splitting the data into Training Set and Test Set.

In [22]:
(train_set, test_set) = dataframe.randomSplit([0.8,0.2])

### Creating an instance of the ALS

In [23]:
als = ALS(maxIter = 8, regParam = 0.1, userCol = 'UserId', itemCol = 'ProductId', ratingCol = 'Rating', coldStartStrategy = 'drop')

### Training the model

In [24]:
%%time
model = als.fit(train_set)

23/12/12 02:23:59 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
23/12/12 02:24:02 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
23/12/12 02:24:03 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
23/12/12 02:24:05 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
23/12/12 02:24:09 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
23/12/12 02:24:12 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
23/12/12 02:24:18 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
23/12/12 02:24:21 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
23/12/12 02:24:24 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
23/12/12 02:24:27 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
23/12/12 02:24:29 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
23/12/12 02:24:31 WARN DAGScheduler: Broadcasting larg

CPU times: user 181 ms, sys: 23.5 ms, total: 205 ms
Wall time: 1min 2s


### Testing the model

In [25]:
predictions = model.transform(test_set)

### Displaying the results

In [26]:
predictions.show(10)

23/12/12 02:25:09 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
23/12/12 02:25:09 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
23/12/12 02:25:09 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
                                                                                

+------+---------+------+----------+
|UserId|ProductId|Rating|prediction|
+------+---------+------+----------+
| 90586|      496|   4.0| 3.9551299|
|    34|      833|   5.0|  4.875361|
| 74178|    36131|   3.0| 2.9280152|
| 71282|      496|   5.0| 4.9439125|
|   417|      148|   5.0| 4.8761716|
| 12273|    30654|   4.0|  3.898721|
|   246|      148|   5.0|  4.825057|
|120421|     3918|   5.0|  4.963419|
| 56108|      496|   5.0| 4.9439125|
|  1681|     7253|   5.0| 4.9597626|
+------+---------+------+----------+
only showing top 10 rows



### Evaluating the model using RMSE

In [27]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Rating", predictionCol="prediction")
rmse = evaluator.evaluate(model.transform(test_set))
print(f"Root Mean Squared Error (RMSE): {rmse}")

23/12/12 02:25:33 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
23/12/12 02:25:33 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
23/12/12 02:25:33 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
23/12/12 02:25:40 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
[Stage 167:>                                                        (0 + 1) / 1]

Root Mean Squared Error (RMSE): 0.8588850271824344


                                                                                

### Generate recommendations for all users

In [29]:

finalRecommendations = model.recommendForAllUsers(10)

pandasFinalRecommendations = finalRecommendations.toPandas()

users, recommendations = [], []

for i in range(len(pandasFinalRecommendations)):
    
    users.append(pandasFinalRecommendations.iloc[i,0])
    
    itemRecommendations = ""
    
    for item in pandasFinalRecommendations.iloc[i,1]:
        
        itemRecommendations = itemRecommendations + ", " + str(item.asDict()['ProductId'])
        
    recommendations.append(itemRecommendations[2:])
    

    
finalRecommendations = pd.DataFrame(data = zip(users, recommendations), columns=["UserId", "Recommendations"])

finalRecommendations.head(10)

23/12/12 02:33:26 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
23/12/12 02:35:21 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
                                                                                

Unnamed: 0,UserId,Recommendations
0,12,"11575, 8285, 12404, 1661, 12545, 15417, 5504, ..."
1,22,"37973, 38807, 3884, 7270, 25534, 18728, 12262,..."
2,26,"7682, 5167, 30591, 12145, 5513, 33886, 7699, 3..."
3,27,"5099, 29785, 24530, 14885, 526, 5886, 30136, 1..."
4,28,"4050, 5741, 37315, 2805, 23356, 10464, 10362, ..."
5,31,"4957, 11468, 3664, 7393, 15951, 16957, 5317, 7..."
6,34,"36526, 5741, 8543, 31323, 37804, 1526, 30830, ..."
7,44,"5448, 4191, 8259, 5099, 13778, 37804, 6157, 23..."
8,53,"32775, 12219, 12242, 25197, 13078, 4034, 21420..."
9,65,"16446, 11749, 6417, 5994, 38922, 1748, 6253, 1..."


### Show the recommendation for a single user

In [32]:
finalRecommendations = finalRecommendations.reset_index(drop = True)
user = 12

recommendations = finalRecommendations[finalRecommendations['UserId'] == user]

print(recommendations)

   UserId                                    Recommendations
0      12  11575, 8285, 12404, 1661, 12545, 15417, 5504, ...
