In [1]:
__author__ = "Yasaman Emami"
__email__ = ['emami.yasamann@gmail.com','yasaman.emami@sjsu.edu']

# Create spark session and spark dataframe from data

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
  
spark = SparkSession.builder.getOrCreate()

schema = StructType([
      StructField("reviewerID",StringType(),True),
      StructField("asin",StringType(),True),
      StructField("reviewerName",StringType(),True),
      StructField("helpful",StringType(),True),
      StructField("reviewText",StringType(),True),
      StructField("overall",StringType(),True),
      StructField("summary",StringType(),True),
      StructField("unixReviewTime",StringType(),True),
      StructField("reviewTime",StringType(),True)
])

df = spark.read.schema(schema).json('data/Software.json')
df.show(2)

21/12/23 21:05:39 WARN Utils: Your hostname, YasamanEms-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.250 instead (on interface en0)
21/12/23 21:05:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/12/23 21:05:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
[Stage 0:>                                                          (0 + 1) / 1]

+--------------+----------+-------------------+-------+--------------------+-------+--------------+--------------+-----------+
|    reviewerID|      asin|       reviewerName|helpful|          reviewText|overall|       summary|unixReviewTime| reviewTime|
+--------------+----------+-------------------+-------+--------------------+-------+--------------+--------------+-----------+
|A240ORQ2LF9LUI|0077613252|         Michelle W|   null|The materials arr...|    4.0|Material Great|    1394496000|03 11, 2014|
|A1YCCU0YRLS0FE|0077613252|Rosalind White Ames|   null|I am really enjoy...|    4.0|        Health|    1393113600|02 23, 2014|
+--------------+----------+-------------------+-------+--------------------+-------+--------------+--------------+-----------+
only showing top 2 rows



                                                                                

# Data preprocessing

In [4]:
from pyspark.sql.types import IntegerType

#manual feature selection
df = df.selectExpr("cast(reviewText as string) reviewText",
                   "cast(overall as int) overall")


### Drop null values

In [5]:
#drop any rows contain null
df = df.na.drop("any")

### Checking the number of records for each overall rating

In [6]:
from pyspark.sql import SparkSession

# creating a temporary view of
# Dataframe and storing it into df2
df.createOrReplaceTempView("df")

# using the SQL query to count all
# distinct records and display the
# count on the screen
spark.sql("select count((overall)),overall from df group by overall").show()

                                                                                

+--------------+-------+
|count(overall)|overall|
+--------------+-------+
|        102542|      1|
|         39394|      3|
|        212399|      5|
|         73590|      4|
|         31445|      2|
+--------------+-------+



### Filter rating scores

In [7]:
# filter records, with rating score of 3 because they are neutral
# filter records, with rating value if out of [0,5] since it would be invalid
df = df.where("overall<6 and overall!=3")
df = df.where("overall>0")

### Cheking scores after filtering

In [8]:
from pyspark.sql import SparkSession

# creating a temporary view of
# Dataframe and storing it into df2
df.createOrReplaceTempView("df")

# using the SQL query to count all
# distinct records and display the
# count on the screen
spark.sql("select count((overall)),overall from df group by overall").show()

                                                                                

+--------------+-------+
|count(overall)|overall|
+--------------+-------+
|        102542|      1|
|        212399|      5|
|         73590|      4|
|         31445|      2|
+--------------+-------+



### Bucketizing overall scores to two categories

In [9]:
from pyspark.ml.feature import Bucketizer

#Bucketizing sentiments into two categories (0 for neg and 1 for pos)
bucketizer = Bucketizer(splits=[ 1, 4, 5 ],inputCol="overall", outputCol="category")
df = bucketizer.setHandleInvalid("keep").transform(df)

df.show()

