## Rio Document Classification Dev 04 (Logistic Regression + hyperparameter tuning, 20% sample)

Data Loading

In [3]:
# Read the three review tables, stack them into one frame and keep a
# reproducible 20% sample (drawn without replacement). The sample is cached
# because every later cell derives from it.
dfx00, dfx01, dfx02 = (
    spark.sql(query)
    for query in (
        "select * from default.video_games_5",
        "select * from default.books_5_small",
        "select * from default.home_and_kitchen_5_small",
    )
)
df = (
    dfx00.union(dfx01)
         .union(dfx02)
         .sample(withReplacement=False, fraction=0.2, seed=42)
         .cache()
)

# (row count, column count) of the sampled frame; count() also materialises the cache.
n_rows, n_cols = df.count(), len(df.columns)
print((n_rows, n_cols))

Data Wrangling

In [5]:
from pyspark.sql.types import *
from pyspark.sql.functions import from_unixtime, to_date

# De-duplicate on (reviewer, product, review time) and report the row count
# on either side of the operation.
print("DF Before duplication removal: ", df.count())
df = df.dropDuplicates(['reviewerID', 'asin', 'reviewTime'])
print("DF After duplication removal: ", df.count())

# Clean-up steps, applied in the same order as before:
#   1. vote          -> integer, missing values replaced by 0
#   2. reviewTime    -> date derived from the Unix timestamp (which is then dropped)
#   3. reviewTimeNew -> reviewTime parsed with the 'MM dd, yyyy' pattern
#   4. keep only the columns used downstream
# NOTE(review): `vote` and `reviewTimeNew` are not part of the final select,
# so steps 1 and 3 have no effect on the resulting frame.
df = (
    df.withColumn("vote", df["vote"].cast(IntegerType()))
      .fillna(0, subset=["vote"])
      .withColumn("reviewTime", to_date(from_unixtime("unixReviewTime")))
      .drop("unixReviewTime")
      .withColumn('reviewTimeNew', to_date('reviewTime', 'MM dd, yyyy'))
      .select('overall', 'verified', 'reviewTime', 'label', 'reviewText', 'summary')
)
print((df.count(), len(df.columns)))

Lexical Features

In [7]:
import pyspark.sql.functions as func

# Word counts: number of space-separated pieces in the review body and in the
# review summary respectively.
for text_col, count_col in (('reviewText', 'totalWordCount'),
                            ('summary', 'summaryWordCount')):
    df = df.withColumn(count_col, func.size(func.split(func.col(text_col), ' ')))

# Calendar parts (year, month, day of month) of the review date.
for part_col, extract_part in (('reviewYear', func.year),
                               ('reviewMonth', func.month),
                               ('reviewDayofMonth', func.dayofmonth)):
    df = df.withColumn(part_col, extract_part(func.col('reviewTime')))

Class Weights

In [9]:
import pyspark.sql.functions as func
from pyspark.sql.types import FloatType

# Re-balancing (weighting) of records to be used in the logistic loss objective function.
# balancingRatio is the share of non-positive rows (label != 1); positives are
# weighted by 0.95 * balancingRatio and all other rows by the complement.
numPositives = df.filter(df["label"] == 1).count()
datasetSize = df.count()
if datasetSize == 0:
    # Fail with a clear message instead of a bare ZeroDivisionError below.
    raise ValueError("Cannot compute class weights: df contains no rows")
balancingRatio = (datasetSize - numPositives) / datasetSize
print("numPositives   = {}".format(numPositives))
print("datasetSize    = {}".format(datasetSize))
print("balancingRatio = {}".format(balancingRatio))

# Scaling applied to balancingRatio before it is used as the positive-class weight.
WEIGHT_DAMPING = 0.95


