In [1]:
#Import all the required Packages and Libraries
import findspark
findspark.init()
import pyspark
import pyspark.pandas as ps
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark import SparkConf
from pyspark.sql.functions import *
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from sklearn.feature_extraction.text import CountVectorizer as cv
import pyspark.sql.functions as f
import numpy as np
import pandas
import re
import string
from wordcloud import WordCloud, STOPWORDS
import nltk
from pandas import DataFrame
import csv
import math
from numpy.linalg import norm
import warnings
warnings.filterwarnings('ignore')



In [2]:
#initiate spark session
spark = SparkSession \
    .builder \
    .appName("Phone Book - Country Look up") \
    .config("spark.some.config.option", "some-value") \
    .config("spark.sql.caseSensitive", "false")\
    .getOrCreate()
spark.conf.set('spark.sql.caseSensitive', False)

In [3]:
#read in the csv file
df = ps.read_csv('reviews.csv')

#Conver Score to Integer
df['Score'] = df['Score'].astype('int64')

#Get the mean of the Score
mean = df['Score'].mean()

**Data Cleaning and Preprocessing**

In [4]:
#Basic Information about the dataset
print(df.info())
print(df["Score"].describe())

<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 568454 entries, 0 to 568453
Data columns (total 10 columns):
 #   Column                  Non-Null Count   Dtype 
---  ------                  --------------   ----- 
 0   Id                      568454 non-null  int32 
 1   ProductId               568454 non-null  object
 2   UserId                  568454 non-null  object
 3   ProfileName             568454 non-null  object
 4   HelpfulnessNumerator    568452 non-null  object
 5   HelpfulnessDenominator  568452 non-null  object
 6   Score                   568162 non-null  int64 
 7   Time                    568449 non-null  object
 8   Summary                 568448 non-null  object
 9   Text                    568444 non-null  object
dtypes: int32(1), int64(1), object(8)None
count    568162.000000
mean          4.176305
std           1.383878
min           0.000000
25%           4.000000
50%           5.000000
75%           5.000000
max          69.000000
Name: Score, dtype: floa

In [5]:
#Replace Null Values with the mean of the score and rest with "No"
df.fillna({'Summary': 'No Summary'}, inplace=True)
df.fillna({'Text': 'No Text'}, inplace=True)
df.fillna({'ProfileName': 'No ProfileName'}, inplace=True)
df.fillna({'HelpfulnessNumerator': 'No HelpfulnessNumerator'}, inplace=True)
df.fillna({'HelpfulnessDenominator': 'No HelpfulnessDenominator'}, inplace=True)
df.fillna({'Time': 'No Time'}, inplace=True)
df.fillna({'Id': 'No Id'}, inplace=True)
df.fillna({'ProductId': 'No ProductId'}, inplace=True)
df.fillna({'UserId': 'No UserId'}, inplace=True)
df.fillna({'Score': mean}, inplace=True)

In [6]:
print(df.info())

<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 568454 entries, 0 to 568453
Data columns (total 10 columns):
 #   Column                  Non-Null Count   Dtype  
---  ------                  --------------   -----  
 0   Id                      568454 non-null  object 
 1   ProductId               568454 non-null  object 
 2   UserId                  568454 non-null  object 
 3   ProfileName             568454 non-null  object 
 4   HelpfulnessNumerator    568454 non-null  object 
 5   HelpfulnessDenominator  568454 non-null  object 
 6   Score                   568454 non-null  float64
 7   Time                    568454 non-null  object 
 8   Summary                 568454 non-null  object 
 9   Text                    568454 non-null  object 
dtypes: float64(1), object(9)None


In [7]:
#compute the count and mean value as group by the products
count = df.groupby("UserId", as_index=False).count()
mean = df.groupby("UserId", as_index=False).mean()

#merge two dataset create df1
df1 = ps.merge(df, count, how='right', on=['UserId'])

In [8]:
#rename column
df1["Count"] = df1["ProductId_y"]
df1["Score"] = df1["Score_x"]
df1["Summary"] = df1["Summary_x"]


In [9]:
#Create New datafram with selected variables
df1 = df1[['UserId','Summary','Score',"Count"]]
df1 = df1.sort_values('Count', ascending=False)

In [10]:
#Get the reviews more than or equal to 2
df2 = df1[df1.Count >= 2]
#Insights from the dataset2 whohave count more than 2
print(df2.info())
print(df2["Score"].describe())

<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 393063 entries, 544569 to 568443
Data columns (total 4 columns):
 #   Column   Non-Null Count   Dtype  
