In [1]:
import os
# Find the latest version of spark 3.2 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.3'
spark_version = 'spark-3.2.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease
0% [Connecting to archive.ubuntu.com (91.189.91.39)] [Connecting to security.ub                                                                               Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  InRelease
Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  Release
Hit:5 http://archive.ubuntu.com/ubuntu focal InRelease
Get:6 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
Get:7 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
Hit:8 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease
Get:10 http://archive.ubuntu.com/ubuntu focal-backports InRelease [108 kB]
Hit:11 http://ppa.launchpad.net/cran/libgit2/ubuntu focal InRelease
Hit:12 http://ppa.la

In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2023-02-08 21:19:04--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.3’


2023-02-08 21:19:04 (4.77 MB/s) - ‘postgresql-42.2.16.jar.3’ saved [1002883/1002883]



In [3]:
from pyspark.sql import SparkSession

# Build (or reuse) the session with the Postgres JDBC jar on the driver classpath.
spark = (
    SparkSession.builder
    .appName("M17-Amazon-Challenge")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

### Load Amazon Data into Spark DataFrame

In [4]:
from pyspark import SparkFiles

# Register the gzipped TSV with Spark, then read it from the SparkFiles copy.
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Health_Personal_Care_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

reviews_path = SparkFiles.get("amazon_reviews_us_Health_Personal_Care_v1_00.tsv.gz")
tsv_reader = spark.read.option("encoding", "UTF-8")
# Tab-separated with a header row; column types are inferred from the data.
df = tsv_reader.csv(reviews_path, sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|    product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|     650634| R3EQSTM9PWRAL|B0091LBZSU|     578484426|Demograss Capsule...|Health & Personal...|          3|            0|          0|   N|                Y|         Three Stars|Only came with 30...| 2015-08-31|
|         US|   19827510| RBWPRK17XKIXD|B00PWW3LQ6|     456433146|Viva Labs #1 Prem...|Health & Personal

In [5]:
# Check the (column name, data type) pairs of df
df.dtypes

[('marketplace', 'string'),
 ('customer_id', 'int'),
 ('review_id', 'string'),
 ('product_id', 'string'),
 ('product_parent', 'int'),
 ('product_title', 'string'),
 ('product_category', 'string'),
 ('star_rating', 'int'),
 ('helpful_votes', 'int'),
 ('total_votes', 'int'),
 ('vine', 'string'),
 ('verified_purchase', 'string'),
 ('review_headline', 'string'),
 ('review_body', 'string'),
 ('review_date', 'string')]

In [6]:
# Drop rows that contain null values and preview the result.
# NOTE(review): the later cells visible in this notebook select from `df`,
# not `dropna_df` — confirm whether the tables should be built from this frame.
dropna_df = df.na.drop()
dropna_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|    product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|     650634| R3EQSTM9PWRAL|B0091LBZSU|     578484426|Demograss Capsule...|Health & Personal...|          3|            0|          0|   N|                Y|         Three Stars|Only came with 30...| 2015-08-31|
|         US|   19827510| RBWPRK17XKIXD|B00PWW3LQ6|     456433146|Viva Labs #1 Prem...|Health & Personal

### Create DataFrames to match tables

In [7]:
from pyspark.sql.functions import to_date

# Preview the review columns exactly as loaded (no type conversion applied here).
review_columns = ["review_id", "customer_id", "product_id", "product_parent", "review_date"]
review_id_df = df.select(review_columns)
review_id_df.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
| R3EQSTM9PWRAL|     650634|B0091LBZSU|     578484426| 2015-08-31|
| RBWPRK17XKIXD|   19827510|B00PWW3LQ6|     456433146| 2015-08-31|
| RRSLOAF273XFC|    1520474|B00DKEWA92|     460764511| 2015-08-31|
|R3S8W9Q6SWIT8O|   23905905|B0015R3A7M|     135102038| 2015-08-31|
|R3QQ6NSLRVBFJC|   28215779|B006B8U8BG|     200666829| 2015-08-31|
|R2XYDBMHUVJCSX|   23260912|B00PFZFD8Y|     344168617| 2015-08-31|
|R1L94ESVC657A9|    7965616|B0034792JG|     291831570| 2015-08-31|
|R3DI4B8LDWFQ3K|    3443178|B000052XIA|     493678550| 2015-08-31|
|R1G7VV4HCXUQQL|   20838781|B004GCUK8A|       3902598| 2015-08-31|
|R3TTNYN1TAMUSU|   15267517|B00K7W1QAO|     633971374| 2015-08-31|
|R3AMX2D9HMWV6Z|   30105089|B0034KYA36|     854858899| 2015-08-31|
|R3PO3K321GY8HI|    1622207|B00C7UZTT8|     264608272| 2015-08

In [8]:
# Build the review_id_table DataFrame.
# 'review_date' is parsed with the 'yyyy-MM-dd' pattern and keeps its column name.
review_date_col = to_date("review_date", 'yyyy-MM-dd').alias("review_date")
review_id_df = df.select(
    "review_id",
    "customer_id",
    "product_id",
    "product_parent",
    review_date_col,
)
review_id_df.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
| R3EQSTM9PWRAL|     650634|B0091LBZSU|     578484426| 2015-08-31|
| RBWPRK17XKIXD|   19827510|B00PWW3LQ6|     456433146| 2015-08-31|
| RRSLOAF273XFC|    1520474|B00DKEWA92|     460764511| 2015-08-31|
|R3S8W9Q6SWIT8O|   23905905|B0015R3A7M|     135102038| 2015-08-31|
|R3QQ6NSLRVBFJC|   28215779|B006B8U8BG|     200666829| 2015-08-31|
|R2XYDBMHUVJCSX|   23260912|B00PFZFD8Y|     344168617| 2015-08-31|
|R1L94ESVC657A9|    7965616|B0034792JG|     291831570| 2015-08-31|
|R3DI4B8LDWFQ3K|    3443178|B000052XIA|     493678550| 2015-08-31|
|R1G7VV4HCXUQQL|   20838781|B004GCUK8A|       3902598| 2015-08-31|
|R3TTNYN1TAMUSU|   15267517|B00K7W1QAO|     633971374| 2015-08-31|
|R3AMX2D9HMWV6Z|   30105089|B0034KYA36|     854858899| 2015-08-31|
|R3PO3K321GY8HI|    1622207|B00C7UZTT8|     264608272| 2015-08

In [9]:
# Inspect the column types of review_id_df (review_date should be a date)
review_id_df.dtypes

[('review_id', 'string'),
 ('customer_id', 'int'),
 ('product_id', 'string'),
 ('product_parent', 'int'),
 ('review_date', 'date')]

In [10]:
# Remove duplicate rows, then preview the first 10 rows
review_id_df = review_id_df.dropDuplicates()
review_id_df.show(10)

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R1008WF1AIUWKS|   12101563|B0002UTOOQ|     709959934| 2005-10-10|
|R100DYAE04E155|   14211820|B004INDWT6|     336701839| 2015-08-05|
|R100J58NKW9LX8|   13050915|B004DGH596|     152437605| 2015-06-23|
|R100UMGF3BI96D|   50271303|B009QTDYYA|     169730663| 2014-11-09|
|R101CO02I540IY|   38242118|B005EMUWW0|      22936885| 2015-04-06|
|R101HLDXJMOM4Q|   43950448|B000HEECYY|     343107068| 2011-10-20|
|R101N6UHAOF164|   43510369|B006F2OS3C|      81289428| 2013-05-21|
|R101O5XU6RH3JI|   31233737|B003OIXSCW|     459502289| 2015-07-18|
|R101RYUUUZLMJ4|   20948046|B00892JAAW|     954450745| 2013-08-14|
|R1025CCZ2BJMUB|   13621841|B001KXZ808|     949699275| 2011-12-27|
+--------------+-----------+----------+--------------+-----------+
only showing top 10 rows



In [11]:
# Create the customers_table DataFrame: one row per customer_id with the
# number of rows in df for that customer, stored as customer_count.
from pyspark.sql.functions import count

# Alias the aggregate directly instead of renaming Spark's auto-generated
# "count(customer_id)" column afterwards.
customers_df = df.groupby("customer_id").agg(
    count("customer_id").alias("customer_count")
)
customers_df.show(10)

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|    4919528|             5|
|   51451778|             3|
|   12713799|             1|
|    8673341|             1|
|   42146698|             2|
|    1117644|             2|
|   28058398|             1|
|   14375645|             1|
|   24540309|             1|
|   39715602|             1|
+-----------+--------------+
only showing top 10 rows



In [12]:
# Create the products_table DataFrame: distinct (product_id, product_title) pairs.
# NOTE(review): de-duplication is on the pair, so a product_id listed under more
# than one title still yields several rows — confirm that suits the target table.
product_columns = ["product_id", "product_title"]
products_df = df.select(product_columns).dropDuplicates()
products_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00N2BW2PK|Fitbit Charge HR ...|
|B001D8ZGAM|Aquasentials Easy...|
|B002NVSCQI|Beans72 Organic B...|
|B001O0BL9E|Manuka Health - M...|
|B00AYIM9Y8|BulkSupplements P...|
|B00EXPSWAI|HSI PROFESSIONAL ...|
|B00CMK8M1A|Scott Extra Soft ...|
|B00P80AV74|Monster Energy Dr...|
|B00PIOZVMA|Johnson & Johnson...|
|B00787ZQFW|MaritzMayer Raspb...|
|B0071O8B3G|Lipozene Diet Pil...|
|B005H441UE|Emphaplex 90 Caps...|
|B004WPBYUY|Vitanica Yeast Ar...|
|B00SQN2CXQ|Cloruro de Magnes...|
|B00O9NIRUY|Jimmy Orange Bran...|
|B00Y34E9ZQ|Slight Touch Cont...|
|B00ISDMQ8U|Puracy Natural La...|
|B002VWJYL6|Drive Medical Win...|
|B00NWYMDJM|Premium Prenatal ...|
|B000BABW5Q|Snorepin™ – Anti ...|
+----------+--------------------+
only showing top 20 rows



In [13]:
# Create the vine_table DataFrame: the rating and vote columns per review, de-duplicated.
vine_columns = [
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
    "verified_purchase",
]
vine_df = df.select(vine_columns).dropDuplicates()
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R100E1FM8XA4JG|          5|            0|          1|   N|                Y|
|R100WFNO3G9NH2|          4|            1|          1|   N|                Y|
|R1016UG1SG1QGD|          4|            2|          2|   N|                Y|
|R101C72ARCDPR8|          5|            1|          1|   N|                Y|
|R101DRRLSRJXXA|          1|            0|          0|   N|                Y|
|R101J8QS1B95A5|          5|            1|          1|   N|                N|
|R101L35IDH8EDC|          2|            7|          9|   Y|                N|
|R101NY8ALM26ER|          4|            0|          0|   N|                Y|
|R101XJZJAQ1UZ4|          5|            0|          0|   N|                Y|
|R102HYH7ZI9I9L|          5|            0|          0|   N|     

### Connect to the AWS RDS instance and write each DataFrame to its table. 

In [14]:
# Configure settings for RDS
from getpass import getpass

# "append" adds rows to an existing table rather than replacing it, so
# re-running a write cell inserts the same rows again.
mode = "append"
jdbc_url="jdbc:postgresql://challenge-dataviz.ca1ivnhrh4vu.us-east-1.rds.amazonaws.com:5432/postgres"
# Prompt for the database password at run time so it is never stored in the notebook.
config = {"user":"postgres",
          "password": getpass("RDS password: "),
          "driver":"org.postgresql.Driver"}

In [15]:
# Write review_id_df to the 'review_id_table' table in RDS over JDBC,
# then echo the DataFrame so its schema is shown as the cell output.
review_id_df.write.jdbc(
    url=jdbc_url,
    table='review_id_table',
    mode=mode,
    properties=config,
)
review_id_df

DataFrame[review_id: string, customer_id: int, product_id: string, product_parent: int, review_date: date]

In [17]:
# Fetch the first 10 rows of review_id_df as a list of Row objects
review_id_df.head(10)

[Row(review_id='R1008WF1AIUWKS', customer_id=12101563, product_id='B0002UTOOQ', product_parent=709959934, review_date=datetime.date(2005, 10, 10)),
 Row(review_id='R100DYAE04E155', customer_id=14211820, product_id='B004INDWT6', product_parent=336701839, review_date=datetime.date(2015, 8, 5)),
 Row(review_id='R100J58NKW9LX8', customer_id=13050915, product_id='B004DGH596', product_parent=152437605, review_date=datetime.date(2015, 6, 23)),
 Row(review_id='R100UMGF3BI96D', customer_id=50271303, product_id='B009QTDYYA', product_parent=169730663, review_date=datetime.date(2014, 11, 9)),
 Row(review_id='R101CO02I540IY', customer_id=38242118, product_id='B005EMUWW0', product_parent=22936885, review_date=datetime.date(2015, 4, 6)),
 Row(review_id='R101HLDXJMOM4Q', customer_id=43950448, product_id='B000HEECYY', product_parent=343107068, review_date=datetime.date(2011, 10, 20)),
 Row(review_id='R101N6UHAOF164', customer_id=43510369, product_id='B006F2OS3C', product_parent=81289428, review_date=da

In [20]:
# Write products_df to the 'products_table' table in RDS over JDBC.
# Author's timing note: about 3 min
products_df.write.jdbc(
    url=jdbc_url,
    table='products_table',
    mode=mode,
    properties=config,
)

Py4JJavaError: ignored

In [19]:
# Write customers_df to the 'customers_table' table in RDS over JDBC.
# Author's timing note: 5 min 14 s
customers_df.write.jdbc(
    url=jdbc_url,
    table='customers_table',
    mode=mode,
    properties=config,
)

In [18]:
# Write vine_df to the 'vine_table' table in RDS over JDBC.
# Author's timing note: 11 minutes
vine_df.write.jdbc(
    url=jdbc_url,
    table='vine_table',
    mode=mode,
    properties=config,
)