In [39]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java

!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [40]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2023-02-02 18:59:50--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.2’


2023-02-02 18:59:50 (6.35 MB/s) - ‘postgresql-42.2.16.jar.2’ saved [1002883/1002883]



In [41]:
import os
from pyspark.sql import SparkSession

# The previous cell downloads the Postgres JDBC jar into the current working
# directory; build its path from there instead of hardcoding a Colab path.
POSTGRES_JAR = os.path.join(os.getcwd(), "postgresql-42.2.16.jar")

# Put the driver on the Spark driver's classpath so DataFrame.write.jdbc can
# load org.postgresql.Driver.
spark = (
    SparkSession.builder
    .appName("M17-Amazon-Challenge")
    .config("spark.driver.extraClassPath", POSTGRES_JAR)
    .getOrCreate()
)

In [42]:
from pyspark.sql.functions import sum,avg,max,count,to_date

### Load Amazon Data into Spark DataFrame

In [43]:
import os
from pyspark import SparkFiles

# Register the gzipped TSV with Spark so it is fetched into the SparkFiles dir.
# NOTE(review): confirm this public S3 bucket is still reachable before re-running.
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Personal_Care_Appliances_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# Read only the file just added. SparkFiles.get("") is the SparkFiles root
# directory, which would also include any other file added to this session.
local_path = SparkFiles.get(os.path.basename(url))
df = spark.read.option("encoding", "UTF-8").csv(local_path, sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|    product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   32114233|R1QX6706ZWJ1P5|B00OYRW4UE|     223980852|Elite Sportz Exer...|Personal_Care_App...|          5|            0|          0|   N|                Y|Good quality. Shi...|Exactly as descri...|2015-08-31 00:00:00|
|         US|   18125776|R3QWMLJHIW6P37|B0000537JQ|     819771537|     E

### Create DataFrames that match the RDS table schemas

In [44]:
# Inspect the (column name, Spark SQL type) pairs that inferSchema produced.
column_types = df.dtypes
column_types

[('marketplace', 'string'),
 ('customer_id', 'int'),
 ('review_id', 'string'),
 ('product_id', 'string'),
 ('product_parent', 'int'),
 ('product_title', 'string'),
 ('product_category', 'string'),
 ('star_rating', 'int'),
 ('helpful_votes', 'int'),
 ('total_votes', 'int'),
 ('vine', 'string'),
 ('verified_purchase', 'string'),
 ('review_headline', 'string'),
 ('review_body', 'string'),
 ('review_date', 'timestamp')]

In [45]:
from pyspark.sql.functions import to_date
# Review DataFrame: one row per review with its customer, product and the
# date portion of review_date, selected from the loaded reviews DataFrame.
# review_date was inferred as a timestamp, so to_date() simply truncates it to
# a date. A 'yyyy-MM-dd' format is ignored for timestamp input, and it would
# not match "yyyy-MM-dd HH:mm:ss" text if the column were ever read as a string.
review_id_df = df.select(
    "review_id",
    "customer_id",
    "product_id",
    "product_parent",
    to_date("review_date").alias("review_date"),
)
review_id_df.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R1QX6706ZWJ1P5|   32114233|B00OYRW4UE|     223980852| 2015-08-31|
|R3QWMLJHIW6P37|   18125776|B0000537JQ|     819771537| 2015-08-31|
|R14Z1VR1N0Z9G6|   19917519|B00HXXO332|     849307176| 2015-08-31|
| R25ZRJL0GH0U0|   18277171|B00EOB0JA2|     700864740| 2015-08-31|
|R3837KYH7AZNIY|    2593270|B00OC2O1UC|     794298839| 2015-08-31|
|R2MN0QYCY6EVIV|    2592955|B00HES9CMS|     318730927| 2015-08-31|
|R3AN2UJ1D42ZS0|   15168265|B0016BFR4G|     887476137| 2015-08-31|
|R3U29ZLUWEEK4M|   13761624|B00K504UUG|     458706868| 2015-08-31|
|R16ZDMJJHK796C|   37070734|B00HES9CMS|     318730927| 2015-08-31|
| RRRDOEJZD1Y22|   29615023|B00P6TUO5G|     170248843| 2015-08-31|
|R2KR5ZEAS859DK|   47893062|B0006VJ6TO|     412568457| 2015-08-31|
| RR7PGQY763IHF|    2582596|B00H9L7VIW|     851045898| 2015-08

In [46]:
from pyspark.sql.functions import count
# Create the customers_table DataFrame: one row per customer_id with the
# number of reviews that customer has in this dataset.
# Naming the aggregate with alias() avoids depending on Spark's auto-generated
# "count(customer_id)" name, which withColumnRenamed would silently ignore if
# it ever differed.
customers_df = (
    df.groupBy("customer_id")
      .agg(count("customer_id").alias("customer_count"))
)
customers_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   11160821|             1|
|    1099354|             1|
|   37200466|             1|
|   31581262|             1|
|   27596904|             1|
|    2405598|             1|
|   11229098|             1|
|   17237226|             1|
|   46000147|             1|
|   15234640|             1|
|   28937218|             1|
|   43920023|             1|
|   27954895|             1|
|    8269764|             1|
|   31689770|             1|
|   30117841|             1|
|   38591899|             1|
|   24557051|             1|
|    2780908|             1|
|   32466409|             1|
+-----------+--------------+
only showing top 20 rows



In [47]:
# Create the products_table DataFrame: one row per product_id with its title.
# Deduplication is on product_id only; if a product_id appears with several
# titles, which title survives is not controlled here.
# NOTE(review): confirm titles are consistent per product_id.
products_df = (
    df.select("product_id", "product_title")
      .dropDuplicates(["product_id"])
)
products_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|097459363X|FitBALL All Round...|
|1574998005|Nannini SOS Readi...|
|1574998021|Nannini SOS Readi...|
|1933622199|Mighty Bright LED...|
|3979000532|Pewter Snake Chai...|
|3979002411|Gold Hamsa Hand K...|
|3979002632|CRYSTAL CLEAR GRE...|
|3979002829|Real Olive Wood B...|
|3979004813|Kabbalah Glass Bl...|
|7391000442|Holy Land Jerusal...|
|7391001015|Holy water 3 in1 ...|
|7391001066|Holy water 4 in1 ...|
|7391001244|Dark Copper Mezuz...|
|7391001252|Dark Copper Mezuz...|
|7750002634|Spider Man Origin...|
|8742248299|Blessed By Pope B...|
|9502738756|Blessed By Pope B...|
|9502738764|Blessed By Pope B...|
|9895502079|GOLDFILLED NECKLA...|
|9895502664|BEIGE CASUAL HAT ...|
+----------+--------------------+
only showing top 20 rows



In [48]:
# Create the vine_table DataFrame: per-review rating and vote counts plus the
# Vine and verified-purchase flags.
vine_columns = [
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
    "verified_purchase",
]
vine_df = df.select(*vine_columns)
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R1QX6706ZWJ1P5|          5|            0|          0|   N|                Y|
|R3QWMLJHIW6P37|          5|            0|          0|   N|                Y|
|R14Z1VR1N0Z9G6|          5|            1|          1|   N|                Y|
| R25ZRJL0GH0U0|          2|            0|          0|   N|                Y|
|R3837KYH7AZNIY|          4|            0|          1|   N|                Y|
|R2MN0QYCY6EVIV|          5|            0|          0|   N|                Y|
|R3AN2UJ1D42ZS0|          5|            0|          0|   N|                Y|
|R3U29ZLUWEEK4M|          5|            0|          0|   N|                Y|
|R16ZDMJJHK796C|          5|            0|          0|   N|                N|
| RRRDOEJZD1Y22|          4|            0|          0|   N|     

### Connect to the AWS RDS instance and write each DataFrame to its table

In [49]:
# Configure settings for RDS.
# The endpoint, database name and password come from environment variables
# (or are prompted for), so no connection secrets are stored in the notebook.
import os
from getpass import getpass

mode = "append"  # Spark save mode used by every write below
rds_host = os.environ.get("RDS_HOST") or input("RDS endpoint: ")
rds_port = os.environ.get("RDS_PORT", "5432")
rds_db = os.environ.get("RDS_DATABASE") or input("RDS database name: ")
jdbc_url = f"jdbc:postgresql://{rds_host}:{rds_port}/{rds_db}"
config = {
    "user": os.environ.get("RDS_USER", "postgres"),
    "password": os.environ.get("RDS_PASSWORD") or getpass("RDS password: "),
    "driver": "org.postgresql.Driver",
}

In [51]:
# Push review_id_df to the review_id_table table in RDS using the configured save mode.
review_id_df.write.mode(mode).jdbc(jdbc_url, "review_id_table", properties=config)

In [52]:
# Push products_df to the products_table table in RDS using the configured save mode.
# Observed runtime: about 3 min.
products_df.write.mode(mode).jdbc(jdbc_url, "products_table", properties=config)

In [53]:
# Push customers_df to the customers_table table in RDS using the configured save mode.
# Observed runtime: about 5 min 14 s.
customers_df.write.mode(mode).jdbc(jdbc_url, "customers_table", properties=config)

In [54]:
# Push vine_df to the vine_table table in RDS using the configured save mode.
# Observed runtime: about 11 min.
vine_df.write.mode(mode).jdbc(jdbc_url, "vine_table", properties=config)