# Create spark session and spark dataframe from data

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
  
spark = SparkSession.builder.getOrCreate()

schema = StructType([
      StructField("reviewerID",StringType(),True),
      StructField("asin",StringType(),True),
      StructField("reviewerName",StringType(),True),
      StructField("helpful",StringType(),True),
      StructField("reviewText",StringType(),True),
      StructField("overall",StringType(),True),
      StructField("summary",StringType(),True),
      StructField("unixReviewTime",StringType(),True),
      StructField("reviewTime",StringType(),True)
  ])
df = spark.read.schema(schema).json('data/Software.json')
df.show(2)

21/12/08 17:37:21 WARN Utils: Your hostname, YasamanEms-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.250 instead (on interface en0)
21/12/08 17:37:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/12/08 17:37:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/08 17:37:22 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
[Stage 0:>                                                          (0 + 1) / 1]

+--------------+----------+-------------------+-------+--------------------+-------+--------------+--------------+-----------+
|    reviewerID|      asin|       reviewerName|helpful|          reviewText|overall|       summary|unixReviewTime| reviewTime|
+--------------+----------+-------------------+-------+--------------------+-------+--------------+--------------+-----------+
|A240ORQ2LF9LUI|0077613252|         Michelle W|   null|The materials arr...|    4.0|Material Great|    1394496000|03 11, 2014|
|A1YCCU0YRLS0FE|0077613252|Rosalind White Ames|   null|I am really enjoy...|    4.0|        Health|    1393113600|02 23, 2014|
+--------------+----------+-------------------+-------+--------------------+-------+--------------+--------------+-----------+
only showing top 2 rows



                                                                                

# Data preprocessing

In [2]:
from pyspark.sql.types import IntegerType


df = df.selectExpr("cast(reviewText as string) reviewText",
                    "cast(overall as int) overall")


### Drop null values

In [3]:
#drop any rows contain null
df = df.na.drop("any")

### Checking the number of records for each overall rating

In [4]:
from pyspark.sql import SparkSession

# creating a temporary view of
# Dataframe and storing it into df2
df.createOrReplaceTempView("df")

# using the SQL query to count all
# distinct records and display the
# count on the screen
spark.sql("select count((overall)),overall from df group by overall").show()

                                                                                

+--------------+-------+
|count(overall)|overall|
+--------------+-------+
|        102542|      1|
|         39394|      3|
|        212399|      5|
|         73590|      4|
|         31445|      2|
+--------------+-------+



### Filter rating scores

In [5]:
df = df.where("overall<6 and overall!=3")
df = df.where("overall>0")

### Cheking scores after filtering

In [6]:
from pyspark.sql import SparkSession
  
# creating sparksession and giving app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# creating a temporary view of
# Dataframe and storing it into df2
df.createOrReplaceTempView("df")

# using the SQL query to count all
# distinct records and display the
# count on the screen
spark.sql("select count((overall)),overall from df group by overall").show()

                                                                                

+--------------+-------+
|count(overall)|overall|
+--------------+-------+
|        102542|      1|
|        212399|      5|
|         73590|      4|
|         31445|      2|
+--------------+-------+



### Bucketizing overall scores to two categories

In [7]:
from pyspark.ml.feature import Bucketizer
bucketizer = Bucketizer(splits=[ 1, 4, 5 ],inputCol="overall", outputCol="category")
df = bucketizer.setHandleInvalid("keep").transform(df)

df.show()

+--------------------+-------+--------+
|          reviewText|overall|category|
+--------------------+-------+--------+
|The materials arr...|      4|     1.0|
|I am really enjoy...|      4|     1.0|
|IF YOU ARE TAKING...|      1|     0.0|
|I have used Learn...|      5|     1.0|
|Strong backgroung...|      4|     1.0|
|i got this book o...|      5|     1.0|
|I was very happy ...|      5|     1.0|
|Recieved in a tim...|      5|     1.0|
|Maybe it's just m...|      2|     0.0|
|This was the text...|      5|     1.0|
|Not worth the pri...|      2|     0.0|
|I love how this b...|      4|     1.0|
|Great on the deli...|      5|     1.0|
|The book was deli...|      5|     1.0|
|Required to buy t...|      2|     0.0|
|Didn't help me mu...|      1|     0.0|
|Disappointing tex...|      1|     0.0|
|This book provide...|      4|     1.0|
|I've been using D...|      4|     1.0|
|The demo is done ...|      4|     1.0|
+--------------------+-------+--------+
only showing top 20 rows



### Replacing regex and convert sentiments to lowercase

