In [3]:
from pyspark.sql import SparkSession 
import matplotlib.pyplot as plt 
%matplotlib inline
import numpy as np
import pandas as pd
from textblob import TextBlob
from pyspark.sql.functions import udf
import nltk
from nltk.corpus import stopwords
stop = stopwords.words('english')
from pyspark.ml.feature import HashingTF
from pyspark.sql.functions import split
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes
from pyspark.sql.types import NumericType
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [4]:
os.environ['JAVA_HOME'] = "/Library/Java/JavaVirtualMachines/jdk1.8.0_261.jdk/Contents/Home"
spark = SparkSession.builder.appName('Read CSV File into DataFrame'). getOrCreate()
data = spark.read.csv("steam_reviews.csv", header=True, inferSchema=True, multiLine=True, escape="\"")

In [5]:
data

DataFrame[_c0: int, app_id: int, app_name: string, review_id: int, language: string, review: string, timestamp_created: int, timestamp_updated: bigint, recommended: boolean, votes_helpful: bigint, votes_funny: bigint, weighted_vote_score: double, comment_count: int, steam_purchase: boolean, received_for_free: boolean, written_during_early_access: boolean, author.steamid: bigint, author.num_games_owned: bigint, author.num_reviews: bigint, author.playtime_forever: double, author.playtime_last_two_weeks: double, author.playtime_at_review: double, author.last_played: double]

In [6]:

df = data.where(data.language == 'english').select('review', 'recommended')
negative = df.where(df.recommended == False)
positive = df.where(df.recommended == True)
num_negative = negative.count()
positive = positive.limit(num_negative)

In [7]:
all_data = negative.union(positive)
def text_process(data): 
    if not data:
        return ''
    tweet_blob = TextBlob(data)
    words = tweet_blob.words
    sent = ' '.join(words)
    return sent 
processUDF = udf(lambda z: text_process(z))

all_data = all_data.withColumn('review_pure', processUDF(all_data.review)).select('review_pure', 'recommended')
all_data.show(3)

+--------------------+-----------+
|         review_pure|recommended|
+--------------------+-----------+
|They certainly du...|      false|
|terribly bugs kee...|      false|
|While there is a ...|      false|
+--------------------+-----------+
only showing top 3 rows



In [8]:
def remove_junk(data): #function to keep only characters and remove 'user'- which is not required 
    words=[words for words in data.split() if words != 'user']    
    clean_tokens = [t for t in words if re.match(r'[^\W\d]*$', t)] # Remove punctuations')]
    sent_join  = ' '.join(clean_tokens)
    return sent_join
junkUDF = udf(lambda z: remove_junk(z))
all_data = all_data.withColumn('review_remove', processUDF(all_data.review_pure)).select('review_remove', 'recommended')
all_data.show(3)

+--------------------+-----------+
|       review_remove|recommended|
+--------------------+-----------+
|They certainly du...|      false|
|terribly bugs kee...|      false|
|While there is a ...|      false|
+--------------------+-----------+
only showing top 3 rows



In [9]:
nltkUDF = udf(lambda words: ' '.join(word.lower() for word in words.split() if word not in stop))
all_data = all_data.withColumn('review_low', nltkUDF(all_data.review_remove)).select('review_low', 'recommended')
all_data.show(3)

+--------------------+-----------+
|          review_low|recommended|
+--------------------+-----------+
|they certainly du...|      false|
|terribly bugs kee...|      false|
|while lot content...|      false|
+--------------------+-----------+
only showing top 3 rows



In [10]:
lemmatizer = nltk.stem.WordNetLemmatizer()
w_tokenizer = nltk.tokenize.WhitespaceTokenizer()

def lemmatize_text(text):
    return ' '.join([lemmatizer.lemmatize(w) for w in w_tokenizer.tokenize(text)])
lemmaUDF = udf(lambda z: lemmatize_text(z))
all_data = all_data.withColumn('review', lemmaUDF(all_data.review_low)).select('review', 'recommended')
all_data.show(3)

+--------------------+-----------+
|              review|recommended|
+--------------------+-----------+
|they certainly du...|      false|
|terribly bug keep...|      false|
|while lot content...|      false|
+--------------------+-----------+
only showing top 3 rows



