In [40]:
# libraries
import numpy as np

import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col

from pyspark.ml.feature import HashingTF,IDF, Tokenizer,StopWordsRemover
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

import mlflow

In [2]:
#create_pyspark_session
spark = SparkSession.builder.appName("amazon_training").\
config("spark.memory.offHeap.enabled","true").\
config("spark.memory.offHeap.size","10g").\
getOrCreate()

24/04/22 09:41:33 WARN Utils: Your hostname, Bryan resolves to a loopback address: 127.0.1.1; using 172.20.218.41 instead (on interface eth0)
24/04/22 09:41:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/22 09:41:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
#load data preprocesses
df = spark.read.csv('ml_data/amazon_ml.csv', header = True)

In [4]:
df.show(2,0)

+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
|overall|reviewText                                                                                                                                                    |
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
|4.0    |No issues.                                                                                                                                                    |
|5.0    |Purchased this for my device, it worked as advertised. You can never have too much phone memory, since I download a lot of stuff this was a no brainer for me.|
+-------+--------------------------------------------------------------------------------------------------------------------------------------------------

## balance dataset

In [10]:
#class count
class_counts = df.groupBy('overall').count()
class_counts.show()

+-------+-----+
|overall|count|
+-------+-----+
|    1.0|  244|
|    5.0| 3906|
|    4.0|  526|
|    2.0|   80|
|    3.0|  142|
+-------+-----+



In [14]:
#min class
min_class = class_counts.agg(F.min('count').alias('min_value'))
min_class.show()

+---------+
|min_value|
+---------+
|       80|
+---------+



In [27]:
min_class = min_class.collect()[0][0]

In [21]:
undersampled_df = df.filter(col("overall") != 5.0)  
undersampled_df.show()

+-------+--------------------+
|overall|          reviewText|
+-------+--------------------+
|    4.0|          No issues.|
|    4.0|it works as expec...|
|    3.0|It works, but fil...|
|    1.0|I bought 2 of tho...|
|    4.0|The memory card i...|
|    1.0|I bougth this mic...|
|    1.0|"""Ordered this f...|
|    4.0|more like 8mb/s i...|
|    2.0|I used this for a...|
|    4.0|everything about ...|
|    4.0|Used in my phone ...|
|    3.0|This card adverti...|
|    4.0|I bought this SD ...|
|    4.0|good card. work a...|
|    4.0|I got this becaus...|
|    4.0|I got this memory...|
|    4.0|I bought two of t...|
|    2.0|"""It works but t...|
|    4.0|SanDisk Ultra 32 ...|
|    3.0|Does it's job and...|
+-------+--------------------+
only showing top 20 rows



In [22]:
majority_class_df = df.filter(col("overall") == 5.0)  
majority_class_df.show()

+-------+--------------------+
|overall|          reviewText|
+-------+--------------------+
|    5.0|Purchased this fo...|
|    5.0|This think has wo...|
|    5.0|Bought it with Re...|
|    5.0|It's mini storage...|
|    5.0|I have it in my p...|
|    5.0|It's hard to beli...|
|    5.0|Works in a HTC Re...|
|    5.0|in my galaxy s4, ...|
|    5.0|I like this SD Ca...|
|    5.0|THE NAME OF ITSEL...|
|    5.0|Solid SDHC card t...|
|    5.0|Heard that the ca...|
|    5.0|I bought this to ...|
|    5.0|got this because ...|
|    5.0|Class 10 Speed Ra...|
|    5.0|The read and writ...|
|    5.0|This works with t...|
|    5.0|Works as expected...|
|    5.0|Works great in a ...|
|    5.0|SanDisk never dis...|
+-------+--------------------+
only showing top 20 rows



In [28]:
undersample_majority_class_df = majority_class_df.sample(False, min_class/majority_class_df.count())

In [31]:
undersample_majority_class_df.count()

83

In [33]:
balanced_df = undersampled_df.union(undersample_majority_class_df)

In [34]:
balanced_df.count()

1075

In [36]:
df = balanced_df
df.groupBy('overall').count().show()

+-------+-----+
|overall|count|
+-------+-----+
|    1.0|  244|
|    4.0|  526|
|    2.0|   80|
|    3.0|  142|
|    5.0|   83|
+-------+-----+



## tf - idf

In [37]:
#tokenizer the review
tokenizer = Tokenizer(inputCol = "reviewText", outputCol = "review_words")
wordsDF = tokenizer.transform(df)

In [38]:
wordsDF.show()

+-------+--------------------+--------------------+
|overall|          reviewText|        review_words|
+-------+--------------------+--------------------+
|    4.0|          No issues.|       [no, issues.]|
|    4.0|it works as expec...|[it, works, as, e...|
|    3.0|It works, but fil...|[it, works,, but,...|
|    1.0|I bought 2 of tho...|[i, bought, 2, of...|
|    4.0|The memory card i...|[the, memory, car...|
|    1.0|I bougth this mic...|[i, bougth, this,...|
|    1.0|"""Ordered this f...|["""ordered, this...|
|    4.0|more like 8mb/s i...|[more, like, 8mb/...|
|    2.0|I used this for a...|[i, used, this, f...|
|    4.0|everything about ...|[everything, abou...|
|    4.0|Used in my phone ...|[used, in, my, ph...|
|    3.0|This card adverti...|[this, card, adve...|
|    4.0|I bought this SD ...|[i, bought, this,...|
|    4.0|good card. work a...|[good, card., wor...|
|    4.0|I got this becaus...|[i, got, this, be...|
|    4.0|I got this memory...|[i, got, this, me...|
|    4.0|I b

In [42]:
#remove stop words
remover = StopWordsRemover(inputCol = 'review_words', outputCol = 'filtered')
wordsDF2 = remover.transform(wordsDF)

In [43]:
wordsDF2.show()

+-------+--------------------+--------------------+--------------------+
|overall|          reviewText|        review_words|            filtered|
+-------+--------------------+--------------------+--------------------+
|    4.0|          No issues.|       [no, issues.]|           [issues.]|
|    4.0|it works as expec...|[it, works, as, e...|[works, expected....|
|    3.0|It works, but fil...|[it, works,, but,...|[works,, file, wr...|
|    1.0|I bought 2 of tho...|[i, bought, 2, of...|[bought, 2, sandi...|
|    4.0|The memory card i...|[the, memory, car...|[memory, card, ex...|
|    1.0|I bougth this mic...|[i, bougth, this,...|[bougth, micro, s...|
|    1.0|"""Ordered this f...|["""ordered, this...|["""ordered, gala...|
|    4.0|more like 8mb/s i...|[more, like, 8mb/...|[like, 8mb/s, not...|
|    2.0|I used this for a...|[i, used, this, f...|[used, months, ph...|
|    4.0|everything about ...|[everything, abou...|[everything, grea...|
|    4.0|Used in my phone ...|[used, in, my, ph...|

In [None]:
# convert to tf vector
hashingTF = 

In [None]:
#create a pipeline for the data

In [None]:
#model hyperparamenters -mlflow

In [None]:
#select the best one

In [None]:
# save the best one