# Amazon Customer Review Analysis
## By Andrew Simmons & Jingnan Jin

## Schema Reference
0. marketplace
1. customer_id
2. review_id
3. product_id
4. product_parent
5. product_title
6. product_category
7. star_rating - [1-5]
8. helpful_votes
9. total_votes
10. vine
11. verified_purchase
12. review_headline
13. review_body
14. review_date

In [None]:
from collections import defaultdict
import html
import math
from operator import itemgetter
from pathlib import Path
import re

import findspark
import matplotlib.pyplot as plt
import numpy as np

# Let findspark set up the environment before pyspark is imported below.
findspark.init()

# Create entry points to Spark.
# Stop a context left over from an earlier run of this cell. On a fresh kernel
# the name `sc` is not defined yet; that is the one failure expected here, so
# only NameError is ignored and any other problem is allowed to surface.
try:
    sc.stop()
except NameError:
    pass
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [None]:
# Build the Spark configuration one setting at a time.
conf = SparkConf()
conf.setAppName("AmazonCustomerReviewAnalysis")
conf.setMaster("local[*]")

# The context is the RDD entry point; the session wraps that same context.
sc = SparkContext(conf=conf)
spark = SparkSession(sparkContext=sc)

In [None]:
"""Constants"""

# Figure size (width, height) in inches, passed to Figure.set_size_inches.
PLOT_DIMENSIONS = (20, 10)

# Words to exclude from the word-frequency analysis, one per line of the file.
# The encoding is stated explicitly so the result does not depend on the
# platform default (assumes the file is UTF-8 -- TODO confirm). Blank lines and
# surrounding whitespace are dropped because the words they are compared with
# come from str.split() and therefore never contain whitespace.
with open('useless_words.txt', encoding='utf-8') as f:
    stripped_lines = (line.strip() for line in f.read().splitlines())
    USELESS_WORDS = {word for word in stripped_lines if word}

In [None]:
"""Process list of files to separate by category"""

data_dir = Path('sample_data')

# Keep only the gzipped TSV files whose name marks them as US review data.
data_files = []
for candidate in data_dir.glob('*.tsv.gz'):
    if candidate.name.startswith('amazon_reviews_us_'):
        data_files.append(candidate)

# Group the files by category. The category is what remains of the file name
# after dropping its first 18 and last 13 characters, with underscores turned
# into spaces.
data_categories = defaultdict(list)
for file in data_files:
    raw_category = file.name[18:-13]
    category_name = raw_category.replace('_', ' ')
    data_categories[category_name].append(file)

In [None]:
"""Create mapping of category names to unioned RDD"""

# Replace each category's list of files with one RDD that is the union of the
# text RDDs read from those files.
for category in list(data_categories):
    file_rdds = []
    for path in data_categories[category]:
        file_rdds.append(sc.textFile(str(path)))
    data_categories[category] = sc.union(file_rdds)

In [None]:
"""Remove headers from data"""

# A line counts as a header when it begins with the first column name,
# 'marketplace'; every other line is kept.
for category in list(data_categories):
    lines = data_categories[category]
    data_categories[category] = lines.filter(
        lambda line: not line.startswith('marketplace')
    )

In [None]:
"""Split TSV"""

# Turn every tab-separated line into a list of its column values.
for category in list(data_categories):
    lines = data_categories[category]
    data_categories[category] = lines.map(lambda line: line.split('\t'))

In [None]:
"""Preprocess certain values"""

def _convert_record(fields):
    """Return one split record as a tuple with selected columns converted.

    Columns 7, 8 and 9 (star_rating, helpful_votes, total_votes in the schema
    reference) become ints, HTML entities in column 13 (review_body) are
    unescaped, and every other column is passed through unchanged.
    """
    return (
        *fields[:7],
        int(fields[7]),
        int(fields[8]),
        int(fields[9]),
        fields[10],
        fields[11],
        fields[12],  # TODO: Determine if headlines are HTML escaped like the bodies
        html.unescape(fields[13]),  # TODO: Consider how to handle <br /> tags
        fields[14],
    )


# Apply the conversion to every record of every category.
for category in list(data_categories):
    data_categories[category] = data_categories[category].map(_convert_record)

## How many records exist in each category?

In [None]:
# Count the reviews in every category; the chart lists the categories from the
# fewest reviews to the most.
record_counts = [(name, rdd.count()) for name, rdd in data_categories.items()]
categories, counts = zip(*sorted(record_counts, key=itemgetter(1)))

