In [1]:
# init SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Pipeline').getOrCreate()

In [67]:
# load reviews fro json file
data_frame = spark.read.json('../data/reviews.json') \
                       .select('overall', 'reviewText') \
                       .sample(False, fraction=0.5)

print('Number of reviews: %d' % data_frame.count())
data_frame.show(5, truncate=True)

Number of reviews: 97237
+-------+--------------------+
|overall|          reviewText|
+-------+--------------------+
|    4.0|They look good an...|
|    5.0|These stickers wo...|
|    5.0|awesome! stays on...|
|    5.0|Came just as desc...|
|    5.0|Performs exactly ...|
+-------+--------------------+
only showing top 5 rows



In [68]:
from pyspark.sql import Row
# TODO: convert lines to lower case using data_frame.rdd.map transformation

data_frame_lower = spark.createDataFrame(
         data_frame.rdd.map(lambda r: Row(
                              overall=r.overall - 1,
                              reviewText=r.reviewText.lower())))

data_frame_lower.show(5)

+-------+--------------------+
|overall|          reviewText|
+-------+--------------------+
|    3.0|they look good an...|
|    4.0|these stickers wo...|
|    4.0|awesome! stays on...|
|    4.0|came just as desc...|
|    4.0|performs exactly ...|
+-------+--------------------+
only showing top 5 rows



In [11]:
# TODO: show distinct ratings
data_frame_lower.select('overall').distinct().collect()

[Row(overall=0.0),
 Row(overall=1.0),
 Row(overall=4.0),
 Row(overall=3.0),
 Row(overall=2.0)]

In [44]:
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import NGram
from pyspark.ml.feature import HashingTF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

?LogisticRegression

In [69]:
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import NGram
from pyspark.ml.feature import HashingTF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# TODO: preprocess text

tokenizer = Tokenizer(inputCol='reviewText', outputCol='tokenized')

with open('../data/stopwords.txt') as src:
    stop_words_list = [word.strip() for word in src]
    
stop_words = StopWordsRemover(inputCol=tokenizer.getOutputCol(),
                              outputCol='stop_words',
                              stopWords=stop_words_list)

ngram = NGram(n=2,
              inputCol=stop_words.getOutputCol(), 
              outputCol='ngram')

hashing = HashingTF(numFeatures=512,
                    binary=True,
                    inputCol=ngram.getOutputCol(),
                    outputCol='hashing')

logreg = LogisticRegression(featuresCol=hashing.getOutputCol(),
                           labelCol='overall',
                           predictionCol='prediction',
                           family='multinomial')

pipeline = Pipeline(stages=[tokenizer,
                     stop_words, 
                     ngram,
                     hashing,
                     logreg])

model = pipeline.fit(data_frame_lower)

In [36]:
# TODO: get pipeline prediction using transform()
prediction = model.transform(data_frame_lower)

In [49]:
import numpy as np
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix

# TODO: estimate quality
overall = prediction.select('overall').collect()
predicted = prediction.select('prediction').collect()

print('%s' % confusion_matrix(overall, predicted))
print('mse: %s' % mean_squared_error(overall, predicted))
print('acc: %s' % accuracy_score(overall, predicted))

Exception ignored in: <object repr() failed>
Traceback (most recent call last):
  File "/opt/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/wrapper.py", line 76, in __del__
    SparkContext._active_spark_context._gateway.detach(self._java_obj)
AttributeError: 'HashingTF' object has no attribute '_java_obj'


[[ 150    0    1    0   16]
 [   0  131    1    2   17]
 [   0    1  151   14  110]
 [   1    1   18  224  274]
 [   6    8   52   96 1269]]
mse: 0.650412898152
acc: 0.756979944947


In [62]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# TODO: build param grid
# TODO: find best param match using accuracy as a target metric
?CrossValidator

In [70]:
params = ParamGridBuilder()\
  .addGrid(hashing.numFeatures, [128, 1024])\
  .addGrid(logreg.regParam, [1e-3])\
  .build()

evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', 
                                              labelCol='overall',
                                              metricName='accuracy')

cv = CrossValidator(estimator=pipeline, estimatorParamMaps=params, evaluator=evaluator)

crossval_model = cv.fit(data_frame_lower)

In [71]:
# output average metric for each param set
for accuracy, params in zip(crossval_model.avgMetrics, cv.getEstimatorParamMaps()):
    print(params, accuracy)

{Param(parent='LogisticRegression_402eb7f3ffe17dbade4d', name='regParam', doc='regularization parameter (>= 0).'): 0.001, Param(parent='HashingTF_4cc99369eadcf34c56fc', name='numFeatures', doc='number of features.'): 128} 0.5588109524920779
{Param(parent='LogisticRegression_402eb7f3ffe17dbade4d', name='regParam', doc='regularization parameter (>= 0).'): 0.001, Param(parent='HashingTF_4cc99369eadcf34c56fc', name='numFeatures', doc='number of features.'): 1024} 0.5523188665849588
