In [None]:
# Install Java, Spark, and Findspark
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/ InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Connecting to security.u0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.142)                                                                               Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.142)                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.142)                                                                               Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:5 https://developer.download.nvidia.com/comp

In [None]:
# Download a Postgres diver that will allow Spark to interact with Postres
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2020-08-08 20:46:39--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.2’


2020-08-08 20:46:39 (3.62 MB/s) - ‘postgresql-42.2.9.jar.2’ saved [914037/914037]



In [None]:
# Create (or reuse) the "CloudETL" Spark session, putting the downloaded Postgres JDBC jar
# on the driver classpath so the JDBC writes later in the notebook can find the driver class.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [None]:
# Fetch the gzipped Amazon Toys reviews TSV from S3 via Spark's file distribution,
# then parse it as a tab-separated file with a header row and inferred column types.
from pyspark import SparkFiles

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Toys_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
toy_reviews_df = (
    spark.read
    .options(sep="\t", header=True, inferSchema=True)
    .csv(SparkFiles.get("amazon_reviews_us_Toys_v1_00.tsv.gz"))
)

# Show DataFrame
# toy_reviews_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   18778586| RDIJS7QYB6XNR|B00EDBY7X8|     122952789|Monopoly Junior B...|            Toys|          5|            0|          0|   N|                Y|          Five Stars|        Excellent!!!| 2015-08-31|
|         US|   24769659|R36ED1U38IELG8|B00D7JFOPC|     952062646|56 Pieces of Wood...|            Toys|          5|    

In [None]:
# Baseline row count of the raw reviews, taken before any cleaning step
raw_review_count = toy_reviews_df.count()
raw_review_count

4864249

In [None]:
# Discard any row that has a null in at least one column (na.drop defaults to how="any")
dropna_toy_df = toy_reviews_df.na.drop()
# dropna_toy_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   18778586| RDIJS7QYB6XNR|B00EDBY7X8|     122952789|Monopoly Junior B...|            Toys|          5|            0|          0|   N|                Y|          Five Stars|        Excellent!!!| 2015-08-31|
|         US|   24769659|R36ED1U38IELG8|B00D7JFOPC|     952062646|56 Pieces of Wood...|            Toys|          5|    

In [None]:
# Keep only one copy of rows that are identical across every column
drop_duplicates_toy_df = dropna_toy_df.distinct()
# drop_duplicates_toy_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|      10085| RZXZZV7REJMVC|B002YPE1MM|      36616636|BMW Plush Bear Cr...|            Toys|          5|            2|          2|   N|                Y|THE BEST BEAR EVE...|Recently bought t...| 2013-07-23|
|         US|      11628|R1JP55UFEGJN8X|B0017H07IU|     146857225|      Royal Cribbage|            Toys|          5|    

In [None]:
# Row count after null-dropping and de-duplication, to compare against the baseline count
clean_review_count = drop_duplicates_toy_df.count()
clean_review_count

4863497

In [None]:
# List (column name, Spark SQL type) pairs for the cleaned DataFrame
[(field.name, field.dataType.simpleString()) for field in drop_duplicates_toy_df.schema.fields]

[('marketplace', 'string'),
 ('customer_id', 'int'),
 ('review_id', 'string'),
 ('product_id', 'string'),
 ('product_parent', 'int'),
 ('product_title', 'string'),
 ('product_category', 'string'),
 ('star_rating', 'int'),
 ('helpful_votes', 'int'),
 ('total_votes', 'int'),
 ('vine', 'string'),
 ('verified_purchase', 'string'),
 ('review_headline', 'string'),
 ('review_body', 'string'),
 ('review_date', 'string')]

In [None]:
# Load in a sql function to use columns
from pyspark.sql.functions import col, to_date

In [None]:
# Disabled scratch check: review_date was loaded as a string, and this verified that
# to_date(..., "yyyy-MM-dd") yields a 'date' column. The same conversion is applied
# when review_id_df is built in the next code cell, so this cell is kept only for reference.
# updated_datatypes = drop_duplicates_toy_df.select(to_date("review_date","yyyy-MM-dd").alias('review_date'))
# updated_datatypes.dtypes

[('review_date', 'date')]

In [None]:
# Shape the cleaned reviews into the review_id_table layout; review_date is parsed
# from its "yyyy-MM-dd" string form into a DATE column under the same name.
review_date_as_date = to_date("review_date", "yyyy-MM-dd").alias("review_date")
review_id_df = drop_duplicates_toy_df.select(
    "review_id",
    "customer_id",
    "product_id",
    "product_parent",
    review_date_as_date,
)
print(review_id_df.dtypes)
# review_id_df.show()

[('review_id', 'string'), ('customer_id', 'int'), ('product_id', 'string'), ('product_parent', 'int'), ('review_date', 'date')]
+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
| RZXZZV7REJMVC|      10085|B002YPE1MM|      36616636| 2013-07-23|
|R1JP55UFEGJN8X|      11628|B0017H07IU|     146857225| 2013-06-13|
|R3A7MKMD5EZ4W3|      12937|B00MVV114A|     413343234| 2015-06-07|
|R1JRDP9VJFXFZ6|      13330|B0058LT9O4|     284768315| 2015-07-03|
| RPCFLJHJWWQIT|      14578|B00ERU9VPY|     516199598| 2015-02-22|
|R2I6G40JFJ1SPC|      15335|B00GK5XNJO|     739198872| 2015-01-20|
|R2M4JW7A54AUH2|      15520|B00F8QDGGG|     268342368| 2015-02-13|
| RYSQ3ODNTQQQX|      16638|B009VN3644|     868043491| 2014-01-11|
| RHIBXPP5SD2T5|      19107|B001539ICW|      49163138| 2015-01-05|
|R35UHK0TM7SUHM|      22926|B00B7T6MDC|      94225835| 2014-02-08|
|

