In [0]:
# Load in one of the tables
df = spark.sql("select * from default.reviews_train")
print((df.count(), len(df.columns)))

(3138710, 11)


In [0]:
# For our initial modeling efforts, we are not going to use the following features
drop_list = ['reviewID', 'reviewTime', 'image', 'style', 'reviewerName']
df = df.select([column for column in df.columns if column not in drop_list])
df.show(5)
print((df.count(), len(df.columns)))

+-------+--------+--------------+----------+--------------------+--------------------+--------------+-----+
|overall|verified|    reviewerID|      asin|          reviewText|             summary|unixReviewTime|label|
+-------+--------+--------------+----------+--------------------+--------------------+--------------+-----+
|    5.0|    true|A1HP7NVNPFMA4N|0700026657|This game is a bi...|but when you do i...|    1445040000|    0|
|    5.0|   false|A1REUF3A1YCPHM|0001713353|The King, the Mic...|A story children ...|    1112140800|    0|
|    5.0|    true| A8LUWTIPU9CZB|0560467893|Great product, lo...|          Five Stars|    1446681600|    0|
|    5.0|    true| AVP0HXC9FG790|0001713353|  The kids loved it!|          Five Stars|    1466380800|    0|
|    3.0|    true|A3B6GKQQ1JJ167|0560467893|Pretty flimsy, bu...|                 Meh|    1430956800|    1|
+-------+--------+--------------+----------+--------------------+--------------------+--------------+-----+
only showing top 5 rows

(31

In [0]:
df = df.na.drop(subset=["overall","reviewText","label","verified","unixReviewTime","summary", "asin", "reviewerID"])
df.show(5)
print((df.count(), len(df.columns)))

+-------+--------+--------------+----------+--------------------+--------------------+--------------+-----+
|overall|verified|    reviewerID|      asin|          reviewText|             summary|unixReviewTime|label|
+-------+--------+--------------+----------+--------------------+--------------------+--------------+-----+
|    5.0|    true|A1HP7NVNPFMA4N|0700026657|This game is a bi...|but when you do i...|    1445040000|    0|
|    5.0|   false|A1REUF3A1YCPHM|0001713353|The King, the Mic...|A story children ...|    1112140800|    0|
|    5.0|    true| A8LUWTIPU9CZB|0560467893|Great product, lo...|          Five Stars|    1446681600|    0|
|    5.0|    true| AVP0HXC9FG790|0001713353|  The kids loved it!|          Five Stars|    1466380800|    0|
|    3.0|    true|A3B6GKQQ1JJ167|0560467893|Pretty flimsy, bu...|                 Meh|    1430956800|    1|
+-------+--------+--------------+----------+--------------------+--------------------+--------------+-----+
only showing top 5 rows

(31

In [0]:
# set seed for reproducibility
(trainingData, testingData) = df.randomSplit([0.8, 0.2], seed = 47)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testingData.count()))
(trainingData, td) = trainingData.randomSplit([0.2, 0.8], seed = 42)
# use this for sampling
#trainingData_sample = trainingData.sampleBy("label", fractions={0: 0.05, 1: 0.20}, seed=0)
#print("Training Data Sample Count: " + str(trainingData_sample.count()))
#trainingData_sample.groupBy("label").count().show()
#trainingData = trainingData_sample


Training Dataset Count: 2511519
Test Dataset Count: 626820


In [0]:
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.annotator import Normalizer as norm

from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, StringIndexer, SQLTransformer, IndexToString, VectorAssembler, Normalizer, NGram
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, NaiveBayes


In [0]:
document_assembler = DocumentAssembler() \
      .setInputCol("reviewText") \
      .setOutputCol("document")
    
tokenizer = Tokenizer() \
      .setInputCols(["document"]) \
      .setOutputCol("token")
      
normalizer = norm() \
      .setInputCols(["token"]) \
      .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)

stemmer = Stemmer() \
      .setInputCols(["cleanTokens"]) \
      .setOutputCol("stem")

lemmatizer = LemmatizerModel.pretrained() \
    .setInputCols(["stem"]) \
    .setOutputCol("lemma")

finisher = Finisher() \
      .setInputCols(["cleanTokens"]) \
      .setOutputCols(["token_features"]) \
      .setOutputAsArray(True) \
      .setCleanAnnotations(False)

