<a href="https://colab.research.google.com/github/Fergus1212/review-star-ranker/blob/master/sparnknlp_review_rating_1and5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os

# Install a headless Java 8 JDK (apt output is discarded)
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
# Point JAVA_HOME at that JDK and put its bin/ first on PATH so its `java` is picked up
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
# Print the active Java version as a sanity check
! java -version

# Install pinned versions of PySpark and Spark NLP
! pip install --ignore-installed -q pyspark==2.4.4
! pip install --ignore-installed -q spark-nlp==2.5.3

openjdk version "1.8.0_252"
OpenJDK Runtime Environment (build 1.8.0_252-8u252-b09-1~18.04-b09)
OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)


In [2]:
# Not Python: the string below holds a JavaScript snippet meant to be pasted
# into the browser's developer console to keep the Colab session running.
# It clicks the Colab "connect" button every 60000 ms (once a minute).
# As a bare string literal it has no effect here beyond being echoed as cell output.
'''
function ConnectButton(){
    console.log("Connect pushed"); 
    document.querySelector("#top-toolbar > colab-connect-button").shadowRoot.querySelector("#connect").click() 
}
setInterval(ConnectButton,60000);
'''

'\nfunction ConnectButton(){\n    console.log("Connect pushed"); \n    document.querySelector("#top-toolbar > colab-connect-button").shadowRoot.querySelector("#connect").click() \n}\nsetInterval(ConnectButton,60000);\n'

In [3]:
# Spark NLP Dependencies
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.common import *

# PySpark Dependencies
from pyspark.ml import Pipeline
from pyspark.ml.tuning import TrainValidationSplit

# Pandas
import pandas as pd

# Misc
import numpy as np

# Graphs
import matplotlib.pyplot as plt

In [4]:
# Start the Spark session through Spark NLP's helper; `spark` is used by later cells
spark = sparknlp.start()

In [5]:
# Record the library versions this run was executed with
print(f"Spark NLP version {sparknlp.version()}")
print(f"Apache Spark version {spark.version}")

Spark NLP version 2.5.3
Apache Spark version 2.4.4


In [6]:
# Yin genius way of loading Amazon
file = 'amazon_reviews_us_Pet_Products_v1_00.tsv.gz'
os.system(f'wget https://s3.amazonaws.com/amazon-reviews-pds/tsv/{file}')
data = pd.read_csv(file, compression='gzip', header=0, sep='\t', error_bad_lines=False)

%time

b'Skipping line 21404: expected 15 fields, saw 22\nSkipping line 31839: expected 15 fields, saw 22\nSkipping line 37779: expected 15 fields, saw 22\nSkipping line 38344: expected 15 fields, saw 22\nSkipping line 61449: expected 15 fields, saw 22\n'
b'Skipping line 80975: expected 15 fields, saw 22\nSkipping line 85603: expected 15 fields, saw 22\nSkipping line 95950: expected 15 fields, saw 22\nSkipping line 105132: expected 15 fields, saw 22\nSkipping line 115378: expected 15 fields, saw 22\n'
b'Skipping line 152632: expected 15 fields, saw 22\nSkipping line 159310: expected 15 fields, saw 22\nSkipping line 162724: expected 15 fields, saw 22\nSkipping line 168588: expected 15 fields, saw 22\nSkipping line 170412: expected 15 fields, saw 22\nSkipping line 187169: expected 15 fields, saw 22\n'
b'Skipping line 205461: expected 15 fields, saw 22\nSkipping line 210928: expected 15 fields, saw 22\nSkipping line 213691: expected 15 fields, saw 22\nSkipping line 228697: expected 15 fields, sa

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 5.25 µs


In [7]:
# Optional row cap for quick experiments: set to an int (e.g. 50000) to keep
# only the first N rows of `data`; leave as None to use the full dataset.
N_SAMPLE_ROWS = None

df0 = data if N_SAMPLE_ROWS is None else data.iloc[:N_SAMPLE_ROWS].reset_index(drop=True)

In [8]:
# Non-null count per column; columns with a lower count contain missing values
df0.count()

marketplace          2639853
customer_id          2639853
review_id            2639853
product_id           2639853
product_parent       2639853
product_title        2639852
product_category     2639853
star_rating          2639852
helpful_votes        2639852
total_votes          2639852
vine                 2639852
verified_purchase    2639852
review_headline      2639843
review_body          2639636
review_date          2639843
dtype: int64

In [9]:
# Preview the first rows of the loaded data
df0.head()

