In [78]:
from threading import Thread

class StreamingThread(Thread):
    """Drive a streaming context from a background thread.

    Running start()/awaitTermination() in a separate thread keeps the calling
    (notebook) thread free while the stream is processed.

    Parameters
    ----------
    ssc : object exposing start(), awaitTermination() and stop(...)
        Presumably a pyspark.streaming.StreamingContext -- TODO confirm.
    """

    def __init__(self, ssc):
        Thread.__init__(self)
        self.ssc = ssc

    def run(self):
        """Start the context held by this thread and block until it terminates."""
        # Use the context passed to the constructor, not whatever global happens
        # to be named `ssc` in the notebook namespace.
        self.ssc.start()
        self.ssc.awaitTermination()

    def stop(self):
        """Stop the streaming context gracefully, leaving the SparkContext running."""
        print('----- Stopping... this may take a few seconds -----')
        # `stopGraceFully` (capital F) is the keyword this call has always used; kept as is.
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [79]:
# Display the object bound to `sc` (presumably the SparkContext created by the PySpark kernel -- confirm)
sc

## 1. Data Loading and initial pre-processing

In [80]:
# Start with an easy implementation: only consider the content of the two fields
# review_title and review_text, concatenated into one new field "review_concat".
# NOTE(review): a `from pyspark.sql import SQLContext` import used to be fused onto the
# end of this comment and therefore never executed; nothing in the visible cells uses
# SQLContext, so it is left out of the code.
from pyspark.sql import functions as fn
from pyspark.sql.types import IntegerType
import pandas as pd

filepath = 'data_processed/ExctractedData.json'
# load JSON file (`spark` is presumably the SparkSession provided by the kernel -- confirm)
s_df = spark.read.json(filepath)
# keep a single row per review_id
# (the former bare `s_df.count()` here triggered a Spark job whose result was discarded)
s_df = s_df.drop_duplicates(subset=['review_id'])
# duplicate check: number of rows per review_id, highest count first
pd_df = s_df.groupBy('review_id').count().toPandas().set_index("count").sort_index(ascending=False)

In [81]:
# Check for duplicates: pd_df is sorted with the highest per-review_id count first,
# so a top count of 1 means every review_id occurs exactly once
pd_df.head()

Unnamed: 0_level_0,review_id
count,Unnamed: 1_level_1
1,R15DG6BI3K1I78
1,R1UU50BM0S4LPY
1,R27KEMBTEQ4MHI
1,R1HMP34XP1V9BE
1,R22I2JYOOXA3PP


In [82]:
# Build the text feature: "<review_title> <review_text>" in a single column
title_and_text = fn.concat(fn.col('review_title'), fn.lit(' '), fn.col('review_text'))
s_df = s_df.withColumn('review_concat', title_and_text)
# review_score is loaded as a string ==> cast it to an integer
s_df = s_df.withColumn("review_score", fn.col("review_score").cast(IntegerType()))

# add new binary score (0 or 1):
# 1 to 3 stars = 0 and 4 to 5 stars = 1 (the threshold used below is `value < 4`)
from pyspark.sql.functions import udf
def scoreToBin(value):
    """Map a star rating to a binary label.

    Ratings below 4 (1 to 3 stars) map to 0; ratings of 4 and above map to 1.
    A missing rating (None, e.g. a null left by the string-to-integer cast) maps
    to None instead of raising a TypeError on the comparison.
    """
    if value is None:
        return None
    return 0 if value < 4 else 1
# Expose scoreToBin to Spark as an integer-typed UDF and derive bin_score from review_score
udfScoreToBin = udf(f=scoreToBin, returnType=IntegerType())
s_df = s_df.withColumn("bin_score", udfScoreToBin(fn.col("review_score")))
s_df.printSchema()

root
 |-- book_id: string (nullable = true)
 |-- book_title: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- review_title: string (nullable = true)
 |-- review_user: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- review_concat: string (nullable = true)
 |-- bin_score: integer (nullable = true)



