# Amazon Reviews

written in Google Colab using Amazon S3, Amazon RDS and PySpark

by Nicole Lund
1. Extract review data from S3
2. Transform the data to match the provided schema file
3. Load the data to RDS

### Selected Data Source

EBooks
* https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Ebook_Purchase_v1_01.tsv.gz
* https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Ebook_Purchase_v1_00.tsv.gz

### Required database schema

In [None]:
# CREATE TABLE review_id_table (
#   review_id TEXT PRIMARY KEY NOT NULL,
#   customer_id INTEGER,
#   product_id TEXT,
#   product_parent INTEGER,
#   review_date DATE -- this should be in the formate yyyy-mm-dd
# );

# -- This table will contain only unique values
# CREATE TABLE products (
#   product_id TEXT PRIMARY KEY NOT NULL UNIQUE,
#   product_title TEXT
# );

# -- Customer table for first data set
# CREATE TABLE customers (
#   customer_id INT PRIMARY KEY NOT NULL UNIQUE,
#   customer_count INT
# );

# -- vine table
# CREATE TABLE vine_table (
#   review_id TEXT PRIMARY KEY,
#   star_rating INTEGER,
#   helpful_votes INTEGER,
#   total_votes INTEGER,
#   vine TEXT
# );


### Initialize Notebook Settings

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java

!pip install pyspark

update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java to provide /usr/bin/java (java) in manual mode
Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 59 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 41.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=587abd9a734997fec611014e4f0a03d56446d1f2c60807b5de95db4a3b449e2f
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2021-08-27 20:13:11--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2021-08-27 20:13:11 (5.97 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [3]:
# Import dependencies
from pyspark.sql import SparkSession
from pyspark import SparkFiles
from pyspark.sql.types import StructField, StringType, IntegerType, DateType, StructType
from pyspark.sql.functions import col

In [4]:
# Initialize Session
spark = SparkSession.builder.appName("reviews").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

### Setup RDS Access

In [5]:
# Collect RDS Credentials securely
from getpass import getpass
print('Input RDS URL in the jdbc:postgresql://<Endpoint>:5432/<DB identifier> format:')
jdbc_url = getpass()
print('Input RDS Password:')
password = getpass()

Input RDS URL in the jdbc:postgresql://<Endpoint>:5432/<DB identifier> format:
··········
Input RDS Password:
··········


In [6]:
# Configure RDS settings
mode = "append"
config = {"user":"postgres", 
          "password": password, 
          "driver":"org.postgresql.Driver"}

### Download and merge source data into one DataFrame

In [7]:
# Define Function: Read in data from S3 Buckets

def download(url):
  # Define struct fields that match the RDS schema
  schema_def = [ \
                StructField("marketplace", StringType(), True), \
                StructField("customer_id", IntegerType(), True), \
                StructField("review_id", StringType(), True), \
                StructField("product_id", StringType(), True), \
                StructField("product_parent", IntegerType(), True), \
                StructField("product_title", StringType(), True), \
                StructField("product_category", StringType(), True), \
                StructField("star_rating", IntegerType(), True), \
                StructField("helpful_votes", IntegerType(), True), \
                StructField("total_votes", IntegerType(), True), \
                StructField("vine", StringType(), True), \
                StructField("verified_purchase", StringType(), True), \
                StructField("review_headline", StringType(), True), \
                StructField("review_body", StringType(), True), \
                StructField("review_date", DateType(), True) \
              ]
  schema_struct = StructType(fields=schema_def)  

  # Retrieve file name  
  file = url[48:]
  print(f'Retrieving {file}')
  
  # Download file
  spark.sparkContext.addFile(url)
  reviews_df = spark.read.csv(SparkFiles.get(file), schema=schema_struct, sep="\t", header=True, inferSchema=True)

  # Display file info
  print(f'Number of records: {reviews_df.count()}')
  print(reviews_df.show())
  
  # Return DataFrame
  return reviews_df

In [8]:
# Download Data File 00
url_0 = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Ebook_Purchase_v1_00.tsv.gz"
reviews_0_df = download(url_0)

Retrieving amazon_reviews_us_Digital_Ebook_Purchase_v1_00.tsv.gz
Number of records: 12520722
+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|    product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   35471030|R1C5OK9AND7PRP|B00AHK07V0|     175130663|Hunter's Moon (A ...|Digital_Ebook_Pur...|          5|            0|          0|   N|                Y|Shugak is like a ...|This is the most ...| 2015-08-31|
|         U

In [9]:
# Download Data File 01
url_1 = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Ebook_Purchase_v1_01.tsv.gz"
reviews_1_df = download(url_1)

Retrieving amazon_reviews_us_Digital_Ebook_Purchase_v1_01.tsv.gz
Number of records: 5101693
+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|    product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   33605939| RGYFDX8QXKEIR|B007KO2MLO|     328837464|           Big Maria|Digital_Ebook_Pur...|          4|            0|          0|   N|                N|              Quirky|Elmore Leonard me...| 2013-09-09|
|         US

In [10]:
# Append the review DataFrames together
reviews_df = reviews_0_df.union(reviews_1_df)
print(f'Total number of records: {reviews_df.count()}')

Total number of records: 17622415


In [11]:
# View schema
reviews_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)



