# Spark architecture for parallel processing

## Prepare Spark environment

In [1]:
## install java jdk for spark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
## download the necessity package from apache website
!wget -q  --trust-server-names  https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz -O file.tgz 
print("Download completed successfully !!!")

## unzip donwloaded file
!tar zxvf file.tgz

Download completed successfully !!!
spark-3.2.1-bin-hadoop3.2/
spark-3.2.1-bin-hadoop3.2/LICENSE
spark-3.2.1-bin-hadoop3.2/NOTICE
spark-3.2.1-bin-hadoop3.2/R/
spark-3.2.1-bin-hadoop3.2/R/lib/
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/DESCRIPTION
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/INDEX
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/Meta/
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/Meta/Rd.rds
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/Meta/features.rds
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/Meta/hsearch.rds
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/Meta/links.rds
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/Meta/nsInfo.rds
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/Meta/package.rds
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/Meta/vignette.rds
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/NAMESPACE
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/R/
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/R/SparkR
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/R/SparkR.rdb
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/R/SparkR.rd

In [3]:
## install requirement library for spark 
!pip install -q findspark 

In [4]:
## setting up location for spark and java home directory on drive
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

In [5]:
## install pyspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 24 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 59.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=7b250ba00cdb81036c1e7f94972ecb7ecefed2f728a7ddb5106db95de2422610
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


# Prepare data

## Import necessary library 

In [6]:
## Normal libs to work with data
## adding support for large, multi-dimensional arrays and matrices.
import numpy as np 
## data structures and operations for manipulating numerical tables and time series.
import pandas as pd 

################################################################################
# Visualization
## the output of plotting commands is displayed inline within frontends like the Jupyter notebook, directly below the code cell that produced it.
%matplotlib inline 
## provides an implicit, MATLAB-like, way of plotting
import matplotlib.pyplot as plt 
## provides a high-level interface for drawing attractive and informative statistical
import seaborn as sns

################################################################################
## Utility
from collections import Counter
import string

################################################################################
## Feature engineering
from imblearn.under_sampling import RandomUnderSampler
from sklearn.feature_extraction.text import TfidfVectorizer
from nltk.probability import FreqDist
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk import word_tokenize
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import VectorAssembler
## utility packs for text processing, i.e. lower case all text
from pyspark.sql.functions import *


################################################################################
## ML models
## split data into train and test
from sklearn.model_selection import train_test_split
## use for classification pyspark models
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.classification import LinearSVC

################################################################################
## Evaluate model
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
import sklearn.metrics as metrics
## pyspark evaluation for classification models
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [7]:
import pyspark

In [8]:
pyspark.__version__

'3.2.1'

In [None]:
## download external text libs data
import nltk
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Unzipping corpora/wordnet.zip.


True

## Get data

In [None]:
## get data from remote file if needed
## url = "https://docs.google.com/spreadsheets/d/1ZVemCFQ_cWCEjriTFBLYHGM33q56eISk/edit?usp=sharing&ouid=102981063366545209715&rtpof=true&sd=true"
## s = requests.get(url).text

## get data by uploading file
df = pd.read_excel('Womens_Clothing_E_Commerce_Reviews.xlsx', sheet_name='Reviews', index_col=0)

In [None]:
## check 5 rows of dataset
df.head()

Unnamed: 0,Clothing ID,Age,Title,Review Text,Rating,Recommended IND,Positive Feedback Count,Division Name,Department Name,Class Name
0.0,767.0,33.0,,Absolutely wonderful - silky and sexy and comf...,4.0,1.0,0.0,Initmates,Intimate,Intimates
1.0,1080.0,34.0,,Love this dress! it's sooo pretty. i happene...,5.0,1.0,4.0,General,Dresses,Dresses
2.0,1077.0,60.0,Some major design flaws,I had such high hopes for this dress and reall...,3.0,0.0,0.0,General,Dresses,Dresses
3.0,1049.0,50.0,My favorite buy!,"I love, love, love this jumpsuit. it's fun, fl...",5.0,1.0,0.0,General Petite,Bottoms,Pants
4.0,847.0,47.0,Flattering shirt,This shirt is very flattering to all due to th...,5.0,1.0,6.0,General,Tops,Blouses


## Feature engineering

#### Fill NaN value with ' '. Avoid nan string when combine texts

