# Summary
1. explore the data with sql
    - target columns:
        - star_rating
        - verified_purchase
    - questions:
        - how does a user rate?
            - is there corelation between new rating and the product title length?
            - is there corelation between new rating and the review headline length?  
            - is there corelation between new rating and the review body length?  
            - is there corelation between new rating and the review date(age)?  
            - is there corelation between new rating and the verified purchase?  
            - is there corelation between new rating and the vine?  
            - is there corelation between new rating and the helpful votes?  
            - is there corelation between new rating and the total votes?  
            - is there corelation between new rating and the accumulated rating?  
                - for the same product_id, calculate avg accumulated rating for each review date
            - is there corelation between new rating and the accumulated number of reviews?  
        - did a user have a purchase?
            - is there corelation between the verified purchase and the product title length?
            - is there corelation between the verified purchase and the review headline length?  
            - is there corelation between the verified purchase and the review body length?  
            - is there corelation between the verified purchase and the review date(age)?  
            - is there corelation between the verified purchase and the star rating?  
            - is there corelation between the verified purchase and the vine?  
            - is there corelation between the verified purchase and the accumulated rating?  
            - is there corelation between the verified purchase and the accumulated number of reviews?  
            - is there corelation between the verified purchase and the accumulated helpful votes?  
            - is there corelation between the verified purchase and the accumulated total votes?           
2. generate new columns based on the findings/trends from the previous step
3. perform Null Hypothesis Test on the sample and the whole dataset
4. output the results and visualizations 

In [3]:
# %load "../../utils/environment.py"
from pyspark.ml.classification import LogisticRegression, NaiveBayes, DecisionTreeClassifier, GBTClassifier, \
    RandomForestClassifier
import pyspark as ps
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
from pyspark.sql.functions import current_date, expr, datediff, to_date, lit, coalesce, length, regexp_replace,count,isnan,when,col

DATA_FILE = '../../data/amazon_reviews_us_Camera_v1_00.tsv.gz'
APP_NAME = 'EDA'
FEATURES = ['star_rating', 'exclam', 'helfulness', 'review_length', 'verified_purchase', 'age']
SAMPLE_SIZE = 10000

review_schema = StructType(
    [StructField('marketplace', StringType(), True),
     StructField('customer_id', StringType(), True),
     StructField('review_id', StringType(), True),
     StructField('product_id', StringType(), True),
     StructField('product_parent', StringType(), True),
     StructField('product_title', StringType(), True),
     StructField('product_category', StringType(), True),
     StructField('star_rating', IntegerType(), True),
     StructField('helpful_votes', IntegerType(), True),
     StructField('total_votes', IntegerType(), True),
     StructField('vine', StringType(), True),
     StructField('verified_purchase', StringType(), True),
     StructField('review_headline', StringType(), True),
     StructField('review_body', StringType(), True),
     StructField('review_date', StringType(), True)])

spark = (ps.sql.SparkSession.builder
         .master("local[1]")
         .appName(APP_NAME)
         .getOrCreate()
         )
sc = spark.sparkContext

df = spark.read.format("csv") \
    .option("header", "true") \
    .option("sep", "\t") \
    .schema(review_schema) \
    .load(DATA_FILE)
df=df.na.drop(subset=["star_rating"])
df=df.fillna('',subset=['review_body'])

df = df.withColumn('review_length', length('review_body'))
df = df.withColumn('review_headline_length', length('review_headline'))
df = df.withColumn('product_title_length', length('product_title'))

df = df.withColumn('exclam',  df['review_length'] - length(regexp_replace('review_body', '\!', '')))
df = df.withColumn('positive',  (df['star_rating']>3).cast('integer'))
df = df.withColumn('age', datediff(current_date(), to_date(df['review_date'])))
df = df.withColumn('helfulness', coalesce(df['helpful_votes'] / df['total_votes'],lit(0.0)))
df = df.withColumn('label', expr("CAST(verified_purchase='Y' As INT)"))
df = df.withColumn('vine', expr("CAST(vine='Y' As INT)"))
fill_na_mean(df,'age')