# Generate Term Frequency
countVectors = CountVectorizer(inputCol="token_features", outputCol="cv_features", vocabSize=10000, minDF=5)
# Generate Inverse Document Frequency weighting
idf = IDF(inputCol="cv_features", outputCol="idfFeatures", minDocFreq=5)

# Generate NGrams and create TF and TF-IDF features
#ngrams = NGram(n=2, inputCol="token_features", outputCol="ngram_features")
#countVectors_ngram = CountVectorizer(inputCol="ngram_features", outputCol="ngram_cv_features", vocabSize=10000, minDF=5)
#idf_ngram = IDF(inputCol="ngram_cv_features", outputCol="ngram_idfFeatures", minDocFreq=5)

assembler = VectorAssembler(inputCols=["verified", "overall", "idfFeatures",   "unixReviewTime"], outputCol="features")

nlp_pipeline = Pipeline(
    stages=[document_assembler, 
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            stemmer, 
            lemmatizer,
            finisher,
            countVectors,
            idf,
            assembler
           ])


# IDEAS ******
# - try chunker
# - try embeddings



lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[ | ][ / ][OK!]


In [0]:
pipeline_model = nlp_pipeline.fit(trainingData)
trainingDataTransformed = pipeline_model.transform(trainingData)
trainingDataTransformed.show(5)

+-------+--------+--------------+----------+--------------------+--------------------+--------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|overall|verified|    reviewerID|      asin|          reviewText|             summary|unixReviewTime|label|            document|               token|          normalized|         cleanTokens|                stem|               lemma|      token_features|         cv_features|         idfFeatures|            features|
+-------+--------+--------------+----------+--------------------+--------------------+--------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    1.0|   false|A10IBHZBK4DLKE|B00004XSC5|ju

In [0]:
import pyspark.sql.functions as F
trainingDataTransformed = trainingDataTransformed.withColumn("summaryLength", F.length("summary"))
trainingDataTransformed = trainingDataTransformed.withColumn("reviewLength", F.length("reviewText"))
#trainingDataTransformed.show(5)

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def punctCounts(input_text):
  punctuation = '!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'
  punct_count = 0
  
  if len(input_text) > 1:
    for char in range(0, len(punctuation)):
      #print(punctuation[char])
      character = punctuation[char]
      count = input_text.count(character)
      punct_count += count
    
  return punct_count

udfPunctCounts = udf(punctCounts, IntegerType())

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import nltk
from nltk.tokenize import word_tokenize

def wordCounts(input_text):
  words = nltk.word_tokenize(input_text)
  word_count = len(words)
  return word_count

udfWordCounts = udf(wordCounts, IntegerType())

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
import nltk
from nltk.tokenize import sent_tokenize
from nltk.tokenize import word_tokenize

def avg_sentWords(input_text):
  words = nltk.word_tokenize(input_text)
  sentences = nltk.sent_tokenize(input_text)
  avg_words = len(words)/len(sentences)
  return avg_words

udfAvgSentWords = udf(avg_sentWords, FloatType())

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
import nltk
from nltk.corpus import comparative_sentences

def perc_compWords(input_text):
  comp_count = 0
  comp_keywords = list(comparative_sentences.keywords())
  words = nltk.word_tokenize(input_text)
  for word in words:
    if word in comp_keywords:
      comp_count += 1
    else:
      comp_count += 0
  comp_perc = comp_count/len(words)
  
  return comp_perc

udfCompWordsPercent = udf(perc_compWords, FloatType())

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import spacy
spacy.cli.download("en_core_web_sm")
nlp = spacy.load('en_core_web_sm')


def ner_spacy(input_text):
  #input_text = "K. Hart is not funny, George Washington was though"
  if nlp(input_text) is None:
    ner_count = 0
  else:
    doc = nlp(input_text)
    ner_count = 0
    len(doc.ents)
    for ent in doc.ents:
      print(ent.label_)
      if 'PERSON' in ent.label_:
        ner_count += 1
      else:
        ner_count += 0
    #print(ner_count)
  
  return ner_count

udfnerSpacy = udf(ner_spacy, IntegerType())

