In [1]:
import pyspark
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import gzip
import os


In [2]:
sc = pyspark.SparkContext('local[*]')
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)

[27, 815, 156, 745, 885]

In [3]:
amz_data = sc.textFile('/reviews_Video_Games_5.json.gz')
amz_data

/reviews_Video_Games_5.json.gz MapPartitionsRDD[4] at textFile at NativeMethodAccessorImpl.java:0

# Goal: Predict pos/neg status of amazon reviews based on their text

In [4]:
sql = pyspark.sql.SQLContext(sc)

In [5]:
amz_df = sql.read.json('reviews_Video_Games_5.json.gz')

In [6]:
print(amz_df.count())

231780


In [7]:
amz_df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



In [9]:
amz_df.select(amz_df.overall, amz_df.reviewText).show(5)

+-------+--------------------+
|overall|          reviewText|
+-------+--------------------+
|    1.0|Installing the ga...|
|    4.0|If you like rally...|
|    1.0|1st shipment rece...|
|    3.0|I got this versio...|
|    4.0|I had Dirt 2 on X...|
+-------+--------------------+
only showing top 5 rows



In [23]:
# make new column showing pos/neg binary using values in 'overall' 

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

toBinary = udf((lambda x: 1 if x > 3.0 else 0), returnType=IntegerType())

amz_df = amz_df.withColumn('label', toBinary(amz_df.overall))

In [33]:
# remove punctuation
from pyspark.sql.types import StringType
from string import punctuation

transtab = str.maketrans('', '', punctuation)
remove_punct = udf((lambda x: x.lower().translate(transtab)), returnType=StringType())
amz_df = amz_df.withColumn('cleanText', remove_punct(amz_df.reviewText))

In [36]:
# split text into words
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol="cleanText", outputCol="words")
amz_df2 = tokenizer.transform(amz_df)

In [57]:
amz_df2.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
 |-- label: integer (nullable = true)
 |-- cleanText: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [39]:
# remove stop words
from pyspark.ml.feature import StopWordsRemover

stopRemover = StopWordsRemover(inputCol='words', outputCol='tokens')

amz_df2 = stopRemover.transform(amz_df2)

In [58]:
# Hashing TF and IDF

from pyspark.ml.feature import HashingTF, IDF

hashingTF = HashingTF(inputCol='tokens', outputCol='features')

idf = IDF(minDocFreq=3, inputCol='features', outputCol='idf')

cleaned_data = amz_df2.select(amz_df2.tokens, amz_df2.label)

In [64]:
# model
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [70]:
paramGrid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
lr = LogisticRegression()
pipeline = Pipeline(stages=[hashingTF, idf, lr])
evaluator = BinaryClassificationEvaluator()

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, 
                    numFolds=4)

In [71]:
training, test = cleaned_data.randomSplit([0.7, 0.3])

cvModel = cv.fit(training)

In [74]:
# model evaluation (Logistic Regression)
predictions = cvModel.transform(test)
evaluator.evaluate(predictions)

0.7140941205498501

In [75]:
cvModel.avgMetrics[:5]

[0.7123376374907467, 0.7123376374907467]