def calculateWeights(d):
    """Return the training weight for a single label value.

    Args:
        d: label value; 1 / 1.0 denotes the positive class, anything else
           (including None) is treated as the other class.

    Returns:
        float: ``balancingRatio * 0.95`` for the positive class, otherwise
        ``1.0 - balancingRatio * 0.95``.
    """
    if d == 1.0:
        return 1 * balancingRatio * WEIGHT_DAMPING
    else:
        return 1 * (1.0 - balancingRatio * WEIGHT_DAMPING)


# Kept so that any cell still calling the UDF keeps working.
udfcalculateWeights = func.udf(calculateWeights, FloatType())

# The weight is one of two constants, so evaluate them once on the driver and
# let Spark pick between them with a native expression; this avoids shipping
# every row through a Python UDF. The cast keeps the column a 32-bit float,
# matching what the UDF produced.
positiveWeight = calculateWeights(1.0)
negativeWeight = calculateWeights(0.0)
df = df.withColumn(
    "classWeightCol",
    func.when(func.col("label") == 1, positiveWeight)
        .otherwise(negativeWeight)
        .cast(FloatType()),
)

Pre-processing & ML Pipeline

In [11]:
from pyspark.ml import Pipeline
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml.feature import NGram, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, NaiveBayes
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

## Document Assembler
# This cell only *configures* the Spark NLP annotators; nothing is executed on
# the data until the Pipeline built from them is fitted.
# NOTE: sparknlp.start() rebinds the notebook-level name `spark`, which the
# earlier cells already used for spark.sql(...).
# NOTE(review): presumably this returns the already-running session on a
# managed cluster - confirm.
spark = sparknlp.start()
# Chunk patterns handed to the Chunker below via setRegexParsers
# (presumably adjective+noun and noun+noun POS-tag sequences - TODO confirm tag set).
allowed_tags = ['<JJ>+<NN>', '<NN>+<NN>']
# Wraps the raw `reviewText` column into the `document` annotation column.
documentAssembler = DocumentAssembler().setInputCol('reviewText').setOutputCol('document')

## Tokenizer
# Tokens are matched with the pattern \w+; the strings registered through
# addException are passed to the tokenizer as exceptions.
tokens = Tokenizer().setInputCols(['document']).setOutputCol('tokenized').setTargetPattern('\\w+') \
                    .addException('e-mail').addException('New York').addException('xbox').addException('ps').addException('psp') \
                    .addException('wii').addException('nintendo').addException('sega').addException('icarus')
# Sentence boundaries are needed by the POS tagger and the chunker below.
sentDet = SentenceDetector().setInputCols(["document"]).setOutputCol("sentence")
# Normalizes the tokens and lower-cases them.
normalizer = Normalizer().setInputCols(['tokenized']).setOutputCol('normalized') \
                         .setLowercase(True)

## Lemmatizer
# Default pretrained lemmatizer model, applied to the normalized tokens.
lemmatizer = LemmatizerModel.pretrained().setInputCols(['normalized']).setOutputCol('lemmatized')

## POS tagger & Chunker
# The tagger reads `checked`, the output of the spell checker that is defined
# further down in this cell. This works because the Pipeline stage list places
# `spell` before `pos_tagger`; the definition order in this cell is irrelevant.
pos_tagger = PerceptronModel.pretrained('pos_anc').setInputCols(['checked', 'sentence']).setOutputCol('pos')
# Extracts chunks matching `allowed_tags` into `POS_grams`.
chunker = Chunker().setInputCols(['sentence', 'pos']).setOutputCol('POS_grams').setRegexParsers(allowed_tags)

## N-grams
# Bi-grams and tri-grams over the spell-checked tokens; cumulative mode is
# disabled for both generators.
ngrams_cum2 = NGramGenerator().setInputCols(['checked']).setOutputCol('2grams').setN(2).setEnableCumulative(False)
ngrams_cum3 = NGramGenerator().setInputCols(['checked']).setOutputCol('3grams').setN(3).setEnableCumulative(False)

