In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.1.3'

os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()



0% [Working]            Hit:1 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
0% [Connecting to archive.ubuntu.com (91.189.91.39)] [Connecting to security.ub0% [1 InRelease gpgv 15.9 kB] [Connecting to archive.ubuntu.com (91.189.91.39)]                                                                               Hit:2 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
0% [1 InRelease gpgv 15.9 kB] [Connecting to archive.ubuntu.com (91.189.91.39)]                                                                               Get:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [1 InRelease gpgv 15.9 kB] [Connecting to archive.ubuntu.com (91.189.91.39)]0% [1 InRelease gpgv 15.9 kB] [Connecting to archive.ubuntu.com (91.189.91.39)]                                                                               Hit:4 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Hit:5 http://ppa.launchpad.net

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-09-03 02:28:27--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2022-09-03 02:28:29 (1.66 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName("kitchen_ETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [5]:
# Import Amazon Data
from pyspark import SparkFiles
url = 'https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Home_Improvement_v1_00.tsv.gz'
spark.sparkContext.addFile(url)

In [35]:
homeimp_df = spark.read.csv(SparkFiles.get('amazon_reviews_us_Home_Improvement_v1_00.tsv.gz'), sep='\t', header=True)

In [36]:
homeimp_df.show(5)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   48881148|R215C9BDXTDQOW|B00FR4YQYK|     381800308|SadoTech Model C ...|Home Improvement|          4|            0|          0|   N|                Y|          Four Stars|        good product| 2015-08-31|
|         US|   47882936|R1DTPUV1J57YHA|B00439MYYE|     921341748|iSpring T32M 3.2 ...|Home Improvement|          5|    

In [37]:
# Count the number of records (rows) in the dataset.
homeimp_df.count()

2634781

### Function to set nullable states to match schema

In [58]:
import pyspark.sql.functions as F

# https://stackoverflow.com/questions/46072411/can-i-change-the-nullability-of-a-column-in-my-spark-dataframe



def set_df_columns_nullable(spark, df, column_list, nullable=True):
    for struct_field in df.schema:
        if struct_field.name in column_list:
            struct_field.nullable = nullable
    df_mod = spark.createDataFrame(df.rdd, df.schema)
    return df_mod

In [None]:
# Transform the dataset to fit the tables in the schema file. Be sure the DataFrames match in data type and in column name.

# Schema: 
# CREATE TABLE review_id_table (
#   review_id TEXT PRIMARY KEY NOT NULL,
#   customer_id INTEGER,
#   product_id TEXT,
#   product_parent INTEGER,
#   review_date DATE -- this should be in the formate yyyy-mm-dd
# );



In [38]:
# Check datatypes
homeimp_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [49]:
# Convert columns in root dataframe to schema datatype
from pyspark.sql.types import IntegerType,DateType
homeimp_df = homeimp_df.withColumn('customer_id', homeimp_df['customer_id'].cast(IntegerType()))
homeimp_df = homeimp_df.withColumn('product_parent', homeimp_df['product_parent'].cast(IntegerType()))
homeimp_df = homeimp_df.withColumn('review_date', homeimp_df['review_date'].cast(DateType()))

# vine
homeimp_df = homeimp_df.withColumn('star_rating', homeimp_df.star_rating.cast(IntegerType()))
homeimp_df = homeimp_df.withColumn('helpful_votes', homeimp_df.helpful_votes.cast(IntegerType()))
homeimp_df = homeimp_df.withColumn('total_votes', homeimp_df.total_votes.cast(IntegerType()))

In [50]:
homeimp_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)



### review_id_table table

In [41]:
homeimp_review_id_df = homeimp_df.select(['review_id','customer_id','product_id','product_parent','review_date']).dropna()
homeimp_review_id_df.show(2)

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R215C9BDXTDQOW|   48881148|B00FR4YQYK|     381800308| 2015-08-31|
|R1DTPUV1J57YHA|   47882936|B00439MYYE|     921341748| 2015-08-31|
+--------------+-----------+----------+--------------+-----------+
only showing top 2 rows



In [59]:
# fix nullability
homeimp_review_id_df = set_df_columns_nullable(spark,homeimp_review_id_df,['review_id'], False)

homeimp_review_id_df.printSchema()

root
 |-- review_id: string (nullable = false)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: date (nullable = true)



### products table

In [45]:
homeimp_products_df = homeimp_df.select(['product_id','product_title']).dropna()
homeimp_products_df.show(2)

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00FR4YQYK|SadoTech Model C ...|
|B00439MYYE|iSpring T32M 3.2 ...|
+----------+--------------------+
only showing top 2 rows



In [61]:
# fix nullability
homeimp_products_df = set_df_columns_nullable(spark,homeimp_products_df,['product_id'], False)

homeimp_products_df.printSchema()

root
 |-- product_id: string (nullable = false)
 |-- product_title: string (nullable = true)



### customers table

In [51]:
homeimp_customers_df = homeimp_df.select(['customer_id'])

In [52]:
homeimp_customers_df = homeimp_df.groupBy('customer_id').count()
homeimp_customers_df.show(5)

+-----------+-----+
|customer_id|count|
+-----------+-----+
|   28377689|    1|
|   28258386|    2|
|    9967574|    1|
|   25153155|    1|
|   10088068|    1|
+-----------+-----+
only showing top 5 rows



In [53]:
homeimp_customers_df = homeimp_customers_df.withColumnRenamed('count','customer_count')
homeimp_customers_df.show(5)

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   28377689|             1|
|   28258386|             2|
|    9967574|             1|
|   25153155|             1|
|   10088068|             1|
+-----------+--------------+
only showing top 5 rows



In [63]:
# fix nullability
homeimp_customers_df = set_df_columns_nullable(spark,homeimp_customers_df,['customer_id'], False)
homeimp_customers_df = set_df_columns_nullable(spark,homeimp_customers_df,['customer_count'], True)

homeimp_customers_df.printSchema()

root
 |-- customer_id: integer (nullable = false)
 |-- customer_count: long (nullable = true)



### vine table

In [55]:
homeimp_vine_df = homeimp_df.select(['review_id','star_rating','helpful_votes','total_votes','vine'])
homeimp_vine_df.show(5)

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R215C9BDXTDQOW|          4|            0|          0|   N|
|R1DTPUV1J57YHA|          5|            0|          0|   N|
| RFAZK5EWKJWOU|          5|            0|          0|   N|
|R2XT8X0O0WS1AL|          5|            0|          0|   N|
|R14GRNANKO2Y2J|          5|            0|          0|   N|
+--------------+-----------+-------------+-----------+----+
only showing top 5 rows



In [64]:
# fix nullability
homeimp_vine_df = set_df_columns_nullable(spark, homeimp_vine_df, ['review_id'], False)

homeimp_vine_df.printSchema()

root
 |-- review_id: string (nullable = false)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)



In [66]:
# Load the DataFrames that correspond to tables into an RDS instance. 
# Note: This process can take up to 10 minutes for each. Be sure that everything is correct before uploading.
# from config import aws_url, aws_pw
aws_url = 'http'
aws_pw = 'pw'

mode = "append"
jdbc_url=f"jdbc:postgresql://{aws_url}:5432/postgres"
config = {"user":"postgres", 
          "password": aws_pw, 
          "driver":"org.postgresql.Driver"}

In [None]:
df_list =    [homeimp_review_id_df, homeimp_products_df, homeimp_customers_df, homeimp_vine_df]
rds_tables = ['review_id_table',    'products',           'customers',         'vine_table']

for i in range(len(df_list)):
  df_list[i].write.jdbc(url=jdbc_url, table=rds_tables[i], mode=mode, properties=config)