In [None]:
## avoid nan string when combine texts
df['Title'] = df['Title'].fillna('')
df['Review Text'] = df['Review Text'].fillna('')

#### Combine Title & Review Text column into 1 Review Description column

In [None]:
## combine Title & Review Text column into 1 Review Description column
df = df.assign(ReviewDescription = df['Title'].astype(str) + ' ' + df['Review Text'].astype(str))
df.head()

Unnamed: 0,Clothing ID,Age,Title,Review Text,Rating,Recommended IND,Positive Feedback Count,Division Name,Department Name,Class Name,ReviewDescription
0.0,767.0,33.0,,Absolutely wonderful - silky and sexy and comf...,4.0,1.0,0.0,Initmates,Intimate,Intimates,Absolutely wonderful - silky and sexy and com...
1.0,1080.0,34.0,,Love this dress! it's sooo pretty. i happene...,5.0,1.0,4.0,General,Dresses,Dresses,Love this dress! it's sooo pretty. i happen...
2.0,1077.0,60.0,Some major design flaws,I had such high hopes for this dress and reall...,3.0,0.0,0.0,General,Dresses,Dresses,Some major design flaws I had such high hopes ...
3.0,1049.0,50.0,My favorite buy!,"I love, love, love this jumpsuit. it's fun, fl...",5.0,1.0,0.0,General Petite,Bottoms,Pants,"My favorite buy! I love, love, love this jumps..."
4.0,847.0,47.0,Flattering shirt,This shirt is very flattering to all due to th...,5.0,1.0,6.0,General,Tops,Blouses,Flattering shirt This shirt is very flattering...


#### Drop NaN values

In [None]:
## drop NaN values in categorical columns
df = df.dropna()
df.isnull().sum()

Clothing ID                0
Age                        0
Title                      0
Review Text                0
Rating                     0
Recommended IND            0
Positive Feedback Count    0
Division Name              0
Department Name            0
Class Name                 0
ReviewDescription          0
dtype: int64

#### Remove special character in ReviewDescription column

In [None]:
## remove special character
df['ReviewDescription'] = df['ReviewDescription'].str.replace(r"[^a-zA-Z ]","")
df['ReviewDescription'] = df['ReviewDescription'].str.replace(r"[0-9]","")
df.head()

  
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  
  This is separate from the ipykernel package so we can avoid doing imports until


Unnamed: 0,Clothing ID,Age,Title,Review Text,Rating,Recommended IND,Positive Feedback Count,Division Name,Department Name,Class Name,ReviewDescription
0.0,767.0,33.0,,Absolutely wonderful - silky and sexy and comf...,4.0,1.0,0.0,Initmates,Intimate,Intimates,Absolutely wonderful silky and sexy and comf...
1.0,1080.0,34.0,,Love this dress! it's sooo pretty. i happene...,5.0,1.0,4.0,General,Dresses,Dresses,Love this dress its sooo pretty i happened ...
2.0,1077.0,60.0,Some major design flaws,I had such high hopes for this dress and reall...,3.0,0.0,0.0,General,Dresses,Dresses,Some major design flaws I had such high hopes ...
3.0,1049.0,50.0,My favorite buy!,"I love, love, love this jumpsuit. it's fun, fl...",5.0,1.0,0.0,General Petite,Bottoms,Pants,My favorite buy I love love love this jumpsuit...
4.0,847.0,47.0,Flattering shirt,This shirt is very flattering to all due to th...,5.0,1.0,6.0,General,Tops,Blouses,Flattering shirt This shirt is very flattering...


#### Binning rating

In [None]:
## if 1-3 stars = 0 
## if 4-5 stars = 1
bins = [0, 3, 5]
labels = [0,1]
df['Binned Rating'] = pd.cut(df['Rating'], bins=bins, labels=labels)
df.tail()

