In [0]:
# Load in one of the tables
df3 = spark.sql("select * from default.reviews_train")
print((df2.count(), len(df2.columns)))


In [0]:
print("Before duplication removal: ", df3.count())
df2 = df3.dropDuplicates(['reviewerID', 'asin'])
print("After duplication removal: ", df2.count())

In [0]:
df2.printSchema()
df2.show(5)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import DateType
df2 = df2.withColumn('date', from_unixtime('unixReviewTime').cast(DateType()))\
.withColumn("Year", year(from_unixtime('unixReviewTime').cast(DateType())))\
.withColumn("Month", month(from_unixtime('unixReviewTime').cast(DateType())))\
.withColumn("Quarter", quarter(from_unixtime('unixReviewTime').cast(DateType())))\
.withColumn("date",to_timestamp(col("date"))).withColumn("D_O_M", date_format(col("date"), "d"))


In [0]:
df2 = df2.withColumn("D_O_W", dayofweek(df2.date))

In [0]:
#Unreadable date format
#df3 = df2.withColumn('reviewTime',to_date(df2.reviewTime, 'yyyy-MM-dd'))

In [0]:

#from pyspark.sql.functions import *
#df3 = df2.withColumn('reviewTime',to_date(df2.reviewTime, 'yyyy-MM-dd'))\
#.withColumn('day_of_week',dayofweek(df3.reviewTime))\
#.withColumn('Month',month(df4.reviewTime))\
#.withColumn('Year',year(df5.reviewTime))\
#.withColumn('Quarter',quarter(df6.reviewTime))\
#df3.show(5)

In [0]:
df2.printSchema()

In [0]:
drop_list = ['reviewTime','reviewID','unixReviewTime']
df = df2.select([column for column in df2.columns if column not in drop_list])
df.show(5)

In [0]:
# Let's look at some quick summary statistics
df.describe().show()

In [0]:
# The count of each overall rating
# 5.0 Has the most count
from pyspark.sql.functions import col
df.groupBy("overall").count().orderBy(col("overall").asc()).show()
display(df.groupBy("overall").count().orderBy("overall"))

overall,count
1.0,148266
2.0,128170
3.0,245156
4.0,514177
5.0,1868392


In [0]:
# The most common product IDs
df.groupBy("reviewerID").count().orderBy(col("count").desc()).show(10)
display(df.groupBy("reviewerID").count().orderBy(col("count").desc()).head(50))

reviewerID,count
A3V6Z4RCDGRC44,742
AJKWF4W7QD4NS,637
A2F6N60Z96CAJI,560
A3W4D8XOGLWUN5,446
A2TCG2HV1VJP6V,391
A2QHS1ZCIQOL7E,389
A29BQ6B90Y1R5F,367
A1K1JW1C5CUSUZ,329
A2582KMXLK2P06,317
A119Q9NFGVOEJZ,297


In [0]:
# The most common product IDs
df.groupBy("asin").count().orderBy(col("count").desc()).show(10)
display(df.groupBy("asin").count().orderBy(col("count").desc()).head(50))

asin,count
000711835X,15354
0007420412,15103
0007548672,14316
0007444117,7058
0007350899,6599
0007378033,5693
006017322X,4655
0007384289,4216
0002247399,4119
0007155662,4067


In [0]:
#Class Imbalance
display(df.groupBy("label").count().orderBy("label"))

label,count
0,2377916
1,526245


In [0]:
#Check for Null 
df_Columns= ["overall","verified","reviewerID","asin","Year","Month","quarter","reviewerName","reviewText","summary","label"]
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df_Columns]).show()

In [0]:
df.printSchema()

In [0]:
#df = df.na.drop(subset=["overall","verified","reviewerID","asin","Year","Month","quarter","reviewerName","reviewText","summary","label"])
df = df.na.drop()
df.show(5)
print((df.count(), len(df.columns)))

In [0]:
from pyspark.sql.types import *
#Change Verified from Boolean to Integer
df = df.withColumn("verified", df["verified"].cast(IntegerType()))\
.withColumn("D_O_M", df["D_O_M"].cast(IntegerType()))\
.withColumn("overall", df["overall"].cast(IntegerType()))\
.withColumn("D_O_W", df["D_O_W"].cast(IntegerType()))

In [0]:
df.printSchema()
df.show(5)
print((df.count(), len(df.columns)))

In [0]:
# set seed for reproducibility
(trainingData, testingData) = df.randomSplit([0.8, 0.2], seed = 47)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testingData.count()))