## Stopwords - English and Spanish
# NLTK is used only as the source of the stop-word lists; the download call
# runs every time this cell is executed.
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
stopwords_eng = stopwords.words('english') #Other languages need to be added
stopwords_spa = stopwords.words('spanish') #Other languages need to be added

# Two chained cleaners: English stop words are removed from the lemmas first,
# Spanish stop words from the result of that.
stopwords_cleaner_eng = StopWordsCleaner().setInputCols(['lemmatized']).setOutputCol('no_eng_stop_lemmatized').setStopWords(stopwords_eng)
stopwords_cleaner_spa = StopWordsCleaner().setInputCols(['no_eng_stop_lemmatized']) \
                                           .setOutputCol('no_spa_stop_lemmatized').setStopWords(stopwords_spa)

## Spell Checker - Peter Norvig (https://norvig.com/ngrams/)
# Produces `checked`, the token column consumed by the POS tagger, the n-gram
# generators and the Finisher.
spell = NorvigSweetingModel.pretrained() \
        .setInputCols(["no_spa_stop_lemmatized"]) \
        .setOutputCol("checked")

## Reformatting
# Converts the listed annotation columns into plain columns for Spark ML.
# Of these, only `finished_checked` is referenced by the CountVectorizer
# defined after this section; no visible cell reads the finished POS-gram or
# n-gram outputs.
finisher = Finisher().setInputCols(['checked', 'POS_grams', '2grams', '3grams']).setIncludeMetadata(False) # set to False to remove metadata

from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF

## Term counts over the spell-checked tokens (`finished_checked`)
# NOTE(review): minTF=5 ignores, per document, every term that occurs fewer
# than five times in that document - confirm this was not meant to be minDF.
tf_settings = dict(inputCol='finished_checked', outputCol='tf',
                   vocabSize=7000, minTF=5, maxDF=0.95)
tf = CountVectorizer(**tf_settings)

## Inverse document frequency re-weighting of the term counts
idf = IDF(minDocFreq=5, inputCol='tf', outputCol='tfidf')