---  ------   --------------   -----  
 0   UserId   393063 non-null  object 
 1   Summary  393063 non-null  object 
 2   Score    393063 non-null  float64
 3   Count    393063 non-null  int64  
dtypes: float64(1), int64(1), object(2)None
count    393063.000000
mean          4.185372
std           1.371463
min           0.000000
25%           4.000000
50%           5.000000
75%           5.000000
max          69.000000
Name: Score, dtype: float64


In [11]:
#create new dataframe as combining all summary with same product Id
df4 = df.groupby("UserId", as_index=False).mean()
combine_summary = df2.groupby("UserId")["Summary"].apply(list)
combine_summary = ps.DataFrame(combine_summary)
#Store the intermediate result for my reference
combine_summary.to_excel("combine_summary.xlsx")

In [12]:
#create with certain columns
df3 = ps.read_excel("combine_summary.xlsx")
df3 = ps.merge(df3, df4, on="UserId", how='inner')
#Store the intermediate result for my reference
df3.to_excel("uncleaned_summary.xlsx")

**Feature Extraction**

In [13]:
#read the data
pdf=pandas.read_excel("uncleaned_summary.xlsx")

In [14]:
#clean the data and use nltk to tokenize the words
cleanup_re = re.compile('[^a-z]+')
def cleanup(sentence):
    sentence = sentence.lower()
    sentence = cleanup_re.sub(' ', sentence).strip()
    sentence = " ".join(nltk.word_tokenize(sentence))
    return sentence

In [15]:
#Apply the cleanup function to the data
pdf['Summary_Clean'] = pdf['Summary'].apply(cleanup)
pdf=pdf.drop_duplicates(['Score'], keep='last')
pdf=pdf.reset_index()
df=spark.createDataFrame(pdf)

In [16]:
#Create theTokenizer and transform the data
tokenizer=Tokenizer(inputCol="Summary",outputCol="words")
words=tokenizer.transform(df)
words.show()

