In [76]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java.
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession.
import findspark
findspark.init()


0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Get:2 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:12 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:13 htt

In [77]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-03-01 08:43:51--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.1’


2022-03-01 08:43:51 (10.4 MB/s) - ‘postgresql-42.2.9.jar.1’ saved [914037/914037]



In [78]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [80]:
from pyspark import SparkFiles
# Load in user_data.csv from S3 into a DataFrame.
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Automotive_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

auto_df = spark.read.option("header", "true").option("sep", "\t").csv("file://" + SparkFiles.get("amazon_reviews_us_Automotive_v1_00.tsv.gz"))
auto_df.show(10)



+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   36075342| RAB23OVFNCXZQ|B00LPRXQ4Y|     339193102|17" 2003-2006 For...|      Automotive|          1|            0|          0|   N|                Y|     As it was used,|As it was used, t...| 2015-08-31|
|         US|   42462164|R3NORADVJO6IE6|B000C7S0TO|     907684644|Spectra Premium C...|      Automotive|          5|    

In [81]:
# Count the number of records (rows) in the dataset.
auto_df.count()

3514942

In [82]:
auto_df.dropna()

DataFrame[marketplace: string, customer_id: string, review_id: string, product_id: string, product_parent: string, product_title: string, product_category: string, star_rating: string, helpful_votes: string, total_votes: string, vine: string, verified_purchase: string, review_headline: string, review_body: string, review_date: string]

In [83]:
# Find the total amount of customers who reviewed.

cust_groupby = auto_df.groupBy("customer_id").count()
cust_groupby.orderBy("customer_id").select("customer_id", "count").show()



+-----------+-----+
|customer_id|count|
+-----------+-----+
|   10000029|    1|
|    1000004|    2|
|   10000051|    1|
|   10000062|    1|
|   10000063|    1|
|   10000071|    2|
|   10000088|    1|
|   10000094|    1|
|   10000153|    2|
|   10000200|    1|
|   10000206|    2|
|   10000226|    1|
|   10000251|    1|
|   10000264|    1|
|   10000277|    1|
|   10000284|    1|
|    1000029|    3|
|   10000294|    9|
|   10000297|    1|
|   10000313|    1|
+-----------+-----+
only showing top 20 rows



In [84]:
cust_groupby.select(["customer_id"]).describe().show()

+-------+--------------------+
|summary|         customer_id|
+-------+--------------------+
|  count|             1907652|
|   mean|2.5359115781779382E7|
| stddev|  1.57403020913828E7|
|    min|            10000029|
|    max|             9999955|
+-------+--------------------+



In [126]:
#Add the column customer_count to auto_df.

from pyspark.sql.functions import lit

auto_df = auto_df.withColumn('customer_count', lit(1907652))

auto_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+--------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|customer_count|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+--------------+
|         US|   36075342| RAB23OVFNCXZQ|B00LPRXQ4Y|     339193102|17" 2003-2006 For...|      Automotive|          1|            0|          0|   N|                Y|     As it was used,|As it was used, t...| 2015-08-31|       1907652|
|         US|   42462164|R3NORADVJO6IE6|B000C7S0TO|     9076

In [108]:
# Create user dataframe to match active_user table.
review_id_table = auto_df.select(["review_id", "customer_id", "product_id", "product_parent", "review_date"]).distinct()
review_id_table.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R100B45NW5YKEL|   49625310|B000IDUW5C|     747098893| 2014-08-18|
|R100GZ7XFMB8JS|   16786128|B001KS0AE2|     270455602| 2013-12-18|
|R100JU2GD1150U|    4564612|B00LEW1L6A|     944030912| 2015-04-24|
|R100ODVM0XYA54|   42745673|B0055DR29Y|     275069282| 2011-12-23|
|R100PV3JMQ7I2Y|   50802030|B00BQX2BSO|     889038997| 2014-01-24|
|R100R71GC7A0QB|   24877742|B000HZA24W|     939878705| 2013-09-29|
|R100R8MNKIBHHS|   30066626|B001IWL1O8|     729582909| 2014-12-06|
|R1012OVOGAA8E4|   52731640|B0015PNU7Y|     418540616| 2013-03-04|
|R101735QBQB4MU|   16005812|B009F10S3Y|     766426044| 2015-05-06|
|R1018JXA3RLU3J|   43524405|B00BR41J0I|     606030096| 2015-06-20|
|R101CWXK8D51BJ|   26161350|B0002MAYFM|     161468666| 2014-07-20|
|R101TP78NS0I8Y|   49641399|B001Q8HNX2|     689457744| 2011-10