fig, ax = plt.subplots()
fig.set_size_inches(PLOT_DIMENSIONS)
rects = ax.bar(categories, counts)

ax.set_title('Amazon Review Counts by Category')
ax.set_xlabel('Product Category')
ax.set_ylabel('Review Counts')

# Rotate the category names by 90 degrees.
for label in ax.get_xticklabels():
    label.set_rotation(90)

print(f'Total number of reviews in dataset: {sum(counts)}')

## Overall, how satisfied are customers of each product category?

In [None]:
# Mean star rating (column 7) of every category; the chart lists the
# categories from the lowest mean to the highest.
average_review_by_category = [
    (name, rdd.map(lambda record: int(record[7])).mean())
    for name, rdd in data_categories.items()
]
categories, ratings = zip(*sorted(average_review_by_category, key=itemgetter(1)))

fig, ax = plt.subplots()
fig.set_size_inches(PLOT_DIMENSIONS)
rects = ax.bar(categories, ratings)

ax.set_title('Average Product Category Rating')
ax.set_xlabel('Product Category')
ax.set_ylabel('Average Rating')

# Rotate the category names by 90 degrees.
for label in ax.get_xticklabels():
    label.set_rotation(90)

## How does the distribution of review scores differ between product categories?

In [None]:
# For every category, count how many reviews were given at each star rating.
# Each value is a list of (star_rating, count) pairs ordered by star rating.
review_distributions = {}
for key, value in data_categories.items():
    rating_counts = value.map(lambda x: (x[7], 1)).reduceByKey(lambda x, y: x + y)
    review_distributions[key] = sorted(rating_counts.collect(), key=itemgetter(0))

num_columns = 3

# One panel per category, laid out on a grid that is num_columns wide.
fig, ax = plt.subplots(nrows=math.ceil(len(review_distributions) / num_columns),
                       ncols=num_columns)
fig.set_size_inches((20, 100))
ax = ax.flatten()

for i, (key, value) in enumerate(review_distributions.items()):
    stars, counts = zip(*value)

    rects = ax[i].bar(stars, counts)

    # Mean rating: every star value weighted by how often it was given.
    mean = sum(star * count for star, count in zip(stars, counts)) / sum(counts)

    # TODO: Include a median line
    mean_line = ax[i].axvline(mean, color='red', linestyle='--')
    ax[i].legend([mean_line], ['Mean Category Rating'])

    ax[i].set_title(f'Ratings Frequency Distribution for {key} category')
    ax[i].set_xlabel('Star Rating')
    ax[i].set_ylabel('Rating Count')

# The grid is rounded up to whole rows, so the last row may contain panels that
# received no category; hide them instead of drawing empty frames.
for unused_axis in ax[len(review_distributions):]:
    unused_axis.set_visible(False)

## What words are most used in each category at each rating for review headlines?

In [None]:
"""
category: {
    1: [n most common words],
    2: ...
},
...
"""

# Headlines that are compared verbatim and filtered out before word counting.
DEFAULT_REVIEWS = {'One Star', 'Two Stars', 'Three Stars', 'Four Stars', 'Five Stars'}


def remove_default_headlines(headline):
    """Filter predicate: keep a headline unless it is one of DEFAULT_REVIEWS.

    The comparison is exact, so differences in case or spacing are kept.
    """
    return headline not in DEFAULT_REVIEWS


def preprocess(headline):
    """Normalise review text so that variants of the same word group together.

    Commas, periods, exclamation marks, question marks and apostrophes are
    deleted, every run of whitespace becomes a single space, leading and
    trailing whitespace is removed, and the result is lowercased.

    Args:
        headline: The raw text of a review headline or body.

    Returns:
        The normalised, lowercase text.
    """
    # Delete punctuation that is found at the end of words, and apostrophes so
    # that both spellings of a word (with and without one) are counted together.
    headline = re.sub(r"[,.!?']", '', headline)

    # Normalise whitespace only after the punctuation is gone: deleting a
    # free-standing mark, such as the "." in "good .", would otherwise leave a
    # trailing or doubled space behind.
    headline = ' '.join(headline.split())

    return headline.lower()