+-----+----------+--------------+--------------------+-----------------+--------------------+--------------------+
|index|Unnamed: 0|        UserId|             Summary|            Score|       Summary_Clean|               words|
+-----+----------+--------------+--------------------+-----------------+--------------------+--------------------+
|   28|        28|A106ZCP7RSXMRU|['Natural organic...|4.716666666666667|natural organic a...|[['natural, organ...|
|  364|       364|A14RZUPW44KCPF|['Crack for Cats!...|4.655172413793103|crack for cats st...|[['crack, for, ca...|
|  369|       369|A14TUWXDA5WQ7W|['1268179200', '1...|             6.75|                    |[['1268179200',, ...|
|  604|       604|A185QFJRTB5W93|['Great Breading'...|4.527777777777778|great breading fo...|[['great, breadin...|
|  763|       763|A1AEPMPA12GUJ7|['4', '4', '4', '...|0.391304347826087|                    |[['4',, '4',, '4'...|
| 1194|      1194|A1GQAKL9CGQLP1|['Great Dark Choc...|4.955223880597015|great da

In [17]:
#Using the CountVectorizer to create the word count by fiting and transforming the data
count=CountVectorizer(inputCol="words",outputCol="rawFeatures")
model=count.fit(words)
result=model.transform(words)
result.show()


+-----+----------+--------------+--------------------+-----------------+--------------------+--------------------+--------------------+
|index|Unnamed: 0|        UserId|             Summary|            Score|       Summary_Clean|               words|         rawFeatures|
+-----+----------+--------------+--------------------+-----------------+--------------------+--------------------+--------------------+
|   28|        28|A106ZCP7RSXMRU|['Natural organic...|4.716666666666667|natural organic a...|[['natural, organ...|(22167,[0,1,2,3,4...|
|  364|       364|A14RZUPW44KCPF|['Crack for Cats!...|4.655172413793103|crack for cats st...|[['crack, for, ca...|(22167,[3,10,29,3...|
|  369|       369|A14TUWXDA5WQ7W|['1268179200', '1...|             6.75|                    |[['1268179200',, ...|(22167,[1591,4670...|
|  604|       604|A185QFJRTB5W93|['Great Breading'...|4.527777777777778|great breading fo...|[['great, breadin...|(22167,[0,1,3,5,6...|
|  763|       763|A1AEPMPA12GUJ7|['4', '4', '4',

In [18]:
#Using the IDF to create the TF-IDF by fiting and transforming the data
idf=IDF(inputCol="rawFeatures",outputCol="features")
idfModel=idf.fit(result)
rescaledData=idfModel.transform(result)
rescaledData.show()
rescaledData=ps.DataFrame(rescaledData)
#Store the result in the excel file for future use
rescaledData.to_excel("summary_features.xlsx")

+-----+----------+--------------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+
|index|Unnamed: 0|        UserId|             Summary|            Score|       Summary_Clean|               words|         rawFeatures|            features|
+-----+----------+--------------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+
|   28|        28|A106ZCP7RSXMRU|['Natural organic...|4.716666666666667|natural organic a...|[['natural, organ...|(22167,[0,1,2,3,4...|(22167,[0,1,2,3,4...|
|  364|       364|A14RZUPW44KCPF|['Crack for Cats!...|4.655172413793103|crack for cats st...|[['crack, for, ca...|(22167,[3,10,29,3...|(22167,[3,10,29,3...|
|  369|       369|A14TUWXDA5WQ7W|['1268179200', '1...|             6.75|                    |[['1268179200',, ...|(22167,[1591,4670...|(22167,[1591,4670...|
|  604|       604|A185QFJRTB5W93|['Great Breading'...|4.52

In [19]:
#Extracting Words and their frequencies
docs=pdf['Summary_Clean']
vect=cv(max_features=100, stop_words='english')
X=vect.fit_transform(docs)


#Exporting Countvectorizer data to pandas dataframe
df1 = DataFrame(X.A, columns=vect.get_feature_names())
df1=df1.astype(int)
#Exporting in Excel
df1.to_excel("WordScores.xlsx")

**USer Based Recommendation**

In [20]:
#Create a own model using euclidean distance
def kNearestNeighborClassifier(pdataset,point,k):
  results = {}
  for point_item in pdataset:  
    ecludian_distance=math.sqrt(np.sum(np.subtract(point_item,point)*np.subtract(point_item,point)))
    if len(results)<k:
      results[ecludian_distance] = point_item
    else:
      for max_key in sorted(results.keys(),reverse=True):
              if(max_key>=ecludian_distance):
                results[ecludian_distance]=point_item
                results.pop(max_key)
              break
  return results

In [21]:
  #Create a own model using Cosine Similarity
def kNNCosine(pdataset,point,k):
    results = {}
    for point_item in pdataset:  
        cosine_distance=np.dot(point_item,point)/(norm(point_item)*norm(point))
    if len(results)<k:
      results[cosine_distance] = point_item
    else:
      for max_key in sorted(results.keys(),reverse=True):
              if(max_key>=cosine_distance):
                results[cosine_distance]=point_item
                results.pop(max_key)
              break
    return results

In [22]:
#Read the data
pdf=pandas.read_excel("WordScores.xlsx")
raw=pandas.read_excel("summary_features.xlsx")
rev=pandas.read_csv("Reviews.csv")
rev=rev.drop_duplicates(['Summary'], keep='last')
rev["Score"]=rev["Score"].astype(int)
rev=rev.reset_index()

pdflen=len(pdf)
revlen=len(rev)
print(revlen)
print(pdflen)


295743
1097


In [23]:
#Create a CSV file and add the headers in append mode
with open('User_Recommendation.csv', 'a') as file:
    writerObj = csv.writer(file)
    writerObj.writerow(['User','NearestUser', 'Products'])

In [24]:
#Apply the model to the data Using Cosine Similarity  
for i in range(pdflen):
  point=pdf.iloc[i,0]
  results=kNNCosine(pdf.iloc[i],point,11)
  first_related_user=[item for item in results.values()] 
  #Initialize the list and append the related products to the list
  products_list=[]
  for j in first_related_user:
     products_list.append(raw.iloc[j,3])
  if len(products_list)>2:
    products_list=products_list[:1]+products_list[2:]
  products_list=products_list[0:2]


In [25]:
#Apply the model to the data using Euclidean Distance
for i in range(pdflen):
  point=pdf.iloc[i,0]
  results=kNearestNeighborClassifier(pdf.iloc[i],point,11)
  first_related_user=[item for item in results.values()]
  #Initialize the list and append the related products to the list
  products_list=[]
  for j in first_related_user:
     products_list.append(raw.iloc[j,3])
  if len(products_list)>2:
    products_list=products_list[:1]+products_list[2:]
  products_list=products_list[0:2]
  for k in range(revlen):
    if (rev["UserId"][k]==products_list[1] and rev["Score"][k]>3):
      products_list.append(rev["ProductId"][k])
  #Open the file in append mode and write the data to the file
  with open('User_Recommendation.csv', 'a') as file:
    writerObj = csv.writer(file)
    writerObj.writerow(products_list)

**Check User_Recommendation.csv for the results of User based Recommendation**