In [1]:
#@title
# Install Java, Spark, and Findspark
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:6 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:9 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:11 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:12 http://ppa.launchpad.net/marutter/c2d4u3.5/ubuntu bionic InRelease [15.4 kB]
Get:13 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Get:14 http://security.ubuntu.com/ubuntu bionic-security/universe amd64 Packages [

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2020-09-20 05:27:24--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2020-09-20 05:27:25 (1.05 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [3]:
# Start Spark Session. The PostgreSQL JDBC jar is added to the driver classpath
# so the later DataFrame.write.jdbc calls can load org.postgresql.Driver.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [5]:
# Read in data from S3 Buckets: register the gzipped TSV with Spark via addFile,
# then resolve its local copy by file name (the last segment of the URL).
from pyspark import SparkFiles

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Luggage_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

tsv_file_name = url.rsplit("/", 1)[-1]
# No schema is given here, so every column is loaded as a string.
df = spark.read.csv(SparkFiles.get(tsv_file_name), sep="\t", header=True)

# Show DataFrame
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   40884699| R9CO86UUJCAW5|B00VGTN02Y|     786681372|Teenage Mutant Ni...|         Luggage|          3|            0|          0|   N|                Y|my review of this...|my review of this...| 2015-08-31|
|         US|   23208852|R3PR8X6QGVJ8B1|B005KIWL0E|     618251799|Kenneth Cole Reac...|         Luggage|          5|    

In [6]:
# Count number of rows in dataframe (the raw load, before de-duplication)
raw_row_count = df.count()
raw_row_count

348657

In [7]:
# Drop duplicate rows: distinct() removes rows that are identical in every column.
cleaned_df = df.distinct()

In [8]:
# Count number of rows after de-duplication; compare with the raw count to see
# how many exact duplicate rows were removed.
deduped_row_count = cleaned_df.count()
deduped_row_count

348657

In [10]:
# Check column types: (column name, Spark type name) pairs, in column order.
column_types_untyped = cleaned_df.dtypes
column_types_untyped


[('marketplace', 'string'),
 ('customer_id', 'string'),
 ('review_id', 'string'),
 ('product_id', 'string'),
 ('product_parent', 'string'),
 ('product_title', 'string'),
 ('product_category', 'string'),
 ('star_rating', 'string'),
 ('helpful_votes', 'string'),
 ('total_votes', 'string'),
 ('vine', 'string'),
 ('verified_purchase', 'string'),
 ('review_headline', 'string'),
 ('review_body', 'string'),
 ('review_date', 'string')]

In [38]:
# Import struct fields that we can use
from pyspark.sql.types import StructField, StringType, IntegerType, StructType, DateType

In [39]:
# Next we need to create the list of struct fields: one nullable StructField per
# TSV column, in the same order as the file's header. Count/ID columns become
# integers and review_date becomes a date, so the typed re-read parses them.
column_spec = [
    ("marketplace", StringType()),
    ("customer_id", IntegerType()),
    ("review_id", StringType()),
    ("product_id", StringType()),
    ("product_parent", IntegerType()),
    ("product_title", StringType()),
    ("product_category", StringType()),
    ("star_rating", IntegerType()),
    ("helpful_votes", IntegerType()),
    ("total_votes", IntegerType()),
    ("vine", StringType()),
    ("verified_purchase", StringType()),
    ("review_headline", StringType()),
    ("review_body", StringType()),
    ("review_date", DateType()),
]
schema = [StructField(column_name, column_type, True) for column_name, column_type in column_spec]
schema


[StructField(marketplace,StringType,true),
 StructField(customer_id,IntegerType,true),
 StructField(review_id,StringType,true),
 StructField(product_id,StringType,true),
 StructField(product_parent,IntegerType,true),
 StructField(product_title,StringType,true),
 StructField(product_category,StringType,true),
 StructField(star_rating,IntegerType,true),
 StructField(helpful_votes,IntegerType,true),
 StructField(total_votes,IntegerType,true),
 StructField(vine,StringType,true),
 StructField(verified_purchase,StringType,true),
 StructField(review_headline,StringType,true),
 StructField(review_body,StringType,true),
 StructField(review_date,DateType,true)]

In [40]:
# Pass in our fields: wrap the StructField list in a StructType so it can be
# handed to spark.read.csv as an explicit schema.
final = StructType(schema)
final

StructType(List(StructField(marketplace,StringType,true),StructField(customer_id,IntegerType,true),StructField(review_id,StringType,true),StructField(product_id,StringType,true),StructField(product_parent,IntegerType,true),StructField(product_title,StringType,true),StructField(product_category,StringType,true),StructField(star_rating,IntegerType,true),StructField(helpful_votes,IntegerType,true),StructField(total_votes,IntegerType,true),StructField(vine,StringType,true),StructField(verified_purchase,StringType,true),StructField(review_headline,StringType,true),StructField(review_body,StringType,true),StructField(review_date,DateType,true)))

In [41]:
# Read in data from S3 Buckets again, this time applying the explicit schema
# (`final`) so numeric and date columns are typed on load instead of as strings.
from pyspark import SparkFiles

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Luggage_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

reviews_tsv_path = SparkFiles.get("amazon_reviews_us_Luggage_v1_00.tsv.gz")
df = (
    spark.read
    .schema(final)
    .option("sep", "\t")
    .option("header", True)
    .csv(reviews_tsv_path)
)

# Show DataFrame
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   40884699| R9CO86UUJCAW5|B00VGTN02Y|     786681372|Teenage Mutant Ni...|         Luggage|          3|            0|          0|   N|                Y|my review of this...|my review of this...| 2015-08-31|
|         US|   23208852|R3PR8X6QGVJ8B1|B005KIWL0E|     618251799|Kenneth Cole Reac...|         Luggage|          5|    

In [42]:
# Drop duplicate rows from the typed load: distinct() keeps one copy of each fully identical row.
cleaned_df = df.distinct()

In [43]:
# Count number of rows remaining in the de-duplicated, typed DataFrame.
typed_row_count = cleaned_df.count()
typed_row_count

348657

In [44]:
# Create review dataframe to match review_id_table: each review with its
# customer, product, product parent, and date.
review_columns = ["review_id", "customer_id", "product_id", "product_parent", "review_date"]
review_df = cleaned_df.select(*review_columns)
review_df.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R11IBSD5E6HPSD|   20761040|B002B3FWXY|     677901073| 2015-08-31|
|R16F6MZAVR1IYL|   29442013|B000YB7T04|     553070376| 2015-08-31|
|R30889FXLP48F3|   29849475|B00F9KE912|     918753137| 2015-08-30|
|R1DW06GLTNCVWQ|    1988134|B001CDG7Q0|     460451174| 2015-08-29|
| RDC197RTSTZJY|    1302634|B0019GAOO6|      53787504| 2015-08-29|
|R3D44CQ92OA3BR|   26953574|B00YGDX2XE|     613329783| 2015-08-29|
|R115NO9WEEUBWD|   38639848|B00KH5FB5W|     684605802| 2015-08-28|
|R16QUOK50Y94JU|    8004704|B00JKJQJHA|     860629696| 2015-08-28|
| R1P81BC6VL7PZ|     121815|B00WJJVXQU|     990303407| 2015-08-28|
| RZZLHN9MWIAEJ|   48578265|B006GN4ERU|     333298218| 2015-08-28|
|R1A75HAI9Q04WV|    1211291|B00Z9NV81I|      63474760| 2015-08-27|
|R1PCHIM41UIET7|   32335114|B001TQTWGW|     293242998| 2015-08

In [45]:
# Create products dataframe to match products table (id and title only;
# duplicates are removed in the next cell).
product_columns = ["product_id", "product_title"]
products_df = cleaned_df.select(*product_columns)
products_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B002B3FWXY|Travelon Anti-The...|
|B000YB7T04|totes Signature C...|
|B00F9KE912|Samsonite Wheeled...|
|B001CDG7Q0|17 Inch Gray Whee...|
|B0019GAOO6| Everest Hiking Pack|
|B00YGDX2XE|MEKU PU leather P...|
|B00KH5FB5W|Ever Moda Designe...|
|B00JKJQJHA|Dot&Dot Travel Ac...|
|B00WJJVXQU|17" Red & Black M...|
|B006GN4ERU|Olympia Deluxe Fa...|
|B00Z9NV81I|SENPAIC Canvas La...|
|B001TQTWGW|Travelon Anti-The...|
|B009AZ69V0|Traveler's Choice...|
|B00FF4AAS8|HOVEOX Stylish Me...|
|B000BT2FK8|Royce Leather Lux...|
|B00DRJQZ0K|Reinforced Design...|
|B011VZE7DS|Monster Trucks Ba...|
|B00LZXPJB6|Canvas 15" Laptop...|
|B00FUWSU3W|Bago Travel Duffe...|
|B00LJUROJ0|Travelpro Infligh...|
+----------+--------------------+
only showing top 20 rows



In [46]:
# Drop duplicates so each product_id appears only once.
# A bare dropDuplicates() only removes rows identical in BOTH columns, so a
# product_id listed under two different titles would still appear twice.
# NOTE(review): this assumes the RDS products table is keyed on product_id —
# confirm against the table DDL. Which of several differing titles is kept is
# not specified by dropDuplicates(subset).
products_df = products_df.dropDuplicates(["product_id"])

In [47]:
# Create vine dataframe to match vine_table: rating, vote counts, and the vine
# column for each review_id.
vine_columns = ["review_id", "star_rating", "helpful_votes", "total_votes", "vine"]
vine_df = cleaned_df.select(*vine_columns)
vine_df.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R11IBSD5E6HPSD|          1|           29|         31|   N|
|R16F6MZAVR1IYL|          4|            0|          0|   N|
|R30889FXLP48F3|          5|            1|          1|   N|
|R1DW06GLTNCVWQ|          5|            0|          0|   N|
| RDC197RTSTZJY|          4|            0|          0|   N|
|R3D44CQ92OA3BR|          5|            0|          0|   N|
|R115NO9WEEUBWD|          4|            1|          1|   N|
|R16QUOK50Y94JU|          5|            0|          0|   N|
| R1P81BC6VL7PZ|          4|            0|          0|   N|
| RZZLHN9MWIAEJ|          5|            0|          0|   N|
|R1A75HAI9Q04WV|          1|            8|          9|   N|
|R1PCHIM41UIET7|          3|            0|          0|   N|
|R2S1PWOTIMYQ29|          5|            0|          1|   N|
|R3MFI5JTNYZCFG|          5|            

In [48]:
# Create Customer count dataframe to match customers table: one row per
# customer_id with the number of rows (reviews) that customer has in cleaned_df.
import pyspark.sql.functions as f

customer_count_df = (
    cleaned_df
    .groupBy("customer_id")
    .count()
    .select("customer_id", f.col("count").alias("customer_count"))
)

# show() only prints and returns None. Calling it as a separate statement keeps
# customer_count_df a DataFrame; chaining it into the assignment left the
# variable as None and broke the later write to the customers table.
customer_count_df.show()



+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   47535771|             3|
|   13112909|             1|
|    4792279|             1|
|   32206659|             3|
|   19937450|             1|
|   39921618|             1|
|   40235161|             1|
|   16038309|             1|
|   11480681|             1|
|   32298977|             1|
|   29078265|             1|
|   18869492|             4|
|   11361633|             2|
|   11150260|             1|
|   51160127|             1|
|   24122316|             1|
|    1980278|             1|
|   43742600|             2|
|   46072848|             1|
|   14571094|             1|
+-----------+--------------+
only showing top 20 rows



In [30]:
# Configure settings for RDS.
# The password is taken from the RDS_PASSWORD environment variable, or prompted
# for interactively, instead of being typed into the notebook. This keeps it out
# of the saved .ipynb and its outputs.
import os
from getpass import getpass

mode = "append"
jdbc_url = "jdbc:postgresql://reviews-challenge.cos0wnwxlodh.us-east-2.rds.amazonaws.com:5432/reviews_db"
config = {
    "user": "postgres",
    "password": os.environ.get("RDS_PASSWORD") or getpass("RDS password for user 'postgres': "),
    "driver": "org.postgresql.Driver",
}

In [None]:
# Write DataFrame to customers table in RDS, using the save mode and JDBC
# connection settings from the RDS config cell.
customer_count_df.write.mode(mode).jdbc(jdbc_url, "customers", properties=config)

In [50]:
# Write DataFrame to review_id_table in RDS, using the save mode and JDBC
# connection settings from the RDS config cell.
review_df.write.mode(mode).jdbc(jdbc_url, "review_id_table", properties=config)

In [None]:
# Write DataFrame to products table in RDS, using the save mode and JDBC
# connection settings from the RDS config cell.
products_df.write.mode(mode).jdbc(jdbc_url, "products", properties=config)

In [52]:
# Write DataFrame to vine table in RDS, using the save mode and JDBC
# connection settings from the RDS config cell.
vine_df.write.mode(mode).jdbc(jdbc_url, "vine_table", properties=config)