## How to run?
- If facing an error of "folder already exists", delete the corresponding folder and rerun 

In [16]:
from pyspark.sql import SparkSession
import os
import math
import time
from pyspark.sql.functions import *



In [2]:
spark = SparkSession \
    .builder \
    .appName("Amazon Recommender System") \
    .config("spark.driver.maxResultSize", "96g") \
    .config("spark.driver.memory", "96g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.master", "local[4]") \
    .getOrCreate()

sc = spark.sparkContext

In [None]:
data_path = 'processed'

In [11]:
from utils import preprocess_data
import logging

logger = logging.getLogger(__name__)

foldername = 'data'

logger.warning('Started to unzip raw data.')
try:
    preprocess_data.unzip_file(foldername)
except FileNotFoundError:
    logger.warning('No raw files exists.')

logger.warning('Started to process data.')
try:
    df_product_category, df_rating = preprocess_data.process_data(spark, foldername=foldername)
except FileNotFoundError:
    logger.warning('No data exists.')

# df_product_category.coalesce(1).write.csv(f"./processed/products")
# df_rating.coalesce(1).write.csv(f"./processed/ratings")

Started to unzip raw data.
Started to process data.


(160792, 5)


## Data Exploration
1. What are rating values?
2. What is the total number of rating users?
3. What is the total number of rated items?
4. What are item categories?
5. What are the numbers of rated items/rating users per category?
6. Count for each rating value

### 1. What are rating values?

In [12]:
df_rating.select('overall').distinct().collect()

[Row(overall=1.0),
 Row(overall=4.0),
 Row(overall=3.0),
 Row(overall=2.0),
 Row(overall=5.0)]

### 2. What is the total number of rating users?

In [13]:
df_rating.select('reviewerID').distinct().count()

13447

### 3. What is the total number of rated items?

In [14]:
df_rating.select('productID').distinct().count()

5260

## On average, number of products each user bought

In [31]:
# df_rating.select('reviewerID').distinct().count()/df_rating.select('productID').distinct().count()
# To save computation power, manually copy the values above and do the calculation
13447/5260

2.5564638783269964

# On average, number of users purchase a product

In [34]:
# compare with total number of users to illustrate the market potential and choices explosion
5260/13447

0.39116531568379564

### 4. What are item categories?

In [15]:
df_product_category.select('category').distinct().count()

1

### Number of products in each category


In [20]:
df_product_category.groupby('category').agg(countDistinct('productID')).show(20)

+--------+-------------------------+
|category|count(DISTINCT productID)|
+--------+-------------------------+
|    Baby|                     5260|
+--------+-------------------------+



### Number of reviews for each category

In [25]:
df_rating.join(df_product_category, 'productID') \
    .groupBy('category') \
    .count() \
    .show(20)

+--------+-------+
|category|  count|
+--------+-------+
|    Baby|8813005|
+--------+-------+



### Number of reviewers for each category

In [26]:
df_rating.join(df_product_category, 'productID') \
       .groupBy('category') \
       .agg(countDistinct('reviewerID')) \
       .show(20)

+--------+--------------------------+
|category|count(DISTINCT reviewerID)|
+--------+--------------------------+
|    Baby|                     13447|
+--------+--------------------------+



### 6. Count for each rating value

In [29]:
df_rating.groupBy('overall').count().show()

+-------+-----+
|overall|count|
+-------+-----+
|    1.0| 5723|
|    4.0|25649|
|    3.0|13301|
|    2.0| 6857|
|    5.0|71779|
+-------+-----+