## Final feature vector: review metadata + TF-IDF + calendar parts + length
feature_columns = [
    "verified",
    "overall",
    "tfidf",
    "reviewMonth",
    "reviewYear",
    "totalWordCount",
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

## Logistic Regression
# Rows are weighted through `classWeightCol`. regParam and elasticNetParam are
# deliberately left at their defaults here because they are searched over by
# the parameter grid defined below.
ml_alg = LogisticRegression(maxIter=10, weightCol="classWeightCol")

## Pre-proc & ML pipeline
# Pre-processing pipeline. Stage order is what wires the annotators together:
# each stage must appear after the stages that produce its input columns
# (e.g. `spell` before `pos_tagger` and the n-gram generators).
# The classifier is intentionally NOT part of this pipeline; it is tuned
# separately on the already-transformed features.
pipeline = Pipeline(stages=[
    documentAssembler,
    sentDet,
    tokens,
    normalizer,
    lemmatizer,
    stopwords_cleaner_eng,
    stopwords_cleaner_spa,
    spell,
    pos_tagger,
    chunker,
    ngrams_cum2,
    ngrams_cum3,
    finisher,
    tf,
    idf,
    assembler,
])

# 6 x 6 = 36 combinations of elastic-net mixing and regularisation strength.
paramGrid = (
    ParamGridBuilder()
    .addGrid(ml_alg.elasticNetParam, [0.1, 0.3, 0.5, 0.8, 0.9, 1.0])
    .addGrid(ml_alg.regParam, [0.05, 0.10, 0.125, 0.15, 0.2, 0.25])
    .build()
)

# 5-fold cross-validation scored by area under the ROC curve.
# seed=42 fixes the fold assignment so that repeated runs compare the same
# folds, in line with the seed used for sampling and for the train/test split.
crossval = CrossValidator(
    estimator=ml_alg,
    estimatorParamMaps=paramGrid,
    evaluator=BinaryClassificationEvaluator(metricName="areaUnderROC"),
    numFolds=5,
    parallelism=64,
    seed=42,
)

Training/Testing Split

In [13]:
# Reproducible 80/20 split of the feature-engineered frame.
trainingData, testData = df.randomSplit(weights=[0.8, 0.2], seed=42)

# Report the size of each partition.
for split_label, split_df in (("Training Dataset Count: ", trainingData),
                              ("Test Dataset Count:     ", testData)):
    print(split_label + str(split_df.count()))

Transform Training Data

In [15]:
# Fit the pre-processing pipeline on the training split only, then materialise
# the training features.
pipelineFit = pipeline.fit(trainingData)
# Cache the transformed frame: cross-validation reads it repeatedly (once per
# fold for the train part and once for the validation part), and without the
# cache each read would re-run the whole NLP pipeline.
trainingData_transform = pipelineFit.transform(trainingData).cache()

Hyperparameter Tuning

In [17]:
# Run the grid search with cross-validation on the transformed training data.
mlFit = crossval.fit(trainingData_transform)

# Pair each grid entry with its fold-averaged metric and print one pair per
# paragraph (a blank line follows every entry).
met = [(avg_metric, param_map)
       for avg_metric, param_map in zip(mlFit.avgMetrics, paramGrid)]
for entry in met:
    print(entry, end="\n\n")

# Winning model and the parameters it carries.
bestML = mlFit.bestModel
bestParams = bestML.extractParamMap()

Evaluate Testing Data

In [19]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Score the held-out split with the pipeline fitted on the training data and
# the best model found by cross-validation.
testData_transform = pipelineFit.transform(testData)
predictions = bestML.transform(testData_transform)

# Accuracy plus *weighted* precision/recall come from the multiclass
# evaluator; AUC comes from the binary evaluator.
acc_evaluator, pre_evaluator, rec_evaluator = (
    MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName=metric)
    for metric in ("accuracy", "weightedPrecision", "weightedRecall")
)
auc_evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")

report_lines = (
    ("Test Accuracy       = %g", acc_evaluator),
    ("Test Precision      = %g", pre_evaluator),
    ("Test Recall         = %g", rec_evaluator),
    ("Test areaUnderROC   = %g", auc_evaluator),
)
for template, evaluator in report_lines:
    print(template % (evaluator.evaluate(predictions)))

Predictions on Kaggle Dataset

In [21]:
# Load the Kaggle scoring set and derive the same date/length features that
# the assembler expects (reviewYear, reviewMonth, totalWordCount).
kaggle_df = spark.sql("select * from default.reviews_kaggle")

review_date = func.col('reviewTime')
kaggle_df = (
    kaggle_df
    # Unix timestamp -> date, replacing reviewTime; the raw timestamp is dropped.
    .withColumn("reviewTime", to_date(from_unixtime(func.col("unixReviewTime"))))
    .drop("unixReviewTime")
    # The rendered output below shows reviewTimeNew equal to reviewTime.
    .withColumn('reviewTimeNew', to_date(review_date, 'MM dd, yyyy'))
    .withColumn('totalWordCount', func.size(func.split(func.col('reviewText'), ' ')))
    .withColumn('reviewYear', func.year(review_date))
    .withColumn('reviewMonth', func.month(review_date))
)

# (row count, column count)
print((kaggle_df.count(), len(kaggle_df.columns)))

In [22]:
# Render the feature-engineered Kaggle frame for a visual sanity check of the
# derived columns (reviewTimeNew, totalWordCount, reviewYear, reviewMonth).
display(kaggle_df)