In [None]:
# customers table: number of (cleaned) reviews per customer_id, stored as customer_count
customer = drop_duplicates_toy_df.groupBy(["customer_id"]).count()
customer_df = customer.withColumnRenamed(existing="count", new="customer_count")
# customer_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|    1342030|             2|
|    2239429|             2|
|    2300450|             1|
|    2776943|             2|
|    3339856|             1|
|    4080618|             1|
|    5343647|             4|
|    5894286|             1|
|    7455053|            22|
|   10458870|             2|
|   10548283|            46|
|   10854449|             9|
|   11114447|             1|
|   11118822|             1|
|   11205154|             1|
|   11583167|             3|
|   11769593|             1|
|   11853634|             3|
|   12152570|            11|
|   12611201|             3|
+-----------+--------------+
only showing top 20 rows



In [None]:
# products table: one row per product_id together with its title.
# NOTE(review): if a product_id appears with more than one title, dropDuplicates keeps
# one of them without a documented ordering guarantee; confirm that is acceptable.
products = drop_duplicates_toy_df.select("product_id", "product_title")
products_df = products.dropDuplicates(subset=["product_id"])
products_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|0545449359|Klutz Fashion For...|
|0615394124|     füdoo Board Kit|
|0615881386|The Revised Crisw...|
|0735328757|Mudpuppy Butterfl...|
|0764955063|The Square of St....|
|0890100039|Kalmbach Building...|
|0954840526|World Prehistoric...|
|0966257545|Ladybug Game NEW ...|
|0972428232| Conversations to Go|
|0980345553|Art of Conversati...|
|0982757751|World of Harmony ...|
|0988179024|All Quiet on the ...|
|1409582485|Listen and Learn ...|
|1554841615|     Cat Smarts Game|
|1556345542|Munchkin 2 - Unna...|
|1567677037|Hot Dots Let’s Le...|
|1572158700|Cat in the Hat Gi...|
|1579823645|MerryMakers Pete ...|
|1585640514|Star Fleet Battle...|
|1589941411|     War of the Ring|
+----------+--------------------+
only showing top 20 rows



In [None]:
# vine_table: per-review rating, vote counts and Vine flag
vine_columns = ["review_id", "star_rating", "helpful_votes", "total_votes", "vine"]
vine_df = drop_duplicates_toy_df.select(*vine_columns)
# vine_df.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
| RZXZZV7REJMVC|          5|            2|          2|   N|
|R1JP55UFEGJN8X|          5|            0|          0|   N|
|R3A7MKMD5EZ4W3|          5|            0|          0|   N|
|R1JRDP9VJFXFZ6|          5|            0|          0|   N|
| RPCFLJHJWWQIT|          5|            0|          0|   N|
|R2I6G40JFJ1SPC|          5|            1|          1|   N|
|R2M4JW7A54AUH2|          5|            0|          0|   N|
| RYSQ3ODNTQQQX|          5|            0|          0|   N|
| RHIBXPP5SD2T5|          5|            0|          0|   N|
|R35UHK0TM7SUHM|          5|            3|          3|   N|
|R3F42L9GZNBHMZ|          3|            0|          0|   N|
|R32VHLWMLD8I3A|          5|            0|          0|   N|
| REHNMXXAXIZ9D|          5|            0|          0|   N|
|R1S0M0AMAWLKGL|          5|            

In [None]:
# Configure settings for RDS.
# Credentials are no longer hard-coded in the notebook, where they would be committed and shared.
# Set RDS_USER / RDS_PASSWORD in the environment; if RDS_PASSWORD is unset, you are prompted for it.
import os
from getpass import getpass

mode = "append"  # Spark SaveMode: add rows to the existing tables instead of replacing them
jdbc_url = "jdbc:postgresql://module-16-challenge.c2n1k7qoxf6t.us-east-1.rds.amazonaws.com:5432/postgres"
config = {
    "user": os.environ.get("RDS_USER", "postgres"),
    "password": os.environ.get("RDS_PASSWORD") or getpass("RDS password: "),
    "driver": "org.postgresql.Driver",
}

In [None]:
# Push review_id_df into the RDS review_id_table using the shared JDBC URL, save mode and connection properties
review_id_df.write.jdbc(jdbc_url, 'review_id_table', mode, config)

In [None]:
# Push products_df into the RDS products table using the shared JDBC URL, save mode and connection properties
products_df.write.jdbc(jdbc_url, 'products', mode, config)

In [None]:
# Push customer_df into the RDS customers table using the shared JDBC URL, save mode and connection properties
customer_df.write.jdbc(jdbc_url, 'customers', mode, config)

In [None]:
# Push vine_df into the RDS vine_table using the shared JDBC URL, save mode and connection properties
vine_df.write.jdbc(jdbc_url, 'vine_table', mode, config)