In [None]:
# ETL of Amazon Reviews Dataset 1 and 2

# Amazon Reviews Dataset - ETL (Extract, Transform and Load)
Author: Rosie Gianan
#### Dataset source URL:
https://s3.amazonaws.com/amazon-reviews-pds/tsv/index.txt
    
#### Dataset being processed for this Extract, Transform and Load (ETL):
https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Major_Appliances_v1_00.tsv.gz

https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Personal_Care_Appliances_v1_00.tsv.gz
#### DATA COLUMNS:
-	marketplace       - 2 letter country code of the marketplace where the review was written.
-	customer_id       - Random identifier that can be used to aggregate reviews written by a single author.
-	review_id         - The unique ID of the review.
-	product_id        - The unique Product ID the review pertains to. In the multilingual dataset the reviews for the same product in different countries can be grouped by the same product_id.
-	product_parent    - Random identifier that can be used to aggregate reviews for the same product.
-	product_title     - Title of the product.
-	product_category  - Broad product category that can be used to group reviews (also used to group the dataset into coherent parts).
-	star_rating       - The 1-5 star rating of the review.
-	helpful_votes     - Number of helpful votes.
-	total_votes       - Number of total votes the review received.
-	vine              - Review was written as part of the Vine program.
-	verified_purchase - The review is on a verified purchase.
-	review_headline   - The title of the review.
-	review_body       - The review text.
-	review_date       - The date the review was written.

#### DATA FORMAT
-	Tab ('\t') separated text file, without quote or escape characters.
-	First line in each file is header; 1 line corresponds to 1 record.

In [1]:
import os
# Environment bootstrap: install Java 8 and a pinned Spark build, then make PySpark importable.
# The /content paths below suggest a Google Colab runtime -- TODO confirm before running elsewhere.
#
# Choose the Spark 3.2.x release to install (http://www.apache.org/dist/spark/ lists the
# releases published there), for example:
spark_version = 'spark-3.2.2'
# To use another release: spark_version = 'spark-3.<enter version>'

# Export the version so the `!` shell commands below can expand $SPARK_VERSION.
os.environ['SPARK_VERSION']=spark_version