reviewID,overall,verified,reviewTime,reviewerID,asin,reviewerName,reviewText,summary,reviewTimeNew,totalWordCount,reviewYear,reviewMonth
67000000,3,True,2012-05-16,A3IXM075VM1P9T,B007JYB3O2,nachtik,"I would say these were 5 regular short stories, nothing exceptional or to be worth recommending to others. I didn't read any full book or novel from this author, maybe that would be much better reading. But these short stories wouldn't convince me to go for it.",average reading,2012-05-16,47,2012,5
67000001,5,True,2012-12-25,A3LGZ7A3WSV3JJ,985719745,MSP,"WOW, DROPPIN DIMES 2 was SOOOOOOO GOOD, BETTER than pt. 1. A JUICY TALE of BETRAYAL, JEALOUSY, OBSESSION, a FEW CRAZY EX'S, TRUE LOVE, CONSEQUENCES to ACTIONS, PATERNITY TESTS, DRAMA, DRAMA, DRAMA, and MORE. ISIS was SOOOOOO SHIESTY. Although MAR'KEL made POOR CHOICES, with 3 POSSIBLE BABY DADDIES, I RESPECTED HER HONESTY. All THREE MEN TRULY LOVED HER, and the BABY. ONE had a FEW ISSUES. Who is the BABY DADDY? Which man will MAR'KEL CHOOSE? or Will the CHOICE be MADE for HER? BUY THIS BOOK and FIND OUT. YOU WILL NOT BE DISAPPOINTED. I was ENGROSSED and INTRIGUED to the VERY END. The way it ENDED, I KNOW a pt. 3 is in the FUTURE. I will be on PINS and NEEDLES ANTICIPATING its RELEASE. CHLOE JE'NORE, TWO THUMBS UP.......",WOW.................,2012-12-25,132,2012,12
67000002,5,True,2012-09-18,A3BF5G7CJNIAG0,B002KXH7PQ,C. S. DeMore,You'll love the pattern for witches in this book--great fun to make wall hangings for friends and family--each one a little bit different--a Buggy Barn trademark. Grat to be able to use my points to get a deal on some of these books!,Another Buggy Barn Delight,2012-09-18,44,2012,9
67000003,5,False,2015-07-27,A2W41RTHSHYC4Q,B011LXVWRO,Light,"For the majority of persons Algarve means beach, sun and fun. However, the author shows us that there are more things to do in Algarve. Actually there are activities for every member of the family, either in group or individually. In this way, parents can visit a medieval castle (Castelo de Silves), board a ferry to go to Spain, play golf, etc. Young people can enjoy all kinds of water activities and surfing, have a pool party or go to the disco in the evening. Children can enjoy Aqualand, a truly fun water park. At the end of the book, the author lists the top 10 bars, top 5 markets, gay spots, top festivals, coffee shops, clubs, hostels, restaurants (vegetarian too), top 10 beaches, churches, parks, etc. The author includes links to the internet so you can get more information. Also, he includes links to maps to locate every site easily. This is a complete and practical guide book!",This is a complete and practical guide book!,2015-07-27,160,2015,7
67000004,5,False,2003-03-29,ACTBQZV1CJ9E8,9706061681,Richard Eastwood,"MI MEJOR AMIGO..QUE ESPANTO ! La chica adelgazaba ( mintiendo a sus padres con que estaba a dieta ) Era una nia buena, pero esta horrible enfermedad se convierte en una obsesion tan seria que las hace mentir para que no las obliguen a comer.. Por desgracia, cuando mi amigo Michael se dio cuenta, ya era demasiado tarde ! SEPULTAMOS A MAGGIE LA SEMANA PASADA, LUEGO DE UNA LUCHA DE TRES AOS ! ES VITAL INFORMARNOS Y ESTE LIBRO LO HACE... A TIEMPO !",LE ACABA DE SUCEDER A LA HIJA MAYOR DE,2003-03-29,84,2003,3
67000005,5,True,2014-04-22,A1RL0VMJVV8Q9N,B00BDAIDQ4,Barbara270,"This is a short story, yet beautifully written. The author wrote with the brilliance of Steinbeck. The story in itself was riveting and well told. Add to the wonderful story the description used, and you have yourself an unforgettable read. I will read more by this author for the beauty of the words he selects. If you are a student of the written word, you're going to love this story.",Beautifully Written,2014-04-22,71,2014,4
67000006,5,False,2015-08-18,A3GXIIUS36J8MC,1943892172,Jeri T. Ryan,"Gamma Rift by Kalli Lanford was a huge surprise to me. It seems to be the start of a new series and I am not sure if it is a debut novel or if this is an author just writing under a new pen name. IF this is a debut novel I have to admit I am totally blown away by this really well developed writing style. Even if some parts of the storyline itself are predictable and remind me of other novels I read, the reader is still drawn totally into it from the first page, as there enough new and fresh elements added to it. I kind of got an idea how it was going to work out fast that I have to admit - but this is not always a bad thing. In this case it let me simply lean back and enjoy the ride as I saw the things develop the way I envisioned them. And the story already gave me a few ideas on how this could be turning out into a series. I will be looking forward into reading more from this author and this series. As I need an answer to the question that has been on my mind since finishing no spoilers but will there be hair in near future ? #laughing",I kind of got an idea how it was going to work out fast that I have to admit - but this is not always a bad thing. In this cas,2015-08-18,224,2015,8
67000007,5,True,2013-12-19,A24WXIBG9QK9ZQ,1493680544,LovesRomance,"Curse of the Thunder Jewel has it all--a fire-breathing dragon, a witch, magic, a curse, a reluctant hero, a prostitute with a good heart who longs to be a lady, and steamy sex. Very steamy sex. Joren and Daleena are thrown together by a magic jewel. He has it. She needs it. On their quest, they begin to realize that what they thought they wanted isn't what they need. Both characters are people who put others before them. Burkheart, an excellent writer, knows how to weave a tale that will keep you turning the pages. The story moved quickly with one adventure after another and with twists and turns that kept me reading. The ending was extremely satisfying, but one I couldn't guess. I recommend this one.",Wow! What an entertaining story,2013-12-19,127,2013,12
67000008,2,True,2008-11-03,A3DW43H13VGRMM,9058562271,MPC Direct Avoider,"Just got the book today and quickly skimmed through it. It's not what i was expecting maybe because my expectation was raised so high based upon after reading the other reviews here. As a professional floral designer, i thinks it's just OK. There are a few inspirations in it but it's not as good as books like ""Paris Floral Interiors"" or ""Meisterflorist"" by Gilles Pothier. His books are much more impressive.",It's OK,2008-11-03,75,2008,11
67000009,3,True,2014-06-07,A2WY3COAMVN3U4,989203700,Miss piggy,"I don't have children as I determined early that it it tough to have a career and be a mother to boot. I see in this book, that yes, the deck is stacked against women with careers who wish to be mothers. Not sure what the answer to this is, but I think the author has some up her sleeve. I wanted the book to be funnier. The book was detailed in report of this adventure, but didn't make me want to run out and do it.......guess it was because there was truth. We had baseball, wine and food, guns in addition to the story of the corporate world and the baby world. Hopefully women will read this before embarking on such a dual roll so at least that are informed of what to expect.",tough to do career and mother roles at the same time,2014-06-07,140,2014,6