review_all = df.select(FEATURES)
review_sample = df.select(FEATURES).limit(SAMPLE_SIZE).cache()


In [4]:
df.createOrReplaceTempView('df')

In [131]:
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-----------+-----------+---------+----------+--------------+-------------+----------------+-----------+-------------+-----------+----+-----------------+---------------+-----------+-----------+-------------+----------------------+--------------------+------+--------+---+----------+-----+
|marketplace|customer_id|review_id|product_id|product_parent|product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|review_headline|review_body|review_date|review_length|review_headline_length|product_title_length|exclam|positive|age|helfulness|label|
+-----------+-----------+---------+----------+--------------+-------------+----------------+-----------+-------------+-----------+----+-----------------+---------------+-----------+-----------+-------------+----------------------+--------------------+------+--------+---+----------+-----+
|          0|          0|        0|         0|             0|            0|               0|          0|            0|          0|   

In [11]:
spark.sql('''select review_id, product_id, age, star_rating, sum(1) over (partition by product_id order by age) as accum_review_count from df''').show()

+--------------+----------+----+-----------+------------------+
|     review_id|product_id| age|star_rating|accum_review_count|
+--------------+----------+----+-----------+------------------+
|R1O6Z1DDT2P1XI|B00000J47G|1809|          5|                 1|
|R1JG9M7Z29JZ8A|B00000J47G|1815|          5|                 2|
|R1JJGHPWABDFXV|B00000J47G|4786|          5|                 3|
|R2RMYNWCP79YVQ|B00000J47G|4788|          1|                 4|
|R2TBZTLKU11VZU|B00000J47G|5077|          5|                 5|
|R14B3XKWH0WQDL|B00000J47G|5799|          3|                 6|
| R9EVQ9YXO8N5V|B00000J47G|5815|          1|                 7|
| RJTDPXIPVVQIA|B00000J47G|5918|          1|                 8|
|R1K9NN4IK0S7NT|B00000J47G|6130|          4|                 9|
| R36SG72DYDIQL|B00000J47G|6414|          4|                10|
| R5OS4MY8RBGZG|B00000J47G|6415|          5|                12|
|R3LVY8TXNJ733Q|B00000J47G|6415|          5|                12|
|R215WFCN9MNL6G|B00000J47G|6433|        

In [6]:
spark.sql('''select review_id, product_id, age, star_rating, avg(star_rating) over (partition by product_id order by age) as accum_rating from df''').show()

+--------------+----------+----+-----------+------------------+
|     review_id|product_id| age|star_rating|      accum_rating|
+--------------+----------+----+-----------+------------------+
|R1O6Z1DDT2P1XI|B00000J47G|1809|          5|               5.0|
|R1JG9M7Z29JZ8A|B00000J47G|1815|          5|               5.0|
|R1JJGHPWABDFXV|B00000J47G|4786|          5|               5.0|
|R2RMYNWCP79YVQ|B00000J47G|4788|          1|               4.0|
|R2TBZTLKU11VZU|B00000J47G|5077|          5|               4.2|
|R14B3XKWH0WQDL|B00000J47G|5799|          3|               4.0|
| R9EVQ9YXO8N5V|B00000J47G|5815|          1|3.5714285714285716|
| RJTDPXIPVVQIA|B00000J47G|5918|          1|              3.25|
|R1K9NN4IK0S7NT|B00000J47G|6130|          4|3.3333333333333335|
| R36SG72DYDIQL|B00000J47G|6414|          4|               3.4|
| R5OS4MY8RBGZG|B00000J47G|6415|          5|3.6666666666666665|
|R3LVY8TXNJ733Q|B00000J47G|6415|          5|3.6666666666666665|
|R215WFCN9MNL6G|B00000J47G|6433|        

