In [1]:
#import libraries
import pandas as pd

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml.feature import StringIndexer

In [2]:
#import ratings data
df_ratings = pd.read_csv('Preprocessed_data_csv')

In [8]:
print("Total number of Ratings: {}".format(df_ratings.shape[0]))

Total number of Ratings: 6053296


In [3]:
#import title data
df_title = pd.read_csv('title_data_csv')

In [9]:
print("Total number of Ratings: {}".format(df_title.shape[0]))

Total number of Ratings: 1301225


In [5]:
#Merge the data 
df = pd.merge(df_ratings,df_title,on='asin',how='left')
df.head()

Unnamed: 0,overall,reviewerID,asin,title
0,4.0,A2MNB77YGJ3CN0,B00004R940,George Foreman GR20WHT XL Grill
1,4.0,A2MNB77YGJ3CN0,B00004R940,George Foreman GR20WHT XL Grill
2,5.0,A1LUUO72VAFKFJ,B00004S4TZ,"Polder 307T 307 Preset Thermometer, White"
3,5.0,A1LUUO72VAFKFJ,B00004S4TZ,"Polder 307T 307 Preset Thermometer, White"
4,5.0,ALSQSIHU2ETLM,B00004RDAZ,Hamilton Beach 52204 Blender/Food Processor


In [10]:
#take portion of data
df = df.iloc[0:50000]

In [11]:
df.count()

overall       50000
reviewerID    50000
asin          50000
title         50000
dtype: int64

In [13]:
#Creating Spark Context
sc = SparkContext('local')
spark = SparkSession(sc)

In [14]:
#Create a spark dataframe
data = spark.createDataFrame(df)

In [15]:
#Display of the dataframe
data.show(5)

+-------+--------------+----------+--------------------+
|overall|    reviewerID|      asin|               title|
+-------+--------------+----------+--------------------+
|    4.0|A2MNB77YGJ3CN0|B00004R940|George Foreman GR...|
|    4.0|A2MNB77YGJ3CN0|B00004R940|George Foreman GR...|
|    5.0|A1LUUO72VAFKFJ|B00004S4TZ|Polder 307T 307 P...|
|    5.0|A1LUUO72VAFKFJ|B00004S4TZ|Polder 307T 307 P...|
|    5.0| ALSQSIHU2ETLM|B00004RDAZ|Hamilton Beach 52...|
+-------+--------------+----------+--------------------+
only showing top 5 rows



In [17]:
#Analyzing dataframe
data.describe().show()

+-------+------------------+--------------+-------------------+--------------------+
|summary|           overall|    reviewerID|               asin|               title|
+-------+------------------+--------------+-------------------+--------------------+
|  count|             50000|         50000|              50000|               50000|
|   mean|             4.297|          null|  4.8959391429375E9|                42.0|
| stddev|1.1438431568809393|          null|2.508200634354148E9|                 0.0|
|    min|               1.0|A100C1Z111U34R|         1581174292|"MINI" Miniature ...|
|    max|               5.0| AZZU5BA2CHYVF|         B01HBC1K1W|“Dyson Genuine DC...|
+-------+------------------+--------------+-------------------+--------------------+



In [18]:
#transform itemID and userID string type to index (integer type)

# Indexing for items
itemIndexer = StringIndexer(inputCol="asin", outputCol="itemid",handleInvalid='error')  
# Indexing for users
userIndexer = StringIndexer(inputCol='reviewerID',outputCol='userid',handleInvalid='error')

temp = itemIndexer.fit(data)
itemIndexed = temp.transform(data)

temp = userIndexer.fit(itemIndexed)
userIndexed = temp.transform(itemIndexed) 

# remove old columns - user and item ID's of string type
df_final = userIndexed.drop('asin').drop('reviewerID') 
df_final.show(5)

+-------+--------------------+------+------+
|overall|               title|itemid|userid|
+-------+--------------------+------+------+
|    4.0|George Foreman GR...|4391.0|  20.0|
|    4.0|George Foreman GR...|4391.0|  20.0|
|    5.0|Polder 307T 307 P...|2883.0| 946.0|
|    5.0|Polder 307T 307 P...|2883.0| 946.0|
|    5.0|Hamilton Beach 52...|1491.0|4622.0|
+-------+--------------------+------+------+
only showing top 5 rows



In [19]:
# renaming columns 
df_final = df_final.selectExpr("userid as userID","itemid as productID","overall as rating","title as title")

In [20]:
df_final.show(5)

+------+---------+------+--------------------+
|userID|productID|rating|               title|
+------+---------+------+--------------------+
|  20.0|   4391.0|   4.0|George Foreman GR...|
|  20.0|   4391.0|   4.0|George Foreman GR...|
| 946.0|   2883.0|   5.0|Polder 307T 307 P...|
| 946.0|   2883.0|   5.0|Polder 307T 307 P...|
|4622.0|   1491.0|   5.0|Hamilton Beach 52...|
+------+---------+------+--------------------+
only showing top 5 rows



In [21]:
#Changng the type of the users and items to Integer format
df_final = df_final.selectExpr("cast(userID as long) as userID", "cast(productID as long) as productID", "rating", "title")
df_final.show(5)

+------+---------+------+--------------------+
|userID|productID|rating|               title|
+------+---------+------+--------------------+
|    20|     4391|   4.0|George Foreman GR...|
|    20|     4391|   4.0|George Foreman GR...|
|   946|     2883|   5.0|Polder 307T 307 P...|
|   946|     2883|   5.0|Polder 307T 307 P...|
|  4622|     1491|   5.0|Hamilton Beach 52...|
+------+---------+------+--------------------+
only showing top 5 rows



In [22]:
#Separate dataframe of item and their titles
df_titles =  df_final.selectExpr("productID", "title")
df_titles.show(5)

+---------+--------------------+
|productID|               title|
+---------+--------------------+
|     4391|George Foreman GR...|
|     4391|George Foreman GR...|
|     2883|Polder 307T 307 P...|
|     2883|Polder 307T 307 P...|
|     1491|Hamilton Beach 52...|
+---------+--------------------+
only showing top 5 rows



In [23]:
#Dataframe for users, items and ratings
df_temp = df_final.drop('title')
df_temp.show(5)

+------+---------+------+
|userID|productID|rating|
+------+---------+------+
|    20|     4391|   4.0|
|    20|     4391|   4.0|
|   946|     2883|   5.0|
|   946|     2883|   5.0|
|  4622|     1491|   5.0|
+------+---------+------+
only showing top 5 rows



In [24]:
#Saving the final ratings data
final_data = df_temp.toPandas()
final_data.to_csv('AmazonRatings', index = False)

In [25]:
#Saving the final title data
final_title = df_titles.toPandas()
final_title.to_csv('AmazonTitles', index = False)