In [8]:
import pyspark.sql.functions as sq
from pyspark.sql.functions import lower, col
df = df.select("*", lower(col('reviewText')).alias("lower_text"))
df = df.withColumn("no_line_text", sq.regexp_replace("lower_text", r"\n", " "))
df = df.withColumn("no_digit_text", sq.regexp_replace("no_line_text", r"[0-9]", " "))
df = df.withColumn("text_ready", sq.regexp_replace("no_digit_text", r"[^\P{P}-]+", " "))


### Drop Duplicates

In [9]:
print("\n # of records including duplicates:  " + str(df.count()))

[Stage 22:>                                                         (0 + 8) / 8]


 # of records including duplicates:  419976


                                                                                

In [10]:
df = df.dropDuplicates()

In [11]:
print("\n # of records after drop duplicates:  " + str(df.count()))




 # of records after drop duplicates:  385876


                                                                                

### Shuffling records

In [12]:
from pyspark.sql.functions import rand 
#shuffling data
df = df.orderBy(rand())

### Formating labels as fasttext requires

In [13]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

t = {0.0:"__label__negative",1.0:"__label__positive"}
udf_cat = udf(lambda x: t[x], StringType())

df = df.withColumn("label", udf_cat("category"))

### Cheking count of each label

In [14]:
# creating sparksession and giving app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
# creating a temporary view of
# Dataframe and storing it into df2
df.createOrReplaceTempView("df")

# using the SQL query to count all
# distinct records and display the
# count on the screen
spark.sql("select count((label)),label from df group by label").show()

                                                                                

+------------+-----------------+
|count(label)|            label|
+------------+-----------------+
|      256690|__label__positive|
|      129186|__label__negative|
+------------+-----------------+



# Split Data to train and test sets

In [15]:
(trainingData, testData) = df.randomSplit([0.7, 0.3], seed = 100)
trainingData.show(truncate = 20)

                                                                                

+--------------------+-------+--------+--------------------+--------------------+--------------------+--------------------+-----------------+
|          reviewText|overall|category|          lower_text|        no_line_text|       no_digit_text|          text_ready|            label|
+--------------------+-------+--------+--------------------+--------------------+--------------------+--------------------+-----------------+
|'HGTV Home Design...|      4|     1.0|'hgtv home design...|'hgtv home design...|'hgtv home design...| hgtv home design...|__label__positive|
|* Important Note ...|      5|     1.0|* important note ...|* important note ...|* important note ...|  important note ...|__label__positive|
|-The program caus...|      1|     0.0|-the program caus...|-the program caus...|-the program caus...|-the program caus...|__label__negative|
|..This is a piece...|      1|     0.0|..this is a piece...|..this is a piece...|..this is a piece...| this is a piece ...|__label__negative|
|1st t

# Concatintating label and asociated sentiment to create text file

In [16]:
from pyspark.sql.functions import concat,col,lit

train_df = trainingData.select(concat(trainingData.label,lit(" "),trainingData.text_ready).alias("all"))

test_df = testData.select(concat(testData.label,lit(" "),testData.text_ready).alias("all"))


In [17]:
train_df.show(truncate=60)



+------------------------------------------------------------+
|                                                         all|
+------------------------------------------------------------+
|__label__positive  hgtv home design   remodeling suite   ...|
|__label__positive   important note at bottom    i own the...|
|__label__negative -the program caused my computer to shut...|
|__label__negative  this is a piece of junk   i don t have...|
|__label__negative  st time reviewer here  i purchased thi...|
|__label__positive       i switched from  turbotax  to  h ...|
|__label__negative      is have a communication problem wi...|
|__label__positive   licenses for at most $   each  often ...|
|        __label__positive     is the best protection  i find|
|__label__negative <a data-hook= product-link-linked  clas...|
|__label__positive <a data-hook= product-link-linked  clas...|
|__label__positive a very reliable and upgraded version of...|
|__label__positive a full installation within minutes  

                                                                                

## Write datasets to text file as fasttext requires

In [18]:
textfile = open("train_data.txt", "w")
i =0
for row in list(train_df.toLocalIterator()):
    i += 1
    textfile.write(row['all'] + "\n")

textfile.close()

textfile = open("test_data.txt", "w")
i =0
for row in list(test_df.toLocalIterator()):
    i += 1
    textfile.write(row['all'] + "\n")

textfile.close()



In [19]:
print("\n number of training records: " + str(train_df.count()))
print("\n number of testing records: " + str(test_df.count()))

                                                                                


 number of training records: 270270





 number of testing records: 115606


                                                                                

# Training fasttext classification model, manully set the hyperparameters

In [20]:
import fasttext
model = fasttext.train_supervised(input="train_data.txt", lr=0.1, epoch=25, wordNgrams=1, bucket=200000, dim=10, loss='hs')

