##### Problem
The problem at hand is a decrease in customer satisfaction, leading to concerns about customer retention and loyalty. Amazon has observed a decline in customer satisfaction scores, an increase in negative customer feedback. The goal of this data science project is to understand the root causes of the declining customer satisfaction and develop actionable insights and recommendations to address the issue effectively.

##### Business Objectives

- Personalize recommendations and product offerings to increase customer engagement and sales.
- Optimize Customer Purchasing Experience by filtering out fake reviews
- Enhance the quality of existing products to improve customers perception of the company thereby fostering customer loyalty and repeat purchases

##### Research Objectives

- To analyze product quality issues to gauge main pain points that customers deal with when interacting with the platform

- To develop a model to classify fake or manipulated reviews within the dataset 

- To investigate the impact of incorporating review text, on the performance of the product recommendation system.


In [29]:
#Import Libraries
import time
from pyspark.sql import SparkSession,DataFrame
import pandas as pd
from functools import reduce
from pyspark.sql.functions import rand
from pyspark.sql.functions import *
import pyspark.sql.functions as f
from datetime import datetime
from pyspark.sql.types import StringType,FloatType,TimestampType,DateType,ArrayType,IntegerType
import re
import nltk
from nltk.corpus import stopwords,wordnet
from nltk.tokenize import word_tokenize
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, Tokenizer,CountVectorizer,IDF\
,HashingTF,NGram,BucketedRandomProjectionLSH,Normalizer
import seaborn as sns
import matplotlib.pyplot as plt
import math    
from textblob import TextBlob
import string
from bs4 import BeautifulSoup
from nltk.stem import PorterStemmer
from nltk.stem.wordnet import WordNetLemmatizer
import spacy
from scipy.stats import pointbiserialr
import plotly.express as px
from pyspark.conf import SparkConf
import wordcloud 
from  pyspark.ml import PipelineModel
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder,TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator,BinaryClassificationEvaluator,MulticlassClassificationEvaluator,\
ClusteringEvaluator
from pyspark.ml.clustering import KMeans,BisectingKMeans,KMeansModel
from pyspark.ml.classification import NaiveBayes,LinearSVC,RandomForestClassifier,LogisticRegression,DecisionTreeClassifier
from pyspark.ml.recommendation import ALS,ALSModel
from pyspark.ml import Transformer
from pyspark.ml.param import Param, Params
from pyspark.ml.param.shared import HasInputCol, HasOutputCol,TypeConverters
from pyspark.ml.util import DefaultParamsWritable, DefaultParamsReadable
from pyspark import keyword_only
from pyspark.ml.linalg import Vectors, VectorUDT
from wordcloud import WordCloud, STOPWORDS, ImageColorGenerator
from scipy.sparse import coo_matrix
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import LabelEncoder
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
from pyspark.ml.feature import StringIndexer,IndexToString,MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler,StandardScaler,PCA
from pyspark.ml.feature import Normalizer
from pyspark.mllib.linalg.distributed import CoordinateMatrix
import multiprocessing
from textblob import Word
import numpy as np
from collections import Counter
import plotly.graph_objects as go

In [30]:
# Create a SparkSession
spark = SparkSession.builder.appName("amazonapp").getOrCreate()

# Read the data from a tab-separated values (TSV) file located in a Google Cloud Storage (GCS) bucket

# The data contains us customer product reviews from Amazon and is stored in the file amazonReviews1M.tsv

#This dataset is a sample of a much larger dataset from kaggle which contains over 100 million rows 
#(https://www.kaggle.com/datasets/cynthiarempel/amazon-us-customer-reviews-dataset)

#The dataset is a random sample of the original dataset
df = spark.read.parquet(r"C:\Users\dn10657\source\repos\Capstone\dataset\amazonReviewsparquet1M")

In [31]:
#checking the size of the dataset
df.count()

1033363

In [32]:
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+--------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|         review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+--------------------+
|         US|   26524369|R3HH81BIQ3ZZE1|B00SETTZGE|      10327631|RUGGED & DAPPER –...|          Beauty|          3|            6|         11|   N|                Y|Good/great face w...|The face wash is ...|2015-04-25T00:00:...|
|         US|   26920430|R2R0NREZG6WG2V|B006XY3Y1Y|      57006252|Tretinoin Microsp.

In [33]:
#The data contains customer product reviews from Amazon with each review having a class column where "0" 
#indicates not spam and "1" indicates spam reviews.

#dataset was sourced from https://www.kaggle.com/datasets/naveedhn/amazon-product-review-spam-and-non-spam
spam_df = spark.read.parquet(r"C:\Users\dn10657\source\repos\Capstone\dataset\spam_dataset_parquet") 


In [34]:
spam_df.limit(5).toPandas()

Unnamed: 0,_id,asin,category,class,helpful,overall,reviewText,reviewTime,reviewerID,reviewerName,summary,unixReviewTime
0,"(5a132773741a2384e84b883e,)",B000AAYBZ0,Sports_and_Outdoors,1.0,"[0, 0]",4.0,I advise you to read all of the reviews of the...,"03 31, 2009",A34IJACMU8C3IM,"Rob W. ""Sounddude""",Good Seat Bag For The Money!,1238457600
1,"(5a132773741a2384e84b884a,)",B000AAYBZ0,Sports_and_Outdoors,0.0,"[2, 14]",1.0,I can't figure the thing out. How does it mo...,"04 21, 2008",AM779PE0NTAEK,Thomas C. Ardoline,stuck riding,1208736000
2,"(5a132773741a2384e84b886a,)",B000AAYC5O,Sports_and_Outdoors,0.0,"[2, 2]",1.0,Made of cheap plastic and will not support the...,"08 25, 2012",A2FU8Y7IE3K84K,Tom S,Not worth the money,1345852800
3,"(5a132773741a2384e84b8874,)",B000AAYC6I,Sports_and_Outdoors,1.0,"[1, 1]",5.0,Easy to put together.Sturdy.Great for the gara...,"06 19, 2011",A13E8WQH3FFQL0,honeycombcandles,Bike Rack,1308441600
4,"(5a132773741a2384e84b888a,)",B000AAYC72,Sports_and_Outdoors,1.0,"[4, 4]",4.0,I read the reviews about this product before I...,"11 19, 2010",A3ED6RMQAUMVYR,echapman,"Works with bolt ons, fairly quiet, and stable",1290124800


In [35]:
#checking the size of the dataset
spam_df.count()

1010113

In [36]:
#taking sample of dataset to see what data is like
spam_df.limit(5).toPandas()

Unnamed: 0,_id,asin,category,class,helpful,overall,reviewText,reviewTime,reviewerID,reviewerName,summary,unixReviewTime
0,"(5a132773741a2384e84b883e,)",B000AAYBZ0,Sports_and_Outdoors,1.0,"[0, 0]",4.0,I advise you to read all of the reviews of the...,"03 31, 2009",A34IJACMU8C3IM,"Rob W. ""Sounddude""",Good Seat Bag For The Money!,1238457600
1,"(5a132773741a2384e84b884a,)",B000AAYBZ0,Sports_and_Outdoors,0.0,"[2, 14]",1.0,I can't figure the thing out. How does it mo...,"04 21, 2008",AM779PE0NTAEK,Thomas C. Ardoline,stuck riding,1208736000
2,"(5a132773741a2384e84b886a,)",B000AAYC5O,Sports_and_Outdoors,0.0,"[2, 2]",1.0,Made of cheap plastic and will not support the...,"08 25, 2012",A2FU8Y7IE3K84K,Tom S,Not worth the money,1345852800
3,"(5a132773741a2384e84b8874,)",B000AAYC6I,Sports_and_Outdoors,1.0,"[1, 1]",5.0,Easy to put together.Sturdy.Great for the gara...,"06 19, 2011",A13E8WQH3FFQL0,honeycombcandles,Bike Rack,1308441600
4,"(5a132773741a2384e84b888a,)",B000AAYC72,Sports_and_Outdoors,1.0,"[4, 4]",4.0,I read the reviews about this product before I...,"11 19, 2010",A3ED6RMQAUMVYR,echapman,"Works with bolt ons, fairly quiet, and stable",1290124800


In [37]:
spam_df.limit(5).toPandas()

Unnamed: 0,_id,asin,category,class,helpful,overall,reviewText,reviewTime,reviewerID,reviewerName,summary,unixReviewTime
0,"(5a132773741a2384e84b883e,)",B000AAYBZ0,Sports_and_Outdoors,1.0,"[0, 0]",4.0,I advise you to read all of the reviews of the...,"03 31, 2009",A34IJACMU8C3IM,"Rob W. ""Sounddude""",Good Seat Bag For The Money!,1238457600
1,"(5a132773741a2384e84b884a,)",B000AAYBZ0,Sports_and_Outdoors,0.0,"[2, 14]",1.0,I can't figure the thing out. How does it mo...,"04 21, 2008",AM779PE0NTAEK,Thomas C. Ardoline,stuck riding,1208736000
2,"(5a132773741a2384e84b886a,)",B000AAYC5O,Sports_and_Outdoors,0.0,"[2, 2]",1.0,Made of cheap plastic and will not support the...,"08 25, 2012",A2FU8Y7IE3K84K,Tom S,Not worth the money,1345852800
3,"(5a132773741a2384e84b8874,)",B000AAYC6I,Sports_and_Outdoors,1.0,"[1, 1]",5.0,Easy to put together.Sturdy.Great for the gara...,"06 19, 2011",A13E8WQH3FFQL0,honeycombcandles,Bike Rack,1308441600
4,"(5a132773741a2384e84b888a,)",B000AAYC72,Sports_and_Outdoors,1.0,"[4, 4]",4.0,I read the reviews about this product before I...,"11 19, 2010",A3ED6RMQAUMVYR,echapman,"Works with bolt ons, fairly quiet, and stable",1290124800


In [38]:
#function that gets the shape of data set (number of rows and columns)
def shape(df):
    # Get the number of rows
    num_rows = df.count()

    # Get the number of columns
    num_cols = len(df.columns)

    # Print the shape of the DataFrame
    print(f"Shape: ({num_rows}, {num_cols})")

In [39]:
shape(spam_df)

Shape: (1010113, 12)


In [40]:
shape(df)

Shape: (1033363, 15)


In [41]:
df.limit(5).toPandas()

Unnamed: 0,marketplace,customer_id,review_id,product_id,product_parent,product_title,product_category,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date
0,US,26524369,R3HH81BIQ3ZZE1,B00SETTZGE,10327631,RUGGED & DAPPER – Facial Cleanser for Men – 8 ...,Beauty,3,6,11,N,Y,"Good/great face wash, poor bottle design",The face wash is great. My skin feels differen...,2015-04-25T00:00:00.000Z
1,US,26920430,R2R0NREZG6WG2V,B006XY3Y1Y,57006252,Tretinoin Microsphere 0.1% USP w/w 20g,Beauty,4,1,1,N,Y,Works Great,"Works well on uneven skin tone, melasma, sun s...",2012-06-08T00:00:00.000Z
2,US,26523163,R1ZYSPZ3LWHVM4,B008QYE40Y,966404562,Sleek i-Divine Storm Palette Mineral based Eye...,Beauty,5,1,1,N,Y,Wonderful palette!,This eyeshadow palette is wonderful! The color...,2013-05-26T00:00:00.000Z
3,US,9963368,R3VXVN0I0E1PIV,B004VFXVJW,806462967,The Shave Well Company a TRULY FOG FREE Shower...,Beauty,5,1,1,N,Y,BEST MIRROR EVER,My husband has a very rough beard and therefor...,2012-02-22T00:00:00.000Z
4,US,2662502,R2H4PHUSEOH4T8,B0009OAHQY,591505715,Claiborne Cologne by Liz Claiborne for men Col...,Beauty,5,1,1,N,Y,Five Stars,Love it,2014-11-13T00:00:00.000Z


In [42]:
#shows missing values for all columns
def showMissingValues(df):
    print('\nMissing values:')
    # Iterate over each column and print the name and count of missing values
    for c in df.columns:
        null_count = df.select(count(when(col(c).isNull(), c))).collect()[0][0]    
        print("{}: {}".format(c, null_count))

In [43]:
#shows missing values for specific column
def showMissingValuesByCol(colName,df):
    print('\nMissing values:')
    null_count = df.select(count(when(col(colName).isNull(),colName))).collect()[0][0]    
    print("{}: {}".format(colName, null_count))

In [44]:
#shows rows where specific column is null
def showPandasDFOfNullRows(colName, df, limit):
    null_rows = df.filter(isnull(df[colName]))
    random_null_rows = null_rows.orderBy(rand()).limit(limit)
    return random_null_rows.toPandas()

In [45]:
showMissingValues(df)


Missing values:
marketplace: 0
customer_id: 0
review_id: 0
product_id: 0
product_parent: 0
product_title: 0
product_category: 9
star_rating: 9
helpful_votes: 9
total_votes: 9
vine: 9
verified_purchase: 9
review_headline: 69
review_body: 117
review_date: 73


In [46]:
showMissingValues(spam_df)


Missing values:
_id: 0
asin: 0
category: 0
class: 0
helpful: 0
overall: 0
reviewText: 0
reviewTime: 0
reviewerID: 0
reviewerName: 8066
summary: 0
unixReviewTime: 0


In [47]:
#gets all the column names in dataset
spam_df.columns

['_id',
 'asin',
 'category',
 'class',
 'helpful',
 'overall',
 'reviewText',
 'reviewTime',
 'reviewerID',
 'reviewerName',
 'summary',
 'unixReviewTime']

In [48]:
#gets all the column names in dataset
df.columns

['marketplace',
 'customer_id',
 'review_id',
 'product_id',
 'product_parent',
 'product_title',
 'product_category',
 'star_rating',
 'helpful_votes',
 'total_votes',
 'vine',
 'verified_purchase',
 'review_headline',
 'review_body',
 'review_date']

In [49]:
# Check the data types
print('Data types:')
print(df.dtypes)



Data types:
[('marketplace', 'string'), ('customer_id', 'string'), ('review_id', 'string'), ('product_id', 'string'), ('product_parent', 'string'), ('product_title', 'string'), ('product_category', 'string'), ('star_rating', 'string'), ('helpful_votes', 'string'), ('total_votes', 'string'), ('vine', 'string'), ('verified_purchase', 'string'), ('review_headline', 'string'), ('review_body', 'string'), ('review_date', 'string')]


In [50]:
# Check the data types
print('Data types:')
print(spam_df.dtypes)

Data types:
[('_id', 'struct<$oid:string>'), ('asin', 'string'), ('category', 'string'), ('class', 'double'), ('helpful', 'array<bigint>'), ('overall', 'double'), ('reviewText', 'string'), ('reviewTime', 'string'), ('reviewerID', 'string'), ('reviewerName', 'string'), ('summary', 'string'), ('unixReviewTime', 'bigint')]


In [51]:
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [52]:
spam_df.printSchema()


