In [None]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
# Download the PostgreSQL JDBC driver jar (version 42.2.9) into the current
# working directory. This fetches only the driver, not a PostgreSQL server.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2020-10-24 16:29:28--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2020-10-24 16:29:28 (5.00 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [None]:
# Build (or reuse) the Spark session for the Amazon review ETL. The downloaded
# PostgreSQL JDBC jar is put on the driver classpath.
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("Amazon-ETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [None]:
# Read in the Wireless review dataset from the public S3 bucket
from pyspark import SparkFiles

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# The file is tab-separated (.tsv). Parsing it with a comma separator collapses
# every record into a single column, so the delimiter must be a tab.
wireless_data_df = spark.read.csv(
    SparkFiles.get("amazon_reviews_us_Wireless_v1_00.tsv.gz"),
    sep="\t",
    header=True,
    inferSchema=True,
)

# Show DataFrame
wireless_data_df.show()

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|marketplace	customer_id	review_id	product_id	product_parent	product_title	product_category	star_rating	helpful_votes	total_votes	vine	verified_purchase	review_headline	review_body	review_date|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                                                           US	16414143	R3W4P...|
|                                                                                                                                                                           US	50800750	R15V5...|
|                             

In [None]:
# Count the rows in the raw DataFrame (triggers a full pass over the file)
wireless_data_df.count()

9002021

In [None]:
#Import Struct Fields
from pyspark.sql.types import StructField, StringType, IntegerType, DateType, StructType

In [None]:
# Struct-field lists: one per target table, plus the full layout of the raw file.
review_id_schema = [
    StructField("review_id", StringType(), False),
    StructField("customer_id", IntegerType(), True),
    StructField("product_id", StringType(), True),
    StructField("review_date", DateType(), True),
]
products_schema = [
    StructField("product_id", StringType(), False),
    StructField("product_title", StringType(), True),
]
customers_schema = [
    StructField("customer_id", IntegerType(), False),
    StructField("customer_count", IntegerType(), True),
]
vine_schema = [
    StructField("review_id", StringType(), False),
    StructField("star_rating", IntegerType(), True),
    StructField("helpful_votes", IntegerType(), True),
    StructField("total_votes", IntegerType(), True),
    StructField("vine", StringType(), True),
]

# The raw file has 15 columns. review_headline and review_body are two separate
# columns; declaring them as a single field shifts review_date onto the wrong
# column when the schema is applied positionally.
final_schema = [
    StructField("marketplace", StringType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("review_id", StringType(), True),
    StructField("product_id", StringType(), False),
    StructField("product_parent", StringType(), True),
    StructField("product_title", StringType(), True),
    StructField("product_category", StringType(), True),
    StructField("star_rating", IntegerType(), True),
    StructField("helpful_votes", IntegerType(), True),
    StructField("total_votes", IntegerType(), True),
    StructField("vine", StringType(), True),
    StructField("verified_purchase", StringType(), True),
    StructField("review_headline", StringType(), True),
    StructField("review_body", StringType(), True),
    StructField("review_date", DateType(), True),
]
print(review_id_schema)
print(products_schema)
print(customers_schema)
print(vine_schema)
print(final_schema)

[StructField(review_id,StringType,false), StructField(customer_id,IntegerType,true), StructField(product_id,StringType,true), StructField(review_date,DateType,true)]
[StructField(product_id,StringType,false), StructField(product_title,StringType,true)]
[StructField(customer_id,IntegerType,false), StructField(customer_count,IntegerType,true)]
[StructField(review_id,StringType,false), StructField(star_rating,IntegerType,true), StructField(helpful_votes,IntegerType,true), StructField(total_votes,IntegerType,true), StructField(vine,StringType,true)]
[StructField(marketplace,StringType,true), StructField(customer_id,IntegerType,true), StructField(review_id,StringType,true), StructField(product_id,StringType,false), StructField(product_parent,StringType,true), StructField(product_title,StringType,true), StructField(product_category,StringType,true), StructField(star_rating,IntegerType,true), StructField(helpful_votes,IntegerType,true), StructField(total_votes,IntegerType,true), StructField(v

In [None]:
# Wrap the field list in a StructType so it can be handed to the CSV reader,
# then echo it as the cell result.
final = StructType(final_schema)
final

StructType(List(StructField(marketplace,StringType,true),StructField(customer_id,IntegerType,true),StructField(review_id,StringType,true),StructField(product_id,StringType,false),StructField(product_parent,StringType,true),StructField(product_title,StringType,true),StructField(product_category,StringType,true),StructField(star_rating,IntegerType,true),StructField(helpful_votes,IntegerType,true),StructField(total_votes,IntegerType,true),StructField(vine,StringType,true),StructField(verified_purchase,StringType,true),StructField(review_headline review_body,StringType,true),StructField(review_date,DateType,true)))

In [None]:
#Import SQL DataFrame
from pyspark.sql import DataFrame as D

In [None]:
# Re-read the file with the explicit schema instead of inferring types.
# The source is tab-separated, so the delimiter must be "\t"; with a comma the
# review text is split on its own commas and lands in unrelated columns.
wireless_data_dataframe = spark.read.csv(
    SparkFiles.get("amazon_reviews_us_Wireless_v1_00.tsv.gz"),
    sep="\t",
    header=True,
    schema=final,
)
wireless_data_dataframe

DataFrame[marketplace: string, customer_id: int, review_id: string, product_id: string, product_parent: string, product_title: string, product_category: string, star_rating: int, helpful_votes: int, total_votes: int, vine: string, verified_purchase: string, review_headline review_body: string, review_date: date]

In [None]:
# Transform the dataset to fit the tables in the schema

# One row per review.
review_id_table = wireless_data_dataframe.select(
    'review_id', 'customer_id', 'product_id', 'product_parent', 'review_date'
)

# One row per product: the raw data repeats a product for every review of it.
# NOTE(review): assumes product_id is the key of the products table — confirm
# against the table definition on the RDS instance.
products_table = (
    wireless_data_dataframe
    .select('product_id', 'product_title')
    .dropDuplicates(['product_id'])
)

# One row per customer with the number of reviews they wrote. count() yields a
# bigint, so it is cast to INT to match the IntegerType declared for
# customer_count.
customers_table = (
    wireless_data_dataframe
    .groupBy('customer_id')
    .count()
    .selectExpr('customer_id', 'CAST(`count` AS INT) AS customer_count')
)

# Vine-program fields, keyed by review.
vine_table = wireless_data_dataframe.select(
    'review_id', 'star_rating', 'helpful_votes', 'total_votes', 'vine'
)

review_id_table.show()
products_table.show()
customers_table.show()
vine_table.show()

+--------------------+-----------+--------------------+--------------------+-----------+
|           review_id|customer_id|          product_id|      product_parent|review_date|
+--------------------+-----------+--------------------+--------------------+-----------+
|  my battery drai...|       null| which kind of ma...|  the wireless ch...|       null|
| they’re growing ...|       null| the pairing via ...| though it’s prob...|       null|
|                null|       null|                null|                null|       null|
| Travel Charger f...|       null|       iPhone 6 plus|            iPhone 5|       null|
|                null|       null|                null|                null|       null|
| better customer ...|       null|                null|                null|       null|
|                null|       null|                null|                null|       null|
|                null|       null|                null|                null|       null|
|                null

In [None]:
# Ask PySpark to pull the PostgreSQL JDBC driver as a package.
# NOTE(review): the SparkSession is created in an earlier cell, so this setting
# presumably has no effect on the already-running session — confirm, and move it
# before session creation (or drop it) if so.
# NOTE(review): the version requested here (42.1.1) differs from the jar that
# was downloaded and put on the driver classpath (42.2.9).
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.postgresql:postgresql:42.1.1 pyspark-shell'

In [None]:
#Import SparkContext
from pyspark import SparkContext

In [None]:
# Prepare JDBC URL
postgres_url = "jdbc:postgresql://amazon-etl.c3wziqys8iwb.us-east-2.rds.amazonaws.com:5432/amazon-etl"

# Connection properties passed to DataFrameWriter.jdbc.
# The password must not be stored in the notebook: it is read from the
# POSTGRES_PASSWORD environment variable, or prompted for (without echo) when
# that variable is unset or empty. Any password that was ever committed with
# this notebook should be rotated.
import os
from getpass import getpass

config = {
    "driver": "org.postgresql.Driver",
    "user": "postgres",
    "password": os.environ.get("POSTGRES_PASSWORD") or getpass("PostgreSQL password: "),
}

In [None]:
# Load DataFrames to the RDS instance: append the review rows to `review_id`
review_id_table.write.jdbc(
    url=postgres_url,
    table='review_id',
    mode='append',
    properties=config,
)

Py4JJavaError: ignored

In [None]:
# Append the product rows to the `products` table
products_table.write.jdbc(
    url=postgres_url,
    table='products',
    mode='append',
    properties=config,
)

In [None]:
# Append the customer rows to the `customers` table
customers_table.write.jdbc(
    url=postgres_url,
    table='customers',
    mode='append',
    properties=config,
)

In [None]:
# Append the vine rows to the `vine` table
vine_table.write.jdbc(
    url=postgres_url,
    table='vine',
    mode='append',
    properties=config,
)