In [83]:
# Spot check of the binary mapping: fetch one 3-star review and inspect its bin_score
three_star_reviews = s_df.filter(fn.col('review_score') == 3)
three_star_reviews.first()

Row(book_id='0062678426', book_title='The Woman in the Window: A Novel', review_id='R1HMP34XP1V9BE', review_score=3, review_text='I wanted this to be better, it started so strong and then lost itself in the last third -to predictability.', review_title='Good, but not as good as the hype', review_user='Amazon Customer', timestamp=1557521653, review_concat='Good, but not as good as the hype I wanted this to be better, it started so strong and then lost itself in the last third -to predictability.', bin_score=0)

In [84]:
print(f'Total # of rows: {s_df.count()}')
# Same count-per-value table for the raw score and for the binary score, largest group first
for heading, column in (('# of rows per review score:', 'review_score'),
                        ('# of rows per BINARY review score:', 'bin_score')):
    print(heading)
    s_df.groupBy(column).count().orderBy(fn.col("count").desc()).show()

Total # of rows: 11573
# of rows per review score:
+------------+-----+
|review_score|count|
+------------+-----+
|           5| 9383|
|           4| 1529|
|           3|  346|
|           2|  170|
|           1|  145|
+------------+-----+

# of rows per BINARY review score:
+---------+-----+
|bin_score|count|
+---------+-----+
|        1|10912|
|        0|  661|
+---------+-----+



## 2. Creating data sets
Training and test sets (no separate validation set is created in the cells below) + upsampling of the training set

In [85]:
# Stratified train/test split: sample a 0.8 fraction of each bin_score class for
# training (fixed seed) and use the remaining rows of s_df as the test set
training_strat_df = s_df.sampleBy("bin_score", fractions={0: 0.8, 1: 0.8}, seed=42)
test_strat_df = s_df.subtract(training_strat_df)

# show some stats: total rows and rows per class, training set first, then test set
for set_name, split_df in (('training', training_strat_df), ('test', test_strat_df)):
    print('# rows ' + set_name + ' set: ' + str(split_df.count()))
    print('# rows per class')
    split_df.groupBy("bin_score").count().orderBy(fn.col("count").desc()).show()

# rows training set: 9191
# rows per class
+---------+-----+
|bin_score|count|
+---------+-----+
|        1| 8670|
|        0|  521|
+---------+-----+

# rows test set: 2382
# rows per class
+---------+-----+
|bin_score|count|
+---------+-----+
|        1| 2242|
|        0|  140|
+---------+-----+



In [None]:
# Upsample the training set to increase the number of reviews with bin_score = 0:
# sample that class with replacement at a factor of 5 to get above 2500 such reviews
df_class_0 = training_strat_df.filter(training_strat_df['bin_score'] == 0)
df_class_0_over = df_class_0.sample(withReplacement=True, fraction=5.0, seed=42)

# the bin_score = 1 class is kept as is
df_class_1 = training_strat_df.filter(training_strat_df['bin_score'] == 1)

import functools 
def unionAll(dfs):
    """Union a sequence of DataFrames into one, left to right.

    Each following frame is first re-selected with the column list of the
    accumulated result, so frames whose columns are ordered differently still
    line up. A single-element sequence is returned unchanged.
    """
    def _append(combined, following):
        # align the next frame on the accumulated frame's columns before the union
        return combined.union(following.select(combined.columns))

    return functools.reduce(_append, dfs)

# Upsampled training set: oversampled class 0 followed by the untouched class 1
training_up_df = unionAll([df_class_0_over, df_class_1])

print('# of rows in the train set')
print(f'Total: {training_up_df.count()}')
print('Per class:')
class_counts = training_up_df.groupBy("bin_score").count()
class_counts.orderBy(fn.col("count").desc()).show()

