# Goal

The goal of this notebook is to explore a cloud-based ETL process using the following tools:  
*  A large Amazon reviews dataset is obtained from an AWS S3 bucket.
*  PySpark in Google Colab extracts the data from the S3 bucket and transforms it into DataFrames.
*  The DataFrames are loaded into AWS RDS, specifically a PostgreSQL database.

# ETL Steps

1.  Identify a dataset that would be interesting to analyze.
1.  Information about the Amazon review datasets is available at https://s3.amazonaws.com/amazon-reviews-pds/tsv/index.txt  
1.  Create a Jupyter notebook in Google Colab.  
1.  Set up the PySpark environment, using largely boilerplate code.
1.  Load the S3 bucket data into PySpark.
1.  Transform the data to match the following schema:

```sql
-- Note:  product_id is a foreign key to products.product_id
--        review_date should be a DATE in 'yyyy-mm-dd' format
CREATE TABLE review_id_table (
  review_id TEXT PRIMARY KEY NOT NULL,
  customer_id INTEGER,
  product_id TEXT,
  product_parent INTEGER,
  review_date DATE
);

-- Note:  product_id and product_title are unique
CREATE TABLE products (
  product_id TEXT PRIMARY KEY NOT NULL UNIQUE,
  product_title TEXT
);

-- Note:  table summarizing the number of customer purchases
CREATE TABLE customers (
  customer_id INT PRIMARY KEY NOT NULL UNIQUE,
  customer_count INT
);

-- Note:  table of review data
CREATE TABLE vine_table (
  review_id TEXT PRIMARY KEY,
  star_rating INTEGER,
  helpful_votes INTEGER,
  total_votes INTEGER,
  vine TEXT
);
```
1.  Create an AWS RDS PostgreSQL database that has a public endpoint but is protected with a username and password.
1.  Open pgAdmin and connect to the database using the endpoint and credentials generated by AWS RDS.
1.  In pgAdmin, create the database tables using the SQL schema above.
1.  In Google Colab, reshape the dataset into DataFrames that match the schema.   
1.  Load these DataFrames into the AWS RDS PostgreSQL database through PySpark using a JDBC connection.  
1.  Query the data from Google Colab, from pgAdmin, or from a Jupyter notebook using SQLAlchemy.
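The last step leaves the choice of query tool open. As a hedged, offline sketch, the snippet below uses Python's built-in `sqlite3` module as a stand-in for the RDS PostgreSQL database: it creates `vine_table` from the schema above, inserts a few hypothetical rows, and runs the kind of aggregate query one might later issue through pgAdmin or SQLAlchemy. The rows are invented for illustration, and SQLite only approximates PostgreSQL.

```python
import sqlite3

# In-memory SQLite database standing in for the RDS PostgreSQL instance
conn = sqlite3.connect(":memory:")

# vine_table DDL copied from the schema above (SQLite accepts these types)
conn.execute("""
CREATE TABLE vine_table (
  review_id TEXT PRIMARY KEY,
  star_rating INTEGER,
  helpful_votes INTEGER,
  total_votes INTEGER,
  vine TEXT
)
""")

# Hypothetical sample rows, not taken from the real dataset
rows = [
    ("R_A", 5, 1, 1, "N"),
    ("R_B", 4, 0, 0, "N"),
    ("R_C", 3, 2, 4, "Y"),
    ("R_D", 5, 3, 3, "Y"),
]
conn.executemany("INSERT INTO vine_table VALUES (?, ?, ?, ?, ?)", rows)

# Review count and average star rating for Vine vs. non-Vine reviews
query = """
SELECT vine, COUNT(*) AS n_reviews, AVG(star_rating) AS avg_rating
FROM vine_table
GROUP BY vine
ORDER BY vine
"""
results = conn.execute(query).fetchall()
for vine, n_reviews, avg_rating in results:
    print(vine, n_reviews, avg_rating)
```

The SQL itself is plain enough that it should run essentially unchanged against `vine_table` in pgAdmin.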

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

# Environment Setup and Dependencies

In [None]:
# Import AWS RDS configuration
# config.py must define username, password and rds_url
from google.colab import files
src = list(files.upload().values())[0]  # bytes of the uploaded file
with open('config.py', 'wb') as f:
    f.write(src)
from config import username, password, rds_url

Saving config.py to config (1).py


In [None]:
# Dependencies
import os

# set spark version
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
# older Spark releases such as 3.0.3 are served from the Apache archive
!wget -q https://archive.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark


# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Make the Spark installation in SPARK_HOME importable (the SparkSession is started later)
import findspark
findspark.init()


In [None]:
# Download the PostgreSQL JDBC driver that Spark uses to write to RDS
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-08-01 06:59:49--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.3’


2022-08-01 06:59:50 (5.04 MB/s) - ‘postgresql-42.2.9.jar.3’ saved [914037/914037]



In [None]:
# Set up the PySpark session with the PostgreSQL JDBC driver on the driver classpath
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AmazonETL")\
  .config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar")\
  .getOrCreate()

# Extract
*  Download the review file from the public S3 bucket and read it into a DataFrame

In [None]:
# Files added with addFile() are retrieved through SparkFiles
from pyspark import SparkFiles

# Dataset index: https://s3.amazonaws.com/amazon-reviews-pds/tsv/index.txt
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Video_Download_v1_00.tsv.gz"
filename = "amazon_reviews_us_Digital_Video_Download_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# Read the gzipped, tab-separated file; review_date is read as a string and converted later
df = spark.read.csv(SparkFiles.get(filename), header=True, inferSchema=True, sep='\t')
df.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|    product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   12190288|R3FU16928EP5TC|B00AYB1482|     668895143|Enlightened: Seas...|Digital_Video_Dow...|          5|            0|          0|   N|                Y|I loved it and I ...|I loved it and I ...| 2015-08-31|
|         US|   30549954|R1IZHHS1MH3AQ4|B00KQD28OM|     246219280|             Vicious|Digital_Video_Dow
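With `header=True` and `sep='\t'`, the first line of the file supplies the column names and each record is split on tabs. For inspecting that structure without Spark, a plain-Python sketch using the standard `gzip` and `csv` modules is shown below; the two-row sample and its reduced set of columns are hypothetical, not taken from the real file.

```python
import csv
import gzip
import io

# Hypothetical sample with a subset of the dataset's tab-separated columns
sample_tsv = (
    "marketplace\tcustomer_id\treview_id\tstar_rating\treview_date\n"
    "US\t111\tR_A\t5\t2015-08-31\n"
    "US\t222\tR_B\t3\t2014-05-28\n"
)

# Compress it in memory to mimic the .tsv.gz file served from S3
compressed = gzip.compress(sample_tsv.encode("utf-8"))

# Decompress and parse: the first line is the header, fields are tab-delimited
with gzip.open(io.BytesIO(compressed), mode="rt", encoding="utf-8", newline="") as fh:
    records = list(csv.DictReader(fh, delimiter="\t"))

print(records[0]["review_id"], records[0]["review_date"])
```

Unlike `inferSchema=True`, `csv.DictReader` leaves every value as a string, so any type conversion would have to be done explicitly.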

# Transform
*  Remove incomplete and duplicate records
*  Check the number of records left
*  Convert column data types where needed
*  Create a DataFrame for each table in the schema
*  Normalize the data

In [None]:
# size of dataframe (rows)
print(df.count())


4057147


In [None]:
# drop incomplete records
df = df.dropna()
print(df.count())

4056518


In [None]:
# drop duplicated records (if any; should be none)
df = df.dropDuplicates()
print(df.count())

4056518
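By default `DataFrame.dropna()` uses `how='any'`, dropping a row if any column is null, and `dropDuplicates()` with no arguments compares entire rows. The plain-Python sketch below mirrors those two defaults on hypothetical tuples (`None` standing in for null), which also shows why the second count can stay unchanged even when a key such as `review_id` repeats.

```python
# Hypothetical rows: (review_id, customer_id, star_rating); None models a null
rows = [
    ("R_A", 111, 5),
    ("R_B", None, 4),   # incomplete: dropped by the "any" rule
    ("R_C", 333, 3),
    ("R_C", 333, 3),    # exact duplicate of the previous row
    ("R_C", 333, 4),    # same review_id, different values: not a full-row duplicate
]

# dropna(how="any"): keep only rows with no null in any column
complete = [row for row in rows if all(value is not None for value in row)]

# dropDuplicates(): remove exact full-row duplicates
# (this sketch keeps the first copy; Spark does not guarantee which copy survives)
seen = set()
deduplicated = []
for row in complete:
    if row not in seen:
        seen.add(row)
        deduplicated.append(row)

print(len(rows), len(complete), len(deduplicated))
```

Because `review_id` is the primary key of `review_id_table` and `vine_table`, passing a subset, as in `df.dropDuplicates(["review_id"])`, would be the stricter option if rows like the last one existed.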


In [None]:
# check datatypes
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [None]:
# create table based on schema
review_id_table = df.select(["review_id", "customer_id", "product_id", "product_parent", "review_date"])
review_id_table.show(5)

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R1005KN8L3OP23|   51950426|B00COTH9VI|     956367867| 2015-04-07|
|R1008R0427X1FG|   42507369|B009KHHELW|      41559476| 2014-05-28|
|R100AJRT6FE05K|    2458036|B0048ZXXIO|     814772102| 2014-06-27|
|R100AOYGH18ZXK|   23459444|B00GBDWZDU|     936264488| 2015-05-20|
|R100BC7LPZKRNN|   38247406|B007SPQZMC|     192466294| 2013-03-13|
+--------------+-----------+----------+--------------+-----------+
only showing top 5 rows



In [None]:
# convert review_date from string to a date type
from pyspark.sql.functions import to_date, col
review_id_table = review_id_table.withColumn("review_date", to_date(col("review_date"), "yyyy-MM-dd"))
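Spark datetime patterns are case-sensitive: `MM` is month-of-year while `mm` is minute-of-hour, so `"yyyy-MM-dd"` is the right pattern for values such as `2015-04-07`. Python's `datetime.strptime` uses different directives (`%m` for month, `%M` for minute), but the same pitfall can be demonstrated with the standard library; the sketch below parses one of the dates shown above both ways.

```python
from datetime import datetime

raw = "2015-04-07"  # a review_date value from the output above

# Correct: %m is month (the counterpart of Spark's "MM")
parsed = datetime.strptime(raw, "%Y-%m-%d").date()

# Pitfall: %M is minute (the counterpart of Spark's "mm"); the month silently defaults to January
wrong = datetime.strptime(raw, "%Y-%M-%d")

print(parsed, wrong)
```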

In [None]:
# check change
review_id_table.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: date (nullable = true)



In [None]:
# create table based on schema
products_table = df.select(["product_id", "product_title"])
products_table.show(5)

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00COTH9VI|Seeking Asian Female|
|B009KHHELW|Duck Dynasty Seas...|
|B0048ZXXIO|Team Umizoomi Sea...|
|B00GBDWZDU|     Christmas Crush|
|B007SPQZMC|Downton Abbey Sea...|
+----------+--------------------+
only showing top 5 rows



In [None]:
# reference table
# entries should be unique
products_table = products_table.select('product_id', 'product_title').distinct()
products_table.show(5)

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B006MYGL8S|   Deadwood Season 1|
|B005LLSZNM|Sons Of Anarchy S...|
|B00MQOFWLK|  Too Young The Hero|
|B00MQOXI8Y|   The Expendables 3|
|B009OQWQCQ|Absolutely Fabulo...|
+----------+--------------------+
only showing top 5 rows
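`distinct()` removes duplicate `(product_id, product_title)` pairs, but it only yields one row per `product_id` if each ID always carries the same title. An ID listed with two different titles would survive twice and violate the `PRIMARY KEY` on `products` at write time. In PySpark this can be checked with `products_table.groupBy("product_id").count().filter("count > 1").count() == 0`, or enforced with `products_table.dropDuplicates(["product_id"])`. The plain-Python sketch below shows the same check on hypothetical pairs.

```python
from collections import Counter

# Hypothetical (product_id, product_title) pairs after a distinct() step
pairs = {
    ("B000000001", "Some Show Season 1"),
    ("B000000002", "Another Movie"),
    ("B000000002", "Another Movie (HD)"),  # same ID, different title
}

# Count how many distinct titles each product_id has
id_counts = Counter(product_id for product_id, _ in pairs)
conflicts = sorted(pid for pid, n in id_counts.items() if n > 1)

# Keep one title per product_id (alphabetically first: an arbitrary rule for illustration)
one_per_id = {}
for product_id, title in sorted(pairs):
    one_per_id.setdefault(product_id, title)

print(conflicts, len(one_per_id))
```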



In [None]:
# create table based on schema
vine_table = df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine"])
vine_table.show(5)

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R1005KN8L3OP23|          5|            1|          1|   N|
|R1008R0427X1FG|          4|            0|          0|   N|
|R100AJRT6FE05K|          5|            0|          0|   N|
|R100AOYGH18ZXK|          3|            0|          0|   N|
|R100BC7LPZKRNN|          5|            0|          0|   N|
+--------------+-----------+-------------+-----------+----+
only showing top 5 rows



In [None]:
# create table based on schema
customer_count = df.select('customer_id').groupby('customer_id').count()
customer_count.show(5)

+-----------+-----+
|customer_id|count|
+-----------+-----+
|   36771518|    5|
|   23006345|    5|
|    8899358|    1|
|   45518338|    7|
|   31452416|    1|
+-----------+-----+
only showing top 5 rows
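`groupby('customer_id').count()` produces one row per customer, with a `count` column holding the number of rows (here, reviews) for that customer. The sketch below reproduces that aggregation with `collections.Counter` on hypothetical IDs. Spark types the `count` column as a long, which is why the schema check after the rename reports `long`; if an exact match with the `INT` column in the schema is wanted, `col("customer_count").cast("int")` is one option.

```python
from collections import Counter

# Hypothetical customer_id column, one entry per review
customer_ids = [101, 102, 101, 103, 101, 102]

# Equivalent of df.groupby("customer_id").count(): one count per distinct ID
customer_count = Counter(customer_ids)

# Shape the result like customer_count_table: (customer_id, customer_count)
customer_count_table = sorted(customer_count.items())
print(customer_count_table)
```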



In [None]:
# rename column
customer_count_table = customer_count.withColumnRenamed("count","customer_count")

In [None]:
# check changes
customer_count_table.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_count: long (nullable = false)



# Load
*  Connect to AWS RDS
*  Insert the DataFrames into the AWS RDS PostgreSQL tables


In [None]:
# Configure settings for RDS
# use imported variables from config.py
# "append" inserts rows into the tables already created in pgAdmin
mode = "append"
jdbc_url = f"jdbc:postgresql://{rds_url}"
config = {"user": username,
          "password": password,
          "driver": "org.postgresql.Driver"}
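The contents of `config.py` are not shown. A minimal sketch of what it is assumed to contain follows. The three variable names come from the import in the setup cell, but every value is a placeholder, and the `<endpoint>:<port>/<database>` shape of `rds_url` is an assumption based on how `jdbc_url` is assembled, since PostgreSQL JDBC URLs take the form `jdbc:postgresql://host:port/database`. Because the file holds credentials, it should be kept out of version control.

```python
# Hypothetical contents of config.py; every value is a placeholder to replace
username = "postgres"   # RDS master username (placeholder)
password = "change-me"  # RDS master password (placeholder)
rds_url = "mydb.abc123.us-east-1.rds.amazonaws.com:5432/my_database"  # host:port/database (placeholder)

# The notebook combines these into the JDBC URL and connection properties
jdbc_url = f"jdbc:postgresql://{rds_url}"
config = {"user": username, "password": password, "driver": "org.postgresql.Driver"}
print(jdbc_url)
```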

In [None]:
# Write DataFrame to review_id_table in RDS

review_id_table.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

In [None]:
# Write dataframe to products table in RDS

products_table.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)

In [None]:
# Write dataframe to customers table in RDS

customer_count_table.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config)

In [None]:
# Write dataframe to vine_table in RDS

vine_table.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)
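The schema notes that `review_id_table.product_id` refers to `products.product_id`, but the DDL does not declare that constraint, so the write order above works. If a `REFERENCES` clause were added, `products` would have to be written before `review_id_table`, or the review inserts would be rejected. The sketch below demonstrates this offline with `sqlite3`, which enforces foreign keys only after `PRAGMA foreign_keys = ON` (PostgreSQL enforces declared foreign keys by default). The tables are trimmed to the relevant columns and the IDs are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled

# Trimmed versions of the two tables, with the foreign key declared explicitly
conn.execute("CREATE TABLE products (product_id TEXT PRIMARY KEY NOT NULL, product_title TEXT)")
conn.execute(
    "CREATE TABLE review_id_table ("
    " review_id TEXT PRIMARY KEY NOT NULL,"
    " product_id TEXT REFERENCES products (product_id))"
)

# Writing a review before its product exists is rejected
try:
    conn.execute("INSERT INTO review_id_table VALUES ('R_A', 'B000000001')")
    rejected = False
except sqlite3.IntegrityError:
    rejected = True

# Loading the product first lets the same review insert succeed
conn.execute("INSERT INTO products VALUES ('B000000001', 'Some Show Season 1')")
conn.execute("INSERT INTO review_id_table VALUES ('R_A', 'B000000001')")
count = conn.execute("SELECT COUNT(*) FROM review_id_table").fetchone()[0]
print(rejected, count)
```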