Unnamed: 0,marketplace,customer_id,review_id,product_id,product_parent,product_title,product_category,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date
0,US,28794885,REAKC26P07MDN,B00Q0K9604,510387886,(8-Pack) EZwhelp Belly Band/Wrap,Pet Products,5.0,0.0,0.0,N,Y,"A great purchase for ""dribbly"" dogs",Best belly bands on the market! These are a g...,2015-08-31
1,US,11488901,R3NU7OMZ4HQIEG,B00MBW5O9W,912374672,Warren Eckstein's Hugs & Kisses Vitamin Minera...,Pet Products,2.0,0.0,1.0,N,Y,My dogs love Hugs and Kisses,"My dogs love Hugs and Kisses. However, the la...",2015-08-31
2,US,43214993,R14QJW3XF8QO1P,B0084OHUIO,902215727,Tyson's True Chews Premium Jerky - 12 ounce Ch...,Pet Products,5.0,0.0,0.0,N,Y,I have been purchasing these for a long time. ...,I have been purchasing these for a long time. ...,2015-08-31
3,US,12835065,R2HB7AX0394ZGY,B001GS71K2,568880110,"Soft Side Pet Crate, Navy/Tan",Pet Products,5.0,0.0,0.0,N,Y,it is easy to open and close,"It is extremely well constructed, it is easy t...",2015-08-31
4,US,26334022,RGKMPDQGSAHR3,B004ABH1LG,692846826,"EliteField 3-Door Folding Soft Dog Crate, Indo...",Pet Products,5.0,0.0,0.0,N,Y,Dog crate,Worked really well. Very pleased with my purc...,2015-08-31


In [10]:
# Keep only the review text and its star rating
df = df0.loc[:, ['review_body', 'star_rating']]

In [11]:
# Drop rows with a missing review or rating, then drop duplicate review texts.
# drop_duplicates alone treats all NaN review bodies as one value and keeps the
# first of them; such a row could later be sampled and handed to Spark as a
# float inside a string column.
df = (
    df
    .dropna(subset=['review_body', 'star_rating'])
    .drop_duplicates(subset='review_body')
    .copy()
)
df.head()

Unnamed: 0,review_body,star_rating
0,Best belly bands on the market! These are a g...,5.0
1,"My dogs love Hugs and Kisses. However, the la...",2.0
2,I have been purchasing these for a long time. ...,5.0
3,"It is extremely well constructed, it is easy t...",5.0
4,Worked really well. Very pleased with my purc...,5.0


In [12]:
# Renumber rows 0..n-1 after the duplicate drop (the old index is discarded)
df = df.reset_index(drop=True)

In [13]:
# Class distribution of the raw star ratings
df['star_rating'].value_counts()

5.0    1535282
4.0     366343
1.0     241989
3.0     211816
2.0     148631
Name: star_rating, dtype: int64

In [14]:
# Condense the ratings into two classes:
#   2 and 3 are folded down into 1, 4 is folded up into 5
df['star_rating'] = df['star_rating'].replace({2: 1, 3: 1, 4: 5})

In [15]:
# View the class distribution of star_rating again, after condensing
df.star_rating.value_counts()

5.0    1901625
1.0     602436
Name: star_rating, dtype: int64

In [16]:
# Size of the smallest class; used as the per-class sample size when balancing
label_min = df['star_rating'].value_counts().min()

In [17]:
# Balance the classes by down-sampling each one to the size of the smaller class.
# Sampling WITHOUT replacement keeps the de-duplicated reviews unique, so the
# same review cannot end up on both sides of the later train/test split.
# random_state makes the sample reproducible (same seed as the train/test split).
df_1_bal = df.loc[df["star_rating"] == 1].sample(n=label_min, replace=False, random_state=12345)
df_5_bal = df.loc[df["star_rating"] == 5].sample(n=label_min, replace=False, random_state=12345)

frames = [
    df_1_bal,
    df_5_bal,
]

df_balanced = pd.concat(frames)

In [18]:
# Inspect the pandas column dtypes before handing the frame to Spark
df_balanced.dtypes

review_body     object
star_rating    float64
dtype: object

In [19]:
# Build the Spark DataFrame from the balanced pandas frame (label first, then text)
entireDataset = spark.createDataFrame(
    df_balanced.loc[:, ['star_rating', 'review_body']]
)

In [20]:
# Column types as seen by Spark
entireDataset.dtypes

[('star_rating', 'double'), ('review_body', 'string')]

In [21]:
# Convert the label column to an integer (round first, then cast)
import pyspark.sql.functions as F

entireDataset = entireDataset.withColumn(
    "star_rating",
    F.round(F.col("star_rating")).cast("integer"),
)
entireDataset.show()

+-----------+--------------------+
|star_rating|         review_body|
+-----------+--------------------+
|          1|Jammed a lot.  Wh...|
|          1|I thought this wa...|
|          1|The chips don't w...|
|          1|Guinea pigs will ...|
|          1|Needs more surfac...|
|          1|We have two cats....|
|          1|Delivered with a ...|
|          1|I like the qualit...|
|          1|Does its job but ...|
|          1|The LARGE size I ...|
|          1|Only took the Ame...|
|          1|      Too much bulky|
|          1|I was hoping that...|
|          1|I liked it, my do...|
|          1|This product did ...|
|          1|these turned out ...|
|          1|Try on 4 young di...|
|          1|I bought this bas...|
|          1|Be warned, this i...|
|          1|It is not engrave...|
+-----------+--------------------+
only showing top 20 rows



