In [1]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [2]:
#Establish connection with  POSTGRES
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar


--2020-06-27 22:51:41--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2020-06-27 22:51:41 (3.61 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [3]:
# Create SparkSession using its connection with Postgres
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("amazonCloudETL_DS2").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

## EXTRACT

In [4]:
# Load in file from amazon aws url - OUTDOORS
from pyspark import SparkFiles

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Outdoors_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

outdoors_df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Outdoors_v1_00.tsv.gz"),header= True, sep="\t",inferSchema=True, timestampFormat="yyyy-mm-dd")

# Show DataFrame
outdoors_df.show(2, truncate=False)

+-----------+-----------+--------------+----------+--------------+---------------------------------------------------------+----------------+-----------+-------------+-----------+----+-----------------+---------------+------------------+-------------------+
|marketplace|customer_id|review_id     |product_id|product_parent|product_title                                            |product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|review_headline|review_body       |review_date        |
+-----------+-----------+--------------+----------+--------------+---------------------------------------------------------+----------------+-----------+-------------+-----------+----+-----------------+---------------+------------------+-------------------+
|US         |18446823   |R35T75OLUGHL5C|B000NV6H94|110804376     |Stearns Youth Boating Vest (50-90 lbs.)                  |Outdoors        |4          |0            |0          |N   |Y                |Four Stars     |GOOD VAL

**Number of records in the dataframe**

In [5]:
#Count the number of records (rows) in the dataset
print(outdoors_df.count())

2302401


In [8]:
# outdoors_df2 = outdoors_df.dropDuplicates()
# print(outdoors_df2.count())

#no hay duplicados

2302401


Examine the schema

In [6]:
outdoors_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



## TRANSFORM

In [7]:
#Remove NA values
outdoors_df2 = outdoors_df.dropna()
outdoors_df2.show(2, truncate=False)

+-----------+-----------+--------------+----------+--------------+---------------------------------------------------------+----------------+-----------+-------------+-----------+----+-----------------+---------------+------------------+-------------------+
|marketplace|customer_id|review_id     |product_id|product_parent|product_title                                            |product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|review_headline|review_body       |review_date        |
+-----------+-----------+--------------+----------+--------------+---------------------------------------------------------+----------------+-----------+-------------+-----------+----+-----------------+---------------+------------------+-------------------+
|US         |18446823   |R35T75OLUGHL5C|B000NV6H94|110804376     |Stearns Youth Boating Vest (50-90 lbs.)                  |Outdoors        |4          |0            |0          |N   |Y                |Four Stars     |GOOD VAL

In [8]:
#Count the number of records (rows) in the dataset (without NA)
print(outdoors_df2.count())

2302173


In [9]:
#Change review_date column timestamp format to date
from pyspark.sql.functions import col, to_date

outdoors_df3 = outdoors_df2.withColumn("review_date", to_date(col("review_date"),"yyyy-mm-dd"))
outdoors_df3.show(2, truncate=False)

+-----------+-----------+--------------+----------+--------------+---------------------------------------------------------+----------------+-----------+-------------+-----------+----+-----------------+---------------+------------------+-----------+
|marketplace|customer_id|review_id     |product_id|product_parent|product_title                                            |product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|review_headline|review_body       |review_date|
+-----------+-----------+--------------+----------+--------------+---------------------------------------------------------+----------------+-----------+-------------+-----------+----+-----------------+---------------+------------------+-----------+
|US         |18446823   |R35T75OLUGHL5C|B000NV6H94|110804376     |Stearns Youth Boating Vest (50-90 lbs.)                  |Outdoors        |4          |0            |0          |N   |Y                |Four Stars     |GOOD VALUE        |2015-01-31 |


Create "review_id_table"

In [10]:
#Create df to match "review_id_table"
reviewID_outdoorsDF = outdoors_df3.select(["review_id","customer_id","product_id","product_parent","review_date"])
reviewID_outdoorsDF.show(2, truncate=False)

+--------------+-----------+----------+--------------+-----------+
|review_id     |customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R35T75OLUGHL5C|18446823   |B000NV6H94|110804376     |2015-01-31 |
|R2BV735O46BN33|13724367   |B000IN0W3Y|624096774     |2015-01-31 |
+--------------+-----------+----------+--------------+-----------+
only showing top 2 rows



In [11]:
#Validate Schema matches the table established in postgres DB
reviewID_outdoorsDF.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: date (nullable = true)



Create "products" table

In [12]:
#Create df to match "products" table
products_outdoorsDF = outdoors_df3.select(["product_id","product_title"])
products_outdoorsDF.show(2, truncate=False)

+----------+---------------------------------------------------------+
|product_id|product_title                                            |
+----------+---------------------------------------------------------+
|B000NV6H94|Stearns Youth Boating Vest (50-90 lbs.)                  |
|B000IN0W3Y|Primal Wear Men's Pink Floyd Dark Side of The Moon Jersey|
+----------+---------------------------------------------------------+
only showing top 2 rows



In [13]:
#Drop duplicates to contain only unique values
products_outdoorsDF = products_outdoorsDF.dropDuplicates()
products_outdoorsDF.count()

391700

In [14]:
#Validate Schema matches the table established in postgres DB
products_outdoorsDF.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_title: string (nullable = true)



Create "customers" table

In [15]:
#Create df to match "customers" table
#First, create "customer_count" column
customers_count = outdoors_df3.groupBy("customer_id").count() #creates a df with 2 columns, "customer_id" and "count"

customers_outdoorsDF = customers_count.withColumnRenamed("count","customer_count")
customers_outdoorsDF.show(2,truncate= False)

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|43679767   |1             |
|32024654   |1             |
+-----------+--------------+
only showing top 2 rows



In [16]:
#Validate Schema matches the table established in postgres DB
customers_outdoorsDF.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_count: long (nullable = false)



In [17]:
#Format customer_count column. Change from Long to Integer.
from pyspark.sql.types import IntegerType

customers_outdoorsDF = customers_outdoorsDF.withColumn("customer_count",col("customer_count").cast(IntegerType()))

#Validate schema
customers_outdoorsDF.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_count: integer (nullable = false)



Create "vine" table

In [18]:
#Create df to match "vine" table
vine_outdoorsDF = outdoors_df3.select(["review_id","star_rating","helpful_votes","total_votes","vine"])
vine_outdoorsDF.show(5, truncate=False)

+--------------+-----------+-------------+-----------+----+
|review_id     |star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R35T75OLUGHL5C|4          |0            |0          |N   |
|R2BV735O46BN33|5          |0            |0          |N   |
|R2NBEUGPQQGXP1|4          |0            |0          |N   |
|R17LLAOJ8ITK0S|3          |1            |1          |N   |
|R39PEQBT5ISEF4|1          |0            |0          |N   |
+--------------+-----------+-------------+-----------+----+
only showing top 5 rows



In [19]:
#Validate Schema matches the table established in postgres DB
vine_outdoorsDF.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)



## LOAD

Postgres Setup

In [20]:
# Configure settings for RDS

mode = "append"
jdbc_url="jdbc:postgresql://mypostgresdb.cw6xrdxbjex8.us-east-2.rds.amazonaws.com:5432/bigdataHW_db" 

config = {"user":"root", "password":"basededatos", "driver":"org.postgresql.Driver"}

Write DataFrame to RDS

In [22]:
# Write DataFrame to review_id_table table in RDS

reviewID_outdoorsDF.write.jdbc(url=jdbc_url, table='review_id_table2', mode=mode, properties=config)

In [21]:
# Write DataFrame to products table in RDS

products_outdoorsDF.write.jdbc(url=jdbc_url, table='products2', mode=mode, properties=config)

In [23]:
# Write DataFrame to customers table in RDS

customers_outdoorsDF.write.jdbc(url=jdbc_url, table='customers2', mode=mode, properties=config)

In [24]:
# Write DataFrame to vine_table table in RDS

vine_outdoorsDF.write.jdbc(url=jdbc_url, table='vine_table2', mode=mode, properties=config)