In [1]:
# %load helpers.py
from pyspark.ml.classification import LogisticRegression, NaiveBayes, DecisionTreeClassifier, GBTClassifier, \
    RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.sql.functions import current_date, expr, datediff, to_date
from pyspark.sql.functions import length, regexp_replace

from nltk.corpus import stopwords
from nltk.tokenize import sent_tokenize
from nltk.tokenize import word_tokenize

import re


def get_kv_pairs(row, exclusions=[]):
    # get the text from the row entry
    text = str(row.review_body).lower()
    # create blacklist of words
    blacklist = set(stopwords.words('english'))
    # add explicit words
    [blacklist.add(i) for i in exclusions]
    # extract all words
    words = re.findall(r'([^\w+])', text)
    # for each word, send back a count of 1
    # send a list of lists
    return [[w, 1] for w in words if w not in blacklist]


def get_word_counts(texts, exclusions=[]):
    mapped_rdd = texts.rdd.flatMap(lambda row: get_kv_pairs(row, exclusions))
    counts_rdd = mapped_rdd.reduceByKey(lambda a, b: a + b).sortByKey(True, 1)
    return counts_rdd.collect()


def convert_str_to_int(df, col='verified_purchase', type_='int'):
    return df.select((df[col] == 'Y').cast(type_))


def get_review_age(df):
    return df.select(datediff(current_date(), to_date(df['review_date'])))


def prepare_features(df):
    df = df.withColumn('exclam', length('review_body') - length(regexp_replace('review_body', '\!', '')))
    df = df.withColumn('age', datediff(current_date(), to_date(df['review_date'])))
    df = df.withColumn('review_length', length(df['review_body']))
    df = df.withColumn('helfulness', df['helpful_votes'] / df['total_votes'])
    df = df.withColumn('label', expr("CAST(verified_purchase='Y' As INT)"))
    select_cols = df.select(['star_rating', 'helfulness', 'age', 'review_length', 'label']).na.fill(0)
    return select_cols


def split_data(df, rate=.9):
    training = df.sampleBy("label", fractions={0: rate, 1: rate}, seed=12)
    return training, df.subtract(training)


def get_auc_roc(classifier, training, test):
    model = classifier.fit(training)
    out = model.transform(test) \
        .select("prediction", "label") \
        .rdd.map(lambda x: (float(x[0]), float(x[1])))
    metrics = BinaryClassificationMetrics(out)
    print("Model: {1}. Area under ROC: {0:2f}".format(metrics.areaUnderROC, clf.__class__))
    return model, out, metrics


def get_vectorized_features(df, cols=['star_rating']):
    va = VectorAssembler().setInputCols(cols).setOutputCol(
        'features')
    return va.transform(df)

In [9]:
# %load environment.py
from pyspark.ml.classification import LogisticRegression, NaiveBayes, DecisionTreeClassifier, GBTClassifier, \
    RandomForestClassifier
import pyspark as ps
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

DATA_FILE = '../../amazon_reviews_us_Camera_v1_00.tsv.gz'
APP_NAME = 'Prediction'
FEATURES = ['star_rating', 'review_body', 'helpful_votes', 'total_votes', 'verified_purchase', 'review_date']
SAMPLE_SIZE = 10000

review_schema = StructType(
    [StructField('marketplace', StringType(), True),
     StructField('customer_id', StringType(), True),
     StructField('review_id', StringType(), True),
     StructField('product_id', StringType(), True),
     StructField('product_parent', StringType(), True),
     StructField('product_title', StringType(), True),
     StructField('product_category', StringType(), True),
     StructField('star_rating', IntegerType(), True),
     StructField('helpful_votes', IntegerType(), True),
     StructField('total_votes', IntegerType(), True),
     StructField('vine', StringType(), True),
     StructField('verified_purchase', StringType(), True),
     StructField('review_headline', StringType(), True),
     StructField('review_body', StringType(), True),
     StructField('review_date', StringType(), True)])

spark = (ps.sql.SparkSession.builder
         .master("local[1]")
         .appName(APP_NAME)
         .getOrCreate()
         )
sc = spark.sparkContext

df = spark.read.format("csv") \
    .option("header", "true") \
    .option("sep", "\t") \
    .schema(review_schema) \
    .load(DATA_FILE)

review_all = df.select(FEATURES)
review_sample = df.select(FEATURES).limit(SAMPLE_SIZE).cache()


In [10]:
classifiers = [LogisticRegression(), NaiveBayes(), DecisionTreeClassifier(), RandomForestClassifier(),
                   GBTClassifier()]
results = []

# 10000 sample dataset

In [61]:
select_cols = prepare_features(review_sample)
features = get_vectorized_features(select_cols, cols=['star_rating', 'helfulness', 'age', 'review_length'])
training = features.sampleBy("label", fractions={0: 0.92, 1: 0.08}, seed=12)
# training = features.sample(True, 0.5, 42)
training.groupBy("label").count().orderBy("label").show()
test = features.subtract(training)

+-----+-----+
|label|count|
+-----+-----+
|    0|  739|
|    1|  731|
+-----+-----+



In [62]:
for clf in classifiers:
    model, out, metrics = get_auc_roc(clf, training, test)
    results.append([model, out, metrics])

Model: <class 'pyspark.ml.classification.LogisticRegression'>. Area under ROC: 0.666389
Model: <class 'pyspark.ml.classification.NaiveBayes'>. Area under ROC: 0.683751
Model: <class 'pyspark.ml.classification.DecisionTreeClassifier'>. Area under ROC: 0.662692
Model: <class 'pyspark.ml.classification.RandomForestClassifier'>. Area under ROC: 0.659492
Model: <class 'pyspark.ml.classification.GBTClassifier'>. Area under ROC: 0.660202


# entire dataset

In [11]:
select_cols = prepare_features(review_all)
features = get_vectorized_features(select_cols, cols=['star_rating', 'helfulness', 'age', 'review_length'])

In [12]:
features.groupBy("label").count().orderBy("label").show()

+-----+-------+
|label|  count|
+-----+-------+
|    0| 307573|
|    1|1494401|
+-----+-------+



In [13]:
# training = features.sampleBy("label", fractions={0: 0.9, 1: 0.1}, seed=24)
training = features.sample(True, 0.5, 42)
# training.groupBy("label").count().orderBy("label").show()
test = features.subtract(training)

In [None]:
for clf in classifiers:
    model, out, metrics = get_auc_roc(clf, training, test)
    results.append([model, out, metrics])