In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
!pip install pyspark

update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java to provide /usr/bin/java (java) in manual mode
Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 67kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 46.7MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=1e97a029f6af77dededd6f303101ac800f2c8422c0d623ee5d3f4d6a77e67b59
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Succ

In [3]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2020-10-16 19:19:52--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.1’


2020-10-16 19:19:54 (1.02 MB/s) - ‘postgresql-42.2.9.jar.1’ saved [914037/914037]



In [8]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

# Extract

In [10]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url="https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Watches_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
watches_df = spark.read.option('header', 'true').csv(SparkFiles.get("amazon_reviews_us_Watches_v1_00.tsv.gz"), inferSchema=True, sep='\t', timestampFormat="yyyy-mm-dd")

# Show DataFrame
watches_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|    3653882|R3O9SGZBVQBV76|B00FALQ1ZC|     937001370|Invicta Women's 1...|         Watches|          5|            0|          0|   N|                Y|          Five Stars|Absolutely love t...|2015-01-31 00:08:00|
|         US|   14661224| RKH8BNC3L5DLF|B00D3RGO20|     484010722|Kenneth Cole New ...| 

# Count

In [11]:
# Count the total rows
watches_df.count()


960872

In [13]:
# Drop null values and recount the rows
watches_df = watches_df.dropna()
watches_df.count()

960679

# Schema

In [15]:
# Print schema to confirm the correct data types
watches_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



## Transform

In [22]:
# Create review dataframe to match "review_id_table" table
watch_review_id_df = watches_df.select(["review_id","customer_id","product_id", "product_parent","review_date"])
watch_review_id_df.show()

+--------------+-----------+----------+--------------+-------------------+
|     review_id|customer_id|product_id|product_parent|        review_date|
+--------------+-----------+----------+--------------+-------------------+
|R3O9SGZBVQBV76|    3653882|B00FALQ1ZC|     937001370|2015-01-31 00:08:00|
| RKH8BNC3L5DLF|   14661224|B00D3RGO20|     484010722|2015-01-31 00:08:00|
|R2HLE8WKZSU3NL|   27324930|B00DKYC7TK|     361166390|2015-01-31 00:08:00|
|R31U3UH5AZ42LL|    7211452|B000EQS1JW|     958035625|2015-01-31 00:08:00|
|R2SV659OUJ945Y|   12733322|B00A6GFD7S|     765328221|2015-01-31 00:08:00|
| RA51CP8TR5A2L|    6576411|B00EYSOSE8|     230493695|2015-01-31 00:08:00|
| RB2Q7DLDN6TH6|   11811565|B00WM0QA3M|     549298279|2015-01-31 00:08:00|
|R2RHFJV0UYBK3Y|   49401598|B00A4EYBR0|     844009113|2015-01-31 00:08:00|
|R2Z6JOQ94LFHEP|   45925069|B00MAMPGGE|     263720892|2015-01-31 00:08:00|
| RX27XIIWY5JPB|   44751341|B004LBPB7Q|     124278407|2015-01-31 00:08:00|
|R15C7QEZT0LGZN|    99623

In [21]:
# Create product dataframe to match "products" table
watch_products_df = watches_df.select(["product_id","product_title"])
watch_products_df = watch_products_df.dropDuplicates()
watch_products_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00EVX7V1I|Game Time Women's...|
|B009S4DODY|XOXO Women's XO55...|
|B00LBKXQRW|Anne Klein Women'...|
|B0009P679Y|Invicta Men's 993...|
|B00DHF30RU|M&c Women's | Cla...|
|B00NIDA43Y|GuTe Classic Skel...|
|B008EQDDPQ|Nautica Men's N13...|
|B004VRBZ66|Timex Men's T2N63...|
|B009BEO81I|        Fossil Riley|
|B008B39MTI|XOXO Women's XO55...|
|B00TGPM8PU|Handmade Wooden W...|
|B00VNXQQQ0|Eterna 2520-41-64...|
|B00B1PV1C4|Nautica Men's N19...|
|B00N1Y8TQ4|Tissot Men's T095...|
|B00G6DBTY6|red line Men's RL...|
|B00HM04AYI|Columbia Men's Fi...|
|B00VI8HB96|GUESS I90176L1 Wo...|
|B00IT25WJU|LanTac DGN556B Dr...|
|B0106S12XE|Skmei S Shock Ana...|
|B00FPSJ63Y|Michael Kors Ladi...|
+----------+--------------------+
only showing top 20 rows



In [26]:
# Create customers dataframe to match "customers" table
from pyspark.sql.functions import desc


watch_customers_df = watches_df.groupby("customer_id").agg({"customer_id":"count"})
watch_customers_df = watch_customers_df.orderBy(desc("count(customer_id)"))
watch_customers_df = watch_customers_df.withColumnRenamed("count(customer_id)", "customer_count")
watch_customers_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   42418272|           407|
|   40765068|           127|
|   19619134|           121|
|   52460875|           111|
|    5072809|           106|
|   18691646|            92|
|   14726863|            88|
|   13355404|            87|
|   42416004|            86|
|   45112699|            75|
|   44191290|            70|
|   22962882|            66|
|   47769852|            65|
|   23137720|            64|
|    5956842|            59|
|   21375608|            59|
|   25160623|            59|
|    1389173|            58|
|    8887467|            58|
|   24254971|            58|
+-----------+--------------+
only showing top 20 rows



In [27]:
# Create vine dataframe to match "vine_table" table
from pyspark.sql.functions import col

watch_vine_table_df = watches_df.select(["review_id","star_rating","helpful_votes","total_votes","vine"])
watch_vine_table_df = watch_vine_table_df.filter(col("vine")  == "Y")
watch_vine_table_df.show()


+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
| R6EMI5RKW73N3|          4|            1|          2|   Y|
|R3GDHEAHV44OSP|          4|            0|          0|   Y|
| RJ1N4X4CNQIJ9|          4|            0|          0|   Y|
| R2IZD1134PGIP|          4|            1|          2|   Y|
|R1RVYFAX6ZO2R9|          4|            0|          1|   Y|
|R39TDJY3D0M9OQ|          4|            0|          0|   Y|
| RF9H3S6G96WFD|          5|            0|          0|   Y|
|R1THQWC9BZAINY|          5|            0|          0|   Y|
|R2T6NBVGF447BZ|          4|            0|          0|   Y|
|R2IVDAK7IYLXJS|          4|            1|          1|   Y|
| RCUJSPL51V519|          4|            1|          1|   Y|
| R3ZNTDTGSY8SC|          4|            1|          1|   Y|
|R2DC9TNKKI8EXN|          5|            1|          1|   Y|
|R2GX9CYB2H258C|          5|            

## **Load into RDS (Postgres)**

In [28]:
# Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://bigdatach3act3db.cw1hhyrwelau.us-east-2.rds.amazonaws.com:5432/amazonwatchreviews"
config = {"user":"postgres", 
          "password": "postgres", 
          "driver":"org.postgresql.Driver"}

In [29]:
# Write DataFrame to review_id_df table in RDS

watch_review_id_df.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

In [30]:
# Write DataFrame to products table in RDS

watch_products_df.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)

In [31]:
# Write DataFrame to customers table in RDS

watch_customers_df.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config)

In [32]:
# Write DataFrame to vine_table table in RDS

watch_vine_table_df.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)