In [139]:
for column in ['review_date','review_length']:
    df.select('{0}'.format(column)).where('{0} is null'.format(column)).show()

+-----------+
|review_date|
+-----------+
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
|       null|
+-----------+
only showing top 20 rows

+-------------+
|review_length|
+-------------+
+-------------+



In [120]:
df.select('*').where('star_rating is null').show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+---------------+-----------+-----------+-------------+----------------------+--------------------+------+--------+----+----------+-----+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|review_headline|review_body|review_date|review_length|review_headline_length|product_title_length|exclam|positive| age|helfulness|label|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+---------------+-----------+-----------+-------------+----------------------+--------------------+------+--------+----+----------+-----+
|         US|     377623|R18EGX6WJJO414|B004QVK3MY|     869011606|Ardinbir Photogra...|       

In [116]:
df.count()

1801974

In [156]:
spark.sql('select star_rating, sum(label) as purchased from df group by star_rating order by star_rating').show()

+-----------+---------+
|star_rating|purchased|
+-----------+---------+
|          1|   126789|
|          2|    71002|
|          3|   115780|
|          4|   276470|
|          5|   904360|
+-----------+---------+



In [13]:
spark.sql('select star_rating, sum(helpful_votes) as helpful_votes from df group by star_rating order by star_rating').show()

+-----------+---------+
|star_rating|purchased|
+-----------+---------+
|          1|   599024|
|          2|   293612|
|          3|   489380|
|          4|  1061154|
|          5|  2792305|
+-----------+---------+



In [None]:
spark.sql('select star_rating, sum(tot_votes) as helpful_votes from df group by star_rating order by star_rating').show()

In [155]:
spark.sql('select star_rating, sum(vine) as vine from df group by star_rating order by star_rating').show()

+-----------+----+
|star_rating|vine|
+-----------+----+
|          1| 143|
|          2| 357|
|          3|1139|
|          4|2951|
|          5|3293|
+-----------+----+



In [2]:
def fill_na_mean(df, column):
    df=df.fillna(df.selectExpr('avg({0}) as mean'.format(column)).first().asDict()['mean'],subset=[column])

In [96]:
for column in ['review_body','age']:
    df.select(['review_id',column]).where('{0} is NULL'.format(column)).show()

+---------+-----------+
|review_id|review_body|
+---------+-----------+
+---------+-----------+

+---------+---+
|review_id|age|
+---------+---+
+---------+---+



In [145]:
df = df.withColumn('vine',convert_str_to_int(df, col='vine', type_='int'))

AssertionError: col should be Column

In [30]:
review_sample.createOrReplaceTempView("samples")
review_all.createOrReplaceTempView("all")
df.createOrReplaceTempView("df")

In [39]:
def get_correlation_target_col_length(target,col,table):
    spark.sql("select {0}, avg(length({1})) from {2} group by {0} order by {0}".format(target, col,table)).show()

In [39]:
def get_correlation_target_col(target,col,table):
    spark.sql("select {0}, avg(({1})) from {2} group by {0} order by {0}".format(target, col,table)).show()

In [41]:
import matplotlib.pyplot as plt

In [67]:
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()

+-----------+-----------+---------+----------+--------------+-------------+----------------+-----------+-------------+-----------+----+-----------------+---------------+-----------+-----------+-------------+----------------------+--------------------+------+--------+---+----------+-----+
|marketplace|customer_id|review_id|product_id|product_parent|product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|review_headline|review_body|review_date|review_length|review_headline_length|product_title_length|exclam|positive|age|helfulness|label|
+-----------+-----------+---------+----------+--------------+-------------+----------------+-----------+-------------+-----------+----+-----------------+---------------+-----------+-----------+-------------+----------------------+--------------------+------+--------+---+----------+-----+
|          0|          0|        0|         0|             0|            0|               0|          0|            0|          0|   