In [22]:
# Check the number of rows per star_rating label
entireDataset.groupBy('star_rating').count().show()

+-----------+------+
|star_rating| count|
+-----------+------+
|          1|602436|
|          5|602436|
+-----------+------+



In [23]:
# Check the inferred column types
entireDataset.printSchema()

root
 |-- star_rating: integer (nullable = true)
 |-- review_body: string (nullable = true)



In [24]:
# View column names
entireDataset.columns

['star_rating', 'review_body']

In [25]:
# First pipeline stage: wrap the raw text in review_body as a Spark NLP
# "document" annotation, using the "shrink" cleanup mode.
document = (
    DocumentAssembler()
    .setInputCol("review_body")
    .setOutputCol("document")
    .setCleanupMode("shrink")
)

In [26]:
# Pretrained Universal Sentence Encoder (not a sentence detector):
# reads the "document" column and writes "sentence_embeddings".
use = (
    UniversalSentenceEncoder.pretrained()
    .setInputCols(["document"])
    .setOutputCol("sentence_embeddings")
)

tfhub_use download started this may take some time.
Approximate size to download 923.7 MB
[OK!]


In [27]:
# The labels live in the star_rating column. After condensing, only two
# classes remain: 1 and 5.
classifierdl = (
    ClassifierDLApproach()
    .setInputCols(["sentence_embeddings"])
    .setOutputCol("class")
    .setLabelColumn("star_rating")
    .setMaxEpochs(30)
    .setLr(0.01)
    .setEnableOutputLogs(True)
)

# Additional hyperparameters that can be added to the chain above:
# .setBatchSize(5)
# .setDropout(0.5)

In [28]:
# Assemble the stages in order: document assembler -> sentence encoder -> classifier
use_clf_pipeline = Pipeline(stages=[document, use, classifierdl])

In [29]:
# 75/25 train/test split with a fixed seed; print the size of each split
trainDataset, testDataset = entireDataset.randomSplit([0.75, 0.25], seed=12345)
print(trainDataset.count(), testDataset.count(), sep="\n")

903763
301109


In [None]:
# Begin fitting on the data
use_pipelineModel = use_clf_pipeline.fit(trainDataset)
%time

In [None]:
# List the files in ~/annotator_logs
!cd ~/annotator_logs && ls -l

In [None]:
!cat ~/annotator_logs/ClassifierDLApproach_868e0d90f8fc.log error

In [None]:
# Score the held-out test split
predictions = use_pipelineModel.transform(testDataset)
# A bare expression in the middle of a cell is never displayed, so print the row count
print(predictions.count())
predictions.show()

In [None]:
# Run the fitted pipeline on the test split and pull the label, the text and
# the predicted class into a pandas DataFrame
view = (
    use_pipelineModel
    .transform(testDataset)
    .select('star_rating', 'review_body', 'class.result')
    .toPandas()
)

In [None]:
# Show the Python type of the collected result
type(view)

In [None]:
# Import reports for quantifying results
from sklearn.metrics import classification_report, accuracy_score

# Work on a copy so `view` keeps the raw pipeline output. Reading from `view`
# makes this cell safe to re-run: it always starts from the original
# per-row result sequence and takes its first element.
df = view.copy()

df['result'] = view['result'].apply(lambda x: x[0])

In [None]:
# Cast the predicted label to int64
df['result'] = df['result'].astype('int64')

In [None]:
# Per-class report followed by overall accuracy (true labels first, predictions second)
print(classification_report(df['star_rating'], df['result']))
print(accuracy_score(df['star_rating'], df['result']))

In [None]:
# # Save the model
# from pyspark.ml import Pipeline, PipelineModel

# use_pipelineModel.save("model_name")

In [None]:
# # Load the model
# reloaded_model = PipelineModel.load("model_name")

In [None]:
# A few hand-written reviews for a quick manual check of the model
series_reviews = [
    'This is terrible yuck.',
    'It made me sad.',
    'Could be better but ok.',
    'Absolutely love it',
]

# Single-column Spark DataFrame using the same text column name as the training data
df_input_test = spark.createDataFrame(pd.DataFrame(data={'review_body': series_reviews}))
df_input_test

In [None]:
# Classify the hand-written reviews and show each text next to its predicted class
df_prediction = (
    use_pipelineModel
    .transform(df_input_test)
    .select('review_body', 'class.result')
    .toPandas()
)
df_prediction

In [None]:
# # Predictions based on reload
# predictions = reloaded_model.transform(df_input_test)
# predictions.count()
# predictions.show()

In [None]:
# Zip up for export
!zip -r /content/file.zip /content/model_name/

In [None]:
## DOWNLOAD MODEL
# from google.colab import files
# files.download("/content/file.zip")