In [1]:
# Install Java, Spark, and Findspark
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!tar xf spark-2.4.6-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Waiting for headers] [Waiting for headers] [Connected to cloud.r-project.or                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/ InRelease [3,626 B]
0% [Waiting for headers] [Waiting for headers] [2 InRelease 3,626 B/3,626 B 1000% [Waiting for headers] [Waiting for headers] [Waiting for headers] [Waiting f                                                                               Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
                                                                               Hit:4 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
0% [Waiting for headers] [3 InRelease 14.2 kB/88.7 kB 16%] [Waiting for headers0% [2 InRelease gpgv 3,626 B] [Waiting for headers] [3 InRelease 14.2 kB/88.7 k

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETLProject").getOrCreate()

In [40]:
from pyspark import SparkFiles
# Load in user_data.csv from S3 into a DataFrame 
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Home_Entertainment_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

df = spark.read.option('header', 'true').csv(SparkFiles.get("amazon_reviews_us_Home_Entertainment_v1_00.tsv.gz"), 
                                             inferSchema=True, sep='\t', timestampFormat="yyyy-MM-dd")
df.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|  product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|     179886| RY01SAV7HZ8QO|B00NTI0CQ2|     667358431|Aketek 1080P LED ...|Home Entertainment|          4|            0|          0|   N|                Y|good enough for m...|not the best pict...|2015-08-31 00:00:00|
|         US|   37293769|R1XX8SDGJ4MZ4L|B00BUCLVZU|     621695622|TiVo Mini with

## How many rows are in the dataset?

In [41]:
# Count the number of entries in the dataset
print(f"There are {df.count()} rows in the dataset")

There are 705889 rows in the dataset


## Transform DataFrame to fit review_id table

In [42]:
review_df = df.select(["review_id", "customer_id","product_id", "product_parent", "review_date"])
review_df.show()

+--------------+-----------+----------+--------------+-------------------+
|     review_id|customer_id|product_id|product_parent|        review_date|
+--------------+-----------+----------+--------------+-------------------+
| RY01SAV7HZ8QO|     179886|B00NTI0CQ2|     667358431|2015-08-31 00:00:00|
|R1XX8SDGJ4MZ4L|   37293769|B00BUCLVZU|     621695622|2015-08-31 00:00:00|
|R149Q3B5L33NN5|    8332121|B00RBX9D5W|     143071132|2015-08-31 00:00:00|
|R2ZVD69Z6KPJ4O|   47054962|B00UJ3IULO|     567816707|2015-08-31 00:00:00|
|R1DIKG2G33ZLNP|   23413911|B0037UCTXG|     909557698|2015-08-31 00:00:00|
|R3L6FGKAW0EYFI|    4417771|B004N866SU|     414565179|2015-08-31 00:00:00|
| RAO0QZH5VC6VI|   47900707|B00JE6AOJS|     798450889|2015-08-31 00:00:00|
|R25IK0UAHWNB22|   34112894|B00COL0B7A|     777554234|2015-08-31 00:00:00|
|R2A9IHKZMTMAL1|   20691979|B00QHLSKOE|     885228855|2015-08-31 00:00:00|
| R5XVKTHL6SITI|   25983343|B00UNL2MUW|     164482798|2015-08-31 00:00:00|
|R2QZZOSTDDY1IE|   358160

In [43]:
# Show the schema to confirm the column type
review_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: timestamp (nullable = true)



In [44]:
# Drop the hh:mm:ss from the timestamp

# Import date time functions
from pyspark.sql.functions import date_format

review_df = review_df.withColumn("review_date", date_format(review_df['review_date'], 'yyyy-MM-dd'))
review_df.show(10)

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
| RY01SAV7HZ8QO|     179886|B00NTI0CQ2|     667358431| 2015-08-31|
|R1XX8SDGJ4MZ4L|   37293769|B00BUCLVZU|     621695622| 2015-08-31|
|R149Q3B5L33NN5|    8332121|B00RBX9D5W|     143071132| 2015-08-31|
|R2ZVD69Z6KPJ4O|   47054962|B00UJ3IULO|     567816707| 2015-08-31|
|R1DIKG2G33ZLNP|   23413911|B0037UCTXG|     909557698| 2015-08-31|
|R3L6FGKAW0EYFI|    4417771|B004N866SU|     414565179| 2015-08-31|
| RAO0QZH5VC6VI|   47900707|B00JE6AOJS|     798450889| 2015-08-31|
|R25IK0UAHWNB22|   34112894|B00COL0B7A|     777554234| 2015-08-31|
|R2A9IHKZMTMAL1|   20691979|B00QHLSKOE|     885228855| 2015-08-31|
| R5XVKTHL6SITI|   25983343|B00UNL2MUW|     164482798| 2015-08-31|
+--------------+-----------+----------+--------------+-----------+
only showing top 10 rows



In [45]:
# Drop any null values
review_df.dropna()

DataFrame[review_id: string, customer_id: int, product_id: string, product_parent: int, review_date: string]

In [46]:
# Check to see if the number of rows matches the dataset
print(f"There are {review_df.count()} rows in the data frame")

There are 705889 rows in the data frame


## Transform Dataframe to match products table

In [47]:
products_df = df.select(["product_id", "product_title"])
products_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00NTI0CQ2|Aketek 1080P LED ...|
|B00BUCLVZU|TiVo Mini with IR...|
|B00RBX9D5W|Apple TV MD199LL/...|
|B00UJ3IULO|New Roku 3 6.5 Fo...|
|B0037UCTXG|Generic DVI-I Dua...|
|B004N866SU|Samsung 3D LED HD...|
|B00JE6AOJS|Jiake Wireless Bl...|
|B00COL0B7A|3pcs/lot 3in1 3ft...|
|B00QHLSKOE|Matricom G-Box Q²...|
|B00UNL2MUW|VIZIO S2920W-C0B ...|
|B00RIC9JB4|Hitachi 49" Class...|
|B00HPMCO6O|Sony BDPS5200 3D ...|
|B004QGXWS6|Sylvania 7-Inch T...|
|B00FO12XY6|Roku HD Streaming...|
|B00BD7UVO4|LG Electronics BP...|
|B00TRQPEYK|LG Electronics LF...|
|B005STXQG8|SquareTrade TV Pr...|
|B00BEL11RA|Cambridge - Azur ...|
|B00QHLSKOE|Matricom G-Box Q²...|
|B00MWCJ8VQ|Roku 3500XB Strea...|
+----------+--------------------+
only showing top 20 rows



In [48]:
# Check the columns to match the schema
products_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_title: string (nullable = true)



In [49]:
# Check to see if the number of rows matches the dataset
print(f"There are {products_df.count()} rows in the data frame")

There are 705889 rows in the data frame


## Transform the dataframe to match the customers table

In [50]:
customers_df = df.select(["customer_id"])
customers_df.show()

+-----------+
|customer_id|
+-----------+
|     179886|
|   37293769|
|    8332121|
|   47054962|
|   23413911|
|    4417771|
|   47900707|
|   34112894|
|   20691979|
|   25983343|
|   35816068|
|   10628020|
|    9059625|
|    2681147|
|   33449922|
|   43069144|
|   46780686|
|   49037595|
|   27868511|
|    3004043|
+-----------+
only showing top 20 rows



In [51]:
customers_df = customers_df.groupBy('customer_id').agg({"customer_id": "count"})
customers_df = customers_df.withColumnRenamed("count(customer_id)", "customer_count")
customers_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   10142992|             1|
|   16457323|             6|
|   11935383|             1|
|   46277736|             1|
|   13671072|             1|
|   21453814|             1|
|   17684885|             1|
|   20415768|             1|
|   15212710|             1|
|    5220924|             1|
|   46253451|             6|
|     971908|             1|
|   32829933|             1|
|   51221518|             1|
|   12002637|             2|
|   16105308|             1|
|     135867|             1|
|   47425808|             1|
|   43138273|             1|
|   16411995|             1|
+-----------+--------------+
only showing top 20 rows



## Transform dataset to match the vine table

In [52]:
vine_df = df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine"])
vine_df.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
| RY01SAV7HZ8QO|          4|            0|          0|   N|
|R1XX8SDGJ4MZ4L|          5|            0|          0|   N|
|R149Q3B5L33NN5|          5|            0|          0|   N|
|R2ZVD69Z6KPJ4O|          1|            0|          2|   N|
|R1DIKG2G33ZLNP|          4|            0|          0|   N|
|R3L6FGKAW0EYFI|          1|            1|          1|   N|
| RAO0QZH5VC6VI|          1|            0|          0|   N|
|R25IK0UAHWNB22|          3|            0|          0|   N|
|R2A9IHKZMTMAL1|          5|            1|          2|   N|
| R5XVKTHL6SITI|          5|            0|          0|   N|
|R2QZZOSTDDY1IE|          3|            3|          6|   N|
|R38CUDCFPSNYTD|          5|            0|          0|   N|
| RM6ZR6NH052YH|          3|            1|          2|   N|
| RUQK5N4WH8UN8|          5|            

In [53]:
# Check to see if the number of rows matches the dataset
print(f"There are {vine_df.count()} rows in the data frame")

There are 705889 rows in the data frame


## Setup the RDS

In [38]:
# Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://mypostgresdb.ckkgxbdiidvr.us-east-2.rds.amazonaws.com:5432/big_data_hw_db"
config = {"user":"root", 
          "password": "", 
          "driver":"org.postgresql.Driver"}

In [39]:
# Write DataFrame to review_id table in RDS

review_df.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

Py4JJavaError: ignored

In [None]:
# Write DataFrame to products table in RDS

products_df.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)

In [None]:
# Write DataFrame to customers table in RDS

customers_df.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config)

In [None]:
# Write DataFrame to vine table in RDS

vine_df.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)