Read 22M words
Number of words:  124203
Number of labels: 2
Progress: 100.0% words/sec/thread: 4129179 lr:  0.000000 avg.loss:  0.238139 ETA:   0h 0m 0s 10.4% words/sec/thread: 4416722 lr:  0.089640 avg.loss:  0.282216 ETA:   0h 0m15s 15.4% words/sec/thread: 4373847 lr:  0.084637 avg.loss:  0.272456 ETA:   0h 0m15s4373387 lr:  0.080081 avg.loss:  0.267244 ETA:   0h 0m14s 21.0% words/sec/thread: 4362717 lr:  0.079017 avg.loss:  0.265886 ETA:   0h 0m14s 29.2% words/sec/thread: 4311618 lr:  0.070823 avg.loss:  0.260784 ETA:   0h 0m12s 47.2% words/sec/thread: 4167043 lr:  0.052797 avg.loss:  0.251859 ETA:   0h 0m 9s 54.8% words/sec/thread: 4168602 lr:  0.045209 avg.loss:  0.249266 ETA:   0h 0m 8s 58.0% words/sec/thread: 4167755 lr:  0.041951 avg.loss:  0.248626 ETA:   0h 0m 7s 63.4% words/sec/thread: 4164887 lr:  0.036561 avg.loss:  0.247242 ETA:   0h 0m 6s 75.4% words/sec/thread: 4163943 lr:  0.024630 avg.loss:  0.243705 ETA:   0h 0m 4s 99.4% words/sec/thread: 4148120 lr:  0.000631 avg.lo

# Training fasttext classification model with auto hyperparameter tuning

In [21]:
model_auto_hyper = fasttext.train_supervised(input="train_data.txt", autotuneValidationFile='test_data.txt')

Progress: 100.0% Trials:   10 Best score:  0.935367 ETA:   0h 0m 0s
Training again with best arguments
Read 22M words
Number of words:  124203
Number of labels: 2
Progress: 100.0% words/sec/thread: 1107950 lr:  0.000000 avg.loss:  0.129678 ETA:   0h 0m 0s 30.7% words/sec/thread: 1102968 lr:  0.043083 avg.loss:  0.223019 ETA:   0h 0m17s 61.8% words/sec/thread: 1168993 lr:  0.023771 avg.loss:  0.165925 ETA:   0h 0m 9s 65.9% words/sec/thread: 1167354 lr:  0.021201 avg.loss:  0.161332 ETA:   0h 0m 8s 70.6% words/sec/thread: 1167973 lr:  0.018287 avg.loss:  0.155816 ETA:   0h 0m 7s 71.3% words/sec/thread: 1166427 lr:  0.017820 avg.loss:  0.154981 ETA:   0h 0m 6s 87.8% words/sec/thread: 1146151 lr:  0.007594 avg.loss:  0.139309 ETA:   0h 0m 3s 91.5% words/sec/thread: 1130862 lr:  0.005274 avg.loss:  0.136103 ETA:   0h 0m 2s 92.2% words/sec/thread: 1129596 lr:  0.004839 avg.loss:  0.135424 ETA:   0h 0m 1s 96.9% words/sec/thread: 1114035 lr:  0.001899 avg.loss:  0.131922 ETA:   0h 0m 0s 97.7% 

## Computing Percision & Recall for binary classes model

In [22]:
from pyspark.sql.functions import udf, monotonically_increasing_id

test = testData.select('text_ready').rdd.flatMap(lambda x: x).collect()
pred = model.predict(test)
pred_auto = model_auto_hyper.predict(test)
d = pred[0][:]
d_auto = pred_auto[0][:]
testData = testData.repartition(1).withColumn(
    "prediction", 
    udf(lambda id: ' '.join(d[id]))(monotonically_increasing_id()))
testData = testData.repartition(1).withColumn(
    "prediction_auto_m", 
    udf(lambda id: ' '.join(d_auto[id]))(monotonically_increasing_id()))
testData.show()

[Stage 856:>                                                        (0 + 1) / 1]

+--------------------+-------+--------+--------------------+--------------------+--------------------+--------------------+-----------------+-----------------+-----------------+
|          reviewText|overall|category|          lower_text|        no_line_text|       no_digit_text|          text_ready|            label|       prediction|prediction_auto_m|
+--------------------+-------+--------+--------------------+--------------------+--------------------+--------------------+-----------------+-----------------+-----------------+
|......by some of ...|      5|     1.0|......by some of ...|......by some of ...|......by some of ...| by some of the s...|__label__positive|__label__positive|__label__positive|
|4 stars because n...|      4|     1.0|4 stars because n...|4 stars because n...|  stars because n...|  stars because n...|__label__positive|__label__positive|__label__positive|
|<a data-hook="pro...|      1|     0.0|<a data-hook="pro...|<a data-hook="pro...|<a data-hook="pro...|<a data-

                                                                                

