In [11]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Get:2 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:5 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:6 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Ign:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:8 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Ign:9 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:10 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:11 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Hit:12 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:13 https://developer.download.nvidia.com/compute/machine-le

In [12]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-03-10 19:50:39--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.1’


2022-03-10 19:50:40 (10.7 MB/s) - ‘postgresql-42.2.9.jar.1’ saved [914037/914037]



In [13]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("OutdoorsETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [14]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Lawn_and_Garden_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
aws_df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Lawn_and_Garden_v1_00.tsv.gz"), header=True, sep="\t", timestampFormat="yyyy/MM/dd HH:mm:ss")

# Show DataFrame
aws_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   32787517| RED72VWWCOS7S|B008HDQYLQ|     348668413|Garden Weasel Gar...| Lawn and Garden|          1|            2|          8|   N|                Y|            One Star|I don't hate the ...| 2015-08-31|
|         US|   16374060| RZHWQ208LTEPV|B005OBZBD6|     264704759|10 Foot Mc4 Solar...| Lawn and Garden|          5|    

In [15]:
aws_df = aws_df.dropna()

In [16]:
aws_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [17]:
#Count the number of rows in the dataset
aws_df.count()

2557005

Transform the dataset to fit in the tables in the schema file.

"customers" Table
*   customer_id
*   customer_count

"products" Table
*   product_id
*   product_title

"review_id_table"
*   review_id
*   customer_id
*   product_id
*   product_parent
*   review_date

















In [18]:
#set up Customer Table
customers_df =aws_df.select(["customer_id"])
customers_df.show(5)

+-----------+
|customer_id|
+-----------+
|   32787517|
|   16374060|
|    9984817|
|   12635190|
|   43905102|
+-----------+
only showing top 5 rows



In [19]:
customers_count_df = customers_df.groupBy("customer_id").count()
customers_count_df1 = customers_count_df.orderBy("customer_id").select(["customer_id", "count"])
customers_count_df1.show(5)

+-----------+-----+
|customer_id|count|
+-----------+-----+
|   10000009|    1|
|   10000023|    1|
|   10000079|    1|
|   10000081|    3|
|   10000083|    1|
+-----------+-----+
only showing top 5 rows



In [20]:
customers_lag = customers_count_df1.withColumnRenamed("count", "customer_count")
customers_lag_df= customers_lag.dropDuplicates()
customers_lag_df.show(5)

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   45046643|             2|
|   37757447|             3|
|   27567982|             2|
|   29135708|             3|
|   50583551|             7|
+-----------+--------------+
only showing top 5 rows



In [21]:
customers_lag_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_count: long (nullable = false)



Customer Table Schema


1.   customer_id INT PRIMARY KEY NOT NULL UNIQUE

2.   customer_count INT





In [22]:
#Import struct fields
from pyspark.sql.types import StructField, StringType, IntegerType, StructType


In [23]:
# change customer id from string, null to interger, not null
customers_final= customers_lag_df.withColumn("customer_id", customers_lag_df["customer_id"].cast(IntegerType()))



Products Table Schema (no changes)

1.   product_id TEXT PRIMARY NOT NULL UNIQUE
2.   product_title TEXT



In [28]:
# products table- product_id, product_title
products_lag =aws_df.select(["product_id", "product_title"])
products_lag_df= products_lag.dropDuplicates()
products_lag_df.show(5)

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B0081SBO4Y|Key Pair Lawn Wit...|
|B00EGFKOZ6|Swiftly Done Brig...|
|B00SOU5MYM|Apalus 4.2" LCD S...|
|B00GIDK4LS|Genuine Honda OEM...|
|B001GQTZ4E|Luster Leaf Rapic...|
+----------+--------------------+
only showing top 5 rows



Review Table Schema

1.   review_id TEXT PRIMARY KEY NOT NULL
2.   customer_id INTEGER
3.   product_id TEXT
4.   product_parent INTEGER
5.   review_date DATE yy-mm-dd








In [29]:
#review_id_table: review_id, customer_id, product_id, product_parent, review_date

reviews_start = aws_df.select(["review_id", "customer_id", "product_id", "product_parent", "review_date"])
reviews_start.show(5)

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
| RED72VWWCOS7S|   32787517|B008HDQYLQ|     348668413| 2015-08-31|
| RZHWQ208LTEPV|   16374060|B005OBZBD6|     264704759| 2015-08-31|
|R37LBC3XAVLYOO|    9984817|B00RQL8U2G|      95173602| 2015-08-31|
|R3L7XJMA0MVJWC|   12635190|B0081SBO4Y|     835659279| 2015-08-31|
|R2I2GHSI7T1UBN|   43905102|B008E6OK3U|     539243347| 2015-08-31|
+--------------+-----------+----------+--------------+-----------+
only showing top 5 rows



In [30]:
#Change from string to integer- customer_id to integer/product_parent to integer
reviews_1= reviews_start.withColumn("customer_id", reviews_start["customer_id"].cast(IntegerType()))
reviews_lag_df= reviews_1.withColumn("product_parent", reviews_1["product_parent"].cast(IntegerType()))



In [31]:
#Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://upennhw.ciknwdomcy0r.us-east-1.rds.amazonaws.com:5432/big_data_challenge"
config = {"user": "root", 
          "password": "", 
          "driver": "org.postgresql.Driver"}

In [32]:
customers_final.write.jdbc(url=jdbc_url, table = "customers", mode=mode, properties=config)

In [34]:
products_lag_df.write.jdbc(url=jdbc_url, table = "products", mode=mode, properties=config)

In [35]:
reviews_lag_df.write.jdbc(url=jdbc_url, table = "review_id", mode=mode, properties=config)