In [57]:
for col in df.columns:
    print("{0}: {1} unique values".format(col,df.select(col).distinct().count()))

marketplace: 1 unique values
customer_id: 1116762 unique values
review_id: 1801974 unique values
product_id: 168675 unique values
product_parent: 153457 unique values
product_title: 154845 unique values
product_category: 2 unique values
star_rating: 6 unique values
helpful_votes: 878 unique values
total_votes: 896 unique values
vine: 3 unique values
verified_purchase: 3 unique values
review_headline: 1015326 unique values
review_body: 1696853 unique values
review_date: 5859 unique values
review_length: 9046 unique values
review_headline_length: 178 unique values
product_title_length: 401 unique values
exclam: 89 unique values
positive: 3 unique values
age: 5859 unique values
helfulness: 5079 unique values
label: 3 unique values


In [38]:
for target in ['star_rating','verified_purchase']:
    for col in ['review_body','review_headline','product_title']:
        get_correlation_target_col_length(target=target,col=col, table='df')

+-----------+------------------------+
|star_rating|avg(length(review_body))|
+-----------+------------------------+
|       null|                    null|
|          1|       431.2341881145638|
|          3|       523.1530527119506|
|          5|      363.87705009052956|
|          4|       522.3469643551947|
|          2|       530.5819313408546|
+-----------+------------------------+

+-----------+----------------------------+
|star_rating|avg(length(review_headline))|
+-----------+----------------------------+
|       null|                        null|
|          1|          27.045951679919135|
|          3|          28.525484235826383|
|          5|           22.91408348122623|
|          4|           25.77995247995248|
|          2|          28.798051655323313|
+-----------+----------------------------+

+-----------+--------------------------+
|star_rating|avg(length(product_title))|
+-----------+--------------------------+
|       null|                     673.5|
|          1| 

In [50]:
for col in df.columns:
    spark.sql('select count(*) from df where {0} is NULL'.format(col)).count()

In [20]:
# %load ../../utils/helpers.py
from pyspark.ml.classification import LogisticRegression, NaiveBayes, DecisionTreeClassifier, GBTClassifier, \
    RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.evaluation import BinaryClassificationMetrics

from nltk.corpus import stopwords
from nltk.tokenize import sent_tokenize
from nltk.tokenize import word_tokenize

import re


def get_kv_pairs(row, exclusions=[]):
    # get the text from the row entry
    text = str(row.review_body).lower()
    # create blacklist of words
    blacklist = set(stopwords.words('english'))
    # add explicit words
    [blacklist.add(i) for i in exclusions]
    # extract all words
    words = re.findall(r'([^\w+])', text)
    # for each word, send back a count of 1
    # send a list of lists
    return [[w, 1] for w in words if w not in blacklist]


def get_word_counts(texts, exclusions=[]):
    mapped_rdd = texts.rdd.flatMap(lambda row: get_kv_pairs(row, exclusions))
    counts_rdd = mapped_rdd.reduceByKey(lambda a, b: a + b).sortBy(lambda a: a[1])
    return counts_rdd.collect()


def convert_str_to_int(df, col='verified_purchase', type_='int'):
    return df.select((df[col] == 'Y').cast(type_))


def get_review_age(df):
    return df.select(datediff(current_date(), to_date(df['review_date'])))


def prepare_features(df):
    df = df.withColumn('review_length', length('review_body'))
    df = df.withColumn('exclam',  df['review_length'] - length(regexp_replace('review_body', '\!', '')))
    df = df.withColumn('age', datediff(current_date(), to_date(df['review_date'])))
    df = df.withColumn('helfulness', coalesce(df['helpful_votes'] / df['total_votes'],lit(0.0)))
    df = df.withColumn('label', expr("CAST(verified_purchase='Y' As INT)"))
    select_cols = df.select(['star_rating', 'helfulness', 'age', 'review_length', 'label'])#.na.fill(0)
    return select_cols


