In [3]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.6.tgz
!tar xf spark-2.4.6-bin-hadoop2.6.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.6"

# Start a SparkSession
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETLProjectAnalysis").getOrCreate()

In [47]:
from pyspark import SparkFiles
# Load in user_data.csv from S3 into a DataFrame
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Jewelry_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

jewelryDF = spark.read.option('header', 'true').csv(SparkFiles.get("amazon_reviews_us_Jewelry_v1_00.tsv.gz"), header=True, sep='\t')
jewelryDF.dropna()
jewelryDF.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   50423057|R135Q3VZ4DQN5N|B00JWXFDMG|     657335467|Everbling Purple ...|         Jewelry|          5|            0|          0|   N|                Y|           Beauties!|so beautiful even...| 2015-08-31|
|         US|   11262325|R2N0QQ6R4T7YRY|B00W5T1H9W|      26030170|925 Sterling Silv...|         Jewelry|          5|    

In [48]:
jewelryDF.count()

1767753

## Transform DataFrame to fit review_id_table

In [49]:
reviewsDF = jewelryDF.select(["review_id","customer_id", "product_id", "product_parent", "review_date"])
reviewsDF = reviewsDF.dropDuplicates(["review_id"])
reviewsDF.show(5)

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R1008UUSRYBCN8|   33389214|B005X09V3E|     312020021| 2012-07-27|
|R1027MP2LE49YY|   24505016|B008NBFSBE|     418135320| 2015-07-13|
|R103QQQLKO0AC6|    6395747|B00E4RC4G8|     983412781| 2014-11-02|
|R104HO49LQSQYQ|   11426522|B0033B2JTI|     971475887| 2011-11-09|
|R105HT3D9UGK5U|   36133138|B00510XWKO|     258061028| 2012-12-01|
+--------------+-----------+----------+--------------+-----------+
only showing top 5 rows



In [50]:
#check if schema matches what we need
reviewsDF.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- review_date: string (nullable = true)



In [51]:
#we need to convert data types to match SQL schemas
from pyspark.sql.types import * 

reviewsDF = reviewsDF.withColumn("customer_id",reviewsDF["customer_id"].cast(IntegerType()))\
    .withColumn("product_parent",reviewsDF["product_parent"].cast(IntegerType()))\
    .withColumn("review_date",reviewsDF["review_date"].cast(DateType()))

reviewsDF.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: date (nullable = true)



Transform DataFrame to fit products table

In [52]:
productsDF = jewelryDF.select(["product_id", "product_title"])
productsDF = productsDF.dropDuplicates(["product_id"])
productsDF.show(5)

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|1438809484|Twilight New Moon...|
|B00006C7GQ|Coral and Turquoi...|
|B00006JMZ8|Delicate Gem Neck...|
|B00007G9BS|14K Cultured (Ako...|
|B00009EOU4|14K White Gold Di...|
+----------+--------------------+
only showing top 5 rows



In [53]:
productsDF.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_title: string (nullable = true)



Transform DataFrame to fit customers table

In [54]:
customersDF = jewelryDF.select(["customer_id"])
#create count column per ID
customersDF = customersDF.groupBy("customer_id").count() 
#Select new column and rename to customer_count.
customersDF = customersDF.select(["customer_id", "count"]).withColumnRenamed("count", "customer_count")
customersDF.show(5)
#ensure the right types for our schemas
customersDF = customersDF.withColumn("customer_id",customersDF["customer_id"].cast(IntegerType())) \
                         .withColumn("customer_count",customersDF["customer_count"].cast(IntegerType()))
customersDF.printSchema()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   22279960|             1|
|   51290010|             1|
|    1354679|             1|
|   44662810|             3|
|    7763633|             1|
+-----------+--------------+
only showing top 5 rows

root
 |-- customer_id: integer (nullable = true)
 |-- customer_count: integer (nullable = false)



Transform DataFrame to fit vine table

In [55]:
#create new dataframe with necesssary columns based on schema
vineDF = jewelryDF.select(["review_id","star_rating", "helpful_votes", "total_votes", "vine"])
vineDF = vineDF.dropDuplicates(["review_id"])
#convert to schema data types
vineDF = vineDF.withColumn("star_rating",vineDF["star_rating"].cast(IntegerType()))\
               .withColumn("helpful_votes",vineDF["helpful_votes"].cast(IntegerType()))\
               .withColumn("total_votes",vineDF["total_votes"].cast(IntegerType()))
vineDF.show(5)
vineDF.printSchema()


+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R1008UUSRYBCN8|          5|            1|          1|   N|
|R1027MP2LE49YY|          1|            1|          1|   N|
|R103QQQLKO0AC6|          5|            1|          1|   N|
|R104HO49LQSQYQ|          5|            0|          0|   N|
|R105HT3D9UGK5U|          3|            7|          7|   N|
+--------------+-----------+-------------+-----------+----+
only showing top 5 rows

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)



Write Dataframes to RDS POSTGRE

In [75]:
# Configuration for RDS instance
mode="append"
jdbc_url = "jdbc:postgresql://ratingsdb.clw8hn9uuxrr.us-west-1.rds.amazonaws.com:5432/jeweleryDB"
config = {"user":"postgres",
          "password": "",
          "driver":"org.postgresql.Driver"}

In [76]:
reviewsDF.write.jdbc(url=jdbc_url, table="review_id_table", mode=mode, properties=config)
customersDF.write.jdbc(url=jdbc_url, table="customers", mode=mode, properties=config)
productsDF.write.jdbc(url=jdbc_url, table="products", mode=mode, properties=config)
vineDF.write.jdbc(url=jdbc_url, table="vine_table", mode=mode, properties=config)

Py4JJavaError: ignored