In [1]:
import os
# The latest version of spark 3.2  from http://www.apache.org/dist/spark/ 
spark_version = 'spark-3.2.3'
# spark_version = 'spark-3.'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()


!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Connecting to archive.ubuntu.com] [1 InRelease 0 B/88.7 kB 0%] [Connected t                                                                               Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Connecting to archive.ubuntu.com] [1 InRelease 43.1 kB/88.7 kB 49%] [Connec0% [2 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com] [1 InRelease 40% [2 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com] [Waiting for h                                                                               Hit:3 http://archive.ubuntu.com/ubuntu bionic InRelease
                                                                               0% [2 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers]                                                                         Get:4 http://archive.ubuntu.com/ubuntu bionic-updates 

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("bigDataETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [3]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Video_Games_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
vg_df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Video_Games_v1_00.tsv.gz"), sep="\t", header=True)
vg_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   12039526| RTIS3L2M1F5SM|B001CXYMFS|     737716809|Thrustmaster T-Fl...|     Video Games|          5|            0|          0|   N|                Y|an amazing joysti...|Used this for Eli...| 2015-08-31|
|         US|    9636577| R1ZV7R40OLHKD|B00M920ND6|     569686175|Tonsee 6 buttons ...|     Video Games|          5|    

In [4]:
# Drop null values, duplicates, and count number of rows.
vg_df = vg_df.dropna()
vg_df = vg_df.dropDuplicates()
vg_df.count()

1785886

In [5]:
# Show schema to see if it matches the sql schema tables.
vg_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [6]:
# Create dataframe for review_id_table schema
review_id = vg_df.select(["review_id", 
                          "customer_id", 
                          "product_id", 
                          "product_parent", 
                          "review_date"])
review_id.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R1004PYTPK6ELD|   38281029|B00004WHWF|      89143877| 2002-01-23|
|R100EZDMO39LBZ|    1386752|B00OZBFUBY|     872540442| 2015-06-10|
|R1011I65X7RSKT|   41907229|B00006ISBU|     654790631| 2003-12-23|
|R101V84BKDOR1I|   12034223|B001KX5042|     279727821| 2014-08-08|
|R101VJUP2TFB3Y|   31532612|B009DL2TBA|     586138868| 2015-01-05|
|R1026A0A5F4D42|   30645349|B00FEKQZE6|     681125586| 2014-04-21|
|R102X78AGOZJY3|   20191662|B000ZK9QCS|     699333646| 2010-03-26|
|R104THW8CUUZXC|   20910843|B00DHF39EO|     100570084| 2015-06-15|
|R105YSA8EGSJ8D|    2435449|B0053B5RGI|     413365293| 2015-02-26|
|R106QTPT8YYZ1P|    4159081|B000TGB4UA|      17513844| 2015-04-22|
|R106UDOJ2QSWOR|   11343413|B00KVP78FE|     177244653| 2014-11-14|
|R109G94NZM2I7R|   22043357|B004XACA60|     432872195| 2012-03

In [22]:
# Check datatypes for all columns and ensure they are in the proper format from SQL
from pyspark.sql.types import StructField, StringType, IntegerType, StructType, DateType
review_id = review_id.withColumn("review_date", review_id["review_date"].cast(DateType()))
review_id = review_id.withColumn("customer_id", review_id["customer_id"].cast(IntegerType()))
review_id = review_id.withColumn("product_parent", review_id["product_parent"].cast(IntegerType()))

review_id.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: date (nullable = true)



In [25]:
# Create products_df

products_df = vg_df.select(["product_id",
                            "product_title"])
products_df = products_df.dropDuplicates()
products_df.show(5)

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00CJ7IUI6|The Elder Scrolls...|
|B00DHF39KS|Wolfenstein: The ...|
|B00MUTAVH6|Under Night In-Bi...|
|B001AZSEUW|              Peggle|
|B00KVOVBGM|PlayStation 4 Con...|
+----------+--------------------+
only showing top 5 rows



In [29]:
# Check datatypes
products_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_title: string (nullable = true)



In [9]:
# Create customers_df, need to create customers_count column
customers_df = vg_df.select(["customer_id"])
customers_df = customers_df.groupBy("customer_id").count()
customers_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- count: long (nullable = false)



In [27]:
# Rename count column to customer_count and change column datatypes
customers_df = customers_df.withColumnRenamed("count", "customer_count")
customers_df = customers_df.withColumn("customer_id", customers_df["customer_id"].cast(IntegerType()))
customers_df = customers_df.withColumn("customer_count", customers_df["customer_count"].cast(IntegerType()))
customers_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_count: integer (nullable = false)



In [11]:
# Check dataframe
customers_df.show(5)

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|    5401088|             5|
|   37964777|            16|
|   20872523|             1|
|   44777937|             4|
|    2384511|             1|
+-----------+--------------+
only showing top 5 rows



In [12]:
# Create vine_df
vine_df = vg_df.select(["review_id",
                        "star_rating",
                        "helpful_votes",
                        "total_votes",
                        "vine"])
vine_df = vine_df.dropDuplicates()
vine_df.show(5)

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R2MTHHQM6RSDQK|          4|            0|          1|   N|
|R26I4OP3M9BC2K|          5|            0|          0|   N|
|R2P5HQ648QPWVG|          5|            0|          0|   N|
|R293SZSOOV2DMC|          5|            0|          0|   N|
|R3793DQNKX2INH|          5|            0|          0|   N|
+--------------+-----------+-------------+-----------+----+
only showing top 5 rows



In [13]:
# Check schema
vine_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- vine: string (nullable = true)



In [30]:
# Change datatypes

vine_df = vine_df.withColumn("star_rating", vine_df["star_rating"].cast(IntegerType()))
vine_df = vine_df.withColumn("helpful_votes", vine_df["helpful_votes"].cast(IntegerType()))
vine_df = vine_df.withColumn("total_votes", vine_df["total_votes"].cast(IntegerType()))
vine_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)



In [18]:
# Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://maelsse-db.ctgl6yfw7uje.us-east-2.rds.amazonaws.com:5432/bigdata"
config = {"user":"", 
          "password": "", 
          "driver":"org.postgresql.Driver"}

In [26]:
# Write DataFrame to review_id table in RDS
review_id.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

In [31]:
# Write DataFrame to customers table in RDS
customers_df.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config)

In [32]:
# Write DataFrame to vine_table table in RDS
vine_df.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)

In [33]:
# Write DataFrame to products table in RDS
products_df.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)