[38;5;2m✔ Download and installation successful[0m
You can now load the model via spacy.load('en_core_web_sm')


In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
from nltk.sentiment.vader import SentimentIntensityAnalyzer

def vaderPos(input_text):
    analyzer = SentimentIntensityAnalyzer()
    vs = analyzer.polarity_scores(input_text)
    pos = float(vs['pos'])
    return pos
  
def vaderComp(input_text):
    analyzer = SentimentIntensityAnalyzer()
    vs = analyzer.polarity_scores(input_text)
    comp = float(vs['compound'])
    return comp
  
def vaderNeg(input_text):
    analyzer = SentimentIntensityAnalyzer()
    vs = analyzer.polarity_scores(input_text)
    neg = float(vs['neg'])
    return neg

udfVaderPos = udf(vaderPos, FloatType())
udfVaderComp = udf(vaderComp, FloatType())
udfVaderNeg = udf(vaderNeg, FloatType())

from textblob import TextBlob
def get_subjectivity(input_text):
    return TextBlob(input_text).sentiment.subjectivity

udfSubjectivity = udf(get_subjectivity, FloatType())

def get_polarity(input_text):
    return TextBlob(input_text).sentiment.polarity

udfPolarity = udf(get_polarity, FloatType())


In [0]:
from pyspark.sql.types import IntegerType

def calc_spellErrors(input_text):
    from nltk.corpus import wordnet as wn
    # Start with an error count of 0
    error_count = 0
    # For each word in the list of words found in each row of the "filtered" column
    for word in input_text:
        # Check if it exists by looking it up, if nothing is returned then the word either doesn't exist or it is likely unhelpful slang
        if (len(wn.synsets(word.lower()))) == 0:
            # Add 1 to the error count for this row and move on to the next word in the list
            error_count +=1
    return error_count

udfSpellingErrors = udf(calc_spellErrors, IntegerType())

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, FloatType
import nltk

def adj_count(data_str):
    adj_counter = 0
    total_counter = 0
    
    # adjectives
    jj_tags = ['JJ', 'JJR', 'JJS']
    nltk_tags = jj_tags

    # break string into 'words'
    text = data_str.split()
    # tag the text and keep only those with the right tags
    tagged_text = nltk.pos_tag(text)
    for tagged_word in tagged_text:
      total_counter += 1
      if tagged_word[1] in nltk_tags:
            adj_counter += 1
          
    if total_counter == 0:
      adj_perc = 0.0
    else:
      adj_perc = adj_counter/total_counter
    return adj_perc
  
adj_udf = udf(adj_count, FloatType())


def verb_count(data_str):
    verb_counter = 0
    # adjectives
    verb_tags = ['VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ']
    nltk_tags = verb_tags

    # break string into 'words'
    text = data_str.split()

    # tag the text and keep only those with the right tags
    tagged_text = nltk.pos_tag(text)
    for tagged_word in tagged_text:
        if tagged_word[1] in nltk_tags:
            verb_counter += 1

    return verb_counter

verb_udf = udf(verb_count, IntegerType())

def noun_count(data_str):
    nouns_counter = 0
    # noun tags
    nn_tags = ['NN', 'NNP', 'NNP', 'NNPS', 'NNS']
    nltk_tags = nn_tags

    # break string into 'words'
    text = data_str.split()

    # tag the text and keep only those with the right tags
    tagged_text = nltk.pos_tag(text)
    for tagged_word in tagged_text:
        if tagged_word[1] in nltk_tags:
            nouns_counter += 1

    return nouns_counter

noun_udf = udf(noun_count, IntegerType())

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType, IntegerType
import nltk

def remove_stops(data_str):
  from nltk.corpus import stopwords
  # expects a string
  stops = set(stopwords.words("english"))
  text = data_str.split()

  total_counter = 0
  stop_counter = 0
  for word in text:
    total_counter += 1
    if word in stops:
      stop_counter += 1

  if total_counter == 0:
    stop_perc = 0.0
  else:
    stop_perc = stop_counter/total_counter
  
  return stop_perc

stopwords_udf = udf(remove_stops, FloatType())


In [0]:
from pyspark.sql.functions import col,when,count
from pyspark.sql.window import Window
 
trainingDataTransformed = trainingDataTransformed.withColumn("perc_pos_rating", count(when(trainingDataTransformed.overall>=3, 1))
                             .over(Window.partitionBy("asin")) / count(when(trainingDataTransformed.overall>=1, 1)).over(Window.partitionBy("asin")))

trainingDataTransformed = trainingDataTransformed.withColumn("perc_neg_rating", 1 - trainingDataTransformed.perc_pos_rating)
#trainingDataTransformed.show()

In [0]:
from pyspark.sql.functions import col,when,count
from pyspark.sql.window import Window
 
trainingDataTransformed = trainingDataTransformed.withColumn("perc_5stars", count(when(trainingDataTransformed.overall==5, 1))
                             .over(Window.partitionBy("asin")) / count(when(trainingDataTransformed.overall>=1, 1)).over(Window.partitionBy("asin")))

In [0]:
from pyspark.sql.functions import avg
trainingDataTransformed = trainingDataTransformed.withColumn("reviewLength_asin", avg(F.length("reviewText"))
                             .over(Window.partitionBy("asin")) )

trainingDataTransformed = trainingDataTransformed.withColumn("summaryLength_asin", avg(F.length("summary"))
                             .over(Window.partitionBy("asin")) )

In [0]:
from pyspark.sql.functions import min, max
trainingDataTransformed = trainingDataTransformed.withColumn("unixReviewTime_min", min("unixReviewTime").over(Window.partitionBy("asin")))
trainingDataTransformed = trainingDataTransformed.withColumn("earlyReview14d_asin", trainingDataTransformed.unixReviewTime - (trainingDataTransformed.unixReviewTime_min+1209600))
trainingDataTransformed = trainingDataTransformed.withColumn("unixReviewTime_max", max("unixReviewTime").over(Window.partitionBy("asin")))
trainingDataTransformed = trainingDataTransformed.withColumn("recentReview14d_asin", trainingDataTransformed.unixReviewTime - (trainingDataTransformed.unixReviewTime_max-1209600))

In [0]:
trainingDataTransformed = trainingDataTransformed.withColumn("reviewsCount_asin", count(when(trainingDataTransformed.overall>=1, 1)).over(Window.partitionBy("asin")))

In [0]:
from pyspark.sql.functions import col,when,count
from pyspark.sql.window import Window
 
trainingDataTransformed = trainingDataTransformed.withColumn("perc_verified_asin", count(when(trainingDataTransformed.verified==True, 1))
                             .over(Window.partitionBy("asin")) / count(when(trainingDataTransformed.overall>=1, 1)).over(Window.partitionBy("asin")))

In [0]:
from pyspark.sql.functions import col,when,count

trainingDataTransformed = trainingDataTransformed.withColumn("reviewerHelpful_perc", count(when(trainingDataTransformed.label==1, 1))
                             .over(Window.partitionBy("reviewerID")) / count(when(trainingDataTransformed.overall>=1, 1)).over(Window.partitionBy("reviewerID")))

trainingDataTransformed = trainingDataTransformed.withColumn("reviewerHelpful_count", count(when(trainingDataTransformed.label==1, 1))
                             .over(Window.partitionBy("reviewerID")))

trainingDataTransformed = trainingDataTransformed.withColumn("reviewsCount_revID", count(when(trainingDataTransformed.overall>=1, 1)).over(Window.partitionBy("reviewerID")))
trainingDataTransformed = trainingDataTransformed.withColumn("avgReviewLength_revID", avg(F.length("reviewText"))
                             .over(Window.partitionBy("reviewerID")))


# New feature
trainingDataTransformed = trainingDataTransformed.withColumn("avgRatingDiff_revID", trainingDataTransformed.overall - avg(trainingDataTransformed.overall)
                             .over(Window.partitionBy("reviewerID")))


In [0]:
#trainingDataTransformed = trainingDataTransformed.withColumn("reviewPUNCT", udfPunctCounts("reviewText"))
#trainingDataTransformed = trainingDataTransformed.withColumn('RevVader_Pos', udfVaderPos('reviewText'))
#trainingDataTransformed = trainingDataTransformed.withColumn('RevVader_Neg', udfVaderNeg('reviewText'))

#trainingDataTransformed = trainingDataTransformed.withColumn('revTB_Subjectivity', udfSubjectivity('reviewText'))
#trainingDataTransformed = trainingDataTransformed.withColumn('sumTB_Subjectivity', udfSubjectivity('summary'))
#trainingDataTransformed = trainingDataTransformed.withColumn('sumTB_Polarity', udfPolarity('summary'))
#trainingDataTransformed = trainingDataTransformed.withColumn('revTB_Polarity', udfPolarity('reviewText'))

trainingDataTransformed = trainingDataTransformed.withColumn('spellingErrors', udfSpellingErrors('reviewText'))
trainingDataTransformed = trainingDataTransformed.withColumn('spellingErrors_Sum', udfSpellingErrors('summary'))
#trainingDataTransformed = trainingDataTransformed.withColumn('stopPercent', stopwords_udf('reviewText'))
#trainingDataTransformed = trainingDataTransformed.withColumn('wordCounts', udfWordCounts('reviewText'))
#trainingDataTransformed = trainingDataTransformed.withColumn('wordCounts_Sum', udfWordCounts('summary'))
#trainingDataTransformed = trainingDataTransformed.withColumn('avgSentWords', udfAvgSentWords('reviewText'))
#trainingDataTransformed = trainingDataTransformed.withColumn('compWordsPercent', udfCompWordsPercent('reviewText'))
#trainingDataTransformed = trainingDataTransformed.withColumn("personNER", udfnerSpacy("reviewerName"))
#trainingDataTransformed = trainingDataTransformed.na.fill(value=0,subset=["personNER"])

#trainingDataTransformed = trainingDataTransformed.withColumn('Adjectives', adj_udf('reviewText'))
#trainingDataTransformed = trainingDataTransformed.withColumn('Verbs', verb_udf('reviewText'))
#trainingDataTransformed = trainingDataTransformed.withColumn('Nouns', noun_udf('reviewText'))
#trainingDataTransformed.show(5)

In [0]:
# Assemble any vectors into one features column
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

#drop_list = ['features2','features','featuresn']
#trainingDataTransformed = trainingDataTransformed.select([column for column in trainingDataTransformed.columns if column not in drop_list])

assembler = VectorAssembler(
    inputCols=['features', 'summaryLength', 'spellingErrors', 'perc_pos_rating', 'perc_neg_rating', 'reviewLength_asin', "summaryLength_asin",
               'earlyReview14d_asin', 'recentReview14d_asin', 'reviewLength', 'spellingErrors_Sum', 'reviewsCount_asin','perc_verified_asin',
               "perc_5stars", "reviewsCount_revID"],
    outputCol='features2')

trainingDataTransformed = assembler.transform(trainingDataTransformed)
trainingDataTransformed=trainingDataTransformed.withColumnRenamed("features","featuresn").withColumnRenamed("features2","features") 
trainingDataTransformed.show()

+-------+--------+--------------+----------+--------------------+--------------------+--------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+------------+-------------------+--------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+--------------------+-----------------+-------------------+--------------------+---------------------+------------------+---------------------+-------------------+--------------+------------------+--------------------+
|overall|verified|    reviewerID|      asin|          reviewText|             summary|unixReviewTime|label|            document|               token|          normalized|         cleanTokens|                stem|               lemma|      token_features|         cv_features|     

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from sparkdl.xgboost import XgboostClassifier

xgbClass = XgboostClassifier()

xgb_paramGrid = (ParamGridBuilder()
             .addGrid(xgbClass.missing, [0.0])
             .addGrid(xgbClass.learning_rate, [0.1])
             .addGrid(xgbClass.n_estimators, [ 200,500,800])
#             .addGrid(xgbClass.reg_lambda, [0, 0.3, 0.5, 1])
             .addGrid(xgbClass.reg_alpha, [ 0.5,10,20 ]) 
             .addGrid(xgbClass.colsample_bytree, [0.5,  1])
             .addGrid(xgbClass.colsample_bylevel, [0.5,  1])
             .build())
xgb_evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
xgbcv = CrossValidator(estimator = xgbClass,
                    estimatorParamMaps = xgb_paramGrid,
                    evaluator = xgb_evaluator,
                    numFolds = 2)

#xgb_CVModel = xgbcv.fit(trainingDataTransformed)
xgboostModel  = xgbcv.fit(trainingDataTransformed)

In [0]:
xgboostModel.bestModel 

Out[43]: XgboostClassifierModel_a72f97f0bc36

In [0]:
zip(xgboostModel.avgMetrics, xgb_paramGrid)

Out[44]: <zip at 0x7f4879606740>

In [0]:
preTest = pipeline_model.transform(testingData)

preTest = preTest.withColumn("reviewLength", F.length("reviewText"))
preTest = preTest.withColumn("summaryLength", F.length("summary"))

preTest = preTest.withColumn("perc_pos_rating", count(when(preTest.overall>=3, 1))
                             .over(Window.partitionBy("asin")) / count(when(preTest.overall>=1, 1)).over(Window.partitionBy("asin")))
preTest = preTest.withColumn("perc_neg_rating", 1 - preTest.perc_pos_rating)

preTest = preTest.withColumn("perc_5stars", count(when(preTest.overall==5, 1))
                             .over(Window.partitionBy("asin")) / count(when(preTest.overall>=1, 1)).over(Window.partitionBy("asin")))
preTest = preTest.withColumn("perc_1stars", count(when(preTest.overall==1, 1))
                             .over(Window.partitionBy("asin")) / count(when(preTest.overall>=1, 1)).over(Window.partitionBy("asin")))
preTest = preTest.withColumn("perc_5or1stars", preTest.perc_5stars+preTest.perc_1stars)

preTest = preTest.withColumn("reviewLength_asin", avg(F.length("reviewText"))
                             .over(Window.partitionBy("asin")) )
preTest = preTest.withColumn("summaryLength_asin", avg(F.length("summary"))
                             .over(Window.partitionBy("asin")) )

preTest = preTest.withColumn("reviewsCount_asin", count(when(preTest.overall>=1, 1)).over(Window.partitionBy("asin")))

preTest = preTest.withColumn("perc_verified_asin", count(when(preTest.verified==True, 1))
                             .over(Window.partitionBy("asin")) / count(when(preTest.overall>=1, 1)).over(Window.partitionBy("asin")))

preTest = preTest.withColumn("unixReviewTime_min", min("unixReviewTime").over(Window.partitionBy("asin")))
preTest = preTest.withColumn("earlyReview14d_asin", preTest.unixReviewTime - (preTest.unixReviewTime_min+1209600))
preTest = preTest.withColumn("unixReviewTime_max", max("unixReviewTime").over(Window.partitionBy("asin")))
preTest = preTest.withColumn("recentReview14d_asin", preTest.unixReviewTime - (preTest.unixReviewTime_max-1209600))


preTest = preTest.withColumn("reviewerHelpful_perc", count(when(preTest.label==1, 1))
                             .over(Window.partitionBy("reviewerID")) / count(when(preTest.overall>=1, 1)).over(Window.partitionBy("reviewerID")))

preTest = preTest.withColumn("reviewerHelpful_count", count(when(preTest.label==1, 1))
                             .over(Window.partitionBy("reviewerID")))

preTest = preTest.withColumn("reviewsCount_revID", count(when(preTest.overall>=1, 1)).over(Window.partitionBy("reviewerID")))
preTest = preTest.withColumn("avgReviewLength_revID", avg(F.length("reviewText"))
                             .over(Window.partitionBy("reviewerID")) )


#preTest = preTest.withColumn('RevVader_Pos', udfVaderPos('reviewText'))
#preTest = preTest.withColumn('RevVader_Neg', udfVaderNeg('reviewText'))

#preTest = preTest.withColumn('sumTB_Polarity', udfPolarity('summary'))
#preTest = preTest.withColumn('revTB_Polarity', udfPolarity('reviewText'))
#preTest = preTest.withColumn('sumTB_Subjectivity', udfSubjectivity('summary'))
#preTest = preTest.withColumn('revTB_Subjectivity', udfSubjectivity('reviewText'))

preTest = preTest.withColumn('spellingErrors', udfSpellingErrors('reviewText'))
preTest = preTest.withColumn('spellingErrors_Sum', udfSpellingErrors('summary'))
#preTest = preTest.withColumn('stopPercent', stopwords_udf('reviewText'))
#preTest = preTest.withColumn('wordCounts', udfWordCounts('reviewText'))
#preTest = preTest.withColumn('wordCounts_Sum', udfWordCounts('summary'))
#preTest = preTest.withColumn('avgSentWords', udfAvgSentWords('reviewText'))
#preTest = preTest.withColumn('compWordsPercent', udfCompWordsPercent('reviewText'))
#preTest = preTest.withColumn("personNER", udfnerSpacy("reviewerName"))
#preTest = preTest.na.fill(value=0,subset=["personNER"])

#preTest = preTest.withColumn('Adjectives', adj_udf('reviewText'))

preTest = assembler.transform(preTest)
preTest = preTest.withColumnRenamed("features","featuresn").withColumnRenamed("features2","features") 

#LR_predictions =  logRegModel.transform(preTest)
#LGBM_predictions =  lgbm_model.transform(preTest)
XGB_predictions =  xgboostModel.transform(preTest)
#NB_predictions =  nbModel.transform(preTest)
#rfc_predictions =  randForestModel.transform(preTest)

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

auc_evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC",labelCol="label")

#print("Logistic Reg. Test areaUnderROC   = %g" % (auc_evaluator.evaluate(LR_predictions)))
#print("LightGBM Test areaUnderROC   = %g" % (auc_evaluator.evaluate(LGBM_predictions)))
print("XGBoost Test areaUnderROC   = %g" % (auc_evaluator.evaluate(XGB_predictions)))
#print("NaiveBayes Test areaUnderROC   = %g" % (auc_evaluator.evaluate(NB_predictions)))

XGBoost Test areaUnderROC   = 0.900575


In [0]:
# Load in the tables
test_df = spark.sql("select * from default.reviews_test")
test_df.show(5)
print((test_df.count(), len(test_df.columns)))

+--------+-------+--------+-----------+--------------+----------+------------+--------------------+--------------------+--------------+
|reviewID|overall|verified| reviewTime|    reviewerID|      asin|reviewerName|          reviewText|             summary|unixReviewTime|
+--------+-------+--------+-----------+--------------+----------+------------+--------------------+--------------------+--------------+
|80000001|    4.0|   false|07 27, 2015|A1JGAP0185YJI6|0700026657|      travis|I played it a whi...|But in spite of t...|    1437955200|
|80000002|    5.0|    true| 03 3, 2014|A1WK5I4874S3O2|0700026657|  WhiteSkull|I bought this gam...|A very good game ...|    1393804800|
|80000003|    5.0|    true|01 12, 2013|A1YDQQJDRHM0FJ|0001713353|       Leila|I am very happy w...|One of our famili...|    1357948800|
|80000004|    5.0|    true|11 20, 2011|A2E6AHFDJ3JBAZ|0681795107|    robosolo|I purchased two o...|Insulated stainle...|    1321747200|
|80000005|    5.0|   false|06 28, 2011|A38NXTZUF

In [0]:
test_df = test_df.na.fill(value="None",subset=["summary"])
#test_df.describe().show()

In [0]:
test_df_Transform = pipeline_model.transform(test_df)

test_df_Transform = test_df_Transform.withColumn("reviewLength", F.length("reviewText"))
test_df_Transform = test_df_Transform.withColumn("summaryLength", F.length("summary"))

test_df_Transform = test_df_Transform.withColumn("perc_pos_rating", count(when(test_df_Transform.overall>=3, 1))
                             .over(Window.partitionBy("asin")) / count(when(test_df_Transform.overall>=1, 1)).over(Window.partitionBy("asin")))
test_df_Transform = test_df_Transform.withColumn("perc_neg_rating", 1 - test_df_Transform.perc_pos_rating)

test_df_Transform = test_df_Transform.withColumn("perc_5stars", count(when(test_df_Transform.overall==5, 1))
                             .over(Window.partitionBy("asin")) / count(when(test_df_Transform.overall>=1, 1)).over(Window.partitionBy("asin")))

test_df_Transform = test_df_Transform.withColumn("reviewLength_asin", avg(F.length("reviewText"))
                             .over(Window.partitionBy("asin")) )
test_df_Transform = test_df_Transform.withColumn("summaryLength_asin", avg(F.length("summary"))
                             .over(Window.partitionBy("asin")) )

test_df_Transform = test_df_Transform.withColumn("reviewsCount_asin", count(when(test_df_Transform.overall>=1, 1)).over(Window.partitionBy("asin")))

test_df_Transform = test_df_Transform.withColumn("perc_verified_asin", count(when(test_df_Transform.verified==True, 1))
                             .over(Window.partitionBy("asin")) / count(when(test_df_Transform.overall>=1, 1)).over(Window.partitionBy("asin")))

test_df_Transform = test_df_Transform.withColumn("reviewsCount_revID", count(when(test_df_Transform.overall>=1, 1)).over(Window.partitionBy("reviewerID")))
test_df_Transform = test_df_Transform.withColumn("avgRatingDiff_revID", test_df_Transform.overall - avg(test_df_Transform.overall)
                             .over(Window.partitionBy("reviewerID")))

test_df_Transform = test_df_Transform.withColumn("unixReviewTime_min", min("unixReviewTime").over(Window.partitionBy("asin")))
test_df_Transform = test_df_Transform.withColumn("earlyReview14d_asin", test_df_Transform.unixReviewTime - (test_df_Transform.unixReviewTime_min+1209600))
test_df_Transform = test_df_Transform.withColumn("unixReviewTime_max", max("unixReviewTime").over(Window.partitionBy("asin")))
test_df_Transform = test_df_Transform.withColumn("recentReview14d_asin", test_df_Transform.unixReviewTime - (test_df_Transform.unixReviewTime_max-1209600))

#test_df_Transform = test_df_Transform.withColumn('SumVader_CompScore', udfSentimentScore('summary'))
#test_df_Transform = test_df_Transform.withColumn('RevVader_CompScore', udfSentimentScore('reviewText'))

#test_df_Transform = test_df_Transform.withColumn('sumTB_Polarity', udfPolarity('summary'))
#test_df_Transform = test_df_Transform.withColumn('revTB_Polarity', udfPolarity('reviewText'))
#test_df_Transform = test_df_Transform.withColumn('sumTB_Subjectivity', udfSubjectivity('summary'))
#test_df_Transform = test_df_Transform.withColumn('revTB_Subjectivity', udfSubjectivity('reviewText'))

#test_df_Transform = test_df_Transform.withColumn("reviewPUNCT", udfPunctCounts("reviewText"))
test_df_Transform = test_df_Transform.withColumn('stopPercent', stopwords_udf('reviewText'))
test_df_Transform = test_df_Transform.withColumn('spellingErrors', udfSpellingErrors('reviewText'))
test_df_Transform = test_df_Transform.withColumn('spellingErrors_Sum', udfSpellingErrors('summary'))

test_df_Transform = assembler.transform(test_df_Transform)
test_df_Transform = test_df_Transform.withColumnRenamed("features","featuresn").withColumnRenamed("features2","features")

#test_df_Transform.show(5)

In [0]:
predictions = xgboostModel.transform(test_df_Transform)
#predictions = xgb_CVModel.transform(test_df_Transform)
#predictions = lgbm_model.transform(test_df_Transform)
#predictions = lgbm_CVModel.transform(test_df_Transform)

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

probelement=udf(lambda v:float(v[1]),FloatType())
submission_data = predictions.select('reviewID', probelement('probability')).withColumnRenamed('<lambda>(probability)', 'label')

In [0]:
display(submission_data.select('reviewID', 'label'))

reviewID,label
80000188,0.8445229
80000179,0.55056906
80000110,0.30708554
80000129,0.02440631
80000084,0.009590828
80000187,0.1313674
80000089,0.29014072
80000121,0.004933016
80000193,0.22191742
80000158,0.111468635


In [0]:
submission_data.write.csv("/FileStore/my-stuff/sparknlp_xgb_finalsz2.csv")

In [0]:
#dbutils.fs.put("/FileStore/my-stuff/sparknlp_xgb_finalsz.csv","ok")

In [0]:
#dbfs cp "dbfs:/FileStore/my-stuff/sparknlp_xgb_finalsz.csv" "C:\Users\sherr\Downloads"