Unnamed: 0,Clothing ID,Age,Title,Review Text,Rating,Recommended IND,Positive Feedback Count,Division Name,Department Name,Class Name,ReviewDescription,Binned Rating
23476.0,1104.0,34.0,Great dress for many occasions,I was very happy to snag this dress at such a ...,5.0,1.0,0.0,General Petite,Dresses,Dresses,Great dress for many occasions I was very happ...,1
23477.0,862.0,48.0,Wish it was made of cotton,"It reminds me of maternity clothes. soft, stre...",3.0,1.0,0.0,General Petite,Tops,Knits,Wish it was made of cotton It reminds me of ma...,0
23478.0,1104.0,31.0,"Cute, but see through","This fit well, but the top was very see throug...",3.0,0.0,1.0,General Petite,Dresses,Dresses,Cute but see through This fit well but the top...,0
23479.0,1084.0,28.0,"Very cute dress, perfect for summer parties an...",I bought this dress for a wedding i have this ...,3.0,1.0,2.0,General,Dresses,Dresses,Very cute dress perfect for summer parties and...,0
23480.0,1104.0,52.0,Please make more like this one!,This dress in a lovely platinum is feminine an...,5.0,1.0,22.0,General Petite,Dresses,Dresses,Please make more like this one This dress in a...,1


#### Copy df

In [None]:
## copy df into another dataframe to save original data
rating_class = df.copy()

#### Remove unfrequent words

In [None]:
## get a frequent word dictionary
words_fdist = FreqDist(word for word in word_tokenize(rating_class['ReviewDescription'].str.cat(sep=' ')))

## convert dict to df
words_df = pd.DataFrame.from_dict(words_fdist,\
                                       orient='index').\
                                rename(columns={0:'freq'})

## get common list words                                
common_l = words_df[words_df.freq > 50].index.to_list()
print(common_l)