In [124]:
# Create user dataframe to match products table.

products = auto_df.select(["product_id", "product_title"]).distinct()
products = products.dropDuplicates(["product_id"]) 
products.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|0715000322|Can-Am 715000322 ...|
|0984527281|Word Teasers Lear...|
|0991557557|PicoPen - Stainle...|
|1844257517|Yamaha FZ6 Fazer ...|
|1888000104|JLC Clarity Style...|
|1940825113|War Gaming - Stee...|
|3228963691|Zzzz... Sleeping ...|
|3456495633|Motorcycle First ...|
|3696387606|JLC 9005 CREE LED...|
|3696387622|JLC CREE 5W High ...|
|4678615123|SEM Paints (SEM15...|
|4678615173|Color Coat Camel ...|
|477961421X|Subaru Impreza No...|
|4891071478|Subaru Impreza GC...|
|4891075627|Toyota Mark II/Ch...|
|5888000272|Elite Mailers Nis...|
|5917025029|Nelson Rigg Weath...|
|5926025419|Joe Rocket RS-2 M...|
|5927026230|Nelson Rigg Unise...|
|592702632X|Nelson Rigg Unise...|
+----------+--------------------+
only showing top 20 rows



In [88]:
# Create user dataframe to match customers table.
customers = auto_df.select(["customer_id", "customer_count"]).distinct()
customers.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|    2298418|       1907652|
|   12925594|       1907652|
|   27181369|       1907652|
|   21819852|       1907652|
|   19824718|       1907652|
|    4418444|       1907652|
|    2669578|       1907652|
|   10307368|       1907652|
|   36879864|       1907652|
|   11504547|       1907652|
|   20158231|       1907652|
|   18236949|       1907652|
|     127206|       1907652|
|   43291518|       1907652|
|   14528920|       1907652|
|     539041|       1907652|
|    4203693|       1907652|
|   46146610|       1907652|
|   38631746|       1907652|
|     129028|       1907652|
+-----------+--------------+
only showing top 20 rows



In [125]:
# Create user dataframe to match vine table.

vine_table = auto_df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine"]).distinct()
vine_table.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R100C1AC7LQ9Z9|          5|            0|          0|   N|
|R1012Y1MKSP46H|          5|            2|          2|   N|
|R1019C0JFXOL7Z|          5|            1|          1|   N|
|R1029J8LJB2VEW|          4|            0|          0|   N|
|R102DT9MOSSGTF|          3|            1|          1|   N|
|R102EE329XUTVL|          2|            5|          9|   N|
|R102F40ZDEVX2N|          5|            0|          0|   N|
|R102OVGR72G28Y|          4|            2|          2|   N|
|R102TRFFSXN366|          4|            0|          0|   N|
|R102WWKPBL2Z5B|          2|            0|          0|   N|
|R102ZJTE75EBOE|          5|            1|          1|   N|
|R102ZLB5JUNJLQ|          2|            0|          1|   N|
|R1031PFMOAUAGC|          5|            1|          4|   N|
|R103GX0L749RT1|          3|            

In [122]:
# Configuration for RDS instance.
mode="append"
jdbc_url = "jdbc:postgresql://XXXXXXXX:5432/big_data_challenge_RDS_1"
config = {"user":"XXXXXXXX",
          "password": "XXXXXXXX",
          "driver":"org.postgresql.Driver"}


In [110]:
# Write review_id_table DataFrame to table.

review_id_table.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

In [107]:
# Write products DataFrame to table.

products.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)

In [92]:
# Write products DataFrame to table.

customers.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config)

In [None]:
# Write vine DataFrame to table.

vine.write.jdbc(url=jdbc_url, table='vine', mode=mode, properties=config)

In [123]:
# Write vine DataFrame to table.

vine_table.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)