def split_data(df, rate=.9):
    training = df.sampleBy("label", fractions={0: rate, 1: rate}, seed=12)
    return training, df.subtract(training)


def get_auc_roc(classifier, training, test):
    model = classifier.fit(training)
    out = model.transform(test) \
        .select("prediction", "label") \
        .rdd.map(lambda x: (float(x[0]), float(x[1])))
    metrics = BinaryClassificationMetrics(out)
    print("Model: {1}. Area under ROC: {0:2f}".format(metrics.areaUnderROC, clf.__class__))
    return model, out, metrics


def get_vectorized_features(df, cols=['star_rating']):
    va = VectorAssembler().setInputCols(cols).setOutputCol(
        'features')
    return va.transform(df)

In [2]:
spark.sql("select customer_id, count(*) as counts from dfTable group by customer_id order by counts desc").show(5)

+-----------+------+
|customer_id|counts|
+-----------+------+
|   31588426|   285|
|   50820654|   191|
|   52764559|   171|
|   44777060|   148|
|   52340667|   146|
+-----------+------+
only showing top 5 rows



In [3]:
%%time
spark.sql("select star_rating, count(*) as counts from dfTable group by star_rating").show(5)

+-----------+-------+
|star_rating| counts|
+-----------+-------+
|       null|      2|
|          1| 170157|
|          3| 141460|
|          5|1062706|
|          4| 336700|
+-----------+-------+
only showing top 5 rows

CPU times: user 1.71 ms, sys: 3.69 ms, total: 5.4 ms
Wall time: 18.5 s


In [4]:
spark.sql("select product_id, count(*) as counts from dfTable group by product_id order by counts desc").show(5)

+----------+------+
|product_id|counts|
+----------+------+
|B006ZP8UOW|  4654|
|B00007E7JU|  4399|
|B0039BPG1A|  3619|
|B002VPE1WK|  3565|
|B0050R67U0|  3177|
+----------+------+
only showing top 5 rows



In [5]:
spark.sql("select product_id, count(*) as counts from dfTable group by product_id order by counts desc").show(5)

+----------+------+
|product_id|counts|
+----------+------+
|B006ZP8UOW|  4654|
|B00007E7JU|  4399|
|B0039BPG1A|  3619|
|B002VPE1WK|  3565|
|B0050R67U0|  3177|
+----------+------+
only showing top 5 rows



In [11]:
get_correlation_review_length('star_rating')

+-----------+------------------------+
|star_rating|avg(length(review_body))|
+-----------+------------------------+
|       null|                    null|
|          1|       431.2341881145638|
|          3|       523.1530527119506|
|          5|      363.87705009052956|
|          4|       522.3469643551947|
|          2|       530.5819313408546|
+-----------+------------------------+



In [14]:
get_correlation_review_length('verified_purchase')

+-----------------+------------------------+
|verified_purchase|avg(length(review_body))|
+-----------------+------------------------+
|             null|                    null|
|                Y|        345.015310178679|
|                N|       788.8134254592749|
+-----------------+------------------------+



In [None]:
for col in df.columns:
    get_correlation_review_length(col)

+-----------+------------------------+
|marketplace|avg(length(review_body))|
+-----------+------------------------+
|         US|       420.7652951133557|
+-----------+------------------------+

+-----------+------------------------+
|customer_id|avg(length(review_body))|
+-----------+------------------------+
|     109519|      105.66666666666667|
|   45046643|                   141.0|
|   45676353|      140.31034482758622|
|   49258595|                   977.0|
|   11081770|                   483.0|
|   51671883|                   685.6|
|   23678637|                   122.0|
|   14479333|                    21.0|
|   12318800|                    30.0|
|   41257827|                   160.0|
|   52809621|                     8.0|
|   31693763|                    44.0|
|   52942372|                  297.25|
|   10820140|                    76.0|
|   45253663|                  937.75|
|    3910358|                    44.0|
|   43009502|                  218.25|
|   52809622|           