In [11]:
# train, test = all_data.randomSplit(weights=[0.8, 0.2], seed=200)
all_data = all_data.withColumn('review_array', split(all_data.review, ' '))
all_data.show(3)
ht = HashingTF(inputCol="review_array", outputCol="features")
all_data = ht.transform(all_data).select('features', 'recommended')
all_data.show(3)
all_data = all_data.withColumn('label', all_data.recommended.cast('integer')).select('features', 'label')
all_data.show(3)

+--------------------+-----------+--------------------+
|              review|recommended|        review_array|
+--------------------+-----------+--------------------+
|they certainly du...|      false|[they, certainly,...|
|terribly bug keep...|      false|[terribly, bug, k...|
|while lot content...|      false|[while, lot, cont...|
+--------------------+-----------+--------------------+
only showing top 3 rows

+--------------------+-----------+
|            features|recommended|
+--------------------+-----------+
|(262144,[377,5381...|      false|
|(262144,[17893,32...|      false|
|(262144,[1232,230...|      false|
+--------------------+-----------+
only showing top 3 rows

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(262144,[377,5381...|    0|
|(262144,[17893,32...|    0|
|(262144,[1232,230...|    0|
+--------------------+-----+
only showing top 3 rows



In [65]:

train, test = all_data.randomSplit(weights=[0.8, 0.2], seed=200)

lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
lrModel = lr.fit(train)

# select example rows to display.
predictions = lrModel.transform(test) 
predictions.show()

+--------------------+-----------+-----+--------------------+--------------------+----------+
|            features|recommended|label|       rawPrediction|         probability|prediction|
+--------------------+-----------+-----+--------------------+--------------------+----------+
|(262144,[2,290,34...|      false|    0|[11.7671453688754...|[0.99999224484771...|       0.0|
|(262144,[5,61544,...|      false|    0|[0.02540774383412...|[0.50635159427107...|       0.0|
|(262144,[7,329,19...|      false|    0|[3.58689009589404...|[0.97306148163195...|       0.0|
|(262144,[7,406,12...|      false|    0|[43.7660774271393...|           [1.0,0.0]|       0.0|
|(262144,[7,406,13...|      false|    0|[71.2469164705956...|           [1.0,0.0]|       0.0|
|(262144,[7,406,16...|      false|    0|[215.053746112153...|           [1.0,0.0]|       0.0|
|(262144,[7,471,12...|      false|    0|[70.4330012421775...|           [1.0,0.0]|       0.0|
|(262144,[7,1001,2...|      false|    0|[19.0251086724352...

NameError: name 'MulticlassClassificationEvaluator' is not defined

In [67]:
# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions) 
print("Test set accuracy = " + str(accuracy)) 

Test set accuracy = 0.8774084263443236


In [69]:
evaluator

MulticlassClassificationEvaluator_60a413d95587

In [13]:
#creating the pickle file for Logistic model
# lrModel.save("Model/lr_model")
# Creating a pickle file for the CountVectorizer
ht.save("Model/hashing-tf")

In [71]:
nb = NaiveBayes(labelCol="label", featuresCol="features") 
nbModel = nb.fit(train)
predictions = nbModel.transform(test) 
predictions.show()

+--------------------+-----------+-----+--------------------+--------------------+----------+
|            features|recommended|label|       rawPrediction|         probability|prediction|
+--------------------+-----------+-----+--------------------+--------------------+----------+
|(262144,[2,290,34...|      false|    0|[-15496.323883844...|[1.42041980037984...|       1.0|
|(262144,[5,61544,...|      false|    0|[-82.185212929691...|[0.07787589300353...|       1.0|
|(262144,[7,329,19...|      false|    0|[-275.61511147479...|[0.99963489985117...|       0.0|
|(262144,[7,406,12...|      false|    0|[-2042.0309400764...|[1.0,9.1834702311...|       0.0|
|(262144,[7,406,13...|      false|    0|[-6074.0115668025...|[1.0,6.6034342003...|       0.0|
|(262144,[7,406,16...|      false|    0|[-6172.6692620398...|[1.0,4.5602046408...|       0.0|
|(262144,[7,471,12...|      false|    0|[-2213.7309051816...|[1.0,4.8140139239...|       0.0|
|(262144,[7,1001,2...|      false|    0|[-1347.9607118610...

In [72]:
nbModel.save("Model/nb_model")

In [73]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions) 
print("Test set accuracy = " + str(accuracy))

Test set accuracy = 0.8893870064953269