In [0]:
#combine all the preprocessing steps into a pipeline.
# That way, we can run the same steps on both the training data, and testing data and beyond (new data)
# without copying and pasting any code.

from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, NGram, VectorAssembler, StringIndexer, OneHotEncoder,HashingTF, IDF, FeatureHasher 
import re
from sparknlp.annotator import ContextSpellCheckerModel
from nltk.stem import WordNetLemmatizer

# We'll tokenize the text using a simple RegexTokenizer
regexTokenizer = RegexTokenizer(inputCol="reviewText", outputCol="words", pattern="\\W")
regexTokenizer1 = RegexTokenizer(inputCol="summary", outputCol="s_words", pattern="\\W")
regexTokenizer2 = RegexTokenizer(inputCol="reviewerName", outputCol="r_words", pattern="\\W")

#n-grams
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngram1 = NGram(n=2,inputCol="s_words",outputCol="s_ngrams")
ngram3 = NGram(n=3, inputCol="words", outputCol="n3grams")
ngram13 = NGram(n=3,inputCol="s_words",outputCol="s3_ngrams")
ngram4 = NGram(n=4, inputCol="words", outputCol="n4grams")
ngram14 = NGram(n=4,inputCol="s_words",outputCol="s4_ngrams")

# Remove standard Stopwords
stopwordsRemover = StopWordsRemover(inputCol="ngrams", outputCol="filtered")
stopwordsRemover1 = StopWordsRemover(inputCol="s_ngrams", outputCol="s_filtered")
stopwordsRemover3 = StopWordsRemover(inputCol="n3grams", outputCol="filtered_ng3")
stopwordsRemover13 = StopWordsRemover(inputCol="s3_ngrams", outputCol="s_filtered_ng3")
stopwordsRemover4 = StopWordsRemover(inputCol="n4grams", outputCol="filtered_ng4")
stopwordsRemover14 = StopWordsRemover(inputCol="s4_ngrams", outputCol="s_filtered_ng4")

# Vectorize the sentences using simple BOW method. Other methods are possible:
CountV = CountVectorizer(inputCol="filtered", outputCol="rawFeatures",vocabSize=10000, minDF=5)
CountV1 = CountVectorizer(inputCol="s_filtered", outputCol="s_rawFeatures", vocabSize=10000, minDF=5)
CountV2 = CountVectorizer(inputCol="r_words", outputCol="r_rawFeatures", vocabSize=10000, minDF=5)
CountV3 = CountVectorizer(inputCol="filtered_ng3", outputCol="rawFeaturesng3", minDF=5)
CountV13 = CountVectorizer(inputCol="s_filtered_ng3", outputCol="s_rawFeaturesng3", minDF=5)
CountV4 = CountVectorizer(inputCol="filtered_ng4", outputCol="rawFeaturesng4", minDF=5)
CountV14 = CountVectorizer(inputCol="s_filtered_ng4", outputCol="s_rawFeaturesng4", minDF=5)

#idf
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf1 = IDF(inputCol="s_rawFeatures", outputCol="s_features")
idf2 = IDF(inputCol="r_rawFeatures", outputCol="r_features")
idfng3 = IDF(inputCol="rawFeaturesng3", outputCol="featuresng3")
idf1ng3 = IDF(inputCol="s_rawFeaturesng3", outputCol="s_featuresng3")
idfng4 = IDF(inputCol="rawFeaturesng4", outputCol="featuresng4")
idf1ng4 = IDF(inputCol="s_rawFeaturesng4", outputCol="s_featuresng4")

#encode Overall
ndexer = StringIndexer(inputCols=["overall"], outputCols=["overall_ndexer"], handleInvalid="keep")
encoder = OneHotEncoder(inputCols=["overall_ndexer"],
                        outputCols=["overall_encode"])

ndexer1 = StringIndexer(inputCols=["asin"], outputCols=["asin_ndexer"], handleInvalid="keep")
encoder1 = OneHotEncoder(inputCols=["asin_ndexer"],
                        outputCols=["asin_encode"])

ndexer2 = StringIndexer(inputCols=["reviewerID"], outputCols=["reviewerID_ndexer"], handleInvalid="keep")
encoder2 = OneHotEncoder(inputCols=["reviewerID_ndexer"],
                        outputCols=["reviewerID_encode"])

ndexer3 = StringIndexer(inputCols=["verified"], outputCols=["verified_ndexer"], handleInvalid="keep")
encoder3 = OneHotEncoder(inputCols=["verified_ndexer"],
                        outputCols=["verified_encode"])