+--------------------+-------+--------+
|          reviewText|overall|category|
+--------------------+-------+--------+
|The materials arr...|      4|     1.0|
|I am really enjoy...|      4|     1.0|
|IF YOU ARE TAKING...|      1|     0.0|
|I have used Learn...|      5|     1.0|
|Strong backgroung...|      4|     1.0|
|i got this book o...|      5|     1.0|
|I was very happy ...|      5|     1.0|
|Recieved in a tim...|      5|     1.0|
|Maybe it's just m...|      2|     0.0|
|This was the text...|      5|     1.0|
|Not worth the pri...|      2|     0.0|
|I love how this b...|      4|     1.0|
|Great on the deli...|      5|     1.0|
|The book was deli...|      5|     1.0|
|Required to buy t...|      2|     0.0|
|Didn't help me mu...|      1|     0.0|
|Disappointing tex...|      1|     0.0|
|This book provide...|      4|     1.0|
|I've been using D...|      4|     1.0|
|The demo is done ...|      4|     1.0|
+--------------------+-------+--------+
only showing top 20 rows



### Replacing regex and convert sentiments to lowercase

In [10]:
import pyspark.sql.functions as sq
from pyspark.sql.functions import lower, col

#convert txt to lower case
df = df.select("*", lower(col('reviewText')).alias("lower_text"))
#remove new lines
df = df.withColumn("no_line_text", sq.regexp_replace("lower_text", r"\n", " "))
#only keep words contains a-z
df = df.withColumn("removed_punc_text", sq.regexp_replace("no_line_text", r"[^a-z]", " "))
#replace multi space with one 
df = df.withColumn("text_ready", sq.regexp_replace("removed_punc_text", r" +", ' '))
#remove single character from string
df = df.withColumn("final_txt", sq.regexp_replace("text_ready",r"\b[a-zA-Z]\b", ""))

### Drop Duplicates

In [11]:
print("\n # of records including duplicates:  " + str(df.count()))

[Stage 22:>                                                         (0 + 8) / 8]


 # of records including duplicates:  419976


                                                                                

In [12]:
df = df.dropDuplicates()

In [13]:
print("\n # of records after drop duplicates:  " + str(df.count()))




 # of records after drop duplicates:  385876


                                                                                

### Shuffling records

In [14]:
from pyspark.sql.functions import rand 
#shuffling data
df = df.orderBy(rand())

### Formating labels as fasttext requires

In [15]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

label_names = {0.0:"__label__negative",1.0:"__label__positive"}
udf_cat = udf(lambda x: label_names[x], StringType())

df = df.withColumn("label", udf_cat("category"))

### Cheking count of each label

In [16]:
# creating sparksession and giving app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# creating a temporary view of
# Dataframe and storing it into df2
df.createOrReplaceTempView("df")

# using the SQL query to count all
# distinct records and display the
# count on the screen
spark.sql("select count((label)),label from df group by label").show()

                                                                                

+------------+-----------------+
|count(label)|            label|
+------------+-----------------+
|      256690|__label__positive|
|      129186|__label__negative|
+------------+-----------------+



# Split Data to train and test sets

In [17]:
(train_data, test_data) = df.randomSplit([0.7, 0.3], seed = 100)
train_data.show(truncate = 20)