### Look for null values

There were not enough null values to warrant removal transfer to the RDS database again.

In [13]:
print(f'marketplace count = {reviews_df.filter(reviews_df.marketplace.isNotNull()).count()}')

marketplace count = 17622415


In [14]:
print(f'marketplace count = {reviews_df.filter(reviews_df.marketplace.isNotNull()).count()}')
print(f'customer_id count = {reviews_df.filter(reviews_df.customer_id.isNotNull()).count()}')
print(f'review_id count = {reviews_df.filter(reviews_df.review_id.isNotNull()).count()}')
print(f'product_id count = {reviews_df.filter(reviews_df.product_id.isNotNull()).count()}')
print(f'product_parent count = {reviews_df.filter(reviews_df.product_parent.isNotNull()).count()}')
print(f'product_title count = {reviews_df.filter(reviews_df.product_title.isNotNull()).count()}')
print(f'product_category count = {reviews_df.filter(reviews_df.product_category.isNotNull()).count()}')
print(f'helpful_votes count = {reviews_df.filter(reviews_df.helpful_votes.isNotNull()).count()}')
print(f'total_votes count = {reviews_df.filter(reviews_df.total_votes.isNotNull()).count()}')
print(f'vine count = {reviews_df.filter(reviews_df.vine.isNotNull()).count()}')
print(f'verified_purchase count = {reviews_df.filter(reviews_df.verified_purchase.isNotNull()).count()}')
print(f'review_headline count = {reviews_df.filter(reviews_df.review_headline.isNotNull()).count()}')
print(f'review_body count = {reviews_df.filter(reviews_df.review_body.isNotNull()).count()}')
print(f'review_date count = {reviews_df.filter(reviews_df.review_date.isNotNull()).count()}')

marketplace count = 17622415
customer_id count = 17622415
review_id count = 17622415
product_id count = 17622415
product_parent count = 17622415
product_title count = 17622415
product_category count = 17622389
helpful_votes count = 17622381
total_votes count = 17622381
vine count = 17622381
verified_purchase count = 17622381
review_headline count = 17622370
review_body count = 17620384
review_date count = 17620261


### Create and write review_id_table to RDS database
It took 2h 12min to upload 17622415 records.

------------

select count(review_id)

  from review_id_table

In [None]:
# Create review_id_table DataFrame

review_id_table = reviews_df['review_id', 'customer_id', 'product_id', 'product_parent', 'review_date']
print(f'Total number of records: {review_id_table.count()}')
review_id_table.show()

Total number of records: 17622415
+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R1C5OK9AND7PRP|   35471030|B00AHK07V0|     175130663| 2015-08-31|
|R333RNBQMPVUFT|   26579324|B014085WTQ|     859232728| 2015-08-31|
|R2A2K0GTNSKXM1|    9152364|B0113MFPN0|     411472516| 2015-08-31|
|R365LAQ9REV876|   52087605|B00T8RIK7G|     353828021| 2015-08-31|
|R294OF3SNH6SWZ|   46454660|B00EMXBDMA|     803172158| 2015-08-31|
|R2OMRH58WYEEP9|   33168429|B00GL3OJIY|     325772386| 2015-08-31|
| RBFXYUJZIH5JR|   18546798|B00RW30QE8|     236508296| 2015-08-31|
|R2K2RSBNV42HU4|   47463409|B0101AE85Q|     924272105| 2015-08-31|
|R2CLRFFJ5HJSU3|   11767467|B00IO7QAI2|     393059633| 2015-08-31|
| RS1O7V45AADDO|   14241457|B00PM995TG|     529331494| 2015-08-31|
| RYD2AF3KUC4OL|   52821351|B00K4C4GVI|     310020786| 2015-08-31|
| RHQTCPUF1CXY2|   14416837|

