In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("lab-ml").getOrCreate()

In [2]:
spark

In [3]:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.feature import RFormula
import matplotlib.pyplot as plt
import numpy as np
import datetime

## Import natural language processing toolkit 
import re
import nltk
import string
from nltk.tokenize import sent_tokenize
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords

from pyspark.sql.functions import udf


In [4]:
path = "s3://bailey-bucket-dtb/user_dedup.json"
reviews = spark.read.json(path)

In [5]:
reviews.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



In [6]:
reviews.groupBy('overall').agg(count('reviewerID').alias('count')).sort('overall').show()

+-------+--------+
|overall|   count|
+-------+--------+
|    1.0| 6712117|
|    2.0| 4265230|
|    3.0| 7049302|
|    4.0|15480820|
|    5.0|49169670|
+-------+--------+



In [7]:
sample_rev = reviews.sample(False, .001, 12345)

In [8]:
# remove punctuation
def remove_punct(text):
    regex = re.compile('[' + re.escape(string.punctuation) + '0-9\\r\\t\\n]')
    nopunct = regex.sub(" ", text)
    return nopunct

# binarize rating
def convert_rating(rating):
    rating = int(rating)
    if rating >=4: return 1
    else: return 0

# udf
punct_remover = udf(lambda x: remove_punct(x))
rating_convert = udf(lambda x: convert_rating(x))

# apply to review raw data
review_df = sample_rev.select('reviewerID', punct_remover('reviewText'), rating_convert('overall'))

review_df = review_df.withColumnRenamed('<lambda>(reviewText)', 'text')\
                     .withColumn('label', review_df["<lambda>(overall)"].cast(IntegerType()))\
                     .drop('<lambda>(overall)')

review_df.show(5)

+--------------------+--------------------+-----+
|          reviewerID|                text|label|
+--------------------+--------------------+-----+
|A000187635I595IAV...|I am a newbie at ...|    1|
|A00316981NM2QRXZ3...|It is entertainin...|    1|
|A00338282E99B8OR2...|These series are ...|    1|
|A00418961HZF1HI8M...|I would give   st...|    1|
|A0057832QE3XH24Z27YB|DOOM   BFG editio...|    1|
+--------------------+--------------------+-----+
only showing top 5 rows



In [10]:
from pyspark.ml.feature import * 
from nltk.stem.porter import *
from nltk.tokenize import word_tokenize, sent_tokenize
from nltk.corpus import stopwords

# tokenize
tok = Tokenizer(inputCol="text", outputCol="words")
review_tokenized = tok.transform(review_df)

# remove stop words
stopword_rm = StopWordsRemover(inputCol='words', outputCol='words_nsw')
review_tokenized = stopword_rm.transform(review_tokenized)

review_tokenized.show(5)