+--------------------+-------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+
|          reviewText|overall|category|          lower_text|        no_line_text|   removed_punc_text|          text_ready|           final_txt|            label|
+--------------------+-------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+
|"I love Webroot, ...|      5|     1.0|"i love webroot, ...|"i love webroot, ...| i love webroot  ...| i love webroot i...|  love webroot it...|__label__positive|
|(This item should...|      4|     1.0|(this item should...|(this item should...| this item should...| this item should...| this item should...|__label__positive|
|....no complaints...|      4|     1.0|....no complaints...|....no complaints...|    no complaints...| no complaints he...| no complaints he...|__label__positive|
|1) Not only is th...|

                                                                                

# Concatintating label and asociated sentiment to create text file

In [18]:
from pyspark.sql.functions import concat,col,lit

train_df = train_data.select(concat(train_data.label,lit(" "),train_data.final_txt).alias("all"))

test_df = test_data.select(concat(test_data.label,lit(" "),test_data.final_txt).alias("all"))


In [19]:
train_data.show(truncate = 20)



+--------------------+-------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+
|          reviewText|overall|category|          lower_text|        no_line_text|   removed_punc_text|          text_ready|           final_txt|            label|
+--------------------+-------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+
|"I love Webroot, ...|      5|     1.0|"i love webroot, ...|"i love webroot, ...| i love webroot  ...| i love webroot i...|  love webroot it...|__label__positive|
|(This item should...|      4|     1.0|(this item should...|(this item should...| this item should...| this item should...| this item should...|__label__positive|
|....no complaints...|      4|     1.0|....no complaints...|....no complaints...|    no complaints...| no complaints he...| no complaints he...|__label__positive|
|1) Not only is th...|

                                                                                

In [20]:
train_df.show(truncate=60)



+------------------------------------------------------------+
|                                                         all|
+------------------------------------------------------------+
|__label__positive   love webroot it is very easy to use  ...|
|__label__positive  this item should be called   protector...|
|__label__positive  no complaints here to complain about d...|
|__label__negative  not only is the program core so old th...|
|__label__positive  of what  needed in one very easy to us...|
|  __label__positive  yo enjoys it pretty much as advertised |
|__label__positive  seems to do what is supposed to do and...|
|__label__negative  accuracy out of the box is an embellis...|
|__label__negative   data hook product link linked class  ...|
|__label__positive   data hook product link linked class  ...|
|__label__negative  div id video block  hg ay  uj class  s...|
|__label__positive  fantastic product well known and prove...|
|__label__positive  great introduction to  great openin

                                                                                

## Write datasets to text file as fasttext requires txt input file 

In [21]:
textfile = open("train_data.txt", "w")
i =0
for row in list(train_df.toLocalIterator()):
    i += 1
    textfile.write(row['all'] + "\n")

textfile.close()

textfile = open("test_data.txt", "w")
i =0
for row in list(test_df.toLocalIterator()):
    i += 1
    textfile.write(row['all'] + "\n")

textfile.close()



In [22]:
print("\n number of training records: " + str(train_df.count()))
print("\n number of testing records: " + str(test_df.count()))

                                                                                


 number of training records: 270252





 number of testing records: 115624


                                                                                

# Training fasttext classification model, manully set the hyperparameters

In [23]:
import fasttext
model_manual_params = fasttext.train_supervised(input="train_data.txt", lr=0.1, epoch=25, wordNgrams=1, bucket=200000, dim=10, loss='hs')

Read 20M words
Number of words:  92029
Number of labels: 2
Progress: 100.0% words/sec/thread: 4281731 lr:  0.000000 avg.loss:  0.239810 ETA:   0h 0m 0s  7.8% words/sec/thread: 4681253 lr:  0.092204 avg.loss:  0.294875 ETA:   0h 0m14s 22.3% words/sec/thread: 4602498 lr:  0.077706 avg.loss:  0.269207 ETA:   0h 0m12s 25.2% words/sec/thread: 4554056 lr:  0.074824 avg.loss:  0.266023 ETA:   0h 0m12s 27.7% words/sec/thread: 4553673 lr:  0.072324 avg.loss:  0.262984 ETA:   0h 0m11s 29.5% words/sec/thread: 4545873 lr:  0.070505 avg.loss:  0.262554 ETA:   0h 0m11s 43.8% words/sec/thread: 4530173 lr:  0.056223 avg.loss:  0.255645 ETA:   0h 0m 9s 57.4% words/sec/thread: 4524343 lr:  0.042561 avg.loss:  0.250131 ETA:   0h 0m 6s 59.9% words/sec/thread: 4516677 lr:  0.040097 avg.loss:  0.249405 ETA:   0h 0m 6s 79.1% words/sec/thread: 4405701 lr:  0.020851 avg.loss:  0.244399 ETA:   0h 0m 3s 90.7% words/sec/thread: 4348068 lr:  0.009264 avg.loss:  0.241498 ETA:   0h 0m 1s 94.5% words/sec/thread: 4329

# Training fasttext classification model with auto hyperparameter tuning

In [24]:
model_auto_hyper = fasttext.train_supervised(input="train_data.txt", autotuneValidationFile='test_data.txt')

Progress: 100.0% Trials:   10 Best score:  0.934763 ETA:   0h 0m 0s
Training again with best arguments
Read 20M words
Number of words:  92029
Number of labels: 2
Progress: 100.0% words/sec/thread:  735878 lr:  0.000000 avg.loss:  0.132907 ETA:   0h 0m 0s  4.2% words/sec/thread:  985178 lr:  0.056579 avg.loss:  0.400784 ETA:   0h 0m25s 20.9% words/sec/thread:  835530 lr:  0.046688 avg.loss:  0.260779 ETA:   0h 0m24s 22.8% words/sec/thread:  779921 lr:  0.045570 avg.loss:  0.255020 ETA:   0h 0m26s 23.4% words/sec/thread:  760837 lr:  0.045205 avg.loss:  0.253466 ETA:   0h 0m26s 25.5% words/sec/thread:  682903 lr:  0.043973 avg.loss:  0.246096 ETA:   0h 0m28s 26.8% words/sec/thread:  656416 lr:  0.043216 avg.loss:  0.242038 ETA:   0h 0m29s 30.9% words/sec/thread:  656499 lr:  0.040818 avg.loss:  0.230053 ETA:   0h 0m27s 36.7% words/sec/thread:  679773 lr:  0.037381 avg.loss:  0.216059 ETA:   0h 0m24s 37.3% words/sec/thread:  681827 lr:  0.037006 avg.loss:  0.214493 ETA:   0h 0m24s 38.1% w

## Computing Percision & Recall for binary classes model

In [25]:
from pyspark.sql.functions import udf, monotonically_increasing_id

test = test_data.select('text_ready').rdd.flatMap(lambda x: x).collect()

#predict labels based on manual hyper parameters model and auto hyper parameters tunning
pred = model_manual_params.predict(test)
pred_auto = model_auto_hyper.predict(test)

# insert predicted labels column from manual param set model to test_data dataframe as "prediction" column
test_data = test_data.repartition(1).withColumn(
                                        "manual_hyper_param_model_prediction", 
                                        udf(lambda id: ' '.join(pred[0][id]))(monotonically_increasing_id())
)
# insert predicted labels column from auto hyper param model to test_data dataframe as "prediction_auto_m" column
test_data = test_data.repartition(1).withColumn(
                                        "auto_hyper_param_model_prediction", 
                                        udf(lambda id: ' '.join(pred_auto[0][id]))(monotonically_increasing_id())
)

test_data.show()

[Stage 858:>                                                        (0 + 1) / 1]

+--------------------+-------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+-----------------------------------+---------------------------------+
|          reviewText|overall|category|          lower_text|        no_line_text|   removed_punc_text|          text_ready|           final_txt|            label|manual_hyper_param_model_prediction|auto_hyper_param_model_prediction|
+--------------------+-------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+-----------------------------------+---------------------------------+
|....talk about be...|      1|     0.0|....talk about be...|....talk about be...|    talk about be...| talk about being...| talk about being...|__label__negative|                  __label__negative|                __label__negative|
|<a data-hook="pro...|      1|     0.0|<a data-hook="pro...|<a data-

                                                                                

### manual parameters

In [26]:
# manual param model accuracy level
model_manual_params.test("test_data.txt")

(115624, 0.9107797689061095, 0.9107797689061095)

### auto hyper params

In [27]:
# auto hyper params model accuracy level
model_auto_hyper.test("test_data.txt")

(115624, 0.9345724071127102, 0.9345724071127102)

In [29]:
# calculating recall and precision for both models
predicted_cols = ['manual_hyper_param_model_prediction', 'auto_hyper_param_model_prediction']

for pred in predicted_cols:
    tp = test_data[(test_data.label == "__label__positive") & (test_data[pred] == "__label__positive")].count()
    tn = test_data[(test_data.label == "__label__negative") & (test_data[pred] == "__label__negative")].count()
    fp = test_data[(test_data.label == "__label__negative") & (test_data[pred] == "__label__positive")].count()
    fn = test_data[(test_data.label == "__label__positive") & (test_data[pred] == "__label__negative")].count()
    print ("True Positives", pred, str(tp))
    print ("True Negatives", pred,str(tn))
    print ("False Positives", pred,str(fp))
    print ("False Negatives", pred,str(fn))
    print ("Total: ", str(test_data.count()))

    r = float(tp)/(tp + fn)
    print ("\n" + pred + " recall", str(r))

    p = float(tp) / (tp + fp)
    print ("\n" + pred + " precision", str(p))

                                                                                

True Positives manual_hyper_param_model_prediction 72231
True Negatives manual_hyper_param_model_prediction 33077
False Positives manual_hyper_param_model_prediction 5667
False Negatives manual_hyper_param_model_prediction 4649


                                                                                

Total:  115624

manual_hyper_param_model_prediction recall 0.9395291363163372

manual_hyper_param_model_prediction precision 0.9272510205653547


                                                                                

True Positives auto_hyper_param_model_prediction 73078
True Negatives auto_hyper_param_model_prediction 34708
False Positives auto_hyper_param_model_prediction 4036
False Negatives auto_hyper_param_model_prediction 3802




Total:  115624

auto_hyper_param_model_prediction recall 0.9505463059313215

auto_hyper_param_model_prediction precision 0.9476619031563659


                                                                                

### Getting auto hyperparameters values for best model

In [30]:
args_obj = model_auto_hyper.f.getArgs()
for hparam in dir(args_obj):
    if not hparam.startswith('__'):
        print(f"{hparam} -> {getattr(args_obj, hparam)}")

autotuneDuration -> 300
autotuneMetric -> f1
autotuneModelSize -> 
autotunePredictions -> 1
autotuneValidationFile -> test_data.txt
bucket -> 5429115
cutoff -> 0
dim -> 61
dsub -> 2
epoch -> 9
input -> train_data.txt
label -> __label__
loss -> loss_name.softmax
lr -> 0.059040920313827926
lrUpdateRate -> 100
maxn -> 0
minCount -> 1
minCountLabel -> 0
minn -> 0
model -> model_name.supervised
neg -> 5
output -> 
pretrainedVectors -> 
qnorm -> False
qout -> False
retrain -> False
saveOutput -> False
seed -> 0
setManual -> <bound method PyCapsule.setManual of <fasttext_pybind.args object at 0x7fd40c017130>>
t -> 0.0001
thread -> 7
verbose -> 2
wordNgrams -> 3
ws -> 5


## Saving model for future use

In [31]:
model_auto_hyper.save_model("model_amazon_sentiments.bin")

## Predicting signle sentiments to better see the results

In [32]:
# k parameter is giving the confidence of each label
model_auto_hyper.predict(" worst customer delivery",k=2)

(('__label__negative', '__label__positive'),
 array([1.00001001e+00, 1.00000034e-05]))

In [33]:
model_auto_hyper.predict(" terrible quality")

(('__label__negative',), array([1.00001001]))

In [34]:
model_auto_hyper.predict(" very high-quality")

(('__label__positive',), array([0.99811327]))

In [35]:
model_auto_hyper.predict(" poor toys")

(('__label__negative',), array([1.00001001]))

In [36]:
model_auto_hyper.predict(" nice color of many different items")

(('__label__positive',), array([1.00001001]))

In [37]:
model_auto_hyper.predict("dont hesitate to buy")

(('__label__positive',), array([0.9976359]))

In [38]:
model_auto_hyper.predict("never buy")

(('__label__negative',), array([1.00000989]))

In [39]:
model_auto_hyper.predict("came to my door tear up and broken but it seems it was good")

(('__label__positive',), array([0.68580037]))

In [40]:
model_auto_hyper.predict("came to my door tear up and broken ")

(('__label__negative',), array([0.97905266]))

In [41]:
model_auto_hyper.predict("it was awufully good")

(('__label__positive',), array([0.9533323]))

In [42]:
model_auto_hyper.predict("it sucks")

(('__label__negative',), array([1.00001001]))