In [23]:
# Featurise the Kaggle reviews with the pipeline fitted on the training split,
# then score them with the best cross-validated model.
kaggle_transform = pipelineFit.transform(kaggle_df)
kaggle_pred = bestML.transform(kaggle_transform)

In [24]:
# Show only the review identifier and the predicted class for each review.
display(kaggle_pred.select(["reviewID", "prediction"]))

reviewID,prediction
67000000,0.0
67000001,1.0
67000002,0.0
67000003,1.0
67000004,0.0
67000005,0.0
67000006,1.0
67000007,1.0
67000008,0.0
67000009,1.0


### Retrain the model on a larger (50%) sample of the dataset

Assemble the complete dataset

In [27]:
# Build the larger training frame from the same three source tables.
# Order matters here:
#   1. sample first, so only the 50% that is actually used gets cached
#      (previously the full union was cached and the sample was not);
#   2. drop duplicate reviews with the same key as the development frame, so
#      the final model is trained on data cleaned the same way as the data
#      used for tuning;
#   3. cache the result, since the following cells read it several times.
df_complete = (
    dfx00.union(dfx01)
         .union(dfx02)
         .sample(False, 0.5, seed=42)
         .dropDuplicates(['reviewerID', 'asin', 'reviewTime'])
         .cache()
)