# Install Java, download and unpack the Spark tarball, and install findspark.
# NOTE(review): `wget -q` is silent, so a failed download only surfaces when `tar` cannot find the archive.
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Point JAVA_HOME and SPARK_HOME at the Java install and the unpacked Spark directory.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Make the unpacked PySpark importable via findspark (SPARK_HOME is set above).
# This does not start a session; the SparkSession itself is created in a later cell.
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Waiting for headers] [1 InRelease 14.2 kB/88.7 kB 16%] [Connected to cloud.                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Waiting for headers] [1 InRelease 37.3 kB/88.7 kB 42%] [Connected to cloud.                                                                               Get:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:4 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:5 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [83.3 kB]
Hit:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Ign:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:8 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:9 https

## Postgres connection

In [2]:
# Download the PostgreSQL JDBC driver jar into the current working directory.
# The SparkSession cell references it as /content/postgresql-42.2.9.jar, so this
# presumably runs with /content as the working directory -- TODO confirm.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-11-19 20:08:01--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2022-11-19 20:08:02 (4.69 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



## Spark session

In [3]:
# Create (or reuse) the SparkSession for this ETL run.
# The PostgreSQL JDBC jar is placed on the driver classpath so the JDBC
# reads and writes later in the notebook can load org.postgresql.Driver.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

## Extract the data from AWS S3

In [4]:
# Extract dataset 1 (Major Appliances): register the gzipped TSV with Spark, then parse it.
from pyspark import SparkFiles

url1 = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Major_Appliances_v1_00.tsv.gz"
spark.sparkContext.addFile(url1)

# The first line is the header; no schema is given to the reader.
# NOTE(review): the dataset description says the files contain no quote or escape
# characters, but no `quote` option is set here, so Spark's CSV default applies.
# Consider passing quote="" -- TODO confirm the effect on row counts first.
df_spark_1 = spark.read.csv(
    SparkFiles.get("amazon_reviews_us_Major_Appliances_v1_00.tsv.gz"),
    header=True,
    sep="\t",
)

# Preview the first rows of dataset 1.
df_spark_1.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   16199106|R203HPW78Z7N4K|B0067WNSZY|     633038551|FGGF3032MW Galler...|Major Appliances|          5|            0|          0|   N|                Y|If you need a new...|What a great stov...| 2015-08-31|
|         US|   16374060|R2EAIGVLEALSP3|B002QSXK60|     811766671|Best Hand Clothes...|Major Appliances|          5|    

In [5]:
# Number of rows read from dataset 1 (Major Appliances).
df_spark_1.count()

96901

In [7]:
# Extract dataset 2 (Personal Care Appliances): register the gzipped TSV with Spark, then parse it.
# SparkFiles is imported by the dataset 1 cell.
url2 = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Personal_Care_Appliances_v1_00.tsv.gz"
spark.sparkContext.addFile(url2)

# The first line is the header; no schema is given to the reader.
# NOTE(review): the dataset description says the files contain no quote or escape
# characters, but no `quote` option is set here, so Spark's CSV default applies.
# Consider passing quote="" -- TODO confirm the effect on row counts first.
df_spark_2 = spark.read.csv(
    SparkFiles.get("amazon_reviews_us_Personal_Care_Appliances_v1_00.tsv.gz"),
    header=True,
    sep="\t",
)

# Preview the first rows of dataset 2.
df_spark_2.show()

+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|    product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   32114233|R1QX6706ZWJ1P5|B00OYRW4UE|     223980852|Elite Sportz Exer...|Personal_Care_App...|          5|            0|          0|   N|                Y|Good quality. Shi...|Exactly as descri...| 2015-08-31|
|         US|   18125776|R3QWMLJHIW6P37|B0000537JQ|     819771537|     Ezy Dose Weekly|Personal_Care_App

In [8]:
# Number of rows read from dataset 2 (Personal Care Appliances).
df_spark_2.count()

85981

In [9]:
# Combine the two datasets into a single DataFrame.
# union() pairs columns by position, not by name.
df_spark_combined = df_spark_1.union(df_spark_2)

# Row count of the combined DataFrame.
df_spark_combined.count()

182882

## Transform the data

In [10]:
# Drop rows that are exact duplicates across every column.
df = df_spark_combined.distinct()

# Row count after de-duplication.
df.count()

182882

In [11]:
# Inspect the column names and types before any casting is applied.
df.dtypes

[('marketplace', 'string'),
 ('customer_id', 'string'),
 ('review_id', 'string'),
 ('product_id', 'string'),
 ('product_parent', 'string'),
 ('product_title', 'string'),
 ('product_category', 'string'),
 ('star_rating', 'string'),
 ('helpful_votes', 'string'),
 ('total_votes', 'string'),
 ('vine', 'string'),
 ('verified_purchase', 'string'),
 ('review_headline', 'string'),
 ('review_body', 'string'),
 ('review_date', 'string')]

In [12]:
# Cast the numeric columns from string to INT.
from pyspark.sql.types import IntegerType

int_columns = (
    "customer_id",
    "product_parent",
    "star_rating",
    "helpful_votes",
    "total_votes",
)
for int_column in int_columns:
    # Replace each column in place with its integer-typed version.
    df = df.withColumn(int_column, df[int_column].cast(IntegerType()))

# Show the resulting column types.
df.dtypes

[('marketplace', 'string'),
 ('customer_id', 'int'),
 ('review_id', 'string'),
 ('product_id', 'string'),
 ('product_parent', 'int'),
 ('product_title', 'string'),
 ('product_category', 'string'),
 ('star_rating', 'int'),
 ('helpful_votes', 'int'),
 ('total_votes', 'int'),
 ('vine', 'string'),
 ('verified_purchase', 'string'),
 ('review_headline', 'string'),
 ('review_body', 'string'),
 ('review_date', 'string')]

#### Create the df for review_id_table 


In [13]:
# Build the DataFrame for review_id_table: one row per review_id with its
# customer/product keys and the review date converted from string to DATE.
from pyspark.sql.functions import to_date

# The date pattern is the canonical four-letter-year form "yyyy-MM-dd"
# (the previous "yyy-MM-dd" was a typo that only worked for four-digit years
# because the letter count is treated as a minimum field width).
df_review_id_table = df.select([ "review_id"
                               , "customer_id"
                               , "product_id"
                               , "product_parent"
                               , to_date("review_date", "yyyy-MM-dd").alias("review_date")
                               ]).dropDuplicates(["review_id"])  # keep a single row per review_id

# Preview the first rows.
df_review_id_table.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R10027C8D4WSAD|   37716509|B00H9L7VIW|     851045898| 2015-04-27|
|R1002K20VAEH6A|   18964280|B005AJ5DZI|     869059654| 2014-10-20|
|R1003Z32HIMKN4|   47288664|B000674526|     275657479| 2005-11-23|
| R1006XCDPYEY4|   48666875|B000UWBRVY|     992690540| 2008-09-21|
|R1007KFAT5G8Z6|   42272940|B0041J411K|     830124956| 2013-11-22|
|R1008O4I8A9XOY|   45934613|B00006IUV6|     594838938| 2010-11-08|
|R100D8F1TL0AK5|   40753271|B002UYYPAK|     339522627| 2011-05-04|
|R100JK7MQMJWSJ|   12392650|B003DA62VA|     380393589| 2014-04-02|
|R100MVRIXP8E9K|   50315571|B00022WCAM|      92480803| 2005-01-05|
|R100O6OJPGXOHD|    9998433|B000FGTH8K|     115747874| 2010-02-06|
|R100Q5LKP65TJE|   44772354|B0016S5TU6|     316290081| 2013-12-03|
|R100Q80KCRMJQR|   11704580|B002B51358|     574196257| 2015-08

In [14]:
# Check the column types of the review_id_table DataFrame.
df_review_id_table.dtypes

[('review_id', 'string'),
 ('customer_id', 'int'),
 ('product_id', 'string'),
 ('product_parent', 'int'),
 ('review_date', 'date')]

In [15]:
# Number of rows in the review_id_table DataFrame (one per distinct review_id).
df_review_id_table.count()

182882

#### Create the df for products table

In [16]:
# Build the DataFrame for the products table: one row per product_id with a title.
product_columns = df.select("product_id", "product_title")

# Keep a single row for each product_id.
df_products = product_columns.dropDuplicates(["product_id"])

# Preview the first rows.
df_products.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|0545058236|Kenmore 2-Stage D...|
|0970408285|NONE XMEL706M-8P ...|
|097459363X|FitBALL All Round...|
|1574998021|Nannini SOS Readi...|
|3979000532|Pewter Snake Chai...|
|3979002632|CRYSTAL CLEAR GRE...|
|3979002829|Real Olive Wood B...|
|3979004813|Kabbalah Glass Bl...|
|7391000442|Holy Land Jerusal...|
|7391001252|Dark Copper Mezuz...|
|8742248299|Blessed By Pope B...|
|9502738756|Blessed By Pope B...|
|9895502079|GOLDFILLED NECKLA...|
|989550702X|Star of David Mag...|
|9895507372|44cm Red Crystal ...|
|9895514069|5 in one Holy Wat...|
|9895514271|20cm/8" Kabbalah ...|
|9895515308|Pink Flexible Bra...|
|9895517939|SKI MASK WINTER H...|
|B00004SACT|Sanyo Two-Door 2....|
+----------+--------------------+
only showing top 20 rows



In [17]:
# Check the column types of the products DataFrame.
df_products.dtypes

[('product_id', 'string'), ('product_title', 'string')]

In [18]:
# Number of rows in the products DataFrame (one per distinct product_id).
df_products.count()

29256

#### Create the df for customers table

In [19]:
# Build the DataFrame for the customers table: number of reviews per customer.
df_count_customer = df.groupBy("customer_id").count()

# groupBy already yields one row per customer_id, so the dropDuplicates step
# changes nothing; it is kept to mirror the other table builders.
df_customers = (
    df_count_customer
    .select("customer_id", "count")
    .dropDuplicates(["customer_id"])
    .withColumnRenamed("count", "customer_count")
)

# Preview the first rows.
df_customers.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   17533812|             1|
|   49076716|             1|
|   50142216|             1|
|   13947800|             1|
|   45910590|             1|
|   14171713|             1|
|   22043983|             1|
|   51186475|             2|
|   16771645|             1|
|   32366239|             2|
|    4781106|             1|
|   26478376|             1|
|   32953356|             1|
|   17494546|             1|
|   35178197|             1|
|    8968933|             1|
|   12402644|             1|
|   32537843|             1|
|   34589154|             1|
|   40313241|             1|
+-----------+--------------+
only showing top 20 rows



In [20]:
# Check the column types of the customers DataFrame.
df_customers.dtypes

[('customer_id', 'int'), ('customer_count', 'bigint')]

In [26]:
# Number of rows in the customers DataFrame (one per distinct customer_id).
df_customers.count()

172925

#### Create the df for vine_table

In [21]:
# Build the DataFrame for vine_table: rating, vote counts and Vine flag per review.
vine_columns = df.select(
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
)

# Keep a single row for each review_id.
df_vine_table = vine_columns.dropDuplicates(["review_id"])

# Preview the first rows.
df_vine_table.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R10027C8D4WSAD|          5|            3|          4|   N|
|R1002K20VAEH6A|          4|            0|          0|   N|
|R1003Z32HIMKN4|          4|            8|          8|   N|
| R1006XCDPYEY4|          1|           18|         18|   N|
|R1007KFAT5G8Z6|          1|            1|          1|   N|
|R1008O4I8A9XOY|          2|            1|          1|   N|
|R100D8F1TL0AK5|          1|           11|         11|   N|
|R100JK7MQMJWSJ|          5|            0|          0|   N|
|R100MVRIXP8E9K|          4|           16|         16|   N|
|R100O6OJPGXOHD|          5|            0|          0|   N|
|R100Q5LKP65TJE|          5|            0|          0|   N|
|R100Q80KCRMJQR|          3|            0|          0|   N|
|R100Q85C4FVZ09|          5|            0|          0|   N|
|R100SFP9LPO26V|          5|            

In [22]:
# Check the column types of the vine_table DataFrame.
df_vine_table.dtypes

[('review_id', 'string'),
 ('star_rating', 'int'),
 ('helpful_votes', 'int'),
 ('total_votes', 'int'),
 ('vine', 'string')]

In [23]:
# Number of rows in the vine_table DataFrame (one per distinct review_id).
df_vine_table.count()

182882

## Load the data

In [24]:
# Connection settings for the AWS RDS PostgreSQL target.
# Credentials are not kept in the notebook: create a config.py (e.g. /content/config.py)
# that defines username, password and host before running this cell.
from config import username, password, host

# Write mode passed to every JDBC write below.
mode = "append"

# Target database location.
port = 5432
database_name = "AWS_RDS_postgres_db_1"
jdbc_url = f"jdbc:postgresql://{host}:{port}/{database_name}"

# JDBC connection properties shared by all reads and writes.
config = dict(
    user=f"{username}",
    password=f"{password}",
    driver="org.postgresql.Driver",
)

In [26]:
# Write DataFrame to review_id_table table in RDS.
# The write mode comes from `mode` (set to "append" in the RDS settings cell),
# so re-running this cell inserts the same rows again.
try:
    df_review_id_table.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)