In [None]:
# Write review_id_table to database

review_id_table.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

### Create and write products to RDS database
This took 15min to upload 1387212 records.

------------

select count(product_id)

  from products


In [None]:
# Create products DataFrame

products = reviews_df['product_id', 'product_title']
products = products.dropDuplicates()
print(f'Total number of records: {products.count()}')
products.show()

Total number of records: 1387212
+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00BGQ6M0E|Annihilation (Sta...|
|B00ZPWCOUW|Soldiers of Pearl...|
|B00AFXSTM8|Stardust of Yeste...|
|B00ZCYULJY|Cowboy Down Under...|
|B00YKBCE1S|Liquid (Wyvern Se...|
|B003NX7O00|Storm Prey (The P...|
|B012EN6BBM|Claiming His Cowg...|
|B00JCFF090|My Big Round Red ...|
|B00GIZYMVY|Scandal With a Pr...|
|B0145PWTF4|   Biblia Longcrofta|
|B00OQRL4QW|The Change (A Nov...|
|B005LOPNWK|A Good and Perfec...|
|B012QOVWXQ|Twisted Bitch (Si...|
|B00EYFNTIW|          Delusional|
|B00O6V1VH0|Ever After: A Nan...|
|B004H1TLQ2|The Atomic Times:...|
|B007YCJZM6|Eden (The Provide...|
|B00CRWLL7U|CLEP History of t...|
|B00HKZ330W|Does the Atom Hav...|
|B007HB81YC|Hidden (Firelight...|
+----------+--------------------+
only showing top 20 rows



In [None]:
# Write products to database

products.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)

### Create and write customers to RDS database
This took 8min to upload 5300209 records.

------------

select count(customers)

  from customers


In [None]:
# Create customers DataFrame with customer_count as INT

customer_count = reviews_df.groupBy('customer_id').count()
customer_count = customer_count.withColumn('customer_count',col('count').cast(IntegerType()))
customers = customer_count['customer_id', 'customer_count']
print(f'Total number of records: {customers.count()}')
print(customers.printSchema())
customers.show()

Total number of records: 5300209
root
 |-- customer_id: integer (nullable = true)
 |-- customer_count: integer (nullable = false)

None
+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   41341751|            19|
|   14582882|             1|
|   15158447|             7|
|    3070853|            16|
|   26248773|           518|
|   50058978|             1|
|   16667845|             4|
|   14217455|             1|
|   14754839|             7|
|   35311351|             7|
|   40182046|             1|
|   47574496|             1|
|   43133249|            10|
|   44269342|           317|
|   20688276|             1|
|   12819130|             4|
|    1154193|            21|
|   52427235|             6|
|   15948906|             1|
|   22996409|            13|
+-----------+--------------+
only showing top 20 rows



In [None]:
# Write customers to database

customers.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config)

### Create and write vine_table to RDS database
This took 25min to upload 17622415 records.

------------

select count(vine_table)

  from vine_table


In [None]:
# Create vine_table DataFrame

vine_table = reviews_df['review_id', 'star_rating', 'helpful_votes', 'total_votes', 'vine']
print(f'Total number of records: {vine_table.count()}')
vine_table.show()

Total number of records: 17622415
+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R1C5OK9AND7PRP|          5|            0|          0|   N|
|R333RNBQMPVUFT|          5|            0|          0|   N|
|R2A2K0GTNSKXM1|          5|            0|          0|   N|
|R365LAQ9REV876|          3|            0|          0|   N|
|R294OF3SNH6SWZ|          4|            0|          0|   N|
|R2OMRH58WYEEP9|          5|            0|          0|   N|
| RBFXYUJZIH5JR|          5|            0|          0|   N|
|R2K2RSBNV42HU4|          5|            0|          0|   N|
|R2CLRFFJ5HJSU3|          4|            0|          0|   N|
| RS1O7V45AADDO|          4|            0|          0|   N|
| RYD2AF3KUC4OL|          5|            2|          2|   N|
| RHQTCPUF1CXY2|          4|            0|          0|   N|
|R3371UTDP6I1FU|          5|            0|          0|   N|
|R2RI2

In [None]:
# Write vine_table to database

vine_table.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)