In [2]:
import os
# Find the latest version of spark 2.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.1'
spark_version = 'spark-3.0.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Waiting for headers] [Waiting for headers] [Connected to cloud.r-project.or                                                                               Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Waiting for headers] [Waiting for headers] [Waiting for headers] [Waiting f                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
                                                                               0% [Waiting for headers] [Waiting for headers] [Waiting for headers]0% [2 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Wait                                                                               Hit:4 http://security.ubuntu.com/ubuntu bionic-security InRelease
0%

In [3]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Amazon_PC").getOrCreate()

In [4]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_PC_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("amazon_reviews_us_PC_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True, timestampFormat="yyyy/MM/dd HH:mm:ss")
# Show DataFrame
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   22873041|R3ARRMDEGED8RD|B00KJWQIIC|     335625766|Plemo 14-Inch Lap...|              PC|          5|            0|          0|   N|                Y|Pleasantly surprised|I was very surpri...| 2015-08-31|
|         US|   30088427| RQ28TSA020Y6J|B013ALA9LA|     671157305|TP-Link OnHub AC1...|              PC|          5|    

In [5]:
# Drop null and duplicates
dropna_df = df.dropna()
cleaned_df = dropna_df.dropDuplicates()

In [6]:
# Count the number of records (rows) in the dataset.
print(cleaned_df.count())

6908145


In [7]:
# Transform the dataset to fit the tables in the schema file.
# Be sure the DataFrames match in data type and in column name.
cleaned_df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [8]:
# Create customers table
from pyspark.sql.functions import desc
customer_count_df = cleaned_df.select("customer_id").groupby("customer_id")\
  .agg({"customer_id":"count"})
customer_count_df = customer_count_df.withColumnRenamed("count(customer_id)", "customer_count")
customer_count_df.show(truncate=False)

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|36702702   |2             |
|38998964   |8             |
|49081248   |1             |
|28358430   |1             |
|21911449   |1             |
|50480727   |2             |
|9208062    |2             |
|42672285   |2             |
|26554579   |3             |
|16956038   |2             |
|6932710    |3             |
|33714184   |6             |
|21302050   |12            |
|17164585   |5             |
|51616677   |1             |
|41049980   |2             |
|828034     |1             |
|45932425   |3             |
|14337293   |7             |
|43821790   |11            |
+-----------+--------------+
only showing top 20 rows



In [9]:
# Create user dataframe to match review_id_table table
review_id_table = cleaned_df.select(["review_id", "customer_id", "product_id", "product_parent", "review_date"])
review_id_table.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R1001R3EZZ18B3|   13512246|B0096PD2SC|     422768802| 2013-01-27|
|R1004BM171V49W|   51335459|B004BFLP4A|     594900069| 2014-02-10|
|R100UWA60RMCZO|    7512500|B00FRHTTIU|      16015786| 2015-08-26|
|R100WQXUW2PBN3|   39247889|B001L1H0SC|     571829280| 2013-02-14|
| R100X9X33YXTK|   41228892|B00S9AMAPG|      92004405| 2015-05-15|
|R1018PWJWJL3DG|   52797442|B00LU7B8X0|     298756776| 2015-02-14|
|R101GZBULGZ95L|   15440915|B002NU5O3S|     463968341| 2013-04-17|
|R101IPFQKVUN9K|   16045822|B0040IEIII|     212966274| 2014-07-24|
|R101IU98XWDZM1|   15432781|B00425S1H8|     386765193| 2013-12-02|
|R101MVTU2LUCWC|   27493758|B00EXPNFUA|     124052875| 2014-08-10|
|R101WYR41JXHE2|   10863411|B00HJ0VSJ6|     627075706| 2014-12-10|
|R1024E7S3630L0|     584077|B00Y7MTL6G|     687620120| 2015-08

In [10]:
# Create user dataframe to match products table 
products = cleaned_df.select(["product_id", "product_title"])
products.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B0096PD2SC|Targus iPad Mini ...|
|B004BFLP4A|Clickfree C2 Auto...|
|B00FRHTTIU|Seagate Backup Pl...|
|B001L1H0SC|SanDisk 16 GB mic...|
|B00S9AMAPG|Dragon Touch Y88X...|
|B00LU7B8X0|AmazonBasics Lapt...|
|B002NU5O3S|Targus Crave Slip...|
|B0040IEIII|TETC Tablet Car M...|
|B00425S1H8|Plugable USB to R...|
|B00EXPNFUA|Corsair Gaming Ga...|
|B00HJ0VSJ6|8.9 - 10.1 inch T...|
|B00Y7MTL6G|Vogue Shop PU Lea...|
|B009NCMFOU|iBuyPower GAMER P...|
|B00KKC2GKK|NEWSTYLE Detachab...|
|B005QCDY50|GreatShield Ultra...|
|B0051QVESA|Kindle, Wi-Fi, 6"...|
|B00RK0UZZS|Tagital 10.1 inch...|
|B002LVUX1W|Kindle Keyboard 3...|
|B00IKF2H12|ASUS ROG G750JM 1...|
|B0058Y1H78|Koolertron New Pe...|
+----------+--------------------+
only showing top 20 rows



Postgres Setup

In [12]:
# Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://ucsd-fyc.ckcik1qdrfa0.us-east-2.rds.amazonaws.com:5432/amazon_pc"
config = {"user":"root", 
          "password": "postgres", 
          "driver":"org.postgresql.Driver"}

In [13]:
# Write DataFrame to active_user table in RDS

review_id_table.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

Py4JJavaError: ignored

In [14]:
# Write dataframe to products table in RDS

products.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)

Py4JJavaError: ignored

In [15]:
# Write dataframe to customers table in RDS

customer_count_df.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config)

Py4JJavaError: ignored