### Importing Libraries

In [1]:
#import statements
import pandas as pd
import matplotlib.pyplot as plt

from pyspark.ml.feature import Bucketizer

from pyspark.sql.functions import *

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark.conf.set("spark.sql.caseSensitive", "true")

In [2]:
spark = SparkSession.builder.enableHiveSupport().appName('AmazonData').getOrCreate()
sc = spark.sparkContext

In [3]:
spark.sparkContext.getConf().getAll()

[('spark.eventLog.enabled', 'true'),
 ('spark.yarn.historyServer.address', 'cluster-5430-m:18080'),
 ('spark.dynamicAllocation.minExecutors', '1'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS',
  'cluster-5430-m'),
 ('spark.sql.warehouse.dir', 'file:/spark-warehouse'),
 ('spark.executor.memory', '5739m'),
 ('spark.yarn.am.memory', '640m'),
 ('spark.driver.host', 'cluster-5430-m.us-central1-c.c.big-data-sm.internal'),
 ('spark.ui.proxyBase', '/proxy/application_1647055434597_0001'),
 ('spark.history.fs.logDirectory',
  'gs://dataproc-temp-us-central1-599830097340-qhsl44mp/be55f996-47d0-404b-bcac-6b1d3bd744f2/spark-job-history'),
 ('spark.executor.instances', '2'),
 ('spark.app.startTime', '1647055574961'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.yarn.unmanagedAM.enabled', 'true'),
 ('spark.sql.autoBroadcastJoinThreshold', '43m'),
 ('spark.submit.deployMode', 'client'),
 ('spark.extraListeners',
  'com.google.cloud.spark.performa

### Importing Datasets

In [9]:
path = 'gs://smbigdata1/main_final.csv'
df6 = spark.read.csv(path)

In [10]:
df6 = df6.withColumnRenamed("_c0", "uniqueID") \
.withColumnRenamed("_c1", "productID") \
.withColumnRenamed("_c2", "overall") \
.withColumnRenamed("_c3", "reviewText") \
.withColumnRenamed("_c4", "reviewTime") \
.withColumnRenamed("_c5", "reviewerID") \
.withColumnRenamed("_c6", "summary") \
.withColumnRenamed("_c7", "unixReviewTime") \
.withColumnRenamed("_c8", "verified") \
.withColumnRenamed("_c9", "Category") \
.withColumnRenamed("_c10", "brand") \
.withColumnRenamed("_c11", "date") \
.withColumnRenamed("_c12", "price") \
.withColumnRenamed("_c13", "rank") \
.withColumnRenamed("_c14", "title") \
.withColumnRenamed("_c15", "timestamp") \
.withColumnRenamed("_c16", "year") \
.withColumnRenamed("_c17", "month") \
.withColumnRenamed("_c18", "count")

In [11]:
df6.count()

                                                                                

209338758

In [12]:
df6.show(5)

+--------------------+----------+-------+--------------------+-----------+--------------+--------------------+--------------+--------+--------------------+-----+------+-----+--------------------+--------------------+-------------------+----+-----+-----+
|            uniqueID| productID|overall|          reviewText| reviewTime|    reviewerID|             summary|unixReviewTime|verified|            Category|brand|  date|price|                rank|               title|          timestamp|year|month|count|
+--------------------+----------+-------+--------------------+-----------+--------------+--------------------+--------------+--------+--------------------+-----+------+-----+--------------------+--------------------+-------------------+----+-----+-----+
|B00001OGXKA3SKEKM...|B00001OGXK|    2.0|The wig was a wil...| 09 3, 2015|A3SKEKMOD182WG|which wasn't grea...|    1441238400|    true|Clothing, Shoes a...| null|5 star| null|823,903inClothing...|Disguise - Adult ...|2015-09-03 00:00:00|20

### Converting review text into Term Frequencies

In [23]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

#tokenize words
tokenizer = Tokenizer(inputCol="reviewText", outputCol="words")
df6 = tokenizer.transform(df6)

#drop the redundant source column
df6= df6.drop("reviewText")
df6.show(5)

[Stage 67:>                                                         (0 + 1) / 1]

+--------------------+----------+-------+-----------+--------------+--------------------+--------------+--------+----+--------------------+--------------------+------+-----+--------------------+--------------------+----------+-----+--------------------+
|            uniqueID| productID|overall| reviewTime|    reviewerID|             summary|unixReviewTime|verified|vote|            Category|               brand|  date|price|                rank|               title|productID1|count|               words|
+--------------------+----------+-------+-----------+--------------+--------------------+--------------+--------+----+--------------------+--------------------+------+-----+--------------------+--------------------+----------+-----+--------------------+
|1519588135A2KUM7J...|1519588135|    3.0|12 18, 2016|A2KUM7JB88Z2NC|          Not to bad|    1482019200|    true|null|Clothing, Shoes a...|Visit Amazon's Je...|  null|$7.99|   2,024,632inBooks(|One White Lie (Th...|1519588135|  247|[it, w

                                                                                

In [24]:
from pyspark.ml.feature import StopWordsRemover

#remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
df6 = remover.transform(df6)

#drop the redundant source column
df6= df6.drop("words")
df6.show(5)

[Stage 82:>                                                         (0 + 1) / 1]

+--------------------+----------+-------+-----------+--------------+--------------------+--------------+--------+----+--------------------+--------------------+------+-----+--------------------+--------------------+----------+-----+--------------------+
|            uniqueID| productID|overall| reviewTime|    reviewerID|             summary|unixReviewTime|verified|vote|            Category|               brand|  date|price|                rank|               title|productID1|count|            filtered|
+--------------------+----------+-------+-----------+--------------+--------------------+--------------+--------+----+--------------------+--------------------+------+-----+--------------------+--------------------+----------+-----+--------------------+
|1519588135A2KUM7J...|1519588135|    3.0|12 18, 2016|A2KUM7JB88Z2NC|          Not to bad|    1482019200|    true|null|Clothing, Shoes a...|Visit Amazon's Je...|  null|$7.99|   2,024,632inBooks(|One White Lie (Th...|1519588135|  247|[page,

                                                                                

In [None]:
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(df6)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
nlpdf = idfModel.transform(featurizedData)
nlpdf.select("features").show(5, truncate=False)



+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                                                                                                                                                                                                                                                                |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [None]:
from pyspark.ml.feature import StringIndexer

nlpdf_binary = nlpdf

nlpdf_binary = nlpdf_binary.withColumn(
'ratings',
F.when((F.col("overall") <= 5.0) & (F.col("overall") >= 3.0), "1") \
    .when((F.col("overall") < 3.0) & (F.col("overall") >= 0), "0"))

indexer = StringIndexer(inputCol="ratings", outputCol="label")
indexer_model = indexer.fit(nlpdf_binary)
nlpdf_binary=indexer_model.transform(nlpdf_binary)

                                                                                

In [33]:
from pyspark.ml.feature import StringIndexer

nlpdf = nlpdf.withColumn(
'ratings',
F.when((F.col("overall") <= 5.0) & (F.col("overall") > 4.0), "4-5") \
    .when((F.col("overall") <= 4.0) & (F.col("overall") > 3.0), "3-4") \
    .when((F.col("overall") <= 3.0) & (F.col("overall") > 2.0), "2-3") \
    .when((F.col("overall") <= 2.0) & (F.col("overall") > 1.0), "1-2") \
    .when((F.col("overall") <= 1.0) & (F.col("overall") > 0), "0-1") \
)

indexer = StringIndexer(inputCol="ratings", outputCol="label")
indexer_model = indexer.fit(nlpdf)
nlpdf=indexer_model.transform(nlpdf)

                                                                                

In [None]:
#Splitting the data into test and train data
train, test = nlpdf_binary.randomSplit(weights=[0.8,0.2], seed=200)

### Running Binary Logistic Regression and Multi-Class Random Forest

In [None]:
%%time
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Set parameters for Logistic Regression
lgr = LogisticRegression(maxIter=10, featuresCol = 'features', labelCol='label')

# Fit the model to the data.
lgrm = lgr.fit(train)

# Given a dataset, predict each point's label, and show the results.
predictions = lgrm.transform(test)

#print evaluation metrics
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

print(evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"}))
print(evaluator.evaluate(predictions, {evaluator.metricName: "f1"}))

22/03/12 04:32:40 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/03/12 04:32:40 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

0.8740153695753506




0.816253676344756
CPU times: user 3.18 s, sys: 901 ms, total: 4.09 s
Wall time: 1h 4min


                                                                                

In [None]:
%%time
from pyspark.ml.classification import RandomForestClassifier

#Set parameters for the Random Forest.
rfc = RandomForestClassifier(maxDepth=5, numTrees=15, impurity="gini", labelCol="label", predictionCol="prediction")

#Fit the model to the data.
rfcm = rfc.fit(train)

#Given a dataset, predict each point's label, and show the results.
predictions = rfcm.transform(test)

#print evaluation metrics

# print(evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"}))
# print(evaluator.evaluate(predictions, {evaluator.metricName: "f1"}))



NameError: name 'evaluator' is not defined

In [37]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

print(evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"}))
print(evaluator.evaluate(predictions, {evaluator.metricName: "f1"}))

                                                                                

0.6064639638414144




0.4578982756124226


                                                                                