root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- category: string (nullable = true)
 |-- class: double (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



In [53]:
# Use describe() to get summary statistics
summary = df.describe()

# Show the summary
summary.select("summary","star_rating","helpful_votes","total_votes").toPandas()

Unnamed: 0,summary,star_rating,helpful_votes,total_votes
0,count,1033354.0,1033354.0,1033354.0
1,mean,4.15050505441504,2.16458541796906,2.871916110064896
2,stddev,1.305044384231812,21.947333910851263,23.990776655808123
3,min,1.0,0.0,0.0
4,max,5.0,993.0,99.0


In [58]:
# Use describe() to get summary statistics
summary = spam_df.select("class","overall").describe()

# Show the summary
summary.toPandas()
#summary.show()

Unnamed: 0,summary,class,overall
0,count,1010113.0,1010113.0
1,mean,0.7775536004387628,4.129152876955351
2,stddev,0.415889612897432,1.2905134894886965
3,min,0.0,1.0
4,max,1.0,5.0


**Data Exploration Amazon Spam Reviews Dataset**

In [None]:
#gets the count for each unique values in column
spam_df.groupBy('class').count().orderBy('count').show()

In [None]:
spam_df.groupBy('reviewText').count().orderBy('count').show()

In [None]:
spam_df.groupBy('reviewerID').count().orderBy('count').show()

In [None]:
spam_df.groupBy('asin').count().orderBy('count').show()

**Data Exploration Amazon Product Reviews Dataset**

In [None]:
df.groupBy('customer_id').count().orderBy('count').show()

In [None]:
df.groupBy('review_id').count().orderBy('count').show()

In [None]:
df.groupBy('product_parent').count().orderBy('count').show()

In [None]:
df.groupBy('product_title').count().orderBy('count').show()

In [None]:
df.groupBy('product_category').count().orderBy('count').show()

In [None]:
df.groupBy('star_rating').count().orderBy('count').show()

In [None]:
df.groupBy('helpful_votes').count().orderBy('count').show()

In [None]:
df.groupBy('total_votes').count().orderBy('count').show()

In [None]:
df.groupBy('vine').count().orderBy('count').show()

In [None]:
df.groupBy('verified_purchase').count().orderBy('count').show()

In [None]:
df.groupBy('review_headline').count().orderBy('count').show()

In [None]:
df.groupBy('review_body').count().orderBy('count').show()

In [None]:
df.groupBy('review_date').count().orderBy('count').show()

### Handling data types and missing values Amazon Product Reviews Dataset 

In [None]:
#selecting specific columns which are supposed to be applicable to project
spam_df=spam_df.select(col("class"),col("reviewText"),col("reviewTime"),col("asin"),col("reviewerID"))

In [55]:
spam_df.limit(5).toPandas()

Unnamed: 0,_id,asin,category,class,helpful,overall,reviewText,reviewTime,reviewerID,reviewerName,summary,unixReviewTime
0,"(5a132773741a2384e84b883e,)",B000AAYBZ0,Sports_and_Outdoors,1.0,"[0, 0]",4.0,I advise you to read all of the reviews of the...,"03 31, 2009",A34IJACMU8C3IM,"Rob W. ""Sounddude""",Good Seat Bag For The Money!,1238457600
1,"(5a132773741a2384e84b884a,)",B000AAYBZ0,Sports_and_Outdoors,0.0,"[2, 14]",1.0,I can't figure the thing out. How does it mo...,"04 21, 2008",AM779PE0NTAEK,Thomas C. Ardoline,stuck riding,1208736000
2,"(5a132773741a2384e84b886a,)",B000AAYC5O,Sports_and_Outdoors,0.0,"[2, 2]",1.0,Made of cheap plastic and will not support the...,"08 25, 2012",A2FU8Y7IE3K84K,Tom S,Not worth the money,1345852800
3,"(5a132773741a2384e84b8874,)",B000AAYC6I,Sports_and_Outdoors,1.0,"[1, 1]",5.0,Easy to put together.Sturdy.Great for the gara...,"06 19, 2011",A13E8WQH3FFQL0,honeycombcandles,Bike Rack,1308441600
4,"(5a132773741a2384e84b888a,)",B000AAYC72,Sports_and_Outdoors,1.0,"[4, 4]",4.0,I read the reviews about this product before I...,"11 19, 2010",A3ED6RMQAUMVYR,echapman,"Works with bolt ons, fairly quiet, and stable",1290124800


#### product_category

In [45]:
showPandasDFOfNullRows("product_category", df, 10)

Unnamed: 0,marketplace,customer_id,review_id,product_id,product_parent,product_title,product_category,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date
0,US,30466043,R292MVXQ4Y7CIV,B0082CXEI8,661176068,"Blue Sea Systems BelowDeck Panels, Dual USB Ch...",,,,,,,,,
1,US,34077068,R5QMKUM3C1KO3,B0082CXEI8,661176068,"Blue Sea Systems BelowDeck Panels, Dual USB Ch...",,,,,,,,,
2,US,50625085,R2AL699XSAMK0I,B0082CXEI8,661176068,"Blue Sea Systems BelowDeck Panels, Dual USB Ch...",,,,,,,,,
3,US,48047331,R2OMXDNEF8VE5Q,B001U4ZZPK,661176068,"Blue Sea Systems BelowDeck Panels, Dual USB Ch...",,,,,,,,,
4,US,5883076,R1B99YM5JYBAUI,B00CLVNIE6,6211165,Fonus 3.5mm White Premium Coiled Car Stereo Au...,,,,,,,,,
5,US,7621146,R19OBAWEBSTEH2,B0082CXEI8,661176068,"Blue Sea Systems BelowDeck Panels, Dual USB Ch...",,,,,,,,,
6,US,31368223,R1KZPYNT0NDZNC,B00MGVKA4C,994037962,Konjac Sponge - Original - Facial Sponge\tBeau...,,,,,,,,,
7,US,52259120,RZ7PQZGLI0F2J,B006VHU6PK,723345439,Cross Stitch: Your Own Cross Stitch Masterpiec...,,,,,,,,,
8,US,31051333,R3AEEHBJ24ICWA,B001U4ZZPK,661176068,"Blue Sea Systems BelowDeck Panels, Dual USB Ch...",,,,,,,,,


##### Drop the rows where the last 9 rows are null. There is no essential info that can be garnered from this

In [46]:

clist = df.columns[-9:]

In [47]:
clist

['product_category',
 'star_rating',
 'helpful_votes',
 'total_votes',
 'vine',
 'verified_purchase',
 'review_headline',
 'review_body',
 'review_date']

In [48]:
df=df.dropna(subset=clist, how='all')

In [49]:
df.count()

1033354

In [50]:
showMissingValues(df)


Missing values:
marketplace: 0
customer_id: 0
review_id: 0
product_id: 0
product_parent: 0
product_title: 0
product_category: 0
star_rating: 0
helpful_votes: 0
total_votes: 0
vine: 0
verified_purchase: 0
review_headline: 60
review_body: 108
review_date: 64


In [51]:
#There is date value appearing as product_category most likely due to data misalignment so it was removed
df=df.filter(col("product_category") != "2011-09-09")

In [52]:
df.groupBy('product_category').count().orderBy('count').show(30)

+--------------------+-----+
|    product_category|count|
+--------------------+-----+
|     Lawn and Garden|    2|
|                Home|    6|
|             Kitchen|   11|
|    Home Improvement|   11|
|  Home Entertainment|  160|
|Personal_Care_App...|12765|
|    Major Appliances|14610|
|    Digital_Software|15189|
|  Mobile_Electronics|15795|
| Digital_Video_Games|21646|
|           Gift Card|22315|
|          Automotive|29750|
|         Electronics|29809|
| Musical Instruments|29876|
|            Software|29886|
|             Grocery|29907|
|             Watches|29931|
|             Apparel|29947|
|            Outdoors|29974|
|         Video Games|30002|
|           Furniture|30054|
|Health & Personal...|30066|
|                Baby|30113|
|               Tools|30125|
|               Video|30126|
|     Office Products|30159|
|        Pet Products|30172|
|               Shoes|30186|
|              Camera|30195|
|            Wireless|30205|
+--------------------+-----+
only showing t

In [53]:
df.groupBy('product_category').count().orderBy('count').select(col('product_category')).first()[0]

'Lawn and Garden'

In [54]:
df.filter(col("product_category").contains("slimmy sponge")).toPandas()

Unnamed: 0,marketplace,customer_id,review_id,product_id,product_parent,product_title,product_category,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date


In [55]:
df=df.filter(~col("product_category").contains("slimmy sponge"))

In [56]:
df.groupBy('product_category').count().orderBy('count').show(30)

+--------------------+-----+
|    product_category|count|
+--------------------+-----+
|     Lawn and Garden|    2|
|                Home|    6|
|             Kitchen|   11|
|    Home Improvement|   11|
|  Home Entertainment|  160|
|Personal_Care_App...|12765|
|    Major Appliances|14610|
|    Digital_Software|15189|
|  Mobile_Electronics|15795|
| Digital_Video_Games|21646|
|           Gift Card|22315|
|          Automotive|29750|
|         Electronics|29809|
| Musical Instruments|29876|
|            Software|29886|
|             Grocery|29907|
|             Watches|29931|
|             Apparel|29947|
|            Outdoors|29974|
|         Video Games|30002|
|           Furniture|30054|
|Health & Personal...|30066|
|                Baby|30113|
|               Tools|30125|
|               Video|30126|
|     Office Products|30159|
|        Pet Products|30172|
|               Shoes|30186|
|              Camera|30195|
|            Wireless|30205|
+--------------------+-----+
only showing t

#### star_rating

In [57]:
df.groupBy('star_rating').count().orderBy('count').show()

+-----------+------+
|star_rating| count|
+-----------+------+
|          2| 51764|
|          3| 81776|
|          1| 96141|
|          4|174421|
|          5|629252|
+-----------+------+



In [58]:
# Convert string column to integer column
df = df.withColumn("star_rating", col("star_rating").cast("integer"))

#### review_body

In [59]:
showPandasDFOfNullRows("review_body", df, 10)

Unnamed: 0,marketplace,customer_id,review_id,product_id,product_parent,product_title,product_category,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date
0,US,13418638,R3TTC36XJVTT9F,B008E62TTW,174178495,Shiva Murthi - Shankarudu,Digital_Music_Purchase,5,1,1,N,N,Chinna Kuyil' Musician Srimati Chitra\tVery in...,,
1,US,42057820,R17V3IHWBY8P15,B000CCW2UM,568298225,Lincoln Aka Dark Side/lincoln,Video DVD,3,0,1,N,Y,Not Vizio/Generic,,2015-06-16T00:00:00.000Z
2,US,616054,R1J7GSK9FDIE1X,B00NGF2JXC,518182826,"YOPO 32"" Wigs Long Curly Wavy Wig Cosplay Cost...",Beauty,5,2,2,N,Y,Five Stars,,2015-06-11T00:00:00.000Z
3,US,49280079,RORMXHSNHLRAP,B007AMMGMG,131334444,In the Land of Blood and Honey,Digital_Video_Download,1,2,5,N,N,And I say fine. Artists do have that right\tHi...,,
4,US,9491373,R282RKS7R478N6,B00TKLPMOE,945778258,Twisted Women's RUBY Floral Canvas Knotted Toe...,Shoes,5,10,12,N,Y,Five Stars,,2015-03-20T00:00:00.000Z
5,US,3981694,RN7K2YVL3OU00,B00HUJUS88,617736148,New Apple MacBook Pro 13.3-Inch Laptop i5 4GB ...,PC,1,1,2,N,Y,New Mac won't turn on.\tThe Mac that I bought ...,,
6,US,854686,RFELC9SXLSWW,B002W07BXA,939238527,"Baby Starters Plush Swirl Blanket, Bow, Pink",Baby,5,6,7,N,Y,Five Stars,,2015-05-07T00:00:00.000Z
7,US,9965601,RGS8Y9K7X8M4A,B00005MK9Z,360246660,Confrontation (Remastered),Music,5,0,0,N,Y,the legacy of his music & message will live on...,,
8,US,23122744,R2D3YTK3JQG8KY,B001VL5304,65054676,Fireproof,Digital_Video_Download,5,0,0,N,Y,He discovers everything he thought to be true ...,,
9,US,15011917,R55KPDV34QCLK,B00452OHXU,970636444,FURminator deShedding Tool for Cats,Pet Products,4,3,5,N,Y,Four Stars,,2015-07-08T00:00:00.000Z


In [60]:
df.limit(10).toPandas()[["review_body"]]

Unnamed: 0,review_body
0,The face wash is great. My skin feels differen...
1,"Works well on uneven skin tone, melasma, sun s..."
2,This eyeshadow palette is wonderful! The color...
3,My husband has a very rough beard and therefor...
4,Love it
5,"I love this as a body lotion, but it leaves my..."
6,I really like this mask. I know that a mask is...
7,I had this Daily Matte Moisturizer in the past...
8,Color shade was a bit different from the origi...
9,This isn't a pre and post...it's two of the sa...


In [61]:
df.groupBy('review_body').count().orderBy(desc('count')).show()

+-------------+-----+
|  review_body|count|
+-------------+-----+
|         Good| 1574|
|        Great| 1541|
|         good| 1517|
|        great| 1085|
|    Excellent| 1052|
|      Love it|  849|
|           ok|  713|
|    excellent|  582|
|      Awesome|  557|
|      Perfect|  531|
|     Love it!|  500|
|       Great!|  492|
|      love it|  484|
|         Nice|  480|
|    very good|  459|
|Great product|  414|
|    Very good|  406|
|    excelente|  390|
|         nice|  379|
|  Works great|  367|
+-------------+-----+
only showing top 20 rows



In [62]:
showMissingValues(df)


Missing values:
marketplace: 0
customer_id: 0
review_id: 0
product_id: 0
product_parent: 0
product_title: 0
product_category: 0
star_rating: 0
helpful_votes: 0
total_votes: 0
vine: 0
verified_purchase: 0
review_headline: 60
review_body: 108
review_date: 64


In [63]:
df.filter((isnull('review_headline')) | (isnull('review_body')) | (isnull('review_date'))).limit(10).toPandas()

Unnamed: 0,marketplace,customer_id,review_id,product_id,product_parent,product_title,product_category,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date
0,US,20813663,R21EVZD1G1KXVB,B00629OEWS,665819280,Origin8 TA42 Track Rims,Sports,5,0,0,N,Y,Five Stars,,2015-07-09T00:00:00.000Z
1,US,30465514,RWDDO61MOV4OH,B007RN1IYK,865681990,Skywalker Trampolines 12-Feet Round Trampoline...,Sports,5,0,0,N,Y,,"Good size, good quality",2015-01-04T00:00:00.000Z
2,US,39345384,R3UOS98R7PWS2N,B00ACL4LR0,576616467,Paracord Planet Mil-Spec Commercial Grade 550l...,Sports,4,29,35,N,Y,Four Stars,,2014-09-18T00:00:00.000Z
3,US,41455465,R32HCIS5ZS4MND,B002BH4U56,505269061,Star Trek: U.S.S. Enterprise NCC-1701 High Def...,Toys,5,14,14,N,Y,They got it right!,,2015-03-25T00:00:00.000Z
4,US,33433702,R3H718X1C84591,B000A6U6K8,238514892,Toy State CAT Big Builder Wheel Loader Lands R...,Toys,5,0,0,N,Y,,"Exactly as described, great!",2015-01-23T00:00:00.000Z
5,US,29246701,R3E6LXEJ5QCHIA,0393320782,736198039,"Code of the Street: Decency, Violence, and the...",Books,5,4,5,N,N,,This book should be mandatory reading for anyo...,2005-04-16T00:00:00.000Z
6,US,13013663,R2Y0CEYQPCDUPH,B001O01KE0,264615420,Drippin' Wet,Digital_Music_Purchase,5,0,0,N,Y,Stoop down baby,,2015-07-09T00:00:00.000Z
7,US,51337633,R164OBU2RBPL5D,B00138CSLM,436023672,Black & White Night,Digital_Music_Purchase,5,0,0,N,N,"EXCELLENT! Also have this show on a DVD, ...\t...",,
8,US,7235219,R30DW5GAUIAGJO,B00C5XG6XA,611455629,Redeemed,Digital_Music_Purchase,2,0,0,N,Y,I am in the process of learning this wonderful...,,
9,US,4252573,R3FTX5EKLXUL5O,B001RJSNG6,421460338,Secrets,Digital_Music_Purchase,5,0,0,N,Y,""" just like some good ole Kentucky Fried Chick...",2014-09-05,


In [64]:
# Define the regular expression pattern to match the format "yyyy-mm-dd"
date_pattern = r"\d{4}-\d{2}-\d{2}"






In [65]:
df.filter(col("review_body")
    .rlike(date_pattern)  & isnull(col("review_date"))
  ).count()

14

In [66]:
df.filter(col("review_body")
    .rlike(date_pattern) & isnull(col("review_date"))
  ).limit(10).toPandas()

Unnamed: 0,marketplace,customer_id,review_id,product_id,product_parent,product_title,product_category,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date
0,US,4252573,R3FTX5EKLXUL5O,B001RJSNG6,421460338,Secrets,Digital_Music_Purchase,5,0,0,N,Y,""" just like some good ole Kentucky Fried Chick...",2014-09-05,
1,US,45566062,R2XYGMALN7X3Q0,B00138JBLW,335252461,Dream Of A Lifetime,Digital_Music_Purchase,5,0,0,N,N,"""Its all good.'\tIF M GAY HAD TO0 HAVE A LAST ...",2012-01-20,
2,US,32308077,R3SI1MVHU7YCG8,0440414806,159975375,Holes,Books,5,0,0,N,N,"""Holes ' Helps Children Dig Into Reading\tFrom...",2003-09-17,
3,US,25495441,RKAT3A16B7RQK,B000083JWE,729019812,The Best Of,Music,2,48,51,N,N,"""Mauriat's Best ?\tI was looking forward to re...",2005-09-29,
4,US,49583354,R2FPJ80HT9D749,B0009X7BHI,405822548,Vincent & Theo (1990),Video DVD,5,3,5,N,N,"""Now I think I know what you tried to say to m...",2007-04-17,
5,US,11314966,R2UMEQGZHNST7F,B000P0J0EW,484638351,The Shawshank Redemption (Single-Disc Edition),Video DVD,5,1,1,N,Y,"""Shawshenk Redemption Redux\tIt was and remain...",2011-09-22,
6,US,17571600,R1LWEBBQ2JJ8AF,B005J4X91E,323924094,Ballad of Mott the Hoople,Video DVD,5,20,27,N,Y,"""...It mesmerizes and I can't explain...\tMott...",2011-10-31,
7,US,28310525,RJA6GERDGIY2X,B00000JGHM,429439461,Heat,Video DVD,5,2,3,N,N,"""One Of The Best Films Of The 90's\tThis is pr...",2004-08-31,
8,US,50279273,R2DKCDZUTWKM0W,B001NFNFMQ,480707423,Jillian Michaels - No More Trouble Zones,Video DVD,5,4,5,N,Y,""" I'm feeling better than ever\tI also own Jil...",2014-10-24,
9,US,52575421,RLVIZX444O4N1,6301969588,76845203,Freaks [VHS],Video,4,3,6,N,Y,"""Gooble gobble, we accept, you, one of us! one...",2002-09-01,


In [67]:
# Replace the review body with an empty string if it matches the pattern
df = df.withColumn("review_date_2", when(df["review_body"].rlike(date_pattern) & df["review_body"].isNull(), "")\
                     .otherwise(""))

In [68]:
df.filter((col("review_date_2")!="") & isnull(col("review_date"))).limit(5).toPandas()

Unnamed: 0,marketplace,customer_id,review_id,product_id,product_parent,product_title,product_category,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date,review_date_2


In [69]:
# Replace review_date with review_date2 when review_date is null and review_date2 is not empty
df = df.withColumn("review_date", when((isnull(col("review_date"))) & (col("review_date_2") != ""), col("review_date_2"))\
                   .otherwise(col("review_date")))

In [70]:
# Replace the review body with an empty string if it matches the pattern
df = df.withColumn("review_body", when(col("review_body").rlike(date_pattern), None)\
                       .otherwise(col("review_body")))


In [71]:
# Define the regular expression pattern to match the format "yyyy-mm-dd"
date_pattern = r"^\d{4}-\d{2}-\d{2}$"


In [72]:
df.filter(col("review_headline")
    .rlike(date_pattern) ).limit(10).toPandas()

Unnamed: 0,marketplace,customer_id,review_id,product_id,product_parent,product_title,product_category,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date,review_date_2


In [73]:
df.filter(df["review_body"].isNull()).limit(10).toPandas()

Unnamed: 0,marketplace,customer_id,review_id,product_id,product_parent,product_title,product_category,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date,review_date_2
0,US,52306962,R23YRANOMDQAOF,B003RXL7KY,111969204,"Mossy Oak Cloth Tape (Mossy Oak Break-Up), 2"" ...",Sports,5,3,4,N,Y,I like this... it works.,,2011-02-10T00:00:00.000Z,
1,US,18708513,RNWWK8NG44HL1,B001F4SK3E,351610450,"MTech USA MT-317 Folding Tactical Knife, Stile...",Sports,4,94,110,N,Y,Mixed feelings-- hard to open,,2012-02-02T00:00:00.000Z,
2,US,20813663,R21EVZD1G1KXVB,B00629OEWS,665819280,Origin8 TA42 Track Rims,Sports,5,0,0,N,Y,Five Stars,,2015-07-09T00:00:00.000Z,
3,US,39345384,R3UOS98R7PWS2N,B00ACL4LR0,576616467,Paracord Planet Mil-Spec Commercial Grade 550l...,Sports,4,29,35,N,Y,Four Stars,,2014-09-18T00:00:00.000Z,
4,US,41455465,R32HCIS5ZS4MND,B002BH4U56,505269061,Star Trek: U.S.S. Enterprise NCC-1701 High Def...,Toys,5,14,14,N,Y,They got it right!,,2015-03-25T00:00:00.000Z,
5,US,52453104,R2FWTH6QM5KX3R,B00L1LGIU8,791188769,"Pull and Stretch Bounce Ball, Squeeze It, Stre...",Toys,2,23,26,N,Y,Would be okay if they get rid of the air pockets,,2014-10-27T00:00:00.000Z,
6,US,13013663,R2Y0CEYQPCDUPH,B001O01KE0,264615420,Drippin' Wet,Digital_Music_Purchase,5,0,0,N,Y,Stoop down baby,,2015-07-09T00:00:00.000Z,
7,US,51337633,R164OBU2RBPL5D,B00138CSLM,436023672,Black & White Night,Digital_Music_Purchase,5,0,0,N,N,"EXCELLENT! Also have this show on a DVD, ...\t...",,,
8,US,7235219,R30DW5GAUIAGJO,B00C5XG6XA,611455629,Redeemed,Digital_Music_Purchase,2,0,0,N,Y,I am in the process of learning this wonderful...,,,
9,US,4252573,R3FTX5EKLXUL5O,B001RJSNG6,421460338,Secrets,Digital_Music_Purchase,5,0,0,N,Y,""" just like some good ole Kentucky Fried Chick...",,,


In [74]:
# Assign review_headline to review_body when review_headline is not null and review_body is null or an empty string
df = df.withColumn("review_body", when((col("review_headline").isNotNull()) \
                                       & (col("review_body").isNull() | (col("review_body") == "")),\
                                       col("review_headline")).otherwise(col("review_body")))

In [75]:
showMissingValues(df)


Missing values:
marketplace: 0
customer_id: 0
review_id: 0
product_id: 0
product_parent: 0
product_title: 0
product_category: 0
star_rating: 0
helpful_votes: 0
total_votes: 0
vine: 0
verified_purchase: 0
review_headline: 60
review_body: 0
review_date: 64
review_date_2: 0


#### review_date

In [76]:
#return records where review_date is null
df.filter(col("review_date").isNull()).limit(5).toPandas()

Unnamed: 0,marketplace,customer_id,review_id,product_id,product_parent,product_title,product_category,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date,review_date_2
0,US,51337633,R164OBU2RBPL5D,B00138CSLM,436023672,Black & White Night,Digital_Music_Purchase,5,0,0,N,N,"EXCELLENT! Also have this show on a DVD, ...\t...","EXCELLENT! Also have this show on a DVD, ...\t...",,
1,US,7235219,R30DW5GAUIAGJO,B00C5XG6XA,611455629,Redeemed,Digital_Music_Purchase,2,0,0,N,Y,I am in the process of learning this wonderful...,I am in the process of learning this wonderful...,,
2,US,4252573,R3FTX5EKLXUL5O,B001RJSNG6,421460338,Secrets,Digital_Music_Purchase,5,0,0,N,Y,""" just like some good ole Kentucky Fried Chick...",""" just like some good ole Kentucky Fried Chick...",,
3,US,22028460,R2Z8K50L5GFKNA,B000VWKVSI,515353411,Ah! Leah! (Album Version),Digital_Music_Purchase,5,0,0,N,Y,has been and still is one of my all-time favor...,has been and still is one of my all-time favor...,,
4,US,30753448,R34LOIFOPWS1CW,B008593AFA,98784325,Becoming,Digital_Music_Purchase,5,0,0,N,Y,Another lame local band to sit through\tI saw ...,Another lame local band to sit through\tI saw ...,,


In [77]:
df.limit(5).toPandas()

Unnamed: 0,marketplace,customer_id,review_id,product_id,product_parent,product_title,product_category,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date,review_date_2
0,US,26524369,R3HH81BIQ3ZZE1,B00SETTZGE,10327631,RUGGED & DAPPER – Facial Cleanser for Men – 8 ...,Beauty,3,6,11,N,Y,"Good/great face wash, poor bottle design",The face wash is great. My skin feels differen...,2015-04-25T00:00:00.000Z,
1,US,26920430,R2R0NREZG6WG2V,B006XY3Y1Y,57006252,Tretinoin Microsphere 0.1% USP w/w 20g,Beauty,4,1,1,N,Y,Works Great,"Works well on uneven skin tone, melasma, sun s...",2012-06-08T00:00:00.000Z,
2,US,26523163,R1ZYSPZ3LWHVM4,B008QYE40Y,966404562,Sleek i-Divine Storm Palette Mineral based Eye...,Beauty,5,1,1,N,Y,Wonderful palette!,This eyeshadow palette is wonderful! The color...,2013-05-26T00:00:00.000Z,
3,US,9963368,R3VXVN0I0E1PIV,B004VFXVJW,806462967,The Shave Well Company a TRULY FOG FREE Shower...,Beauty,5,1,1,N,Y,BEST MIRROR EVER,My husband has a very rough beard and therefor...,2012-02-22T00:00:00.000Z,
4,US,2662502,R2H4PHUSEOH4T8,B0009OAHQY,591505715,Claiborne Cologne by Liz Claiborne for men Col...,Beauty,5,1,1,N,Y,Five Stars,Love it,2014-11-13T00:00:00.000Z,


In [78]:
# Convert the date_string column to the datetime format "yyyy-mm-dd"
df = df.withColumn("review_date", to_date(col("review_date")))

In [79]:
showMissingValuesByCol("review_date",df)


Missing values:
review_date: 64


In [80]:
df.dtypes

[('marketplace', 'string'),
 ('customer_id', 'string'),
 ('review_id', 'string'),
 ('product_id', 'string'),
 ('product_parent', 'string'),
 ('product_title', 'string'),
 ('product_category', 'string'),
 ('star_rating', 'int'),
 ('helpful_votes', 'string'),
 ('total_votes', 'string'),
 ('vine', 'string'),
 ('verified_purchase', 'string'),
 ('review_headline', 'string'),
 ('review_body', 'string'),
 ('review_date', 'date'),
 ('review_date_2', 'string')]

In [81]:
df.filter(isnull(col("review_date"))).limit(10).toPandas()

Unnamed: 0,marketplace,customer_id,review_id,product_id,product_parent,product_title,product_category,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date,review_date_2
0,US,51337633,R164OBU2RBPL5D,B00138CSLM,436023672,Black & White Night,Digital_Music_Purchase,5,0,0,N,N,"EXCELLENT! Also have this show on a DVD, ...\t...","EXCELLENT! Also have this show on a DVD, ...\t...",,
1,US,7235219,R30DW5GAUIAGJO,B00C5XG6XA,611455629,Redeemed,Digital_Music_Purchase,2,0,0,N,Y,I am in the process of learning this wonderful...,I am in the process of learning this wonderful...,,
2,US,4252573,R3FTX5EKLXUL5O,B001RJSNG6,421460338,Secrets,Digital_Music_Purchase,5,0,0,N,Y,""" just like some good ole Kentucky Fried Chick...",""" just like some good ole Kentucky Fried Chick...",,
3,US,22028460,R2Z8K50L5GFKNA,B000VWKVSI,515353411,Ah! Leah! (Album Version),Digital_Music_Purchase,5,0,0,N,Y,has been and still is one of my all-time favor...,has been and still is one of my all-time favor...,,
4,US,30753448,R34LOIFOPWS1CW,B008593AFA,98784325,Becoming,Digital_Music_Purchase,5,0,0,N,Y,Another lame local band to sit through\tI saw ...,Another lame local band to sit through\tI saw ...,,
5,US,13418638,R3TTC36XJVTT9F,B008E62TTW,174178495,Shiva Murthi - Shankarudu,Digital_Music_Purchase,5,1,1,N,N,Chinna Kuyil' Musician Srimati Chitra\tVery in...,Chinna Kuyil' Musician Srimati Chitra\tVery in...,,
6,US,45566062,R2XYGMALN7X3Q0,B00138JBLW,335252461,Dream Of A Lifetime,Digital_Music_Purchase,5,0,0,N,N,"""Its all good.'\tIF M GAY HAD TO0 HAVE A LAST ...","""Its all good.'\tIF M GAY HAD TO0 HAVE A LAST ...",,
7,US,32308077,R3SI1MVHU7YCG8,0440414806,159975375,Holes,Books,5,0,0,N,N,"""Holes ' Helps Children Dig Into Reading\tFrom...","""Holes ' Helps Children Dig Into Reading\tFrom...",,
8,US,25495441,RKAT3A16B7RQK,B000083JWE,729019812,The Best Of,Music,2,48,51,N,N,"""Mauriat's Best ?\tI was looking forward to re...","""Mauriat's Best ?\tI was looking forward to re...",,
9,US,9965601,RGS8Y9K7X8M4A,B00005MK9Z,360246660,Confrontation (Remastered),Music,5,0,0,N,Y,the legacy of his music & message will live on...,the legacy of his music & message will live on...,,


In [82]:
#gets data types for each column
df.dtypes

[('marketplace', 'string'),
 ('customer_id', 'string'),
 ('review_id', 'string'),
 ('product_id', 'string'),
 ('product_parent', 'string'),
 ('product_title', 'string'),
 ('product_category', 'string'),
 ('star_rating', 'int'),
 ('helpful_votes', 'string'),
 ('total_votes', 'string'),
 ('vine', 'string'),
 ('verified_purchase', 'string'),
 ('review_headline', 'string'),
 ('review_body', 'string'),
 ('review_date', 'date'),
 ('review_date_2', 'string')]

In [83]:
# Group the data by date and count the number of products for each date
date_counts = df.groupBy("review_date").count()

# Sort the result in descending order based on the count
sorted_date_counts = date_counts.orderBy(col("count").desc())

# Select the date with the highest count
most_product_date = sorted_date_counts.select("review_date").first()[0]

# Show the date with the most products
print(most_product_date)

2015-02-20


In [84]:
# Replace null values in review_date column with "2015-02-20" converted to date
df = df.na.fill({'review_date': '2015-02-20'})

In [85]:
showMissingValuesByCol("review_date",df)


Missing values:
review_date: 0


In [86]:
df.dtypes

[('marketplace', 'string'),
 ('customer_id', 'string'),
 ('review_id', 'string'),
 ('product_id', 'string'),
 ('product_parent', 'string'),
 ('product_title', 'string'),
 ('product_category', 'string'),
 ('star_rating', 'int'),
 ('helpful_votes', 'string'),
 ('total_votes', 'string'),
 ('vine', 'string'),
 ('verified_purchase', 'string'),
 ('review_headline', 'string'),
 ('review_body', 'string'),
 ('review_date', 'date'),
 ('review_date_2', 'string')]

In [87]:
#converts date string to date
df = df.withColumn("review_date", to_date(col("review_date")))

In [88]:
showMissingValuesByCol("review_date",df)


Missing values:
review_date: 0


In [89]:
df.dtypes

[('marketplace', 'string'),
 ('customer_id', 'string'),
 ('review_id', 'string'),
 ('product_id', 'string'),
 ('product_parent', 'string'),
 ('product_title', 'string'),
 ('product_category', 'string'),
 ('star_rating', 'int'),
 ('helpful_votes', 'string'),
 ('total_votes', 'string'),
 ('vine', 'string'),
 ('verified_purchase', 'string'),
 ('review_headline', 'string'),
 ('review_body', 'string'),
 ('review_date', 'date'),
 ('review_date_2', 'string')]

In [90]:
#drop review_date_2 column
df=df.drop(col("review_date_2"))

In [91]:
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   26524369|R3HH81BIQ3ZZE1|B00SETTZGE|      10327631|RUGGED & DAPPER –...|          Beauty|          3|            6|         11|   N|                Y|Good/great face w...|The face wash is ...| 2015-04-25|
|         US|   26920430|R2R0NREZG6WG2V|B006XY3Y1Y|      57006252|Tretinoin Microsp...|          Beauty|          4|    

In [92]:
# List of column names to drop
columns_to_drop = ('product_parent',"marketplace")

df=df.drop(*columns_to_drop)

#### review_headline

In [93]:
df=df.na.fill({'review_headline': ""})

In [94]:
showMissingValues(df)


Missing values:
customer_id: 0
review_id: 0
product_id: 0
product_title: 0
product_category: 0
star_rating: 0
helpful_votes: 0
total_votes: 0
vine: 0
verified_purchase: 0
review_headline: 0
review_body: 0
review_date: 0


### Removing Duplicates Amazon Product Reviews Dataset

In [95]:
shape(df)

Shape: (1033354, 13)


In [96]:
df=df.distinct()

In [97]:
shape(df)

Shape: (1033179, 13)


### Removing Duplicates Amazon Spam Reviews Dataset

In [98]:
shape(spam_df)

Shape: (1010113, 5)


In [99]:
spam_df=spam_df.distinct()

In [100]:
shape(spam_df)

Shape: (1009291, 5)


### Remove Nas

In [101]:
#showMissingValues(df)

### Creating Season Column

##### Amazon Product Reviews Dataset

In [102]:
df = df.withColumn('unhelpful_votes', col('total_votes') - col('helpful_votes'))

### Creating Season Column

##### Amazon Product Reviews Dataset

In [103]:
# Create a new column "season" based on the month of "review_date"
df = df.withColumn("season", when(month(col("review_date")).isin([12, 1, 2]), "Winter")
                      .when(month(col("review_date")).isin([3, 4, 5]), "Spring")
                      .when(month(col("review_date")).isin([6, 7, 8]), "Summer")
                      .otherwise("Fall"))


##### Amazon Spam Reviews Dataset

In [104]:
func =  udf (lambda x: datetime.strptime(x, '%m %d, %Y'), DateType())

spam_df = spam_df.withColumn('reviewDate', func(col('reviewTime')))
#spam_df = spam_df.withColumn("reviewDate", to_date("reviewTime", "dd MM,yyyy"))

# Create a new column "season" based on the month of "review_date"
spam_df = spam_df.withColumn("season", when(month(col("reviewDate")).isin([12, 1, 2]), "Winter")
                      .when(month(col("reviewDate")).isin([3, 4, 5]), "Spring")
                      .when(month(col("reviewDate")).isin([6, 7, 8]), "Summer")
                      .otherwise("Fall"))

In [105]:
spam_df.limit(5).toPandas()

Unnamed: 0,class,reviewText,reviewTime,asin,reviewerID,reviewDate,season
0,1.0,It was a gift but it seemed pretty durable and...,"03 18, 2008",B000AOZ11Y,A2BSWYKV7ZV7H6,2008-03-18,Spring
1,1.0,We now have a 9 hole disc golf course in our b...,"12 12, 2012",B000ASG0DI,A1DQIB6VQ7Y22Q,2012-12-12,Winter
2,0.0,Do not buy this! The one I received was leakin...,"02 17, 2014",B000AXVOLQ,A21TK1P3Q4T6DM,2014-02-17,Winter
3,1.0,This is a very handy size and weight for a clo...,"01 9, 2007",B000B76IJY,A2MKJZXSDSGJA0,2007-01-09,Winter
4,1.0,This is an awesome sharpener! Very compact and...,"01 17, 2014",B000B8FW0E,A10UPCI7CPQXLA,2014-01-17,Winter


### Creating Month Column

In [106]:
# Create a new column "month" with the month name extracted from "review_date"
df = df.withColumn("month", date_format(col("review_date"), "MMMM"))

### Creating Year Column

##### Amazon Product Reviews Dataset

In [107]:
# Create a new column "year" with the year extracted from "review_date"
df = df.withColumn("year", year(col("review_date")))

In [108]:
df.limit(5).toPandas()

Unnamed: 0,customer_id,review_id,product_id,product_title,product_category,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date,unhelpful_votes,season,month,year
0,50756929,R10DKNVRG1HIIK,0969755147,Mind Power into the 21st Century: Techniques t...,Books,5,13,14,N,N,The best book on mind power,From many books that I read on topic of power ...,2000-05-15,1.0,Spring,May,2000
1,12195318,R10M3EY6B48DXS,B002VC6U8Q,Usc Fanfare W/ Tribute to Troy,Digital_Music_Purchase,5,0,0,N,N,"love, love, love !!!",Get it ... well worth the price -- for the tru...,2013-01-24,0.0,Winter,January,2013
2,34413051,R10RHNK6A9NOQ,B004V2LX6I,Maxpedition Kodiak S-type Gearslinger,Sports,5,1,1,N,Y,Awesome!!!!,This bag has been my EDC since purchasing it. ...,2013-01-27,0.0,Winter,January,2013
3,12045840,R1102VPDVD3S4X,B000V658DC,They Can't All Be Zingers,Digital_Music_Purchase,5,0,0,N,Y,Primus Sucks,Sounds Good! Most of the hits you want on one ...,2014-03-03,0.0,Spring,March,2014
4,42508526,R11DIJT8PQOG1X,B000V697WK,Gaucho,Digital_Music_Purchase,4,0,1,N,Y,good music,Good music,2014-12-09,1.0,Winter,December,2014


##### Amazon Spam Reviews Dataset

In [109]:
# Create a new column "year" with the year extracted from "review_date"
spam_df = spam_df.withColumn("year", year(col("reviewDate")))

In [110]:
spam_df.limit(5).toPandas()

Unnamed: 0,class,reviewText,reviewTime,asin,reviewerID,reviewDate,season,year
0,1.0,It was a gift but it seemed pretty durable and...,"03 18, 2008",B000AOZ11Y,A2BSWYKV7ZV7H6,2008-03-18,Spring,2008
1,1.0,We now have a 9 hole disc golf course in our b...,"12 12, 2012",B000ASG0DI,A1DQIB6VQ7Y22Q,2012-12-12,Winter,2012
2,0.0,Do not buy this! The one I received was leakin...,"02 17, 2014",B000AXVOLQ,A21TK1P3Q4T6DM,2014-02-17,Winter,2014
3,1.0,This is a very handy size and weight for a clo...,"01 9, 2007",B000B76IJY,A2MKJZXSDSGJA0,2007-01-09,Winter,2007
4,1.0,This is an awesome sharpener! Very compact and...,"01 17, 2014",B000B8FW0E,A10UPCI7CPQXLA,2014-01-17,Winter,2014


### Creating Sentiment Column

##### Amazon Product Reviews Dataset

In [111]:
sentiment = udf(lambda x: TextBlob(x).sentiment[0])
spark.udf.register("sentiment", sentiment)
df = df.withColumn('sentiment_score',sentiment('review_body').cast('double'))

In [112]:
df.show()

+-----------+--------------+----------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+---------------+------+---------+----+--------------------+
|customer_id|     review_id|product_id|       product_title|    product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|unhelpful_votes|season|    month|year|     sentiment_score|
+-----------+--------------+----------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+---------------+------+---------+----+--------------------+
|   50756929|R10DKNVRG1HIIK|0969755147|Mind Power into t...|               Books|          5|           13|         14|   N|                N|The best book on ...|From many books t...| 2000-05-15|            1.0|Spring|      May|200

In [113]:
# Create a new column "sentiment" based on the sentiment_score
df = df.withColumn("sentiment", when(col("sentiment_score") > 0, "positive")
                                .when(col("sentiment_score") < 0, "negative")
                                .otherwise("neutral"))

In [114]:
df = df.withColumn('abs_sentiment_score', abs(df['sentiment_score']))

In [115]:
df.limit(5).toPandas()

Unnamed: 0,customer_id,review_id,product_id,product_title,product_category,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date,unhelpful_votes,season,month,year,sentiment_score,sentiment,abs_sentiment_score
0,50756929,R10DKNVRG1HIIK,0969755147,Mind Power into the 21st Century: Techniques t...,Books,5,13,14,N,N,The best book on mind power,From many books that I read on topic of power ...,2000-05-15,1.0,Spring,May,2000,0.1875,positive,0.1875
1,12195318,R10M3EY6B48DXS,B002VC6U8Q,Usc Fanfare W/ Tribute to Troy,Digital_Music_Purchase,5,0,0,N,N,"love, love, love !!!",Get it ... well worth the price -- for the tru...,2013-01-24,0.0,Winter,January,2013,0.491797,positive,0.491797
2,34413051,R10RHNK6A9NOQ,B004V2LX6I,Maxpedition Kodiak S-type Gearslinger,Sports,5,1,1,N,Y,Awesome!!!!,This bag has been my EDC since purchasing it. ...,2013-01-27,0.0,Winter,January,2013,0.093394,positive,0.093394
3,12045840,R1102VPDVD3S4X,B000V658DC,They Can't All Be Zingers,Digital_Music_Purchase,5,0,0,N,Y,Primus Sucks,Sounds Good! Most of the hits you want on one ...,2014-03-03,0.0,Spring,March,2014,0.45,positive,0.45
4,42508526,R11DIJT8PQOG1X,B000V697WK,Gaucho,Digital_Music_Purchase,4,0,1,N,Y,good music,Good music,2014-12-09,1.0,Winter,December,2014,0.7,positive,0.7


##### Amazon Spam Reviews Dataset

In [116]:
sentiment = udf(lambda x: TextBlob(x).sentiment[0])
spark.udf.register("sentiment", sentiment)
spam_df = spam_df.withColumn('sentiment_score',sentiment('reviewText').cast('double'))

In [117]:
spam_df.limit(5).toPandas()

Unnamed: 0,class,reviewText,reviewTime,asin,reviewerID,reviewDate,season,year,sentiment_score
0,1.0,It was a gift but it seemed pretty durable and...,"03 18, 2008",B000AOZ11Y,A2BSWYKV7ZV7H6,2008-03-18,Spring,2008,0.425
1,1.0,We now have a 9 hole disc golf course in our b...,"12 12, 2012",B000ASG0DI,A1DQIB6VQ7Y22Q,2012-12-12,Winter,2012,0.225
2,0.0,Do not buy this! The one I received was leakin...,"02 17, 2014",B000AXVOLQ,A21TK1P3Q4T6DM,2014-02-17,Winter,2014,-0.283333
3,1.0,This is a very handy size and weight for a clo...,"01 9, 2007",B000B76IJY,A2MKJZXSDSGJA0,2007-01-09,Winter,2007,0.432222
4,1.0,This is an awesome sharpener! Very compact and...,"01 17, 2014",B000B8FW0E,A10UPCI7CPQXLA,2014-01-17,Winter,2014,0.205833


In [118]:
# Create a new column "sentiment" based on the sentiment_score
spam_df = spam_df.withColumn("sentiment", when(col("sentiment_score") > 0, "positive")
                                .when(col("sentiment_score") < 0, "negative")
                                .otherwise("neutral"))

In [119]:
#create column with absolute value for sentiment_score
spam_df = spam_df.withColumn('abs_sentiment_score', abs(spam_df['sentiment_score']))

In [120]:
spam_df.limit(5).toPandas()

Unnamed: 0,class,reviewText,reviewTime,asin,reviewerID,reviewDate,season,year,sentiment_score,sentiment,abs_sentiment_score
0,1.0,It was a gift but it seemed pretty durable and...,"03 18, 2008",B000AOZ11Y,A2BSWYKV7ZV7H6,2008-03-18,Spring,2008,0.425,positive,0.425
1,1.0,We now have a 9 hole disc golf course in our b...,"12 12, 2012",B000ASG0DI,A1DQIB6VQ7Y22Q,2012-12-12,Winter,2012,0.225,positive,0.225
2,0.0,Do not buy this! The one I received was leakin...,"02 17, 2014",B000AXVOLQ,A21TK1P3Q4T6DM,2014-02-17,Winter,2014,-0.283333,negative,0.283333
3,1.0,This is a very handy size and weight for a clo...,"01 9, 2007",B000B76IJY,A2MKJZXSDSGJA0,2007-01-09,Winter,2007,0.432222,positive,0.432222
4,1.0,This is an awesome sharpener! Very compact and...,"01 17, 2014",B000B8FW0E,A10UPCI7CPQXLA,2014-01-17,Winter,2014,0.205833,positive,0.205833


### Creating Review Length Column

##### Amazon Spam Reviews Dataset

In [121]:
spam_df = spam_df.withColumn("review_text_length", length("reviewText"))

In [122]:
spam_df.limit(5).toPandas()

Unnamed: 0,class,reviewText,reviewTime,asin,reviewerID,reviewDate,season,year,sentiment_score,sentiment,abs_sentiment_score,review_text_length
0,1.0,It was a gift but it seemed pretty durable and...,"03 18, 2008",B000AOZ11Y,A2BSWYKV7ZV7H6,2008-03-18,Spring,2008,0.425,positive,0.425,73
1,1.0,We now have a 9 hole disc golf course in our b...,"12 12, 2012",B000ASG0DI,A1DQIB6VQ7Y22Q,2012-12-12,Winter,2012,0.225,positive,0.225,185
2,0.0,Do not buy this! The one I received was leakin...,"02 17, 2014",B000AXVOLQ,A21TK1P3Q4T6DM,2014-02-17,Winter,2014,-0.283333,negative,0.283333,142
3,1.0,This is a very handy size and weight for a clo...,"01 9, 2007",B000B76IJY,A2MKJZXSDSGJA0,2007-01-09,Winter,2007,0.432222,positive,0.432222,175
4,1.0,This is an awesome sharpener! Very compact and...,"01 17, 2014",B000B8FW0E,A10UPCI7CPQXLA,2014-01-17,Winter,2014,0.205833,positive,0.205833,157


##### Amazon Product Reviews Dataset

In [123]:
df = df.withColumn("review_text_length", length("review_body"))

### Add labels column

##### Amazon Spam Reviews Dataset

In [124]:
# Map binary labels to "spam" and "ham"
spam_df = spam_df.withColumn("labels", when(col("class") == 1, "spam").otherwise("ham"))

### Cleaning Review Text

In [125]:
#function that gets samples of column
def getSamplesOfData(df,colName,size,isTail):
    samples=[]
    if isTail:        
        samples = df.orderBy(df[colName].desc()).limit(size).select(colName).rdd.flatMap(lambda x: x).collect()
        
    else:
        samples = df.orderBy(df[colName]).limit(size).select(colName).rdd.flatMap(lambda x: x).collect()

    # Print the samples
    print("Samples:")
    for sample in samples:
        print(sample)
        print("\n")
    if len(samples)<=0:
        print("List is Empty")

In [126]:
getSamplesOfData(df,"review_body",5,False)

Samples:
!


! Good !


! Unbelievable true story.


!!! AS A REGULAR AMAZON CUSTOMER, THIS IS THE FIRST TIME I ORDERED SOMETHING LEGITIMATELY FAULTY AND FAKE.  DO NOT BUY FROM THIS SELLER.  SET OF TWO CHARGERS, NEITHER PROVIDE A CONSISTENT CURRENT TO CHARGE THE DEVICE.  AFTER SEEING MANY OTHER REVIEWS, I AM NOT ALONE.  IF YOU ARE READING THIS, IMPORTER520, STOP RIPPING PEOPLE OFF OR YOU WILL NO LONGER BE ABLE TO SELL THROUGH AMAZON.  THAT IS ALL. !!!


!!! GREAT SELLER GREAT PRODUCT !!! Thanks




In [127]:
getSamplesOfData(df,"review_body",5,True)

Samples:
🙌🙌🙌🙌🙌


🙌


🙌


🙇🙇im unsure if this thing is just highly sensitive or if there's an off switch I'm missing but it NEVER shuts up. I tried hiding it deep inside my dresser and that didn't work. Now I'm giving it away and the whole time I'm driving its going off. I'm about to throw it out the window. All it did was upset my dog. On the plus side, the battery sure lasts a long time. My dog also doesn't understand the concept of balls so that may have something to do with him not liking it. I can see how it would be entertaining for some dogs, but I recommend putting it outside or somewhere you can't hear it.


😺




In [128]:
# Define a function for data cleaning
def clean_text(text):
    soup = BeautifulSoup(text, 'html.parser')
    text = soup.get_text()
    # Remove Unicode characters
    text = re.sub(r'[^\x00-\x7F]+', '', text)
    
    # Normalize text
    text = text.lower()
    text = re.sub(r"[^a-zA-Z0-9\s]", "", text)
    text = re.sub(r"\s+", " ", text).strip()
    
    return text

##### Amazon Product Reviews Dataset

In [129]:
df2=df

In [130]:
# Register the clean_text function as a UDF (User-Defined Function)
clean_text_udf = udf(clean_text, StringType())

# Create a new column "review_body_clean" by applying the clean_text UDF to "review_body"
df2 = df2.withColumn("review_body_clean", clean_text_udf("review_body"))


In [131]:
#getSamplesOfData(df2,"review_body_clean",5,False)

In [132]:
#getSamplesOfData(df2,"review_body_clean",5,True)

In [133]:
# Tokenize the text column
tokenizer = Tokenizer(inputCol="review_body_clean", outputCol="token")
#df2 = tokenizer.transform(df2)

In [134]:
# Create the StopWordsRemover instance
remover = StopWordsRemover(inputCol="token",outputCol="stop_removed")

# Apply the StopWordsRemover transformation
#df2 = remover.transform(df2)


In [135]:
#df2.limit(5).toPandas()

In [136]:
#shape(df2.filter(col("review_body_clean")==""))

In [137]:
# Define your custom lemmatizer class
class Lemmatizer(Transformer, HasInputCol, HasOutputCol, DefaultParamsWritable, DefaultParamsReadable):
    input_col = Param(Params._dummy(), "input_col", "input column name.", typeConverter=TypeConverters.toString)
    output_col = Param(Params._dummy(), "output_col", "output column name.", typeConverter=TypeConverters.toString)
    @keyword_only
    def __init__(self, input_col: str = "input", output_col: str = "output"):
        super(Lemmatizer, self).__init__()
        self._setDefault(input_col=None, output_col=None)
        kwargs = self._input_kwargs
        self.set_params(**kwargs)
    
    @keyword_only
    def set_params(self, input_col: str = "input", output_col: str = "output"):
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def get_input_col(self):
        return self.getOrDefault(self.input_col)

    def get_output_col(self):
        return self.getOrDefault(self.output_col)

    # Implement the transformation logic
    def _transform(self, dataset):
       
        input_column = self.get_input_col()
        output_column =  self.get_output_col()
        
        # Initialize the WordNetLemmatizer
        
        lemmatizer = WordNetLemmatizer()
        # Define the lemmatization function
        def lemmatize_tokens(tokens):
            pos_tags = nltk.pos_tag(tokens)
            lemmas = []
            for token in tokens:
                lemma = lemmatizer.lemmatize(token)
                lemmas.append(lemma)
            return lemmas

        # Register the UDF
        lemmatize_udf = udf(lemmatize_tokens, ArrayType(StringType()))

        # Apply transformation
        return dataset.withColumn(output_column, lemmatize_udf(input_column))

In [138]:
#save cleaned data
#df2.write.mode("overwrite").option("header", "true")\
#.json("gs://productpal_bucket/cleaned_data.json")

In [139]:
lemmatizer= Lemmatizer(input_col="stop_removed", output_col="lemmas")

In [140]:
#create pipeline to preprocess text (tokenize text, remove stop words and lemmatize)
clean_reviews_pipeline = Pipeline(stages=[tokenizer, remover, lemmatizer])

In [141]:
#Fit the pipeline on the DataFrame
t_model=clean_reviews_pipeline.fit(df2)

In [142]:
# Transform the DataFrame using the fitted pipeline
df2=t_model.transform(df2)

In [143]:
df2.limit(5).toPandas()

Unnamed: 0,customer_id,review_id,product_id,product_title,product_category,star_rating,helpful_votes,total_votes,vine,verified_purchase,...,month,year,sentiment_score,sentiment,abs_sentiment_score,review_text_length,review_body_clean,token,stop_removed,lemmas
0,50756929,R10DKNVRG1HIIK,0969755147,Mind Power into the 21st Century: Techniques t...,Books,5,13,14,N,N,...,May,2000,0.1875,positive,0.1875,745,from many books that i read on topic of power ...,"[from, many, books, that, i, read, on, topic, ...","[many, books, read, topic, power, thoughts, jo...","[many, book, read, topic, power, thought, john..."
1,12195318,R10M3EY6B48DXS,B002VC6U8Q,Usc Fanfare W/ Tribute to Troy,Digital_Music_Purchase,5,0,0,N,N,...,January,2013,0.491797,positive,0.491797,88,get it well worth the price for the true troja...,"[get, it, well, worth, the, price, for, the, t...","[get, well, worth, price, true, trojan, fan, g...","[get, well, worth, price, true, trojan, fan, g..."
2,34413051,R10RHNK6A9NOQ,B004V2LX6I,Maxpedition Kodiak S-type Gearslinger,Sports,5,1,1,N,Y,...,January,2013,0.093394,positive,0.093394,203,this bag has been my edc since purchasing it i...,"[this, bag, has, been, my, edc, since, purchas...","[bag, edc, since, purchasing, many, maxpeditio...","[bag, edc, since, purchasing, many, maxpeditio..."
3,12045840,R1102VPDVD3S4X,B000V658DC,They Can't All Be Zingers,Digital_Music_Purchase,5,0,0,N,Y,...,March,2014,0.45,positive,0.45,95,sounds good most of the hits you want on one d...,"[sounds, good, most, of, the, hits, you, want,...","[sounds, good, hits, want, one, disc, great, p...","[sound, good, hit, want, one, disc, great, pri..."
4,42508526,R11DIJT8PQOG1X,B000V697WK,Gaucho,Digital_Music_Purchase,4,0,1,N,Y,...,December,2014,0.7,positive,0.7,10,good music,"[good, music]","[good, music]","[good, music]"


In [144]:
#save pipeline
t_model.write().overwrite().save(r"C:\Users\dn10657\source\repos\Capstone\pipelines\reviews_preproc_pipeline")

##### Amazon Spam Reviews Dataset

In [145]:
# Register the clean_text function as a UDF (User-Defined Function)
clean_text_udf = udf(clean_text, StringType())

# Create a new column "review_body_clean" by applying the clean_text UDF to "review_body"
spam_df = spam_df.withColumn("reviewText_clean", clean_text_udf("reviewText"))

In [146]:
spam_df.limit(5).toPandas()

Unnamed: 0,class,reviewText,reviewTime,asin,reviewerID,reviewDate,season,year,sentiment_score,sentiment,abs_sentiment_score,review_text_length,labels,reviewText_clean
0,1.0,It was a gift but it seemed pretty durable and...,"03 18, 2008",B000AOZ11Y,A2BSWYKV7ZV7H6,2008-03-18,Spring,2008,0.425,positive,0.425,73,spam,it was a gift but it seemed pretty durable and...
1,1.0,We now have a 9 hole disc golf course in our b...,"12 12, 2012",B000ASG0DI,A1DQIB6VQ7Y22Q,2012-12-12,Winter,2012,0.225,positive,0.225,185,spam,we now have a 9 hole disc golf course in our b...
2,0.0,Do not buy this! The one I received was leakin...,"02 17, 2014",B000AXVOLQ,A21TK1P3Q4T6DM,2014-02-17,Winter,2014,-0.283333,negative,0.283333,142,ham,do not buy this the one i received was leaking...
3,1.0,This is a very handy size and weight for a clo...,"01 9, 2007",B000B76IJY,A2MKJZXSDSGJA0,2007-01-09,Winter,2007,0.432222,positive,0.432222,175,spam,this is a very handy size and weight for a clo...
4,1.0,This is an awesome sharpener! Very compact and...,"01 17, 2014",B000B8FW0E,A10UPCI7CPQXLA,2014-01-17,Winter,2014,0.205833,positive,0.205833,157,spam,this is an awesome sharpener very compact and ...


In [147]:
# Tokenize the text column
tokenizer = Tokenizer(inputCol="reviewText_clean", outputCol="token")
#spam_df = tokenizer.transform(spam_df)

In [148]:
spam_df.limit(5).toPandas()

Unnamed: 0,class,reviewText,reviewTime,asin,reviewerID,reviewDate,season,year,sentiment_score,sentiment,abs_sentiment_score,review_text_length,labels,reviewText_clean
0,1.0,It was a gift but it seemed pretty durable and...,"03 18, 2008",B000AOZ11Y,A2BSWYKV7ZV7H6,2008-03-18,Spring,2008,0.425,positive,0.425,73,spam,it was a gift but it seemed pretty durable and...
1,1.0,We now have a 9 hole disc golf course in our b...,"12 12, 2012",B000ASG0DI,A1DQIB6VQ7Y22Q,2012-12-12,Winter,2012,0.225,positive,0.225,185,spam,we now have a 9 hole disc golf course in our b...
2,0.0,Do not buy this! The one I received was leakin...,"02 17, 2014",B000AXVOLQ,A21TK1P3Q4T6DM,2014-02-17,Winter,2014,-0.283333,negative,0.283333,142,ham,do not buy this the one i received was leaking...
3,1.0,This is a very handy size and weight for a clo...,"01 9, 2007",B000B76IJY,A2MKJZXSDSGJA0,2007-01-09,Winter,2007,0.432222,positive,0.432222,175,spam,this is a very handy size and weight for a clo...
4,1.0,This is an awesome sharpener! Very compact and...,"01 17, 2014",B000B8FW0E,A10UPCI7CPQXLA,2014-01-17,Winter,2014,0.205833,positive,0.205833,157,spam,this is an awesome sharpener very compact and ...


In [149]:
#create pipeline to preprocess text (tokenize text, remove stop words and lemmatize)
clean_spam_pipeline = Pipeline(stages=[tokenizer, remover, lemmatizer])

In [150]:
#Fit the pipeline on the DataFrame
s_model=clean_spam_pipeline.fit(spam_df)

In [151]:
# Transform the DataFrame using the fitted pipeline
spam_df=s_model.transform(spam_df)

In [152]:
spam_df.limit(5).toPandas()

Unnamed: 0,class,reviewText,reviewTime,asin,reviewerID,reviewDate,season,year,sentiment_score,sentiment,abs_sentiment_score,review_text_length,labels,reviewText_clean,token,stop_removed,lemmas
0,1.0,It was a gift but it seemed pretty durable and...,"03 18, 2008",B000AOZ11Y,A2BSWYKV7ZV7H6,2008-03-18,Spring,2008,0.425,positive,0.425,73,spam,it was a gift but it seemed pretty durable and...,"[it, was, a, gift, but, it, seemed, pretty, du...","[gift, seemed, pretty, durable, nice, chime]","[gift, seemed, pretty, durable, nice, chime]"
1,1.0,We now have a 9 hole disc golf course in our b...,"12 12, 2012",B000ASG0DI,A1DQIB6VQ7Y22Q,2012-12-12,Winter,2012,0.225,positive,0.225,185,spam,we now have a 9 hole disc golf course in our b...,"[we, now, have, a, 9, hole, disc, golf, course...","[9, hole, disc, golf, course, back, yard, husb...","[9, hole, disc, golf, course, back, yard, husb..."
2,0.0,Do not buy this! The one I received was leakin...,"02 17, 2014",B000AXVOLQ,A21TK1P3Q4T6DM,2014-02-17,Winter,2014,-0.283333,negative,0.283333,142,ham,do not buy this the one i received was leaking...,"[do, not, buy, this, the, one, i, received, wa...","[buy, one, received, leaking, bad, weld, body,...","[buy, one, received, leaking, bad, weld, body,..."
3,1.0,This is a very handy size and weight for a clo...,"01 9, 2007",B000B76IJY,A2MKJZXSDSGJA0,2007-01-09,Winter,2007,0.432222,positive,0.432222,175,spam,this is a very handy size and weight for a clo...,"[this, is, a, very, handy, size, and, weight, ...","[handy, size, weight, clothcovered, pad, looki...","[handy, size, weight, clothcovered, pad, looki..."
4,1.0,This is an awesome sharpener! Very compact and...,"01 17, 2014",B000B8FW0E,A10UPCI7CPQXLA,2014-01-17,Winter,2014,0.205833,positive,0.205833,157,spam,this is an awesome sharpener very compact and ...,"[this, is, an, awesome, sharpener, very, compa...","[awesome, sharpener, compact, lightweight, sim...","[awesome, sharpener, compact, lightweight, sim..."


In [153]:
#save pipeline
s_model.write().overwrite().save(r"C:\Users\dn10657\source\repos\Capstone\pipelines\spam_preproc_pipeline")

**Exploratory Data Analysis**

##### Business Objectives

- Personalize recommendations and product offerings to increase customer engagement and sales.
- Optimize Customer Purchasing Experience by filtering out fake reviews
- Enhance the quality of existing products to improve customers perception of the company thereby fostering customer loyalty and repeat purchases

##### Research Objectives

- To conduct a comprehensive analysis of the dataset to identify and categorize product quality issues.

- To develop a model to classify fake or manipulated reviews within the dataset 

- To investigate the impact of incorporating review text, on the performance of the product recommendation system.


##### Number of Unique Customers 

In [None]:
#get the count of unique customers
unique_count = df2.agg(countDistinct("customer_id")).collect()[0][0]

# Print the result
print("Number of unique customer:", unique_count)

##### Number of Unique Products 

In [None]:
#get the count of unique products
unique_count = df2.agg(countDistinct("product_id")).collect()[0][0]

# Print the result
print("Number of unique products:", unique_count)

##### Number of Unique Product Categories 

In [None]:
#get the count of unique product category
unique_count = df2.agg(countDistinct("product_category")).collect()[0][0]

# Print the result
print("Number of product category:", unique_count)

##### Descriptive Statistics 

In [None]:
# Select the desired columns and compute the basic statistics
description = df2.select("star_rating", "helpful_votes", "total_votes","season","year","sentiment_score",\
                         "review_text_length").describe()

# Show the resulting DataFrame
description.toPandas()

In [None]:
df2.columns

In [None]:
# Group by "product_category" and compute the count of unique "product_id" values for each category
grouped_df = df2.groupBy("product_category").agg(count("review_id").alias("number_of_Reviews"))\
.orderBy("number_of_Reviews", ascending=False)

grouped_df.limit(20).toPandas()

In [None]:
# Group by "product_id" and "product_title" and compute the count of unique "customer_id" values for each product
grouped_df = df2.groupBy("product_id","product_category","product_title").agg(count("review_id").alias("number_of_Reviews"))

grouped_df=grouped_df.orderBy("number_of_Reviews", ascending=False)

# Show the resulting DataFrame
grouped_df.limit(20).toPandas()


In [None]:
# Group by "star_rating" and count the occurrences of each rating
ratingCounts = df2.groupBy("star_rating").count()

# Rename the count column to 'count'
ratingCounts = ratingCounts.withColumnRenamed("count", "count")

# Sort the DataFrame by "star_rating" in descending order
ratingCounts = ratingCounts.orderBy(desc("star_rating")).toPandas()

##### Star Rating Analysis

In [None]:
# who v/s fare barplot
sns.barplot(x = 'star_rating',
            y = 'count',
            data = ratingCounts)
plt.xticks(rotation=45) 
plt.xlabel("Star Rating")
plt.ylabel("Count of Ratings")
# Add a title to the plot
plt.title("Distribution of Customer Ratings")

# Show the plot
plt.show()

In [None]:
# Cast the "Year" column to integer to remove any decimal places
df2 = df2.withColumn("year", df2["year"].cast("int"))
# Group ratings by year and calculate the count of ratings
ratings_pd = df2.groupBy('year').agg(count('star_rating').alias('count')).orderBy("Year").toPandas()
ratings_pd

In [None]:
# Plot the line graph using Matplotlib
plt.figure(figsize=(10, 6))
plt.plot(ratings_pd["year"], ratings_pd["count"], marker='o')
plt.xlabel("Year")
plt.ylabel("Number of Ratings")
plt.title("Number of Ratings Over the Years")
plt.grid(True)
plt.show()

In [None]:
# Step 3: Group data and calculate distribution
grouped_df = df.groupBy('star_rating', 'sentiment').agg(count('*').alias('count'))

# Step 4: Pivot the data for visualization
pivoted_df = grouped_df.groupBy('star_rating').pivot('sentiment').sum('count').fillna(0)

# Step 5: Convert to Pandas DataFrame for visualization
pandas_df = pivoted_df.toPandas()


In [None]:
pandas_df

In [None]:
# Step 6: Plot the bar graph
ax = pandas_df.plot(x='star_rating', kind='bar', stacked=True)
ax.set_xlabel("Star Rating")
ax.set_ylabel("Count")
ax.set_title("Distribution of Sentiment Categories within Star Ratings")
plt.legend(title='Sentiment', title_fontsize='large')
plt.show()






In [None]:
# Separate the star ratings for verified and non-verified purchases
verified_ratings = df2.filter(df2['verified_purchase'] == 'Y').select('star_rating').toPandas()
non_verified_ratings = df2.filter(df2['verified_purchase'] == 'N').select('star_rating').toPandas()

# Plot the distribution of star ratings for verified and non-verified purchases
plt.hist(verified_ratings, bins=5, alpha=0.5, label='Verified Purchase', edgecolor='black')
plt.hist(non_verified_ratings, bins=5, alpha=0.5, label='Non-Verified Purchase', edgecolor='black')

# Set plot labels and title
plt.xlabel('Star Rating')
plt.ylabel('Frequency')
plt.title('Distribution of Star Ratings by Verified Purchase Status')

# Add a legend
plt.legend()

# Display the histogram
plt.show()



##### Review Analysis

In [None]:
# Step 3: Group data and calculate the count for each purchase verification status
grouped_df = df2.groupBy('verified_purchase').agg(count('*').alias('count'))

# Step 4: Convert to Pandas DataFrame for visualization
pandas_df = grouped_df.toPandas()


In [None]:
pandas_df.head()

In [None]:
# Create the bar plot using Seaborn
plt.figure(figsize=(10, 6))
sns.barplot(x='verified_purchase', y='count', data=pandas_df, palette='Set2')

# Set the x-axis and y-axis labels
plt.xlabel('Purchase Verification')
plt.ylabel('Number of Reviews')

# Set the title of the plot
plt.title('Number of Reviews by Purchase Verification')

# Show the plot
plt.show()

In [None]:
customer_reviews_count = df2.groupBy("customer_id").count()
sorted_customer_reviews = customer_reviews_count.orderBy(col("count").desc())
top_10_customers = sorted_customer_reviews.limit(10).toPandas()

# Plot the bar chart
plt.figure(figsize=(10, 6))
plt.bar(top_10_customers["customer_id"], top_10_customers["count"])
plt.xlabel("Customer ID")
plt.ylabel("Number of Reviews")
plt.title("Top 10 Customers with the Most Reviews")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

In [None]:
category_reviews_count = df2.groupBy("product_category").count()
sorted_category_reviews = category_reviews_count.orderBy(col("count").desc())
top_10_categories = sorted_category_reviews.limit(10).toPandas()

In [None]:
# Plot the bar chart
plt.figure(figsize=(10, 6))
plt.bar(top_10_categories["product_category"], top_10_categories["count"], color='lightcoral')
plt.xlabel("Product Category")
plt.ylabel("Number of Reviews")
plt.title("Top 10 Product Categories with the Most Reviews")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

In [None]:

helpful_unhelpful_counts = df2.agg({"helpful_votes": "sum", "unhelpful_votes": "sum"}).collect()[0]


In [None]:

plt.figure(figsize=(8, 6))
sns.barplot(x=['Helpful', 'Unhelpful'], y=[helpful_unhelpful_counts['sum(helpful_votes)'], helpful_unhelpful_counts['sum(unhelpful_votes)']],
            palette=['lightblue', 'lightcoral'])


plt.xlabel('Review Type')
plt.ylabel('Number of Reviews')


plt.title('Number of Helpful Reviews vs Unhelpful Reviews')


plt.show()

In [None]:
def get_month_int(month_name):
    return month_order[month_name]

In [None]:
# Register the UDF
month_to_int_udf = udf(get_month_int, IntegerType())

In [None]:
# Create a dictionary to map month names to their corresponding numerical values
month_order = {
    'January': 1,
    'February': 2,
    'March': 3,
    'April': 4,
    'May': 5,
    'June': 6,
    'July': 7,
    'August': 8,
    'September': 9,
    'October': 10,
    'November': 11,
    'December': 12
}

# Apply the month order mapping to the "month" column
df2 = df2.withColumn('month_int', month_to_int_udf(df2['month']))

# Group by "year" and "month", count the occurrences, and order the results
trends_df = df2.groupBy("year", "month","month_int").agg(f.count("*").alias("count")) \
               .orderBy("year", "month_int").toPandas()

In [None]:
trends_df.head()

In [None]:
def improve_legend(ax=None):
    if ax is None:
        ax = plt.gca()

    lines = ax.lines  # Get all lines in the plot
    legend_handles, legend_labels = ax.get_legend_handles_labels()  # Get legend handles and labels

    for i, line in enumerate(lines):
        data_x, data_y = line.get_data()
        if len(data_x)>0 and len(data_y)>0:
            first_x = data_x[0]
            first_y = data_y[0]
            year = legend_labels[i]  # Get the year from the legend labels
            color = legend_handles[i].get_color()  # Get the color from the legend handles
            ax.annotate(
                year,
                xy=(first_x, first_y),
                xytext=(5, 0),
                textcoords="offset points",
                ha="left",
                va="center",
                color=color,
            )

In [None]:
# Define a custom color palette with brighter colors
custom_palette = sns.color_palette("bright")

# Plot the time trends
plt.figure(figsize=(12, 6))
ax=sns.lineplot(data=trends_df, x="month", y="count", hue="year", palette=custom_palette)
plt.xlabel("Month")
plt.ylabel("Number of Reviews")
plt.title("Time Trends: Number of Reviews")

improve_legend(ax)
plt.legend(title="Year")
plt.show()

In [None]:
# Join all reviews into a single string
all_reviews = df2.select(concat_ws('', df2['review_body_clean'])).collect()[0][0]

# Create a word cloud object
wordcloud = WordCloud(width=800, height=400, random_state=42, max_font_size=100).generate(all_reviews)

# Display the word cloud using matplotlib
plt.figure(figsize=(10, 6))
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis('off')
plt.show()

In [None]:
def get_text_length(text):
    return len(text)

In [None]:
# Register the UDF
text_length_udf = udf(get_text_length, IntegerType())

In [None]:
# Create the new column "text_length" based on the length of the "review_body" column
#df2 = df2.withColumn('text_length', text_length_udf(df2['review_body_clean']))

In [None]:
df2.limit(1).toPandas()

In [None]:
newdf=df2.select("review_text_length").toPandas()


# Distribution plot of text length
plt.figure(figsize=(10, 6))
plt.hist(newdf['review_text_length'], bins=50)
plt.xlabel('Review Length')
plt.ylabel('Frequency')
plt.title('Distribution of Review Length ')
plt.show()

In [None]:
# Group the data by the 'vine' column and count the number of reviews for each category
review_counts = df2.groupBy('vine').count().orderBy('vine')

In [None]:
review_counts.show()

In [None]:
newdf=review_counts.toPandas()

In [None]:
# Create the bar chart using matplotlib
plt.bar(newdf['vine'], newdf['count'])
plt.xlabel('Vine Reviews')
plt.ylabel('Number of Reviews')
plt.title('Number of Reviews for Vine and Non-Vine Reviews')
plt.xticks([0, 1], ['Non-Vine', 'Vine'])
plt.show()

In [None]:
# Group the data by "customer_id" and count the number of reviews for each customer
customer_review_counts = df2.groupBy('customer_id').agg(count('review_id').alias('review_count'))

# Calculate the average number of reviews per customer
average_reviews = customer_review_counts.select(avg('review_count')).collect()[0][0]

# Print the average number of reviews per customer
print(f"On average, each customer gives {average_reviews:.2f} reviews.")

#### Explore Product Issues

In [None]:
df2.limit(5).toPandas()

In [None]:
df2.columns

In [None]:
negativeReviews=df2.filter(col("sentiment")=="negative")

In [None]:
negativeReviews.limit(5).toPandas()

In [None]:
negativeReviewsDF = negativeReviews.select("lemmas").toPandas()

In [None]:
neg_tokens = [token for sublist in negativeReviewsDF["lemmas"] for token in sublist]

In [None]:
len(neg_tokens)

In [None]:
n = 2  
bigrams =list(nltk.ngrams(neg_tokens, n))

In [None]:
freq_dist = nltk.FreqDist(bigrams)

In [None]:
# Get the first 10 N-grams for spam sorted by frequency in descending order
topbigrams = dict(sorted(freq_dist.items(), key=lambda x: x[1], reverse=True)[:20])

In [None]:
# Plot the first 10 N-grams for spam using horizontal bar chart (barh) with reversed order
plt.figure(figsize=(12, 6))
plt.barh(range(len(topbigrams)), list(topbigrams.values())[::-1], align='center',color="red")
plt.yticks(range(len(topbigrams)), list(topbigrams.keys())[::-1])
plt.xlabel("Frequency")
plt.ylabel("Bi-gram")
plt.title("Top 20 bi-grams for Negative Reviews")


plt.show()

In [None]:
n = 3  
trigrams =list(nltk.ngrams(neg_tokens, n))

In [None]:
freq_dist2 = nltk.FreqDist(trigrams)

In [None]:
# Get the first 10 N-grams for spam sorted by frequency in descending order
toptrigrams = dict(sorted(freq_dist2.items(), key=lambda x: x[1], reverse=True)[:20])

In [None]:
# Plot the first 10 N-grams for spam using horizontal bar chart (barh) with reversed order
plt.figure(figsize=(12, 6))
plt.barh(range(len(toptrigrams)), list(toptrigrams.values())[::-1], align='center',color="red")
plt.yticks(range(len(toptrigrams)), list(toptrigrams.keys())[::-1])
plt.xlabel("Frequency")
plt.ylabel("Tri-gram")
plt.title("Top 20 tri-grams for Negative Reviews")


plt.show()

#### Negative Reviews for ratings less than 3 stars

In [None]:
negativeReviewsDFLowRating = negativeReviews.filter(col("star_rating")<3).select("lemmas").toPandas()

In [None]:
negativeReviewsDFLowRating.head()

In [None]:
neg_tokens_lowRating = [token for sublist in negativeReviewsDFLowRating["lemmas"] for token in sublist]

In [None]:
len(neg_tokens_lowRating)

In [None]:
n = 2 
bigrams =list(nltk.ngrams(neg_tokens_lowRating, n))

In [None]:
freq_dist = nltk.FreqDist(bigrams)

In [None]:
# Get the first 10 N-grams for spam sorted by frequency in descending order
topbigrams = dict(sorted(freq_dist.items(), key=lambda x: x[1], reverse=True)[:20])

In [None]:
# Plot the first 10 N-grams for spam using horizontal bar chart (barh) with reversed order
plt.figure(figsize=(12, 24))
plt.barh(range(len(topbigrams)), list(topbigrams.values())[::-1], align='center',color="red")
plt.yticks(range(len(topbigrams)), list(topbigrams.keys())[::-1])
plt.xlabel("Frequency")
plt.ylabel("Tri-gram")
plt.title("Top 20 bi-grams for Negative Reviews with Low Ratings")


plt.show()

In [None]:
n = 3
trigrams =list(nltk.ngrams(neg_tokens_lowRating, n))

In [None]:
freq_dist = nltk.FreqDist(trigrams)

In [None]:
# Get the first 10 N-grams for spam sorted by frequency in descending order
toptrigrams = dict(sorted(freq_dist.items(), key=lambda x: x[1], reverse=True)[:20])

In [None]:
# Plot the first 10 N-grams for spam using horizontal bar chart (barh) with reversed order
plt.figure(figsize=(12, 24))
plt.barh(range(len(toptrigrams)), list(toptrigrams.values())[::-1], align='center',color="red")
plt.yticks(range(len(toptrigrams)), list(toptrigrams.keys())[::-1])
plt.xlabel("Frequency")
plt.ylabel("Tri-gram")
plt.title("Top 20 tri-grams for Negative Reviews with Low Ratings")


plt.show()

#### identify fake or manipulated reviews

In [None]:
reviewsDF=spam_df.select("review_text_length","labels","sentiment_score","sentiment","class").toPandas()

In [None]:
reviewsDF["review_text_length"].plot(bins=50,kind='hist',color='orange')

# Set axis labels and title
plt.xlabel('Review Text Length')
plt.ylabel('Frequency')
plt.title('Distribution of Review Text Length')

In [None]:
# Filter the DataFrame for 'ham' and 'spam' labels

ham = reviewsDF[reviewsDF["labels"]=="ham"]
spam = reviewsDF[reviewsDF["labels"]=="spam"]

In [None]:
ham["labels"].value_counts()

In [None]:
# Generate separate histograms for 'ham' and 'spam' subsets
fig, axes = plt.subplots(2, sharex=True, sharey=True)
ham["review_text_length"].plot(kind="hist", bins=50, ax=axes[0], color="blue", alpha=0.7)
spam["review_text_length"].plot(kind="hist", bins=50, ax=axes[1], color="red", alpha=0.7)

# Set axis labels and title
plt.xlabel('Review Length')
plt.ylabel('Frequency')
plt.suptitle('Distribution of Review Length for Ham vs Spam')

# Add legend
axes[0].legend(["Ham"])
axes[1].legend(["Spam"])

# Adjust spacing between subplots
plt.tight_layout()

# Display the plot
plt.show()

In [None]:
# Extract the correlations

correlation_matrix = reviewsDF[["class", "review_text_length", "sentiment_score"]].corr()
correlation_length = correlation_matrix.loc["class", "review_text_length"]
correlation_sentiment = correlation_matrix.loc["class", "sentiment_score"]

print("Correlation between review length and labels:", correlation_length)
print("Correlation between sentiment and labels:", correlation_sentiment)

In [None]:
reviewDF=spam_df[["labels","reviewDate","review_text_length","sentiment"]].toPandas()

In [None]:
reviewDF.head()

In [None]:
reviewDF.dtypes

In [None]:
reviewDF["reviewDate"]=pd.to_datetime(reviewDF['reviewDate'])

In [None]:
reviewDF.head()

In [None]:
reviewDF.dtypes

In [None]:
grouped_df = reviewDF.groupby([reviewDF['reviewDate'].dt.year, 'labels']).size().unstack().fillna(0)
# Plot the number of spam vs ham over the years
ax = grouped_df.plot(kind='line', marker='o', linestyle='-', figsize=(10, 6))

# Set labels and title
plt.xlabel('Year')
plt.ylabel('Count')
plt.title('Number of Spam vs Ham over the Years')

# Set legend
ax.legend(['Ham', 'Spam'])

# Show the plot
plt.show()

In [None]:
# Group the DataFrame by 'labels' and calculate the mean review length for each category
grouped_df = reviewDF.groupby('labels')['review_text_length'].mean()

# Plot the spam vs ham review length
ax = grouped_df.plot(kind='bar', figsize=(10, 6))

# Set labels and title
plt.xlabel('Labels')
plt.ylabel('Average Review Length')
plt.title('Average Review Length for Spam vs Ham')

# Show the plot
plt.show()

In [None]:
# Plot the review length by class
plt.figure(figsize=(10, 6))
sns.boxplot(x='labels', y='review_text_length', data=reviewDF)

# Set labels and title
plt.xlabel('Review Class')
plt.ylabel('Review Length')
plt.title('Review Length by Class (Spam vs Ham)')

# Show the plot
plt.show()

In [None]:
reviewDF.head()

In [None]:
# Plot the review length by sentiment
plt.figure(figsize=(10, 6))
colors = {'positive': 'green', 'negative': 'red', 'neutral': 'blue'}
for sentiment, data in reviewDF.groupby('sentiment'):
    plt.scatter(data['review_text_length'], [sentiment] * len(data), color=colors[sentiment], label=sentiment, alpha=0.7)

# Set labels and title
plt.xlabel('Review Length')
plt.ylabel('Sentiment')
plt.title('Review Length by Sentiment')

# Add legend
plt.legend()

# Show the plot
plt.show()

#### Word Cloud

In [None]:
spam_df=spam_df.withColumn("concatenated_lemmas", concat_ws(" ", col("lemmas")))

In [None]:
spam_df.limit(1).toPandas()

In [None]:
spam=spam_df.filter(spam_df.labels == "spam")

In [None]:
ham=spam_df.filter(spam_df.labels == "ham")

In [None]:
spam_reviews=spam.select(spam.concatenated_lemmas).rdd.flatMap(lambda x: x).collect()

In [None]:
len(spam_reviews)

In [None]:
spam_reviews[:2]

In [None]:
# Convert sentences to a list of words
words_list = [sentence.split() for sentence in spam_reviews]

In [None]:
spam_reviews=[word for sentence in words_list for word in sentence]

In [None]:
spam_reviews[:5]

In [None]:
len(spam_reviews)

In [None]:
ham_reviews=ham.select(ham.concatenated_lemmas).rdd.flatMap(lambda x: x).collect()

In [None]:
# Convert sentences to a list of words
words_list = [sentence.split() for sentence in ham_reviews]

In [None]:
ham_reviews=[word for sentence in words_list for word in sentence]

In [None]:
# Count the occurrences of each word
spam_word_counts = Counter(spam_reviews)

In [None]:
# Count the occurrences of each word
ham_word_counts = Counter(ham_reviews)

In [None]:
# Select the top 20 words with highest frequencies
spam_top_words = dict(spam_word_counts.most_common(20))

In [None]:
# Select the top 20 words with highest frequencies
ham_top_words = dict(ham_word_counts.most_common(20))

In [None]:
spam_wordcloud = WordCloud(width=800, height=400).generate_from_frequencies(spam_top_words)
ham_wordcloud = WordCloud(width=800, height=400).generate_from_frequencies(ham_top_words)

In [None]:
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plt.imshow(spam_wordcloud, interpolation="bilinear")
plt.title("Spam Word Cloud")
plt.axis("off")

plt.subplot(1, 2, 2)
plt.imshow(ham_wordcloud, interpolation="bilinear")
plt.title("Ham Word Cloud")
plt.axis("off")

plt.tight_layout()
plt.show()

#### Bi-grams

In [None]:
ngram = NGram(n=2, inputCol="lemmas", outputCol="ngrams")
ham = ngram.transform(ham)

In [None]:
ngram = NGram(n=2, inputCol="lemmas", outputCol="ngrams")
spam = ngram.transform(spam)

In [None]:
ngram_spam_count = spam.groupBy("ngrams").count().orderBy(col("count").desc())

In [None]:
ngram_ham_count = ham.groupBy("ngrams").count().orderBy(col("count").desc())

In [None]:
# Convert the DataFrame to a Pandas DataFrame for easy plotting
ngram_count_pd = ngram_spam_count.limit(20).toPandas()

In [None]:
# Convert the DataFrame to a Pandas DataFrame for easy plotting
ngram_count_pd_ham = ngram_ham_count.limit(20).toPandas()

In [None]:
spam_ngrams = ngram_count_pd["ngrams"]
spam_frequencies = ngram_count_pd["count"]

In [None]:
ham_ngrams = ngram_count_pd_ham["ngrams"]
ham_frequencies = ngram_count_pd_ham["count"]

In [None]:
plt.figure(figsize=(12, 6))
plt.bar(range(len(spam_ngrams)), spam_frequencies, align='center',color="red")
plt.xticks(range(len(spam_ngrams)), spam_ngrams, rotation=45)
plt.xlabel(f"Bi-gram")
plt.ylabel("Frequency")
plt.title(f"Top 20 Bi-grams for Spam Reviews")
plt.tight_layout()
plt.show()

In [None]:
plt.figure(figsize=(12, 6))
plt.bar(range(len(ham_ngrams)), ham_frequencies, align='center',color="green")
plt.xticks(range(len(ham_ngrams)), ham_ngrams, rotation=45)
plt.xlabel(f"Bi-gram")
plt.ylabel("Frequency")
plt.title(f"Top 20 Bi-grams for Ham Reviews")
plt.tight_layout()
plt.show()

#### Trigram

In [None]:
ngram = NGram(n=3, inputCol="lemmas", outputCol="trigrams")
ham = ngram.transform(ham)

In [None]:
ngram = NGram(n=3, inputCol="lemmas", outputCol="trigrams")
spam = ngram.transform(spam)

In [None]:
ngram_spam_count = spam.groupBy("trigrams").count().orderBy(col("count").desc())
ngram_ham_count = ham.groupBy("trigrams").count().orderBy(col("count").desc())


In [None]:
ngram_count_pd_ham = ngram_ham_count.limit(20).toPandas()

# Convert the DataFrame to a Pandas DataFrame for easy plotting
ngram_count_pd = ngram_spam_count.limit(20).toPandas()

In [None]:
spam_ngrams = ngram_count_pd["trigrams"]
spam_frequencies = ngram_count_pd["count"]

In [None]:
ham_ngrams = ngram_count_pd_ham["trigrams"]
ham_frequencies = ngram_count_pd_ham["count"]

In [None]:
plt.figure(figsize=(12, 6))
plt.bar(range(len(spam_ngrams)), spam_frequencies, align='center',color="red")
plt.xticks(range(len(spam_ngrams)), spam_ngrams, rotation=90)
plt.xlabel(f"Tri-gram")
plt.ylabel("Frequency")
plt.title(f"Top 20 Tri-grams for Spam Reviews")
plt.tight_layout()
plt.show()

In [None]:
ham_ngrams.head()

In [None]:
spam_ngrams

In [None]:
ham_ngrams=ham_ngrams[:5]

In [None]:
ham_frequencies=ham_frequencies[:5]

In [None]:
plt.figure(figsize=(12, 6))
plt.bar(range(len(ham_ngrams)), ham_frequencies, align='center',color="green")
plt.xticks(range(len(ham_ngrams)), ham_ngrams, rotation=90)
plt.xlabel("Tri-gram")
plt.ylabel("Frequency")
plt.title("Top 5 Tri-grams for Spam Reviews")
plt.tight_layout()
plt.show()

#### Feature Engineering

##### Amazon Spam Reviews Dataset

In [None]:
count_vec = CountVectorizer(inputCol='lemmas',outputCol='c_vec')
idf = IDF(inputCol="c_vec", outputCol="tf_idf")
#spam_df = spam_df.withColumn('abs_sentiment_score', abs(spam_df['sentiment_score']))
vectorassembler = VectorAssembler(inputCols=['tf_idf','review_text_length','abs_sentiment_score'],outputCol='raw_features')
scaler  = StandardScaler(inputCol="raw_features", outputCol="features", withMean=True, withStd=True)
#pca = PCA(k=2, inputCol="scaled_features", outputCol="features")

In [None]:
data_prep_pipe = Pipeline(stages=[count_vec,idf,vectorassembler,scaler])
cleaner = data_prep_pipe.fit(spam_df)
clean_data = cleaner.transform(spam_df)

In [None]:
cleaner.write().overwrite().save("gs://productpal_bucket/pipelines/data_prep_pipe")

In [None]:
clean_data.limit(5).toPandas()

In [None]:
clean_data = clean_data.select(['class','features'])

In [None]:
clean_data = clean_data.withColumnRenamed("class", "label")

In [None]:
# Split the data into training and testing sets
train_data, test_data = clean_data.randomSplit([0.8, 0.2], seed=0)

#### Model Building

##### Amazon Spam Reviews Dataset

#### Naive Bayes Model

In [None]:
# Define the Naive Bayes classifier
nb = NaiveBayes()

# Define the hyperparameter grid for tuning
nbparam_grid = ParamGridBuilder() \
    .addGrid(nb.smoothing, [0.1, 1.0, 10.0]) \
    .build()

# Set up the TrainValidationSplit with the NaiveBayes classifier and parameter grid
nbtvs = TrainValidationSplit(estimator=nb,
                           estimatorParamMaps=nbparam_grid,
                           evaluator=BinaryClassificationEvaluator(),
                           trainRatio=0.8)

In [None]:
# Fit the TrainValidationSplit on the training data
nbtvs_model = nbtvs.fit(train_data)

In [None]:
# Get the best NaiveBayes model from the TVS model
nbmodel = nbtvs_model.bestModel

In [None]:
# Extract the best hyperparameters
best_smoothing = nbmodel._java_obj.getSmoothing()

In [None]:
# Print the best hyperparameters
print(f"Best Smoothing Parameter: {best_smoothing}")

In [None]:


# Make predictions on the test data
predictions = nbmodel.transform(test_data)



In [None]:
# Evaluate the model's accuracy on the validation data
nbevaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
nbareaUnderROC = nbevaluator.evaluate(predictions)

In [None]:
nbareaUnderROC

In [None]:
nbevaluator = BinaryClassificationEvaluator(metricName="areaUnderPR")
nbareaUnderPR = nbevaluator.evaluate(predictions)

In [None]:
nbareaUnderPR

In [None]:
nbmodel.write().overwrite().save("gs://productpal_bucket/models/nbModel")

#### Support Vector Machine Model

In [None]:
# Define the LinearSVC classifier
svm = LinearSVC()

# Define the hyperparameter grid for tuning
svmparam_grid = ParamGridBuilder() \
    .addGrid(svm.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(svm.maxIter, [10, 100]) \
    .build()


# Set up the TrainValidationSplit with the SVM classifier and parameter grid
svmtvs = TrainValidationSplit(estimator=svm,
                           estimatorParamMaps=svmparam_grid,
                           evaluator=BinaryClassificationEvaluator(),
                           trainRatio=0.8)





In [None]:
svmtvs_model = svmtvs.fit(train_data)

In [None]:
svmmodel=svmtvs_model.bestModel

In [None]:
# Retrieve the best parameters
svmbest_reg_param = svmmodel._java_obj.getRegParam()
svmbest_max_iter = svmmodel._java_obj.getMaxIter()

In [None]:
print(f"Best Model Parameters: RegParam={svmbest_reg_param}, MaxIter={svmbest_max_iter}")

In [None]:
# Make predictions on the validation data
predictions = svmmodel.transform(test_data)

In [None]:
# Evaluate the model's accuracy on the validation data
svmevaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
svmareaUnderROC = svmevaluator.evaluate(predictions)

In [None]:
svmareaUnderROC

In [None]:
# Evaluate the model's accuracy on the validation data
svmevaluator = BinaryClassificationEvaluator(metricName="areaUnderPR")
svmareaUnderPR = svmevaluator.evaluate(predictions)

In [None]:
svmareaUnderPR

In [None]:
svmmodel.write().overwrite().save("gs://productpal_bucket/models/svmmodel")

#### Decision Tree Classifier Model

In [None]:
# Create an instance of DecisionTreeClassifier
dt = DecisionTreeClassifier(maxDepth=15)

# Define the hyperparameter grid for tuning
dtparam_grid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [3, 5, 7]) \
    .addGrid(dt.maxBins, [16, 32, 64]) \
    .addGrid(dt.minInstancesPerNode, [1, 5, 10]) \
    .addGrid(dt.minInfoGain, [0.0, 0.1, 0.2]) \
    .build()

# Set up the TrainValidationSplit with the DecisionTreeClassifier and parameter grid
dttvs = TrainValidationSplit(estimator=dt,
                           estimatorParamMaps=dtparam_grid,
                           evaluator=BinaryClassificationEvaluator(),
                           trainRatio=0.8)

# Train the model on the training data
dtmodel = dt.fit(train_data)

In [None]:
dttvs_model = dttvs.fit(train_data)

In [None]:
dtmodel=dttvs_model.bestModel

In [None]:
# Extract the parameter map from the best model
best_params = dtmodel.extractParamMap()

In [None]:
# Print the hyperparameters and their values
for param in best_params:
    print(f"{param.name}: {best_params[param]}")

In [None]:
# Make predictions on the validation data
predictions = dtmodel.transform(test_data)

In [None]:
# Evaluate the model's accuracy on the validation data
dtevaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
dtareaUnderROC = dtevaluator.evaluate(predictions)

In [None]:
dtareaUnderROC

In [None]:
# Evaluate the model's accuracy on the validation data
dtevaluator = BinaryClassificationEvaluator(metricName="areaUnderPR")
dtareaUnderPR = dtevaluator.evaluate(predictions)

In [None]:
dtareaUnderPR

In [None]:
dtmodel.write().overwrite().save("gs://productpal_bucket/models/dtmodel")

#### K-means Clustering Content-Based Recommendation Engine

In [None]:

grouped_df = df2.groupBy("product_id","product_title").agg(
    avg("abs_sentiment_score").alias("avg_abs_sentiment"),
    avg("review_text_length").alias("avg_review_length"),
    round(avg("star_rating"),0).alias("avg_star_rating"),
    collect_list("lemmas").alias("combined_tokens")
)
grouped_df = grouped_df.withColumn("combined_tokens", flatten(col("combined_tokens")))

In [None]:
grouped_df.limit(5).toPandas()

In [None]:
grouped_df.count()

In [None]:
# Data Preprocessing: Filter relevant columns and handle missing values if necessary
cbDF = grouped_df.select("customer_id", "avg_star_rating","product_id","product_title",\
                         "avg_review_length","avg_abs_sentiment","combined_tokens")  

In [None]:
cbDF.limit(5).toPandas()

In [None]:
cbDF.count()

In [None]:
df2.count()

In [None]:
hashingTF = HashingTF(inputCol="combined_tokens", outputCol="rawFeatures", numFeatures=1000)
idf = IDF(inputCol="rawFeatures", outputCol="result")

In [None]:
# StringIndexer for product_asin and customer_id
product_indexer = StringIndexer(inputCol="product_id", outputCol="product_id_index")
customer_indexer = StringIndexer(inputCol="customer_id", outputCol="customerid__index")

In [None]:
# Select the relevant features for clustering
feature_columns = ["avg_review_length", "avg_abs_sentiment", "result"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="raw_features")
#reviews_data = assembler.transform(reviews_data)

In [None]:
# Normalize the features column
normalizer = Normalizer(inputCol="raw_features", outputCol="normalized_features")

In [None]:
pca = PCA(k=2, inputCol="normalized_features", outputCol="features")

In [None]:
# Pipeline for feature engineering
pipeline = Pipeline(stages=[hashingTF, idf,assembler,normalizer,pca, product_indexer, customer_indexer])
pipeline_model = pipeline.fit(cbDF)
df_transformed = pipeline_model.transform(cbDF)

In [None]:
df_transformed.limit(5).toPandas()

In [None]:
pipeline_model.write().overwrite().save("gs://productpal_bucket/pipelines/clustering_pipeline1")

In [None]:
# Split the data into training and testing sets
train_data, test_data = df_transformed.randomSplit([0.8, 0.2], seed=0)

In [None]:
# Create an instance of KMeans
kmeans = KMeans()


# Create a ParamGridBuilder to define the hyperparameter grid
kmparam_grid = ParamGridBuilder() \
    .addGrid(kmeans.k, [2, 3, 4]) \
    .addGrid(kmeans.maxIter, [10, 20, 30]) \
    .build()

# Define the evaluator
kmevaluator = ClusteringEvaluator()

# Create a TrainValidationSplit instance
kmtvs = TrainValidationSplit(estimator=kmeans,
                           estimatorParamMaps=kmparam_grid,
                           evaluator=kmevaluator,
                           trainRatio=0.8)

In [None]:
kmtvsModel = kmtvs.fit(train_data)

In [None]:
cbKmeansmodel=kmtvsModel.bestModel

In [None]:
clustered_data = cbKmeansmodel.transform(test_data)

In [None]:
# Evaluate the clustering performance using Within-Cluster Sum of Squared Errors (WCSS)
evaluator = ClusteringEvaluator()

In [None]:
# Evaluate the clustering performance using Silhouette Score
silhouette_score = evaluator.evaluate(clustered_data, {evaluator.metricName: "silhouette"})
print("Silhouette Score:", silhouette_score)

#### Hierarchial Clustering Model

In [None]:
# Trains a bisecting k-means model(hierarchial clustering)
bkm = BisectingKMeans()

# Create a ParamGridBuilder to define the hyperparameter grid
bkmparam_grid = ParamGridBuilder() \
    .addGrid(kmeans.k, [2, 3, 4]) \
    .addGrid(kmeans.maxIter, [10, 20, 30]) \
    .build()

# Define the evaluator
bkmevaluator = ClusteringEvaluator()

# Create a TrainValidationSplit instance
bkmtvs = TrainValidationSplit(estimator=bkm,
                           estimatorParamMaps=bkmparam_grid,
                           evaluator=bkmevaluator,
                           trainRatio=0.8)

In [None]:
bkmtvsModel = bkmtvs.fit(train_data)

In [None]:
cbbKmeansmodel=bkmtvsModel.bestModel

In [None]:
bkm_clustered_data = cbbKmeansmodel.transform(test_data)

In [None]:
# Evaluate the clustering performance using Silhouette Score
bkmsilhouette_score = evaluator.evaluate(bkm_clustered_data, {evaluator.metricName: "silhouette"})
print("Silhouette Score:", bkmsilhouette_score)

In [None]:
cbKmeansmodel.write().overwrite().save("gs://productpal_bucket/models/kmeans")

In [None]:
cbKmeansmodel= KMeansModel.load("gs://productpal_bucket/models/kmeans")

In [None]:
clustered_data=cbKmeansmodel.transform(test_data)

In [None]:
sample_data_1=clustered_data.sample(0.01, seed=143)

In [None]:
#Find similar products within the same cluster for a target product from the test data
target_product_id = sample_data_1.first()["product_id"]


In [None]:
target_product_id

In [None]:
sample_data_1.limit(1).show()

In [None]:
target_product_cluster = sample_data_1.filter(col("product_id")==target_product_id).first()["prediction"]

In [None]:
target_product_cluster

In [None]:
product_cluster_data=sample_data_1\
.select("prediction","product_id","normalized_features","product_title","star_rating")\
.filter(col("prediction")==target_product_cluster)

In [None]:
product_cluster_data.limit(2).toPandas()

In [None]:
product_data=sample_data_1.select("prediction","product_id","normalized_features","product_title","star_rating")\
.filter((col("prediction")==target_product_cluster) & (col("product_id")==target_product_id)).limit(1)

In [None]:
product_data.limit(2).toPandas()

In [None]:
# Define a UDF to calculate cosine similarity
def cosine_similarity(vector1, vector2):
    dot_product = float(vector1.dot(vector2))
    magnitude_product = float(vector1.norm(2) * vector2.norm(2))
    return dot_product / magnitude_product

cosine_similarity_udf = udf(cosine_similarity)

# Cross-join normalized features with itself to get all pairwise combinations
cross_joined_data = product_data.alias("a").crossJoin(product_cluster_data.alias("b"))


In [None]:
# Calculate cosine similarity and select relevant columns
cosine_similarity_df = cross_joined_data.select(
    "a.product_id",
    "b.product_id",
    cosine_similarity_udf("a.normalized_features", "b.normalized_features").alias("cosine_similarity")
)

In [None]:
# Filter out self-pairs (where product_id1 = product_id2)
cosine_similarity_df = cosine_similarity_df.filter(col("a.product_id") != col("b.product_id"))\
.orderBy(col("cosine_similarity").desc())

# Show the DataFrame
cosine_similarity_df.show()

In [None]:
top10prodIDs=cosine_similarity_df.limit(10).select(col("b.product_id")).withColumn("product_id",trim(col("product_id")))

In [None]:
top10prodIDs.show()

In [None]:
distinctProducts=product_cluster_data.select("product_id","product_title").distinct()\
.withColumn("product_id",trim(col("product_id")))

In [None]:
distinctProducts.show()

In [None]:
newdf=distinctProducts.join(top10prodIDs, on="product_id", how="inner")

In [None]:
newdf.show(truncate=False)

In [None]:
# Convert the product IDs DataFrame to a list of product IDs
product_ids_list = top10prodIDs.rdd.flatMap(lambda x: x).collect()

In [None]:
product_ids_list

In [None]:
filtered_product_details_df = distinctProducts.filter(col("product_id").isin(product_ids_list))

In [None]:
filtered_product_details_df.show(truncate=False)

#### Collaborative Filtering using Alternating Least Square Model

In [None]:
#convert string type customer_id and product-id to int
indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in ['customer_id', 'product_id']]
customerIndexer=StringIndexer(inputCol="customer_id", outputCol="customer_id_index").fit(df2)
productIndexer=StringIndexer(inputCol="product_id", outputCol="product_id_index").fit(df2)
pipeline = Pipeline(stages=[customerIndexer,productIndexer])