['Absolutely', 'wonderful', 'silky', 'and', 'sexy', 'comfortable', 'Love', 'this', 'dress', 'its', 'pretty', 'i', 'happened', 'to', 'find', 'it', 'in', 'a', 'store', 'im', 'glad', 'did', 'bc', 'never', 'would', 'have', 'ordered', 'online', 'petite', 'bought', 'am', 'love', 'the', 'length', 'on', 'me', 'hits', 'just', 'little', 'below', 'knee', 'definitely', 'be', 'true', 'midi', 'someone', 'who', 'is', 'truly', 'design', 'flaws', 'I', 'had', 'such', 'high', 'hopes', 'for', 'really', 'wanted', 'work', 'initially', 'small', 'my', 'usual', 'size', 'but', 'found', 'so', 'fact', 'that', 'could', 'not', 'zip', 'up', 'medium', 'which', 'was', 'ok', 'overall', 'top', 'half', 'fit', 'nicely', 'bottom', 'very', 'tight', 'under', 'layer', 'several', 'somewhat', 'cheap', 'over', 'layers', 'flaw', 'sewn', 'into', 'zipper', 'c', 'My', 'favorite', 'buy', 'jumpsuit', 'fun', 'flirty', 'fabulous', 'every', 'time', 'wear', 'get', 'nothing', 'great', 'compliments', 'Flattering', 'shirt', 'This', 'flatteri

In [None]:
## print number of common words
len(common_l)

1612

In [None]:
def remove_unfreq_words(review, common_l):
    ## tokenizer
    nopunc = []
    word_lst = []
    for word in review.split():
        if word.lower() not in common_l:
            review.replace(word, '')
        else:
            nopunc.append(word.lower()) ## lower text

    nopunc = ' '.join(nopunc)

    return nopunc

In [None]:
## remove unfrequen words
rating_class['ReviewDescription'] = rating_class['ReviewDescription'].apply(remove_unfreq_words, common_l=common_l)
rating_class['ReviewDescription']

0.0        absolutely wonderful silky and sexy and comfor...
1.0        love this dress its pretty i happened to find ...
2.0        some design flaws i had such high hopes for th...
3.0        my favorite buy i love love love this jumpsuit...
4.0        flattering shirt this shirt is very flattering...
                                 ...                        
23476.0    great dress for many occasions i was very happ...
23477.0    wish it was made of cotton it reminds me of ma...
23478.0    cute but see through this fit well but the top...
23479.0    very cute dress perfect for summer and we i bo...
23480.0    please make more like this one this dress in a...
Name: ReviewDescription, Length: 23467, dtype: object

## Init Spark

In [None]:
def init_spark(master, appName):
    ## use findSpark package to make a Spark Context available

    import findspark
    findspark.init()
            
    
    from pyspark import SparkContext
    from pyspark.conf import SparkConf 
    from pyspark.sql import SparkSession # working with dataframe
    
    SparkContext.setSystemProperty('spark.executor.memory', '15g')
    SparkContext.setSystemProperty('spark.driver.memory', '15g')
    spark_context = SparkContext(master=master, appName=appName)
    spark_session = SparkSession(spark_context)
    
    print("Init Spark successfully!")
    
    return spark_context, spark_session

spark_context, spark_session = init_spark("local", "Classification - Womens E-Commerce Clothing Reviews")

Init Spark successfully!


## Select necessary columns to convert to pyspark dataframe

In [None]:
preprocessed_df = rating_class[['Recommended IND', 'ReviewDescription', 'Binned Rating']]

## Convert from pandas df to spark df

In [None]:
preprocessed_spdf = spark_session.createDataFrame(preprocessed_df)
preprocessed_spdf.show()

+---------------+--------------------+-------------+
|Recommended IND|   ReviewDescription|Binned Rating|
+---------------+--------------------+-------------+
|            1.0|absolutely wonder...|            1|
|            1.0|love this dress i...|            1|
|            0.0|some design flaws...|            0|
|            1.0|my favorite buy i...|            1|
|            1.0|flattering shirt ...|            1|
|            0.0|not for the very ...|            0|
|            1.0|fun i this in my ...|            1|
|            1.0|surprisingly goes...|            1|
|            1.0|flattering i love...|            1|
|            1.0|such a fun dress ...|            1|
|            1.0|this dress is per...|            1|
|            1.0|perfect more and ...|            1|
|            1.0|runs big bought t...|            1|
|            1.0|pretty party dres...|            0|
|            1.0|nice but not for ...|            1|
|            1.0|you need to be at...|        

# Text processing

## Stop word removal

In [None]:
## create ad-hoc list with user define based on domain
stopwordList=['']

## setting up stop words
stopwordList.extend(StopWordsRemover().getStopWords())
stopwordList = list(set(stopwordList))

## Prepare pipeline

In [None]:
## Detail for each step function is provided in the report
## 1st step
#topic_indexer = StringIndexer(inputCol='topic_name',
#                             outputCol='index_topic')

## 2nd step
tokenizer = Tokenizer(inputCol='ReviewDescription',
                     outputCol='tokenized_ReviewDescription')

## 3rd step
stopremove = StopWordsRemover(inputCol='tokenized_ReviewDescription',
                            outputCol='stop_ReviewDescription',
                            stopWords=stopwordList)

## 4th step
count_vec = CountVectorizer(inputCol='stop_ReviewDescription',
                           outputCol='vector_ReviewDescription')

## 5th step
idf = IDF(inputCol='vector_ReviewDescription',
          outputCol='processed_ReviewDescription')
    
## define pipeline
#pipeline = Pipeline(stages=[topic_indexer, tokenizer, stopremove, count_vec, idf])

pipeline = Pipeline(stages=[tokenizer,
                            stopremove,
                            count_vec,
                            idf])

## fit and transform
processed_features = pipeline.fit(preprocessed_spdf)
features = processed_features.transform(preprocessed_spdf)

In [None]:
### for easy copy paste column names
features.columns

['Recommended IND',
 'ReviewDescription',
 'Binned Rating',
 'tokenized_ReviewDescription',
 'stop_ReviewDescription',
 'vector_ReviewDescription',
 'processed_ReviewDescription']

In [None]:
features.show(5)

+---------------+--------------------+-------------+---------------------------+----------------------+------------------------+---------------------------+
|Recommended IND|   ReviewDescription|Binned Rating|tokenized_ReviewDescription|stop_ReviewDescription|vector_ReviewDescription|processed_ReviewDescription|
+---------------+--------------------+-------------+---------------------------+----------------------+------------------------+---------------------------+
|            1.0|absolutely wonder...|            1|       [absolutely, wond...|  [absolutely, wond...|    (1422,[23,166,384...|       (1422,[23,166,384...|
|            1.0|love this dress i...|            1|       [love, this, dres...|  [love, dress, pre...|    (1422,[0,1,8,18,1...|       (1422,[0,1,8,18,1...|
|            0.0|some design flaws...|            0|       [some, design, fl...|  [design, flaws, h...|    (1422,[0,2,3,5,10...|       (1422,[0,2,3,5,10...|
|            1.0|my favorite buy i...|            1|      

# Prepare input

In [None]:
## assembling all features into an input
assembler = VectorAssembler(inputCols=['Recommended IND', 'processed_ReviewDescription'],
                           outputCol='features')

In [None]:
## transform the assembler
features = assembler.transform(features)

In [None]:
features.show(5)

+---------------+--------------------+-------------+---------------------------+----------------------+------------------------+---------------------------+--------------------+
|Recommended IND|   ReviewDescription|Binned Rating|tokenized_ReviewDescription|stop_ReviewDescription|vector_ReviewDescription|processed_ReviewDescription|            features|
+---------------+--------------------+-------------+---------------------------+----------------------+------------------------+---------------------------+--------------------+
|            1.0|absolutely wonder...|            1|       [absolutely, wond...|  [absolutely, wond...|    (1422,[23,166,384...|       (1422,[23,166,384...|(1423,[0,24,167,3...|
|            1.0|love this dress i...|            1|       [love, this, dres...|  [love, dress, pre...|    (1422,[0,1,8,18,1...|       (1422,[0,1,8,18,1...|(1423,[0,1,2,9,19...|
|            0.0|some design flaws...|            0|       [some, design, fl...|  [design, flaws, h...|    (14

In [None]:
## select the input features
model_data = features.select('features', 'Binned Rating')
model_data.show(5)

+--------------------+-------------+
|            features|Binned Rating|
+--------------------+-------------+
|(1423,[0,24,167,3...|            1|
|(1423,[0,1,2,9,19...|            1|
|(1423,[1,3,4,6,11...|            0|
|(1423,[0,2,5,8,63...|            1|
|(1423,[0,2,8,14,1...|            1|
+--------------------+-------------+
only showing top 5 rows



# Build model

In [None]:
## train test splitting
(training, test) = model_data.randomSplit([0.7,0.3])

In [None]:
## create a list of classification models
lg = LogisticRegression(labelCol='Binned Rating', featuresCol = 'features')
nb = NaiveBayes(labelCol='Binned Rating', featuresCol = 'features', modelType='multinomial')
dt = DecisionTreeClassifier(labelCol='Binned Rating', featuresCol = 'features')
rf = RandomForestClassifier(labelCol='Binned Rating', featuresCol = 'features')
gb = GBTClassifier(labelCol='Binned Rating', featuresCol = 'features')
#mp = MultilayerPerceptronClassifier(labelCol='Binned Rating', featuresCol = 'features')
sv = LinearSVC(labelCol='Binned Rating', featuresCol = 'features')
lst_model =[lg, nb, dt, rf]

In [None]:
## create accuracy list
global list_acc 
list_acc = []
global list_pre 
list_pre = []
global list_rec 
list_rec = []

In [None]:
def build_model_classification(model, training, test):
    
    fitted_model = model.fit(training)

    test_model = fitted_model.transform(test)

    acc_evaluator = MulticlassClassificationEvaluator(labelCol = 'Binned Rating',
                                                 predictionCol = 'prediction',
                                                 metricName = 'accuracy')

    accuracy = acc_evaluator.evaluate(test_model)

    list_acc.append(accuracy)

In [None]:
build_model_classification(lg, training, test)

In [None]:
build_model_classification(nb, training, test)

In [None]:
build_model_classification(dt, training, test)

In [None]:
build_model_classification(rf, training, test)

In [None]:
build_model_classification(gb, training, test)

In [None]:
#build_model_classification(mp, training, test)

In [None]:
build_model_classification(sv, training, test)

In [None]:
list_acc

[0.9107718167810618,
 0.866928141196246,
 0.9359854321333521,
 0.7989914553859084,
 0.9357052808516599,
 0.9221179436895924]

In [None]:
list_models = ['Logistic Regression',
               'Naive Bayes',
               'Decision Tree',
               'Random Forest',
               'GBT',
               'SVC']

In [None]:
result_df = pd.DataFrame(
    {'model': list_models,
     'accuracy': list_acc
    })

In [None]:
result_df.sort_values(by='accuracy', ascending=False)

Unnamed: 0,model,accuracy
2,Decision Tree,0.935985
4,GBT,0.935705
5,SVC,0.922118
0,Logistic Regression,0.910772
1,Naive Bayes,0.866928
3,Random Forest,0.798991