+--------------------+--------------------+-----+--------------------+--------------------+
|          reviewerID|                text|label|               words|           words_nsw|
+--------------------+--------------------+-----+--------------------+--------------------+
|A000187635I595IAV...|I am a newbie at ...|    1|[i, am, a, newbie...|[newbie, buying, ...|
|A00316981NM2QRXZ3...|It is entertainin...|    1|[it, is, entertai...|[entertaining, , ...|
|A00338282E99B8OR2...|These series are ...|    1|[these, series, a...|[series, great, ,...|
|A00418961HZF1HI8M...|I would give   st...|    1|[i, would, give, ...|[give, , , stars,...|
|A0057832QE3XH24Z27YB|DOOM   BFG editio...|    1|[doom, , , bfg, e...|[doom, , , bfg, e...|
+--------------------+--------------------+-----+--------------------+--------------------+
only showing top 5 rows



In [11]:
# add ngram column
n = 3
ngram = NGram(inputCol = 'words', outputCol = 'ngram', n = n)
add_ngram = ngram.transform(review_tokenized)

# generate the top frequent ngram
ngrams = add_ngram.rdd.flatMap(lambda x: x[-1]).filter(lambda x: len(x.split())==n)
ngram_tally = ngrams.map(lambda x: (x, 1))\
                      .reduceByKey(lambda x,y: x+y)\
                      .sortBy(lambda x: x[1], ascending=False)\
                      .filter(lambda x: x[1]>=20)
ngram_list = ngram_tally.map(lambda x: x[0]).collect()

In [15]:
from pyspark.ml.feature import IDF
from pyspark.mllib.linalg import Vectors as MLLibVectors

# replace the word with selected ngram
def ngram_concat(text):
    text1 = text.lower()
    for ngram in ngram_list:
        return text1.replace(ngram, ngram.replace(' ', '_'))

ngram_df = udf(lambda x: ngram_concat(x))
ngram_df = review_tokenized.select(ngram_df('text'), 'label')\
                          .withColumnRenamed('<lambda>(text)', 'text')

# tokenize and remove stop words with ngram
tok = Tokenizer(inputCol="text", outputCol="words")
review_tokenized = tok.transform(review_df)
tokenized_ngram = tok.transform(ngram_df)
tokenized_ngram = stopword_rm.transform(tokenized_ngram)

stopword_rm = StopWordsRemover(inputCol='words', outputCol='words_nsw')
review_tokenized = stopword_rm.transform(review_tokenized)

# count vectorizer and tfidf
cv = CountVectorizer(inputCol='words_nsw', outputCol='tf')
cvModel = cv.fit(review_tokenized)
count_vectorized = cvModel.transform(review_tokenized)

idf = IDF().setInputCol('tf').setOutputCol('tfidf')
tfidfModel = idf.fit(count_vectorized)
tfidf_df = tfidfModel.transform(count_vectorized)

In [18]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# split into training and testing set
splits = tfidf_df.select(['tfidf', 'label']).randomSplit([0.8,0.2],seed=100)
train = splits[0].cache()
test = splits[1].cache()

# Elastic Net Logit
lambda_par = 0.02
alpha_par = 0.3
lr = LogisticRegression().\
        setLabelCol('label').\
        setFeaturesCol('tfidf').\
        setRegParam(lambda_par).\
        setMaxIter(100).\
        setElasticNetParam(alpha_par)

lrModel = lr.fit(train)
lr_pred = lrModel.transform(test)
f1_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
lr_f1 = f1_eval.evaluate(lr_pred)
print("F1 score: %.4f" % lr_f1)

F1 score: 0.7842


In [19]:
# add ngram column
n = 3
ngram = NGram(inputCol = 'words', outputCol = 'ngram', n = n)
add_ngram = ngram.transform(review_tokenized)
add_ngram.show(5)

+--------------------+--------------------+-----+--------------------+--------------------+--------------------+
|          reviewerID|                text|label|               words|           words_nsw|               ngram|
+--------------------+--------------------+-----+--------------------+--------------------+--------------------+
|A000187635I595IAV...|I am a newbie at ...|    1|[i, am, a, newbie...|[newbie, buying, ...|[i am a, am a new...|
|A00316981NM2QRXZ3...|It is entertainin...|    1|[it, is, entertai...|[entertaining, , ...|[it is entertaini...|
|A00338282E99B8OR2...|These series are ...|    1|[these, series, a...|[series, great, ,...|[these series are...|
|A00418961HZF1HI8M...|I would give   st...|    1|[i, would, give, ...|[give, , , stars,...|[i would give, wo...|
|A0057832QE3XH24Z27YB|DOOM   BFG editio...|    1|[doom, , , bfg, e...|[doom, , , bfg, e...|[doom  ,   bfg,  ...|
+--------------------+--------------------+-----+--------------------+--------------------+-----

In [28]:
from pyspark.mllib.classification import SVMModel, SVMWithSGD
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors as MLLibVectors

# count vectorizer and tfidf
cv_ngram = CountVectorizer(inputCol='ngram', outputCol='tf_ngram')
cvModel_ngram = cv_ngram.fit(add_ngram)
cv_df_ngram = cvModel_ngram.transform(add_ngram)

# split into training and testing set
idf_ngram = IDF().setInputCol('tf_ngram').setOutputCol('tfidf_ngram')
tfidfModel_ngram = idf_ngram.fit(cv_df_ngram)
tfidf_df_ngram = tfidfModel_ngram.transform(cv_df_ngram)

# split into training & testing set
splits_ngram = tfidf_df_ngram.select(['tfidf_ngram', 'label']).randomSplit([0.8,0.2],seed=100)
train_ngram = splits_ngram[0].cache()
test_ngram = splits_ngram[1].cache()

# convert to LabeledPoint vectors
train_lb_ngram = train_ngram.rdd.map(lambda row: LabeledPoint(row[1], MLLibVectors.fromML(row[0])))
test_lb_ngram = train_ngram.rdd.map(lambda row: LabeledPoint(row[1], MLLibVectors.fromML(row[0])))

# fit SVM model of trigrams
numIterations = 50
regParam = 0.3
svm = SVMWithSGD.train(train_lb_ngram, numIterations, regParam=regParam)

In [34]:
from pyspark.sql.functions import explode
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType

words = (tok
    .transform(tfidf_df)
    .select(explode(col('words')).alias('text')))

predictors = (model
    .transform(words)
    .select(col('summary').alias('word'), 'probability'))

first = udf(lambda x: x[0].item(), FloatType())
second = udf(lambda x: x[1].item(), FloatType())

predictive_words = (predictors
   .select(
       'word', 
       second(col('probability')).alias('positive'), 
       first(col('probability')).alias('negative'))
   .groupBy('word')
   .agg(
       F.max('positive').alias('positive'),
       F.max('negative').alias('negative')))

positive_predictive_words = (predictive_words
    .select(col('word').alias('positive_word'), col('positive').alias('pos_prob'))
    .sort('pos_prob', ascending=False))

negative_predictive_words = (predictive_words
    .select(col('word').alias('negative_word'), col('negative').alias('neg_prob'))
    .sort('neg_prob', ascending=False))

IllegalArgumentException: 'Output column words already exists.'

In [9]:
# remove punctuation
def remove_punct(text):
    regex = re.compile('[' + re.escape(string.punctuation) + '0-9\\r\\t\\n]')
    nopunct = regex.sub(" ", text)
    return nopunct

# binarize rating
def convert_rating(rating):
    rating = int(rating)
    if rating >=4: return 1
    else: return 0

# udf
punct_remover = udf(lambda x: remove_punct(x))
rating_convert = udf(lambda x: convert_rating(x))

# apply to review raw data
review_df = sample_rev.select('asin', 'helpful', 'overall', 'reviewerID', punct_remover('reviewText'), rating_convert('overall'), 'reviewTime', 'summary', 'unixReviewTime') #, rating_convert('overall')

# review_df = review_df.withColumnRenamed('<lambda>(reviewText)', 'reviewText')\
#                      .withColumn('label', review_df["<lambda>(overall)"].cast(IntegerType()))\
#                      .drop('<lambda>(overall)')\
#                      .limit(1000000)

review_df.show(5)

+----------+-------+-------+--------------------+--------------------+-----------------+-----------+--------------------+--------------+
|      asin|helpful|overall|          reviewerID|<lambda>(reviewText)|<lambda>(overall)| reviewTime|             summary|unixReviewTime|
+----------+-------+-------+--------------------+--------------------+-----------------+-----------+--------------------+--------------+
|B003EO1H7E| [0, 0]|    5.0|A000187635I595IAV...|I am a newbie at ...|                1|01 26, 2013|Was asked for my ...|    1359158400|
|B00EBQRSTK| [0, 0]|    4.0|A00316981NM2QRXZ3...|It is entertainin...|                1|09 28, 2013|        entertaining|    1380326400|
|0399159606| [0, 0]|    5.0|A00338282E99B8OR2...|These series are ...|                1|07 24, 2013|     Wonderful Book!|    1374624000|
|1439876363| [0, 1]|    5.0|A00418961HZF1HI8M...|I would give   st...|                1|02 15, 2014| Meet my expectation|    1392422400|
|B008E6ZXA4| [0, 0]|    5.0|A0057832QE3XH

In [10]:
review_df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- <lambda>(reviewText): string (nullable = true)
 |-- <lambda>(overall): string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



In [11]:
review_df.groupBy('overall').agg(count('reviewerID').alias('count')).sort('overall').show()

+-------+-----+
|overall|count|
+-------+-----+
|    1.0| 6580|
|    2.0| 4273|
|    3.0| 6973|
|    4.0|15381|
|    5.0|48984|
+-------+-----+



In [56]:
# from pyspark.ml.feature import * 
# from nltk.stem.porter import *
# from nltk.tokenize import word_tokenize, sent_tokenize
# from nltk.corpus import stopwords

# # tokenize
# tok = Tokenizer(inputCol="<lambda>(reviewText)", outputCol="words")
# review_tokenized = tok.transform(review_df)

# # remove stop words
# stopword_rm = StopWordsRemover(inputCol='words', outputCol='words_nsw')
# review_tokenized = stopword_rm.transform(review_tokenized)

# review_tokenized.select('<lambda>(reviewText)', 'words', 'words_nsw').show(5)

In [41]:
# # add ngram column
# n = 3
# ngram = NGram(inputCol = 'words', outputCol = 'ngram', n = n)
# add_ngram = ngram.transform(review_tokenized)

# # generate the top frequent ngram
# ngrams = add_ngram.rdd.flatMap(lambda x: x[-1]).filter(lambda x: len(x.split())==n)
# ngram_tally = ngrams.map(lambda x: (x, 1))\
#                       .reduceByKey(lambda x,y: x+y)\
#                       .sortBy(lambda x: x[1], ascending=False)\
#                       .filter(lambda x: x[1]>=20)
# ngram_list = ngram_tally.map(lambda x: x[0]).collect()

In [55]:
# # replace the word with selected ngram
# def ngram_concat(text):
#     text1 = text.lower()
#     for ngram in ngram_list:
#         return text1.replace(ngram, ngram.replace(' ', '_'))

# ngram_df = udf(lambda x: ngram_concat(x))
# # ngram_df = review_tokenized.select(ngram_df('text'), 'label')\
# #                           .withColumnRenamed('<lambda>(text)', 'text')

# # tokenize and remove stop words with ngram
# tok = Tokenizer(inputCol="<lambda>(reviewText)", outputCol="words")
# review_tokenized = tok.transform(review_df)
# tokenized_ngram = tok.transform(ngram_df)
# tokenized_ngram = stopword_rm.transform(tokenized_ngram)

# stopword_rm = StopWordsRemover(inputCol='words', outputCol='words_nsw')
# review_tokenized = stopword_rm.transform(review_tokenized)

# # count vectorizer and tfidf
# cv = CountVectorizer(inputCol='words_nsw', outputCol='tf')
# cvModel = cv.fit(review_tokenized)
# count_vectorized = cvModel.transform(review_tokenized)

# tfidfModel = idf.fit(count_vectorized)
# tfidf_df = tfidfModel.transform(count_vectorized)

In [12]:
from pyspark.sql.functions import col, expr, udf, trim
from pyspark.sql.types import IntegerType
import re

remove_punctuation = udf(lambda line: re.sub('[^A-Za-z\s]', '', line))
make_binary = udf(lambda rating: 0 if rating in [1, 2] else 1, IntegerType())

reviews = (sample_rev
    .filter(col('overall').isin([1, 2, 3, 4, 5]))
    .withColumn('label', make_binary(col('overall')))
    .select(col('label').cast('int'), remove_punctuation('summary').alias('summary'))
    .filter(trim(col('summary')) != ''))

In [13]:
train, test = reviews.randomSplit([.8, .2], seed=123)

In [14]:
def multiply_dataset(dataset, n):
    return dataset if n <= 1 else dataset.union(multiply_dataset(dataset, n - 1))

reviews_good = train.filter('label == 1')
reviews_bad = train.filter('label == 0')

reviews_bad_multiplied = multiply_dataset(reviews_bad, reviews_good.count() / reviews_bad.count())


train_reviews = reviews_bad_multiplied.union(reviews_good)

In [15]:
accuracy = reviews_good.count() / float(train_reviews.count())
print('Always predicting 5 stars accuracy: {0}'.format(accuracy))

Always predicting 5 stars accuracy: 0.4851215548124133


In [17]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, StopWordsRemover
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.classification import LogisticRegression

tokenizer = Tokenizer(inputCol='summary', outputCol='words')

pipeline = Pipeline(stages=[
    tokenizer, 
    StopWordsRemover(inputCol='words', outputCol='filtered_words'),
    HashingTF(inputCol='filtered_words', outputCol='rawFeatures', numFeatures=120000),
    IDF(inputCol='rawFeatures', outputCol='features'),
    LogisticRegression(regParam=.3, elasticNetParam=.01)
])

In [18]:
model = pipeline.fit(train_reviews)

KeyboardInterrupt: 

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

prediction = model.transform(test)
BinaryClassificationEvaluator().evaluate(prediction)

In [1]:
spark.stop()

NameError: name 'spark' is not defined