In [None]:
typeconverterPipeline = pipeline.fit(df2)

In [None]:
transformed = typeconverterPipeline.transform(df2)

In [None]:
typeconverterPipeline.write().overwrite().save("gs://productpal_bucket/pipelines/typeconverterPipeline")

In [None]:
transformed.limit(5).toPandas()

In [None]:
cfSample=transformed.sample(0.1,seed=56)

In [None]:
train_data, test_data = cfSample.randomSplit([0.8, 0.2], seed=0)

In [None]:
# initialize the ALS model
als= ALS(maxIter=5, rank=4, regParam=0.01,userCol='customer_id_index', itemCol='product_id_index', \
                ratingCol='star_rating', coldStartStrategy='drop',\
                nonnegative=True)


In [None]:
model=als.fit(train_data)

In [None]:
predictions = model.transform(test_data)



In [None]:
model.write().overwrite().save("gs://productpal_bucket/models/alsmodel")

In [None]:
evaluator=RegressionEvaluator(metricName='rmse',labelCol='star_rating',predictionCol='prediction')
rmse=evaluator.evaluate(predictions)
print(rmse)

In [None]:
model=ALSModel.load("gs://productpal_bucket/models/alsmodel")

In [None]:
train_data.limit(5).toPandas()

In [None]:
userRec=train_data.limit(5).select("customer_id_index");