except Exception as e:
    # Bind the caught exception to `e`. The previous `except e:` evaluated an
    # undefined name, so any write failure was masked by a NameError.
    print(e)

In [27]:
# Read review_id_table back from RDS and count its rows to verify the load.
get_df_review_id_table = spark.read.jdbc(
    url=jdbc_url,
    table="review_id_table",
    properties=config,
)
get_df_review_id_table.count()

182882

In [28]:
# Write DataFrame to products table in RDS.
# The write mode comes from `mode` (set to "append" in the RDS settings cell),
# so re-running this cell inserts the same rows again.
try:
    df_products.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)
except Exception as e:
    # Bind the caught exception to `e`. The previous `except e:` evaluated an
    # undefined name, so any write failure was masked by a NameError.
    print(e)

In [29]:
# Read the products table back from RDS and count its rows to verify the load.
get_df_products = spark.read.jdbc(
    url=jdbc_url,
    table="products",
    properties=config,
)
get_df_products.count()

29256

In [30]:
# Write DataFrame to customers table in RDS.
# The write mode comes from `mode` (set to "append" in the RDS settings cell),
# so re-running this cell inserts the same rows again.
try:
    df_customers.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config)
except Exception as e:
    # Bind the caught exception to `e`. The previous `except e:` evaluated an
    # undefined name, so any write failure was masked by a NameError.
    print(e)

In [31]:
# Read the customers table back from RDS and count its rows to verify the load.
get_df_customers = spark.read.jdbc(
    url=jdbc_url,
    table="customers",
    properties=config,
)
get_df_customers.count()

172925

In [32]:
# Write DataFrame to vine_table table in RDS.
# The write mode comes from `mode` (set to "append" in the RDS settings cell),
# so re-running this cell inserts the same rows again.
try:
    df_vine_table.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)
except Exception as e:
    # Bind the caught exception to `e`. The previous `except e:` evaluated an
    # undefined name, so any write failure was masked by a NameError.
    print(e)

In [33]:
# Read vine_table back from RDS and count its rows to verify the load.
get_df_vine_table = spark.read.jdbc(
    url=jdbc_url,
    table="vine_table",
    properties=config,
)
get_df_vine_table.count()

182882