### manual parameters

In [24]:
model.test("test_data.txt")

(115606, 0.9114751829489819, 0.9114751829489819)

In [25]:
tp = testData[(testData.label == "__label__positive") & (testData.prediction == "__label__positive")].count()
tn = testData[(testData.label == "__label__negative") & (testData.prediction == "__label__negative")].count()
fp = testData[(testData.label == "__label__negative") & (testData.prediction == "__label__positive")].count()
fn = testData[(testData.label == "__label__positive") & (testData.prediction == "__label__negative")].count()
print ("True Positives:", str(tp))
print ("True Negatives:", str(tn))
print ("False Positives:", str(fp))
print ("False Negatives:", str(fn))
print ("Total: ", str(testData.count()))

r = float(tp)/(tp + fn)
print ("\n recall", str(r))

p = float(tp) / (tp + fp)
print ("\n precision", str(p))

                                                                                

True Positives: 71988
True Negatives: 33384
False Positives: 5444
False Negatives: 4790




Total:  115606

 recall 0.9376123368673318

 precision 0.9296931501188139


                                                                                

### auto hyper params

In [27]:
model_auto_hyper.test("test_data.txt")

(115606, 0.9352801757694237, 0.9352801757694237)

In [28]:
tp = testData[(testData.label == "__label__positive") & (testData.prediction_auto_m == "__label__positive")].count()
tn = testData[(testData.label == "__label__negative") & (testData.prediction_auto_m == "__label__negative")].count()
fp = testData[(testData.label == "__label__negative") & (testData.prediction_auto_m == "__label__positive")].count()
fn = testData[(testData.label == "__label__positive") & (testData.prediction_auto_m == "__label__negative")].count()
print ("True Positives:", str(tp))
print ("True Negatives:", str(tn))
print ("False Positives:", str(fp))
print ("False Negatives:", str(fn))
print ("Total: ", str(testData.count()))

r = float(tp)/(tp + fn)
print ("\n auto_param_recall", str(r))

p = float(tp) / (tp + fp)
print ("\n auto_param_precision", str(p))

                                                                                

True Positives: 73072
True Negatives: 35052
False Positives: 3776
False Negatives: 3706




Total:  115606

 auto_param_recall 0.9517309645992341

 auto_param_precision 0.950864043306267


                                                                                

### Getting auto hyperparameters values for best model

In [29]:
args_obj = model_auto_hyper.f.getArgs()
for hparam in dir(args_obj):
    if not hparam.startswith('__'):
        print(f"{hparam} -> {getattr(args_obj, hparam)}")

autotuneDuration -> 300
autotuneMetric -> f1
autotuneModelSize -> 
autotunePredictions -> 1
autotuneValidationFile -> test_data.txt
bucket -> 5356303
cutoff -> 0
dim -> 66
dsub -> 2
epoch -> 9
input -> train_data.txt
label -> __label__
loss -> loss_name.softmax
lr -> 0.062164072811046224
lrUpdateRate -> 100
maxn -> 0
minCount -> 1
minCountLabel -> 0
minn -> 0
model -> model_name.supervised
neg -> 5
output -> 
pretrainedVectors -> 
qnorm -> False
qout -> False
retrain -> False
saveOutput -> False
seed -> 0
setManual -> <bound method PyCapsule.setManual of <fasttext_pybind.args object at 0x7fd1a9449930>>
t -> 0.0001
thread -> 7
verbose -> 2
wordNgrams -> 3
ws -> 5


## Saving model for future use

In [30]:
model.save_model("model_amazon_sentiments.bin")

## Predicting signle sentiments to better see the results

In [31]:
model.predict(" worst customer delivery")

(('__label__negative',), array([1.00001001]))

In [32]:
model.predict(" terrible quality")

(('__label__negative',), array([1.00001001]))

In [33]:
model.predict(" very high-quality")

(('__label__positive',), array([0.99960905]))

In [34]:
model.predict(" poor toys")

(('__label__negative',), array([1.00001001]))

In [35]:
model.predict(" nice color of many different items")

(('__label__positive',), array([0.99960595]))

In [36]:
model.predict("dont hesitate to buy")

(('__label__positive',), array([1.00000501]))

In [37]:
model.predict("never buy")

(('__label__negative',), array([0.99768275]))

In [38]:
model.predict("came to my door tear up and broken but it seems it was good")

(('__label__positive',), array([0.93805468]))

In [39]:
model.predict("came to my door tear up and broken ")

(('__label__negative',), array([0.75868022]))

In [40]:
model.predict("it was awufully good")

(('__label__positive',), array([0.99987578]))

In [41]:
model.predict("it sucks")

(('__label__negative',), array([1.00001001]))