In [None]:
userRec=userRec.withColumn("customer_id_index", col("customer_id_index").cast("integer"))

In [None]:
recommendations = model.recommendForUserSubset(userRec,1)

In [None]:
recommendations.show()

In [None]:
recs=recommendations.withColumn("itemAndRating",explode(recommendations.recommendations))\
.select("customer_id_index","itemAndRating.*")

In [None]:
recs=recs.withColumn("customer_id_index",col("customer_id_index").cast("double")).\
withColumn("product_id_index",col("product_id_index").cast("double"))

In [None]:
customerConverter=IndexToString(inputCol="customer_id_index",outputCol="customer_id",labels=customerIndexer.labels)
productConverter=IndexToString(inputCol="product_id_index",outputCol="product_id",labels=productIndexer.labels)

In [None]:
pModel=Pipeline(stages=[customerConverter,productConverter]).fit(cfSample)

In [None]:
results=pModel.transform(recs)

In [None]:
results.show()

In [None]:
product_id_list=results.select("product_id").rdd.flatMap(lambda x: x).collect()

In [None]:
relatedProducts=cfSample.filter(col("product_id").isin(product_id_list)).select("product_id","product_title","star_rating")

In [None]:
relatedProducts.count()

In [None]:
# Set the maximum column width to display
pd.set_option('display.max_colwidth', None)

relatedProducts.toPandas()

In [None]:
pModel.write().overwrite().save("gs://productpal_bucket/pipelines/indextostring_pipeline")

In [None]:
cfSample.write.mode("overwrite").option("header", "true")\
.json("gs://productpal_bucket/cfSample.json")