#Year Month 
hasher = FeatureHasher(inputCols=["Year", "Month", "Quarter","D_O_M","D_O_W"],
                       outputCol="features_date")

#Vector Assembler
vector_Assem = VectorAssembler(inputCols = ['features','s_features','r_features','overall_encode','asin_encode','reviewerID_encode','verified_encode','features_date'] , outputCol = "features3")

pipeline = Pipeline(stages=[regexTokenizer, regexTokenizer1,regexTokenizer2, ngram, ngram1,ngram3, ngram13,ngram4, ngram14, stopwordsRemover, stopwordsRemover1,stopwordsRemover3,stopwordsRemover13,stopwordsRemover4,stopwordsRemover14, CountV, CountV1, CountV2,CountV3,CountV13,CountV4,CountV14,idf, idf1, idf2,idfng3,idf1ng3,idfng4,idf1ng4, ndexer,encoder, ndexer1,encoder1, ndexer2,encoder2,ndexer3,encoder3,hasher, vector_Assem])


In [0]:
# Fit the pipeline to training documents.

pipelineFit = pipeline.fit(trainingData)
trainingDataTransformed = pipelineFit.transform(trainingData)
trainingDataTransformed.show(5)

In [0]:
from pyspark.ml.classification import LogisticRegression

# More classification docs: https://spark.apache.org/docs/latest/ml-classification-regression.html

lr = LogisticRegression(maxIter=20, regParam=0.1, elasticNetParam=0, featuresCol='features3', labelCol='label')
lrModel = lr.fit(trainingDataTransformed)

In [0]:
# Extract the summary from the returned LogisticRegressionModel instance trained
# in the earlier example
trainingSummary = lrModel.summary

print("Training Accuracy:  " + str(trainingSummary.accuracy))
print("Training Precision: " + str(trainingSummary.precisionByLabel))
print("Training Recall:    " + str(trainingSummary.recallByLabel))
print("Training FMeasure:  " + str(trainingSummary.fMeasureByLabel()))
print("Training AUC:       " + str(trainingSummary.areaUnderROC))

In [0]:
trainingSummary.roc.show()

In [0]:
# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
for objective in objectiveHistory:
    print(objective)

In [0]:
testingDataTransform = pipelineFit.transform(testingData)
testingDataTransform.show(5)

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predictions = lrModel.transform(testingDataTransform)
#predictions.show(5)

evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
#print('Test Area Under ROC', evaluator.evaluate(predictions))

In [0]:
# Load in the tables
test_df = spark.sql("select * from default.reviews_test")
test_df.show(5)
print((test_df.count(), len(test_df.columns)))

In [0]:
test_df = test_df.fillna(value=' ')

In [0]:

test_df = test_df.withColumn('date', from_unixtime('unixReviewTime').cast(DateType()))\
.withColumn("Year", year(from_unixtime('unixReviewTime').cast(DateType())))\
.withColumn("Month", month(from_unixtime('unixReviewTime').cast(DateType())))\
.withColumn("Quarter", quarter(from_unixtime('unixReviewTime').cast(DateType())))\
.withColumn("date",to_timestamp(col("date"))).withColumn("D_O_M", date_format(col("date"), "d"))

In [0]:
test_df = test_df.withColumn("D_O_W", dayofweek(test_df.date))

In [0]:
test_df = test_df.withColumn("verified", test_df["verified"].cast(IntegerType()))\
.withColumn("D_O_M", test_df["D_O_M"].cast(IntegerType()))\
.withColumn("overall", test_df["overall"].cast(IntegerType()))\
.withColumn("D_O_W", test_df["D_O_W"].cast(IntegerType()))

In [0]:
drop_list = ['date']
test_df = test_df.select([column for column in test_df.columns if column not in drop_list])
test_df.show(5)
print((test_df.count(), len(test_df.columns)))

In [0]:
test_df.printSchema()

In [0]:
test_df_Transform = pipelineFit.transform(test_df)
test_df_Transform.show(5)

In [0]:
predictions = lrModel.transform(test_df_Transform)

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

probelement=udf(lambda v:float(v[1]),FloatType())
submission_data = predictions.select('reviewID', probelement('probability')).withColumnRenamed('<lambda>(probability)', 'label')

In [0]:
display(submission_data.select('reviewID', 'label'))

reviewID,label
80000001,0.034335315
80000002,0.10139176
80000003,0.10573602
80000004,0.90549487
80000005,0.41153896
80000006,0.12408637
80000007,0.05657063
80000008,0.42416927
80000009,0.0851287
80000010,0.328407
