If you dont have the dataset, run the below code

In [1]:
import gdown

In [10]:
gdown.download("https://drive.google.com/uc?id=1mW974SwZsSMH-nr89c2Pe9PPHhT1ifDr")

Downloading...
From (uriginal): https://drive.google.com/uc?id=1mW974SwZsSMH-nr89c2Pe9PPHhT1ifDr
From (redirected): https://drive.google.com/uc?id=1mW974SwZsSMH-nr89c2Pe9PPHhT1ifDr&confirm=t&uuid=0476676f-93d2-4b12-a819-7937b499ff98
To: /Users/christianbutcher/Desktop/spark/notebooks/reviews.zip
100%|██████████████████████████████████████| 3.20M/3.20M [00:00<00:00, 13.9MB/s]


'reviews.zip'

In [1]:
import threading

# Helper thread to avoid the Spark StreamingContext from blocking Jupyter
        
class StreamingThread(threading.Thread):
    def __init__(self, ssc):
        super().__init__()
        self.ssc = ssc
    def run(self):
        self.ssc.start()
        self.ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [2]:
sc

In [3]:
spark

In [4]:
#import necessary packages
import random
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StandardScaler

Load in data

In [5]:
df = spark.read.json('/Users/christianbutcher/Desktop/spark/reviews/*')
df.show()

                                                                                

+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
|2156300|    1|136759198|The Demo was grea...|
|2372320|    0|136758922|First of, this ga...|
|1498040|    1|136761203|Пример того, как ...|
|1811990|    1|136761840|I have beaten the...|
|1811990|    1|136761635|It really is very...|
|1782810|    1|136633021|Great for its cur...|
|1649740|    1|136629798|THROW YOUR MONEY ...|
|1649740|    1|136629381|I forgot I backed...|
|1649740|    1|136628148|Firstly, if you'r...|
|1649740|    1|136627883|[h1] HUNT THE NIG...|
|1798010|    1|136814124|Out of all the “b...|
|2273470|    1|136811469|After just doing ...|
|2273470|    1|136810289|Well developed de...|
|2329130|    1|136812852|Another title pub...|
|2329130|    1|136810307|Rewind or Die is ...|
|1928420|    0|137490805|No option to chan...|
| 986130|    1|137493446|Awesome game! For...|
| 986130|    1|137493372|This is a decent ...|
| 986130|    

Clean the data set:

In [6]:
df = df.dropDuplicates(['review_id'])
df = df.filter(df['review_text'] != '')

In [None]:
#def lower(text):
#   return text.lower()
#
#lower_udf =udf(lower,StringType())

In [None]:
##Remove nonAscii
#def strip_non_ascii(data_str):
#''' Returns the string without non ASCII characters'''
#    stripped = (c for c in data_str if 0 < ord(c) < 127)
#    return ''.join(stripped)
# setup pyspark udf function
#strip_non_ascii_udf = udf(strip_non_ascii, StringType())

Create a balanced data set:

In [7]:
n = 500
seed = 1

fractions = df.groupBy("label").count().withColumn("required_n", n/col("count"))\
                .drop("count").rdd.collectAsMap()

df_bal = df.stat.sampleBy("label", fractions, seed)
df_bal.groupBy("label").count().show()



+-----+-----+
|label|count|
+-----+-----+
|    0|  499|
|    1|  507|
+-----+-----+



                                                                                

Split data into training and test sets:

In [8]:
(trainingData, testData) = df_bal.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

                                                                                

Training Dataset Count: 708




Test Dataset Count: 298


                                                                                

Inititalise pipeline stages:

In [9]:
# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="review_text", outputCol="words", pattern="\\W")
# stop words
stops = StopWordsRemover.loadDefaultStopWords('english')
stopwordsRemover = StopWordsRemover(inputCol=regexTokenizer.getOutputCol(), outputCol="filtered", 
                                   stopWords = stops)
# bag of words count
countVectors = CountVectorizer(inputCol=stopwordsRemover.getOutputCol(), outputCol="features", 
                               vocabSize=30000, minDF=5)


hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms


lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

Put everything together in the pipeline:

In [10]:
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, lr])

In [11]:
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

In [12]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2)

In [19]:
# Run cross-validation, and choose the best set of parameters.
model = crossval.fit(trainingData)

                                                                                

Obtain predictions for the test data:

In [20]:
prediction = model.transform(testData)

In [21]:
prediction.columns

['app_id',
 'label',
 'review_id',
 'review_text',
 'words',
 'filtered',
 'rawFeatures',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

In [22]:
prediction.select('review_text','label','probability','prediction').show(10)

+--------------------+-----+--------------------+----------+
|         review_text|label|         probability|prediction|
+--------------------+-----+--------------------+----------+
|hai so i dont wit...|    1|[0.29622636001772...|       1.0|
|Fun game, a worth...|    1|[0.39636672883789...|       1.0|
|Cute, simple and ...|    1|[0.25049880010621...|       1.0|
|Wow, where did th...|    1|[0.34961248026696...|       1.0|
|This game is just...|    0|[0.68285556407288...|       0.0|
|Refunded in less ...|    0|[0.97194186498040...|       0.0|
|Tried it for an h...|    0|[0.74694255342842...|       0.0|
|BattleBlock Theat...|    1|[0.23425697935099...|       1.0|
|It gets stale ver...|    0|[0.86507549452499...|       0.0|
|       Oh absolutely|    1|[0.44576065370890...|       1.0|
+--------------------+-----+--------------------+----------+
only showing top 10 rows



Evaluate the model predictions:

In [23]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(prediction)

0.7311054671347239

Save the model locally to access later:

In [24]:
model.write().overwrite().save('/Users/christianbutcher/Desktop/spark/model/')

                                                                                

23/05/04 17:28:43 WARN JavaUtils: Attempt to delete using native Unix OS command failed for path = /private/var/folders/xg/23cfqt_556d2nhkfsg_q2b300000gn/T/blockmgr-fe5561ad-89c9-49cd-9d58-3501971cccfa. Falling back to Java IO way
java.io.IOException: Failed to delete: /private/var/folders/xg/23cfqt_556d2nhkfsg_q2b300000gn/T/blockmgr-fe5561ad-89c9-49cd-9d58-3501971cccfa
	at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingUnixNative(JavaUtils.java:163)
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:110)
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:91)
	at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1206)
	at org.apache.spark.storage.DiskBlockManager.$anonfun$doStop$1(DiskBlockManager.scala:374)
	at org.apache.spark.storage.DiskBlockManager.$anonfun$doStop$1$adapted(DiskBlockManager.scala:370)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.Indexe