In [28]:
# Replace reviewTime with a date derived from the Unix timestamp, then drop
# the raw timestamp column.
df_complete = (
    df_complete
    .withColumn("reviewTime", to_date(from_unixtime(df_complete["unixReviewTime"])))
    .drop("unixReviewTime")
)

# Derived columns, added in this order.
derived_columns = [
    ('reviewTimeNew', to_date('reviewTime', 'MM dd, yyyy')),
    ('totalWordCount', func.size(func.split(func.col('reviewText'), ' '))),
    ('reviewYear', func.year(func.col('reviewTime'))),
    ('reviewMonth', func.month(func.col('reviewTime'))),
]
for column_name, column_expr in derived_columns:
    df_complete = df_complete.withColumn(column_name, column_expr)

# (row count, column count)
print((df_complete.count(), len(df_complete.columns)))

Fit and transform the complete dataset

In [30]:
# Refit the pre-processing pipeline on df_complete, so the fitted stages
# (CountVectorizer vocabulary, IDF weights) are learned from this frame rather
# than from the training split used during tuning, then featurise it.
pipelineFit_complete = pipeline.fit(df_complete)
df_complete_transform = pipelineFit_complete.transform(df_complete)

Fit the ML to complete dataset

In [32]:
# `bestML` is an already-fitted model (a transformer) and has no fit()
# method, so the estimator `ml_alg` is refitted here with the grid entry that
# won cross-validation. avgMetrics is aligned with paramGrid; the evaluator
# tells us whether the largest or the smallest average is the best one.
pick_best = max if crossval.getEvaluator().isLargerBetter() else min
bestIndex = pick_best(range(len(mlFit.avgMetrics)), key=lambda i: mlFit.avgMetrics[i])
bestParamMap = paramGrid[bestIndex]

# The estimator is configured with weightCol="classWeightCol", which this
# frame does not carry yet. Recompute the class weights for this frame with
# the same formula as in the "Class Weights" section. Counting is done on
# df_complete (same rows, same labels) to avoid re-running the NLP pipeline.
numPositives_complete = df_complete.filter(df_complete["label"] == 1).count()
datasetSize_complete = df_complete.count()
if datasetSize_complete == 0:
    raise ValueError("Cannot compute class weights: df_complete contains no rows")
balancingRatio_complete = (datasetSize_complete - numPositives_complete) / datasetSize_complete
positiveWeight_complete = balancingRatio_complete * 0.95

df_complete_weighted = df_complete_transform.withColumn(
    "classWeightCol",
    func.when(func.col("label") == 1, positiveWeight_complete)
        .otherwise(1.0 - positiveWeight_complete)
        .cast("float"),
)

mlFit_complete = ml_alg.fit(df_complete_weighted, bestParamMap)

Apply the fitted model on Kaggle dataset

In [34]:
# Featurise the Kaggle reviews with the pipeline refitted on df_complete and
# score them with the model fitted on that same frame.
kaggle_transform_complete = pipelineFit_complete.transform(kaggle_df)
kaggle_pred_complete = mlFit_complete.transform(kaggle_transform_complete)

Display the results

In [36]:
# Render the full scored frame (all input, feature and prediction columns).
display(kaggle_pred_complete)