# of rows in the train set
Total: 11207
Per class:
+---------+-----+
|bin_score|count|
+---------+-----+
|        1| 8670|
|        0| 2537|
+---------+-----+



## 3. Defining featurization pipeline
Will generate the inputs for the binary classifier (Logistic regression)

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import IDF, RegexTokenizer, StringIndexer, StopWordsRemover, CountVectorizer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
import nltk
from nltk.corpus import stopwords 

# Fetch the nltk stop word corpus and keep the English entries, de-duplicated
nltk.download('stopwords')
unique_stopwords = set(stopwords.words('english'))
stop_words = list(unique_stopwords)

# 1. String indexer: convert book_id (string) to a numeric index in book_label;
#    handleInvalid="keep" keeps rows whose book_id is invalid for the fitted indexer
book_stringIdx = StringIndexer(
    inputCol="book_id",
    outputCol="book_label",
    handleInvalid="keep",
)

# 2. Tokenizer: with gaps=False the pattern describes the tokens themselves, so every
#    run of Unicode letters (\p{L}+) becomes one token; digits and punctuation are
#    dropped, accented letters are kept
regex_tokenizer = RegexTokenizer(
    gaps=False,
    pattern="\\p{L}+",
    inputCol="review_concat",
    outputCol="words",
)

# 3. Filter out stop words (case-insensitive match against the nltk list)
stopword_remover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered",
    stopWords=stop_words,
    caseSensitive=False,
)

# 4. TF: converts text documents to vectors of term counts;
#    minDF=5 keeps only terms that appear in at least 5 documents
count_vectorizer = CountVectorizer(
    minDF=5,
    inputCol="filtered",
    outputCol="tf",
)

# 5. TF-IDF transform
# The fitted IDF model rescales each column of the term-count vectors produced by
# CountVectorizer; intuitively, it down-weights terms that appear in many documents.
idf = IDF(inputCol="tf", outputCol="tfidf")

# 6. Feature assembler
# combine the tfidf text features with book_label into a single vector column
# NOTE(review): book_label is the raw StringIndexer index, so it enters the feature
# vector as one ordinal number -- confirm this is intended rather than a one-hot encoding
assembler = VectorAssembler()\
    .setInputCols(['tfidf', 'book_label'])\
    .setOutputCol("tfidf_book")

[nltk_data] Downloading package stopwords to /Users/admin/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [None]:
# utility function to calculate and print prediction results
def printClassPredictions(predictions):
    """Show prediction accuracy overall and then separately for each bin_score class.

    `predictions` must carry a `prediction` and a `bin_score` column; accuracy is
    the average of a 1.0/0.0 flag telling whether the two columns are equal.
    """
    def _show_accuracy(frame):
        # flag each row as correct (1.0) or not (0.0), then display the average
        frame.select(fn.expr('float(prediction = bin_score)').alias('correct')) \
            .select(fn.avg('correct')).show()

    _show_accuracy(predictions)
    for label in (0, 1):
        print('bin_score = ' + str(label))
        _show_accuracy(predictions.filter(predictions['bin_score'] == label))

## 4. Model training

In [None]:
# Logistic regression on the assembled feature vector, with bin_score as the label
lr = LogisticRegression(labelCol="bin_score", featuresCol=assembler.getOutputCol())

# Chain the featurization stages and the classifier into a single pipeline
pipeline_stages = [book_stringIdx, regex_tokenizer, stopword_remover,
                   count_vectorizer, idf, assembler, lr]
pipeline = Pipeline(stages=pipeline_stages)

# Fit all stages on the upsampled training set
model = pipeline.fit(training_up_df)

## 5. Evaluation on test set and export of the final model

In [None]:
# Score the test set and print overall / per-class accuracy
predictionsBin_test = model.transform(test_strat_df)
printClassPredictions(predictionsBin_test)
# Persist the fitted pipeline, overwriting any model already saved at this path
model_writer = model.write().overwrite()
model_writer.save("lrm_bin.model")