def remove_useless_words(headline_word):
    """Filter predicate: keep a word unless it appears in USELESS_WORDS.

    USELESS_WORDS is the exclusion set (stop words and ellipses) loaded from
    useless_words.txt.
    """
    return headline_word not in USELESS_WORDS


def remove_empty_words(headline_word):
    """Filter predicate: keep only words that have at least one character."""
    return len(headline_word) != 0


def remove_censored_swear_words(headline_word):
    """Filter predicate: reject words that look like censored swear words.

    Amazon appears to have censored swear words by keeping the first character
    and replacing the other characters with asterisks. These are not useful to
    us, so a word made of one lowercase letter followed only by asterisks is
    rejected.

    Returns:
        False for a censored word, True for anything else.
    """
    # Raw string: in a plain literal "\*" is an invalid escape sequence.
    return re.match(r'^[a-z]\*+$', headline_word) is None


def remove_words_without_alphanumeric(headline_word):
    """Filter predicate: keep words that begin with a lowercase letter or digit.

    re.match only looks at the start of the string, so a word is rejected when
    its first character is anything other than a-z or 0-9, even if such
    characters appear later in the word.
    NOTE(review): the name suggests "anywhere in the word" -- confirm intent.
    """
    return re.match('[a-z0-9]', headline_word) is not None


def remove_numbers(headline_word):
    """Filter predicate: reject words made up solely of the digits 0-9."""
    is_all_digits = re.match('^[0-9]+$', headline_word) is not None
    return not is_all_digits


# TODO: Remove single letter words


POSSIBLE_RATINGS = [1, 2, 3, 4, 5]

# Maps category name -> {star rating -> the 3 most frequent headline words}.
resulting_word_frequencies = defaultdict(dict)
for key, value in data_categories.items():
    for i in POSSIBLE_RATINGS:
        # Bind the current rating as a default argument so the filter does not
        # depend on whatever value the loop variable holds when Spark finally
        # evaluates it.
        reviews = value.filter(lambda x, rating=i: x[7] == rating)

        # Column 12 is the review headline.
        headlines = (
            reviews.map(lambda x: x[12])
            .filter(remove_default_headlines)
            .map(preprocess)
        )

        headline_words = (
            headlines.flatMap(lambda x: x.split())
            .filter(remove_useless_words)
            .filter(remove_empty_words)
            .filter(remove_censored_swear_words)
            .filter(remove_words_without_alphanumeric)
        )

        # Count every word, then order the words from most to least frequent.
        word_frequencies = (
            headline_words.map(lambda x: (x, 1))
            .reduceByKey(lambda x, y: x + y)
            .sortBy(lambda x: x[1], ascending=False)
            .map(lambda x: x[0])
        )

        resulting_word_frequencies[key][i] = word_frequencies.take(3)

# Output results
for category, words_by_rating in resulting_word_frequencies.items():
    print(f'{category}:')
    for rating in POSSIBLE_RATINGS:
        print(f'  {rating}')
        for word in words_by_rating[rating]:
            print(f'    {word}')

## What words are most used in each category at each rating for review bodies?

In [None]:
# Maps category name -> {star rating -> the 3 most frequent review-body words}.
resulting_word_frequencies = defaultdict(dict)
for key, value in data_categories.items():
    for i in POSSIBLE_RATINGS:
        # Bind the current rating as a default argument so the filter does not
        # depend on whatever value the loop variable holds when Spark finally
        # evaluates it.
        reviews = value.filter(lambda x, rating=i: x[7] == rating)

        # Column 13 is the review body.
        bodies = reviews.map(lambda x: x[13]).map(preprocess)

        body_words = (
            bodies.flatMap(lambda x: x.split())
            .filter(remove_useless_words)
            .filter(remove_empty_words)
            .filter(remove_censored_swear_words)
            .filter(remove_words_without_alphanumeric)
        )

        # Count every word, then order the words from most to least frequent.
        word_frequencies = (
            body_words.map(lambda x: (x, 1))
            .reduceByKey(lambda x, y: x + y)
            .sortBy(lambda x: x[1], ascending=False)
            .map(lambda x: x[0])
        )

        resulting_word_frequencies[key][i] = word_frequencies.take(3)

# Output results
for category, words_by_rating in resulting_word_frequencies.items():
    print(f'{category}:')
    for rating in POSSIBLE_RATINGS:
        print(f'  {rating}')
        for word in words_by_rating[rating]:
            print(f'    {word}')