In [1]:
import os

# Find the latest versions of
#   Spark & Hadoop:  https://www.apache.org/dist/spark/
#                    https://spark.apache.org/downloads.html
#   Postgres driver: https://jdbc.postgresql.org/
os.environ['HADOOP_VERSION']   = hadoop_version   = 'hadoop3'
os.environ['SPARK_VERSION']    = spark_version    = 'spark-3.3.1'
os.environ['POSTGRES_VERSION'] = postgres_version = 'postgresql-42.5.0'

# Install Java
! apt install openjdk-11-jdk-headless > /dev/null
os.environ['JAVA_HOME']  = '/usr/lib/jvm/java-11-openjdk-amd64'

# Install Spark
! wget https://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-$HADOOP_VERSION.tgz
! tar xf $SPARK_VERSION-bin-$HADOOP_VERSION.tgz
os.environ['SPARK_HOME'] = f'/content/{spark_version}-bin-{hadoop_version}'
! pip install findspark

# Install Postgres driver
! wget https://jdbc.postgresql.org/download/$POSTGRES_VERSION.jar

import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .appName('AmazonReviews') \
  .config('spark.driver.extraClassPath', f'/content/{postgres_version}.jar') \
  .getOrCreate()
  
spark



--2022-11-06 21:39:37--  https://www.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
Resolving www.apache.org (www.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to www.apache.org (www.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz [following]
--2022-11-06 21:39:37--  https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
Resolving downloads.apache.org (downloads.apache.org)... 88.99.95.219, 135.181.214.104, 2a01:4f8:10a:201a::2, ...
Connecting to downloads.apache.org (downloads.apache.org)|88.99.95.219|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 299350810 (285M) [application/x-gzip]
Saving to: ‘spark-3.3.1-bin-hadoop3.tgz’


2022-11-06 21:39:48 (29.1 MB/s) - ‘spark-3.3.1-bin-hadoop3.tgz’ saved [299350810/299350810]

Looking in indexes: https://pypi.org/simple, https://us-python.pk

In [2]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Watches_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Watches_v1_00.tsv.gz"), sep="\t", header=True)

# Show DataFrame
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|    3653882|R3O9SGZBVQBV76|B00FALQ1ZC|     937001370|Invicta Women's 1...|         Watches|          5|            0|          0|   N|                Y|          Five Stars|Absolutely love t...| 2015-08-31|
|         US|   14661224| RKH8BNC3L5DLF|B00D3RGO20|     484010722|Kenneth Cole New ...|         Watches|          5|    

In [3]:
# Count the number of records (rows) in the dataset.
df.count()

960872

In [4]:
df.dropna()

DataFrame[marketplace: string, customer_id: string, review_id: string, product_id: string, product_parent: string, product_title: string, product_category: string, star_rating: string, helpful_votes: string, total_votes: string, vine: string, verified_purchase: string, review_headline: string, review_body: string, review_date: string]

In [5]:
df.count()

960872

In [6]:
# Transform the dataset to fit the tables in PostgreSQL.
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



customer_id to be changed to INT

helpful_votes to be changed to INT

product_parent to be chenged to INT

review_date to be changed to date format (yyyy-mm-dd)

star_rating to be chenged to INT

total_votes to be changed to INT

In [7]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType, DateType

In [8]:
schema = [StructField("marketplace", StringType(), True), StructField("customer_id", IntegerType(), True),StructField("review_id", StringType(), True),StructField("product_id", StringType(), True),StructField("product_parent", IntegerType(), True),StructField("product_title", StringType(), True),StructField("product_category", StringType(), True),StructField("star_rating", IntegerType(), True),StructField("helpful_votes", IntegerType(), True),StructField("total_votes", IntegerType(), True),StructField("vine", StringType(), True),StructField("verified_purchase", StringType(), True),StructField("review_headline", StringType(), True),StructField("review_body", StringType(), True),StructField("review_date", DateType(), True),]
schema

[StructField('marketplace', StringType(), True),
 StructField('customer_id', IntegerType(), True),
 StructField('review_id', StringType(), True),
 StructField('product_id', StringType(), True),
 StructField('product_parent', IntegerType(), True),
 StructField('product_title', StringType(), True),
 StructField('product_category', StringType(), True),
 StructField('star_rating', IntegerType(), True),
 StructField('helpful_votes', IntegerType(), True),
 StructField('total_votes', IntegerType(), True),
 StructField('vine', StringType(), True),
 StructField('verified_purchase', StringType(), True),
 StructField('review_headline', StringType(), True),
 StructField('review_body', StringType(), True),
 StructField('review_date', DateType(), True)]

In [9]:
final = StructType(fields=schema)
final

StructType([StructField('marketplace', StringType(), True), StructField('customer_id', IntegerType(), True), StructField('review_id', StringType(), True), StructField('product_id', StringType(), True), StructField('product_parent', IntegerType(), True), StructField('product_title', StringType(), True), StructField('product_category', StringType(), True), StructField('star_rating', IntegerType(), True), StructField('helpful_votes', IntegerType(), True), StructField('total_votes', IntegerType(), True), StructField('vine', StringType(), True), StructField('verified_purchase', StringType(), True), StructField('review_headline', StringType(), True), StructField('review_body', StringType(), True), StructField('review_date', DateType(), True)])

In [10]:
dataframe = spark.read.csv(SparkFiles.get("amazon_reviews_us_Watches_v1_00.tsv.gz"), schema=final, sep="\t", header=True)
dataframe.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|    3653882|R3O9SGZBVQBV76|B00FALQ1ZC|     937001370|Invicta Women's 1...|         Watches|          5|            0|          0|   N|                Y|          Five Stars|Absolutely love t...| 2015-08-31|
|         US|   14661224| RKH8BNC3L5DLF|B00D3RGO20|     484010722|Kenneth Cole New ...|         Watches|          5|    

In [11]:
#Table1 is review_id, customer_id, product_id, product_parent, review_date
review_id_table = dataframe.select(['review_id','customer_id','product_id','product_parent','review_date'])

review_id_table.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R3O9SGZBVQBV76|    3653882|B00FALQ1ZC|     937001370| 2015-08-31|
| RKH8BNC3L5DLF|   14661224|B00D3RGO20|     484010722| 2015-08-31|
|R2HLE8WKZSU3NL|   27324930|B00DKYC7TK|     361166390| 2015-08-31|
|R31U3UH5AZ42LL|    7211452|B000EQS1JW|     958035625| 2015-08-31|
|R2SV659OUJ945Y|   12733322|B00A6GFD7S|     765328221| 2015-08-31|
| RA51CP8TR5A2L|    6576411|B00EYSOSE8|     230493695| 2015-08-31|
| RB2Q7DLDN6TH6|   11811565|B00WM0QA3M|     549298279| 2015-08-31|
|R2RHFJV0UYBK3Y|   49401598|B00A4EYBR0|     844009113| 2015-08-31|
|R2Z6JOQ94LFHEP|   45925069|B00MAMPGGE|     263720892| 2015-08-31|
| RX27XIIWY5JPB|   44751341|B004LBPB7Q|     124278407| 2015-08-31|
|R15C7QEZT0LGZN|    9962330|B00KGTVGKS|      28017857| 2015-08-31|
|R361XSS37V0NCZ|   16097204|B0039UT5OU|     685450910| 2015-08

In [12]:
review_id_table.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: date (nullable = true)



In [13]:
#Table2 is product_id, product_title
products = dataframe.select(['product_id','product_title'])

products.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00FALQ1ZC|Invicta Women's 1...|
|B00D3RGO20|Kenneth Cole New ...|
|B00DKYC7TK|Ritche 22mm Black...|
|B000EQS1JW|Citizen Men's BM8...|
|B00A6GFD7S|Orient ER27009B M...|
|B00EYSOSE8|Casio Men's GW-94...|
|B00WM0QA3M|Fossil Women's ES...|
|B00A4EYBR0|INFANTRY Mens Nig...|
|B00MAMPGGE|G-Shock Men's Gre...|
|B004LBPB7Q|Heiden Quad Watch...|
|B00KGTVGKS|Fossil Women's ES...|
|B0039UT5OU|Casio General Men...|
|B00MPF0XJQ|2Tone Gold Silver...|
|B003P1OHHS|Bulova Men's 98B1...|
|B00R70YEOE|Casio - G-Shock -...|
|B000FVE3BG|Invicta Men's 332...|
|B008X6JB12|Seiko Women's SUT...|
|B0040UOFPW|Anne Klein Women'...|
|B00UR2R5UY|Guess U13630G1 Me...|
|B00HFF57L0|Nixon Men's Geo V...|
+----------+--------------------+
only showing top 20 rows



In [22]:
products = products.dropDuplicates()

products.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00EVX7V1I|Game Time Women's...|
|B009S4DODY|XOXO Women's XO55...|
|B00LBKXQRW|Anne Klein Women'...|
|B0009P679Y|Invicta Men's 993...|
|B00DHF30RU|M&c Women's | Cla...|
|B00NIDA43Y|GuTe Classic Skel...|
|B008EQDDPQ|Nautica Men's N13...|
|B004VRBZ66|Timex Men's T2N63...|
|B009BEO81I|        Fossil Riley|
|B008B39MTI|XOXO Women's XO55...|
|B00TGPM8PU|Handmade Wooden W...|
|B00VNXQQQ0|Eterna 2520-41-64...|
|B00B1PV1C4|Nautica Men's N19...|
|B00N1Y8TQ4|Tissot Men's T095...|
|B00G6DBTY6|red line Men's RL...|
|B00HM04AYI|Columbia Men's Fi...|
|B00VI8HB96|GUESS I90176L1 Wo...|
|B00IT25WJU|LanTac DGN556B Dr...|
|B0106S12XE|Skmei S Shock Ana...|
|B00FPSJ63Y|Michael Kors Ladi...|
+----------+--------------------+
only showing top 20 rows



In [55]:
#Table3 is customer_id, customer_count - is this count of reviews?

customers = dataframe.select(['customer_id'])

customers.show()

+-----------+
|customer_id|
+-----------+
|    3653882|
|   14661224|
|   27324930|
|    7211452|
|   12733322|
|    6576411|
|   11811565|
|   49401598|
|   45925069|
|   44751341|
|    9962330|
|   16097204|
|   51330346|
|    4201739|
|   26339765|
|    2692576|
|   44713366|
|   32778769|
|   27258523|
|   42646538|
+-----------+
only showing top 20 rows



In [56]:
customers = customers.groupBy('customer_id').count()
customers.orderBy("customer_id").select(["customer_id", "count"])


customers.show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|    1567510|    1|
|   19502021|    1|
|   12819130|    1|
|   35329257|    2|
|     108460|    1|
|    5453476|    1|
|   29913055|    1|
|   30717305|    1|
|    1570030|    1|
|   19032020|    1|
|   44178035|    1|
|   26079415|    2|
|   14230926|    1|
|   43478048|    2|
|   43694941|    1|
|   12318815|    3|
|   13731855|    1|
|     740134|    1|
|   41956754|    1|
|   20324070|    3|
+-----------+-----+
only showing top 20 rows



In [58]:
customers = customers.withColumnRenamed('count','customer_count')

customers.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|    1567510|             1|
|   19502021|             1|
|   12819130|             1|
|   35329257|             2|
|     108460|             1|
|    5453476|             1|
|   29913055|             1|
|   30717305|             1|
|    1570030|             1|
|   19032020|             1|
|   44178035|             1|
|   26079415|             2|
|   14230926|             1|
|   43478048|             2|
|   43694941|             1|
|   12318815|             3|
|   13731855|             1|
|     740134|             1|
|   41956754|             1|
|   20324070|             3|
+-----------+--------------+
only showing top 20 rows



In [59]:
customers_df = customers.withColumn("customer_id",customers["customer_id"].cast(IntegerType()))\
    .withColumn("customer_count",customers["customer_count"].cast(IntegerType()))

customers_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_count: integer (nullable = false)



In [16]:
#Table4 is review_id, star_rating, helpful_votes, total_votes, vine
vine_table = dataframe.select(['review_id','star_rating','helpful_votes','total_votes','vine'])

vine_table.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R3O9SGZBVQBV76|          5|            0|          0|   N|
| RKH8BNC3L5DLF|          5|            0|          0|   N|
|R2HLE8WKZSU3NL|          2|            1|          1|   N|
|R31U3UH5AZ42LL|          5|            0|          0|   N|
|R2SV659OUJ945Y|          4|            0|          0|   N|
| RA51CP8TR5A2L|          5|            0|          0|   N|
| RB2Q7DLDN6TH6|          5|            1|          1|   N|
|R2RHFJV0UYBK3Y|          1|            1|          5|   N|
|R2Z6JOQ94LFHEP|          5|            1|          2|   N|
| RX27XIIWY5JPB|          4|            0|          0|   N|
|R15C7QEZT0LGZN|          4|            2|          2|   N|
|R361XSS37V0NCZ|          1|            0|          0|   N|
| ROTNLALUAJAUB|          3|            0|          0|   N|
|R2DYX7QU6BGOHR|          5|            

In [17]:
# Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://<endpoint>:5432/<database_name>"
config = {"user":"postgres", 
          "password": "<password", 
          "driver":"org.postgresql.Driver"}

In [18]:
# Write DataFrame to table in RDS

review_id_table.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

In [23]:
products.write.jdbc(url=jdbc_url, table="products", mode=mode, properties=config)

In [60]:
customers.write.jdbc(url=jdbc_url, table="customers", mode=mode, properties=config)

In [61]:
vine_table.write.jdbc(url=jdbc_url, table="vine_table", mode=mode, properties=config)